Skip to main content

atomr_core/actor/
stash.rs

1//! Stash — buffers messages that should be deferred until `unstash_all`.
2//! akka.net: `Actor/Stash/*`.
3//!
4//! Phase 3.6 of `docs/full-port-plan.md`. Two layers:
5//!
6//! * The unbounded stash storage on [`crate::actor::Context`] is the
7//!   default and matches the legacy API.
8//! * [`BoundedStash`] is a free-standing bounded buffer with a
9//!   pluggable [`StashOverflow`] policy. Actor authors can hold one
10//!   per actor instance for back-pressure-aware stashing.
11//!
12//! `Stash` (marker trait) stays for symmetry with akka.net's
13//! `IWithStash`.
14
15use std::collections::VecDeque;
16
17/// Marker — any actor may opt in to document stash usage.
18/// Stash storage itself is provided unconditionally by `Context`.
19pub trait Stash {}
20
21/// What to do when the stash is full and a new message arrives.
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23#[non_exhaustive]
24pub enum StashOverflow {
25    /// Drop the oldest stashed message; queue the new one.
26    DropOldest,
27    /// Drop the new message.
28    DropNewest,
29    /// Drop the new message AND surface it to the caller as an error
30    /// (so the runtime can route it to DeadLetters).
31    Reject,
32}
33
34/// Result of [`BoundedStash::stash`].
35#[derive(Debug, Clone, PartialEq, Eq)]
36#[non_exhaustive]
37pub enum StashResult<M> {
38    /// Stashed successfully; depth is the new buffer length.
39    Stashed { depth: usize },
40    /// `Reject` policy refused the message; caller must route it
41    /// (e.g. publish on DeadLetters).
42    Rejected(M),
43    /// `DropOldest` displaced an existing message; caller may want
44    /// to surface it on DeadLetters.
45    DroppedOldest(M),
46    /// `DropNewest` discarded the incoming message — depth unchanged.
47    DroppedNewest,
48}
49
50/// Bounded stash with a configurable overflow policy.
51pub struct BoundedStash<M> {
52    capacity: usize,
53    policy: StashOverflow,
54    buf: VecDeque<M>,
55}
56
57impl<M> BoundedStash<M> {
58    pub fn new(capacity: usize, policy: StashOverflow) -> Self {
59        assert!(capacity >= 1, "capacity must be >= 1");
60        Self { capacity, policy, buf: VecDeque::with_capacity(capacity) }
61    }
62
63    pub fn capacity(&self) -> usize {
64        self.capacity
65    }
66
67    pub fn len(&self) -> usize {
68        self.buf.len()
69    }
70
71    pub fn is_empty(&self) -> bool {
72        self.buf.is_empty()
73    }
74
75    pub fn is_full(&self) -> bool {
76        self.buf.len() >= self.capacity
77    }
78
79    /// Stash `msg`, applying the configured overflow policy if
80    /// the buffer is full.
81    pub fn stash(&mut self, msg: M) -> StashResult<M> {
82        if !self.is_full() {
83            self.buf.push_back(msg);
84            return StashResult::Stashed { depth: self.buf.len() };
85        }
86        match self.policy {
87            StashOverflow::DropOldest => {
88                let dropped = self.buf.pop_front();
89                self.buf.push_back(msg);
90                match dropped {
91                    Some(old) => StashResult::DroppedOldest(old),
92                    None => StashResult::Stashed { depth: self.buf.len() },
93                }
94            }
95            StashOverflow::DropNewest => StashResult::DroppedNewest,
96            StashOverflow::Reject => StashResult::Rejected(msg),
97        }
98    }
99
100    /// Drain the stash front-to-back. Maintains akka.net's
101    /// "messages prepended in order" semantic — caller front-prepends
102    /// to the mailbox.
103    pub fn unstash_all(&mut self) -> Vec<M> {
104        let mut out = Vec::with_capacity(self.buf.len());
105        while let Some(m) = self.buf.pop_front() {
106            out.push(m);
107        }
108        out
109    }
110
111    /// Pop a single stashed message (oldest first).
112    pub fn pop(&mut self) -> Option<M> {
113        self.buf.pop_front()
114    }
115}
116
117#[cfg(test)]
118mod tests {
119    use super::*;
120
121    #[test]
122    fn stash_until_capacity() {
123        let mut s = BoundedStash::<u32>::new(3, StashOverflow::Reject);
124        assert!(matches!(s.stash(1), StashResult::Stashed { depth: 1 }));
125        assert!(matches!(s.stash(2), StashResult::Stashed { depth: 2 }));
126        assert!(matches!(s.stash(3), StashResult::Stashed { depth: 3 }));
127        assert!(s.is_full());
128    }
129
130    #[test]
131    fn reject_returns_message_back() {
132        let mut s = BoundedStash::<u32>::new(1, StashOverflow::Reject);
133        s.stash(1);
134        let r = s.stash(99);
135        assert_eq!(r, StashResult::Rejected(99));
136        assert_eq!(s.len(), 1);
137    }
138
139    #[test]
140    fn drop_oldest_displaces() {
141        let mut s = BoundedStash::<u32>::new(2, StashOverflow::DropOldest);
142        s.stash(1);
143        s.stash(2);
144        let r = s.stash(3);
145        assert_eq!(r, StashResult::DroppedOldest(1));
146        let drained = s.unstash_all();
147        assert_eq!(drained, vec![2, 3]);
148    }
149
150    #[test]
151    fn drop_newest_keeps_old() {
152        let mut s = BoundedStash::<u32>::new(2, StashOverflow::DropNewest);
153        s.stash(1);
154        s.stash(2);
155        let r = s.stash(3);
156        assert_eq!(r, StashResult::DroppedNewest);
157        let drained = s.unstash_all();
158        assert_eq!(drained, vec![1, 2]);
159    }
160
161    #[test]
162    fn unstash_all_drains_in_order() {
163        let mut s = BoundedStash::<u32>::new(4, StashOverflow::Reject);
164        for i in 1..=4 {
165            s.stash(i);
166        }
167        let drained = s.unstash_all();
168        assert_eq!(drained, vec![1, 2, 3, 4]);
169        assert!(s.is_empty());
170    }
171
172    #[test]
173    #[should_panic]
174    fn zero_capacity_panics() {
175        let _: BoundedStash<u32> = BoundedStash::new(0, StashOverflow::Reject);
176    }
177}