/// Number of buckets in the fine wheel. A fine bucket covers exactly one tick
/// (`tick_ms` milliseconds) of distance from the wheel's base time.
const FINE_SLOTS: usize = 256;
/// Number of buckets in the coarse wheel. A coarse bucket covers `FINE_SLOTS`
/// consecutive ticks.
const COARSE_SLOTS: usize = 64;
/// Distance from the base time, measured in ticks (not milliseconds), at which
/// entries stop being placed in the coarse wheel and go to the spillover list.
const COARSE_RANGE: u64 = FINE_SLOTS as u64 * COARSE_SLOTS as u64;
/// One scheduled expiration: a key together with the absolute deadline it was
/// registered with.
#[derive(Clone, Debug)]
struct ExpiryEntry {
    /// Raw bytes of the key that should expire.
    key: Vec<u8>,
    /// Absolute deadline in milliseconds; the entry is due once the time
    /// passed to `tick` is greater than or equal to this value.
    expire_at_ms: u64,
}
/// Result of a single `ExpiryWheel::tick` call.
///
/// Derives `Debug`, `Clone`, `PartialEq`, `Eq` and `Default` so callers can log
/// batches and compare them in assertions.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ReapBatch {
    /// Keys handed back by this tick, each paired with the deadline
    /// (in milliseconds) it was registered with.
    pub expired: Vec<(Vec<u8>, u64)>,
    /// Number of entries sitting in the wheel's deferred backlog when the
    /// batch was produced, i.e. due entries that did not fit into the budget.
    pub remaining: usize,
}
/// Two-level expiry wheel with a spillover list and a per-tick reap budget.
///
/// Entries are bucketed by how many ticks their deadline lies beyond
/// `base_ms`: near deadlines go to `fine`, farther ones to `coarse`, and
/// everything beyond the coarse range to `spillover`.
#[derive(Debug)]
pub struct ExpiryWheel {
    /// Fine buckets; one bucket per tick of distance from `base_ms`.
    fine: Vec<Vec<ExpiryEntry>>,
    /// Coarse buckets; one bucket per `FINE_SLOTS` ticks of distance.
    coarse: Vec<Vec<ExpiryEntry>>,
    /// Entries whose deadline lies beyond the coarse range.
    spillover: Vec<ExpiryEntry>,
    /// Reference time (milliseconds) that bucket positions are computed from.
    /// Set at construction and overwritten by each tick that performs a scan.
    base_ms: u64,
    /// Width of one tick in milliseconds.
    tick_ms: u64,
    /// Offset added to fine bucket indices. Initialised to 0; no method in
    /// this file modifies it.
    fine_cursor: usize,
    /// Offset added to coarse bucket indices. Initialised to 0; no method in
    /// this file modifies it.
    coarse_cursor: usize,
    /// Maximum number of keys a single tick hands back.
    reap_budget: usize,
    /// Count of all live entries, including those in `deferred`.
    total_entries: usize,
    /// Entries that were already due but exceeded the reap budget; they are
    /// drained first on the next tick.
    deferred: Vec<ExpiryEntry>,
}
impl ExpiryWheel {
pub fn new(now_ms: u64, tick_ms: u64, reap_budget: usize) -> Self {
Self {
fine: (0..FINE_SLOTS).map(|_| Vec::new()).collect(),
coarse: (0..COARSE_SLOTS).map(|_| Vec::new()).collect(),
spillover: Vec::new(),
base_ms: now_ms,
tick_ms,
fine_cursor: 0,
coarse_cursor: 0,
reap_budget,
total_entries: 0,
deferred: Vec::new(),
}
}
pub fn len(&self) -> usize {
self.total_entries
}
pub fn is_empty(&self) -> bool {
self.total_entries == 0
}
pub fn backlog(&self) -> usize {
self.deferred.len()
}
pub fn insert(&mut self, key: Vec<u8>, expire_at_ms: u64) {
let entry = ExpiryEntry { key, expire_at_ms };
self.place_entry(entry);
self.total_entries += 1;
}
pub fn cancel(&mut self, key: &[u8], expire_at_ms: u64) -> bool {
if let Some(pos) = self
.deferred
.iter()
.position(|e| e.key == key && e.expire_at_ms == expire_at_ms)
{
self.deferred.swap_remove(pos);
self.total_entries -= 1;
return true;
}
let ticks_from_base = expire_at_ms.saturating_sub(self.base_ms) / self.tick_ms;
if ticks_from_base < FINE_SLOTS as u64 {
let slot = (self.fine_cursor as u64 + ticks_from_base) as usize % FINE_SLOTS;
return remove_from_slot(
&mut self.fine[slot],
key,
expire_at_ms,
&mut self.total_entries,
);
}
if ticks_from_base < COARSE_RANGE {
let coarse_offset = (ticks_from_base - FINE_SLOTS as u64) / FINE_SLOTS as u64;
let slot = (self.coarse_cursor as u64 + coarse_offset) as usize % COARSE_SLOTS;
return remove_from_slot(
&mut self.coarse[slot],
key,
expire_at_ms,
&mut self.total_entries,
);
}
remove_from_slot(
&mut self.spillover,
key,
expire_at_ms,
&mut self.total_entries,
)
}
pub fn tick(&mut self, now_ms: u64) -> ReapBatch {
let mut expired = Vec::new();
while !self.deferred.is_empty() && expired.len() < self.reap_budget {
let entry = self.deferred.pop().unwrap();
expired.push((entry.key, entry.expire_at_ms));
self.total_entries -= 1;
}
if expired.len() >= self.reap_budget {
return ReapBatch {
remaining: self.deferred.len(),
expired,
};
}
for slot in &mut self.fine {
let entries = std::mem::take(slot);
for entry in entries {
if entry.expire_at_ms <= now_ms {
if expired.len() < self.reap_budget {
expired.push((entry.key, entry.expire_at_ms));
self.total_entries -= 1;
} else {
self.deferred.push(entry);
}
} else {
slot.push(entry);
}
}
}
for slot in &mut self.coarse {
let entries = std::mem::take(slot);
for entry in entries {
if entry.expire_at_ms <= now_ms {
if expired.len() < self.reap_budget {
expired.push((entry.key, entry.expire_at_ms));
self.total_entries -= 1;
} else {
self.deferred.push(entry);
}
} else {
slot.push(entry);
}
}
}
let spilled = std::mem::take(&mut self.spillover);
for entry in spilled {
if entry.expire_at_ms <= now_ms {
if expired.len() < self.reap_budget {
expired.push((entry.key, entry.expire_at_ms));
self.total_entries -= 1;
} else {
self.deferred.push(entry);
}
} else {
self.spillover.push(entry);
}
}
self.base_ms = now_ms;
let remaining = self.deferred.len();
ReapBatch { expired, remaining }
}
fn place_entry(&mut self, entry: ExpiryEntry) {
let ticks_from_base = entry.expire_at_ms.saturating_sub(self.base_ms) / self.tick_ms;
if ticks_from_base < FINE_SLOTS as u64 {
let slot = (self.fine_cursor as u64 + ticks_from_base) as usize % FINE_SLOTS;
self.fine[slot].push(entry);
return;
}
if ticks_from_base < COARSE_RANGE {
let coarse_offset = (ticks_from_base - FINE_SLOTS as u64) / FINE_SLOTS as u64;
let slot = (self.coarse_cursor as u64 + coarse_offset) as usize % COARSE_SLOTS;
self.coarse[slot].push(entry);
return;
}
self.spillover.push(entry);
}
}
/// Removes the first entry in `slot` whose key and deadline both match.
///
/// On a hit the entry is taken out with `swap_remove` (so the order of the
/// remaining entries is not preserved), `total_entries` is decremented by one
/// and `true` is returned. On a miss nothing is modified and `false` is
/// returned.
fn remove_from_slot(
    slot: &mut Vec<ExpiryEntry>,
    key: &[u8],
    expire_at_ms: u64,
    total_entries: &mut usize,
) -> bool {
    let hit = slot
        .iter()
        .position(|candidate| candidate.key == key && candidate.expire_at_ms == expire_at_ms);
    match hit {
        Some(index) => {
            slot.swap_remove(index);
            *total_entries -= 1;
            true
        }
        None => false,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a wheel anchored at time 0 with one-second ticks.
    fn wheel_with_budget(reap_budget: usize) -> ExpiryWheel {
        ExpiryWheel::new(0, 1000, reap_budget)
    }

    /// Inserts `count` distinct keys that all expire at `deadline_ms`.
    fn fill(wheel: &mut ExpiryWheel, count: u32, deadline_ms: u64) {
        (0..count).for_each(|i| wheel.insert(i.to_be_bytes().to_vec(), deadline_ms));
    }

    /// A key is not returned before its deadline and is returned at it.
    #[test]
    fn insert_and_tick_expires_key() {
        let mut wheel = wheel_with_budget(1024);
        wheel.insert(b"k1".to_vec(), 2000);
        assert_eq!(wheel.len(), 1);

        let too_early = wheel.tick(1000);
        assert!(too_early.expired.is_empty());

        let on_time = wheel.tick(2000);
        assert_eq!(on_time.expired.len(), 1);
        assert_eq!(on_time.expired[0].0, b"k1");
        assert_eq!(wheel.len(), 0);
    }

    /// A cancelled key never shows up in a later batch.
    #[test]
    fn cancel_removes_entry() {
        let mut wheel = wheel_with_budget(1024);
        wheel.insert(b"k1".to_vec(), 5000);
        assert_eq!(wheel.len(), 1);

        assert!(wheel.cancel(b"k1", 5000));
        assert_eq!(wheel.len(), 0);

        let after = wheel.tick(6000);
        assert!(after.expired.is_empty());
    }

    /// Cancelling with a deadline that does not match leaves the entry alone.
    #[test]
    fn cancel_wrong_expire_fails() {
        let mut wheel = wheel_with_budget(1024);
        wheel.insert(b"k1".to_vec(), 5000);

        assert!(!wheel.cancel(b"k1", 9999));
        assert_eq!(wheel.len(), 1);
    }

    /// Ten due keys with a budget of three come out as 3, 3, 3, 1.
    #[test]
    fn reap_budget_limits_per_tick() {
        let mut wheel = wheel_with_budget(3);
        fill(&mut wheel, 10, 1000);
        assert_eq!(wheel.len(), 10);

        let first = wheel.tick(1000);
        assert_eq!(first.expired.len(), 3);
        assert!(first.remaining > 0);

        let second = wheel.tick(1000);
        assert_eq!(second.expired.len(), 3);

        let third = wheel.tick(1000);
        assert_eq!(third.expired.len(), 3);

        let fourth = wheel.tick(1000);
        assert_eq!(fourth.expired.len(), 1);
        assert_eq!(fourth.remaining, 0);
        assert_eq!(wheel.len(), 0);
    }

    /// Several keys sharing one deadline are all returned together.
    #[test]
    fn multiple_keys_same_slot() {
        let mut wheel = wheel_with_budget(1024);
        for name in [b"a", b"b", b"c"] {
            wheel.insert(name.to_vec(), 3000);
        }
        assert_eq!(wheel.len(), 3);

        let batch = wheel.tick(3000);
        assert_eq!(batch.expired.len(), 3);
        assert_eq!(wheel.len(), 0);
    }

    /// A deadline beyond the fine range still expires exactly on time.
    #[test]
    fn coarse_slot_cascade() {
        let mut wheel = wheel_with_budget(1024);
        wheel.insert(b"far".to_vec(), 300_000);
        assert_eq!(wheel.len(), 1);

        let just_before = wheel.tick(299_000);
        assert!(just_before.expired.is_empty());

        let at_deadline = wheel.tick(300_000);
        assert_eq!(at_deadline.expired.len(), 1);
        assert_eq!(at_deadline.expired[0].0, b"far");
    }

    /// A deadline beyond the coarse range still expires exactly on time.
    #[test]
    fn spillover_for_very_far_future() {
        let mut wheel = wheel_with_budget(1024);
        wheel.insert(b"distant".to_vec(), 100_000_000);
        assert_eq!(wheel.len(), 1);

        let just_before = wheel.tick(99_999_000);
        assert!(just_before.expired.is_empty());

        let at_deadline = wheel.tick(100_000_000);
        assert_eq!(at_deadline.expired.len(), 1);
    }

    /// `backlog` is non-zero while due entries are deferred and returns to
    /// zero once they have all been handed out.
    #[test]
    fn backlog_gauge_tracks_deferred() {
        let mut wheel = wheel_with_budget(2);
        fill(&mut wheel, 5, 1000);

        let first = wheel.tick(1000);
        assert_eq!(first.expired.len(), 2);
        assert!(wheel.backlog() > 0);

        let second = wheel.tick(1000);
        assert_eq!(second.expired.len(), 2);

        let third = wheel.tick(1000);
        assert_eq!(third.expired.len(), 1);
        assert_eq!(wheel.backlog(), 0);
    }
}