Skip to main content

atomr_core/actor/
stash.rs

1//! Stash — buffers messages that should be deferred until `unstash_all`.
2//!
3//! Phase 3.6 of `docs/full-port-plan.md`. Two layers:
4//!
5//! * The unbounded stash storage on [`crate::actor::Context`] is the
6//!   default and matches the legacy API.
7//! * [`BoundedStash`] is a free-standing bounded buffer with a
8//!   pluggable [`StashOverflow`] policy. Actor authors can hold one
9//!   per actor instance for back-pressure-aware stashing.
10//!
11//! `Stash` (marker trait) stays as an opt-in marker that an actor uses
12//! the stash storage on its `Context`.
13
14use std::collections::VecDeque;
15
16/// Marker — any actor may opt in to document stash usage.
17/// Stash storage itself is provided unconditionally by `Context`.
18pub trait Stash {}
19
20/// What to do when the stash is full and a new message arrives.
21#[derive(Debug, Clone, Copy, PartialEq, Eq)]
22#[non_exhaustive]
23pub enum StashOverflow {
24    /// Drop the oldest stashed message; queue the new one.
25    DropOldest,
26    /// Drop the new message.
27    DropNewest,
28    /// Drop the new message AND surface it to the caller as an error
29    /// (so the runtime can route it to DeadLetters).
30    Reject,
31}
32
33/// Result of [`BoundedStash::stash`].
34#[derive(Debug, Clone, PartialEq, Eq)]
35#[non_exhaustive]
36pub enum StashResult<M> {
37    /// Stashed successfully; depth is the new buffer length.
38    Stashed { depth: usize },
39    /// `Reject` policy refused the message; caller must route it
40    /// (e.g. publish on DeadLetters).
41    Rejected(M),
42    /// `DropOldest` displaced an existing message; caller may want
43    /// to surface it on DeadLetters.
44    DroppedOldest(M),
45    /// `DropNewest` discarded the incoming message — depth unchanged.
46    DroppedNewest,
47}
48
49/// Bounded stash with a configurable overflow policy.
50pub struct BoundedStash<M> {
51    capacity: usize,
52    policy: StashOverflow,
53    buf: VecDeque<M>,
54}
55
56impl<M> BoundedStash<M> {
57    pub fn new(capacity: usize, policy: StashOverflow) -> Self {
58        assert!(capacity >= 1, "capacity must be >= 1");
59        Self { capacity, policy, buf: VecDeque::with_capacity(capacity) }
60    }
61
62    pub fn capacity(&self) -> usize {
63        self.capacity
64    }
65
66    pub fn len(&self) -> usize {
67        self.buf.len()
68    }
69
70    pub fn is_empty(&self) -> bool {
71        self.buf.is_empty()
72    }
73
74    pub fn is_full(&self) -> bool {
75        self.buf.len() >= self.capacity
76    }
77
78    /// Stash `msg`, applying the configured overflow policy if
79    /// the buffer is full.
80    pub fn stash(&mut self, msg: M) -> StashResult<M> {
81        if !self.is_full() {
82            self.buf.push_back(msg);
83            return StashResult::Stashed { depth: self.buf.len() };
84        }
85        match self.policy {
86            StashOverflow::DropOldest => {
87                let dropped = self.buf.pop_front();
88                self.buf.push_back(msg);
89                match dropped {
90                    Some(old) => StashResult::DroppedOldest(old),
91                    None => StashResult::Stashed { depth: self.buf.len() },
92                }
93            }
94            StashOverflow::DropNewest => StashResult::DroppedNewest,
95            StashOverflow::Reject => StashResult::Rejected(msg),
96        }
97    }
98
99    /// Drain the stash front-to-back. Maintains
100    /// "messages prepended in order" semantic — caller front-prepends
101    /// to the mailbox.
102    pub fn unstash_all(&mut self) -> Vec<M> {
103        let mut out = Vec::with_capacity(self.buf.len());
104        while let Some(m) = self.buf.pop_front() {
105            out.push(m);
106        }
107        out
108    }
109
110    /// Pop a single stashed message (oldest first).
111    pub fn pop(&mut self) -> Option<M> {
112        self.buf.pop_front()
113    }
114}
115
116#[cfg(test)]
117mod tests {
118    use super::*;
119
120    #[test]
121    fn stash_until_capacity() {
122        let mut s = BoundedStash::<u32>::new(3, StashOverflow::Reject);
123        assert!(matches!(s.stash(1), StashResult::Stashed { depth: 1 }));
124        assert!(matches!(s.stash(2), StashResult::Stashed { depth: 2 }));
125        assert!(matches!(s.stash(3), StashResult::Stashed { depth: 3 }));
126        assert!(s.is_full());
127    }
128
129    #[test]
130    fn reject_returns_message_back() {
131        let mut s = BoundedStash::<u32>::new(1, StashOverflow::Reject);
132        s.stash(1);
133        let r = s.stash(99);
134        assert_eq!(r, StashResult::Rejected(99));
135        assert_eq!(s.len(), 1);
136    }
137
138    #[test]
139    fn drop_oldest_displaces() {
140        let mut s = BoundedStash::<u32>::new(2, StashOverflow::DropOldest);
141        s.stash(1);
142        s.stash(2);
143        let r = s.stash(3);
144        assert_eq!(r, StashResult::DroppedOldest(1));
145        let drained = s.unstash_all();
146        assert_eq!(drained, vec![2, 3]);
147    }
148
149    #[test]
150    fn drop_newest_keeps_old() {
151        let mut s = BoundedStash::<u32>::new(2, StashOverflow::DropNewest);
152        s.stash(1);
153        s.stash(2);
154        let r = s.stash(3);
155        assert_eq!(r, StashResult::DroppedNewest);
156        let drained = s.unstash_all();
157        assert_eq!(drained, vec![1, 2]);
158    }
159
160    #[test]
161    fn unstash_all_drains_in_order() {
162        let mut s = BoundedStash::<u32>::new(4, StashOverflow::Reject);
163        for i in 1..=4 {
164            s.stash(i);
165        }
166        let drained = s.unstash_all();
167        assert_eq!(drained, vec![1, 2, 3, 4]);
168        assert!(s.is_empty());
169    }
170
171    #[test]
172    #[should_panic]
173    fn zero_capacity_panics() {
174        let _: BoundedStash<u32> = BoundedStash::new(0, StashOverflow::Reject);
175    }
176}