use std::{
cmp::max,
collections::{BTreeMap, HashMap},
ops::Bound::{Excluded, Included},
sync::{Arc, Mutex},
};
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
use crate::{
create_error_internal,
dds::{
qos::{
policy::{History, ResourceLimits},
QosPolicies,
},
typedesc::TypeDesc,
CreateError, CreateResult,
},
structure::{sequence_number::SequenceNumber, time::Timestamp},
GUID,
};
use super::cache_change::CacheChange;
#[derive(Debug, Default)]
pub struct DDSCache {
topic_caches: HashMap<String, Arc<Mutex<TopicCache>>>,
}
impl DDSCache {
pub fn new() -> Self {
Self::default()
}
pub(crate) fn add_new_topic(
&mut self,
topic_name: String,
topic_data_type: TypeDesc,
qos: &QosPolicies,
) -> Arc<Mutex<TopicCache>> {
let topic_cache_handle = self
.topic_caches
.entry(topic_name.clone())
.and_modify(|tc| tc.lock().unwrap().update_keep_limits(qos))
.or_insert(Arc::new(Mutex::new(TopicCache::new(
topic_name,
topic_data_type,
qos,
))));
topic_cache_handle.clone()
}
pub(crate) fn get_existing_topic_cache(
&self,
topic_name: &str,
) -> CreateResult<Arc<Mutex<TopicCache>>> {
match self.topic_caches.get(topic_name) {
Some(tc) => Ok(tc.clone()),
None => create_error_internal!("Topic cache for topic {topic_name} not found in DDS cache"),
}
}
#[allow(dead_code)]
pub fn remove_topic(&mut self, topic_name: &str) {
if self.topic_caches.contains_key(topic_name) {
self.topic_caches.remove(topic_name);
}
}
pub fn garbage_collect(&mut self) {
for tc in self.topic_caches.values_mut() {
let mut tc = tc.lock().unwrap();
if let Some((last_timestamp, _)) = tc.changes.iter().next_back() {
if *last_timestamp > tc.changes_reallocated_up_to {
tc.remove_changes_before(Timestamp::ZERO);
}
}
}
}
}
#[derive(Debug)]
pub(crate) struct TopicCache {
topic_name: String,
#[allow(dead_code)] topic_data_type: TypeDesc,
#[allow(dead_code)]
topic_qos: QosPolicies,
min_keep_samples: History,
max_keep_samples: i32,
changes: BTreeMap<Timestamp, CacheChange>,
changes_reallocated_up_to: Timestamp,
sequence_numbers: BTreeMap<GUID, BTreeMap<SequenceNumber, Timestamp>>,
received_reliably_before: BTreeMap<GUID, SequenceNumber>,
}
impl TopicCache {
pub fn new(topic_name: String, topic_data_type: TypeDesc, topic_qos: &QosPolicies) -> Self {
let mut new_self = Self {
topic_name,
topic_data_type,
topic_qos: topic_qos.clone(),
min_keep_samples: History::KeepLast { depth: 1 },
max_keep_samples: 1, changes: BTreeMap::new(),
changes_reallocated_up_to: Timestamp::ZERO,
sequence_numbers: BTreeMap::new(),
received_reliably_before: BTreeMap::new(),
};
new_self.update_keep_limits(topic_qos);
new_self
}
pub fn update_keep_limits(&mut self, qos: &QosPolicies) {
let min_keep_samples = qos
.history()
.unwrap_or(History::KeepLast { depth: 1 });
let max_keep_samples = qos
.resource_limits()
.unwrap_or(ResourceLimits {
max_samples: 64,
max_instances: 64,
max_samples_per_instance: 64,
})
.max_samples;
let max_keep_samples = match min_keep_samples {
History::KeepLast { depth: n } if n > max_keep_samples => n,
_ => max_keep_samples,
};
self.min_keep_samples = max(min_keep_samples, self.min_keep_samples);
self.max_keep_samples = max(max_keep_samples, self.max_keep_samples);
}
pub fn mark_reliably_received_before(&mut self, writer: GUID, sn: SequenceNumber) -> bool {
let prev_sn = self.received_reliably_before.insert(writer, sn);
prev_sn.unwrap_or(SequenceNumber::new(1)) < sn
}
pub fn get_change(&self, instant: &Timestamp) -> Option<&CacheChange> {
self.changes.get(instant)
}
pub fn add_change(&mut self, instant: &Timestamp, cache_change: CacheChange) {
self
.add_change_internal(instant, cache_change)
.map(|cc_back| {
warn!(
"DDSCache insert failed topic={:?} cache_change={:?}",
self.topic_name, cc_back
);
});
}
fn add_change_internal(
&mut self,
instant: &Timestamp,
cache_change: CacheChange,
) -> Option<CacheChange> {
let semi_random_number = i64::from(cache_change.sequence_number) as usize;
let modulus = 64;
if modulus == 0 || semi_random_number % modulus == 0 {
debug!("Garbage collecting topic {}", self.topic_name);
self.remove_changes_before(Timestamp::ZERO);
}
if let Some(old_instant) = self.find_by_sn(&cache_change) {
trace!(
"add_change: discarding duplicate {:?} from {:?}. old timestamp = {:?}, new = {:?}",
cache_change.sequence_number,
cache_change.writer_guid,
old_instant,
instant,
);
None
} else {
self.insert_sn(*instant, &cache_change);
self.changes.insert(*instant, cache_change).map(|old_cc| {
error!(
"DDSHistoryCache already contained element with key {:?} !!!",
instant
);
self.remove_sn(&old_cc);
old_cc
})
}
}
fn find_by_sn(&self, cc: &CacheChange) -> Option<Timestamp> {
self
.sequence_numbers
.get(&cc.writer_guid)
.and_then(|snm| snm.get(&cc.sequence_number))
.copied()
}
fn insert_sn(&mut self, instant: Timestamp, cc: &CacheChange) {
self
.sequence_numbers
.entry(cc.writer_guid)
.or_default()
.insert(cc.sequence_number, instant);
}
pub fn get_changes_in_range_best_effort(
&self,
start_instant: Timestamp,
end_instant: Timestamp,
) -> Box<dyn Iterator<Item = (Timestamp, &CacheChange)> + '_> {
Box::new(
self
.changes
.range((Excluded(start_instant), Included(end_instant)))
.map(|(i, c)| (*i, c)),
)
}
pub fn get_changes_in_range_reliable<'a>(
&'a self,
last_read_sn: &'a BTreeMap<GUID, SequenceNumber>,
) -> Box<dyn Iterator<Item = (Timestamp, &CacheChange)> + 'a> {
Box::new(
self
.sequence_numbers
.iter()
.flat_map(|(guid, sn_map)| {
let lower_bound_exc = last_read_sn
.get(guid)
.cloned()
.unwrap_or(SequenceNumber::zero());
let upper_bound_exc = self.reliable_before(*guid);
let upper_bound_exc = max(upper_bound_exc, lower_bound_exc.plus_1());
sn_map.range((Excluded(lower_bound_exc), Excluded(upper_bound_exc)))
}) .filter_map(|(_sn, t)| self.get_change(t).map(|cc| (*t, cc))),
)
}
fn reliable_before(&self, writer: GUID) -> SequenceNumber {
self
.received_reliably_before
.get(&writer)
.cloned()
.unwrap_or(SequenceNumber::default())
}
fn remove_sn(&mut self, cc: &CacheChange) {
let mut emptied = false;
self.sequence_numbers.entry(cc.writer_guid).and_modify(|s| {
s.remove(&cc.sequence_number);
emptied = s.is_empty();
});
if emptied {
self.sequence_numbers.remove(&cc.writer_guid);
}
}
pub fn remove_changes_before(&mut self, remove_before: Timestamp) {
let sample_count = self.changes.len();
let must_remove_count = sample_count.saturating_sub(self.max_keep_samples as usize);
let may_remove_count = match self.min_keep_samples {
History::KeepAll => must_remove_count,
History::KeepLast { depth } => max(
sample_count.saturating_sub(depth as usize),
must_remove_count,
),
};
let oldest_timestamp_to_retain_opt: Option<Timestamp> = self
.changes
.keys()
.enumerate()
.skip_while(|(i, ts)|
*i < must_remove_count
|| (**ts < remove_before && *i < may_remove_count))
.map(|(_, ts)| ts) .next() .copied();
let to_retain = if let Some(split_key) = oldest_timestamp_to_retain_opt {
self.changes.split_off(&split_key)
} else {
BTreeMap::new()
};
let to_remove = std::mem::replace(&mut self.changes, to_retain);
to_remove.values().for_each(|r| self.remove_sn(r));
let reallocate_timeout = crate::Duration::from_secs(5);
let now = Timestamp::now();
let reallocate_limit = now - reallocate_timeout;
self
.changes
.range_mut((
Excluded(self.changes_reallocated_up_to),
Included(reallocate_limit),
))
.for_each(|(_, cc)| cc.reallocate());
self.changes_reallocated_up_to = reallocate_limit;
}
pub fn topic_name(&self) -> String {
self.topic_name.clone()
}
}
#[cfg(test)]
mod tests {
use std::{
sync::{Arc, RwLock},
thread,
};
use super::DDSCache;
use crate::{
dds::{
ddsdata::DDSData, qos::QosPolicies, typedesc::TypeDesc, with_key::datawriter::WriteOptions,
},
messages::submessages::elements::serialized_payload::SerializedPayload,
structure::{cache_change::CacheChange, guid::GUID, sequence_number::SequenceNumber},
};
#[test]
fn create_dds_cache_and_topic_cache() {
let dds_cache = Arc::new(RwLock::new(DDSCache::new()));
let topic_name = String::from("ImJustATopic");
let qos = QosPolicies::qos_none();
let topic_cache_handle = dds_cache.write().unwrap().add_new_topic(
topic_name,
TypeDesc::new("IDontKnowIfThisIsNecessary".to_string()),
&qos,
);
let change1 = CacheChange::new(
GUID::GUID_UNKNOWN,
SequenceNumber::new(1),
WriteOptions::default(),
DDSData::new(SerializedPayload::default()),
);
topic_cache_handle
.lock()
.unwrap()
.add_change(&crate::Timestamp::now(), change1);
let topic_cache_handle2 = topic_cache_handle.clone();
thread::spawn(move || {
let change2 = CacheChange::new(
GUID::GUID_UNKNOWN,
SequenceNumber::new(2),
WriteOptions::default(),
DDSData::new(SerializedPayload::default()),
);
let change3 = CacheChange::new(
GUID::GUID_UNKNOWN,
SequenceNumber::new(3),
WriteOptions::default(),
DDSData::new(SerializedPayload::default()),
);
topic_cache_handle2
.lock()
.unwrap()
.add_change(&crate::Timestamp::now(), change2);
topic_cache_handle2
.lock()
.unwrap()
.add_change(&crate::Timestamp::now(), change3);
})
.join()
.unwrap();
assert_eq!(
topic_cache_handle
.lock()
.unwrap()
.get_changes_in_range_best_effort(
crate::Timestamp::now() - crate::Duration::from_secs(23),
crate::Timestamp::now()
)
.count(),
3
);
}
}