1extern crate alloc;
55
56use alloc::boxed::Box;
57
58use crate::entity::StatusMask;
59use crate::instance_handle::InstanceHandle;
60use crate::psm_constants::status as status_bits;
61use crate::status::{
62 InconsistentTopicStatus, LivelinessChangedStatus, LivelinessLostStatus,
63 OfferedDeadlineMissedStatus, OfferedIncompatibleQosStatus, PublicationMatchedStatus,
64 RequestedDeadlineMissedStatus, RequestedIncompatibleQosStatus, SampleLostStatus,
65 SampleRejectedStatus, SubscriptionMatchedStatus,
66};
67
68pub trait TopicListener: Send + Sync {
77 fn on_inconsistent_topic(&self, _topic: InstanceHandle, _status: InconsistentTopicStatus) {}
81}
82
83pub trait DataWriterListener: Send + Sync {
92 fn on_offered_deadline_missed(
95 &self,
96 _writer: InstanceHandle,
97 _status: OfferedDeadlineMissedStatus,
98 ) {
99 }
100
101 fn on_offered_incompatible_qos(
104 &self,
105 _writer: InstanceHandle,
106 _status: OfferedIncompatibleQosStatus,
107 ) {
108 }
109
110 fn on_liveliness_lost(&self, _writer: InstanceHandle, _status: LivelinessLostStatus) {}
113
114 fn on_publication_matched(&self, _writer: InstanceHandle, _status: PublicationMatchedStatus) {}
117}
118
119pub trait PublisherListener: Send + Sync {
129 fn on_offered_deadline_missed(
131 &self,
132 _writer: InstanceHandle,
133 _status: OfferedDeadlineMissedStatus,
134 ) {
135 }
136
137 fn on_offered_incompatible_qos(
139 &self,
140 _writer: InstanceHandle,
141 _status: OfferedIncompatibleQosStatus,
142 ) {
143 }
144
145 fn on_liveliness_lost(&self, _writer: InstanceHandle, _status: LivelinessLostStatus) {}
147
148 fn on_publication_matched(&self, _writer: InstanceHandle, _status: PublicationMatchedStatus) {}
150}
151
152pub trait DataReaderListener: Send + Sync {
161 fn on_data_available(&self, _reader: InstanceHandle) {}
163
164 fn on_sample_lost(&self, _reader: InstanceHandle, _status: SampleLostStatus) {}
167
168 fn on_sample_rejected(&self, _reader: InstanceHandle, _status: SampleRejectedStatus) {}
171
172 fn on_requested_deadline_missed(
175 &self,
176 _reader: InstanceHandle,
177 _status: RequestedDeadlineMissedStatus,
178 ) {
179 }
180
181 fn on_requested_incompatible_qos(
184 &self,
185 _reader: InstanceHandle,
186 _status: RequestedIncompatibleQosStatus,
187 ) {
188 }
189
190 fn on_liveliness_changed(&self, _reader: InstanceHandle, _status: LivelinessChangedStatus) {}
193
194 fn on_subscription_matched(&self, _reader: InstanceHandle, _status: SubscriptionMatchedStatus) {
196 }
197}
198
199pub trait SubscriberListener: Send + Sync {
207 fn on_data_on_readers(&self, _subscriber: InstanceHandle) {}
210
211 fn on_data_available(&self, _reader: InstanceHandle) {}
213
214 fn on_sample_lost(&self, _reader: InstanceHandle, _status: SampleLostStatus) {}
216
217 fn on_sample_rejected(&self, _reader: InstanceHandle, _status: SampleRejectedStatus) {}
219
220 fn on_requested_deadline_missed(
222 &self,
223 _reader: InstanceHandle,
224 _status: RequestedDeadlineMissedStatus,
225 ) {
226 }
227
228 fn on_requested_incompatible_qos(
230 &self,
231 _reader: InstanceHandle,
232 _status: RequestedIncompatibleQosStatus,
233 ) {
234 }
235
236 fn on_liveliness_changed(&self, _reader: InstanceHandle, _status: LivelinessChangedStatus) {}
238
239 fn on_subscription_matched(&self, _reader: InstanceHandle, _status: SubscriptionMatchedStatus) {
241 }
242}
243
244pub trait DomainParticipantListener: Send + Sync {
262 fn on_inconsistent_topic(&self, _topic: InstanceHandle, _status: InconsistentTopicStatus) {}
266
267 fn on_offered_deadline_missed(
271 &self,
272 _writer: InstanceHandle,
273 _status: OfferedDeadlineMissedStatus,
274 ) {
275 }
276
277 fn on_offered_incompatible_qos(
279 &self,
280 _writer: InstanceHandle,
281 _status: OfferedIncompatibleQosStatus,
282 ) {
283 }
284
285 fn on_liveliness_lost(&self, _writer: InstanceHandle, _status: LivelinessLostStatus) {}
287
288 fn on_publication_matched(&self, _writer: InstanceHandle, _status: PublicationMatchedStatus) {}
290
291 fn on_data_on_readers(&self, _subscriber: InstanceHandle) {}
295
296 fn on_data_available(&self, _reader: InstanceHandle) {}
298
299 fn on_sample_lost(&self, _reader: InstanceHandle, _status: SampleLostStatus) {}
301
302 fn on_sample_rejected(&self, _reader: InstanceHandle, _status: SampleRejectedStatus) {}
304
305 fn on_requested_deadline_missed(
307 &self,
308 _reader: InstanceHandle,
309 _status: RequestedDeadlineMissedStatus,
310 ) {
311 }
312
313 fn on_requested_incompatible_qos(
315 &self,
316 _reader: InstanceHandle,
317 _status: RequestedIncompatibleQosStatus,
318 ) {
319 }
320
321 fn on_liveliness_changed(&self, _reader: InstanceHandle, _status: LivelinessChangedStatus) {}
323
324 fn on_subscription_matched(&self, _reader: InstanceHandle, _status: SubscriptionMatchedStatus) {
326 }
327}
328
329pub type BoxedTopicListener = Box<dyn TopicListener>;
336pub type BoxedDataWriterListener = Box<dyn DataWriterListener>;
338pub type BoxedPublisherListener = Box<dyn PublisherListener>;
340pub type BoxedDataReaderListener = Box<dyn DataReaderListener>;
342pub type BoxedSubscriberListener = Box<dyn SubscriberListener>;
344pub type BoxedDomainParticipantListener = Box<dyn DomainParticipantListener>;
346
347pub type ArcTopicListener = alloc::sync::Arc<dyn TopicListener>;
352pub type ArcDataWriterListener = alloc::sync::Arc<dyn DataWriterListener>;
354pub type ArcPublisherListener = alloc::sync::Arc<dyn PublisherListener>;
356pub type ArcDataReaderListener = alloc::sync::Arc<dyn DataReaderListener>;
358pub type ArcSubscriberListener = alloc::sync::Arc<dyn SubscriberListener>;
360pub type ArcDomainParticipantListener = alloc::sync::Arc<dyn DomainParticipantListener>;
362
363#[inline]
371#[must_use]
372pub fn listener_handles(listener_present: bool, mask: StatusMask, status_bit: u32) -> bool {
373 listener_present && (mask & status_bit) != 0
374}
375
376#[must_use]
380pub fn status_bit_for_inconsistent_topic() -> u32 {
381 status_bits::INCONSISTENT_TOPIC
382}
383
384#[cfg(test)]
385#[allow(clippy::expect_used, clippy::unwrap_used)]
386mod tests {
387 use super::*;
388 use core::sync::atomic::{AtomicU32, Ordering};
389
390 #[test]
393 fn topic_listener_is_object_safe() {
394 struct Counter(AtomicU32);
395 impl TopicListener for Counter {
396 fn on_inconsistent_topic(
397 &self,
398 _topic: InstanceHandle,
399 _status: InconsistentTopicStatus,
400 ) {
401 self.0.fetch_add(1, Ordering::Relaxed);
402 }
403 }
404 let _: BoxedTopicListener = Box::new(Counter(AtomicU32::new(0)));
405 }
406
407 #[test]
408 fn datawriter_listener_is_object_safe() {
409 struct L;
410 impl DataWriterListener for L {}
411 let _: BoxedDataWriterListener = Box::new(L);
412 }
413
414 #[test]
415 fn publisher_listener_is_object_safe() {
416 struct L;
417 impl PublisherListener for L {}
418 let _: BoxedPublisherListener = Box::new(L);
419 }
420
421 #[test]
422 fn datareader_listener_is_object_safe() {
423 struct L;
424 impl DataReaderListener for L {}
425 let _: BoxedDataReaderListener = Box::new(L);
426 }
427
428 #[test]
429 fn subscriber_listener_is_object_safe() {
430 struct L;
431 impl SubscriberListener for L {}
432 let _: BoxedSubscriberListener = Box::new(L);
433 }
434
435 #[test]
436 fn participant_listener_is_object_safe() {
437 struct L;
438 impl DomainParticipantListener for L {}
439 let _: BoxedDomainParticipantListener = Box::new(L);
440 }
441
442 #[test]
445 fn default_callbacks_do_not_panic() {
446 struct Noop;
448 impl TopicListener for Noop {}
449 impl DataWriterListener for Noop {}
450 impl PublisherListener for Noop {}
451 impl DataReaderListener for Noop {}
452 impl SubscriberListener for Noop {}
453 impl DomainParticipantListener for Noop {}
454 let _: BoxedDomainParticipantListener = Box::new(Noop);
457 }
458
459 #[test]
460 fn listener_handles_respects_mask_and_presence() {
461 let bit = status_bit_for_inconsistent_topic();
462 assert!(listener_handles(true, bit, bit));
463 assert!(!listener_handles(false, bit, bit));
464 assert!(!listener_handles(true, 0, bit));
465 assert!(!listener_handles(true, status_bits::SAMPLE_LOST, bit));
467 }
468
469 #[test]
470 fn status_bit_for_inconsistent_topic_matches_psm() {
471 assert_eq!(
472 status_bit_for_inconsistent_topic(),
473 status_bits::INCONSISTENT_TOPIC
474 );
475 }
476
477 #[test]
478 fn all_listener_traits_default_methods_invoke_safely() {
479 struct Noop;
483 impl TopicListener for Noop {}
484 impl DataWriterListener for Noop {}
485 impl PublisherListener for Noop {}
486 impl DataReaderListener for Noop {}
487 impl SubscriberListener for Noop {}
488 impl DomainParticipantListener for Noop {}
489
490 let h = InstanceHandle::from_raw(1);
491 let n = Noop;
492 TopicListener::on_inconsistent_topic(&n, h, InconsistentTopicStatus::default());
493
494 DataWriterListener::on_offered_deadline_missed(
495 &n,
496 h,
497 OfferedDeadlineMissedStatus::default(),
498 );
499 DataWriterListener::on_offered_incompatible_qos(
500 &n,
501 h,
502 OfferedIncompatibleQosStatus::default(),
503 );
504 DataWriterListener::on_liveliness_lost(&n, h, LivelinessLostStatus::default());
505 DataWriterListener::on_publication_matched(&n, h, PublicationMatchedStatus::default());
506
507 PublisherListener::on_offered_deadline_missed(
508 &n,
509 h,
510 OfferedDeadlineMissedStatus::default(),
511 );
512 PublisherListener::on_offered_incompatible_qos(
513 &n,
514 h,
515 OfferedIncompatibleQosStatus::default(),
516 );
517 PublisherListener::on_liveliness_lost(&n, h, LivelinessLostStatus::default());
518 PublisherListener::on_publication_matched(&n, h, PublicationMatchedStatus::default());
519
520 DataReaderListener::on_data_available(&n, h);
521 DataReaderListener::on_sample_lost(&n, h, SampleLostStatus::default());
522 DataReaderListener::on_sample_rejected(&n, h, SampleRejectedStatus::default());
523 DataReaderListener::on_requested_deadline_missed(
524 &n,
525 h,
526 RequestedDeadlineMissedStatus::default(),
527 );
528 DataReaderListener::on_requested_incompatible_qos(
529 &n,
530 h,
531 RequestedIncompatibleQosStatus::default(),
532 );
533 DataReaderListener::on_liveliness_changed(&n, h, LivelinessChangedStatus::default());
534 DataReaderListener::on_subscription_matched(&n, h, SubscriptionMatchedStatus::default());
535
536 SubscriberListener::on_data_on_readers(&n, h);
537 SubscriberListener::on_data_available(&n, h);
538 SubscriberListener::on_sample_lost(&n, h, SampleLostStatus::default());
539 SubscriberListener::on_sample_rejected(&n, h, SampleRejectedStatus::default());
540 SubscriberListener::on_requested_deadline_missed(
541 &n,
542 h,
543 RequestedDeadlineMissedStatus::default(),
544 );
545 SubscriberListener::on_requested_incompatible_qos(
546 &n,
547 h,
548 RequestedIncompatibleQosStatus::default(),
549 );
550 SubscriberListener::on_liveliness_changed(&n, h, LivelinessChangedStatus::default());
551 SubscriberListener::on_subscription_matched(&n, h, SubscriptionMatchedStatus::default());
552
553 DomainParticipantListener::on_inconsistent_topic(&n, h, InconsistentTopicStatus::default());
554 DomainParticipantListener::on_offered_deadline_missed(
555 &n,
556 h,
557 OfferedDeadlineMissedStatus::default(),
558 );
559 DomainParticipantListener::on_offered_incompatible_qos(
560 &n,
561 h,
562 OfferedIncompatibleQosStatus::default(),
563 );
564 DomainParticipantListener::on_liveliness_lost(&n, h, LivelinessLostStatus::default());
565 DomainParticipantListener::on_publication_matched(
566 &n,
567 h,
568 PublicationMatchedStatus::default(),
569 );
570 DomainParticipantListener::on_data_on_readers(&n, h);
571 DomainParticipantListener::on_data_available(&n, h);
572 DomainParticipantListener::on_sample_lost(&n, h, SampleLostStatus::default());
573 DomainParticipantListener::on_sample_rejected(&n, h, SampleRejectedStatus::default());
574 DomainParticipantListener::on_requested_deadline_missed(
575 &n,
576 h,
577 RequestedDeadlineMissedStatus::default(),
578 );
579 DomainParticipantListener::on_requested_incompatible_qos(
580 &n,
581 h,
582 RequestedIncompatibleQosStatus::default(),
583 );
584 DomainParticipantListener::on_liveliness_changed(&n, h, LivelinessChangedStatus::default());
585 DomainParticipantListener::on_subscription_matched(
586 &n,
587 h,
588 SubscriptionMatchedStatus::default(),
589 );
590 }
591
592 #[test]
593 fn datareader_listener_call_runs_default_methods() {
594 struct Counters {
595 avail: AtomicU32,
596 lost: AtomicU32,
597 }
598 impl DataReaderListener for Counters {
599 fn on_data_available(&self, _r: InstanceHandle) {
600 self.avail.fetch_add(1, Ordering::Relaxed);
601 }
602 fn on_sample_lost(&self, _r: InstanceHandle, _s: SampleLostStatus) {
603 self.lost.fetch_add(1, Ordering::Relaxed);
604 }
605 }
606 let c = Counters {
607 avail: AtomicU32::new(0),
608 lost: AtomicU32::new(0),
609 };
610 let h = InstanceHandle::from_raw(1);
611 c.on_data_available(h);
612 c.on_data_available(h);
613 c.on_sample_lost(h, SampleLostStatus::default());
614 c.on_subscription_matched(h, SubscriptionMatchedStatus::default());
617 assert_eq!(c.avail.load(Ordering::Relaxed), 2);
618 assert_eq!(c.lost.load(Ordering::Relaxed), 1);
619 }
620}