cyclonedds/publisher.rs
1use crate::internal::ffi;
2use crate::internal::traits::AsFfi;
3use crate::{Participant, Result};
4
5/// A `Publisher` groups [`Writers`](crate::Writer) and controls their shared
6/// [`QoS`](crate::QoS). Writers created under a publisher inherit its
7/// [`QoS`](crate::QoS) where applicable.
8///
9/// Use [`Publisher::new`] for simple construction or [`Publisher::builder`] for
10/// [`QoS`](crate::QoS) and [`listener`](crate::listener::PublisherListener)
11/// configuration.
12///
13/// In most applications a publisher is created implicitly when constructing a
14/// [`Writer`](crate::Writer) directly. Use an explicit publisher when you need
15/// coordinated writes across multiple writers.
16#[derive(Debug)]
17pub struct Publisher<'domain, 'participant> {
18 pub(crate) inner: cyclonedds_sys::dds_entity_t,
19 phantom: std::marker::PhantomData<&'participant Participant<'domain>>,
20}
21
22/// Builder for [`Publisher`] (accessible via [`Publisher::builder`]).
23#[derive(Debug)]
24pub struct PublisherBuilder<'domain, 'participant, 'qos> {
25 participant: &'participant Participant<'domain>,
26 qos: Option<&'qos crate::QoS>,
27 listener: Option<crate::PublisherListener>,
28}
29
30impl<'d, 'p, 'q> PublisherBuilder<'d, 'p, 'q> {
31 /// Creates a new [`PublisherBuilder`] for the given [`Participant`].
32 ///
33 /// # Examples
34 ///
35 /// ```
36 /// use cyclonedds::builder::PublisherBuilder;
37 /// use cyclonedds::{Domain, Participant};
38 ///
39 /// let domain = Domain::default();
40 /// let participant = Participant::new(&domain)?;
41 /// let publisher_builder = PublisherBuilder::new(&participant);
42 /// # Ok::<_, cyclonedds::Error>(())
43 /// ```
44 #[must_use]
45 pub const fn new(participant: &'p Participant<'d>) -> Self {
46 Self {
47 participant,
48 qos: None,
49 listener: None,
50 }
51 }
52
53 /// Sets the [`QoS`](crate::QoS) for this publisher builder.
54 ///
55 /// # Examples
56 ///
57 /// ```
58 /// use cyclonedds::builder::PublisherBuilder;
59 /// use cyclonedds::qos::policy;
60 /// use cyclonedds::{Duration, QoS};
61 /// # use cyclonedds::{Domain, Participant};
62 /// # let domain = Domain::default();
63 /// # let participant = Participant::new(&domain)?;
64 ///
65 /// let qos = QoS::new().with_reliability(policy::Reliability::Reliable {
66 /// max_blocking_time: Duration::from_millis(100),
67 /// });
68 /// let publisher_builder = PublisherBuilder::new(&participant).with_qos(&qos);
69 /// # Ok::<_, cyclonedds::Error>(())
70 /// ```
71 #[must_use]
72 pub const fn with_qos(mut self, qos: &'q crate::QoS) -> Self {
73 self.qos = Some(qos);
74 self
75 }
76
77 /// Sets the [`Listener`](crate::Listener) on this publisher builder.
78 ///
79 /// # Examples
80 ///
81 /// ```
82 /// use cyclonedds::Listener;
83 /// use cyclonedds::builder::PublisherBuilder;
84 /// # use cyclonedds::{Domain, Participant};
85 /// # let domain = Domain::default();
86 /// # let participant = Participant::new(&domain)?;
87 ///
88 /// let publisher_builder = PublisherBuilder::new(&participant).with_listener(Listener::new());
89 /// # Ok::<_, cyclonedds::Error>(())
90 /// ```
91 #[must_use]
92 pub fn with_listener<L>(mut self, listener: L) -> Self
93 where
94 L: AsRef<crate::PublisherListener>,
95 {
96 self.listener = Some(*listener.as_ref());
97 self
98 }
99
100 /// Builds the [`Publisher`].
101 ///
102 /// # Errors
103 ///
104 /// Returns an [`Error`](crate::Error) if the publisher failed to create.
105 ///
106 /// # Examples
107 ///
108 /// ```
109 /// use cyclonedds::QoS;
110 /// use cyclonedds::builder::PublisherBuilder;
111 /// use cyclonedds::qos::policy;
112 /// # use cyclonedds::{Domain, Participant};
113 /// # let domain = Domain::default();
114 /// # let participant = Participant::new(&domain)?;
115 ///
116 /// let qos = QoS::new().with_durability(policy::Durability::TransientLocal);
117 /// let publisher = PublisherBuilder::new(&participant).with_qos(&qos).build()?;
118 /// # Ok::<_, cyclonedds::Error>(())
119 /// ```
120 pub fn build(self) -> Result<Publisher<'d, 'p>> {
121 // NOTE: using `and_then` to avoid ? branch on the listener for coverage
122 // since the C lib currently panics on OOM rather than returning null.
123 self.listener
124 .map(|listener| listener.as_ffi())
125 .transpose()
126 .and_then(|listener| {
127 Ok(Publisher {
128 inner: ffi::dds_create_publisher(
129 self.participant.inner,
130 self.qos.map(|qos| &qos.inner),
131 listener.as_ref(),
132 )?,
133 phantom: std::marker::PhantomData,
134 })
135 })
136 }
137}
138
139impl<'d, 'p> Publisher<'d, 'p> {
140 /// Creates a new `Publisher` under `participant` with default
141 /// [`QoS`](crate::QoS) and no
142 /// [`listener`](crate::listener::PublisherListener).
143 ///
144 /// # Errors
145 ///
146 /// Returns an [`Error`](crate::Error) if the publisher fails to create.
147 ///
148 /// # Examples
149 ///
150 /// ```
151 /// use cyclonedds::Publisher;
152 /// # use cyclonedds::{Domain, Participant};
153 /// # let domain = Domain::default();
154 /// # let participant = Participant::new(&domain)?;
155 ///
156 /// let publisher = Publisher::new(&participant)?;
157 /// Ok::<_, cyclonedds::Error>(())
158 /// ```
159 pub fn new(participant: &'p Participant<'d>) -> Result<Self> {
160 Self::builder(participant).build()
161 }
162
163 /// Returns a [`PublisherBuilder`](crate::builder::PublisherBuilder) for
164 /// constructing a publisher with custom [`QoS`](crate::QoS) or a
165 /// [`listener`](crate::listener::PublisherListener).
166 ///
167 /// # Examples
168 ///
169 /// ```
170 /// use cyclonedds::{
171 /// Publisher, QoS,
172 /// qos::policy::{Durability, Presentation},
173 /// };
174 /// # use cyclonedds::{Domain, Participant};
175 /// # let domain = Domain::default();
176 /// # let participant = Participant::new(&domain)?;
177 ///
178 /// let qos = QoS::new().with_presentation(Presentation::Topic {
179 /// coherent_access: true,
180 /// ordered_access: true,
181 /// });
182 /// let publisher = Publisher::builder(&participant).with_qos(&qos).build()?;
183 /// Ok::<_, cyclonedds::Error>(())
184 /// ```
185 #[must_use]
186 pub const fn builder<'q>(participant: &'p Participant<'d>) -> PublisherBuilder<'d, 'p, 'q> {
187 PublisherBuilder::new(participant)
188 }
189
190 /// (WARN: unimplemented in C lib): Suspends publication on all writers
191 /// belonging to this publisher.
192 ///
193 /// <div class="warning">
194 ///
195 /// This function is currently not implemented by the underlying C library
196 /// and will thus always return an unsupported error.
197 ///
198 /// </div>
199 ///
200 /// While suspended, calls to [`Writer::write`](crate::Writer::write) may
201 /// be batched by the middleware. Call [`resume`](Publisher::resume) to
202 /// flush and resume normal publication. Suspend and resume are typically
203 /// used together to send a coherent set of updates.
204 ///
205 /// # Errors
206 ///
207 /// Returns an [`Error`](crate::Error) if publisher fails to suspend.
208 ///
209 /// # Examples
210 ///
211 /// ```no_run
212 /// use cyclonedds::{Topic, Writer};
213 /// # use cyclonedds::{Domain, Participant, Publisher};
214 /// # let domain = Domain::default();
215 /// # let participant = Participant::new(&domain)?;
216 /// # #[derive(
217 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
218 /// # )]
219 /// # struct Data {
220 /// # x: i32,
221 /// # y: i32,
222 /// # }
223 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
224 ///
225 /// // Create the publisher.
226 /// let publisher = Publisher::new(&participant)?;
227 ///
228 /// // Create two Writers under the publisher.
229 /// let writer01 = Writer::builder(&topic).with_publisher(&publisher).build()?;
230 /// let writer02 = Writer::builder(&topic).with_publisher(&publisher).build()?;
231 ///
232 /// // Suspend all the writers.
233 /// publisher.suspend()?;
234 ///
235 /// writer01.write(&Data { x: 0, y: 1 })?;
236 /// writer02.write(&Data { x: 2, y: 3 })?;
237 ///
238 /// // Resume all the writers.
239 /// publisher.resume()?;
240 ///
241 /// Ok::<_, cyclonedds::Error>(())
242 /// ```
243 pub fn suspend(&self) -> Result<()> {
244 ffi::dds_suspend(self.inner)
245 }
246
247 /// (WARN: unimplemented in C lib): Resumes publication on all writers
248 /// belonging to this publisher.
249 ///
250 /// <div class="warning">
251 ///
252 /// This function is currently not implemented by the underlying C library
253 /// and will thus always return an unsupported error.
254 ///
255 /// </div>
256 ///
257 /// Flushes any writes that were batched during a
258 /// [`suspend`](Publisher::suspend) and resumes normal publication.
259 ///
260 /// # Errors
261 ///
262 /// Returns an [`Error`](crate::Error) if the publisher fails to resume.
263 ///
264 /// # Examples
265 ///
266 /// ```no_run
267 /// use cyclonedds::{Topic, Writer};
268 /// # use cyclonedds::{Domain, Participant, Publisher};
269 /// # let domain = Domain::default();
270 /// # let participant = Participant::new(&domain)?;
271 /// # #[derive(
272 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
273 /// # )]
274 /// # struct Data {
275 /// # x: i32,
276 /// # y: i32,
277 /// # }
278 /// let topic = Topic::<Data>::new(&participant, "MyTopic")?;
279 ///
280 /// // Create the publisher.
281 /// let publisher = Publisher::new(&participant)?;
282 ///
283 /// // Create two Writers under the publisher.
284 /// let writer01 = Writer::builder(&topic).with_publisher(&publisher).build()?;
285 /// let writer02 = Writer::builder(&topic).with_publisher(&publisher).build()?;
286 ///
287 /// // Suspend all the writers.
288 /// publisher.suspend()?;
289 ///
290 /// writer01.write(&Data { x: 0, y: 1 })?;
291 /// writer02.write(&Data { x: 2, y: 3 })?;
292 ///
293 /// // Resume all the writers.
294 /// publisher.resume()?;
295 ///
296 /// Ok::<_, cyclonedds::Error>(())
297 /// ```
298 pub fn resume(&self) -> Result<()> {
299 ffi::dds_resume(self.inner)
300 }
301
302 /// (WARN: unimplemented in C lib): Blocks until all samples written by
303 /// writers under this publisher have been acknowledged by all matched
304 /// reliable readers, or until `timeout` elapses.
305 ///
306 /// <div class="warning">
307 ///
308 /// This function is currently not implemented by the underlying C library
309 /// and will thus always return an unsupported error.
310 ///
311 /// </div>
312 ///
313 ///
314 /// # Errors
315 ///
316 /// Returns an [`Error`](crate::Error) if the timeout elapses before all
317 /// acknowledgements are received or if the publisher returns an error.
318 ///
319 /// # Examples
320 ///
321 /// ```no_run
322 /// use cyclonedds::Duration;
323 /// # use cyclonedds::{Domain, Participant, Publisher};
324 /// # let domain = Domain::default();
325 /// # let participant = Participant::new(&domain)?;
326 ///
327 /// let publisher = Publisher::new(&participant)?;
328 /// publisher.wait_for_acks(Duration::from_secs(1))?;
329 /// Ok::<_, cyclonedds::Error>(())
330 /// ```
331 pub fn wait_for_acks(&self, timeout: crate::Duration) -> Result<()> {
332 ffi::dds_wait_for_acks(self.inner, timeout.inner)
333 }
334
335 #[allow(unused)]
336 pub(crate) const fn from_existing(
337 inner: cyclonedds_sys::dds_entity_t,
338 ) -> std::mem::ManuallyDrop<Self> {
339 std::mem::ManuallyDrop::new(Self {
340 inner,
341 phantom: std::marker::PhantomData,
342 })
343 }
344
345 /// Sets the [`PublisherListener`](crate::PublisherListener) on this
346 /// publisher, replacing any previously set listener.
347 ///
348 /// # Errors
349 ///
350 /// Returns an [`Error`](crate::Error) if the publisher fails to set the
351 /// listener.
352 ///
353 /// # Examples
354 ///
355 /// ```
356 /// use cyclonedds::PublisherListener;
357 /// # use cyclonedds::{Domain, Participant, Publisher};
358 /// # let domain = Domain::default();
359 /// # let participant = Participant::new(&domain)?;
360 ///
361 /// let mut publisher = Publisher::new(&participant)?;
362 /// publisher.set_listener(PublisherListener::new())?;
363 /// # Ok::<_, cyclonedds::Error>(())
364 /// ```
365 pub fn set_listener<L>(&mut self, listener: L) -> Result<()>
366 where
367 L: AsRef<crate::PublisherListener>,
368 {
369 listener
370 .as_ref()
371 .as_ffi()
372 .and_then(|listener| ffi::dds_set_listener(self.inner, Some(listener.inner)))
373 }
374
375 /// Removes the listener from this publisher.
376 ///
377 /// # Errors
378 ///
379 /// Returns an [`Error`](crate::Error) if the publisher fails to unset the
380 /// listener.
381 ///
382 /// # Examples
383 ///
384 /// ```
385 /// # use cyclonedds::{Domain, Participant, Publisher};
386 /// # let domain = Domain::default();
387 /// # let participant = Participant::new(&domain)?;
388 /// let mut publisher = Publisher::new(&participant)?;
389 /// publisher.unset_listener()?;
390 /// # Ok::<_, cyclonedds::Error>(())
391 /// ```
392 pub fn unset_listener(&mut self) -> Result<()> {
393 ffi::dds_set_listener(self.inner, None)?;
394 Ok(())
395 }
396
397 /// Sets the [`PublisherListener`](crate::PublisherListener) on this
398 /// publisher, consuming and returning `self`.
399 ///
400 /// # Errors
401 ///
402 /// Returns an [`Error`](crate::Error) if the publisher fails to set the
403 /// listener.
404 ///
405 /// # Examples
406 ///
407 /// ```
408 /// use cyclonedds::PublisherListener;
409 /// # use cyclonedds::{Domain, Participant, Publisher};
410 /// # let domain = Domain::default();
411 /// # let participant = Participant::new(&domain)?;
412 ///
413 /// let publisher = Publisher::new(&participant)?.with_listener(PublisherListener::new())?;
414 /// # Ok::<_, cyclonedds::Error>(())
415 /// ```
416 pub fn with_listener<L>(mut self, listener: L) -> Result<Self>
417 where
418 L: AsRef<crate::PublisherListener>,
419 {
420 self.set_listener(listener).map(|()| self)
421 }
422}
423
424impl Drop for Publisher<'_, '_> {
425 fn drop(&mut self) {
426 let result = ffi::dds_delete(self.inner);
427 debug_assert!(
428 result.is_ok(),
429 "unable to delete {self:?}: failed with {result:?}"
430 );
431 }
432}
433
#[cfg(test)]
mod tests {
    use super::*;

