pub struct DataReader<T: DdsType> { /* private fields */ }
Expand description
Typed DataReader — entnimmt Samples, die der RTPS-Reader fuer das Topic empfangen hat.
Live-Mode: rx: Some liefert Samples aus der Runtime-mpsc.
Offline-Mode: in-memory inbox fuer Unit-Tests.
Implementations§
Source§impl<T: DdsType> DataReader<T>
impl<T: DdsType> DataReader<T>
Sourcepub fn with_filter<F>(self, filter: F) -> Self
pub fn with_filter<F>(self, filter: F) -> Self
Setzt einen Content-Filter, der auf jedem Sample im take()-
Pfad evaluiert wird. Rueckgabe false verwirft das Sample.
Builder-Stil: reader.with_filter(|s| s.value > 0).
Hinweis: SQL-Expression-Syntax via set_filter_expression
folgt in einem spaeteren Schritt.
Sourcepub fn subscription_handle(&self) -> InstanceHandle
pub fn subscription_handle(&self) -> InstanceHandle
Spec §2.2.2.5.3.6 / §2.2.2.1.1 — InstanceHandle dieses
DataReaders. Stabile Identitaet fuer
DomainParticipant::contains_entity.
Sourcepub fn set_listener(
&self,
listener: Option<ArcDataReaderListener>,
mask: StatusMask,
)
pub fn set_listener( &self, listener: Option<ArcDataReaderListener>, mask: StatusMask, )
Setzt den DataReaderListener + StatusMask. None
loescht den Slot. Spec §2.2.2.5.7.x set_listener.
Sourcepub fn get_listener(&self) -> Option<ArcDataReaderListener>
pub fn get_listener(&self) -> Option<ArcDataReaderListener>
Aktueller Listener-Klon, sofern vorhanden.
Sourcepub fn qos(&self) -> DataReaderQos
pub fn qos(&self) -> DataReaderQos
Aktuelle QoS (als Klon).
Sourcepub fn take(&self) -> Result<Vec<T>>
pub fn take(&self) -> Result<Vec<T>>
Nimmt alle zwischengespeicherten Samples und entfernt sie aus der Inbox. Liefert leeren Vec wenn nichts da ist.
§Errors
WireError, wenn ein gespeicherter Payload sich nicht mehr decoden laesst (type-eval mismatch).
Examples found in repository?
27fn main() -> Result<(), Box<dyn std::error::Error>> {
28 let factory = DomainParticipantFactory::instance();
29 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
30
31 let topic = participant.create_topic::<RawBytes>("Chatter", TopicQos::default())?;
32 let subscriber = participant.create_subscriber(SubscriberQos::default());
33 let reader = subscriber.create_datareader::<RawBytes>(&topic, DataReaderQos::default())?;
34
35 println!("hello_dds_subscriber: reading on Domain 0 Topic 'Chatter' — Ctrl-C to stop");
36
37 loop {
38 match reader.wait_for_data(Duration::from_secs(1)) {
39 Ok(()) => {
40 for sample in reader.take()? {
41 match std::str::from_utf8(&sample.data) {
42 Ok(s) => println!(" <- {s}"),
43 Err(_) => println!(" <- <{} bytes of non-UTF8>", sample.data.len()),
44 }
45 }
46 }
47 Err(DdsError::Timeout) => {} // idle tick
48 Err(e) => return Err(e.into()),
49 }
50 }
51}
More examples
18fn main() -> Result<(), Box<dyn std::error::Error>> {
19 let args: Vec<String> = env::args().collect();
20 let topic_name = args.get(1).map_or("Square", String::as_str);
21 let domain_id: i32 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(0);
22
23 let factory = DomainParticipantFactory::instance();
24 let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
25 let topic = participant.create_topic::<ShapeType>(topic_name, TopicQos::default())?;
26 let subscriber = participant.create_subscriber(SubscriberQos::default());
27 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
28
29 println!("shapes_demo_subscriber: Topic={topic_name} Domain={domain_id} — Ctrl-C to stop");
30
31 loop {
32 match reader.wait_for_data(Duration::from_secs(1)) {
33 Ok(()) => {
34 for sample in reader.take()? {
35 println!(
36 " <- color={:8} x={:4} y={:4} size={}",
37 sample.color, sample.x, sample.y, sample.shapesize
38 );
39 }
40 }
41 Err(DdsError::Timeout) => {}
42 Err(e) => return Err(e.into()),
43 }
44 }
45}
92fn run_sub(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
93 let factory = DomainParticipantFactory::instance();
94 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
95 let topic = participant.create_topic::<ShapeType>(PERF_TOPIC, TopicQos::default())?;
96 let subscriber = participant.create_subscriber(SubscriberQos::default());
97 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
98
99 let total = Arc::new(AtomicU64::new(0));
100 let start = Instant::now();
101 let mut last_report = start;
102 let mut last_total = 0u64;
103
104 while start.elapsed() < runtime {
105 if let Ok(samples) = reader.take() {
106 for _ in samples {
107 total.fetch_add(1, Ordering::Relaxed);
108 }
109 }
110 if last_report.elapsed() >= Duration::from_secs(1) {
111 let now_total = total.load(Ordering::Relaxed);
112 let delta = now_total - last_total;
113 let elapsed_s = last_report.elapsed().as_secs_f64();
114 let rate_ks = (delta as f64 / elapsed_s) / 1000.0;
115 println!(
116 "{:.3} size N total {} delta {} rate {:.2} kS/s",
117 start.elapsed().as_secs_f64(),
118 now_total,
119 delta,
120 rate_ks
121 );
122 last_report = Instant::now();
123 last_total = now_total;
124 }
125 std::thread::sleep(Duration::from_millis(1));
126 }
127 println!(
128 "# sub-done: total={} runtime={:.3}s",
129 total.load(Ordering::Relaxed),
130 start.elapsed().as_secs_f64()
131 );
132 Ok(())
133}
134
135fn run_pingpong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
136 // Beide Rollen in einem Prozess via zwei Topics; Pinger schreibt
137 // PerfPing (mit "send-time"-Marker als shapesize-i32-Diff), Ponger
138 // (zweiter Prozess) liest und echot auf PerfPong. Dieser Prozess
139 // ist der PINGER.
140 let factory = DomainParticipantFactory::instance();
141 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
142 let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
143 let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
144 let publisher = participant.create_publisher(PublisherQos::default());
145 let subscriber = participant.create_subscriber(SubscriberQos::default());
146 let writer = publisher.create_datawriter::<ShapeType>(&ping_topic, DataWriterQos::default())?;
147 let reader =
148 subscriber.create_datareader::<ShapeType>(&pong_topic, DataReaderQos::default())?;
149
150 let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
151 let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));
152
153 let mut rtts_us: Vec<u64> = Vec::new();
154 let start = Instant::now();
155 let mut seq = 0i32;
156 while start.elapsed() < runtime {
157 let send_us = now_micros();
158 // shapesize codiert die unteren 32 bit von send_us als pseudo-id
159 // (kollisionsanfaellig, aber nur fuer matching im Pong); echte
160 // RTT-Berechnung passiert lokal via timestamp-storage.
161 let _ = writer.write(&ShapeType::new("PING", 0, 0, seq));
162 seq = seq.wrapping_add(1);
163
164 // Wartet bis 50ms auf Pong
165 let deadline = Instant::now() + Duration::from_millis(50);
166 while Instant::now() < deadline {
167 if let Ok(samples) = reader.take() {
168 for s in samples {
169 if s.color == "PONG" && s.shapesize == seq.wrapping_sub(1) {
170 let recv_us = now_micros();
171 rtts_us.push(recv_us - send_us);
172 break;
173 }
174 }
175 if !rtts_us.is_empty() && rtts_us.last().is_some() {
176 break;
177 }
178 }
179 std::thread::sleep(Duration::from_micros(100));
180 }
181 std::thread::sleep(Duration::from_millis(100)); // 10Hz ping rate
182 }
183
184 if rtts_us.is_empty() {
185 println!("# pingpong: no RTTs collected — pong missing?");
186 return Ok(());
187 }
188 rtts_us.sort_unstable();
189 let n = rtts_us.len();
190 let mean = rtts_us.iter().sum::<u64>() / n as u64;
191 let p50 = rtts_us[n / 2];
192 let p90 = rtts_us[(n * 9) / 10];
193 let p99 = rtts_us[(n * 99) / 100];
194 println!(
195 "{:.3} rtt mean {}us min {} 50% {} 90% {} 99% {} max {} cnt {}",
196 start.elapsed().as_secs_f64(),
197 mean,
198 rtts_us.first().copied().unwrap_or(0),
199 p50,
200 p90,
201 p99,
202 rtts_us.last().copied().unwrap_or(0),
203 n
204 );
205 Ok(())
206}
207
208fn run_pong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
209 // Liest PerfPing, schreibt PerfPong mit gleichem shapesize.
210 let factory = DomainParticipantFactory::instance();
211 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
212 let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
213 let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
214 let publisher = participant.create_publisher(PublisherQos::default());
215 let subscriber = participant.create_subscriber(SubscriberQos::default());
216 let reader =
217 subscriber.create_datareader::<ShapeType>(&ping_topic, DataReaderQos::default())?;
218 let writer = publisher.create_datawriter::<ShapeType>(&pong_topic, DataWriterQos::default())?;
219 let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
220 let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));
221
222 let start = Instant::now();
223 let mut echoed = 0u64;
224 while start.elapsed() < runtime {
225 if let Ok(samples) = reader.take() {
226 for s in samples {
227 if s.color == "PING" {
228 let _ = writer.write(&ShapeType::new("PONG", s.x, s.y, s.shapesize));
229 echoed += 1;
230 }
231 }
232 }
233 std::thread::sleep(Duration::from_micros(100));
234 }
235 println!(
236 "# pong-done: echoed={echoed} runtime={:.3}s",
237 start.elapsed().as_secs_f64()
238 );
239 Ok(())
240}
90fn run_sub_n(count: usize, runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
91 let factory = DomainParticipantFactory::instance();
92 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
93 let subscriber = participant.create_subscriber(SubscriberQos::default());
94
95 let mut readers = Vec::with_capacity(count);
96 println!("[multi_sub] creating {count} readers");
97 let create_start = Instant::now();
98 for i in 0..count {
99 let topic = participant.create_topic::<ShapeType>(&topic_name(i), TopicQos::default())?;
100 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
101 readers.push(reader);
102 }
103 println!(
104 "[multi_sub] created {count} readers in {:.2}s",
105 create_start.elapsed().as_secs_f64()
106 );
107
108 let start = Instant::now();
109 let mut total_received = 0u64;
110 let mut last_report = start;
111 while start.elapsed() < runtime {
112 for r in &readers {
113 if let Ok(samples) = r.take() {
114 total_received += samples.len() as u64;
115 }
116 }
117 if last_report.elapsed() >= Duration::from_secs(60) {
118 println!(
119 "{:.3} multi_sub readers={} total_received={}",
120 start.elapsed().as_secs_f64(),
121 count,
122 total_received
123 );
124 last_report = Instant::now();
125 }
126 std::thread::sleep(Duration::from_millis(10));
127 }
128 println!(
129 "# multi_sub-done: readers={count} total_received={} runtime={:.3}s",
130 total_received,
131 start.elapsed().as_secs_f64()
132 );
133 Ok(())
134}
117fn main() -> Result<(), Box<dyn std::error::Error>> {
118 let args: Vec<String> = env::args().collect();
119 let domain_id: i32 = args.get(1).and_then(|s| s.parse().ok()).unwrap_or(0);
120
121 let stop = Arc::new(AtomicBool::new(false));
122 install_signal_handler(stop.clone());
123
124 let factory = DomainParticipantFactory::instance();
125 let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
126 let subscriber = participant.create_subscriber(SubscriberQos::default());
127
128 let topics = ["Square", "Circle", "Triangle"];
129 let mut readers = Vec::new();
130 for t in &topics {
131 let topic = participant.create_topic::<ShapeType>(t, TopicQos::default())?;
132 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
133 readers.push((*t, reader));
134 }
135
136 eprintln!(
137 "shapes_demo_viewer: Domain={domain_id} — subscribed to Square / Circle / Triangle. Ctrl-C beendet."
138 );
139 eprintln!("Warte auf Discovery + erste Samples...");
140
141 // Hide cursor + clear screen.
142 print!("\x1B[?25l\x1B[2J");
143 let mut stdout = std::io::stdout();
144 stdout.flush().ok();
145
146 // Pro (topic, color) speichern wir die letzte Position.
147 let mut shapes: HashMap<(String, String), (i32, i32)> = HashMap::new();
148 let mut sample_count: u64 = 0;
149 let mut grid: Vec<Vec<(char, &'static str)>> = vec![vec![(' ', ""); VIEW_W]; VIEW_H];
150
151 while !stop.load(Ordering::Relaxed) {
152 // 1) Take all pending samples per topic.
153 for (topic_name, reader) in &readers {
154 if let Ok(samples) = reader.take() {
155 for sample in samples {
156 shapes.insert(
157 ((*topic_name).to_string(), sample.color.clone()),
158 (sample.x, sample.y),
159 );
160 sample_count += 1;
161 }
162 }
163 }
164
165 // 2) Clear grid.
166 for row in &mut grid {
167 for cell in row {
168 *cell = (' ', "");
169 }
170 }
171
172 // 3) Plot shapes.
173 for ((topic, color), (x, y)) in &shapes {
174 let gx = map_x(*x);
175 let gy = map_y(*y);
176 if gy < VIEW_H && gx < VIEW_W {
177 grid[gy][gx] = (glyph(topic), ansi_color(color));
178 }
179 }
180
181 // 4) Render: home, draw, status line.
182 print!("\x1B[H");
183 // top border
184 print!("┌");
185 for _ in 0..VIEW_W {
186 print!("─");
187 }
188 println!("┐");
189 for row in &grid {
190 print!("│");
191 for (ch, color) in row {
192 if color.is_empty() {
193 print!(" ");
194 } else {
195 print!("{color}{ch}{ANSI_RESET}");
196 }
197 }
198 println!("│");
199 }
200 print!("└");
201 for _ in 0..VIEW_W {
202 print!("─");
203 }
204 println!("┘");
205 println!(
206 "shapes={:3} samples={:6} domain={} — Ctrl-C beendet ",
207 shapes.len(),
208 sample_count,
209 domain_id,
210 );
211 stdout.flush().ok();
212
213 thread::sleep(Duration::from_millis(60));
214 }
215
216 // Show cursor again.
217 print!("\x1B[?25h");
218 println!("[shapes_demo_viewer] beendet. Total samples empfangen: {sample_count}");
219 Ok(())
220}
Sourcepub fn read(&self) -> Result<Vec<T>>
pub fn read(&self) -> Result<Vec<T>>
Liest alle Samples, ohne sie zu entfernen. Aktuell identisch
zu take, nur ohne das Entfernen. Sample-State (ReadCondition
§2.2.2.5.8) folgt im Wire-Up.
§Errors
Wie take.
Sourcepub fn matched_publication_count(&self) -> usize
pub fn matched_publication_count(&self) -> usize
Anzahl matched Remote-Writer. Im Offline-Mode immer 0.
Spec: OMG DDS 1.4 §2.2.2.5.3.15 get_matched_publications.
Seiteneffekt — bei einer Aenderung des Matched-Count
gegenueber dem letzten Aufruf wird on_subscription_matched
via Bubble-Up-Kette gefeuert (Spec §2.2.4.2.6.7).
Sourcepub fn wait_for_matched_publication(
&self,
min_count: usize,
timeout: Duration,
) -> Result<()>
pub fn wait_for_matched_publication( &self, min_count: usize, timeout: Duration, ) -> Result<()>
Blockiert, bis mindestens min_count Remote-Writer matched
sind oder timeout verstreicht. Event-driven via Runtime-Condvar
(D.5e Phase-1) — der Wake-up erfolgt direkt, sobald SEDP einen Match
propagiert; es gibt kein 20-ms-Polling mehr.
§Errors
DdsError::Timeout wenn min_count im Zeitfenster nicht
erreicht wird.
Examples found in repository?
135fn run_pingpong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
136 // Beide Rollen in einem Prozess via zwei Topics; Pinger schreibt
137 // PerfPing (mit "send-time"-Marker als shapesize-i32-Diff), Ponger
138 // (zweiter Prozess) liest und echot auf PerfPong. Dieser Prozess
139 // ist der PINGER.
140 let factory = DomainParticipantFactory::instance();
141 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
142 let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
143 let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
144 let publisher = participant.create_publisher(PublisherQos::default());
145 let subscriber = participant.create_subscriber(SubscriberQos::default());
146 let writer = publisher.create_datawriter::<ShapeType>(&ping_topic, DataWriterQos::default())?;
147 let reader =
148 subscriber.create_datareader::<ShapeType>(&pong_topic, DataReaderQos::default())?;
149
150 let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
151 let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));
152
153 let mut rtts_us: Vec<u64> = Vec::new();
154 let start = Instant::now();
155 let mut seq = 0i32;
156 while start.elapsed() < runtime {
157 let send_us = now_micros();
158 // shapesize codiert die unteren 32 bit von send_us als pseudo-id
159 // (kollisionsanfaellig, aber nur fuer matching im Pong); echte
160 // RTT-Berechnung passiert lokal via timestamp-storage.
161 let _ = writer.write(&ShapeType::new("PING", 0, 0, seq));
162 seq = seq.wrapping_add(1);
163
164 // Wartet bis 50ms auf Pong
165 let deadline = Instant::now() + Duration::from_millis(50);
166 while Instant::now() < deadline {
167 if let Ok(samples) = reader.take() {
168 for s in samples {
169 if s.color == "PONG" && s.shapesize == seq.wrapping_sub(1) {
170 let recv_us = now_micros();
171 rtts_us.push(recv_us - send_us);
172 break;
173 }
174 }
175 if !rtts_us.is_empty() && rtts_us.last().is_some() {
176 break;
177 }
178 }
179 std::thread::sleep(Duration::from_micros(100));
180 }
181 std::thread::sleep(Duration::from_millis(100)); // 10Hz ping rate
182 }
183
184 if rtts_us.is_empty() {
185 println!("# pingpong: no RTTs collected — pong missing?");
186 return Ok(());
187 }
188 rtts_us.sort_unstable();
189 let n = rtts_us.len();
190 let mean = rtts_us.iter().sum::<u64>() / n as u64;
191 let p50 = rtts_us[n / 2];
192 let p90 = rtts_us[(n * 9) / 10];
193 let p99 = rtts_us[(n * 99) / 100];
194 println!(
195 "{:.3} rtt mean {}us min {} 50% {} 90% {} 99% {} max {} cnt {}",
196 start.elapsed().as_secs_f64(),
197 mean,
198 rtts_us.first().copied().unwrap_or(0),
199 p50,
200 p90,
201 p99,
202 rtts_us.last().copied().unwrap_or(0),
203 n
204 );
205 Ok(())
206}
207
208fn run_pong(runtime: Duration) -> Result<(), Box<dyn std::error::Error>> {
209 // Liest PerfPing, schreibt PerfPong mit gleichem shapesize.
210 let factory = DomainParticipantFactory::instance();
211 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
212 let ping_topic = participant.create_topic::<ShapeType>(PING_TOPIC, TopicQos::default())?;
213 let pong_topic = participant.create_topic::<ShapeType>(PONG_TOPIC, TopicQos::default())?;
214 let publisher = participant.create_publisher(PublisherQos::default());
215 let subscriber = participant.create_subscriber(SubscriberQos::default());
216 let reader =
217 subscriber.create_datareader::<ShapeType>(&ping_topic, DataReaderQos::default())?;
218 let writer = publisher.create_datawriter::<ShapeType>(&pong_topic, DataWriterQos::default())?;
219 let _ = writer.wait_for_matched_subscription(1, Duration::from_secs(5));
220 let _ = reader.wait_for_matched_publication(1, Duration::from_secs(5));
221
222 let start = Instant::now();
223 let mut echoed = 0u64;
224 while start.elapsed() < runtime {
225 if let Ok(samples) = reader.take() {
226 for s in samples {
227 if s.color == "PING" {
228 let _ = writer.write(&ShapeType::new("PONG", s.x, s.y, s.shapesize));
229 echoed += 1;
230 }
231 }
232 }
233 std::thread::sleep(Duration::from_micros(100));
234 }
235 println!(
236 "# pong-done: echoed={echoed} runtime={:.3}s",
237 start.elapsed().as_secs_f64()
238 );
239 Ok(())
240}
Sourcepub fn requested_deadline_missed_count(&self) -> u64
pub fn requested_deadline_missed_count(&self) -> u64
Counter fuer requested-Deadline-Verletzungen (Spec
§2.2.4.2.11 REQUESTED_DEADLINE_MISSED_STATUS). Monoton steigend;
steigt um 1 pro abgelaufenem Deadline-Fenster ohne empfangenes
Sample. Offline / INFINITE → 0.
Feuert ggf. on_requested_deadline_missed.
Sourcepub fn requested_incompatible_qos_status(
&self,
) -> RequestedIncompatibleQosStatus
pub fn requested_incompatible_qos_status( &self, ) -> RequestedIncompatibleQosStatus
Aktueller RequestedIncompatibleQosStatus. Spec
§2.2.4.2.6.5. Triggert ggf. on_requested_incompatible_qos.
Sourcepub fn sample_lost_count(&self) -> u64
pub fn sample_lost_count(&self) -> u64
SampleLost-Counter. Spec §2.2.4.2.6.2.
Sourcepub fn sample_rejected_status(&self) -> SampleRejectedStatus
pub fn sample_rejected_status(&self) -> SampleRejectedStatus
SampleRejected-Status. Spec §2.2.4.2.6.3.
Sourcepub fn drive_listeners(&self)
pub fn drive_listeners(&self)
Pollt alle Reader-Statuses einmal und feuert pending Listener. Convenience-Helper fuer Tests + periodische Tick-Aufrufer.
Sourcepub fn liveliness_changed_status(&self) -> (bool, u64, u64)
pub fn liveliness_changed_status(&self) -> (bool, u64, u64)
Liveliness-Status des matched Writers (Spec §2.2.4.2.14
LIVELINESS_CHANGED_STATUS): (alive, alive_count, not_alive_count).
- alive: aktueller Zustand (true = Writer hat Sample innerhalb seiner Lease-Duration geliefert).
- alive_count: Zaehler der “not_alive → alive”-Transitions.
- not_alive_count: Zaehler der “alive → not_alive”-Transitions.
Offline / INFINITE-Lease → (false, 0, 0) / (true, 0, 0) je
nach Init. Fuer v1.3 wird nur LivelinessKind::Automatic ueberwacht.
Sourcepub fn wait_for_data(&self, timeout: Duration) -> Result<()>
pub fn wait_for_data(&self, timeout: Duration) -> Result<()>
Blockiert, bis mindestens ein Sample verfuegbar ist oder der
Timeout abgelaufen ist. Das Sample wird dabei nicht entnommen —
es wird in einen Staging-Buffer gelegt, den der naechste take()
ausliest. Damit bleibt wait_for_data + take() der kanonische
Subscriber-Loop, statt busy-polling im Application-Code.
Spec-Analog: OMG DDS 1.4 §2.2.2.5.8 ReadCondition + WaitSet.
Diese API liefert die wichtigste Semantik (wake-on-data) ohne die
komplette WaitSet/Condition-Infrastruktur.
§Errors
DdsError::Timeout wenn im Zeitfenster nichts ankommt.
Examples found in repository?
27fn main() -> Result<(), Box<dyn std::error::Error>> {
28 let factory = DomainParticipantFactory::instance();
29 let participant = factory.create_participant(0, DomainParticipantQos::default())?;
30
31 let topic = participant.create_topic::<RawBytes>("Chatter", TopicQos::default())?;
32 let subscriber = participant.create_subscriber(SubscriberQos::default());
33 let reader = subscriber.create_datareader::<RawBytes>(&topic, DataReaderQos::default())?;
34
35 println!("hello_dds_subscriber: reading on Domain 0 Topic 'Chatter' — Ctrl-C to stop");
36
37 loop {
38 match reader.wait_for_data(Duration::from_secs(1)) {
39 Ok(()) => {
40 for sample in reader.take()? {
41 match std::str::from_utf8(&sample.data) {
42 Ok(s) => println!(" <- {s}"),
43 Err(_) => println!(" <- <{} bytes of non-UTF8>", sample.data.len()),
44 }
45 }
46 }
47 Err(DdsError::Timeout) => {} // idle tick
48 Err(e) => return Err(e.into()),
49 }
50 }
51}
More examples
18fn main() -> Result<(), Box<dyn std::error::Error>> {
19 let args: Vec<String> = env::args().collect();
20 let topic_name = args.get(1).map_or("Square", String::as_str);
21 let domain_id: i32 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(0);
22
23 let factory = DomainParticipantFactory::instance();
24 let participant = factory.create_participant(domain_id, DomainParticipantQos::default())?;
25 let topic = participant.create_topic::<ShapeType>(topic_name, TopicQos::default())?;
26 let subscriber = participant.create_subscriber(SubscriberQos::default());
27 let reader = subscriber.create_datareader::<ShapeType>(&topic, DataReaderQos::default())?;
28
29 println!("shapes_demo_subscriber: Topic={topic_name} Domain={domain_id} — Ctrl-C to stop");
30
31 loop {
32 match reader.wait_for_data(Duration::from_secs(1)) {
33 Ok(()) => {
34 for sample in reader.take()? {
35 println!(
36 " <- color={:8} x={:4} y={:4} size={}",
37 sample.color, sample.x, sample.y, sample.shapesize
38 );
39 }
40 }
41 Err(DdsError::Timeout) => {}
42 Err(e) => return Err(e.into()),
43 }
44 }
45}
Sourcepub fn instance_tracker(&self) -> InstanceTracker
pub fn instance_tracker(&self) -> InstanceTracker
Liefert den aktuellen InstanceTracker (geteilt mit der
internen Buchhaltung). Hauptsaechlich fuer Tests / Inspection.
Sourcepub fn notify_writer_liveliness_lost(&self, writer_guid: [u8; 16]) -> usize
pub fn notify_writer_liveliness_lost(&self, writer_guid: [u8; 16]) -> usize
Spec §2.2.3.23 — Hook fuer “Writer X hat Liveliness verloren”. Macht zwei Dinge:
- clear OWNERSHIP=EXCLUSIVE-Owner fuer alle Instanzen, deren
Owner dieser Writer war (so dass das naechste Sample eines
anderen Writers via
should_accept_sample_under_exclusive_ownership neu gewinnen kann);
- liefert die Anzahl betroffener Instanzen zurueck.
Wird aus dem WLP-Pfad gerufen, sobald ein Writer-Lease abgelaufen
ist (siehe wlp::WlpEndpoint::lost_peers).
Sourcepub fn notify_participant_liveliness_lost(&self, prefix: [u8; 12]) -> usize
pub fn notify_participant_liveliness_lost(&self, prefix: [u8; 12]) -> usize
Wie Self::notify_writer_liveliness_lost, aber Match nur ueber
die ersten 12 Bytes (GuidPrefix). Erlaubt Failover, wenn nur die
Participant-Identitaet (z.B. bei SPDP-Lease-Expiry) bekannt ist.
Sourcepub fn lookup_instance(&self, instance: &T) -> InstanceHandle
pub fn lookup_instance(&self, instance: &T) -> InstanceHandle
Macht aus einem Sample-Wert den dazugehoerigen lokalen
InstanceHandle, oder HANDLE_NIL wenn unbekannt /
non-keyed. Spec §2.2.2.5.3.26 lookup_instance (Reader-Variante).
Sourcepub fn get_key_value(&self, handle: InstanceHandle) -> Result<T>
pub fn get_key_value(&self, handle: InstanceHandle) -> Result<T>
Spec §2.2.2.5.3.25 get_key_value. Liefert den Sample-Wert mit
nur den @key-Feldern befuellt (rekonstruiert aus dem
gespeicherten Key-Holder via T::decode).
§Errors
BadParameter wenn handle unbekannt; WireError wenn
T::decode den Key-Stream nicht rekonstruieren kann.
Sourcepub fn take_with_info(&self) -> Result<Vec<Sample<T>>>
pub fn take_with_info(&self) -> Result<Vec<Sample<T>>>
take mit voller SampleInfo. Spec §2.2.2.5.3.5
take. Konsumiert die Samples aus dem Cache (NOT_READ → READ-
Transition entfaellt, weil sie weg sind).
§Errors
Wie Self::take.
Sourcepub fn read_with_info(&self) -> Result<Vec<Sample<T>>>
pub fn read_with_info(&self) -> Result<Vec<Sample<T>>>
read mit voller SampleInfo. Konsumiert nicht — markiert
die Samples nur als READ (Spec §2.2.2.5.3.4).
§Errors
Wie Self::read.
Sourcepub fn take_filtered(
&self,
sample_mask: u32,
view_mask: u32,
instance_mask: u32,
) -> Result<Vec<Sample<T>>>
pub fn take_filtered( &self, sample_mask: u32, view_mask: u32, instance_mask: u32, ) -> Result<Vec<Sample<T>>>
Sourcepub fn read_filtered(
&self,
sample_mask: u32,
view_mask: u32,
instance_mask: u32,
) -> Result<Vec<Sample<T>>>
pub fn read_filtered( &self, sample_mask: u32, view_mask: u32, instance_mask: u32, ) -> Result<Vec<Sample<T>>>
Sourcepub fn read_w_condition(
&self,
condition: &Arc<QueryCondition>,
) -> Result<Vec<Sample<T>>>
pub fn read_w_condition( &self, condition: &Arc<QueryCondition>, ) -> Result<Vec<Sample<T>>>
read_w_condition (Spec §2.2.2.5.3.7) — wendet zusaetzlich zur
State-Mask den SQL-Filter der QueryCondition pro Sample an.
Samples bleiben im Cache (Sample-State NOT_READ → READ).
§Errors
PreconditionNotMet bei Lock-Poisoning oder SQL-Eval-Fehler.
Sourcepub fn take_w_condition(
&self,
condition: &Arc<QueryCondition>,
) -> Result<Vec<Sample<T>>>
pub fn take_w_condition( &self, condition: &Arc<QueryCondition>, ) -> Result<Vec<Sample<T>>>
take_w_condition (Spec §2.2.2.5.3.8) — wie read_w_condition,
aber konsumiert die Samples (entfernt aus dem Cache).
§Errors
PreconditionNotMet bei Lock-Poisoning oder SQL-Eval-Fehler.
Sourcepub fn read_instance(&self, handle: InstanceHandle) -> Result<Vec<Sample<T>>>
pub fn read_instance(&self, handle: InstanceHandle) -> Result<Vec<Sample<T>>>
read_instance (Spec §2.2.2.5.3.27). Liefert nur Samples der
angegebenen Instanz.
§Errors
BadParameter wenn handle == HANDLE_NIL.
Sourcepub fn take_instance(&self, handle: InstanceHandle) -> Result<Vec<Sample<T>>>
pub fn take_instance(&self, handle: InstanceHandle) -> Result<Vec<Sample<T>>>
take_instance (Spec §2.2.2.5.3.27, Take-Variante). Konsumiert.
§Errors
BadParameter wenn handle == HANDLE_NIL.
Sourcepub fn read_next_instance(
&self,
previous: InstanceHandle,
) -> Result<Vec<Sample<T>>>
pub fn read_next_instance( &self, previous: InstanceHandle, ) -> Result<Vec<Sample<T>>>
read_next_instance (Spec §2.2.2.5.3.28). Liefert die Samples
der naechsten Instanz (nach Sortier-Ordnung) hinter
previous.
previous == HANDLE_NIL startet beim ersten Handle.
§Errors
Wie read.
Sourcepub fn take_next_instance(
&self,
previous: InstanceHandle,
) -> Result<Vec<Sample<T>>>
pub fn take_next_instance( &self, previous: InstanceHandle, ) -> Result<Vec<Sample<T>>>
Trait Implementations§
Source§impl<T: DdsType> Debug for DataReader<T>
impl<T: DdsType> Debug for DataReader<T>
Source§impl<T: DdsType> Entity for DataReader<T>
Available on crate feature std only.
impl<T: DdsType> Entity for DataReader<T>
std only.
Source§fn set_qos(&self, qos: Self::Qos) -> Result<()>
fn set_qos(&self, qos: Self::Qos) -> Result<()>
Spec §2.2.3 / §2.2.2.5.3: DURABILITY, RELIABILITY, HISTORY, RESOURCE_LIMITS, OWNERSHIP sind Changeable=NO post-enable.
Source§type Qos = DataReaderQos
type Qos = DataReaderQos
DomainParticipantQos,
DataWriterQos, …).
Source§fn enable(&self) -> Result<()>
fn enable(&self) -> Result<()>
enable. Read more
Source§fn entity_state(&self) -> Arc<EntityState> ⓘ
fn entity_state(&self) -> Arc<EntityState> ⓘ
Arc<EntityState>.
Source§fn is_enabled(&self) -> bool
fn is_enabled(&self) -> bool
Source§fn get_status_condition(&self) -> StatusCondition
fn get_status_condition(&self) -> StatusCondition
StatusCondition dieser Entity.
Spec §2.2.2.1.6 get_status_condition.
Source§fn get_status_changes(&self) -> StatusMask
fn get_status_changes(&self) -> StatusMask
get_status_changes.
Source§fn get_instance_handle(&self) -> InstanceHandle
fn get_instance_handle(&self) -> InstanceHandle
get_instance_handle.