use crate::structure::{time::Timestamp, guid::GUID};
use crate::{
dds::traits::key::{Key, Keyed},
};
use crate::dds::with_key::datasample::DataSample;
use crate::dds::sampleinfo::*;
use crate::dds::qos::QosPolicies;
use crate::dds::qos::policy;
use crate::dds::readcondition::ReadCondition;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::ops::Bound::*;
/// Converts `&Result<T, E>` into `Result<&T, E>` by borrowing the `Ok`
/// value and cloning the `Err` value.
///
/// Used when handing out borrowed sample data while still giving the
/// caller an owned copy of the (cheaply clonable) key in the `Err` case.
pub(crate) fn result_ok_as_ref_err_clone<T, E: Clone>(
  r: &std::result::Result<T, E>,
) -> std::result::Result<&T, E> {
  // `as_ref` borrows both variants; only the error side needs an owned clone.
  r.as_ref().map_err(E::clone)
}
/// Reader-side cache of received data samples for one data type `D`.
///
/// Samples are stored ordered (and uniquely keyed) by receive timestamp;
/// per-instance bookkeeping is kept separately in `instance_map`.
pub struct DataSampleCache<D: Keyed> {
// QoS policies; History / ResourceLimits control per-instance pruning.
qos: QosPolicies,
// All cached samples, keyed by receive timestamp. Duplicate receive
// timestamps are not allowed (insertion panics in `add_sample`).
datasamples: BTreeMap<Timestamp, SampleWithMetaData<D>>,
// Per-instance metadata, keyed by the sample key.
pub(crate) instance_map: BTreeMap<D::K, InstanceMetaData>,
// Maps a key's hash (from `Key::into_hash_key`) back to the key itself.
hash_to_key_map: BTreeMap<u128, D::K>,
}
/// Bookkeeping for a single instance (one key value) in the cache.
pub(crate) struct InstanceMetaData {
// Receive timestamps of all cached samples belonging to this instance.
instance_samples: BTreeSet<Timestamp>,
// Current instance state (Alive / NotAlive_Disposed / NotAlive_NoWriters).
instance_state: InstanceState,
// Highest generation counts reached by this instance so far.
latest_generation_available: NotAliveGenerationCounts,
// Generation counts at the last read/take access. Initialized to
// `sub_zero()` so that even generation 0 compares as "new" in
// `sample_selector` until the instance is first accessed.
last_generation_accessed: NotAliveGenerationCounts,
}
/// One cached sample together with its reader-side metadata.
struct SampleWithMetaData<D: Keyed> {
// Instance generation counts in effect when this sample was added.
generation_counts: NotAliveGenerationCounts,
// GUID of the writer that produced the sample.
writer_guid: GUID,
// Source (writer-side) timestamp, if one was provided.
source_timestamp: Option<Timestamp>,
// Set once the sample has been accessed by a read (not take) operation.
sample_has_been_read: bool,
// `Ok(data)` for an actual data sample, `Err(key)` for a dispose marker.
sample: Result<D, D::K>,
}
impl<D> SampleWithMetaData<D>
where
  D: Keyed,
  <D as Keyed>::K: Key,
{
  /// Returns the key of this sample: extracted from the payload when the
  /// sample carries data, or an owned copy of the stored key for a
  /// dispose marker.
  pub fn get_key(&self) -> D::K {
    self
      .sample
      .as_ref()
      .map_or_else(|key| key.clone(), |data| data.get_key())
  }
}
impl<D> DataSampleCache<D>
where
D: Keyed,
<D as Keyed>::K: Key,
{
/// Creates an empty cache governed by the given QoS policies.
pub fn new(qos: QosPolicies) -> DataSampleCache<D> {
DataSampleCache {
qos,
datasamples: BTreeMap::new(),
instance_map: BTreeMap::new(),
hash_to_key_map: BTreeMap::new(),
}
}
/// Inserts a new sample into the cache.
///
/// `new_sample` is `Ok(data)` for a live sample or `Err(key)` for a
/// dispose marker; a dispose moves the instance to `NotAlive_Disposed`.
///
/// Panics if `receive_timestamp` duplicates an already-cached sample's
/// timestamp. After insertion, the oldest samples of this instance are
/// pruned according to History QoS (default when unset: keep 1), or —
/// for KeepAll history — ResourceLimits' `max_samples_per_instance`.
pub fn add_sample(
&mut self,
new_sample: Result<D, D::K>,
writer_guid: GUID,
receive_timestamp: Timestamp,
source_timestamp: Option<Timestamp>,
) {
let instance_key = match &new_sample {
Ok(d) => d.get_key(),
Err(k) => k.clone(),
};
let new_instance_state = match new_sample {
Ok(_) => InstanceState::Alive,
Err(_) => InstanceState::NotAlive_Disposed,
};
// Find this instance's metadata, creating it on first sight of the key.
let instance_metadata = match self.instance_map.get_mut(&instance_key) {
Some(imd) => imd,
None => {
let imd = InstanceMetaData {
instance_samples: BTreeSet::new(),
instance_state: new_instance_state,
latest_generation_available: NotAliveGenerationCounts::zero(),
// sub_zero: generation 0 must still count as "new" on first access.
last_generation_accessed: NotAliveGenerationCounts::sub_zero(),
};
self.instance_map.insert(instance_key.clone(), imd);
// Remember the hash -> key mapping so `get_key_by_hash` can resolve it.
self
.hash_to_key_map
.insert(instance_key.into_hash_key(), instance_key.clone());
self.instance_map.get_mut(&instance_key).unwrap()
}
};
instance_metadata
.instance_samples
.insert(receive_timestamp.clone());
// Generation counting: a NotAlive -> Alive transition bumps the counter
// matching the way the instance previously became NotAlive. All other
// state pairs leave the counters untouched.
match (instance_metadata.instance_state, new_instance_state) {
(InstanceState::Alive, _) => (),
(InstanceState::NotAlive_Disposed, InstanceState::Alive) =>
{
instance_metadata
.latest_generation_available
.disposed_generation_count += 1
}
(InstanceState::NotAlive_Disposed, _) => (),
(InstanceState::NotAlive_NoWriters, InstanceState::Alive) =>
{
instance_metadata
.latest_generation_available
.no_writers_generation_count += 1
}
(InstanceState::NotAlive_NoWriters, _) => (),
}
instance_metadata.instance_state = new_instance_state;
// Store the sample, stamped with the instance's current generation counts.
// A duplicate receive timestamp is a broken invariant -> panic.
self
.datasamples
.insert(
receive_timestamp,
SampleWithMetaData {
generation_counts: instance_metadata.latest_generation_available,
writer_guid,
source_timestamp,
sample_has_been_read: false,
sample: new_sample,
},
)
.map_or_else(
|| (),
|_already_existed| {
panic!(
"Tried to add duplicate datasample with the same key {:?}",
receive_timestamp
)
},
);
// How many samples to keep per instance: History QoS decides; an absent
// History policy defaults to keeping 1; KeepAll yields None here.
let sample_keep_history_limit: Option<i32> = match self.qos.history() {
Some(policy::History::KeepAll) => None,
Some(policy::History::KeepLast { depth }) => Some(depth),
None => Some(1),
};
// Fallback limit used only when history is KeepAll (see `.or` below).
let sample_keep_resource_limit = if let Some(policy::ResourceLimits {
max_samples: _,
max_instances: _,
max_samples_per_instance,
}) = self.qos.resource_limits
{
Some(max_samples_per_instance)
} else {
None
};
if let Some(instance_keep_count) = sample_keep_history_limit.or(sample_keep_resource_limit) {
let remove_count = instance_metadata.instance_samples.len() as i32 - instance_keep_count;
if remove_count > 0 {
// BTreeSet iterates in ascending timestamp order, so `take` selects
// the oldest samples for removal.
let keys_to_remove: Vec<_> = instance_metadata
.instance_samples
.iter()
.take(remove_count as usize)
.cloned()
.collect();
for k in keys_to_remove {
instance_metadata.instance_samples.remove(&k);
self.datasamples.remove(&k);
}
}
}
}
/// Returns `(receive timestamp, key)` for every cached sample matching
/// the ReadCondition, in receive-timestamp order.
pub fn select_keys_for_access(&self, rc: ReadCondition) -> Vec<(Timestamp, D::K)> {
self
.datasamples
.iter()
.filter_map(|(ts, dsm)| {
let key = dsm.get_key();
if self.sample_selector(&rc, self.instance_map.get(&key).unwrap(), &dsm) {
Some((ts.clone(), key.clone()))
} else {
None
}
})
.collect()
}
/// Like `select_keys_for_access`, but restricted to one instance.
/// Returns an empty Vec if the instance is not known.
pub fn select_instance_keys_for_access(
&self,
instance: D::K,
rc: ReadCondition,
) -> Vec<(Timestamp, D::K)> {
match self.instance_map.get(&instance) {
None => Vec::new(),
Some(imd) => imd
.instance_samples
.iter()
.filter_map(|ts| {
if let Some(ds) = self.datasamples.get(&ts) {
if self.sample_selector(&rc, &imd, ds) {
Some((ts.clone(), instance.clone()))
} else {
None
}
} else {
None
}
})
.collect(),
}
}
/// Tests one sample against a ReadCondition's sample-state, view-state
/// and instance-state masks. A sample is `New` when its generation total
/// exceeds the instance's last-accessed generation total.
fn sample_selector(
&self,
rc: &ReadCondition,
imd: &InstanceMetaData,
d: &SampleWithMetaData<D>,
) -> bool {
(*rc.sample_state_mask() == SampleState::any()
|| rc.sample_state_mask()
.contains( if d.sample_has_been_read { SampleState::Read } else {SampleState::NotRead} ) )
&&
(*rc.view_state_mask() == ViewState::any()
||
{ let sample_gen = d.generation_counts.total();
let last_accessed = imd.last_generation_accessed.total();
let is_new = sample_gen > last_accessed;
rc.view_state_mask()
.contains( if is_new { ViewState::New} else { ViewState::NotNew } )
}
)
&&
(*rc.instance_state_mask() == InstanceState::any()
|| rc.instance_state_mask()
.contains( imd.instance_state )
)
}
/// Builds the SampleInfo for one sample.
///
/// `sample_rank` is the number of samples that follow this one in the
/// returned collection. `mrs_generations` / `mrsic_generations` are the
/// generation totals of the newest sample in the whole cache ("MRS") and
/// of the last accessed instance's latest generation ("MRSIC"); the
/// generation ranks are expressed relative to those.
fn make_sample_info(
dswm: &SampleWithMetaData<D>,
imd: &InstanceMetaData,
sample_rank: usize,
mrs_generations: i32,
mrsic_generations: i32,
) -> SampleInfo {
SampleInfo {
sample_state: if dswm.sample_has_been_read {
SampleState::Read
} else {
SampleState::NotRead
},
view_state: if dswm.generation_counts.total() > imd.last_generation_accessed.total() {
ViewState::New
} else {
ViewState::NotNew
},
instance_state: imd.instance_state,
generation_counts: dswm.generation_counts.clone(),
sample_rank: sample_rank as i32,
generation_rank: mrsic_generations - dswm.generation_counts.total(),
absolute_generation_rank: mrs_generations - dswm.generation_counts.total(),
source_timestamp: dswm.source_timestamp.clone(),
publication_handle: dswm.writer_guid,
}
}
/// Records into the scratch map the highest generation counts accessed
/// for an instance during a read/take pass.
fn record_instance_generation_viewed(
instance_generations: &mut HashMap<D::K, NotAliveGenerationCounts>,
accessed_generations: NotAliveGenerationCounts,
instance_key: &D::K,
) {
instance_generations
.entry(instance_key.clone())
.and_modify(|old_gens| {
if accessed_generations.total() > old_gens.total() {
*old_gens = accessed_generations
}
})
.or_insert(accessed_generations);
}
/// Commits the generations recorded during a read/take pass so that the
/// accessed samples are no longer considered `New` afterwards.
/// Panics if a recorded instance has vanished from the map (a bug).
fn mark_instances_viewed(
&mut self,
instance_generations: HashMap<D::K, NotAliveGenerationCounts>,
) {
for (inst, gen) in instance_generations.iter() {
if let Some(imd) = self.instance_map.get_mut(inst) {
imd.last_generation_accessed = *gen;
} else {
panic!("Instance disappeared!?!!1!")
}
}
}
/// Reads (without removing) the samples identified by `keys`, marking
/// them read and updating view-state bookkeeping.
///
/// NOTE(review): the MRSIC total is taken from the *last* key's instance
/// and `sample_rank` assumes `keys` is ordered — presumably callers pass
/// keys sorted by timestamp (as produced by the `select_*` methods);
/// confirm at call sites. Panics if any key is absent from the cache.
pub fn read_by_keys(&mut self, keys: &[(Timestamp, D::K)]) -> Vec<DataSample<&D>> {
let len = keys.len();
let mut result = Vec::with_capacity(len);
if len == 0 {
return result;
}
let mut instance_generations: HashMap<D::K, NotAliveGenerationCounts> = HashMap::new();
let mrsic_total = self
.instance_map
.get(&keys.last().unwrap().1)
.unwrap()
.latest_generation_available
.total();
// Generation total of the newest sample in the whole cache.
let mrs_total = self
.datasamples
.iter()
.next_back()
.unwrap()
.1
.generation_counts
.total();
// Two passes: first build SampleInfos and mark samples read (needs
// mutable access), then hand out shared borrows of the sample data.
let mut sample_infos = VecDeque::with_capacity(len);
for (index, (ts, key)) in keys.iter().enumerate() {
let dswm = self.datasamples.get_mut(ts).unwrap();
let imd = self.instance_map.get(key).unwrap();
let sample_info = Self::make_sample_info(dswm, imd, len - index - 1, mrs_total, mrsic_total);
dswm.sample_has_been_read = true;
Self::record_instance_generation_viewed(
&mut instance_generations,
dswm.generation_counts,
key,
);
sample_infos.push_back(sample_info);
}
self.mark_instances_viewed(instance_generations);
for (ts, _key) in keys.iter() {
let sample_info = sample_infos.pop_front().unwrap();
let sample: &std::result::Result<D, D::K> = &self.datasamples.get(ts).unwrap().sample;
result.push(DataSample::new(
sample_info,
result_ok_as_ref_err_clone(sample),
));
}
result
}
/// Takes (removes) the samples identified by `keys`, returning owned
/// DataSamples. Same ordering assumption and panic behavior as
/// `read_by_keys`.
///
/// NOTE(review): removed timestamps are not deleted from the instance's
/// `instance_samples` set here — verify whether callers rely on that.
pub fn take_by_keys(&mut self, keys: &[(Timestamp, D::K)]) -> Vec<DataSample<D>> {
let len = keys.len();
let mut result = Vec::with_capacity(len);
if len == 0 {
return result;
}
let mut instance_generations: HashMap<D::K, NotAliveGenerationCounts> = HashMap::new();
let mrsic_total = self
.instance_map
.get(&keys.last().unwrap().1)
.unwrap()
.latest_generation_available
.total();
let mrs_total = self
.datasamples
.iter()
.next_back()
.unwrap()
.1
.generation_counts
.total();
for (index, (ts, key)) in keys.iter().enumerate() {
let dswm = self.datasamples.remove(ts).unwrap();
let imd = self.instance_map.get(key).unwrap();
let sample_info = Self::make_sample_info(&dswm, imd, len - index - 1, mrs_total, mrsic_total);
Self::record_instance_generation_viewed(
&mut instance_generations,
dswm.generation_counts,
key,
);
result.push(DataSample::new(sample_info, dswm.sample));
}
self.mark_instances_viewed(instance_generations);
result
}
/// Like `read_by_keys` but returns only the bare `Ok(&data)` /
/// `Err(key)` payloads, without SampleInfo.
pub fn read_bare_by_keys(
&mut self,
keys: &[(Timestamp, D::K)],
) -> Vec<std::result::Result<&D, D::K>> {
let len = keys.len();
let mut result = Vec::with_capacity(len);
if len == 0 {
return result;
}
let mut instance_generations: HashMap<D::K, NotAliveGenerationCounts> = HashMap::new();
// First pass mutates (mark read, record generations); second pass borrows.
for (ts, key) in keys.iter() {
let dswm = self.datasamples.get_mut(ts).unwrap();
dswm.sample_has_been_read = true;
Self::record_instance_generation_viewed(
&mut instance_generations,
dswm.generation_counts,
key,
);
}
self.mark_instances_viewed(instance_generations);
for (ts, _key) in keys.iter() {
result.push(result_ok_as_ref_err_clone(
&self.datasamples.get(ts).unwrap().sample,
));
}
result
}
/// Like `take_by_keys` but returns only the bare owned payloads,
/// without SampleInfo.
pub fn take_bare_by_keys(
&mut self,
keys: &[(Timestamp, D::K)],
) -> Vec<std::result::Result<D, D::K>> {
let len = keys.len();
let mut result = Vec::with_capacity(len);
if len == 0 {
return result;
}
let mut instance_generations: HashMap<D::K, NotAliveGenerationCounts> = HashMap::new();
for (ts, key) in keys.iter() {
let dswm = self.datasamples.remove(ts).unwrap();
Self::record_instance_generation_viewed(
&mut instance_generations,
dswm.generation_counts,
key,
);
result.push(dswm.sample);
}
self.mark_instances_viewed(instance_generations);
result
}
/// Resolves a key hash (as stored by `add_sample`) back to the key.
pub fn get_key_by_hash(&self, key_hash: u128) -> Option<D::K> {
self.hash_to_key_map.get(&key_hash).map(|key| key.clone())
}
/// Returns the smallest known instance key strictly greater than `key`.
pub fn get_next_key(&self, key: &D::K) -> Option<D::K> {
self
.instance_map
.range((Excluded(key), Unbounded))
.map(|(k, _)| k.clone())
.next()
}
/// Replaces the cache's QoS policies; affects pruning of future samples.
pub fn set_qos_policy(&mut self, qos: QosPolicies) {
self.qos = qos
}
}
#[cfg(test)]
mod tests {
  use super::*;
  use crate::{
    structure::{time::Timestamp},
  };
  use crate::dds::ddsdata::DDSData;
  use crate::dds::traits::key::Keyed;
  use crate::test::random_data::*;

  /// Smoke test: with empty (default) QoS, a sample added to the cache can
  /// be read back by (timestamp, key) and round-trips through DDSData.
  #[test]
  fn dsc_empty_qos() {
    let qos = QosPolicies::qos_none();
    let mut datasample_cache = DataSampleCache::<RandomData>::new(qos);
    let timestamp = Timestamp::now();
    let data = RandomData {
      a: 4,
      b: "Fobar".to_string(),
    };
    let org_ddsdata = DDSData::from(&data, Some(timestamp));
    let key = data.get_key().clone();
    datasample_cache.add_sample(Ok(data.clone()), GUID::GUID_UNKNOWN, timestamp, None);
    let samples = datasample_cache.read_by_keys(&[(timestamp, key)]);
    assert_eq!(samples.len(), 1);
    match &samples.get(0).unwrap().value() {
      Ok(huh) => {
        let ddssample = DDSData::from(huh, Some(timestamp));
        assert_eq!(org_ddsdata, ddssample);
      }
      // Bug fix: the old `_ => ()` arm silently accepted an Err (dispose)
      // value, letting the test pass without checking anything.
      Err(_) => panic!("Expected a data sample, got a dispose key"),
    }
  }
}