use std::{
cmp::max,
collections::{BTreeMap, HashMap},
ops::Bound::{Excluded, Included},
sync::{Arc, Mutex},
};
#[allow(unused_imports)]
use log::{debug, error, info, trace};
use crate::{
create_error_internal,
dds::{
qos::{
policy::{History, ResourceLimits},
QosPolicies,
},
typedesc::TypeDesc,
CreateError, CreateResult,
},
structure::{sequence_number::SequenceNumber, time::Timestamp},
GUID,
};
use super::cache_change::CacheChange;
/// Container for cached DDS samples, organized by topic.
///
/// Maps each topic name to a shared, independently lockable [`TopicCache`]
/// handle, so multiple holders can access different topics concurrently.
#[derive(Debug, Default)]
pub struct DDSCache {
  // One cache per topic, keyed by topic name.
  topic_caches: HashMap<String, Arc<Mutex<TopicCache>>>,
}
impl DDSCache {
  /// Creates an empty cache with no topics.
  pub fn new() -> Self {
    Self::default()
  }

  /// Returns a handle to the cache of `topic_name`, creating the cache if it
  /// does not exist yet.
  ///
  /// If the topic already exists, its keep limits are widened to also satisfy
  /// `qos` (a topic cache may serve several endpoints with differing QoS).
  pub(crate) fn add_new_topic(
    &mut self,
    topic_name: String,
    topic_data_type: TypeDesc,
    qos: &QosPolicies,
  ) -> Arc<Mutex<TopicCache>> {
    self
      .topic_caches
      .entry(topic_name.clone())
      .and_modify(|tc| tc.lock().unwrap().update_keep_limits(qos))
      // or_insert_with: only construct the TopicCache (and its Arc/Mutex)
      // when actually inserting. Plain or_insert would build and discard one
      // on every call for an existing topic.
      .or_insert_with(|| Arc::new(Mutex::new(TopicCache::new(topic_name, topic_data_type, qos))))
      .clone()
  }

  /// Looks up the cache of an existing topic.
  ///
  /// Returns an internal error if the topic has not been added.
  pub(crate) fn get_existing_topic_cache(
    &self,
    topic_name: &str,
  ) -> CreateResult<Arc<Mutex<TopicCache>>> {
    match self.topic_caches.get(topic_name) {
      Some(tc) => Ok(tc.clone()),
      None => create_error_internal!("Topic cache for topic {topic_name} not found in DDS cache"),
    }
  }

  /// Removes a topic's cache. Does nothing if the topic is unknown.
  #[allow(dead_code)]
  pub fn remove_topic(&mut self, topic_name: &str) {
    // HashMap::remove is already a no-op for a missing key; the previous
    // contains_key() guard was a redundant second lookup.
    self.topic_caches.remove(topic_name);
  }
}
/// Cache of samples for a single topic.
#[derive(Debug)]
pub(crate) struct TopicCache {
  topic_name: String,
  #[allow(dead_code)]
  topic_data_type: TypeDesc,
  #[allow(dead_code)]
  topic_qos: QosPolicies,
  // Sample-keeping limits derived from QoS: History gives how much we must
  // keep, ResourceLimits how much we may keep at most.
  min_keep_samples: History,
  max_keep_samples: i32,
  // The samples themselves, ordered by receive timestamp.
  changes: BTreeMap<Timestamp, CacheChange>,
  // Per-writer index: sequence number -> timestamp key into `changes`.
  sequence_numbers: BTreeMap<GUID, BTreeMap<SequenceNumber, Timestamp>>,
  // Per-writer watermark: sequence numbers before this were received reliably.
  received_reliably_before: BTreeMap<GUID, SequenceNumber>,
}
impl TopicCache {
  /// Creates a new, empty cache for one topic.
  ///
  /// Keep limits start at their minimum (KeepLast depth 1, max 1 sample) and
  /// are then widened according to `topic_qos`.
  pub fn new(topic_name: String, topic_data_type: TypeDesc, topic_qos: &QosPolicies) -> Self {
    let mut new_self = Self {
      topic_name,
      topic_data_type,
      topic_qos: topic_qos.clone(),
      min_keep_samples: History::KeepLast { depth: 1 },
      max_keep_samples: 1,
      changes: BTreeMap::new(),
      sequence_numbers: BTreeMap::new(),
      received_reliably_before: BTreeMap::new(),
    };
    new_self.update_keep_limits(topic_qos);
    new_self
  }

