Skip to main content

cyclonedds/
read_condition.rs

1use crate::internal::ffi;
2use crate::{Reader, Result, State};
3
4/// A filter on a [`Reader`](crate::Reader) that restricts samples by their
5/// [`State`](crate::State).
6///
7/// A `ReadCondition` is created against a reader with a state mask and can be
8/// attached to a [`WaitSet`](crate::WaitSet) to trigger when matching samples
9/// become available. Reading via the condition returns only samples whose
10/// combined sample, view, and instance state matches the mask.
11///
12/// # Examples
13///
14/// ```no_run
15/// use cyclonedds::state;
16/// use cyclonedds::{Duration, ReadCondition, WaitSet};
17/// # use cyclonedds::{Domain, Participant, Topic, Reader};
18/// # let domain = Domain::default();
19/// # let participant = Participant::new(&domain)?;
20/// # #[derive(
21/// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
22/// # )]
23/// # struct Data {
24/// #     x: i32,
25/// #     y: i32,
26/// # }
27///
28/// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
29/// let reader = Reader::new(&topic)?;
30///
31/// let condition = ReadCondition::new(
32///     &reader,
33///     state::sample::Fresh | state::instance::Any | state::view::Any,
34/// )?;
35/// let mut waitset = WaitSet::<()>::new(&participant)?;
36/// waitset.attach(&condition, None)?;
37/// waitset.wait(Duration::INFINITE)?;
38///
39/// let samples = condition.take()?;
40/// # Ok::<_, cyclonedds::Error>(())
41/// ```
42#[derive(Debug)]
43pub struct ReadCondition<'domain, 'participant, 'topic, 'reader, T>
44where
45    T: crate::Topicable,
46{
47    pub(crate) inner: cyclonedds_sys::dds_entity_t,
48    phantom: std::marker::PhantomData<&'reader Reader<'domain, 'participant, 'topic, T>>,
49}
50
51impl<'d, 'p, 't, 'r, T> ReadCondition<'d, 'p, 't, 'r, T>
52where
53    T: crate::Topicable,
54{
55    /// Creates a new [`ReadCondition`] on `reader` that matches samples whose
56    /// state satisfies `mask`.
57    ///
58    /// # Errors
59    ///
60    /// Returns an [`Error`](crate::Error) if the read condition fails to
61    /// create.
62    ///
63    /// # Examples
64    ///
65    /// ```
66    /// use cyclonedds::{ReadCondition, state};
67    /// # use cyclonedds::{Domain, Participant, Topic, Reader};
68    /// # let domain = Domain::default();
69    /// # let participant = Participant::new(&domain)?;
70    /// # #[derive(
71    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
72    /// # )]
73    /// # struct Data {
74    /// #     x: i32,
75    /// #     y: i32,
76    /// # }
77    ///
78    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
79    /// let reader = Reader::new(&topic)?;
80    /// let condition = ReadCondition::new(&reader, state::sample::Fresh)?;
81    /// # Ok::<_, cyclonedds::Error>(())
82    /// ```
83    pub fn new(reader: &'r Reader<'d, 'p, 't, T>, mask: State) -> Result<Self> {
84        let inner = ffi::dds_create_readcondition(reader.inner, mask.bits())?;
85        Ok(Self {
86            inner,
87            phantom: std::marker::PhantomData,
88        })
89    }
90
91    /// Returns the state mask this condition was created with.
92    ///
93    /// # Errors
94    ///
95    /// Returns an [`Error`](crate::Error) if the mask returned by the read
96    /// condition is invalid.
97    ///
98    /// # Examples
99    ///
100    /// ```
101    /// use cyclonedds::{ReadCondition, state};
102    /// # use cyclonedds::{Domain, Participant, Topic, Reader};
103    /// # let domain = Domain::default();
104    /// # let participant = Participant::new(&domain)?;
105    /// # #[derive(
106    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
107    /// # )]
108    /// # struct Data {
109    /// #     x: i32,
110    /// #     y: i32,
111    /// # }
112    ///
113    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
114    /// let reader = Reader::new(&topic)?;
115    /// let condition = ReadCondition::new(&reader, state::sample::Fresh)?;
116    /// assert_eq!(condition.mask()?, state::sample::Fresh);
117    /// # Ok::<_, cyclonedds::Error>(())
118    /// ```
119    pub fn mask(&self) -> Result<State> {
120        let mask = ffi::dds_get_mask(self.inner)?;
121        crate::state::State::from_bits(mask).ok_or(crate::error::Error::NonSpecific)
122    }
123
124    /// Returns `true` if this condition is currently triggered.
125    ///
126    /// A condition is triggered when samples matching its mask are available
127    /// in the reader cache.
128    ///
129    /// # Errors
130    ///
131    /// Returns an [`Error`](crate::Error) if the read condition fails to read
132    /// the trigger state.
133    ///
134    /// # Examples
135    ///
136    /// ```
137    /// use cyclonedds::{ReadCondition, state};
138    /// # use cyclonedds::{Domain, Participant, Topic, Reader, Writer};
139    /// # let domain = Domain::default();
140    /// # let participant = Participant::new(&domain)?;
141    /// # #[derive(
142    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
143    /// # )]
144    /// # struct Data {
145    /// #     x: i32,
146    /// #     y: i32,
147    /// # }
148    ///
149    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
150    /// let reader = Reader::new(&topic)?;
151    /// let writer = Writer::new(&topic)?;
152    ///
153    /// let condition = ReadCondition::new(&reader, state::sample::Fresh)?;
154    /// writer.write(&Data::default())?;
155    /// assert!(condition.triggered()?);
156    /// Ok::<_, cyclonedds::Error>(())
157    /// ```
158    pub fn triggered(&self) -> Result<bool> {
159        ffi::dds_triggered(self.inner)
160    }
161
162    /// Removes and returns all samples matching this condition's mask from the
163    /// reader cache.
164    ///
165    /// # Errors
166    ///
167    /// Returns an [`Error`](crate::Error) if the read condition fails to take
168    /// samples.
169    ///
170    /// # Examples
171    ///
172    /// ```
173    /// use cyclonedds::{ReadCondition, state};
174    /// # use cyclonedds::{Domain, Participant, Topic, Reader, Writer};
175    /// # let domain = Domain::default();
176    /// # let participant = Participant::new(&domain)?;
177    /// # #[derive(
178    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
179    /// # )]
180    /// # struct Data {
181    /// #     x: i32,
182    /// #     y: i32,
183    /// # }
184    ///
185    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
186    /// let reader = Reader::new(&topic)?;
187    /// let writer = Writer::new(&topic)?;
188    ///
189    /// let condition = ReadCondition::new(
190    ///     &reader,
191    ///     state::sample::Stale | state::instance::Any | state::view::Any,
192    /// )?;
193    /// writer.write(&Data::default())?;
194    ///
195    /// // No sample matches this state initially.
196    /// let samples = condition.take()?;
197    /// assert_eq!(samples.len(), 0);
198    ///
199    /// // Attempt a normal read.
200    /// assert_eq!(reader.read()?.len(), 1);
201    ///
202    /// // Sample should now match this state because they're stale.
203    /// let samples = condition.take()?;
204    /// assert_eq!(samples.len(), 1);
205    ///
206    /// // Samples should be removed from the cache.
207    /// assert_eq!(condition.take()?.len(), 0);
208    /// # Ok::<_, cyclonedds::Error>(())
209    /// ```
210    pub fn take(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
211    where
212        T: std::clone::Clone,
213    {
214        ffi::dds_take(self.inner)
215    }
216
217    /// Returns all samples matching this condition's mask without removing
218    /// them from the reader cache.
219    ///
220    /// # Errors
221    ///
222    /// Returns an [`Error`](crate::Error) if the read condition fails to read
223    /// samples.
224    ///
225    /// # Examples
226    ///
227    /// ```
228    /// use cyclonedds::{ReadCondition, state};
229    /// # use cyclonedds::{Domain, Participant, Topic, Reader, Writer};
230    /// # let domain = Domain::default();
231    /// # let participant = Participant::new(&domain)?;
232    /// # #[derive(
233    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
234    /// # )]
235    /// # struct Data {
236    /// #     x: i32,
237    /// #     y: i32,
238    /// # }
239    ///
240    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
241    /// let reader = Reader::new(&topic)?;
242    /// let writer = Writer::new(&topic)?;
243    ///
244    /// let condition = ReadCondition::new(
245    ///     &reader,
246    ///     state::sample::Stale | state::instance::Any | state::view::Any,
247    /// )?;
248    /// writer.write(&Data::default())?;
249    ///
250    /// // No sample matches this state initially.
251    /// let samples = condition.read()?;
252    /// assert_eq!(samples.len(), 0);
253    ///
254    /// // Attempt a normal read.
255    /// assert_eq!(reader.read()?.len(), 1);
256    ///
257    /// // Sample should now match this state because they're stale.
258    /// let samples = condition.read()?;
259    /// assert_eq!(samples.len(), 1);
260    ///
261    /// // Samples remain in the cache.
262    /// assert_eq!(condition.read()?.len(), 1);
263    /// # Ok::<_, cyclonedds::Error>(())
264    /// ```
265    pub fn read(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
266    where
267        T: std::clone::Clone,
268    {
269        ffi::dds_read(self.inner)
270    }
271
272    /// Returns all samples matching this condition's mask without marking them
273    /// as read or removing them from the cache.
274    ///
275    /// # Errors
276    ///
277    /// Returns an [`Error`](crate::Error) if the read condition fails to peek
278    /// samples.
279    ///
280    /// # Examples
281    ///
282    /// ```
283    /// use cyclonedds::{ReadCondition, state};
284    /// # use cyclonedds::{Domain, Participant, Topic, Reader, Writer};
285    /// # let domain = Domain::default();
286    /// # let participant = Participant::new(&domain)?;
287    /// # #[derive(
288    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
289    /// # )]
290    /// # struct Data {
291    /// #     x: i32,
292    /// #     y: i32,
293    /// # }
294    ///
295    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
296    /// let reader = Reader::new(&topic)?;
297    /// let writer = Writer::new(&topic)?;
298    ///
299    /// let condition = ReadCondition::new(
300    ///     &reader,
301    ///     state::sample::Stale | state::instance::Any | state::view::Any,
302    /// )?;
303    /// writer.write(&Data::default())?;
304    ///
305    /// // No sample matches this state initially.
306    /// let samples = condition.peek()?;
307    /// assert_eq!(samples.len(), 0);
308    ///
309    /// // Attempt a normal read.
310    /// assert_eq!(reader.read()?.len(), 1);
311    ///
312    /// // Sample should now match this state because they're stale.
313    /// let samples = condition.peek()?;
314    /// assert_eq!(samples.len(), 1);
315    ///
316    /// // Samples remain in the cache.
317    /// assert_eq!(condition.peek()?.len(), 1);
318    /// # Ok::<_, cyclonedds::Error>(())
319    /// ```
320    pub fn peek(&self) -> Result<Vec<crate::sample::SampleOrKey<T>>>
321    where
322        T: std::clone::Clone,
323    {
324        ffi::dds_peek(self.inner)
325    }
326}
327
328impl<T> Drop for ReadCondition<'_, '_, '_, '_, T>
329where
330    T: crate::Topicable,
331{
332    fn drop(&mut self) {
333        let result = ffi::dds_delete(self.inner);
334        debug_assert!(
335            result.is_ok(),
336            "unable to delete {self:?}: failed with {result:?}"
337        );
338    }
339}
340
341#[cfg(test)]
342mod tests {
343    use super::*;
344    use crate::state;
345
346    #[test]
347    fn test_read_condition_create() {
348        let domain_id = crate::tests::domain::unique_id();
349        let domain = crate::Domain::new(domain_id).unwrap();
350        let topic_name = crate::tests::topic::unique_name();
351        let participant = crate::Participant::new(&domain).unwrap();
352        let topic =
353            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
354        let reader = crate::Reader::new(&topic).unwrap();
355        let _ = ReadCondition::new(
356            &reader,
357            state::sample::Any | state::instance::Any | state::view::Any,
358        )
359        .unwrap();
360    }
361
362    #[test]
363    fn test_read_condition_create_with_invalid_reader() {
364        let domain_id = crate::tests::domain::unique_id();
365        let domain = crate::Domain::new(domain_id).unwrap();
366        let topic_name = crate::tests::topic::unique_name();
367        let participant = crate::Participant::new(&domain).unwrap();
368        let topic =
369            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
370        let mut reader = crate::Reader::new(&topic).unwrap();
371        let reader_id = reader.inner;
372        reader.inner = 0;
373        let result = ReadCondition::new(
374            &reader,
375            state::sample::Any | state::instance::Any | state::view::Any,
376        )
377        .unwrap_err();
378        reader.inner = reader_id;
379        assert_eq!(result, crate::Error::BadParameter);
380    }
381
382    #[test]
383    fn test_read_condition_get_mask() {
384        let domain_id = crate::tests::domain::unique_id();
385        let domain = crate::Domain::new(domain_id).unwrap();
386        let topic_name = crate::tests::topic::unique_name();
387        let participant = crate::Participant::new(&domain).unwrap();
388        let topic =
389            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
390        let reader = crate::Reader::new(&topic).unwrap();
391
392        let mask = state::sample::Any | state::instance::Any | state::view::Any;
393
394        let read_condition = ReadCondition::new(&reader, mask).unwrap();
395        let result = read_condition.mask().unwrap();
396        assert_eq!(result, mask);
397
398        let mask = state::sample::Fresh | state::instance::Unregistered | state::view::Old;
399        let result = read_condition.mask().unwrap();
400        assert_ne!(result, mask);
401
402        let read_condition = ReadCondition::new(&reader, mask).unwrap();
403        let result = read_condition.mask().unwrap();
404        assert_eq!(result, mask);
405    }
406
407    #[test]
408    fn test_read_condition_get_mask_on_invalid_read_condition() {
409        let domain_id = crate::tests::domain::unique_id();
410        let domain = crate::Domain::new(domain_id).unwrap();
411        let topic_name = crate::tests::topic::unique_name();
412        let participant = crate::Participant::new(&domain).unwrap();
413        let topic =
414            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
415        let reader = crate::Reader::new(&topic).unwrap();
416        let mut read_condition = ReadCondition::new(
417            &reader,
418            state::sample::Any | state::instance::Any | state::view::Any,
419        )
420        .unwrap();
421        let read_condition_id = read_condition.inner;
422        read_condition.inner = 0;
423        let result = read_condition.mask().unwrap_err();
424        assert_eq!(result, crate::Error::BadParameter);
425        let result = read_condition.triggered().unwrap_err();
426        assert_eq!(result, crate::Error::BadParameter);
427        read_condition.inner = read_condition_id;
428    }
429
430    #[test]
431    fn test_read_condition_triggering_reads() {
432        let domain_id = crate::tests::domain::unique_id();
433        let domain = crate::Domain::new(domain_id).unwrap();
434        let topic_name = crate::tests::topic::unique_name();
435        let participant = crate::Participant::new(&domain).unwrap();
436        let topic =
437            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
438        let reader = crate::Reader::new(&topic).unwrap();
439        let writer = crate::Writer::new(&topic).unwrap();
440
441        let mask = state::sample::Stale | state::instance::Any | state::view::Any;
442
443        let read_condition = ReadCondition::new(&reader, mask).unwrap();
444
445        let sample = crate::tests::topic::Data {
446            x: 101,
447            y: 202,
448            message: "hello".to_string(),
449        };
450        writer.write(&sample).unwrap();
451
452        let read_condition_received = read_condition.read().unwrap();
453        assert_eq!(read_condition_received.len(), 0);
454        let triggered = read_condition.triggered().unwrap();
455        assert!(!triggered);
456
457        let reader_received = reader.read().unwrap();
458        assert_eq!(reader_received.len(), 1);
459        assert_eq!(*reader_received[0], sample);
460        assert_eq!(
461            reader_received[0].info().state,
462            state::sample::Fresh | state::view::New | state::instance::Alive
463        );
464
465        let triggered = read_condition.triggered().unwrap();
466        assert!(triggered);
467
468        let read_condition_received = read_condition.peek().unwrap();
469        assert_eq!(read_condition_received.len(), 1);
470        assert_eq!(*read_condition_received[0], sample);
471
472        let triggered = read_condition.triggered().unwrap();
473        assert!(triggered);
474
475        let read_condition_received = read_condition.take().unwrap();
476        assert_eq!(read_condition_received.len(), 1);
477        assert_eq!(*read_condition_received[0], sample);
478
479        let triggered = read_condition.triggered().unwrap();
480        assert!(!triggered);
481
482        let reader_received = reader.read().unwrap();
483        assert!(reader_received.is_empty());
484
485        let read_condition_received = read_condition.read().unwrap();
486        assert!(read_condition_received.is_empty());
487    }
488}