use std::collections::BTreeMap;
use std::time::{Duration, Instant};
pub const K_WINDOW: usize = 128;
pub const K_MIN: usize = 12;
#[doc = "#[tunable]"]
pub const IDLE_TTL: Duration = Duration::from_hours(1);
pub const GC_SWEEP_INTERVAL: Duration = Duration::from_secs(3600 / 8);
pub const MAX_SESSIONS: usize = 10 * K_WINDOW;
#[derive(Debug, Clone)]
pub struct SessionShard {
samples: Vec<f32>,
n_seen: u64,
last_observe: Instant,
}
impl Default for SessionShard {
fn default() -> Self {
Self::new()
}
}
impl SessionShard {
#[must_use]
pub fn new() -> Self {
Self {
samples: Vec::with_capacity(K_WINDOW),
n_seen: 0,
last_observe: Instant::now(),
}
}
pub fn observe(&mut self, value: f32) {
if self.samples.len() == K_WINDOW {
self.samples.remove(0);
}
self.samples.push(value);
self.n_seen = self.n_seen.saturating_add(1);
self.last_observe = Instant::now();
}
#[must_use]
pub fn warmup(&self) -> bool {
self.n_seen < K_MIN as u64
}
#[must_use]
pub fn median(&self) -> Option<f32> {
if self.warmup() || self.samples.is_empty() {
return None;
}
let mut sorted: Vec<f32> = self.samples.clone();
sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
let mid = sorted.len() / 2;
Some(if sorted.len() % 2 == 0 {
f32::midpoint(sorted[mid - 1], sorted[mid])
} else {
sorted[mid]
})
}
#[must_use]
pub fn idle_at(&self, now: Instant) -> Duration {
now.saturating_duration_since(self.last_observe)
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct GcSweepReport {
pub evicted_ttl: u32,
pub evicted_cap: u32,
pub active: u32,
}
#[derive(Debug, Default)]
pub struct SessionReservoir {
shards: BTreeMap<String, SessionShard>,
}
impl SessionReservoir {
#[must_use]
pub fn new() -> Self {
Self {
shards: BTreeMap::new(),
}
}
pub fn observe(&mut self, session_id: &str, value: f32) {
let shard = self.shards.entry(session_id.to_string()).or_default();
shard.observe(value);
}
#[must_use]
pub fn median(&self, session_id: &str) -> Option<f32> {
self.shards.get(session_id).and_then(SessionShard::median)
}
#[must_use]
pub fn warmup(&self, session_id: &str) -> bool {
self.shards.get(session_id).is_none_or(SessionShard::warmup)
}
#[must_use]
pub fn len(&self) -> usize {
self.shards.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.shards.is_empty()
}
pub fn gc_sweep(&mut self, now: Instant) -> GcSweepReport {
let before = self.shards.len() as u32;
let expired: Vec<String> = self
.shards
.iter()
.filter_map(|(k, v)| {
if v.idle_at(now) > IDLE_TTL {
Some(k.clone())
} else {
None
}
})
.collect();
let evicted_ttl = expired.len() as u32;
for k in &expired {
self.shards.remove(k);
}
let mut evicted_cap = 0u32;
if self.shards.len() > MAX_SESSIONS {
let mut ages: Vec<(String, Instant)> = self
.shards
.iter()
.map(|(k, v)| (k.clone(), v.last_observe))
.collect();
ages.sort_by_key(|(_, t)| *t);
let overflow = self.shards.len() - MAX_SESSIONS;
for (k, _) in ages.into_iter().take(overflow) {
self.shards.remove(&k);
evicted_cap += 1;
}
}
let active = self.shards.len() as u32;
debug_assert!(active + evicted_ttl + evicted_cap <= before.max(active));
GcSweepReport {
evicted_ttl,
evicted_cap,
active,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn fresh_session_is_warmup() {
let r = SessionReservoir::new();
assert!(r.warmup("sid-a"));
assert_eq!(r.median("sid-a"), None);
}
#[test]
fn observe_tracks_n_seen() {
let mut r = SessionReservoir::new();
for i in 0..K_MIN {
r.observe("sid-a", i as f32);
}
assert!(!r.warmup("sid-a"));
assert!(r.median("sid-a").is_some());
}
#[test]
fn window_is_bounded_by_k_window() {
let mut r = SessionReservoir::new();
for i in 0..(K_WINDOW + 50) {
r.observe("sid-a", i as f32);
}
let m = r.median("sid-a").expect("post-warmup");
assert!(m > 50.0 && m < 180.0);
}
#[test]
fn gc_sweep_noop_on_fresh_session() {
let mut r = SessionReservoir::new();
r.observe("sid-a", 1.0);
let report = r.gc_sweep(Instant::now());
assert_eq!(report.evicted_ttl, 0);
assert_eq!(report.evicted_cap, 0);
assert_eq!(report.active, 1);
}
#[test]
fn gc_sweep_evicts_idle_shards() {
let mut r = SessionReservoir::new();
r.observe("sid-a", 1.0);
let far_future = Instant::now() + IDLE_TTL + Duration::from_secs(1);
let report = r.gc_sweep(far_future);
assert_eq!(report.evicted_ttl, 1);
assert_eq!(report.active, 0);
}
}