use std::{
cmp::max,
collections::{btree_map, BTreeMap, HashMap},
iter,
ops::Bound::{Excluded, Included},
sync::{Arc, Mutex},
};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use crate::{
create_error_internal,
dds::{
qos::{
policy::{History, ResourceLimits},
QosPolicies,
},
typedesc::TypeDesc,
CreateError, CreateResult,
},
structure::{sequence_number::SequenceNumber, time::Timestamp},
GUID,
};
use super::cache_change::CacheChange;
/// Process-wide sample cache: one `TopicCache` per topic, handed out to
/// readers/writers as shared `Arc<Mutex<..>>` handles.
#[derive(Debug, Default)]
pub struct DDSCache {
// Map from topic name to that topic's shared cache.
topic_caches: HashMap<String, Arc<Mutex<TopicCache>>>,
}
impl DDSCache {
pub fn new() -> Self {
Self::default()
}
/// Insert a cache for a new topic, or, if one already exists under this
/// name, widen its keep limits according to `qos`.
///
/// Returns a shared handle to the (new or pre-existing) topic cache.
pub(crate) fn add_new_topic(
  &mut self,
  topic_name: String,
  topic_data_type: TypeDesc,
  qos: &QosPolicies,
) -> Arc<Mutex<TopicCache>> {
  let topic_cache_handle = self
    .topic_caches
    .entry(topic_name.clone())
    .and_modify(|tc| tc.lock().unwrap().update_keep_limits(qos))
    // or_insert_with: build the TopicCache lazily, only when the entry is
    // vacant. The previous or_insert(..) constructed it (cloning the QoS)
    // on every call, even when the topic already existed.
    .or_insert_with(|| Arc::new(Mutex::new(TopicCache::new(topic_name, topic_data_type, qos))));
  Arc::clone(topic_cache_handle)
}
/// Look up the shared handle of an already-created topic cache.
///
/// A missing topic is reported as an internal error: callers are expected
/// to have called `add_new_topic` first.
pub(crate) fn get_existing_topic_cache(
  &self,
  topic_name: &str,
) -> CreateResult<Arc<Mutex<TopicCache>>> {
  if let Some(topic_cache) = self.topic_caches.get(topic_name) {
    Ok(Arc::clone(topic_cache))
  } else {
    create_error_internal!("Topic cache for topic {topic_name} not found in DDS cache")
  }
}
#[allow(dead_code)]
/// Drop the cache of a topic. Removing an unknown topic is a no-op:
/// `HashMap::remove` already handles the absent-key case, so the previous
/// `contains_key` pre-check only caused a redundant second hash lookup.
pub fn remove_topic(&mut self, topic_name: &str) {
  self.topic_caches.remove(topic_name);
}
pub fn garbage_collect(&mut self) {
for tc in self.topic_caches.values_mut() {
let mut tc = tc.lock().unwrap();
if let Some((last_timestamp, _)) = tc.changes.iter().next_back() {
if *last_timestamp > tc.changes_reallocated_up_to {
tc.remove_changes_before(Timestamp::ZERO);
}
}
}
}
}
/// Cache of received samples (`CacheChange`s) for one topic, stored in
/// receive-timestamp order with a secondary index by
/// (writer GUID, sequence number).
#[derive(Debug)]
pub(crate) struct TopicCache {
topic_name: String,
#[allow(dead_code)] topic_data_type: TypeDesc,
#[allow(dead_code)]
topic_qos: QosPolicies,
// Lower bound on samples to keep, from the History QoS policy.
min_keep_samples: History,
// Upper bound on samples to retain, from the ResourceLimits QoS policy.
max_keep_samples: i32,
// Main storage: samples ordered by receive timestamp.
changes: BTreeMap<Timestamp, CacheChange>,
// Changes with timestamps up to this mark have had `reallocate()` called
// on them in remove_changes_before.
// NOTE(review): what reallocate() does is not visible here — confirm in CacheChange.
changes_reallocated_up_to: Timestamp,
// Secondary index: writer GUID -> (sequence number -> receive timestamp).
sequence_numbers: BTreeMap<GUID, BTreeMap<SequenceNumber, Timestamp>>,
// Per writer: sequence numbers strictly below this value have been
// received reliably (see mark_reliably_received_before).
received_reliably_before: BTreeMap<GUID, SequenceNumber>,
}
impl TopicCache {
/// Build an empty topic cache and initialize its keep limits from `topic_qos`.
pub fn new(topic_name: String, topic_data_type: TypeDesc, topic_qos: &QosPolicies) -> Self {
  // Start from the most conservative limits; update_keep_limits below only
  // ever raises them, so these act as floor values.
  let mut topic_cache = Self {
    topic_name,
    topic_data_type,
    topic_qos: topic_qos.clone(),
    min_keep_samples: History::KeepLast { depth: 1 },
    max_keep_samples: 1,
    changes: BTreeMap::new(),
    changes_reallocated_up_to: Timestamp::ZERO,
    sequence_numbers: BTreeMap::new(),
    received_reliably_before: BTreeMap::new(),
  };
  topic_cache.update_keep_limits(topic_qos);
  topic_cache
}
// Widen (never narrow) this cache's keep limits from the given QoS.
// Called at construction and whenever another endpoint attaches to the
// same topic with possibly different QoS.
pub fn update_keep_limits(&mut self, qos: &QosPolicies) {
// History defaults to KeepLast{depth: 1} when unspecified.
let min_keep_samples = qos
.history()
.unwrap_or(History::KeepLast { depth: 1 });
// ResourceLimits default to 64 samples when unspecified.
let max_keep_samples = qos
.resource_limits()
.unwrap_or(ResourceLimits {
max_samples: 64,
max_instances: 64,
max_samples_per_instance: 64,
})
.max_samples;
// The hard cap must never be smaller than the requested history depth.
let max_keep_samples = match min_keep_samples {
History::KeepLast { depth: n } if n > max_keep_samples => n,
_ => max_keep_samples,
};
// max() on History relies on History's Ord implementation
// (presumably KeepLast ordered by depth, below KeepAll — confirm in the policy module).
self.min_keep_samples = max(min_keep_samples, self.min_keep_samples);
self.max_keep_samples = max(max_keep_samples, self.max_keep_samples);
}
/// Record the reliable-reception high-water mark for `writer` and report
/// whether it advanced past the previously stored one.
/// A writer with no previous entry is treated as SequenceNumber 1.
pub fn mark_reliably_received_before(&mut self, writer: GUID, sn: SequenceNumber) -> bool {
  match self.received_reliably_before.insert(writer, sn) {
    Some(previous) => previous < sn,
    None => SequenceNumber::new(1) < sn,
  }
}
// Look up a single cached change by its receive timestamp.
pub fn get_change(&self, instant: &Timestamp) -> Option<&CacheChange> {
self.changes.get(instant)
}
/// Insert a change into the cache; logs a warning if the insert displaced
/// an existing entry (see `add_change_internal` for the exact cases).
pub fn add_change(&mut self, instant: &Timestamp, cache_change: CacheChange) {
  // `if let` instead of Option::map for a side-effect-only branch
  // (clippy: option_map_unit_fn).
  if let Some(cc_back) = self.add_change_internal(instant, cache_change) {
    warn!(
      "DDSCache insert failed topic={:?} cache_change={:?}",
      self.topic_name, cc_back
    );
  }
}
/// Insert a change, keyed by receive timestamp.
///
/// Returns `Some(old_change)` only when an element already existed at the
/// exact same timestamp and was displaced; returns `None` on a normal
/// insert, and also when the change is dropped as a duplicate
/// (same writer GUID + sequence number already cached).
fn add_change_internal(
  &mut self,
  instant: &Timestamp,
  cache_change: CacheChange,
) -> Option<CacheChange> {
  // Piggyback garbage collection on roughly every GC_PERIOD-th insert,
  // using the sequence number as a cheap pseudo-random trigger.
  // The original tested `modulus == 0` too, which is always false for a
  // constant 64 — that dead branch is removed. The i64 remainder gives the
  // same zero/non-zero result as the old `as usize` wrapping cast, since
  // 2^64 is a multiple of 64.
  const GC_PERIOD: i64 = 64;
  if i64::from(cache_change.sequence_number) % GC_PERIOD == 0 {
    debug!("Garbage collecting topic {}", self.topic_name);
    self.remove_changes_before(Timestamp::ZERO);
  }
  if let Some(old_instant) = self.find_by_sn(&cache_change) {
    // Same (writer, sequence number) already stored: duplicate delivery.
    trace!(
      "add_change: discarding duplicate {:?} from {:?}. old timestamp = {:?}, new = {:?}",
      cache_change.sequence_number,
      cache_change.writer_guid,
      old_instant,
      instant,
    );
    None
  } else {
    self.insert_sn(*instant, &cache_change);
    self.changes.insert(*instant, cache_change).map(|old_cc| {
      // Timestamp collision should not happen. Keep the new change, but
      // unregister the displaced one from the sequence-number index.
      error!("DDSHistoryCache already contained element with key {instant:?} !!!");
      self.remove_sn(&old_cc);
      old_cc
    })
  }
}
/// Find the receive timestamp of a change with the same writer GUID and
/// sequence number as `cc`, if one is cached.
fn find_by_sn(&self, cc: &CacheChange) -> Option<Timestamp> {
  // Two-level lookup: writer GUID first, then sequence number.
  let writer_map = self.sequence_numbers.get(&cc.writer_guid)?;
  writer_map.get(&cc.sequence_number).copied()
}
/// Register `cc`'s (writer GUID, sequence number) -> timestamp mapping in
/// the secondary index, creating the per-writer map on first use.
fn insert_sn(&mut self, instant: Timestamp, cc: &CacheChange) {
  let writer_map = self.sequence_numbers.entry(cc.writer_guid).or_default();
  writer_map.insert(cc.sequence_number, instant);
}
/// Iterate over changes with timestamps in (start_instant, end_instant],
/// i.e. BEST_EFFORT reading: everything received after the caller's last
/// read, up to and including `end_instant`.
pub fn get_changes_in_range_best_effort<'a>(
  &'a self,
  start_instant: Timestamp,
  end_instant: Timestamp,
) -> ChangesInRangeBestEffort<'a, impl FnMut(BestEffortMap<'a>) -> ChangesItem<'a>> {
  // Guard against a clock jump: BTreeMap::range panics when the range
  // start is after its end, so clamp the start down to the end.
  let mut range_start = start_instant;
  if range_start > end_instant {
    error!(
      "get_changes_in_range_best_effort: Did my clock jump backwards? start={start_instant:?} \
       end={end_instant:?}"
    );
    range_start = end_instant;
  }
  self
    .changes
    .range((Excluded(range_start), Included(end_instant)))
    .map(|(ts, cc)| (*ts, cc))
}
// Iterate over all changes readable under RELIABLE semantics: for each
// writer, the sequence numbers strictly after the reader's last-read SN
// and strictly before that writer's reliably-received bound.
// `use<'a, 'b>` keeps the returned iterator from capturing the shorter
// `'b` borrow of `last_read_sn` in its opaque type.
pub fn get_changes_in_range_reliable<'a, 'b>(
&'a self,
last_read_sn: &'b BTreeMap<GUID, SequenceNumber>,
) -> ChangesInRangeReliable<
'a,
impl FnMut(ReliableFlat<'a>) -> ReliableFlatItem<'a> + use<'a, 'b>,
impl FnMut(ReliableMap<'a>) -> Option<ChangesItem<'a>>,
> {
self
.sequence_numbers
.iter()
.flat_map(|(guid, sn_map)| {
// Writers this reader has never read from start at SN zero.
let lower_bound_exc = last_read_sn
.get(guid)
.cloned()
.unwrap_or(SequenceNumber::zero());
let upper_bound_exc = self.reliable_before(*guid);
// Keep the upper bound above the lower bound so BTreeMap::range below
// cannot panic on an inverted range (equal Excluded bounds would also
// panic; lower.plus_1() guarantees upper > lower).
let upper_bound_exc = max(upper_bound_exc, lower_bound_exc.plus_1());
sn_map.range((Excluded(lower_bound_exc), Excluded(upper_bound_exc)))
}) .filter_map(|(_sn, t)| self.get_change(t).map(|cc| (*t, cc)))
}
/// Unified read entry point: dispatch to the RELIABLE or BEST_EFFORT range
/// reader depending on `reliable`. Both arms are wrapped in the
/// ChangesInRange enum so a single concrete iterator type is returned.
pub fn get_changes_in_range<'a>(
  &'a self,
  reliable: bool,
  latest_instant: Timestamp,
  last_read_sn: &'a BTreeMap<GUID, SequenceNumber>,
) -> impl Iterator<Item = (Timestamp, &'a CacheChange)> {
  if !reliable {
    return ChangesInRange::BestEffort(
      self.get_changes_in_range_best_effort(latest_instant, Timestamp::now()),
    );
  }
  ChangesInRange::Reliable(self.get_changes_in_range_reliable(last_read_sn))
}
fn reliable_before(&self, writer: GUID) -> SequenceNumber {
self
.received_reliably_before
.get(&writer)
.cloned()
.unwrap_or(SequenceNumber::default())
}
/// Remove `cc`'s (writer GUID, sequence number) entry from the secondary
/// index, dropping the per-writer map entirely once it becomes empty.
fn remove_sn(&mut self, cc: &CacheChange) {
  // get_mut + if let replaces the original entry().and_modify() with an
  // out-of-band `emptied` flag: same behavior (a vacant entry stays
  // untouched), but a single straight-line path.
  if let Some(sn_map) = self.sequence_numbers.get_mut(&cc.writer_guid) {
    sn_map.remove(&cc.sequence_number);
    if sn_map.is_empty() {
      self.sequence_numbers.remove(&cc.writer_guid);
    }
  }
}
// Prune old changes and compact retained ones.
//
// Removes at least enough of the oldest samples to respect the
// ResourceLimits cap (max_keep_samples), and additionally any samples
// older than `remove_before`, but never so many that fewer than the
// History-guaranteed depth (min_keep_samples) remain.
// Afterwards, calls reallocate() on changes older than 5 seconds that
// have not been compacted yet, tracked via `changes_reallocated_up_to`.
pub fn remove_changes_before(&mut self, remove_before: Timestamp) {
let sample_count = self.changes.len();
// How many we MUST remove to get under the ResourceLimits cap.
let must_remove_count = sample_count.saturating_sub(self.max_keep_samples as usize);
// How many we MAY remove without violating the History guarantee.
let may_remove_count = match self.min_keep_samples {
History::KeepAll => must_remove_count,
History::KeepLast { depth } => max(
sample_count.saturating_sub(depth as usize),
must_remove_count,
),
};
// First key (in ascending timestamp order) that survives both rules:
// its index is past the mandatory removals, and it is either new enough
// or protected by the History depth.
let oldest_timestamp_to_retain_opt: Option<Timestamp> = self
.changes
.keys()
.enumerate()
.skip_while(|(i, ts)|
*i < must_remove_count
|| (**ts < remove_before && *i < may_remove_count))
.map(|(_, ts)| ts) .next() .copied();
// split_off keeps everything from the survivor onward; if nothing
// survives, the whole map is dropped.
let to_retain = if let Some(split_key) = oldest_timestamp_to_retain_opt {
self.changes.split_off(&split_key)
} else {
BTreeMap::new()
};
let to_remove = std::mem::replace(&mut self.changes, to_retain);
// Keep the sequence-number index consistent with the removals.
to_remove.values().for_each(|r| self.remove_sn(r));
// Compact changes that have been cached for longer than the timeout and
// were not yet reallocated (range is exclusive of the previous mark).
let reallocate_timeout = crate::Duration::from_secs(5);
let now = Timestamp::now();
let reallocate_limit = max(now - reallocate_timeout, self.changes_reallocated_up_to);
self
.changes
.range_mut((
Excluded(self.changes_reallocated_up_to),
Included(reallocate_limit),
))
.for_each(|(_, cc)| cc.reallocate());
self.changes_reallocated_up_to = reallocate_limit;
}
// Name of the topic this cache serves. Returns an owned clone: callers
// depend on the String return type, so it is kept as-is.
pub fn topic_name(&self) -> String {
self.topic_name.clone()
}
}
// Nameable iterator types for the two read paths; the closure type
// parameters are supplied as `impl FnMut` by the methods above.
type ChangesInRangeBestEffort<'a, F> = iter::Map<btree_map::Range<'a, Timestamp, CacheChange>, F>;
type ChangesInRangeReliable<'a, F1, F2> = iter::FilterMap<
iter::FlatMap<
btree_map::Iter<'a, GUID, BTreeMap<SequenceNumber, Timestamp>>,
btree_map::Range<'a, SequenceNumber, Timestamp>,
F1,
>,
F2,
>;
// Shorthands for the closure argument/result tuples in those chains.
type BestEffortMap<'a> = (&'a Timestamp, &'a CacheChange);
type ReliableMap<'a> = (&'a SequenceNumber, &'a Timestamp);
type ReliableFlat<'a> = (&'a GUID, &'a BTreeMap<SequenceNumber, Timestamp>);
type ReliableFlatItem<'a> = btree_map::Range<'a, SequenceNumber, Timestamp>;
// Common item type yielded to callers: owned timestamp + borrowed change.
type ChangesItem<'a> = (Timestamp, &'a CacheChange);
// Either-iterator unifying the best-effort and reliable read paths into a
// single concrete type, so get_changes_in_range can return one iterator.
pub(crate) enum ChangesInRange<'a, F, F1, F2> {
BestEffort(ChangesInRangeBestEffort<'a, F>),
Reliable(ChangesInRangeReliable<'a, F1, F2>),
}
impl<'a, F, F1, F2> Iterator for ChangesInRange<'a, F, F1, F2>
where
  F: FnMut(BestEffortMap<'a>) -> ChangesItem<'a>,
  F1: FnMut(ReliableFlat<'a>) -> ReliableFlatItem<'a>,
  F2: FnMut(ReliableMap<'a>) -> Option<ChangesItem<'a>>,
{
  type Item = ChangesItem<'a>;

  // Delegate to whichever underlying iterator this variant wraps.
  fn next(&mut self) -> Option<Self::Item> {
    match self {
      Self::BestEffort(inner) => inner.next(),
      Self::Reliable(inner) => inner.next(),
    }
  }
}
#[cfg(test)]
mod tests {
use std::{
sync::{Arc, RwLock},
thread,
};
use super::DDSCache;
use crate::{
dds::{
ddsdata::DDSData, qos::QosPolicies, typedesc::TypeDesc, with_key::datawriter::WriteOptions,
},
messages::submessages::elements::serialized_payload::SerializedPayload,
structure::{cache_change::CacheChange, guid::GUID, sequence_number::SequenceNumber},
};
// Smoke test: create a cache and topic, insert one change from this thread
// and two more from a spawned thread (through a cloned handle), then check
// that all three are visible via the best-effort range read.
#[test]
fn create_dds_cache_and_topic_cache() {
let dds_cache = Arc::new(RwLock::new(DDSCache::new()));
let topic_name = String::from("ImJustATopic");
let qos = QosPolicies::qos_none();
let topic_cache_handle = dds_cache.write().unwrap().add_new_topic(
topic_name,
TypeDesc::new("IDontKnowIfThisIsNecessary".to_string()),
&qos,
);
// Distinct sequence numbers, so none is rejected as a duplicate.
let change1 = CacheChange::new(
GUID::GUID_UNKNOWN,
SequenceNumber::new(1),
WriteOptions::default(),
DDSData::new(SerializedPayload::default()),
);
topic_cache_handle
.lock()
.unwrap()
.add_change(&crate::Timestamp::now(), change1);
// Exercise Arc<Mutex<..>> sharing across threads; join() makes the
// inserts sequential, so the test stays deterministic.
let topic_cache_handle2 = topic_cache_handle.clone();
thread::spawn(move || {
let change2 = CacheChange::new(
GUID::GUID_UNKNOWN,
SequenceNumber::new(2),
WriteOptions::default(),
DDSData::new(SerializedPayload::default()),
);
let change3 = CacheChange::new(
GUID::GUID_UNKNOWN,
SequenceNumber::new(3),
WriteOptions::default(),
DDSData::new(SerializedPayload::default()),
);
topic_cache_handle2
.lock()
.unwrap()
.add_change(&crate::Timestamp::now(), change2);
topic_cache_handle2
.lock()
.unwrap()
.add_change(&crate::Timestamp::now(), change3);
})
.join()
.unwrap();
// All three changes fall within (now - 23 s, now].
assert_eq!(
topic_cache_handle
.lock()
.unwrap()
.get_changes_in_range_best_effort(
crate::Timestamp::now() - crate::Duration::from_secs(23),
crate::Timestamp::now()
)
.count(),
3
);
}
}