use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use crate::cache::InvalidationEvent;
use crate::commit::TenantId;
/// One ranked entry in a [`TimeDecayIndex`] namespace.
///
/// Its effective score starts at `importance` and halves every `half_life_hours`
/// elapsed since `last_access_at` (see [`DecayedRow::decayed_score`]).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DecayedRow {
    /// Row identifier; rows within a namespace are matched by equality on this field.
    pub rid: String,
    /// Undecayed base score that the decay factor is multiplied into.
    pub importance: f64,
    /// Reference time from which elapsed decay is measured.
    pub last_access_at: SystemTime,
    /// Hours for the score to halve; a value `<= 0.0` disables decay entirely.
    pub half_life_hours: f64,
}
impl DecayedRow {
    /// Score of this row at `now`: `importance * 2^(-age_hours / half_life_hours)`.
    ///
    /// A non-positive half-life disables decay and returns `importance` unchanged.
    /// If `now` is earlier than `last_access_at` (e.g. clock skew), the age is
    /// treated as zero rather than negative, so the score never exceeds `importance`
    /// because of a future timestamp.
    pub fn decayed_score(&self, now: SystemTime) -> f64 {
        if self.half_life_hours <= 0.0 {
            return self.importance;
        }
        let age = match now.duration_since(self.last_access_at) {
            Ok(age) => age,
            Err(_) => Duration::ZERO,
        };
        let age_hours = age.as_secs_f64() / 3600.0;
        let half_lives_elapsed = age_hours / self.half_life_hours;
        self.importance * (-half_lives_elapsed).exp2()
    }
}
/// Index-wide decay settings for [`TimeDecayIndex`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct DecayConfig {
    /// Half-life applied by `TimeDecayIndex::upsert` when the caller passes `None`.
    pub default_half_life_hours: f64,
    /// Rows whose decayed score is below this value are excluded from `top_decayed` results.
    pub min_score: f64,
}
impl Default for DecayConfig {
    /// One-week (168-hour) default half-life and a score floor of `0.0`.
    fn default() -> Self {
        const HOURS_PER_WEEK: f64 = 7.0 * 24.0;
        Self {
            min_score: 0.0,
            default_half_life_hours: HOURS_PER_WEEK,
        }
    }
}
/// Source of time-decayed rankings for a tenant's namespace.
///
/// The `Send + Sync` bound allows a provider to be shared across threads,
/// e.g. as `Arc<dyn TimeDecayProvider>`.
pub trait TimeDecayProvider: Send + Sync {
    /// Returns up to `limit` rows of `namespace` for `tenant_id`, each paired with its
    /// decayed score evaluated at `now`. Ordering is up to the implementor
    /// (presumably highest score first — confirm per implementation).
    fn top_decayed(
        &self,
        tenant_id: TenantId,
        namespace: &str,
        limit: usize,
        now: SystemTime,
    ) -> Vec<(DecayedRow, f64)>;

