Skip to main content

cyclonedds/
topic.rs

1use crate::internal::ffi;
2use crate::internal::sertype::Sertype;
3use crate::internal::traits::AsFfi;
4use crate::{Participant, Result};
5
6/// A typed communication channel.
7///
8/// A `Topic` binds a name to a data type [`T`](crate::Topicable) within a
9/// [`Participant`](crate::Participant). [`Writers`](crate::Writer) and
10/// [`Readers`](crate::Reader) are created against a topic and only match each
11/// other when they share the same topic name and compatible type and
12/// [`QoS`](crate::QoS).
13///
14/// Use [`Topic::new`] for simple construction or [`Topic::builder`] for
15/// [`QoS`](crate::QoS) and [`listener`](crate::listener::TopicListener)
16/// configuration.
17#[derive(Debug)]
18pub struct Topic<'domain, 'participant, T>
19where
20    T: crate::Topicable,
21{
22    pub(crate) inner: cyclonedds_sys::dds_entity_t,
23    phantom_type: std::marker::PhantomData<T>,
24    phantom_participant: std::marker::PhantomData<&'participant Participant<'domain>>,
25}
26
27/// Builder for [`Topic<T>`] (accessible via [`Topic::builder`]).
28#[derive(Debug)]
29pub struct TopicBuilder<'domain, 'participant, 'qos, 'name, T>
30where
31    T: crate::Topicable,
32{
33    participant: &'participant Participant<'domain>,
34    topic_name: &'name str,
35    qos: Option<&'qos crate::QoS>,
36    listener: Option<crate::TopicListener<T>>,
37}
38
39impl<'d, 'p, 'q, 'n, T> TopicBuilder<'d, 'p, 'q, 'n, T>
40where
41    T: crate::Topicable,
42{
43    /// Creates a new [`TopicBuilder`] for the given [`Participant`].
44    ///
45    /// # Examples
46    ///
47    /// ```
48    /// use cyclonedds::builder::TopicBuilder;
49    /// use cyclonedds::{Domain, Participant};
50    /// # #[derive(
51    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
52    /// # )]
53    /// # struct Data {
54    /// #     x: i32,
55    /// # }
56    ///
57    /// let domain = Domain::default();
58    /// let participant = Participant::new(&domain)?;
59    /// let topic_builder = TopicBuilder::<Data>::new(&participant, "MyTopic");
60    /// # Ok::<_, cyclonedds::Error>(())
61    /// ```
62    #[must_use]
63    pub const fn new(participant: &'p Participant<'d>, topic_name: &'n str) -> Self {
64        Self {
65            participant,
66            topic_name,
67            qos: None,
68            listener: None,
69        }
70    }
71
72    /// Sets the [`QoS`](crate::QoS) for this topic builder.
73    ///
74    /// # Examples
75    ///
76    /// ```
77    /// use cyclonedds::builder::TopicBuilder;
78    /// use cyclonedds::qos::policy;
79    /// use cyclonedds::{Duration, QoS};
80    /// # use cyclonedds::{Domain, Participant};
81    /// # let domain = Domain::default();
82    /// # let participant = Participant::new(&domain)?;
83    /// # #[derive(
84    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
85    /// # )]
86    /// # struct Data {
87    /// #     x: i32,
88    /// # }
89    ///
90    /// let qos = QoS::new().with_reliability(policy::Reliability::Reliable {
91    ///     max_blocking_time: Duration::from_millis(100),
92    /// });
93    /// let topic_builder = TopicBuilder::<Data>::new(&participant, "MyTopic").with_qos(&qos);
94    /// # Ok::<_, cyclonedds::Error>(())
95    /// ```
96    #[must_use]
97    pub const fn with_qos(mut self, qos: &'q crate::QoS) -> Self {
98        self.qos = Some(qos);
99        self
100    }
101
102    /// Sets the [`Listener`](crate::Listener) on this topic builder.
103    ///
104    /// # Examples
105    ///
106    /// ```
107    /// use cyclonedds::TopicListener;
108    /// use cyclonedds::builder::TopicBuilder;
109    /// # use cyclonedds::{Domain, Participant};
110    /// # let domain = Domain::default();
111    /// # let participant = Participant::new(&domain)?;
112    /// # #[derive(
113    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
114    /// # )]
115    /// # struct Data {
116    /// #     x: i32,
117    /// # }
118    ///
119    /// let participant_builder =
120    ///     TopicBuilder::<Data>::new(&participant, "MyTopic").with_listener(TopicListener::new());
121    /// # Ok::<_, cyclonedds::Error>(())
122    /// ```
123    #[must_use]
124    pub fn with_listener<L>(mut self, listener: L) -> Self
125    where
126        L: AsRef<crate::TopicListener<T>>,
127    {
128        self.listener = Some(listener.as_ref().clone());
129        self
130    }
131
132    /// Builds the [`Topic`].
133    ///
134    /// # Errors
135    ///
136    /// Returns an [`Error`](crate::Error) if the topic failed to create.
137    ///
138    /// # Examples
139    ///
140    /// ```
141    /// use cyclonedds::QoS;
142    /// use cyclonedds::builder::TopicBuilder;
143    /// use cyclonedds::qos::policy;
144    /// # use cyclonedds::{Domain, Participant};
145    /// # let domain = Domain::default();
146    /// # let participant = Participant::new(&domain)?;
147    /// # #[derive(
148    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
149    /// # )]
150    /// # struct Data {
151    /// #     x: i32,
152    /// # }
153    ///
154    /// let qos = QoS::new().with_durability(policy::Durability::TransientLocal);
155    /// let topic = TopicBuilder::<Data>::new(&participant, "MyTopic")
156    ///     .with_qos(&qos)
157    ///     .build()?;
158    /// # Ok::<_, cyclonedds::Error>(())
159    /// ```
160    pub fn build(self) -> Result<Topic<'d, 'p, T>> {
161        let name = std::ffi::CString::new(self.topic_name)
162            .map_err(|_err| crate::error::Error::BadParameter)?;
163        let type_name = std::ffi::CString::new(T::dds_type_name().as_ref())
164            .map_err(|_err| crate::error::Error::BadParameter)?;
165
166        let mut sertype =
167            std::mem::ManuallyDrop::new(Box::new(Sertype::<T>::new(&type_name, T::IS_KEYED)));
168
169        self.listener
170            .map(|listener| listener.as_ffi())
171            .transpose()
172            .and_then(|listener| {
173                let inner = ffi::dds_create_topic(
174                    self.participant.inner,
175                    &name,
176                    &mut &mut sertype.inner,
177                    self.qos.map(|qos| &qos.inner),
178                    listener.as_ref(),
179                )
180                .inspect_err(|_| {
181                    ffi::ddsi_sertype_unref(&mut sertype.inner);
182                })?;
183
184                Ok(Topic {
185                    inner,
186                    phantom_type: std::marker::PhantomData,
187                    phantom_participant: std::marker::PhantomData,
188                })
189            })
190    }
191}
192
193impl<'d, 'p, T> Topic<'d, 'p, T>
194where
195    T: crate::Topicable,
196{
197    /// Creates a new `Topic` with the given name under `participant` using
198    /// default [`QoS`](crate::QoS) and no
199    /// [`listener`](crate::listener::TopicListener).
200    ///
201    /// The topic name identifies the communication channel. Writers and
202    /// readers match when they share the same name and compatible type.
203    ///
204    /// # Errors
205    ///
206    /// Returns an [`Error`](crate::Error) if topic fails to create.
207    ///
208    /// # Examples
209    ///
210    /// ```
211    /// # use cyclonedds::{Domain, Participant};
212    /// # let domain = Domain::default();
213    /// # let participant = Participant::new(&domain)?;
214    /// # #[derive(
215    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
216    /// # )]
217    /// # struct Data {
218    /// #     x: i32,
219    /// #     y: i32,
220    /// # }
221    /// use cyclonedds::Topic;
222    ///
223    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
224    /// # Ok::<_, cyclonedds::Error>(())
225    /// ```
226    pub fn new(participant: &'p Participant<'d>, topic_name: &str) -> Result<Self> {
227        Self::builder(participant, topic_name).build()
228    }
229
230    /// Returns a [`TopicBuilder`](crate::builder::TopicBuilder) for
231    /// constructing a topic with custom [`QoS`](crate::QoS) or a
232    /// [`listener`](crate::listener::TopicListener).
233    ///
234    /// # Examples
235    ///
236    /// ```
237    /// use cyclonedds::Topic;
238    /// # use cyclonedds::{Domain, Participant};
239    /// # let domain = Domain::default();
240    /// # let participant = Participant::new(&domain)?;
241    /// # #[derive(
242    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
243    /// # )]
244    /// # struct Data {
245    /// #     x: i32,
246    /// #     y: i32,
247    /// # }
248    ///
249    /// let topic = Topic::<Data>::builder(&participant, "MyTopic").build()?;
250    /// # Ok::<_, cyclonedds::Error>(())
251    /// ```
252    #[must_use]
253    pub const fn builder<'q, 'n>(
254        participant: &'p Participant<'d>,
255        topic_name: &'n str,
256    ) -> TopicBuilder<'d, 'p, 'q, 'n, T> {
257        TopicBuilder::new(participant, topic_name)
258    }
259
260    pub(crate) const fn from_existing(
261        inner: cyclonedds_sys::dds_entity_t,
262    ) -> std::mem::ManuallyDrop<Self> {
263        std::mem::ManuallyDrop::new(Self {
264            inner,
265            phantom_type: std::marker::PhantomData,
266            phantom_participant: std::marker::PhantomData,
267        })
268    }
269
270    /// Sets the [`TopicListener`](crate::TopicListener) on this topic,
271    /// replacing any previously set listener.
272    ///
273    /// # Errors
274    ///
275    /// Returns an [`Error`](crate::Error) if the topic fails to set the
276    /// listener.
277    ///
278    /// # Examples
279    ///
280    /// ```
281    /// use cyclonedds::listener::TopicListener;
282    /// # use cyclonedds::{Domain, Participant, Topic};
283    /// # let domain = Domain::default();
284    /// # let participant = Participant::new(&domain)?;
285    /// # #[derive(
286    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
287    /// # )]
288    /// # struct Data {
289    /// #     x: i32,
290    /// #     y: i32,
291    /// # }
292    ///
293    /// let mut topic = Topic::<Data>::new(&participant, "MyTopic")?;
294    /// topic.set_listener(TopicListener::new().with_inconsistent_topic(|_, status| {
295    ///     println!("inconsistent topic: {status:?}");
296    /// }))?;
297    /// # Ok::<_, cyclonedds::Error>(())
298    /// ```
299    pub fn set_listener<L>(&mut self, listener: L) -> Result<()>
300    where
301        T: serde::ser::Serialize + serde::de::DeserializeOwned + std::clone::Clone + Default,
302        L: AsRef<crate::TopicListener<T>>,
303    {
304        listener
305            .as_ref()
306            .as_ffi()
307            .and_then(|listener| ffi::dds_set_listener(self.inner, Some(listener.inner)))
308    }
309
310    /// Removes the listener from this topic.
311    ///
312    /// # Errors
313    ///
314    /// Returns an [`Error`](crate::Error) if the topic fails to unset the
315    /// listener.
316    ///
317    /// # Examples
318    ///
319    /// ```
320    /// # use cyclonedds::{Domain, Participant, Topic};
321    /// # let domain = Domain::default();
322    /// # let participant = Participant::new(&domain)?;
323    /// # #[derive(
324    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
325    /// # )]
326    /// # struct Data {
327    /// #     x: i32,
328    /// #     y: i32,
329    /// # }
330    /// let mut topic = Topic::<Data>::new(&participant, "MyTopic")?;
331    /// topic.unset_listener()?;
332    /// # Ok::<_, cyclonedds::Error>(())
333    /// ```
334    pub fn unset_listener(&mut self) -> Result<()> {
335        ffi::dds_set_listener(self.inner, None)?;
336        Ok(())
337    }
338
339    /// Sets the [`TopicListener`](crate::TopicListener) on this topic,
340    /// consuming and returning `self`.
341    ///
342    /// # Errors
343    ///
344    /// Returns an [`Error`](crate::Error) if the topic fails to set the
345    /// listener.
346    ///
347    /// # Examples
348    ///
349    /// ```
350    /// use cyclonedds::listener::TopicListener;
351    /// # use cyclonedds::{Domain, Participant, Topic};
352    /// # let domain = Domain::default();
353    /// # let participant = Participant::new(&domain)?;
354    /// # #[derive(
355    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
356    /// # )]
357    /// # struct Data {
358    /// #     x: i32,
359    /// #     y: i32,
360    /// # }
361    ///
362    /// let topic = Topic::<Data>::new(&participant, "MyTopic")?.with_listener(TopicListener::new())?;
363    /// # Ok::<_, cyclonedds::Error>(())
364    /// ```
365    pub fn with_listener<L>(mut self, listener: L) -> Result<Self>
366    where
367        T: serde::ser::Serialize + serde::de::DeserializeOwned + std::clone::Clone + Default,
368        L: AsRef<crate::TopicListener<T>>,
369    {
370        self.set_listener(listener).map(|_err| self)
371    }
372}
373
374impl<T> Drop for Topic<'_, '_, T>
375where
376    T: crate::Topicable,
377{
378    fn drop(&mut self) {
379        let result = ffi::dds_delete(self.inner);
380        debug_assert!(
381            result.is_ok(),
382            "unable to delete {self:?}: failed with {result:?}"
383        );
384    }
385}
386
387#[cfg(test)]
388mod tests {
389    use super::*;
390
391    #[test]
392    fn test_topic_create() {
393        let domain_id = crate::tests::domain::unique_id();
394        let domain = crate::Domain::new(domain_id).unwrap();
395        let qos = crate::QoS::new();
396        let topic_name = crate::tests::topic::unique_name();
397        let participant = Participant::new(&domain).unwrap();
398        let _ = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
399        let _ = Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
400            .with_qos(&qos)
401            .build()
402            .unwrap();
403    }
404
405    #[test]
406    fn test_topic_create_with_invalid_names() {
407        use crate::Topicable;
408
409        #[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)]
410        struct MockedTypeNameData;
411        static MOCKED_NAME: std::sync::Mutex<&str> = std::sync::Mutex::new("");
412
413        impl Topicable for MockedTypeNameData {
414            type Key = ();
415
416            fn from_key((): &Self::Key) -> Self {
417                Self {}
418            }
419
420            fn as_key(&self) -> Self::Key {}
421
422            fn dds_type_name() -> impl AsRef<str> {
423                MOCKED_NAME.lock().unwrap().clone()
424            }
425        }
426
427        let domain_id = crate::tests::domain::unique_id();
428        let domain = crate::Domain::new(domain_id).unwrap();
429        let mut participant = crate::Participant::new(&domain).unwrap();
430
431        let data = MockedTypeNameData {};
432        let key = ();
433
434        assert_eq!(data, MockedTypeNameData::from_key(&key));
435        assert!(matches!(data.as_key(), ()));
436
437        // (invalid type name, invalid topic name)
438        *MOCKED_NAME.lock().unwrap() = "\0";
439        let topic_name = "\0";
440
441        let result = Topic::<crate::tests::topic::Data>::new(&participant, topic_name).unwrap_err();
442        assert_eq!(result, crate::Error::BadParameter);
443        let result = Topic::<crate::tests::topic::Data>::builder(&participant, topic_name)
444            .build()
445            .unwrap_err();
446        assert_eq!(result, crate::Error::BadParameter);
447
448        // (invalid type name, valid topic name)
449        *MOCKED_NAME.lock().unwrap() = "\0";
450        let topic_name = &crate::tests::topic::unique_name();
451
452        let result = Topic::<MockedTypeNameData>::new(&participant, topic_name).unwrap_err();
453        assert_eq!(result, crate::Error::BadParameter);
454        let result = Topic::<MockedTypeNameData>::builder(&participant, topic_name)
455            .build()
456            .unwrap_err();
457        assert_eq!(result, crate::Error::BadParameter);
458
459        // (valid type name, invalid topic name)
460        *MOCKED_NAME.lock().unwrap() = "ValidName";
461        let topic_name = "\0";
462
463        let result = Topic::<MockedTypeNameData>::new(&participant, topic_name).unwrap_err();
464        assert_eq!(result, crate::Error::BadParameter);
465        let result = Topic::<MockedTypeNameData>::builder(&participant, topic_name)
466            .build()
467            .unwrap_err();
468        assert_eq!(result, crate::Error::BadParameter);
469
470        // (valid type name, valid topic name) on invalid participant
471        *MOCKED_NAME.lock().unwrap() = "ValidName";
472        let topic_name = &crate::tests::topic::unique_name();
473        let participant_id = participant.inner;
474        participant.inner = 0;
475        let result = Topic::<MockedTypeNameData>::new(&participant, topic_name).unwrap_err();
476        assert_eq!(result, crate::Error::BadParameter);
477        let result = Topic::<MockedTypeNameData>::builder(&participant, topic_name)
478            .build()
479            .unwrap_err();
480        assert_eq!(result, crate::Error::BadParameter);
481        participant.inner = participant_id;
482
483        // (valid type name, valid topic name)
484        *MOCKED_NAME.lock().unwrap() = "ValidName";
485        let topic_name = &crate::tests::topic::unique_name();
486        let _ = Topic::<MockedTypeNameData>::new(&participant, topic_name).unwrap();
487        let _ = Topic::<MockedTypeNameData>::builder(&participant, topic_name)
488            .build()
489            .unwrap();
490    }
491
492    #[test]
493    fn test_topic_create_with_invalid_participant() {
494        let domain_id = crate::tests::domain::unique_id();
495        let domain = crate::Domain::new(domain_id).unwrap();
496        let qos = crate::QoS::new();
497        let topic_name = crate::tests::topic::unique_name();
498        let mut participant = Participant::new(&domain).unwrap();
499        let participant_id = participant.inner;
500        participant.inner = 0;
501        let result =
502            Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap_err();
503        assert_eq!(result, crate::Error::BadParameter);
504        let result = Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
505            .with_qos(&qos)
506            .build()
507            .unwrap_err();
508        assert_eq!(result, crate::Error::BadParameter);
509        participant.inner = participant_id;
510    }
511
512    #[test]
513    fn test_topic_with_listener() {
514        let domain_id = crate::tests::domain::unique_id();
515        let domain = crate::Domain::new(domain_id).unwrap();
516        let topic_name = crate::tests::topic::unique_name();
517        let participant = crate::Participant::new(&domain).unwrap();
518
519        let listener = crate::TopicListener::new().with_inconsistent_topic(|_, _| ());
520
521        let _ = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name)
522            .unwrap()
523            .with_listener(&listener)
524            .unwrap();
525        let _ = Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
526            .with_listener(&listener)
527            .build()
528            .unwrap();
529
530        let mut topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
531        topic.set_listener(&listener).unwrap();
532        topic.unset_listener().unwrap();
533    }
534
535    #[test]
536    fn test_topic_with_listener_on_invalid_topic() {
537        let domain_id = crate::tests::domain::unique_id();
538        let domain = crate::Domain::new(domain_id).unwrap();
539        let topic_name = crate::tests::topic::unique_name();
540        let participant = crate::Participant::new(&domain).unwrap();
541
542        let listener = crate::TopicListener::new();
543
544        let mut topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
545        let topic_id = topic.inner;
546        topic.inner = 0;
547        let result = topic.set_listener(&listener).unwrap_err();
548        assert_eq!(result, crate::Error::BadParameter);
549        let result = topic.unset_listener().unwrap_err();
550        assert_eq!(result, crate::Error::BadParameter);
551        topic.inner = topic_id;
552    }
553
554    #[test]
555    fn test_topic_create_from_existing() {
556        let domain_id = crate::tests::domain::unique_id();
557        let domain = crate::Domain::new(domain_id).unwrap();
558        let topic_name = crate::tests::topic::unique_name();
559        let participant = crate::Participant::new(&domain).unwrap();
560        let topic_01 = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
561        let topic_02 = Topic::<crate::tests::topic::Data>::from_existing(topic_01.inner);
562        assert_eq!(topic_01.inner, topic_02.inner);
563    }
564}