1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
//! A bounded FIFO queue with per-item TTL and lazy eviction.
//!
//! Designed for service mailboxes: incoming messages sit in a queue until
//! consumed or expired. Wasm-compatible (no `std::time` — caller supplies
//! `now` as a Unix-epoch seconds value).
use std::collections::VecDeque;
/// A bounded FIFO queue where each item carries an expiry timestamp.
///
/// Expired items are evicted lazily on `push`, `pop`, and `drain`.
/// When the queue is at capacity, the oldest item is dropped on `push`.
#[derive(Debug, Clone)]
pub struct TtlQueue<T> {
buf: VecDeque<(u64, T)>,
capacity: usize,
}
impl<T> TtlQueue<T> {
/// Create a new queue with the given maximum capacity.
///
/// # Panics
/// Panics if `capacity` is 0.
pub fn new(capacity: usize) -> Self {
assert!(capacity > 0, "TtlQueue capacity must be > 0");
Self {
buf: VecDeque::with_capacity(capacity),
capacity,
}
}
/// Push an item with an absolute expiry timestamp (Unix epoch seconds).
///
/// Pass `expires_at = 0` for items that never expire.
/// If the queue is full after eviction, the oldest item is silently dropped.
pub fn push(&mut self, now: u64, expires_at: u64, item: T) {
self.evict(now);
if self.buf.len() >= self.capacity {
self.buf.pop_front();
}
self.buf.push_back((expires_at, item));
}
/// Pop the oldest non-expired item, evicting any expired head entries first.
pub fn pop(&mut self, now: u64) -> Option<T> {
self.evict(now);
self.buf.pop_front().map(|(_, item)| item)
}
/// Peek at the oldest non-expired item without removing it.
pub fn peek(&mut self, now: u64) -> Option<&T> {
self.evict(now);
self.buf.front().map(|(_, item)| item)
}
/// Drain all non-expired items, returning them in FIFO order.
pub fn drain(&mut self, now: u64) -> Vec<T> {
self.evict(now);
self.buf.drain(..).map(|(_, item)| item).collect()
}
/// Number of items currently in the queue (including not-yet-evicted expired items).
pub fn len(&self) -> usize {
self.buf.len()
}
/// Whether the queue is empty.
pub fn is_empty(&self) -> bool {
self.buf.is_empty()
}
/// Remove expired items from the front of the queue.
fn evict(&mut self, now: u64) {
while let Some(&(expires_at, _)) = self.buf.front() {
if expires_at > 0 && expires_at <= now {
self.buf.pop_front();
} else {
break;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn push_pop_basic() {
let mut q = TtlQueue::new(8);
q.push(100, 200, "a");
q.push(100, 200, "b");
assert_eq!(q.pop(100), Some("a"));
assert_eq!(q.pop(100), Some("b"));
assert_eq!(q.pop(100), None);
}
#[test]
fn expired_items_evicted() {
let mut q = TtlQueue::new(8);
q.push(100, 150, "old");
q.push(100, 300, "fresh");
// At t=200, "old" (expires_at=150) should be evicted
assert_eq!(q.pop(200), Some("fresh"));
assert!(q.is_empty());
}
#[test]
fn zero_ttl_never_expires() {
let mut q = TtlQueue::new(4);
q.push(100, 0, "forever");
// Even far in the future, expires_at=0 means no expiry
assert_eq!(q.pop(u64::MAX), Some("forever"));
}
#[test]
fn capacity_drops_oldest() {
let mut q = TtlQueue::new(2);
q.push(100, 0, "a");
q.push(100, 0, "b");
q.push(100, 0, "c"); // "a" should be dropped
assert_eq!(q.len(), 2);
assert_eq!(q.pop(100), Some("b"));
assert_eq!(q.pop(100), Some("c"));
}
#[test]
fn drain_returns_all_non_expired() {
let mut q = TtlQueue::new(8);
q.push(100, 150, "expired");
q.push(100, 300, "ok1");
q.push(100, 0, "ok2");
let items = q.drain(200);
assert_eq!(items, vec!["ok1", "ok2"]);
assert!(q.is_empty());
}
#[test]
fn peek_does_not_remove() {
let mut q = TtlQueue::new(4);
q.push(100, 0, "x");
assert_eq!(q.peek(100), Some(&"x"));
assert_eq!(q.len(), 1);
}
#[test]
#[should_panic(expected = "capacity must be > 0")]
fn zero_capacity_panics() {
TtlQueue::<u8>::new(0);
}
}