Skip to main content

zerodds_perf/
zerodds_perf.rs

1//! ZeroDDS Performance-Bench-Binary (CI-3b Welle).
2//!
3//! Drei Modi:
4//! * `zerodds_perf pub <size> <runtime_secs>` — schreibt ShapeType-Samples
5//!   so schnell wie möglich (write-best-effort), zählt total+rate.
6//! * `zerodds_perf sub <runtime_secs>` — liest und zählt empfangene Samples,
7//!   misst rate alle Sekunde.
8//! * `zerodds_perf pingpong <runtime_secs>` — Round-Trip-Time-Test:
9//!   sendet PING über Topic "PerfPing", wartet auf PONG über "PerfPong".
10//!   Misst RTT in µs (timestamp encoded in shapesize i32).
11//!
12//! # Output-Format
13//!
14//! ddsperf-kompatible Form (dass `llvm_bench_runner.sh` denselben Regex-
15//! Parser nutzen kann):
16//! ```text
17//! 1.000  size 1024 total 1234 rate 1234.5 kS/s 9876.5 Mb/s
18//! 1.000  rtt mean 145us min 100 50% 130 90% 180 99% 250 max 500
19//! ```
20
21#![allow(clippy::print_stdout, clippy::print_stderr)]
22
23use std::env;
24use std::sync::Arc;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
27
28use zerodds_dcps::interop::ShapeType;
29use zerodds_dcps::{
30    DataReaderQos, DataWriterQos, DomainParticipantFactory, DomainParticipantQos, PublisherQos,
31    SubscriberQos, TopicQos,
32};
33
34const PERF_TOPIC: &str = "PerfTest";
35const PING_TOPIC: &str = "PerfPing";
36const PONG_TOPIC: &str = "PerfPong";
37
38fn now_micros() -> u64 {
39    SystemTime::now()
40        .duration_since(UNIX_EPOCH)
41        .unwrap_or_default()
42        .as_micros() as u64
43}
44
45fn run_pub(size: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
46    let factory = DomainParticipantFactory::instance();
47    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
48    let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
49    let publisher = participant.create_publisher(PublisherQos::default());
50    let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;
51
52    // Discovery-Wait
53    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
54
55    let payload_color = "X".repeat(size.saturating_sub(4 * 4)); // size minus i32-Felder approx
56    let start = Instant::now();
57    let mut total = 0u64;
58    let mut last_report = start;
59    let mut samples_since_report = 0u64;
60
61    while start.elapsed() < runtime {
62        let s = ShapeType::new(payload_color.clone(), 0, 0, total as i32);
63        if writer.write(&s).is_ok() {
64            total += 1;
65            samples_since_report += 1;
66        }
67        // Report jede Sekunde
68        if last_report.elapsed() >= Duration::from_secs(1) {
69            let elapsed_s = last_report.elapsed().as_secs_f64();
70            let rate_ks = (samples_since_report as f64 / elapsed_s) / 1000.0;
71            let rate_mb =
72                (samples_since_report as f64 * size as f64 * 8.0 / elapsed_s) / 1_000_000.0;
73            println!(
74                "{:.3}  size {} total {} rate {:.2} kS/s {:.2} Mb/s",
75                start.elapsed().as_secs_f64(),
76                size,
77                total,
78                rate_ks,
79                rate_mb
80            );
81            last_report = Instant::now();
82            samples_since_report = 0;
83        }
84    }
85    println!(
86        "# pub-done: total={total} runtime={:.3}s",
87        start.elapsed().as_secs_f64()
88    );
89    Ok(())
90}
91
92fn run_sub(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
93    let factory = DomainParticipantFactory::instance();
94    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
95    let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
96    let subscriber = participant.create_subscriber(SubscriberQos::default());
97    let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
98
99    let total = Arc::new(AtomicU64::new(0));
100    let start = Instant::now();
101    let mut last_report = start;
102    let mut last_total = 0u64;
103
104    while start.elapsed() < runtime {
105        if let Ok(samples) = reader.take() {
106            for _ in samples {
107                total.fetch_add(1, Ordering::Relaxed);
108            }
109        }
110        if last_report.elapsed() >= Duration::from_secs(1) {
111            let now_total = total.load(Ordering::Relaxed);
112            let delta = now_total - last_total;
113            let elapsed_s = last_report.elapsed().as_secs_f64();
114            let rate_ks = (delta as f64 / elapsed_s) / 1000.0;
115            println!(
116                "{:.3}  size N total {} delta {} rate {:.2} kS/s",
117                start.elapsed().as_secs_f64(),
118                now_total,
119                delta,
120                rate_ks
121            );
122            last_report = Instant::now();
123            last_total = now_total;
124        }
125        std::thread::sleep(Duration::from_millis(1));
126    }
127    println!(
128        "# sub-done: total={} runtime={:.3}s",
129        total.load(Ordering::Relaxed),
130        start.elapsed().as_secs_f64()
131    );
132    Ok(())
133}
134
135fn run_pingpong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
136    // Beide Rollen in einem Prozess via zwei Topics; Pinger schreibt
137    // PerfPing (mit "send-time"-Marker als shapesize-i32-Diff), Ponger
138    // (zweiter Prozess) liest und echot auf PerfPong. Dieser Prozess
139    // ist der PINGER.
140    let factory = DomainParticipantFactory::instance();
141    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
142    let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
143    let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
144    let publisher = participant.create_publisher(PublisherQos::default());
145    let subscriber = participant.create_subscriber(SubscriberQos::default());
146    let writer = publisher.create_datawriter::<ShapeType>(&ping_topic, DataWriterQos::default())?;
147    let reader =
148        subscriber.create_datareader::<ShapeType>(&pong_topic, DataReaderQos::default())?;
149
150    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
151    let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));
152
153    let mut rtts_us: Vec<u64> = Vec::new();
154    let start = Instant::now();
155    let mut seq = 0i32;
156    while start.elapsed() < runtime {
157        let send_us = now_micros();
158        // shapesize codiert die unteren 32 bit von send_us als pseudo-id
159        // (kollisionsanfaellig, aber nur fuer matching im Pong); echte
160        // RTT-Berechnung passiert lokal via timestamp-storage.
161        let _ = writer.write(&ShapeType::new("PING", 0, 0, seq));
162        seq = seq.wrapping_add(1);
163
164        // Wartet bis 50ms auf Pong
165        let deadline = Instant::now() + Duration::from_millis(50);
166        while Instant::now() < deadline {
167            if let Ok(samples) = reader.take() {
168                for s in samples {
169                    if s.color == "PONG" && s.shapesize == seq.wrapping_sub(1) {
170                        let recv_us = now_micros();
171                        rtts_us.push(recv_us - send_us);
172                        break;
173                    }
174                }
175                if !rtts_us.is_empty() && rtts_us.last().is_some() {
176                    break;
177                }
178            }
179            std::thread::sleep(Duration::from_micros(100));
180        }
181        std::thread::sleep(Duration::from_millis(100)); // 10Hz ping rate
182    }
183
184    if rtts_us.is_empty() {
185        println!("# pingpong: no RTTs collected — pong missing?");
186        return Ok(());
187    }
188    rtts_us.sort_unstable();
189    let n = rtts_us.len();
190    let mean = rtts_us.iter().sum::<u64>() / n as u64;
191    let p50 = rtts_us[n / 2];
192    let p90 = rtts_us[(n * 9) / 10];
193    let p99 = rtts_us[(n * 99) / 100];
194    println!(
195        "{:.3}  rtt mean {}us min {} 50% {} 90% {} 99% {} max {} cnt {}",
196        start.elapsed().as_secs_f64(),
197        mean,
198        rtts_us.first().copied().unwrap_or(0),
199        p50,
200        p90,
201        p99,
202        rtts_us.last().copied().unwrap_or(0),
203        n
204    );
205    Ok(())
206}
207
208fn run_pong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
209    // Liest PerfPing, schreibt PerfPong mit gleichem shapesize.
210    let factory = DomainParticipantFactory::instance();
211    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
212    let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
213    let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
214    let publisher = participant.create_publisher(PublisherQos::default());
215    let subscriber = participant.create_subscriber(SubscriberQos::default());
216    let reader =
217        subscriber.create_datareader::<ShapeType>(&ping_topic, DataReaderQos::default())?;
218    let writer = publisher.create_datawriter::<ShapeType>(&pong_topic, DataWriterQos::default())?;
219    let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
220    let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));
221
222    let start = Instant::now();
223    let mut echoed = 0u64;
224    while start.elapsed() < runtime {
225        if let Ok(samples) = reader.take() {
226            for s in samples {
227                if s.color == "PING" {
228                    let _ = writer.write(&ShapeType::new("PONG", s.x, s.y, s.shapesize));
229                    echoed += 1;
230                }
231            }
232        }
233        std::thread::sleep(Duration::from_micros(100));
234    }
235    println!(
236        "# pong-done: echoed={echoed} runtime={:.3}s",
237        start.elapsed().as_secs_f64()
238    );
239    Ok(())
240}
241
242fn usage() -> ! {
243    eprintln!("usage:");
244    eprintln!("  zerodds_perf pub <size_bytes> <runtime_secs>");
245    eprintln!("  zerodds_perf sub <runtime_secs>");
246    eprintln!("  zerodds_perf pingpong <runtime_secs>");
247    eprintln!("  zerodds_perf pong <runtime_secs>");
248    std::process::exit(1);
249}
250
251fn main() -> Result<(), Box<dyn std::error::Error>> {
252    let args: Vec<String> = env::args().collect();
253    if args.len() < 2 {
254        usage();
255    }
256    let mode = args[1].as_str();
257    match mode {
258        "pub" => {
259            let size: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(1024);
260            let secs: u64 = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(10);
261            run_pub(size, Duration::from_secs(secs))
262        }
263        "sub" => {
264            let secs: u64 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(10);
265            run_sub(Duration::from_secs(secs))
266        }
267        "pingpong" => {
268            let secs: u64 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(30);
269            run_pingpong(Duration::from_secs(secs))
270        }
271        "pong" => {
272            let secs: u64 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(30);
273            run_pong(Duration::from_secs(secs))
274        }
275        _ => usage(),
276    }
277}