use super::{
data_writer::DataWriterAsync, data_writer_listener::DataWriterListener,
domain_participant::DomainParticipantAsync, publisher_listener::PublisherListener,
};
use crate::{
dcps::{
channels::{mpsc::MpscSender, oneshot::oneshot},
domain_participant_mail::{DcpsDomainParticipantMail, PublisherServiceMail},
listeners::{
data_writer_listener::DcpsDataWriterListener, publisher_listener::DcpsPublisherListener,
},
},
dds_async::topic_description::TopicDescriptionAsync,
infrastructure::{
error::DdsResult,
instance::InstanceHandle,
qos::{DataWriterQos, PublisherQos, QosKind, TopicQos},
status::StatusKind,
time::Duration,
},
};
use alloc::vec::Vec;
pub struct PublisherAsync {
handle: InstanceHandle,
participant: DomainParticipantAsync,
}
impl Clone for PublisherAsync {
fn clone(&self) -> Self {
Self {
handle: self.handle,
participant: self.participant.clone(),
}
}
}
impl PublisherAsync {
pub(crate) fn new(handle: InstanceHandle, participant: DomainParticipantAsync) -> Self {
Self {
handle,
participant,
}
}
pub(crate) fn participant_address(&self) -> &MpscSender<DcpsDomainParticipantMail> {
self.participant.participant_address()
}
}
impl PublisherAsync {
#[tracing::instrument(skip(self, a_topic, a_listener))]
pub async fn create_datawriter<Foo>(
&self,
a_topic: &TopicDescriptionAsync,
qos: QosKind<DataWriterQos>,
a_listener: Option<impl DataWriterListener<Foo> + Send + 'static>,
mask: &[StatusKind],
) -> DdsResult<DataWriterAsync<Foo>> {
let topic_name = a_topic.get_name();
let dcps_listener = a_listener.map(DcpsDataWriterListener::new);
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Publisher(
PublisherServiceMail::CreateDataWriter {
publisher_handle: self.handle,
topic_name,
qos,
dcps_listener,
mask: mask.to_vec(),
participant_address: self.participant_address().clone(),
reply_sender,
},
))
.await?;
let (guid, writer_status_condition_address) = reply_receiver.await??;
Ok(DataWriterAsync::new(
guid,
writer_status_condition_address,
self.clone(),
a_topic.clone(),
))
}
#[tracing::instrument(skip(self, a_datawriter))]
pub async fn delete_datawriter<Foo>(
&self,
a_datawriter: &DataWriterAsync<Foo>,
) -> DdsResult<()> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Publisher(
PublisherServiceMail::DeleteDataWriter {
publisher_handle: self.handle,
datawriter_handle: a_datawriter.get_instance_handle().await,
reply_sender,
},
))
.await?;
reply_receiver.await?
}
#[tracing::instrument(skip(self))]
pub async fn lookup_datawriter<Foo>(
&self,
topic_name: &str,
) -> DdsResult<Option<DataWriterAsync<Foo>>> {
todo!()
}
#[tracing::instrument(skip(self))]
pub async fn suspend_publications(&self) -> DdsResult<()> {
todo!()
}
#[tracing::instrument(skip(self))]
pub async fn resume_publications(&self) -> DdsResult<()> {
todo!()
}
#[tracing::instrument(skip(self))]
pub async fn begin_coherent_changes(&self) -> DdsResult<()> {
todo!()
}
#[tracing::instrument(skip(self))]
pub async fn end_coherent_changes(&self) -> DdsResult<()> {
todo!()
}
#[tracing::instrument(skip(self))]
pub async fn wait_for_acknowledgments(&self, _max_wait: Duration) -> DdsResult<()> {
todo!()
}
#[tracing::instrument(skip(self))]
pub fn get_participant(&self) -> DomainParticipantAsync {
self.participant.clone()
}
#[tracing::instrument(skip(self))]
pub async fn delete_contained_entities(&self) -> DdsResult<()> {
todo!()
}
#[tracing::instrument(skip(self))]
pub async fn set_default_datawriter_qos(&self, qos: QosKind<DataWriterQos>) -> DdsResult<()> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Publisher(
PublisherServiceMail::SetDefaultDataWriterQos {
publisher_handle: self.handle,
qos,
reply_sender,
},
))
.await?;
reply_receiver.await?
}
#[tracing::instrument(skip(self))]
pub async fn get_default_datawriter_qos(&self) -> DdsResult<DataWriterQos> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Publisher(
PublisherServiceMail::GetDefaultDataWriterQos {
publisher_handle: self.handle,
reply_sender,
},
))
.await?;
reply_receiver.await?
}
#[tracing::instrument(skip(self))]
pub async fn copy_from_topic_qos(
&self,
_a_datawriter_qos: &mut DataWriterQos,
_a_topic_qos: &TopicQos,
) -> DdsResult<()> {
todo!()
}
}
impl PublisherAsync {
#[tracing::instrument(skip(self))]
pub async fn set_qos(&self, qos: QosKind<PublisherQos>) -> DdsResult<()> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Publisher(
PublisherServiceMail::SetPublisherQos {
publisher_handle: self.handle,
qos,
reply_sender,
},
))
.await?;
reply_receiver.await?
}
#[tracing::instrument(skip(self))]
pub async fn get_qos(&self) -> DdsResult<PublisherQos> {
let (reply_sender, reply_receiver) = oneshot();
self.participant_address()
.send(DcpsDomainParticipantMail::Publisher(
PublisherServiceMail::GetPublisherQos {
publisher_handle: self.handle,
reply_sender,
},
))
.await?;
reply_receiver.await?
}
#[tracing::instrument(skip(self, a_listener))]
pub async fn set_listener(
&self,
a_listener: Option<impl PublisherListener + Send + 'static>,
mask: &[StatusKind],
) -> DdsResult<()> {
let (reply_sender, reply_receiver) = oneshot();
let dcps_listener = a_listener.map(DcpsPublisherListener::new);
self.participant_address()
.send(DcpsDomainParticipantMail::Publisher(
PublisherServiceMail::SetPublisherListener {
publisher_handle: self.handle,
dcps_listener,
mask: mask.to_vec(),
reply_sender,
},
))
.await?;
reply_receiver.await?
}
#[tracing::instrument(skip(self))]
pub async fn get_status_changes(&self) -> DdsResult<Vec<StatusKind>> {
todo!()
}
#[tracing::instrument(skip(self))]
pub async fn enable(&self) -> DdsResult<()> {
todo!()
}
#[tracing::instrument(skip(self))]
pub async fn get_instance_handle(&self) -> InstanceHandle {
self.handle
}
}