use std::{
cmp::max,
collections::{BTreeMap, HashMap},
ops::Bound::{Excluded, Included},
};
#[allow(unused_imports)]
use log::{debug, error, info, trace};
use crate::{
dds::{
data_types::GUID,
qos::{policy::ResourceLimits, QosPolicies, QosPolicyBuilder},
typedesc::TypeDesc,
},
structure::{sequence_number::SequenceNumber, time::Timestamp},
};
use super::cache_change::CacheChange;
/// Top-level cache that owns one [`TopicCache`] per topic, looked up by
/// topic name.
#[derive(Debug, Default)]
pub struct DDSCache {
// Per-topic caches, keyed by topic name.
topic_caches: HashMap<String, TopicCache>,
}
impl DDSCache {
  /// Creates an empty cache containing no topics.
  pub fn new() -> Self {
    Self::default()
  }

  /// Registers `topic_name` with a fresh, empty [`TopicCache`].
  ///
  /// If a topic with this name is already present, the existing cache is kept
  /// unchanged and `topic_data_type` is discarded.
  pub fn add_new_topic(&mut self, topic_name: String, topic_data_type: TypeDesc) {
    self
      .topic_caches
      .entry(topic_name.clone())
      .or_insert_with(|| TopicCache::new(topic_name, topic_data_type));
  }

  /// Drops the cache of `topic_name` together with everything stored in it.
  /// Does nothing if the topic is unknown.
  #[allow(dead_code)]
  pub fn remove_topic(&mut self, topic_name: &str) {
    // `HashMap::remove` already copes with a missing key, so a preceding
    // `contains_key` lookup would only hash the name twice.
    self.topic_caches.remove(topic_name);
  }

  /// Looks up the change stored under exactly `instant` in `topic_name`.
  ///
  /// Returns `None` if the topic is unknown or no change has that timestamp.
  pub fn topic_get_change(&self, topic_name: &str, instant: &Timestamp) -> Option<&CacheChange> {
    self
      .topic_caches
      .get(topic_name)
      .and_then(|tc| tc.get_change(instant))
  }

  /// Removes and returns the change stored under exactly `instant` in
  /// `topic_name`.
  ///
  /// Returns `None` if there is no such change. An unknown topic is
  /// additionally reported through the error log.
  pub fn topic_remove_change(
    &mut self,
    topic_name: &str,
    instant: &Timestamp,
  ) -> Option<CacheChange> {
    match self.topic_caches.get_mut(topic_name) {
      Some(tc) => tc.remove_change(instant),
      None => {
        error!(
          "topic_remove_change: Topic {:?} is not in DDSCache",
          topic_name
        );
        None
      }
    }
  }

  /// Asks the cache of `topic_name` to discard the changes older than
  /// `instant` (see [`TopicCache::remove_changes_before`] for the exact
  /// cut-off rule). An unknown topic is reported through the error log.
  pub fn topic_remove_before(&mut self, topic_name: &str, instant: Timestamp) {
    match self.topic_caches.get_mut(topic_name) {
      Some(tc) => tc.remove_changes_before(instant),
      None => {
        error!(
          "topic_remove_before: topic: {:?} is not in DDSCache",
          topic_name
        );
      }
    }
  }

  /// Iterates over the changes of `topic_name` whose timestamp lies in the
  /// half-open interval `(start_instant, end_instant]`, in timestamp order.
  ///
  /// An unknown topic yields an empty iterator.
  pub fn topic_get_changes_in_range(
    &self,
    topic_name: &str,
    start_instant: &Timestamp,
    end_instant: &Timestamp,
  ) -> Box<dyn Iterator<Item = (Timestamp, &CacheChange)> + '_> {
    match self.topic_caches.get(topic_name) {
      // The topic cache already returns a boxed iterator; pass it through
      // instead of wrapping it in a second `Box`.
      Some(tc) => tc.get_changes_in_range(start_instant, end_instant),
      // `iter::empty` needs no allocation, unlike an empty `Vec`.
      None => Box::new(std::iter::empty()),
    }
  }

  /// Stores `cache_change` under `instant` in the cache of `topic_name`.
  ///
  /// If the topic is unknown, an error is logged and the change is dropped.
  pub fn add_change(&mut self, topic_name: &str, instant: &Timestamp, cache_change: CacheChange) {
    match self.topic_caches.get_mut(topic_name) {
      Some(tc) => tc.add_change(instant, cache_change),
      None => {
        error!(
          "to_topic_add_change: Topic: {:?} is not in DDSCache",
          topic_name
        );
      }
    }
  }
}
/// Cache state for a single topic: its name, type description, QoS policies
/// and the changes that have been added to it.
#[derive(Debug)]
pub struct TopicCache {
/// Name of the topic; included in log messages emitted by this type.
topic_name: String,
/// Type description supplied at construction. The `dead_code` allowance
/// indicates it is currently stored without being read.
#[allow(dead_code)] topic_data_type: TypeDesc,
/// QoS policies of the topic; `remove_changes_before` reads the resource
/// limits from here.
topic_qos: QosPolicies,
/// The changes stored for this topic.
history_cache: DDSHistoryCache,
}
impl TopicCache {
  /// Creates an empty cache for `topic_name`.
  ///
  /// The QoS policies are whatever a fresh `QosPolicyBuilder` builds, i.e. no
  /// policy is set explicitly.
  pub fn new(topic_name: String, topic_data_type: TypeDesc) -> Self {
    Self {
      topic_name,
      topic_data_type,
      topic_qos: QosPolicyBuilder::new().build(),
      history_cache: DDSHistoryCache::new(),
    }
  }

