use tracing::warn;
use super::{condition::StatusConditionAsync, subscriber::SubscriberAsync};
use crate::{
builtin_topics::PublicationBuiltinTopicData,
dcps::{
actor::ActorAddress,
channels::{mpsc::MpscSender, oneshot::oneshot},
domain_participant_mail::{DcpsDomainParticipantMail, ReaderServiceMail},
listeners::data_reader_listener::DcpsDataReaderListener,
status_condition::DcpsStatusCondition,
},
dds_async::{
data_reader_listener::DataReaderListener, topic_description::TopicDescriptionAsync,
},
infrastructure::{
error::DdsResult,
instance::InstanceHandle,
qos::{DataReaderQos, QosKind},
sample_info::{
ANY_INSTANCE_STATE, ANY_VIEW_STATE, InstanceStateKind, Sample, SampleStateKind,
ViewStateKind,
},
status::{
LivelinessChangedStatus, RequestedDeadlineMissedStatus, RequestedIncompatibleQosStatus,
SampleLostStatus, SampleRejectedStatus, StatusKind, SubscriptionMatchedStatus,
},
time::Duration,
type_support::TypeSupport,
},
};
use alloc::{vec, vec::Vec};
use core::marker::PhantomData;
/// Asynchronous handle to a data reader whose samples are exposed as values of type `Foo`.
///
/// The struct stores the identifiers and addresses needed to send `ReaderServiceMail`
/// requests to the domain participant; every operation is forwarded as a mail message.
pub struct DataReaderAsync<Foo> {
    /// Instance handle of this data reader; sent as `data_reader_handle` in every request.
    handle: InstanceHandle,
    /// Address of the status condition actor wrapped by `get_statuscondition`.
    status_condition_address: ActorAddress<DcpsStatusCondition>,
    /// Subscriber this reader was created with; it also supplies the participant mail sender.
    subscriber: SubscriberAsync,
    /// Topic description handed back by `get_topicdescription`.
    topic: TopicDescriptionAsync,
    /// Zero-sized marker carrying the `Foo` type parameter; no `Foo` value is stored.
    phantom: PhantomData<Foo>,
}
impl<Foo> DataReaderAsync<Foo> {
    /// Builds a reader handle from its instance handle, status condition address,
    /// owning subscriber and topic description.
    pub(crate) fn new(
        handle: InstanceHandle,
        status_condition_address: ActorAddress<DcpsStatusCondition>,
        subscriber: SubscriberAsync,
        topic: TopicDescriptionAsync,
    ) -> Self {
        DataReaderAsync {
            handle,
            status_condition_address,
            subscriber,
            topic,
            phantom: PhantomData,
        }
    }

    /// Returns the mail sender of the domain participant, as exposed by the subscriber.
    pub(crate) fn participant_address(&self) -> &MpscSender<DcpsDomainParticipantMail> {
        let subscriber = &self.subscriber;
        subscriber.participant_address()
    }

