Skip to main content

cyclonedds/
waitset.rs

1use crate::entity::{Entity, EntityId};
2use crate::internal::ffi;
3use crate::{Participant, Result};
4
5/// An entity for blocking until one or more conditions are met.
6///
7/// A `WaitSet` collects conditions from attached entities and blocks via
8/// [`wait`](WaitSet::wait) or [`wait_until`](WaitSet::wait_until) until at
9/// least one of them triggers. Entities are attached with an optional typed
10/// blob `A` that is returned alongside the triggered condition, allowing the
11/// caller to identify which entity triggered the wakeup.
12///
13/// Pass `()` as `A` when blobs are not needed.
14///
15/// # Examples
16///
17/// ```no_run
18/// use cyclonedds::{Duration, WaitSet};
19/// # use cyclonedds::{Domain, Participant, Topic, Reader};
20/// # let domain = Domain::default();
21/// # let participant = Participant::new(&domain)?;
22/// # #[derive(
23/// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
24/// # )]
25/// # struct Data {
26/// #     x: i32,
27/// #     y: i32,
28/// # }
29///
30/// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
31/// let reader = Reader::new(&topic)?;
32///
33/// let mut waitset = WaitSet::<()>::new(&participant)?;
34/// waitset.attach(&reader, None)?;
35/// waitset.wait(Duration::INFINITE)?;
36///
37/// let samples = reader.take()?;
38/// # Ok::<_, cyclonedds::Error>(())
39/// ```
pub struct WaitSet<'domain, 'participant, 'attached, A> {
    // Raw DDS entity handle of this waitset. It is handed to every `ffi` call made on
    // behalf of the waitset and passed to `ffi::dds_delete` when the value is dropped.
    pub(crate) inner: cyclonedds_sys::dds_entity_t,
    // Entities currently attached, keyed by their id. The map is what makes `attach` and
    // `detach` no-ops for repeated calls and tells `Drop` which entities to detach.
    attached: std::collections::HashMap<EntityId, &'attached dyn Entity>,
    // No blob is stored in the struct itself; this marker only records that the waitset
    // is tied to references of type `&'attached A` handed to `attach`.
    phantom_blobs: std::marker::PhantomData<&'attached A>,
    // Marker tying the waitset to a borrow of the participant it was created under.
    phantom: std::marker::PhantomData<&'participant Participant<'domain>>,
}
46
47impl<A> std::fmt::Debug for WaitSet<'_, '_, '_, A> {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        f.debug_struct("WaitSet")
50            .field("inner", &self.inner)
51            .field("attached", &self.attached.keys())
52            .field("phantom", &self.phantom)
53            .finish()
54    }
55}
56
57impl<'d, 'p, 'a, A> WaitSet<'d, 'p, 'a, A> {
58    /// Creates a new `WaitSet` under a `participant`.
59    ///
60    /// # Errors
61    ///
62    /// Returns an [`Error`](crate::Error) if the waitset fails to create.
63    ///
64    /// # Examples
65    ///
66    /// ```
67    /// use cyclonedds::WaitSet;
68    /// # use cyclonedds::{Domain, Participant};
69    /// # let domain = Domain::default();
70    /// # let participant = Participant::new(&domain)?;
71    ///
72    /// let mut waitset = WaitSet::<()>::new(&participant)?;
73    /// # Ok::<_, cyclonedds::Error>(())
74    /// ```
75    pub fn new(participant: &'p Participant<'d>) -> Result<Self> {
76        let inner = ffi::dds_create_waitset(participant.inner)?;
77        Ok(Self {
78            inner,
79            attached: std::collections::HashMap::new(),
80            phantom_blobs: std::marker::PhantomData,
81            phantom: std::marker::PhantomData,
82        })
83    }
84
85    /// Attaches `entity` to this waitset with an optional `blob`.
86    ///
87    /// When the entity triggers a wakeup, the associated `blob` is returned
88    /// by [`wait`](WaitSet::wait). If the entity is already attached, this is
89    /// a no-op.
90    ///
91    /// # Errors
92    ///
93    /// Returns an [`Error`](crate::Error) if the waitset fails to attach the
94    /// entity.
95    ///
96    /// # Examples
97    ///
98    /// ```
99    /// use cyclonedds::WaitSet;
100    /// # use cyclonedds::{Domain, Participant, Topic, Reader};
101    /// # let domain = Domain::default();
102    /// # let participant = Participant::new(&domain)?;
103    /// # #[derive(
104    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
105    /// # )]
106    /// # struct Data {
107    /// #     x: i32,
108    /// #     y: i32,
109    /// # }
110    ///
111    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
112    /// let reader = Reader::new(&topic)?;
113    ///
114    /// let mut waitset = WaitSet::<()>::new(&participant)?;
115    /// waitset.attach(&reader, None)?;
116    /// # Ok::<_, cyclonedds::Error>(())
117    /// ```
118    pub fn attach(&mut self, entity: &'a dyn Entity, blob: Option<&'a A>) -> Result<()> {
119        let id = entity.id();
120        if !self.attached.contains_key(&id) {
121            ffi::dds_waitset_attach(
122                self.inner,
123                id.inner,
124                blob.map_or(std::ptr::null(), |blob| std::ptr::from_ref(blob)) as isize,
125            )?;
126            self.attached.insert(id, entity);
127        }
128        Ok(())
129    }
130
131    /// Detaches `entity` from this waitset.
132    ///
133    /// If the entity is not attached, this is a no-op.
134    ///
135    /// # Errors
136    ///
137    /// Returns an [`Error`](crate::Error) if the waitset fails to detach the
138    /// entity.
139    ///
140    /// # Examples
141    ///
142    /// ```
143    /// use cyclonedds::WaitSet;
144    /// # use cyclonedds::{Domain, Participant, Topic, Reader};
145    /// # let domain = Domain::default();
146    /// # let participant = Participant::new(&domain)?;
147    /// # #[derive(
148    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
149    /// # )]
150    /// # struct Data {
151    /// #     x: i32,
152    /// #     y: i32,
153    /// # }
154    ///
155    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
156    /// let reader = Reader::new(&topic)?;
157    ///
158    /// let mut waitset = WaitSet::<()>::new(&participant)?;
159    /// waitset.attach(&reader, None)?;
160    /// waitset.detach(&reader)?;
161    /// # Ok::<_, cyclonedds::Error>(())
162    /// ```
163    pub fn detach(&mut self, entity: &'a dyn Entity) -> Result<()> {
164        let entity = entity.id();
165        self.detach_id(entity)
166    }
167
168    fn detach_id(&mut self, entity_id: EntityId) -> Result<()> {
169        if self.attached.contains_key(&entity_id) {
170            ffi::dds_waitset_detach(self.inner, entity_id.inner)?;
171            self.attached.remove(&entity_id);
172        }
173
174        Ok(())
175    }
176
177    /// Sets the trigger state of this waitset directly.
178    ///
179    /// Setting to `true` causes any current or future [`wait`](WaitSet::wait)
180    /// call to return immediately. Setting to `false` resets it.
181    ///
182    /// # Errors
183    ///
184    /// Returns an [`Error`](crate::Error) if the waitset fails to set the
185    /// trigger.
186    ///
187    /// # Examples
188    ///
189    /// ```
190    /// use cyclonedds::WaitSet;
191    /// # use cyclonedds::{Domain, Participant};
192    /// # let domain = Domain::default();
193    /// # let participant = Participant::new(&domain)?;
194    ///
195    /// let mut waitset = WaitSet::<()>::new(&participant)?;
196    /// waitset.set_trigger(true)?;
197    /// # Ok::<_, cyclonedds::Error>(())
198    /// ```
    pub fn set_trigger(&mut self, trigger: bool) -> Result<()> {
        // Thin wrapper: forwards the flag for this waitset's handle to the FFI layer and
        // returns its result unchanged. No local state is updated.
        ffi::dds_waitset_set_trigger(self.inner, trigger)
    }
202
203    /// Blocks until at least one attached condition triggers or `timeout`
204    /// elapses.
205    ///
206    /// Returns the blobs associated with the triggered conditions. Pass
207    /// [`Duration::INFINITE`](crate::Duration::INFINITE) to block
208    /// indefinitely.
209    ///
210    /// # Errors
211    ///
212    /// Returns an [`Error`](crate::Error) if the timeout elapses without any
213    /// condition triggering or if the waitset returns an error.
214    ///
215    /// # Examples
216    ///
217    /// ```no_run
218    /// use cyclonedds::{Duration, WaitSet};
219    /// # use cyclonedds::{Domain, Participant, Topic, Reader};
220    /// # let domain = Domain::default();
221    /// # let participant = Participant::new(&domain)?;
222    /// # #[derive(
223    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
224    /// # )]
225    /// # struct Data {
226    /// #     x: i32,
227    /// #     y: i32,
228    /// # }
229    /// # let topic = Topic::<Data>::new(&participant, "MyTopic")?;
230    /// # let reader = Reader::new(&topic)?;
231    ///
232    /// let mut waitset = WaitSet::<()>::new(&participant)?;
233    /// waitset.attach(&reader, None)?;
234    /// waitset.wait(Duration::from_secs(5))?;
235    /// # Ok::<_, cyclonedds::Error>(())
236    /// ```
237    pub fn wait(&mut self, timeout: crate::Duration) -> Result<Vec<&'a A>> {
238        let (_, attachments) =
239            ffi::dds_waitset_wait::<A>(self.inner, self.attached.len(), timeout.inner)?;
240        Ok(attachments)
241    }
242
243    /// Blocks until at least one attached condition triggers or
244    /// `absolute_time` is reached.
245    ///
246    /// Like [`wait`](WaitSet::wait) but takes an absolute [`Time`](crate::Time)
247    /// rather than a relative timeout.
248    ///
249    /// # Errors
250    ///
251    /// Returns an [`Error`](crate::Error) if the deadline passes without any
252    /// condition triggering or if the waitset returns an error.
253    ///
254    /// # Examples
255    ///
256    /// ```no_run
257    /// use cyclonedds::{Time, WaitSet};
258    /// # use cyclonedds::{Domain, Participant, Topic, Reader};
259    /// # let domain = Domain::default();
260    /// # let participant = Participant::new(&domain)?;
261    /// # #[derive(
262    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
263    /// # )]
264    /// # struct Data {
265    /// #     x: i32,
266    /// #     y: i32,
267    /// # }
268    /// # let topic = Topic::<Data>::new(&participant, "MyTopic")?;
269    /// # let reader = Reader::new(&topic)?;
270    ///
271    /// let mut waitset = WaitSet::<()>::new(&participant)?;
272    /// waitset.attach(&reader, None)?;
273    /// waitset.wait_until(
274    ///     (std::time::SystemTime::now() + std::time::Duration::from_secs(5))
275    ///         .try_into()
276    ///         .unwrap(),
277    /// )?;
278    /// # Ok::<_, cyclonedds::Error>(())
279    /// ```
280    pub fn wait_until(&mut self, absolute_time: crate::Time) -> Result<Vec<&'a A>> {
281        let (_, attachments) =
282            ffi::dds_waitset_wait_until::<A>(self.inner, self.attached.len(), absolute_time.inner)?;
283        Ok(attachments)
284    }
285
286    /// Returns `true` if `entity` is currently attached to this waitset.
287    ///
288    /// # Examples
289    ///
290    /// ```
291    /// use cyclonedds::WaitSet;
292    /// # use cyclonedds::{Domain, Participant, Topic, Reader};
293    /// # let domain = Domain::default();
294    /// # let participant = Participant::new(&domain)?;
295    /// # #[derive(
296    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
297    /// # )]
298    /// # struct Data {
299    /// #     x: i32,
300    /// #     y: i32,
301    /// # }
302    ///
303    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
304    /// let reader = Reader::new(&topic)?;
305    ///
306    /// let mut waitset = WaitSet::<()>::new(&participant)?;
307    /// assert!(!waitset.is_attached(&reader));
308    /// waitset.attach(&reader, None)?;
309    /// assert!(waitset.is_attached(&reader));
310    /// # Ok::<_, cyclonedds::Error>(())
311    /// ```
312    pub fn is_attached(&self, entity: &'a dyn Entity) -> bool {
313        self.attached.contains_key(&entity.id())
314    }
315}
316
317impl<A> Drop for WaitSet<'_, '_, '_, A> {
318    fn drop(&mut self) {
319        for entity_id in self.attached.keys() {
320            let result = ffi::dds_waitset_detach(self.inner, entity_id.inner);
321            debug_assert!(
322                result.is_ok(),
323                "unable to detach entity: {entity_id:?} from {self:?}: {result:?}"
324            );
325        }
326
327        let result = ffi::dds_delete(self.inner);
328        debug_assert!(
329            result.is_ok(),
330            "unable to delete {self:?}: failed with {result:?}"
331        );
332    }
333}
334
335#[cfg(test)]
336mod tests {
337    use super::*;
338    use crate::state;
339
340    #[test]
341    fn test_waitset_create() {
342        let domain_id = crate::tests::domain::unique_id();
343        let domain = crate::Domain::new(domain_id).unwrap();
344        let participant = crate::Participant::new(&domain).unwrap();
345        let _ = WaitSet::<()>::new(&participant).unwrap();
346    }
347
348    #[test]
349    fn test_waitset_create_with_invalid_participant() {
350        let domain_id = crate::tests::domain::unique_id();
351        let domain = crate::Domain::new(domain_id).unwrap();
352        let mut participant = crate::Participant::new(&domain).unwrap();
353        let participant_id = participant.inner;
354        participant.inner = 0;
355        let result = WaitSet::<()>::new(&participant).unwrap_err();
356        participant.inner = participant_id;
357
358        assert_eq!(result, crate::Error::BadParameter);
359    }
360
361    #[test]
362    fn test_waitset_debug_formatting() {
363        let domain_id = crate::tests::domain::unique_id();
364        let domain = crate::Domain::new(domain_id).unwrap();
365        let participant = crate::Participant::new(&domain).unwrap();
366        let waitset = WaitSet::<()>::new(&participant).unwrap();
367
368        let result = format!("{waitset:?}");
369        assert!(result.contains(&format!("{}", waitset.inner)));
370    }
371
372    #[test]
373    fn test_waitset_attachment() {
374        let domain_id = crate::tests::domain::unique_id();
375        let domain = crate::Domain::new(domain_id).unwrap();
376        let participant = crate::Participant::new(&domain).unwrap();
377        let topic_name = crate::tests::topic::unique_name();
378        let topic =
379            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
380        let reader = crate::Reader::new(&topic).unwrap();
381        let mask = state::sample::Any | state::view::Any | state::instance::Any;
382        let read_condition = crate::ReadCondition::new(&reader, mask).unwrap();
383
384        let mut waitset = WaitSet::<()>::new(&participant).unwrap();
385
386        let result = waitset.attach(&topic, None);
387        assert!(result.is_ok());
388        let result = waitset.attach(&topic, None);
389        assert!(result.is_ok());
390        let result = waitset.attach(&read_condition, None);
391        assert!(result.is_ok());
392
393        assert!(waitset.is_attached(&topic));
394        assert!(waitset.is_attached(&read_condition));
395
396        let result = waitset.detach(&read_condition);
397        assert!(result.is_ok());
398
399        assert!(waitset.is_attached(&topic));
400        assert!(!waitset.is_attached(&read_condition));
401
402        let result = waitset.detach(&read_condition);
403        assert!(result.is_ok());
404    }
405
406    #[test]
407    fn test_waitset_attachment_with_invalid_waitset() {
408        let domain_id = crate::tests::domain::unique_id();
409        let domain = crate::Domain::new(domain_id).unwrap();
410        let participant = crate::Participant::new(&domain).unwrap();
411        let topic_name = crate::tests::topic::unique_name();
412        let topic =
413            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
414        let reader = crate::Reader::new(&topic).unwrap();
415        let mask = state::sample::Any | state::view::Any | state::instance::Any;
416        let read_condition = crate::ReadCondition::new(&reader, mask).unwrap();
417
418        let mut waitset = WaitSet::<()>::new(&participant).unwrap();
419
420        let result = waitset.attach(&topic, None);
421        assert!(result.is_ok());
422
423        let waitset_id = waitset.inner;
424        waitset.inner = 0;
425
426        let result = waitset.attach(&read_condition, None).unwrap_err();
427        assert_eq!(result, crate::Error::BadParameter);
428
429        let result = waitset.detach(&topic).unwrap_err();
430        assert_eq!(result, crate::Error::BadParameter);
431
432        waitset.inner = waitset_id;
433    }
434
    // Exercises `wait` and `wait_until` end to end: timeouts before any data is written,
    // then the blobs reported as more read conditions are attached.
    #[test]
    fn test_waitset_wait() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let topic =
            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
        let reader = crate::Reader::new(&topic).unwrap();
        let writer = crate::Writer::new(&topic).unwrap();
        // Three read conditions on the same reader, each built with the same "any" mask.
        let mask = state::sample::Any | state::view::Any | state::instance::Any;
        let read_condition_1 = crate::ReadCondition::new(&reader, mask).unwrap();

        let mask = state::sample::Any | state::view::Any | state::instance::Any;
        let read_condition_2 = crate::ReadCondition::new(&reader, mask).unwrap();

        let mask = state::sample::Any | state::view::Any | state::instance::Any;
        let read_condition_3 = crate::ReadCondition::new(&reader, mask).unwrap();

        // Blobs are plain `String`s here; the waitset's `A` is inferred from `attach`.
        let attach01 = String::from("hello");
        let attach02 = String::from("world");
        let mut waitset = WaitSet::new(&participant).unwrap();

        waitset.attach(&read_condition_1, Some(&attach01)).unwrap();

        // Nothing has been written yet, so both wait flavours are expected to time out.
        let actual = waitset
            .wait(crate::Duration::from_nanos(5_000_000))
            .unwrap_err();
        assert_eq!(actual, crate::Error::Timeout);

        let actual = waitset.wait_until(crate::Time::from_nanos(0)).unwrap_err();
        assert_eq!(actual, crate::Error::Timeout);

        // After a sample is written, the attached condition reports its blob.
        writer.write(&crate::tests::topic::Data::default()).unwrap();
        let actual = waitset
            .wait(crate::Duration::from_nanos(1_000_000_000))
            .unwrap();
        assert_eq!(actual, vec![&attach01]);

        let actual = waitset.wait_until(crate::Time::from_nanos(1)).unwrap();
        assert_eq!(actual, vec![&attach01]);

        // A second condition with its own blob: both blobs are expected, in this order.
        waitset.attach(&read_condition_2, Some(&attach02)).unwrap();
        let actual = waitset
            .wait(crate::Duration::from_nanos(1_000_000_000))
            .unwrap();
        assert_eq!(actual, vec![&attach01, &attach02]);

        let actual = waitset.wait_until(crate::Time::from_nanos(1)).unwrap();
        assert_eq!(actual, vec![&attach01, &attach02]);

        // A third condition attached without a blob must not add an entry to the result.
        waitset.attach(&read_condition_3, None).unwrap();
        let actual = waitset
            .wait(crate::Duration::from_nanos(1_000_000_000))
            .unwrap();
        assert_eq!(actual, vec![&attach01, &attach02]);

        let actual = waitset.wait_until(crate::Time::from_nanos(1)).unwrap();
        assert_eq!(actual, vec![&attach01, &attach02]);
    }
