Skip to main content

multi_endpoint_perf/
multi_endpoint_perf.rs

1//! Multi-Endpoint-Soak (CI-4c Welle).
2//!
3//! Erzeugt N Topics + N Writer (Mode `pub_n`) bzw. N Reader (Mode `sub_n`)
4//! in **einem** DomainParticipant. Stress-Test fuer Endpoint-Skalierung
5//! im DDS-Runtime: WriterCache, ReaderCache, SEDP-Endpoint-Liste,
6//! Discovery-Match-Loops mit N x N Match-Punkten.
7//!
8//! # Modi
9//!
10//! * `pub_n <count> <runtime_secs>` โ€” N Writer auf Topics
11//!   `MultiPerf0`..`MultiPerf{N-1}`, jeder schreibt 1 Sample/Sekunde.
12//! * `sub_n <count> <runtime_secs>` โ€” N Reader auf den gleichen Topics,
13//!   zaehlt empfangene Samples. Output alle 60 s mit total + per-Topic-
14//!   delta.
15//!
16//! # Auswahl von N
17//!
18//! Default-CI-4c-Welle: N=100. Empirisch tragbar bei 16+ GB RAM
19//! (jeder Reader/Writer ~50-200 KB History-Cache + Discovery-State).
20
21#![allow(clippy::print_stdout, clippy::print_stderr)]
22
23use std::env;
24use std::time::{Duration, Instant};
25
26use zerodds_dcps::interop::ShapeType;
27use zerodds_dcps::{
28    DataReaderQos, DataWriterQos, DomainParticipantFactory, DomainParticipantQos, PublisherQos,
29    SubscriberQos, TopicQos,
30};
31
32fn topic_name(i: usize) -> String {
33    format!("MultiPerf{i}")
34}
35
36fn run_pub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
37    let factory = DomainParticipantFactory::instance();
38    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
39    let publisher = participant.create_publisher(PublisherQos::default());
40
41    let mut writers = Vec::with_capacity(count);
42    println!("[multi_pub] creating {count} writers");
43    let create_start = Instant::now();
44    for i in 0..count {
45        let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
46        let writer = publisher.create_datawriter::<ShapeType>(&topic, DataWriterQos::default())?;
47        writers.push(writer);
48    }
49    println!(
50        "[multi_pub] created {count} writers in {:.2}s",
51        create_start.elapsed().as_secs_f64()
52    );
53
54    let start = Instant::now();
55    let mut tick = 0u64;
56    let mut total_writes = 0u64;
57    while start.elapsed() < runtime {
58        let tick_start = Instant::now();
59        for (i, w) in writers.iter().enumerate() {
60            let s = ShapeType::new(format!("MEP{i:04}"), tick as i32, 0, 30);
61            if w.write(&s).is_ok() {
62                total_writes += 1;
63            }
64        }
65        tick += 1;
66        // 1 Hz pro Writer โ€” schlaeft bis zur naechsten Sekunde
67        if tick % 60 == 0 {
68            println!(
69                "{:.3}  multi_pub tick={} writers={} total_writes={} loop_us={}",
70                start.elapsed().as_secs_f64(),
71                tick,
72                count,
73                total_writes,
74                tick_start.elapsed().as_micros()
75            );
76        }
77        let elapsed = tick_start.elapsed();
78        if elapsed < Duration::from_secs(1) {
79            std::thread::sleep(Duration::from_secs(1) - elapsed);
80        }
81    }
82    println!(
83        "# multi_pub-done: writers={count} total_writes={} runtime={:.3}s",
84        total_writes,
85        start.elapsed().as_secs_f64()
86    );
87    Ok(())
88}
89
90fn run_sub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
91    let factory = DomainParticipantFactory::instance();
92    let participant = factory.create_participant(0, DomainParticipantQos::default())?;
93    let subscriber = participant.create_subscriber(SubscriberQos::default());
94
95    let mut readers = Vec::with_capacity(count);
96    println!("[multi_sub] creating {count} readers");
97    let create_start = Instant::now();
98    for i in 0..count {
99        let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
100        let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
101        readers.push(reader);
102    }
103    println!(
104        "[multi_sub] created {count} readers in {:.2}s",
105        create_start.elapsed().as_secs_f64()
106    );
107
108    let start = Instant::now();
109    let mut total_received = 0u64;
110    let mut last_report = start;
111    while start.elapsed() < runtime {
112        for r in &readers {
113            if let Ok(samples) = r.take() {
114                total_received += samples.len() as u64;
115            }
116        }
117        if last_report.elapsed() >= Duration::from_secs(60) {
118            println!(
119                "{:.3}  multi_sub readers={} total_received={}",
120                start.elapsed().as_secs_f64(),
121                count,
122                total_received
123            );
124            last_report = Instant::now();
125        }
126        std::thread::sleep(Duration::from_millis(10));
127    }
128    println!(
129        "# multi_sub-done: readers={count} total_received={} runtime={:.3}s",
130        total_received,
131        start.elapsed().as_secs_f64()
132    );
133    Ok(())
134}
135
136fn usage() -> ! {
137    eprintln!("usage:");
138    eprintln!("  multi_endpoint_perf pub_n <count> <runtime_secs>");
139    eprintln!("  multi_endpoint_perf sub_n <count> <runtime_secs>");
140    std::process::exit(1);
141}
142
143fn main() -> Result<(), Box<dyn std::error::Error>> {
144    let args: Vec<String> = env::args().collect();
145    if args.len() < 4 {
146        usage();
147    }
148    let mode = args[1].as_str();
149    let count: usize = args[2].parse().unwrap_or_else(|_| usage());
150    let secs: u64 = args[3].parse().unwrap_or_else(|_| usage());
151    match mode {
152        "pub_n" => run_pub_n(count, Duration::from_secs(secs)),
153        "sub_n" => run_sub_n(count, Duration::from_secs(secs)),
154        _ => usage(),
155    }
156}