use super::domain_participant::DomainParticipantAsync;
use crate::{
dcps::{
channels::{
mpsc::{MpscSender, mpsc_channel},
oneshot::oneshot,
},
domain_participant_factory::DcpsParticipantFactory,
domain_participant_factory_mail::DcpsParticipantFactoryMail,
domain_participant_mail::{
DcpsDomainParticipantMail, DiscoveryServiceMail, ParticipantServiceMail,
},
listeners::domain_participant_listener::DcpsDomainParticipantListener,
},
dds_async::{
configuration::DustDdsConfiguration, domain_participant_listener::DomainParticipantListener,
},
infrastructure::{
domain::DomainId,
error::{DdsError, DdsResult},
qos::{DomainParticipantFactoryQos, DomainParticipantQos, QosKind},
status::StatusKind,
},
runtime::{DdsRuntime, Spawner},
transport::interface::TransportParticipantFactory,
};
use alloc::string::String;
pub struct DomainParticipantFactoryAsync<R: DdsRuntime> {
runtime: R,
domain_participant_factory_sender: MpscSender<DcpsParticipantFactoryMail<R>>,
}
impl<R: DdsRuntime> DomainParticipantFactoryAsync<R> {
pub async fn create_participant(
&self,
domain_id: DomainId,
qos: QosKind<DomainParticipantQos>,
a_listener: Option<impl DomainParticipantListener + Send + 'static>,
mask: &[StatusKind],
) -> DdsResult<DomainParticipantAsync> {
let clock_handle = self.runtime.clock();
let timer_handle = self.runtime.timer();
let spawner_handle = self.runtime.spawner();
let status_kind = mask.to_vec();
let dcps_listener = a_listener.map(DcpsDomainParticipantListener::new);
let (reply_sender, reply_receiver) = oneshot();
self.domain_participant_factory_sender
.send(DcpsParticipantFactoryMail::CreateParticipant {
domain_id,
qos,
dcps_listener,
status_kind,
reply_sender,
clock_handle: clock_handle.clone(),
timer_handle: timer_handle.clone(),
spawner_handle: spawner_handle.clone(),
})
.await?;
let (participant_address, participant_handle, builtin_subscriber_status_condition_address) =
reply_receiver.await??;
let domain_participant = DomainParticipantAsync::new(
participant_address.clone(),
builtin_subscriber_status_condition_address,
domain_id,
participant_handle,
);
Ok(domain_participant)
}
pub async fn delete_participant(&self, participant: &DomainParticipantAsync) -> DdsResult<()> {
let (reply_sender, reply_receiver) = oneshot();
participant
.participant_address()
.send(DcpsDomainParticipantMail::Participant(
ParticipantServiceMail::IsEmpty { reply_sender },
))
.await?;
let is_participant_empty = reply_receiver.await?;
if is_participant_empty {
let (reply_sender, reply_receiver) = oneshot();
let handle = participant.get_instance_handle().await;
self.domain_participant_factory_sender
.send(DcpsParticipantFactoryMail::DeleteParticipant {
handle,
reply_sender,
})
.await?;
let deleted_participant = reply_receiver.await??;
deleted_participant
.send(DcpsDomainParticipantMail::Discovery(
DiscoveryServiceMail::AnnounceDeletedParticipant,
))
.await
.ok();
Ok(())
} else {
Err(DdsError::PreconditionNotMet(String::from(
"Domain participant still contains other entities",
)))
}
}
pub async fn lookup_participant(
&self,
_domain_id: DomainId,
) -> DdsResult<Option<DomainParticipantAsync>> {
todo!()
}
pub async fn set_default_participant_qos(
&self,
qos: QosKind<DomainParticipantQos>,
) -> DdsResult<()> {
let (reply_sender, reply_receiver) = oneshot();
self.domain_participant_factory_sender
.send(DcpsParticipantFactoryMail::SetDefaultParticipantQos { qos, reply_sender })
.await?;
reply_receiver.await?
}
pub async fn get_default_participant_qos(&self) -> DdsResult<DomainParticipantQos> {
let (reply_sender, reply_receiver) = oneshot();
self.domain_participant_factory_sender
.send(DcpsParticipantFactoryMail::GetDefaultParticipantQos { reply_sender })
.await?;
reply_receiver.await
}
pub async fn set_qos(&self, qos: QosKind<DomainParticipantFactoryQos>) -> DdsResult<()> {
let (reply_sender, reply_receiver) = oneshot();
self.domain_participant_factory_sender
.send(DcpsParticipantFactoryMail::SetQos { qos, reply_sender })
.await?;
reply_receiver.await?
}
pub async fn get_qos(&self) -> DdsResult<DomainParticipantFactoryQos> {
let (reply_sender, reply_receiver) = oneshot();
self.domain_participant_factory_sender
.send(DcpsParticipantFactoryMail::GetQos { reply_sender })
.await?;
reply_receiver.await
}
pub async fn set_configuration(&self, configuration: DustDdsConfiguration) -> DdsResult<()> {
self.domain_participant_factory_sender
.send(DcpsParticipantFactoryMail::SetConfiguration { configuration })
.await?;
Ok(())
}
pub async fn get_configuration(&self) -> DdsResult<DustDdsConfiguration> {
let (reply_sender, reply_receiver) = oneshot();
self.domain_participant_factory_sender
.send(DcpsParticipantFactoryMail::GetConfiguration { reply_sender })
.await?;
reply_receiver.await
}
}
impl<R: DdsRuntime> DomainParticipantFactoryAsync<R> {
#[doc(hidden)]
pub fn new<T: TransportParticipantFactory>(
runtime: R,
app_id: [u8; 4],
host_id: [u8; 4],
transport: T,
) -> DomainParticipantFactoryAsync<R> {
let mut domain_participant_factory =
DcpsParticipantFactory::new(app_id, host_id, transport);
let (domain_participant_factory_sender, mailbox_recv) = mpsc_channel();
runtime.spawner().spawn(async move {
while let Some(m) = mailbox_recv.receive().await {
domain_participant_factory.handle(m).await;
}
});
DomainParticipantFactoryAsync {
runtime,
domain_participant_factory_sender,
}
}
}
#[cfg(feature = "std")]
impl DomainParticipantFactoryAsync<crate::std_runtime::StdRuntime> {
#[tracing::instrument]
pub fn get_instance() -> &'static DomainParticipantFactoryAsync<crate::std_runtime::StdRuntime>
{
use core::net::IpAddr;
use network_interface::{Addr, NetworkInterface, NetworkInterfaceConfig};
use std::sync::OnceLock;
use tracing::warn;
static PARTICIPANT_FACTORY_ASYNC: OnceLock<
DomainParticipantFactoryAsync<crate::std_runtime::StdRuntime>,
> = OnceLock::new();
PARTICIPANT_FACTORY_ASYNC.get_or_init(|| {
let executor = crate::std_runtime::executor::Executor::new();
let timer_driver = crate::std_runtime::timer::TimerDriver::new();
let runtime = crate::std_runtime::StdRuntime::new(executor, timer_driver);
let interface_address = NetworkInterface::show()
.expect("Could not scan interfaces")
.into_iter()
.flat_map(|i| {
i.addr
.into_iter()
.filter(|a| matches!(a, Addr::V4(v4) if !v4.ip.is_loopback()))
})
.next();
let host_id = if let Some(interface) = interface_address {
match interface.ip() {
IpAddr::V4(a) => a.octets(),
IpAddr::V6(_) => unimplemented!("IPv6 not yet implemented"),
}
} else {
warn!("Failed to get Host ID from IP address, use 0 instead");
[0; 4]
};
let app_id = std::process::id().to_ne_bytes();
let transport = crate::rtps_udp_transport::udp_transport::RtpsUdpTransportParticipantFactory::default();
DomainParticipantFactoryAsync::new(
runtime,
app_id,
host_id,
transport,
)
})
}
}