495
496    #[test]
497    fn test_waitset_wait_with_invalid_waitset() {
498        let domain_id = crate::tests::domain::unique_id();
499        let domain = crate::Domain::new(domain_id).unwrap();
500        let participant = crate::Participant::new(&domain).unwrap();
501
502        let attach01 = String::from("hello");
503        let mut waitset = WaitSet::new(&participant).unwrap();
504        waitset.attach(&participant, Some(&attach01)).unwrap();
505
506        let waitset_id = waitset.inner;
507        waitset.inner = 0;
508        let result = waitset.wait(crate::Duration::INFINITE).unwrap_err();
509        assert_eq!(result, crate::Error::BadParameter);
510        let result = waitset.wait_until(crate::Time::NEVER).unwrap_err();
511        assert_eq!(result, crate::Error::BadParameter);
512
513        waitset.inner = waitset_id;
514    }
515
516    #[test]
517    fn test_waitset_set_trigger() {
518        let domain_id = crate::tests::domain::unique_id();
519        let domain = crate::Domain::new(domain_id).unwrap();
520        let participant = crate::Participant::new(&domain).unwrap();
521        let mut waitset = WaitSet::<()>::new(&participant).unwrap();
522
523        let result = waitset.set_trigger(true);
524        assert!(result.is_ok());
525    }
526
527    #[test]
528    fn test_waitset_set_trigger_with_invalid_waitset() {
529        let domain_id = crate::tests::domain::unique_id();
530        let domain = crate::Domain::new(domain_id).unwrap();
531        let participant = crate::Participant::new(&domain).unwrap();
532        let mut waitset = WaitSet::<()>::new(&participant).unwrap();
533        let waitset_id = waitset.inner;
534        waitset.inner = 0;
535
536        let result = waitset.set_trigger(true).unwrap_err();
537        assert_eq!(result, crate::Error::BadParameter);
538        waitset.inner = waitset_id;
539    }
540
    // Checks that type-erased blobs (`Box<dyn Any>`) of different concrete types survive
    // the round trip through `attach` and `wait` and can be downcast again.
    #[test]
    fn test_waitset_wait_dynamic_data() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();
        let topic_name = crate::tests::topic::unique_name();
        let topic =
            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
        let reader = crate::Reader::new(&topic).unwrap();
        let writer = crate::Writer::new(&topic).unwrap();

        let data01 = 10;
        let data02 = "String";
        // `as _` leaves the target type to inference: it is fixed to `Box<dyn Any>` by the
        // `attach` calls on the explicitly typed waitset below.
        let attach01 = Box::new(data01) as _;
        let attach02 = Box::new(data02) as _;
        let mut waitset = WaitSet::<Box<dyn std::any::Any>>::new(&participant).unwrap();
        waitset.attach(&reader, Some(&attach01)).unwrap();
        waitset.attach(&writer, Some(&attach02)).unwrap();

        writer.write(&crate::tests::topic::Data::default()).unwrap();

        let attachments = waitset.wait(crate::Duration::INFINITE).unwrap();

        // Both the reader and the writer are expected to have triggered.
        assert_eq!(attachments.len(), 2);

        // The indices assume the reader's blob is reported before the writer's.
        let attach01_result = attachments[0].downcast_ref::<i32>().unwrap();
        let attach02_result = attachments[1].downcast_ref::<&str>().unwrap();

        assert_eq!(*attach01_result, data01);
        assert_eq!(*attach02_result, data02);
    }
572}