  /// Widens (never narrows) the keep limits so that `qos` is also satisfied.
  ///
  /// A topic cache may serve several endpoints, so the effective limits are
  /// the maximum over every QoS seen so far.
  pub fn update_keep_limits(&mut self, qos: &QosPolicies) {
    // Default History is KeepLast depth 1.
    let min_keep_samples = qos.history().unwrap_or(History::KeepLast { depth: 1 });
    // NOTE(review): these ResourceLimits defaults appear implementation-chosen
    // rather than spec defaults — confirm against the rest of the crate.
    let max_keep_samples = qos
      .resource_limits()
      .unwrap_or(ResourceLimits {
        max_samples: 1024,
        max_instances: 1024,
        max_samples_per_instance: 64,
      })
      .max_samples;
    // If History demands a deeper KeepLast than ResourceLimits allows,
    // History wins.
    let max_keep_samples = match min_keep_samples {
      History::KeepLast { depth: n } if n > max_keep_samples => n,
      _ => max_keep_samples,
    };
    self.min_keep_samples = max(min_keep_samples, self.min_keep_samples);
    self.max_keep_samples = max(max_keep_samples, self.max_keep_samples);
  }

  /// Records that all samples from `writer` with sequence number before `sn`
  /// have been received reliably.
  pub fn mark_reliably_received_before(&mut self, writer: GUID, sn: SequenceNumber) {
    self.received_reliably_before.insert(writer, sn);
  }

  /// Looks up a change by its receive timestamp.
  pub fn get_change(&self, instant: &Timestamp) -> Option<&CacheChange> {
    self.changes.get(instant)
  }

  /// Inserts a change, keyed by receive timestamp.
  ///
  /// A duplicate (same writer GUID and sequence number) is discarded with a
  /// debug log.
  pub fn add_change(&mut self, instant: &Timestamp, cache_change: CacheChange) {
    // `if let` instead of `.map()`: we only want the side effect
    // (clippy::option_map_unit_fn).
    if let Some(cc_back) = self.add_change_internal(instant, cache_change) {
      debug!(
        "DDSCache insert failed topic={:?} cache_change={:?}",
        self.topic_name, cc_back
      );
    }
  }

  // Inserts a change; returns the rejected/evicted change, if any.
  fn add_change_internal(
    &mut self,
    instant: &Timestamp,
    cache_change: CacheChange,
  ) -> Option<CacheChange> {
    // Occasionally trigger garbage collection, with probability roughly
    // proportional to payload size, so larger samples cause more frequent
    // GC. The sequence number acts as a cheap pseudo-random source.
    let payload_size = max(1, cache_change.data_value.payload_size());
    let semi_random_number = i64::from(cache_change.sequence_number) as usize;
    let fairly_large_constant = 0xffff;
    let modulus = fairly_large_constant / payload_size;
    if modulus == 0 || semi_random_number % modulus == 0 {
      debug!("Garbage collecting topic {}", self.topic_name);
      // Timestamp::ZERO => remove nothing by age; only enforce count limits.
      self.remove_changes_before(Timestamp::ZERO);
    }

    if let Some(old_instant) = self.find_by_sn(&cache_change) {
      // Duplicate (writer, sequence number): reject the new change.
      debug!(
        "add_change: discarding duplicate {:?} from {:?}. old timestamp = {:?}, new = {:?}",
        cache_change.sequence_number, cache_change.writer_guid, old_instant, instant,
      );
      Some(cache_change)
    } else {
      self.insert_sn(*instant, &cache_change);
      self.changes.insert(*instant, cache_change).map(|old_cc| {
        // Timestamp keys are assumed unique, so this should not happen.
        // Keep the sequence-number index consistent and hand the evicted
        // change back to the caller for logging.
        error!(
          "DDSHistoryCache already contained element with key {:?} !!!",
          instant
        );
        self.remove_sn(&old_cc);
        old_cc
      })
    }
  }