    /// Consumes this handle and returns one that refers to the same reader but is
    /// typed for `T` instead of `Foo`.
    ///
    /// Only the type marker changes; all other fields are moved over unchanged.
    pub(crate) fn change_foo_type<T>(self) -> DataReaderAsync<T> {
        let Self {
            handle,
            status_condition_address,
            subscriber,
            topic,
            ..
        } = self;
        DataReaderAsync {
            handle,
            status_condition_address,
            subscriber,
            topic,
            phantom: PhantomData,
        }
    }
}
impl<Foo> Clone for DataReaderAsync<Foo> {
fn clone(&self) -> Self {
Self {
handle: self.handle,
status_condition_address: self.status_condition_address.clone(),
subscriber: self.subscriber.clone(),
topic: self.topic.clone(),
phantom: self.phantom,
}
}
}
impl<Foo: TypeSupport> DataReaderAsync<Foo> {
#[tracing::instrument(skip(self))]
pub async fn read(
&self,
max_samples: i32,
sample_states: &[SampleStateKind],
view_states: &[ViewStateKind],
instance_states: &[InstanceStateKind],
) -> DdsResult<Vec<Sample<Foo>>> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Reader(ReaderServiceMail::Read {
subscriber_handle: self.subscriber.get_instance_handle().await,
data_reader_handle: self.handle,
max_samples,
sample_states: sample_states.to_vec(),
view_states: view_states.to_vec(),
instance_states: instance_states.to_vec(),
specific_instance_handle: None,
reply_sender,
}))
.await?;
let samples = reply_receiver.await??;
Ok(samples
.into_iter()
.map(|(data, sample_info)| Sample::new(data, sample_info))
.collect())
}
#[tracing::instrument(skip(self))]
pub async fn take(
&self,
max_samples: i32,
sample_states: &[SampleStateKind],
view_states: &[ViewStateKind],
instance_states: &[InstanceStateKind],
) -> DdsResult<Vec<Sample<Foo>>> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Reader(ReaderServiceMail::Take {
subscriber_handle: self.subscriber.get_instance_handle().await,
data_reader_handle: self.handle,
max_samples,
sample_states: sample_states.to_vec(),
view_states: view_states.to_vec(),
instance_states: instance_states.to_vec(),
specific_instance_handle: None,
reply_sender,
}))
.await?;
let samples = reply_receiver.await??;
Ok(samples
.into_iter()
.map(|(data, sample_info)| Sample::new(data, sample_info))
.collect())
}
#[tracing::instrument(skip(self))]
pub async fn read_next_sample(&self) -> DdsResult<Sample<Foo>> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Reader(ReaderServiceMail::Read {
subscriber_handle: self.subscriber.get_instance_handle().await,
data_reader_handle: self.handle,
max_samples: 1,
sample_states: vec![SampleStateKind::NotRead],
view_states: ANY_VIEW_STATE.to_vec(),
instance_states: ANY_INSTANCE_STATE.to_vec(),
specific_instance_handle: None,
reply_sender,
}))
.await?;
let mut samples = reply_receiver.await??;
let (data, sample_info) = samples.pop().expect("Would return NoData if empty");
Ok(Sample::new(data, sample_info))
}
#[tracing::instrument(skip(self))]
pub async fn take_next_sample(&self) -> DdsResult<Sample<Foo>> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Reader(ReaderServiceMail::Take {
subscriber_handle: self.subscriber.get_instance_handle().await,
data_reader_handle: self.handle,
max_samples: 1,
sample_states: vec![SampleStateKind::NotRead],
view_states: ANY_VIEW_STATE.to_vec(),
instance_states: ANY_INSTANCE_STATE.to_vec(),
specific_instance_handle: None,
reply_sender,
}))
.await?;
let mut samples = reply_receiver.await??;
let (data, sample_info) = samples.pop().expect("Would return NoData if empty");
Ok(Sample::new(data, sample_info))
}
#[tracing::instrument(skip(self))]
pub async fn read_instance(
&self,
max_samples: i32,
a_handle: InstanceHandle,
sample_states: &[SampleStateKind],
view_states: &[ViewStateKind],
instance_states: &[InstanceStateKind],
) -> DdsResult<Vec<Sample<Foo>>> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Reader(ReaderServiceMail::Read {
subscriber_handle: self.subscriber.get_instance_handle().await,
data_reader_handle: self.handle,
max_samples,
sample_states: sample_states.to_vec(),
view_states: view_states.to_vec(),
instance_states: instance_states.to_vec(),
specific_instance_handle: Some(a_handle),
reply_sender,
}))
.await?;
let samples = reply_receiver.await??;
Ok(samples
.into_iter()
.map(|(data, sample_info)| Sample::new(data, sample_info))
.collect())
}
#[tracing::instrument(skip(self))]
pub async fn take_instance(
&self,
max_samples: i32,
a_handle: InstanceHandle,
sample_states: &[SampleStateKind],
view_states: &[ViewStateKind],
instance_states: &[InstanceStateKind],
) -> DdsResult<Vec<Sample<Foo>>> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Reader(ReaderServiceMail::Take {
subscriber_handle: self.subscriber.get_instance_handle().await,
data_reader_handle: self.handle,
max_samples,
sample_states: sample_states.to_vec(),
view_states: view_states.to_vec(),
instance_states: instance_states.to_vec(),
specific_instance_handle: Some(a_handle),
reply_sender,
}))
.await?;
let samples = reply_receiver.await??;
Ok(samples
.into_iter()
.map(|(data, sample_info)| Sample::new(data, sample_info))
.collect())
}
#[tracing::instrument(skip(self))]
pub async fn read_next_instance(
&self,
max_samples: i32,
previous_handle: Option<InstanceHandle>,
sample_states: &[SampleStateKind],
view_states: &[ViewStateKind],
instance_states: &[InstanceStateKind],
) -> DdsResult<Vec<Sample<Foo>>> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Reader(
ReaderServiceMail::ReadNextInstance {
subscriber_handle: self.subscriber.get_instance_handle().await,
data_reader_handle: self.handle,
max_samples,
previous_handle,
sample_states: sample_states.to_vec(),
view_states: view_states.to_vec(),
instance_states: instance_states.to_vec(),
reply_sender,
},
))
.await?;
let samples = reply_receiver.await??;
Ok(samples
.into_iter()
.map(|(data, sample_info)| Sample::new(data, sample_info))
.collect())
}
#[tracing::instrument(skip(self))]
pub async fn take_next_instance(
&self,
max_samples: i32,
previous_handle: Option<InstanceHandle>,
sample_states: &[SampleStateKind],
view_states: &[ViewStateKind],
instance_states: &[InstanceStateKind],
) -> DdsResult<Vec<Sample<Foo>>> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Reader(
ReaderServiceMail::TakeNextInstance {
subscriber_handle: self.subscriber.get_instance_handle().await,
data_reader_handle: self.handle,
max_samples,
previous_handle,
sample_states: sample_states.to_vec(),
view_states: view_states.to_vec(),
instance_states: instance_states.to_vec(),
reply_sender,
},
))
.await?;
let samples = reply_receiver.await??;
Ok(samples
.into_iter()
.map(|(data, sample_info)| Sample::new(data, sample_info))
.collect())
}
#[tracing::instrument(skip(self, _key_holder))]
pub async fn get_key_value(
&self,
_key_holder: &mut Foo,
_handle: InstanceHandle,
) -> DdsResult<()> {
todo!()
}
#[tracing::instrument(skip(self, _instance))]
pub async fn lookup_instance(&self, _instance: &Foo) -> DdsResult<Option<InstanceHandle>> {
todo!()
}
}
impl<Foo> DataReaderAsync<Foo> {
    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics through `todo!()`.
    #[tracing::instrument(skip(self))]
    pub async fn get_liveliness_changed_status(&self) -> DdsResult<LivelinessChangedStatus> {
        todo!()
    }

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics through `todo!()`.
    #[tracing::instrument(skip(self))]
    pub async fn get_requested_deadline_missed_status(
        &self,
    ) -> DdsResult<RequestedDeadlineMissedStatus> {
        todo!()
    }

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics through `todo!()`.
    #[tracing::instrument(skip(self))]
    pub async fn get_requested_incompatible_qos_status(
        &self,
    ) -> DdsResult<RequestedIncompatibleQosStatus> {
        todo!()
    }

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics through `todo!()`.
    #[tracing::instrument(skip(self))]
    pub async fn get_sample_lost_status(&self) -> DdsResult<SampleLostStatus> {
        todo!()
    }

