use std::collections::{BTreeMap, HashMap};
use std::time::Instant;
use crate::adapter::net::channel::ChannelName;
use crate::adapter::net::redex::RedexFile;
/// One cached channel tracked by `GreedyCacheRegistry`.
///
/// Entries are created and mutated by the registry; callers receive shared
/// references from `GreedyCacheRegistry::get` or an owned value from
/// `GreedyCacheRegistry::evict`.
pub struct GreedyCacheEntry {
/// File handle registered for the channel; replaced when the channel is
/// upserted again.
pub file: RedexFile,
/// The `now` value passed to the most recent `upsert` or `touch` for this
/// channel.
pub last_read: Instant,
/// Bytes attributed to this entry: grown by `note_appended`, reset to 0 by
/// `upsert`, and overwritten with `file.retained_bytes()` by
/// `resync_bytes_from_files`.
pub bytes: u64,
/// Origin hash assigned through `set_origin_hash`; `0` means "unset" and is
/// never counted in the registry's origin index.
pub origin_hash: u64,
/// Key under which this entry is currently filed in the registry's LRU map.
/// Private so only the registry can keep the two in sync.
lru_pos: u64,
}
impl std::fmt::Debug for GreedyCacheEntry {
    /// Formats the bookkeeping fields only (`bytes`, `last_read`, `lru_pos`).
    ///
    /// `file` and `origin_hash` are left out; the non-exhaustive finish marks
    /// the output as partial.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            bytes,
            last_read,
            lru_pos,
            ..
        } = self;
        let mut out = f.debug_struct("GreedyCacheEntry");
        out.field("bytes", bytes);
        out.field("last_read", last_read);
        out.field("lru_pos", lru_pos);
        out.finish_non_exhaustive()
    }
}
/// Record of a single entry removed by a cap-enforcement sweep.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EvictedEntry {
/// Channel whose entry was removed from the registry.
pub channel: ChannelName,
/// Origin hash the entry carried when it was evicted; `0` if none was set.
pub origin_hash: u64,
}
/// Outcome of one cap-enforcement pass over the registry.
///
/// The default value is an empty sweep (nothing evicted).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct EvictionSweep {
/// Entries removed during the pass, in the order they were evicted
/// (lowest LRU position first).
pub evicted: Vec<EvictedEntry>,
}
impl EvictionSweep {
pub fn is_empty(&self) -> bool {
self.evicted.is_empty()
}
pub fn len(&self) -> usize {
self.evicted.len()
}
pub fn channels(&self) -> impl Iterator<Item = &ChannelName> + '_ {
self.evicted.iter().map(|e| &e.channel)
}
}
/// Byte-capped registry of cached channels with least-recently-used eviction.
///
/// Three indexes are kept in step: the entry map, an LRU ordering keyed by a
/// monotonically allocated position, and a reference count per non-zero
/// origin hash.
#[derive(Debug)]
pub struct GreedyCacheRegistry {
/// Channel name -> cached entry.
entries: HashMap<ChannelName, GreedyCacheEntry>,
/// LRU position -> channel name; the smallest key is the next eviction
/// candidate.
lru: BTreeMap<u64, ChannelName>,
/// Number of entries currently claiming each non-zero origin hash.
origin_counts: HashMap<u64, usize>,
/// Next LRU position to hand out.
next_lru_pos: u64,
/// Running sum of the bytes attributed to all entries.
total_bytes: u64,
/// Ceiling for `total_bytes`; eviction runs while the total exceeds it.
total_cap_bytes: u64,
}
impl GreedyCacheRegistry {
    /// Creates an empty registry that evicts once the tracked byte total
    /// exceeds `total_cap_bytes`.
    pub fn new(total_cap_bytes: u64) -> Self {
        Self {
            entries: HashMap::new(),
            lru: BTreeMap::new(),
            origin_counts: HashMap::new(),
            next_lru_pos: 0,
            total_bytes: 0,
            total_cap_bytes,
        }
    }

    /// Counts one more entry claiming `hash`. A hash of `0` means "unset" and
    /// is never indexed.
    fn add_origin(&mut self, hash: u64) {
        if hash != 0 {
            *self.origin_counts.entry(hash).or_default() += 1;
        }
    }

