use std::{collections::BTreeMap, time::Instant};
use chrono::Utc;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use crate::{
dds::{
participant::DomainParticipant, qos::HasQoSPolicy, reader::ReaderIngredients,
rtps_reader_proxy::RtpsReaderProxy, rtps_writer_proxy::RtpsWriterProxy, topic::Topic,
traits::TopicDescription,
},
structure::{
duration::Duration,
entity::RTPSEntity,
guid::{EntityId, GuidPrefix, GUID},
},
};
use super::data_types::{
spdp_participant_data::SpdpDiscoveredParticipantData,
topic_data::{
DiscoveredReaderData, DiscoveredTopicData, DiscoveredWriterData, ParticipantMessageData,
ReaderProxy, SubscriptionBuiltinTopicData, TopicBuiltinTopicData, WriterProxy,
},
};
// Lease duration used by `participant_cleanup` for a participant whose
// discovery data carries no `lease_duration` of its own.
const DEFAULT_PARTICIPANT_LEASE_DURATION: Duration = Duration::from_secs(60);
// Slack that `participant_cleanup` adds on top of the lease duration before
// a participant is treated as expired. Currently zero.
// NOTE(review): "TOLREANCE" is a misspelling of "TOLERANCE"; a rename has to
// touch the use site in `participant_cleanup` as well.
const PARTICIPANT_LEASE_DURATION_TOLREANCE: Duration = Duration::from_secs(0);
/// In-memory store of discovery state: known participants, local and
/// external readers/writers, and the topics seen so far.
pub(crate) struct DiscoveryDB {
/// GUID of our own participant; used to recognise our own announcements
/// and as the updater identity for locally created topics.
my_guid: GUID,
/// Latest discovery data per participant, keyed by GUID prefix.
participant_proxies: BTreeMap<GuidPrefix, SpdpDiscoveredParticipantData>,
/// Time of the most recent sign of life per participant, keyed by GUID prefix.
participant_last_life_signs: BTreeMap<GuidPrefix, Instant>,
/// Discovery data of writers registered through `update_local_topic_writer`.
local_topic_writers: BTreeMap<GUID, DiscoveredWriterData>,
/// Discovery data of readers registered through `update_local_topic_reader`.
local_topic_readers: BTreeMap<GUID, DiscoveredReaderData>,
/// Discovery data of readers recorded through `update_subscription`.
external_topic_readers: BTreeMap<GUID, DiscoveredReaderData>,
/// Discovery data of writers recorded through `update_publication`.
external_topic_writers: BTreeMap<GUID, DiscoveredWriterData>,
/// Topic name -> (GUID prefix of the updater -> how the entry was discovered
/// and the topic data itself).
topics: BTreeMap<String, BTreeMap<GuidPrefix, (DiscoveredVia, DiscoveredTopicData)>>,
/// Channel on which `update_topic_data` signals that topic data changed.
topic_updated_sender: mio_extras::channel::SyncSender<()>,
}
/// Records which kind of discovery information a stored topic entry was
/// derived from.
#[derive(PartialEq, Eq, Clone, Copy, Debug)]
pub(crate) enum DiscoveredVia {
  /// The entry came from topic data itself (e.g. a locally created topic).
  /// Entries from this source overwrite an existing entry on update.
  Topic,
  /// The entry was derived from a discovered writer (publication) record.
  Publication,
  /// The entry was derived from a discovered reader (subscription) record.
  Subscription,
}
impl DiscoveryDB {
/// Creates an empty discovery database.
///
/// `my_guid` is stored as this participant's own GUID and
/// `topic_updated_sender` is the channel that `update_topic_data` signals
/// on. All tables start out empty.
pub fn new(my_guid: GUID, topic_updated_sender: mio_extras::channel::SyncSender<()>) -> Self {
Self {
my_guid,
participant_proxies: BTreeMap::new(),
participant_last_life_signs: BTreeMap::new(),
local_topic_writers: BTreeMap::new(),
local_topic_readers: BTreeMap::new(),
external_topic_readers: BTreeMap::new(),
external_topic_writers: BTreeMap::new(),
topics: BTreeMap::new(),
topic_updated_sender,
}
}
/// Stores (or refreshes) the proxy data of a discovered participant and
/// records the current time as its latest sign of life.
///
/// Returns `true` only if no proxy was stored for this GUID prefix before
/// and the GUID is not our own. Data whose `participant_guid` does not carry
/// `EntityId::PARTICIPANT` is rejected: nothing is stored and `false` is
/// returned.
pub fn update_participant(&mut self, data: &SpdpDiscoveredParticipantData) -> bool {
  debug!("update_participant: {:?}", &data);
  let guid = data.participant_guid;

  // Sanity check: only participant GUIDs are accepted here.
  if guid.entity_id != EntityId::PARTICIPANT {
    error!(
      "Discovered participant GUID entity_id is not for participant: {:?}",
      guid
    );
    return false;
  }

  let previously_unknown = !self.participant_proxies.contains_key(&guid.prefix);
  let is_myself = guid == self.my_guid;
  if previously_unknown {
    info!("New remote participant: {:?}", &data);
    if is_myself {
      // Our own announcement is still stored, it just is not reported as new.
      info!(
        "Remote participant {:?} is myself, but some reflection is good.",
        guid
      );
    }
  }

  self.participant_proxies.insert(guid.prefix, data.clone());
  self
    .participant_last_life_signs
    .insert(guid.prefix, Instant::now());

  previously_unknown && !is_myself
}
/// Refreshes the last-sign-of-life timestamp of an already known participant.
///
/// If more than one second passed since the previous sign of life, this is
/// noted at debug level. Unknown prefixes are only logged; no entry is
/// created for them.
pub fn participant_is_alive(&mut self, guid_prefix: GuidPrefix) {
  match self.participant_last_life_signs.get_mut(&guid_prefix) {
    Some(last_seen) => {
      let now = Instant::now();
      let since_previous = now.duration_since(*last_seen);
      if since_previous > std::time::Duration::from_secs(1) {
        debug!(
          "Participant alive update for {:?}, but no full update.",
          guid_prefix
        );
      }
      *last_seen = now;
    }
    None => {
      info!("Participant alive update for unknown {:?}. This is normal, if the message does not repeat.", guid_prefix);
    }
  }
}
/// Forgets a participant: drops its proxy data and last-sign-of-life entry,
/// then removes every external reader and writer whose GUID falls in the
/// range of `guid_prefix`. Entries in the topic table are not touched here.
pub fn remove_participant(&mut self, guid_prefix: GuidPrefix) {
info!("removing participant {:?}", guid_prefix);
self.participant_proxies.remove(&guid_prefix);
self.participant_last_life_signs.remove(&guid_prefix);
self.remove_topic_reader_with_prefix(guid_prefix);
self.remove_topic_writer_with_prefix(guid_prefix);
}
/// Looks up the stored discovery data of the participant with the given
/// GUID prefix; `None` if no such participant is known.
pub fn find_participant_proxy(
&self,
guid_prefix: GuidPrefix,
) -> Option<&SpdpDiscoveredParticipantData> {
self.participant_proxies.get(&guid_prefix)
}
/// Removes every external reader whose GUID lies in the key range of
/// `guid_prefix`.
fn remove_topic_reader_with_prefix(&mut self, guid_prefix: GuidPrefix) {
  // The keys are copied out first: the map cannot be modified while the
  // range iterator still borrows it.
  let doomed: Vec<GUID> = self
    .external_topic_readers
    .range(guid_prefix.range())
    .map(|(reader_guid, _)| *reader_guid)
    .collect();
  doomed.iter().for_each(|reader_guid| {
    self.external_topic_readers.remove(reader_guid);
  });
}
/// Removes a single external reader by GUID (no-op if it is not stored) and
/// logs the removal request at info level.
pub fn remove_topic_reader(&mut self, guid: GUID) {
info!("remove_topic_reader {:?}", guid);
self.external_topic_readers.remove(&guid);
}
/// Removes every external writer whose GUID lies in the key range of
/// `guid_prefix`.
fn remove_topic_writer_with_prefix(&mut self, guid_prefix: GuidPrefix) {
  // The keys are copied out first: the map cannot be modified while the
  // range iterator still borrows it.
  let doomed: Vec<GUID> = self
    .external_topic_writers
    .range(guid_prefix.range())
    .map(|(writer_guid, _)| *writer_guid)
    .collect();
  doomed.iter().for_each(|writer_guid| {
    self.external_topic_writers.remove(writer_guid);
  });
}
/// Removes a single external writer by GUID; a no-op if it is not stored.
/// Unlike `remove_topic_reader`, this does not log the request.
pub fn remove_topic_writer(&mut self, guid: GUID) {
self.external_topic_writers.remove(&guid);
}
pub fn participant_cleanup(&mut self) -> Vec<GuidPrefix> {
let inow = Instant::now();
let mut to_remove = Vec::new();
for (&guid, sp) in &self.participant_proxies {
let lease_duration = sp
.lease_duration
.unwrap_or(DEFAULT_PARTICIPANT_LEASE_DURATION);
match self.participant_last_life_signs.get(&guid) {
Some(&last_life) => {
let elapsed = Duration::from_std(inow.duration_since(last_life));
if elapsed <= lease_duration + PARTICIPANT_LEASE_DURATION_TOLREANCE {
} else {
info!("participant cleanup - deleting participant proxy {:?}. lease_duration = {:?} elapsed = {:?}",
guid, lease_duration, elapsed);
to_remove.push(guid);
}
}
None => {
error!("Participant {:?} not in last_life_signs table?", guid);
}
} } for guid in &to_remove {
self.remove_participant(*guid);
}
to_remove
}
fn topic_has_writers_or_readers(&self, topic_name: &str) -> bool {
if self
.local_topic_readers
.iter()
.any(|(_, p)| p.subscription_topic_data.topic_name() == topic_name)
{
return true;
}
if self
.local_topic_writers
.iter()
.any(|(_, p)| p.publication_topic_data.topic_name == topic_name)
{
return true;
}
if self
.external_topic_readers
.values()
.any(|p| p.subscription_topic_data.topic_name() == topic_name)
{
return true;
}
if self
.external_topic_writers
.values()
.any(|p| p.publication_topic_data.topic_name == topic_name)
{
return true;
}
false
}
/// Drops every topic that no local or external reader or writer refers to
/// any more.
pub fn topic_cleanup(&mut self) {
  // Names are cloned out first so that the topic table is not borrowed
  // while entries are removed from it.
  let unused_topic_names: Vec<String> = self
    .topics
    .keys()
    .filter(|name| !self.topic_has_writers_or_readers(name))
    .cloned()
    .collect();

  for name in &unused_topic_names {
    self.topics.remove(name);
  }
}
/// Registers the discovery data of a local writer, keyed by the GUID found
/// in its writer proxy. An existing entry with the same GUID is replaced.
pub fn update_local_topic_writer(&mut self, writer: DiscoveredWriterData) {
  let writer_guid = writer.writer_proxy.remote_writer_guid;
  self.local_topic_writers.insert(writer_guid, writer);
}
/// Removes the local writer stored under `guid`; a no-op if there is none.
pub fn remove_local_topic_writer(&mut self, guid: GUID) {
self.local_topic_writers.remove(&guid);
}
/// Records a discovered external reader and the topic it subscribes to.
///
/// The data is stored as given under the reader's GUID, and the topic table
/// is updated with `DiscoveredVia::Subscription`. The returned copy has its
/// reader proxy rebuilt via `RtpsReaderProxy::from_discovered_reader_data`,
/// which is handed the default unicast/multicast locators of the owning
/// participant (empty lists if that participant is not known).
pub fn update_subscription(&mut self, data: &DiscoveredReaderData) -> DiscoveredReaderData {
  let guid = data.reader_proxy.remote_reader_guid;
  self.external_topic_readers.insert(guid, data.clone());

  // Default locators come from the participant that owns this reader.
  let (default_unicast, default_multicast) = match self.find_participant_proxy(guid.prefix) {
    Some(pp) => {
      debug!("Added participant locators to Reader {:?}", guid);
      (
        pp.default_unicast_locators.clone(),
        pp.default_multicast_locators.clone(),
      )
    }
    None => {
      if guid.prefix != GuidPrefix::UNKNOWN {
        debug!(
          "No remote participant known for {:?}\nSearched with {:?} in {:?}",
          data,
          guid.prefix,
          self.participant_proxies.keys()
        );
      }
      (Vec::default(), Vec::default())
    }
  };
  debug!("External reader: {:?}", data);

  // Topic bookkeeping for the subscribed topic.
  let dtd = data.subscription_topic_data.to_topic_data();
  let discovered_topic = DiscoveredTopicData::new(Utc::now(), dtd);
  self.update_topic_data(&discovered_topic, guid, DiscoveredVia::Subscription);

  let rtps_proxy =
    RtpsReaderProxy::from_discovered_reader_data(data, &default_unicast, &default_multicast);
  DiscoveredReaderData {
    reader_proxy: ReaderProxy::from(rtps_proxy),
    ..data.clone()
  }
}
/// Records a discovered external writer and the topic it publishes.
///
/// The data is stored as given under the writer's GUID, and the topic table
/// is updated with `DiscoveredVia::Publication`. The returned copy has its
/// writer proxy rebuilt via `RtpsWriterProxy::from_discovered_writer_data`,
/// which is handed the default unicast/multicast locators of the owning
/// participant (empty lists if that participant is not known).
pub fn update_publication(&mut self, data: &DiscoveredWriterData) -> DiscoveredWriterData {
  let guid = data.writer_proxy.remote_writer_guid;
  self.external_topic_writers.insert(guid, data.clone());

  // Default locators come from the participant that owns this writer.
  let (default_unicast, default_multicast) = match self.find_participant_proxy(guid.prefix) {
    Some(pp) => {
      // This message used to say "Reader" (copied from `update_subscription`),
      // which was misleading when reading discovery logs.
      debug!("Added participant locators to Writer {:?}", guid);
      (
        pp.default_unicast_locators.clone(),
        pp.default_multicast_locators.clone(),
      )
    }
    None => {
      if guid.prefix != GuidPrefix::UNKNOWN {
        debug!(
          "No remote participant known for {:?}\nSearched with {:?} in {:?}",
          data,
          guid.prefix,
          self.participant_proxies.keys()
        );
      }
      (Vec::default(), Vec::default())
    }
  };
  debug!("External writer: {:?}", data);

  // Topic bookkeeping for the published topic.
  let dtd = data.publication_topic_data.to_topic_data();
  let discovered_topic = DiscoveredTopicData::new(Utc::now(), dtd);
  self.update_topic_data(&discovered_topic, guid, DiscoveredVia::Publication);

  let rtps_proxy =
    RtpsWriterProxy::from_discovered_writer_data(data, &default_unicast, &default_multicast);
  DiscoveredWriterData {
    writer_proxy: WriterProxy::from(rtps_proxy),
    ..data.clone()
  }
}
/// Publishes a locally created `Topic` into the topic table, with our own
/// GUID as the updater and `DiscoveredVia::Topic` as the source.
pub fn update_topic_data_p(&mut self, topic: &Topic) {
  let timestamp = Utc::now();
  let builtin_topic_data = TopicBuiltinTopicData::new(
    None,
    topic.name(),
    topic.get_type().name().to_owned(),
    &topic.qos(),
  );
  let topic_data = DiscoveredTopicData::new(timestamp, builtin_topic_data);

  let my_guid = self.my_guid;
  self.update_topic_data(&topic_data, my_guid, DiscoveredVia::Topic);
}
/// Merges topic data reported by `updater` into the topic table.
///
/// Entries are kept per topic name and per GUID prefix of the updater:
/// * Unknown topic name: a new topic entry is created.
/// * Known topic, but no entry from this updater's prefix: an entry is added.
/// * Existing entry with the same type name: it is overwritten only when
///   `discovered_via` is `DiscoveredVia::Topic`; otherwise the update is
///   ignored.
/// * Existing entry with a different type name: the update is rejected and
///   an error is logged.
///
/// Whenever the table was actually modified, a notification is sent on
/// `topic_updated_sender` with `try_send`; a failed send is only traced.
pub fn update_topic_data(
  &mut self,
  dtd: &DiscoveredTopicData,
  updater: GUID,
  discovered_via: DiscoveredVia,
) {
  trace!("Update topic data: {:?}", &dtd);
  let topic_name = dtd.topic_data.name.clone();
  let mut notify = false;

  if let Some(per_participant) = self.topics.get_mut(&dtd.topic_data.name) {
    if let Some(old_dtd) = per_participant.get_mut(&updater.prefix) {
      if dtd.topic_data.type_name == old_dtd.1.topic_data.type_name {
        if discovered_via == DiscoveredVia::Topic {
          // Only data discovered via Topic replaces what is already stored.
          *old_dtd = (discovered_via, dtd.clone());
          notify = true;
        } else {
          info!(
            "Topic {:?} update ignored from {:?}. Already have this.",
            &topic_name, &updater
          );
        }
      } else {
        error!(
          "Inconsistent topic update from {:?}: type was: {:?} new type: {:?}",
          updater, old_dtd.1.topic_data.type_name, dtd.topic_data.type_name,
        );
      }
    } else {
      // Topic is known, but not yet from this participant.
      per_participant.insert(updater.prefix, (discovered_via, dtd.clone()));
      notify = true;
    }
  } else {
    // Topic is entirely new to us. This is a table change like the other
    // insertions, so listeners must be notified as well (previously this
    // branch forgot to set `notify`).
    let mut per_participant = BTreeMap::new();
    per_participant.insert(updater.prefix, (discovered_via, dtd.clone()));
    self.topics.insert(topic_name, per_participant);
    notify = true;
  }

  if notify {
    // Best effort: a full or disconnected channel must not fail the update.
    self
      .topic_updated_sender
      .try_send(())
      .unwrap_or_else(|e| trace!("update_topic_data: Notification send failed: {:?}", e));
  }
}
/// Registers (or replaces) the discovery data of a local reader.
///
/// The stored record is built from the reader's proxy, the topic's name,
/// type name and QoS, and carries no content filter. It is keyed by the
/// reader's GUID.
pub fn update_local_topic_reader(
  &mut self,
  domain_participant: &DomainParticipant,
  topic: &Topic,
  reader: &ReaderIngredients,
) {
  let guid = reader.guid;
  let rtps_proxy = RtpsReaderProxy::from_reader(reader, domain_participant);
  let subscription_topic_data = SubscriptionBuiltinTopicData::new(
    guid,
    Some(domain_participant.guid()),
    topic.name(),
    topic.get_type().name().to_string(),
    &topic.qos(),
  );

  self.local_topic_readers.insert(
    guid,
    DiscoveredReaderData {
      reader_proxy: ReaderProxy::from(rtps_proxy),
      subscription_topic_data,
      content_filter: None,
    },
  );
}
/// Removes the local reader stored under `guid`; a no-op if there is none.
pub fn remove_local_topic_reader(&mut self, guid: GUID) {
self.local_topic_readers.remove(&guid);
}
/// Iterates over the discovery data of all local readers, in GUID order.
pub fn get_all_local_topic_readers(&self) -> impl Iterator<Item = &DiscoveredReaderData> {
  self.local_topic_readers.values()
}
/// Iterates over the discovery data of all local writers, in GUID order.
pub fn get_all_local_topic_writers(&self) -> impl Iterator<Item = &DiscoveredWriterData> {
  self.local_topic_writers.values()
}
/// Iterates over all stored topic data whose topic name does not start with
/// "DCPS". A topic reported by several participants yields one item per
/// participant.
pub fn all_user_topics(&self) -> impl Iterator<Item = &DiscoveredTopicData> {
  self
    .topics
    .iter()
    .filter(|(name, _)| !name.starts_with("DCPS"))
    .flat_map(|(_, per_participant)| per_participant.values())
    .map(|(_, topic_data)| topic_data)
}
/// Iterates over the topic data recorded under our own GUID prefix, for
/// topics whose name does not start with "DCPS".
pub fn local_user_topics(&self) -> impl Iterator<Item = &DiscoveredTopicData> {
  let my_prefix = self.my_guid.prefix;
  self
    .topics
    .iter()
    .filter(|(name, _)| !name.starts_with("DCPS"))
    // Map keys are unique, so a keyed lookup yields exactly the entries a
    // filter on `prefix == my_prefix` would.
    .filter_map(move |(_, per_participant)| per_participant.get(&my_prefix))
    .map(|(_, topic_data)| topic_data)
}
/// Returns topic data for `topic_name`, if the topic is known.
///
/// When several participants reported the topic, the entry with the
/// smallest GUID prefix (first in map order) is returned.
pub fn get_topic(&self, topic_name: &str) -> Option<&DiscoveredTopicData> {
  let per_participant = self.topics.get(topic_name)?;
  per_participant
    .values()
    .next()
    .map(|(_, topic_data)| topic_data)
}
/// Collects copies of the external writers that belong to `participant`
/// (by GUID range) and publish on the topic named `topic_name`.
pub fn writers_on_topic_and_participant(
  &self,
  topic_name: &str,
  participant: GuidPrefix,
) -> Vec<DiscoveredWriterData> {
  let writers_of_participant = self
    .external_topic_writers
    .range(participant.range())
    .map(|(_guid, dwd)| dwd);
  // Logging only borrows the iterator; it is consumed below.
  info!(
    "Writers on participant {:?} are {:?}",
    participant, writers_of_participant
  );
  writers_of_participant
    .filter(|dwd| dwd.publication_topic_data.topic_name == topic_name)
    .cloned()
    .collect()
}
/// Collects copies of the external readers that belong to `participant`
/// (by GUID range) and subscribe to the topic named `topic_name`.
pub fn readers_on_topic_and_participant(
  &self,
  topic_name: &str,
  participant: GuidPrefix,
) -> Vec<DiscoveredReaderData> {
  let mut matching = Vec::new();
  for (_guid, drd) in self.external_topic_readers.range(participant.range()) {
    if drd.subscription_topic_data.topic_name() == topic_name {
      matching.push(drd.clone());
    }
  }
  matching
}
/// Test helper: returns the local readers whose subscription topic name
/// equals the name of `topic`.
#[cfg(test)]
pub fn get_local_topic_readers<'a, T: TopicDescription>(
  &'a self,
  topic: &'a T,
) -> Vec<&DiscoveredReaderData> {
  let wanted_name = topic.name();
  self
    .local_topic_readers
    .values()
    .filter(|reader| *reader.subscription_topic_data.topic_name() == wanted_name)
    .collect()
}
/// Marks every external writer in the GUID range of `data.guid` as updated
/// right now.
///
/// Despite the name, no lease duration is changed here: only the writers'
/// `last_updated` timestamp is set.
pub fn update_lease_duration(&mut self, data: &ParticipantMessageData) {
  let now = Instant::now();
  let prefix = data.guid;
  for (_guid, writer) in self.external_topic_writers.range_mut(prefix.range()) {
    writer.last_updated = now;
  }
}
}
#[cfg(test)]
mod tests {
use std::{
rc::Rc,
sync::{Arc, RwLock},
time::Duration as StdDuration,
};
use byteorder::LittleEndian;
use mio_extras::channel as mio_channel;
use super::*;
use crate::{
dds::{
qos::QosPolicies, reader::Reader, statusevents::DataReaderStatus, topic::TopicKind,
with_key::datareader::ReaderCommand,
},
network::udp_sender::UDPSender,
serialization::cdr_serializer::CDRSerializerAdapter,
structure::{dds_cache::DDSCache, guid::*},
test::{
random_data::RandomData,
test_data::{reader_proxy_data, spdp_participant_data, subscription_builtin_topic_data},
},
};
/// A participant is stored once however often it is announced, and is
/// dropped by `participant_cleanup` after its lease has run out.
#[test]
fn discdb_participant_operations() {
  let (event_sender, _event_receiver) = mio_channel::sync_channel::<()>(4);
  let mut db = DiscoveryDB::new(GUID::new_participant_guid(), event_sender);

  let mut participant = spdp_participant_data().unwrap();
  participant.lease_duration = Some(Duration::from(StdDuration::from_secs(1)));

  // Announcing the same participant twice must not create a second entry.
  db.update_participant(&participant);
  assert_eq!(db.participant_proxies.len(), 1);
  db.update_participant(&participant);
  assert_eq!(db.participant_proxies.len(), 1);

  // Wait past the 1 s lease, then expect the cleanup to drop it.
  std::thread::sleep(StdDuration::from_secs(2));
  db.participant_cleanup();
  assert!(db.participant_proxies.is_empty());
}
/// Smoke test: a database and a default `DiscoveredReaderData` can be
/// constructed. Nothing is asserted beyond successful construction.
#[test]
fn discdb_writer_proxies() {
  let (event_sender, _event_receiver) = mio_channel::sync_channel::<()>(4);
  let _discoverydb = DiscoveryDB::new(GUID::new_participant_guid(), event_sender);

  let topic_name = String::from("some_topic");
  let type_name = String::from("RandomData");
  let _dreader = DiscoveredReaderData::default(topic_name, type_name);
}
/// Registers two local writers and feeds three external subscriptions into
/// the database. Only the number of local writers is asserted; the
/// subscription updates are exercised for absence of panics.
#[test]
fn discdb_subscription_operations() {
  let (discovery_db_event_sender, _discovery_db_event_receiver) =
    mio_channel::sync_channel::<()>(4);
  let mut discovery_db =
    DiscoveryDB::new(GUID::new_participant_guid(), discovery_db_event_sender);

  // The message used to say "publisher", although a participant is created.
  let domain_participant = DomainParticipant::new(0).expect("Failed to create participant");
  let topic = domain_participant
    .create_topic(
      "Foobar".to_string(),
      "RandomData".to_string(),
      &QosPolicies::qos_none(),
      TopicKind::WithKey,
    )
    .unwrap();
  // Created but not used further; the underscore silences the unused warning.
  let _topic2 = domain_participant
    .create_topic(
      "Barfoo".to_string(),
      "RandomData".to_string(),
      &QosPolicies::qos_none(),
      TopicKind::WithKey,
    )
    .unwrap();

  // First local writer.
  let publisher1 = domain_participant
    .create_publisher(&QosPolicies::qos_none())
    .unwrap();
  let dw = publisher1
    .create_datawriter::<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>>(&topic, None)
    .unwrap();
  let writer_data = DiscoveredWriterData::new(&dw, &topic, &domain_participant);
  discovery_db.update_local_topic_writer(writer_data);
  assert_eq!(discovery_db.local_topic_writers.len(), 1);

  // Second local writer on the same topic, from another publisher.
  let publisher2 = domain_participant
    .create_publisher(&QosPolicies::qos_none())
    .unwrap();
  let dw2 = publisher2
    .create_datawriter::<RandomData, CDRSerializerAdapter<RandomData, LittleEndian>>(&topic, None)
    .unwrap();
  let writer_data2 = DiscoveredWriterData::new(&dw2, &topic, &domain_participant);
  discovery_db.update_local_topic_writer(writer_data2);
  assert_eq!(discovery_db.local_topic_writers.len(), 2);

  // External subscriptions built from the shared test fixtures.
  let reader1 = reader_proxy_data().unwrap();
  let reader1sub = subscription_builtin_topic_data().unwrap();
  let dreader1 = DiscoveredReaderData {
    reader_proxy: reader1.clone(),
    subscription_topic_data: reader1sub.clone(),
    content_filter: None,
  };
  discovery_db.update_subscription(&dreader1);

  let reader2 = reader_proxy_data().unwrap();
  let reader2sub = subscription_builtin_topic_data().unwrap();
  let dreader2 = DiscoveredReaderData {
    reader_proxy: reader2,
    subscription_topic_data: reader2sub,
    content_filter: None,
  };
  discovery_db.update_subscription(&dreader2);

  // Same data as the first reader; the originals are moved since this is
  // their last use.
  let dreader3 = DiscoveredReaderData {
    reader_proxy: reader1,
    subscription_topic_data: reader1sub,
    content_filter: None,
  };
  discovery_db.update_subscription(&dreader3);
}
// Checks that local readers are stored per GUID: registering the same
// reader twice keeps a single entry, while a reader with a different GUID
// adds a second one.
#[test]
fn discdb_local_topic_reader() {
let (discovery_db_event_sender, _discovery_db_event_receiver) =
mio_channel::sync_channel::<()>(4);
let dp = DomainParticipant::new(0).expect("Failed to create participant");
let topic = dp
.create_topic(
"some topic name".to_string(),
"Wazzup".to_string(),
&QosPolicies::qos_none(),
TopicKind::WithKey,
)
.unwrap();
let mut discoverydb = DiscoveryDB::new(GUID::new_participant_guid(), discovery_db_event_sender);
// Channels required by `ReaderIngredients`; their other ends are unused here.
let (notification_sender, _notification_receiver) = mio_extras::channel::sync_channel(100);
let (status_sender, _status_receiver) =
mio_extras::channel::sync_channel::<DataReaderStatus>(100);
let (_reader_commander1, reader_command_receiver1) =
mio_extras::channel::sync_channel::<ReaderCommand>(100);
let (_reader_commander2, reader_command_receiver2) =
mio_extras::channel::sync_channel::<ReaderCommand>(100);
let reader_ing = ReaderIngredients {
guid: GUID::dummy_test_guid(EntityKind::READER_NO_KEY_USER_DEFINED),
notification_sender: notification_sender.clone(),
status_sender: status_sender.clone(),
topic_name: topic.name(),
qos_policy: QosPolicies::qos_none(),
data_reader_command_receiver: reader_command_receiver1,
};
discoverydb.update_local_topic_reader(&dp, &topic, &reader_ing);
assert_eq!(discoverydb.local_topic_readers.len(), 1);
assert_eq!(discoverydb.get_local_topic_readers(&topic).len(), 1);
// Registering the same reader again must replace, not duplicate, the entry.
discoverydb.update_local_topic_reader(&dp, &topic, &reader_ing);
assert_eq!(discoverydb.local_topic_readers.len(), 1);
assert_eq!(discoverydb.get_local_topic_readers(&topic).len(), 1);
let _reader = Reader::new(
reader_ing,
Arc::new(RwLock::new(DDSCache::new())),
Rc::new(UDPSender::new(0).unwrap()),
mio_extras::timer::Builder::default().build(),
);
// Second reader with a different GUID on the same topic.
let reader_ing = ReaderIngredients {
guid: GUID::new_with_prefix_and_id(
GuidPrefix::new(b"Another fake"),
EntityId {
entity_key: [1, 2, 3],
entity_kind: EntityKind::READER_NO_KEY_USER_DEFINED,
},
), notification_sender,
status_sender,
topic_name: topic.name(),
qos_policy: QosPolicies::qos_none(),
data_reader_command_receiver: reader_command_receiver2,
};
discoverydb.update_local_topic_reader(&dp, &topic, &reader_ing);
assert_eq!(discoverydb.get_local_topic_readers(&topic).len(), 2);
assert_eq!(discoverydb.get_all_local_topic_readers().count(), 2);
let _reader = Reader::new(
reader_ing,
Arc::new(RwLock::new(DDSCache::new())),
Rc::new(UDPSender::new(0).unwrap()),
mio_extras::timer::Builder::default().build(),
);
}
}