

Struct DomainParticipant 

pub struct DomainParticipant { /* private fields */ }

The domain participant.

Implementations

impl DomainParticipant

pub fn runtime(&self) -> Option<&Arc<DcpsRuntime>>

Internal access to the runtime, used by Publisher/Subscriber to create DataWriters and DataReaders. Returns None if the participant is in offline mode.

pub fn domain_id(&self) -> DomainId

Returns the domain ID.

pub fn qos(&self) -> DomainParticipantQos

Returns a copy of the DomainParticipantQos (Spec §2.2.2.2.1.4 get_qos).

pub fn set_qos(&self, qos: DomainParticipantQos) -> Result<()>

Sets the DomainParticipantQos (Spec §2.2.2.2.1.3 set_qos).

Errors

Currently none: the method always returns Ok(()). The spec permits IMMUTABLE_POLICY, but this implementation never produces it (all policies are mutable in RC1).

pub fn register_builtin_types(&self)

Registers the four built-in types from Spec §7.6.5 (DDS::String, DDS::KeyedString, DDS::Bytes, DDS::KeyedBytes) in the local TypeRegistry. Idempotent: a repeated call deterministically overwrites the entries.

Called automatically from new()/new_with_runtime(), but can also be called again after the built-in types have been disabled via unregister_builtin_types().

pub fn unregister_builtin_types(&self)

Removes all registered built-in types. Not currently called from any default path; it is a test helper for disable-flag tests.

pub fn find_builtin_type(&self, name: &str) -> Option<DynamicType>

Looks up a built-in type by its spec name (Spec §7.6.5). Returns Some(DynamicType) if the name is known, i.e. registered via register_builtin_types.

pub fn registered_type_count(&self) -> usize

Number of registered built-in types. After new() this is 4.
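The register/unregister/find/count behavior described above can be pictured with a small stand-alone sketch. It is not the crate's implementation: `TypeRegistrySketch` and `DynamicTypeSketch` are hypothetical stand-ins, and the sketch uses `&mut self` where the real methods take `&self` (which suggests interior mutability that is not modeled here).

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the crate's `DynamicType`.
#[derive(Clone, Debug, PartialEq)]
pub struct DynamicTypeSketch {
    pub name: String,
}

/// The four built-in type names listed in the documentation above.
pub const BUILTIN_TYPE_NAMES: [&str; 4] = [
    "DDS::String",
    "DDS::KeyedString",
    "DDS::Bytes",
    "DDS::KeyedBytes",
];

/// Hypothetical registry keyed by spec name.
#[derive(Default)]
pub struct TypeRegistrySketch {
    types: HashMap<String, DynamicTypeSketch>,
}

impl TypeRegistrySketch {
    /// Inserting by name overwrites an existing entry, so a second call
    /// leaves the registry in the same state (idempotent).
    pub fn register_builtin_types(&mut self) {
        for name in BUILTIN_TYPE_NAMES {
            self.types.insert(
                name.to_string(),
                DynamicTypeSketch { name: name.to_string() },
            );
        }
    }

    /// Removes the built-in entries again.
    pub fn unregister_builtin_types(&mut self) {
        for name in BUILTIN_TYPE_NAMES {
            self.types.remove(name);
        }
    }

    /// Returns a copy of the type if the name is registered.
    pub fn find_builtin_type(&self, name: &str) -> Option<DynamicTypeSketch> {
        self.types.get(name).cloned()
    }

    /// Number of entries currently registered.
    pub fn registered_type_count(&self) -> usize {
        self.types.len()
    }
}
```

Keying the map by name is what makes the double call harmless in this sketch: the second `insert` replaces each entry instead of adding a new one.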


pub fn enqueue_type_lookup(&self, hash: EquivalenceHash) -> bool

Tries to queue a TypeLookup request for an unknown EquivalenceHash. Honors a backoff (5 s between attempts) and a maximum of 3 repetitions per hash.

Returns: true if the request was queued, false if it was suppressed by the backoff or the maximum number of attempts has been reached.
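The backoff rule can be sketched in isolation. This is a minimal model under stated assumptions, not the crate's code: `LookupTracker` and `HashSketch` are hypothetical names, a 14-byte array stands in for `EquivalenceHash`, "3 repetitions" is read as three attempts in total, and the current time is passed in explicitly so the logic can be exercised without sleeping.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for `EquivalenceHash` (assumed to be 14 bytes).
pub type HashSketch = [u8; 14];

/// Minimum spacing between two attempts for the same hash.
pub const BACKOFF: Duration = Duration::from_secs(5);
/// Assumption: at most three attempts in total per hash.
pub const MAX_ATTEMPTS: u32 = 3;

#[derive(Default)]
pub struct LookupTracker {
    /// Per hash: number of attempts so far and time of the last one.
    attempts: HashMap<HashSketch, (u32, Instant)>,
    /// Pending requests as (hash, sequence number).
    queue: Vec<(HashSketch, u64)>,
    next_seq: u64,
}

impl LookupTracker {
    /// Returns true if a request was queued, false if it was suppressed
    /// by the backoff window or by the attempt limit.
    pub fn enqueue(&mut self, hash: HashSketch, now: Instant) -> bool {
        if let Some(&(count, last)) = self.attempts.get(&hash) {
            if count >= MAX_ATTEMPTS {
                return false; // attempt limit reached
            }
            if now.duration_since(last) < BACKOFF {
                return false; // still inside the backoff window
            }
        }
        let entry = self.attempts.entry(hash).or_insert((0, now));
        entry.0 += 1;
        entry.1 = now;
        self.next_seq += 1;
        self.queue.push((hash, self.next_seq));
        true
    }

    /// True once the attempt limit has been reached for this hash.
    pub fn exhausted(&self, hash: &HashSketch) -> bool {
        self.attempts
            .get(hash)
            .map_or(false, |&(count, _)| count >= MAX_ATTEMPTS)
    }

    /// Hands out all pending requests and leaves the queue empty.
    pub fn drain(&mut self) -> Vec<(HashSketch, u64)> {
        std::mem::take(&mut self.queue)
    }
}
```

Passing `now` as a parameter is a choice made for this sketch only; the documented method takes just the hash and presumably reads the clock itself.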


pub fn drain_type_lookup_requests(&self) -> Vec<(EquivalenceHash, u64)>

Drains the queued TypeLookup requests and returns them as Vec<(hash, seq)>. In a production environment the caller would send the hashes via TypeLookupClient and a reliable writer to the TL_SVC_REQ_WRITER endpoint.

pub fn ingest_type_lookup_reply( &self, types: Vec<(EquivalenceHash, MinimalTypeObject)>, ) -> usize

Receives a TypeLookup reply (TypeObjects per hash). Registers the TypeObjects in an internal TypeRegistry mirror; afterwards a stalled QoS match can be retried.

Returns the number of successfully registered types.

pub fn on_remote_publication_discovered( &self, type_information_blob: Option<&[u8]>, ) -> usize

SEDP discovery hook: checks an incoming PublicationBuiltinTopicData for type hashes that cannot be resolved locally. If needed, a TypeLookup request is queued via enqueue_type_lookup.

The RPC path is live via DcpsRuntime::send_type_lookup_request on the TL_SVC_REQ_* endpoints (XTypes 1.3 §7.6.3.3.4); this method decides per hash whether a re-request is worthwhile (local registry lookup plus backoff tracking).

Returns: the number of unknown hashes that were queued (at most 2: minimal and complete).

pub fn on_remote_subscription_discovered( &self, type_information_blob: Option<&[u8]>, ) -> usize

SEDP discovery hook for SubscriptionBuiltinTopicData. Symmetric to on_remote_publication_discovered.

pub fn type_lookup_exhausted(&self, hash: EquivalenceHash) -> bool

Returns true if MAX_ATTEMPTS has already been reached for the hash. Consulted by the match retry path so that it eventually gives up instead of polling forever.

pub fn create_topic<T: DdsType>( &self, name: &str, qos: TopicQos, ) -> Result<Topic<T>>

Creates a typed topic handle. Repeated calls with the same name and type return the same handle (shared by reference).

Errors
  • InconsistentPolicy if a topic with this name is already registered under a different type.
  • BadParameter if the name is empty.
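The sharing and error rules above can be sketched without the DDS stack. The names `TopicRegistrySketch`, `TopicInner` and `TopicErrorSketch` are hypothetical, QoS is left out, and `TypeId` is used as an assumed way to compare types; the crate may track type identity differently.

```rust
use std::any::TypeId;
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical error type mirroring the two documented error cases.
#[derive(Debug, PartialEq)]
pub enum TopicErrorSketch {
    BadParameter,
    InconsistentPolicy,
}

/// Shared state behind a topic handle.
#[derive(Debug)]
pub struct TopicInner {
    pub name: String,
    pub type_id: TypeId,
}

#[derive(Default)]
pub struct TopicRegistrySketch {
    topics: HashMap<String, Arc<TopicInner>>,
}

impl TopicRegistrySketch {
    /// Same name and type: hand out another reference to the same topic.
    /// Same name, different type: InconsistentPolicy.
    /// Empty name: BadParameter.
    pub fn create_topic<T: 'static>(
        &mut self,
        name: &str,
    ) -> Result<Arc<TopicInner>, TopicErrorSketch> {
        if name.is_empty() {
            return Err(TopicErrorSketch::BadParameter);
        }
        let wanted = TypeId::of::<T>();
        if let Some(existing) = self.topics.get(name) {
            return if existing.type_id == wanted {
                Ok(Arc::clone(existing))
            } else {
                Err(TopicErrorSketch::InconsistentPolicy)
            };
        }
        let topic = Arc::new(TopicInner {
            name: name.to_string(),
            type_id: wanted,
        });
        self.topics.insert(name.to_string(), Arc::clone(&topic));
        Ok(topic)
    }
}
```

In this model, "same handle" means that `Arc::ptr_eq` holds for the values returned by two calls with the same name and type.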
Examples found in repository
examples/hello_dds_subscriber.rs (line 31)
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;

    let topic = participant.create_topic::<RawBytes>("Chatter", TopicQos::default())?;
    let subscriber = participant.create_subscriber(SubscriberQos::default());
    let reader = subscriber.create_datareader::<RawBytes>(&topic, DataReaderQos::default())?;

    println!("hello_dds_subscriber: reading on Domain 0 Topic 'Chatter' — Ctrl-C to stop");

    loop {
        match reader.wait_for_data(Duration::from_secs(1)) {
            Ok(()) => {
                for sample in reader.take()? {
                    match std::str::from_utf8(&sample.data) {
                        Ok(s) => println!("  <- {s}"),
                        Err(_) => println!("  <- <{} bytes of non-UTF8>", sample.data.len()),
                    }
                }
            }
            Err(DdsError::Timeout) => {} // idle tick
            Err(e) => return Err(e.into()),
        }
    }
}
examples/hello_dds_publisher.rs (line 42)
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;

    let topic = participant.create_topic::<RawBytes>("Chatter", TopicQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());
    let writer = publisher.create_datawriter::<RawBytes>(&topic, DataWriterQos::default())?;

    println!("hello_dds_publisher: sending on Domain 0 Topic 'Chatter' — Ctrl-C to stop");

    // Wait a few seconds so that SPDP+SEDP can discover subscribers.
    // Without a waiting phase the first samples go nowhere (no reader
    // matched yet). v1.3 introduces `wait_for_matched_subscription`.
    thread::sleep(Duration::from_secs(3));

    let mut counter: u32 = 0;
    loop {
        let payload = format!("hello #{counter}");
        writer.write(&RawBytes::new(payload.clone().into_bytes()))?;
        println!("  -> {payload}");
        counter = counter.wrapping_add(1);
        thread::sleep(Duration::from_secs(1));
    }
}
examples/shapes_demo_subscriber.rs (line 25)
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = env::args().collect();
    let topic_name = args.get(1).map_or("Square", String::as_str);
    let domain_id: i32 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(0);

    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
    let topic = participant.create_topic::<ShapeType>(topic_name, TopicQos::default())?;
    let subscriber = participant.create_subscriber(SubscriberQos::default());
    let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;

    println!("shapes_demo_subscriber: Topic={topic_name} Domain={domain_id} — Ctrl-C to stop");

    loop {
        match reader.wait_for_data(Duration::from_secs(1)) {
            Ok(()) => {
                for sample in reader.take()? {
                    println!(
                        "  <- color={:8} x={:4} y={:4} size={}",
                        sample.color, sample.x, sample.y, sample.shapesize
                    );
                }
            }
            Err(DdsError::Timeout) => {}
            Err(e) => return Err(e.into()),
        }
    }
}
examples/zerodds_perf.rs (line 48)
fn run_pub(size: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());
    let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;

    // Discovery wait
    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));

    let payload_color = "X".repeat(size.saturating_sub(4 * 4)); // size minus the i32 fields (approx.)
    let start = Instant::now();
    let mut total = 0u64;
    let mut last_report = start;
    let mut samples_since_report = 0u64;

    while start.elapsed() < runtime {
        let s = ShapeType::new(payload_color.clone(), 0, 0, total as i32);
        if writer.write(&s).is_ok() {
            total += 1;
            samples_since_report += 1;
        }
        // Report every second
        if last_report.elapsed() >= Duration::from_secs(1) {
            let elapsed_s = last_report.elapsed().as_secs_f64();
            let rate_ks = (samples_since_report as f64 / elapsed_s) / 1000.0;
            let rate_mb =
                (samples_since_report as f64 * size as f64 * 8.0 / elapsed_s) / 1_000_000.0;
            println!(
                "{:.3}  size {} total {} rate {:.2} kS/s {:.2} Mb/s",
                start.elapsed().as_secs_f64(),
                size,
                total,
                rate_ks,
                rate_mb
            );
            last_report = Instant::now();
            samples_since_report = 0;
        }
    }
    println!(
        "# pub-done: total={total} runtime={:.3}s",
        start.elapsed().as_secs_f64()
    );
    Ok(())
}

fn run_sub(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
    let subscriber = participant.create_subscriber(SubscriberQos::default());
    let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;

    let total = Arc::new(AtomicU64::new(0));
    let start = Instant::now();
    let mut last_report = start;
    let mut last_total = 0u64;

    while start.elapsed() < runtime {
        if let Ok(samples) = reader.take() {
            for _ in samples {
                total.fetch_add(1, Ordering::Relaxed);
            }
        }
        if last_report.elapsed() >= Duration::from_secs(1) {
            let now_total = total.load(Ordering::Relaxed);
            let delta = now_total - last_total;
            let elapsed_s = last_report.elapsed().as_secs_f64();
            let rate_ks = (delta as f64 / elapsed_s) / 1000.0;
            println!(
                "{:.3}  size N total {} delta {} rate {:.2} kS/s",
                start.elapsed().as_secs_f64(),
                now_total,
                delta,
                rate_ks
            );
            last_report = Instant::now();
            last_total = now_total;
        }
        std::thread::sleep(Duration::from_millis(1));
    }
    println!(
        "# sub-done: total={} runtime={:.3}s",
        total.load(Ordering::Relaxed),
        start.elapsed().as_secs_f64()
    );
    Ok(())
}

fn run_pingpong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    // Both roles in one process via two topics; the pinger writes
    // PerfPing (with a "send-time" marker as a shapesize i32 diff), the
    // ponger (second process) reads and echoes on PerfPong. This process
    // is the PINGER.
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
    let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());
    let subscriber = participant.create_subscriber(SubscriberQos::default());
    let writer = publisher.create_datawriter::<ShapeType>(&ping_topic, DataWriterQos::default())?;
    let reader =
        subscriber.create_datareader::<ShapeType>(&pong_topic, DataReaderQos::default())?;

    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
    let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));

    let mut rtts_us: Vec<u64> = Vec::new();
    let start = Instant::now();
    let mut seq = 0i32;
    while start.elapsed() < runtime {
        let send_us = now_micros();
        // shapesize encodes the lower 32 bits of send_us as a pseudo-id
        // (collision-prone, but only used for matching in the pong); the
        // actual RTT calculation happens locally via timestamp storage.
        let _ = writer.write(&ShapeType::new("PING", 0, 0, seq));
        seq = seq.wrapping_add(1);

        // Waits up to 50 ms for the pong
        let deadline = Instant::now() + Duration::from_millis(50);
        while Instant::now() < deadline {
            if let Ok(samples) = reader.take() {
                for s in samples {
                    if s.color == "PONG" && s.shapesize == seq.wrapping_sub(1) {
                        let recv_us = now_micros();
                        rtts_us.push(recv_us - send_us);
                        break;
                    }
                }
                if !rtts_us.is_empty() && rtts_us.last().is_some() {
                    break;
                }
            }
            std::thread::sleep(Duration::from_micros(100));
        }
        std::thread::sleep(Duration::from_millis(100)); // 10Hz ping rate
    }

    if rtts_us.is_empty() {
        println!("# pingpong: no RTTs collected — pong missing?");
        return Ok(());
    }
    rtts_us.sort_unstable();
    let n = rtts_us.len();
    let mean = rtts_us.iter().sum::<u64>() / n as u64;
    let p50 = rtts_us[n / 2];
    let p90 = rtts_us[(n * 9) / 10];
    let p99 = rtts_us[(n * 99) / 100];
    println!(
        "{:.3}  rtt mean {}us min {} 50% {} 90% {} 99% {} max {} cnt {}",
        start.elapsed().as_secs_f64(),
        mean,
        rtts_us.first().copied().unwrap_or(0),
        p50,
        p90,
        p99,
        rtts_us.last().copied().unwrap_or(0),
        n
    );
    Ok(())
}

fn run_pong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    // Reads PerfPing, writes PerfPong with the same shapesize.
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
    let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());
    let subscriber = participant.create_subscriber(SubscriberQos::default());
    let reader =
        subscriber.create_datareader::<ShapeType>(&ping_topic, DataReaderQos::default())?;
    let writer = publisher.create_datawriter::<ShapeType>(&pong_topic, DataWriterQos::default())?;
    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
    let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));

    let start = Instant::now();
    let mut echoed = 0u64;
    while start.elapsed() < runtime {
        if let Ok(samples) = reader.take() {
            for s in samples {
                if s.color == "PING" {
                    let _ = writer.write(&ShapeType::new("PONG", s.x, s.y, s.shapesize));
                    echoed += 1;
                }
            }
        }
        std::thread::sleep(Duration::from_micros(100));
    }
    println!(
        "# pong-done: echoed={echoed} runtime={:.3}s",
        start.elapsed().as_secs_f64()
    );
    Ok(())
}
examples/shapes_demo_publisher.rs (line 42)
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let args: Vec<String> = env::args().collect();
    let topic_name = args.get(1).map_or("Square", String::as_str);
    let color = args.get(2).map_or("BLUE", String::as_str);
    let domain_id: i32 = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(0);

    if !["Square", "Circle", "Triangle"].contains(&topic_name) {
        eprintln!(
            "warning: topic '{topic_name}' is not a standard ShapesDemo topic (Square/Circle/Triangle)",
        );
    }

    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
    let topic = participant.create_topic::<ShapeType>(topic_name, TopicQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());
    let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;

    println!(
        "shapes_demo_publisher: Topic={topic_name} Color={color} Domain={domain_id} — Ctrl-C to stop"
    );

    // Discovery wait: until at least one subscriber has matched.
    let matched = writer.wait_for_matched_subscription(1, Duration::from_secs(10));
    match matched {
        Ok(()) => println!("matched subscriber found, starting publication"),
        Err(_) => println!("no subscriber in 10s — publishing anyway, samples may drop"),
    }

    // Sinusoidal motion on a 240x270 canvas (ShapesDemo default).
    let shapesize = 30;
    let mut t: f32 = 0.0;
    loop {
        let x = (120.0 + 80.0 * (t).sin()) as i32;
        let y = (135.0 + 90.0 * (t * 1.3).cos()) as i32;
        let sample = ShapeType::new(color, x, y, shapesize);
        writer.write(&sample)?;
        println!("  -> color={color} x={x} y={y} size={shapesize}");
        t += 0.15;
        thread::sleep(Duration::from_millis(100));
    }
}
examples/multi_endpoint_perf.rs (line 45)
fn run_pub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());

    let mut writers = Vec::with_capacity(count);
    println!("[multi_pub] creating {count} writers");
    let create_start = Instant::now();
    for i in 0..count {
        let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
        let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;
        writers.push(writer);
    }
    println!(
        "[multi_pub] created {count} writers in {:.2}s",
        create_start.elapsed().as_secs_f64()
    );

    let start = Instant::now();
    let mut tick = 0u64;
    let mut total_writes = 0u64;
    while start.elapsed() < runtime {
        let tick_start = Instant::now();
        for (i, w) in writers.iter().enumerate() {
            let s = ShapeType::new(format!("MEP{i:04}"), tick as i32, 0, 30);
            if w.write(&s).is_ok() {
                total_writes += 1;
            }
        }
        tick += 1;
        // 1 Hz per writer: sleeps until the next second
        if tick % 60 == 0 {
            println!(
                "{:.3}  multi_pub tick={} writers={} total_writes={} loop_us={}",
                start.elapsed().as_secs_f64(),
                tick,
                count,
                total_writes,
                tick_start.elapsed().as_micros()
            );
        }
        let elapsed = tick_start.elapsed();
        if elapsed < Duration::from_secs(1) {
            std::thread::sleep(Duration::from_secs(1) - elapsed);
        }
    }
    println!(
        "# multi_pub-done: writers={count} total_writes={} runtime={:.3}s",
        total_writes,
        start.elapsed().as_secs_f64()
    );
    Ok(())
}

fn run_sub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
    let subscriber = participant.create_subscriber(SubscriberQos::default());

    let mut readers = Vec::with_capacity(count);
    println!("[multi_sub] creating {count} readers");
    let create_start = Instant::now();
    for i in 0..count {
        let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
        let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
        readers.push(reader);
    }
    println!(
        "[multi_sub] created {count} readers in {:.2}s",
        create_start.elapsed().as_secs_f64()
    );

    let start = Instant::now();
    let mut total_received = 0u64;
    let mut last_report = start;
    while start.elapsed() < runtime {
        for r in &readers {
            if let Ok(samples) = r.take() {
                total_received += samples.len() as u64;
            }
        }
        if last_report.elapsed() >= Duration::from_secs(60) {
            println!(
                "{:.3}  multi_sub readers={} total_received={}",
                start.elapsed().as_secs_f64(),
                count,
                total_received
            );
            last_report = Instant::now();
        }
        std::thread::sleep(Duration::from_millis(10));
    }
    println!(
        "# multi_sub-done: readers={count} total_received={} runtime={:.3}s",
        total_received,
        start.elapsed().as_secs_f64()
    );
    Ok(())
}

pub fn lookup_topicdescription( &self, name: &str, ) -> Option<TopicDescriptionHandle>

Immediate local lookup of a topic by name. Returns None if no local create_topic call with this name has happened. Does not wait for discovery (that is what find_topic is for). Spec reference: OMG DDS 1.4 §2.2.2.2.1.12 lookup_topicdescription.

pub fn find_topic( &self, name: &str, timeout: Duration, ) -> Result<TopicDescriptionHandle>

Waits until a topic with the given name becomes visible via discovery (SEDP publication or subscription), or until timeout has elapsed. Spec reference: OMG DDS 1.4 §2.2.2.2.1.11 find_topic.

Returns:

  • Ok(handle) with name, type name and participant if a matching SEDP endpoint became visible within timeout. Local topics count as well (there is no need to wait if create_topic has already run).
  • Err(Timeout) if timeout has elapsed.
Errors
  • DdsError::Timeout if timeout elapsed without a discovery match.
  • DdsError::BadParameter if the name is empty.
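The wait-until-visible-or-timeout behavior can be illustrated with a standard-library `Condvar`. This is only a sketch under assumptions: `DiscoveredTopics`, `announce` and `FindErrorSketch` are hypothetical names, the set of names stands in for both local topics and SEDP-discovered endpoints, and the real method returns a TopicDescriptionHandle rather than a String.

```rust
use std::collections::HashSet;
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

/// Hypothetical error type mirroring the two documented error cases.
#[derive(Debug, PartialEq)]
pub enum FindErrorSketch {
    BadParameter,
    Timeout,
}

/// Names of topics known so far, plus a condition variable that is
/// signalled whenever a new name is added.
#[derive(Default)]
pub struct DiscoveredTopics {
    names: Mutex<HashSet<String>>,
    changed: Condvar,
}

impl DiscoveredTopics {
    /// Called by the (hypothetical) discovery path or a local create.
    pub fn announce(&self, name: &str) {
        self.names.lock().unwrap().insert(name.to_string());
        self.changed.notify_all();
    }

    /// Returns immediately if the name is already known, otherwise
    /// blocks until it is announced or the deadline passes.
    pub fn find_topic(
        &self,
        name: &str,
        timeout: Duration,
    ) -> Result<String, FindErrorSketch> {
        if name.is_empty() {
            return Err(FindErrorSketch::BadParameter);
        }
        let deadline = Instant::now() + timeout;
        let mut names = self.names.lock().unwrap();
        loop {
            if names.contains(name) {
                return Ok(name.to_string());
            }
            let now = Instant::now();
            if now >= deadline {
                return Err(FindErrorSketch::Timeout);
            }
            // Re-check in a loop: wakeups may be spurious or caused by
            // an unrelated topic name.
            let (guard, _) = self
                .changed
                .wait_timeout(names, deadline - now)
                .unwrap();
            names = guard;
        }
    }
}
```

Checking the set before waiting is what lets an already known topic return without delay, which matches the note that local topics count as well.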

pub fn create_contentfilteredtopic<T: DdsType>( &self, name: &str, related_topic: &Topic<T>, filter_expression: &str, filter_parameters: Vec<String>, ) -> Result<ContentFilteredTopic<T>>

Creates a ContentFilteredTopic as a subset of an already existing Topic<T>. Spec reference: OMG DDS 1.4 §2.2.2.2.1.13 create_contentfilteredtopic.

The filter_expression is an SQL subset (see Annex B). filter_parameters are strings that replace %0, %1, … in the expression.

Errors
  • BadParameter if the name or the expression is empty.
  • BadParameter if the filter expression does not parse.
  • BadParameter if a referenced %N parameter is not supplied in the filter_parameters Vec.
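The %N parameter check can be sketched as a plain scan over the expression. This is a simplified illustration, not the crate's parser: `referenced_parameters`, `validate_filter` and `FilterErrorSketch` are hypothetical names, no SQL grammar is parsed, and a `%` followed by digits inside a quoted string literal would be counted as a placeholder here.

```rust
/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum FilterErrorSketch {
    BadParameter,
}

/// Collects the index N of every `%N` placeholder in the expression,
/// in order of appearance.
pub fn referenced_parameters(expr: &str) -> Vec<usize> {
    let bytes = expr.as_bytes();
    let mut out = Vec::new();
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'%' {
            let start = i + 1;
            let mut end = start;
            while end < bytes.len() && bytes[end].is_ascii_digit() {
                end += 1;
            }
            if end > start {
                // Only ASCII digits lie between start and end, so the
                // slice boundaries are valid UTF-8 boundaries.
                if let Ok(n) = expr[start..end].parse::<usize>() {
                    out.push(n);
                }
            }
            i = end;
        } else {
            i += 1;
        }
    }
    out
}

/// Rejects an empty expression and any `%N` with no matching entry
/// in `params`.
pub fn validate_filter(
    expr: &str,
    params: &[String],
) -> Result<(), FilterErrorSketch> {
    if expr.trim().is_empty() {
        return Err(FilterErrorSketch::BadParameter);
    }
    if referenced_parameters(expr).iter().any(|&n| n >= params.len()) {
        return Err(FilterErrorSketch::BadParameter);
    }
    Ok(())
}
```

For an expression such as `x > %0 AND y < %1`, two parameters are enough; referencing `%2` with the same two parameters is rejected.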

pub fn create_multitopic<T: DdsType>( &self, name: &str, type_name: &str, related_topic_names: Vec<String>, subscription_expression: &str, expression_parameters: Vec<String>, ) -> Result<MultiTopic<T>>

Creates a MultiTopic as a combining TopicDescription over one or more underlying topics with an SQL subscription expression. Spec reference: OMG DDS 1.4 §2.2.2.2.1.15 create_multitopic (optional spec feature).

Errors
  • BadParameter if the name or the type name is empty.
  • BadParameter if related_topic_names is empty.
  • BadParameter if the subscription expression does not parse.
  • BadParameter if a referenced %N parameter is not supplied in the expression_parameters Vec.

pub fn delete_multitopic<T: DdsType>(&self, mt: &MultiTopic<T>) -> Result<()>

Deletes a MultiTopic. Spec §2.2.2.2.1.16 delete_multitopic. In v1.2 this is a no-op shim with a participant match check.

Errors

BadParameter if the MultiTopic belongs to a different participant.

pub fn delete_contentfilteredtopic<T: DdsType>( &self, cft: &ContentFilteredTopic<T>, ) -> Result<()>

Deletes a ContentFilteredTopic. Spec reference: §2.2.2.2.1.14 delete_contentfilteredtopic.

In Rust, the lifetime of the CFT handle is already covered by Drop: the underlying resources are released as soon as the ContentFilteredTopic<T> goes out of scope. This method exists for spec compliance with the C++ API and validates the participant match (the spec requires BadParameter if the CFT belongs to a different participant).

Errors
  • BadParameter if the CFT belongs to a different participant.
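One way such a participant match check could look is a pointer comparison on shared state. The sketch below is an assumption about the mechanism, not the crate's code: `ParticipantSketch`, `FilteredTopicSketch` and `DeleteErrorSketch` are hypothetical, and identity is decided with `Arc::ptr_eq` rather than by comparing domain IDs, so two participants on the same domain still count as different owners.

```rust
use std::sync::Arc;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum DeleteErrorSketch {
    BadParameter,
}

/// Shared participant state; handles are cheap clones of the Arc.
#[derive(Debug)]
pub struct ParticipantInner {
    pub domain_id: i32,
}

#[derive(Clone, Debug)]
pub struct ParticipantSketch {
    inner: Arc<ParticipantInner>,
}

/// A filtered topic remembers which participant created it.
#[derive(Debug)]
pub struct FilteredTopicSketch {
    owner: Arc<ParticipantInner>,
    pub name: String,
}

impl ParticipantSketch {
    pub fn new(domain_id: i32) -> Self {
        Self { inner: Arc::new(ParticipantInner { domain_id }) }
    }

    pub fn create_filtered_topic(&self, name: &str) -> FilteredTopicSketch {
        FilteredTopicSketch {
            owner: Arc::clone(&self.inner),
            name: name.to_string(),
        }
    }

    /// Only validates ownership; the resources themselves are released
    /// when the topic value is dropped.
    pub fn delete_filtered_topic(
        &self,
        cft: &FilteredTopicSketch,
    ) -> Result<(), DeleteErrorSketch> {
        if Arc::ptr_eq(&self.inner, &cft.owner) {
            Ok(())
        } else {
            Err(DeleteErrorSketch::BadParameter)
        }
    }
}
```

A clone of the creating participant passes the check in this model because it shares the same inner allocation.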

pub fn create_publisher(&self, qos: PublisherQos) -> Publisher

Creates a Publisher with the given QoS (the default is sufficient for v1.2).

Examples found in repository
examples/hello_dds_publisher.rs (line 43)
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let factory = DomainParticipantFactory::instance();
    let participant = factory.create_participant(0, DomainParticipantQos::default())?;

    let topic = participant.create_topic::<RawBytes>("Chatter", TopicQos::default())?;
    let publisher = participant.create_publisher(PublisherQos::default());
    let writer = publisher.create_datawriter::<RawBytes>(&topic, DataWriterQos::default())?;

    println!("hello_dds_publisher: sending on Domain 0 Topic 'Chatter' — Ctrl-C to stop");

    // Wait a few seconds so that SPDP+SEDP can discover subscribers.
    // Without a waiting phase the first samples go nowhere (no reader
    // matched yet). v1.3 introduces `wait_for_matched_subscription`.
    thread::sleep(Duration::from_secs(3));

    let mut counter: u32 = 0;
    loop {
        let payload = format!("hello #{counter}");
        writer.write(&RawBytes::new(payload.clone().into_bytes()))?;
        println!("  -> {payload}");
        counter = counter.wrapping_add(1);
        thread::sleep(Duration::from_secs(1));
    }
}
More examples
Hide additional examples
examples/zerodds_perf.rs (line 49)
45fn run_pub(size: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
46    let factory = DomainParticipantFactory::instance();
47    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
48    let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
49    let publisher = participant.create_publisher(PublisherQos::default());
50    let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;
51
52    // Discovery-Wait
53    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
54
55    let payload_color = "X".repeat(size.saturating_sub(4 * 4)); // size minus i32-Felder approx
56    let start = Instant::now();
57    let mut total = 0u64;
58    let mut last_report = start;
59    let mut samples_since_report = 0u64;
60
61    while start.elapsed() < runtime {
62        let s = ShapeType::new(payload_color.clone(), 0, 0, total as i32);
63        if writer.write(&s).is_ok() {
64            total += 1;
65            samples_since_report += 1;
66        }
67        // Report jede Sekunde
68        if last_report.elapsed() >= Duration::from_secs(1) {
69            let elapsed_s = last_report.elapsed().as_secs_f64();
70            let rate_ks = (samples_since_report as f64 / elapsed_s) / 1000.0;
71            let rate_mb =
72                (samples_since_report as f64 * size as f64 * 8.0 / elapsed_s) / 1_000_000.0;
73            println!(
74                "{:.3}  size {} total {} rate {:.2} kS/s {:.2} Mb/s",
75                start.elapsed().as_secs_f64(),
76                size,
77                total,
78                rate_ks,
79                rate_mb
80            );
81            last_report = Instant::now();
82            samples_since_report = 0;
83        }
84    }
85    println!(
86        "# pub-done: total={total} runtime={:.3}s",
87        start.elapsed().as_secs_f64()
88    );
89    Ok(())
90}
91
92fn run_sub(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
93    let factory = DomainParticipantFactory::instance();
94    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
95    let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
96    let subscriber = participant.create_subscriber(SubscriberQos::default());
97    let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
98
99    let total = Arc::new(AtomicU64::new(0));
100    let start = Instant::now();
101    let mut last_report = start;
102    let mut last_total = 0u64;
103
104    while start.elapsed() < runtime {
105        if let Ok(samples) = reader.take() {
106            for _ in samples {
107                total.fetch_add(1, Ordering::Relaxed);
108            }
109        }
110        if last_report.elapsed() >= Duration::from_secs(1) {
111            let now_total = total.load(Ordering::Relaxed);
112            let delta = now_total - last_total;
113            let elapsed_s = last_report.elapsed().as_secs_f64();
114            let rate_ks = (delta as f64 / elapsed_s) / 1000.0;
115            println!(
116                "{:.3}  size N total {} delta {} rate {:.2} kS/s",
117                start.elapsed().as_secs_f64(),
118                now_total,
119                delta,
120                rate_ks
121            );
122            last_report = Instant::now();
123            last_total = now_total;
124        }
125        std::thread::sleep(Duration::from_millis(1));
126    }
127    println!(
128        "# sub-done: total={} runtime={:.3}s",
129        total.load(Ordering::Relaxed),
130        start.elapsed().as_secs_f64()
131    );
132    Ok(())
133}
134
135fn run_pingpong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
136    // Beide Rollen in einem Prozess via zwei Topics; Pinger schreibt
137    // PerfPing (mit "send-time"-Marker als shapesize-i32-Diff), Ponger
138    // (zweiter Prozess) liest und echot auf PerfPong. Dieser Prozess
139    // ist der PINGER.
140    let factory = DomainParticipantFactory::instance();
141    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
142    let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
143    let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
144    let publisher = participant.create_publisher(PublisherQos::default());
145    let subscriber = participant.create_subscriber(SubscriberQos::default());
146    let writer = publisher.create_datawriter::<ShapeType>(&ping_topic, DataWriterQos::default())?;
147    let reader =
148        subscriber.create_datareader::<ShapeType>(&pong_topic, DataReaderQos::default())?;
149
150    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
151    let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));
152
153    let mut rtts_us: Vec<u64> = Vec::new();
154    let start = Instant::now();
155    let mut seq = 0i32;
156    while start.elapsed() < runtime {
157        let send_us = now_micros();
158        // shapesize carries seq as a pseudo-id, used only to match the
159        // Pong to its Ping; the RTT itself is computed locally from
160        // send_us.
161        let _ = writer.write(&ShapeType::new("PING", 0, 0, seq));
162        seq = seq.wrapping_add(1);
163
164        // Waits up to 50 ms for the Pong
165        let deadline = Instant::now() + Duration::from_millis(50);
166        while Instant::now() < deadline {
167            if let Ok(samples) = reader.take() {
168                for s in samples {
169                    if s.color == "PONG" && s.shapesize == seq.wrapping_sub(1) {
170                        let recv_us = now_micros();
171                        rtts_us.push(recv_us - send_us);
172                        break;
173                    }
174                }
175                if !rtts_us.is_empty() && rtts_us.last().is_some() {
176                    break;
177                }
178            }
179            std::thread::sleep(Duration::from_micros(100));
180        }
181        std::thread::sleep(Duration::from_millis(100)); // 10Hz ping rate
182    }
183
184    if rtts_us.is_empty() {
185        println!("# pingpong: no RTTs collected — pong missing?");
186        return Ok(());
187    }
188    rtts_us.sort_unstable();
189    let n = rtts_us.len();
190    let mean = rtts_us.iter().sum::<u64>() / n as u64;
191    let p50 = rtts_us[n / 2];
192    let p90 = rtts_us[(n * 9) / 10];
193    let p99 = rtts_us[(n * 99) / 100];
194    println!(
195        "{:.3}  rtt mean {}us min {} 50% {} 90% {} 99% {} max {} cnt {}",
196        start.elapsed().as_secs_f64(),
197        mean,
198        rtts_us.first().copied().unwrap_or(0),
199        p50,
200        p90,
201        p99,
202        rtts_us.last().copied().unwrap_or(0),
203        n
204    );
205    Ok(())
206}
207
208fn run_pong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
209    // Reads PerfPing, writes PerfPong with the same shapesize.
210    let factory = DomainParticipantFactory::instance();
211    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
212    let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
213    let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
214    let publisher = participant.create_publisher(PublisherQos::default());
215    let subscriber = participant.create_subscriber(SubscriberQos::default());
216    let reader =
217        subscriber.create_datareader::<ShapeType>(&ping_topic, DataReaderQos::default())?;
218    let writer = publisher.create_datawriter::<ShapeType>(&pong_topic, DataWriterQos::default())?;
219    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
220    let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));
221
222    let start = Instant::now();
223    let mut echoed = 0u64;
224    while start.elapsed() < runtime {
225        if let Ok(samples) = reader.take() {
226            for s in samples {
227                if s.color == "PING" {
228                    let _ = writer.write(&ShapeType::new("PONG", s.x, s.y, s.shapesize));
229                    echoed += 1;
230                }
231            }
232        }
233        std::thread::sleep(Duration::from_micros(100));
234    }
235    println!(
236        "# pong-done: echoed={echoed} runtime={:.3}s",
237        start.elapsed().as_secs_f64()
238    );
239    Ok(())
240}
examples/shapes_demo_publisher.rs (line 43)
28fn main() -> Result<(), Box<dyn std::error::Error>> {
29    let args: Vec<String> = env::args().collect();
30    let topic_name = args.get(1).map_or("Square", String::as_str);
31    let color = args.get(2).map_or("BLUE", String::as_str);
32    let domain_id: i32 = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(0);
33
34    if !["Square", "Circle", "Triangle"].contains(&topic_name) {
35        eprintln!(
36            "warning: topic '{topic_name}' ist kein Standard-ShapesDemo-Topic (Square/Circle/Triangle)",
37        );
38    }
39
40    let factory = DomainParticipantFactory::instance();
41    let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
42    let topic = participant.create_topic::<ShapeType>(topic_name, TopicQos::default())?;
43    let publisher = participant.create_publisher(PublisherQos::default());
44    let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;
45
46    println!(
47        "shapes_demo_publisher: Topic={topic_name} Color={color} Domain={domain_id} — Ctrl-C to stop"
48    );
49
50    // Discovery wait: until at least 1 subscriber has matched.
51    let matched = writer.wait_for_matched_subscription(1, Duration::from_secs(10));
52    match matched {
53        Ok(()) => println!("matched subscriber found, starting publication"),
54        Err(_) => println!("no subscriber in 10s — publishing anyway, samples may drop"),
55    }
56
57    // Sine motion on a 240x270 canvas (ShapesDemo default).
58    let shapesize = 30;
59    let mut t: f32 = 0.0;
60    loop {
61        let x = (120.0 + 80.0 * (t).sin()) as i32;
62        let y = (135.0 + 90.0 * (t * 1.3).cos()) as i32;
63        let sample = ShapeType::new(color, x, y, shapesize);
64        writer.write(&sample)?;
65        println!("  -> color={color} x={x} y={y} size={shapesize}");
66        t += 0.15;
67        thread::sleep(Duration::from_millis(100));
68    }
69}
examples/multi_endpoint_perf.rs (line 39)
36fn run_pub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
37    let factory = DomainParticipantFactory::instance();
38    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
39    let publisher = participant.create_publisher(PublisherQos::default());
40
41    let mut writers = Vec::with_capacity(count);
42    println!("[multi_pub] creating {count} writers");
43    let create_start = Instant::now();
44    for i in 0..count {
45        let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
46        let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;
47        writers.push(writer);
48    }
49    println!(
50        "[multi_pub] created {count} writers in {:.2}s",
51        create_start.elapsed().as_secs_f64()
52    );
53
54    let start = Instant::now();
55    let mut tick = 0u64;
56    let mut total_writes = 0u64;
57    while start.elapsed() < runtime {
58        let tick_start = Instant::now();
59        for (i, w) in writers.iter().enumerate() {
60            let s = ShapeType::new(format!("MEP{i:04}"), tick as i32, 0, 30);
61            if w.write(&s).is_ok() {
62                total_writes += 1;
63            }
64        }
65        tick += 1;
66        // 1 Hz per writer: sleeps until the next second
67        if tick % 60 == 0 {
68            println!(
69                "{:.3}  multi_pub tick={} writers={} total_writes={} loop_us={}",
70                start.elapsed().as_secs_f64(),
71                tick,
72                count,
73                total_writes,
74                tick_start.elapsed().as_micros()
75            );
76        }
77        let elapsed = tick_start.elapsed();
78        if elapsed < Duration::from_secs(1) {
79            std::thread::sleep(Duration::from_secs(1) - elapsed);
80        }
81    }
82    println!(
83        "# multi_pub-done: writers={count} total_writes={} runtime={:.3}s",
84        total_writes,
85        start.elapsed().as_secs_f64()
86    );
87    Ok(())
88}
Source

pub fn create_subscriber(&self, qos: SubscriberQos) -> Subscriber

Creates a Subscriber.

Examples found in repository?
examples/hello_dds_subscriber.rs (line 32)
27fn main() -> Result<(), Box<dyn std::error::Error>> {
28    let factory = DomainParticipantFactory::instance();
29    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
30
31    let topic = participant.create_topic::<RawBytes>("Chatter", TopicQos::default())?;
32    let subscriber = participant.create_subscriber(SubscriberQos::default());
33    let reader = subscriber.create_datareader::<RawBytes>(&topic, DataReaderQos::default())?;
34
35    println!("hello_dds_subscriber: reading on Domain 0 Topic 'Chatter' — Ctrl-C to stop");
36
37    loop {
38        match reader.wait_for_data(Duration::from_secs(1)) {
39            Ok(()) => {
40                for sample in reader.take()? {
41                    match std::str::from_utf8(&sample.data) {
42                        Ok(s) => println!("  <- {s}"),
43                        Err(_) => println!("  <- <{} bytes of non-UTF8>", sample.data.len()),
44                    }
45                }
46            }
47            Err(DdsError::Timeout) => {} // idle tick
48            Err(e) => return Err(e.into()),
49        }
50    }
51}
More examples
examples/shapes_demo_subscriber.rs (line 26)
18fn main() -> Result<(), Box<dyn std::error::Error>> {
19    let args: Vec<String> = env::args().collect();
20    let topic_name = args.get(1).map_or("Square", String::as_str);
21    let domain_id: i32 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(0);
22
23    let factory = DomainParticipantFactory::instance();
24    let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
25    let topic = participant.create_topic::<ShapeType>(topic_name, TopicQos::default())?;
26    let subscriber = participant.create_subscriber(SubscriberQos::default());
27    let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
28
29    println!("shapes_demo_subscriber: Topic={topic_name} Domain={domain_id} — Ctrl-C to stop");
30
31    loop {
32        match reader.wait_for_data(Duration::from_secs(1)) {
33            Ok(()) => {
34                for sample in reader.take()? {
35                    println!(
36                        "  <- color={:8} x={:4} y={:4} size={}",
37                        sample.color, sample.x, sample.y, sample.shapesize
38                    );
39                }
40            }
41            Err(DdsError::Timeout) => {}
42            Err(e) => return Err(e.into()),
43        }
44    }
45}
examples/zerodds_perf.rs (line 96)
92fn run_sub(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
93    let factory = DomainParticipantFactory::instance();
94    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
95    let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
96    let subscriber = participant.create_subscriber(SubscriberQos::default());
97    let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
98
99    let total = Arc::new(AtomicU64::new(0));
100    let start = Instant::now();
101    let mut last_report = start;
102    let mut last_total = 0u64;
103
104    while start.elapsed() < runtime {
105        if let Ok(samples) = reader.take() {
106            for _ in samples {
107                total.fetch_add(1, Ordering::Relaxed);
108            }
109        }
110        if last_report.elapsed() >= Duration::from_secs(1) {
111            let now_total = total.load(Ordering::Relaxed);
112            let delta = now_total - last_total;
113            let elapsed_s = last_report.elapsed().as_secs_f64();
114            let rate_ks = (delta as f64 / elapsed_s) / 1000.0;
115            println!(
116                "{:.3}  size N total {} delta {} rate {:.2} kS/s",
117                start.elapsed().as_secs_f64(),
118                now_total,
119                delta,
120                rate_ks
121            );
122            last_report = Instant::now();
123            last_total = now_total;
124        }
125        std::thread::sleep(Duration::from_millis(1));
126    }
127    println!(
128        "# sub-done: total={} runtime={:.3}s",
129        total.load(Ordering::Relaxed),
130        start.elapsed().as_secs_f64()
131    );
132    Ok(())
133}
134
135fn run_pingpong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
136    // Ping-pong over two topics: the pinger writes PerfPing (with the
137    // sequence number in shapesize as a match marker); the ponger
138    // (a second process) reads it and echoes on PerfPong. This process
139    // is the PINGER.
140    let factory = DomainParticipantFactory::instance();
141    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
142    let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
143    let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
144    let publisher = participant.create_publisher(PublisherQos::default());
145    let subscriber = participant.create_subscriber(SubscriberQos::default());
146    let writer = publisher.create_datawriter::<ShapeType>(&ping_topic, DataWriterQos::default())?;
147    let reader =
148        subscriber.create_datareader::<ShapeType>(&pong_topic, DataReaderQos::default())?;
149
150    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
151    let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));
152
153    let mut rtts_us: Vec<u64> = Vec::new();
154    let start = Instant::now();
155    let mut seq = 0i32;
156    while start.elapsed() < runtime {
157        let send_us = now_micros();
158        // shapesize carries seq as a pseudo-id, used only to match the
159        // Pong to its Ping; the RTT itself is computed locally from
160        // send_us.
161        let _ = writer.write(&ShapeType::new("PING", 0, 0, seq));
162        seq = seq.wrapping_add(1);
163
164        // Waits up to 50 ms for the Pong
165        let deadline = Instant::now() + Duration::from_millis(50);
166        while Instant::now() < deadline {
167            if let Ok(samples) = reader.take() {
168                for s in samples {
169                    if s.color == "PONG" && s.shapesize == seq.wrapping_sub(1) {
170                        let recv_us = now_micros();
171                        rtts_us.push(recv_us - send_us);
172                        break;
173                    }
174                }
175                if !rtts_us.is_empty() && rtts_us.last().is_some() {
176                    break;
177                }
178            }
179            std::thread::sleep(Duration::from_micros(100));
180        }
181        std::thread::sleep(Duration::from_millis(100)); // 10Hz ping rate
182    }
183
184    if rtts_us.is_empty() {
185        println!("# pingpong: no RTTs collected — pong missing?");
186        return Ok(());
187    }
188    rtts_us.sort_unstable();
189    let n = rtts_us.len();
190    let mean = rtts_us.iter().sum::<u64>() / n as u64;
191    let p50 = rtts_us[n / 2];
192    let p90 = rtts_us[(n * 9) / 10];
193    let p99 = rtts_us[(n * 99) / 100];
194    println!(
195        "{:.3}  rtt mean {}us min {} 50% {} 90% {} 99% {} max {} cnt {}",
196        start.elapsed().as_secs_f64(),
197        mean,
198        rtts_us.first().copied().unwrap_or(0),
199        p50,
200        p90,
201        p99,
202        rtts_us.last().copied().unwrap_or(0),
203        n
204    );
205    Ok(())
206}
207
208fn run_pong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
209    // Reads PerfPing, writes PerfPong with the same shapesize.
210    let factory = DomainParticipantFactory::instance();
211    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
212    let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
213    let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
214    let publisher = participant.create_publisher(PublisherQos::default());
215    let subscriber = participant.create_subscriber(SubscriberQos::default());
216    let reader =
217        subscriber.create_datareader::<ShapeType>(&ping_topic, DataReaderQos::default())?;
218    let writer = publisher.create_datawriter::<ShapeType>(&pong_topic, DataWriterQos::default())?;
219    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
220    let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));
221
222    let start = Instant::now();
223    let mut echoed = 0u64;
224    while start.elapsed() < runtime {
225        if let Ok(samples) = reader.take() {
226            for s in samples {
227                if s.color == "PING" {
228                    let _ = writer.write(&ShapeType::new("PONG", s.x, s.y, s.shapesize));
229                    echoed += 1;
230                }
231            }
232        }
233        std::thread::sleep(Duration::from_micros(100));
234    }
235    println!(
236        "# pong-done: echoed={echoed} runtime={:.3}s",
237        start.elapsed().as_secs_f64()
238    );
239    Ok(())
240}
examples/multi_endpoint_perf.rs (line 93)
90fn run_sub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
91    let factory = DomainParticipantFactory::instance();
92    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
93    let subscriber = participant.create_subscriber(SubscriberQos::default());
94
95    let mut readers = Vec::with_capacity(count);
96    println!("[multi_sub] creating {count} readers");
97    let create_start = Instant::now();
98    for i in 0..count {
99        let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
100        let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
101        readers.push(reader);
102    }
103    println!(
104        "[multi_sub] created {count} readers in {:.2}s",
105        create_start.elapsed().as_secs_f64()
106    );
107
108    let start = Instant::now();
109    let mut total_received = 0u64;
110    let mut last_report = start;
111    while start.elapsed() < runtime {
112        for r in &readers {
113            if let Ok(samples) = r.take() {
114                total_received += samples.len() as u64;
115            }
116        }
117        if last_report.elapsed() >= Duration::from_secs(60) {
118            println!(
119                "{:.3}  multi_sub readers={} total_received={}",
120                start.elapsed().as_secs_f64(),
121                count,
122                total_received
123            );
124            last_report = Instant::now();
125        }
126        std::thread::sleep(Duration::from_millis(10));
127    }
128    println!(
129        "# multi_sub-done: readers={count} total_received={} runtime={:.3}s",
130        total_received,
131        start.elapsed().as_secs_f64()
132    );
133    Ok(())
134}
examples/shapes_demo_viewer.rs (line 126)
117fn main() -> Result<(), Box<dyn std::error::Error>> {
118    let args: Vec<String> = env::args().collect();
119    let domain_id: i32 = args.get(1).and_then(|s| s.parse().ok()).unwrap_or(0);
120
121    let stop = Arc::new(AtomicBool::new(false));
122    install_signal_handler(stop.clone());
123
124    let factory = DomainParticipantFactory::instance();
125    let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
126    let subscriber = participant.create_subscriber(SubscriberQos::default());
127
128    let topics = ["Square", "Circle", "Triangle"];
129    let mut readers = Vec::new();
130    for t in &topics {
131        let topic = participant.create_topic::<ShapeType>(t, TopicQos::default())?;
132        let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
133        readers.push((*t, reader));
134    }
135
136    eprintln!(
137        "shapes_demo_viewer: Domain={domain_id} — subscribed to Square / Circle / Triangle. Ctrl-C beendet."
138    );
139    eprintln!("Warte auf Discovery + erste Samples...");
140
141    // Hide cursor + clear screen.
142    print!("\x1B[?25l\x1B[2J");
143    let mut stdout = std::io::stdout();
144    stdout.flush().ok();
145
146    // We store the last position per (topic, color).
147    let mut shapes: HashMap<(String, String), (i32, i32)> = HashMap::new();
148    let mut sample_count: u64 = 0;
149    let mut grid: Vec<Vec<(char, &'static str)>> = vec![vec![(' ', ""); VIEW_W]; VIEW_H];
150
151    while !stop.load(Ordering::Relaxed) {
152        // 1) Take all pending samples per topic.
153        for (topic_name, reader) in &readers {
154            if let Ok(samples) = reader.take() {
155                for sample in samples {
156                    shapes.insert(
157                        ((*topic_name).to_string(), sample.color.clone()),
158                        (sample.x, sample.y),
159                    );
160                    sample_count += 1;
161                }
162            }
163        }
164
165        // 2) Clear grid.
166        for row in &mut grid {
167            for cell in row {
168                *cell = (' ', "");
169            }
170        }
171
172        // 3) Plot shapes.
173        for ((topic, color), (x, y)) in &shapes {
174            let gx = map_x(*x);
175            let gy = map_y(*y);
176            if gy < VIEW_H && gx < VIEW_W {
177                grid[gy][gx] = (glyph(topic), ansi_color(color));
178            }
179        }
180
181        // 4) Render: home, draw, status line.
182        print!("\x1B[H");
183        // top border
184        print!("┌");
185        for _ in 0..VIEW_W {
186            print!("─");
187        }
188        println!("┐");
189        for row in &grid {
190            print!("│");
191            for (ch, color) in row {
192                if color.is_empty() {
193                    print!(" ");
194                } else {
195                    print!("{color}{ch}{ANSI_RESET}");
196                }
197            }
198            println!("│");
199        }
200        print!("└");
201        for _ in 0..VIEW_W {
202            print!("─");
203        }
204        println!("┘");
205        println!(
206            "shapes={:3} samples={:6} domain={} — Ctrl-C beendet                            ",
207            shapes.len(),
208            sample_count,
209            domain_id,
210        );
211        stdout.flush().ok();
212
213        thread::sleep(Duration::from_millis(60));
214    }
215
216    // Show cursor again.
217    print!("\x1B[?25h");
218    println!("[shapes_demo_viewer] beendet. Total samples empfangen: {sample_count}");
219    Ok(())
220}
Source

pub fn topics_len(&self) -> usize

Number of currently registered topics. Diagnostic API.

Source

pub fn discovered_participants_count(&self) -> usize

Number of remote participants currently discovered via SPDP. Spec: OMG DDS 1.4 §2.2.2.2.1.7 get_discovered_participants. Returns 0 in offline mode.

Source

pub fn discovered_publications_count(&self) -> usize

Number of remote publications currently known in the SEDP cache. Spec: OMG DDS 1.4 §2.2.2.2.1.9 get_discovered_topics (roughly analogous).

Source

pub fn discovered_subscriptions_count(&self) -> usize

Number of remote subscriptions currently known in the SEDP cache.

Source

pub fn ignore_participant(&self, handle: InstanceHandle) -> Result<()>

Marks a discovered remote DomainParticipant as “ignored”: all further SPDP beacons with this handle are dropped from the builtin reader stream, and all SEDP endpoints that belong to the same participant prefix are discarded as well (Spec §2.2.2.2.1.14).

Per the spec, the action is monotonic: a participant that has been ignored once stays ignored for the lifetime of this participant.

§Errors

Currently none: the method always returns Ok(()). The spec permits OUT_OF_RESOURCES, but this implementation does not actively produce it.
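
The monotonic ignore semantics described above can be sketched with a plain set. This is a minimal, self-contained model and not the crate's implementation: `IgnoreFilter`, its methods, and the `u64` handle type are hypothetical stand-ins for the participant's internal state and `InstanceHandle`.

```rust
use std::collections::HashSet;
use std::sync::Mutex;

/// Hypothetical stand-in for `InstanceHandle`.
type Handle = u64;

/// Hypothetical model of a monotonic ignore list:
/// handles can be added, but there is no way to remove them.
#[derive(Default)]
struct IgnoreFilter {
    participants: Mutex<HashSet<Handle>>,
}

impl IgnoreFilter {
    /// Marks `handle` as ignored. Calling it twice has no further effect.
    fn ignore_participant(&self, handle: Handle) {
        self.participants.lock().unwrap().insert(handle);
    }

    /// Returns true if `handle` was marked via `ignore_participant`.
    fn is_participant_ignored(&self, handle: Handle) -> bool {
        self.participants.lock().unwrap().contains(&handle)
    }

    /// Filters a list of discovered handles down to the non-ignored ones.
    fn visible(&self, discovered: &[Handle]) -> Vec<Handle> {
        let ignored = self.participants.lock().unwrap();
        let visible: Vec<Handle> = discovered
            .iter()
            .copied()
            .filter(|h| !ignored.contains(h))
            .collect();
        visible
    }
}
```

In this model, ignoring handle 7 and then calling `visible(&[3, 7, 9])` yields `[3, 9]`. The absence of any "unignore" method is what makes the operation monotonic.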

Source

pub fn ignore_topic(&self, handle: InstanceHandle) -> Result<()>

Marks a discovered remote topic as “ignored”. Spec §2.2.2.2.1.15.

§Errors

Same as Self::ignore_participant.

Source

pub fn ignore_publication(&self, handle: InstanceHandle) -> Result<()>

Marks a discovered remote publication as “ignored”. Spec §2.2.2.2.1.16.

§Errors

Same as Self::ignore_participant.

Source

pub fn ignore_subscription(&self, handle: InstanceHandle) -> Result<()>

Marks a discovered remote subscription as “ignored”. Spec §2.2.2.2.1.17.

§Errors

Same as Self::ignore_participant.

Source

pub fn is_participant_ignored(&self, handle: InstanceHandle) -> bool

true if handle was marked via ignore_participant.

Source

pub fn is_topic_ignored(&self, handle: InstanceHandle) -> bool

true if handle was marked via ignore_topic.

Source

pub fn is_publication_ignored(&self, handle: InstanceHandle) -> bool

true if handle was marked via ignore_publication.

Source

pub fn is_subscription_ignored(&self, handle: InstanceHandle) -> bool

true if handle was marked via ignore_subscription.

Source

pub fn delete_contained_entities(&self) -> Result<()>

Deletes all children held by the participant (publishers, subscribers, topics, builtin reader inboxes). Spec §2.2.2.2.1.18. The analogous operation on Publisher/Subscriber/DataReader is covered recursively here.

Offline behavior:

  • The topic registry is cleared (local topics).
  • The publisher and subscriber trackers are cleared.
  • The builtin topic reader inboxes are cleared (so that take() returns an empty Vec after delete_contained_entities).
  • No SEDP unannounce: the live path will take care of that once the runtime gets a Drop/shutdown handle. Current state: the runtime thread runs until process exit.
§Errors

PreconditionNotMet if an internal mutex is poisoned.
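
The error contract above (a poisoned internal mutex surfaces as PreconditionNotMet) might look roughly like the following sketch. Everything here is illustrative: `ParticipantState`, its fields, and the single-variant `DdsError` are hypothetical and do not reflect the crate's private layout.

```rust
use std::sync::Mutex;

/// Hypothetical error type; only the variant named in the text is modeled.
#[derive(Debug, PartialEq)]
enum DdsError {
    PreconditionNotMet,
}

/// Hypothetical participant state: each registry sits behind its own mutex.
#[derive(Default)]
struct ParticipantState {
    topics: Mutex<Vec<String>>,
    publishers: Mutex<Vec<u64>>,
    subscribers: Mutex<Vec<u64>>,
}

impl ParticipantState {
    /// Clears all registries. A poisoned lock is reported as
    /// `PreconditionNotMet` instead of panicking.
    fn delete_contained_entities(&self) -> Result<(), DdsError> {
        // Acquire every lock before clearing anything, so that an error
        // leaves all registries untouched.
        let mut topics = self
            .topics
            .lock()
            .map_err(|_| DdsError::PreconditionNotMet)?;
        let mut publishers = self
            .publishers
            .lock()
            .map_err(|_| DdsError::PreconditionNotMet)?;
        let mut subscribers = self
            .subscribers
            .lock()
            .map_err(|_| DdsError::PreconditionNotMet)?;
        topics.clear();
        publishers.clear();
        subscribers.clear();
        Ok(())
    }
}
```

Taking all locks up front is a design choice of this sketch, not something the documentation states. It avoids a half-cleared state when one of the locks turns out to be poisoned.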

Source

pub fn publishers_len(&self) -> usize

Number of publishers tracked via create_publisher. Diagnostic API for tests.

Source

pub fn subscribers_len(&self) -> usize

Number of subscribers tracked via create_subscriber.

Source

pub fn instance_handle(&self) -> InstanceHandle

Returns the InstanceHandle of this participant. It identifies the entity to DCPS API consumers (Spec §2.2.2.1.1 get_instance_handle).

Source

pub fn contains_entity(&self, handle: InstanceHandle) -> bool

Spec §2.2.2.2.1.10 contains_entity: true if handle belongs to this participant or to one of its directly or recursively contained entities.

Entity types covered:

  • the participant itself
  • all topics registered via create_topic
  • all publishers/subscribers created via create_publisher / create_subscriber
  • recursively: all DataWriters/DataReaders created via Publisher::create_datawriter / Subscriber::create_datareader.
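
The recursive containment check listed above can be illustrated with a small tree walk. This is a sketch under assumptions: `EntityNode` and the `u64` handle are hypothetical, and the real participant presumably consults its own registries rather than an explicit tree.

```rust
/// Hypothetical stand-in for `InstanceHandle`.
type Handle = u64;

/// Hypothetical entity tree: every entity has a handle and owns its children
/// (participant -> publishers/subscribers/topics -> writers/readers).
struct EntityNode {
    handle: Handle,
    children: Vec<EntityNode>,
}

impl EntityNode {
    /// Returns true if `handle` names this entity or any entity below it.
    fn contains_entity(&self, handle: Handle) -> bool {
        self.handle == handle
            || self
                .children
                .iter()
                .any(|child| child.contains_entity(handle))
    }
}
```

With a participant node that owns a publisher node, which in turn owns a writer node, the writer's handle is found through two levels of recursion, while a handle from another participant is rejected.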
Source

pub fn get_discovered_participants(&self) -> Vec<InstanceHandle>

Returns the InstanceHandles of all currently discovered remote participants (Spec §2.2.2.2.1.27). Empty in offline mode. Ignored participants are not included.

Source

pub fn get_discovered_participant_data( &self, handle: InstanceHandle, ) -> Result<ParticipantBuiltinTopicData>

Returns the ParticipantBuiltinTopicData for a handle obtained from get_discovered_participants (Spec §2.2.2.2.1.28).

§Errors

BadParameter if handle does not refer to a discovered participant (or if that participant has been ignored).

Source

pub fn get_discovered_topics(&self) -> Vec<InstanceHandle>

Returns the InstanceHandles of all currently discovered remote topics. Spec §2.2.2.2.1.29.

Topics are discovered indirectly via SEDP publication/subscription announcements: for each (topic_name, type_name) pair, a stable key is synthesized via TopicBuiltinTopicData::synthesize_key. Ignored topics are not included.
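
To illustrate what a stable key synthesized from a (topic_name, type_name) pair can mean, here is one possible approach. The algorithm actually used by TopicBuiltinTopicData::synthesize_key is not documented here; the FNV-1a hash, the 64-bit key width, and the function names below are assumptions chosen for the sketch.

```rust
/// FNV-1a, 64-bit. Unlike `std::collections::hash_map::DefaultHasher`,
/// its output is fixed by the algorithm and does not depend on the
/// Rust release, which is what makes the key stable.
fn fnv1a64(bytes: impl Iterator<Item = u8>) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    bytes.fold(OFFSET_BASIS, |hash, b| {
        (hash ^ u64::from(b)).wrapping_mul(PRIME)
    })
}

/// Hypothetical key synthesis: hashes both names with a separator.
/// Assumes that neither name contains a NUL byte.
fn synthesize_topic_key(topic_name: &str, type_name: &str) -> u64 {
    // The 0 byte keeps ("ab", "c") and ("a", "bc") from feeding
    // the same byte sequence into the hash.
    let bytes = topic_name
        .bytes()
        .chain(std::iter::once(0u8))
        .chain(type_name.bytes());
    fnv1a64(bytes)
}
```

Because the key depends only on the two names, every announcement for the same topic maps to the same handle, regardless of which remote endpoint announced it or in which order announcements arrive.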

Source

pub fn get_discovered_topic_data( &self, handle: InstanceHandle, ) -> Result<TopicBuiltinTopicData>

Returns the TopicBuiltinTopicData for a handle obtained from get_discovered_topics. Spec §2.2.2.2.1.30.

§Errors

BadParameter if handle does not correspond to a discovered topic (or if the topic has been ignored).

Source

pub fn get_builtin_subscriber(&self) -> Arc<BuiltinSubscriber>

Builtin subscriber of the participant (DDS 1.4 §2.2.2.2.1.7).

Always returns the same subscriber handle (exactly one builtin subscriber per participant). It contains 4 pre-created readers for the builtin topics:

  • DCPSParticipant: ParticipantBuiltinTopicData
  • DCPSTopic: TopicBuiltinTopicData
  • DCPSPublication: PublicationBuiltinTopicData
  • DCPSSubscription: SubscriptionBuiltinTopicData

Receiving SPDP/SEDP data internally triggers a sample insert, which can be retrieved via take()/read() (DDS 1.4 §2.2.5).

§Example
use zerodds_dcps::*;
let participant = DomainParticipantFactory::instance()
    .create_participant_offline(0, DomainParticipantQos::default());
let bs = participant.get_builtin_subscriber();
let r = bs
    .lookup_datareader::<DcpsParticipantBuiltinTopicData>("DCPSParticipant")
    .expect("builtin reader");
// Initially empty (offline mode, so nothing is received via SPDP).
assert!(r.take().expect("take").is_empty());
Source

pub fn set_listener( &self, listener: Option<ArcDomainParticipantListener>, mask: StatusMask, )

Sets the DomainParticipantListener. listener=None clears the slot. mask is the StatusMask that determines which status bits this listener consumes (Spec §2.2.4.2.3, bubble-up).
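
How a mask decides which listener consumes a status can be sketched as follows. This is a simplified model, not the crate's dispatch code: `StatusMask` as a `u32` newtype, `Level`, and `dispatch` are hypothetical, and special cases of the spec (such as the extra on_data_on_readers step for DATA_AVAILABLE) are left out. The bit positions follow the status kinds in the DDS IDL; whether the crate represents them this way is an assumption.

```rust
/// Hypothetical bitmask representation of a DDS StatusMask.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct StatusMask(u32);

impl StatusMask {
    const DATA_AVAILABLE: StatusMask = StatusMask(1 << 10);
    const PUBLICATION_MATCHED: StatusMask = StatusMask(1 << 13);
    const SUBSCRIPTION_MATCHED: StatusMask = StatusMask(1 << 14);

    /// Combines two masks.
    fn union(self, other: StatusMask) -> StatusMask {
        StatusMask(self.0 | other.0)
    }

    /// Returns true if every bit of `status` is set in `self`.
    fn contains(self, status: StatusMask) -> bool {
        self.0 & status.0 == status.0
    }
}

/// One level of the listener chain, ordered from the innermost entity
/// (e.g. a DataReader) outwards to the DomainParticipant.
struct Level {
    name: &'static str,
    listener_installed: bool,
    mask: StatusMask,
}

/// Bubble-up: the first level that has a listener installed and whose
/// mask contains the status consumes it. `None` means nobody handles it.
fn dispatch(chain: &[Level], status: StatusMask) -> Option<&'static str> {
    chain
        .iter()
        .find(|level| level.listener_installed && level.mask.contains(status))
        .map(|level| level.name)
}
```

In this model, a participant listener installed with a mask of DATA_AVAILABLE and SUBSCRIPTION_MATCHED only sees a status if no entity closer to the source has claimed it first.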

Source

pub fn get_listener(&self) -> Option<ArcDomainParticipantListener>

Returns a clone of the currently installed listener, if any. Spec §2.2.2.2.3.x get_listener.

Trait Implementations§

Source§

impl Clone for DomainParticipant

Source§

fn clone(&self) -> DomainParticipant

Returns a duplicate of the value.
1.0.0 (const: unstable) · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.
Source§

impl Debug for DomainParticipant

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.
Source§

impl Entity for DomainParticipant

Source§

type Qos = DomainParticipantQos

QoS type for this entity (e.g. DomainParticipantQos, DataWriterQos, …).
Source§

fn get_qos(&self) -> Self::Qos

Returns the current QoS (clone). Spec §2.2.2.1.2 get_qos.
Source§

fn set_qos(&self, qos: Self::Qos) -> Result<()>

Changes the QoS. Pre-enable: everything is allowed. Post-enable: only fields with “Changeable=YES” may change; anything else yields an ImmutablePolicy error. Spec §2.2.2.1.2 set_qos.
Source§

fn enable(&self) -> Result<()>

Enables the entity (idempotent). Spec §2.2.2.1.4 enable.
Source§

fn entity_state(&self) -> Arc<EntityState>

Internal accessor: every impl returns its Arc<EntityState>.
Source§

fn is_enabled(&self) -> bool

True if the entity is already enabled.
Source§

fn get_status_condition(&self) -> StatusCondition

StatusCondition of this entity. Spec §2.2.2.1.6 get_status_condition.
Source§

fn get_status_changes(&self) -> StatusMask

Bitmask of the status kinds that have changed since the last read. Spec §2.2.2.1.5 get_status_changes.
Source§

fn get_instance_handle(&self) -> InstanceHandle

Local 64-bit identifier. Spec §2.2.2.1.7 get_instance_handle.

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.