Skip to main content

ma_core/
ttl_queue.rs

1//! A bounded FIFO queue with per-item TTL and lazy eviction.
2//!
3//! Designed for service mailboxes: incoming messages sit in a queue until
4//! consumed or expired. Wasm-compatible (no std::time — caller supplies
5//! `now` as a Unix-epoch seconds value).
6
7use std::collections::VecDeque;
8
/// A bounded FIFO queue where each item carries an expiry timestamp.
///
/// Timestamps are caller-supplied Unix-epoch seconds. An `expires_at` of `0`
/// marks an item that never expires; any other item is considered expired
/// once `expires_at <= now`.
///
/// Expired items are evicted lazily on `push`, `pop`, `peek`, and `drain`;
/// none of these operations ever returns an expired item, even when items
/// were pushed with non-monotonic expiry times.
/// When the queue is at capacity and holds no expired items, the oldest item
/// is dropped on `push`.
///
/// # Examples
///
/// ```
/// use ma_core::TtlQueue;
///
/// let mut q = TtlQueue::new(8);
/// let now = 100;
/// q.push(now, now + 60, "expires in 60s");
/// q.push(now, 0, "never expires");
///
/// // Both present at t=100
/// assert_eq!(q.drain(now).len(), 2);
/// ```
#[derive(Debug, Clone)]
pub struct TtlQueue<T> {
    /// Items in arrival order, each paired with its absolute expiry (`0` = never).
    buf: VecDeque<(u64, T)>,
    /// Maximum number of items retained; validated to be non-zero in `new`.
    capacity: usize,
}

