Skip to main content

cyclonedds/
publisher.rs

1use crate::internal::ffi;
2use crate::internal::traits::AsFfi;
3use crate::{Participant, Result};
4
5/// A `Publisher` groups [`Writers`](crate::Writer) and controls their shared
6/// [`QoS`](crate::QoS). Writers created under a publisher inherit its
7/// [`QoS`](crate::QoS) where applicable.
8///
9/// Use [`Publisher::new`] for simple construction or [`Publisher::builder`] for
10/// [`QoS`](crate::QoS) and [`listener`](crate::listener::PublisherListener)
11/// configuration.
12///
13/// In most applications a publisher is created implicitly when constructing a
14/// [`Writer`](crate::Writer) directly. Use an explicit publisher when you need
15/// coordinated writes across multiple writers.
16#[derive(Debug)]
17pub struct Publisher<'domain, 'participant> {
18    pub(crate) inner: cyclonedds_sys::dds_entity_t,
19    phantom: std::marker::PhantomData<&'participant Participant<'domain>>,
20}
21
22/// Builder for [`Publisher`] (accessible via [`Publisher::builder`]).
23#[derive(Debug)]
24pub struct PublisherBuilder<'domain, 'participant, 'qos> {
25    participant: &'participant Participant<'domain>,
26    qos: Option<&'qos crate::QoS>,
27    listener: Option<crate::PublisherListener>,
28}
29
30impl<'d, 'p, 'q> PublisherBuilder<'d, 'p, 'q> {
31    /// Creates a new [`PublisherBuilder`] for the given [`Participant`].
32    ///
33    /// # Examples
34    ///
35    /// ```
36    /// use cyclonedds::builder::PublisherBuilder;
37    /// use cyclonedds::{Domain, Participant};
38    ///
39    /// let domain = Domain::default();
40    /// let participant = Participant::new(&domain)?;
41    /// let publisher_builder = PublisherBuilder::new(&participant);
42    /// # Ok::<_, cyclonedds::Error>(())
43    /// ```
44    #[must_use]
45    pub const fn new(participant: &'p Participant<'d>) -> Self {
46        Self {
47            participant,
48            qos: None,
49            listener: None,
50        }
51    }
52
53    /// Sets the [`QoS`](crate::QoS) for this publisher builder.
54    ///
55    /// # Examples
56    ///
57    /// ```
58    /// use cyclonedds::builder::PublisherBuilder;
59    /// use cyclonedds::qos::policy;
60    /// use cyclonedds::{Duration, QoS};
61    /// # use cyclonedds::{Domain, Participant};
62    /// # let domain = Domain::default();
63    /// # let participant = Participant::new(&domain)?;
64    ///
65    /// let qos = QoS::new().with_reliability(policy::Reliability::Reliable {
66    ///     max_blocking_time: Duration::from_millis(100),
67    /// });
68    /// let publisher_builder = PublisherBuilder::new(&participant).with_qos(&qos);
69    /// # Ok::<_, cyclonedds::Error>(())
70    /// ```
71    #[must_use]
72    pub const fn with_qos(mut self, qos: &'q crate::QoS) -> Self {
73        self.qos = Some(qos);
74        self
75    }
76
77    /// Sets the [`Listener`](crate::Listener) on this publisher builder.
78    ///
79    /// # Examples
80    ///
81    /// ```
82    /// use cyclonedds::Listener;
83    /// use cyclonedds::builder::PublisherBuilder;
84    /// # use cyclonedds::{Domain, Participant};
85    /// # let domain = Domain::default();
86    /// # let participant = Participant::new(&domain)?;
87    ///
88    /// let publisher_builder = PublisherBuilder::new(&participant).with_listener(Listener::new());
89    /// # Ok::<_, cyclonedds::Error>(())
90    /// ```
91    #[must_use]
92    pub fn with_listener<L>(mut self, listener: L) -> Self
93    where
94        L: AsRef<crate::PublisherListener>,
95    {
96        self.listener = Some(*listener.as_ref());
97        self
98    }
99
100    /// Builds the [`Publisher`].
101    ///
102    /// # Errors
103    ///
104    /// Returns an [`Error`](crate::Error) if the publisher failed to create.
105    ///
106    /// # Examples
107    ///
108    /// ```
109    /// use cyclonedds::QoS;
110    /// use cyclonedds::builder::PublisherBuilder;
111    /// use cyclonedds::qos::policy;
112    /// # use cyclonedds::{Domain, Participant};
113    /// # let domain = Domain::default();
114    /// # let participant = Participant::new(&domain)?;
115    ///
116    /// let qos = QoS::new().with_durability(policy::Durability::TransientLocal);
117    /// let publisher = PublisherBuilder::new(&participant).with_qos(&qos).build()?;
118    /// # Ok::<_, cyclonedds::Error>(())
119    /// ```
120    pub fn build(self) -> Result<Publisher<'d, 'p>> {
121        // NOTE: using `and_then` to avoid ? branch on the listener for coverage
122        // since the C lib currently panics on OOM rather than returning null.
123        self.listener
124            .map(|listener| listener.as_ffi())
125            .transpose()
126            .and_then(|listener| {
127                Ok(Publisher {
128                    inner: ffi::dds_create_publisher(
129                        self.participant.inner,
130                        self.qos.map(|qos| &qos.inner),
131                        listener.as_ref(),
132                    )?,
133                    phantom: std::marker::PhantomData,
134                })
135            })
136    }
137}
138
139impl<'d, 'p> Publisher<'d, 'p> {
140    /// Creates a new `Publisher` under `participant` with default
141    /// [`QoS`](crate::QoS) and no
142    /// [`listener`](crate::listener::PublisherListener).
143    ///
144    /// # Errors
145    ///
146    /// Returns an [`Error`](crate::Error) if the publisher fails to create.
147    ///
148    /// # Examples
149    ///
150    /// ```
151    /// use cyclonedds::Publisher;
152    /// # use cyclonedds::{Domain, Participant};
153    /// # let domain = Domain::default();
154    /// # let participant = Participant::new(&domain)?;
155    ///
156    /// let publisher = Publisher::new(&participant)?;
157    /// Ok::<_, cyclonedds::Error>(())
158    /// ```
159    pub fn new(participant: &'p Participant<'d>) -> Result<Self> {
160        Self::builder(participant).build()
161    }
162
163    /// Returns a [`PublisherBuilder`](crate::builder::PublisherBuilder) for
164    /// constructing a publisher with custom [`QoS`](crate::QoS) or a
165    /// [`listener`](crate::listener::PublisherListener).
166    ///
167    /// # Examples
168    ///
169    /// ```
170    /// use cyclonedds::{
171    ///     Publisher, QoS,
172    ///     qos::policy::{Durability, Presentation},
173    /// };
174    /// # use cyclonedds::{Domain, Participant};
175    /// # let domain = Domain::default();
176    /// # let participant = Participant::new(&domain)?;
177    ///
178    /// let qos = QoS::new().with_presentation(Presentation::Topic {
179    ///     coherent_access: true,
180    ///     ordered_access: true,
181    /// });
182    /// let publisher = Publisher::builder(&participant).with_qos(&qos).build()?;
183    /// Ok::<_, cyclonedds::Error>(())
184    /// ```
185    #[must_use]
186    pub const fn builder<'q>(participant: &'p Participant<'d>) -> PublisherBuilder<'d, 'p, 'q> {
187        PublisherBuilder::new(participant)
188    }
189
190    /// (WARN: unimplemented in C lib): Suspends publication on all writers
191    /// belonging to this publisher.
192    ///
193    /// <div class="warning">
194    ///
195    /// This function is currently not implemented by the underlying C library
196    /// and will thus always return an unsupported error.
197    ///
198    /// </div>
199    ///
200    /// While suspended, calls to [`Writer::write`](crate::Writer::write) may
201    /// be batched by the middleware. Call [`resume`](Publisher::resume) to
202    /// flush and resume normal publication. Suspend and resume are typically
203    /// used together to send a coherent set of updates.
204    ///
205    /// # Errors
206    ///
207    /// Returns an [`Error`](crate::Error) if publisher fails to suspend.
208    ///
209    /// # Examples
210    ///
211    /// ```no_run
212    /// use cyclonedds::{Topic, Writer};
213    /// # use cyclonedds::{Domain, Participant, Publisher};
214    /// # let domain = Domain::default();
215    /// # let participant = Participant::new(&domain)?;
216    /// # #[derive(
217    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
218    /// # )]
219    /// # struct Data {
220    /// #     x: i32,
221    /// #     y: i32,
222    /// # }
223    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
224    ///
225    /// // Create the publisher.
226    /// let publisher = Publisher::new(&participant)?;
227    ///
228    /// // Create two Writers under the publisher.
229    /// let writer01 = Writer::builder(&topic).with_publisher(&publisher).build()?;
230    /// let writer02 = Writer::builder(&topic).with_publisher(&publisher).build()?;
231    ///
232    /// // Suspend all the writers.
233    /// publisher.suspend()?;
234    ///
235    /// writer01.write(&Data { x: 0, y: 1 })?;
236    /// writer02.write(&Data { x: 2, y: 3 })?;
237    ///
238    /// // Resume all the writers.
239    /// publisher.resume()?;
240    ///
241    /// Ok::<_, cyclonedds::Error>(())
242    /// ```
243    pub fn suspend(&self) -> Result<()> {
244        ffi::dds_suspend(self.inner)
245    }
246
247    /// (WARN: unimplemented in C lib): Resumes publication on all writers
248    /// belonging to this publisher.
249    ///
250    /// <div class="warning">
251    ///
252    /// This function is currently not implemented by the underlying C library
253    /// and will thus always return an unsupported error.
254    ///
255    /// </div>
256    ///
257    /// Flushes any writes that were batched during a
258    /// [`suspend`](Publisher::suspend) and resumes normal publication.
259    ///
260    /// # Errors
261    ///
262    /// Returns an [`Error`](crate::Error) if the publisher fails to resume.
263    ///
264    /// # Examples
265    ///
266    /// ```no_run
267    /// use cyclonedds::{Topic, Writer};
268    /// # use cyclonedds::{Domain, Participant, Publisher};
269    /// # let domain = Domain::default();
270    /// # let participant = Participant::new(&domain)?;
271    /// # #[derive(
272    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
273    /// # )]
274    /// # struct Data {
275    /// #     x: i32,
276    /// #     y: i32,
277    /// # }
278    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
279    ///
280    /// // Create the publisher.
281    /// let publisher = Publisher::new(&participant)?;
282    ///
283    /// // Create two Writers under the publisher.
284    /// let writer01 = Writer::builder(&topic).with_publisher(&publisher).build()?;
285    /// let writer02 = Writer::builder(&topic).with_publisher(&publisher).build()?;
286    ///
287    /// // Suspend all the writers.
288    /// publisher.suspend()?;
289    ///
290    /// writer01.write(&Data { x: 0, y: 1 })?;
291    /// writer02.write(&Data { x: 2, y: 3 })?;
292    ///
293    /// // Resume all the writers.
294    /// publisher.resume()?;
295    ///
296    /// Ok::<_, cyclonedds::Error>(())
297    /// ```
298    pub fn resume(&self) -> Result<()> {
299        ffi::dds_resume(self.inner)
300    }
301
302    /// (WARN: unimplemented in C lib): Blocks until all samples written by
303    /// writers under this publisher have been acknowledged by all matched
304    /// reliable readers, or until `timeout` elapses.
305    ///
306    /// <div class="warning">
307    ///
308    /// This function is currently not implemented by the underlying C library
309    /// and will thus always return an unsupported error.
310    ///
311    /// </div>
312    ///
313    ///
314    /// # Errors
315    ///
316    /// Returns an [`Error`](crate::Error) if the timeout elapses before all
317    /// acknowledgements are received or if the publisher returns an error.
318    ///
319    /// # Examples
320    ///
321    /// ```no_run
322    /// use cyclonedds::Duration;
323    /// # use cyclonedds::{Domain, Participant, Publisher};
324    /// # let domain = Domain::default();
325    /// # let participant = Participant::new(&domain)?;
326    ///
327    /// let publisher = Publisher::new(&participant)?;
328    /// publisher.wait_for_acks(Duration::from_secs(1))?;
329    /// Ok::<_, cyclonedds::Error>(())
330    /// ```
331    pub fn wait_for_acks(&self, timeout: crate::Duration) -> Result<()> {
332        ffi::dds_wait_for_acks(self.inner, timeout.inner)
333    }
334
335    #[allow(unused)]
336    pub(crate) const fn from_existing(
337        inner: cyclonedds_sys::dds_entity_t,
338    ) -> std::mem::ManuallyDrop<Self> {
339        std::mem::ManuallyDrop::new(Self {
340            inner,
341            phantom: std::marker::PhantomData,
342        })
343    }
344
345    /// Sets the [`PublisherListener`](crate::PublisherListener) on this
346    /// publisher, replacing any previously set listener.
347    ///
348    /// # Errors
349    ///
350    /// Returns an [`Error`](crate::Error) if the publisher fails to set the
351    /// listener.
352    ///
353    /// # Examples
354    ///
355    /// ```
356    /// use cyclonedds::PublisherListener;
357    /// # use cyclonedds::{Domain, Participant, Publisher};
358    /// # let domain = Domain::default();
359    /// # let participant = Participant::new(&domain)?;
360    ///
361    /// let mut publisher = Publisher::new(&participant)?;
362    /// publisher.set_listener(PublisherListener::new())?;
363    /// # Ok::<_, cyclonedds::Error>(())
364    /// ```
365    pub fn set_listener<L>(&mut self, listener: L) -> Result<()>
366    where
367        L: AsRef<crate::PublisherListener>,
368    {
369        listener
370            .as_ref()
371            .as_ffi()
372            .and_then(|listener| ffi::dds_set_listener(self.inner, Some(listener.inner)))
373    }
374
375    /// Removes the listener from this publisher.
376    ///
377    /// # Errors
378    ///
379    /// Returns an [`Error`](crate::Error) if the publisher fails to unset the
380    /// listener.
381    ///
382    /// # Examples
383    ///
384    /// ```
385    /// # use cyclonedds::{Domain, Participant, Publisher};
386    /// # let domain = Domain::default();
387    /// # let participant = Participant::new(&domain)?;
388    /// let mut publisher = Publisher::new(&participant)?;
389    /// publisher.unset_listener()?;
390    /// # Ok::<_, cyclonedds::Error>(())
391    /// ```
392    pub fn unset_listener(&mut self) -> Result<()> {
393        ffi::dds_set_listener(self.inner, None)?;
394        Ok(())
395    }
396
397    /// Sets the [`PublisherListener`](crate::PublisherListener) on this
398    /// publisher, consuming and returning `self`.
399    ///
400    /// # Errors
401    ///
402    /// Returns an [`Error`](crate::Error) if the publisher fails to set the
403    /// listener.
404    ///
405    /// # Examples
406    ///
407    /// ```
408    /// use cyclonedds::PublisherListener;
409    /// # use cyclonedds::{Domain, Participant, Publisher};
410    /// # let domain = Domain::default();
411    /// # let participant = Participant::new(&domain)?;
412    ///
413    /// let publisher = Publisher::new(&participant)?.with_listener(PublisherListener::new())?;
414    /// # Ok::<_, cyclonedds::Error>(())
415    /// ```
416    pub fn with_listener<L>(mut self, listener: L) -> Result<Self>
417    where
418        L: AsRef<crate::PublisherListener>,
419    {
420        self.set_listener(listener).map(|()| self)
421    }
422}
423
424impl Drop for Publisher<'_, '_> {
425    fn drop(&mut self) {
426        let result = ffi::dds_delete(self.inner);
427        debug_assert!(
428            result.is_ok(),
429            "unable to delete {self:?}: failed with {result:?}"
430        );
431    }
432}
433
#[cfg(test)]
mod tests {
    use super::*;

