cyclonedds/topic.rs
1use crate::internal::ffi;
2use crate::internal::sertype::Sertype;
3use crate::internal::traits::AsFfi;
4use crate::{Participant, Result};
5
6/// A typed communication channel.
7///
8/// A `Topic` binds a name to a data type [`T`](crate::Topicable) within a
9/// [`Participant`](crate::Participant). [`Writers`](crate::Writer) and
10/// [`Readers`](crate::Reader) are created against a topic and only match each
11/// other when they share the same topic name and compatible type and
12/// [`QoS`](crate::QoS).
13///
14/// Use [`Topic::new`] for simple construction or [`Topic::builder`] for
15/// [`QoS`](crate::QoS) and [`listener`](crate::listener::TopicListener)
16/// configuration.
17#[derive(Debug)]
18pub struct Topic<'domain, 'participant, T>
19where
20 T: crate::Topicable,
21{
22 pub(crate) inner: cyclonedds_sys::dds_entity_t,
23 phantom_type: std::marker::PhantomData<T>,
24 phantom_participant: std::marker::PhantomData<&'participant Participant<'domain>>,
25}
26
27/// Builder for [`Topic<T>`] (accessible via [`Topic::builder`]).
28#[derive(Debug)]
29pub struct TopicBuilder<'domain, 'participant, 'qos, 'name, T>
30where
31 T: crate::Topicable,
32{
33 participant: &'participant Participant<'domain>,
34 topic_name: &'name str,
35 qos: Option<&'qos crate::QoS>,
36 listener: Option<crate::TopicListener<T>>,
37}
38
39impl<'d, 'p, 'q, 'n, T> TopicBuilder<'d, 'p, 'q, 'n, T>
40where
41 T: crate::Topicable,
42{
43 /// Creates a new [`TopicBuilder`] for the given [`Participant`].
44 ///
45 /// # Examples
46 ///
47 /// ```
48 /// use cyclonedds::builder::TopicBuilder;
49 /// use cyclonedds::{Domain, Participant};
50 /// # #[derive(
51 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
52 /// # )]
53 /// # struct Data {
54 /// # x: i32,
55 /// # }
56 ///
57 /// let domain = Domain::default();
58 /// let participant = Participant::new(&domain)?;
59 /// let topic_builder = TopicBuilder::<Data>::new(&participant, "MyTopic");
60 /// # Ok::<_, cyclonedds::Error>(())
61 /// ```
62 #[must_use]
63 pub const fn new(participant: &'p Participant<'d>, topic_name: &'n str) -> Self {
64 Self {
65 participant,
66 topic_name,
67 qos: None,
68 listener: None,
69 }
70 }
71
72 /// Sets the [`QoS`](crate::QoS) for this topic builder.
73 ///
74 /// # Examples
75 ///
76 /// ```
77 /// use cyclonedds::builder::TopicBuilder;
78 /// use cyclonedds::qos::policy;
79 /// use cyclonedds::{Duration, QoS};
80 /// # use cyclonedds::{Domain, Participant};
81 /// # let domain = Domain::default();
82 /// # let participant = Participant::new(&domain)?;
83 /// # #[derive(
84 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
85 /// # )]
86 /// # struct Data {
87 /// # x: i32,
88 /// # }
89 ///
90 /// let qos = QoS::new().with_reliability(policy::Reliability::Reliable {
91 /// max_blocking_time: Duration::from_millis(100),
92 /// });
93 /// let topic_builder = TopicBuilder::<Data>::new(&participant, "MyTopic").with_qos(&qos);
94 /// # Ok::<_, cyclonedds::Error>(())
95 /// ```
96 #[must_use]
97 pub const fn with_qos(mut self, qos: &'q crate::QoS) -> Self {
98 self.qos = Some(qos);
99 self
100 }
101
102 /// Sets the [`Listener`](crate::Listener) on this topic builder.
103 ///
104 /// # Examples
105 ///
106 /// ```
107 /// use cyclonedds::TopicListener;
108 /// use cyclonedds::builder::TopicBuilder;
109 /// # use cyclonedds::{Domain, Participant};
110 /// # let domain = Domain::default();
111 /// # let participant = Participant::new(&domain)?;
112 /// # #[derive(
113 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
114 /// # )]
115 /// # struct Data {
116 /// # x: i32,
117 /// # }
118 ///
119 /// let participant_builder =
120 /// TopicBuilder::<Data>::new(&participant, "MyTopic").with_listener(TopicListener::new());
121 /// # Ok::<_, cyclonedds::Error>(())
122 /// ```
123 #[must_use]
124 pub fn with_listener<L>(mut self, listener: L) -> Self
125 where
126 L: AsRef<crate::TopicListener<T>>,
127 {
128 self.listener = Some(listener.as_ref().clone());
129 self
130 }
131
132 /// Builds the [`Topic`].
133 ///
134 /// # Errors
135 ///
136 /// Returns an [`Error`](crate::Error) if the topic failed to create.
137 ///
138 /// # Examples
139 ///
140 /// ```
141 /// use cyclonedds::QoS;
142 /// use cyclonedds::builder::TopicBuilder;
143 /// use cyclonedds::qos::policy;
144 /// # use cyclonedds::{Domain, Participant};
145 /// # let domain = Domain::default();
146 /// # let participant = Participant::new(&domain)?;
147 /// # #[derive(
148 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
149 /// # )]
150 /// # struct Data {
151 /// # x: i32,
152 /// # }
153 ///
154 /// let qos = QoS::new().with_durability(policy::Durability::TransientLocal);
155 /// let topic = TopicBuilder::<Data>::new(&participant, "MyTopic")
156 /// .with_qos(&qos)
157 /// .build()?;
158 /// # Ok::<_, cyclonedds::Error>(())
159 /// ```
160 pub fn build(self) -> Result<Topic<'d, 'p, T>> {
161 let name = std::ffi::CString::new(self.topic_name)
162 .map_err(|_err| crate::error::Error::BadParameter)?;
163 let type_name = std::ffi::CString::new(T::dds_type_name().as_ref())
164 .map_err(|_err| crate::error::Error::BadParameter)?;
165
166 let mut sertype =
167 std::mem::ManuallyDrop::new(Box::new(Sertype::<T>::new(&type_name, T::IS_KEYED)));
168
169 self.listener
170 .map(|listener| listener.as_ffi())
171 .transpose()
172 .and_then(|listener| {
173 let inner = ffi::dds_create_topic(
174 self.participant.inner,
175 &name,
176 &mut &mut sertype.inner,
177 self.qos.map(|qos| &qos.inner),
178 listener.as_ref(),
179 )
180 .inspect_err(|_| {
181 ffi::ddsi_sertype_unref(&mut sertype.inner);
182 })?;
183
184 Ok(Topic {
185 inner,
186 phantom_type: std::marker::PhantomData,
187 phantom_participant: std::marker::PhantomData,
188 })
189 })
190 }
191}
192
193impl<'d, 'p, T> Topic<'d, 'p, T>
194where
195 T: crate::Topicable,
196{
197 /// Creates a new `Topic` with the given name under `participant` using
198 /// default [`QoS`](crate::QoS) and no
199 /// [`listener`](crate::listener::TopicListener).
200 ///
201 /// The topic name identifies the communication channel. Writers and
202 /// readers match when they share the same name and compatible type.
203 ///
204 /// # Errors
205 ///
206 /// Returns an [`Error`](crate::Error) if topic fails to create.
207 ///
208 /// # Examples
209 ///
210 /// ```
211 /// # use cyclonedds::{Domain, Participant};
212 /// # let domain = Domain::default();
213 /// # let participant = Participant::new(&domain)?;
214 /// # #[derive(
215 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
216 /// # )]
217 /// # struct Data {
218 /// # x: i32,
219 /// # y: i32,
220 /// # }
221 /// use cyclonedds::Topic;
222 ///
223 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
224 /// # Ok::<_, cyclonedds::Error>(())
225 /// ```
226 pub fn new(participant: &'p Participant<'d>, topic_name: &str) -> Result<Self> {
227 Self::builder(participant, topic_name).build()
228 }
229
230 /// Returns a [`TopicBuilder`](crate::builder::TopicBuilder) for
231 /// constructing a topic with custom [`QoS`](crate::QoS) or a
232 /// [`listener`](crate::listener::TopicListener).
233 ///
234 /// # Examples
235 ///
236 /// ```
237 /// use cyclonedds::Topic;
238 /// # use cyclonedds::{Domain, Participant};
239 /// # let domain = Domain::default();
240 /// # let participant = Participant::new(&domain)?;
241 /// # #[derive(
242 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
243 /// # )]
244 /// # struct Data {
245 /// # x: i32,
246 /// # y: i32,
247 /// # }
248 ///
249 /// let topic = Topic::<Data>::builder(&participant, "MyTopic").build()?;
250 /// # Ok::<_, cyclonedds::Error>(())
251 /// ```
252 #[must_use]
253 pub const fn builder<'q, 'n>(
254 participant: &'p Participant<'d>,
255 topic_name: &'n str,
256 ) -> TopicBuilder<'d, 'p, 'q, 'n, T> {
257 TopicBuilder::new(participant, topic_name)
258 }
259
260 pub(crate) const fn from_existing(
261 inner: cyclonedds_sys::dds_entity_t,
262 ) -> std::mem::ManuallyDrop<Self> {
263 std::mem::ManuallyDrop::new(Self {
264 inner,
265 phantom_type: std::marker::PhantomData,
266 phantom_participant: std::marker::PhantomData,
267 })
268 }
269
270 /// Sets the [`TopicListener`](crate::TopicListener) on this topic,
271 /// replacing any previously set listener.
272 ///
273 /// # Errors
274 ///
275 /// Returns an [`Error`](crate::Error) if the topic fails to set the
276 /// listener.
277 ///
278 /// # Examples
279 ///
280 /// ```
281 /// use cyclonedds::listener::TopicListener;
282 /// # use cyclonedds::{Domain, Participant, Topic};
283 /// # let domain = Domain::default();
284 /// # let participant = Participant::new(&domain)?;
285 /// # #[derive(
286 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
287 /// # )]
288 /// # struct Data {
289 /// # x: i32,
290 /// # y: i32,
291 /// # }
292 ///
293 /// let mut topic = Topic::<Data>::new(&participant, "MyTopic")?;
294 /// topic.set_listener(TopicListener::new().with_inconsistent_topic(|_, status| {
295 /// println!("inconsistent topic: {status:?}");
296 /// }))?;
297 /// # Ok::<_, cyclonedds::Error>(())
298 /// ```
299 pub fn set_listener<L>(&mut self, listener: L) -> Result<()>
300 where
301 T: serde::ser::Serialize + serde::de::DeserializeOwned + std::clone::Clone + Default,
302 L: AsRef<crate::TopicListener<T>>,
303 {
304 listener
305 .as_ref()
306 .as_ffi()
307 .and_then(|listener| ffi::dds_set_listener(self.inner, Some(listener.inner)))
308 }
309
310 /// Removes the listener from this topic.
311 ///
312 /// # Errors
313 ///
314 /// Returns an [`Error`](crate::Error) if the topic fails to unset the
315 /// listener.
316 ///
317 /// # Examples
318 ///
319 /// ```
320 /// # use cyclonedds::{Domain, Participant, Topic};
321 /// # let domain = Domain::default();
322 /// # let participant = Participant::new(&domain)?;
323 /// # #[derive(
324 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
325 /// # )]
326 /// # struct Data {
327 /// # x: i32,
328 /// # y: i32,
329 /// # }
330 /// let mut topic = Topic::<Data>::new(&participant, "MyTopic")?;
331 /// topic.unset_listener()?;
332 /// # Ok::<_, cyclonedds::Error>(())
333 /// ```
334 pub fn unset_listener(&mut self) -> Result<()> {
335 ffi::dds_set_listener(self.inner, None)?;
336 Ok(())
337 }
338
339 /// Sets the [`TopicListener`](crate::TopicListener) on this topic,
340 /// consuming and returning `self`.
341 ///
342 /// # Errors
343 ///
344 /// Returns an [`Error`](crate::Error) if the topic fails to set the
345 /// listener.
346 ///
347 /// # Examples
348 ///
349 /// ```
350 /// use cyclonedds::listener::TopicListener;
351 /// # use cyclonedds::{Domain, Participant, Topic};
352 /// # let domain = Domain::default();
353 /// # let participant = Participant::new(&domain)?;
354 /// # #[derive(
355 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
356 /// # )]
357 /// # struct Data {
358 /// # x: i32,
359 /// # y: i32,
360 /// # }
361 ///
362 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?.with_listener(TopicListener::new())?;
363 /// # Ok::<_, cyclonedds::Error>(())
364 /// ```
365 pub fn with_listener<L>(mut self, listener: L) -> Result<Self>
366 where
367 T: serde::ser::Serialize + serde::de::DeserializeOwned + std::clone::Clone + Default,
368 L: AsRef<crate::TopicListener<T>>,
369 {
370 self.set_listener(listener).map(|_err| self)
371 }
372}
373
374impl<T> Drop for Topic<'_, '_, T>
375where
376 T: crate::Topicable,
377{
378 fn drop(&mut self) {
379 let result = ffi::dds_delete(self.inner);
380 debug_assert!(
381 result.is_ok(),
382 "unable to delete {self:?}: failed with {result:?}"
383 );
384 }
385}
386
387#[cfg(test)]
388mod tests {
389 use super::*;
390
391 #[test]
392 fn test_topic_create() {
393 let domain_id = crate::tests::domain::unique_id();
394 let domain = crate::Domain::new(domain_id).unwrap();
395 let qos = crate::QoS::new();
396 let topic_name = crate::tests::topic::unique_name();
397 let participant = Participant::new(&domain).unwrap();
398 let _ = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
399 let _ = Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
400 .with_qos(&qos)
401 .build()
402 .unwrap();
403 }
404
405 #[test]
406 fn test_topic_create_with_invalid_names() {
407 use crate::Topicable;
408
409 #[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq)]
410 struct MockedTypeNameData;
411 static MOCKED_NAME: std::sync::Mutex<&str> = std::sync::Mutex::new("");
412
413 impl Topicable for MockedTypeNameData {
414 type Key = ();
415
416 fn from_key((): &Self::Key) -> Self {
417 Self {}
418 }
419
420 fn as_key(&self) -> Self::Key {}
421
422 fn dds_type_name() -> impl AsRef<str> {
423 MOCKED_NAME.lock().unwrap().clone()
424 }
425 }
426
427 let domain_id = crate::tests::domain::unique_id();
428 let domain = crate::Domain::new(domain_id).unwrap();
429 let mut participant = crate::Participant::new(&domain).unwrap();
430
431 let data = MockedTypeNameData {};
432 let key = ();
433
434 assert_eq!(data, MockedTypeNameData::from_key(&key));
435 assert!(matches!(data.as_key(), ()));
436
437 // (invalid type name, invalid topic name)
438 *MOCKED_NAME.lock().unwrap() = "\0";
439 let topic_name = "\0";
440
441 let result = Topic::<crate::tests::topic::Data>::new(&participant, topic_name).unwrap_err();
442 assert_eq!(result, crate::Error::BadParameter);
443 let result = Topic::<crate::tests::topic::Data>::builder(&participant, topic_name)
444 .build()
445 .unwrap_err();
446 assert_eq!(result, crate::Error::BadParameter);
447
448 // (invalid type name, valid topic name)
449 *MOCKED_NAME.lock().unwrap() = "\0";
450 let topic_name = &crate::tests::topic::unique_name();
451
452 let result = Topic::<MockedTypeNameData>::new(&participant, topic_name).unwrap_err();
453 assert_eq!(result, crate::Error::BadParameter);
454 let result = Topic::<MockedTypeNameData>::builder(&participant, topic_name)
455 .build()
456 .unwrap_err();
457 assert_eq!(result, crate::Error::BadParameter);
458
459 // (valid type name, invalid topic name)
460 *MOCKED_NAME.lock().unwrap() = "ValidName";
461 let topic_name = "\0";
462
463 let result = Topic::<MockedTypeNameData>::new(&participant, topic_name).unwrap_err();
464 assert_eq!(result, crate::Error::BadParameter);
465 let result = Topic::<MockedTypeNameData>::builder(&participant, topic_name)
466 .build()
467 .unwrap_err();
468 assert_eq!(result, crate::Error::BadParameter);
469
470 // (valid type name, valid topic name) on invalid participant
471 *MOCKED_NAME.lock().unwrap() = "ValidName";
472 let topic_name = &crate::tests::topic::unique_name();
473 let participant_id = participant.inner;
474 participant.inner = 0;
475 let result = Topic::<MockedTypeNameData>::new(&participant, topic_name).unwrap_err();
476 assert_eq!(result, crate::Error::BadParameter);
477 let result = Topic::<MockedTypeNameData>::builder(&participant, topic_name)
478 .build()
479 .unwrap_err();
480 assert_eq!(result, crate::Error::BadParameter);
481 participant.inner = participant_id;
482
483 // (valid type name, valid topic name)
484 *MOCKED_NAME.lock().unwrap() = "ValidName";
485 let topic_name = &crate::tests::topic::unique_name();
486 let _ = Topic::<MockedTypeNameData>::new(&participant, topic_name).unwrap();
487 let _ = Topic::<MockedTypeNameData>::builder(&participant, topic_name)
488 .build()
489 .unwrap();
490 }
491
492 #[test]
493 fn test_topic_create_with_invalid_participant() {
494 let domain_id = crate::tests::domain::unique_id();
495 let domain = crate::Domain::new(domain_id).unwrap();
496 let qos = crate::QoS::new();
497 let topic_name = crate::tests::topic::unique_name();
498 let mut participant = Participant::new(&domain).unwrap();
499 let participant_id = participant.inner;
500 participant.inner = 0;
501 let result =
502 Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap_err();
503 assert_eq!(result, crate::Error::BadParameter);
504 let result = Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
505 .with_qos(&qos)
506 .build()
507 .unwrap_err();
508 assert_eq!(result, crate::Error::BadParameter);
509 participant.inner = participant_id;
510 }
511
512 #[test]
513 fn test_topic_with_listener() {
514 let domain_id = crate::tests::domain::unique_id();
515 let domain = crate::Domain::new(domain_id).unwrap();
516 let topic_name = crate::tests::topic::unique_name();
517 let participant = crate::Participant::new(&domain).unwrap();
518
519 let listener = crate::TopicListener::new().with_inconsistent_topic(|_, _| ());
520
521 let _ = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name)
522 .unwrap()
523 .with_listener(&listener)
524 .unwrap();
525 let _ = Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
526 .with_listener(&listener)
527 .build()
528 .unwrap();
529
530 let mut topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
531 topic.set_listener(&listener).unwrap();
532 topic.unset_listener().unwrap();
533 }
534
535 #[test]
536 fn test_topic_with_listener_on_invalid_topic() {
537 let domain_id = crate::tests::domain::unique_id();
538 let domain = crate::Domain::new(domain_id).unwrap();
539 let topic_name = crate::tests::topic::unique_name();
540 let participant = crate::Participant::new(&domain).unwrap();
541
542 let listener = crate::TopicListener::new();
543
544 let mut topic = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
545 let topic_id = topic.inner;
546 topic.inner = 0;
547 let result = topic.set_listener(&listener).unwrap_err();
548 assert_eq!(result, crate::Error::BadParameter);
549 let result = topic.unset_listener().unwrap_err();
550 assert_eq!(result, crate::Error::BadParameter);
551 topic.inner = topic_id;
552 }
553
554 #[test]
555 fn test_topic_create_from_existing() {
556 let domain_id = crate::tests::domain::unique_id();
557 let domain = crate::Domain::new(domain_id).unwrap();
558 let topic_name = crate::tests::topic::unique_name();
559 let participant = crate::Participant::new(&domain).unwrap();
560 let topic_01 = Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
561 let topic_02 = Topic::<crate::tests::topic::Data>::from_existing(topic_01.inner);
562 assert_eq!(topic_01.inner, topic_02.inner);
563 }
564}