#[cfg(feature = "track-cache-size")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{collections::VecDeque, time::Duration};
/// Global counter of steps held by ring buffers that have tracking enabled.
///
/// Only compiled when the `track-cache-size` feature is active. `RingBuffer`
/// adds to it when a tracked buffer receives a step and subtracts from it when
/// a tracked buffer pops, prunes, or is dropped; all updates in this file use
/// `Ordering::Relaxed`.
#[cfg(feature = "track-cache-size")]
pub static GLOBAL_CACHE_SIZE: AtomicUsize = AtomicUsize::new(0);
/// One timestamped sample of a named signal.
#[derive(Clone, Debug, PartialEq)]
pub struct Step<T> {
    /// Name of the signal the sample belongs to.
    pub signal: &'static str,
    /// Payload carried by the sample.
    pub value: T,
    /// Timestamp of the sample, expressed as a `Duration`.
    pub timestamp: Duration,
}

impl<T> Step<T> {
    /// Assembles a step from its signal name, value and timestamp.
    pub fn new(signal: &'static str, value: T, timestamp: Duration) -> Self {
        Self {
            signal,
            value,
            timestamp,
        }
    }
}
/// Interface of a double-ended buffer of [`Step`]s.
///
/// The behaviour notes on the methods below describe what the `RingBuffer`
/// implementation in this file does; other implementors are only bound by the
/// signatures.
pub trait RingBufferTrait {
/// Payload type carried by every stored [`Step`].
type Value;
/// Collection type associated with the implementation (`RingBuffer` names
/// `VecDeque<Step<T>>`). No method of this trait takes or returns it.
type Container: IntoIterator;
/// Shared-reference iterator returned by [`RingBufferTrait::iter`].
type Iter<'a>: Iterator<Item = &'a Step<Self::Value>>
where
Self: 'a;
/// Mutable-reference iterator type. No method of this trait returns it.
type IterMut<'a>: Iterator<Item = &'a mut Step<Self::Value>>
where
Self: 'a;
/// Creates a buffer; `RingBuffer` starts out empty.
fn new() -> Self;
/// Returns `true` when the buffer holds no steps.
fn is_empty(&self) -> bool;
/// Returns the number of stored steps.
fn len(&self) -> usize;
/// Borrows the step at the back (the end `add_step` appends to in
/// `RingBuffer`), or `None` when empty.
fn get_back(&self) -> Option<&Step<Self::Value>>;
/// Borrows the step at the front, or `None` when empty.
fn get_front(&self) -> Option<&Step<Self::Value>>;
/// Removes and returns the front step, or `None` when empty.
fn pop_front(&mut self) -> Option<Step<Self::Value>>;
/// Removes and returns the back step, or `None` when empty.
fn pop_back(&mut self) -> Option<Step<Self::Value>>;
/// Stores `step`; `RingBuffer` appends it at the back.
fn add_step(&mut self, step: Step<Self::Value>);
/// Replaces an existing step with `step` and reports whether a replacement
/// happened. `RingBuffer` locates the target by binary search on an equal
/// timestamp.
fn update_step(&mut self, step: Step<Self::Value>) -> bool;
/// Discards old steps. `RingBuffer` pops steps from the front while their
/// timestamp is below the back step's timestamp minus `max_age`
/// (saturating at zero); an empty buffer is left untouched.
fn prune(&mut self, max_age: Duration);
/// Iterates over the stored steps by shared reference.
fn iter<'a>(&'a self) -> Self::Iter<'a>;
/// Sets whether this buffer takes part in global size tracking. `RingBuffer`
/// stores the flag and consults it before adjusting `GLOBAL_CACHE_SIZE`.
#[cfg(feature = "track-cache-size")]
fn set_tracked(&mut self, tracked: bool);
}
/// Double-ended queue of [`Step`]s backed by a `VecDeque`.
///
/// `Clone` is derived when the `track-cache-size` feature is off. With the
/// feature on it is implemented by hand (below) so that cloning a tracked
/// buffer keeps `GLOBAL_CACHE_SIZE` consistent.
#[derive(Debug)]
#[cfg_attr(not(feature = "track-cache-size"), derive(Clone))]
pub struct RingBuffer<T> {
    /// Stored steps; `add_step` appends at the back.
    steps: VecDeque<Step<T>>,
    /// Whether the steps of this buffer are counted in `GLOBAL_CACHE_SIZE`.
    #[cfg(feature = "track-cache-size")]
    is_tracked: bool,
}