  /// Returns the change stored under exactly `instant`, if any.
  pub fn get_change(&self, instant: &Timestamp) -> Option<&CacheChange> {
    self.history_cache.get_change(instant)
  }

  /// Stores `cache_change` under `instant`.
  ///
  /// This method has no return value: whenever the history cache hands a
  /// change back, it is only written to the debug log and then dropped.
  pub fn add_change(&mut self, instant: &Timestamp, cache_change: CacheChange) {
    // The history cache returns a change when it rejects a duplicate or when
    // it displaces an entry that was stored under the same timestamp.
    if let Some(cc_back) = self.history_cache.add_change(instant, cache_change) {
      debug!(
        "DDSCache insert failed topic={:?} cache_change={:?}",
        self.topic_name, cc_back
      );
    }
  }

  /// Iterates over the stored changes whose timestamp lies in the half-open
  /// interval `(start_instant, end_instant]`, in timestamp order.
  pub fn get_changes_in_range(
    &self,
    start_instant: &Timestamp,
    end_instant: &Timestamp,
  ) -> Box<dyn Iterator<Item = (Timestamp, &CacheChange)> + '_> {
    self
      .history_cache
      .get_range_of_changes(start_instant, end_instant)
  }

  /// Removes and returns the change stored under exactly `instant`, if any.
  pub fn remove_change(&mut self, instant: &Timestamp) -> Option<CacheChange> {
    self.history_cache.remove_change(instant)
  }

  /// Discards old changes.
  ///
  /// Every change with a timestamp strictly before `instant` is removed. In
  /// addition, if more than `max_samples` changes are stored, the oldest
  /// surplus ones are removed too, even when they are not older than
  /// `instant`. `max_samples` is taken from the topic's resource limits and
  /// falls back to 1024 when no resource limits are configured.
  pub fn remove_changes_before(&mut self, instant: Timestamp) {
    let max_keep_samples = self
      .topic_qos
      .resource_limits()
      .unwrap_or(ResourceLimits {
        max_samples: 1024,
        max_instances: 1024,
        max_samples_per_instance: 64,
      })
      .max_samples;

    // Widen before subtracting so that a large `len()` cannot be truncated.
    // NOTE(review): a negative `max_samples` would make the surplus exceed the
    // number of stored changes, so the cut-off would land on the newest key —
    // confirm whether negative limits can occur here.
    let remove_count = self.history_cache.changes.len() as i64 - max_keep_samples as i64;
    let surplus = max(0, remove_count) as usize;

    // The key at index `surplus` is the oldest change that may stay under the
    // sample limit (or the newest key if fewer are stored); never cut earlier
    // than the requested `instant`.
    let split_key = self
      .history_cache
      .changes
      .keys()
      .take(surplus.saturating_add(1))
      .last()
      .map_or(instant, |lim| max(*lim, instant));

    self.history_cache.remove_changes_before(split_key);
  }
}
/// Store of cache changes ordered by timestamp, with a secondary index by
/// writer GUID and sequence number.
#[derive(Debug, Default)]
pub struct DDSHistoryCache {
/// Primary storage: changes keyed, and therefore ordered, by timestamp.
pub(crate) changes: BTreeMap<Timestamp, CacheChange>,
/// Secondary index: writer GUID -> sequence number -> timestamp key in
/// `changes`. Consulted on insert to detect a repeated
/// (writer, sequence number) pair.
sequence_numbers: BTreeMap<GUID, BTreeMap<SequenceNumber, Timestamp>>,
}
impl DDSHistoryCache {
pub fn new() -> Self {
Self::default()
}
fn find_by_sn(&self, cc: &CacheChange) -> Option<Timestamp> {
self
.sequence_numbers
.get(&cc.writer_guid)
.and_then(|snm| snm.get(&cc.sequence_number))
.copied()
}
fn insert_sn(&mut self, instant: Timestamp, cc: &CacheChange) {
self
.sequence_numbers
.entry(cc.writer_guid)
.or_insert_with(BTreeMap::new)
.insert(cc.sequence_number, instant);
}
fn remove_sn(&mut self, cc: &CacheChange) {
let mut emptied = false;
self.sequence_numbers.entry(cc.writer_guid).and_modify(|s| {
s.remove(&cc.sequence_number);
emptied = s.is_empty();
});
if emptied {
self.sequence_numbers.remove(&cc.writer_guid);
}
}
pub fn add_change(
&mut self,
instant: &Timestamp,
cache_change: CacheChange,
) -> Option<CacheChange> {
if let Some(old_instant) = self.find_by_sn(&cache_change) {
debug!(
"add_change: discarding duplicate {:?} from {:?}. old timestamp = {:?}, new = {:?}",
cache_change.sequence_number, cache_change.writer_guid, old_instant, instant,
);
Some(cache_change)
} else {
self.insert_sn(*instant, &cache_change);
self.changes.insert(*instant, cache_change).map(|old_cc| {
error!(
"DDSHistoryCache already contained element with key {:?} !!!",
instant
);
self.remove_sn(&old_cc);
old_cc
})
}
}
pub fn get_change(&self, instant: &Timestamp) -> Option<&CacheChange> {
self.changes.get(instant)
}
pub fn get_range_of_changes(
&self,
start_instant: &Timestamp,
end_instant: &Timestamp,
) -> Box<dyn Iterator<Item = (Timestamp, &CacheChange)> + '_> {
Box::new(
self
.changes
.range((Excluded(start_instant), Included(end_instant)))
.map(|(i, c)| (*i, c)),
)
}
pub fn remove_change(&mut self, instant: &Timestamp) -> Option<CacheChange> {
self.changes.remove(instant).map(|cc| {
self.remove_sn(&cc);
cc
})
}
pub fn remove_changes_before(&mut self, instant: Timestamp) {
let to_retain = self.changes.split_off(&instant);
let to_remove = std::mem::replace(&mut self.changes, to_retain);
for r in to_remove.values() {
self.remove_sn(r);
}
}
}
#[cfg(test)]
mod tests {
  use std::{
    sync::{Arc, RwLock},
    thread,
  };

