use std::{
collections::HashMap,
io,
io::ErrorKind,
net::Ipv4Addr,
pin::Pin,
sync::{atomic, Arc, Mutex, RwLock, Weak},
task::{Context, Poll},
thread,
thread::JoinHandle,
time::{Duration, Instant},
};
use mio_extras::channel as mio_channel;
use mio_06::{self, Evented};
use mio_08::{Interest, Registry};
use futures::stream::{FusedStream, Stream};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use crate::{
create_error_out_of_resources, create_error_poisoned,
dds::{
pubsub::*,
qos::*,
result::*,
statusevents::{
sync_status_channel, DomainParticipantStatusEvent, StatusChannelReceiver, StatusChannelSender,
},
topic::*,
typedesc::TypeDesc,
},
discovery::{
discovery::{Discovery, DiscoveryCommand},
discovery_db::DiscoveryDB,
sedp_messages::DiscoveredTopicData,
},
network::{constant::*, udp_listener::UDPListener},
rtps::{
constant::*,
dp_event_loop::{DPEventLoop, DomainInfo, EventLoopCommand},
reader::*,
writer::WriterIngredients,
},
structure::{dds_cache::DDSCache, entity::RTPSEntity, guid::*, locator::Locator},
StatusEvented,
};
#[cfg(feature = "security")]
use crate::{
create_error_internal, create_error_not_allowed_by_security,
security::{
self,
config::DomainParticipantSecurityConfigFiles,
security_plugins::{SecurityPlugins, SecurityPluginsHandle},
AccessControl, Authentication, Cryptographic,
},
};
#[cfg(not(feature = "security"))]
use crate::no_security::SecurityPluginsHandle;
pub struct DomainParticipantBuilder {
domain_id: u16,
#[allow(dead_code)] only_networks: Option<Vec<String>>, #[cfg(feature = "security")]
security_plugins: Option<SecurityPlugins>,
#[cfg(feature = "security")]
sec_properties: Option<policy::Property>, }
impl DomainParticipantBuilder {
pub fn new(domain_id: u16) -> DomainParticipantBuilder {
DomainParticipantBuilder {
domain_id,
only_networks: None,
#[cfg(feature = "security")]
security_plugins: None,
#[cfg(feature = "security")]
sec_properties: None,
}
}
#[cfg(feature = "security")]
pub fn security(
&mut self,
auth: Box<impl Authentication + 'static>,
access: Box<impl AccessControl + 'static>,
crypto: Box<impl Cryptographic + 'static>,
sec_properties: policy::Property,
) -> &mut DomainParticipantBuilder {
self.security_plugins = Some(SecurityPlugins::new(auth, access, crypto));
self.sec_properties = Some(sec_properties);
self
}
#[cfg(feature = "security")]
pub fn builtin_security(mut self, configs: DomainParticipantSecurityConfigFiles) -> Self {
let auth = Box::new(security::AuthenticationBuiltin::new());
let access = Box::new(security::AccessControlBuiltin::new());
let crypto = Box::new(security::CryptographicBuiltin::new());
self.security(auth, access, crypto, configs.into_property_policy());
self
}
pub fn build(#[allow(unused_mut)] mut self) -> CreateResult<DomainParticipant> {
let participant_qos = QosPolicies {
#[cfg(feature = "security")]
property: self.sec_properties,
..Default::default()
};
let candidate_participant_guid = GUID::new_participant_guid();
#[cfg(not(feature = "security"))]
let participant_guid = candidate_participant_guid;
#[cfg(feature = "security")]
let participant_guid = if let Some(ref mut security_plugins) = self.security_plugins.as_mut() {
trace!("DomainParticipant security construction start");
let sec_guid = match security_plugins.validate_local_identity(
self.domain_id,
&participant_qos,
candidate_participant_guid,
) {
Ok(guid) => guid,
Err(e) => {
return create_error_not_allowed_by_security!(
"Validating local identity failed: {}",
e.msg
);
}
};
if let Err(e) = security_plugins.validate_local_permissions(
self.domain_id,
sec_guid.prefix,
&participant_qos,
) {
return create_error_not_allowed_by_security!(
"Validating local permissions failed: {}",
e.msg
);
}
match security_plugins.check_create_participant(
self.domain_id,
sec_guid.prefix,
&participant_qos,
) {
Ok(check_passed) => {
if !check_passed {
return create_error_not_allowed_by_security!(
"Access control does not allow to create the local participant",
);
}
}
Err(e) => {
return create_error_internal!(
"Something went wrong in checking local participant permissions: {}",
e
);
}
}
if let Err(e) = security_plugins
.get_participant_sec_attributes(sec_guid.prefix)
.and_then(|sec_attr| {
security_plugins.register_local_participant(
sec_guid.prefix,
participant_qos.property.clone(),
sec_attr,
)
})
{
return create_error_internal!(
"Could not register participant with crypto plugin {}",
e.msg
);
};
sec_guid
} else {
candidate_participant_guid
};
trace!("DomainParticipant construct start");
let (djh_sender, djh_receiver) = mio_channel::channel();
let (spdp_liveness_sender, spdp_liveness_receiver) = mio_channel::sync_channel(8);
let (discovery_updated_sender, discovery_update_notification_receiver) =
mio_channel::sync_channel::<DiscoveryNotificationType>(32);
let (discovery_command_sender, discovery_command_receiver) =
mio_channel::sync_channel::<DiscoveryCommand>(64);
let (status_sender, status_receiver) = sync_status_channel(16)?;
#[cfg(not(feature = "security"))]
let security_plugins_handle = None;
#[cfg(feature = "security")]
let security_plugins_handle = self.security_plugins.map(SecurityPluginsHandle::new);
let dp = DomainParticipantDisc::new(
self.domain_id,
participant_guid,
participant_qos,
djh_receiver,
discovery_update_notification_receiver,
discovery_command_sender,
spdp_liveness_sender,
status_sender.clone(),
status_receiver,
security_plugins_handle.clone(),
)?;
let self_locators = dp.self_locators();
let dp = DomainParticipant {
dpi: Arc::new(Mutex::new(dp)),
};
let (discovery_started_sender, discovery_started_receiver) = std::sync::mpsc::channel();
let dp_clone = dp.weak_clone();
let disc_db_clone = dp.discovery_db();
let discovery_handle = thread::Builder::new()
.name("RustDDS discovery thread".to_string())
.spawn(move || {
if let Ok(mut discovery) = Discovery::new(
dp_clone,
disc_db_clone,
discovery_started_sender,
discovery_updated_sender,
discovery_command_receiver,
spdp_liveness_receiver,
self_locators,
status_sender,
security_plugins_handle,
) {
discovery.discovery_event_loop(); }
})?;
djh_sender.send(discovery_handle).unwrap_or(()); debug!("Waiting for discovery to start"); match discovery_started_receiver.recv_timeout(Duration::from_secs(10)) {
Ok(Ok(())) => {
info!("Discovery started. Participant constructed.");
Ok(dp)
}
Ok(Err(e)) => {
std::mem::drop(dp);
create_error_poisoned!("Failed to start discovery thread: {e:?}")
}
Err(e) => create_error_poisoned!("Discovery thread channel error: {e:?}"),
}
}
}
#[derive(Clone)]
pub struct DomainParticipant {
dpi: Arc<Mutex<DomainParticipantDisc>>,
}
impl DomainParticipant {
pub fn new(domain_id: u16) -> CreateResult<Self> {
let dp_builder = DomainParticipantBuilder::new(domain_id);
dp_builder.build()
}
pub fn create_publisher(&self, qos: &QosPolicies) -> CreateResult<Publisher> {
let w = self.weak_clone(); self.dpi.lock()?.create_publisher(&w, qos)
}
pub fn create_subscriber(&self, qos: &QosPolicies) -> CreateResult<Subscriber> {
let w = self.weak_clone(); self.dpi.lock()?.create_subscriber(&w, qos)
}
pub fn create_topic(
&self,
name: String,
type_desc: String,
qos: &QosPolicies,
topic_kind: TopicKind,
) -> CreateResult<Topic> {
let w = self.weak_clone();
self
.dpi
.lock()?
.create_topic(&w, name, type_desc, qos, topic_kind)
}
pub fn find_topic(&self, name: &str, timeout: Duration) -> CreateResult<Option<Topic>> {
let w = self.weak_clone();
self.dpi.lock()?.find_topic(&w, name, timeout)
}
pub fn domain_id(&self) -> u16 {
self.dpi.lock().unwrap().domain_id()
}
pub fn participant_id(&self) -> u16 {
self.dpi.lock().unwrap().participant_id()
}
pub fn discovered_topics(&self) -> Vec<DiscoveredTopicData> {
self.dpi.lock().unwrap().discovered_topics()
}
pub fn assert_liveliness(&self) -> WriteResult<(), ()> {
self.dpi.lock()?.assert_liveliness()
}
pub fn status_listener(&self) -> DomainParticipantStatusListener {
DomainParticipantStatusListener {
dp_disc: Arc::clone(&self.dpi),
}
}
pub(crate) fn weak_clone(&self) -> DomainParticipantWeak {
DomainParticipantWeak::new(self)
}
pub(crate) fn dds_cache(&self) -> Arc<RwLock<DDSCache>> {
self.dpi.lock().unwrap().dds_cache()
}
#[cfg(feature = "security")] pub(crate) fn qos(&self) -> QosPolicies {
self.dpi.lock().unwrap().qos()
}
pub(crate) fn discovery_db(&self) -> Arc<RwLock<DiscoveryDB>> {
self.dpi.lock().unwrap().dpi.discovery_db.clone()
}
pub(crate) fn new_entity_id(&self, entity_kind: EntityKind) -> EntityId {
self.dpi.lock().unwrap().new_entity_id(entity_kind)
}
pub(crate) fn self_locators(&self) -> HashMap<mio_06::Token, Vec<Locator>> {
self.dpi.lock().unwrap().self_locators()
}
} pub struct DomainParticipantStatusListener {
dp_disc: Arc<Mutex<DomainParticipantDisc>>,
}
impl DomainParticipantStatusListener {}
impl<'a> StatusEvented<'a, DomainParticipantStatusEvent, DomainParticipantStatusStream<'a>>
for DomainParticipantStatusListener
{
fn as_status_evented(&mut self) -> &dyn Evented {
self
}
fn as_status_source(&mut self) -> &mut dyn mio_08::event::Source {
self
}
fn as_async_status_stream(&'a self) -> DomainParticipantStatusStream<'a> {
DomainParticipantStatusStream {
status_listener: self,
}
}
fn try_recv_status(&self) -> Option<DomainParticipantStatusEvent> {
self
.dp_disc
.lock()
.unwrap()
.status_channel_receiver()
.try_recv_status()
}
}
impl mio_08::event::Source for DomainParticipantStatusListener {
fn register(
&mut self,
registry: &Registry,
token: mio_08::Token,
interests: Interest,
) -> io::Result<()> {
self
.dp_disc
.lock()
.unwrap()
.status_channel_receiver_mut()
.register(registry, token, interests)
}
fn reregister(
&mut self,
registry: &Registry,
token: mio_08::Token,
interests: Interest,
) -> io::Result<()> {
self
.dp_disc
.lock()
.unwrap()
.status_channel_receiver_mut()
.reregister(registry, token, interests)
}
fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
self
.dp_disc
.lock()
.unwrap()
.status_channel_receiver_mut()
.deregister(registry)
}
}
impl mio_06::Evented for DomainParticipantStatusListener {
fn register(
&self,
poll: &mio_06::Poll,
token: mio_06::Token,
interest: mio_06::Ready,
opts: mio_06::PollOpt,
) -> io::Result<()> {
self
.dp_disc
.lock()
.unwrap()
.status_channel_receiver_mut()
.as_status_evented()
.register(poll, token, interest, opts)
}
fn reregister(
&self,
poll: &mio_06::Poll,
token: mio_06::Token,
interest: mio_06::Ready,
opts: mio_06::PollOpt,
) -> io::Result<()> {
self
.dp_disc
.lock()
.unwrap()
.status_channel_receiver_mut()
.as_status_evented()
.reregister(poll, token, interest, opts)
}
fn deregister(&self, poll: &mio_06::Poll) -> io::Result<()> {
self
.dp_disc
.lock()
.unwrap()
.status_channel_receiver_mut()
.as_status_evented()
.deregister(poll)
}
}
pub struct DomainParticipantStatusStream<'a> {
status_listener: &'a DomainParticipantStatusListener,
}
impl<'a> Stream for DomainParticipantStatusStream<'a> {
type Item = DomainParticipantStatusEvent;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let dp_lock = self.status_listener.dp_disc.lock().unwrap();
let mut w = dp_lock.status_channel_receiver().get_waker_update_lock();
match dp_lock.status_channel_receiver().try_recv() {
Err(std::sync::mpsc::TryRecvError::Empty) => {
*w = Some(cx.waker().clone());
Poll::Pending
}
Err(std::sync::mpsc::TryRecvError::Disconnected) => {
error!("DomainParticipant status channel disconnected");
Poll::Ready(None)
}
Ok(t) => Poll::Ready(Some(t)), }
} }
impl<'a> FusedStream for DomainParticipantStatusStream<'a> {
fn is_terminated(&self) -> bool {
false
}
}
impl PartialEq for DomainParticipant {
fn eq(&self, other: &Self) -> bool {
self.guid() == other.guid()
&& self.domain_id() == other.domain_id()
&& self.participant_id() == other.participant_id()
}
}
#[derive(Clone)]
pub struct DomainParticipantWeak {
dpi: Weak<Mutex<DomainParticipantDisc>>,
#[cfg(feature = "security")] domain_id: u16,
guid: GUID,
#[cfg(feature = "security")] qos: QosPolicies,
}
impl DomainParticipantWeak {
pub fn new(dp: &DomainParticipant) -> Self {
Self {
dpi: Arc::downgrade(&dp.dpi),
#[cfg(feature="security")] domain_id: dp.domain_id(),
guid: dp.guid(),
#[cfg(feature="security")] qos: dp.qos(),
}
}
pub fn create_publisher(&self, qos: &QosPolicies) -> CreateResult<Publisher> {
self
.dpi
.upgrade()
.ok_or(CreateError::ResourceDropped {
reason: "DomainParticipant".to_string(),
})
.and_then(|dpi| dpi.lock()?.create_publisher(self, qos))
}
pub fn create_subscriber(&self, qos: &QosPolicies) -> CreateResult<Subscriber> {
self
.dpi
.upgrade()
.ok_or(CreateError::ResourceDropped {
reason: "DomainParticipant".to_string(),
})
.and_then(|dpi| dpi.lock()?.create_subscriber(self, qos))
}
#[cfg(feature = "security")] pub fn domain_id(&self) -> u16 {
self.domain_id
}
#[cfg(feature = "security")] pub fn qos(&self) -> QosPolicies {
self.qos.clone()
}
pub fn create_topic(
&self,
name: String,
type_desc: String,
qos: &QosPolicies,
topic_kind: TopicKind,
) -> CreateResult<Topic> {
self
.dpi
.upgrade()
.ok_or(CreateError::ResourceDropped {
reason: "DomainParticipant".to_string(),
})
.and_then(|dpi| {
dpi
.lock()?
.create_topic(self, name, type_desc, qos, topic_kind)
})
}
pub fn upgrade(self) -> Option<DomainParticipant> {
self.dpi.upgrade().map(|d| DomainParticipant { dpi: d })
}
} impl RTPSEntity for DomainParticipantWeak {
fn guid(&self) -> GUID {
self.guid
}
}
pub(crate) struct DomainParticipantDisc {
dpi: DomainParticipantInner,
discovery_command_sender: mio_channel::SyncSender<DiscoveryCommand>,
discovery_join_handle: mio_channel::Receiver<JoinHandle<()>>,
entity_id_generator: atomic::AtomicU32,
}
impl DomainParticipantDisc {
#[allow(clippy::too_many_arguments)]
pub fn new(
domain_id: u16,
participant_guid: GUID,
qos_policies: QosPolicies,
discovery_join_handle: mio_channel::Receiver<JoinHandle<()>>,
discovery_update_notification_receiver: mio_channel::Receiver<DiscoveryNotificationType>,
discovery_command_sender: mio_channel::SyncSender<DiscoveryCommand>,
spdp_liveness_sender: mio_channel::SyncSender<GuidPrefix>,
status_sender: StatusChannelSender<DomainParticipantStatusEvent>,
status_receiver: StatusChannelReceiver<DomainParticipantStatusEvent>,
security_plugins_handle: Option<SecurityPluginsHandle>,
) -> CreateResult<Self> {
let dpi = DomainParticipantInner::new(
domain_id,
participant_guid,
qos_policies,
discovery_update_notification_receiver,
discovery_command_sender.clone(),
spdp_liveness_sender,
status_sender,
status_receiver,
security_plugins_handle,
)?;
Ok(Self {
dpi,
discovery_command_sender,
discovery_join_handle,
entity_id_generator: atomic::AtomicU32::new(0),
})
}
pub(crate) fn new_entity_id(&self, entity_kind: EntityKind) -> EntityId {
let [_goldilocks, papa_byte, mama_byte, baby_byte] = self
.entity_id_generator
.fetch_add(1, atomic::Ordering::Relaxed)
.to_be_bytes();
EntityId::new([papa_byte, mama_byte, baby_byte], entity_kind)
}
pub fn create_publisher(
&self,
dp: &DomainParticipantWeak,
qos: &QosPolicies,
) -> CreateResult<Publisher> {
self
.dpi
.create_publisher(dp, qos, self.discovery_command_sender.clone())
}
pub fn create_subscriber(
&self,
dp: &DomainParticipantWeak,
qos: &QosPolicies,
) -> CreateResult<Subscriber> {
self
.dpi
.create_subscriber(dp, qos, self.discovery_command_sender.clone())
}
pub fn create_topic(
&self,
dp: &DomainParticipantWeak,
name: String,
type_desc: String,
qos: &QosPolicies,
topic_kind: TopicKind,
) -> CreateResult<Topic> {
self.dpi.create_topic(dp, name, type_desc, qos, topic_kind)
}
pub fn find_topic(
&self,
dp: &DomainParticipantWeak,
name: &str,
timeout: Duration,
) -> CreateResult<Option<Topic>> {
self.dpi.find_topic(dp, name, timeout)
}
pub fn domain_id(&self) -> u16 {
self.dpi.domain_id()
}
pub fn participant_id(&self) -> u16 {
self.dpi.participant_id()
}
pub fn discovered_topics(&self) -> Vec<DiscoveredTopicData> {
self.dpi.discovered_topics()
}
pub(crate) fn dds_cache(&self) -> Arc<RwLock<DDSCache>> {
self.dpi.dds_cache()
}
#[cfg(feature = "security")] pub(crate) fn qos(&self) -> QosPolicies {
self.dpi.qos()
}
pub(crate) fn assert_liveliness(&self) -> WriteResult<(), ()> {
self
.discovery_command_sender
.send(DiscoveryCommand::ManualAssertLiveliness)
.map_err(|_e| WriteError::WouldBlock { data: () })
}
pub(crate) fn self_locators(&self) -> HashMap<mio_06::Token, Vec<Locator>> {
self.dpi.self_locators.clone()
}
pub(crate) fn status_channel_receiver(
&self,
) -> &StatusChannelReceiver<DomainParticipantStatusEvent> {
self.dpi.status_channel_receiver()
}
pub(crate) fn status_channel_receiver_mut(
&mut self,
) -> &mut StatusChannelReceiver<DomainParticipantStatusEvent> {
self.dpi.status_channel_receiver_mut()
}
}
impl Drop for DomainParticipantDisc {
fn drop(&mut self) {
info!("===== RustDDS shutting down ===== .drop() DomainParticipantDisc");
debug!("Wan dp_event_loop about stop.");
if self
.dpi
.stop_poll_sender
.send(EventLoopCommand::PrepareStop)
.is_err()
{
error!("dp_event_loop not responding to prepare stop discovery_command");
}
debug!("Sending Discovery Stop signal.");
if self
.discovery_command_sender
.send(DiscoveryCommand::StopDiscovery)
.is_err()
{
warn!("Failed to send stop signal to Discovery");
return;
}
debug!("Waiting for Discovery join.");
if let Ok(handle) = self.discovery_join_handle.try_recv() {
handle.join().unwrap();
debug!("Joined Discovery.");
}
}
}
pub(crate) struct DomainParticipantInner {
domain_id: u16,
participant_id: u16,
my_guid: GUID,
#[cfg(feature = "security")] my_qos_policies: QosPolicies,
sender_add_reader: mio_channel::SyncSender<ReaderIngredients>,
sender_remove_reader: mio_channel::SyncSender<GUID>,
stop_poll_sender: mio_channel::Sender<EventLoopCommand>,
ev_loop_handle: Option<JoinHandle<()>>, add_writer_sender: mio_channel::SyncSender<WriterIngredients>,
remove_writer_sender: mio_channel::SyncSender<GUID>,
dds_cache: Arc<RwLock<DDSCache>>,
discovery_db: Arc<RwLock<DiscoveryDB>>,
discovery_db_event_receiver: mio_channel::Receiver<()>,
status_receiver: StatusChannelReceiver<DomainParticipantStatusEvent>,
self_locators: HashMap<mio_06::Token, Vec<Locator>>,
security_plugins_handle: Option<SecurityPluginsHandle>,
}
impl Drop for DomainParticipantInner {
fn drop(&mut self) {
if self.stop_poll_sender.send(EventLoopCommand::Stop).is_err() {
error!("dp_event_loop not responding to stop discovery_command");
return;
}
debug!("Waiting for dp_event_loop join");
match self.ev_loop_handle.take() {
Some(join_handle) => {
join_handle
.join()
.unwrap_or_else(|e| warn!("Failed to join dp_event_loop: {e:?}"));
}
None => {
error!("Someone managed to steal dp_event_loop join handle from DomainParticipantInner.");
}
}
debug!("Joined dp_event_loop");
}
}
impl DomainParticipantInner {
#[allow(clippy::too_many_arguments)]
fn new(
domain_id: u16,
participant_guid: GUID,
_qos_policies: QosPolicies,
discovery_update_notification_receiver: mio_channel::Receiver<DiscoveryNotificationType>,
discovery_command_sender: mio_channel::SyncSender<DiscoveryCommand>,
spdp_liveness_sender: mio_channel::SyncSender<GuidPrefix>,
status_sender: StatusChannelSender<DomainParticipantStatusEvent>,
status_receiver: StatusChannelReceiver<DomainParticipantStatusEvent>,
security_plugins_handle: Option<SecurityPluginsHandle>,
) -> CreateResult<Self> {
#[cfg(not(feature = "security"))]
let _dummy = _qos_policies; let mut listeners = HashMap::new();
match UDPListener::new_multicast(
"0.0.0.0",
spdp_well_known_multicast_port(domain_id),
Ipv4Addr::new(239, 255, 0, 1),
) {
Ok(l) => {
listeners.insert(DISCOVERY_MUL_LISTENER_TOKEN, l);
}
Err(e) => warn!("Cannot get multicast discovery listener: {e:?}"),
}
let mut participant_id = 0;
let mut discovery_listener = None;
while discovery_listener.is_none() && participant_id < 120 {
discovery_listener = UDPListener::new_unicast(
"0.0.0.0",
spdp_well_known_unicast_port(domain_id, participant_id),
)
.ok();
if discovery_listener.is_none() {
participant_id += 1;
}
}
info!("ParticipantId {} selected.", participant_id);
let discovery_listener = match discovery_listener {
Some(dl) => dl,
None => return create_error_out_of_resources!("Could not find free ParticipantId"),
};
listeners.insert(DISCOVERY_LISTENER_TOKEN, discovery_listener);
match UDPListener::new_multicast(
"0.0.0.0",
user_traffic_multicast_port(domain_id),
Ipv4Addr::new(239, 255, 0, 1),
) {
Ok(l) => {
listeners.insert(USER_TRAFFIC_MUL_LISTENER_TOKEN, l);
}
Err(e) => warn!("Cannot get multicast user traffic listener: {e:?}"),
}
let user_traffic_listener = UDPListener::new_unicast(
"0.0.0.0",
user_traffic_unicast_port(domain_id, participant_id),
)
.or_else(|e| {
if matches!(e.kind(), ErrorKind::AddrInUse) {
UDPListener::new_unicast("0.0.0.0", 0).or_else(|e| {
create_error_out_of_resources!(
"Could not open unicast user traffic listener, any port number: {:?}",
e
)
})
} else {
create_error_out_of_resources!("Could not open unicast user traffic listener: {e:?}")
}
})?;
listeners.insert(USER_TRAFFIC_LISTENER_TOKEN, user_traffic_listener);
let self_locators: HashMap<mio_06::Token, Vec<Locator>> = listeners
.iter()
.map(|(t, l)| match l.to_locator_address() {
Ok(locs) => (*t, locs),
Err(e) => {
error!("No local network address for token {:?}: {:?}", t, e);
(*t, vec![])
}
})
.collect();
let (sender_add_reader, receiver_add_reader) =
mio_channel::sync_channel::<ReaderIngredients>(100);
let (sender_remove_reader, receiver_remove_reader) = mio_channel::sync_channel::<GUID>(4);
let (add_writer_sender, add_writer_receiver) =
mio_channel::sync_channel::<WriterIngredients>(10);
let (remove_writer_sender, remove_writer_receiver) = mio_channel::sync_channel::<GUID>(4);
let domain_info = DomainInfo {
domain_participant_guid: participant_guid,
domain_id,
participant_id,
};
let dds_cache = Arc::new(RwLock::new(DDSCache::new()));
let dds_cache_clone = Arc::clone(&dds_cache);
let (discovery_db_event_sender, discovery_db_event_receiver) =
mio_channel::sync_channel::<()>(1);
let discovery_db = Arc::new(RwLock::new(DiscoveryDB::new(
participant_guid,
discovery_db_event_sender,
status_sender.clone(),
)));
let (stop_poll_sender, stop_poll_receiver) = mio_channel::channel();
let disc_db_clone = discovery_db.clone();
let security_plugins_clone = security_plugins_handle.clone();
let ev_loop_handle = thread::Builder::new()
.name(format!("RustDDS Participant {} event loop", participant_id))
.spawn(move || {
let dp_event_loop = DPEventLoop::new(
domain_info,
dds_cache_clone,
listeners,
disc_db_clone,
participant_guid.prefix,
TokenReceiverPair {
token: ADD_READER_TOKEN,
receiver: receiver_add_reader,
},
TokenReceiverPair {
token: REMOVE_READER_TOKEN,
receiver: receiver_remove_reader,
},
TokenReceiverPair {
token: ADD_WRITER_TOKEN,
receiver: add_writer_receiver,
},
TokenReceiverPair {
token: REMOVE_WRITER_TOKEN,
receiver: remove_writer_receiver,
},
stop_poll_receiver,
discovery_update_notification_receiver,
discovery_command_sender,
spdp_liveness_sender,
status_sender,
security_plugins_clone,
);
dp_event_loop.event_loop();
})?;
info!(
"New DomainParticipantInner: domain_id={:?} participant_id={:?} GUID={:?} security={}",
domain_id,
participant_id,
participant_guid,
cfg!(security)
);
Ok(Self {
domain_id,
participant_id,
#[cfg(feature = "security")]
my_qos_policies: _qos_policies,
my_guid: participant_guid,
sender_add_reader,
sender_remove_reader,
stop_poll_sender,
ev_loop_handle: Some(ev_loop_handle),
add_writer_sender,
remove_writer_sender,
dds_cache,
discovery_db,
discovery_db_event_receiver,
status_receiver,
self_locators,
security_plugins_handle,
})
}
pub fn dds_cache(&self) -> Arc<RwLock<DDSCache>> {
self.dds_cache.clone()
}
#[cfg(feature = "security")] pub(crate) fn qos(&self) -> QosPolicies {
self.my_qos_policies.clone()
}
pub fn create_publisher(
&self,
domain_participant: &DomainParticipantWeak,
qos: &QosPolicies,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
) -> CreateResult<Publisher> {
Ok(Publisher::new(
domain_participant.clone(),
self.discovery_db.clone(),
qos.clone(),
qos.clone(),
self.add_writer_sender.clone(),
self.remove_writer_sender.clone(),
discovery_command,
self.security_plugins_handle.clone(),
))
}
pub fn create_subscriber(
&self,
domain_participant: &DomainParticipantWeak,
qos: &QosPolicies,
discovery_command: mio_channel::SyncSender<DiscoveryCommand>,
) -> CreateResult<Subscriber> {
Ok(Subscriber::new(
domain_participant.clone(),
self.discovery_db.clone(),
qos.clone(),
self.sender_add_reader.clone(),
self.sender_remove_reader.clone(),
discovery_command,
self.security_plugins_handle.clone(),
))
}
pub fn create_topic(
&self,
domain_participant_weak: &DomainParticipantWeak,
name: String,
type_desc: String,
qos: &QosPolicies,
topic_kind: TopicKind,
) -> CreateResult<Topic> {
#[cfg(feature = "security")]
if let Some(sec_handle) = self.security_plugins_handle.as_ref() {
let check_res = sec_handle.get_plugins().check_create_topic(
self.my_guid.prefix,
self.domain_id,
name.clone(),
qos,
);
match check_res {
Ok(check_passed) => {
if !check_passed {
return create_error_not_allowed_by_security!(
"Not allowed to create the topic {}",
name
);
}
}
Err(e) => {
return create_error_internal!(
"Failed to check Topic rights from Access control: {}",
e.msg
);
}
};
}
let topic_type_desc = TypeDesc::new(type_desc);
let topic = Topic::new(
domain_participant_weak,
name.clone(),
topic_type_desc.clone(),
qos,
topic_kind,
);
let mut dds_cache_guard = self.dds_cache.write()?;
dds_cache_guard.add_new_topic(name, topic_type_desc, qos);
Ok(topic)
}
pub fn find_topic(
&self,
domain_participant_weak: &DomainParticipantWeak,
name: &str,
timeout: Duration,
) -> CreateResult<Option<Topic>> {
use mio_06 as mio;
let poll = mio::Poll::new()?;
let mut events = mio::Events::with_capacity(1);
poll.register(
&self.discovery_db_event_receiver,
mio_06::Token(0),
mio::Ready::readable(),
mio::PollOpt::level(),
)?;
let find_end = Instant::now() + timeout;
loop {
if let Some(topic) = self.find_topic_in_discovery_db(domain_participant_weak, name)? {
return Ok(Some(topic));
}
let timeout = find_end - Instant::now();
poll.poll(&mut events, Some(timeout))?;
if let Some(_event) = events.iter().next() {
if self.discovery_db_event_receiver.try_recv().is_ok() {
continue;
}
}
if Instant::now() > find_end {
break;
}
}
Ok(None)
}
fn find_topic_in_discovery_db(
&self,
domain_participant_weak: &DomainParticipantWeak,
name: &str,
) -> CreateResult<Option<Topic>> {
let db = self
.discovery_db
.read()
.map_err(|_| CreateError::Poisoned {
reason: "discovery db".to_string(),
})?;
let build_topic_fn = |d: &DiscoveredTopicData| {
let qos = d.topic_data.qos();
let topic_kind = match d.topic_data.key {
Some(_) => TopicKind::WithKey,
None => TopicKind::NoKey,
};
let name = d.topic_name().clone();
let type_desc = d.topic_data.type_name.clone();
self.create_topic(domain_participant_weak, name, type_desc, &qos, topic_kind)
};
if let Some(d) = db.get_topic(name) {
build_topic_fn(d).map(Some)
} else {
Ok(None)
}
}
pub fn domain_id(&self) -> u16 {
self.domain_id
}
pub fn participant_id(&self) -> u16 {
self.participant_id
}
pub fn discovered_topics(&self) -> Vec<DiscoveredTopicData> {
let db = self
.discovery_db
.read()
.unwrap_or_else(|e| panic!("DiscoveryDB is poisoned. {e:?}"));
db.all_user_topics().cloned().collect()
}
pub(crate) fn status_channel_receiver(
&self,
) -> &StatusChannelReceiver<DomainParticipantStatusEvent> {
&self.status_receiver
}
pub(crate) fn status_channel_receiver_mut(
&mut self,
) -> &mut StatusChannelReceiver<DomainParticipantStatusEvent> {
&mut self.status_receiver
}
} impl RTPSEntity for DomainParticipant {
fn guid(&self) -> GUID {
self.dpi.lock().unwrap().guid()
}
}
impl RTPSEntity for DomainParticipantDisc {
fn guid(&self) -> GUID {
self.dpi.guid()
}
}
impl RTPSEntity for DomainParticipantInner {
fn guid(&self) -> GUID {
self.my_guid
}
}
impl std::fmt::Debug for DomainParticipant {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DomainParticipant")
.field("Guid", &self.guid())
.finish()
}
}
#[cfg(test)]
mod tests {
use std::{
collections::BTreeSet,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
};
use enumflags2::BitFlags;
use log::info;
use speedy::{Endianness, Writable};
use byteorder::LittleEndian;
use crate::{
dds::{qos::QosPolicies, topic::TopicKind},
messages::{
header::Header, protocol_id::ProtocolId, protocol_version::ProtocolVersion,
submessages::submessages::*, vendor_id::VendorId,
},
network::{constant::user_traffic_unicast_port, udp_sender::UDPSender},
rtps::{submessage::*, Message},
serialization::cdr_serializer::CDRSerializerAdapter,
structure::{
guid::{EntityId, GUID},
locator::Locator,
sequence_number::{SequenceNumber, SequenceNumberSet},
},
test::random_data::RandomData,
};
use super::DomainParticipant;
#[test]
fn dp_basic_domain_participant() {
let sender = UDPSender::new(11401).unwrap();
let data: Vec<u8> = vec![0, 1, 2, 3, 4];
let addrs = vec![SocketAddr::new("127.0.0.1".parse().unwrap(), 7412)];
sender.send_to_all(&data, &addrs);
}
#[test]
fn dp_writer_heartbeat_test() {
let domain_participant = DomainParticipant::new(0).expect("Participant creation failed!");
let qos = QosPolicies::qos_none();
let _default_dw_qos = QosPolicies::qos_none();
let publisher = domain_participant
.create_publisher(&qos)
.expect("Failed to create publisher");
let topic = domain_participant
.create_topic(
"Aasii".to_string(),
"RandomData".to_string(),
&qos,
TopicKind::WithKey,
)
.expect("Failed to create topic");
let mut _data_writer = publisher
.create_datawriter::<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>>(&topic, None)
.expect("Failed to create datawriter");
}
#[test]
fn dp_receive_acknack_message_test() {
let domain_participant = DomainParticipant::new(0).expect("Failed to create participant");
let qos = QosPolicies::qos_none();
let _default_dw_qos = QosPolicies::qos_none();
let publisher = domain_participant
.create_publisher(&qos)
.expect("Failed to create publisher");
let topic = domain_participant
.create_topic(
"Aasii".to_string(),
"Huh?".to_string(),
&qos,
TopicKind::WithKey,
)
.expect("Failed to create topic");
let mut _data_writer = publisher
.create_datawriter::<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>>(&topic, None)
.expect("Failed to create datawriter");
let port_number: u16 = user_traffic_unicast_port(5, 0);
let sender = UDPSender::new(1234).unwrap();
let mut m: Message = Message::default();
let a: AckNack = AckNack {
reader_id: EntityId::SPDP_BUILTIN_PARTICIPANT_READER,
writer_id: EntityId::SPDP_BUILTIN_PARTICIPANT_WRITER,
reader_sn_state: SequenceNumberSet::from_base_and_set(
SequenceNumber::default(),
&BTreeSet::new(),
),
count: 1,
};
let flags = BitFlags::<ACKNACK_Flags>::from_endianness(Endianness::BigEndian);
let sub_header: SubmessageHeader = SubmessageHeader {
kind: SubmessageKind::ACKNACK,
flags: flags.bits(),
content_length: 24,
};
let s: Submessage = Submessage {
header: sub_header,
body: SubmessageBody::Reader(ReaderSubmessage::AckNack(a, flags)),
original_bytes: None,
};
let h = Header {
protocol_id: ProtocolId::default(),
protocol_version: ProtocolVersion { major: 2, minor: 3 },
vendor_id: VendorId::THIS_IMPLEMENTATION,
guid_prefix: GUID::default().prefix,
};
m.set_header(h);
m.add_submessage(s);
let _data: Vec<u8> = m.write_to_vec_with_ctx(Endianness::LittleEndian).unwrap();
info!("data to send via udp: {:?}", _data);
let ip = Ipv4Addr::from([0x00, 0x00, 0x00, 0x00]);
let socket_address = SocketAddrV4::new(ip, port_number);
let locators = vec![Locator::UdpV4(socket_address)];
sender.send_to_locator_list(&_data, &locators);
}
}