    /// Not implemented yet.
    ///
    /// # Panics
    ///
    /// Always panics through `todo!()`.
    #[tracing::instrument(skip(self))]
    pub async fn get_sample_rejected_status(&self) -> DdsResult<SampleRejectedStatus> {
        todo!()
    }

    /// Sends a `GetSubscriptionMatchedStatus` request for this reader and returns the
    /// status carried by the reply.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent, if the reply cannot be received, or if the
    /// reply itself carries an error.
    #[tracing::instrument(skip(self))]
    pub async fn get_subscription_matched_status(&self) -> DdsResult<SubscriptionMatchedStatus> {
        let (reply_sender, reply_receiver) = oneshot();
        let subscriber_handle = self.subscriber.get_instance_handle().await;
        let request =
            DcpsDomainParticipantMail::Reader(ReaderServiceMail::GetSubscriptionMatchedStatus {
                subscriber_handle,
                data_reader_handle: self.handle,
                reply_sender,
            });
        self.participant_address().send(request).await?;
        reply_receiver.await?
    }

    /// Returns a clone of the topic description stored in this reader handle.
    #[tracing::instrument(skip(self))]
    pub fn get_topicdescription(&self) -> TopicDescriptionAsync {
        self.topic.clone()
    }

    /// Returns a clone of the subscriber stored in this reader handle.
    #[tracing::instrument(skip(self))]
    pub fn get_subscriber(&self) -> SubscriberAsync {
        self.subscriber.clone()
    }

    /// Sends a `WaitForHistoricalData` request with `max_wait` and waits for completion.
    ///
    /// The reply to the request is itself awaitable: this method first awaits the reply
    /// and then awaits the value it received, returning that final result.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent, if the reply cannot be received, or if the
    /// awaited reply value resolves to an error.
    #[tracing::instrument(skip(self))]
    pub async fn wait_for_historical_data(&self, max_wait: Duration) -> DdsResult<()> {
        let (reply_sender, reply_receiver) = oneshot();
        let participant_address = self.participant_address().clone();
        let subscriber_handle = self.subscriber.get_instance_handle().await;
        let request = DcpsDomainParticipantMail::Reader(ReaderServiceMail::WaitForHistoricalData {
            participant_address,
            subscriber_handle,
            data_reader_handle: self.handle,
            max_wait,
            reply_sender,
        });
        self.participant_address().send(request).await?;

        let pending = reply_receiver.await?;
        pending.await
    }

