Skip to main content

ma_core/
inbox.rs

1//! Service inbox — a bounded FIFO receive queue with a default TTL.
2//!
3//! Wraps [`TtlQueue`] and adds a configurable default TTL so callers can
4//! `push(now, item)` without computing `expires_at` each time.
5
6use crate::ttl_queue::TtlQueue;
7
8/// A bounded FIFO receive queue with a default message TTL.
9///
10/// Items pushed without an explicit TTL use the default. Pass `default_ttl_secs = 0`
11/// for items that never expire.
12///
13/// # Examples
14///
15/// ```
16/// use ma_core::Inbox;
17///
18/// let mut inbox: Inbox<&str> = Inbox::new(64, 300);
19/// let now = 1_000_000;
20/// inbox.push(now, "hello");
21/// assert_eq!(inbox.pop(now), Some("hello"));
22///
23/// // After the TTL expires, the message is gone:
24/// inbox.push(now, "ephemeral");
25/// assert_eq!(inbox.pop(now + 301), None);
26/// ```
27#[derive(Debug, Clone)]
28pub struct Inbox<T> {
29    queue: TtlQueue<T>,
30    default_ttl_secs: u64,
31}
32
33impl<T> Inbox<T> {
34    /// Create an inbox with the given capacity and default TTL (seconds).
35    ///
36    /// `default_ttl_secs = 0` means items never expire by default.
37    pub fn new(capacity: usize, default_ttl_secs: u64) -> Self {
38        Self {
39            queue: TtlQueue::new(capacity),
40            default_ttl_secs,
41        }
42    }
43
44    /// Push an item using the default TTL.
45    pub fn push(&mut self, now: u64, item: T) {
46        let expires_at = if self.default_ttl_secs == 0 {
47            0
48        } else {
49            now.saturating_add(self.default_ttl_secs)
50        };
51        self.queue.push(now, expires_at, item);
52    }
53
54    /// Push an item with a custom TTL (seconds). `ttl_secs = 0` means no expiry.
55    pub fn push_with_ttl(&mut self, now: u64, ttl_secs: u64, item: T) {
56        let expires_at = if ttl_secs == 0 {
57            0
58        } else {
59            now.saturating_add(ttl_secs)
60        };
61        self.queue.push(now, expires_at, item);
62    }
63
64    /// Pop the oldest non-expired item.
65    pub fn pop(&mut self, now: u64) -> Option<T> {
66        self.queue.pop(now)
67    }
68
69    /// Peek at the oldest non-expired item.
70    pub fn peek(&mut self, now: u64) -> Option<&T> {
71        self.queue.peek(now)
72    }
73
74    /// Drain all non-expired items in FIFO order.
75    pub fn drain(&mut self, now: u64) -> Vec<T> {
76        self.queue.drain(now)
77    }
78
79    /// Number of items in the queue (may include not-yet-evicted expired items).
80    pub fn len(&self) -> usize {
81        self.queue.len()
82    }
83
84    /// Whether the queue is empty.
85    pub fn is_empty(&self) -> bool {
86        self.queue.is_empty()
87    }
88}
89
90#[cfg(test)]
91mod tests {
92    use super::*;
93
94    #[test]
95    fn default_ttl_applied() {
96        let mut inbox = Inbox::new(8, 60);
97        inbox.push(100, "msg");
98        // At t=100, item expires at 160 — still fresh
99        assert_eq!(inbox.peek(100), Some(&"msg"));
100        // At t=161, expired
101        assert_eq!(inbox.pop(161), None);
102    }
103
104    #[test]
105    fn custom_ttl_overrides_default() {
106        let mut inbox = Inbox::new(8, 60);
107        inbox.push_with_ttl(100, 10, "short");
108        inbox.push(100, "default");
109        // At t=111, "short" is expired, "default" still fresh
110        assert_eq!(inbox.pop(111), Some("default"));
111    }
112
113    #[test]
114    fn zero_default_ttl_never_expires() {
115        let mut inbox = Inbox::new(4, 0);
116        inbox.push(100, "forever");
117        assert_eq!(inbox.pop(u64::MAX), Some("forever"));
118    }
119}