use std::collections::VecDeque;
use crate::cache::metrics::{Method, RequestOutcome, SequentialCacheConfig, METRICS};
/// A bounded FIFO cache over key/value pairs whose keys arrive in
/// non-decreasing order. Once `capacity` is reached, the oldest entry is
/// evicted to make room for each new one (see `insert`).
#[derive(Debug, Clone)]
pub struct SequentialCache<K, V> {
    // Cache name used to label all metrics for this instance.
    name: &'static str,
    // Entries in insertion order; front = oldest, back = newest.
    data: VecDeque<(K, V)>,
    // Maximum number of entries retained; enforced by `insert`.
    capacity: usize,
}
impl<K: Ord + Copy, V: Clone> SequentialCache<K, V> {
    /// Creates a new cache with the given metrics `name` and `capacity`.
    ///
    /// Registers the cache configuration in the global metrics registry; if a
    /// cache with the same name was already registered, a warning is logged
    /// and the previously registered config is kept.
    ///
    /// # Panics
    ///
    /// Panics if `capacity == 0`.
    pub fn new(name: &'static str, capacity: usize) -> Self {
        assert!(capacity > 0, "Cache capacity must be greater than 0");
        let config = SequentialCacheConfig {
            capacity: capacity as u64,
        };
        tracing::info!("Configured sequential cache `{name}` with capacity {capacity} items");
        if let Err(err) = METRICS.sequential_info[&name].set(config) {
            // `set` fails when the gauge was already initialized (e.g. two
            // caches share a name); keep the old config and surface both.
            tracing::warn!(
                "Sequential cache `{name}` was already created with config {:?}; new config: {:?}",
                METRICS.sequential_info[&name].get(),
                err.into_inner()
            );
        }
        SequentialCache {
            name,
            data: VecDeque::with_capacity(capacity),
            capacity,
        }
    }

    /// Appends a batch of items, evicting the oldest entries as needed to
    /// stay within capacity.
    ///
    /// Keys must be non-decreasing within the batch and not precede the last
    /// key already stored (equal keys are allowed).
    ///
    /// # Errors
    ///
    /// Returns an error if the ordering requirement is violated. In that case
    /// the cache is left unchanged — the whole batch is validated before any
    /// mutation, so a failed insert never applies a prefix of `items` or
    /// leaves the size metric stale.
    pub fn insert(&mut self, items: Vec<(K, V)>) -> anyhow::Result<()> {
        // Validate the entire batch up front. Checking each key against the
        // running `last` is equivalent to the per-push check, because every
        // accepted key would become the new last key.
        let mut last = self.get_last_key();
        for &(key, _) in &items {
            anyhow::ensure!(
                Some(key) >= last,
                "Keys must be inserted in sequential order"
            );
            last = Some(key);
        }
        for (key, value) in items {
            let latency = METRICS.latency[&(self.name, Method::Insert)].start();
            if self.data.len() == self.capacity {
                // At capacity: drop the oldest entry before appending.
                self.data.pop_front();
            }
            self.data.push_back((key, value));
            latency.observe();
        }
        self.report_size();
        Ok(())
    }

    /// Returns all entries with keys strictly greater than `after`.
    ///
    /// Returns `None` when no stored key is `<= after`: the cache cannot
    /// prove completeness, since entries up to `after` may have been evicted.
    /// A `Some(vec![])` result is a genuine "nothing newer" hit. Records a
    /// hit/miss metric accordingly.
    pub fn query(&self, after: K) -> Option<Vec<(K, V)>> {
        let latency = METRICS.latency[&(self.name, Method::Get)].start();
        // Keys are sorted, so `partition_point` finds the first index whose
        // key exceeds `after` via binary search.
        let result = match self.data.partition_point(|&(key, _)| key <= after) {
            0 => None,
            pos => Some(self.data.range(pos..).cloned().collect()),
        };
        latency.observe();
        METRICS.requests[&(self.name, RequestOutcome::from_hit(result.is_some()))].inc();
        result
    }

    /// Returns the most recently inserted key, or `None` if the cache is empty.
    pub fn get_last_key(&self) -> Option<K> {
        self.data.back().map(|&(key, _)| key)
    }

    /// Publishes the current entry count to the `len` gauge.
    fn report_size(&self) {
        METRICS.len[&self.name].set(self.data.len() as u64);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn non_sequential_insertion() {
        let mut cache = SequentialCache::<u32, u32>::new("non_sequential_insertion", 3);
        // (2, 2) after (3, 3) violates the ordering requirement.
        let err = cache
            .insert(vec![(1, 1), (3, 3), (2, 2)])
            .unwrap_err()
            .to_string();
        assert!(
            err.contains("Keys must be inserted in sequential order"),
            "{err}"
        );
    }

    #[test]
    fn non_sequential_insertion_multiple_invocations() {
        // Use a cache name unique to this test: reusing the name from
        // `non_sequential_insertion` collides in the global metrics registry.
        let mut cache = SequentialCache::<u32, u32>::new(
            "non_sequential_insertion_multiple_invocations",
            3,
        );
        cache.insert(vec![(1, 1), (2, 2)]).unwrap();
        // Key 1 precedes the last stored key (2), so this must fail.
        assert!(cache.insert(vec![(1, 1)]).is_err());
    }

    #[test]
    fn query() {
        let mut cache = SequentialCache::<u32, u32>::new("query", 100);
        // Duplicate keys are allowed as long as ordering is non-decreasing.
        cache.insert(vec![(1, 1), (2, 2), (2, 5), (3, 6)]).unwrap();
        cache.insert(vec![(3, 7), (100, 8)]).unwrap();
        // No key <= 0 is stored, so completeness cannot be guaranteed.
        assert_eq!(cache.query(0), None);
        assert_eq!(
            cache.query(1),
            Some(vec![(2, 2), (2, 5), (3, 6), (3, 7), (100, 8)])
        );
        assert_eq!(cache.query(2), Some(vec![(3, 6), (3, 7), (100, 8)]));
        assert_eq!(cache.query(3), Some(vec![(100, 8)]));
        assert_eq!(cache.query(4), Some(vec![(100, 8)]));
        // Querying at or past the newest key is a hit with no newer entries.
        assert_eq!(cache.query(100), Some(vec![]));
        assert_eq!(cache.query(1000), Some(vec![]));
    }

    #[test]
    fn query_at_capacity() {
        let mut cache = SequentialCache::<u32, u32>::new("query_at_capacity", 3);
        cache.insert(vec![(1, 1), (2, 2), (3, 3)]).unwrap();
        // Inserting (4, 4) evicts (1, 1), so query(1) can no longer be served.
        cache.insert(vec![(4, 4)]).unwrap();
        assert_eq!(cache.query(1), None);
        assert_eq!(cache.query(2), Some(vec![(3, 3), (4, 4)]));
    }

    #[test]
    fn insertion_at_capacity_limit() {
        let mut cache = SequentialCache::<u32, String>::new("insertion_at_capacity_limit", 2);
        cache
            .insert(vec![(1, "One".to_string()), (2, "Two".to_string())])
            .unwrap();
        // A full-capacity batch evicts both earlier entries.
        cache
            .insert(vec![(3, "Three".to_string()), (4, "Four".to_string())])
            .unwrap();
        assert_eq!(cache.query(1), None);
        assert_eq!(cache.query(3), Some(vec![(4, "Four".to_string())]));
    }
}