    /// Sends a `GetMatchedPublicationData` request for `publication_handle` and returns
    /// the publication data carried by the reply.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent, if the reply cannot be received, or if the
    /// reply itself carries an error.
    #[tracing::instrument(skip(self))]
    pub async fn get_matched_publication_data(
        &self,
        publication_handle: InstanceHandle,
    ) -> DdsResult<PublicationBuiltinTopicData> {
        let (reply_sender, reply_receiver) = oneshot();
        let subscriber_handle = self.subscriber.get_instance_handle().await;
        let request =
            DcpsDomainParticipantMail::Reader(ReaderServiceMail::GetMatchedPublicationData {
                subscriber_handle,
                data_reader_handle: self.handle,
                publication_handle,
                reply_sender,
            });
        self.participant_address().send(request).await?;
        reply_receiver.await?
    }

    /// Sends a `GetMatchedPublications` request and returns the instance handles carried
    /// by the reply.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent, if the reply cannot be received, or if the
    /// reply itself carries an error.
    #[tracing::instrument(skip(self))]
    pub async fn get_matched_publications(&self) -> DdsResult<Vec<InstanceHandle>> {
        let (reply_sender, reply_receiver) = oneshot();
        let subscriber_handle = self.subscriber.get_instance_handle().await;
        let request =
            DcpsDomainParticipantMail::Reader(ReaderServiceMail::GetMatchedPublications {
                subscriber_handle,
                data_reader_handle: self.handle,
                reply_sender,
            });
        self.participant_address().send(request).await?;
        reply_receiver.await?
    }
}
impl<Foo> DataReaderAsync<Foo> {
pub async fn set_qos(&self, qos: QosKind<DataReaderQos>) -> DdsResult<()> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Reader(
ReaderServiceMail::SetQos {
subscriber_handle: self.subscriber.get_instance_handle().await,
data_reader_handle: self.handle,
qos,
participant_address: self.participant_address().clone(),
reply_sender,
},
))
.await?;
reply_receiver.await?
}
#[tracing::instrument(skip(self))]
pub async fn get_qos(&self) -> DdsResult<DataReaderQos> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Reader(
ReaderServiceMail::GetQos {
subscriber_handle: self.subscriber.get_instance_handle().await,
data_reader_handle: self.handle,
reply_sender,
},
))
.await?;
reply_receiver.await?
}
#[tracing::instrument(skip(self))]
pub fn get_statuscondition(&self) -> StatusConditionAsync {
StatusConditionAsync::new(self.status_condition_address.clone())
}
#[tracing::instrument(skip(self))]
pub async fn get_status_changes(&self) -> DdsResult<Vec<StatusKind>> {
todo!()
}
#[tracing::instrument(skip(self))]
pub async fn enable(&self) -> DdsResult<()> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Reader(
ReaderServiceMail::Enable {
subscriber_handle: self.subscriber.get_instance_handle().await,
data_reader_handle: self.handle,
participant_address: self.participant_address().clone(),
reply_sender,
},
))
.await?;
reply_receiver.await?
}
#[tracing::instrument(skip(self))]
pub async fn get_instance_handle(&self) -> InstanceHandle {
self.handle
}
}
impl<Foo> DataReaderAsync<Foo> {
    /// Sends a `SetListener` request that installs `a_listener` with the status `mask`.
    ///
    /// When a listener is given it is wrapped in a `DcpsDataReaderListener` before being
    /// sent; `None` is forwarded as `None`. The mask slice is copied into the request.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent, if the reply cannot be received, or if the
    /// reply itself carries an error.
    #[tracing::instrument(skip(self, a_listener))]
    pub async fn set_listener(
        &self,
        a_listener: Option<impl DataReaderListener<Foo> + Send + 'static>,
        mask: &[StatusKind],
    ) -> DdsResult<()> {
        let (reply_sender, reply_receiver) = oneshot();
        let dcps_listener = a_listener.map(DcpsDataReaderListener::new);
        let subscriber_handle = self.subscriber.get_instance_handle().await;
        let request = DcpsDomainParticipantMail::Reader(ReaderServiceMail::SetListener {
            subscriber_handle,
            data_reader_handle: self.handle,
            dcps_listener,
            listener_mask: mask.to_vec(),
            reply_sender,
        });
        self.participant_address().send(request).await?;
        reply_receiver.await?
    }
}