    /// Drops one claim on `hash`, removing the index slot when the count
    /// reaches zero. A hash of `0` or an unknown hash is ignored.
    fn remove_origin(&mut self, hash: u64) {
        use std::collections::hash_map::Entry;

        if hash == 0 {
            return;
        }
        if let Entry::Occupied(mut slot) = self.origin_counts.entry(hash) {
            let count = slot.get_mut();
            *count -= 1;
            if *count == 0 {
                slot.remove();
            }
        }
    }

    /// Number of channels currently registered.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` when no channel is registered.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Sum of the bytes currently attributed to all entries.
    pub fn total_bytes(&self) -> u64 {
        self.total_bytes
    }

    /// Replaces every entry's byte count with its file's `retained_bytes()`
    /// and recomputes the total from those values (saturating on overflow).
    ///
    /// This does not run eviction.
    pub fn resync_bytes_from_files(&mut self) {
        self.total_bytes = self.entries.values_mut().fold(0u64, |total, entry| {
            entry.bytes = entry.file.retained_bytes();
            total.saturating_add(entry.bytes)
        });
    }

    /// The configured ceiling for [`total_bytes`](Self::total_bytes).
    pub fn total_cap_bytes(&self) -> u64 {
        self.total_cap_bytes
    }

    /// Returns `true` when `channel` is registered.
    pub fn contains(&self, channel: &ChannelName) -> bool {
        self.entries.contains_key(channel)
    }

    /// Looks up the entry for `channel`, if registered.
    pub fn get(&self, channel: &ChannelName) -> Option<&GreedyCacheEntry> {
        self.entries.get(channel)
    }

    /// Returns `true` when at least one entry claims `origin_hash`.
    ///
    /// `0` is the "unset" marker and always yields `false`.
    pub fn contains_origin(&self, origin_hash: u64) -> bool {
        origin_hash != 0 && self.origin_counts.contains_key(&origin_hash)
    }

