use crate::Result;
use crate::internal::ffi;
use crate::internal::traits::AsFfi;
use crate::status::{
InconsistentTopic, LivelinessChanged, LivelinessLost, OfferedDeadlineMissed,
OfferedIncompatibleQoS, PublicationMatched, RequestedDeadlineMissed, RequestedIncompatibleQoS,
SampleLost, SampleRejected, SubscriptionMatched,
};
/// Combined listener holding a [`SubscriberListener`] and a [`PublisherListener`].
///
/// Both parts start from their `Default` values; use `with_subscriber` and
/// `with_publisher` to customise them.
#[derive(Debug, Default, Clone, Copy)]
pub struct Listener {
/// Subscriber-level callbacks.
subscriber: SubscriberListener,
/// Publisher-level callbacks.
publisher: PublisherListener,
}
/// Optional callbacks for topic-level status events.
///
/// The callback is unset by default and is installed through
/// `with_inconsistent_topic`.
// NOTE: the derives below add `T: Debug` / `T: Clone` / `T: Copy` bounds to the
// generated impls, although only a function pointer is stored.
#[derive(Debug, Clone, Copy)]
pub struct TopicListener<T: crate::Topicable> {
    /// Callback for the `inconsistent_topic` status; `None` when not set.
    inconsistent_topic: Option<fn(&crate::Topic<'_, '_, T>, InconsistentTopic)>,
}
/// Optional callbacks for subscriber-level status events.
#[derive(Debug, Default, Clone, Copy)]
pub struct SubscriberListener {
/// Callback for the `data_on_readers` status; `None` when not set.
data_on_readers: Option<fn(&crate::Subscriber<'_, '_>)>,
}
/// Optional callbacks for reader-level status events.
///
/// Every callback is unset by default and is installed through the matching
/// `with_*` builder method.
// NOTE: the derives below add `T: Debug` / `T: Clone` / `T: Copy` bounds to the
// generated impls, although only function pointers are stored.
#[derive(Debug, Clone, Copy)]
pub struct ReaderListener<T: crate::Topicable> {
    /// Callback for the `sample_lost` status.
    sample_lost: Option<fn(&crate::Reader<'_, '_, '_, T>, SampleLost)>,
    /// Callback for the `data_available` status.
    data_available: Option<fn(&crate::Reader<'_, '_, '_, T>)>,
    /// Callback for the `sample_rejected` status.
    sample_rejected: Option<fn(&crate::Reader<'_, '_, '_, T>, SampleRejected)>,
    /// Callback for the `liveliness_changed` status.
    liveliness_changed: Option<fn(&crate::Reader<'_, '_, '_, T>, LivelinessChanged)>,
    /// Callback for the `requested_deadline_missed` status.
    requested_deadline_missed: Option<fn(&crate::Reader<'_, '_, '_, T>, RequestedDeadlineMissed)>,
    /// Callback for the `requested_incompatible_qos` status.
    requested_incompatible_qos:
        Option<fn(&crate::Reader<'_, '_, '_, T>, RequestedIncompatibleQoS)>,
    /// Callback for the `subscription_matched` status.
    subscription_matched: Option<fn(&crate::Reader<'_, '_, '_, T>, SubscriptionMatched)>,
}
/// Listener for publisher-level status events.
///
/// It currently holds no callbacks.
#[derive(Debug, Default, Clone, Copy)]
pub struct PublisherListener {
}
/// Optional callbacks for writer-level status events.
///
/// Every callback is unset by default and is installed through the matching
/// `with_*` builder method.
// NOTE: the derives below add `T: Debug` / `T: Clone` / `T: Copy` bounds to the
// generated impls, although only function pointers are stored.
#[derive(Debug, Clone, Copy)]
pub struct WriterListener<T: crate::Topicable> {
    /// Callback for the `liveliness_lost` status.
    liveliness_lost: Option<fn(&crate::Writer<'_, '_, '_, T>, LivelinessLost)>,
    /// Callback for the `offered_deadline_missed` status.
    offered_deadline_missed: Option<fn(&crate::Writer<'_, '_, '_, T>, OfferedDeadlineMissed)>,
    /// Callback for the `offered_incompatible_qos` status.
    offered_incompatible_qos: Option<fn(&crate::Writer<'_, '_, '_, T>, OfferedIncompatibleQoS)>,
    /// Callback for the `publication_matched` status.
    publication_matched: Option<fn(&crate::Writer<'_, '_, '_, T>, PublicationMatched)>,
}
// Written by hand: `#[derive(Default)]` would additionally require `T: Default`.
impl<T: crate::Topicable> Default for TopicListener<T> {
    /// Returns a listener with no callback installed.
    fn default() -> Self {
        Self {
            inconsistent_topic: None,
        }
    }
}
// Written by hand: `#[derive(Default)]` would additionally require `T: Default`.
impl<T: crate::Topicable> Default for ReaderListener<T> {
    /// Returns a listener with every callback unset.
    fn default() -> Self {
        Self {
            sample_lost: None,
            data_available: None,
            sample_rejected: None,
            liveliness_changed: None,
            requested_deadline_missed: None,
            requested_incompatible_qos: None,
            subscription_matched: None,
        }
    }
}
// Written by hand: `#[derive(Default)]` would additionally require `T: Default`.
impl<T: crate::Topicable> Default for WriterListener<T> {
    /// Returns a listener with every callback unset.
    fn default() -> Self {
        Self {
            liveliness_lost: None,
            offered_deadline_missed: None,
            offered_incompatible_qos: None,
            publication_matched: None,
        }
    }
}
impl Listener {
    /// Creates a listener whose subscriber and publisher parts are both default.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Replaces the subscriber part with the result of `setter` applied to the
    /// current subscriber part.
    #[must_use]
    pub fn with_subscriber(self, setter: fn(SubscriberListener) -> SubscriberListener) -> Self {
        Self {
            subscriber: setter(self.subscriber),
            ..self
        }
    }

    /// Replaces the publisher part with the result of `setter` applied to the
    /// current publisher part.
    #[must_use]
    pub fn with_publisher(self, setter: fn(PublisherListener) -> PublisherListener) -> Self {
        Self {
            publisher: setter(self.publisher),
            ..self
        }
    }

    /// Applies the subscriber callbacks, then the publisher callbacks, to `listener`.
    #[inline]
    pub(crate) fn apply_listener_ffi(self, listener: &mut ffi::Listener) {
        let Self {
            subscriber,
            publisher,
        } = self;
        subscriber.apply_listener_ffi(listener);
        publisher.apply_listener_ffi(listener);
    }
}
impl AsFfi for Listener {
    type Target<'a> = Result<ffi::Listener>;

    /// Creates a new `ffi::Listener` and applies this listener's callbacks to it.
    ///
    /// An error from `ffi::Listener::new()` is returned unchanged.
    #[inline]
    fn as_ffi(&self) -> Self::Target<'_> {
        let mut listener = ffi::Listener::new()?;
        self.apply_listener_ffi(&mut listener);
        Ok(listener)
    }
}
impl<T: crate::Topicable> TopicListener<T> {
    /// Creates a topic listener with no callback installed.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Installs `callback` for the `inconsistent_topic` status and returns the
    /// updated listener.
    #[must_use]
    pub fn with_inconsistent_topic(
        mut self,
        callback: fn(&crate::Topic<'_, '_, T>, InconsistentTopic),
    ) -> Self {
        self.inconsistent_topic = Some(callback);
        self
    }

    /// Registers the configured callback on `listener`; does nothing when no
    /// callback is set.
    #[inline]
    pub(crate) fn apply_listener_ffi(&self, listener: &mut ffi::Listener) {
        let Some(on_inconsistent_topic) = self.inconsistent_topic else {
            return;
        };
        ffi::dds_listener_set_inconsistent_topic(listener, on_inconsistent_topic);
    }
}
impl<T: crate::Topicable> AsFfi for TopicListener<T> {
    type Target<'a>
        = Result<ffi::Listener>
    where
        T: 'a;

    /// Creates a new `ffi::Listener` and applies this listener's callback to it.
    ///
    /// An error from `ffi::Listener::new()` is returned unchanged.
    #[inline]
    fn as_ffi(&self) -> Self::Target<'_> {
        let mut listener = ffi::Listener::new()?;
        self.apply_listener_ffi(&mut listener);
        Ok(listener)
    }
}
impl SubscriberListener {
    /// Creates a subscriber listener with no callback installed.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Installs `callback` for the `data_on_readers` status and returns the
    /// updated listener.
    #[must_use]
    pub fn with_data_on_readers(mut self, callback: fn(&crate::Subscriber<'_, '_>)) -> Self {
        self.data_on_readers = Some(callback);
        self
    }

    /// Registers the configured callback on `listener`; does nothing when no
    /// callback is set.
    #[inline]
    pub(crate) fn apply_listener_ffi(self, listener: &mut ffi::Listener) {
        let Some(on_data_on_readers) = self.data_on_readers else {
            return;
        };
        ffi::dds_listener_set_data_on_readers(listener, on_data_on_readers);
    }
}
impl AsFfi for SubscriberListener {
    type Target<'a> = Result<ffi::Listener>;

    /// Creates a new `ffi::Listener` and applies this listener's callback to it.
    ///
    /// An error from `ffi::Listener::new()` is returned unchanged.
    #[inline]
    fn as_ffi(&self) -> Self::Target<'_> {
        let mut listener = ffi::Listener::new()?;
        self.apply_listener_ffi(&mut listener);
        Ok(listener)
    }
}
impl PublisherListener {
    /// Creates a publisher listener.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// No-op: this listener holds no callbacks, so `listener` is left untouched.
    #[inline]
    pub(crate) const fn apply_listener_ffi(self, listener: &mut ffi::Listener) {
        // Both arguments are intentionally discarded.
        let _ = (self, listener);
    }
}
impl AsFfi for PublisherListener {
    type Target<'a> = Result<ffi::Listener>;

    /// Creates a new `ffi::Listener` and passes it through `apply_listener_ffi`.
    ///
    /// An error from `ffi::Listener::new()` is returned unchanged.
    #[inline]
    fn as_ffi(&self) -> Self::Target<'_> {
        let mut listener = ffi::Listener::new()?;
        self.apply_listener_ffi(&mut listener);
        Ok(listener)
    }
}
impl<T: crate::Topicable> ReaderListener<T> {
    /// Creates a reader listener with every callback unset.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Installs `callback` for the `sample_lost` status and returns the updated listener.
    #[must_use]
    pub fn with_sample_lost(
        self,
        callback: fn(&crate::Reader<'_, '_, '_, T>, SampleLost),
    ) -> Self {
        Self {
            sample_lost: Some(callback),
            ..self
        }
    }

    /// Installs `callback` for the `data_available` status and returns the updated listener.
    #[must_use]
    pub fn with_data_available(self, callback: fn(&crate::Reader<'_, '_, '_, T>)) -> Self {
        Self {
            data_available: Some(callback),
            ..self
        }
    }

    /// Installs `callback` for the `sample_rejected` status and returns the updated listener.
    #[must_use]
    pub fn with_sample_rejected(
        self,
        callback: fn(&crate::Reader<'_, '_, '_, T>, SampleRejected),
    ) -> Self {
        Self {
            sample_rejected: Some(callback),
            ..self
        }
    }

    /// Installs `callback` for the `liveliness_changed` status and returns the updated listener.
    #[must_use]
    pub fn with_liveliness_changed(
        self,
        callback: fn(&crate::Reader<'_, '_, '_, T>, LivelinessChanged),
    ) -> Self {
        Self {
            liveliness_changed: Some(callback),
            ..self
        }
    }

    /// Installs `callback` for the `requested_deadline_missed` status and returns the
    /// updated listener.
    #[must_use]
    pub fn with_requested_deadline_missed(
        self,
        callback: fn(&crate::Reader<'_, '_, '_, T>, RequestedDeadlineMissed),
    ) -> Self {
        Self {
            requested_deadline_missed: Some(callback),
            ..self
        }
    }

    /// Installs `callback` for the `requested_incompatible_qos` status and returns the
    /// updated listener.
    #[must_use]
    pub fn with_requested_incompatible_qos(
        self,
        callback: fn(&crate::Reader<'_, '_, '_, T>, RequestedIncompatibleQoS),
    ) -> Self {
        Self {
            requested_incompatible_qos: Some(callback),
            ..self
        }
    }

    /// Installs `callback` for the `subscription_matched` status and returns the
    /// updated listener.
    #[must_use]
    pub fn with_subscription_matched(
        self,
        callback: fn(&crate::Reader<'_, '_, '_, T>, SubscriptionMatched),
    ) -> Self {
        Self {
            subscription_matched: Some(callback),
            ..self
        }
    }

    /// Registers every configured callback on `listener` through the matching
    /// `ffi::dds_listener_set_*` function; unset callbacks are skipped.
    #[inline]
    pub(crate) fn apply_listener_ffi(&self, listener: &mut ffi::Listener) {
        // The fields are `Option<fn(..)>`, which is `Copy`, so they can be read out of
        // `*self` by value. Listing every field keeps this method in sync with the struct.
        let Self {
            sample_lost,
            data_available,
            sample_rejected,
            liveliness_changed,
            requested_deadline_missed,
            requested_incompatible_qos,
            subscription_matched,
        } = *self;

        if let Some(on_sample_lost) = sample_lost {
            ffi::dds_listener_set_sample_lost(listener, on_sample_lost);
        }
        if let Some(on_data_available) = data_available {
            ffi::dds_listener_set_data_available(listener, on_data_available);
        }
        if let Some(on_sample_rejected) = sample_rejected {
            ffi::dds_listener_set_sample_rejected(listener, on_sample_rejected);
        }
        if let Some(on_liveliness_changed) = liveliness_changed {
            ffi::dds_listener_set_liveliness_changed(listener, on_liveliness_changed);
        }
        if let Some(on_deadline_missed) = requested_deadline_missed {
            ffi::dds_listener_set_requested_deadline_missed(listener, on_deadline_missed);
        }
        if let Some(on_incompatible_qos) = requested_incompatible_qos {
            ffi::dds_listener_set_requested_incompatible_qos(listener, on_incompatible_qos);
        }
        if let Some(on_subscription_matched) = subscription_matched {
            ffi::dds_listener_set_subscription_matched(listener, on_subscription_matched);
        }
    }
}
impl<T: crate::Topicable> AsFfi for ReaderListener<T> {
    type Target<'a>
        = Result<ffi::Listener>
    where
        T: 'a;

    /// Creates a new `ffi::Listener` and applies this listener's callbacks to it.
    ///
    /// An error from `ffi::Listener::new()` is returned unchanged.
    #[inline]
    fn as_ffi(&self) -> Self::Target<'_> {
        let mut listener = ffi::Listener::new()?;
        self.apply_listener_ffi(&mut listener);
        Ok(listener)
    }
}
impl<T> WriterListener<T>
where
    T: crate::Topicable,
{
    /// Creates a writer listener with every callback unset.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Installs `callback` for the `liveliness_lost` status and returns the updated listener.
    #[must_use]
    pub fn with_liveliness_lost(
        mut self,
        callback: fn(&crate::Writer<'_, '_, '_, T>, LivelinessLost),
    ) -> Self {
        self.liveliness_lost = Some(callback);
        self
    }

    /// Installs `callback` for the `offered_deadline_missed` status and returns the
    /// updated listener.
    #[must_use]
    pub fn with_offered_deadline_missed(
        mut self,
        callback: fn(&crate::Writer<'_, '_, '_, T>, OfferedDeadlineMissed),
    ) -> Self {
        self.offered_deadline_missed = Some(callback);
        self
    }

    /// Installs `callback` for the `offered_incompatible_qos` status and returns the
    /// updated listener.
    #[must_use]
    pub fn with_offered_incompatible_qos(
        mut self,
        callback: fn(&crate::Writer<'_, '_, '_, T>, OfferedIncompatibleQoS),
    ) -> Self {
        self.offered_incompatible_qos = Some(callback);
        self
    }

    /// Installs `callback` for the `publication_matched` status and returns the
    /// updated listener.
    // The `T: crate::Topicable` bound comes from the enclosing impl; it is not
    // repeated here, matching the sibling builder methods.
    #[must_use]
    pub fn with_publication_matched(
        mut self,
        callback: fn(&crate::Writer<'_, '_, '_, T>, PublicationMatched),
    ) -> Self {
        self.publication_matched = Some(callback);
        self
    }

    /// Registers every configured callback on `listener` through the matching
    /// `ffi::dds_listener_set_*` function; unset callbacks are skipped.
    #[inline]
    pub(crate) fn apply_listener_ffi(&self, listener: &mut ffi::Listener) {
        if let Some(callback) = self.liveliness_lost {
            ffi::dds_listener_set_liveliness_lost(listener, callback);
        }
        if let Some(callback) = self.offered_deadline_missed {
            ffi::dds_listener_set_offered_deadline_missed(listener, callback);
        }
        if let Some(callback) = self.offered_incompatible_qos {
            ffi::dds_listener_set_offered_incompatible_qos(listener, callback);
        }
        if let Some(callback) = self.publication_matched {
            ffi::dds_listener_set_publication_matched(listener, callback);
        }
    }
}
impl<T: crate::Topicable> AsFfi for WriterListener<T> {
    type Target<'a>
        = Result<ffi::Listener>
    where
        T: 'a;

    /// Creates a new `ffi::Listener` and applies this listener's callbacks to it.
    ///
    /// An error from `ffi::Listener::new()` is returned unchanged.
    #[inline]
    fn as_ffi(&self) -> Self::Target<'_> {
        let mut listener = ffi::Listener::new()?;
        self.apply_listener_ffi(&mut listener);
        Ok(listener)
    }
}
/// Identity conversion, so APIs generic over `AsRef<ReaderListener<T>>` also accept
/// the listener itself.
impl<T: crate::Topicable> AsRef<Self> for ReaderListener<T> {
    fn as_ref(&self) -> &Self {
        self
    }
}
/// Identity conversion, so APIs generic over `AsRef<WriterListener<T>>` also accept
/// the listener itself.
impl<T: crate::Topicable> AsRef<Self> for WriterListener<T> {
    fn as_ref(&self) -> &Self {
        self
    }
}
impl AsRef<SubscriberListener> for SubscriberListener {
fn as_ref(&self) -> &SubscriberListener {
self
}
}
impl AsRef<PublisherListener> for PublisherListener {
fn as_ref(&self) -> &PublisherListener {
self
}
}
/// Identity conversion, so APIs generic over `AsRef<TopicListener<T>>` also accept
/// the listener itself.
impl<T: crate::Topicable> AsRef<Self> for TopicListener<T> {
    fn as_ref(&self) -> &Self {
        self
    }
}
impl AsRef<Listener> for Listener {
fn as_ref(&self) -> &Listener {
self
}
}
/// Borrows the subscriber part of a combined [`Listener`].
impl AsRef<SubscriberListener> for Listener {
    fn as_ref(&self) -> &SubscriberListener {
        let Self { subscriber, .. } = self;
        subscriber
    }
}
/// Borrows the publisher part of a combined [`Listener`].
impl AsRef<PublisherListener> for Listener {
    fn as_ref(&self) -> &PublisherListener {
        let Self { publisher, .. } = self;
        publisher
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::Topicable;
fn receive_listener<L>(listener: L)
where
L: AsRef<Listener>,
{
let _ = listener.as_ref();
}
/// Accepts anything that can be borrowed as a [`TopicListener`]; the borrow is discarded.
fn receive_topic_listener<L: AsRef<TopicListener<T>>, T: crate::Topicable>(listener: L) {
    let _: &TopicListener<T> = listener.as_ref();
}
fn receive_subscriber_listener<L>(listener: L)
where
L: AsRef<SubscriberListener>,
{
let _ = listener.as_ref();
}
fn receive_publisher_listener<L>(listener: L)
where
L: AsRef<PublisherListener>,
{
let _ = listener.as_ref();
}
/// Accepts anything that can be borrowed as a [`ReaderListener`]; the borrow is discarded.
fn receive_reader_listener<L: AsRef<ReaderListener<T>>, T: crate::Topicable>(listener: L) {
    let _: &ReaderListener<T> = listener.as_ref();
}
/// Accepts anything that can be borrowed as a [`WriterListener`]; the borrow is discarded.
fn receive_writer_listener<L: AsRef<WriterListener<T>>, T: crate::Topicable>(listener: L) {
    let _: &WriterListener<T> = listener.as_ref();
}
/// Builds one listener of every kind and hands each to the `AsRef`-generic
/// `receive_*` helpers.
#[test]
fn test_listener_create() {
    type Data = crate::tests::topic::Data;

    let listener = Listener::new()
        .with_subscriber(|subscriber| subscriber.with_data_on_readers(|_| ()))
        .with_publisher(|publisher| publisher);
    let topic_listener = TopicListener::<Data>::new().with_inconsistent_topic(|_, _| ());
    let subscriber_listener = SubscriberListener::new().with_data_on_readers(|_| ());
    let publisher_listener = PublisherListener::new();
    let reader_listener = ReaderListener::<Data>::new()
        .with_data_available(|_| ())
        .with_liveliness_changed(|_, _| ())
        .with_requested_deadline_missed(|_, _| ())
        .with_requested_incompatible_qos(|_, _| ())
        .with_sample_lost(|_, _| ())
        .with_sample_rejected(|_, _| ())
        .with_subscription_matched(|_, _| ());
    let writer_listener = WriterListener::<Data>::new()
        .with_liveliness_lost(|_, _| ())
        .with_offered_deadline_missed(|_, _| ())
        .with_offered_incompatible_qos(|_, _| ())
        .with_publication_matched(|_, _| ());

    // `Listener` is `Copy`, so it can be passed by value more than once.
    receive_listener(listener);
    receive_topic_listener(&topic_listener);
    receive_subscriber_listener(subscriber_listener);
    receive_subscriber_listener(listener);
    receive_publisher_listener(publisher_listener);
    receive_publisher_listener(listener);
    receive_reader_listener(&reader_listener);
    receive_writer_listener(&writer_listener);
}
/// Writes one sample through a reader attached to a subscriber with a
/// `data_on_readers` callback and expects the callback to have run exactly once.
#[test]
fn test_subscriber_listener_callbacks() {
    type Data = crate::tests::topic::Data;

    /// Invocation counters updated from the listener callback.
    #[derive(Debug, PartialEq)]
    struct Triggered {
        data_on_readers: u32,
    }
    static TRIGGERED: std::sync::Mutex<Triggered> =
        std::sync::Mutex::new(Triggered { data_on_readers: 0 });

    let domain_id = crate::tests::domain::unique_id();
    let topic_name = crate::tests::topic::unique_name();
    let domain = crate::Domain::new(domain_id).unwrap();
    let participant = crate::Participant::new(&domain).unwrap();
    let topic = crate::Topic::<Data>::new(&participant, &topic_name).unwrap();

    let counting_listener = crate::SubscriberListener::new().with_data_on_readers(|_subscriber| {
        TRIGGERED.lock().unwrap().data_on_readers += 1;
    });
    let subscriber = crate::Subscriber::builder(&participant)
        .with_listener(counting_listener)
        .build()
        .unwrap();
    let reader = crate::Reader::builder(&topic)
        .with_subscriber(&subscriber)
        .build()
        .unwrap();
    let writer = crate::Writer::new(&topic).unwrap();

    let sample = Data::default();
    writer.write(&sample).unwrap();

    let received = reader.read().unwrap();
    assert_eq!(received.len(), 1);
    assert_eq!(*received[0], sample);
    assert_eq!(*TRIGGERED.lock().unwrap(), Triggered { data_on_readers: 1 });
}
/// Attaches a (callback-free) `PublisherListener` to a publisher and checks that a
/// sample written through it is still delivered to a reader.
#[test]
fn test_publisher_listener_callbacks() {
    type Data = crate::tests::topic::Data;

    let domain_id = crate::tests::domain::unique_id();
    let topic_name = crate::tests::topic::unique_name();
    let domain = crate::Domain::new(domain_id).unwrap();
    let participant = crate::Participant::new(&domain).unwrap();
    let topic = crate::Topic::<Data>::new(&participant, &topic_name).unwrap();

    let publisher_listener = crate::PublisherListener::new();
    let publisher = crate::Publisher::builder(&participant)
        .with_listener(publisher_listener)
        .build()
        .unwrap();
    let reader = crate::Reader::new(&topic).unwrap();
    let writer = crate::Writer::builder(&topic)
        .with_publisher(&publisher)
        .build()
        .unwrap();

    let sample = Data::default();
    writer.write(&sample).unwrap();

    let received = reader.read().unwrap();
    assert_eq!(received.len(), 1);
    assert_eq!(*received[0], sample);
}
/// Exercises every `ReaderListener` callback in four scenarios and checks the
/// accumulated invocation counts at the end of the last one.
///
/// The wait for the `requested_deadline_missed` callback is bounded by
/// `CALLBACK_TIMEOUT`, so a callback that never fires fails the test instead of
/// hanging the test run.
#[test]
fn test_reader_listener_callbacks() {
    /// Upper bound on how long the test polls for an asynchronous callback.
    const CALLBACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

    /// Invocation counters/flags updated from the listener callbacks.
    #[derive(Debug, PartialEq)]
    struct Triggered {
        requested_incompatible_qos: u32,
        requested_deadline_missed: bool,
        sample_rejected: u32,
        data_available: u32,
        subscription_matched: u32,
        liveliness_changed: u32,
        sample_lost: u32,
    }
    static TRIGGERED: std::sync::Mutex<Triggered> = std::sync::Mutex::new(Triggered {
        requested_incompatible_qos: 0,
        requested_deadline_missed: false,
        sample_rejected: 0,
        data_available: 0,
        subscription_matched: 0,
        liveliness_changed: 0,
        sample_lost: 0,
    });

    let domain_id = crate::tests::domain::unique_id();
    let topic_name = crate::tests::topic::unique_name();
    let domain = crate::Domain::new(domain_id).unwrap();
    let participant = crate::Participant::new(&domain).unwrap();
    let qos = crate::QoS::new()
        .with_destination_order(crate::qos::policy::DestinationOrder::BySourceTimestamp);
    let topic = crate::Topic::<crate::tests::topic::Data>::builder(&participant, &topic_name)
        .with_qos(&qos)
        .build()
        .unwrap();

    // Scenario 1: `requested_incompatible_qos` — the reader requests `Persistent`
    // durability while the writer is created without it.
    {
        let _writer = crate::Writer::new(&topic).unwrap();
        let _reader = crate::Reader::builder(&topic)
            .with_qos(
                &crate::QoS::new().with_durability(crate::qos::policy::Durability::Persistent),
            )
            .with_listener(
                crate::ReaderListener::new().with_requested_incompatible_qos(
                    |_reader, _metadata| {
                        TRIGGERED.lock().unwrap().requested_incompatible_qos += 1;
                    },
                ),
            )
            .build()
            .unwrap();
    }

    // Scenario 2: `requested_deadline_missed` — reader and writer share a 1 ms deadline
    // and only a single sample is written.
    {
        let qos = crate::QoS::new().with_deadline(crate::qos::policy::Deadline {
            period: crate::Duration::from_nanos(1_000_000),
        });
        let reader = crate::Reader::builder(&topic)
            .with_qos(&qos)
            .with_listener(crate::ReaderListener::new().with_requested_deadline_missed(
                |_reader, _metadata| {
                    TRIGGERED.lock().unwrap().requested_deadline_missed |= true;
                },
            ))
            .build()
            .unwrap();
        let writer = crate::Writer::builder(&topic)
            .with_qos(&qos)
            .build()
            .unwrap();
        let sample = crate::tests::topic::Data::default();
        writer.write(&sample).unwrap();
        let samples = reader.take().unwrap();
        assert_eq!(samples.len(), 1);
        assert_eq!(*samples[0], sample);

        // Poll until the callback has fired, but give up after `CALLBACK_TIMEOUT`.
        // The mutex guard is a temporary of the loop condition, so it is not held
        // while asserting or sleeping.
        let give_up_at = std::time::Instant::now() + CALLBACK_TIMEOUT;
        while !TRIGGERED.lock().unwrap().requested_deadline_missed {
            assert!(
                std::time::Instant::now() < give_up_at,
                "requested_deadline_missed callback was not invoked within {CALLBACK_TIMEOUT:?}"
            );
            std::thread::sleep(std::time::Duration::from_nanos(50));
        }
    }

    // Scenario 3: `sample_rejected` — the reader is limited to one instance and two
    // samples with different `x`/`y` values are written.
    {
        let reader = crate::Reader::builder(&topic)
            .with_qos(&crate::QoS::new().with_resource_limits(
                crate::qos::policy::ResourceLimits {
                    max_samples: crate::qos::policy::ResourceLimit::Unlimited,
                    max_instances: crate::qos::policy::ResourceLimit::Limited(1),
                    max_samples_per_instance: crate::qos::policy::ResourceLimit::Unlimited,
                },
            ))
            .with_listener(crate::ReaderListener::new().with_sample_rejected(
                |_reader, _metadata| {
                    TRIGGERED.lock().unwrap().sample_rejected += 1;
                },
            ))
            .build()
            .unwrap();
        let writer = crate::Writer::new(&topic).unwrap();
        let sample = crate::tests::topic::Data {
            x: 1,
            y: 2,
            ..crate::tests::topic::Data::default()
        };
        writer.write(&sample).unwrap();
        writer
            .write(&crate::tests::topic::Data {
                x: 2,
                y: 3,
                ..crate::tests::topic::Data::default()
            })
            .unwrap();
        let samples = reader.take().unwrap();
        assert_eq!(samples.len(), 1);
        assert_eq!(*samples[0], sample);
    }

    // Scenario 4: `data_available`, `subscription_matched`, `liveliness_changed` and
    // `sample_lost` — one write followed by an unregister stamped one second in the past.
    {
        let reader = crate::Reader::builder(&topic)
            .with_listener(
                crate::ReaderListener::new()
                    .with_data_available(|_reader| {
                        TRIGGERED.lock().unwrap().data_available += 1;
                    })
                    .with_subscription_matched(|_reader, _matched| {
                        TRIGGERED.lock().unwrap().subscription_matched += 1;
                    })
                    .with_liveliness_changed(|_reader, _changed| {
                        TRIGGERED.lock().unwrap().liveliness_changed += 1;
                    })
                    .with_sample_lost(|_reader, _metadata| {
                        TRIGGERED.lock().unwrap().sample_lost += 1;
                    }),
            )
            .build()
            .unwrap();
        let writer = crate::Writer::new(&topic).unwrap();
        let sample = crate::tests::topic::Data::default();
        writer.write(&sample).unwrap();
        let key = sample.as_key();
        writer
            .unregister_instance_with_timestamp(
                &key,
                (std::time::SystemTime::now() - std::time::Duration::from_secs(1))
                    .try_into()
                    .unwrap(),
            )
            .unwrap();
        let samples = reader.take().unwrap();
        assert_eq!(samples.len(), 1);
        assert_eq!(*samples[0], sample);

        // Checked while the reader and writer of this scenario are still alive.
        assert_eq!(
            *TRIGGERED.lock().unwrap(),
            Triggered {
                requested_incompatible_qos: 1,
                requested_deadline_missed: true,
                sample_rejected: 1,
                data_available: 2,
                subscription_matched: 1,
                liveliness_changed: 1,
                sample_lost: 1,
            }
        );
    }
}
/// Exercises every `WriterListener` callback in three scenarios and checks the
/// accumulated invocation counts once all of them have finished.
///
/// The waits for the `offered_deadline_missed` and `liveliness_lost` callbacks are
/// bounded by `CALLBACK_TIMEOUT`, so a callback that never fires fails the test
/// instead of hanging the test run.
#[test]
fn test_writer_listener_callbacks() {
    /// Upper bound on how long the test polls for an asynchronous callback.
    const CALLBACK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

    /// Invocation counters/flags updated from the listener callbacks.
    #[derive(Debug, PartialEq)]
    struct Triggered {
        liveliness_lost: bool,
        offered_deadline_missed: bool,
        offered_incompatible_qos: u32,
        publication_matched: u32,
    }
    static TRIGGERED: std::sync::Mutex<Triggered> = std::sync::Mutex::new(Triggered {
        liveliness_lost: false,
        offered_deadline_missed: false,
        offered_incompatible_qos: 0,
        publication_matched: 0,
    });

    let domain_id = crate::tests::domain::unique_id();
    let topic_name = crate::tests::topic::unique_name();
    let domain = crate::Domain::new(domain_id).unwrap();
    let participant = crate::Participant::new(&domain).unwrap();
    let topic =
        crate::Topic::<crate::tests::topic::Data>::new(&participant, &topic_name).unwrap();

    // Scenario 1: `offered_incompatible_qos` — the reader requests `Persistent`
    // durability while the writer is created without it.
    {
        let _reader = crate::Reader::builder(&topic)
            .with_qos(
                &crate::QoS::new().with_durability(crate::qos::policy::Durability::Persistent),
            )
            .build()
            .unwrap();
        let _writer = crate::Writer::builder(&topic)
            .with_listener(crate::WriterListener::new().with_offered_incompatible_qos(
                |_writer, _metadata| {
                    TRIGGERED.lock().unwrap().offered_incompatible_qos += 1;
                },
            ))
            .build()
            .unwrap();
    }

    // Scenario 2: `offered_deadline_missed` — writer and reader share a 1 ms deadline
    // and only a single sample is written.
    {
        let qos = crate::QoS::new().with_deadline(crate::qos::policy::Deadline {
            period: crate::Duration::from_nanos(1_000_000),
        });
        let writer = crate::Writer::builder(&topic)
            .with_qos(&qos)
            .with_listener(crate::WriterListener::new().with_offered_deadline_missed(
                |_writer, _metadata| {
                    TRIGGERED.lock().unwrap().offered_deadline_missed |= true;
                },
            ))
            .build()
            .unwrap();
        let reader = crate::Reader::builder(&topic)
            .with_qos(&qos)
            .build()
            .unwrap();
        let sample = crate::tests::topic::Data::default();
        writer.write(&sample).unwrap();
        let samples = reader.take().unwrap();
        assert_eq!(samples.len(), 1);
        assert_eq!(*samples[0], sample);

        // Poll until the callback has fired, but give up after `CALLBACK_TIMEOUT`.
        // The mutex guard is a temporary of the loop condition, so it is not held
        // while asserting or sleeping.
        let give_up_at = std::time::Instant::now() + CALLBACK_TIMEOUT;
        while !TRIGGERED.lock().unwrap().offered_deadline_missed {
            assert!(
                std::time::Instant::now() < give_up_at,
                "offered_deadline_missed callback was not invoked within {CALLBACK_TIMEOUT:?}"
            );
            std::thread::sleep(std::time::Duration::from_nanos(50));
        }
    }

    // Scenario 3: `liveliness_lost` and `publication_matched` — the writer uses
    // `ManualByParticipant` liveliness with a 1 ms lease.
    {
        let writer = crate::Writer::builder(&topic)
            .with_listener(
                crate::WriterListener::new()
                    .with_liveliness_lost(|_writer, _metadata| {
                        TRIGGERED.lock().unwrap().liveliness_lost |= true;
                    })
                    .with_publication_matched(|_writer, _metadata| {
                        TRIGGERED.lock().unwrap().publication_matched += 1;
                    }),
            )
            .with_qos(&crate::QoS::new().with_liveliness(
                crate::qos::policy::Liveliness::ManualByParticipant {
                    lease_duration: crate::Duration::from_nanos(1_000_000),
                },
            ))
            .build()
            .unwrap();
        let reader = crate::Reader::new(&topic).unwrap();
        let sample = crate::tests::topic::Data::default();
        writer.write(&sample).unwrap();
        let key = sample.as_key();
        writer
            .unregister_instance_with_timestamp(
                &key,
                (std::time::SystemTime::now() - std::time::Duration::from_secs(1))
                    .try_into()
                    .unwrap(),
            )
            .unwrap();
        let samples = reader.take().unwrap();
        assert_eq!(samples.len(), 1);
        assert_eq!(*samples[0], sample);

        // Same bounded poll as in scenario 2, this time for `liveliness_lost`.
        let give_up_at = std::time::Instant::now() + CALLBACK_TIMEOUT;
        while !TRIGGERED.lock().unwrap().liveliness_lost {
            assert!(
                std::time::Instant::now() < give_up_at,
                "liveliness_lost callback was not invoked within {CALLBACK_TIMEOUT:?}"
            );
            std::thread::sleep(std::time::Duration::from_nanos(50));
        }
    }

    // Checked after the entities of the last scenario have been dropped.
    assert_eq!(
        *TRIGGERED.lock().unwrap(),
        Triggered {
            liveliness_lost: true,
            offered_deadline_missed: true,
            offered_incompatible_qos: 1,
            publication_matched: 2,
        }
    );
}
}