ma_core/inbox.rs
1//! Service inbox — a bounded FIFO receive queue with per-message TTL.
2//!
3//! Messages are pushed by the [`MaEndpoint`](crate::endpoint::MaEndpoint)
4//! implementation after validating incoming data. Each message carries its
5//! own `created_at` and `ttl` — the endpoint computes `expires_at` from
6//! those fields. Consumers only read from the inbox via
7//! [`pop`](Inbox::pop), [`peek`](Inbox::peek), or [`drain`](Inbox::drain).
8
9use crate::ttl_queue::TtlQueue;
10
11/// A bounded FIFO receive queue for incoming ma messages.
12///
13/// Only the endpoint pushes messages into the inbox after validation.
14/// Expiry is determined per-message from the message's own `created_at + ttl`.
15/// Consumers read via [`pop`](Inbox::pop), [`peek`](Inbox::peek),
16/// or [`drain`](Inbox::drain).
17#[derive(Debug, Clone)]
18pub struct Inbox<T> {
19 queue: TtlQueue<T>,
20}
21
22impl<T> Inbox<T> {
23 /// Create an inbox with the given capacity.
24 pub fn new(capacity: usize) -> Self {
25 Self {
26 queue: TtlQueue::new(capacity),
27 }
28 }
29
30 /// Push an item with a computed expiry timestamp.
31 ///
32 /// `expires_at` should be `message.created_at + message.ttl`.
33 /// Pass `expires_at = 0` for items that never expire.
34 ///
35 /// This is `pub(crate)` — only endpoint implementations should
36 /// write to an inbox.
37 pub(crate) fn push(&mut self, now: u64, expires_at: u64, item: T) {
38 self.queue.push(now, expires_at, item);
39 }
40
41 /// Pop the oldest non-expired item.
42 pub fn pop(&mut self, now: u64) -> Option<T> {
43 self.queue.pop(now)
44 }
45
46 /// Peek at the oldest non-expired item.
47 pub fn peek(&mut self, now: u64) -> Option<&T> {
48 self.queue.peek(now)
49 }
50
51 /// Drain all non-expired items in FIFO order.
52 pub fn drain(&mut self, now: u64) -> Vec<T> {
53 self.queue.drain(now)
54 }
55
56 /// Number of items in the queue (may include not-yet-evicted expired items).
57 pub fn len(&self) -> usize {
58 self.queue.len()
59 }
60
61 /// Whether the queue is empty.
62 pub fn is_empty(&self) -> bool {
63 self.queue.is_empty()
64 }
65}
66
#[cfg(test)]
mod tests {
    use super::*;

    /// An item is readable at its creation time and gone once `now` has
    /// moved past its expiry.
    #[test]
    fn message_ttl_respected() {
        // created_at = 100, ttl = 60  =>  expires_at = 160
        let (created_at, expires_at) = (100_u64, 160_u64);

        let mut inbox: Inbox<&str> = Inbox::new(8);
        inbox.push(created_at, expires_at, "msg");

        assert_eq!(inbox.peek(created_at), Some(&"msg"));
        assert_eq!(inbox.pop(expires_at + 1), None);
    }

    /// Items pushed at the same time but with different TTLs expire
    /// independently of each other.
    #[test]
    fn different_message_ttls() {
        let created_at = 100_u64;
        let short_expiry = 110_u64; // ttl = 10
        let long_expiry = 160_u64; // ttl = 60

        let mut inbox: Inbox<&str> = Inbox::new(8);
        inbox.push(created_at, short_expiry, "short");
        inbox.push(created_at, long_expiry, "long");

        // One tick past the short expiry: only "long" is still fresh.
        let popped = inbox.pop(short_expiry + 1);
        assert_eq!(popped, Some("long"));
    }

    /// `expires_at == 0` marks an item that never expires.
    #[test]
    fn zero_expires_at_never_expires() {
        let mut inbox: Inbox<&str> = Inbox::new(4);
        inbox.push(100, 0, "forever");

        let popped = inbox.pop(u64::MAX);
        assert_eq!(popped, Some("forever"));
    }
}