/// A single partition's data held in memory, together with the metadata
/// [`RecentWindowCache`] needs to index and account for it.
///
/// The struct performs no validation: nothing here enforces that
/// `timestamps` and `values` have the same length, that `min_ts`/`max_ts`
/// actually bound `timestamps`, or that `memory_bytes` matches the real
/// allocation size.
#[derive(Debug)]
pub struct CachedPartition {
    /// Lower timestamp bound; [`RecentWindowCache`] uses it as part of the
    /// cache key.
    pub min_ts: i64,
    /// Upper timestamp bound; [`RecentWindowCache`] compares it against the
    /// hot-window cutoff.
    pub max_ts: i64,
    /// Timestamp column (presumably parallel to `values` -- confirm with the
    /// code that builds partitions).
    pub timestamps: Vec<i64>,
    /// Value column.
    pub values: Vec<f64>,
    /// Caller-declared footprint charged against the cache's byte budget.
    pub memory_bytes: usize,
}
/// Byte-bounded in-memory cache of partitions, keyed by
/// `(collection, partition.min_ts)`.
///
/// Every entry is charged its caller-declared `memory_bytes`, and the running
/// total is kept at or below `max_bytes`. When room is needed, entries are
/// evicted in insertion order (oldest insert first). Lookups through
/// [`RecentWindowCache::get`] do not refresh an entry's position, so this is
/// a FIFO policy rather than a true LRU.
pub struct RecentWindowCache {
    /// Cached partitions keyed by `(collection, min_ts)`.
    entries: std::collections::HashMap<(String, i64), CachedPartition>,
    /// Keys of `entries` in insertion order; the front is evicted first.
    order: std::collections::VecDeque<(String, i64)>,
    /// Upper bound for `current_bytes`.
    max_bytes: usize,
    /// Sum of `memory_bytes` over all entries currently held.
    current_bytes: usize,
    /// Width of the hot window in milliseconds.
    hot_window_ms: i64,
}

impl RecentWindowCache {
    /// Creates an empty cache with a budget of `max_bytes` and a hot window
    /// of `hot_window_ms` milliseconds.
    ///
    /// Neither argument is validated. With `max_bytes == 0` every partition
    /// that declares a non-zero `memory_bytes` is rejected by
    /// [`insert`](Self::insert). With `hot_window_ms <= 0`,
    /// [`is_in_hot_window`](Self::is_in_hot_window) always returns `false`.
    pub fn new(max_bytes: usize, hot_window_ms: i64) -> Self {
        Self {
            entries: std::collections::HashMap::new(),
            order: std::collections::VecDeque::new(),
            max_bytes,
            current_bytes: 0,
            hot_window_ms,
        }
    }

    /// Returns `true` when `max_ts` is at or after `now_ms - hot_window_ms`.
    ///
    /// Always returns `false` when the configured window is zero or negative.
    /// The subtraction saturates, so extreme `now_ms` values cannot overflow.
    pub fn is_in_hot_window(&self, max_ts: i64, now_ms: i64) -> bool {
        if self.hot_window_ms <= 0 {
            return false;
        }
        max_ts >= now_ms.saturating_sub(self.hot_window_ms)
    }

    /// Looks up the partition stored under `(collection, min_ts)`.
    ///
    /// Takes `&self`, so a hit does not change the eviction order.
    pub fn get(&self, collection: &str, min_ts: i64) -> Option<&CachedPartition> {
        // The map key owns its `String`, so an owned key has to be built for
        // the lookup; a `(&str, i64)` cannot be borrowed from `(String, i64)`.
        self.entries.get(&(collection.to_owned(), min_ts))
    }

    /// Stores `partition` under `(collection, partition.min_ts)`.
    ///
    /// Any entry already stored under that key is removed first. If the new
    /// partition declares more than `max_bytes` it is then dropped without
    /// being cached, so the key ends up absent rather than pointing at the
    /// superseded data. Otherwise the oldest-inserted entries are evicted
    /// until the new partition fits, and it is appended as the newest entry.
    pub fn insert(&mut self, collection: &str, partition: CachedPartition) {
        let key = (collection.to_owned(), partition.min_ts);
        let size = partition.memory_bytes;

        // Remove the previous version before the size check: if the new
        // version is too large to cache, the stale one must not stay visible.
        if let Some(previous) = self.entries.remove(&key) {
            self.current_bytes -= previous.memory_bytes;
            self.order.retain(|k| k != &key);
        }

        if size > self.max_bytes {
            return;
        }

        // Same condition as `current_bytes + size > max_bytes`, rearranged so
        // it cannot overflow; `size <= max_bytes` was established above.
        let budget = self.max_bytes - size;
        while self.current_bytes > budget {
            match self.order.pop_front() {
                Some(oldest) => {
                    if let Some(evicted) = self.entries.remove(&oldest) {
                        self.current_bytes -= evicted.memory_bytes;
                    }
                }
                None => break,
            }
        }

        self.current_bytes += size;
        self.order.push_back(key.clone());
        self.entries.insert(key, partition);
    }