    /// A publisher can be created both directly and through the builder.
    #[test]
    fn test_publisher_create() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let qos = crate::QoS::new();
        let participant = Participant::new(&domain).unwrap();

        drop(Publisher::new(&participant).unwrap());
        drop(
            Publisher::builder(&participant)
                .with_qos(&qos)
                .build()
                .unwrap(),
        );
    }

    /// Creating a publisher under an invalid participant handle reports
    /// `BadParameter` for both construction paths.
    #[test]
    fn test_publisher_create_with_invalid_participant() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let qos = crate::QoS::new();
        let mut participant = Participant::new(&domain).unwrap();

        // Temporarily swap in an invalid handle; the real one is restored at
        // the end of the test.
        let original_handle = participant.inner;
        participant.inner = 0;

        let direct_error = Publisher::new(&participant).unwrap_err();
        assert_eq!(direct_error, crate::Error::BadParameter);

        let builder_error = Publisher::builder(&participant)
            .with_qos(&qos)
            .build()
            .unwrap_err();
        assert_eq!(builder_error, crate::Error::BadParameter);

        participant.inner = original_handle;
    }

    /// `from_existing` wraps the very same entity handle.
    #[test]
    fn test_publisher_from_existing_publisher() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();
        let original = Publisher::new(&participant).unwrap();

        let wrapped = Publisher::from_existing(original.inner);

        assert_eq!(wrapped.inner, original.inner);
    }

    #[test]
    fn test_publisher_suspend_not_yet_supported_by_c_lib() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();
        let publisher = Publisher::new(&participant).unwrap();

        let outcome = publisher.suspend();

        assert_eq!(
            outcome,
            Err(crate::Error::Unsupported),
            "result was not unsupported (might be implemented now?)"
        );
    }

    #[test]
    fn test_publisher_resume_not_yet_supported_by_c_lib() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();
        let publisher = Publisher::new(&participant).unwrap();

        let outcome = publisher.resume();

        assert_eq!(
            outcome,
            Err(crate::Error::Unsupported),
            "result was not unsupported (might be implemented now?)"
        );
    }

    #[test]
    fn test_publisher_wait_for_acks_not_yet_supported_by_c_lib() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();
        let publisher = Publisher::new(&participant).unwrap();

        let timeout: crate::Duration = std::time::Duration::from_millis(10).try_into().unwrap();
        let outcome = publisher.wait_for_acks(timeout);

        assert_eq!(
            outcome,
            Err(crate::Error::Unsupported),
            "result was not unsupported (might be implemented now?)"
        );
    }

    /// A listener can be attached via the consuming setter, via the builder
    /// and via the in-place setter, and can be removed again.
    #[test]
    fn test_publisher_with_listener() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();
        let listener = crate::PublisherListener::new();

        drop(
            Publisher::new(&participant)
                .unwrap()
                .with_listener(listener)
                .unwrap(),
        );
        drop(
            Publisher::builder(&participant)
                .with_listener(listener)
                .build()
                .unwrap(),
        );

        let mut publisher = Publisher::new(&participant).unwrap();
        publisher.set_listener(listener).unwrap();
        publisher.unset_listener().unwrap();
    }

    /// Setting or clearing a listener on an invalid publisher handle reports
    /// `BadParameter`.
    #[test]
    fn test_publisher_with_listener_on_invalid_publisher() {
        let domain = crate::Domain::new(crate::tests::domain::unique_id()).unwrap();
        let participant = crate::Participant::new(&domain).unwrap();
        let listener = crate::PublisherListener::new();
        let mut publisher = Publisher::new(&participant).unwrap();

        // Temporarily swap in an invalid handle; the real one is restored at
        // the end of the test.
        let original_handle = publisher.inner;
        publisher.inner = 0;

        let set_error = publisher.set_listener(listener).unwrap_err();
        assert_eq!(set_error, crate::Error::BadParameter);

        let unset_error = publisher.unset_listener().unwrap_err();
        assert_eq!(unset_error, crate::Error::BadParameter);

        publisher.inner = original_handle;
    }
}