extern crate alloc;
use alloc::boxed::Box;
use crate::entity::StatusMask;
use crate::instance_handle::InstanceHandle;
use crate::psm_constants::status as status_bits;
use crate::status::{
InconsistentTopicStatus, LivelinessChangedStatus, LivelinessLostStatus,
OfferedDeadlineMissedStatus, OfferedIncompatibleQosStatus, PublicationMatchedStatus,
RequestedDeadlineMissedStatus, RequestedIncompatibleQosStatus, SampleLostStatus,
SampleRejectedStatus, SubscriptionMatchedStatus,
};
pub trait TopicListener: Send + Sync {
fn on_inconsistent_topic(&self, _topic: InstanceHandle, _status: InconsistentTopicStatus) {}
}
pub trait DataWriterListener: Send + Sync {
fn on_offered_deadline_missed(
&self,
_writer: InstanceHandle,
_status: OfferedDeadlineMissedStatus,
) {
}
fn on_offered_incompatible_qos(
&self,
_writer: InstanceHandle,
_status: OfferedIncompatibleQosStatus,
) {
}
fn on_liveliness_lost(&self, _writer: InstanceHandle, _status: LivelinessLostStatus) {}
fn on_publication_matched(&self, _writer: InstanceHandle, _status: PublicationMatchedStatus) {}
}
pub trait PublisherListener: Send + Sync {
fn on_offered_deadline_missed(
&self,
_writer: InstanceHandle,
_status: OfferedDeadlineMissedStatus,
) {
}
fn on_offered_incompatible_qos(
&self,
_writer: InstanceHandle,
_status: OfferedIncompatibleQosStatus,
) {
}
fn on_liveliness_lost(&self, _writer: InstanceHandle, _status: LivelinessLostStatus) {}
fn on_publication_matched(&self, _writer: InstanceHandle, _status: PublicationMatchedStatus) {}
}
pub trait DataReaderListener: Send + Sync {
fn on_data_available(&self, _reader: InstanceHandle) {}
fn on_sample_lost(&self, _reader: InstanceHandle, _status: SampleLostStatus) {}
fn on_sample_rejected(&self, _reader: InstanceHandle, _status: SampleRejectedStatus) {}
fn on_requested_deadline_missed(
&self,
_reader: InstanceHandle,
_status: RequestedDeadlineMissedStatus,
) {
}
fn on_requested_incompatible_qos(
&self,
_reader: InstanceHandle,
_status: RequestedIncompatibleQosStatus,
) {
}
fn on_liveliness_changed(&self, _reader: InstanceHandle, _status: LivelinessChangedStatus) {}
fn on_subscription_matched(&self, _reader: InstanceHandle, _status: SubscriptionMatchedStatus) {
}
}
pub trait SubscriberListener: Send + Sync {
fn on_data_on_readers(&self, _subscriber: InstanceHandle) {}
fn on_data_available(&self, _reader: InstanceHandle) {}
fn on_sample_lost(&self, _reader: InstanceHandle, _status: SampleLostStatus) {}
fn on_sample_rejected(&self, _reader: InstanceHandle, _status: SampleRejectedStatus) {}
fn on_requested_deadline_missed(
&self,
_reader: InstanceHandle,
_status: RequestedDeadlineMissedStatus,
) {
}
fn on_requested_incompatible_qos(
&self,
_reader: InstanceHandle,
_status: RequestedIncompatibleQosStatus,
) {
}
fn on_liveliness_changed(&self, _reader: InstanceHandle, _status: LivelinessChangedStatus) {}
fn on_subscription_matched(&self, _reader: InstanceHandle, _status: SubscriptionMatchedStatus) {
}
}
pub trait DomainParticipantListener: Send + Sync {
fn on_inconsistent_topic(&self, _topic: InstanceHandle, _status: InconsistentTopicStatus) {}
fn on_offered_deadline_missed(
&self,
_writer: InstanceHandle,
_status: OfferedDeadlineMissedStatus,
) {
}
fn on_offered_incompatible_qos(
&self,
_writer: InstanceHandle,
_status: OfferedIncompatibleQosStatus,
) {
}
fn on_liveliness_lost(&self, _writer: InstanceHandle, _status: LivelinessLostStatus) {}
fn on_publication_matched(&self, _writer: InstanceHandle, _status: PublicationMatchedStatus) {}
fn on_data_on_readers(&self, _subscriber: InstanceHandle) {}
fn on_data_available(&self, _reader: InstanceHandle) {}
fn on_sample_lost(&self, _reader: InstanceHandle, _status: SampleLostStatus) {}
fn on_sample_rejected(&self, _reader: InstanceHandle, _status: SampleRejectedStatus) {}
fn on_requested_deadline_missed(
&self,
_reader: InstanceHandle,
_status: RequestedDeadlineMissedStatus,
) {
}
fn on_requested_incompatible_qos(
&self,
_reader: InstanceHandle,
_status: RequestedIncompatibleQosStatus,
) {
}
fn on_liveliness_changed(&self, _reader: InstanceHandle, _status: LivelinessChangedStatus) {}
fn on_subscription_matched(&self, _reader: InstanceHandle, _status: SubscriptionMatchedStatus) {
}
}
pub type BoxedTopicListener = Box<dyn TopicListener>;
pub type BoxedDataWriterListener = Box<dyn DataWriterListener>;
pub type BoxedPublisherListener = Box<dyn PublisherListener>;
pub type BoxedDataReaderListener = Box<dyn DataReaderListener>;
pub type BoxedSubscriberListener = Box<dyn SubscriberListener>;
pub type BoxedDomainParticipantListener = Box<dyn DomainParticipantListener>;
pub type ArcTopicListener = alloc::sync::Arc<dyn TopicListener>;
pub type ArcDataWriterListener = alloc::sync::Arc<dyn DataWriterListener>;
pub type ArcPublisherListener = alloc::sync::Arc<dyn PublisherListener>;
pub type ArcDataReaderListener = alloc::sync::Arc<dyn DataReaderListener>;
pub type ArcSubscriberListener = alloc::sync::Arc<dyn SubscriberListener>;
pub type ArcDomainParticipantListener = alloc::sync::Arc<dyn DomainParticipantListener>;
#[inline]
#[must_use]
pub fn listener_handles(listener_present: bool, mask: StatusMask, status_bit: u32) -> bool {
listener_present && (mask & status_bit) != 0
}
#[must_use]
pub fn status_bit_for_inconsistent_topic() -> u32 {
status_bits::INCONSISTENT_TOPIC
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
use super::*;
use core::sync::atomic::{AtomicU32, Ordering};
#[test]
fn topic_listener_is_object_safe() {
struct Counter(AtomicU32);
impl TopicListener for Counter {
fn on_inconsistent_topic(
&self,
_topic: InstanceHandle,
_status: InconsistentTopicStatus,
) {
self.0.fetch_add(1, Ordering::Relaxed);
}
}
let _: BoxedTopicListener = Box::new(Counter(AtomicU32::new(0)));
}
#[test]
fn datawriter_listener_is_object_safe() {
struct L;
impl DataWriterListener for L {}
let _: BoxedDataWriterListener = Box::new(L);
}
#[test]
fn publisher_listener_is_object_safe() {
struct L;
impl PublisherListener for L {}
let _: BoxedPublisherListener = Box::new(L);
}
#[test]
fn datareader_listener_is_object_safe() {
struct L;
impl DataReaderListener for L {}
let _: BoxedDataReaderListener = Box::new(L);
}
#[test]
fn subscriber_listener_is_object_safe() {
struct L;
impl SubscriberListener for L {}
let _: BoxedSubscriberListener = Box::new(L);
}
#[test]
fn participant_listener_is_object_safe() {
struct L;
impl DomainParticipantListener for L {}
let _: BoxedDomainParticipantListener = Box::new(L);
}
#[test]
fn default_callbacks_do_not_panic() {
struct Noop;
impl TopicListener for Noop {}
impl DataWriterListener for Noop {}
impl PublisherListener for Noop {}
impl DataReaderListener for Noop {}
impl SubscriberListener for Noop {}
impl DomainParticipantListener for Noop {}
let _: BoxedDomainParticipantListener = Box::new(Noop);
}
#[test]
fn listener_handles_respects_mask_and_presence() {
let bit = status_bit_for_inconsistent_topic();
assert!(listener_handles(true, bit, bit));
assert!(!listener_handles(false, bit, bit));
assert!(!listener_handles(true, 0, bit));
assert!(!listener_handles(true, status_bits::SAMPLE_LOST, bit));
}
#[test]
fn status_bit_for_inconsistent_topic_matches_psm() {
assert_eq!(
status_bit_for_inconsistent_topic(),
status_bits::INCONSISTENT_TOPIC
);
}
#[test]
fn all_listener_traits_default_methods_invoke_safely() {
struct Noop;
impl TopicListener for Noop {}
impl DataWriterListener for Noop {}
impl PublisherListener for Noop {}
impl DataReaderListener for Noop {}
impl SubscriberListener for Noop {}
impl DomainParticipantListener for Noop {}
let h = InstanceHandle::from_raw(1);
let n = Noop;
TopicListener::on_inconsistent_topic(&n, h, InconsistentTopicStatus::default());
DataWriterListener::on_offered_deadline_missed(
&n,
h,
OfferedDeadlineMissedStatus::default(),
);
DataWriterListener::on_offered_incompatible_qos(
&n,
h,
OfferedIncompatibleQosStatus::default(),
);
DataWriterListener::on_liveliness_lost(&n, h, LivelinessLostStatus::default());
DataWriterListener::on_publication_matched(&n, h, PublicationMatchedStatus::default());
PublisherListener::on_offered_deadline_missed(
&n,
h,
OfferedDeadlineMissedStatus::default(),
);
PublisherListener::on_offered_incompatible_qos(
&n,
h,
OfferedIncompatibleQosStatus::default(),
);
PublisherListener::on_liveliness_lost(&n, h, LivelinessLostStatus::default());
PublisherListener::on_publication_matched(&n, h, PublicationMatchedStatus::default());
DataReaderListener::on_data_available(&n, h);
DataReaderListener::on_sample_lost(&n, h, SampleLostStatus::default());
DataReaderListener::on_sample_rejected(&n, h, SampleRejectedStatus::default());
DataReaderListener::on_requested_deadline_missed(
&n,
h,
RequestedDeadlineMissedStatus::default(),
);
DataReaderListener::on_requested_incompatible_qos(
&n,
h,
RequestedIncompatibleQosStatus::default(),
);
DataReaderListener::on_liveliness_changed(&n, h, LivelinessChangedStatus::default());
DataReaderListener::on_subscription_matched(&n, h, SubscriptionMatchedStatus::default());
SubscriberListener::on_data_on_readers(&n, h);
SubscriberListener::on_data_available(&n, h);
SubscriberListener::on_sample_lost(&n, h, SampleLostStatus::default());
SubscriberListener::on_sample_rejected(&n, h, SampleRejectedStatus::default());
SubscriberListener::on_requested_deadline_missed(
&n,
h,
RequestedDeadlineMissedStatus::default(),
);
SubscriberListener::on_requested_incompatible_qos(
&n,
h,
RequestedIncompatibleQosStatus::default(),
);
SubscriberListener::on_liveliness_changed(&n, h, LivelinessChangedStatus::default());
SubscriberListener::on_subscription_matched(&n, h, SubscriptionMatchedStatus::default());
DomainParticipantListener::on_inconsistent_topic(&n, h, InconsistentTopicStatus::default());
DomainParticipantListener::on_offered_deadline_missed(
&n,
h,
OfferedDeadlineMissedStatus::default(),
);
DomainParticipantListener::on_offered_incompatible_qos(
&n,
h,
OfferedIncompatibleQosStatus::default(),
);
DomainParticipantListener::on_liveliness_lost(&n, h, LivelinessLostStatus::default());
DomainParticipantListener::on_publication_matched(
&n,
h,
PublicationMatchedStatus::default(),
);
DomainParticipantListener::on_data_on_readers(&n, h);
DomainParticipantListener::on_data_available(&n, h);
DomainParticipantListener::on_sample_lost(&n, h, SampleLostStatus::default());
DomainParticipantListener::on_sample_rejected(&n, h, SampleRejectedStatus::default());
DomainParticipantListener::on_requested_deadline_missed(
&n,
h,
RequestedDeadlineMissedStatus::default(),
);
DomainParticipantListener::on_requested_incompatible_qos(
&n,
h,
RequestedIncompatibleQosStatus::default(),
);
DomainParticipantListener::on_liveliness_changed(&n, h, LivelinessChangedStatus::default());
DomainParticipantListener::on_subscription_matched(
&n,
h,
SubscriptionMatchedStatus::default(),
);
}
#[test]
fn datareader_listener_call_runs_default_methods() {
struct Counters {
avail: AtomicU32,
lost: AtomicU32,
}
impl DataReaderListener for Counters {
fn on_data_available(&self, _r: InstanceHandle) {
self.avail.fetch_add(1, Ordering::Relaxed);
}
fn on_sample_lost(&self, _r: InstanceHandle, _s: SampleLostStatus) {
self.lost.fetch_add(1, Ordering::Relaxed);
}
}
let c = Counters {
avail: AtomicU32::new(0),
lost: AtomicU32::new(0),
};
let h = InstanceHandle::from_raw(1);
c.on_data_available(h);
c.on_data_available(h);
c.on_sample_lost(h, SampleLostStatus::default());
c.on_subscription_matched(h, SubscriptionMatchedStatus::default());
assert_eq!(c.avail.load(Ordering::Relaxed), 2);
assert_eq!(c.lost.load(Ordering::Relaxed), 1);
}
}