    /// Removes every entry whose `max_ts` is strictly below
    /// `now_ms - hot_window_ms` and releases its bytes from the budget.
    ///
    /// The subtraction saturates instead of overflowing. Unlike
    /// [`is_in_hot_window`](Self::is_in_hot_window), a non-positive window is
    /// not special-cased here: the cutoff is still computed as
    /// `now_ms - hot_window_ms`.
    pub fn evict_cold(&mut self, now_ms: i64) {
        let cutoff = now_ms.saturating_sub(self.hot_window_ms);
        let len_before = self.entries.len();
        let mut freed = 0usize;

        // One pass over the map: drop cold entries and total their bytes.
        self.entries.retain(|_, partition| {
            let keep = partition.max_ts >= cutoff;
            if !keep {
                freed += partition.memory_bytes;
            }
            keep
        });

        // Compare lengths rather than `freed`, since an evicted entry may
        // have declared zero bytes.
        if self.entries.len() != len_before {
            self.current_bytes -= freed;
            // One pass over the queue, dropping keys that no longer exist.
            let entries = &self.entries;
            self.order.retain(|k| entries.contains_key(k));
        }
    }

    /// Number of partitions currently cached.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` when no partition is cached.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Sum of the declared `memory_bytes` of all cached partitions.
    pub fn memory_bytes(&self) -> usize {
        self.current_bytes
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a partition with `rows` rows: every timestamp equals `min_ts`,
    /// every value is `0.0`, and the declared footprint is 16 bytes per row.
    fn make_partition(min_ts: i64, max_ts: i64, rows: usize) -> CachedPartition {
        let timestamps = vec![min_ts; rows];
        let values = vec![0.0; rows];
        CachedPartition {
            min_ts,
            max_ts,
            timestamps,
            values,
            memory_bytes: rows * 16,
        }
    }

    #[test]
    fn basic_insert_and_get() {
        let mut cache = RecentWindowCache::new(1_000_000, 3_600_000);
        let partition = make_partition(1000, 2000, 100);
        cache.insert("metrics", partition);

        let hit = cache.get("metrics", 1000);
        assert!(hit.is_some());
        assert_eq!(cache.len(), 1);
    }

    #[test]
    fn hot_window_check() {
        let cache = RecentWindowCache::new(1_000_000, 3_600_000);
        let now = 10_000_000;

        // 1_000_000 ms old is inside the 3_600_000 ms window; 5_000_000 ms old is not.
        let recent = now - 1_000_000;
        let stale = now - 5_000_000;
        assert!(cache.is_in_hot_window(recent, now));
        assert!(!cache.is_in_hot_window(stale, now));
    }

    #[test]
    fn evict_cold() {
        let mut cache = RecentWindowCache::new(1_000_000, 3_600_000);
        cache.insert("m", make_partition(1000, 2000, 10));
        cache.insert("m", make_partition(8_000_000, 9_000_000, 10));
        assert_eq!(cache.len(), 2);

        // Cutoff is 10_000_000 - 3_600_000 = 6_400_000, so only the first
        // partition (max_ts 2000) is cold.
        cache.evict_cold(10_000_000);

        assert_eq!(cache.len(), 1);
        assert!(cache.get("m", 1000).is_none());
        assert!(cache.get("m", 8_000_000).is_some());
    }

    #[test]
    fn lru_eviction() {
        // Each partition declares 20 * 16 = 320 bytes, so two of them cannot
        // share a 500-byte budget and the earlier insert is evicted.
        let mut cache = RecentWindowCache::new(500, 3_600_000);
        cache.insert("m", make_partition(1000, 2000, 20));
        cache.insert("m", make_partition(3000, 4000, 20));

        assert_eq!(cache.len(), 1);
        assert!(cache.get("m", 1000).is_none());
        assert!(cache.get("m", 3000).is_some());
    }
}