dust_dds 0.15.0

Data Distribution Service (DDS) implementation
Documentation
use super::domain_participant::DomainParticipantAsync;
use crate::{
    dcps::{
        channels::{
            mpsc::{MpscSender, mpsc_channel},
            oneshot::oneshot,
        },
        domain_participant_factory::DcpsParticipantFactory,
        domain_participant_factory_mail::DcpsParticipantFactoryMail,
        domain_participant_mail::{
            DcpsDomainParticipantMail, DiscoveryServiceMail, ParticipantServiceMail,
        },
        listeners::domain_participant_listener::DcpsDomainParticipantListener,
    },
    dds_async::{
        configuration::DustDdsConfiguration, domain_participant_listener::DomainParticipantListener,
    },
    infrastructure::{
        domain::DomainId,
        error::{DdsError, DdsResult},
        qos::{DomainParticipantFactoryQos, DomainParticipantQos, QosKind},
        status::StatusKind,
    },
    runtime::{DdsRuntime, Spawner},
    transport::interface::TransportParticipantFactory,
};
use alloc::string::String;

/// Asynchronous counterpart of [`DomainParticipantFactory`](crate::domain::domain_participant_factory::DomainParticipantFactory).
///
/// In contrast to the sync factory, a [`DomainParticipantFactoryAsync`] is not a singleton: it is
/// built through a constructor that receives a DDS runtime. The factory can therefore spin its
/// tasks on an existing runtime, which may be shared with code outside Dust DDS.
pub struct DomainParticipantFactoryAsync<R>
where
    R: DdsRuntime,
{
    /// Runtime from which the clock, timer and spawner handles are obtained.
    runtime: R,
    /// Sending side of the mailbox through which requests reach the factory task.
    domain_participant_factory_sender: MpscSender<DcpsParticipantFactoryMail<R>>,
}

impl<R: DdsRuntime> DomainParticipantFactoryAsync<R> {
    /// Async version of [`create_participant`](crate::domain::domain_participant_factory::DomainParticipantFactory::create_participant).
    ///
    /// Sends a `CreateParticipant` request to the factory mailbox, together with clock, timer and
    /// spawner handles obtained from the runtime, and wraps the addresses carried by the reply
    /// in a [`DomainParticipantAsync`].
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent, if the reply cannot be received, or if the reply
    /// itself carries an error.
    pub async fn create_participant(
        &self,
        domain_id: DomainId,
        qos: QosKind<DomainParticipantQos>,
        a_listener: Option<impl DomainParticipantListener + Send + 'static>,
        mask: &[StatusKind],
    ) -> DdsResult<DomainParticipantAsync> {
        let status_kind = mask.to_vec();
        let dcps_listener = a_listener.map(DcpsDomainParticipantListener::new);
        let (reply_sender, reply_receiver) = oneshot();
        self.domain_participant_factory_sender
            .send(DcpsParticipantFactoryMail::CreateParticipant {
                domain_id,
                qos,
                dcps_listener,
                status_kind,
                reply_sender,
                // Each handle is needed by this request only, so it is moved in, not cloned.
                clock_handle: self.runtime.clock(),
                timer_handle: self.runtime.timer(),
                spawner_handle: self.runtime.spawner(),
            })
            .await?;

        // First `?`: the reply could not be received. Second `?`: the reply carries an error.
        let (participant_address, participant_handle, builtin_subscriber_status_condition_address) =
            reply_receiver.await??;

        Ok(DomainParticipantAsync::new(
            participant_address,
            builtin_subscriber_status_condition_address,
            domain_id,
            participant_handle,
        ))
    }

