use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;
use std::time::{Duration, Instant};
/// Hard cap on the total number of queued entries across *all* keys;
/// `cap_total` evicts the globally oldest entries beyond this.
const MAX_ENTRIES: usize = 1024;
/// Lifetime of an entry before it is considered expired (used by `new`).
const DEFAULT_TTL: Duration = Duration::from_secs(30);
/// One queued origin value together with the instant it was inserted.
#[derive(Clone)]
struct Entry {
    /// Payload handed back by `pop`; `None` is a legal stored value and
    /// round-trips as `None`.
    origin: Option<String>,
    /// Insertion timestamp, compared against the store's TTL to decide expiry.
    inserted_at: Instant,
}
/// A TTL- and size-bounded map from document key to a FIFO queue of pending
/// origin values. All mutable state sits behind the mutex, so the store can
/// be used through `&self` from multiple threads.
pub(crate) struct PendingOrigins {
    /// Per-key FIFO queues of pending entries, guarded by a mutex.
    map: Mutex<HashMap<String, VecDeque<Entry>>>,
    /// How long an entry stays poppable after insertion.
    ttl: Duration,
}
impl PendingOrigins {
    /// Creates a store with the default 30-second TTL.
    pub fn new() -> Self {
        Self::with_ttl(DEFAULT_TTL)
    }

    /// Creates a store whose entries expire after `ttl`.
    pub fn with_ttl(ttl: Duration) -> Self {
        Self {
            map: Mutex::new(HashMap::new()),
            ttl,
        }
    }

    /// Queues `origin` under `doc_key` (FIFO per key). Expired entries across
    /// the whole map are swept first, then the total entry count is capped at
    /// `MAX_ENTRIES` by evicting the globally oldest entries.
    pub fn insert(&self, doc_key: String, origin: Option<String>) {
        let now = Instant::now();
        let mut map = self.map.lock().unwrap();
        sweep_expired(&mut map, now, self.ttl);
        map.entry(doc_key).or_default().push_back(Entry {
            origin,
            inserted_at: now,
        });
        cap_total(&mut map, MAX_ENTRIES);
    }

    /// Removes and returns the oldest *non-expired* origin queued under
    /// `doc_key`. Returns `None` when the key is absent, everything queued
    /// has expired, or the stored origin itself was `None`.
    ///
    /// Fix: expired entries at the front of the queue are now discarded and
    /// the scan continues, instead of consuming exactly one entry and
    /// returning `None` — previously a stale front entry made this call
    /// return `None` even when a fresh entry was waiting right behind it.
    pub fn pop(&self, doc_key: &str) -> Option<String> {
        let now = Instant::now();
        let mut map = self.map.lock().unwrap();
        let queue = map.get_mut(doc_key)?;
        // Entries are pushed with the lock held and a timestamp taken at
        // insert, so timestamps within a queue are non-decreasing: expired
        // entries always form a prefix. Drop that prefix.
        let mut fresh = None;
        while let Some(entry) = queue.pop_front() {
            if now.saturating_duration_since(entry.inserted_at) <= self.ttl {
                fresh = Some(entry);
                break;
            }
        }
        // Don't leave empty queues behind, or dead keys would accumulate.
        if queue.is_empty() {
            map.remove(doc_key);
        }
        fresh.and_then(|entry| entry.origin)
    }

