use super::{
condition::StatusConditionAsync, data_reader::DataReaderAsync,
data_reader_listener::DataReaderListener, domain_participant::DomainParticipantAsync,
subscriber_listener::SubscriberListener,
};
use crate::{
dcps::{
actor::ActorAddress,
channels::{mpsc::MpscSender, oneshot::oneshot},
domain_participant_mail::{DcpsDomainParticipantMail, SubscriberServiceMail},
listeners::{
data_reader_listener::DcpsDataReaderListener,
subscriber_listener::DcpsSubscriberListener,
},
status_condition::DcpsStatusCondition,
},
dds_async::topic_description::TopicDescriptionAsync,
infrastructure::{
error::{DdsError, DdsResult},
instance::InstanceHandle,
qos::{DataReaderQos, QosKind, SubscriberQos, TopicQos},
status::{SampleLostStatus, StatusKind},
},
};
use alloc::{string::String, vec::Vec};
pub struct SubscriberAsync {
handle: InstanceHandle,
status_condition_address: ActorAddress<DcpsStatusCondition>,
participant: DomainParticipantAsync,
}
impl Clone for SubscriberAsync {
fn clone(&self) -> Self {
Self {
handle: self.handle,
status_condition_address: self.status_condition_address.clone(),
participant: self.participant.clone(),
}
}
}
impl SubscriberAsync {
pub(crate) fn new(
handle: InstanceHandle,
status_condition_address: ActorAddress<DcpsStatusCondition>,
participant: DomainParticipantAsync,
) -> Self {
Self {
handle,
status_condition_address,
participant,
}
}
pub(crate) fn participant_address(&self) -> &MpscSender<DcpsDomainParticipantMail> {
self.participant.participant_address()
}
}
impl SubscriberAsync {
#[tracing::instrument(skip(self, a_topic, a_listener))]
pub async fn create_datareader<Foo>(
&self,
a_topic: &TopicDescriptionAsync,
qos: QosKind<DataReaderQos>,
a_listener: Option<impl DataReaderListener<Foo> + Send + 'static>,
mask: &[StatusKind],
) -> DdsResult<DataReaderAsync<Foo>> {
let dcps_listener = a_listener.map(DcpsDataReaderListener::new);
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Subscriber(
SubscriberServiceMail::CreateDataReader {
subscriber_handle: self.handle,
topic_name: a_topic.get_name(),
qos,
dcps_listener,
mask: mask.to_vec(),
domain_participant_address: self.participant_address().clone(),
reply_sender,
},
))
.await?;
let (guid, reader_status_condition_address) = reply_receiver.await??;
Ok(DataReaderAsync::new(
guid,
reader_status_condition_address,
self.clone(),
a_topic.clone(),
))
}
#[tracing::instrument(skip(self, a_datareader))]
pub async fn delete_datareader<Foo>(
&self,
a_datareader: &DataReaderAsync<Foo>,
) -> DdsResult<()> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Subscriber(
SubscriberServiceMail::DeleteDataReader {
subscriber_handle: self.handle,
datareader_handle: a_datareader.get_instance_handle().await,
reply_sender,
},
))
.await?;
reply_receiver.await?
}
#[tracing::instrument(skip(self))]
pub async fn lookup_datareader<Foo>(
&self,
topic_name: &str,
) -> DdsResult<Option<DataReaderAsync<Foo>>> {
if let Some(topic) = self.participant.lookup_topicdescription(topic_name).await? {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Subscriber(
SubscriberServiceMail::LookupDataReader {
subscriber_handle: self.handle,
topic_name: String::from(topic_name),
reply_sender,
},
))
.await?;
if let Some((reader_handle, reader_status_condition_address)) = reply_receiver.await?? {
Ok(Some(DataReaderAsync::new(
reader_handle,
reader_status_condition_address,
self.clone(),
topic,
)))
} else {
Ok(None)
}
} else {
Err(DdsError::BadParameter)
}
}
#[tracing::instrument(skip(self))]
pub async fn notify_datareaders(&self) -> DdsResult<()> {
todo!()
}
#[tracing::instrument(skip(self))]
pub fn get_participant(&self) -> DomainParticipantAsync {
self.participant.clone()
}
#[tracing::instrument(skip(self))]
pub async fn get_sample_lost_status(&self) -> DdsResult<SampleLostStatus> {
todo!()
}
#[tracing::instrument(skip(self))]
pub async fn delete_contained_entities(&self) -> DdsResult<()> {
todo!()
}
#[tracing::instrument(skip(self))]
pub async fn set_default_datareader_qos(&self, qos: QosKind<DataReaderQos>) -> DdsResult<()> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Subscriber(
SubscriberServiceMail::SetDefaultDataReaderQos {
subscriber_handle: self.handle,
qos,
reply_sender,
},
))
.await?;
reply_receiver.await?
}
#[tracing::instrument(skip(self))]
pub async fn get_default_datareader_qos(&self) -> DdsResult<DataReaderQos> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Subscriber(
SubscriberServiceMail::GetDefaultDataReaderQos {
subscriber_handle: self.handle,
reply_sender,
},
))
.await?;
reply_receiver.await?
}
#[tracing::instrument]
pub async fn copy_from_topic_qos(
_a_datareader_qos: &mut DataReaderQos,
_a_topic_qos: &TopicQos,
) -> DdsResult<()> {
todo!()
}
#[tracing::instrument(skip(self))]
pub async fn set_qos(&self, qos: QosKind<SubscriberQos>) -> DdsResult<()> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Subscriber(
SubscriberServiceMail::SetQos {
subscriber_handle: self.handle,
qos,
reply_sender,
},
))
.await?;
reply_receiver.await?
}
#[tracing::instrument(skip(self))]
pub async fn get_qos(&self) -> DdsResult<SubscriberQos> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Subscriber(
SubscriberServiceMail::GetSubscriberQos {
subscriber_handle: self.handle,
reply_sender,
},
))
.await?;
reply_receiver.await?
}
#[tracing::instrument(skip(self, a_listener))]
pub async fn set_listener(
&self,
a_listener: Option<impl SubscriberListener + Send + 'static>,
mask: &[StatusKind],
) -> DdsResult<()> {
let (reply_sender, reply_receiver) = oneshot();
let dcps_listener = a_listener.map(|l| DcpsSubscriberListener::new(l));
self.participant_address()
.send(DcpsDomainParticipantMail::Subscriber(
SubscriberServiceMail::SetListener {
subscriber_handle: self.handle,
dcps_listener,
mask: mask.to_vec(),
reply_sender,
},
))
.await?;
reply_receiver.await?
}
#[tracing::instrument(skip(self))]
pub fn get_statuscondition(&self) -> StatusConditionAsync {
StatusConditionAsync::new(self.status_condition_address.clone())
}
#[tracing::instrument(skip(self))]
pub async fn get_status_changes(&self) -> DdsResult<Vec<StatusKind>> {
todo!()
}
#[tracing::instrument(skip(self))]
pub async fn enable(&self) -> DdsResult<()> {
todo!()
}
#[tracing::instrument(skip(self))]
pub async fn get_instance_handle(&self) -> InstanceHandle {
self.handle
}
}