    /// Async version of [`delete_participant`](crate::domain::domain_participant_factory::DomainParticipantFactory::delete_participant).
    ///
    /// The participant is first asked whether it is empty. Only then is the factory requested
    /// to delete it (identified by its instance handle), after which the deleted participant
    /// is told to announce its deletion.
    ///
    /// # Errors
    ///
    /// Returns [`DdsError::PreconditionNotMet`] if the participant reports that it is not empty.
    /// Also fails if a request cannot be sent, a reply cannot be received, or the factory
    /// replies with an error.
    pub async fn delete_participant(&self, participant: &DomainParticipantAsync) -> DdsResult<()> {
        let (reply_sender, reply_receiver) = oneshot();
        participant
            .participant_address()
            .send(DcpsDomainParticipantMail::Participant(
                ParticipantServiceMail::IsEmpty { reply_sender },
            ))
            .await?;
        let is_participant_empty = reply_receiver.await?;
        if !is_participant_empty {
            return Err(DdsError::PreconditionNotMet(String::from(
                "Domain participant still contains other entities",
            )));
        }

        let (reply_sender, reply_receiver) = oneshot();
        let handle = participant.get_instance_handle().await;
        self.domain_participant_factory_sender
            .send(DcpsParticipantFactoryMail::DeleteParticipant {
                handle,
                reply_sender,
            })
            .await?;
        let deleted_participant = reply_receiver.await??;

        // Best effort: the participant is already deleted at this point, so a failure to
        // deliver the announcement request is deliberately ignored.
        deleted_participant
            .send(DcpsDomainParticipantMail::Discovery(
                DiscoveryServiceMail::AnnounceDeletedParticipant,
            ))
            .await
            .ok();
        Ok(())
    }

    /// Async version of [`lookup_participant`](crate::domain::domain_participant_factory::DomainParticipantFactory::lookup_participant).
    ///
    /// The lookup is not implemented yet.
    ///
    /// # Errors
    ///
    /// Always returns [`DdsError::Unsupported`], so that calling this operation reports an
    /// error to the caller instead of panicking.
    pub async fn lookup_participant(
        &self,
        _domain_id: DomainId,
    ) -> DdsResult<Option<DomainParticipantAsync>> {
        Err(DdsError::Unsupported)
    }

    /// Async version of [`set_default_participant_qos`](crate::domain::domain_participant_factory::DomainParticipantFactory::set_default_participant_qos).
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent, if the reply cannot be received, or if the factory
    /// replies with an error.
    pub async fn set_default_participant_qos(
        &self,
        qos: QosKind<DomainParticipantQos>,
    ) -> DdsResult<()> {
        let (reply_sender, reply_receiver) = oneshot();
        self.domain_participant_factory_sender
            .send(DcpsParticipantFactoryMail::SetDefaultParticipantQos { qos, reply_sender })
            .await?;
        reply_receiver.await?
    }

    /// Async version of [`get_default_participant_qos`](crate::domain::domain_participant_factory::DomainParticipantFactory::get_default_participant_qos).
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent or if the reply cannot be received.
    pub async fn get_default_participant_qos(&self) -> DdsResult<DomainParticipantQos> {
        let (reply_sender, reply_receiver) = oneshot();
        self.domain_participant_factory_sender
            .send(DcpsParticipantFactoryMail::GetDefaultParticipantQos { reply_sender })
            .await?;
        reply_receiver.await
    }

    /// Async version of [`set_qos`](crate::domain::domain_participant_factory::DomainParticipantFactory::set_qos).
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent, if the reply cannot be received, or if the factory
    /// replies with an error.
    pub async fn set_qos(&self, qos: QosKind<DomainParticipantFactoryQos>) -> DdsResult<()> {
        let (reply_sender, reply_receiver) = oneshot();
        self.domain_participant_factory_sender
            .send(DcpsParticipantFactoryMail::SetQos { qos, reply_sender })
            .await?;
        reply_receiver.await?
    }

    /// Async version of [`get_qos`](crate::domain::domain_participant_factory::DomainParticipantFactory::get_qos).
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent or if the reply cannot be received.
    pub async fn get_qos(&self) -> DdsResult<DomainParticipantFactoryQos> {
        let (reply_sender, reply_receiver) = oneshot();
        self.domain_participant_factory_sender
            .send(DcpsParticipantFactoryMail::GetQos { reply_sender })
            .await?;
        reply_receiver.await
    }

    /// Async version of [`set_configuration`](crate::domain::domain_participant_factory::DomainParticipantFactory::set_configuration).
    ///
    /// The configuration is handed to the factory without waiting for a reply.
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent.
    pub async fn set_configuration(&self, configuration: DustDdsConfiguration) -> DdsResult<()> {
        self.domain_participant_factory_sender
            .send(DcpsParticipantFactoryMail::SetConfiguration { configuration })
            .await?;
        Ok(())
    }