    /// A publisher can be created directly and through the builder.
    #[test]
    fn test_publisher_create() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let qos = crate::QoS::new();
        let participant = Participant::new(&domain).unwrap();

        drop(Publisher::new(&participant).unwrap());
        drop(Publisher::builder(&participant).with_qos(&qos).build().unwrap());
    }

    /// Creating a publisher under an invalid participant handle is rejected.
    #[test]
    fn test_publisher_create_with_invalid_participant() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let qos = crate::QoS::new();
        let mut participant = Participant::new(&domain).unwrap();

        // Swap in an invalid handle, keeping the real one so it can be put
        // back before the participant goes out of scope.
        let valid_handle = std::mem::replace(&mut participant.inner, 0);

        let err = Publisher::new(&participant).unwrap_err();
        assert_eq!(err, crate::Error::BadParameter);
        let err = Publisher::builder(&participant).with_qos(&qos).build().unwrap_err();
        assert_eq!(err, crate::Error::BadParameter);

        participant.inner = valid_handle;
    }

    /// `from_existing` wraps the very same entity handle.
    #[test]
    fn test_publisher_from_existing_publisher() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();
        let publisher = Publisher::new(&participant).unwrap();

        let alias = Publisher::from_existing(publisher.inner);

        assert_eq!(alias.inner, publisher.inner);
    }

    /// Pins the current "unsupported" result of `suspend`.
    #[test]
    fn test_publisher_suspend_not_yet_supported_by_c_lib() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();
        let publisher = Publisher::new(&participant).unwrap();

        let outcome = publisher.suspend();

        assert_eq!(
            outcome,
            Err(crate::Error::Unsupported),
            "result was not unsupported (might be implemented now?)"
        );
    }

    /// Pins the current "unsupported" result of `resume`.
    #[test]
    fn test_publisher_resume_not_yet_supported_by_c_lib() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();
        let publisher = Publisher::new(&participant).unwrap();

        let outcome = publisher.resume();

        assert_eq!(
            outcome,
            Err(crate::Error::Unsupported),
            "result was not unsupported (might be implemented now?)"
        );
    }

    /// Pins the current "unsupported" result of `wait_for_acks`.
    #[test]
    fn test_publisher_wait_for_acks_not_yet_supported_by_c_lib() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();
        let publisher = Publisher::new(&participant).unwrap();

        let timeout: crate::Duration = std::time::Duration::from_millis(10).try_into().unwrap();
        let outcome = publisher.wait_for_acks(timeout);

        assert_eq!(
            outcome,
            Err(crate::Error::Unsupported),
            "result was not unsupported (might be implemented now?)"
        );
    }

    /// A listener can be attached at build time, after creation, and removed.
    #[test]
    fn test_publisher_with_listener() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();

        let listener = crate::PublisherListener::new();

        drop(Publisher::new(&participant).unwrap().with_listener(listener).unwrap());
        drop(Publisher::builder(&participant).with_listener(listener).build().unwrap());

        let mut publisher = Publisher::new(&participant).unwrap();
        publisher.set_listener(listener).unwrap();
        publisher.unset_listener().unwrap();
    }

    /// Listener changes on an invalid publisher handle are rejected.
    #[test]
    fn test_publisher_with_listener_on_invalid_publisher() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();

        let listener = crate::PublisherListener::new();

        let mut publisher = Publisher::new(&participant).unwrap();

        // Swap in an invalid handle; the real one is restored below so that
        // dropping the publisher deletes the actual entity.
        let valid_handle = std::mem::replace(&mut publisher.inner, 0);

        let err = publisher.set_listener(listener).unwrap_err();
        assert_eq!(err, crate::Error::BadParameter);
        let err = publisher.unset_listener().unwrap_err();
        assert_eq!(err, crate::Error::BadParameter);

        publisher.inner = valid_handle;
    }
}