cyclonedds/subscriber.rs
1use crate::internal::ffi;
2use crate::internal::traits::AsFfi;
3use crate::{Participant, Result};
4
5/// A `Subscriber` groups [`Readers`](crate::Reader) and controls their shared
6/// [`QoS`](crate::QoS). Readers created under a subscriber inherit its
7/// [`QoS`](crate::QoS) where applicable.
8///
9/// Use [`Subscriber::new`] for simple construction or [`Subscriber::builder`]
10/// for [`QoS`](crate::QoS) and
11/// [`listener`](crate::listener::SubscriberListener) configuration.
12///
13/// In most applications a subscriber is created implicitly when constructing a
14/// [`Reader`](crate::Reader) directly. Use an explicit subscriber when you need
15/// coordinated reads across multiple readers.
16#[derive(Debug)]
17pub struct Subscriber<'domain, 'participant> {
18 pub(crate) inner: cyclonedds_sys::dds_entity_t,
19 phantom: std::marker::PhantomData<&'participant Participant<'domain>>,
20}
21
22/// Builder for [`Subscriber`] (accessible via [`Subscriber::builder`]).
23#[derive(Debug)]
24pub struct SubscriberBuilder<'domain, 'participant, 'qos> {
25 participant: &'participant Participant<'domain>,
26 qos: Option<&'qos crate::QoS>,
27 listener: Option<crate::SubscriberListener>,
28}
29
30impl<'d, 'p, 'q> SubscriberBuilder<'d, 'p, 'q> {
31 /// Creates a new [`SubscriberBuilder`] for the given [`Participant`].
32 ///
33 /// # Examples
34 ///
35 /// ```
36 /// use cyclonedds::builder::SubscriberBuilder;
37 /// use cyclonedds::{Domain, Participant};
38 ///
39 /// let domain = Domain::default();
40 /// let participant = Participant::new(&domain)?;
41 /// let subscriber_builder = SubscriberBuilder::new(&participant);
42 /// # Ok::<_, cyclonedds::Error>(())
43 /// ```
44 #[must_use]
45 pub const fn new(participant: &'p Participant<'d>) -> Self {
46 Self {
47 participant,
48 qos: None,
49 listener: None,
50 }
51 }
52
53 /// Sets the [`QoS`](crate::QoS) for this subscriber builder.
54 ///
55 /// # Examples
56 ///
57 /// ```
58 /// use cyclonedds::builder::SubscriberBuilder;
59 /// use cyclonedds::qos::policy;
60 /// use cyclonedds::{Duration, QoS};
61 /// # use cyclonedds::{Domain, Participant};
62 /// # let domain = Domain::default();
63 /// # let participant = Participant::new(&domain)?;
64 ///
65 /// let qos = QoS::new().with_reliability(policy::Reliability::Reliable {
66 /// max_blocking_time: Duration::from_millis(100),
67 /// });
68 /// let subscriber_builder = SubscriberBuilder::new(&participant).with_qos(&qos);
69 /// # Ok::<_, cyclonedds::Error>(())
70 /// ```
71 #[must_use]
72 pub const fn with_qos(mut self, qos: &'q crate::QoS) -> Self {
73 self.qos = Some(qos);
74 self
75 }
76
77 ///
78 /// Sets the [`Listener`](crate::Listener) on this subscriber builder.
79 ///
80 /// # Examples
81 ///
82 /// ```
83 /// use cyclonedds::Listener;
84 /// use cyclonedds::builder::SubscriberBuilder;
85 /// # use cyclonedds::{Domain, Participant};
86 /// # let domain = Domain::default();
87 /// # let participant = Participant::new(&domain)?;
88 ///
89 /// let subscriber_builder = SubscriberBuilder::new(&participant).with_listener(Listener::new());
90 /// # Ok::<_, cyclonedds::Error>(())
91 /// ```
92 #[must_use]
93 pub fn with_listener<L>(mut self, listener: L) -> Self
94 where
95 L: AsRef<crate::SubscriberListener>,
96 {
97 self.listener = Some(*listener.as_ref());
98 self
99 }
100
101 /// Builds the [`Subscriber`].
102 ///
103 /// # Errors
104 ///
105 /// Returns an [`Error`](crate::Error) if the subscriber failed to create.
106 ///
107 /// # Examples
108 ///
109 /// ```
110 /// use cyclonedds::QoS;
111 /// use cyclonedds::builder::SubscriberBuilder;
112 /// use cyclonedds::qos::policy;
113 /// # use cyclonedds::{Domain, Participant};
114 /// # let domain = Domain::default();
115 /// # let participant = Participant::new(&domain)?;
116 ///
117 /// let qos = QoS::new().with_durability(policy::Durability::TransientLocal);
118 /// let subscriber = SubscriberBuilder::new(&participant)
119 /// .with_qos(&qos)
120 /// .build()?;
121 /// # Ok::<_, cyclonedds::Error>(())
122 /// ```
123 pub fn build(self) -> Result<Subscriber<'d, 'p>> {
124 // NOTE: using `and_then` to avoid ? branch on the listener for coverage
125 // since the C lib currently panics on OOM rather than returning null.
126 self.listener
127 .map(|listener| listener.as_ffi())
128 .transpose()
129 .and_then(|listener| {
130 Ok(Subscriber {
131 inner: ffi::dds_create_subscriber(
132 self.participant.inner,
133 self.qos.map(|qos| &qos.inner),
134 listener.as_ref(),
135 )?,
136 phantom: std::marker::PhantomData,
137 })
138 })
139 }
140}
141
142impl<'d, 'p> Subscriber<'d, 'p> {
143 /// Creates a new `Subscriber` under `participant` with default
144 /// [`QoS`](crate::QoS) and no
145 /// [`listener`](crate::listener::SubscriberListener).
146 ///
147 /// # Errors
148 ///
149 /// Returns an [`Error`](crate::Error) if the subscriber fails to create.
150 ///
151 /// # Examples
152 ///
153 /// ```
154 /// use cyclonedds::Subscriber;
155 /// # use cyclonedds::{Domain, Participant};
156 /// # let domain = Domain::default();
157 /// # let participant = Participant::new(&domain)?;
158 ///
159 /// let subscriber = Subscriber::new(&participant)?;
160 /// Ok::<_, cyclonedds::Error>(())
161 /// ```
162 pub fn new(participant: &'p Participant<'d>) -> Result<Self> {
163 Self::builder(participant).build()
164 }
165
166 /// Returns a [`SubscriberBuilder`](crate::builder::SubscriberBuilder) for
167 /// constructing a subscriber with custom [`QoS`](crate::QoS) or a
168 /// [`listener`](crate::listener::SubscriberListener).
169 ///
170 /// # Examples
171 ///
172 /// ```
173 /// use cyclonedds::{
174 /// QoS, Subscriber,
175 /// qos::policy::{Durability, Presentation},
176 /// };
177 /// # use cyclonedds::{Domain, Participant};
178 /// # let domain = Domain::default();
179 /// # let participant = Participant::new(&domain)?;
180 ///
181 /// let qos = QoS::new().with_presentation(Presentation::Topic {
182 /// coherent_access: true,
183 /// ordered_access: true,
184 /// });
185 /// let subscriber = Subscriber::builder(&participant).with_qos(&qos).build()?;
186 /// Ok::<_, cyclonedds::Error>(())
187 /// ```
188 #[must_use]
189 pub const fn builder<'q>(participant: &'p Participant<'d>) -> SubscriberBuilder<'d, 'p, 'q> {
190 SubscriberBuilder::new(participant)
191 }
192
193 /// (WARN: unimplemented in C lib): Notifies all readers belonging to this
194 /// subscriber that data is available.
195 ///
196 /// <div class="warning">
197 ///
198 /// This function is currently not implemented by the underlying C library
199 /// and will thus always return an unsupported error.
200 ///
201 /// </div>
202 ///
203 /// Triggers the
204 /// [`DataOnReaders`](crate::listener::SubscriberListener::with_data_on_readers)
205 /// callback on the subscriber's listener and the
206 /// [`DataAvailable`](crate::listener::ReaderListener::with_data_available)
207 /// callback on each reader's listener.
208 ///
209 /// # Errors
210 ///
211 /// Returns an [`Error`](crate::Error) if the subscriber fails to notify the
212 /// readers.
213 ///
214 /// # Examples
215 ///
216 /// ```no_run
217 /// use cyclonedds::Subscriber;
218 /// # use cyclonedds::{Domain, Participant};
219 /// # let domain = Domain::default();
220 /// # let participant = Participant::new(&domain)?;
221 ///
222 /// let subscriber = Subscriber::new(&participant)?;
223 /// subscriber.notify_readers()?;
224 /// # Ok::<_, cyclonedds::Error>(())
225 /// ```
226 pub fn notify_readers(&self) -> Result<()> {
227 ffi::dds_notify_readers(self.inner)
228 }
229
230 pub(crate) const fn from_existing(
231 inner: cyclonedds_sys::dds_entity_t,
232 ) -> std::mem::ManuallyDrop<Self> {
233 std::mem::ManuallyDrop::new(Self {
234 inner,
235 phantom: std::marker::PhantomData,
236 })
237 }
238
239 /// Sets the [`SubscriberListener`](crate::SubscriberListener) on this
240 /// subscriber, replacing any previously set listener.
241 ///
242 /// # Errors
243 ///
244 /// Returns an [`Error`](crate::Error) if the subscriber fails to set the
245 /// listener.
246 ///
247 /// # Examples
248 ///
249 /// ```
250 /// use cyclonedds::SubscriberListener;
251 /// # use cyclonedds::{Domain, Participant, Subscriber};
252 /// # let domain = Domain::default();
253 /// # let participant = Participant::new(&domain)?;
254 ///
255 /// let mut subscriber = Subscriber::new(&participant)?;
256 /// subscriber.set_listener(SubscriberListener::new())?;
257 /// # Ok::<_, cyclonedds::Error>(())
258 /// ```
259 pub fn set_listener<L>(&mut self, listener: L) -> Result<()>
260 where
261 L: AsRef<crate::SubscriberListener>,
262 {
263 listener
264 .as_ref()
265 .as_ffi()
266 .and_then(|listener| ffi::dds_set_listener(self.inner, Some(listener.inner)))
267 }
268
269 /// Removes the listener from this subscriber.
270 ///
271 /// # Errors
272 ///
273 /// Returns an [`Error`](crate::Error) if the subscriber fails to unset the
274 /// listener.
275 ///
276 /// # Examples
277 ///
278 /// ```
279 /// # use cyclonedds::{Domain, Participant, Subscriber};
280 /// # let domain = Domain::default();
281 /// # let participant = Participant::new(&domain)?;
282 /// let mut subscriber = Subscriber::new(&participant)?;
283 /// subscriber.unset_listener()?;
284 /// # Ok::<_, cyclonedds::Error>(())
285 /// ```
286 pub fn unset_listener(&mut self) -> Result<()> {
287 ffi::dds_set_listener(self.inner, None)?;
288 Ok(())
289 }
290
291 /// Sets the [`SubscriberListener`](crate::SubscriberListener) on this
292 /// subscriber, consuming and returning `self`.
293 ///
294 /// # Errors
295 ///
296 /// Returns an [`Error`](crate::Error) if the subscriber fails to set the
297 /// listener.
298 ///
299 /// # Examples
300 ///
301 /// ```
302 /// use cyclonedds::SubscriberListener;
303 /// # use cyclonedds::{Domain, Participant, Subscriber};
304 /// # let domain = Domain::default();
305 /// # let participant = Participant::new(&domain)?;
306 ///
307 /// let subscriber = Subscriber::new(&participant)?.with_listener(SubscriberListener::new())?;
308 /// # Ok::<_, cyclonedds::Error>(())
309 /// ```
310 pub fn with_listener<L>(mut self, listener: L) -> Result<Self>
311 where
312 L: AsRef<crate::SubscriberListener>,
313 {
314 self.set_listener(listener).map(|_err| self)
315 }
316}
317
318impl Drop for Subscriber<'_, '_> {
319 fn drop(&mut self) {
320 let result = ffi::dds_delete(self.inner);
321 debug_assert!(
322 result.is_ok(),
323 "unable to delete {self:?}: failed with {result:?}"
324 );
325 }
326}
327
328#[cfg(test)]
329mod tests {
330 use super::*;
331
332 #[test]
333 fn test_subscriber_create() {
334 let domain_id = crate::tests::domain::unique_id();
335 let domain = crate::Domain::new(domain_id).unwrap();
336 let qos = crate::QoS::new();
337 let participant = Participant::new(&domain).unwrap();
338 let _ = Subscriber::new(&participant).unwrap();
339 let _ = Subscriber::builder(&participant)
340 .with_qos(&qos)
341 .build()
342 .unwrap();
343 }
344
345 #[test]
346 fn test_subscriber_create_with_invalid_participant() {
347 let domain_id = crate::tests::domain::unique_id();
348 let domain = crate::Domain::new(domain_id).unwrap();
349 let qos = crate::QoS::new();
350 let mut participant = Participant::new(&domain).unwrap();
351 let participant_id = participant.inner;
352 participant.inner = 0;
353 let result = Subscriber::new(&participant).unwrap_err();
354 assert_eq!(result, crate::Error::BadParameter);
355 let result = Subscriber::builder(&participant)
356 .with_qos(&qos)
357 .build()
358 .unwrap_err();
359 assert_eq!(result, crate::Error::BadParameter);
360 participant.inner = participant_id;
361 }
362
363 #[test]
364 fn test_subscriber_from_existing_subscriber() {
365 let domain_id = crate::tests::domain::unique_id();
366 let domain = crate::Domain::new(domain_id).unwrap();
367 let participant = crate::Participant::new(&domain).unwrap();
368 let subscriber = Subscriber::new(&participant).unwrap();
369
370 let new_subscriber = Subscriber::from_existing(subscriber.inner);
371
372 assert_eq!(new_subscriber.inner, subscriber.inner);
373 }
374
375 #[test]
376 fn test_subscriber_notify_readers_not_yet_supported_by_c_lib() {
377 let domain_id = crate::tests::domain::unique_id();
378 let domain = crate::Domain::new(domain_id).unwrap();
379 let participant = crate::Participant::new(&domain).unwrap();
380
381 let subscriber = Subscriber::new(&participant).unwrap();
382
383 let result = subscriber.notify_readers();
384 assert_eq!(
385 result,
386 Err(crate::Error::Unsupported),
387 "result was not unsupported (might be implemented now?)"
388 );
389 }
390
391 #[test]
392 fn test_subscriber_with_listener() {
393 let domain_id = crate::tests::domain::unique_id();
394 let domain = crate::Domain::new(domain_id).unwrap();
395 let participant = crate::Participant::new(&domain).unwrap();
396
397 let listener = crate::SubscriberListener::new().with_data_on_readers(|_| ());
398
399 let _ = Subscriber::new(&participant)
400 .unwrap()
401 .with_listener(listener)
402 .unwrap();
403
404 let _ = Subscriber::builder(&participant)
405 .with_listener(listener)
406 .build()
407 .unwrap();
408
409 let mut subscriber = Subscriber::new(&participant).unwrap();
410 subscriber.set_listener(listener).unwrap();
411 subscriber.unset_listener().unwrap();
412 }
413
414 #[test]
415 fn test_subscriber_with_listener_on_invalid_subscriber() {
416 let domain_id = crate::tests::domain::unique_id();
417 let domain = crate::Domain::new(domain_id).unwrap();
418 let participant = crate::Participant::new(&domain).unwrap();
419
420 let listener = crate::SubscriberListener::new().with_data_on_readers(|_| ());
421
422 let mut subscriber = Subscriber::new(&participant).unwrap();
423 let subscriber_id = subscriber.inner;
424 subscriber.inner = 0;
425 let result = subscriber.set_listener(listener).unwrap_err();
426 assert_eq!(result, crate::Error::BadParameter);
427 let result = subscriber.unset_listener().unwrap_err();
428 assert_eq!(result, crate::Error::BadParameter);
429 subscriber.inner = subscriber_id;
430 }
431}