multi_endpoint_perf/
multi_endpoint_perf.rs1#![allow(clippy::print_stdout, clippy::print_stderr)]
22
23use std::env;
24use std::time::{Duration, Instant};
25
26use zerodds_dcps::interop::ShapeType;
27use zerodds_dcps::{
28 DataReaderQos, DataWriterQos, DomainParticipantFactory, DomainParticipantQos, PublisherQos,
29 SubscriberQos, TopicQos,
30};
31
/// Returns the topic name for endpoint index `i`: the fixed prefix
/// `MultiPerf` followed by the index in decimal (no padding).
///
/// Publisher and subscriber modes both derive their topic names from this
/// function, so equal indices yield equal names on both sides.
fn topic_name(i: usize) -> String {
    let mut name = String::from("MultiPerf");
    name.push_str(&i.to_string());
    name
}
35
36fn run_pub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
37 let factory = DomainParticipantFactory::instance();
38 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
39 let publisher = participant.create_publisher(PublisherQos::default());
40
41 let mut writers = Vec::with_capacity(count);
42 println!("[multi_pub] creating {count} writers");
43 let create_start = Instant::now();
44 for i in 0..count {
45 let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
46 let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;
47 writers.push(writer);
48 }
49 println!(
50 "[multi_pub] created {count} writers in {:.2}s",
51 create_start.elapsed().as_secs_f64()
52 );
53
54 let start = Instant::now();
55 let mut tick = 0u64;
56 let mut total_writes = 0u64;
57 while start.elapsed() < runtime {
58 let tick_start = Instant::now();
59 for (i, w) in writers.iter().enumerate() {
60 let s = ShapeType::new(format!("MEP{i:04}"), tick as i32, 0, 30);
61 if w.write(&s).is_ok() {
62 total_writes += 1;
63 }
64 }
65 tick += 1;
66 if tick % 60 == 0 {
68 println!(
69 "{:.3} multi_pub tick={} writers={} total_writes={} loop_us={}",
70 start.elapsed().as_secs_f64(),
71 tick,
72 count,
73 total_writes,
74 tick_start.elapsed().as_micros()
75 );
76 }
77 let elapsed = tick_start.elapsed();
78 if elapsed < Duration::from_secs(1) {
79 std::thread::sleep(Duration::from_secs(1) - elapsed);
80 }
81 }
82 println!(
83 "# multi_pub-done: writers={count} total_writes={} runtime={:.3}s",
84 total_writes,
85 start.elapsed().as_secs_f64()
86 );
87 Ok(())
88}
89
90fn run_sub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
91 let factory = DomainParticipantFactory::instance();
92 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
93 let subscriber = participant.create_subscriber(SubscriberQos::default());
94
95 let mut readers = Vec::with_capacity(count);
96 println!("[multi_sub] creating {count} readers");
97 let create_start = Instant::now();
98 for i in 0..count {
99 let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
100 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
101 readers.push(reader);
102 }
103 println!(
104 "[multi_sub] created {count} readers in {:.2}s",
105 create_start.elapsed().as_secs_f64()
106 );
107
108 let start = Instant::now();
109 let mut total_received = 0u64;
110 let mut last_report = start;
111 while start.elapsed() < runtime {
112 for r in &readers {
113 if let Ok(samples) = r.take() {
114 total_received += samples.len() as u64;
115 }
116 }
117 if last_report.elapsed() >= Duration::from_secs(60) {
118 println!(
119 "{:.3} multi_sub readers={} total_received={}",
120 start.elapsed().as_secs_f64(),
121 count,
122 total_received
123 );
124 last_report = Instant::now();
125 }
126 std::thread::sleep(Duration::from_millis(10));
127 }
128 println!(
129 "# multi_sub-done: readers={count} total_received={} runtime={:.3}s",
130 total_received,
131 start.elapsed().as_secs_f64()
132 );
133 Ok(())
134}
135
/// Prints the command-line synopsis to stderr and terminates the process
/// with exit status 1. Never returns.
fn usage() -> ! {
    let lines = [
        "usage:",
        " multi_endpoint_perf pub_n <count> <runtime_secs>",
        " multi_endpoint_perf sub_n <count> <runtime_secs>",
    ];
    for line in lines {
        eprintln!("{line}");
    }
    std::process::exit(1)
}
142
143fn main() -> Result<(), Box<dyn std::error::Error>> {
144 let args: Vec<String> = env::args().collect();
145 if args.len() < 4 {
146 usage();
147 }
148 let mode = args[1].as_str();
149 let count: usize = args[2].parse().unwrap_or_else(|_| usage());
150 let secs: u64 = args[3].parse().unwrap_or_else(|_| usage());
151 match mode {
152 "pub_n" => run_pub_n(count, Duration::from_secs(secs)),
153 "sub_n" => run_sub_n(count, Duration::from_secs(secs)),
154 _ => usage(),
155 }
156}