pub struct DomainParticipant { /* private fields */ }
The participant.
Implementations
impl DomainParticipant
pub fn runtime(&self) -> Option<&Arc<DcpsRuntime>>
Internal access to the runtime, used by Publisher/Subscriber to
create DataWriters/DataReaders. Returns None when the
participant is in offline mode.
pub fn qos(&self) -> DomainParticipantQos
Returns a copy of the DomainParticipantQos (Spec §2.2.2.2.1.4
get_qos).
pub fn set_qos(&self, qos: DomainParticipantQos) -> Result<()>
Sets the DomainParticipantQos (Spec §2.2.2.2.1.3 set_qos).
Errors
Currently none: the method always returns Ok(()). The spec permits
IMMUTABLE_POLICY, but this implementation never produces it
(all policies are mutable in RC1).
pub fn register_builtin_types(&self)
Registers the four built-in types from Spec §7.6.5
(DDS::String, DDS::KeyedString, DDS::Bytes, DDS::KeyedBytes)
in the local TypeRegistry. Idempotent: a repeated call
deterministically overwrites the entries.
Called automatically from new()/new_with_runtime(), but can
also be called again after the built-ins have been disabled via
unregister_builtin_types().
pub fn unregister_builtin_types(&self)
Removes all registered built-in types. Not currently called from the default code paths; it is a test helper for disable-flag tests.
pub fn find_builtin_type(&self, name: &str) -> Option<DynamicType>
Looks up a built-in type by its spec name (Spec §7.6.5).
Returns Some(DynamicType) if the name is known
(registered via register_builtin_types), and None otherwise.
pub fn registered_type_count(&self) -> usize
Number of registered built-in types. After new() this is 4.
pub fn enqueue_type_lookup(&self, hash: EquivalenceHash) -> bool
Tries to queue a TypeLookup request for an unknown
EquivalenceHash. Honors a backoff (5 s between
attempts) and at most 3 retries per hash.
Returns: true if the request was queued, false if it was
suppressed by the backoff or the maximum number of attempts was reached.
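The backoff and attempt limit described above can be modelled with a small tracker. This is a minimal sketch, not the crate's implementation: the `LookupTracker` type, the `Hash` alias, and the constant names are assumptions made for illustration, the limit is read here as three attempts in total, and the clock is passed in explicitly so the behaviour can be exercised without sleeping.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the crate's EquivalenceHash type.
type Hash = [u8; 14];

/// Values from the description above; the constant names are assumptions.
const BACKOFF: Duration = Duration::from_secs(5);
const MAX_ATTEMPTS: u32 = 3;

#[derive(Default)]
struct LookupTracker {
    /// Per hash: number of attempts so far and the time of the last one.
    attempts: HashMap<Hash, (u32, Instant)>,
    /// Requests waiting to be drained, paired with a sequence number.
    queue: Vec<(Hash, u64)>,
    next_seq: u64,
}

impl LookupTracker {
    /// Returns true if a request was queued, false if it was suppressed.
    fn enqueue(&mut self, hash: Hash, now: Instant) -> bool {
        if let Some(&(count, last)) = self.attempts.get(&hash) {
            if count >= MAX_ATTEMPTS {
                return false; // attempts exhausted
            }
            if now.duration_since(last) < BACKOFF {
                return false; // still inside the backoff window
            }
        }
        let entry = self.attempts.entry(hash).or_insert((0, now));
        entry.0 += 1;
        entry.1 = now;
        self.next_seq += 1;
        self.queue.push((hash, self.next_seq));
        true
    }

    /// True once the attempt limit has been reached for this hash.
    fn exhausted(&self, hash: &Hash) -> bool {
        self.attempts
            .get(hash)
            .map_or(false, |&(count, _)| count >= MAX_ATTEMPTS)
    }

    /// Hands out all queued requests and leaves the queue empty.
    fn drain(&mut self) -> Vec<(Hash, u64)> {
        std::mem::take(&mut self.queue)
    }
}
```

In this model a caller would invoke `enqueue` from a discovery callback, periodically `drain` the queue to send the requests, and consult `exhausted` before retrying a stalled match.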
pub fn drain_type_lookup_requests(&self) -> Vec<(EquivalenceHash, u64)>
Drains the queued TypeLookup requests and returns them as
Vec<(hash, seq)>. In a production environment the caller would
send the hashes via TypeLookupClient and a reliable writer to the
TL_SVC_REQ_WRITER endpoint.
pub fn ingest_type_lookup_reply(
    &self,
    types: Vec<(EquivalenceHash, MinimalTypeObject)>,
) -> usize
Receives a TypeLookup reply (TypeObjects per hash). Registers the TypeObjects in an internal TypeRegistry mirror; afterwards a stalled QoS match can be retried.
Returns the number of successfully registered types.
pub fn on_remote_publication_discovered(
    &self,
    type_information_blob: Option<&[u8]>,
) -> usize
SEDP discovery hook: checks an incoming
PublicationBuiltinTopicData for type hashes that cannot be
resolved locally. If needed, a TypeLookup request is queued via
enqueue_type_lookup.
The RPC path is live via DcpsRuntime::send_type_lookup_request
on the TL_SVC_REQ_* endpoints (XTypes 1.3 §7.6.3.3.4);
this method decides per hash whether a re-request is worthwhile
(local registry lookup plus backoff tracking).
Returns: the number of unknown hashes queued (at most 2: minimal and complete).
pub fn on_remote_subscription_discovered(
    &self,
    type_information_blob: Option<&[u8]>,
) -> usize
SEDP discovery hook for
SubscriptionBuiltinTopicData. Symmetric to
on_remote_publication_discovered.
pub fn type_lookup_exhausted(&self, hash: EquivalenceHash) -> bool
True if MAX_ATTEMPTS has already been reached for the hash. Consulted by the match retry path so that it eventually gives up instead of polling forever.
pub fn create_topic<T: DdsType>(
    &self,
    name: &str,
    qos: TopicQos,
) -> Result<Topic<T>>
Creates a typed topic handle. Repeated calls with the same name and type return the same handle (shared by reference).
Errors
InconsistentPolicy if a topic with this name is already registered under a different type.
BadParameter if the name is empty.
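The handle sharing and the two error cases can be illustrated with a name-keyed registry. This is a sketch under assumptions, not the crate's code: `TopicRegistry`, `TopicInner`, and `TopicError` are hypothetical names, the type check uses `std::any::TypeId` in place of whatever `DdsType` provides, and QoS is left out.

```rust
use std::any::TypeId;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Debug, PartialEq)]
enum TopicError {
    BadParameter,
    InconsistentPolicy,
}

/// Hypothetical shared state behind a topic handle.
struct TopicInner {
    name: String,
    type_id: TypeId,
}

#[derive(Default)]
struct TopicRegistry {
    topics: Mutex<HashMap<String, Arc<TopicInner>>>,
}

impl TopicRegistry {
    /// Returns the existing handle for (name, T) or creates a new one.
    fn create_topic<T: 'static>(&self, name: &str) -> Result<Arc<TopicInner>, TopicError> {
        if name.is_empty() {
            return Err(TopicError::BadParameter);
        }
        let mut topics = self.topics.lock().unwrap();
        if let Some(existing) = topics.get(name) {
            // Same name: hand out the shared handle only if the type matches.
            return if existing.type_id == TypeId::of::<T>() {
                Ok(Arc::clone(existing))
            } else {
                Err(TopicError::InconsistentPolicy)
            };
        }
        let inner = Arc::new(TopicInner {
            name: name.to_owned(),
            type_id: TypeId::of::<T>(),
        });
        topics.insert(name.to_owned(), Arc::clone(&inner));
        Ok(inner)
    }
}
```

Two calls with the same name and type yield handles for which `Arc::ptr_eq` is true, which is one way to realize the reference-shared behaviour described above.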
Examples found in repository
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;

    let topic = participant.create_topic::<RawBytes>("Chatter", TopicQos::default())?;
    let subscriber = participant.create_subscriber(SubscriberQos::default());
    let reader = subscriber.create_datareader::<RawBytes>(&topic, DataReaderQos::default())?;

    println!("hello_dds_subscriber: reading on Domain 0 Topic 'Chatter' — Ctrl-C to stop");

    loop {
        match reader.wait_for_data(Duration::from_secs(1)) {
            Ok(()) => {
                for sample in reader.take()? {
                    match std::str::from_utf8(&sample.data) {
                        Ok(s) => println!(" <- {s}"),
                        Err(_) => println!(" <- <{} bytes of non-UTF8>", sample.data.len()),
                    }
                }
            }
            Err(DdsError::Timeout) => {} // idle tick
            Err(e) => return Err(e.into()),
        }
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;

    let topic = participant.create_topic::<RawBytes>("Chatter", TopicQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());
    let writer = publisher.create_datawriter::<RawBytes>(&topic, DataWriterQos::default())?;

    println!("hello_dds_publisher: sending on Domain 0 Topic 'Chatter' — Ctrl-C to stop");

    // Wait a few seconds so that SPDP+SEDP can discover subscribers.
    // Without this wait the first samples are lost (no reader matched
    // yet). v1.3 adds `wait_for_matched_subscription`.
    thread::sleep(Duration::from_secs(3));

    let mut counter: u32 = 0;
    loop {
        let payload = format!("hello #{counter}");
        writer.write(&RawBytes::new(payload.clone().into_bytes()))?;
        println!(" -> {payload}");
        counter = counter.wrapping_add(1);
        thread::sleep(Duration::from_secs(1));
    }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = env::args().collect();
    let topic_name = args.get(1).map_or("Square", String::as_str);
    let domain_id: i32 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(0);

    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
    let topic = participant.create_topic::<ShapeType>(topic_name, TopicQos::default())?;
    let subscriber = participant.create_subscriber(SubscriberQos::default());
    let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;

    println!("shapes_demo_subscriber: Topic={topic_name} Domain={domain_id} — Ctrl-C to stop");

    loop {
        match reader.wait_for_data(Duration::from_secs(1)) {
            Ok(()) => {
                for sample in reader.take()? {
                    println!(
                        " <- color={:8} x={:4} y={:4} size={}",
                        sample.color, sample.x, sample.y, sample.shapesize
                    );
                }
            }
            Err(DdsError::Timeout) => {}
            Err(e) => return Err(e.into()),
        }
    }
}

fn run_pub(size: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());
    let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;

    // Wait for discovery.
    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));

    let payload_color = "X".repeat(size.saturating_sub(4 * 4)); // size minus the i32 fields, approximately
    let start = Instant::now();
    let mut total = 0u64;
    let mut last_report = start;
    let mut samples_since_report = 0u64;

    while start.elapsed() < runtime {
        let s = ShapeType::new(payload_color.clone(), 0, 0, total as i32);
        if writer.write(&s).is_ok() {
            total += 1;
            samples_since_report += 1;
        }
        // Report once per second.
        if last_report.elapsed() >= Duration::from_secs(1) {
            let elapsed_s = last_report.elapsed().as_secs_f64();
            let rate_ks = (samples_since_report as f64 / elapsed_s) / 1000.0;
            let rate_mb =
                (samples_since_report as f64 * size as f64 * 8.0 / elapsed_s) / 1_000_000.0;
            println!(
                "{:.3} size {} total {} rate {:.2} kS/s {:.2} Mb/s",
                start.elapsed().as_secs_f64(),
                size,
                total,
                rate_ks,
                rate_mb
            );
            last_report = Instant::now();
            samples_since_report = 0;
        }
    }
    println!(
        "# pub-done: total={total} runtime={:.3}s",
        start.elapsed().as_secs_f64()
    );
    Ok(())
}

fn run_sub(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
    let subscriber = participant.create_subscriber(SubscriberQos::default());
    let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;

    let total = Arc::new(AtomicU64::new(0));
    let start = Instant::now();
    let mut last_report = start;
    let mut last_total = 0u64;

    while start.elapsed() < runtime {
        if let Ok(samples) = reader.take() {
            for _ in samples {
                total.fetch_add(1, Ordering::Relaxed);
            }
        }
        if last_report.elapsed() >= Duration::from_secs(1) {
            let now_total = total.load(Ordering::Relaxed);
            let delta = now_total - last_total;
            let elapsed_s = last_report.elapsed().as_secs_f64();
            let rate_ks = (delta as f64 / elapsed_s) / 1000.0;
            println!(
                "{:.3} size N total {} delta {} rate {:.2} kS/s",
                start.elapsed().as_secs_f64(),
                now_total,
                delta,
                rate_ks
            );
            last_report = Instant::now();
            last_total = now_total;
        }
        std::thread::sleep(Duration::from_millis(1));
    }
    println!(
        "# sub-done: total={} runtime={:.3}s",
        total.load(Ordering::Relaxed),
        start.elapsed().as_secs_f64()
    );
    Ok(())
}

fn run_pingpong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    // Ping-pong over two topics: the pinger writes PerfPing (shapesize
    // carries an i32 marker for matching), the ponger (a second process)
    // reads it and echoes on PerfPong. This process is the PINGER.
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
    let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());
    let subscriber = participant.create_subscriber(SubscriberQos::default());
    let writer = publisher.create_datawriter::<ShapeType>(&ping_topic, DataWriterQos::default())?;
    let reader =
        subscriber.create_datareader::<ShapeType>(&pong_topic, DataReaderQos::default())?;

    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
    let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));

    let mut rtts_us: Vec<u64> = Vec::new();
    let start = Instant::now();
    let mut seq = 0i32;
    while start.elapsed() < runtime {
        let send_us = now_micros();
        // shapesize carries the sequence number as a pseudo-id, used only
        // to match the pong to its ping; the RTT itself is computed
        // locally from the stored send timestamp.
        let _ = writer.write(&ShapeType::new("PING", 0, 0, seq));
        seq = seq.wrapping_add(1);

        // Wait up to 50 ms for the pong.
        let deadline = Instant::now() + Duration::from_millis(50);
        let mut got_pong = false;
        while Instant::now() < deadline {
            if let Ok(samples) = reader.take() {
                for s in samples {
                    if s.color == "PONG" && s.shapesize == seq.wrapping_sub(1) {
                        let recv_us = now_micros();
                        rtts_us.push(recv_us - send_us);
                        got_pong = true;
                        break;
                    }
                }
                // Stop polling only once the pong for this ping has arrived.
                if got_pong {
                    break;
                }
            }
            std::thread::sleep(Duration::from_micros(100));
        }
        std::thread::sleep(Duration::from_millis(100)); // 10 Hz ping rate
    }

    if rtts_us.is_empty() {
        println!("# pingpong: no RTTs collected — pong missing?");
        return Ok(());
    }
    rtts_us.sort_unstable();
    let n = rtts_us.len();
    let mean = rtts_us.iter().sum::<u64>() / n as u64;
    let p50 = rtts_us[n / 2];
    let p90 = rtts_us[(n * 9) / 10];
    let p99 = rtts_us[(n * 99) / 100];
    println!(
        "{:.3} rtt mean {}us min {} 50% {} 90% {} 99% {} max {} cnt {}",
        start.elapsed().as_secs_f64(),
        mean,
        rtts_us.first().copied().unwrap_or(0),
        p50,
        p90,
        p99,
        rtts_us.last().copied().unwrap_or(0),
        n
    );
    Ok(())
}

fn run_pong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    // Reads PerfPing, writes PerfPong with the same shapesize.
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
    let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());
    let subscriber = participant.create_subscriber(SubscriberQos::default());
    let reader =
        subscriber.create_datareader::<ShapeType>(&ping_topic, DataReaderQos::default())?;
    let writer = publisher.create_datawriter::<ShapeType>(&pong_topic, DataWriterQos::default())?;
    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
    let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));

    let start = Instant::now();
    let mut echoed = 0u64;
    while start.elapsed() < runtime {
        if let Ok(samples) = reader.take() {
            for s in samples {
                if s.color == "PING" {
                    let _ = writer.write(&ShapeType::new("PONG", s.x, s.y, s.shapesize));
                    echoed += 1;
                }
            }
        }
        std::thread::sleep(Duration::from_micros(100));
    }
    println!(
        "# pong-done: echoed={echoed} runtime={:.3}s",
        start.elapsed().as_secs_f64()
    );
    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = env::args().collect();
    let topic_name = args.get(1).map_or("Square", String::as_str);
    let color = args.get(2).map_or("BLUE", String::as_str);
    let domain_id: i32 = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(0);

    if !["Square", "Circle", "Triangle"].contains(&topic_name) {
        eprintln!(
            "warning: topic '{topic_name}' is not a standard ShapesDemo topic (Square/Circle/Triangle)",
        );
    }

    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
    let topic = participant.create_topic::<ShapeType>(topic_name, TopicQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());
    let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;

    println!(
        "shapes_demo_publisher: Topic={topic_name} Color={color} Domain={domain_id} — Ctrl-C to stop"
    );

    // Wait for discovery until at least one subscriber has matched.
    let matched = writer.wait_for_matched_subscription(1, Duration::from_secs(10));
    match matched {
        Ok(()) => println!("matched subscriber found, starting publication"),
        Err(_) => println!("no subscriber in 10s — publishing anyway, samples may drop"),
    }

    // Sinusoidal motion on a 240x270 canvas (ShapesDemo default).
    let shapesize = 30;
    let mut t: f32 = 0.0;
    loop {
        let x = (120.0 + 80.0 * (t).sin()) as i32;
        let y = (135.0 + 90.0 * (t * 1.3).cos()) as i32;
        let sample = ShapeType::new(color, x, y, shapesize);
        writer.write(&sample)?;
        println!(" -> color={color} x={x} y={y} size={shapesize}");
        t += 0.15;
        thread::sleep(Duration::from_millis(100));
    }
}

fn run_pub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());

    let mut writers = Vec::with_capacity(count);
    println!("[multi_pub] creating {count} writers");
    let create_start = Instant::now();
    for i in 0..count {
        let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
        let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;
        writers.push(writer);
    }
    println!(
        "[multi_pub] created {count} writers in {:.2}s",
        create_start.elapsed().as_secs_f64()
    );

    let start = Instant::now();
    let mut tick = 0u64;
    let mut total_writes = 0u64;
    while start.elapsed() < runtime {
        let tick_start = Instant::now();
        for (i, w) in writers.iter().enumerate() {
            let s = ShapeType::new(format!("MEP{i:04}"), tick as i32, 0, 30);
            if w.write(&s).is_ok() {
                total_writes += 1;
            }
        }
        tick += 1;
        // 1 Hz per writer: report every 60 ticks, then sleep until the next second.
        if tick % 60 == 0 {
            println!(
                "{:.3} multi_pub tick={} writers={} total_writes={} loop_us={}",
                start.elapsed().as_secs_f64(),
                tick,
                count,
                total_writes,
                tick_start.elapsed().as_micros()
            );
        }
        let elapsed = tick_start.elapsed();
        if elapsed < Duration::from_secs(1) {
            std::thread::sleep(Duration::from_secs(1) - elapsed);
        }
    }
    println!(
        "# multi_pub-done: writers={count} total_writes={} runtime={:.3}s",
        total_writes,
        start.elapsed().as_secs_f64()
    );
    Ok(())
}

fn run_sub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let subscriber = participant.create_subscriber(SubscriberQos::default());

    let mut readers = Vec::with_capacity(count);
    println!("[multi_sub] creating {count} readers");
    let create_start = Instant::now();
    for i in 0..count {
        let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
        let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
        readers.push(reader);
    }
    println!(
        "[multi_sub] created {count} readers in {:.2}s",
        create_start.elapsed().as_secs_f64()
    );

    let start = Instant::now();
    let mut total_received = 0u64;
    let mut last_report = start;
    while start.elapsed() < runtime {
        for r in &readers {
            if let Ok(samples) = r.take() {
                total_received += samples.len() as u64;
            }
        }
        if last_report.elapsed() >= Duration::from_secs(60) {
            println!(
                "{:.3} multi_sub readers={} total_received={}",
                start.elapsed().as_secs_f64(),
                count,
                total_received
            );
            last_report = Instant::now();
        }
        std::thread::sleep(Duration::from_millis(10));
    }
    println!(
        "# multi_sub-done: readers={count} total_received={} runtime={:.3}s",
        total_received,
        start.elapsed().as_secs_f64()
    );
    Ok(())
}

pub fn lookup_topicdescription(
    &self,
    name: &str,
) -> Option<TopicDescriptionHandle>
Immediate local lookup of a topic by name. Returns None
if no local create_topic call with this name has taken
place. Does not wait for discovery (that is what
find_topic does). Spec reference: OMG DDS 1.4 §2.2.2.2.1.12
"lookup_topicdescription".
pub fn find_topic(
    &self,
    name: &str,
    timeout: Duration,
) -> Result<TopicDescriptionHandle>
Waits until a topic with the given name becomes visible via discovery
(SEDP publication or subscription), or until
timeout has elapsed. Spec reference: OMG DDS 1.4
§2.2.2.2.1.11 find_topic.
Returns:
Ok(handle) with name, type name, and participant if a matching SEDP endpoint became visible within timeout. Local topics count as well (there is no need to wait if create_topic has already run).
Err(Timeout) if timeout has elapsed.
Errors
DdsError::Timeout if timeout elapsed without a discovery match.
DdsError::BadParameter if the name is empty.
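The wait-until-visible-or-timeout behaviour can be sketched with a mutex-protected name set and a condition variable. This is an illustrative model only: `KnownTopics`, `announce`, and `FindError` are hypothetical names, and the real method returns a `TopicDescriptionHandle` rather than a `String`.

```rust
use std::collections::HashSet;
use std::sync::{Condvar, Mutex};
use std::time::Duration;

#[derive(Debug, PartialEq)]
enum FindError {
    BadParameter,
    Timeout,
}

/// Hypothetical table of topic names known locally or via discovery.
#[derive(Default)]
struct KnownTopics {
    names: Mutex<HashSet<String>>,
    changed: Condvar,
}

impl KnownTopics {
    /// In this sketch, called by a local create or a discovery callback.
    fn announce(&self, name: &str) {
        self.names.lock().unwrap().insert(name.to_owned());
        self.changed.notify_all();
    }

    /// Blocks until `name` is known or `timeout` has elapsed.
    fn find(&self, name: &str, timeout: Duration) -> Result<String, FindError> {
        if name.is_empty() {
            return Err(FindError::BadParameter);
        }
        let guard = self.names.lock().unwrap();
        // Returns immediately if the name is already present.
        let (guard, result) = self
            .changed
            .wait_timeout_while(guard, timeout, |names| !names.contains(name))
            .unwrap();
        if result.timed_out() && !guard.contains(name) {
            Err(FindError::Timeout)
        } else {
            Ok(name.to_owned())
        }
    }
}
```

Because the predicate is checked before blocking, a name that was announced earlier is found without any wait, which matches the note above that local topics count as well.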
pub fn create_contentfilteredtopic<T: DdsType>(
    &self,
    name: &str,
    related_topic: &Topic<T>,
    filter_expression: &str,
    filter_parameters: Vec<String>,
) -> Result<ContentFilteredTopic<T>>
Creates a ContentFilteredTopic as a subset of an
existing Topic<T>. Spec reference: OMG DDS 1.4
§2.2.2.2.1.13 create_contentfilteredtopic.
The filter_expression is an SQL subset (see Annex B).
filter_parameters are strings that replace %0, %1, … in the
expression.
Errors
BadParameter if the name or the expression is empty.
BadParameter if the filter expression does not parse.
BadParameter if a referenced %N parameter is not supplied in the filter_parameters Vec.
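How `%N` placeholders relate to the parameter vector can be shown with a small binding function. This is a simplified sketch under assumptions: `bind_parameters` and `FilterError` are hypothetical names, the substitution is purely textual, and no SQL parsing is attempted, so it only demonstrates the empty-expression and missing-parameter error cases.

```rust
#[derive(Debug, PartialEq)]
enum FilterError {
    BadParameter,
}

/// Replaces each %N placeholder in `expression` with `params[N]`.
fn bind_parameters(expression: &str, params: &[String]) -> Result<String, FilterError> {
    if expression.is_empty() {
        return Err(FilterError::BadParameter);
    }
    let mut out = String::with_capacity(expression.len());
    let mut chars = expression.chars().peekable();
    while let Some(c) = chars.next() {
        if c != '%' {
            out.push(c);
            continue;
        }
        // Collect the decimal index that follows '%'.
        let mut digits = String::new();
        while let Some(d) = chars.peek().copied().filter(|d| d.is_ascii_digit()) {
            digits.push(d);
            chars.next();
        }
        // A '%' without digits, or an index beyond the vector, is rejected.
        let index: usize = digits.parse().map_err(|_| FilterError::BadParameter)?;
        let value = params.get(index).ok_or(FilterError::BadParameter)?;
        out.push_str(value);
    }
    Ok(out)
}
```

With parameters `["100", "'BLUE'"]`, the expression `x > %0 AND color = %1` binds successfully, whereas an expression that references `%2` is rejected because the vector has only two entries.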
pub fn create_multitopic<T: DdsType>(
    &self,
    name: &str,
    type_name: &str,
    related_topic_names: Vec<String>,
    subscription_expression: &str,
    expression_parameters: Vec<String>,
) -> Result<MultiTopic<T>>
Creates a MultiTopic as a combining TopicDescription
over one or more underlying topics with an SQL subscription expression.
Spec reference: OMG DDS 1.4 §2.2.2.2.1.15 create_multitopic
(optional spec feature).
Errors
BadParameter if the name or the type name is empty.
BadParameter if related_topic_names is empty.
BadParameter if the subscription expression does not parse.
BadParameter if a referenced %N parameter is not supplied in the expression_parameters Vec.
pub fn delete_multitopic<T: DdsType>(&self, mt: &MultiTopic<T>) -> Result<()>
Deletes a MultiTopic. Spec §2.2.2.2.1.16
delete_multitopic. In v1.2 this is a no-op shim with a participant
match check.
Errors
BadParameter if the MultiTopic belongs to a different
participant.
pub fn delete_contentfilteredtopic<T: DdsType>(
    &self,
    cft: &ContentFilteredTopic<T>,
) -> Result<()>
Deletes a ContentFilteredTopic. Spec reference:
§2.2.2.2.1.14 delete_contentfilteredtopic.
In Rust the lifetime of the CFT handle is already covered by Drop:
the underlying resources are released as soon as
the ContentFilteredTopic<T> goes out of scope. This method
exists for spec compliance with the C++ API and validates the
participant match (the spec requires BadParameter if the CFT
belongs to a different participant).
Errors
BadParameter if the CFT belongs to a different participant.
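The participant match check amounts to comparing the owner recorded in the handle with the participant performing the delete. The following sketch assumes a simple numeric identity; `ParticipantId`, `Participant`, `FilteredTopic`, and `DeleteError` are hypothetical names, and a real implementation might compare GUIDs or shared pointers instead.

```rust
#[derive(Debug, PartialEq)]
enum DeleteError {
    BadParameter,
}

/// Hypothetical participant identity used only for the ownership check.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct ParticipantId(u32);

struct Participant {
    id: ParticipantId,
}

/// Hypothetical handle that remembers which participant created it.
struct FilteredTopic {
    owner: ParticipantId,
}

impl Participant {
    fn create_filtered(&self) -> FilteredTopic {
        FilteredTopic { owner: self.id }
    }

    /// Validates ownership only; resources are freed when the handle is dropped.
    fn delete_filtered(&self, cft: &FilteredTopic) -> Result<(), DeleteError> {
        if cft.owner != self.id {
            return Err(DeleteError::BadParameter);
        }
        Ok(())
    }
}
```

Deleting through the creating participant succeeds, while passing the same handle to another participant yields `BadParameter`; in both cases the handle itself stays valid until it is dropped.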
pub fn create_publisher(&self, qos: PublisherQos) -> Publisher
Creates a Publisher with the given QoS (the default is sufficient for v1.2).
Examples found in repository?
38fn main() -> Result<(), Box<dyn std::error::Error>> {
39 let factory = DomainParticipantFactory::instance();
40 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
41
42 let topic = participant.create_topic::<RawBytes>("Chatter", TopicQos::default())?;
43 let publisher = participant.create_publisher(PublisherQos::default());
44 let writer = publisher.create_datawriter::<RawBytes>(&topic, DataWriterQos::default())?;
45
46 println!("hello_dds_publisher: sending on Domain 0 Topic 'Chatter' — Ctrl-C to stop");
47
48 // Ein paar Sekunden warten, damit SPDP+SEDP Subscriber entdecken kann.
49 // Ohne Warte-Phase gehen die ersten Samples ins Leere (noch kein Reader
50 // gematched). v1.3 bringt `wait_for_matched_subscription`.
51 thread::sleep(Duration::from_secs(3));
52
53 let mut counter: u32 = 0;
54 loop {
55 let payload = format!("hello #{counter}");
56 writer.write(&RawBytes::new(payload.clone().into_bytes()))?;
57 println!(" -> {payload}");
58 counter = counter.wrapping_add(1);
59 thread::sleep(Duration::from_secs(1));
60 }
61}More examples
45fn run_pub(size: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
46 let factory = DomainParticipantFactory::instance();
47 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
48 let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
49 let publisher = participant.create_publisher(PublisherQos::default());
50 let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;
51
52 // Discovery-Wait
53 let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
54
55 let payload_color = "X".repeat(size.saturating_sub(4 * 4)); // size minus i32-Felder approx
56 let start = Instant::now();
57 let mut total = 0u64;
58 let mut last_report = start;
59 let mut samples_since_report = 0u64;
60
61 while start.elapsed() < runtime {
62 let s = ShapeType::new(payload_color.clone(), 0, 0, total as i32);
63 if writer.write(&s).is_ok() {
64 total += 1;
65 samples_since_report += 1;
66 }
67 // Report jede Sekunde
68 if last_report.elapsed() >= Duration::from_secs(1) {
69 let elapsed_s = last_report.elapsed().as_secs_f64();
70 let rate_ks = (samples_since_report as f64 / elapsed_s) / 1000.0;
71 let rate_mb =
72 (samples_since_report as f64 * size as f64 * 8.0 / elapsed_s) / 1_000_000.0;
73 println!(
74 "{:.3} size {} total {} rate {:.2} kS/s {:.2} Mb/s",
75 start.elapsed().as_secs_f64(),
76 size,
77 total,
78 rate_ks,
79 rate_mb
80 );
81 last_report = Instant::now();
82 samples_since_report = 0;
83 }
84 }
85 println!(
86 "# pub-done: total={total} runtime={:.3}s",
87 start.elapsed().as_secs_f64()
88 );
89 Ok(())
90}
91
92fn run_sub(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
93 let factory = DomainParticipantFactory::instance();
94 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
95 let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
96 let subscriber = participant.create_subscriber(SubscriberQos::default());
97 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
98
99 let total = Arc::new(AtomicU64::new(0));
100 let start = Instant::now();
101 let mut last_report = start;
102 let mut last_total = 0u64;
103
104 while start.elapsed() < runtime {
105 if let Ok(samples) = reader.take() {
106 for _ in samples {
107 total.fetch_add(1, Ordering::Relaxed);
108 }
109 }
110 if last_report.elapsed() >= Duration::from_secs(1) {
111 let now_total = total.load(Ordering::Relaxed);
112 let delta = now_total - last_total;
113 let elapsed_s = last_report.elapsed().as_secs_f64();
114 let rate_ks = (delta as f64 / elapsed_s) / 1000.0;
115 println!(
116 "{:.3} size N total {} delta {} rate {:.2} kS/s",
117 start.elapsed().as_secs_f64(),
118 now_total,
119 delta,
120 rate_ks
121 );
122 last_report = Instant::now();
123 last_total = now_total;
124 }
125 std::thread::sleep(Duration::from_millis(1));
126 }
127 println!(
128 "# sub-done: total={} runtime={:.3}s",
129 total.load(Ordering::Relaxed),
130 start.elapsed().as_secs_f64()
131 );
132 Ok(())
133}
134
135fn run_pingpong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
136 // Beide Rollen in einem Prozess via zwei Topics; Pinger schreibt
137 // PerfPing (mit "send-time"-Marker als shapesize-i32-Diff), Ponger
138 // (zweiter Prozess) liest und echot auf PerfPong. Dieser Prozess
139 // ist der PINGER.
140 let factory = DomainParticipantFactory::instance();
141 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
142 let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
143 let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
144 let publisher = participant.create_publisher(PublisherQos::default());
145 let subscriber = participant.create_subscriber(SubscriberQos::default());
146 let writer = publisher.create_datawriter::<ShapeType>(&ping_topic, DataWriterQos::default())?;
147 let reader =
148 subscriber.create_datareader::<ShapeType>(&pong_topic, DataReaderQos::default())?;
149
150 let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
    let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));

    let mut rtts_us: Vec<u64> = Vec::new();
    let start = Instant::now();
    let mut seq = 0i32;
    while start.elapsed() < runtime {
        let send_us = now_micros();
        // shapesize carries the ping sequence number as a pseudo-id; it is
        // used only to match the pong. The RTT itself is computed locally
        // from the stored send timestamp.
        let _ = writer.write(&ShapeType::new("PING", 0, 0, seq));
        seq = seq.wrapping_add(1);

        // Wait up to 50 ms for the matching pong.
        let deadline = Instant::now() + Duration::from_millis(50);
        let mut got_pong = false;
        while Instant::now() < deadline {
            if let Ok(samples) = reader.take() {
                for s in samples {
                    if s.color == "PONG" && s.shapesize == seq.wrapping_sub(1) {
                        let recv_us = now_micros();
                        rtts_us.push(recv_us.saturating_sub(send_us));
                        got_pong = true;
                        break;
                    }
                }
                // Stop waiting only once the pong for this ping has arrived.
                if got_pong {
                    break;
                }
            }
            std::thread::sleep(Duration::from_micros(100));
        }
        std::thread::sleep(Duration::from_millis(100)); // roughly 10 Hz ping rate
    }

    if rtts_us.is_empty() {
        println!("# pingpong: no RTTs collected, pong missing?");
        return Ok(());
    }
    rtts_us.sort_unstable();
    let n = rtts_us.len();
    let mean = rtts_us.iter().sum::<u64>() / n as u64;
    let p50 = rtts_us[n / 2];
    let p90 = rtts_us[(n * 9) / 10];
    let p99 = rtts_us[(n * 99) / 100];
    println!(
        "{:.3} rtt mean {}us min {} 50% {} 90% {} 99% {} max {} cnt {}",
        start.elapsed().as_secs_f64(),
        mean,
        rtts_us.first().copied().unwrap_or(0),
        p50,
        p90,
        p99,
        rtts_us.last().copied().unwrap_or(0),
        n
    );
    Ok(())
}

fn run_pong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    // Reads PerfPing and writes PerfPong with the same shapesize.
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
    let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());
    let subscriber = participant.create_subscriber(SubscriberQos::default());
    let reader =
        subscriber.create_datareader::<ShapeType>(&ping_topic, DataReaderQos::default())?;
    let writer = publisher.create_datawriter::<ShapeType>(&pong_topic, DataWriterQos::default())?;
    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
    let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));

    let start = Instant::now();
    let mut echoed = 0u64;
    while start.elapsed() < runtime {
        if let Ok(samples) = reader.take() {
            for s in samples {
                if s.color == "PING" {
                    let _ = writer.write(&ShapeType::new("PONG", s.x, s.y, s.shapesize));
                    echoed += 1;
                }
            }
        }
        std::thread::sleep(Duration::from_micros(100));
    }
    println!(
        "# pong-done: echoed={echoed} runtime={:.3}s",
        start.elapsed().as_secs_f64()
    );
    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = env::args().collect();
    let topic_name = args.get(1).map_or("Square", String::as_str);
    let color = args.get(2).map_or("BLUE", String::as_str);
    let domain_id: i32 = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(0);

    if !["Square", "Circle", "Triangle"].contains(&topic_name) {
        eprintln!(
            "warning: topic '{topic_name}' is not a standard ShapesDemo topic (Square/Circle/Triangle)",
        );
    }

    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
    let topic = participant.create_topic::<ShapeType>(topic_name, TopicQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());
    let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;

    println!(
        "shapes_demo_publisher: Topic={topic_name} Color={color} Domain={domain_id} - Ctrl-C to stop"
    );

    // Discovery wait: block until at least one subscriber has matched.
    let matched = writer.wait_for_matched_subscription(1, Duration::from_secs(10));
    match matched {
        Ok(()) => println!("matched subscriber found, starting publication"),
        Err(_) => println!("no subscriber in 10s, publishing anyway, samples may drop"),
    }

    // Sinusoidal motion on a 240x270 canvas (the ShapesDemo default).
    let shapesize = 30;
    let mut t: f32 = 0.0;
    loop {
        let x = (120.0 + 80.0 * (t).sin()) as i32;
        let y = (135.0 + 90.0 * (t * 1.3).cos()) as i32;
        let sample = ShapeType::new(color, x, y, shapesize);
        writer.write(&sample)?;
        println!(" -> color={color} x={x} y={y} size={shapesize}");
        t += 0.15;
        thread::sleep(Duration::from_millis(100));
    }
}

fn run_pub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());

    let mut writers = Vec::with_capacity(count);
    println!("[multi_pub] creating {count} writers");
    let create_start = Instant::now();
    for i in 0..count {
        let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
        let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;
        writers.push(writer);
    }
    println!(
        "[multi_pub] created {count} writers in {:.2}s",
        create_start.elapsed().as_secs_f64()
    );

    let start = Instant::now();
    let mut tick = 0u64;
    let mut total_writes = 0u64;
    while start.elapsed() < runtime {
        let tick_start = Instant::now();
        for (i, w) in writers.iter().enumerate() {
            let s = ShapeType::new(format!("MEP{i:04}"), tick as i32, 0, 30);
            if w.write(&s).is_ok() {
                total_writes += 1;
            }
        }
        tick += 1;
        // Progress report every 60 ticks (about once a minute at 1 Hz).
        if tick % 60 == 0 {
            println!(
                "{:.3} multi_pub tick={} writers={} total_writes={} loop_us={}",
                start.elapsed().as_secs_f64(),
                tick,
                count,
                total_writes,
                tick_start.elapsed().as_micros()
            );
        }
        // 1 Hz per writer: sleep for the remainder of the current second.
        let elapsed = tick_start.elapsed();
        if elapsed < Duration::from_secs(1) {
            std::thread::sleep(Duration::from_secs(1) - elapsed);
        }
    }
    println!(
        "# multi_pub-done: writers={count} total_writes={} runtime={:.3}s",
        total_writes,
        start.elapsed().as_secs_f64()
    );
    Ok(())
}
pub fn create_subscriber(&self, qos: SubscriberQos) -> Subscriber
Creates a Subscriber.
Examples found in repository
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;

    let topic = participant.create_topic::<RawBytes>("Chatter", TopicQos::default())?;
    let subscriber = participant.create_subscriber(SubscriberQos::default());
    let reader = subscriber.create_datareader::<RawBytes>(&topic, DataReaderQos::default())?;

    println!("hello_dds_subscriber: reading on Domain 0 Topic 'Chatter' - Ctrl-C to stop");

    loop {
        match reader.wait_for_data(Duration::from_secs(1)) {
            Ok(()) => {
                for sample in reader.take()? {
                    match std::str::from_utf8(&sample.data) {
                        Ok(s) => println!(" <- {s}"),
                        Err(_) => println!(" <- <{} bytes of non-UTF8>", sample.data.len()),
                    }
                }
            }
            Err(DdsError::Timeout) => {} // idle tick
            Err(e) => return Err(e.into()),
        }
    }
}
More examples
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = env::args().collect();
    let topic_name = args.get(1).map_or("Square", String::as_str);
    let domain_id: i32 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(0);

    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
    let topic = participant.create_topic::<ShapeType>(topic_name, TopicQos::default())?;
    let subscriber = participant.create_subscriber(SubscriberQos::default());
    let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;

    println!("shapes_demo_subscriber: Topic={topic_name} Domain={domain_id} - Ctrl-C to stop");

    loop {
        match reader.wait_for_data(Duration::from_secs(1)) {
            Ok(()) => {
                for sample in reader.take()? {
                    println!(
                        " <- color={:8} x={:4} y={:4} size={}",
                        sample.color, sample.x, sample.y, sample.shapesize
                    );
                }
            }
            Err(DdsError::Timeout) => {}
            Err(e) => return Err(e.into()),
        }
    }
}

fn run_sub(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
    let subscriber = participant.create_subscriber(SubscriberQos::default());
    let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;

    let total = Arc::new(AtomicU64::new(0));
    let start = Instant::now();
    let mut last_report = start;
    let mut last_total = 0u64;

    while start.elapsed() < runtime {
        if let Ok(samples) = reader.take() {
            for _ in samples {
                total.fetch_add(1, Ordering::Relaxed);
            }
        }
        if last_report.elapsed() >= Duration::from_secs(1) {
            let now_total = total.load(Ordering::Relaxed);
            let delta = now_total - last_total;
            let elapsed_s = last_report.elapsed().as_secs_f64();
            let rate_ks = (delta as f64 / elapsed_s) / 1000.0;
            println!(
                "{:.3} size N total {} delta {} rate {:.2} kS/s",
                start.elapsed().as_secs_f64(),
                now_total,
                delta,
                rate_ks
            );
            last_report = Instant::now();
            last_total = now_total;
        }
        std::thread::sleep(Duration::from_millis(1));
    }
    println!(
        "# sub-done: total={} runtime={:.3}s",
        total.load(Ordering::Relaxed),
        start.elapsed().as_secs_f64()
    );
    Ok(())
}

fn run_pingpong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    // Ping-pong over two topics: the pinger writes PerfPing (with a
    // sequence marker in shapesize); the ponger (a second process) reads
    // it and echoes on PerfPong. This process is the PINGER.
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
    let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());
    let subscriber = participant.create_subscriber(SubscriberQos::default());
    let writer = publisher.create_datawriter::<ShapeType>(&ping_topic, DataWriterQos::default())?;
    let reader =
        subscriber.create_datareader::<ShapeType>(&pong_topic, DataReaderQos::default())?;

    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
    let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));

    let mut rtts_us: Vec<u64> = Vec::new();
    let start = Instant::now();
    let mut seq = 0i32;
    while start.elapsed() < runtime {
        let send_us = now_micros();
        // shapesize carries the ping sequence number as a pseudo-id; it is
        // used only to match the pong. The RTT itself is computed locally
        // from the stored send timestamp.
        let _ = writer.write(&ShapeType::new("PING", 0, 0, seq));
        seq = seq.wrapping_add(1);

        // Wait up to 50 ms for the matching pong.
        let deadline = Instant::now() + Duration::from_millis(50);
        let mut got_pong = false;
        while Instant::now() < deadline {
            if let Ok(samples) = reader.take() {
                for s in samples {
                    if s.color == "PONG" && s.shapesize == seq.wrapping_sub(1) {
                        let recv_us = now_micros();
                        rtts_us.push(recv_us.saturating_sub(send_us));
                        got_pong = true;
                        break;
                    }
                }
                // Stop waiting only once the pong for this ping has arrived.
                if got_pong {
                    break;
                }
            }
            std::thread::sleep(Duration::from_micros(100));
        }
        std::thread::sleep(Duration::from_millis(100)); // roughly 10 Hz ping rate
    }

    if rtts_us.is_empty() {
        println!("# pingpong: no RTTs collected, pong missing?");
        return Ok(());
    }
    rtts_us.sort_unstable();
    let n = rtts_us.len();
    let mean = rtts_us.iter().sum::<u64>() / n as u64;
    let p50 = rtts_us[n / 2];
    let p90 = rtts_us[(n * 9) / 10];
    let p99 = rtts_us[(n * 99) / 100];
    println!(
        "{:.3} rtt mean {}us min {} 50% {} 90% {} 99% {} max {} cnt {}",
        start.elapsed().as_secs_f64(),
        mean,
        rtts_us.first().copied().unwrap_or(0),
        p50,
        p90,
        p99,
        rtts_us.last().copied().unwrap_or(0),
        n
    );
    Ok(())
}

fn run_pong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    // Reads PerfPing and writes PerfPong with the same shapesize.
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
    let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());
    let subscriber = participant.create_subscriber(SubscriberQos::default());
    let reader =
        subscriber.create_datareader::<ShapeType>(&ping_topic, DataReaderQos::default())?;
    let writer = publisher.create_datawriter::<ShapeType>(&pong_topic, DataWriterQos::default())?;
    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
    let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));

    let start = Instant::now();
    let mut echoed = 0u64;
    while start.elapsed() < runtime {
        if let Ok(samples) = reader.take() {
            for s in samples {
                if s.color == "PING" {
                    let _ = writer.write(&ShapeType::new("PONG", s.x, s.y, s.shapesize));
                    echoed += 1;
                }
            }
        }
        std::thread::sleep(Duration::from_micros(100));
    }
    println!(
        "# pong-done: echoed={echoed} runtime={:.3}s",
        start.elapsed().as_secs_f64()
    );
    Ok(())
}

fn run_sub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let subscriber = participant.create_subscriber(SubscriberQos::default());

    let mut readers = Vec::with_capacity(count);
    println!("[multi_sub] creating {count} readers");
    let create_start = Instant::now();
    for i in 0..count {
        let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
        let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
        readers.push(reader);
    }
    println!(
        "[multi_sub] created {count} readers in {:.2}s",
        create_start.elapsed().as_secs_f64()
    );

    let start = Instant::now();
    let mut total_received = 0u64;
    let mut last_report = start;
    while start.elapsed() < runtime {
        for r in &readers {
            if let Ok(samples) = r.take() {
                total_received += samples.len() as u64;
            }
        }
        if last_report.elapsed() >= Duration::from_secs(60) {
            println!(
                "{:.3} multi_sub readers={} total_received={}",
                start.elapsed().as_secs_f64(),
                count,
                total_received
            );
            last_report = Instant::now();
        }
        std::thread::sleep(Duration::from_millis(10));
    }
    println!(
        "# multi_sub-done: readers={count} total_received={} runtime={:.3}s",
        total_received,
        start.elapsed().as_secs_f64()
    );
    Ok(())
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = env::args().collect();
    let domain_id: i32 = args.get(1).and_then(|s| s.parse().ok()).unwrap_or(0);

    let stop = Arc::new(AtomicBool::new(false));
    install_signal_handler(stop.clone());

    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
    let subscriber = participant.create_subscriber(SubscriberQos::default());

    let topics = ["Square", "Circle", "Triangle"];
    let mut readers = Vec::new();
    for t in &topics {
        let topic = participant.create_topic::<ShapeType>(t, TopicQos::default())?;
        let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
        readers.push((*t, reader));
    }

    eprintln!(
        "shapes_demo_viewer: Domain={domain_id} - subscribed to Square / Circle / Triangle. Ctrl-C to quit."
    );
    eprintln!("Waiting for discovery and first samples...");

    // Hide cursor + clear screen.
    print!("\x1B[?25l\x1B[2J");
    let mut stdout = std::io::stdout();
    stdout.flush().ok();

    // Keep the last known position per (topic, color).
    let mut shapes: HashMap<(String, String), (i32, i32)> = HashMap::new();
    let mut sample_count: u64 = 0;
    let mut grid: Vec<Vec<(char, &'static str)>> = vec![vec![(' ', ""); VIEW_W]; VIEW_H];

    while !stop.load(Ordering::Relaxed) {
        // 1) Take all pending samples per topic.
        for (topic_name, reader) in &readers {
            if let Ok(samples) = reader.take() {
                for sample in samples {
                    shapes.insert(
                        ((*topic_name).to_string(), sample.color.clone()),
                        (sample.x, sample.y),
                    );
                    sample_count += 1;
                }
            }
        }

        // 2) Clear grid.
        for row in &mut grid {
            for cell in row {
                *cell = (' ', "");
            }
        }

        // 3) Plot shapes.
        for ((topic, color), (x, y)) in &shapes {
            let gx = map_x(*x);
            let gy = map_y(*y);
            if gy < VIEW_H && gx < VIEW_W {
                grid[gy][gx] = (glyph(topic), ansi_color(color));
            }
        }

        // 4) Render: home, draw, status line.
        print!("\x1B[H");
        // top border
        print!("┌");
        for _ in 0..VIEW_W {
            print!("─");
        }
        println!("┐");
        for row in &grid {
            print!("│");
            for (ch, color) in row {
                if color.is_empty() {
                    print!(" ");
                } else {
                    print!("{color}{ch}{ANSI_RESET}");
                }
            }
            println!("│");
        }
        // bottom border
        print!("└");
        for _ in 0..VIEW_W {
            print!("─");
        }
        println!("┘");
        println!(
            "shapes={:3} samples={:6} domain={} - Ctrl-C to quit ",
            shapes.len(),
            sample_count,
            domain_id,
        );
        stdout.flush().ok();

        thread::sleep(Duration::from_millis(60));
    }

    // Show cursor again.
    print!("\x1B[?25h");
    println!("[shapes_demo_viewer] stopped. Total samples received: {sample_count}");
    Ok(())
}
pub fn topics_len(&self) -> usize
Number of currently registered topics. Diagnostic API.
pub fn discovered_participants_count(&self) -> usize
Number of remote participants currently discovered via SPDP.
Spec: OMG DDS 1.4 §2.2.2.2.1.27 get_discovered_participants.
Returns 0 in offline mode.
pub fn discovered_publications_count(&self) -> usize
Number of remote publications currently known in the SEDP cache.
Spec: OMG DDS 1.4 §2.2.2.2.1.29 get_discovered_topics (roughly analogous).
pub fn discovered_subscriptions_count(&self) -> usize
Number of remote subscriptions currently known in the SEDP cache.
pub fn ignore_participant(&self, handle: InstanceHandle) -> Result<()>
Marks a discovered remote DomainParticipant as "ignored": all further SPDP
beacons carrying this handle are dropped from the builtin reader stream, and
all SEDP endpoints that share the same participant prefix are discarded as
well (Spec §2.2.2.2.1.14).
Per the spec the action is monotonic: once a remote participant has been ignored, it stays ignored for the lifetime of this local participant.
§Errors
Currently none; the method always returns Ok(()). The spec permits
OUT_OF_RESOURCES, which this implementation does not actively produce.
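The cascade described above, where ignoring a participant also discards every endpoint that shares its prefix, can be illustrated with a small standalone sketch. It assumes the RTPS GUID layout of a 12-byte participant prefix followed by a 4-byte entity id. `Guid`, `IgnoreSet`, `accepts` and `guid` are hypothetical names chosen for illustration; they are not part of this crate's API, and the real filter may be organized differently.

```rust
use std::collections::HashSet;

/// Hypothetical 16-byte GUID: 12-byte participant prefix + 4-byte entity id.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct Guid(pub [u8; 16]);

impl Guid {
    /// Returns the 12-byte prefix shared by a participant and its endpoints.
    pub fn prefix(&self) -> [u8; 12] {
        let mut p = [0u8; 12];
        p.copy_from_slice(&self.0[..12]);
        p
    }
}

/// Monotonic ignore set: entries can be added but never removed.
#[derive(Default)]
pub struct IgnoreSet {
    prefixes: HashSet<[u8; 12]>,
}

impl IgnoreSet {
    /// Marks the participant, and therefore all of its endpoints, as ignored.
    pub fn ignore_participant(&mut self, participant: Guid) {
        self.prefixes.insert(participant.prefix());
    }

    /// Returns false for any discovery entry whose prefix has been ignored.
    pub fn accepts(&self, guid: Guid) -> bool {
        !self.prefixes.contains(&guid.prefix())
    }
}

/// Builds a GUID for illustration: every prefix byte is `prefix_byte`.
pub fn guid(prefix_byte: u8, entity_id: u32) -> Guid {
    let mut bytes = [prefix_byte; 16];
    bytes[12..].copy_from_slice(&entity_id.to_be_bytes());
    Guid(bytes)
}
```

Because the set offers no removal method, the monotonic property holds by construction in this sketch.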
pub fn ignore_topic(&self, handle: InstanceHandle) -> Result<()>
Marks a discovered remote topic as "ignored". Spec §2.2.2.2.1.15.
§Errors
pub fn ignore_publication(&self, handle: InstanceHandle) -> Result<()>
Marks a discovered remote publication as "ignored". Spec §2.2.2.2.1.16.
§Errors
pub fn ignore_subscription(&self, handle: InstanceHandle) -> Result<()>
Marks a discovered remote subscription as "ignored". Spec §2.2.2.2.1.17.
§Errors
pub fn is_participant_ignored(&self, handle: InstanceHandle) -> bool
Returns true if handle was marked via ignore_participant.
pub fn is_topic_ignored(&self, handle: InstanceHandle) -> bool
Returns true if handle was marked via ignore_topic.
pub fn is_publication_ignored(&self, handle: InstanceHandle) -> bool
Returns true if handle was marked via ignore_publication.
pub fn is_subscription_ignored(&self, handle: InstanceHandle) -> bool
Returns true if handle was marked via ignore_subscription.
pub fn delete_contained_entities(&self) -> Result<()>
Deletes all children held by the participant (publishers, subscribers, topics, builtin reader inboxes). Spec §2.2.2.2.1.18. Publisher, Subscriber and DataReader have analogous operations, which this call covers recursively.
Offline behavior:
- The topic registry is cleared (local topics).
- The publisher and subscriber trackers are cleared.
- The builtin topic reader inboxes are cleared, so that take() after delete_contained_entities returns an empty Vec.
- No SEDP unannounce is sent; the live path will take this over once the runtime gets a Drop/shutdown handle. Current state: the runtime thread runs until process exit.
§Errors
PreconditionNotMet if an internal mutex is poisoned.
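As a rough illustration of the error path, the following standalone sketch clears a set of mutex-protected registries and maps a poisoned lock to PreconditionNotMet. `Registries` and the local `DdsError` enum are hypothetical stand-ins, not the crate's types, and the real method also clears the builtin reader inboxes, which are omitted here.

```rust
use std::sync::Mutex;

/// Hypothetical error type; only the variant named in the text is modeled.
#[derive(Debug, PartialEq, Eq)]
pub enum DdsError {
    PreconditionNotMet,
}

/// Hypothetical stand-in for the participant's local bookkeeping.
#[derive(Default)]
pub struct Registries {
    pub topics: Mutex<Vec<String>>,
    pub publishers: Mutex<Vec<u64>>,
    pub subscribers: Mutex<Vec<u64>>,
}

impl Registries {
    /// Clears every registry; a poisoned lock maps to PreconditionNotMet.
    pub fn delete_contained_entities(&self) -> Result<(), DdsError> {
        self.topics
            .lock()
            .map_err(|_| DdsError::PreconditionNotMet)?
            .clear();
        self.publishers
            .lock()
            .map_err(|_| DdsError::PreconditionNotMet)?
            .clear();
        self.subscribers
            .lock()
            .map_err(|_| DdsError::PreconditionNotMet)?
            .clear();
        Ok(())
    }
}
```

In this sketch the registries are cleared one after another, so a poisoned lock on a later registry leaves the earlier ones already emptied. Whether the real implementation behaves the same way is not stated in the text.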
pub fn publishers_len(&self) -> usize
Number of publishers tracked via create_publisher.
Diagnostic API for tests.
pub fn subscribers_len(&self) -> usize
Number of subscribers tracked via create_subscriber.
pub fn instance_handle(&self) -> InstanceHandle
Returns the InstanceHandle of this participant. It identifies the entity
to consumers of the DCPS API (Spec §2.2.2.1.1 get_instance_handle).
pub fn contains_entity(&self, handle: InstanceHandle) -> bool
Spec §2.2.2.2.1.10 contains_entity: returns true if handle belongs to
this participant or to one of the entities it contains, directly or
recursively.
Included entity types:
- the participant itself
- all topics registered via create_topic
- all Publishers/Subscribers created via create_publisher/create_subscriber
- recursively: all DataWriters/DataReaders created via Publisher::create_datawriter/Subscriber::create_datareader.
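The recursive containment check can be pictured as a walk over a parent-to-children map. The sketch below is a minimal standalone model of that idea. `Handle`, `EntityTree` and `add_child` are hypothetical names, handles are plain integers for brevity, and the hierarchy is assumed to be a tree with no cycles.

```rust
use std::collections::HashMap;

/// Hypothetical handle type; the real InstanceHandle is treated as opaque.
pub type Handle = u64;

/// Hypothetical parent-to-children map standing in for the entity hierarchy.
#[derive(Default)]
pub struct EntityTree {
    children: HashMap<Handle, Vec<Handle>>,
}

impl EntityTree {
    /// Records `child` as directly contained in `parent`.
    pub fn add_child(&mut self, parent: Handle, child: Handle) {
        self.children.entry(parent).or_default().push(child);
    }

    /// True if `target` is `root` itself or reachable from it through any
    /// chain of children (iterative depth-first search).
    pub fn contains_entity(&self, root: Handle, target: Handle) -> bool {
        let mut stack = vec![root];
        while let Some(current) = stack.pop() {
            if current == target {
                return true;
            }
            if let Some(kids) = self.children.get(&current) {
                stack.extend(kids.iter().copied());
            }
        }
        false
    }
}
```

With participant 1 holding publisher 10, and publisher 10 holding writer 100, a query for 100 starting at 1 succeeds, while a query for a topic starting at the publisher does not.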
pub fn get_discovered_participants(&self) -> Vec<InstanceHandle>
Returns the InstanceHandles of all currently discovered remote
participants (Spec §2.2.2.2.1.27). Empty in offline mode. Ignored
participants are not listed.
pub fn get_discovered_participant_data(&self, handle: InstanceHandle) -> Result<ParticipantBuiltinTopicData>
Returns the ParticipantBuiltinTopicData for a handle obtained from
get_discovered_participants (Spec §2.2.2.2.1.28).
§Errors
BadParameter if handle does not refer to a discovered participant, or if
that participant has been ignored.
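The interplay between the listing call, the data lookup and the ignore set can be sketched with a small standalone cache. `DiscoveryCache`, `on_discovered` and the string payload are hypothetical placeholders (the real payload is ParticipantBuiltinTopicData), and the local `DdsError` enum models only the BadParameter variant named above.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical handle type standing in for InstanceHandle.
pub type Handle = u64;

/// Hypothetical error type; only the variant named in the text is modeled.
#[derive(Debug, PartialEq, Eq)]
pub enum DdsError {
    BadParameter,
}

/// Hypothetical discovery cache: handle -> payload, plus an ignore set.
#[derive(Default)]
pub struct DiscoveryCache {
    participants: BTreeMap<Handle, String>,
    ignored: BTreeSet<Handle>,
}

impl DiscoveryCache {
    /// Stores or updates the data announced for a remote participant.
    pub fn on_discovered(&mut self, handle: Handle, data: &str) {
        self.participants.insert(handle, data.to_string());
    }

    /// Marks a handle as ignored; there is deliberately no way to undo this.
    pub fn ignore(&mut self, handle: Handle) {
        self.ignored.insert(handle);
    }

    /// Lists all discovered handles, skipping ignored ones.
    pub fn get_discovered_participants(&self) -> Vec<Handle> {
        self.participants
            .keys()
            .copied()
            .filter(|h| !self.ignored.contains(h))
            .collect()
    }

    /// Looks up the data; unknown and ignored handles both yield BadParameter.
    pub fn get_discovered_participant_data(&self, handle: Handle) -> Result<&str, DdsError> {
        if self.ignored.contains(&handle) {
            return Err(DdsError::BadParameter);
        }
        self.participants
            .get(&handle)
            .map(String::as_str)
            .ok_or(DdsError::BadParameter)
    }
}
```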
pub fn get_discovered_topics(&self) -> Vec<InstanceHandle>
Returns the InstanceHandles of all currently discovered remote topics.
Spec §2.2.2.2.1.29.
Topics are discovered indirectly through SEDP publication and subscription
announcements: for each (topic_name, type_name) pair a stable key is
synthesized via TopicBuiltinTopicData::synthesize_key. Ignored topics are not listed.
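The text does not show how synthesize_key derives its key. The sketch below is therefore only one possible way to obtain a stable key from a (topic_name, type_name) pair: it uses a 64-bit FNV-1a hash as a swapped-in technique. The function signature, the u64 result and the zero-byte separator are all assumptions made for illustration.

```rust
/// Feeds `bytes` into a running 64-bit FNV-1a state and returns the new state.
fn fnv1a(mut state: u64, bytes: &[u8]) -> u64 {
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    for &b in bytes {
        state ^= u64::from(b);
        state = state.wrapping_mul(PRIME);
    }
    state
}

/// Hypothetical key synthesis: the same (topic_name, type_name) pair always
/// produces the same key, independent of discovery order or process.
pub fn synthesize_key(topic_name: &str, type_name: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    let state = fnv1a(OFFSET_BASIS, topic_name.as_bytes());
    // A 0x00 separator keeps ("ab", "c") and ("a", "bc") apart.
    let state = fnv1a(state, &[0]);
    fnv1a(state, type_name.as_bytes())
}
```

A deterministic function of the two names is what makes the key stable: a writer announcement and a reader announcement for the same topic map to the same handle.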
pub fn get_discovered_topic_data(&self, handle: InstanceHandle) -> Result<TopicBuiltinTopicData>
Returns the TopicBuiltinTopicData for a handle obtained from
get_discovered_topics. Spec §2.2.2.2.1.30.
§Errors
BadParameter if handle does not correspond to a discovered topic, or if
that topic has been ignored.
pub fn get_builtin_subscriber(&self) -> Arc<BuiltinSubscriber>
Builtin subscriber of the participant (DDS 1.4 §2.2.2.2.1.7).
Always returns the same subscriber handle (there is exactly one builtin subscriber per participant). It contains 4 pre-created readers for the builtin topics:
- DCPSParticipant → ParticipantBuiltinTopicData
- DCPSTopic → TopicBuiltinTopicData
- DCPSPublication → PublicationBuiltinTopicData
- DCPSSubscription → SubscriptionBuiltinTopicData
Receiving SPDP or SEDP data internally inserts a sample, which can then be
retrieved via take()/read() (DDS 1.4 §2.2.5).
§Example
use zerodds_dcps::*;
let participant = DomainParticipantFactory::instance()
    .create_participant_offline(0, DomainParticipantQos::default());
let bs = participant.get_builtin_subscriber();
let r = bs
    .lookup_datareader::<DcpsParticipantBuiltinTopicData>("DCPSParticipant")
    .expect("builtin reader");
// Initially empty (offline mode, so nothing is received via SPDP).
assert!(r.take().expect("take").is_empty());
pub fn set_listener(&self, listener: Option<ArcDomainParticipantListener>, mask: StatusMask)
Sets the DomainParticipantListener. listener=None clears the slot.
mask is the StatusMask that determines which status bits this listener
consumes (Spec §2.2.4.2.3, bubble-up).
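The bubble-up rule referenced above can be sketched as a walk from the innermost entity outwards: a status is delivered to the first installed listener whose mask contains the corresponding bit. The bit values, `ListenerSlot` and `dispatch` below are hypothetical and only model that rule; the real StatusMask layout and listener types are not shown in the text.

```rust
/// Hypothetical status bits; the real StatusMask layout is an assumption here.
pub const DATA_AVAILABLE: u32 = 1 << 0;
pub const SUBSCRIPTION_MATCHED: u32 = 1 << 1;
pub const LIVELINESS_CHANGED: u32 = 1 << 2;

/// One level of the entity hierarchy: whether a listener is installed and
/// which status bits it consumes.
pub struct ListenerSlot {
    pub name: &'static str,
    pub installed: bool,
    pub mask: u32,
}

/// Walks the chain from the innermost entity (index 0) outwards and returns
/// the name of the first installed listener whose mask contains `status`.
pub fn dispatch(chain: &[ListenerSlot], status: u32) -> Option<&'static str> {
    chain
        .iter()
        .find(|slot| slot.installed && (slot.mask & status) != 0)
        .map(|slot| slot.name)
}
```

In a chain of reader, subscriber and participant, a status that neither the reader nor the subscriber consumes ends up at the participant listener, provided its mask includes that bit. If no level consumes the status, nothing is invoked.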
pub fn get_listener(&self) -> Option<ArcDomainParticipantListener>
Returns a clone of the currently installed listener, if any. Spec §2.2.2.2.3.x get_listener.
Trait Implementations
impl Clone for DomainParticipant
fn clone(&self) -> DomainParticipant
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl Debug for DomainParticipant
impl Entity for DomainParticipant
type Qos = DomainParticipantQos
… DomainParticipantQos, DataWriterQos, …).
fn set_qos(&self, qos: Self::Qos) -> Result<()>
… ImmutablePolicy error. Spec §2.2.2.1.2 set_qos.
fn enable(&self) -> Result<()>
… enable.
fn entity_state(&self) -> Arc<EntityState>
… Arc<EntityState>.
fn is_enabled(&self) -> bool
fn get_status_condition(&self) -> StatusCondition
… StatusCondition of this Entity. Spec §2.2.2.1.6 get_status_condition.
fn get_status_changes(&self) -> StatusMask
… get_status_changes.
fn get_instance_handle(&self) -> InstanceHandle
… get_instance_handle.