use super::{
publisher::PublisherAsync, publisher_listener::PublisherListener, subscriber::SubscriberAsync,
subscriber_listener::SubscriberListener, topic::TopicAsync, topic_listener::TopicListener,
};
use crate::{
builtin_topics::{ParticipantBuiltinTopicData, TopicBuiltinTopicData},
dcps::{
actor::ActorAddress,
channels::{mpsc::MpscSender, oneshot::oneshot},
domain_participant_mail::{DcpsDomainParticipantMail, ParticipantServiceMail},
listeners::{
domain_participant_listener::DcpsDomainParticipantListener,
publisher_listener::DcpsPublisherListener, subscriber_listener::DcpsSubscriberListener,
topic_listener::DcpsTopicListener,
},
status_condition::DcpsStatusCondition,
},
dds_async::{
content_filtered_topic::ContentFilteredTopicAsync,
domain_participant_listener::DomainParticipantListener,
topic_description::TopicDescriptionAsync,
},
infrastructure::{
domain::DomainId,
error::{DdsError, DdsResult},
instance::InstanceHandle,
qos::{DomainParticipantQos, PublisherQos, QosKind, SubscriberQos, TopicQos},
status::StatusKind,
time::Time,
type_support::TypeSupport,
},
xtypes::dynamic_type::DynamicType,
};
use alloc::{
string::{String, ToString},
sync::Arc,
vec::Vec,
};
/// Asynchronous handle to a domain participant.
///
/// The handle itself holds no participant state beyond identifiers: its methods build a
/// `DcpsDomainParticipantMail`, send it through `participant_address` and await the reply
/// on a one-shot channel.
pub struct DomainParticipantAsync {
    /// Sender through which every request mail for this participant is delivered.
    participant_address: MpscSender<DcpsDomainParticipantMail>,
    /// Status condition address handed to the subscriber built by `get_builtin_subscriber`.
    builtin_subscriber_status_condition_address: ActorAddress<DcpsStatusCondition>,
    /// Domain identifier returned by `get_domain_id`.
    domain_id: DomainId,
    /// Instance handle returned by `get_instance_handle`.
    handle: InstanceHandle,
}
impl Clone for DomainParticipantAsync {
    /// Produces a new handle whose two addresses are cloned and whose domain id and
    /// instance handle are copied from `self`.
    fn clone(&self) -> Self {
        // Destructuring makes the compiler flag this impl if a field is ever added.
        let Self {
            participant_address,
            builtin_subscriber_status_condition_address,
            domain_id,
            handle,
        } = self;
        Self {
            participant_address: participant_address.clone(),
            builtin_subscriber_status_condition_address:
                builtin_subscriber_status_condition_address.clone(),
            domain_id: *domain_id,
            handle: *handle,
        }
    }
}
impl DomainParticipantAsync {
    /// Assembles a handle from its four parts without performing any communication.
    pub(crate) fn new(
        participant_address: MpscSender<DcpsDomainParticipantMail>,
        builtin_subscriber_status_condition_address: ActorAddress<DcpsStatusCondition>,
        domain_id: DomainId,
        handle: InstanceHandle,
    ) -> Self {
        Self {
            handle,
            domain_id,
            builtin_subscriber_status_condition_address,
            participant_address,
        }
    }