    /// Convenience projection of [`TimeDecayProvider::top_decayed`] that keeps only each
    /// row's `rid`, in the same order `top_decayed` produced them.
    fn top_rids(
        &self,
        tenant_id: TenantId,
        namespace: &str,
        limit: usize,
        now: SystemTime,
    ) -> Vec<String> {
        let ranked = self.top_decayed(tenant_id, namespace, limit, now);
        let mut rids = Vec::with_capacity(ranked.len());
        for (row, _score) in ranked {
            rids.push(row.rid);
        }
        rids
    }
}
/// In-memory [`TimeDecayProvider`] that stores rows per tenant and per namespace.
///
/// Storage sits behind `Arc<RwLock<..>>`; cloning the index clones the `Arc`, so all
/// clones observe and mutate the same rows.
pub struct TimeDecayIndex {
    /// tenant -> namespace -> rows. `upsert` and `touch` re-sort each row list by
    /// `last_access_at`, most recent first, after modifying it.
    inner: Arc<RwLock<HashMap<TenantId, HashMap<String, Vec<DecayedRow>>>>>,
    /// Decay settings used for the default half-life and score filtering.
    config: DecayConfig,
}
impl TimeDecayIndex {
pub fn new(config: DecayConfig) -> Self {
Self {
inner: Arc::new(RwLock::new(HashMap::new())),
config,
}
}
pub fn config(&self) -> &DecayConfig {
&self.config
}
pub fn upsert(
&self,
tenant_id: TenantId,
namespace: &str,
rid: &str,
importance: f64,
half_life_hours: Option<f64>,
when: SystemTime,
) {
let half_life = half_life_hours.unwrap_or(self.config.default_half_life_hours);
let mut map = self.inner.write();
let ns_map = map.entry(tenant_id).or_default();
let list = ns_map.entry(namespace.to_string()).or_default();
if let Some(existing) = list.iter_mut().find(|r| r.rid == rid) {
existing.importance = importance;
existing.last_access_at = when;
existing.half_life_hours = half_life;
} else {
list.push(DecayedRow {
rid: rid.to_string(),
importance,
last_access_at: when,
half_life_hours: half_life,
});
}
list.sort_by(|a, b| b.last_access_at.cmp(&a.last_access_at));
}
pub fn touch(&self, tenant_id: TenantId, namespace: &str, rid: &str, when: SystemTime) {
let mut map = self.inner.write();
let Some(ns_map) = map.get_mut(&tenant_id) else {
return;
};
let Some(list) = ns_map.get_mut(namespace) else {
return;
};
if let Some(existing) = list.iter_mut().find(|r| r.rid == rid) {
existing.last_access_at = when;
list.sort_by(|a, b| b.last_access_at.cmp(&a.last_access_at));
}
}
pub fn remove(&self, tenant_id: TenantId, namespace: &str, rid: &str) {
let mut map = self.inner.write();
let Some(ns_map) = map.get_mut(&tenant_id) else {
return;
};
let Some(list) = ns_map.get_mut(namespace) else {
return;
};
list.retain(|r| r.rid != rid);
if list.is_empty() {
ns_map.remove(namespace);
}
if ns_map.is_empty() {
map.remove(&tenant_id);
}
}
pub fn clear_tenant(&self, tenant_id: TenantId) {
self.inner.write().remove(&tenant_id);
}
pub fn total_entries(&self) -> usize {
self.inner
.read()
.values()
.flat_map(|m| m.values())
.map(|v| v.len())
.sum()
}
pub fn tenant_count(&self) -> usize {
self.inner.read().len()
}
pub fn namespace_count(&self, tenant_id: TenantId) -> usize {
self.inner
.read()
.get(&tenant_id)
.map(|m| m.len())
.unwrap_or(0)
}
}
impl Default for TimeDecayIndex {
    /// An empty index using [`DecayConfig::default`].
    fn default() -> Self {
        let config = DecayConfig::default();
        Self::new(config)
    }
}
impl Clone for TimeDecayIndex {
    /// Returns a new handle to the SAME underlying storage (the `Arc` is shared, not
    /// deep-copied), together with a copy of the configuration. Mutations through any
    /// handle are visible through all of them.
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        let config = self.config.clone();
        Self { inner, config }
    }
}
impl TimeDecayProvider for TimeDecayIndex {
    /// Returns up to `limit` rows of `namespace` for `tenant_id`, ranked by decayed score at
    /// `now` (highest first), each paired with that score.
    ///
    /// Every row in the namespace is scored. Stored order only reflects recency, and the
    /// score also depends on per-row `importance` and `half_life_hours`. Scanning only a
    /// recency window could therefore miss the true top rows.
    ///
    /// Rows scoring below `config.min_score` are excluded, and so are NaN scores, because
    /// `>=` is false for NaN. Ties keep the stored order (most recently accessed first)
    /// because the sort is stable.
    fn top_decayed(
        &self,
        tenant_id: TenantId,
        namespace: &str,
        limit: usize,
        now: SystemTime,
    ) -> Vec<(DecayedRow, f64)> {
        if limit == 0 {
            return Vec::new();
        }
        let map = self.inner.read();
        let Some(list) = map
            .get(&tenant_id)
            .and_then(|ns_map| ns_map.get(namespace))
        else {
            return Vec::new();
        };
        let min_score = self.config.min_score;
        let mut scored: Vec<(&DecayedRow, f64)> = list
            .iter()
            .map(|row| (row, row.decayed_score(now)))
            .filter(|&(_, score)| score >= min_score)
            .collect();
        // NaN scores were filtered out above, so `total_cmp` agrees with numeric order.
        scored.sort_by(|a, b| b.1.total_cmp(&a.1));
        scored.truncate(limit);
        // Clone only the rows that survived filtering and truncation.
        scored
            .into_iter()
            .map(|(row, score)| (row.clone(), score))
            .collect()
    }
}
/// Spawns a task that applies invalidation events from `bus` to `index`.
///
/// The receiver is created with `bus.subscribe()` before the task is spawned, so events
/// published after this function returns are observed. Per event:
/// - `Tombstoned`: `rid` is removed from every namespace of `tenant_id`. Namespaces left
///   empty are dropped, and the tenant entry itself is dropped once it has no namespaces,
///   mirroring [`TimeDecayIndex::remove`].
/// - `Updated`, `EdgeChanged`, `TenantConfigChanged`: ignored by this index.
/// - `RecvError::Lagged`: some events were missed, so the whole index is cleared rather
///   than risk keeping rids whose tombstones were dropped.
/// - `RecvError::Closed`: the task exits.
///
/// Pass a clone of the caller's index. Clones share storage, so the task's updates are
/// visible to the caller.
pub fn spawn_invalidation_bus_subscriber(
    index: TimeDecayIndex,
    bus: &crate::cache::InvalidationBus,
) -> tokio::task::JoinHandle<()> {
    let mut rx = bus.subscribe();
    tokio::spawn(async move {
        loop {
            match rx.recv().await {
                Ok(InvalidationEvent::Tombstoned { tenant_id, rid }) => {
                    // The write guard lives only for this arm, so it is never held across
                    // the next `.await`.
                    let mut map = index.inner.write();
                    let tenant_now_empty = match map.get_mut(&tenant_id) {
                        Some(ns_map) => {
                            for list in ns_map.values_mut() {
                                list.retain(|r| r.rid != rid);
                            }
                            ns_map.retain(|_, v| !v.is_empty());
                            ns_map.is_empty()
                        }
                        None => false,
                    };
                    // Drop tenants with no namespaces left so `tenant_count` stays accurate,
                    // consistent with `TimeDecayIndex::remove`.
                    if tenant_now_empty {
                        map.remove(&tenant_id);
                    }
                }
                Ok(InvalidationEvent::Updated { .. })
                | Ok(InvalidationEvent::EdgeChanged { .. })
                | Ok(InvalidationEvent::TenantConfigChanged { .. }) => {}
                Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
                    index.inner.write().clear();
                }
                Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
            }
        }
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Fixed reference instant so tests are deterministic.
    fn t0() -> SystemTime {
        SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000)
    }

    #[test]
    fn config_defaults_are_one_week_half_life() {
        let cfg = DecayConfig::default();
        assert_eq!(cfg.default_half_life_hours, 168.0);
        assert_eq!(cfg.min_score, 0.0);
    }

    #[test]
    fn decay_at_zero_elapsed_equals_importance() {
        let now = t0();
        let row = DecayedRow {
            rid: "a".into(),
            importance: 0.7,
            last_access_at: now,
            half_life_hours: 24.0,
        };
        assert!((row.decayed_score(now) - 0.7).abs() < 1e-9);
    }

    #[test]
    fn decay_at_one_half_life_halves_score() {
        let now = t0();
        let row = DecayedRow {
            rid: "a".into(),
            importance: 0.8,
            last_access_at: now,
            half_life_hours: 24.0,
        };
        let later = now + Duration::from_secs(24 * 3600);
        let score = row.decayed_score(later);
        assert!((score - 0.4).abs() < 1e-9);
    }

    #[test]
    fn decay_at_two_half_lives_quarters_score() {
        let now = t0();
        let row = DecayedRow {
            rid: "a".into(),
            importance: 1.0,
            last_access_at: now,
            half_life_hours: 24.0,
        };
        let later = now + Duration::from_secs(48 * 3600);
        assert!((row.decayed_score(later) - 0.25).abs() < 1e-9);
    }

    #[test]
    fn decay_zero_half_life_returns_importance_flat() {
        let row = DecayedRow {
            rid: "a".into(),
            importance: 0.5,
            last_access_at: t0(),
            half_life_hours: 0.0,
        };
        let later = t0() + Duration::from_secs(100 * 3600);
        assert_eq!(row.decayed_score(later), 0.5);
    }

    #[test]
    fn decay_with_now_before_last_access_returns_importance() {
        let row = DecayedRow {
            rid: "a".into(),
            importance: 0.6,
            last_access_at: t0(),
            half_life_hours: 24.0,
        };
        let earlier = t0() - Duration::from_secs(3600);
        assert_eq!(row.decayed_score(earlier), 0.6);
    }

    #[test]
    fn empty_index_returns_no_rows() {
        let idx = TimeDecayIndex::default();
        let r = idx.top_decayed(TenantId::new(1), "ns", 10, t0());
        assert!(r.is_empty());
    }

    #[test]
    fn zero_limit_returns_no_rows() {
        let idx = TimeDecayIndex::default();
        idx.upsert(TenantId::new(1), "ns", "a", 0.5, Some(24.0), t0());
        assert!(idx.top_decayed(TenantId::new(1), "ns", 0, t0()).is_empty());
    }

    #[test]
    fn upsert_then_top_returns_row() {
        let idx = TimeDecayIndex::default();
        idx.upsert(TenantId::new(1), "ns", "a", 0.7, Some(24.0), t0());
        let r = idx.top_decayed(TenantId::new(1), "ns", 10, t0());
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].0.rid, "a");
        assert!((r[0].1 - 0.7).abs() < 1e-9);
    }

    #[test]
    fn upsert_replaces_does_not_duplicate() {
        let idx = TimeDecayIndex::default();
        idx.upsert(TenantId::new(1), "ns", "a", 0.3, Some(24.0), t0());
        idx.upsert(TenantId::new(1), "ns", "a", 0.9, Some(24.0), t0());
        let r = idx.top_decayed(TenantId::new(1), "ns", 10, t0());
        assert_eq!(r.len(), 1);
        assert!((r[0].1 - 0.9).abs() < 1e-9);
    }

    #[test]
    fn upsert_without_half_life_uses_config_default() {
        let idx = TimeDecayIndex::new(DecayConfig {
            default_half_life_hours: 12.0,
            min_score: 0.0,
        });
        let now = t0();
        idx.upsert(
            TenantId::new(1),
            "ns",
            "a",
            0.8,
            None,
            now - Duration::from_secs(12 * 3600),
        );
        let r = idx.top_decayed(TenantId::new(1), "ns", 10, now);
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].0.half_life_hours, 12.0);
        assert!((r[0].1 - 0.4).abs() < 1e-9);
    }

    #[test]
    fn top_decayed_orders_by_decayed_score() {
        let idx = TimeDecayIndex::default();
        let now = t0();
        let day_ago = now - Duration::from_secs(24 * 3600);
        idx.upsert(TenantId::new(1), "ns", "old_high", 1.0, Some(24.0), day_ago);
        idx.upsert(TenantId::new(1), "ns", "fresh_low", 0.4, Some(24.0), now);
        idx.upsert(TenantId::new(1), "ns", "fresh_mid", 0.6, Some(24.0), now);
        let r = idx.top_decayed(TenantId::new(1), "ns", 3, now);
        assert_eq!(r.len(), 3);
        assert_eq!(r[0].0.rid, "fresh_mid");
        assert_eq!(r[1].0.rid, "old_high");
        assert_eq!(r[2].0.rid, "fresh_low");
    }

    #[test]
    fn limit_truncates_top_decayed() {
        let idx = TimeDecayIndex::default();
        for i in 0..10 {
            idx.upsert(
                TenantId::new(1),
                "ns",
                &format!("r{i}"),
                0.5,
                Some(24.0),
                t0() + Duration::from_secs(i),
            );
        }
        let r = idx.top_decayed(TenantId::new(1), "ns", 3, t0() + Duration::from_secs(100));
        assert_eq!(r.len(), 3);
    }

    #[test]
    fn min_score_filters_out_decayed_below_floor() {
        let idx = TimeDecayIndex::new(DecayConfig {
            default_half_life_hours: 24.0,
            min_score: 0.3,
        });
        idx.upsert(TenantId::new(1), "ns", "high", 0.9, Some(24.0), t0());
        idx.upsert(TenantId::new(1), "ns", "low", 0.1, Some(24.0), t0());
        let r = idx.top_decayed(TenantId::new(1), "ns", 10, t0());
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].0.rid, "high");
    }

    #[test]
    fn touch_updates_recency_only() {
        let idx = TimeDecayIndex::default();
        let now = t0();
        let two_days_ago = now - Duration::from_secs(48 * 3600);
        idx.upsert(TenantId::new(1), "ns", "a", 0.5, Some(24.0), two_days_ago);
        let r1 = idx.top_decayed(TenantId::new(1), "ns", 10, now);
        assert!((r1[0].1 - 0.125).abs() < 1e-6);
        idx.touch(TenantId::new(1), "ns", "a", now);
        let r2 = idx.top_decayed(TenantId::new(1), "ns", 10, now);
        assert!((r2[0].1 - 0.5).abs() < 1e-6);
    }

    #[test]
    fn touch_unknown_rid_is_noop() {
        let idx = TimeDecayIndex::default();
        idx.upsert(TenantId::new(1), "ns", "a", 0.5, Some(24.0), t0());
        idx.touch(TenantId::new(1), "ns", "ghost", t0());
        idx.touch(TenantId::new(2), "ns", "a", t0());
        idx.touch(TenantId::new(1), "missing_ns", "a", t0());
        assert_eq!(idx.total_entries(), 1);
    }

    #[test]
    fn remove_drops_row() {
        let idx = TimeDecayIndex::default();
        idx.upsert(TenantId::new(1), "ns", "a", 0.5, Some(24.0), t0());
        idx.upsert(TenantId::new(1), "ns", "b", 0.5, Some(24.0), t0());
        idx.remove(TenantId::new(1), "ns", "a");
        let r = idx.top_decayed(TenantId::new(1), "ns", 10, t0());
        assert_eq!(r.len(), 1);
        assert_eq!(r[0].0.rid, "b");
    }

    #[test]
    fn remove_last_in_namespace_drops_namespace() {
        let idx = TimeDecayIndex::default();
        idx.upsert(TenantId::new(1), "ns", "a", 0.5, Some(24.0), t0());
        idx.remove(TenantId::new(1), "ns", "a");
        assert_eq!(idx.namespace_count(TenantId::new(1)), 0);
        assert_eq!(idx.tenant_count(), 0);
    }

    #[test]
    fn clear_tenant_only_drops_that_tenant() {
        let idx = TimeDecayIndex::default();
        idx.upsert(TenantId::new(1), "ns", "a", 0.5, Some(24.0), t0());
        idx.upsert(TenantId::new(2), "ns", "b", 0.5, Some(24.0), t0());
        idx.clear_tenant(TenantId::new(1));
        assert_eq!(idx.tenant_count(), 1);
        assert_eq!(idx.total_entries(), 1);
        assert!(idx.top_decayed(TenantId::new(1), "ns", 10, t0()).is_empty());
        assert_eq!(idx.top_rids(TenantId::new(2), "ns", 10, t0()), vec!["b"]);
    }

    #[test]
    fn clones_share_storage() {
        let idx = TimeDecayIndex::default();
        let handle = idx.clone();
        handle.upsert(TenantId::new(1), "ns", "a", 0.5, Some(24.0), t0());
        assert_eq!(idx.total_entries(), 1);
    }

    #[test]
    fn per_tenant_isolation() {
        let idx = TimeDecayIndex::default();
        idx.upsert(TenantId::new(1), "ns", "a", 0.5, Some(24.0), t0());
        idx.upsert(TenantId::new(2), "ns", "b", 0.5, Some(24.0), t0());
        let t1 = idx.top_decayed(TenantId::new(1), "ns", 10, t0());
        let t2 = idx.top_decayed(TenantId::new(2), "ns", 10, t0());
        assert_eq!(t1.len(), 1);
        assert_eq!(t1[0].0.rid, "a");
        assert_eq!(t2.len(), 1);
        assert_eq!(t2[0].0.rid, "b");
    }

    #[test]
    fn per_namespace_isolation() {
        let idx = TimeDecayIndex::default();
        idx.upsert(TenantId::new(1), "ns_a", "a", 0.5, Some(24.0), t0());
        idx.upsert(TenantId::new(1), "ns_b", "b", 0.5, Some(24.0), t0());
        let a = idx.top_decayed(TenantId::new(1), "ns_a", 10, t0());
        let b = idx.top_decayed(TenantId::new(1), "ns_b", 10, t0());
        assert_eq!(a.len(), 1);
        assert_eq!(a[0].0.rid, "a");
        assert_eq!(b.len(), 1);
        assert_eq!(b[0].0.rid, "b");
    }

    #[test]
    fn top_rids_helper_returns_just_rids() {
        let idx = TimeDecayIndex::default();
        idx.upsert(TenantId::new(1), "ns", "a", 0.7, Some(24.0), t0());
        idx.upsert(TenantId::new(1), "ns", "b", 0.5, Some(24.0), t0());
        let rids = idx.top_rids(TenantId::new(1), "ns", 10, t0());
        assert_eq!(rids, vec!["a", "b"]);
    }

    #[test]
    fn dyn_dispatch_works() {
        let idx: Arc<dyn TimeDecayProvider> = Arc::new(TimeDecayIndex::default());
        let r = idx.top_decayed(TenantId::new(1), "ns", 10, t0());
        assert!(r.is_empty());
    }

    #[tokio::test]
    async fn invalidation_bus_subscriber_removes_tombstoned_rid() {
        let bus = crate::cache::InvalidationBus::new();
        let idx = TimeDecayIndex::default();
        idx.upsert(TenantId::new(1), "ns_a", "memory_x", 0.5, Some(24.0), t0());
        idx.upsert(TenantId::new(1), "ns_b", "memory_x", 0.5, Some(24.0), t0());
        idx.upsert(TenantId::new(1), "ns_a", "memory_y", 0.5, Some(24.0), t0());
        let handle = spawn_invalidation_bus_subscriber(idx.clone(), &bus);
        bus.publish(InvalidationEvent::Tombstoned {
            tenant_id: TenantId::new(1),
            rid: "memory_x".into(),
        });
        // Poll with a deadline instead of sleeping a fixed interval. A slow scheduler
        // cannot make the test flaky, and a fast one does not make it wait needlessly.
        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
        while !idx.top_rids(TenantId::new(1), "ns_b", 10, t0()).is_empty() {
            assert!(
                tokio::time::Instant::now() < deadline,
                "subscriber did not process the tombstone in time"
            );
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
        // Both namespaces are purged under one write lock, so ns_a is already updated.
        assert_eq!(
            idx.top_rids(TenantId::new(1), "ns_a", 10, t0()),
            vec!["memory_y"]
        );
        assert!(idx.top_rids(TenantId::new(1), "ns_b", 10, t0()).is_empty());
        handle.abort();
    }
}