    /// Iterates over the registered channel names in arbitrary order.
    pub fn channels(&self) -> impl Iterator<Item = &ChannelName> + '_ {
        self.entries.keys()
    }

    /// Hands out the next LRU position from `counter` and advances it.
    ///
    /// Takes only the counter (not `&mut self`) so callers can allocate a
    /// position while still holding a mutable borrow of an entry.
    ///
    /// Once the counter reaches `u64::MAX` it stays there: debug builds trip
    /// the assertion, release builds keep returning `u64::MAX`.
    fn bump_lru_counter(counter: &mut u64) -> u64 {
        if *counter == u64::MAX {
            debug_assert!(
                false,
                "GreedyCacheRegistry::next_lru_pos saturated at u64::MAX — \
                 LRU ordering would silently collapse",
            );
            return u64::MAX;
        }
        let pos = *counter;
        *counter += 1;
        pos
    }

    /// Allocates the next LRU position from this registry's counter.
    fn allocate_lru_pos(&mut self) -> u64 {
        Self::bump_lru_counter(&mut self.next_lru_pos)
    }

    /// Registers `channel` with `file`, or re-registers it if already present.
    ///
    /// Either way the channel becomes the most recently used and its
    /// `last_read` is set to `now`. On re-registration the entry's previous
    /// byte count is subtracted from the total and reset to 0, the file handle
    /// is replaced, and the origin hash is kept. A new entry starts with
    /// 0 bytes and an unset (0) origin hash.
    pub fn upsert(&mut self, channel: ChannelName, file: RedexFile, now: Instant) {
        let new_pos = self.allocate_lru_pos();

        if let Some(existing) = self.entries.get_mut(&channel) {
            let old_pos = existing.lru_pos;
            self.total_bytes = self.total_bytes.saturating_sub(existing.bytes);
            existing.bytes = 0;
            existing.file = file;
            existing.last_read = now;
            existing.lru_pos = new_pos;
            self.lru.remove(&old_pos);
            self.lru.insert(new_pos, channel);
            return;
        }

        self.lru.insert(new_pos, channel.clone());
        self.entries.insert(
            channel,
            GreedyCacheEntry {
                file,
                last_read: now,
                bytes: 0,
                origin_hash: 0,
                lru_pos: new_pos,
            },
        );
    }

    /// Sets the origin hash of `channel` and keeps the origin index aligned.
    ///
    /// Unknown channels and unchanged hashes are no-ops.
    pub fn set_origin_hash(&mut self, channel: &ChannelName, origin_hash: u64) {
        let Some(entry) = self.entries.get_mut(channel) else {
            return;
        };
        let prev = entry.origin_hash;
        if prev == origin_hash {
            return;
        }
        entry.origin_hash = origin_hash;
        self.remove_origin(prev);
        self.add_origin(origin_hash);
    }

    /// Marks `channel` as most recently used and records `now` as its last
    /// read time. Unknown channels are ignored and consume no LRU position.
    pub fn touch(&mut self, channel: &ChannelName, now: Instant) {
        // Single lookup: the entry borrow and the counter borrow touch
        // different fields, so both can be live at once.
        let Some(entry) = self.entries.get_mut(channel) else {
            return;
        };
        let new_pos = Self::bump_lru_counter(&mut self.next_lru_pos);
        let old_pos = entry.lru_pos;
        entry.last_read = now;
        entry.lru_pos = new_pos;
        self.lru.remove(&old_pos);
        self.lru.insert(new_pos, channel.clone());
    }

    /// Attributes `payload_bytes` more bytes to `channel`, then evicts
    /// least-recently-used entries until the total is within the cap.
    ///
    /// Returns the entries evicted by that pass. Unknown channels yield an
    /// empty sweep and change nothing. The channel's LRU position is not
    /// refreshed, so the appended-to channel itself can be evicted. `_now` is
    /// accepted but not used.
    pub fn note_appended(
        &mut self,
        channel: &ChannelName,
        payload_bytes: u64,
        _now: Instant,
    ) -> EvictionSweep {
        let Some(entry) = self.entries.get_mut(channel) else {
            return EvictionSweep::default();
        };
        entry.bytes = entry.bytes.saturating_add(payload_bytes);
        self.total_bytes = self.total_bytes.saturating_add(payload_bytes);
        self.evict_until_under_cap()
    }

    /// Removes `channel` from every index and returns its entry, or `None`
    /// if it was not registered.
    pub fn evict(&mut self, channel: &ChannelName) -> Option<GreedyCacheEntry> {
        let entry = self.entries.remove(channel)?;
        self.lru.remove(&entry.lru_pos);
        self.total_bytes = self.total_bytes.saturating_sub(entry.bytes);
        self.remove_origin(entry.origin_hash);
        Some(entry)
    }

    /// Replaces the byte cap and immediately evicts until the total fits.
    ///
    /// Returns the entries evicted by that pass.
    pub fn set_total_cap_bytes(&mut self, new_cap: u64) -> EvictionSweep {
        self.total_cap_bytes = new_cap;
        self.evict_until_under_cap()
    }

    /// Evicts the entry at the lowest LRU position, returning its channel
    /// name and entry. Returns `None` when the LRU map is empty or its oldest
    /// channel is missing from the entry map.
    fn evict_oldest(&mut self) -> Option<(ChannelName, GreedyCacheEntry)> {
        let channel = self.lru.values().next().cloned()?;
        let entry = self.evict(&channel)?;
        Some((channel, entry))
    }

    /// Evicts oldest-first while the byte total exceeds the cap, stopping
    /// early if nothing more can be evicted.
    fn evict_until_under_cap(&mut self) -> EvictionSweep {
        let mut evicted = Vec::new();
        while self.total_bytes > self.total_cap_bytes {
            let Some((channel, entry)) = self.evict_oldest() else {
                break;
            };
            evicted.push(EvictedEntry {
                channel,
                origin_hash: entry.origin_hash,
            });
        }
        EvictionSweep { evicted }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::net::redex::{Redex, RedexFileConfig};
    use std::time::Duration;

    /// Builds a channel name from a literal, panicking if it is rejected.
    fn cn(s: &str) -> ChannelName {
        ChannelName::new(s).unwrap()
    }

    /// Opens a file for `name` with a retention cap of `cap_bytes`.
    fn open_file(redex: &Redex, name: &str, cap_bytes: u64) -> RedexFile {
        let channel = cn(name);
        let config = RedexFileConfig::default().with_retention_max_bytes(cap_bytes);
        redex.open_file(&channel, config).expect("open cache file")
    }

    /// Upserts `name` backed by a fresh file with a 10_000-byte retention cap.
    fn register(reg: &mut GreedyCacheRegistry, redex: &Redex, name: &str, at: Instant) {
        reg.upsert(cn(name), open_file(redex, name, 10_000), at);
    }

    /// `base` shifted forward by `secs` whole seconds.
    fn after(base: Instant, secs: u64) -> Instant {
        base + Duration::from_secs(secs)
    }

    /// Channel names carried by `sweep`, in eviction order.
    fn evicted_names(sweep: &EvictionSweep) -> Vec<ChannelName> {
        sweep.channels().cloned().collect()
    }

    #[test]
    fn new_registry_is_empty() {
        let reg = GreedyCacheRegistry::new(1024);
        assert_eq!(reg.len(), 0);
        assert!(reg.is_empty());
        assert_eq!(reg.total_bytes(), 0);
        assert_eq!(reg.total_cap_bytes(), 1024);
    }

    #[test]
    fn upsert_registers_channel() {
        let redex = Redex::new();
        let mut reg = GreedyCacheRegistry::new(1_000_000);
        let now = Instant::now();
        let name = cn("test/a");

        let file = open_file(&redex, "test/a", 10_000);
        reg.upsert(name.clone(), file, now);

        assert_eq!(reg.len(), 1);
        assert!(reg.contains(&name));
        let entry = reg.get(&name).unwrap();
        assert_eq!(entry.last_read, now);
        assert_eq!(entry.bytes, 0);
    }

    #[test]
    fn upsert_is_idempotent_on_reopen() {
        let redex = Redex::new();
        let mut reg = GreedyCacheRegistry::new(1_000_000);

        let first = Instant::now();
        register(&mut reg, &redex, "test/a", first);
        let second = after(first, 1);
        register(&mut reg, &redex, "test/a", second);

        assert_eq!(reg.len(), 1, "reopen must not duplicate the entry");
        assert_eq!(reg.get(&cn("test/a")).unwrap().last_read, second);
    }

    #[test]
    fn touch_updates_lru_position() {
        let redex = Redex::new();
        let mut reg = GreedyCacheRegistry::new(1_000_000);
        let base = Instant::now();
        for (offset, name) in (0u64..).zip(["test/a", "test/b", "test/c"]) {
            register(&mut reg, &redex, name, after(base, offset));
        }

        // Promoting "a" leaves "b" as the least recently used channel.
        reg.touch(&cn("test/a"), after(base, 3));

        let oldest = reg.lru.values().next().unwrap();
        assert_eq!(*oldest, cn("test/b"));
    }

    #[test]
    fn note_appended_tracks_bytes() {
        let redex = Redex::new();
        let mut reg = GreedyCacheRegistry::new(1_000_000);
        let now = Instant::now();
        let name = cn("test/a");
        register(&mut reg, &redex, "test/a", now);

        let sweep = reg.note_appended(&name, 500, now);

        assert!(sweep.is_empty());
        assert_eq!(reg.total_bytes(), 500);
        assert_eq!(reg.get(&name).unwrap().bytes, 500);
    }

    #[test]
    fn note_appended_on_missing_channel_is_noop() {
        let mut reg = GreedyCacheRegistry::new(1_000_000);
        let now = Instant::now();

        let sweep = reg.note_appended(&cn("missing"), 1024, now);

        assert!(sweep.is_empty());
        assert_eq!(reg.total_bytes(), 0);
    }

    #[test]
    fn cluster_cap_triggers_lru_eviction() {
        let redex = Redex::new();
        let mut reg = GreedyCacheRegistry::new(1024);
        let base = Instant::now();
        for (offset, name) in (0u64..).zip(["a", "b", "c"]) {
            register(&mut reg, &redex, name, after(base, offset));
        }

        // 600 bytes fits under the 1024 cap; the second 600 pushes it over.
        let sweep_a = reg.note_appended(&cn("a"), 600, after(base, 3));
        assert!(sweep_a.is_empty());
        let sweep_b = reg.note_appended(&cn("b"), 600, after(base, 4));

        assert_eq!(evicted_names(&sweep_b), vec![cn("a")]);
        assert!(!reg.contains(&cn("a")));
        assert_eq!(reg.total_bytes(), 600);
    }

    #[test]
    fn explicit_evict_drops_entry_and_bytes() {
        let redex = Redex::new();
        let mut reg = GreedyCacheRegistry::new(1_000_000);
        let now = Instant::now();
        let name = cn("a");
        register(&mut reg, &redex, "a", now);
        reg.note_appended(&name, 5_000, now);

        let entry = reg.evict(&name).expect("evict returns entry");

        assert_eq!(entry.bytes, 5_000);
        assert!(!reg.contains(&name));
        assert_eq!(reg.total_bytes(), 0);
    }

    #[test]
    fn shrinking_cap_runs_eviction_immediately() {
        let redex = Redex::new();
        let mut reg = GreedyCacheRegistry::new(10_000);
        let base = Instant::now();
        register(&mut reg, &redex, "a", base);
        register(&mut reg, &redex, "b", after(base, 1));
        reg.note_appended(&cn("a"), 4_000, after(base, 2));
        reg.note_appended(&cn("b"), 4_000, after(base, 3));
        assert_eq!(reg.total_bytes(), 8_000);

        // Each entry alone (4_000) still exceeds the new cap, so both go.
        let sweep = reg.set_total_cap_bytes(3000);

        assert_eq!(evicted_names(&sweep), vec![cn("a"), cn("b")]);
        assert!(reg.is_empty());
    }

    #[test]
    fn touch_on_read_promotes_channel_past_silent_peers() {
        let redex = Redex::new();
        let mut reg = GreedyCacheRegistry::new(1024);
        let base = Instant::now();
        register(&mut reg, &redex, "a", base);
        register(&mut reg, &redex, "b", after(base, 1));
        reg.note_appended(&cn("a"), 500, after(base, 2));
        reg.note_appended(&cn("b"), 400, after(base, 3));

        // Reading "a" makes "b" the oldest, even though "b" is being written.
        reg.touch(&cn("a"), after(base, 4));
        let sweep = reg.note_appended(&cn("b"), 200, after(base, 5));

        assert_eq!(evicted_names(&sweep), vec![cn("b")]);
        assert!(reg.contains(&cn("a")));
    }

    #[test]
    fn eviction_sweep_carries_origin_hash_for_withdraw() {
        let redex = Redex::new();
        let mut reg = GreedyCacheRegistry::new(1024);
        let base = Instant::now();
        register(&mut reg, &redex, "a", base);
        register(&mut reg, &redex, "b", after(base, 1));
        reg.set_origin_hash(&cn("a"), 0xAAAA_AAAA_AAAA_AAAA);
        reg.set_origin_hash(&cn("b"), 0xBBBB_BBBB_BBBB_BBBB);

        reg.note_appended(&cn("a"), 600, after(base, 2));
        let sweep = reg.note_appended(&cn("b"), 600, after(base, 3));

        assert_eq!(sweep.len(), 1, "A should evict");
        let evicted = &sweep.evicted[0];
        assert_eq!(evicted.channel, cn("a"));
        assert_eq!(evicted.origin_hash, 0xAAAA_AAAA_AAAA_AAAA);
    }

    #[test]
    fn resync_bytes_from_files_anchors_total_on_substrate_view() {
        let redex = Redex::new();
        let mut reg = GreedyCacheRegistry::new(1_000_000);
        let now = Instant::now();
        let name = cn("test/a");
        let per_channel_cap = 2048u64;
        reg.upsert(
            name.clone(),
            open_file(&redex, "test/a", per_channel_cap),
            now,
        );

        // Append 20 KiB in 1 KiB payloads, reporting each one to the registry.
        let payload = vec![0u8; 1024];
        for _ in 0..20 {
            let file = reg.get(&name).unwrap().file.clone();
            file.append(&payload).unwrap();
            reg.note_appended(&name, payload.len() as u64, now);
        }
        let pre_resync_bytes = reg.total_bytes();
        assert!(
            pre_resync_bytes >= 20 * 1024,
            "registry must have accumulated monotonic bytes; got {}",
            pre_resync_bytes,
        );

        let file = reg.get(&name).unwrap().file.clone();
        file.sweep_retention();
        reg.resync_bytes_from_files();

        let post_resync_bytes = reg.total_bytes();
        assert!(
            post_resync_bytes < pre_resync_bytes,
            "resync must reduce drift (pre {} > post {})",
            pre_resync_bytes,
            post_resync_bytes,
        );
        assert_eq!(reg.get(&name).unwrap().bytes, post_resync_bytes);
    }

    #[test]
    fn upsert_on_reopen_subtracts_old_bytes_from_total() {
        let redex = Redex::new();
        let mut reg = GreedyCacheRegistry::new(1_000_000);
        let now = Instant::now();
        let name = cn("test/a");
        register(&mut reg, &redex, "test/a", now);
        reg.note_appended(&name, 500, now);
        assert_eq!(reg.total_bytes(), 500);
        assert_eq!(reg.get(&name).unwrap().bytes, 500);

        register(&mut reg, &redex, "test/a", after(now, 1));

        assert_eq!(reg.total_bytes(), 0, "reopen must subtract old bytes");
        assert_eq!(reg.get(&name).unwrap().bytes, 0);
    }

    #[test]
    fn eviction_sweep_origin_hash_zero_when_unset() {
        let redex = Redex::new();
        let mut reg = GreedyCacheRegistry::new(1024);
        let base = Instant::now();
        register(&mut reg, &redex, "a", base);
        register(&mut reg, &redex, "b", after(base, 1));

        reg.note_appended(&cn("a"), 600, after(base, 2));
        let sweep = reg.note_appended(&cn("b"), 600, after(base, 3));

        assert_eq!(sweep.len(), 1);
        assert_eq!(sweep.evicted[0].origin_hash, 0);
    }

    #[test]
    fn contains_origin_uses_reverse_index_and_stays_aligned_across_mutations() {
        let redex = Redex::new();
        let mut reg = GreedyCacheRegistry::new(1_000_000);
        let base = Instant::now();
        assert!(!reg.contains_origin(0));

        register(&mut reg, &redex, "a", base);
        register(&mut reg, &redex, "b", after(base, 1));
        reg.set_origin_hash(&cn("a"), 0xAAAA);
        reg.set_origin_hash(&cn("b"), 0xBBBB);
        assert!(reg.contains_origin(0xAAAA));
        assert!(reg.contains_origin(0xBBBB));
        assert!(!reg.contains_origin(0xCCCC));

        // Re-pointing "b" at 0xAAAA releases the only claim on 0xBBBB.
        reg.set_origin_hash(&cn("b"), 0xAAAA);
        assert!(reg.contains_origin(0xAAAA));
        assert!(
            !reg.contains_origin(0xBBBB),
            "0xBBBB lost its last referer when B's origin was re-set to 0xAAAA"
        );

        reg.evict(&cn("a"));
        assert!(
            reg.contains_origin(0xAAAA),
            "0xAAAA must remain held while B still claims it"
        );
        reg.evict(&cn("b"));
        assert!(
            !reg.contains_origin(0xAAAA),
            "0xAAAA must drop after every claiming entry is gone"
        );

        // Resetting a hash to 0 releases the claim without indexing 0 itself.
        register(&mut reg, &redex, "c", base);
        reg.set_origin_hash(&cn("c"), 0xCCCC);
        assert!(reg.contains_origin(0xCCCC));
        reg.set_origin_hash(&cn("c"), 0);
        assert!(!reg.contains_origin(0xCCCC));
        assert!(!reg.contains_origin(0));
    }
}