Skip to main content

cyclonedds/
subscriber.rs

1use crate::internal::ffi;
2use crate::internal::traits::AsFfi;
3use crate::{Participant, Result};
4
/// A `Subscriber` groups [`Readers`](crate::Reader) and controls their shared
/// [`QoS`](crate::QoS). Readers created under a subscriber inherit its
/// [`QoS`](crate::QoS) where applicable.
///
/// Use [`Subscriber::new`] for simple construction or [`Subscriber::builder`]
/// for [`QoS`](crate::QoS) and
/// [`listener`](crate::listener::SubscriberListener) configuration.
///
/// In most applications a subscriber is created implicitly when constructing a
/// [`Reader`](crate::Reader) directly. Use an explicit subscriber when you need
/// coordinated reads across multiple readers.
///
/// Dropping a `Subscriber` asks the C library to delete the underlying entity.
#[derive(Debug)]
pub struct Subscriber<'domain, 'participant> {
    /// Raw entity handle handed to every `ffi` call made for this subscriber
    /// and passed to `dds_delete` when the value is dropped.
    pub(crate) inner: cyclonedds_sys::dds_entity_t,
    /// Zero-sized marker that makes the subscriber carry a borrow of its
    /// [`Participant`] in the type, without storing the reference itself.
    phantom: std::marker::PhantomData<&'participant Participant<'domain>>,
}
21
/// Builder for [`Subscriber`] (accessible via [`Subscriber::builder`]).
///
/// Collects the optional [`QoS`](crate::QoS) and listener, then creates the
/// subscriber in [`build`](SubscriberBuilder::build).
#[derive(Debug)]
pub struct SubscriberBuilder<'domain, 'participant, 'qos> {
    /// Participant under which the subscriber will be created.
    participant: &'participant Participant<'domain>,
    /// Borrowed `QoS` to apply on creation; `None` passes no `QoS` to the C
    /// library.
    qos: Option<&'qos crate::QoS>,
    /// Listener to install on creation; `None` creates the subscriber without
    /// one.
    listener: Option<crate::SubscriberListener>,
}
29
30impl<'d, 'p, 'q> SubscriberBuilder<'d, 'p, 'q> {
31    /// Creates a new [`SubscriberBuilder`] for the given [`Participant`].
32    ///
33    /// # Examples
34    ///
35    /// ```
36    /// use cyclonedds::builder::SubscriberBuilder;
37    /// use cyclonedds::{Domain, Participant};
38    ///
39    /// let domain = Domain::default();
40    /// let participant = Participant::new(&domain)?;
41    /// let subscriber_builder = SubscriberBuilder::new(&participant);
42    /// # Ok::<_, cyclonedds::Error>(())
43    /// ```
44    #[must_use]
45    pub const fn new(participant: &'p Participant<'d>) -> Self {
46        Self {
47            participant,
48            qos: None,
49            listener: None,
50        }
51    }
52
53    /// Sets the [`QoS`](crate::QoS) for this subscriber builder.
54    ///
55    /// # Examples
56    ///
57    /// ```
58    /// use cyclonedds::builder::SubscriberBuilder;
59    /// use cyclonedds::qos::policy;
60    /// use cyclonedds::{Duration, QoS};
61    /// # use cyclonedds::{Domain, Participant};
62    /// # let domain = Domain::default();
63    /// # let participant = Participant::new(&domain)?;
64    ///
65    /// let qos = QoS::new().with_reliability(policy::Reliability::Reliable {
66    ///     max_blocking_time: Duration::from_millis(100),
67    /// });
68    /// let subscriber_builder = SubscriberBuilder::new(&participant).with_qos(&qos);
69    /// # Ok::<_, cyclonedds::Error>(())
70    /// ```
71    #[must_use]
72    pub const fn with_qos(mut self, qos: &'q crate::QoS) -> Self {
73        self.qos = Some(qos);
74        self
75    }
76
77    ///
78    /// Sets the [`Listener`](crate::Listener) on this subscriber builder.
79    ///
80    /// # Examples
81    ///
82    /// ```
83    /// use cyclonedds::Listener;
84    /// use cyclonedds::builder::SubscriberBuilder;
85    /// # use cyclonedds::{Domain, Participant};
86    /// # let domain = Domain::default();
87    /// # let participant = Participant::new(&domain)?;
88    ///
89    /// let subscriber_builder = SubscriberBuilder::new(&participant).with_listener(Listener::new());
90    /// # Ok::<_, cyclonedds::Error>(())
91    /// ```
92    #[must_use]
93    pub fn with_listener<L>(mut self, listener: L) -> Self
94    where
95        L: AsRef<crate::SubscriberListener>,
96    {
97        self.listener = Some(*listener.as_ref());
98        self
99    }
100
101    /// Builds the [`Subscriber`].
102    ///
103    /// # Errors
104    ///
105    /// Returns an [`Error`](crate::Error) if the subscriber failed to create.
106    ///
107    /// # Examples
108    ///
109    /// ```
110    /// use cyclonedds::QoS;
111    /// use cyclonedds::builder::SubscriberBuilder;
112    /// use cyclonedds::qos::policy;
113    /// # use cyclonedds::{Domain, Participant};
114    /// # let domain = Domain::default();
115    /// # let participant = Participant::new(&domain)?;
116    ///
117    /// let qos = QoS::new().with_durability(policy::Durability::TransientLocal);
118    /// let subscriber = SubscriberBuilder::new(&participant)
119    ///     .with_qos(&qos)
120    ///     .build()?;
121    /// # Ok::<_, cyclonedds::Error>(())
122    /// ```
123    pub fn build(self) -> Result<Subscriber<'d, 'p>> {
124        // NOTE: using `and_then` to avoid ? branch on the listener for coverage
125        // since the C lib currently panics on OOM rather than returning null.
126        self.listener
127            .map(|listener| listener.as_ffi())
128            .transpose()
129            .and_then(|listener| {
130                Ok(Subscriber {
131                    inner: ffi::dds_create_subscriber(
132                        self.participant.inner,
133                        self.qos.map(|qos| &qos.inner),
134                        listener.as_ref(),
135                    )?,
136                    phantom: std::marker::PhantomData,
137                })
138            })
139    }
140}
141
142impl<'d, 'p> Subscriber<'d, 'p> {
143    /// Creates a new `Subscriber` under `participant` with default
144    /// [`QoS`](crate::QoS) and no
145    /// [`listener`](crate::listener::SubscriberListener).
146    ///
147    /// # Errors
148    ///
149    /// Returns an [`Error`](crate::Error) if the subscriber fails to create.
150    ///
151    /// # Examples
152    ///
153    /// ```
154    /// use cyclonedds::Subscriber;
155    /// # use cyclonedds::{Domain, Participant};
156    /// # let domain = Domain::default();
157    /// # let participant = Participant::new(&domain)?;
158    ///
159    /// let subscriber = Subscriber::new(&participant)?;
160    /// Ok::<_, cyclonedds::Error>(())
161    /// ```
162    pub fn new(participant: &'p Participant<'d>) -> Result<Self> {
163        Self::builder(participant).build()
164    }
165
166    /// Returns a [`SubscriberBuilder`](crate::builder::SubscriberBuilder) for
167    /// constructing a subscriber with custom [`QoS`](crate::QoS) or a
168    /// [`listener`](crate::listener::SubscriberListener).
169    ///
170    /// # Examples
171    ///
172    /// ```
173    /// use cyclonedds::{
174    ///     QoS, Subscriber,
175    ///     qos::policy::{Durability, Presentation},
176    /// };
177    /// # use cyclonedds::{Domain, Participant};
178    /// # let domain = Domain::default();
179    /// # let participant = Participant::new(&domain)?;
180    ///
181    /// let qos = QoS::new().with_presentation(Presentation::Topic {
182    ///     coherent_access: true,
183    ///     ordered_access: true,
184    /// });
185    /// let subscriber = Subscriber::builder(&participant).with_qos(&qos).build()?;
186    /// Ok::<_, cyclonedds::Error>(())
187    /// ```
188    #[must_use]
189    pub const fn builder<'q>(participant: &'p Participant<'d>) -> SubscriberBuilder<'d, 'p, 'q> {
190        SubscriberBuilder::new(participant)
191    }
192
193    /// (WARN: unimplemented in C lib): Notifies all readers belonging to this
194    /// subscriber that data is available.
195    ///
196    /// <div class="warning">
197    ///
198    /// This function is currently not implemented by the underlying C library
199    /// and will thus always return an unsupported error.
200    ///
201    /// </div>
202    ///
203    /// Triggers the
204    /// [`DataOnReaders`](crate::listener::SubscriberListener::with_data_on_readers)
205    /// callback on the subscriber's listener and the
206    /// [`DataAvailable`](crate::listener::ReaderListener::with_data_available)
207    /// callback on each reader's listener.
208    ///
209    /// # Errors
210    ///
211    /// Returns an [`Error`](crate::Error) if the subscriber fails to notify the
212    /// readers.
213    ///
214    /// # Examples
215    ///
216    /// ```no_run
217    /// use cyclonedds::Subscriber;
218    /// # use cyclonedds::{Domain, Participant};
219    /// # let domain = Domain::default();
220    /// # let participant = Participant::new(&domain)?;
221    ///
222    /// let subscriber = Subscriber::new(&participant)?;
223    /// subscriber.notify_readers()?;
224    /// # Ok::<_, cyclonedds::Error>(())
225    /// ```
226    pub fn notify_readers(&self) -> Result<()> {
227        ffi::dds_notify_readers(self.inner)
228    }
229
    /// Wraps an already existing entity handle in a `Subscriber` without
    /// taking responsibility for deleting it.
    ///
    /// The value is returned inside [`std::mem::ManuallyDrop`], so the `Drop`
    /// implementation (which deletes the entity) does not run when the wrapper
    /// goes out of scope. The handle is stored as given; nothing here checks
    /// that it refers to a subscriber.
    pub(crate) const fn from_existing(
        inner: cyclonedds_sys::dds_entity_t,
    ) -> std::mem::ManuallyDrop<Self> {
        std::mem::ManuallyDrop::new(Self {
            inner,
            phantom: std::marker::PhantomData,
        })
    }
238
239    /// Sets the [`SubscriberListener`](crate::SubscriberListener) on this
240    /// subscriber, replacing any previously set listener.
241    ///
242    /// # Errors
243    ///
244    /// Returns an [`Error`](crate::Error) if the subscriber fails to set the
245    /// listener.
246    ///
247    /// # Examples
248    ///
249    /// ```
250    /// use cyclonedds::SubscriberListener;
251    /// # use cyclonedds::{Domain, Participant, Subscriber};
252    /// # let domain = Domain::default();
253    /// # let participant = Participant::new(&domain)?;
254    ///
255    /// let mut subscriber = Subscriber::new(&participant)?;
256    /// subscriber.set_listener(SubscriberListener::new())?;
257    /// # Ok::<_, cyclonedds::Error>(())
258    /// ```
259    pub fn set_listener<L>(&mut self, listener: L) -> Result<()>
260    where
261        L: AsRef<crate::SubscriberListener>,
262    {
263        listener
264            .as_ref()
265            .as_ffi()
266            .and_then(|listener| ffi::dds_set_listener(self.inner, Some(listener.inner)))
267    }
268
269    /// Removes the listener from this subscriber.
270    ///
271    /// # Errors
272    ///
273    /// Returns an [`Error`](crate::Error) if the subscriber fails to unset the
274    /// listener.
275    ///
276    /// # Examples
277    ///
278    /// ```
279    /// # use cyclonedds::{Domain, Participant, Subscriber};
280    /// # let domain = Domain::default();
281    /// # let participant = Participant::new(&domain)?;
282    /// let mut subscriber = Subscriber::new(&participant)?;
283    /// subscriber.unset_listener()?;
284    /// # Ok::<_, cyclonedds::Error>(())
285    /// ```
286    pub fn unset_listener(&mut self) -> Result<()> {
287        ffi::dds_set_listener(self.inner, None)?;
288        Ok(())
289    }
290
291    /// Sets the [`SubscriberListener`](crate::SubscriberListener) on this
292    /// subscriber, consuming and returning `self`.
293    ///
294    /// # Errors
295    ///
296    /// Returns an [`Error`](crate::Error) if the subscriber fails to set the
297    /// listener.
298    ///
299    /// # Examples
300    ///
301    /// ```
302    /// use cyclonedds::SubscriberListener;
303    /// # use cyclonedds::{Domain, Participant, Subscriber};
304    /// # let domain = Domain::default();
305    /// # let participant = Participant::new(&domain)?;
306    ///
307    /// let subscriber = Subscriber::new(&participant)?.with_listener(SubscriberListener::new())?;
308    /// # Ok::<_, cyclonedds::Error>(())
309    /// ```
310    pub fn with_listener<L>(mut self, listener: L) -> Result<Self>
311    where
312        L: AsRef<crate::SubscriberListener>,
313    {
314        self.set_listener(listener).map(|_err| self)
315    }
316}
317
/// Deletes the underlying entity when the `Subscriber` goes out of scope.
impl Drop for Subscriber<'_, '_> {
    fn drop(&mut self) {
        let result = ffi::dds_delete(self.inner);
        // `drop` cannot return an error, so a failed delete is only surfaced
        // in builds with debug assertions enabled; otherwise it is ignored.
        debug_assert!(
            result.is_ok(),
            "unable to delete {self:?}: failed with {result:?}"
        );
    }
}
327
#[cfg(test)]
mod tests {
    use super::*;

