Skip to main content

ash_flare/
mailbox.rs

1//! Optional mailbox system for workers
2//!
3//! This module provides simple message-passing capabilities for workers,
4//! allowing them to receive string messages via a mailbox.
5
6use tokio::sync::mpsc;
7
/// Configuration for mailbox capacity
///
/// The [`Default`] configuration is a bounded mailbox with room for 100
/// messages. Configurations can be compared with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MailboxConfig {
    /// Unbounded mailbox - no capacity limit
    Unbounded,
    /// Bounded mailbox with fixed capacity
    Bounded {
        /// Maximum number of messages the mailbox can hold
        capacity: usize,
    },
}

impl Default for MailboxConfig {
    /// Returns a bounded configuration with a capacity of 100 messages.
    fn default() -> Self {
        Self::Bounded { capacity: 100 }
    }
}

impl MailboxConfig {
    /// Create unbounded mailbox configuration
    #[must_use]
    pub const fn unbounded() -> Self {
        Self::Unbounded
    }

    /// Create bounded mailbox configuration
    ///
    /// Unlike building `MailboxConfig::Bounded { .. }` by hand, this
    /// constructor rejects a zero capacity up front.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    #[must_use]
    pub const fn bounded(capacity: usize) -> Self {
        assert!(capacity > 0, "mailbox capacity must be > 0");
        Self::Bounded { capacity }
    }
}
39
40/// Handle for sending messages to a mailbox
41#[derive(Clone)]
42pub struct MailboxHandle {
43    tx: MailboxSender,
44    worker_id: String,
45}
46
47enum MailboxSender {
48    Bounded(mpsc::Sender<String>),
49    Unbounded(mpsc::UnboundedSender<String>),
50}
51
52impl Clone for MailboxSender {
53    fn clone(&self) -> Self {
54        match self {
55            Self::Bounded(tx) => Self::Bounded(tx.clone()),
56            Self::Unbounded(tx) => Self::Unbounded(tx.clone()),
57        }
58    }
59}
60
61impl MailboxHandle {
62    pub(crate) fn new_bounded(tx: mpsc::Sender<String>, worker_id: impl Into<String>) -> Self {
63        Self {
64            tx: MailboxSender::Bounded(tx),
65            worker_id: worker_id.into(),
66        }
67    }
68
69    pub(crate) fn new_unbounded(
70        tx: mpsc::UnboundedSender<String>,
71        worker_id: impl Into<String>,
72    ) -> Self {
73        Self {
74            tx: MailboxSender::Unbounded(tx),
75            worker_id: worker_id.into(),
76        }
77    }
78
79    /// Send a message to the worker (async, waits if mailbox full)
80    ///
81    /// # Errors
82    ///
83    /// Returns `SendError::Closed` if the receiver has been dropped.
84    pub async fn send(&self, message: impl Into<String>) -> Result<(), SendError> {
85        let msg = message.into();
86        match &self.tx {
87            MailboxSender::Bounded(tx) => tx
88                .send(msg)
89                .await
90                .map_err(|_| SendError::Closed(self.worker_id.clone())),
91            MailboxSender::Unbounded(tx) => tx
92                .send(msg)
93                .map_err(|_| SendError::Closed(self.worker_id.clone())),
94        }
95    }
96
97    /// Try to send a message without blocking
98    ///
99    /// # Errors
100    ///
101    /// Returns `TrySendError::Full` if the mailbox is full or `TrySendError::Closed` if the receiver has been dropped.
102    pub fn try_send(&self, message: impl Into<String>) -> Result<(), TrySendError> {
103        let msg = message.into();
104        match &self.tx {
105            MailboxSender::Bounded(tx) => tx.try_send(msg).map_err(|e| match e {
106                mpsc::error::TrySendError::Full(_) => TrySendError::Full,
107                mpsc::error::TrySendError::Closed(_) => {
108                    TrySendError::Closed(self.worker_id.clone())
109                }
110            }),
111            MailboxSender::Unbounded(tx) => tx
112                .send(msg)
113                .map_err(|_| TrySendError::Closed(self.worker_id.clone())),
114        }
115    }
116
117    /// Get the worker ID
118    #[must_use]
119    pub fn worker_id(&self) -> &str {
120        &self.worker_id
121    }
122
123    /// Check if the mailbox is still open
124    #[must_use]
125    pub fn is_open(&self) -> bool {
126        match &self.tx {
127            MailboxSender::Bounded(tx) => !tx.is_closed(),
128            MailboxSender::Unbounded(tx) => !tx.is_closed(),
129        }
130    }
131}
132
133/// Mailbox for receiving messages in a worker
134pub struct Mailbox {
135    rx: MailboxReceiver,
136}
137
138enum MailboxReceiver {
139    Bounded(mpsc::Receiver<String>),
140    Unbounded(mpsc::UnboundedReceiver<String>),
141}
142
143impl Mailbox {
144    pub(crate) fn new_bounded(rx: mpsc::Receiver<String>) -> Self {
145        Self {
146            rx: MailboxReceiver::Bounded(rx),
147        }
148    }
149
150    pub(crate) fn new_unbounded(rx: mpsc::UnboundedReceiver<String>) -> Self {
151        Self {
152            rx: MailboxReceiver::Unbounded(rx),
153        }
154    }
155
156    /// Receive a message from the mailbox
157    pub async fn recv(&mut self) -> Option<String> {
158        match &mut self.rx {
159            MailboxReceiver::Bounded(rx) => rx.recv().await,
160            MailboxReceiver::Unbounded(rx) => rx.recv().await,
161        }
162    }
163
164    /// Try to receive a message without blocking
165    ///
166    /// # Errors
167    ///
168    /// Returns `TryRecvError::Empty` if no messages available or `TryRecvError::Disconnected` if all senders dropped.
169    pub fn try_recv(&mut self) -> Result<String, TryRecvError> {
170        match &mut self.rx {
171            MailboxReceiver::Bounded(rx) => rx.try_recv().map_err(|e| match e {
172                mpsc::error::TryRecvError::Empty => TryRecvError::Empty,
173                mpsc::error::TryRecvError::Disconnected => TryRecvError::Disconnected,
174            }),
175            MailboxReceiver::Unbounded(rx) => rx.try_recv().map_err(|e| match e {
176                mpsc::error::TryRecvError::Empty => TryRecvError::Empty,
177                mpsc::error::TryRecvError::Disconnected => TryRecvError::Disconnected,
178            }),
179        }
180    }
181}
182
183/// Create a mailbox channel
184#[must_use]
185pub fn mailbox(config: MailboxConfig) -> (MailboxHandle, Mailbox) {
186    match config {
187        MailboxConfig::Unbounded => {
188            let (tx, rx) = mpsc::unbounded_channel();
189            (
190                MailboxHandle::new_unbounded(tx, "unnamed"),
191                Mailbox::new_unbounded(rx),
192            )
193        }
194        MailboxConfig::Bounded { capacity } => {
195            let (tx, rx) = mpsc::channel(capacity);
196            (
197                MailboxHandle::new_bounded(tx, "unnamed"),
198                Mailbox::new_bounded(rx),
199            )
200        }
201    }
202}
203
204/// Create a mailbox channel with named worker
205pub fn mailbox_named(
206    config: MailboxConfig,
207    worker_id_input: impl Into<String>,
208) -> (MailboxHandle, Mailbox) {
209    let worker_id = worker_id_input.into();
210    match config {
211        MailboxConfig::Unbounded => {
212            let (tx, rx) = mpsc::unbounded_channel();
213            (
214                MailboxHandle::new_unbounded(tx, worker_id),
215                Mailbox::new_unbounded(rx),
216            )
217        }
218        MailboxConfig::Bounded { capacity } => {
219            let (tx, rx) = mpsc::channel(capacity);
220            (
221                MailboxHandle::new_bounded(tx, worker_id),
222                Mailbox::new_bounded(rx),
223            )
224        }
225    }
226}
227
228/// Error when sending a message fails
229#[derive(Debug, Clone, thiserror::Error)]
230pub enum SendError {
231    /// The worker's mailbox is closed
232    #[error("worker '{0}' mailbox is closed")]
233    Closed(String),
234}
235
236/// Error when trying to send without blocking
237#[derive(Debug, Clone, thiserror::Error)]
238pub enum TrySendError {
239    /// The mailbox is full
240    #[error("mailbox is full")]
241    Full,
242    /// The worker's mailbox is closed
243    #[error("worker '{0}' mailbox is closed")]
244    Closed(String),
245}
246
247/// Error when trying to receive without blocking
248#[derive(Debug, Clone, thiserror::Error)]
249pub enum TryRecvError {
250    /// The mailbox is empty
251    #[error("mailbox is empty")]
252    Empty,
253    /// The mailbox is disconnected
254    #[error("mailbox is disconnected")]
255    Disconnected,
256}