cyclonedds/listener.rs
1//! Listener types for reacting to [`status events`](crate::Status) on
2//! [`entities`](crate::entity::Entity).
3//!
4//! Each entity type has a corresponding listener struct that holds optional
5//! callbacks for the status events it can produce. Callbacks are plain function
6//! pointers and are registered via chainable `with_*` methods.
7//!
8//! The listener structure mimics the DDS entity hierarchy. [`Listener`] is the
9//! top-level type attached to a [`Participant`](crate::Participant) and
10//! composes [`SubscriberListener`] and [`PublisherListener`]. Entity-specific
11//! listeners ([`ReaderListener`] and [`WriterListener`]) are attached directly
12//! to their respective entities.
13//!
14//! ```text
15//! ╭───────────────────────╮ ╭─────────────────────────────────────╮
16//! │ Entity │ │ Listener │
17//! ╰───────────────────────╯ ╰─────────────────────────────────────╯
18//!
19//! Domain
20//! │
21//! Participant ··················································· Listener
22//! ├─ Topic<T> ······························ TopicListener<T> ─┤
23//! ├─ Subscriber ··························· SubscriberListener ─┤
24//! │ └─ Reader<T> ··········· ReaderListener<T> ───┘ │
25//! └─ Publisher ····························· PublisherListener ─┘
26//! └─ Writer<T> ············ WriterListener<T> ───┘
27//! ```
28//!
29//! Listeners can be set at any level of the entity hierarchy. A listener set on
30//! a [`Participant`](crate::Participant) will have its callbacks inherited by
31//! child entities of that participant.
32//!
33//! Alternatively, a higher-level listener can also be passed directly to the
34//! child entity's builder (as each listener type implements [`AsRef`] for the
35//! listener types below it in the hierarchy). As a result, a single
36//! [`Listener`] can be reused across multiple entity builders without
37//! constructing separate listeners for each level.
38//!
39//! ```
40//! use cyclonedds::Listener;
41//! use cyclonedds::{Domain, Participant, Subscriber};
42//!
43//! let domain = Domain::default();
44//!
45//! // Create a participant listener with the subscriber callbacks configured.
46//! let listener = Listener::new().with_subscriber(|s| {
47//! s.with_data_on_readers(|subscriber| {
48//! println!("{subscriber:?} has data");
49//! })
50//! });
51//!
52//! // Create a participant with the listener.
53//! let participant = Participant::builder(&domain)
54//! .with_listener(&listener)
55//! .build()?;
56//!
57//! // Subscribers created under the participant will inherit the `data_on_readers`
58//! // callback.
59//! let subscriber = Subscriber::new(&participant)?;
60//!
61//! // This subscriber is explicitly created with the subscriber portion of the
62//! // `listener`.
63//! let subscriber = Subscriber::builder(&participant)
64//! .with_listener(&listener)
65//! .build()?;
66//!
67//! # Ok::<_, cyclonedds::Error>(())
68//! ```
69//!
70//! Each callback fires when its corresponding [`Status`](crate::Status)
71//! condition is triggered. Most callbacks receive a status value from the
72//! [`status`](crate::status) module carrying event-specific detail such as
73//! counts and last-instance handles.
74//!
75//! # Warning
76//!
77//! <div class="warning">
78//!
79//! **Unstable:** The full DDS listener hierarchy, where [`TopicListener<T>`]
80//! composes under [`Listener`] and [`ReaderListener<T>`] and
81//! [`WriterListener<T>`] compose under [`SubscriberListener`] and
82//! [`PublisherListener`], respectively, is not yet implemented.
83//!
84//! The [`Listener`], [`SubscriberListener`], and [`PublisherListener`] may
85//! propagate to many [`Topic<T>`](crate::Topic), [`Reader<T>`](crate::Reader),
86//! and [`Writer<T>`](crate::Writer) that all have different types for `<T>`. As
87//! a result, one of two obvious solutions presents itself:
88//!
89//! - Allow these higher-level types to only have callbacks of effectively
90//! [`std::any::Any`] and require the callback to attempt to convert. This
91//! maps the most correctly onto how the API is designed in the specification
92//! but would greatly complicate the internal dispatching of these listeners.
93//!
94//! - Maintain a typed registry of all the different types of callbacks that are
95//! attached on the higher-level untyped subscribers and then add code to
96//! check if the event that fired corresponds to a type whose callback was
97//! registered. This would work but introduces semantics that do not match the
98//! other DDS implementations.
99//!
100//! </div>
101//!
102//! # Examples
103//!
104//! ```
105//! use cyclonedds::entity::Entity;
106//! use cyclonedds::{Reader, Topic, Writer};
107//! use cyclonedds::{ReaderListener, TopicListener, WriterListener};
108//! # #[derive(
109//! # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
110//! # )]
111//! # struct Data {
112//! # x: i32,
113//! # }
114//! # let domain = cyclonedds::Domain::default();
115//! # let participant = cyclonedds::Participant::new(&domain)?;
116//!
117//! let topic = Topic::<Data>::builder(&participant, "Example")
118//! .with_listener(
119//! TopicListener::new().with_inconsistent_topic(|topic, inconsistent_topic| {
120//! println!(
121//! "{topic:?} inconsistent topic: {} just encountered, {} encountered in total",
122//! inconsistent_topic.total.delta, inconsistent_topic.total.count
123//! )
124//! }),
125//! )
126//! .build()?;
127//!
128//! let reader = Reader::builder(&topic)
129//! .with_listener(
130//! ReaderListener::new()
131//! .with_subscription_matched(|reader, subscription_matched| {
132//! println!("{reader:?} had a subscription match: {subscription_matched:?}")
133//! })
134//! .with_sample_lost(|reader, sample_lost| {
135//! println!(
136//! "{reader:?} lost samples: {} just lost, {} lost in total",
137//! sample_lost.total.delta, sample_lost.total.count
138//! )
139//! }),
140//! )
141//! .build()?;
142//!
143//! let writer = Writer::builder(&topic)
144//! .with_listener(
145//! WriterListener::new()
146//! .with_publication_matched(|writer, publication_matched| {
147//! println!("{writer:?} has a publication match: {publication_matched:?}")
148//! })
149//! .with_liveliness_lost(|writer, liveliness_lost| {
150//! println!(
151//! "{writer:?} liveliness lost: {} just lost, {} lost in total",
152//! liveliness_lost.total.delta, liveliness_lost.total.count
153//! )
154//! }),
155//! )
156//! .build()?;
157//! # Ok::<_, cyclonedds::Error>(())
158//! ```
159
160use crate::Result;
161use crate::internal::ffi;
162use crate::internal::traits::AsFfi;
163use crate::status::{
164 InconsistentTopic, LivelinessChanged, LivelinessLost, OfferedDeadlineMissed,
165 OfferedIncompatibleQoS, PublicationMatched, RequestedDeadlineMissed, RequestedIncompatibleQoS,
166 SampleLost, SampleRejected, SubscriptionMatched,
167};
168
169/// Listener attached to a [`Participant`](crate::Participant).
170///
171/// In the DDS entity hierarchy this composes [`SubscriberListener`],
172/// [`PublisherListener`], and [`TopicListener`]. When attached to a
173/// participant, entities created under it inherit any of the configured
174/// callbacks that apply to that entity type.
175///
176/// # Examples
177///
178/// ```
179/// use cyclonedds::{Domain, Listener, Participant, Subscriber};
180///
181/// let domain = Domain::default();
182/// let listener = Listener::new().with_subscriber(|subscriber_listener| {
183/// subscriber_listener
184/// .with_data_on_readers(|subscriber| println!("{subscriber:?} has data on readers"))
185/// });
186/// let participant = Participant::builder(&domain)
187/// .with_listener(&listener)
188/// .build()?;
189///
190/// // This subscriber inherits the callbacks set on the `participant` via the `listener`.
191/// let subscriber = Subscriber::new(&participant)?;
192///
193/// // This subscriber will have the subscriber subset associated with the `listener` directly
194/// // applied to it.
195/// let subscriber = Subscriber::builder(&participant)
196/// .with_listener(&listener)
197/// .build()?;
198/// # Ok::<_, cyclonedds::Error>(())
199/// ```
200#[derive(Debug, Default, Clone, Copy)]
201pub struct Listener {
202 // topic: TopicListener<T>,
203 subscriber: SubscriberListener,
204 publisher: PublisherListener,
205}
206
207/// Listener attached to a [`Topic<T>`](crate::Topic<T>).
208#[derive(Debug, Clone, Copy)]
209pub struct TopicListener<T>
210where
211 T: crate::Topicable,
212{
213 inconsistent_topic: Option<fn(&crate::Topic<'_, '_, T>, InconsistentTopic)>,
214}
215
216/// Listener attached to a [`Subscriber`](crate::Subscriber).
217///
218/// <div class="warning">
219///
220/// Currently [`SubscriberListener`] is missing its configuration for composing
221/// a [`ReaderListener<T>`] under this non-generic type. See the [module-level
222/// warning](crate::listener#warning) for more detail.
223///
224/// </div>
225#[derive(Debug, Default, Clone, Copy)]
226pub struct SubscriberListener {
227 data_on_readers: Option<fn(&crate::Subscriber<'_, '_>)>,
228 // ///
229 // pub reader: ReaderListener<T>,
230}
231
232/// Listener attached to a [`Reader<T>`](crate::Reader<T>).
233#[derive(Debug, Clone, Copy)]
234pub struct ReaderListener<T>
235where
236 T: crate::Topicable,
237{
238 sample_lost: Option<fn(&crate::Reader<'_, '_, '_, T>, SampleLost)>,
239 data_available: Option<fn(&crate::Reader<'_, '_, '_, T>)>,
240 sample_rejected: Option<fn(&crate::Reader<'_, '_, '_, T>, SampleRejected)>,
241 liveliness_changed: Option<fn(&crate::Reader<'_, '_, '_, T>, LivelinessChanged)>,
242 requested_deadline_missed: Option<fn(&crate::Reader<'_, '_, '_, T>, RequestedDeadlineMissed)>,
243 requested_incompatible_qos: Option<fn(&crate::Reader<'_, '_, '_, T>, RequestedIncompatibleQoS)>,
244 subscription_matched: Option<fn(&crate::Reader<'_, '_, '_, T>, SubscriptionMatched)>,
245}
246
247/// Listener attached to a [`Publisher`](crate::Publisher).
248///
249/// <div class="warning">
250///
251/// Currently [`PublisherListener`] has no registered callbacks pending a
252/// solution for composing [`WriterListener<T>`] under this non-generic type.
253/// See the [module-level warning](crate::listener#warning) for more detail.
254///
255/// </div>
256#[derive(Debug, Default, Clone, Copy)]
257pub struct PublisherListener {
258 // ///
259 // pub writer: WriterListener<T>,
260}
261
262/// Listener attached to a [`Writer<T>`](crate::Writer<T>).
263#[derive(Debug, Clone, Copy)]
264pub struct WriterListener<T>
265where
266 T: crate::Topicable,
267{
268 liveliness_lost: Option<fn(&crate::Writer<'_, '_, '_, T>, LivelinessLost)>,
269 offered_deadline_missed: Option<fn(&crate::Writer<'_, '_, '_, T>, OfferedDeadlineMissed)>,
270 offered_incompatible_qos: Option<fn(&crate::Writer<'_, '_, '_, T>, OfferedIncompatibleQoS)>,
271 publication_matched: Option<fn(&crate::Writer<'_, '_, '_, T>, PublicationMatched)>,
272}
273
274impl<T> Default for TopicListener<T>
275where
276 T: crate::Topicable,
277{
278 fn default() -> Self {
279 Self {
280 inconsistent_topic: Option::default(),
281 }
282 }
283}
284
285impl<T> Default for ReaderListener<T>
286where
287 T: crate::Topicable,
288{
289 fn default() -> Self {
290 Self {
291 sample_lost: Option::default(),
292 data_available: Option::default(),
293 sample_rejected: Option::default(),
294 liveliness_changed: Option::default(),
295 requested_deadline_missed: Option::default(),
296 requested_incompatible_qos: Option::default(),
297 subscription_matched: Option::default(),
298 }
299 }
300}
301
302impl<T> Default for WriterListener<T>
303where
304 T: crate::Topicable,
305{
306 fn default() -> Self {
307 Self {
308 liveliness_lost: Option::default(),
309 offered_deadline_missed: Option::default(),
310 offered_incompatible_qos: Option::default(),
311 publication_matched: Option::default(),
312 }
313 }
314}
315
316impl Listener {
317 /// Creates a new [`Listener`] with no callbacks registered.
318 ///
319 /// # Examples
320 ///
321 /// ```
322 /// use cyclonedds::Listener;
323 ///
324 /// let listener = Listener::new();
325 /// ```
326 #[must_use]
327 pub fn new() -> Self {
328 Self::default()
329 }
330
331 // ///
332 // pub fn with_topic(mut self, setter: fn(TopicListener<T>) -> TopicListener<T>)
333 // -> Self { self.topic = setter(self.topic);
334 // self
335 // }
336
337 /// Configures the [`SubscriberListener`] via a setter callback.
338 ///
339 /// # Examples
340 ///
341 /// ```
342 /// use cyclonedds::Listener;
343 ///
344 /// let listener = Listener::new().with_subscriber(|s| {
345 /// s.with_data_on_readers(|subscriber| {
346 /// println!("data available on a reader");
347 /// })
348 /// });
349 /// ```
350 #[must_use]
351 pub fn with_subscriber(mut self, setter: fn(SubscriberListener) -> SubscriberListener) -> Self {
352 self.subscriber = setter(self.subscriber);
353 self
354 }
355
356 /// Configures the [`PublisherListener`] via a setter callback.
357 ///
358 /// # Examples
359 ///
360 /// <div class="warning">
361 ///
362 /// This example does not compile because the [`PublisherListener`] does not
363 /// have its `with_writer::<T>` setter yet. This is due to the fact that
364 /// the higher-level listeners are untyped in `<T>` but the lower-level
365 /// listeners are typed in `<T>` and a solution for crossing this boundary
366 /// still needs to be worked out.
367 ///
368 /// See the [module-level warning](crate::listener#warning) for more detail.
369 ///
370 /// </div>
371 ///
372 /// ```ignore
373 /// use cyclonedds::Listener;
374 ///
375 /// let listener = Listener::new().with_publisher(|p| {
376 /// p.with_writer(|w| {
377 /// w.with_publication_matched(|writer, publication_matched| {
378 /// println!("{writer:?} has publication match: {publication_matched:?}")
379 /// })
380 /// })
381 /// });
382 /// ```
383 #[must_use]
384 pub fn with_publisher(mut self, setter: fn(PublisherListener) -> PublisherListener) -> Self {
385 self.publisher = setter(self.publisher);
386 self
387 }
388
389 #[inline]
390 pub(crate) fn apply_listener_ffi(self, listener: &mut ffi::Listener) {
391 // self.topic.apply_listener_ffi(listener);
392 self.subscriber.apply_listener_ffi(listener);
393 self.publisher.apply_listener_ffi(listener);
394 }
395}
396
397impl AsFfi for Listener {
398 type Target<'a> = Result<ffi::Listener>;
399
400 #[inline]
401 fn as_ffi(&self) -> Self::Target<'_> {
402 ffi::Listener::new().map(|mut listener| {
403 self.apply_listener_ffi(&mut listener);
404 listener
405 })
406 }
407}
408
409impl<T> TopicListener<T>
410where
411 T: crate::Topicable,
412{
413 /// Creates a new [`TopicListener<T>`] with no callbacks registered.
414 ///
415 /// # Examples
416 ///
417 /// ```
418 /// use cyclonedds::TopicListener;
419 /// # #[derive(
420 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
421 /// # )]
422 /// # struct Data {
423 /// # x: i32,
424 /// # }
425 ///
426 /// let listener = TopicListener::<Data>::new();
427 /// ```
428 #[must_use]
429 pub fn new() -> Self {
430 Self::default()
431 }
432
433 /// Sets a callback for the
434 /// [`InconsistentTopic` status event](crate::Status::InconsistentTopic).
435 ///
436 /// The callback receives an
437 /// [`InconsistentTopic` metadata struct](InconsistentTopic).
438 ///
439 /// Fired when a remote topic is discovered with the same name but an
440 /// incompatible type or [`QoS`](crate::QoS).
441 ///
442 /// # Examples
443 ///
444 /// ```
445 /// use cyclonedds::listener::TopicListener;
446 /// # #[derive(
447 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
448 /// # )]
449 /// # struct Data {
450 /// # x: i32,
451 /// # }
452 ///
453 /// let listener =
454 /// TopicListener::<Data>::new().with_inconsistent_topic(|topic, inconsistent_topic| {
455 /// println!("inconsistent topic: {inconsistent_topic:?}");
456 /// });
457 /// ```
458 #[must_use]
459 pub fn with_inconsistent_topic(
460 mut self,
461 callback: fn(&crate::Topic<'_, '_, T>, InconsistentTopic),
462 ) -> Self {
463 self.inconsistent_topic = Some(callback);
464 self
465 }
466
467 #[inline]
468 pub(crate) fn apply_listener_ffi(&self, listener: &mut ffi::Listener) {
469 if let Some(callback) = self.inconsistent_topic {
470 ffi::dds_listener_set_inconsistent_topic(listener, callback);
471 }
472 }
473}
474
475impl<T> AsFfi for TopicListener<T>
476where
477 T: crate::Topicable,
478{
479 type Target<'a>
480 = Result<ffi::Listener>
481 where
482 T: 'a;
483
484 #[inline]
485 fn as_ffi(&self) -> Self::Target<'_> {
486 ffi::Listener::new().map(|mut listener| {
487 self.apply_listener_ffi(&mut listener);
488 listener
489 })
490 }
491}
492
493impl SubscriberListener {
494 /// Creates a new [`SubscriberListener`] with no callbacks registered.
495 ///
496 /// # Examples
497 ///
498 /// ```
499 /// use cyclonedds::SubscriberListener;
500 ///
501 /// let listener = SubscriberListener::new();
502 /// ```
503 #[must_use]
504 pub fn new() -> Self {
505 Self::default()
506 }
507
508 // ///
509 // pub fn with_reader(mut self, setter: fn(ReaderListener<T>) ->
510 // ReaderListener<T>) -> Self { self.reader = setter(self.reader);
511 // self
512 // }
513
514 /// Sets a callback for the [`DataOnReaders` status
515 /// event](crate::Status::DataOnReaders).
516 ///
517 /// Fired when new data is available on one or more readers belonging to
518 /// this subscriber.
519 ///
520 /// # Examples
521 ///
522 /// ```
523 /// use cyclonedds::SubscriberListener;
524 ///
525 /// let listener = SubscriberListener::new().with_data_on_readers(|subscriber| {
526 /// println!("data available on {subscriber:?}");
527 /// });
528 /// ```
529 #[must_use]
530 pub fn with_data_on_readers(mut self, callback: fn(&crate::Subscriber<'_, '_>)) -> Self {
531 self.data_on_readers = Some(callback);
532 self
533 }
534
535 #[inline]
536 pub(crate) fn apply_listener_ffi(self, listener: &mut ffi::Listener) {
537 if let Some(callback) = self.data_on_readers {
538 ffi::dds_listener_set_data_on_readers(listener, callback);
539 }
540 // self.reader.apply_listener_ffi(listener);
541 }
542}
543
544impl AsFfi for SubscriberListener {
545 type Target<'a> = Result<ffi::Listener>;
546
547 #[inline]
548 fn as_ffi(&self) -> Self::Target<'_> {
549 ffi::Listener::new().map(|mut listener| {
550 self.apply_listener_ffi(&mut listener);
551 listener
552 })
553 }
554}
555
556impl PublisherListener {
557 /// Creates a new [`PublisherListener`] with no callbacks registered.
558 ///
559 /// # Examples
560 ///
561 /// ```
562 /// use cyclonedds::PublisherListener;
563 ///
564 /// let listener = PublisherListener::new();
565 /// ```
566 #[must_use]
567 pub fn new() -> Self {
568 Self::default()
569 }
570
571 // ///
572 // pub fn with_writer(mut self, setter: fn(WriterListener<T>) ->
573 // WriterListener<T>) -> Self { self.writer = setter(self.writer);
574 // self
575 // }
576
577 #[inline]
578 pub(crate) const fn apply_listener_ffi(self, listener: &mut ffi::Listener) {
579 let _ = self;
580 let _ = listener;
581 // self.writer.apply_listener_ffi(listener);
582 }
583}
584
585impl AsFfi for PublisherListener {
586 type Target<'a> = Result<ffi::Listener>;
587
588 #[inline]
589 fn as_ffi(&self) -> Self::Target<'_> {
590 ffi::Listener::new().map(|mut listener| {
591 self.apply_listener_ffi(&mut listener);
592 listener
593 })
594 }
595}
596
597impl<T> ReaderListener<T>
598where
599 T: crate::Topicable,
600{
601 /// Creates a new [`ReaderListener<T>`] with no callbacks registered.
602 ///
603 /// # Examples
604 ///
605 /// ```
606 /// use cyclonedds::listener::ReaderListener;
607 /// # #[derive(
608 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
609 /// # )]
610 /// # struct Data {
611 /// # x: i32,
612 /// # }
613 ///
614 /// let listener = ReaderListener::<Data>::new();
615 /// ```
616 #[must_use]
617 pub fn new() -> Self {
618 Self::default()
619 }
620
621 /// Sets a callback for the [`SampleLost` status
622 /// event](crate::Status::SampleLost).
623 ///
624 /// The callback receives a [`SampleLost` metadata struct](SampleLost).
625 ///
626 /// Fired when a sample is lost, meaning it was never received by this
627 /// reader due to resource limits or [`QoS`](crate::QoS) constraints.
628 ///
629 /// # Examples
630 ///
631 /// ```
632 /// use cyclonedds::listener::ReaderListener;
633 /// # #[derive(
634 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
635 /// # )]
636 /// # struct Data {
637 /// # x: i32,
638 /// # }
639 ///
640 /// let listener = ReaderListener::<Data>::new().with_sample_lost(|reader, sample_lost| {
641 /// println!("samples lost: {}", sample_lost.total.count);
642 /// });
643 /// ```
644 #[must_use]
645 pub fn with_sample_lost(
646 mut self,
647 callback: fn(&crate::Reader<'_, '_, '_, T>, SampleLost),
648 ) -> Self {
649 self.sample_lost = Some(callback);
650 self
651 }
652
653 /// Sets a callback for the [`DataAvailable` status
654 /// event](crate::Status::DataAvailable).
655 ///
656 /// Fired when new data is available to be [`peeked`](crate::Reader::peek),
657 /// [`read`](crate::Reader::read), or [`taken`](crate::Reader::take) from
658 /// this reader.
659 ///
660 /// # Examples
661 ///
662 /// ```
663 /// use cyclonedds::listener::ReaderListener;
664 /// # #[derive(
665 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
666 /// # )]
667 /// # struct Data {
668 /// # x: i32,
669 /// # }
670 ///
671 /// let listener = ReaderListener::<Data>::new().with_data_available(|reader| {
672 /// println!("data available on {reader:?}");
673 /// });
674 /// ```
675 #[must_use]
676 pub fn with_data_available(mut self, callback: fn(&crate::Reader<'_, '_, '_, T>)) -> Self {
677 self.data_available = Some(callback);
678 self
679 }
680
681 /// Sets a callback for the
682 /// [`SampleRejected` status event](crate::Status::SampleRejected).
683 ///
684 /// The callback receives a [`SampleRejected` metadata
685 /// struct](SampleRejected).
686 ///
687 /// Fired when an incoming sample is rejected due to
688 /// [`ResourceLimits`](crate::qos::policy::ResourceLimits).
689 ///
690 /// # Examples
691 ///
692 /// ```
693 /// use cyclonedds::listener::ReaderListener;
694 /// # #[derive(
695 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
696 /// # )]
697 /// # struct Data {
698 /// # x: i32,
699 /// # }
700 /// let listener = ReaderListener::<Data>::new().with_sample_rejected(|reader, status| {
701 /// println!("sample rejected: {status:?}");
702 /// });
703 /// ```
704 #[must_use]
705 pub fn with_sample_rejected(
706 mut self,
707 callback: fn(&crate::Reader<'_, '_, '_, T>, SampleRejected),
708 ) -> Self {
709 self.sample_rejected = Some(callback);
710 self
711 }
712
713 /// Sets a callback for the
714 /// [`LivelinessChanged` status event](crate::Status::LivelinessChanged).
715 ///
716 /// The callback receives a
717 /// [`LivelinessChanged` metadata struct](LivelinessChanged).
718 ///
719 /// Fired when the [`Liveliness`](crate::qos::policy::Liveliness) of a
720 /// matched writer changes, i.e. a writer becomes active or inactive.
721 ///
722 /// # Examples
723 ///
724 /// ```
725 /// use cyclonedds::listener::ReaderListener;
726 /// # #[derive(
727 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
728 /// # )]
729 /// # struct Data {
730 /// # x: i32,
731 /// # }
732 ///
733 /// let listener =
734 /// ReaderListener::<Data>::new().with_liveliness_changed(|reader, liveliness_changed| {
735 /// println!("active writers: {}", liveliness_changed.alive.count);
736 /// });
737 /// ```
738 #[must_use]
739 pub fn with_liveliness_changed(
740 mut self,
741 callback: fn(&crate::Reader<'_, '_, '_, T>, LivelinessChanged),
742 ) -> Self {
743 self.liveliness_changed = Some(callback);
744 self
745 }
746
747 /// Sets a callback for the
748 /// [`RequestedDeadlineMissed` status
749 /// event](crate::Status::RequestedDeadlineMissed).
750 ///
751 /// The callback receives a
752 /// [`RequestedDeadlineMissed` metadata struct](RequestedDeadlineMissed).
753 ///
754 /// Fired when a sample is not received within the
755 /// [`Deadline`](crate::qos::policy::Deadline) period offered by a matched
756 /// writer.
757 ///
758 /// # Examples
759 ///
760 /// ```
761 /// use cyclonedds::listener::ReaderListener;
762 /// # #[derive(
763 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
764 /// # )]
765 /// # struct Data {
766 /// # x: i32,
767 /// # }
768 ///
769 /// let listener = ReaderListener::<Data>::new().with_requested_deadline_missed(
770 /// |reader, requested_deadline_missed| {
771 /// println!("deadline missed: {}", requested_deadline_missed.total.count);
772 /// },
773 /// );
774 /// ```
775 #[must_use]
776 pub fn with_requested_deadline_missed(
777 mut self,
778 callback: fn(&crate::Reader<'_, '_, '_, T>, RequestedDeadlineMissed),
779 ) -> Self {
780 self.requested_deadline_missed = Some(callback);
781 self
782 }
783
784 /// Sets a callback for the
785 /// [`RequestedIncompatibleQoS` status
786 /// event](crate::Status::RequestedIncompatibleQoS).
787 ///
788 /// The callback receives a
789 /// [`RequestedIncompatibleQoS` metadata struct](RequestedIncompatibleQoS).
790 ///
791 /// Fired when a writer is discovered whose offered [`QoS`](crate::QoS) is
792 /// incompatible with this reader's requested [`QoS`](crate::QoS).
793 ///
794 /// # Examples
795 ///
796 /// ```
797 /// use cyclonedds::listener::ReaderListener;
798 /// # #[derive(
799 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
800 /// # )]
801 /// # struct Data {
802 /// # x: i32,
803 /// # }
804 ///
805 /// let listener = ReaderListener::<Data>::new().with_requested_incompatible_qos(
806 /// |reader, requested_incompatible_qos| {
807 /// println!("incompatible QoS: {requested_incompatible_qos:?}");
808 /// },
809 /// );
810 /// ```
811 #[must_use]
812 pub fn with_requested_incompatible_qos(
813 mut self,
814 callback: fn(&crate::Reader<'_, '_, '_, T>, RequestedIncompatibleQoS),
815 ) -> Self {
816 self.requested_incompatible_qos = Some(callback);
817 self
818 }
819
820 /// Sets a callback for the
821 /// [`SubscriptionMatched` status event](crate::Status::SubscriptionMatched)
822 /// status event.
823 ///
824 /// The callback receives a
825 /// [`SubscriptionMatched` metadata struct](SubscriptionMatched).
826 ///
827 /// Fired when a writer matching this reader's topic and [`QoS`](crate::QoS)
828 /// is discovered or lost.
829 ///
830 /// # Examples
831 ///
832 /// ```
833 /// use cyclonedds::listener::ReaderListener;
834 /// # #[derive(
835 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
836 /// # )]
837 /// # struct Data {
838 /// # x: i32,
839 /// # }
840 ///
841 /// let listener =
842 /// ReaderListener::<Data>::new().with_subscription_matched(|reader, subscription_matched| {
843 /// println!("matched writers: {}", subscription_matched.current.count);
844 /// });
845 /// ```
846 #[must_use]
847 pub fn with_subscription_matched(
848 mut self,
849 callback: fn(&crate::Reader<'_, '_, '_, T>, SubscriptionMatched),
850 ) -> Self {
851 self.subscription_matched = Some(callback);
852 self
853 }
854
855 #[inline]
856 pub(crate) fn apply_listener_ffi(&self, listener: &mut ffi::Listener) {
857 if let Some(callback) = self.sample_lost {
858 ffi::dds_listener_set_sample_lost(listener, callback);
859 }
860 if let Some(callback) = self.data_available {
861 ffi::dds_listener_set_data_available(listener, callback);
862 }
863 if let Some(callback) = self.sample_rejected {
864 ffi::dds_listener_set_sample_rejected(listener, callback);
865 }
866 if let Some(callback) = self.liveliness_changed {
867 ffi::dds_listener_set_liveliness_changed(listener, callback);
868 }
869 if let Some(callback) = self.requested_deadline_missed {
870 ffi::dds_listener_set_requested_deadline_missed(listener, callback);
871 }
872 if let Some(callback) = self.requested_incompatible_qos {
873 ffi::dds_listener_set_requested_incompatible_qos(listener, callback);
874 }
875 if let Some(callback) = self.subscription_matched {
876 ffi::dds_listener_set_subscription_matched(listener, callback);
877 }
878 }
879}
880
881impl<T> AsFfi for ReaderListener<T>
882where
883 T: crate::Topicable,
884{
885 type Target<'a>
886 = Result<ffi::Listener>
887 where
888 T: 'a;
889
890 #[inline]
891 fn as_ffi(&self) -> Self::Target<'_> {
892 ffi::Listener::new().map(|mut listener| {
893 self.apply_listener_ffi(&mut listener);
894 listener
895 })
896 }
897}
898
899impl<T> WriterListener<T>
900where
901 T: crate::Topicable,
902{
903 /// Creates a new [`WriterListener<T>`] with no callbacks registered.
904 ///
905 /// # Examples
906 ///
907 /// ```
908 /// use cyclonedds::TopicListener;
909 /// # #[derive(
910 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
911 /// # )]
912 /// # struct Data {
913 /// # x: i32,
914 /// # }
915 ///
916 /// let listener = TopicListener::<Data>::new();
917 /// ```
918 #[must_use]
919 pub fn new() -> Self {
920 Self::default()
921 }
922
923 /// Sets a callback for the
924 /// [`LivelinessLost` status event](crate::Status::LivelinessLost).
925 ///
926 /// The callback receives a [`LivelinessLost` metadata
927 /// struct](LivelinessLost).
928 ///
929 /// Fired when the writer fails to meet its
930 /// [`Liveliness`](crate::qos::policy::Liveliness) policy and is considered
931 /// inactive by matched readers.
932 ///
933 /// # Examples
934 ///
935 /// ```
936 /// use cyclonedds::listener::WriterListener;
937 /// # #[derive(
938 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
939 /// # )]
940 /// # struct Data {
941 /// # x: i32,
942 /// # }
943 ///
944 /// let listener = WriterListener::<Data>::new().with_liveliness_lost(|writer, liveliness_lost| {
945 /// println!(
946 /// "{writer:?} liveliness lost: {}",
947 /// liveliness_lost.total.count
948 /// );
949 /// });
950 /// ```
951 #[must_use]
952 pub fn with_liveliness_lost(
953 mut self,
954 callback: fn(&crate::Writer<'_, '_, '_, T>, LivelinessLost),
955 ) -> Self {
956 self.liveliness_lost = Some(callback);
957 self
958 }
959
960 /// Sets a callback for the
961 /// [`OfferedDeadlineMissed` status
962 /// event](crate::Status::OfferedDeadlineMissed) status event.
963 ///
964 /// The callback receives an
965 /// [`OfferedDeadlineMissed` metadata struct](OfferedDeadlineMissed).
966 ///
967 /// Fired when the writer fails to write a new sample within its offered
968 /// [`Deadline`](crate::qos::policy::Deadline) period for one or more
969 /// instances.
970 ///
971 /// # Examples
972 ///
973 /// ```
974 /// use cyclonedds::listener::WriterListener;
975 /// # #[derive(
976 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
977 /// # )]
978 /// # struct Data {
979 /// # x: i32,
980 /// # }
981 ///
982 /// let listener = WriterListener::<Data>::new().with_offered_deadline_missed(
983 /// |writer, offered_deadline_missed| {
984 /// println!(
985 /// "{writer:?} deadline missed: {}",
986 /// offered_deadline_missed.total.count
987 /// );
988 /// },
989 /// );
990 /// ```
991 #[must_use]
992 pub fn with_offered_deadline_missed(
993 mut self,
994 callback: fn(&crate::Writer<'_, '_, '_, T>, OfferedDeadlineMissed),
995 ) -> Self {
996 self.offered_deadline_missed = Some(callback);
997 self
998 }
999
1000 /// Sets a callback for the
1001 /// [`OfferedIncompatibleQoS` status
1002 /// event](crate::Status::OfferedIncompatibleQoS) status event.
1003 ///
1004 /// The callback receives an
1005 /// [`OfferedIncompatibleQoS` metadata struct](OfferedIncompatibleQoS).
1006 ///
1007 /// Fired when a reader is discovered whose requested [`QoS`](crate::QoS) is
1008 /// incompatible with this writer's offered [`QoS`](crate::QoS).
1009 ///
1010 /// # Examples
1011 ///
1012 /// ```
1013 /// use cyclonedds::listener::WriterListener;
1014 /// # #[derive(
1015 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
1016 /// # )]
1017 /// # struct Data {
1018 /// # x: i32,
1019 /// # }
1020 ///
1021 /// let listener = WriterListener::<Data>::new().with_offered_incompatible_qos(
1022 /// |writer, offered_incompatible_qos| {
1023 /// println!("{writer:?} discovered incompatible QoS: {offered_incompatible_qos:?}");
1024 /// },
1025 /// );
1026 /// ```
1027 #[must_use]
1028 pub fn with_offered_incompatible_qos(
1029 mut self,
1030 callback: fn(&crate::Writer<'_, '_, '_, T>, OfferedIncompatibleQoS),
1031 ) -> Self {
1032 self.offered_incompatible_qos = Some(callback);
1033 self
1034 }
1035
1036 /// Sets a callback for the
1037 /// [`PublicationMatched` status event](crate::Status::PublicationMatched).
1038 ///
1039 /// The callback receives a
1040 /// [`PublicationMatched` metadata struct](PublicationMatched).
1041 ///
1042 /// Fired when a reader matching this writer's topic and [`QoS`](crate::QoS)
1043 /// is discovered.
1044 ///
1045 /// # Examples
1046 ///
1047 /// ```
1048 /// use cyclonedds::listener::WriterListener;
1049 /// # #[derive(
1050 /// # cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
1051 /// # )]
1052 /// # struct Data {
1053 /// # x: i32,
1054 /// # }
1055 ///
1056 /// let listener = WriterListener::<Data>::new().with_publication_matched(|writer, status| {
1057 /// println!("{writer:?} matched readers: {}", status.current.count);
1058 /// });
1059 /// ```
1060 #[must_use]
1061 pub fn with_publication_matched(
1062 mut self,
1063 callback: fn(&crate::Writer<'_, '_, '_, T>, PublicationMatched),
1064 ) -> Self
1065 where
1066 T: crate::Topicable,
1067 {
1068 self.publication_matched = Some(callback);
1069 self
1070 }
1071
1072 #[inline]
1073 pub(crate) fn apply_listener_ffi(&self, listener: &mut ffi::Listener) {
1074 if let Some(callback) = self.liveliness_lost {
1075 ffi::dds_listener_set_liveliness_lost(listener, callback);
1076 }
1077 if let Some(callback) = self.offered_deadline_missed {
1078 ffi::dds_listener_set_offered_deadline_missed(listener, callback);
1079 }
1080 if let Some(callback) = self.offered_incompatible_qos {
1081 ffi::dds_listener_set_offered_incompatible_qos(listener, callback);
1082 }
1083 if let Some(callback) = self.publication_matched {
1084 ffi::dds_listener_set_publication_matched(listener, callback);
1085 }
1086 }
1087}
1088
1089impl<T> AsFfi for WriterListener<T>
1090where
1091 T: crate::Topicable,
1092{
1093 type Target<'a>
1094 = Result<ffi::Listener>
1095 where
1096 T: 'a;
1097
1098 #[inline]
1099 fn as_ffi(&self) -> Self::Target<'_> {
1100 ffi::Listener::new().map(|mut listener| {
1101 self.apply_listener_ffi(&mut listener);
1102 listener
1103 })
1104 }
1105}
1106
1107impl<T> AsRef<ReaderListener<T>> for ReaderListener<T>
1108where
1109 T: crate::Topicable,
1110{
1111 fn as_ref(&self) -> &ReaderListener<T> {
1112 self
1113 }
1114}
1115impl<T> AsRef<WriterListener<T>> for WriterListener<T>
1116where
1117 T: crate::Topicable,
1118{
1119 fn as_ref(&self) -> &WriterListener<T> {
1120 self
1121 }
1122}
1123impl AsRef<SubscriberListener> for SubscriberListener {
1124 fn as_ref(&self) -> &SubscriberListener {
1125 self
1126 }
1127}
1128impl AsRef<PublisherListener> for PublisherListener {
1129 fn as_ref(&self) -> &PublisherListener {
1130 self
1131 }
1132}
1133impl<T> AsRef<TopicListener<T>> for TopicListener<T>
1134where
1135 T: crate::Topicable,
1136{
1137 fn as_ref(&self) -> &TopicListener<T> {
1138 self
1139 }
1140}
1141impl AsRef<Listener> for Listener {
1142 fn as_ref(&self) -> &Listener {
1143 self
1144 }
1145}
1146
1147// impl<T> AsRef<ReaderListener<T>> for Listener<T> {
1148// fn as_ref(&self) -> &ReaderListener<T> {
1149// &self.subscriber.reader
1150// }
1151// }
1152// impl<T> AsRef<WriterListener<T>> for Listener<T> {
1153// fn as_ref(&self) -> &WriterListener<T> {
1154// &self.publisher.writer
1155// }
1156// }
1157impl AsRef<SubscriberListener> for Listener {
1158 fn as_ref(&self) -> &SubscriberListener {
1159 &self.subscriber
1160 }
1161}
1162impl AsRef<PublisherListener> for Listener {
1163 fn as_ref(&self) -> &PublisherListener {
1164 &self.publisher
1165 }
1166}
1167// impl<T> AsRef<TopicListener<T>> for Listener<T> {
1168// fn as_ref(&self) -> &TopicListener<T> {
1169// &self.topic
1170// }
1171// }
1172
1173// impl<T> AsRef<ReaderListener<T>> for SubscriberListener<T> {
1174// fn as_ref(&self) -> &ReaderListener<T> {
1175// &self.reader
1176// }
1177// }
1178// impl<T> AsRef<WriterListener<T>> for PublisherListener<T> {
1179// fn as_ref(&self) -> &WriterListener<T> {
1180// &self.writer
1181// }
1182// }
1183
1184#[cfg(test)]
1185mod tests {
1186 use super::*;
1187 use crate::Topicable;
1188
1189 fn receive_listener<L>(listener: L)
1190 where
1191 L: AsRef<Listener>,
1192 {
1193 let _ = listener.as_ref();
1194 }
1195
1196 fn receive_topic_listener<L, T>(listener: L)
1197 where
1198 L: AsRef<TopicListener<T>>,
1199 T: crate::Topicable,
1200 {
1201 let _ = listener.as_ref();
1202 }
1203
1204 fn receive_subscriber_listener<L>(listener: L)
1205 where
1206 L: AsRef<SubscriberListener>,
1207 {
1208 let _ = listener.as_ref();
1209 }
1210
1211 fn receive_publisher_listener<L>(listener: L)
1212 where
1213 L: AsRef<PublisherListener>,
1214 {
1215 let _ = listener.as_ref();
1216 }
1217
1218 fn receive_reader_listener<L, T>(listener: L)
1219 where
1220 L: AsRef<ReaderListener<T>>,
1221 T: crate::Topicable,
1222 {
1223 let _ = listener.as_ref();
1224 }
1225
1226 fn receive_writer_listener<L, T>(listener: L)
1227 where
1228 L: AsRef<WriterListener<T>>,
1229 T: crate::Topicable,
1230 {
1231 let _ = listener.as_ref();
1232 }
1233
1234 #[test]
1235 fn test_listener_create() {
1236 let listener = Listener::new()
1237 // .with_topic(|topic| topic.with_inconsistent_topic(|_, _| ()))
1238 .with_subscriber(|subscriber| {
1239 subscriber.with_data_on_readers(|_| ())
1240 // .with_reader(|reader| {
1241 // reader
1242 // .with_data_available(|_| ())
1243 // .with_liveliness_changed(|_, _| ())
1244 // .with_requested_deadline_missed(|_, _| ())
1245 // .with_requested_incompatible_qos(|_, _| ())
1246 // .with_sample_lost(|_, _| ())
1247 // .with_sample_rejected(|_, _| ())
1248 // .with_subscription_matched(|_, _| ())
1249 // })
1250 })
1251 .with_publisher(|publisher| {
1252 publisher
1253 // .with_writer(|writer| {
1254 // writer
1255 // .with_liveliness_lost(|_, _| ())
1256 // .with_offered_deadline_missed(|_, _| ())
1257 // .with_offered_incompatible_qos(|_, _| ())
1258 // .with_publication_matched(|_, _| ())
1259 // })
1260 });
1261 let topic_listener =
1262 TopicListener::<crate::tests::topic::Data>::new().with_inconsistent_topic(|_, _| ());
1263 let subscriber_listener = SubscriberListener::new()
1264 .with_data_on_readers(|_| ())
1265 // .with_reader(|reader| {
1266 // reader
1267 // .with_data_available(|_| ())
1268 // .with_liveliness_changed(|_, _| ())
1269 // .with_requested_deadline_missed(|_, _| ())
1270 // .with_requested_incompatible_qos(|_, _| ())
1271 // .with_sample_lost(|_, _| ())
1272 // .with_sample_rejected(|_, _| ())
1273 // .with_subscription_matched(|_, _| ())
1274 // })
1275 ;
1276 let publisher_listener =
1277 PublisherListener::new()
1278 // .with_writer(|writer| {
1279 // writer
1280 // .with_liveliness_lost(|_, _| ())
1281 // .with_offered_deadline_missed(|_, _| ())
1282 // .with_offered_incompatible_qos(|_, _| ())
1283 // .with_publication_matched(|_, _| ())
1284 // })
1285 ;
1286 let reader_listener = ReaderListener::<crate::tests::topic::Data>::new()
1287 .with_data_available(|_| ())
1288 .with_liveliness_changed(|_, _| ())
1289 .with_requested_deadline_missed(|_, _| ())
1290 .with_requested_incompatible_qos(|_, _| ())
1291 .with_sample_lost(|_, _| ())
1292 .with_sample_rejected(|_, _| ())
1293 .with_subscription_matched(|_, _| ());
1294 let writer_listener = WriterListener::<crate::tests::topic::Data>::new()
1295 .with_liveliness_lost(|_, _| ())
1296 .with_offered_deadline_missed(|_, _| ())
1297 .with_offered_incompatible_qos(|_, _| ())
1298 .with_publication_matched(|_, _| ());
1299
1300 receive_listener(listener);
1301
1302 receive_topic_listener(&topic_listener);
1303 // receive_topic_listener(&listener);
1304
1305 receive_subscriber_listener(subscriber_listener);
1306 receive_subscriber_listener(listener);
1307
1308 receive_publisher_listener(publisher_listener);
1309 receive_publisher_listener(listener);
1310
1311 receive_reader_listener(&reader_listener);
1312 // receive_reader_listener(&subscriber_listener);
1313 // receive_reader_listener(&listener);
1314
1315 receive_writer_listener(&writer_listener);
1316 // receive_writer_listener(&publisher_listener);
1317 // receive_writer_listener(&listener);
1318 }
1319
1320 #[test]
1321 fn test_subscriber_listener_callbacks() {
1322 #[derive(Debug, PartialEq)]
1323 struct Triggered {
1324 data_on_readers: u32,
1325 }
1326
1327 static TRIGGERED: std::sync::Mutex<Triggered> =
1328 std::sync::Mutex::new(Triggered { data_on_readers: 0 });
1329
1330 let domain_id = crate::tests::domain::unique_id();
1331 let topic_name = crate::tests::topic::unique_name();
1332 let domain = crate::Domain::new(domain_id).unwrap();
1333
1334 let participant = crate::Participant::new(&domain).unwrap();
1335 let topic =
1336 crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1337 let subscriber = crate::Subscriber::builder(&participant)
1338 .with_listener(
1339 crate::SubscriberListener::new().with_data_on_readers(|_subscriber| {
1340 TRIGGERED.lock().unwrap().data_on_readers += 1;
1341 }),
1342 )
1343 .build()
1344 .unwrap();
1345 let reader = crate::Reader::builder(&topic)
1346 .with_subscriber(&subscriber)
1347 .build()
1348 .unwrap();
1349 let writer = crate::Writer::new(&topic).unwrap();
1350
1351 let sample = crate::tests::topic::Data::default();
1352 writer.write(&sample).unwrap();
1353
1354 let samples = reader.read().unwrap();
1355 assert_eq!(samples.len(), 1);
1356
1357 assert_eq!(*samples[0], sample);
1358
1359 assert_eq!(*TRIGGERED.lock().unwrap(), Triggered { data_on_readers: 1 });
1360 }
1361
1362 #[test]
1363 fn test_publisher_listener_callbacks() {
1364 let domain_id = crate::tests::domain::unique_id();
1365 let topic_name = crate::tests::topic::unique_name();
1366 let domain = crate::Domain::new(domain_id).unwrap();
1367
1368 let participant = crate::Participant::new(&domain).unwrap();
1369 let topic =
1370 crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1371 let publisher = crate::Publisher::builder(&participant)
1372 .with_listener(crate::PublisherListener::new())
1373 .build()
1374 .unwrap();
1375 let reader = crate::Reader::new(&topic).unwrap();
1376 let writer = crate::Writer::builder(&topic)
1377 .with_publisher(&publisher)
1378 .build()
1379 .unwrap();
1380
1381 let sample = crate::tests::topic::Data::default();
1382 writer.write(&sample).unwrap();
1383
1384 let samples = reader.read().unwrap();
1385 assert_eq!(samples.len(), 1);
1386
1387 assert_eq!(*samples[0], sample);
1388 }
1389
1390 #[test]
1391 fn test_reader_listener_callbacks() {
1392 #[derive(Debug, PartialEq)]
1393 struct Triggered {
1394 requested_incompatible_qos: u32,
1395 requested_deadline_missed: bool,
1396 sample_rejected: u32,
1397 data_available: u32,
1398 subscription_matched: u32,
1399 liveliness_changed: u32,
1400 sample_lost: u32,
1401 }
1402
1403 static TRIGGERED: std::sync::Mutex<Triggered> = std::sync::Mutex::new(Triggered {
1404 requested_incompatible_qos: 0,
1405 requested_deadline_missed: false,
1406 sample_rejected: 0,
1407 data_available: 0,
1408 subscription_matched: 0,
1409 liveliness_changed: 0,
1410 sample_lost: 0,
1411 });
1412
1413 let domain_id = crate::tests::domain::unique_id();
1414 let topic_name = crate::tests::topic::unique_name();
1415 let domain = crate::Domain::new(domain_id).unwrap();
1416
1417 let participant = crate::Participant::new(&domain).unwrap();
1418 let qos = crate::QoS::new()
1419 .with_destination_order(crate::qos::policy::DestinationOrder::BySourceTimestamp);
1420 let topic = crate::Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
1421 .with_qos(&qos)
1422 .build()
1423 .unwrap();
1424
1425 {
1426 let _writer = crate::Writer::new(&topic).unwrap();
1427 let _reader = crate::Reader::builder(&topic)
1428 .with_qos(
1429 &crate::QoS::new().with_durability(crate::qos::policy::Durability::Persistent),
1430 )
1431 .with_listener(
1432 crate::ReaderListener::new().with_requested_incompatible_qos(
1433 |_reader, _metadata| {
1434 TRIGGERED.lock().unwrap().requested_incompatible_qos += 1;
1435 },
1436 ),
1437 )
1438 .build()
1439 .unwrap();
1440 }
1441
1442 {
1443 let qos = crate::QoS::new().with_deadline(crate::qos::policy::Deadline {
1444 period: crate::Duration::from_nanos(1_000_000),
1445 });
1446 let reader = crate::Reader::builder(&topic)
1447 .with_qos(&qos)
1448 .with_listener(crate::ReaderListener::new().with_requested_deadline_missed(
1449 |_reader, _metadata| {
1450 TRIGGERED.lock().unwrap().requested_deadline_missed |= true;
1451 },
1452 ))
1453 .build()
1454 .unwrap();
1455 let writer = crate::Writer::builder(&topic)
1456 .with_qos(&qos)
1457 .build()
1458 .unwrap();
1459
1460 let sample = crate::tests::topic::Data::default();
1461 writer.write(&sample).unwrap();
1462
1463 let samples = reader.take().unwrap();
1464 assert_eq!(samples.len(), 1);
1465 assert_eq!(*samples[0], sample);
1466
1467 while !TRIGGERED.lock().unwrap().requested_deadline_missed {
1468 std::thread::sleep(std::time::Duration::from_nanos(50));
1469 }
1470 }
1471
1472 {
1473 let reader = crate::Reader::builder(&topic)
1474 .with_qos(&crate::QoS::new().with_resource_limits(
1475 crate::qos::policy::ResourceLimits {
1476 max_samples: crate::qos::policy::ResourceLimit::Unlimited,
1477 max_instances: crate::qos::policy::ResourceLimit::Limited(1),
1478 max_samples_per_instance: crate::qos::policy::ResourceLimit::Unlimited,
1479 },
1480 ))
1481 .with_listener(crate::ReaderListener::new().with_sample_rejected(
1482 |_reader, _metadata| {
1483 TRIGGERED.lock().unwrap().sample_rejected += 1;
1484 },
1485 ))
1486 .build()
1487 .unwrap();
1488 let writer = crate::Writer::new(&topic).unwrap();
1489
1490 let sample = crate::tests::topic::Data {
1491 x: 1,
1492 y: 2,
1493 ..crate::tests::topic::Data::default()
1494 };
1495 writer.write(&sample).unwrap();
1496 writer
1497 .write(&crate::tests::topic::Data {
1498 x: 2,
1499 y: 3,
1500 ..crate::tests::topic::Data::default()
1501 })
1502 .unwrap();
1503
1504 let samples = reader.take().unwrap();
1505 assert_eq!(samples.len(), 1);
1506 assert_eq!(*samples[0], sample);
1507 }
1508
1509 {
1510 let reader = crate::Reader::builder(&topic)
1511 .with_listener(
1512 crate::ReaderListener::new()
1513 .with_data_available(|_reader| {
1514 TRIGGERED.lock().unwrap().data_available += 1;
1515 })
1516 .with_subscription_matched(|_reader, _matched| {
1517 TRIGGERED.lock().unwrap().subscription_matched += 1;
1518 })
1519 .with_liveliness_changed(|_reader, _changed| {
1520 TRIGGERED.lock().unwrap().liveliness_changed += 1;
1521 })
1522 .with_sample_lost(|_reader, _metadata| {
1523 TRIGGERED.lock().unwrap().sample_lost += 1;
1524 }),
1525 )
1526 .build()
1527 .unwrap();
1528 let writer = crate::Writer::new(&topic).unwrap();
1529
1530 let sample = crate::tests::topic::Data::default();
1531 writer.write(&sample).unwrap();
1532
1533 let key = sample.as_key();
1534 writer
1535 .unregister_instance_with_timestamp(
1536 &key,
1537 (std::time::SystemTime::now() - std::time::Duration::from_secs(1))
1538 .try_into()
1539 .unwrap(),
1540 )
1541 .unwrap();
1542
1543 let samples = reader.take().unwrap();
1544 assert_eq!(samples.len(), 1);
1545
1546 assert_eq!(*samples[0], sample);
1547
1548 assert_eq!(
1549 *TRIGGERED.lock().unwrap(),
1550 Triggered {
1551 requested_incompatible_qos: 1,
1552 requested_deadline_missed: true,
1553 sample_rejected: 1,
1554 data_available: 2,
1555 subscription_matched: 1,
1556 liveliness_changed: 1,
1557 sample_lost: 1,
1558 }
1559 );
1560 }
1561 }
1562
1563 #[test]
1564 fn test_writer_listener_callbacks() {
1565 #[derive(Debug, PartialEq)]
1566 struct Triggered {
1567 liveliness_lost: bool,
1568 offered_deadline_missed: bool,
1569 offered_incompatible_qos: u32,
1570 publication_matched: u32,
1571 }
1572
1573 static TRIGGERED: std::sync::Mutex<Triggered> = std::sync::Mutex::new(Triggered {
1574 liveliness_lost: false,
1575 offered_deadline_missed: false,
1576 offered_incompatible_qos: 0,
1577 publication_matched: 0,
1578 });
1579
1580 let domain_id = crate::tests::domain::unique_id();
1581 let topic_name = crate::tests::topic::unique_name();
1582 let domain = crate::Domain::new(domain_id).unwrap();
1583
1584 let participant = crate::Participant::new(&domain).unwrap();
1585 let topic =
1586 crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1587
1588 {
1589 let _reader = crate::Reader::builder(&topic)
1590 .with_qos(
1591 &crate::QoS::new().with_durability(crate::qos::policy::Durability::Persistent),
1592 )
1593 .build()
1594 .unwrap();
1595 let _writer = crate::Writer::builder(&topic)
1596 .with_listener(crate::WriterListener::new().with_offered_incompatible_qos(
1597 |_writer, _metadata| {
1598 TRIGGERED.lock().unwrap().offered_incompatible_qos += 1;
1599 },
1600 ))
1601 .build()
1602 .unwrap();
1603 }
1604
1605 {
1606 let qos = crate::QoS::new().with_deadline(crate::qos::policy::Deadline {
1607 period: crate::Duration::from_nanos(1_000_000),
1608 });
1609 let writer = crate::Writer::builder(&topic)
1610 .with_qos(&qos)
1611 .with_listener(crate::WriterListener::new().with_offered_deadline_missed(
1612 |_writer, _metadata| {
1613 TRIGGERED.lock().unwrap().offered_deadline_missed |= true;
1614 },
1615 ))
1616 .build()
1617 .unwrap();
1618 let reader = crate::Reader::builder(&topic)
1619 .with_qos(&qos)
1620 .build()
1621 .unwrap();
1622
1623 let sample = crate::tests::topic::Data::default();
1624 writer.write(&sample).unwrap();
1625
1626 let samples = reader.take().unwrap();
1627 assert_eq!(samples.len(), 1);
1628 assert_eq!(*samples[0], sample);
1629
1630 while !TRIGGERED.lock().unwrap().offered_deadline_missed {
1631 std::thread::sleep(std::time::Duration::from_nanos(50));
1632 }
1633 }
1634
1635 {
1636 let writer = crate::Writer::builder(&topic)
1637 .with_listener(
1638 crate::WriterListener::new()
1639 .with_liveliness_lost(|_writer, _metadata| {
1640 TRIGGERED.lock().unwrap().liveliness_lost |= true;
1641 })
1642 .with_publication_matched(|_writer, _metadata| {
1643 TRIGGERED.lock().unwrap().publication_matched += 1;
1644 }),
1645 )
1646 .with_qos(&crate::QoS::new().with_liveliness(
1647 crate::qos::policy::Liveliness::ManualByParticipant {
1648 lease_duration: crate::Duration::from_nanos(1_000_000),
1649 },
1650 ))
1651 .build()
1652 .unwrap();
1653
1654 let reader = crate::Reader::new(&topic).unwrap();
1655
1656 let sample = crate::tests::topic::Data::default();
1657 writer.write(&sample).unwrap();
1658
1659 let key = sample.as_key();
1660 writer
1661 .unregister_instance_with_timestamp(
1662 &key,
1663 (std::time::SystemTime::now() - std::time::Duration::from_secs(1))
1664 .try_into()
1665 .unwrap(),
1666 )
1667 .unwrap();
1668
1669 let samples = reader.take().unwrap();
1670 assert_eq!(samples.len(), 1);
1671
1672 assert_eq!(*samples[0], sample);
1673
1674 while !TRIGGERED.lock().unwrap().liveliness_lost {
1675 std::thread::sleep(std::time::Duration::from_nanos(50));
1676 }
1677 }
1678
1679 assert_eq!(
1680 *TRIGGERED.lock().unwrap(),
1681 Triggered {
1682 liveliness_lost: true,
1683 offered_deadline_missed: true,
1684 offered_incompatible_qos: 1,
1685 publication_matched: 2,
1686 }
1687 );
1688 }
1689}