    /// A subscriber can be created both directly and through the builder.
    #[test]
    fn test_subscriber_create() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let qos = crate::QoS::new();
        let participant = Participant::new(&domain).unwrap();
        let _ = Subscriber::new(&participant).unwrap();
        let _ = Subscriber::builder(&participant)
            .with_qos(&qos)
            .build()
            .unwrap();
    }

    /// Creating a subscriber under a zeroed participant handle is rejected.
    #[test]
    fn test_subscriber_create_with_invalid_participant() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let qos = crate::QoS::new();
        let mut participant = Participant::new(&domain).unwrap();
        let participant_id = participant.inner;
        participant.inner = 0;
        let new_error = Subscriber::new(&participant).err();
        let build_error = Subscriber::builder(&participant)
            .with_qos(&qos)
            .build()
            .err();
        // Restore the real handle before asserting: if an assertion failed
        // while the handle was still zeroed, unwinding would drop the
        // participant with an invalid handle and a second panic raised from a
        // destructor would abort the test binary, hiding the real failure.
        participant.inner = participant_id;
        assert_eq!(new_error, Some(crate::Error::BadParameter));
        assert_eq!(build_error, Some(crate::Error::BadParameter));
    }

    /// `from_existing` wraps the given handle unchanged.
    #[test]
    fn test_subscriber_from_existing_subscriber() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();
        let subscriber = Subscriber::new(&participant).unwrap();

        let new_subscriber = Subscriber::from_existing(subscriber.inner);

        assert_eq!(new_subscriber.inner, subscriber.inner);
    }

    /// Pins the current "unsupported" result so a C library upgrade that
    /// implements the call is noticed.
    #[test]
    fn test_subscriber_notify_readers_not_yet_supported_by_c_lib() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();

        let subscriber = Subscriber::new(&participant).unwrap();

        let result = subscriber.notify_readers();
        assert_eq!(
            result,
            Err(crate::Error::Unsupported),
            "result was not unsupported (might be implemented now?)"
        );
    }

    /// A listener can be attached via every entry point and removed again.
    #[test]
    fn test_subscriber_with_listener() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();

        let listener = crate::SubscriberListener::new().with_data_on_readers(|_| ());

        let _ = Subscriber::new(&participant)
            .unwrap()
            .with_listener(listener)
            .unwrap();

        let _ = Subscriber::builder(&participant)
            .with_listener(listener)
            .build()
            .unwrap();

        let mut subscriber = Subscriber::new(&participant).unwrap();
        subscriber.set_listener(listener).unwrap();
        subscriber.unset_listener().unwrap();
    }

    /// Setting or unsetting a listener on a zeroed subscriber handle is
    /// rejected.
    #[test]
    fn test_subscriber_with_listener_on_invalid_subscriber() {
        let domain_id = crate::tests::domain::unique_id();
        let domain = crate::Domain::new(domain_id).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();

        let listener = crate::SubscriberListener::new().with_data_on_readers(|_| ());

        let mut subscriber = Subscriber::new(&participant).unwrap();
        let subscriber_id = subscriber.inner;
        subscriber.inner = 0;
        let set_error = subscriber.set_listener(listener).err();
        let unset_error = subscriber.unset_listener().err();
        // Restore the real handle before asserting so that a failing
        // assertion cannot trigger a second panic from the subscriber's
        // destructor while unwinding.
        subscriber.inner = subscriber_id;
        assert_eq!(set_error, Some(crate::Error::BadParameter));
        assert_eq!(unset_error, Some(crate::Error::BadParameter));
    }
}