Skip to main content

cyclonedds/
listener.rs

1//! Listener types for reacting to [`status events`](crate::Status) on
2//! [`entities`](crate::entity::Entity).
3//!
4//! Each entity type has a corresponding listener struct that holds optional
5//! callbacks for the status events it can produce. Callbacks are plain function
6//! pointers and are registered via chainable `with_*` methods.
7//!
8//! The listener structure mimics the DDS entity hierarchy. [`Listener`] is the
9//! top-level type attached to a [`Participant`](crate::Participant) and
10//! composes [`SubscriberListener`] and [`PublisherListener`]. Entity-specific
11//! listeners ([`ReaderListener`] and [`WriterListener`]) are attached directly
12//! to their respective entities.
13//!
14//! ```text
15//! ╭───────────────────────╮          ╭─────────────────────────────────────╮
16//! │        Entity         │          │               Listener              │
17//! ╰───────────────────────╯          ╰─────────────────────────────────────╯
18//!  
19//!     Domain
20//!       │
21//!  Participant ··················································· Listener
22//!       ├─ Topic<T> ······························ TopicListener<T>  ─┤
23//!       ├─ Subscriber ··························· SubscriberListener ─┤
24//!       │     └─ Reader<T> ··········· ReaderListener<T> ───┘         │
25//!       └─ Publisher ····························· PublisherListener ─┘
26//!            └─ Writer<T> ············ WriterListener<T> ───┘
27//! ```
28//!
29//! Listeners can be set at any level of the entity hierarchy. A listener set on
30//! a [`Participant`](crate::Participant) will have its callbacks inherited by
31//! child entities of that participant.
32//!
33//! Alternatively, a higher-level listener can also be passed directly to the
34//! child entity's builder (as each listener type implements [`AsRef`] for the
35//! listener types below it in the hierarchy). As a result, a single
36//! [`Listener`] can be reused across multiple entity builders without
37//! constructing separate listeners for each level.
38//!
39//! ```
40//! use cyclonedds::Listener;
41//! use cyclonedds::{Domain, Participant, Subscriber};
42//!
43//! let domain = Domain::default();
44//!
45//! // Create a participant listener with the subscriber callbacks configured.
46//! let listener = Listener::new().with_subscriber(|s| {
47//!     s.with_data_on_readers(|subscriber| {
48//!         println!("{subscriber:?} has data");
49//!     })
50//! });
51//!
52//! // Create a participant with the listener.
53//! let participant = Participant::builder(&domain)
54//!     .with_listener(&listener)
55//!     .build()?;
56//!
57//! // Subscribers created under the participant will inherit the `data_on_readers`
58//! // callback.
59//! let subscriber = Subscriber::new(&participant)?;
60//!
61//! // This subscriber is explicitly created with the subscriber portion of the
62//! // `listener`.
63//! let subscriber = Subscriber::builder(&participant)
64//!     .with_listener(&listener)
65//!     .build()?;
66//!
67//! # Ok::<_, cyclonedds::Error>(())
68//! ```
69//!
70//! Each callback fires when its corresponding [`Status`](crate::Status)
71//! condition is triggered. Most callbacks receive a status value from the
72//! [`status`](crate::status) module carrying event-specific detail such as
73//! counts and last-instance handles.
74//!
75//! # Warning
76//!
77//! <div class="warning">
78//!
79//! **Unstable:** The full DDS listener hierarchy, where [`TopicListener<T>`]
80//! composes under [`Listener`] and [`ReaderListener<T>`] and
81//! [`WriterListener<T>`] compose under [`SubscriberListener`] and
82//! [`PublisherListener`], respectively, is not yet implemented.
83//!
84//! The [`Listener`], [`SubscriberListener`], and [`PublisherListener`] may
85//! propagate to many [`Topic<T>`](crate::Topic), [`Reader<T>`](crate::Reader),
86//! and [`Writer<T>`](crate::Writer) that all have different types for `<T>`. As
87//! a result, one of two obvious solutions presents itself:
88//!
89//! - Allow these higher-level types to only have callbacks of effectively
90//!   [`std::any::Any`] and require the callback to attempt to convert. This
91//!   maps the most correctly onto how the API is designed in the specification
92//!   but would greatly complicate the internal dispatching of these listeners.
93//!
94//! - Maintain a typed registry of all the different types of callbacks that are
95//!   attached on the higher-level untyped subscribers and then add code to
96//!   check if the event that fired corresponds to a type whose callback was
97//!   registered. This would work but introduces semantics that do not match the
98//!   other DDS implementations.
99//!
100//! </div>
101//!
102//! # Examples
103//!
104//! ```
105//! use cyclonedds::entity::Entity;
106//! use cyclonedds::{Reader, Topic, Writer};
107//! use cyclonedds::{ReaderListener, TopicListener, WriterListener};
108//! # #[derive(
109//! #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
110//! # )]
111//! # struct Data {
112//! #     x: i32,
113//! # }
114//! # let domain = cyclonedds::Domain::default();
115//! # let participant = cyclonedds::Participant::new(&domain)?;
116//!
117//! let topic = Topic::<Data>::builder(&participant, "Example")
118//!     .with_listener(
119//!         TopicListener::new().with_inconsistent_topic(|topic, inconsistent_topic| {
120//!             println!(
121//!                 "{topic:?} inconsistent topic: {} just encountered, {} encountered in total",
122//!                 inconsistent_topic.total.delta, inconsistent_topic.total.count
123//!             )
124//!         }),
125//!     )
126//!     .build()?;
127//!
128//! let reader = Reader::builder(&topic)
129//!     .with_listener(
130//!         ReaderListener::new()
131//!             .with_subscription_matched(|reader, subscription_matched| {
132//!                 println!("{reader:?} had a subscription match: {subscription_matched:?}")
133//!             })
134//!             .with_sample_lost(|reader, sample_lost| {
135//!                 println!(
136//!                     "{reader:?} lost samples: {} just lost, {} lost in total",
137//!                     sample_lost.total.delta, sample_lost.total.count
138//!                 )
139//!             }),
140//!     )
141//!     .build()?;
142//!
143//! let writer = Writer::builder(&topic)
144//!     .with_listener(
145//!         WriterListener::new()
146//!             .with_publication_matched(|writer, publication_matched| {
147//!                 println!("{writer:?} has a publication match: {publication_matched:?}")
148//!             })
149//!             .with_liveliness_lost(|writer, liveliness_lost| {
150//!                 println!(
151//!                     "{writer:?} liveliness lost: {} just lost, {} lost in total",
152//!                     liveliness_lost.total.delta, liveliness_lost.total.count
153//!                 )
154//!             }),
155//!     )
156//!     .build()?;
157//! # Ok::<_, cyclonedds::Error>(())
158//! ```
159
160use crate::Result;
161use crate::internal::ffi;
162use crate::internal::traits::AsFfi;
163use crate::status::{
164    InconsistentTopic, LivelinessChanged, LivelinessLost, OfferedDeadlineMissed,
165    OfferedIncompatibleQoS, PublicationMatched, RequestedDeadlineMissed, RequestedIncompatibleQoS,
166    SampleLost, SampleRejected, SubscriptionMatched,
167};
168
169/// Listener attached to a [`Participant`](crate::Participant).
170///
171/// In the DDS entity hierarchy this composes [`SubscriberListener`],
172/// [`PublisherListener`], and [`TopicListener`]. When attached to a
173/// participant, entities created under it inherit any of the configured
174/// callbacks that apply to that entity type.
175///
176/// # Examples
177///
178/// ```
179/// use cyclonedds::{Domain, Listener, Participant, Subscriber};
180///
181/// let domain = Domain::default();
182/// let listener = Listener::new().with_subscriber(|subscriber_listener| {
183///     subscriber_listener
184///         .with_data_on_readers(|subscriber| println!("{subscriber:?} has data on readers"))
185/// });
186/// let participant = Participant::builder(&domain)
187///     .with_listener(&listener)
188///     .build()?;
189///
190/// // This subscriber inherits the callbacks set on the `participant` via the `listener`.
191/// let subscriber = Subscriber::new(&participant)?;
192///
193/// // This subscriber will have the subscriber subset associated with the `listener` directly
194/// // applied to it.
195/// let subscriber = Subscriber::builder(&participant)
196///     .with_listener(&listener)
197///     .build()?;
198/// # Ok::<_, cyclonedds::Error>(())
199/// ```
200#[derive(Debug, Default, Clone, Copy)]
201pub struct Listener {
202    // topic: TopicListener<T>,
203    subscriber: SubscriberListener,
204    publisher: PublisherListener,
205}
206
207/// Listener attached to a [`Topic<T>`](crate::Topic<T>).
208#[derive(Debug, Clone, Copy)]
209pub struct TopicListener<T>
210where
211    T: crate::Topicable,
212{
213    inconsistent_topic: Option<fn(&crate::Topic<'_, '_, T>, InconsistentTopic)>,
214}
215
216/// Listener attached to a [`Subscriber`](crate::Subscriber).
217///
218/// <div class="warning">
219///
220/// Currently [`SubscriberListener`] is missing its configuration for composing
221/// a [`ReaderListener<T>`] under this non-generic type. See the [module-level
222/// warning](crate::listener#warning) for more detail.
223///
224/// </div>
225#[derive(Debug, Default, Clone, Copy)]
226pub struct SubscriberListener {
227    data_on_readers: Option<fn(&crate::Subscriber<'_, '_>)>,
228    // ///
229    // pub reader: ReaderListener<T>,
230}
231
232/// Listener attached to a [`Reader<T>`](crate::Reader<T>).
233#[derive(Debug, Clone, Copy)]
234pub struct ReaderListener<T>
235where
236    T: crate::Topicable,
237{
238    sample_lost: Option<fn(&crate::Reader<'_, '_, '_, T>, SampleLost)>,
239    data_available: Option<fn(&crate::Reader<'_, '_, '_, T>)>,
240    sample_rejected: Option<fn(&crate::Reader<'_, '_, '_, T>, SampleRejected)>,
241    liveliness_changed: Option<fn(&crate::Reader<'_, '_, '_, T>, LivelinessChanged)>,
242    requested_deadline_missed: Option<fn(&crate::Reader<'_, '_, '_, T>, RequestedDeadlineMissed)>,
243    requested_incompatible_qos: Option<fn(&crate::Reader<'_, '_, '_, T>, RequestedIncompatibleQoS)>,
244    subscription_matched: Option<fn(&crate::Reader<'_, '_, '_, T>, SubscriptionMatched)>,
245}
246
247/// Listener attached to a [`Publisher`](crate::Publisher).
248///
249/// <div class="warning">
250///
251/// Currently [`PublisherListener`] has no registered callbacks pending a
252/// solution for composing [`WriterListener<T>`] under this non-generic type.
253/// See the [module-level warning](crate::listener#warning) for more detail.
254///
255/// </div>
256#[derive(Debug, Default, Clone, Copy)]
257pub struct PublisherListener {
258    // ///
259    // pub writer: WriterListener<T>,
260}
261
262/// Listener attached to a [`Writer<T>`](crate::Writer<T>).
263#[derive(Debug, Clone, Copy)]
264pub struct WriterListener<T>
265where
266    T: crate::Topicable,
267{
268    liveliness_lost: Option<fn(&crate::Writer<'_, '_, '_, T>, LivelinessLost)>,
269    offered_deadline_missed: Option<fn(&crate::Writer<'_, '_, '_, T>, OfferedDeadlineMissed)>,
270    offered_incompatible_qos: Option<fn(&crate::Writer<'_, '_, '_, T>, OfferedIncompatibleQoS)>,
271    publication_matched: Option<fn(&crate::Writer<'_, '_, '_, T>, PublicationMatched)>,
272}
273
274impl<T> Default for TopicListener<T>
275where
276    T: crate::Topicable,
277{
278    fn default() -> Self {
279        Self {
280            inconsistent_topic: Option::default(),
281        }
282    }
283}
284
285impl<T> Default for ReaderListener<T>
286where
287    T: crate::Topicable,
288{
289    fn default() -> Self {
290        Self {
291            sample_lost: Option::default(),
292            data_available: Option::default(),
293            sample_rejected: Option::default(),
294            liveliness_changed: Option::default(),
295            requested_deadline_missed: Option::default(),
296            requested_incompatible_qos: Option::default(),
297            subscription_matched: Option::default(),
298        }
299    }
300}
301
302impl<T> Default for WriterListener<T>
303where
304    T: crate::Topicable,
305{
306    fn default() -> Self {
307        Self {
308            liveliness_lost: Option::default(),
309            offered_deadline_missed: Option::default(),
310            offered_incompatible_qos: Option::default(),
311            publication_matched: Option::default(),
312        }
313    }
314}
315
316impl Listener {
317    /// Creates a new [`Listener`] with no callbacks registered.
318    ///
319    /// # Examples
320    ///
321    /// ```
322    /// use cyclonedds::Listener;
323    ///
324    /// let listener = Listener::new();
325    /// ```
326    #[must_use]
327    pub fn new() -> Self {
328        Self::default()
329    }
330
331    // ///
332    // pub fn with_topic(mut self, setter: fn(TopicListener<T>) -> TopicListener<T>)
333    // -> Self {     self.topic = setter(self.topic);
334    //     self
335    // }
336
337    /// Configures the [`SubscriberListener`] via a setter callback.
338    ///
339    /// # Examples
340    ///
341    /// ```
342    /// use cyclonedds::Listener;
343    ///
344    /// let listener = Listener::new().with_subscriber(|s| {
345    ///     s.with_data_on_readers(|subscriber| {
346    ///         println!("data available on a reader");
347    ///     })
348    /// });
349    /// ```
350    #[must_use]
351    pub fn with_subscriber(mut self, setter: fn(SubscriberListener) -> SubscriberListener) -> Self {
352        self.subscriber = setter(self.subscriber);
353        self
354    }
355
356    /// Configures the [`PublisherListener`] via a setter callback.
357    ///
358    /// # Examples
359    ///
360    /// <div class="warning">
361    ///
362    /// This example does not compile because the [`PublisherListener`] does not
363    /// have its `with_writer::<T>` setter yet. This is due to the fact that
364    /// the higher-level listeners are untyped in `<T>` but the lower-level
365    /// listeners are typed in `<T>` and a solution for crossing this boundary
366    /// still needs to be worked out.
367    ///
368    /// See the [module-level warning](crate::listener#warning) for more detail.
369    ///
370    /// </div>
371    ///
372    /// ```ignore
373    /// use cyclonedds::Listener;
374    ///
375    /// let listener = Listener::new().with_publisher(|p| {
376    ///     p.with_writer(|w| {
377    ///         w.with_publication_matched(|writer, publication_matched| {
378    ///             println!("{writer:?} has publication match: {publication_matched:?}")
379    ///         })
380    ///     })
381    /// });
382    /// ```
383    #[must_use]
384    pub fn with_publisher(mut self, setter: fn(PublisherListener) -> PublisherListener) -> Self {
385        self.publisher = setter(self.publisher);
386        self
387    }
388
389    #[inline]
390    pub(crate) fn apply_listener_ffi(self, listener: &mut ffi::Listener) {
391        // self.topic.apply_listener_ffi(listener);
392        self.subscriber.apply_listener_ffi(listener);
393        self.publisher.apply_listener_ffi(listener);
394    }
395}
396
397impl AsFfi for Listener {
398    type Target<'a> = Result<ffi::Listener>;
399
400    #[inline]
401    fn as_ffi(&self) -> Self::Target<'_> {
402        ffi::Listener::new().map(|mut listener| {
403            self.apply_listener_ffi(&mut listener);
404            listener
405        })
406    }
407}
408
409impl<T> TopicListener<T>
410where
411    T: crate::Topicable,
412{
413    /// Creates a new [`TopicListener<T>`] with no callbacks registered.
414    ///
415    /// # Examples
416    ///
417    /// ```
418    /// use cyclonedds::TopicListener;
419    /// # #[derive(
420    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
421    /// # )]
422    /// # struct Data {
423    /// #     x: i32,
424    /// # }
425    ///
426    /// let listener = TopicListener::<Data>::new();
427    /// ```
428    #[must_use]
429    pub fn new() -> Self {
430        Self::default()
431    }
432
433    /// Sets a callback for the
434    /// [`InconsistentTopic` status event](crate::Status::InconsistentTopic).
435    ///
436    /// The callback receives an
437    /// [`InconsistentTopic` metadata struct](InconsistentTopic).
438    ///
439    /// Fired when a remote topic is discovered with the same name but an
440    /// incompatible type or [`QoS`](crate::QoS).
441    ///
442    /// # Examples
443    ///
444    /// ```
445    /// use cyclonedds::listener::TopicListener;
446    /// # #[derive(
447    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
448    /// # )]
449    /// # struct Data {
450    /// #     x: i32,
451    /// # }
452    ///
453    /// let listener =
454    ///     TopicListener::<Data>::new().with_inconsistent_topic(|topic, inconsistent_topic| {
455    ///         println!("inconsistent topic: {inconsistent_topic:?}");
456    ///     });
457    /// ```
458    #[must_use]
459    pub fn with_inconsistent_topic(
460        mut self,
461        callback: fn(&crate::Topic<'_, '_, T>, InconsistentTopic),
462    ) -> Self {
463        self.inconsistent_topic = Some(callback);
464        self
465    }
466
467    #[inline]
468    pub(crate) fn apply_listener_ffi(&self, listener: &mut ffi::Listener) {
469        if let Some(callback) = self.inconsistent_topic {
470            ffi::dds_listener_set_inconsistent_topic(listener, callback);
471        }
472    }
473}
474
475impl<T> AsFfi for TopicListener<T>
476where
477    T: crate::Topicable,
478{
479    type Target<'a>
480        = Result<ffi::Listener>
481    where
482        T: 'a;
483
484    #[inline]
485    fn as_ffi(&self) -> Self::Target<'_> {
486        ffi::Listener::new().map(|mut listener| {
487            self.apply_listener_ffi(&mut listener);
488            listener
489        })
490    }
491}
492
493impl SubscriberListener {
494    /// Creates a new [`SubscriberListener`] with no callbacks registered.
495    ///
496    /// # Examples
497    ///
498    /// ```
499    /// use cyclonedds::SubscriberListener;
500    ///
501    /// let listener = SubscriberListener::new();
502    /// ```
503    #[must_use]
504    pub fn new() -> Self {
505        Self::default()
506    }
507
508    // ///
509    // pub fn with_reader(mut self, setter: fn(ReaderListener<T>) ->
510    // ReaderListener<T>) -> Self {     self.reader = setter(self.reader);
511    //     self
512    // }
513
514    /// Sets a callback for the [`DataOnReaders` status
515    /// event](crate::Status::DataOnReaders).
516    ///
517    /// Fired when new data is available on one or more readers belonging to
518    /// this subscriber.
519    ///
520    /// # Examples
521    ///
522    /// ```
523    /// use cyclonedds::SubscriberListener;
524    ///
525    /// let listener = SubscriberListener::new().with_data_on_readers(|subscriber| {
526    ///     println!("data available on {subscriber:?}");
527    /// });
528    /// ```
529    #[must_use]
530    pub fn with_data_on_readers(mut self, callback: fn(&crate::Subscriber<'_, '_>)) -> Self {
531        self.data_on_readers = Some(callback);
532        self
533    }
534
535    #[inline]
536    pub(crate) fn apply_listener_ffi(self, listener: &mut ffi::Listener) {
537        if let Some(callback) = self.data_on_readers {
538            ffi::dds_listener_set_data_on_readers(listener, callback);
539        }
540        // self.reader.apply_listener_ffi(listener);
541    }
542}
543
544impl AsFfi for SubscriberListener {
545    type Target<'a> = Result<ffi::Listener>;
546
547    #[inline]
548    fn as_ffi(&self) -> Self::Target<'_> {
549        ffi::Listener::new().map(|mut listener| {
550            self.apply_listener_ffi(&mut listener);
551            listener
552        })
553    }
554}
555
556impl PublisherListener {
557    /// Creates a new [`PublisherListener`] with no callbacks registered.
558    ///
559    /// # Examples
560    ///
561    /// ```
562    /// use cyclonedds::PublisherListener;
563    ///
564    /// let listener = PublisherListener::new();
565    /// ```
566    #[must_use]
567    pub fn new() -> Self {
568        Self::default()
569    }
570
571    // ///
572    // pub fn with_writer(mut self, setter: fn(WriterListener<T>) ->
573    // WriterListener<T>) -> Self {     self.writer = setter(self.writer);
574    //     self
575    // }
576
577    #[inline]
578    pub(crate) const fn apply_listener_ffi(self, listener: &mut ffi::Listener) {
579        let _ = self;
580        let _ = listener;
581        // self.writer.apply_listener_ffi(listener);
582    }
583}
584
585impl AsFfi for PublisherListener {
586    type Target<'a> = Result<ffi::Listener>;
587
588    #[inline]
589    fn as_ffi(&self) -> Self::Target<'_> {
590        ffi::Listener::new().map(|mut listener| {
591            self.apply_listener_ffi(&mut listener);
592            listener
593        })
594    }
595}
596
597impl<T> ReaderListener<T>
598where
599    T: crate::Topicable,
600{
601    /// Creates a new [`ReaderListener<T>`] with no callbacks registered.
602    ///
603    /// # Examples
604    ///
605    /// ```
606    /// use cyclonedds::listener::ReaderListener;
607    /// # #[derive(
608    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
609    /// # )]
610    /// # struct Data {
611    /// #     x: i32,
612    /// # }
613    ///
614    /// let listener = ReaderListener::<Data>::new();
615    /// ```
616    #[must_use]
617    pub fn new() -> Self {
618        Self::default()
619    }
620
621    /// Sets a callback for the [`SampleLost` status
622    /// event](crate::Status::SampleLost).
623    ///
624    /// The callback receives a [`SampleLost` metadata struct](SampleLost).
625    ///
626    /// Fired when a sample is lost, meaning it was never received by this
627    /// reader due to resource limits or [`QoS`](crate::QoS) constraints.
628    ///
629    /// # Examples
630    ///
631    /// ```
632    /// use cyclonedds::listener::ReaderListener;
633    /// # #[derive(
634    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
635    /// # )]
636    /// # struct Data {
637    /// #     x: i32,
638    /// # }
639    ///
640    /// let listener = ReaderListener::<Data>::new().with_sample_lost(|reader, sample_lost| {
641    ///     println!("samples lost: {}", sample_lost.total.count);
642    /// });
643    /// ```
644    #[must_use]
645    pub fn with_sample_lost(
646        mut self,
647        callback: fn(&crate::Reader<'_, '_, '_, T>, SampleLost),
648    ) -> Self {
649        self.sample_lost = Some(callback);
650        self
651    }
652
653    /// Sets a callback for the [`DataAvailable` status
654    /// event](crate::Status::DataAvailable).
655    ///
656    /// Fired when new data is available to be [`peeked`](crate::Reader::peek),
657    /// [`read`](crate::Reader::read), or [`taken`](crate::Reader::take) from
658    /// this reader.
659    ///
660    /// # Examples
661    ///
662    /// ```
663    /// use cyclonedds::listener::ReaderListener;
664    /// # #[derive(
665    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
666    /// # )]
667    /// # struct Data {
668    /// #     x: i32,
669    /// # }
670    ///
671    /// let listener = ReaderListener::<Data>::new().with_data_available(|reader| {
672    ///     println!("data available on {reader:?}");
673    /// });
674    /// ```
675    #[must_use]
676    pub fn with_data_available(mut self, callback: fn(&crate::Reader<'_, '_, '_, T>)) -> Self {
677        self.data_available = Some(callback);
678        self
679    }
680
681    /// Sets a callback for the
682    /// [`SampleRejected` status event](crate::Status::SampleRejected).
683    ///
684    /// The callback receives a [`SampleRejected` metadata
685    /// struct](SampleRejected).
686    ///
687    /// Fired when an incoming sample is rejected due to
688    /// [`ResourceLimits`](crate::qos::policy::ResourceLimits).
689    ///
690    /// # Examples
691    ///
692    /// ```
693    /// use cyclonedds::listener::ReaderListener;
694    /// # #[derive(
695    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
696    /// # )]
697    /// # struct Data {
698    /// #     x: i32,
699    /// # }
700    /// let listener = ReaderListener::<Data>::new().with_sample_rejected(|reader, status| {
701    ///     println!("sample rejected: {status:?}");
702    /// });
703    /// ```
704    #[must_use]
705    pub fn with_sample_rejected(
706        mut self,
707        callback: fn(&crate::Reader<'_, '_, '_, T>, SampleRejected),
708    ) -> Self {
709        self.sample_rejected = Some(callback);
710        self
711    }
712
713    /// Sets a callback for the
714    /// [`LivelinessChanged` status event](crate::Status::LivelinessChanged).
715    ///
716    /// The callback receives a
717    /// [`LivelinessChanged` metadata struct](LivelinessChanged).
718    ///
719    /// Fired when the [`Liveliness`](crate::qos::policy::Liveliness) of a
720    /// matched writer changes, i.e. a writer becomes active or inactive.
721    ///
722    /// # Examples
723    ///
724    /// ```
725    /// use cyclonedds::listener::ReaderListener;
726    /// # #[derive(
727    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
728    /// # )]
729    /// # struct Data {
730    /// #     x: i32,
731    /// # }
732    ///
733    /// let listener =
734    ///     ReaderListener::<Data>::new().with_liveliness_changed(|reader, liveliness_changed| {
735    ///         println!("active writers: {}", liveliness_changed.alive.count);
736    ///     });
737    /// ```
738    #[must_use]
739    pub fn with_liveliness_changed(
740        mut self,
741        callback: fn(&crate::Reader<'_, '_, '_, T>, LivelinessChanged),
742    ) -> Self {
743        self.liveliness_changed = Some(callback);
744        self
745    }
746
747    /// Sets a callback for the
748    /// [`RequestedDeadlineMissed` status
749    /// event](crate::Status::RequestedDeadlineMissed).
750    ///
751    /// The callback receives a
752    /// [`RequestedDeadlineMissed` metadata struct](RequestedDeadlineMissed).
753    ///
754    /// Fired when a sample is not received within the
755    /// [`Deadline`](crate::qos::policy::Deadline) period offered by a matched
756    /// writer.
757    ///
758    /// # Examples
759    ///
760    /// ```
761    /// use cyclonedds::listener::ReaderListener;
762    /// # #[derive(
763    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
764    /// # )]
765    /// # struct Data {
766    /// #     x: i32,
767    /// # }
768    ///
769    /// let listener = ReaderListener::<Data>::new().with_requested_deadline_missed(
770    ///     |reader, requested_deadline_missed| {
771    ///         println!("deadline missed: {}", requested_deadline_missed.total.count);
772    ///     },
773    /// );
774    /// ```
775    #[must_use]
776    pub fn with_requested_deadline_missed(
777        mut self,
778        callback: fn(&crate::Reader<'_, '_, '_, T>, RequestedDeadlineMissed),
779    ) -> Self {
780        self.requested_deadline_missed = Some(callback);
781        self
782    }
783
784    /// Sets a callback for the
785    /// [`RequestedIncompatibleQoS` status
786    /// event](crate::Status::RequestedIncompatibleQoS).
787    ///
788    /// The callback receives a
789    /// [`RequestedIncompatibleQoS` metadata struct](RequestedIncompatibleQoS).
790    ///
791    /// Fired when a writer is discovered whose offered [`QoS`](crate::QoS) is
792    /// incompatible with this reader's requested [`QoS`](crate::QoS).
793    ///
794    /// # Examples
795    ///
796    /// ```
797    /// use cyclonedds::listener::ReaderListener;
798    /// # #[derive(
799    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
800    /// # )]
801    /// # struct Data {
802    /// #     x: i32,
803    /// # }
804    ///
805    /// let listener = ReaderListener::<Data>::new().with_requested_incompatible_qos(
806    ///     |reader, requested_incompatible_qos| {
807    ///         println!("incompatible QoS: {requested_incompatible_qos:?}");
808    ///     },
809    /// );
810    /// ```
811    #[must_use]
812    pub fn with_requested_incompatible_qos(
813        mut self,
814        callback: fn(&crate::Reader<'_, '_, '_, T>, RequestedIncompatibleQoS),
815    ) -> Self {
816        self.requested_incompatible_qos = Some(callback);
817        self
818    }
819
820    /// Sets a callback for the
821    /// [`SubscriptionMatched` status event](crate::Status::SubscriptionMatched)
822    /// status event.
823    ///
824    /// The callback receives a
825    /// [`SubscriptionMatched` metadata struct](SubscriptionMatched).
826    ///
827    /// Fired when a writer matching this reader's topic and [`QoS`](crate::QoS)
828    /// is discovered or lost.
829    ///
830    /// # Examples
831    ///
832    /// ```
833    /// use cyclonedds::listener::ReaderListener;
834    /// # #[derive(
835    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
836    /// # )]
837    /// # struct Data {
838    /// #     x: i32,
839    /// # }
840    ///
841    /// let listener =
842    ///     ReaderListener::<Data>::new().with_subscription_matched(|reader, subscription_matched| {
843    ///         println!("matched writers: {}", subscription_matched.current.count);
844    ///     });
845    /// ```
846    #[must_use]
847    pub fn with_subscription_matched(
848        mut self,
849        callback: fn(&crate::Reader<'_, '_, '_, T>, SubscriptionMatched),
850    ) -> Self {
851        self.subscription_matched = Some(callback);
852        self
853    }
854
855    #[inline]
856    pub(crate) fn apply_listener_ffi(&self, listener: &mut ffi::Listener) {
857        if let Some(callback) = self.sample_lost {
858            ffi::dds_listener_set_sample_lost(listener, callback);
859        }
860        if let Some(callback) = self.data_available {
861            ffi::dds_listener_set_data_available(listener, callback);
862        }
863        if let Some(callback) = self.sample_rejected {
864            ffi::dds_listener_set_sample_rejected(listener, callback);
865        }
866        if let Some(callback) = self.liveliness_changed {
867            ffi::dds_listener_set_liveliness_changed(listener, callback);
868        }
869        if let Some(callback) = self.requested_deadline_missed {
870            ffi::dds_listener_set_requested_deadline_missed(listener, callback);
871        }
872        if let Some(callback) = self.requested_incompatible_qos {
873            ffi::dds_listener_set_requested_incompatible_qos(listener, callback);
874        }
875        if let Some(callback) = self.subscription_matched {
876            ffi::dds_listener_set_subscription_matched(listener, callback);
877        }
878    }
879}
880
881impl<T> AsFfi for ReaderListener<T>
882where
883    T: crate::Topicable,
884{
885    type Target<'a>
886        = Result<ffi::Listener>
887    where
888        T: 'a;
889
890    #[inline]
891    fn as_ffi(&self) -> Self::Target<'_> {
892        ffi::Listener::new().map(|mut listener| {
893            self.apply_listener_ffi(&mut listener);
894            listener
895        })
896    }
897}
898
899impl<T> WriterListener<T>
900where
901    T: crate::Topicable,
902{
903    /// Creates a new [`WriterListener<T>`] with no callbacks registered.
904    ///
905    /// # Examples
906    ///
907    /// ```
908    /// use cyclonedds::TopicListener;
909    /// # #[derive(
910    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
911    /// # )]
912    /// # struct Data {
913    /// #     x: i32,
914    /// # }
915    ///
916    /// let listener = TopicListener::<Data>::new();
917    /// ```
918    #[must_use]
919    pub fn new() -> Self {
920        Self::default()
921    }
922
923    /// Sets a callback for the
924    /// [`LivelinessLost` status event](crate::Status::LivelinessLost).
925    ///
926    /// The callback receives a [`LivelinessLost` metadata
927    /// struct](LivelinessLost).
928    ///
929    /// Fired when the writer fails to meet its
930    /// [`Liveliness`](crate::qos::policy::Liveliness) policy and is considered
931    /// inactive by matched readers.
932    ///
933    /// # Examples
934    ///
935    /// ```
936    /// use cyclonedds::listener::WriterListener;
937    /// # #[derive(
938    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
939    /// # )]
940    /// # struct Data {
941    /// #     x: i32,
942    /// # }
943    ///
944    /// let listener = WriterListener::<Data>::new().with_liveliness_lost(|writer, liveliness_lost| {
945    ///     println!(
946    ///         "{writer:?} liveliness lost: {}",
947    ///         liveliness_lost.total.count
948    ///     );
949    /// });
950    /// ```
951    #[must_use]
952    pub fn with_liveliness_lost(
953        mut self,
954        callback: fn(&crate::Writer<'_, '_, '_, T>, LivelinessLost),
955    ) -> Self {
956        self.liveliness_lost = Some(callback);
957        self
958    }
959
960    /// Sets a callback for the
961    /// [`OfferedDeadlineMissed` status
962    /// event](crate::Status::OfferedDeadlineMissed) status event.
963    ///
964    /// The callback receives an
965    /// [`OfferedDeadlineMissed` metadata struct](OfferedDeadlineMissed).
966    ///
967    /// Fired when the writer fails to write a new sample within its offered
968    /// [`Deadline`](crate::qos::policy::Deadline) period for one or more
969    /// instances.
970    ///
971    /// # Examples
972    ///
973    /// ```
974    /// use cyclonedds::listener::WriterListener;
975    /// # #[derive(
976    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
977    /// # )]
978    /// # struct Data {
979    /// #     x: i32,
980    /// # }
981    ///
982    /// let listener = WriterListener::<Data>::new().with_offered_deadline_missed(
983    ///     |writer, offered_deadline_missed| {
984    ///         println!(
985    ///             "{writer:?} deadline missed: {}",
986    ///             offered_deadline_missed.total.count
987    ///         );
988    ///     },
989    /// );
990    /// ```
991    #[must_use]
992    pub fn with_offered_deadline_missed(
993        mut self,
994        callback: fn(&crate::Writer<'_, '_, '_, T>, OfferedDeadlineMissed),
995    ) -> Self {
996        self.offered_deadline_missed = Some(callback);
997        self
998    }
999
1000    /// Sets a callback for the
1001    /// [`OfferedIncompatibleQoS` status
1002    /// event](crate::Status::OfferedIncompatibleQoS) status event.
1003    ///
1004    /// The callback receives an
1005    /// [`OfferedIncompatibleQoS` metadata struct](OfferedIncompatibleQoS).
1006    ///
1007    /// Fired when a reader is discovered whose requested [`QoS`](crate::QoS) is
1008    /// incompatible with this writer's offered [`QoS`](crate::QoS).
1009    ///
1010    /// # Examples
1011    ///
1012    /// ```
1013    /// use cyclonedds::listener::WriterListener;
1014    /// # #[derive(
1015    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
1016    /// # )]
1017    /// # struct Data {
1018    /// #     x: i32,
1019    /// # }
1020    ///
1021    /// let listener = WriterListener::<Data>::new().with_offered_incompatible_qos(
1022    ///     |writer, offered_incompatible_qos| {
1023    ///         println!("{writer:?} discovered incompatible QoS: {offered_incompatible_qos:?}");
1024    ///     },
1025    /// );
1026    /// ```
1027    #[must_use]
1028    pub fn with_offered_incompatible_qos(
1029        mut self,
1030        callback: fn(&crate::Writer<'_, '_, '_, T>, OfferedIncompatibleQoS),
1031    ) -> Self {
1032        self.offered_incompatible_qos = Some(callback);
1033        self
1034    }
1035
1036    /// Sets a callback for the
1037    /// [`PublicationMatched` status event](crate::Status::PublicationMatched).
1038    ///
1039    /// The callback receives a
1040    /// [`PublicationMatched` metadata struct](PublicationMatched).
1041    ///
1042    /// Fired when a reader matching this writer's topic and [`QoS`](crate::QoS)
1043    /// is discovered.
1044    ///
1045    /// # Examples
1046    ///
1047    /// ```
1048    /// use cyclonedds::listener::WriterListener;
1049    /// # #[derive(
1050    /// #     cyclonedds::Topicable, serde::Serialize, serde::Deserialize, Clone, Debug, Default,
1051    /// # )]
1052    /// # struct Data {
1053    /// #     x: i32,
1054    /// # }
1055    ///
1056    /// let listener = WriterListener::<Data>::new().with_publication_matched(|writer, status| {
1057    ///     println!("{writer:?} matched readers: {}", status.current.count);
1058    /// });
1059    /// ```
1060    #[must_use]
1061    pub fn with_publication_matched(
1062        mut self,
1063        callback: fn(&crate::Writer<'_, '_, '_, T>, PublicationMatched),
1064    ) -> Self
1065    where
1066        T: crate::Topicable,
1067    {
1068        self.publication_matched = Some(callback);
1069        self
1070    }
1071
1072    #[inline]
1073    pub(crate) fn apply_listener_ffi(&self, listener: &mut ffi::Listener) {
1074        if let Some(callback) = self.liveliness_lost {
1075            ffi::dds_listener_set_liveliness_lost(listener, callback);
1076        }
1077        if let Some(callback) = self.offered_deadline_missed {
1078            ffi::dds_listener_set_offered_deadline_missed(listener, callback);
1079        }
1080        if let Some(callback) = self.offered_incompatible_qos {
1081            ffi::dds_listener_set_offered_incompatible_qos(listener, callback);
1082        }
1083        if let Some(callback) = self.publication_matched {
1084            ffi::dds_listener_set_publication_matched(listener, callback);
1085        }
1086    }
1087}
1088
1089impl<T> AsFfi for WriterListener<T>
1090where
1091    T: crate::Topicable,
1092{
1093    type Target<'a>
1094        = Result<ffi::Listener>
1095    where
1096        T: 'a;
1097
1098    #[inline]
1099    fn as_ffi(&self) -> Self::Target<'_> {
1100        ffi::Listener::new().map(|mut listener| {
1101            self.apply_listener_ffi(&mut listener);
1102            listener
1103        })
1104    }
1105}
1106
1107impl<T> AsRef<ReaderListener<T>> for ReaderListener<T>
1108where
1109    T: crate::Topicable,
1110{
1111    fn as_ref(&self) -> &ReaderListener<T> {
1112        self
1113    }
1114}
1115impl<T> AsRef<WriterListener<T>> for WriterListener<T>
1116where
1117    T: crate::Topicable,
1118{
1119    fn as_ref(&self) -> &WriterListener<T> {
1120        self
1121    }
1122}
1123impl AsRef<SubscriberListener> for SubscriberListener {
1124    fn as_ref(&self) -> &SubscriberListener {
1125        self
1126    }
1127}
1128impl AsRef<PublisherListener> for PublisherListener {
1129    fn as_ref(&self) -> &PublisherListener {
1130        self
1131    }
1132}
1133impl<T> AsRef<TopicListener<T>> for TopicListener<T>
1134where
1135    T: crate::Topicable,
1136{
1137    fn as_ref(&self) -> &TopicListener<T> {
1138        self
1139    }
1140}
1141impl AsRef<Listener> for Listener {
1142    fn as_ref(&self) -> &Listener {
1143        self
1144    }
1145}
1146
1147// impl<T> AsRef<ReaderListener<T>> for Listener<T> {
1148//     fn as_ref(&self) -> &ReaderListener<T> {
1149//         &self.subscriber.reader
1150//     }
1151// }
1152// impl<T> AsRef<WriterListener<T>> for Listener<T> {
1153//     fn as_ref(&self) -> &WriterListener<T> {
1154//         &self.publisher.writer
1155//     }
1156// }
1157impl AsRef<SubscriberListener> for Listener {
1158    fn as_ref(&self) -> &SubscriberListener {
1159        &self.subscriber
1160    }
1161}
1162impl AsRef<PublisherListener> for Listener {
1163    fn as_ref(&self) -> &PublisherListener {
1164        &self.publisher
1165    }
1166}
1167// impl<T> AsRef<TopicListener<T>> for Listener<T> {
1168//     fn as_ref(&self) -> &TopicListener<T> {
1169//         &self.topic
1170//     }
1171// }
1172
1173// impl<T> AsRef<ReaderListener<T>> for SubscriberListener<T> {
1174//     fn as_ref(&self) -> &ReaderListener<T> {
1175//         &self.reader
1176//     }
1177// }
1178// impl<T> AsRef<WriterListener<T>> for PublisherListener<T> {
1179//     fn as_ref(&self) -> &WriterListener<T> {
1180//         &self.writer
1181//     }
1182// }
1183
1184#[cfg(test)]
1185mod tests {
1186    use super::*;
1187    use crate::Topicable;
1188
1189    fn receive_listener<L>(listener: L)
1190    where
1191        L: AsRef<Listener>,
1192    {
1193        let _ = listener.as_ref();
1194    }
1195
1196    fn receive_topic_listener<L, T>(listener: L)
1197    where
1198        L: AsRef<TopicListener<T>>,
1199        T: crate::Topicable,
1200    {
1201        let _ = listener.as_ref();
1202    }
1203
1204    fn receive_subscriber_listener<L>(listener: L)
1205    where
1206        L: AsRef<SubscriberListener>,
1207    {
1208        let _ = listener.as_ref();
1209    }
1210
1211    fn receive_publisher_listener<L>(listener: L)
1212    where
1213        L: AsRef<PublisherListener>,
1214    {
1215        let _ = listener.as_ref();
1216    }
1217
1218    fn receive_reader_listener<L, T>(listener: L)
1219    where
1220        L: AsRef<ReaderListener<T>>,
1221        T: crate::Topicable,
1222    {
1223        let _ = listener.as_ref();
1224    }
1225
1226    fn receive_writer_listener<L, T>(listener: L)
1227    where
1228        L: AsRef<WriterListener<T>>,
1229        T: crate::Topicable,
1230    {
1231        let _ = listener.as_ref();
1232    }
1233
1234    #[test]
1235    fn test_listener_create() {
1236        let listener = Listener::new()
1237            // .with_topic(|topic| topic.with_inconsistent_topic(|_, _| ()))
1238            .with_subscriber(|subscriber| {
1239                subscriber.with_data_on_readers(|_| ())
1240                // .with_reader(|reader| {
1241                //     reader
1242                //         .with_data_available(|_| ())
1243                //         .with_liveliness_changed(|_, _| ())
1244                //         .with_requested_deadline_missed(|_, _| ())
1245                //         .with_requested_incompatible_qos(|_, _| ())
1246                //         .with_sample_lost(|_, _| ())
1247                //         .with_sample_rejected(|_, _| ())
1248                //         .with_subscription_matched(|_, _| ())
1249                // })
1250            })
1251            .with_publisher(|publisher| {
1252                publisher
1253                //     .with_writer(|writer| {
1254                //     writer
1255                //         .with_liveliness_lost(|_, _| ())
1256                //         .with_offered_deadline_missed(|_, _| ())
1257                //         .with_offered_incompatible_qos(|_, _| ())
1258                //         .with_publication_matched(|_, _| ())
1259                // })
1260            });
1261        let topic_listener =
1262            TopicListener::<crate::tests::topic::Data>::new().with_inconsistent_topic(|_, _| ());
1263        let subscriber_listener = SubscriberListener::new()
1264            .with_data_on_readers(|_| ())
1265        // .with_reader(|reader| {
1266        //     reader
1267        //         .with_data_available(|_| ())
1268        //         .with_liveliness_changed(|_, _| ())
1269        //         .with_requested_deadline_missed(|_, _| ())
1270        //         .with_requested_incompatible_qos(|_, _| ())
1271        //         .with_sample_lost(|_, _| ())
1272        //         .with_sample_rejected(|_, _| ())
1273        //         .with_subscription_matched(|_, _| ())
1274        // })
1275            ;
1276        let publisher_listener =
1277            PublisherListener::new()
1278        // .with_writer(|writer| {
1279        //     writer
1280        //         .with_liveliness_lost(|_, _| ())
1281        //         .with_offered_deadline_missed(|_, _| ())
1282        //         .with_offered_incompatible_qos(|_, _| ())
1283        //         .with_publication_matched(|_, _| ())
1284        // })
1285            ;
1286        let reader_listener = ReaderListener::<crate::tests::topic::Data>::new()
1287            .with_data_available(|_| ())
1288            .with_liveliness_changed(|_, _| ())
1289            .with_requested_deadline_missed(|_, _| ())
1290            .with_requested_incompatible_qos(|_, _| ())
1291            .with_sample_lost(|_, _| ())
1292            .with_sample_rejected(|_, _| ())
1293            .with_subscription_matched(|_, _| ());
1294        let writer_listener = WriterListener::<crate::tests::topic::Data>::new()
1295            .with_liveliness_lost(|_, _| ())
1296            .with_offered_deadline_missed(|_, _| ())
1297            .with_offered_incompatible_qos(|_, _| ())
1298            .with_publication_matched(|_, _| ());
1299
1300        receive_listener(listener);
1301
1302        receive_topic_listener(&topic_listener);
1303        // receive_topic_listener(&listener);
1304
1305        receive_subscriber_listener(subscriber_listener);
1306        receive_subscriber_listener(listener);
1307
1308        receive_publisher_listener(publisher_listener);
1309        receive_publisher_listener(listener);
1310
1311        receive_reader_listener(&reader_listener);
1312        // receive_reader_listener(&subscriber_listener);
1313        // receive_reader_listener(&listener);
1314
1315        receive_writer_listener(&writer_listener);
1316        // receive_writer_listener(&publisher_listener);
1317        // receive_writer_listener(&listener);
1318    }
1319
1320    #[test]
1321    fn test_subscriber_listener_callbacks() {
1322        #[derive(Debug, PartialEq)]
1323        struct Triggered {
1324            data_on_readers: u32,
1325        }
1326
1327        static TRIGGERED: std::sync::Mutex<Triggered> =
1328            std::sync::Mutex::new(Triggered { data_on_readers: 0 });
1329
1330        let domain_id = crate::tests::domain::unique_id();
1331        let topic_name = crate::tests::topic::unique_name();
1332        let domain = crate::Domain::new(domain_id).unwrap();
1333
1334        let participant = crate::Participant::new(&domain).unwrap();
1335        let topic =
1336            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1337        let subscriber = crate::Subscriber::builder(&participant)
1338            .with_listener(
1339                crate::SubscriberListener::new().with_data_on_readers(|_subscriber| {
1340                    TRIGGERED.lock().unwrap().data_on_readers += 1;
1341                }),
1342            )
1343            .build()
1344            .unwrap();
1345        let reader = crate::Reader::builder(&topic)
1346            .with_subscriber(&subscriber)
1347            .build()
1348            .unwrap();
1349        let writer = crate::Writer::new(&topic).unwrap();
1350
1351        let sample = crate::tests::topic::Data::default();
1352        writer.write(&sample).unwrap();
1353
1354        let samples = reader.read().unwrap();
1355        assert_eq!(samples.len(), 1);
1356
1357        assert_eq!(*samples[0], sample);
1358
1359        assert_eq!(*TRIGGERED.lock().unwrap(), Triggered { data_on_readers: 1 });
1360    }
1361
1362    #[test]
1363    fn test_publisher_listener_callbacks() {
1364        let domain_id = crate::tests::domain::unique_id();
1365        let topic_name = crate::tests::topic::unique_name();
1366        let domain = crate::Domain::new(domain_id).unwrap();
1367
1368        let participant = crate::Participant::new(&domain).unwrap();
1369        let topic =
1370            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1371        let publisher = crate::Publisher::builder(&participant)
1372            .with_listener(crate::PublisherListener::new())
1373            .build()
1374            .unwrap();
1375        let reader = crate::Reader::new(&topic).unwrap();
1376        let writer = crate::Writer::builder(&topic)
1377            .with_publisher(&publisher)
1378            .build()
1379            .unwrap();
1380
1381        let sample = crate::tests::topic::Data::default();
1382        writer.write(&sample).unwrap();
1383
1384        let samples = reader.read().unwrap();
1385        assert_eq!(samples.len(), 1);
1386
1387        assert_eq!(*samples[0], sample);
1388    }
1389
1390    #[test]
1391    fn test_reader_listener_callbacks() {
1392        #[derive(Debug, PartialEq)]
1393        struct Triggered {
1394            requested_incompatible_qos: u32,
1395            requested_deadline_missed: bool,
1396            sample_rejected: u32,
1397            data_available: u32,
1398            subscription_matched: u32,
1399            liveliness_changed: u32,
1400            sample_lost: u32,
1401        }
1402
1403        static TRIGGERED: std::sync::Mutex<Triggered> = std::sync::Mutex::new(Triggered {
1404            requested_incompatible_qos: 0,
1405            requested_deadline_missed: false,
1406            sample_rejected: 0,
1407            data_available: 0,
1408            subscription_matched: 0,
1409            liveliness_changed: 0,
1410            sample_lost: 0,
1411        });
1412
1413        let domain_id = crate::tests::domain::unique_id();
1414        let topic_name = crate::tests::topic::unique_name();
1415        let domain = crate::Domain::new(domain_id).unwrap();
1416
1417        let participant = crate::Participant::new(&domain).unwrap();
1418        let qos = crate::QoS::new()
1419            .with_destination_order(crate::qos::policy::DestinationOrder::BySourceTimestamp);
1420        let topic = crate::Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
1421            .with_qos(&qos)
1422            .build()
1423            .unwrap();
1424
1425        {
1426            let _writer = crate::Writer::new(&topic).unwrap();
1427            let _reader = crate::Reader::builder(&topic)
1428                .with_qos(
1429                    &crate::QoS::new().with_durability(crate::qos::policy::Durability::Persistent),
1430                )
1431                .with_listener(
1432                    crate::ReaderListener::new().with_requested_incompatible_qos(
1433                        |_reader, _metadata| {
1434                            TRIGGERED.lock().unwrap().requested_incompatible_qos += 1;
1435                        },
1436                    ),
1437                )
1438                .build()
1439                .unwrap();
1440        }
1441
1442        {
1443            let qos = crate::QoS::new().with_deadline(crate::qos::policy::Deadline {
1444                period: crate::Duration::from_nanos(1_000_000),
1445            });
1446            let reader = crate::Reader::builder(&topic)
1447                .with_qos(&qos)
1448                .with_listener(crate::ReaderListener::new().with_requested_deadline_missed(
1449                    |_reader, _metadata| {
1450                        TRIGGERED.lock().unwrap().requested_deadline_missed |= true;
1451                    },
1452                ))
1453                .build()
1454                .unwrap();
1455            let writer = crate::Writer::builder(&topic)
1456                .with_qos(&qos)
1457                .build()
1458                .unwrap();
1459
1460            let sample = crate::tests::topic::Data::default();
1461            writer.write(&sample).unwrap();
1462
1463            let samples = reader.take().unwrap();
1464            assert_eq!(samples.len(), 1);
1465            assert_eq!(*samples[0], sample);
1466
1467            while !TRIGGERED.lock().unwrap().requested_deadline_missed {
1468                std::thread::sleep(std::time::Duration::from_nanos(50));
1469            }
1470        }
1471
1472        {
1473            let reader = crate::Reader::builder(&topic)
1474                .with_qos(&crate::QoS::new().with_resource_limits(
1475                    crate::qos::policy::ResourceLimits {
1476                        max_samples: crate::qos::policy::ResourceLimit::Unlimited,
1477                        max_instances: crate::qos::policy::ResourceLimit::Limited(1),
1478                        max_samples_per_instance: crate::qos::policy::ResourceLimit::Unlimited,
1479                    },
1480                ))
1481                .with_listener(crate::ReaderListener::new().with_sample_rejected(
1482                    |_reader, _metadata| {
1483                        TRIGGERED.lock().unwrap().sample_rejected += 1;
1484                    },
1485                ))
1486                .build()
1487                .unwrap();
1488            let writer = crate::Writer::new(&topic).unwrap();
1489
1490            let sample = crate::tests::topic::Data {
1491                x: 1,
1492                y: 2,
1493                ..crate::tests::topic::Data::default()
1494            };
1495            writer.write(&sample).unwrap();
1496            writer
1497                .write(&crate::tests::topic::Data {
1498                    x: 2,
1499                    y: 3,
1500                    ..crate::tests::topic::Data::default()
1501                })
1502                .unwrap();
1503
1504            let samples = reader.take().unwrap();
1505            assert_eq!(samples.len(), 1);
1506            assert_eq!(*samples[0], sample);
1507        }
1508
1509        {
1510            let reader = crate::Reader::builder(&topic)
1511                .with_listener(
1512                    crate::ReaderListener::new()
1513                        .with_data_available(|_reader| {
1514                            TRIGGERED.lock().unwrap().data_available += 1;
1515                        })
1516                        .with_subscription_matched(|_reader, _matched| {
1517                            TRIGGERED.lock().unwrap().subscription_matched += 1;
1518                        })
1519                        .with_liveliness_changed(|_reader, _changed| {
1520                            TRIGGERED.lock().unwrap().liveliness_changed += 1;
1521                        })
1522                        .with_sample_lost(|_reader, _metadata| {
1523                            TRIGGERED.lock().unwrap().sample_lost += 1;
1524                        }),
1525                )
1526                .build()
1527                .unwrap();
1528            let writer = crate::Writer::new(&topic).unwrap();
1529
1530            let sample = crate::tests::topic::Data::default();
1531            writer.write(&sample).unwrap();
1532
1533            let key = sample.as_key();
1534            writer
1535                .unregister_instance_with_timestamp(
1536                    &key,
1537                    (std::time::SystemTime::now() - std::time::Duration::from_secs(1))
1538                        .try_into()
1539                        .unwrap(),
1540                )
1541                .unwrap();
1542
1543            let samples = reader.take().unwrap();
1544            assert_eq!(samples.len(), 1);
1545
1546            assert_eq!(*samples[0], sample);
1547
1548            assert_eq!(
1549                *TRIGGERED.lock().unwrap(),
1550                Triggered {
1551                    requested_incompatible_qos: 1,
1552                    requested_deadline_missed: true,
1553                    sample_rejected: 1,
1554                    data_available: 2,
1555                    subscription_matched: 1,
1556                    liveliness_changed: 1,
1557                    sample_lost: 1,
1558                }
1559            );
1560        }
1561    }
1562
1563    #[test]
1564    fn test_writer_listener_callbacks() {
1565        #[derive(Debug, PartialEq)]
1566        struct Triggered {
1567            liveliness_lost: bool,
1568            offered_deadline_missed: bool,
1569            offered_incompatible_qos: u32,
1570            publication_matched: u32,
1571        }
1572
1573        static TRIGGERED: std::sync::Mutex<Triggered> = std::sync::Mutex::new(Triggered {
1574            liveliness_lost: false,
1575            offered_deadline_missed: false,
1576            offered_incompatible_qos: 0,
1577            publication_matched: 0,
1578        });
1579
1580        let domain_id = crate::tests::domain::unique_id();
1581        let topic_name = crate::tests::topic::unique_name();
1582        let domain = crate::Domain::new(domain_id).unwrap();
1583
1584        let participant = crate::Participant::new(&domain).unwrap();
1585        let topic =
1586            crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();
1587
1588        {
1589            let _reader = crate::Reader::builder(&topic)
1590                .with_qos(
1591                    &crate::QoS::new().with_durability(crate::qos::policy::Durability::Persistent),
1592                )
1593                .build()
1594                .unwrap();
1595            let _writer = crate::Writer::builder(&topic)
1596                .with_listener(crate::WriterListener::new().with_offered_incompatible_qos(
1597                    |_writer, _metadata| {
1598                        TRIGGERED.lock().unwrap().offered_incompatible_qos += 1;
1599                    },
1600                ))
1601                .build()
1602                .unwrap();
1603        }
1604
1605        {
1606            let qos = crate::QoS::new().with_deadline(crate::qos::policy::Deadline {
1607                period: crate::Duration::from_nanos(1_000_000),
1608            });
1609            let writer = crate::Writer::builder(&topic)
1610                .with_qos(&qos)
1611                .with_listener(crate::WriterListener::new().with_offered_deadline_missed(
1612                    |_writer, _metadata| {
1613                        TRIGGERED.lock().unwrap().offered_deadline_missed |= true;
1614                    },
1615                ))
1616                .build()
1617                .unwrap();
1618            let reader = crate::Reader::builder(&topic)
1619                .with_qos(&qos)
1620                .build()
1621                .unwrap();
1622
1623            let sample = crate::tests::topic::Data::default();
1624            writer.write(&sample).unwrap();
1625
1626            let samples = reader.take().unwrap();
1627            assert_eq!(samples.len(), 1);
1628            assert_eq!(*samples[0], sample);
1629
1630            while !TRIGGERED.lock().unwrap().offered_deadline_missed {
1631                std::thread::sleep(std::time::Duration::from_nanos(50));
1632            }
1633        }
1634
1635        {
1636            let writer = crate::Writer::builder(&topic)
1637                .with_listener(
1638                    crate::WriterListener::new()
1639                        .with_liveliness_lost(|_writer, _metadata| {
1640                            TRIGGERED.lock().unwrap().liveliness_lost |= true;
1641                        })
1642                        .with_publication_matched(|_writer, _metadata| {
1643                            TRIGGERED.lock().unwrap().publication_matched += 1;
1644                        }),
1645                )
1646                .with_qos(&crate::QoS::new().with_liveliness(
1647                    crate::qos::policy::Liveliness::ManualByParticipant {
1648                        lease_duration: crate::Duration::from_nanos(1_000_000),
1649                    },
1650                ))
1651                .build()
1652                .unwrap();
1653
1654            let reader = crate::Reader::new(&topic).unwrap();
1655
1656            let sample = crate::tests::topic::Data::default();
1657            writer.write(&sample).unwrap();
1658
1659            let key = sample.as_key();
1660            writer
1661                .unregister_instance_with_timestamp(
1662                    &key,
1663                    (std::time::SystemTime::now() - std::time::Duration::from_secs(1))
1664                        .try_into()
1665                        .unwrap(),
1666                )
1667                .unwrap();
1668
1669            let samples = reader.take().unwrap();
1670            assert_eq!(samples.len(), 1);
1671
1672            assert_eq!(*samples[0], sample);
1673
1674            while !TRIGGERED.lock().unwrap().liveliness_lost {
1675                std::thread::sleep(std::time::Duration::from_nanos(50));
1676            }
1677        }
1678
1679        assert_eq!(
1680            *TRIGGERED.lock().unwrap(),
1681            Triggered {
1682                liveliness_lost: true,
1683                offered_deadline_missed: true,
1684                offered_incompatible_qos: 1,
1685                publication_matched: 2,
1686            }
1687        );
1688    }
1689}