Skip to main content

ash_flare/
mailbox.rs

1//! Optional mailbox system for workers
2//!
3//! This module provides simple message-passing capabilities for workers,
4//! allowing them to receive string messages via a mailbox.
5
6use tokio::sync::mpsc;
7
/// Configuration for mailbox capacity
///
/// Selects whether the channel behind a mailbox has a fixed upper bound on
/// queued messages or no bound at all. The [`Default`] value is a bounded
/// mailbox holding [`MailboxConfig::DEFAULT_CAPACITY`] messages.
///
/// Prefer [`MailboxConfig::bounded`] over writing the `Bounded` variant by
/// hand: the variant's field is public, so a literal
/// `Bounded { capacity: 0 }` skips the zero-capacity check done by the
/// constructor.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MailboxConfig {
    /// Unbounded mailbox - no capacity limit
    Unbounded,
    /// Bounded mailbox with fixed capacity
    Bounded {
        /// Maximum number of messages the mailbox can hold
        capacity: usize,
    },
}

impl Default for MailboxConfig {
    /// Bounded mailbox with [`MailboxConfig::DEFAULT_CAPACITY`] slots
    fn default() -> Self {
        Self::Bounded {
            capacity: Self::DEFAULT_CAPACITY,
        }
    }
}

impl MailboxConfig {
    /// Capacity used by [`MailboxConfig::default`]
    pub const DEFAULT_CAPACITY: usize = 100;

    /// Create unbounded mailbox configuration
    #[must_use]
    pub const fn unbounded() -> Self {
        Self::Unbounded
    }