    /// Borrows the sender used to deliver mail for this participant.
    pub(crate) fn participant_address(&self) -> &MpscSender<DcpsDomainParticipantMail> {
        let Self {
            participant_address,
            ..
        } = self;
        participant_address
    }
}
impl DomainParticipantAsync {
/// Creates a publisher belonging to this participant.
///
/// The optional `a_listener` is wrapped with `DcpsPublisherListener::new` and sent, together
/// with `qos` and a copy of `mask`, in a `CreateUserDefinedPublisher` mail. The value received
/// in the reply is paired with a clone of this participant to build the returned
/// `PublisherAsync`.
///
/// # Errors
///
/// Fails if the mail cannot be sent, if the reply cannot be received, or if the reply is an
/// error.
#[tracing::instrument(skip(self, a_listener))]
pub async fn create_publisher(
    &self,
    qos: QosKind<PublisherQos>,
    a_listener: Option<impl PublisherListener + Send + 'static>,
    mask: &[StatusKind],
) -> DdsResult<PublisherAsync> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::CreateUserDefinedPublisher {
        qos,
        dcps_listener: a_listener.map(DcpsPublisherListener::new),
        mask: mask.to_vec(),
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    // First `?` covers the channel, second `?` covers the reply payload.
    let publisher_id = reply_rx.await??;
    Ok(PublisherAsync::new(publisher_id, self.clone()))
}
/// Requests deletion of `a_publisher`.
///
/// The `DeleteUserDefinedPublisher` mail carries the handle of the participant that
/// `a_publisher` reports via `get_participant()` (not necessarily this participant's own
/// handle) plus the publisher's instance handle.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self, a_publisher))]
pub async fn delete_publisher(&self, a_publisher: &PublisherAsync) -> DdsResult<()> {
    let (reply_tx, reply_rx) = oneshot();
    let participant_handle = a_publisher.get_participant().handle;
    let publisher_handle = a_publisher.get_instance_handle().await;
    let request = ParticipantServiceMail::DeleteUserDefinedPublisher {
        participant_handle,
        publisher_handle,
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Creates a subscriber belonging to this participant.
///
/// The optional `a_listener` is wrapped with `DcpsSubscriberListener::new` and sent, together
/// with `qos` and a copy of `mask`, in a `CreateUserDefinedSubscriber` mail. The reply is a
/// pair (identifier, status condition address) that is combined with a clone of this
/// participant to build the returned `SubscriberAsync`.
///
/// # Errors
///
/// Fails if the mail cannot be sent, if the reply cannot be received, or if the reply is an
/// error.
#[tracing::instrument(skip(self, a_listener))]
pub async fn create_subscriber(
    &self,
    qos: QosKind<SubscriberQos>,
    a_listener: Option<impl SubscriberListener + Send + 'static>,
    mask: &[StatusKind],
) -> DdsResult<SubscriberAsync> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::CreateUserDefinedSubscriber {
        qos,
        dcps_listener: a_listener.map(DcpsSubscriberListener::new),
        mask: mask.to_vec(),
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    let (subscriber_id, status_condition_address) = reply_rx.await??;
    Ok(SubscriberAsync::new(
        subscriber_id,
        status_condition_address,
        self.clone(),
    ))
}
/// Requests deletion of `a_subscriber`.
///
/// The `DeleteUserDefinedSubscriber` mail carries the handle of the participant that
/// `a_subscriber` reports via `get_participant()` plus the subscriber's instance handle.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self, a_subscriber))]
pub async fn delete_subscriber(&self, a_subscriber: &SubscriberAsync) -> DdsResult<()> {
    let (reply_tx, reply_rx) = oneshot();
    let participant_handle = a_subscriber.get_participant().handle;
    let subscriber_handle = a_subscriber.get_instance_handle().await;
    let request = ParticipantServiceMail::DeleteUserDefinedSubscriber {
        participant_handle,
        subscriber_handle,
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Creates a topic whose type description is taken from `Foo`.
///
/// This is a thin wrapper: it wraps `Foo::get_type()` in an `Arc` and forwards every argument
/// to `create_dynamic_topic`, returning that call's result unchanged.
#[tracing::instrument(skip(self, a_listener))]
pub async fn create_topic<Foo>(
    &self,
    topic_name: &str,
    type_name: &str,
    qos: QosKind<TopicQos>,
    a_listener: Option<impl TopicListener + Send + 'static>,
    mask: &[StatusKind],
) -> DdsResult<TopicDescriptionAsync>
where
    Foo: TypeSupport,
{
    let dynamic_type = Foo::get_type();
    self.create_dynamic_topic(
        topic_name,
        type_name,
        qos,
        a_listener,
        mask,
        Arc::new(dynamic_type),
    )
    .await
}
/// Creates a topic from an explicit dynamic type description.
///
/// Sends a `CreateTopic` mail containing owned copies of `topic_name` and `type_name`, the
/// `qos`, the optional listener wrapped with `DcpsTopicListener::new`, a copy of `mask`, the
/// type description and a clone of this participant's sender. The reply is a pair
/// (identifier, status condition address) used to build the returned
/// `TopicDescriptionAsync::Topic`.
///
/// # Errors
///
/// Fails if the mail cannot be sent, if the reply cannot be received, or if the reply is an
/// error.
#[doc(hidden)]
#[tracing::instrument(skip(self, a_listener, dynamic_type_representation))]
pub async fn create_dynamic_topic(
    &self,
    topic_name: &str,
    type_name: &str,
    qos: QosKind<TopicQos>,
    a_listener: Option<impl TopicListener + Send + 'static>,
    mask: &[StatusKind],
    dynamic_type_representation: Arc<DynamicType>,
) -> DdsResult<TopicDescriptionAsync> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::CreateTopic {
        topic_name: topic_name.to_string(),
        type_name: type_name.to_string(),
        qos,
        dcps_listener: a_listener.map(DcpsTopicListener::new),
        mask: mask.to_vec(),
        type_support: dynamic_type_representation,
        participant_address: self.participant_address.clone(),
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    let (topic_id, status_condition_address) = reply_rx.await??;
    let topic = TopicAsync::new(
        topic_id,
        status_condition_address,
        type_name.to_string(),
        topic_name.to_string(),
        self.clone(),
    );
    Ok(TopicDescriptionAsync::Topic(topic))
}
/// Requests deletion of `a_topic`.
///
/// The `DeleteUserDefinedTopic` mail identifies the topic by name and carries the handle of
/// the participant that `a_topic` reports via `get_participant()`.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self, a_topic))]
pub async fn delete_topic(&self, a_topic: &TopicDescriptionAsync) -> DdsResult<()> {
    let (reply_tx, reply_rx) = oneshot();
    let participant_handle = a_topic.get_participant().handle;
    let topic_name = a_topic.get_name();
    let request = ParticipantServiceMail::DeleteUserDefinedTopic {
        participant_handle,
        topic_name,
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Creates a content-filtered topic named `name` on top of `related_topic`.
///
/// `related_topic` must be the `TopicDescriptionAsync::Topic` variant. A
/// `CreateContentFilteredTopic` mail is sent with the related topic's participant handle and
/// name, the filter expression and its parameters; once the reply reports success the
/// returned value wraps a `ContentFilteredTopicAsync` built from `name` and a clone of the
/// related topic.
///
/// # Errors
///
/// Returns `DdsError::BadParameter` if `related_topic` is itself a content-filtered topic.
/// Also fails if the mail cannot be sent, if the reply cannot be received, or if the reply is
/// an error.
#[tracing::instrument(skip(self, related_topic))]
pub async fn create_contentfilteredtopic(
    &self,
    name: &str,
    related_topic: &TopicDescriptionAsync,
    filter_expression: String,
    expression_parameters: Vec<String>,
) -> DdsResult<TopicDescriptionAsync> {
    let topic = match related_topic {
        TopicDescriptionAsync::Topic(t) => t.clone(),
        TopicDescriptionAsync::ContentFilteredTopic(_) => return Err(DdsError::BadParameter),
    };
    let name = name.to_string();
    let related_topic_name = related_topic.get_name();
    let (reply_sender, reply_receiver) = oneshot();
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(
            ParticipantServiceMail::CreateContentFilteredTopic {
                participant_handle: topic.get_participant().handle,
                // The only clone that is needed: `name` is used once more below.
                name: name.clone(),
                related_topic_name,
                filter_expression,
                expression_parameters,
                reply_sender,
            },
        ))
        .await?;
    reply_receiver.await??;
    // `name` is not used after this point, so it is moved rather than cloned again.
    Ok(TopicDescriptionAsync::ContentFilteredTopic(
        ContentFilteredTopicAsync::new(name, topic),
    ))
}
/// Deletes a content-filtered topic. Not implemented yet.
///
/// # Panics
///
/// The body is `todo!()`, so awaiting the returned future always panics.
#[tracing::instrument(skip(self, _a_contentfilteredtopic))]
pub async fn delete_contentfilteredtopic(
&self,
_a_contentfilteredtopic: &TopicDescriptionAsync,
) -> DdsResult<()> {
todo!()
}
/// Waits until a topic named `topic_name` is reported and returns a handle to it.
///
/// A `FindTopic` mail carrying the name and the type description of `Foo` is sent
/// repeatedly; each iteration awaits the reply before trying again. The first reply that
/// contains a topic is turned into a `TopicDescriptionAsync::Topic`.
///
/// Note that this function has no timeout of its own: as long as the replies contain no
/// topic and no error occurs, the returned future does not complete.
///
/// # Errors
///
/// Fails if a mail cannot be sent, if a reply cannot be received, or if a reply is an error.
#[tracing::instrument(skip(self))]
pub async fn find_topic<Foo>(&self, topic_name: &str) -> DdsResult<TopicDescriptionAsync>
where
    Foo: TypeSupport,
{
    let participant_async = self.clone();
    let participant_address = self.participant_address.clone();
    let topic_name = topic_name.to_string();
    let type_support = Arc::new(Foo::get_type());
    loop {
        let (reply_tx, reply_rx) = oneshot();
        let request = ParticipantServiceMail::FindTopic {
            topic_name: topic_name.clone(),
            type_support: Arc::clone(&type_support),
            reply_sender: reply_tx,
        };
        participant_address
            .send(DcpsDomainParticipantMail::Participant(request))
            .await?;
        match reply_rx.await?? {
            Some((topic_id, status_condition_address, type_name)) => {
                let topic = TopicAsync::new(
                    topic_id,
                    status_condition_address,
                    type_name,
                    topic_name,
                    participant_async,
                );
                return Ok(TopicDescriptionAsync::Topic(topic));
            }
            // Nothing found yet: ask again.
            None => continue,
        }
    }
}
/// Looks up a topic by name with a single request.
///
/// Sends one `LookupTopicdescription` mail. If the reply contains a (type name, topic
/// handle, status condition address) triple, it is turned into
/// `Some(TopicDescriptionAsync::Topic(..))`; otherwise `None` is returned.
///
/// # Errors
///
/// Fails if the mail cannot be sent, if the reply cannot be received, or if the reply is an
/// error.
#[tracing::instrument(skip(self))]
pub async fn lookup_topicdescription(
    &self,
    topic_name: &str,
) -> DdsResult<Option<TopicDescriptionAsync>> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::LookupTopicdescription {
        topic_name: topic_name.to_string(),
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    let lookup_result = reply_rx.await??;
    Ok(lookup_result.map(
        |(type_name, topic_handle, status_condition_address)| {
            TopicDescriptionAsync::Topic(TopicAsync::new(
                topic_handle,
                status_condition_address,
                type_name,
                topic_name.to_string(),
                self.clone(),
            ))
        },
    ))
}
/// Builds a `SubscriberAsync` for the built-in subscriber.
///
/// No mail is sent: the subscriber is constructed locally from this participant's own
/// instance handle, a clone of the stored built-in subscriber status condition address and a
/// clone of this participant.
#[tracing::instrument(skip(self))]
pub fn get_builtin_subscriber(&self) -> SubscriberAsync {
    let status_condition_address = self.builtin_subscriber_status_condition_address.clone();
    let participant = self.clone();
    SubscriberAsync::new(self.handle, status_condition_address, participant)
}
/// Sends an `IgnoreParticipant` mail for `handle` and returns the reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn ignore_participant(&self, handle: InstanceHandle) -> DdsResult<()> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::IgnoreParticipant {
        handle,
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Ignores the topic identified by `handle`. Not implemented yet.
///
/// # Panics
///
/// The body is `todo!()`, so awaiting the returned future always panics.
#[tracing::instrument(skip(self))]
pub async fn ignore_topic(&self, handle: InstanceHandle) -> DdsResult<()> {
todo!()
}
/// Sends an `IgnorePublication` mail for `handle` and returns the reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn ignore_publication(&self, handle: InstanceHandle) -> DdsResult<()> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::IgnorePublication {
        handle,
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Sends an `IgnoreSubscription` mail for `handle` and returns the reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn ignore_subscription(&self, handle: InstanceHandle) -> DdsResult<()> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::IgnoreSubscription {
        handle,
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Returns the domain id stored in this handle when it was constructed.
///
/// This is a plain field read; no mail is sent.
#[tracing::instrument(skip(self))]
pub fn get_domain_id(&self) -> DomainId {
self.domain_id
}
/// Sends a `DeleteContainedEntities` mail and returns the reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn delete_contained_entities(&self) -> DdsResult<()> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::DeleteContainedEntities {
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Asserts the liveliness of this participant. Not implemented yet.
///
/// # Panics
///
/// The body is `todo!()`, so awaiting the returned future always panics.
#[tracing::instrument(skip(self))]
pub async fn assert_liveliness(&self) -> DdsResult<()> {
todo!()
}
/// Sends a `SetDefaultPublisherQos` mail carrying `qos` and returns the reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn set_default_publisher_qos(&self, qos: QosKind<PublisherQos>) -> DdsResult<()> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::SetDefaultPublisherQos {
        qos,
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Sends a `GetDefaultPublisherQos` mail and returns the `PublisherQos` from the reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn get_default_publisher_qos(&self) -> DdsResult<PublisherQos> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::GetDefaultPublisherQos {
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Sends a `SetDefaultSubscriberQos` mail carrying `qos` and returns the reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn set_default_subscriber_qos(&self, qos: QosKind<SubscriberQos>) -> DdsResult<()> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::SetDefaultSubscriberQos {
        qos,
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Sends a `GetDefaultSubscriberQos` mail and returns the `SubscriberQos` from the reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn get_default_subscriber_qos(&self) -> DdsResult<SubscriberQos> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::GetDefaultSubscriberQos {
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Sends a `SetDefaultTopicQos` mail carrying `qos` and returns the reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn set_default_topic_qos(&self, qos: QosKind<TopicQos>) -> DdsResult<()> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::SetDefaultTopicQos {
        qos,
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Sends a `GetDefaultTopicQos` mail and returns the `TopicQos` from the reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn get_default_topic_qos(&self) -> DdsResult<TopicQos> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::GetDefaultTopicQos {
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Sends a `GetDiscoveredParticipants` mail and returns the list of instance handles from the
/// reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn get_discovered_participants(&self) -> DdsResult<Vec<InstanceHandle>> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::GetDiscoveredParticipants {
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Sends a `GetDiscoveredParticipantData` mail for `participant_handle` and returns the
/// `ParticipantBuiltinTopicData` from the reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn get_discovered_participant_data(
    &self,
    participant_handle: InstanceHandle,
) -> DdsResult<ParticipantBuiltinTopicData> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::GetDiscoveredParticipantData {
        participant_handle,
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Sends a `GetDiscoveredTopics` mail and returns the list of instance handles from the
/// reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn get_discovered_topics(&self) -> DdsResult<Vec<InstanceHandle>> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::GetDiscoveredTopics {
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Sends a `GetDiscoveredTopicData` mail for `topic_handle` and returns the
/// `TopicBuiltinTopicData` from the reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn get_discovered_topic_data(
    &self,
    topic_handle: InstanceHandle,
) -> DdsResult<TopicBuiltinTopicData> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::GetDiscoveredTopicData {
        topic_handle,
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Reports whether the entity identified by `_a_handle` belongs to this participant.
/// Not implemented yet.
///
/// # Panics
///
/// The body is `todo!()`, so awaiting the returned future always panics.
#[tracing::instrument(skip(self))]
pub async fn contains_entity(&self, _a_handle: InstanceHandle) -> DdsResult<bool> {
todo!()
}
/// Sends a `GetCurrentTime` mail and returns the awaited reply.
///
/// Unlike the other request methods in this file, the awaited reply is returned as is,
/// without a further `?`.
///
/// # Errors
///
/// Fails if the mail cannot be sent; otherwise the result of awaiting the reply channel is
/// returned unchanged.
#[tracing::instrument(skip(self))]
pub async fn get_current_time(&self) -> DdsResult<Time> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::GetCurrentTime {
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await
}
}
impl DomainParticipantAsync {
/// Sends a `SetQos` mail carrying `qos` and a clone of this participant's sender, then
/// returns the reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn set_qos(&self, qos: QosKind<DomainParticipantQos>) -> DdsResult<()> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::SetQos {
        qos,
        participant_address: self.participant_address.clone(),
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Sends a `GetQos` mail and returns the `DomainParticipantQos` from the reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn get_qos(&self) -> DdsResult<DomainParticipantQos> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::GetQos {
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Installs or clears the participant listener.
///
/// The optional `a_listener` is wrapped with `DcpsDomainParticipantListener::new` and sent in
/// a `SetListener` mail together with a copy of `mask`; a `None` listener is forwarded as
/// `None`.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self, a_listener))]
pub async fn set_listener(
    &self,
    a_listener: Option<impl DomainParticipantListener + Send + 'static>,
    mask: &[StatusKind],
) -> DdsResult<()> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::SetListener {
        dcps_listener: a_listener.map(DcpsDomainParticipantListener::new),
        status_kind: mask.to_vec(),
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Returns the list of changed statuses. Not implemented yet.
///
/// # Panics
///
/// The body is `todo!()`, so awaiting the returned future always panics.
#[tracing::instrument(skip(self))]
pub async fn get_status_changes(&self) -> DdsResult<Vec<StatusKind>> {
todo!()
}
/// Sends an `Enable` mail carrying a clone of this participant's sender and returns the
/// reply.
///
/// # Errors
///
/// Fails if the mail cannot be sent or the reply cannot be received; otherwise returns the
/// reply.
#[tracing::instrument(skip(self))]
pub async fn enable(&self) -> DdsResult<()> {
    let (reply_tx, reply_rx) = oneshot();
    let request = ParticipantServiceMail::Enable {
        participant_address: self.participant_address.clone(),
        reply_sender: reply_tx,
    };
    self.participant_address
        .send(DcpsDomainParticipantMail::Participant(request))
        .await?;
    reply_rx.await?
}
/// Returns the instance handle stored in this handle when it was constructed.
///
/// Although declared `async`, the body is a plain field read; no mail is sent.
#[tracing::instrument(skip(self))]
pub async fn get_instance_handle(&self) -> InstanceHandle {
self.handle
}
}