pub struct DomainParticipantFactory { /* private fields */ }
Expand description
Prozessweiter Factory-Singleton: Einstiegspunkt zum Erzeugen, Nachschlagen und
Loeschen von DomainParticipants.
Implementations§
impl DomainParticipantFactory
pub fn instance() -> &'static Self
Liefert den prozessweiten Factory-Singleton (Spec §2.2.2.2.2.1
get_instance).
Examples found in repository?
27fn main() -> Result<(), Box<dyn std::error::Error>> {
28 let factory = DomainParticipantFactory::instance();
29 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
30
31 let topic = participant.create_topic::<RawBytes>("Chatter", TopicQos::default())?;
32 let subscriber = participant.create_subscriber(SubscriberQos::default());
33 let reader = subscriber.create_datareader::<RawBytes>(&topic, DataReaderQos::default())?;
34
35 println!("hello_dds_subscriber: reading on Domain 0 Topic 'Chatter' — Ctrl-C to stop");
36
37 loop {
38 match reader.wait_for_data(Duration::from_secs(1)) {
39 Ok(()) => {
40 for sample in reader.take()? {
41 match std::str::from_utf8(&sample.data) {
42 Ok(s) => println!(" <- {s}"),
43 Err(_) => println!(" <- <{} bytes of non-UTF8>", sample.data.len()),
44 }
45 }
46 }
47 Err(DdsError::Timeout) => {} // idle tick
48 Err(e) => return Err(e.into()),
49 }
50 }
51}
More examples
38fn main() -> Result<(), Box<dyn std::error::Error>> {
39 let factory = DomainParticipantFactory::instance();
40 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
41
42 let topic = participant.create_topic::<RawBytes>("Chatter", TopicQos::default())?;
43 let publisher = participant.create_publisher(PublisherQos::default());
44 let writer = publisher.create_datawriter::<RawBytes>(&topic, DataWriterQos::default())?;
45
46 println!("hello_dds_publisher: sending on Domain 0 Topic 'Chatter' — Ctrl-C to stop");
47
48 // Ein paar Sekunden warten, damit SPDP+SEDP Subscriber entdecken kann.
49 // Ohne Warte-Phase gehen die ersten Samples ins Leere (noch kein Reader
50 // gematched). v1.3 bringt `wait_for_matched_subscription`.
51 thread::sleep(Duration::from_secs(3));
52
53 let mut counter: u32 = 0;
54 loop {
55 let payload = format!("hello #{counter}");
56 writer.write(&RawBytes::new(payload.clone().into_bytes()))?;
57 println!(" -> {payload}");
58 counter = counter.wrapping_add(1);
59 thread::sleep(Duration::from_secs(1));
60 }
61}
18fn main() -> Result<(), Box<dyn std::error::Error>> {
19 let args: Vec<String> = env::args().collect();
20 let topic_name = args.get(1).map_or("Square", String::as_str);
21 let domain_id: i32 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(0);
22
23 let factory = DomainParticipantFactory::instance();
24 let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
25 let topic = participant.create_topic::<ShapeType>(topic_name, TopicQos::default())?;
26 let subscriber = participant.create_subscriber(SubscriberQos::default());
27 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
28
29 println!("shapes_demo_subscriber: Topic={topic_name} Domain={domain_id} — Ctrl-C to stop");
30
31 loop {
32 match reader.wait_for_data(Duration::from_secs(1)) {
33 Ok(()) => {
34 for sample in reader.take()? {
35 println!(
36 " <- color={:8} x={:4} y={:4} size={}",
37 sample.color, sample.x, sample.y, sample.shapesize
38 );
39 }
40 }
41 Err(DdsError::Timeout) => {}
42 Err(e) => return Err(e.into()),
43 }
44 }
45}
45fn run_pub(size: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
46 let factory = DomainParticipantFactory::instance();
47 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
48 let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
49 let publisher = participant.create_publisher(PublisherQos::default());
50 let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;
51
52 // Discovery-Wait
53 let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
54
55 let payload_color = "X".repeat(size.saturating_sub(4 * 4)); // size minus i32-Felder approx
56 let start = Instant::now();
57 let mut total = 0u64;
58 let mut last_report = start;
59 let mut samples_since_report = 0u64;
60
61 while start.elapsed() < runtime {
62 let s = ShapeType::new(payload_color.clone(), 0, 0, total as i32);
63 if writer.write(&s).is_ok() {
64 total += 1;
65 samples_since_report += 1;
66 }
67 // Report jede Sekunde
68 if last_report.elapsed() >= Duration::from_secs(1) {
69 let elapsed_s = last_report.elapsed().as_secs_f64();
70 let rate_ks = (samples_since_report as f64 / elapsed_s) / 1000.0;
71 let rate_mb =
72 (samples_since_report as f64 * size as f64 * 8.0 / elapsed_s) / 1_000_000.0;
73 println!(
74 "{:.3} size {} total {} rate {:.2} kS/s {:.2} Mb/s",
75 start.elapsed().as_secs_f64(),
76 size,
77 total,
78 rate_ks,
79 rate_mb
80 );
81 last_report = Instant::now();
82 samples_since_report = 0;
83 }
84 }
85 println!(
86 "# pub-done: total={total} runtime={:.3}s",
87 start.elapsed().as_secs_f64()
88 );
89 Ok(())
90}
91
92fn run_sub(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
93 let factory = DomainParticipantFactory::instance();
94 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
95 let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
96 let subscriber = participant.create_subscriber(SubscriberQos::default());
97 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
98
99 let total = Arc::new(AtomicU64::new(0));
100 let start = Instant::now();
101 let mut last_report = start;
102 let mut last_total = 0u64;
103
104 while start.elapsed() < runtime {
105 if let Ok(samples) = reader.take() {
106 for _ in samples {
107 total.fetch_add(1, Ordering::Relaxed);
108 }
109 }
110 if last_report.elapsed() >= Duration::from_secs(1) {
111 let now_total = total.load(Ordering::Relaxed);
112 let delta = now_total - last_total;
113 let elapsed_s = last_report.elapsed().as_secs_f64();
114 let rate_ks = (delta as f64 / elapsed_s) / 1000.0;
115 println!(
116 "{:.3} size N total {} delta {} rate {:.2} kS/s",
117 start.elapsed().as_secs_f64(),
118 now_total,
119 delta,
120 rate_ks
121 );
122 last_report = Instant::now();
123 last_total = now_total;
124 }
125 std::thread::sleep(Duration::from_millis(1));
126 }
127 println!(
128 "# sub-done: total={} runtime={:.3}s",
129 total.load(Ordering::Relaxed),
130 start.elapsed().as_secs_f64()
131 );
132 Ok(())
133}
134
135fn run_pingpong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
136 // Beide Rollen in einem Prozess via zwei Topics; Pinger schreibt
137 // PerfPing (mit "send-time"-Marker als shapesize-i32-Diff), Ponger
138 // (zweiter Prozess) liest und echot auf PerfPong. Dieser Prozess
139 // ist der PINGER.
140 let factory = DomainParticipantFactory::instance();
141 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
142 let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
143 let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
144 let publisher = participant.create_publisher(PublisherQos::default());
145 let subscriber = participant.create_subscriber(SubscriberQos::default());
146 let writer = publisher.create_datawriter::<ShapeType>(&ping_topic, DataWriterQos::default())?;
147 let reader =
148 subscriber.create_datareader::<ShapeType>(&pong_topic, DataReaderQos::default())?;
149
150 let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
151 let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));
152
153 let mut rtts_us: Vec<u64> = Vec::new();
154 let start = Instant::now();
155 let mut seq = 0i32;
156 while start.elapsed() < runtime {
157 let send_us = now_micros();
158 // shapesize codiert die unteren 32 bit von send_us als pseudo-id
159 // (kollisionsanfaellig, aber nur fuer matching im Pong); echte
160 // RTT-Berechnung passiert lokal via timestamp-storage.
161 let _ = writer.write(&ShapeType::new("PING", 0, 0, seq));
162 seq = seq.wrapping_add(1);
163
164 // Wartet bis 50ms auf Pong
165 let deadline = Instant::now() + Duration::from_millis(50);
166 while Instant::now() < deadline {
167 if let Ok(samples) = reader.take() {
168 for s in samples {
169 if s.color == "PONG" && s.shapesize == seq.wrapping_sub(1) {
170 let recv_us = now_micros();
171 rtts_us.push(recv_us - send_us);
172 break;
173 }
174 }
175 if !rtts_us.is_empty() && rtts_us.last().is_some() {
176 break;
177 }
178 }
179 std::thread::sleep(Duration::from_micros(100));
180 }
181 std::thread::sleep(Duration::from_millis(100)); // 10Hz ping rate
182 }
183
184 if rtts_us.is_empty() {
185 println!("# pingpong: no RTTs collected — pong missing?");
186 return Ok(());
187 }
188 rtts_us.sort_unstable();
189 let n = rtts_us.len();
190 let mean = rtts_us.iter().sum::<u64>() / n as u64;
191 let p50 = rtts_us[n / 2];
192 let p90 = rtts_us[(n * 9) / 10];
193 let p99 = rtts_us[(n * 99) / 100];
194 println!(
195 "{:.3} rtt mean {}us min {} 50% {} 90% {} 99% {} max {} cnt {}",
196 start.elapsed().as_secs_f64(),
197 mean,
198 rtts_us.first().copied().unwrap_or(0),
199 p50,
200 p90,
201 p99,
202 rtts_us.last().copied().unwrap_or(0),
203 n
204 );
205 Ok(())
206}
207
208fn run_pong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
209 // Liest PerfPing, schreibt PerfPong mit gleichem shapesize.
210 let factory = DomainParticipantFactory::instance();
211 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
212 let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
213 let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
214 let publisher = participant.create_publisher(PublisherQos::default());
215 let subscriber = participant.create_subscriber(SubscriberQos::default());
216 let reader =
217 subscriber.create_datareader::<ShapeType>(&ping_topic, DataReaderQos::default())?;
218 let writer = publisher.create_datawriter::<ShapeType>(&pong_topic, DataWriterQos::default())?;
219 let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
220 let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));
221
222 let start = Instant::now();
223 let mut echoed = 0u64;
224 while start.elapsed() < runtime {
225 if let Ok(samples) = reader.take() {
226 for s in samples {
227 if s.color == "PING" {
228 let _ = writer.write(&ShapeType::new("PONG", s.x, s.y, s.shapesize));
229 echoed += 1;
230 }
231 }
232 }
233 std::thread::sleep(Duration::from_micros(100));
234 }
235 println!(
236 "# pong-done: echoed={echoed} runtime={:.3}s",
237 start.elapsed().as_secs_f64()
238 );
239 Ok(())
240}
28fn main() -> Result<(), Box<dyn std::error::Error>> {
29 let args: Vec<String> = env::args().collect();
30 let topic_name = args.get(1).map_or("Square", String::as_str);
31 let color = args.get(2).map_or("BLUE", String::as_str);
32 let domain_id: i32 = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(0);
33
34 if !["Square", "Circle", "Triangle"].contains(&topic_name) {
35 eprintln!(
36 "warning: topic '{topic_name}' ist kein Standard-ShapesDemo-Topic (Square/Circle/Triangle)",
37 );
38 }
39
40 let factory = DomainParticipantFactory::instance();
41 let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
42 let topic = participant.create_topic::<ShapeType>(topic_name, TopicQos::default())?;
43 let publisher = participant.create_publisher(PublisherQos::default());
44 let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;
45
46 println!(
47 "shapes_demo_publisher: Topic={topic_name} Color={color} Domain={domain_id} — Ctrl-C to stop"
48 );
49
50 // Discovery-Wait — bis mindestens 1 Subscriber gematched hat.
51 let matched = writer.wait_for_matched_subscription(1, Duration::from_secs(10));
52 match matched {
53 Ok(()) => println!("matched subscriber found, starting publication"),
54 Err(_) => println!("no subscriber in 10s — publishing anyway, samples may drop"),
55 }
56
57 // Sinus-Bewegung auf 240x270-Canvas (ShapesDemo-Default).
58 let shapesize = 30;
59 let mut t: f32 = 0.0;
60 loop {
61 let x = (120.0 + 80.0 * (t).sin()) as i32;
62 let y = (135.0 + 90.0 * (t * 1.3).cos()) as i32;
63 let sample = ShapeType::new(color, x, y, shapesize);
64 writer.write(&sample)?;
65 println!(" -> color={color} x={x} y={y} size={shapesize}");
66 t += 0.15;
67 thread::sleep(Duration::from_millis(100));
68 }
69}
36fn run_pub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
37 let factory = DomainParticipantFactory::instance();
38 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
39 let publisher = participant.create_publisher(PublisherQos::default());
40
41 let mut writers = Vec::with_capacity(count);
42 println!("[multi_pub] creating {count} writers");
43 let create_start = Instant::now();
44 for i in 0..count {
45 let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
46 let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;
47 writers.push(writer);
48 }
49 println!(
50 "[multi_pub] created {count} writers in {:.2}s",
51 create_start.elapsed().as_secs_f64()
52 );
53
54 let start = Instant::now();
55 let mut tick = 0u64;
56 let mut total_writes = 0u64;
57 while start.elapsed() < runtime {
58 let tick_start = Instant::now();
59 for (i, w) in writers.iter().enumerate() {
60 let s = ShapeType::new(format!("MEP{i:04}"), tick as i32, 0, 30);
61 if w.write(&s).is_ok() {
62 total_writes += 1;
63 }
64 }
65 tick += 1;
66 // 1 Hz pro Writer — schlaeft bis zur naechsten Sekunde
67 if tick % 60 == 0 {
68 println!(
69 "{:.3} multi_pub tick={} writers={} total_writes={} loop_us={}",
70 start.elapsed().as_secs_f64(),
71 tick,
72 count,
73 total_writes,
74 tick_start.elapsed().as_micros()
75 );
76 }
77 let elapsed = tick_start.elapsed();
78 if elapsed < Duration::from_secs(1) {
79 std::thread::sleep(Duration::from_secs(1) - elapsed);
80 }
81 }
82 println!(
83 "# multi_pub-done: writers={count} total_writes={} runtime={:.3}s",
84 total_writes,
85 start.elapsed().as_secs_f64()
86 );
87 Ok(())
88}
89
90fn run_sub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
91 let factory = DomainParticipantFactory::instance();
92 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
93 let subscriber = participant.create_subscriber(SubscriberQos::default());
94
95 let mut readers = Vec::with_capacity(count);
96 println!("[multi_sub] creating {count} readers");
97 let create_start = Instant::now();
98 for i in 0..count {
99 let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
100 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
101 readers.push(reader);
102 }
103 println!(
104 "[multi_sub] created {count} readers in {:.2}s",
105 create_start.elapsed().as_secs_f64()
106 );
107
108 let start = Instant::now();
109 let mut total_received = 0u64;
110 let mut last_report = start;
111 while start.elapsed() < runtime {
112 for r in &readers {
113 if let Ok(samples) = r.take() {
114 total_received += samples.len() as u64;
115 }
116 }
117 if last_report.elapsed() >= Duration::from_secs(60) {
118 println!(
119 "{:.3} multi_sub readers={} total_received={}",
120 start.elapsed().as_secs_f64(),
121 count,
122 total_received
123 );
124 last_report = Instant::now();
125 }
126 std::thread::sleep(Duration::from_millis(10));
127 }
128 println!(
129 "# multi_sub-done: readers={count} total_received={} runtime={:.3}s",
130 total_received,
131 start.elapsed().as_secs_f64()
132 );
133 Ok(())
134}
pub fn create_participant(
    &self,
    domain_id: DomainId,
    qos: DomainParticipantQos,
) -> Result<DomainParticipant>
Erzeugt einen neuen DomainParticipant fuer die gegebene
Domain-Id. Startet die DcpsRuntime mit Default-Config —
UDP-Sockets + SPDP/SEDP-Threads.
§Errors
DdsError::TransportError, wenn die UDP-Sockets nicht gebunden werden koennen.
Examples found in repository?
27fn main() -> Result<(), Box<dyn std::error::Error>> {
28 let factory = DomainParticipantFactory::instance();
29 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
30
31 let topic = participant.create_topic::<RawBytes>("Chatter", TopicQos::default())?;
32 let subscriber = participant.create_subscriber(SubscriberQos::default());
33 let reader = subscriber.create_datareader::<RawBytes>(&topic, DataReaderQos::default())?;
34
35 println!("hello_dds_subscriber: reading on Domain 0 Topic 'Chatter' — Ctrl-C to stop");
36
37 loop {
38 match reader.wait_for_data(Duration::from_secs(1)) {
39 Ok(()) => {
40 for sample in reader.take()? {
41 match std::str::from_utf8(&sample.data) {
42 Ok(s) => println!(" <- {s}"),
43 Err(_) => println!(" <- <{} bytes of non-UTF8>", sample.data.len()),
44 }
45 }
46 }
47 Err(DdsError::Timeout) => {} // idle tick
48 Err(e) => return Err(e.into()),
49 }
50 }
51}
More examples
38fn main() -> Result<(), Box<dyn std::error::Error>> {
39 let factory = DomainParticipantFactory::instance();
40 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
41
42 let topic = participant.create_topic::<RawBytes>("Chatter", TopicQos::default())?;
43 let publisher = participant.create_publisher(PublisherQos::default());
44 let writer = publisher.create_datawriter::<RawBytes>(&topic, DataWriterQos::default())?;
45
46 println!("hello_dds_publisher: sending on Domain 0 Topic 'Chatter' — Ctrl-C to stop");
47
48 // Ein paar Sekunden warten, damit SPDP+SEDP Subscriber entdecken kann.
49 // Ohne Warte-Phase gehen die ersten Samples ins Leere (noch kein Reader
50 // gematched). v1.3 bringt `wait_for_matched_subscription`.
51 thread::sleep(Duration::from_secs(3));
52
53 let mut counter: u32 = 0;
54 loop {
55 let payload = format!("hello #{counter}");
56 writer.write(&RawBytes::new(payload.clone().into_bytes()))?;
57 println!(" -> {payload}");
58 counter = counter.wrapping_add(1);
59 thread::sleep(Duration::from_secs(1));
60 }
61}
18fn main() -> Result<(), Box<dyn std::error::Error>> {
19 let args: Vec<String> = env::args().collect();
20 let topic_name = args.get(1).map_or("Square", String::as_str);
21 let domain_id: i32 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(0);
22
23 let factory = DomainParticipantFactory::instance();
24 let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
25 let topic = participant.create_topic::<ShapeType>(topic_name, TopicQos::default())?;
26 let subscriber = participant.create_subscriber(SubscriberQos::default());
27 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
28
29 println!("shapes_demo_subscriber: Topic={topic_name} Domain={domain_id} — Ctrl-C to stop");
30
31 loop {
32 match reader.wait_for_data(Duration::from_secs(1)) {
33 Ok(()) => {
34 for sample in reader.take()? {
35 println!(
36 " <- color={:8} x={:4} y={:4} size={}",
37 sample.color, sample.x, sample.y, sample.shapesize
38 );
39 }
40 }
41 Err(DdsError::Timeout) => {}
42 Err(e) => return Err(e.into()),
43 }
44 }
45}
45fn run_pub(size: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
46 let factory = DomainParticipantFactory::instance();
47 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
48 let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
49 let publisher = participant.create_publisher(PublisherQos::default());
50 let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;
51
52 // Discovery-Wait
53 let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
54
55 let payload_color = "X".repeat(size.saturating_sub(4 * 4)); // size minus i32-Felder approx
56 let start = Instant::now();
57 let mut total = 0u64;
58 let mut last_report = start;
59 let mut samples_since_report = 0u64;
60
61 while start.elapsed() < runtime {
62 let s = ShapeType::new(payload_color.clone(), 0, 0, total as i32);
63 if writer.write(&s).is_ok() {
64 total += 1;
65 samples_since_report += 1;
66 }
67 // Report jede Sekunde
68 if last_report.elapsed() >= Duration::from_secs(1) {
69 let elapsed_s = last_report.elapsed().as_secs_f64();
70 let rate_ks = (samples_since_report as f64 / elapsed_s) / 1000.0;
71 let rate_mb =
72 (samples_since_report as f64 * size as f64 * 8.0 / elapsed_s) / 1_000_000.0;
73 println!(
74 "{:.3} size {} total {} rate {:.2} kS/s {:.2} Mb/s",
75 start.elapsed().as_secs_f64(),
76 size,
77 total,
78 rate_ks,
79 rate_mb
80 );
81 last_report = Instant::now();
82 samples_since_report = 0;
83 }
84 }
85 println!(
86 "# pub-done: total={total} runtime={:.3}s",
87 start.elapsed().as_secs_f64()
88 );
89 Ok(())
90}
91
92fn run_sub(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
93 let factory = DomainParticipantFactory::instance();
94 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
95 let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
96 let subscriber = participant.create_subscriber(SubscriberQos::default());
97 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
98
99 let total = Arc::new(AtomicU64::new(0));
100 let start = Instant::now();
101 let mut last_report = start;
102 let mut last_total = 0u64;
103
104 while start.elapsed() < runtime {
105 if let Ok(samples) = reader.take() {
106 for _ in samples {
107 total.fetch_add(1, Ordering::Relaxed);
108 }
109 }
110 if last_report.elapsed() >= Duration::from_secs(1) {
111 let now_total = total.load(Ordering::Relaxed);
112 let delta = now_total - last_total;
113 let elapsed_s = last_report.elapsed().as_secs_f64();
114 let rate_ks = (delta as f64 / elapsed_s) / 1000.0;
115 println!(
116 "{:.3} size N total {} delta {} rate {:.2} kS/s",
117 start.elapsed().as_secs_f64(),
118 now_total,
119 delta,
120 rate_ks
121 );
122 last_report = Instant::now();
123 last_total = now_total;
124 }
125 std::thread::sleep(Duration::from_millis(1));
126 }
127 println!(
128 "# sub-done: total={} runtime={:.3}s",
129 total.load(Ordering::Relaxed),
130 start.elapsed().as_secs_f64()
131 );
132 Ok(())
133}
134
135fn run_pingpong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
136 // Beide Rollen in einem Prozess via zwei Topics; Pinger schreibt
137 // PerfPing (mit "send-time"-Marker als shapesize-i32-Diff), Ponger
138 // (zweiter Prozess) liest und echot auf PerfPong. Dieser Prozess
139 // ist der PINGER.
140 let factory = DomainParticipantFactory::instance();
141 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
142 let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
143 let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
144 let publisher = participant.create_publisher(PublisherQos::default());
145 let subscriber = participant.create_subscriber(SubscriberQos::default());
146 let writer = publisher.create_datawriter::<ShapeType>(&ping_topic, DataWriterQos::default())?;
147 let reader =
148 subscriber.create_datareader::<ShapeType>(&pong_topic, DataReaderQos::default())?;
149
150 let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
151 let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));
152
153 let mut rtts_us: Vec<u64> = Vec::new();
154 let start = Instant::now();
155 let mut seq = 0i32;
156 while start.elapsed() < runtime {
157 let send_us = now_micros();
158 // shapesize codiert die unteren 32 bit von send_us als pseudo-id
159 // (kollisionsanfaellig, aber nur fuer matching im Pong); echte
160 // RTT-Berechnung passiert lokal via timestamp-storage.
161 let _ = writer.write(&ShapeType::new("PING", 0, 0, seq));
162 seq = seq.wrapping_add(1);
163
164 // Wartet bis 50ms auf Pong
165 let deadline = Instant::now() + Duration::from_millis(50);
166 while Instant::now() < deadline {
167 if let Ok(samples) = reader.take() {
168 for s in samples {
169 if s.color == "PONG" && s.shapesize == seq.wrapping_sub(1) {
170 let recv_us = now_micros();
171 rtts_us.push(recv_us - send_us);
172 break;
173 }
174 }
175 if !rtts_us.is_empty() && rtts_us.last().is_some() {
176 break;
177 }
178 }
179 std::thread::sleep(Duration::from_micros(100));
180 }
181 std::thread::sleep(Duration::from_millis(100)); // 10Hz ping rate
182 }
183
184 if rtts_us.is_empty() {
185 println!("# pingpong: no RTTs collected — pong missing?");
186 return Ok(());
187 }
188 rtts_us.sort_unstable();
189 let n = rtts_us.len();
190 let mean = rtts_us.iter().sum::<u64>() / n as u64;
191 let p50 = rtts_us[n / 2];
192 let p90 = rtts_us[(n * 9) / 10];
193 let p99 = rtts_us[(n * 99) / 100];
194 println!(
195 "{:.3} rtt mean {}us min {} 50% {} 90% {} 99% {} max {} cnt {}",
196 start.elapsed().as_secs_f64(),
197 mean,
198 rtts_us.first().copied().unwrap_or(0),
199 p50,
200 p90,
201 p99,
202 rtts_us.last().copied().unwrap_or(0),
203 n
204 );
205 Ok(())
206}
207
208fn run_pong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
209 // Liest PerfPing, schreibt PerfPong mit gleichem shapesize.
210 let factory = DomainParticipantFactory::instance();
211 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
212 let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
213 let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
214 let publisher = participant.create_publisher(PublisherQos::default());
215 let subscriber = participant.create_subscriber(SubscriberQos::default());
216 let reader =
217 subscriber.create_datareader::<ShapeType>(&ping_topic, DataReaderQos::default())?;
218 let writer = publisher.create_datawriter::<ShapeType>(&pong_topic, DataWriterQos::default())?;
219 let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
220 let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));
221
222 let start = Instant::now();
223 let mut echoed = 0u64;
224 while start.elapsed() < runtime {
225 if let Ok(samples) = reader.take() {
226 for s in samples {
227 if s.color == "PING" {
228 let _ = writer.write(&ShapeType::new("PONG", s.x, s.y, s.shapesize));
229 echoed += 1;
230 }
231 }
232 }
233 std::thread::sleep(Duration::from_micros(100));
234 }
235 println!(
236 "# pong-done: echoed={echoed} runtime={:.3}s",
237 start.elapsed().as_secs_f64()
238 );
239 Ok(())
240}
28fn main() -> Result<(), Box<dyn std::error::Error>> {
29 let args: Vec<String> = env::args().collect();
30 let topic_name = args.get(1).map_or("Square", String::as_str);
31 let color = args.get(2).map_or("BLUE", String::as_str);
32 let domain_id: i32 = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(0);
33
34 if !["Square", "Circle", "Triangle"].contains(&topic_name) {
35 eprintln!(
36 "warning: topic '{topic_name}' ist kein Standard-ShapesDemo-Topic (Square/Circle/Triangle)",
37 );
38 }
39
40 let factory = DomainParticipantFactory::instance();
41 let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
42 let topic = participant.create_topic::<ShapeType>(topic_name, TopicQos::default())?;
43 let publisher = participant.create_publisher(PublisherQos::default());
44 let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;
45
46 println!(
47 "shapes_demo_publisher: Topic={topic_name} Color={color} Domain={domain_id} — Ctrl-C to stop"
48 );
49
50 // Discovery-Wait — bis mindestens 1 Subscriber gematched hat.
51 let matched = writer.wait_for_matched_subscription(1, Duration::from_secs(10));
52 match matched {
53 Ok(()) => println!("matched subscriber found, starting publication"),
54 Err(_) => println!("no subscriber in 10s — publishing anyway, samples may drop"),
55 }
56
57 // Sinus-Bewegung auf 240x270-Canvas (ShapesDemo-Default).
58 let shapesize = 30;
59 let mut t: f32 = 0.0;
60 loop {
61 let x = (120.0 + 80.0 * (t).sin()) as i32;
62 let y = (135.0 + 90.0 * (t * 1.3).cos()) as i32;
63 let sample = ShapeType::new(color, x, y, shapesize);
64 writer.write(&sample)?;
65 println!(" -> color={color} x={x} y={y} size={shapesize}");
66 t += 0.15;
67 thread::sleep(Duration::from_millis(100));
68 }
69}
36fn run_pub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
37 let factory = DomainParticipantFactory::instance();
38 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
39 let publisher = participant.create_publisher(PublisherQos::default());
40
41 let mut writers = Vec::with_capacity(count);
42 println!("[multi_pub] creating {count} writers");
43 let create_start = Instant::now();
44 for i in 0..count {
45 let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
46 let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;
47 writers.push(writer);
48 }
49 println!(
50 "[multi_pub] created {count} writers in {:.2}s",
51 create_start.elapsed().as_secs_f64()
52 );
53
54 let start = Instant::now();
55 let mut tick = 0u64;
56 let mut total_writes = 0u64;
57 while start.elapsed() < runtime {
58 let tick_start = Instant::now();
59 for (i, w) in writers.iter().enumerate() {
60 let s = ShapeType::new(format!("MEP{i:04}"), tick as i32, 0, 30);
61 if w.write(&s).is_ok() {
62 total_writes += 1;
63 }
64 }
65 tick += 1;
66 // 1 Hz pro Writer — schlaeft bis zur naechsten Sekunde
67 if tick % 60 == 0 {
68 println!(
69 "{:.3} multi_pub tick={} writers={} total_writes={} loop_us={}",
70 start.elapsed().as_secs_f64(),
71 tick,
72 count,
73 total_writes,
74 tick_start.elapsed().as_micros()
75 );
76 }
77 let elapsed = tick_start.elapsed();
78 if elapsed < Duration::from_secs(1) {
79 std::thread::sleep(Duration::from_secs(1) - elapsed);
80 }
81 }
82 println!(
83 "# multi_pub-done: writers={count} total_writes={} runtime={:.3}s",
84 total_writes,
85 start.elapsed().as_secs_f64()
86 );
87 Ok(())
88}
89
90fn run_sub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
91 let factory = DomainParticipantFactory::instance();
92 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
93 let subscriber = participant.create_subscriber(SubscriberQos::default());
94
95 let mut readers = Vec::with_capacity(count);
96 println!("[multi_sub] creating {count} readers");
97 let create_start = Instant::now();
98 for i in 0..count {
99 let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
100 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
101 readers.push(reader);
102 }
103 println!(
104 "[multi_sub] created {count} readers in {:.2}s",
105 create_start.elapsed().as_secs_f64()
106 );
107
108 let start = Instant::now();
109 let mut total_received = 0u64;
110 let mut last_report = start;
111 while start.elapsed() < runtime {
112 for r in &readers {
113 if let Ok(samples) = r.take() {
114 total_received += samples.len() as u64;
115 }
116 }
117 if last_report.elapsed() >= Duration::from_secs(60) {
118 println!(
119 "{:.3} multi_sub readers={} total_received={}",
120 start.elapsed().as_secs_f64(),
121 count,
122 total_received
123 );
124 last_report = Instant::now();
125 }
126 std::thread::sleep(Duration::from_millis(10));
127 }
128 println!(
129 "# multi_sub-done: readers={count} total_received={} runtime={:.3}s",
130 total_received,
131 start.elapsed().as_secs_f64()
132 );
133 Ok(())
134}
pub fn create_participant_with_config(
    &self,
    domain_id: DomainId,
    qos: DomainParticipantQos,
    config: RuntimeConfig,
) -> Result<DomainParticipant>
Variante von create_participant mit explizit uebergebener RuntimeConfig
(z.B. fuer Tests mit kurzen SPDP-Periods).
§Errors
DdsError::TransportError, wenn die UDP-Sockets nicht gebunden werden koennen.
pub fn create_participant_offline(
    &self,
    domain_id: DomainId,
    qos: DomainParticipantQos,
) -> DomainParticipant
Offline-Variante ohne Runtime — nur fuer Unit-Tests, die kein Netzwerk benoetigen. Der zurueckgegebene Participant kann Topics erzeugen, aber keine DataWriter/Reader.
pub fn lookup_participant(
    &self,
    domain_id: DomainId,
) -> Option<DomainParticipant>
Spec §2.2.2.2.2.4 lookup_participant(domain_id) — liefert
einen vorher erzeugten Participant zur gleichen Domain-Id oder
None, wenn keiner registriert ist. Bei mehreren Participants
derselben Domain liefert die Implementierung den ersten.
pub fn delete_participant(&self, p: &DomainParticipant) -> Result<()>
Spec §2.2.2.2.2.3 delete_participant. Entfernt den Participant
aus der Factory-Registry und ruft delete_contained_entities
auf. Liefert PreconditionNotMet, wenn der Participant nicht
in der Registry ist.
§Errors
DdsError::PreconditionNotMet, wenn der Participant nicht
registriert ist.
pub fn set_default_participant_qos(
    &self,
    qos: DomainParticipantQos,
) -> Result<()>
Spec §2.2.2.2.2.5 set_default_participant_qos — setzt die Default-QoS
fuer alle ab jetzt erzeugten Participants.
§Errors
DdsError::PreconditionNotMet bei Lock-Poisoning.
pub fn get_default_participant_qos(&self) -> DomainParticipantQos
Spec §2.2.2.2.2.5 get_default_participant_qos.
pub fn set_qos(&self, qos: DomainParticipantFactoryQos) -> Result<()>
Spec §2.2.2.2.2.6 set_qos (Factory-Level QoS).
§Errors
DdsError::PreconditionNotMet bei Lock-Poisoning.
pub fn get_qos(&self) -> DomainParticipantFactoryQos
Spec §2.2.2.2.2.6 get_qos (Factory-Level QoS).