  // Finds the timestamp of an already-stored change with the same writer
  // and sequence number, if any.
  fn find_by_sn(&self, cc: &CacheChange) -> Option<Timestamp> {
    self
      .sequence_numbers
      .get(&cc.writer_guid)
      .and_then(|snm| snm.get(&cc.sequence_number))
      .copied()
  }

  // Records a change in the per-writer sequence-number index.
  fn insert_sn(&mut self, instant: Timestamp, cc: &CacheChange) {
    self
      .sequence_numbers
      .entry(cc.writer_guid)
      .or_default()
      .insert(cc.sequence_number, instant);
  }

  /// Iterates over changes with timestamps in `(start_instant, end_instant]`,
  /// regardless of reliable-reception status.
  pub fn get_changes_in_range_best_effort(
    &self,
    start_instant: Timestamp,
    end_instant: Timestamp,
  ) -> Box<dyn Iterator<Item = (Timestamp, &CacheChange)> + '_> {
    Box::new(
      self
        .changes
        .range((Excluded(start_instant), Included(end_instant)))
        .map(|(i, c)| (*i, c)),
    )
  }

  /// Iterates over changes not yet read (per `last_read_sn`, exclusive lower
  /// bound per writer) that are within the reliably-received watermark.
  pub fn get_changes_in_range_reliable<'a>(
    &'a self,
    last_read_sn: &'a BTreeMap<GUID, SequenceNumber>,
  ) -> Box<dyn Iterator<Item = (Timestamp, &CacheChange)> + 'a> {
    Box::new(
      self
        .sequence_numbers
        .iter()
        .flat_map(|(guid, sn_map)| {
          let lower_bound_exc = last_read_sn
            .get(guid)
            .cloned()
            .unwrap_or(SequenceNumber::zero());
          let upper_bound_exc = self.reliable_before(*guid);
          // NOTE(review): upper bound is forced to at least lower+1, which
          // seems to allow progress even when the reliable watermark lags
          // behind what was already read — confirm intended semantics.
          let upper_bound_exc = max(upper_bound_exc, lower_bound_exc.plus_1());
          sn_map.range((Excluded(lower_bound_exc), Excluded(upper_bound_exc)))
        })
        // Drop index entries whose change is no longer in `changes`.
        .filter_map(|(_sn, t)| self.get_change(t).map(|cc| (*t, cc))),
    )
  }

  /// Returns the smallest sequence number still cached for `writer_guid`,
  /// or None if nothing from that writer is cached.
  pub fn writers_smallest_sn_in_cache(&self, writer_guid: GUID) -> Option<SequenceNumber> {
    self
      .sequence_numbers
      .get(&writer_guid)
      .and_then(|sn_to_ts| sn_to_ts.first_key_value())
      .map(|(sn, _ts)| sn)
      .copied()
  }

  // The reliable-reception watermark for `writer` (default if never marked).
  fn reliable_before(&self, writer: GUID) -> SequenceNumber {
    self
      .received_reliably_before
      .get(&writer)
      .cloned()
      .unwrap_or(SequenceNumber::default())
  }

  // Removes a change from the per-writer index, dropping the writer's inner
  // map if it becomes empty.
  fn remove_sn(&mut self, cc: &CacheChange) {
    let mut emptied = false;
    self.sequence_numbers.entry(cc.writer_guid).and_modify(|s| {
      s.remove(&cc.sequence_number);
      emptied = s.is_empty();
    });
    if emptied {
      self.sequence_numbers.remove(&cc.writer_guid);
    }
  }

  /// Removes changes older than `remove_before`, subject to the keep limits:
  /// * anything above `max_keep_samples` (ResourceLimits) is removed
  ///   regardless of timestamp,
  /// * the newest samples protected by `min_keep_samples` (History QoS) are
  ///   retained regardless of timestamp.
  ///
  /// BUGFIX: the previous version computed the removal window with
  /// `max(0, len.wrapping_sub(limit))` — a no-op `max` on `usize` plus a
  /// wrap-around on underflow — and then `take(n)` + `skip_while(i < n)`,
  /// which skipped every candidate, so the split key always fell back to
  /// `Timestamp::ZERO` and nothing was ever removed (History QoS was also
  /// never consulted). This version removes the intended prefix.
  pub fn remove_changes_before(&mut self, remove_before: Timestamp) {
    let total = self.changes.len();
    // ResourceLimits: at least this many oldest samples must go.
    // saturating_sub: being under the limit yields 0, not a huge count.
    let must_remove = total.saturating_sub(max(self.max_keep_samples, 0) as usize);
    // History: at most this many oldest samples may go (but never fewer than
    // the resource limit forces out).
    let may_remove = match self.min_keep_samples {
      History::KeepLast { depth } => total
        .saturating_sub(max(depth, 0) as usize)
        .max(must_remove),
      // KeepAll: remove only what the resource limit forces out.
      _ => must_remove,
    };
    // Timestamps are the map keys, so samples older than `remove_before`
    // form a prefix of the ordered map.
    let older_count = self.changes.range(..remove_before).count();
    let remove_count = must_remove.max(older_count.min(may_remove));

    // Split off the retained suffix at the first surviving key; if there is
    // no such key, everything is removed.
    let to_remove = match self.changes.keys().nth(remove_count).copied() {
      Some(split_key) => {
        let to_retain = self.changes.split_off(&split_key);
        std::mem::replace(&mut self.changes, to_retain)
      }
      None => std::mem::take(&mut self.changes),
    };
    // Keep the sequence-number index in sync with the removed changes.
    for r in to_remove.values() {
      self.remove_sn(r);
    }
  }

  /// The topic name this cache serves.
  pub fn topic_name(&self) -> String {
    self.topic_name.clone()
  }
}
#[cfg(test)]
mod tests {
  use std::{
    sync::{Arc, RwLock},
    thread,
  };

