use std::collections::HashMap;
use std::sync::RwLock;
use nodedb_cluster::DescriptorId;
use nodedb_types::Hlc;
/// Drain window recorded for a single descriptor.
///
/// `DescriptorDrainTracker::is_draining` reports a request as draining only
/// while the entry is unexpired and the requested version falls at or below
/// `up_to_version`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DrainEntry {
/// Inclusive upper bound of the versions covered by the drain: a requested
/// version `v` matches when `v <= up_to_version`.
pub up_to_version: u64,
/// HLC timestamp after which the entry no longer counts. Only its `wall_ns`
/// component is compared, and the entry is live while `wall_ns` is strictly
/// greater than the caller-supplied "now".
pub expires_at: Hlc,
}
/// Tracks, per descriptor, the drain window that is currently installed.
///
/// Holds at most one [`DrainEntry`] per [`DescriptorId`]. All access goes
/// through an `RwLock`; the methods recover the guard from a poisoned lock
/// instead of panicking.
#[derive(Debug, Default)]
pub struct DescriptorDrainTracker {
/// Installed drain entries keyed by descriptor id.
active: RwLock<HashMap<DescriptorId, DrainEntry>>,
}
impl DescriptorDrainTracker {
    /// Creates a tracker with no drain entries installed.
    pub fn new() -> Self {
        Self::default()
    }

    /// Installs a drain entry for `id`.
    ///
    /// Any entry already stored for the same `id` is replaced.
    pub fn install_start(&self, id: DescriptorId, up_to_version: u64, expires_at: Hlc) {
        let entry = DrainEntry {
            up_to_version,
            expires_at,
        };
        // A poisoned lock is recovered rather than propagated as a panic.
        let mut guard = self
            .active
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        guard.insert(id, entry);
    }

    /// Removes the drain entry for `id`, if one is present.
    pub fn install_end(&self, id: &DescriptorId) {
        let mut guard = self
            .active
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        guard.remove(id);
    }

    /// Returns `true` when `id` has an entry that is both unexpired at
    /// `now_wall_ns` and covers `requested_version`.
    ///
    /// An entry is unexpired while `expires_at.wall_ns > now_wall_ns`, and it
    /// covers every version up to and including `up_to_version`. A descriptor
    /// without an entry is never draining.
    pub fn is_draining(&self, id: &DescriptorId, requested_version: u64, now_wall_ns: u64) -> bool {
        let guard = self
            .active
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let Some(entry) = guard.get(id) else {
            return false;
        };
        let not_expired = entry.expires_at.wall_ns > now_wall_ns;
        let version_covered = requested_version <= entry.up_to_version;
        not_expired && version_covered
    }

    /// Returns a copy of every stored `(id, entry)` pair, expired or not.
    ///
    /// The order follows the underlying map's iteration order.
    pub fn snapshot(&self) -> Vec<(DescriptorId, DrainEntry)> {
        let guard = self
            .active
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let mut pairs = Vec::with_capacity(guard.len());
        for (descriptor, entry) in guard.iter() {
            pairs.push((descriptor.clone(), *entry));
        }
        pairs
    }

    /// Counts the stored entries whose `expires_at.wall_ns` is strictly greater
    /// than `now_wall_ns`.
    pub fn count_active(&self, now_wall_ns: u64) -> usize {
        let guard = self
            .active
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        guard
            .values()
            .map(|entry| usize::from(entry.expires_at.wall_ns > now_wall_ns))
            .sum()
    }

    /// Counts every stored entry, including expired ones.
    pub fn total_count(&self) -> usize {
        let guard = self
            .active
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        guard.len()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use nodedb_cluster::DescriptorKind;

    /// Builds a `Collection` descriptor id with the given name.
    fn id(name: &str) -> DescriptorId {
        DescriptorId::new(1, DescriptorKind::Collection, name.to_string())
    }

    /// Builds an HLC whose wall-clock component is `wall_ns`.
    fn hlc(wall_ns: u64) -> Hlc {
        Hlc::new(wall_ns, 0)
    }

    /// Versions up to and including the bound drain; higher versions do not.
    #[test]
    fn install_then_is_draining_true_for_versions_in_range() {
        let tracker = DescriptorDrainTracker::new();
        let orders = id("orders");
        tracker.install_start(orders.clone(), 5, hlc(1_000_000));

        for version in [1, 3, 5] {
            assert!(tracker.is_draining(&orders, version, 500_000));
        }
        for version in [6, 100] {
            assert!(!tracker.is_draining(&orders, version, 500_000));
        }
    }

    /// `install_end` removes the entry entirely.
    #[test]
    fn install_end_clears_entry() {
        let tracker = DescriptorDrainTracker::new();
        let orders = id("orders");
        tracker.install_start(orders.clone(), 5, hlc(1_000_000));
        assert!(tracker.is_draining(&orders, 5, 500_000));

        tracker.install_end(&orders);

        assert!(!tracker.is_draining(&orders, 5, 500_000));
        assert_eq!(tracker.total_count(), 0);
    }

    /// An entry stops matching once "now" is past its expiry.
    #[test]
    fn is_draining_filters_expired_entries() {
        let tracker = DescriptorDrainTracker::new();
        let stale = id("stale");
        tracker.install_start(stale.clone(), 5, hlc(1_000));

        for version in [1, 5] {
            assert!(!tracker.is_draining(&stale, version, 2_000));
        }
        assert!(tracker.is_draining(&stale, 5, 500));
    }

    /// Entries for different descriptors do not influence each other.
    #[test]
    fn multiple_descriptors_are_independent() {
        let tracker = DescriptorDrainTracker::new();
        let first = id("a");
        let second = id("b");
        tracker.install_start(first.clone(), 1, hlc(1_000_000));
        tracker.install_start(second.clone(), 10, hlc(1_000_000));

        assert!(tracker.is_draining(&first, 1, 500_000));
        assert!(!tracker.is_draining(&first, 2, 500_000));

        for version in [5, 10] {
            assert!(tracker.is_draining(&second, version, 500_000));
        }
        assert!(!tracker.is_draining(&second, 11, 500_000));
    }

    /// A second `install_start` for the same id replaces the first entry.
    #[test]
    fn install_start_overwrites_prior_entry() {
        let tracker = DescriptorDrainTracker::new();
        let orders = id("orders");
        tracker.install_start(orders.clone(), 5, hlc(1_000_000));
        tracker.install_start(orders.clone(), 10, hlc(2_000_000));

        assert!(tracker.is_draining(&orders, 10, 500_000));
        assert_eq!(tracker.total_count(), 1);

        let snap = tracker.snapshot();
        let latest = snap[0].1;
        assert_eq!(latest.up_to_version, 10);
        assert_eq!(latest.expires_at.wall_ns, 2_000_000);
    }

    /// `count_active` skips expired entries while `total_count` keeps them.
    #[test]
    fn count_active_filters_expired() {
        let tracker = DescriptorDrainTracker::new();
        tracker.install_start(id("live"), 1, hlc(10_000_000));
        tracker.install_start(id("dead"), 1, hlc(100));

        assert_eq!(tracker.total_count(), 2);
        assert_eq!(tracker.count_active(1_000), 1);
        assert_eq!(tracker.count_active(20_000_000), 0);
    }
}