use super::actor::MailHandler;
use crate::{
dcps::{
actor::ActorAddress,
channels::{
mpsc::{MpscSender, mpsc_channel},
oneshot::oneshot,
},
domain_participant::DcpsDomainParticipant,
domain_participant_mail::{
DcpsDomainParticipantMail, DiscoveryServiceMail, MessageServiceMail,
ParticipantServiceMail,
},
listeners::domain_participant_listener::DcpsDomainParticipantListener,
status_condition::DcpsStatusCondition,
},
dds_async::configuration::DustDdsConfiguration,
infrastructure::{
domain::DomainId,
error::{DdsError, DdsResult},
instance::InstanceHandle,
qos::{DomainParticipantFactoryQos, DomainParticipantQos, QosKind},
status::StatusKind,
},
runtime::{DdsRuntime, Either, Spawner, Timer, select_future},
transport::{interface::TransportParticipantFactory, types::GuidPrefix},
};
use alloc::{borrow::ToOwned, string::String, vec::Vec};
/// Factory state used to create, track and delete domain participants.
///
/// `T` is the transport factory; the methods below require it to implement
/// `TransportParticipantFactory`.
pub struct DcpsParticipantFactory<T> {
/// Instance handle and mail sender of every participant created by this
/// factory that has not been removed through `delete_participant`.
domain_participant_list: Vec<(InstanceHandle, MpscSender<DcpsDomainParticipantMail>)>,
/// QoS of the factory itself; `entity_factory.autoenable_created_entities`
/// is consulted when a participant is created.
qos: DomainParticipantFactoryQos,
/// QoS applied to a new participant when the caller passes `QosKind::Default`.
default_participant_qos: DomainParticipantQos,
/// Source of the domain tag and of the participant announcement interval.
configuration: DustDdsConfiguration,
/// Transport factory asked to create the transport of each new participant.
transport: T,
/// Counter whose current value becomes the last four bytes of the next
/// GUID prefix; incremented on every GUID prefix creation.
entity_counter: u32,
/// Bytes 4..8 of every GUID prefix produced by this factory.
app_id: [u8; 4],
/// Bytes 0..4 of every GUID prefix produced by this factory.
host_id: [u8; 4],
}
impl<T: TransportParticipantFactory> DcpsParticipantFactory<T> {
pub fn new(app_id: [u8; 4], host_id: [u8; 4], transport: T) -> Self {
Self {
domain_participant_list: Default::default(),
qos: Default::default(),
default_participant_qos: Default::default(),
configuration: Default::default(),
transport,
entity_counter: 0,
app_id,
host_id,
}
}
/// Returns the current counter value and advances the counter by one.
fn get_unique_participant_id(&mut self) -> u32 {
    let following = self.entity_counter + 1;
    // Store the successor and hand back the value that was current.
    core::mem::replace(&mut self.entity_counter, following)
}
/// Produces a 12-byte GUID prefix laid out as
/// `host_id (4 bytes) | app_id (4 bytes) | participant id (4 bytes)`.
///
/// The participant id comes from `get_unique_participant_id` and is
/// serialized in native byte order.
fn create_new_guid_prefix(&mut self) -> GuidPrefix {
    // Take the id first: it is the only step that mutates the factory.
    let [i0, i1, i2, i3] = self.get_unique_participant_id().to_ne_bytes();
    let [h0, h1, h2, h3] = self.host_id;
    let [a0, a1, a2, a3] = self.app_id;

    [h0, h1, h2, h3, a0, a1, a2, a3, i0, i1, i2, i3]
}
/// Creates a domain participant, starts its background tasks and records it
/// in the factory's participant list.
///
/// # Parameters
/// - `domain_id`: forwarded to the transport factory and to the participant.
/// - `qos`: `QosKind::Default` selects the factory's default participant QoS;
///   `QosKind::Specific` is used as given.
/// - `dcps_listener`: when present, it is spawned on `spawner_handle` and the
///   resulting sender is handed to the participant.
/// - `status_kind`: forwarded unchanged to the participant constructor.
/// - `clock_handle`, `timer_handle`, `spawner_handle`: runtime handles handed
///   to the participant; the timer and spawner are also used by the tasks
///   started here.
///
/// # Returns
/// A sender for the participant's mailbox, the participant's instance handle
/// and the address of its built-in subscriber status condition.
///
/// # Errors
/// The visible code only ever returns `Ok`.
#[allow(clippy::type_complexity, clippy::too_many_arguments)]
pub async fn create_participant<R: DdsRuntime>(
&mut self,
domain_id: DomainId,
qos: QosKind<DomainParticipantQos>,
dcps_listener: Option<DcpsDomainParticipantListener>,
status_kind: Vec<StatusKind>,
clock_handle: R::ClockHandle,
mut timer_handle: R::TimerHandle,
spawner_handle: R::SpawnerHandle,
) -> DdsResult<(
MpscSender<DcpsDomainParticipantMail>,
InstanceHandle,
ActorAddress<DcpsStatusCondition>,
)> {
// Resolve the QoS the new participant will start with.
let domain_participant_qos = match qos {
QosKind::Default => self.default_participant_qos.clone(),
QosKind::Specific(q) => q,
};
// One channel carries participant mail, the other carries whatever the
// transport pushes through the sender it is given below.
let (participant_sender, participant_receiver) = mpsc_channel();
let (data_channel_sender, data_channel_receiver) = mpsc_channel();
let guid_prefix = self.create_new_guid_prefix();
let transport = self
.transport
.create_participant(domain_id, data_channel_sender)
.await;
let listener_sender = dcps_listener.map(|l| l.spawn::<R>(&spawner_handle));
let mut dcps_participant: DcpsDomainParticipant<R> = DcpsDomainParticipant::new(
domain_id,
self.configuration.domain_tag().to_owned(),
guid_prefix,
domain_participant_qos,
listener_sender,
status_kind,
transport,
participant_sender.clone(),
clock_handle,
timer_handle.clone(),
spawner_handle.clone(),
);
// Both values are read before the participant is moved into its task.
let participant_instance_handle = dcps_participant.get_instance_handle();
let builtin_subscriber_status_condition_address = dcps_participant
.get_builtin_subscriber_status_condition()
.address();
// Processing task: `Either::A` carries participant mail, `Either::B`
// carries transport data. A `None` from either receiver is ignored and the
// loop has no exit of its own.
// NOTE(review): presumably the task only ends when the runtime drops it — confirm.
spawner_handle.spawn(async move {
loop {
match select_future(
participant_receiver.receive(),
data_channel_receiver.receive(),
)
.await
{
Either::A(dcps_domain_participant_mail) => {
if let Some(dcps_domain_participant_mail) = dcps_domain_participant_mail {
dcps_participant.handle(dcps_domain_participant_mail).await
}
}
Either::B(data_message) => {
if let Some(data_message) = data_message {
dcps_participant.handle_data(data_message).await
}
}
}
}
});
// Announcement task: sends `AnnounceParticipant`, waits for the configured
// interval, and repeats until a send fails.
let participant_address = participant_sender.clone();
let mut timer_handle_clone = timer_handle.clone();
let participant_announcement_interval =
self.configuration.participant_announcement_interval();
spawner_handle.spawn(async move {
while participant_address
.send(DcpsDomainParticipantMail::Discovery(
DiscoveryServiceMail::AnnounceParticipant {
participant_address: participant_address.clone(),
},
))
.await
.is_ok()
{
timer_handle_clone
.delay(participant_announcement_interval)
.await;
}
});
// Poke task: sends `MessageServiceMail::Poke`, waits 50 ms, and repeats
// until a send fails.
let participant_address = participant_sender.clone();
spawner_handle.spawn(async move {
while participant_address
.send(DcpsDomainParticipantMail::Message(MessageServiceMail::Poke))
.await
.is_ok()
{
timer_handle
.delay(core::time::Duration::from_millis(50))
.await;
}
});
// Auto-enable: the reply receiver is never awaited and a failed send is
// discarded with `.ok()`, so the outcome of enabling is not observed here.
if self.qos.entity_factory.autoenable_created_entities {
let (reply_sender, _reply_receiver) = oneshot();
let participant_address = participant_sender.clone();
participant_sender
.send(DcpsDomainParticipantMail::Participant(
ParticipantServiceMail::Enable {
participant_address,
reply_sender,
},
))
.await
.ok();
}
// Keep one sender in the factory so `delete_participant` can find the
// participant by handle; the caller receives a clone.
let participant_address = participant_sender.clone();
self.domain_participant_list
.push((participant_instance_handle, participant_sender));
Ok((
participant_address,
participant_instance_handle,
builtin_subscriber_status_condition_address,
))
}
/// Removes the participant identified by `handle` from the factory and
/// returns the mail sender that was stored for it.
///
/// # Errors
/// Returns `DdsError::PreconditionNotMet` when no participant with that
/// handle is registered in this factory.
pub fn delete_participant(
    &mut self,
    handle: InstanceHandle,
) -> DdsResult<MpscSender<DcpsDomainParticipantMail>> {
    let index = self
        .domain_participant_list
        .iter()
        .position(|(h, _)| h == &handle)
        // Build the error lazily so the message string is only allocated
        // when the lookup actually fails.
        .ok_or_else(|| {
            DdsError::PreconditionNotMet(String::from(
                "Participant can only be deleted from its parent domain participant factory",
            ))
        })?;
    let (_, participant) = self.domain_participant_list.remove(index);
    Ok(participant)
}
/// Replaces the QoS applied to participants created with `QosKind::Default`.
///
/// `QosKind::Default` resets it to `DomainParticipantQos::default()`;
/// `QosKind::Specific` stores the given value. Always returns `Ok(())`.
pub fn set_default_participant_qos(
    &mut self,
    qos: QosKind<DomainParticipantQos>,
) -> DdsResult<()> {
    self.default_participant_qos = match qos {
        QosKind::Specific(requested) => requested,
        QosKind::Default => DomainParticipantQos::default(),
    };
    Ok(())
}
pub fn get_default_participant_qos(&mut self) -> DomainParticipantQos {
self.default_participant_qos.clone()
}
/// Replaces the factory QoS.
///
/// `QosKind::Default` resets it to `DomainParticipantFactoryQos::default()`;
/// `QosKind::Specific` stores the given value. Always returns `Ok(())`.
pub fn set_qos(&mut self, qos: QosKind<DomainParticipantFactoryQos>) -> DdsResult<()> {
    let replacement = if let QosKind::Specific(requested) = qos {
        requested
    } else {
        DomainParticipantFactoryQos::default()
    };
    self.qos = replacement;
    Ok(())
}
pub fn get_qos(&mut self) -> DomainParticipantFactoryQos {
self.qos.clone()
}
pub fn set_configuration(&mut self, configuration: DustDdsConfiguration) {
self.configuration = configuration;
}
pub fn get_configuration(&mut self) -> DustDdsConfiguration {
self.configuration.clone()
}
}