  use super::DDSCache;
  use crate::{
    dds::{
      ddsdata::DDSData, qos::QosPolicies, typedesc::TypeDesc, with_key::datawriter::WriteOptions,
    },
    messages::submessages::elements::serialized_payload::SerializedPayload,
    structure::{cache_change::CacheChange, guid::GUID, sequence_number::SequenceNumber},
  };

  // Builds a CacheChange with the given sequence number, an unknown writer,
  // and a default payload.
  fn make_change(sn: i64) -> CacheChange {
    CacheChange::new(
      GUID::GUID_UNKNOWN,
      SequenceNumber::new(sn),
      WriteOptions::default(),
      DDSData::new(SerializedPayload::default()),
    )
  }

  #[test]
  fn create_dds_cache_and_topic_cache() {
    let dds_cache = Arc::new(RwLock::new(DDSCache::new()));
    let qos = QosPolicies::qos_none();

    let topic_cache = dds_cache.write().unwrap().add_new_topic(
      String::from("ImJustATopic"),
      TypeDesc::new("IDontKnowIfThisIsNecessary".to_string()),
      &qos,
    );

    // One insert from this thread...
    topic_cache
      .lock()
      .unwrap()
      .add_change(&crate::Timestamp::now(), make_change(1));

    // ...and two more through a cloned handle from another thread.
    let remote_handle = topic_cache.clone();
    thread::spawn(move || {
      for sn in [2, 3] {
        remote_handle
          .lock()
          .unwrap()
          .add_change(&crate::Timestamp::now(), make_change(sn));
      }
    })
    .join()
    .unwrap();

    // All three changes are visible in a best-effort range query.
    assert_eq!(
      topic_cache
        .lock()
        .unwrap()
        .get_changes_in_range_best_effort(
          crate::Timestamp::now() - crate::Duration::from_secs(23),
          crate::Timestamp::now()
        )
        .count(),
      3
    );
  }
}