  use super::DDSCache;
  use crate::{
    dds::{ddsdata::DDSData, typedesc::TypeDesc, with_key::datawriter::WriteOptions},
    messages::submessages::submessage_elements::serialized_payload::SerializedPayload,
    structure::{cache_change::CacheChange, guid::GUID, sequence_number::SequenceNumber},
  };

  /// Adds one change from the test thread and two more from a spawned
  /// thread through a shared `Arc<RwLock<DDSCache>>`, then checks that a
  /// range query covering the last 23 seconds sees all three.
  #[test]
  fn create_dds_cache() {
    let shared_cache = Arc::new(RwLock::new(DDSCache::new()));
    let topic_name = String::from("ImJustATopic");

    shared_cache.write().unwrap().add_new_topic(
      topic_name.clone(),
      TypeDesc::new("IDontKnowIfThisIsNecessary".to_string()),
    );

    // First change, written from this thread.
    let first_change = CacheChange::new(
      GUID::GUID_UNKNOWN,
      SequenceNumber::new(1),
      WriteOptions::default(),
      DDSData::new(SerializedPayload::default()),
    );
    shared_cache
      .write()
      .unwrap()
      .add_change(&topic_name, &crate::Timestamp::now(), first_change);

    // Second and third change, written from another thread that shares the
    // cache. The write lock is taken separately for each insertion.
    let cache_for_writer = Arc::clone(&shared_cache);
    let writer = thread::spawn(move || {
      let writer_topic = String::from("ImJustATopic");

      let second_change = CacheChange::new(
        GUID::GUID_UNKNOWN,
        SequenceNumber::new(2),
        WriteOptions::default(),
        DDSData::new(SerializedPayload::default()),
      );
      cache_for_writer.write().unwrap().add_change(
        &writer_topic,
        &crate::Timestamp::now(),
        second_change,
      );

      let third_change = CacheChange::new(
        GUID::GUID_UNKNOWN,
        SequenceNumber::new(3),
        WriteOptions::default(),
        DDSData::new(SerializedPayload::default()),
      );
      cache_for_writer.write().unwrap().add_change(
        &writer_topic,
        &crate::Timestamp::now(),
        third_change,
      );
    });
    writer.join().unwrap();

    // Exact-timestamp lookup; the result is intentionally ignored, the call
    // only has to complete without panicking.
    shared_cache
      .read()
      .unwrap()
      .topic_get_change(&topic_name, &crate::Timestamp::now());

    let window_start = crate::Timestamp::now() - crate::Duration::from_secs(23);
    let window_end = crate::Timestamp::now();
    let stored = shared_cache
      .read()
      .unwrap()
      .topic_get_changes_in_range(&topic_name, &window_start, &window_end)
      .count();
    assert_eq!(stored, 3);
  }
}