use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::Path;
/// Index of keys whose freshness decays exponentially with age.
///
/// A key's freshness is `exp(-decay_rate * age_secs)`. Instead of re-evaluating that formula
/// for every key, each entry carries a precomputed deadline (the instant its freshness falls
/// to `threshold`), and `deadline_index` groups keys by deadline so stale keys can be found
/// with an ordered range scan.
#[derive(Debug)]
pub struct FreshnessIndexV2 {
    /// Per-key metadata, keyed by the caller-supplied key string.
    entries: HashMap<String, FreshnessEntry>,
    /// Deadline (microseconds since the Unix epoch) -> keys whose deadline is that instant.
    /// Maintained alongside `entries` by `insert`, `remove` and `evict_stale`.
    deadline_index: BTreeMap<u64, HashSet<String>>,
    /// Freshness value at which an entry counts as stale; `with_threshold` clamps it to
    /// `[0.001, 0.999]`.
    threshold: f32,
}
/// Metadata stored for a single key.
///
/// This type is serialized with bincode as part of `PersistedIndex`, so the field order is
/// part of the on-disk format.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct FreshnessEntry {
    /// Insertion time, in microseconds since the Unix epoch (from `now_micros`).
    stored_at: u64,
    /// Exponential decay rate per second: freshness is `exp(-decay_rate * age_secs)`.
    decay_rate: f32,
    /// Instant (microseconds since the Unix epoch) at which freshness reaches the index
    /// threshold; `u64::MAX` for non-positive decay rates, i.e. never stale.
    deadline: u64,
}
/// On-disk representation written by `FreshnessIndexV2::save` and read by `load`.
///
/// Only the threshold and the entries are stored; `load` rebuilds the deadline index from
/// the entries. Encoded with bincode, so the field order is part of the file format. Bump
/// `PERSIST_VERSION` whenever this layout (or `FreshnessEntry`'s) changes.
#[derive(Debug, Serialize, Deserialize)]
struct PersistedIndex {
    /// Format version; `load` rejects files whose version differs from `PERSIST_VERSION`.
    version: u32,
    /// Staleness threshold of the index that was saved.
    threshold: f32,
    /// All `(key, entry)` pairs, in `HashMap` iteration order (unspecified).
    entries: Vec<(String, FreshnessEntry)>,
}
/// Current `PersistedIndex` format version written by `save` and required by `load`.
const PERSIST_VERSION: u32 = 1;
impl FreshnessIndexV2 {
pub fn new() -> Self {
Self::with_threshold(0.5)
}
pub fn with_threshold(threshold: f32) -> Self {
Self {
entries: HashMap::new(),
deadline_index: BTreeMap::new(),
threshold: threshold.clamp(0.001, 0.999),
}
}
pub fn insert(&mut self, key: &str, decay_rate: f32) {
let now = now_micros();
let deadline = self.compute_deadline(now, decay_rate);
if let Some(old) = self.entries.get(key) {
self.remove_from_deadline_index(key, old.deadline);
}
self.entries.insert(
key.to_string(),
FreshnessEntry {
stored_at: now,
decay_rate,
deadline,
},
);
self.deadline_index
.entry(deadline)
.or_default()
.insert(key.to_string());
}
pub fn get_freshness(&self, key: &str) -> Option<f32> {
self.entries.get(key).map(|e| {
let age = now_micros().saturating_sub(e.stored_at);
let age_secs = age as f32 / 1_000_000.0;
(-e.decay_rate * age_secs).exp()
})
}
pub fn is_stale(&self, key: &str) -> Option<bool> {
self.entries.get(key).map(|e| now_micros() >= e.deadline)
}
pub fn query_stale(&self) -> Vec<String> {
let now = now_micros();
self.deadline_index
.range(..=now)
.flat_map(|(_, keys)| keys.iter().cloned())
.collect()
}
pub fn query_fresh(&self) -> Vec<String> {
let now = now_micros();
self.deadline_index
.range((now + 1)..)
.flat_map(|(_, keys)| keys.iter().cloned())
.collect()
}
pub fn query_fresh_pattern(&self, pattern: &str, min_freshness: f32) -> Vec<String> {
let now = now_micros();
self.entries
.iter()
.filter(|(key, entry)| {
if !Self::matches_pattern(key, pattern) {
return false;
}
let age = now.saturating_sub(entry.stored_at);
let age_secs = age as f32 / 1_000_000.0;
let freshness = (-entry.decay_rate * age_secs).exp();
freshness >= min_freshness
})
.map(|(key, _)| key.clone())
.collect()
}
pub fn evict_stale(&mut self) -> Vec<String> {
let now = now_micros();
let mut evicted = Vec::new();
let stale_deadlines: Vec<u64> =
self.deadline_index.range(..=now).map(|(d, _)| *d).collect();
for deadline in stale_deadlines {
if let Some(keys) = self.deadline_index.remove(&deadline) {
for key in keys {
self.entries.remove(&key);
evicted.push(key);
}
}
}
evicted
}
pub fn count_stale(&self) -> usize {
let now = now_micros();
self.deadline_index
.range(..=now)
.map(|(_, keys)| keys.len())
.sum()
}
pub fn count_fresh(&self) -> usize {
self.entries.len() - self.count_stale()
}
pub fn average_freshness(&self) -> f32 {
if self.entries.is_empty() {
return 1.0;
}
let now = now_micros();
let total: f32 = self
.entries
.values()
.map(|entry| {
let age = now.saturating_sub(entry.stored_at);
let age_secs = age as f32 / 1_000_000.0;
(-entry.decay_rate * age_secs).exp()
})
.sum();
total / self.entries.len() as f32
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
pub fn threshold(&self) -> f32 {
self.threshold
}
pub fn remove(&mut self, key: &str) -> bool {
if let Some(entry) = self.entries.remove(key) {
self.remove_from_deadline_index(key, entry.deadline);
true
} else {
false
}
}
pub fn save<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
let persisted = PersistedIndex {
version: PERSIST_VERSION,
threshold: self.threshold,
entries: self
.entries
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
};
let bytes = bincode::serialize(&persisted)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
std::fs::write(path, bytes)
}
pub fn load<P: AsRef<Path>>(path: P) -> std::io::Result<Self> {
let bytes = std::fs::read(path)?;
let persisted: PersistedIndex = bincode::deserialize(&bytes)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
if persisted.version != PERSIST_VERSION {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"unsupported FreshnessIndexV2 persist version: {} (expected {})",
persisted.version, PERSIST_VERSION
),
));
}
let mut index = Self::with_threshold(persisted.threshold);
for (key, entry) in persisted.entries {
index
.deadline_index
.entry(entry.deadline)
.or_default()
.insert(key.clone());
index.entries.insert(key, entry);
}
Ok(index)
}
fn compute_deadline(&self, stored_at: u64, decay_rate: f32) -> u64 {
if decay_rate <= 0.0 {
return u64::MAX; }
let time_to_stale_secs = -self.threshold.ln() / decay_rate;
let time_to_stale_micros = (time_to_stale_secs * 1_000_000.0) as u64;
stored_at.saturating_add(time_to_stale_micros)
}
fn remove_from_deadline_index(&mut self, key: &str, deadline: u64) {
if let Some(keys) = self.deadline_index.get_mut(&deadline) {
keys.remove(key);
if keys.is_empty() {
self.deadline_index.remove(&deadline);
}
}
}
fn matches_pattern(key: &str, pattern: &str) -> bool {
if pattern == "*" {
return true;
}
if let Some(prefix) = pattern.strip_suffix("/*") {
return key.starts_with(prefix);
}
if let Some(prefix) = pattern.strip_suffix('*') {
return key.starts_with(prefix);
}
key == pattern
}
}
impl Default for FreshnessIndexV2 {
fn default() -> Self {
Self::new()
}
}
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// A system clock set before 1970 yields `0` instead of an error. The `u128 -> u64`
/// narrowing cast cannot truncate until roughly 580,000 years after the epoch.
fn now_micros() -> u64 {
    match std::time::UNIX_EPOCH.elapsed() {
        Ok(since_epoch) => since_epoch.as_micros() as u64,
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Freshly inserted entries report a freshness close to 1.0 regardless of decay rate.
    #[test]
    fn test_insert_and_query() {
        let mut index = FreshnessIndexV2::new();
        index.insert("fast", 10.0);
        index.insert("slow", 0.0001);
        // At 10/s, freshness stays above 0.9 for ~10 ms after insertion. The previous 0.99
        // bound allowed only ~1 ms, which a descheduled test thread can easily exceed.
        assert!(index.get_freshness("fast").unwrap() > 0.9);
        assert!(index.get_freshness("slow").unwrap() > 0.99);
    }

    /// With threshold 0.5 and rate 1/s, the deadline is `ln 2` s (~693_147 µs) after
    /// `stored_at`.
    #[test]
    fn test_deadline_computation() {
        let index = FreshnessIndexV2::with_threshold(0.5);
        let now = 1_000_000;
        let deadline = index.compute_deadline(now, 1.0);
        let expected = now + 693_147;
        assert!((deadline as i64 - expected as i64).abs() < 1000);
    }

    /// Only the 100 fast-decaying keys (stale after ~6.9 ms at 100/s) should be reported.
    #[test]
    fn test_stale_query_efficiency() {
        let mut index = FreshnessIndexV2::new();
        for i in 0..1000 {
            let decay = if i < 100 { 100.0 } else { 0.00001 };
            index.insert(&format!("key_{}", i), decay);
        }
        std::thread::sleep(std::time::Duration::from_millis(10));
        let stale = index.query_stale();
        assert!(stale.len() >= 50);
        assert!(stale.len() <= 200);
    }

    /// Eviction removes exactly the entries past their deadline (~0.7 ms at 1000/s).
    #[test]
    fn test_eviction() {
        let mut index = FreshnessIndexV2::new();
        index.insert("stale1", 1000.0);
        index.insert("stale2", 1000.0);
        index.insert("fresh", 0.00001);
        std::thread::sleep(std::time::Duration::from_millis(5));
        let evicted = index.evict_stale();
        assert!(evicted.contains(&"stale1".to_string()));
        assert!(evicted.contains(&"stale2".to_string()));
        assert!(!evicted.contains(&"fresh".to_string()));
        assert_eq!(index.len(), 1);
    }

    /// A zero decay rate never goes stale and keeps freshness at 1.0.
    #[test]
    fn test_static_data_never_stale() {
        let mut index = FreshnessIndexV2::new();
        index.insert("static", 0.0);
        assert!(!index.is_stale("static").unwrap());
        assert!((index.get_freshness("static").unwrap() - 1.0).abs() < 0.001);
    }

    /// `dir/*` matches nested keys under `dir/`; `*` matches everything.
    #[test]
    fn test_pattern_matching() {
        assert!(FreshnessIndexV2::matches_pattern(
            "gradient/layer_0",
            "gradient/*"
        ));
        assert!(FreshnessIndexV2::matches_pattern(
            "gradient/layer_0/weights",
            "gradient/*"
        ));
        assert!(!FreshnessIndexV2::matches_pattern(
            "model/weights",
            "gradient/*"
        ));
        assert!(FreshnessIndexV2::matches_pattern("anything", "*"));
    }

    /// Removing a key drops it from the index; removing it again reports `false`.
    #[test]
    fn test_remove() {
        let mut index = FreshnessIndexV2::new();
        index.insert("key1", 0.001);
        index.insert("key2", 0.001);
        assert_eq!(index.len(), 2);
        assert!(index.remove("key1"));
        assert_eq!(index.len(), 1);
        assert!(!index.remove("key1"));
        assert!(index.get_freshness("key1").is_none());
    }
}