Skip to main content

ma_core/
inbox.rs

//! Service inbox — a bounded FIFO receive queue with per-message TTL.
//!
//! Messages are normally pushed by the
//! [`MaEndpoint`](crate::endpoint::MaEndpoint) implementation after it has
//! validated incoming data; [`push`](Inbox::push) is also used for local
//! in-process delivery. Each message carries its own `created_at` and `ttl`,
//! and the producer passes `expires_at = created_at + ttl` to `push`.
//! Consumers only read from the inbox, via [`pop`](Inbox::pop),
//! [`peek`](Inbox::peek), or [`drain`](Inbox::drain).
8
9use crate::ttl_queue::TtlQueue;
10use std::sync::{Arc, Mutex};
11
12/// A bounded FIFO receive queue for incoming ma messages.
13///
14/// Only the endpoint pushes messages into the inbox after validation.
15/// Expiry is determined per-message from the message's own `created_at + ttl`.
16/// Consumers read via [`pop`](Inbox::pop), [`peek`](Inbox::peek),
17/// or [`drain`](Inbox::drain).
18#[derive(Debug)]
19pub struct Inbox<T> {
20    queue: Arc<Mutex<TtlQueue<T>>>,
21}
22
23impl<T> Clone for Inbox<T> {
24    fn clone(&self) -> Self {
25        Self {
26            queue: Arc::clone(&self.queue),
27        }
28    }
29}
30
31impl<T> Inbox<T> {
32    /// Create an inbox with the given capacity.
33    pub fn new(capacity: usize) -> Self {
34        Self {
35            queue: Arc::new(Mutex::new(TtlQueue::new(capacity))),
36        }
37    }
38
39    /// Push an item with a computed expiry timestamp.
40    ///
41    /// `expires_at` should be `message.created_at + message.ttl`.
42    /// Pass `expires_at = 0` for items that never expire.
43    ///
44    /// Used by endpoint implementations and for local in-process delivery
45    /// (e.g. world routing a message directly to an object/room inbox).
46    pub fn push(&self, now: u64, expires_at: u64, item: T) {
47        self.queue
48            .lock()
49            .unwrap_or_else(|poisoned| poisoned.into_inner())
50            .push(now, expires_at, item);
51    }
52
53    /// Pop the oldest non-expired item.
54    pub fn pop(&self, now: u64) -> Option<T> {
55        self.queue
56            .lock()
57            .unwrap_or_else(|poisoned| poisoned.into_inner())
58            .pop(now)
59    }
60
61    /// Peek at the oldest non-expired item.
62    pub fn peek(&self, now: u64) -> Option<T>
63    where
64        T: Clone,
65    {
66        self.queue
67            .lock()
68            .unwrap_or_else(|poisoned| poisoned.into_inner())
69            .peek(now)
70            .cloned()
71    }
72
73    /// Drain all non-expired items in FIFO order.
74    pub fn drain(&self, now: u64) -> Vec<T> {
75        self.queue
76            .lock()
77            .unwrap_or_else(|poisoned| poisoned.into_inner())
78            .drain(now)
79    }
80
81    /// Number of items in the queue (may include not-yet-evicted expired items).
82    pub fn len(&self) -> usize {
83        self.queue
84            .lock()
85            .unwrap_or_else(|poisoned| poisoned.into_inner())
86            .len()
87    }
88
89    /// Whether the queue is empty.
90    pub fn is_empty(&self) -> bool {
91        self.queue
92            .lock()
93            .unwrap_or_else(|poisoned| poisoned.into_inner())
94            .is_empty()
95    }
96}
97
#[cfg(test)]
mod tests {
    use super::*;

    /// An item is visible before its expiry and gone once `now` is past it.
    #[test]
    fn message_ttl_respected() {
        // created_at = 100, ttl = 60  =>  expires_at = 160
        let created_at = 100;
        let expires_at = 160;

        let inbox = Inbox::new(8);
        inbox.push(created_at, expires_at, "msg");

        let before_expiry = inbox.peek(created_at);
        assert_eq!(before_expiry, Some("msg"));

        let after_expiry = inbox.pop(expires_at + 1);
        assert_eq!(after_expiry, None);
    }

    /// Items pushed at the same time expire independently of each other.
    #[test]
    fn different_message_ttls() {
        let created_at = 100;
        let short_expiry = 110; // ttl = 10
        let long_expiry = 160; // ttl = 60

        let inbox = Inbox::new(8);
        inbox.push(created_at, short_expiry, "short");
        inbox.push(created_at, long_expiry, "long");

        // One tick past the short expiry: "short" is skipped, "long" is
        // still fresh.
        let popped = inbox.pop(short_expiry + 1);
        assert_eq!(popped, Some("long"));
    }

    /// `expires_at == 0` marks an item that never expires.
    #[test]
    fn zero_expires_at_never_expires() {
        let inbox = Inbox::new(4);
        inbox.push(100, 0, "forever");

        let popped = inbox.pop(u64::MAX);
        assert_eq!(popped, Some("forever"));
    }
}