    /// Async version of [`get_configuration`](crate::domain::domain_participant_factory::DomainParticipantFactory::get_configuration).
    ///
    /// # Errors
    ///
    /// Fails if the request cannot be sent or if the reply cannot be received.
    pub async fn get_configuration(&self) -> DdsResult<DustDdsConfiguration> {
        let (reply_sender, reply_receiver) = oneshot();
        self.domain_participant_factory_sender
            .send(DcpsParticipantFactoryMail::GetConfiguration { reply_sender })
            .await?;
        reply_receiver.await
    }
}

impl<R: DdsRuntime> DomainParticipantFactoryAsync<R> {
    /// Creates a factory running on `runtime`.
    ///
    /// A [`DcpsParticipantFactory`] is built from `app_id`, `host_id` and `transport`, and a task
    /// is spawned on the runtime that hands every mail received on the factory mailbox to it.
    /// The returned value keeps the runtime and the sending side of that mailbox.
    #[doc(hidden)]
    pub fn new<T: TransportParticipantFactory>(
        runtime: R,
        app_id: [u8; 4],
        host_id: [u8; 4],
        transport: T,
    ) -> DomainParticipantFactoryAsync<R> {
        let mut dcps_factory = DcpsParticipantFactory::new(app_id, host_id, transport);
        let (domain_participant_factory_sender, mailbox_receiver) = mpsc_channel();

        // The loop ends once the receiver yields `None`.
        let factory_task = async move {
            while let Some(mail) = mailbox_receiver.receive().await {
                dcps_factory.handle(mail).await;
            }
        };
        runtime.spawner().spawn(factory_task);

        Self {
            runtime,
            domain_participant_factory_sender,
        }
    }
}

#[cfg(feature = "std")]
impl DomainParticipantFactoryAsync<crate::std_runtime::StdRuntime> {
    /// This operation returns the [`DomainParticipantFactoryAsync`] singleton. The operation is idempotent, that is, it can be called multiple
    /// times without side-effects and it will return the same [`DomainParticipantFactoryAsync`] instance.
    ///
    /// On first use the singleton is built with a standard runtime and the default RTPS UDP
    /// transport. The host id is made of the octets of the first non-loopback IPv4 address found
    /// on the network interfaces; when the interfaces cannot be scanned or no such address
    /// exists, a warning is logged and an all-zero host id is used instead. The application id
    /// is made of the native-endian bytes of the process id.
    #[tracing::instrument]
    pub fn get_instance() -> &'static DomainParticipantFactoryAsync<crate::std_runtime::StdRuntime>
    {
        use network_interface::{Addr, NetworkInterface, NetworkInterfaceConfig};
        use std::sync::OnceLock;
        use tracing::warn;

        static PARTICIPANT_FACTORY_ASYNC: OnceLock<
            DomainParticipantFactoryAsync<crate::std_runtime::StdRuntime>,
        > = OnceLock::new();
        PARTICIPANT_FACTORY_ASYNC.get_or_init(|| {
            let executor = crate::std_runtime::executor::Executor::new();
            let timer_driver = crate::std_runtime::timer::TimerDriver::new();
            let runtime = crate::std_runtime::StdRuntime::new(executor, timer_driver);

            // A failed interface scan is treated like "no usable address" rather than
            // aborting the initialisation of the singleton.
            let first_ipv4_octets = match NetworkInterface::show() {
                Ok(interfaces) => interfaces
                    .into_iter()
                    .flat_map(|interface| interface.addr)
                    // Only non-loopback IPv4 addresses qualify, so the octets can be taken
                    // directly and no IPv6 case has to be handled.
                    .find_map(|address| match address {
                        Addr::V4(v4) if !v4.ip.is_loopback() => Some(v4.ip.octets()),
                        _ => None,
                    }),
                Err(error) => {
                    warn!("Could not scan interfaces: {:?}", error);
                    None
                }
            };
            let host_id = first_ipv4_octets.unwrap_or_else(|| {
                warn!("Failed to get Host ID from IP address, use 0 instead");
                [0; 4]
            });

            let app_id = std::process::id().to_ne_bytes();
            let transport = crate::rtps_udp_transport::udp_transport::RtpsUdpTransportParticipantFactory::default();
            DomainParticipantFactoryAsync::new(
                runtime,
                app_id,
                host_id,
                transport,
            )
        })
    }
}