/// Tracking-aware clone.
///
/// A derived `Clone` would copy `is_tracked` without touching the counter, so
/// the clone's later pops, prunes and `Drop` would subtract steps that were
/// never added, making the global count wrong (and able to wrap below zero).
#[cfg(feature = "track-cache-size")]
impl<T: Clone> Clone for RingBuffer<T> {
    fn clone(&self) -> Self {
        let steps = self.steps.clone();
        // The copy inherits the tracking flag, so its steps must be counted
        // up front to balance the decrements it will perform later.
        if self.is_tracked {
            GLOBAL_CACHE_SIZE.fetch_add(steps.len(), Ordering::Relaxed);
        }
        RingBuffer {
            steps,
            is_tracked: self.is_tracked,
        }
    }
}
impl<T> Default for RingBuffer<T>
where
T: Copy,
{
fn default() -> Self {
Self::new()
}
}
impl<T> RingBuffer<T>
where
T: Copy,
{
pub fn new() -> Self {
RingBuffer {
steps: VecDeque::new(),
#[cfg(feature = "track-cache-size")]
is_tracked: false, }
}
pub fn add_step(&mut self, step: Step<T>) {
self.steps.push_back(step);
#[cfg(feature = "track-cache-size")]
if self.is_tracked {
GLOBAL_CACHE_SIZE.fetch_add(1, Ordering::Relaxed);
}
}
pub fn update_step(&mut self, step: Step<T>) -> bool {
self.steps
.binary_search_by(|s| s.timestamp.cmp(&step.timestamp))
.map(|index| {
self.steps[index] = step;
})
.is_ok()
}
pub fn iter(&self) -> std::collections::vec_deque::Iter<'_, Step<T>> {
self.steps.iter()
}
}
/// [`RingBufferTrait`] implementation backed by the inner `VecDeque`.
///
/// With the `track-cache-size` feature, every operation that changes the
/// number of stored steps of a tracked buffer also updates
/// `GLOBAL_CACHE_SIZE`.
impl<T> RingBufferTrait for RingBuffer<T>
where
    T: Copy,
{
    type Value = T;
    type Container = VecDeque<Step<T>>;
    type Iter<'a>
        = std::collections::vec_deque::Iter<'a, Step<T>>
    where
        Self: 'a;
    type IterMut<'a>
        = std::collections::vec_deque::IterMut<'a, Step<T>>
    where
        Self: 'a;

    /// Creates an empty buffer (delegates to the inherent constructor).
    fn new() -> Self {
        Self::new()
    }

    /// Returns `true` when no step is stored.
    fn is_empty(&self) -> bool {
        self.steps.is_empty()
    }

    /// Returns the number of stored steps.
    fn len(&self) -> usize {
        self.steps.len()
    }

    /// Borrows the back step, or `None` when empty.
    fn get_back(&self) -> Option<&Step<T>> {
        self.steps.back()
    }

    /// Borrows the front step, or `None` when empty.
    fn get_front(&self) -> Option<&Step<T>> {
        self.steps.front()
    }

    /// Removes and returns the front step, decrementing the global counter
    /// when a step was actually removed from a tracked buffer.
    fn pop_front(&mut self) -> Option<Step<Self::Value>> {
        let popped = self.steps.pop_front();
        #[cfg(feature = "track-cache-size")]
        {
            if popped.is_some() && self.is_tracked {
                GLOBAL_CACHE_SIZE.fetch_sub(1, Ordering::Relaxed);
            }
        }
        popped
    }

    /// Removes and returns the back step, decrementing the global counter
    /// when a step was actually removed from a tracked buffer.
    fn pop_back(&mut self) -> Option<Step<Self::Value>> {
        let popped = self.steps.pop_back();
        #[cfg(feature = "track-cache-size")]
        {
            if popped.is_some() && self.is_tracked {
                GLOBAL_CACHE_SIZE.fetch_sub(1, Ordering::Relaxed);
            }
        }
        popped
    }

    /// Appends `step` at the back (delegates to the inherent method).
    fn add_step(&mut self, step: Step<T>) {
        self.add_step(step)
    }

    /// Replaces the step with a matching timestamp (delegates to the
    /// inherent method) and reports whether a replacement happened.
    fn update_step(&mut self, step: Step<Self::Value>) -> bool {
        self.update_step(step)
    }

    /// Pops steps from the front while they are older than `max_age`
    /// relative to the back step.
    ///
    /// The cutoff is `back.timestamp - max_age`, saturating at zero; a front
    /// step is evicted while its timestamp is strictly below the cutoff. An
    /// empty buffer is left untouched.
    fn prune(&mut self, max_age: Duration) {
        let Some(newest) = self.steps.back().map(|step| step.timestamp) else {
            return;
        };
        let cutoff = newest.saturating_sub(max_age);

        #[cfg(feature = "track-cache-size")]
        let len_before = self.steps.len();

        // One eviction loop for both feature configurations; only the
        // counter bookkeeping below is feature-gated.
        while matches!(self.steps.front(), Some(front) if front.timestamp < cutoff) {
            self.steps.pop_front();
        }

        #[cfg(feature = "track-cache-size")]
        {
            let removed = len_before - self.steps.len();
            if self.is_tracked && removed > 0 {
                GLOBAL_CACHE_SIZE.fetch_sub(removed, Ordering::Relaxed);
            }
        }
    }

    /// Iterates over the stored steps from front to back.
    fn iter<'a>(&'a self) -> Self::Iter<'a> {
        self.iter()
    }

    /// Enables or disables global size tracking for this buffer.
    ///
    /// When the flag actually changes, the steps already stored are added to
    /// (or removed from) `GLOBAL_CACHE_SIZE`. Without that reconciliation,
    /// enabling tracking on a populated buffer would later subtract steps
    /// that were never counted, and disabling it would leave a stale count
    /// behind.
    #[cfg(feature = "track-cache-size")]
    fn set_tracked(&mut self, tracked: bool) {
        if self.is_tracked == tracked {
            return;
        }
        let held = self.steps.len();
        if held > 0 {
            if tracked {
                GLOBAL_CACHE_SIZE.fetch_add(held, Ordering::Relaxed);
            } else {
                GLOBAL_CACHE_SIZE.fetch_sub(held, Ordering::Relaxed);
            }
        }
        self.is_tracked = tracked;
    }
}
/// On drop, a tracked buffer gives back the steps it still holds to
/// `GLOBAL_CACHE_SIZE`.
#[cfg(feature = "track-cache-size")]
impl<T> Drop for RingBuffer<T> {
    fn drop(&mut self) {
        // Untracked or already-empty buffers have nothing to give back.
        if !self.is_tracked || self.steps.is_empty() {
            return;
        }
        GLOBAL_CACHE_SIZE.fetch_sub(self.steps.len(), Ordering::Relaxed);
    }
}
/// Prunes `cache` without evicting steps at or after `protected_ts`.
///
/// The age handed to [`RingBufferTrait::prune`] is the larger of `lookahead`
/// and the distance from the back step's timestamp down to `protected_ts`
/// (zero when `protected_ts` lies beyond the back step). An empty cache is
/// left untouched.
pub fn guarded_prune<C>(cache: &mut C, lookahead: Duration, protected_ts: Duration)
where
    C: RingBufferTrait,
{
    let newest_ts = match cache.get_back() {
        Some(step) => step.timestamp,
        None => return,
    };

    // Widen the retention window when the protected timestamp is further
    // back than the plain lookahead would keep.
    let span_to_protected = newest_ts.saturating_sub(protected_ts);
    let window = if span_to_protected > lookahead {
        span_to_protected
    } else {
        lookahead
    };

    cache.prune(window);
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a step on signal `"x"` stamped at `secs` whole seconds.
    fn step_at<T>(value: T, secs: u64) -> Step<T> {
        Step::new("x", value, Duration::from_secs(secs))
    }

    /// Buffer holding the values 1, 2, 3 stamped at 1 s, 2 s and 3 s.
    fn three_steps() -> RingBuffer<i32> {
        let mut buffer = RingBuffer::new();
        for (value, secs) in [(1, 1), (2, 2), (3, 3)] {
            buffer.add_step(step_at(value, secs));
        }
        buffer
    }

    /// Buffer holding the values 1..=5, each stamped at its own value in seconds.
    fn five_steps() -> RingBuffer<u64> {
        let mut buffer = RingBuffer::new();
        for secs in 1..=5u64 {
            buffer.add_step(step_at(secs, secs));
        }
        buffer
    }

    #[test]
    fn ring_creation() {
        let mut signal = RingBuffer::new();
        for value in 1..=3 {
            signal.add_step(step_at(value, 0));
        }
        // Assert the length explicitly so missing elements cannot make the
        // content check pass vacuously.
        assert_eq!(signal.steps.len(), 3);
        let values: Vec<i32> = signal.steps.iter().map(|step| step.value).collect();
        assert_eq!(values, [1, 2, 3]);
    }

    #[test]
    fn ring_prune() {
        let mut signal = three_steps();
        signal.prune(Duration::from_secs(1));
        assert_eq!(signal.len(), 2);
        assert_eq!(signal.get_front().unwrap().value, 2);
        assert_eq!(signal.get_back().unwrap().value, 3);
    }

    #[test]
    fn ring_get_back() {
        let signal = three_steps();
        assert_eq!(signal.get_back().unwrap().value, 3);
    }

    #[test]
    fn ring_get_front() {
        let signal = three_steps();
        assert_eq!(signal.get_front().unwrap().value, 1);
    }

    #[test]
    fn ring_pop_front() {
        let mut signal = three_steps();
        let popped_step = signal.pop_front().unwrap();
        assert_eq!(popped_step.value, 1);
        assert_eq!(signal.len(), 2);
        assert_eq!(signal.get_front().unwrap().value, 2);
        assert_eq!(signal.get_back().unwrap().value, 3);
    }

    #[test]
    fn ring_pop_back() {
        let mut signal = three_steps();
        let popped_step = signal.pop_back().unwrap();
        assert_eq!(popped_step.value, 3);
        assert_eq!(signal.len(), 2);
        assert_eq!(signal.get_front().unwrap().value, 1);
        assert_eq!(signal.get_back().unwrap().value, 2);
    }

    #[test]
    fn ring_pop_on_empty_buffer() {
        let mut signal: RingBuffer<i32> = RingBuffer::new();
        assert!(signal.pop_front().is_none());
        assert!(signal.pop_back().is_none());
        assert!(signal.is_empty());
    }

    #[test]
    fn ring_iter() {
        let signal = three_steps();
        let mut iter = signal.iter();
        assert_eq!(iter.next().unwrap().value, 1);
        assert_eq!(iter.next().unwrap().value, 2);
        assert_eq!(iter.next().unwrap().value, 3);
        assert!(iter.next().is_none());
    }

    #[test]
    fn ring_is_empty() {
        let signal: RingBuffer<bool> = RingBuffer::new();
        assert!(signal.is_empty());
    }

    #[test]
    fn default_ring_buffer() {
        let signal: RingBuffer<i32> = RingBuffer::default();
        assert!(signal.is_empty());
    }

    #[test]
    fn ring_prune_empty_buffer() {
        let mut signal: RingBuffer<f64> = RingBuffer::new();
        signal.prune(Duration::from_secs(1));
        assert!(signal.is_empty());
    }

    #[test]
    fn ring_update_step() {
        let mut signal = RingBuffer::new();
        signal.add_step(step_at(1, 1));
        signal.add_step(step_at(2, 2));

        let updated = signal.update_step(step_at(10, 1));
        assert!(updated);
        assert_eq!(signal.get_front().unwrap().value, 10);
        assert_eq!(signal.len(), 2);

        let not_updated = signal.update_step(step_at(20, 5));
        assert!(!not_updated);
        assert_eq!(signal.len(), 2);
    }

    #[test]
    fn guarded_prune_keeps_protected_step() {
        let mut signal = five_steps();
        // A 1 s lookahead alone would evict everything before 4 s; the
        // protected timestamp widens the window back to 2 s.
        guarded_prune(&mut signal, Duration::from_secs(1), Duration::from_secs(2));
        assert_eq!(signal.len(), 4);
        assert_eq!(signal.get_front().unwrap().value, 2);
        assert_eq!(signal.get_back().unwrap().value, 5);
    }

    #[test]
    fn guarded_prune_falls_back_to_lookahead() {
        let mut signal = five_steps();
        // A protected timestamp beyond the back step gives distance zero, so
        // only the lookahead applies.
        guarded_prune(&mut signal, Duration::from_secs(1), Duration::from_secs(10));
        assert_eq!(signal.len(), 2);
        assert_eq!(signal.get_front().unwrap().value, 4);
        assert_eq!(signal.get_back().unwrap().value, 5);
    }

    #[test]
    fn guarded_prune_empty_buffer() {
        let mut signal: RingBuffer<i32> = RingBuffer::new();
        guarded_prune(&mut signal, Duration::from_secs(1), Duration::from_secs(1));
        assert!(signal.is_empty());
    }
}