    /// Total number of queued entries across all keys (test helper).
    #[cfg(test)]
    pub fn len(&self) -> usize {
        self.map.lock().unwrap().values().map(VecDeque::len).sum()
    }
}
impl Default for PendingOrigins {
fn default() -> Self {
Self::new()
}
}
/// Drops every expired entry from `map` and removes keys whose queues end up
/// empty. Entries within a queue are in insertion order, so the expired
/// entries are always a prefix of the queue.
fn sweep_expired(map: &mut HashMap<String, VecDeque<Entry>>, now: Instant, ttl: Duration) {
    map.retain(|_, queue| {
        loop {
            match queue.front() {
                // Front has outlived the TTL: discard and look at the next.
                Some(front) if now.saturating_duration_since(front.inserted_at) > ttl => {
                    queue.pop_front();
                }
                // Fresh front (or empty queue): prefix fully trimmed.
                _ => break,
            }
        }
        // Keep the key only if something survived the trim.
        !queue.is_empty()
    });
}
/// Evicts globally oldest entries until the total count across all queues is
/// at most `max`. Each queue is in insertion order, so the globally oldest
/// entry is always at the front of some queue.
fn cap_total(map: &mut HashMap<String, VecDeque<Entry>>, max: usize) {
    let mut remaining: usize = map.values().map(VecDeque::len).sum();
    while remaining > max {
        // Locate the key whose front entry carries the smallest timestamp.
        let victim = map
            .iter()
            .filter_map(|(key, queue)| queue.front().map(|entry| (key.clone(), entry.inserted_at)))
            .min_by_key(|(_, stamp)| *stamp)
            .map(|(key, _)| key);
        let Some(key) = victim else { break };
        let Some(queue) = map.get_mut(&key) else { break };
        queue.pop_front();
        if queue.is_empty() {
            map.remove(&key);
        }
        remaining -= 1;
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn insert_then_pop_returns_origin() {
        let store = PendingOrigins::new();
        store.insert("tracks:doc-1".into(), Some("ble".into()));
        assert_eq!(store.pop("tracks:doc-1"), Some("ble".into()));
    }

    #[test]
    fn pop_consumes_entry() {
        let store = PendingOrigins::new();
        store.insert("tracks:doc-1".into(), Some("ble".into()));
        let _ = store.pop("tracks:doc-1");
        assert_eq!(
            store.pop("tracks:doc-1"),
            None,
            "second pop must be empty (consumed on read)"
        );
    }

    #[test]
    fn pop_absent_key_returns_none() {
        let store = PendingOrigins::new();
        assert_eq!(store.pop("nope"), None);
    }

    #[test]
    fn rapid_same_key_drains_in_fifo_order() {
        let store = PendingOrigins::new();
        (0..5).for_each(|i| store.insert("tracks:doc-1".into(), Some(format!("origin-{}", i))));
        assert_eq!(store.len(), 5);
        for i in 0..5 {
            assert_eq!(store.pop("tracks:doc-1"), Some(format!("origin-{}", i)));
        }
        assert_eq!(store.pop("tracks:doc-1"), None, "queue exhausted");
        assert_eq!(store.len(), 0);
    }

    #[test]
    fn empty_queue_removed_after_drain() {
        let store = PendingOrigins::new();
        store.insert("tracks:doc-1".into(), Some("ble".into()));
        let _ = store.pop("tracks:doc-1");
        assert_eq!(store.len(), 0);
    }

    #[test]
    fn ttl_sweep_drops_expired_on_next_insert() {
        let store = PendingOrigins::with_ttl(Duration::from_millis(50));
        store.insert("tracks:expires".into(), Some("ble".into()));
        std::thread::sleep(Duration::from_millis(80));
        store.insert("tracks:fresh".into(), Some("iroh".into()));
        assert_eq!(store.len(), 1, "expired entry must be swept on next insert");
        assert_eq!(
            store.pop("tracks:expires"),
            None,
            "expired entry must not be poppable"
        );
        assert_eq!(store.pop("tracks:fresh"), Some("iroh".into()));
    }

    #[test]
    fn pop_returns_none_for_expired_entry() {
        let store = PendingOrigins::with_ttl(Duration::from_millis(20));
        store.insert("tracks:doc-1".into(), Some("ble".into()));
        std::thread::sleep(Duration::from_millis(50));
        assert_eq!(store.pop("tracks:doc-1"), None);
    }

    #[test]
    fn bounded_cap_evicts_oldest_when_full() {
        let store = PendingOrigins::new();
        for i in 0..(MAX_ENTRIES + 100) {
            store.insert(format!("tracks:doc-{}", i), Some("ble".into()));
        }
        assert!(
            store.len() <= MAX_ENTRIES,
            "total {} exceeds cap {}",
            store.len(),
            MAX_ENTRIES
        );
        assert_eq!(store.pop("tracks:doc-0"), None);
        assert_eq!(
            store.pop(&format!("tracks:doc-{}", MAX_ENTRIES + 99)),
            Some("ble".into())
        );
    }

    #[test]
    fn bounded_cap_evicts_oldest_across_keys() {
        let store = PendingOrigins::new();
        for i in 0..MAX_ENTRIES {
            store.insert("a".into(), Some(format!("a-{}", i)));
        }
        for i in 0..50 {
            store.insert("b".into(), Some(format!("b-{}", i)));
        }
        assert!(store.len() <= MAX_ENTRIES);
        for i in 0..50 {
            assert_eq!(
                store.pop("b"),
                Some(format!("b-{}", i)),
                "b queue must be intact in FIFO order"
            );
        }
        assert_eq!(
            store.pop("a"),
            Some("a-50".to_string()),
            "key a must have dropped its first 50 entries"
        );
    }

    #[test]
    fn none_origin_round_trips() {
        let store = PendingOrigins::new();
        store.insert("tracks:doc-1".into(), None);
        assert_eq!(store.pop("tracks:doc-1"), None);
        assert_eq!(store.pop("tracks:doc-1"), None);
        assert_eq!(store.len(), 0);
    }
}