impl<T> TtlQueue<T> {
    /// Create a new queue with the given maximum capacity.
    ///
    /// # Panics
    /// Panics if `capacity` is 0.
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "TtlQueue capacity must be > 0");
        Self {
            buf: VecDeque::with_capacity(capacity),
            capacity,
        }
    }

    /// Push an item with an absolute expiry timestamp (Unix epoch seconds).
    ///
    /// Pass `expires_at = 0` for items that never expire.
    ///
    /// Expired items are reclaimed before any live item is sacrificed: if the
    /// queue is still full once every expired entry has been removed, the
    /// oldest item is silently dropped to make room.
    pub fn push(&mut self, now: u64, expires_at: u64, item: T) {
        self.evict(now);
        if self.buf.len() >= self.capacity {
            // Expiry times need not be monotonic, so expired entries can hide
            // behind a live head. Reclaim those slots before dropping a live
            // item. This O(n) pass only runs when the queue is actually full.
            self.purge_expired(now);
        }
        if self.buf.len() >= self.capacity {
            self.buf.pop_front();
        }
        self.buf.push_back((expires_at, item));
    }

    /// Pop the oldest non-expired item, evicting any expired head entries first.
    ///
    /// Returns `None` when no live item remains.
    pub fn pop(&mut self, now: u64) -> Option<T> {
        // Head eviction is sufficient here: afterwards the front entry is
        // either live or absent.
        self.evict(now);
        self.buf.pop_front().map(|(_, item)| item)
    }

    /// Peek at the oldest non-expired item without removing it.
    ///
    /// Takes `&mut self` because expired head entries are evicted as a side
    /// effect.
    pub fn peek(&mut self, now: u64) -> Option<&T> {
        self.evict(now);
        self.buf.front().map(|(_, item)| item)
    }

    /// Drain all non-expired items, returning them in FIFO order.
    ///
    /// The queue is empty afterwards; expired items are discarded wherever
    /// they sit in the queue, not only at the head.
    pub fn drain(&mut self, now: u64) -> Vec<T> {
        self.buf
            .drain(..)
            .filter(|(expires_at, _)| !Self::is_expired(*expires_at, now))
            .map(|(_, item)| item)
            .collect()
    }

    /// Number of items currently in the queue (including not-yet-evicted expired items).
    pub fn len(&self) -> usize {
        self.buf.len()
    }

    /// Whether the queue is empty.
    pub fn is_empty(&self) -> bool {
        self.buf.is_empty()
    }

    /// Whether an entry with the given expiry is expired at `now`.
    ///
    /// `expires_at == 0` is the "never expires" sentinel.
    fn is_expired(expires_at: u64, now: u64) -> bool {
        expires_at != 0 && expires_at <= now
    }

    /// Remove expired items from the front of the queue.
    ///
    /// Stops at the first live entry, so this is cheap but may leave expired
    /// entries deeper in the queue; see `purge_expired` for the full sweep.
    fn evict(&mut self, now: u64) {
        while let Some(&(expires_at, _)) = self.buf.front() {
            if !Self::is_expired(expires_at, now) {
                break;
            }
            self.buf.pop_front();
        }
    }

    /// Remove every expired item regardless of position, preserving the
    /// relative order of the survivors.
    fn purge_expired(&mut self, now: u64) {
        self.buf
            .retain(|(expires_at, _)| !Self::is_expired(*expires_at, now));
    }
}
97
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp at which every test enqueues its items.
    const T0: u64 = 100;
    /// A later point in time, used to observe expiry.
    const LATER: u64 = 200;
    /// Sentinel expiry meaning "never expires".
    const NEVER: u64 = 0;

    /// Items come back out in the order they went in.
    #[test]
    fn push_pop_basic() {
        let mut queue = TtlQueue::new(8);
        for label in ["a", "b"] {
            queue.push(T0, LATER, label);
        }
        assert_eq!(queue.pop(T0), Some("a"));
        assert_eq!(queue.pop(T0), Some("b"));
        assert_eq!(queue.pop(T0), None);
    }

    /// An expired head entry is skipped when popping.
    #[test]
    fn expired_items_evicted() {
        let mut queue = TtlQueue::new(8);
        queue.push(T0, 150, "old");
        queue.push(T0, 300, "fresh");
        // "old" expired at t=150, so at t=200 only "fresh" is left.
        let popped = queue.pop(LATER);
        assert_eq!(popped, Some("fresh"));
        assert!(queue.is_empty());
    }

    /// `expires_at == 0` keeps an item alive indefinitely.
    #[test]
    fn zero_ttl_never_expires() {
        let mut queue = TtlQueue::new(4);
        queue.push(T0, NEVER, "forever");
        // The largest representable timestamp still must not expire it.
        assert_eq!(queue.pop(u64::MAX), Some("forever"));
    }

    /// Pushing into a full queue discards the oldest entry.
    #[test]
    fn capacity_drops_oldest() {
        let mut queue = TtlQueue::new(2);
        queue.push(T0, NEVER, "a");
        queue.push(T0, NEVER, "b");
        // Third push overflows capacity 2, so "a" is dropped.
        queue.push(T0, NEVER, "c");
        assert_eq!(queue.len(), 2);
        assert_eq!(queue.pop(T0), Some("b"));
        assert_eq!(queue.pop(T0), Some("c"));
    }

    /// Draining yields the live items in FIFO order and empties the queue.
    #[test]
    fn drain_returns_all_non_expired() {
        let mut queue = TtlQueue::new(8);
        queue.push(T0, 150, "expired");
        queue.push(T0, 300, "ok1");
        queue.push(T0, NEVER, "ok2");
        let drained = queue.drain(LATER);
        assert_eq!(drained, vec!["ok1", "ok2"]);
        assert!(queue.is_empty());
    }

    /// Peeking exposes the head but leaves it in place.
    #[test]
    fn peek_does_not_remove() {
        let mut queue = TtlQueue::new(4);
        queue.push(T0, NEVER, "x");
        assert_eq!(queue.peek(T0), Some(&"x"));
        assert_eq!(queue.len(), 1);
    }

    /// A zero-capacity queue is rejected at construction time.
    #[test]
    #[should_panic(expected = "capacity must be > 0")]
    fn zero_capacity_panics() {
        TtlQueue::<u8>::new(0);
    }
}