cyclonedds/read_condition.rs
1use crate::internal::ffi;
2use crate::{Reader, Result, State};
3
4/// A filter on a [`Reader`](crate::Reader) that restricts samples by their
5/// [`State`](crate::State).
6///
7/// A `ReadCondition` is created against a reader with a state mask and can be
8/// attached to a [`WaitSet`](crate::WaitSet) to trigger when matching samples
9/// become available. Reading via the condition returns only samples whose
10/// combined sample, view, and instance state matches the mask.
11///
12/// # Examples
13///
14/// ```no_run
15/// use cyclonedds::state;
16/// use cyclonedds::{Duration, ReadCondition, WaitSet};
17/// # use cyclonedds::{Domain, Participant, Topic, Reader};
18/// # let domain = Domain::default();
19/// # let participant = Participant::new(&domain)?;
20/// # #[derive(
21/// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
22/// # )]
23/// # struct Data {
24/// # x: i32,
25/// # y: i32,
26/// # }
27///
28/// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
29/// let reader = Reader::new(&topic)?;
30///
31/// let condition = ReadCondition::new(
32/// &reader,
33/// state::sample::Fresh | state::instance::Any | state::view::Any,
34/// )?;
35/// let mut waitset = WaitSet::<()>::new(&participant)?;
36/// waitset.attach(&condition, None)?;
37/// waitset.wait(Duration::INFINITE)?;
38///
39/// let samples = condition.take()?;
40/// # Ok::<_, cyclonedds::Error>(())
41/// ```
42#[derive(Debug)]
43pub struct ReadCondition<'domain, 'participant, 'topic, 'reader, T>
44where
45 T: crate::Topicable,
46{
47 pub(crate) inner: cyclonedds_sys::dds_entity_t,
48 phantom: std::marker::PhantomData<&'reader Reader<'domain, 'participant, 'topic, T>>,
49}
50
51impl<'d, 'p, 't, 'r, T> ReadCondition<'d, 'p, 't, 'r, T>
52where
53 T: crate::Topicable,
54{
55 /// Creates a new [`ReadCondition`] on `reader` that matches samples whose
56 /// state satisfies `mask`.
57 ///
58 /// # Errors
59 ///
60 /// Returns an [`Error`](crate::Error) if the read condition fails to
61 /// create.
62 ///
63 /// # Examples
64 ///
65 /// ```
66 /// use cyclonedds::{ReadCondition, state};
67 /// # use cyclonedds::{Domain, Participant, Topic, Reader};
68 /// # let domain = Domain::default();
69 /// # let participant = Participant::new(&domain)?;
70 /// # #[derive(
71 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
72 /// # )]
73 /// # struct Data {
74 /// # x: i32,
75 /// # y: i32,
76 /// # }
77 ///
78 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
79 /// let reader = Reader::new(&topic)?;
80 /// let condition = ReadCondition::new(&reader, state::sample::Fresh)?;
81 /// # Ok::<_, cyclonedds::Error>(())
82 /// ```
83 pub fn new(reader: &'r Reader<'d, 'p, 't, T>, mask: State) -> Result<Self> {
84 let inner = ffi::dds_create_readcondition(reader.inner, mask.bits())?;
85 Ok(Self {
86 inner,
87 phantom: std::marker::PhantomData,
88 })
89 }
90
91 /// Returns the state mask this condition was created with.
92 ///
93 /// # Errors
94 ///
95 /// Returns an [`Error`](crate::Error) if the mask returned by the read
96 /// condition is invalid.
97 ///
98 /// # Examples
99 ///
100 /// ```
101 /// use cyclonedds::{ReadCondition, state};
102 /// # use cyclonedds::{Domain, Participant, Topic, Reader};
103 /// # let domain = Domain::default();
104 /// # let participant = Participant::new(&domain)?;
105 /// # #[derive(
106 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
107 /// # )]
108 /// # struct Data {
109 /// # x: i32,
110 /// # y: i32,
111 /// # }
112 ///
113 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
114 /// let reader = Reader::new(&topic)?;
115 /// let condition = ReadCondition::new(&reader, state::sample::Fresh)?;
116 /// assert_eq!(condition.mask()?, state::sample::Fresh);
117 /// # Ok::<_, cyclonedds::Error>(())
118 /// ```
119 pub fn mask(&self) -> Result<State> {
120 let mask = ffi::dds_get_mask(self.inner)?;
121 crate::state::State::from_bits(mask).ok_or(crate::error::Error::NonSpecific)
122 }
123
124 /// Returns `true` if this condition is currently triggered.
125 ///
126 /// A condition is triggered when samples matching its mask are available
127 /// in the reader cache.
128 ///
129 /// # Errors
130 ///
131 /// Returns an [`Error`](crate::Error) if the read condition fails to read
132 /// the trigger state.
133 ///
134 /// # Examples
135 ///
136 /// ```
137 /// use cyclonedds::{ReadCondition, state};
138 /// # use cyclonedds::{Domain, Participant, Topic, Reader, Writer};
139 /// # let domain = Domain::default();
140 /// # let participant = Participant::new(&domain)?;
141 /// # #[derive(
142 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
143 /// # )]
144 /// # struct Data {
145 /// # x: i32,
146 /// # y: i32,
147 /// # }
148 ///
149 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
150 /// let reader = Reader::new(&topic)?;
151 /// let writer = Writer::new(&topic)?;
152 ///
153 /// let condition = ReadCondition::new(&reader, state::sample::Fresh)?;
154 /// writer.write(&Data::default())?;
155 /// assert!(condition.triggered()?);
156 /// Ok::<_, cyclonedds::Error>(())
157 /// ```
158 pub fn triggered(&self) -> Result<bool> {
159 ffi::dds_triggered(self.inner)
160 }
161
162 /// Removes and returns all samples matching this condition's mask from the
163 /// reader cache.
164 ///
165 /// # Errors
166 ///
167 /// Returns an [`Error`](crate::Error) if the read condition fails to take
168 /// samples.
169 ///
170 /// # Examples
171 ///
172 /// ```
173 /// use cyclonedds::{ReadCondition, state};
174 /// # use cyclonedds::{Domain, Participant, Topic, Reader, Writer};
175 /// # let domain = Domain::default();
176 /// # let participant = Participant::new(&domain)?;
177 /// # #[derive(
178 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
179 /// # )]
180 /// # struct Data {
181 /// # x: i32,
182 /// # y: i32,
183 /// # }
184 ///
185 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
186 /// let reader = Reader::new(&topic)?;
187 /// let writer = Writer::new(&topic)?;
188 ///
189 /// let condition = ReadCondition::new(
190 /// &reader,
191 /// state::sample::Stale | state::instance::Any | state::view::Any,
192 /// )?;
193 /// writer.write(&Data::default())?;
194 ///
195 /// // No sample matches this state initially.
196 /// let samples = condition.take()?;
197 /// assert_eq!(samples.len(), 0);
198 ///
199 /// // Attempt a normal read.
200 /// assert_eq!(reader.read()?.len(), 1);
201 ///
202 /// // Sample should now match this state because they're stale.
203 /// let samples = condition.take()?;
204 /// assert_eq!(samples.len(), 1);
205 ///
206 /// // Samples should be removed from the cache.
207 /// assert_eq!(condition.take()?.len(), 0);
208 /// # Ok::<_, cyclonedds::Error>(())
209 /// ```
210 pub fn take(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
211 where
212 T: std::clone::Clone,
213 {
214 ffi::dds_take(self.inner)
215 }
216
217 /// Returns all samples matching this condition's mask without removing
218 /// them from the reader cache.
219 ///
220 /// # Errors
221 ///
222 /// Returns an [`Error`](crate::Error) if the read condition fails to read
223 /// samples.
224 ///
225 /// # Examples
226 ///
227 /// ```
228 /// use cyclonedds::{ReadCondition, state};
229 /// # use cyclonedds::{Domain, Participant, Topic, Reader, Writer};
230 /// # let domain = Domain::default();
231 /// # let participant = Participant::new(&domain)?;
232 /// # #[derive(
233 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
234 /// # )]
235 /// # struct Data {
236 /// # x: i32,
237 /// # y: i32,
238 /// # }
239 ///
240 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
241 /// let reader = Reader::new(&topic)?;
242 /// let writer = Writer::new(&topic)?;
243 ///
244 /// let condition = ReadCondition::new(
245 /// &reader,
246 /// state::sample::Stale | state::instance::Any | state::view::Any,
247 /// )?;
248 /// writer.write(&Data::default())?;
249 ///
250 /// // No sample matches this state initially.
251 /// let samples = condition.read()?;
252 /// assert_eq!(samples.len(), 0);
253 ///
254 /// // Attempt a normal read.
255 /// assert_eq!(reader.read()?.len(), 1);
256 ///
257 /// // Sample should now match this state because they're stale.
258 /// let samples = condition.read()?;
259 /// assert_eq!(samples.len(), 1);
260 ///
261 /// // Samples remain in the cache.
262 /// assert_eq!(condition.read()?.len(), 1);
263 /// # Ok::<_, cyclonedds::Error>(())
264 /// ```
265 pub fn read(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
266 where
267 T: std::clone::Clone,
268 {
269 ffi::dds_read(self.inner)
270 }
271
272 /// Returns all samples matching this condition's mask without marking them
273 /// as read or removing them from the cache.
274 ///
275 /// # Errors
276 ///
277 /// Returns an [`Error`](crate::Error) if the read condition fails to peek
278 /// samples.
279 ///
280 /// # Examples
281 ///
282 /// ```
283 /// use cyclonedds::{ReadCondition, state};
284 /// # use cyclonedds::{Domain, Participant, Topic, Reader, Writer};
285 /// # let domain = Domain::default();
286 /// # let participant = Participant::new(&domain)?;
287 /// # #[derive(
288 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
289 /// # )]
290 /// # struct Data {
291 /// # x: i32,
292 /// # y: i32,
293 /// # }
294 ///
295 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
296 /// let reader = Reader::new(&topic)?;
297 /// let writer = Writer::new(&topic)?;
298 ///
299 /// let condition = ReadCondition::new(
300 /// &reader,
301 /// state::sample::Stale | state::instance::Any | state::view::Any,
302 /// )?;
303 /// writer.write(&Data::default())?;
304 ///
305 /// // No sample matches this state initially.
306 /// let samples = condition.peek()?;
307 /// assert_eq!(samples.len(), 0);
308 ///
309 /// // Attempt a normal read.
310 /// assert_eq!(reader.read()?.len(), 1);
311 ///
312 /// // Sample should now match this state because they're stale.
313 /// let samples = condition.peek()?;
314 /// assert_eq!(samples.len(), 1);
315 ///
316 /// // Samples remain in the cache.
317 /// assert_eq!(condition.peek()?.len(), 1);
318 /// # Ok::<_, cyclonedds::Error>(())
319 /// ```
320 pub fn peek(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
321 where
322 T: std::clone::Clone,
323 {
324 ffi::dds_peek(self.inner)
325 }
326}
327
328impl<T> Drop for ReadCondition<'_, '_, '_, '_, T>
329where
330 T: crate::Topicable,
331{
332 fn drop(&mut self) {
333 let result = ffi::dds_delete(self.inner);
334 debug_assert!(
335 result.is_ok(),
336 "unable to delete {self:?}: failed with {result:?}"
337 );
338 }
339}
340
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state;

    /// A read condition can be created on a valid reader.
    #[test]
    fn test_read_condition_create() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic =
            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
        let reader = crate::Reader::new(&topic).unwrap();
        let _ = ReadCondition::new(
            &reader,
            state::sample::Any | state::instance::Any | state::view::Any,
        )
        .unwrap();
    }

    /// Creating a read condition on a reader with a bogus handle fails with
    /// `BadParameter`.
    #[test]
    fn test_read_condition_create_with_invalid_reader() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic =
            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
        let mut reader = crate::Reader::new(&topic).unwrap();
        let reader_id = reader.inner;
        reader.inner = 0;
        // Capture the outcome without panicking so the real handle is always
        // restored before anything can unwind; presumably dropping the reader
        // with a zeroed handle would fail its own cleanup.
        let error = ReadCondition::new(
            &reader,
            state::sample::Any | state::instance::Any | state::view::Any,
        )
        .err();
        reader.inner = reader_id;
        assert_eq!(error, Some(crate::Error::BadParameter));
    }

    /// `mask` reports the mask the condition was created with.
    #[test]
    fn test_read_condition_get_mask() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic =
            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
        let reader = crate::Reader::new(&topic).unwrap();

        let mask = state::sample::Any | state::instance::Any | state::view::Any;

        let read_condition = ReadCondition::new(&reader, mask).unwrap();
        let result = read_condition.mask().unwrap();
        assert_eq!(result, mask);

        let mask = state::sample::Fresh | state::instance::Unregistered | state::view::Old;
        let result = read_condition.mask().unwrap();
        assert_ne!(result, mask);

        let read_condition = ReadCondition::new(&reader, mask).unwrap();
        let result = read_condition.mask().unwrap();
        assert_eq!(result, mask);
    }

    /// `mask` and `triggered` fail with `BadParameter` on a bogus handle.
    #[test]
    fn test_read_condition_get_mask_on_invalid_read_condition() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic =
            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
        let reader = crate::Reader::new(&topic).unwrap();
        let mut read_condition = ReadCondition::new(
            &reader,
            state::sample::Any | state::instance::Any | state::view::Any,
        )
        .unwrap();
        let read_condition_id = read_condition.inner;
        read_condition.inner = 0;
        let mask_result = read_condition.mask();
        let triggered_result = read_condition.triggered();
        // Restore the real handle before asserting: a failed assertion would
        // otherwise drop the condition with a zeroed handle and trip the
        // `debug_assert!` in `Drop` while already unwinding.
        read_condition.inner = read_condition_id;
        assert_eq!(mask_result.unwrap_err(), crate::Error::BadParameter);
        assert_eq!(triggered_result.unwrap_err(), crate::Error::BadParameter);
    }

    /// A condition on stale samples only triggers once the sample has been
    /// read through the reader, and stops triggering once it is taken.
    #[test]
    fn test_read_condition_triggering_reads() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic =
            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
        let reader = crate::Reader::new(&topic).unwrap();
        let writer = crate::Writer::new(&topic).unwrap();

        let mask = state::sample::Stale | state::instance::Any | state::view::Any;

        let read_condition = ReadCondition::new(&reader, mask).unwrap();

        let sample = crate::tests::topic::Data {
            x: 101,
            y: 202,
            message: "hello".to_string(),
        };
        writer.write(&sample).unwrap();

        // The freshly written sample is not stale yet.
        let read_condition_received = read_condition.read().unwrap();
        assert_eq!(read_condition_received.len(), 0);
        let triggered = read_condition.triggered().unwrap();
        assert!(!triggered);

        // Reading through the reader marks the sample as stale.
        let reader_received = reader.read().unwrap();
        assert_eq!(reader_received.len(), 1);
        assert_eq!(*reader_received[0], sample);
        assert_eq!(
            reader_received[0].info().state,
            state::sample::Fresh | state::view::New | state::instance::Alive
        );

        let triggered = read_condition.triggered().unwrap();
        assert!(triggered);

        // Peeking leaves the sample in place, so the condition stays triggered.
        let read_condition_received = read_condition.peek().unwrap();
        assert_eq!(read_condition_received.len(), 1);
        assert_eq!(*read_condition_received[0], sample);

        let triggered = read_condition.triggered().unwrap();
        assert!(triggered);

        // Taking removes the sample, which clears the trigger.
        let read_condition_received = read_condition.take().unwrap();
        assert_eq!(read_condition_received.len(), 1);
        assert_eq!(*read_condition_received[0], sample);

        let triggered = read_condition.triggered().unwrap();
        assert!(!triggered);

        let reader_received = reader.read().unwrap();
        assert!(reader_received.is_empty());

        let read_condition_received = read_condition.read().unwrap();
        assert!(read_condition_received.is_empty());
    }
}