    /// Create bounded mailbox configuration
    ///
    /// Like [`MailboxConfig::unbounded`], this can be evaluated in `const`
    /// contexts; there a zero capacity is rejected at compile time instead
    /// of at run time.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is 0.
    #[must_use]
    pub const fn bounded(capacity: usize) -> Self {
        assert!(capacity > 0, "mailbox capacity must be > 0");
        Self::Bounded { capacity }
    }
}
44
45/// Handle for sending messages to a mailbox
46#[derive(Clone)]
47pub struct MailboxHandle {
48    tx: MailboxSender,
49    worker_id: String,
50}
51
52enum MailboxSender {
53    Bounded(mpsc::Sender<String>),
54    Unbounded(mpsc::UnboundedSender<String>),
55}
56
57impl Clone for MailboxSender {
58    fn clone(&self) -> Self {
59        match self {
60            Self::Bounded(tx) => Self::Bounded(tx.clone()),
61            Self::Unbounded(tx) => Self::Unbounded(tx.clone()),
62        }
63    }
64}
65
66impl MailboxHandle {
67    pub(crate) fn new_bounded(tx: mpsc::Sender<String>, worker_id: impl Into<String>) -> Self {
68        Self {
69            tx: MailboxSender::Bounded(tx),
70            worker_id: worker_id.into(),
71        }
72    }
73
74    pub(crate) fn new_unbounded(
75        tx: mpsc::UnboundedSender<String>,
76        worker_id: impl Into<String>,
77    ) -> Self {
78        Self {
79            tx: MailboxSender::Unbounded(tx),
80            worker_id: worker_id.into(),
81        }
82    }
83
84    /// Send a message to the worker (async, waits if mailbox full)
85    ///
86    /// # Errors
87    ///
88    /// Returns `SendError::Closed` if the receiver has been dropped.
89    pub async fn send(&self, message: impl Into<String>) -> Result<(), SendError> {
90        let msg = message.into();
91        match &self.tx {
92            MailboxSender::Bounded(tx) => tx
93                .send(msg)
94                .await
95                .map_err(|_| SendError::Closed(self.worker_id.clone())),
96            MailboxSender::Unbounded(tx) => tx
97                .send(msg)
98                .map_err(|_| SendError::Closed(self.worker_id.clone())),
99        }
100    }
101
102    /// Try to send a message without blocking
103    ///
104    /// # Errors
105    ///
106    /// Returns `TrySendError::Full` if the mailbox is full or `TrySendError::Closed` if the receiver has been dropped.
107    pub fn try_send(&self, message: impl Into<String>) -> Result<(), TrySendError> {
108        let msg = message.into();
109        match &self.tx {
110            MailboxSender::Bounded(tx) => tx.try_send(msg).map_err(|e| match e {
111                mpsc::error::TrySendError::Full(_) => TrySendError::Full,
112                mpsc::error::TrySendError::Closed(_) => {
113                    TrySendError::Closed(self.worker_id.clone())
114                }
115            }),
116            MailboxSender::Unbounded(tx) => tx
117                .send(msg)
118                .map_err(|_| TrySendError::Closed(self.worker_id.clone())),
119        }
120    }
121
122    /// Get the worker ID
123    #[must_use]
124    pub fn worker_id(&self) -> &str {
125        &self.worker_id
126    }
127
128    /// Check if the mailbox is still open
129    #[must_use]
130    pub fn is_open(&self) -> bool {
131        match &self.tx {
132            MailboxSender::Bounded(tx) => !tx.is_closed(),
133            MailboxSender::Unbounded(tx) => !tx.is_closed(),
134        }
135    }
136}
137
138/// Mailbox for receiving messages in a worker
139pub struct Mailbox {
140    rx: MailboxReceiver,
141}
142
143enum MailboxReceiver {
144    Bounded(mpsc::Receiver<String>),
145    Unbounded(mpsc::UnboundedReceiver<String>),
146}
147
148impl Mailbox {
149    pub(crate) fn new_bounded(rx: mpsc::Receiver<String>) -> Self {
150        Self {
151            rx: MailboxReceiver::Bounded(rx),
152        }
153    }
154
155    pub(crate) fn new_unbounded(rx: mpsc::UnboundedReceiver<String>) -> Self {
156        Self {
157            rx: MailboxReceiver::Unbounded(rx),
158        }
159    }
160
161    /// Receive a message from the mailbox
162    pub async fn recv(&mut self) -> Option<String> {
163        match &mut self.rx {
164            MailboxReceiver::Bounded(rx) => rx.recv().await,
165            MailboxReceiver::Unbounded(rx) => rx.recv().await,
166        }
167    }
168
169    /// Try to receive a message without blocking
170    ///
171    /// # Errors
172    ///
173    /// Returns `TryRecvError::Empty` if no messages available or `TryRecvError::Disconnected` if all senders dropped.
174    pub fn try_recv(&mut self) -> Result<String, TryRecvError> {
175        match &mut self.rx {
176            MailboxReceiver::Bounded(rx) => rx.try_recv().map_err(|e| match e {
177                mpsc::error::TryRecvError::Empty => TryRecvError::Empty,
178                mpsc::error::TryRecvError::Disconnected => TryRecvError::Disconnected,
179            }),
180            MailboxReceiver::Unbounded(rx) => rx.try_recv().map_err(|e| match e {
181                mpsc::error::TryRecvError::Empty => TryRecvError::Empty,
182                mpsc::error::TryRecvError::Disconnected => TryRecvError::Disconnected,
183            }),
184        }
185    }
186}
187
188/// Create a mailbox channel
189#[must_use]
190pub fn mailbox(config: MailboxConfig) -> (MailboxHandle, Mailbox) {
191    match config {
192        MailboxConfig::Unbounded => {
193            let (tx, rx) = mpsc::unbounded_channel();
194            (
195                MailboxHandle::new_unbounded(tx, "unnamed"),
196                Mailbox::new_unbounded(rx),
197            )
198        }
199        MailboxConfig::Bounded { capacity } => {
200            let (tx, rx) = mpsc::channel(capacity);
201            (
202                MailboxHandle::new_bounded(tx, "unnamed"),
203                Mailbox::new_bounded(rx),
204            )
205        }
206    }
207}
208
209/// Create a mailbox channel with named worker
210pub fn mailbox_named(
211    config: MailboxConfig,
212    worker_id_input: impl Into<String>,
213) -> (MailboxHandle, Mailbox) {
214    let worker_id = worker_id_input.into();
215    match config {
216        MailboxConfig::Unbounded => {
217            let (tx, rx) = mpsc::unbounded_channel();
218            (
219                MailboxHandle::new_unbounded(tx, worker_id),
220                Mailbox::new_unbounded(rx),
221            )
222        }
223        MailboxConfig::Bounded { capacity } => {
224            let (tx, rx) = mpsc::channel(capacity);
225            (
226                MailboxHandle::new_bounded(tx, worker_id),
227                Mailbox::new_bounded(rx),
228            )
229        }
230    }
231}
232
233/// Error when sending a message fails
234#[derive(Debug, Clone, thiserror::Error)]
235pub enum SendError {
236    /// The worker's mailbox is closed
237    #[error("worker '{0}' mailbox is closed")]
238    Closed(String),
239}
240
241/// Error when trying to send without blocking
242#[derive(Debug, Clone, thiserror::Error)]
243pub enum TrySendError {
244    /// The mailbox is full
245    #[error("mailbox is full")]
246    Full,
247    /// The worker's mailbox is closed
248    #[error("worker '{0}' mailbox is closed")]
249    Closed(String),
250}
251
252/// Error when trying to receive without blocking
253#[derive(Debug, Clone, thiserror::Error)]
254pub enum TryRecvError {
255    /// The mailbox is empty
256    #[error("mailbox is empty")]
257    Empty,
258    /// The mailbox is disconnected
259    #[error("mailbox is disconnected")]
260    Disconnected,
261}