1use tokio::sync::mpsc;
7
8#[derive(Debug, Clone, Copy)]
10pub enum MailboxConfig {
11 Unbounded,
13 Bounded {
15 capacity: usize,
17 },
18}
19
20impl Default for MailboxConfig {
21 fn default() -> Self {
22 Self::Bounded { capacity: 100 }
23 }
24}
25
26impl MailboxConfig {
27 #[must_use]
29 pub const fn unbounded() -> Self {
30 Self::Unbounded
31 }
32
33 #[must_use]
39 pub fn bounded(capacity: usize) -> Self {
40 assert!(capacity > 0, "mailbox capacity must be > 0");
41 Self::Bounded { capacity }
42 }
43}
44
45#[derive(Clone)]
47pub struct MailboxHandle {
48 tx: MailboxSender,
49 worker_id: String,
50}
51
52enum MailboxSender {
53 Bounded(mpsc::Sender<String>),
54 Unbounded(mpsc::UnboundedSender<String>),
55}
56
57impl Clone for MailboxSender {
58 fn clone(&self) -> Self {
59 match self {
60 Self::Bounded(tx) => Self::Bounded(tx.clone()),
61 Self::Unbounded(tx) => Self::Unbounded(tx.clone()),
62 }
63 }
64}
65
66impl MailboxHandle {
67 pub(crate) fn new_bounded(tx: mpsc::Sender<String>, worker_id: impl Into<String>) -> Self {
68 Self {
69 tx: MailboxSender::Bounded(tx),
70 worker_id: worker_id.into(),
71 }
72 }
73
74 pub(crate) fn new_unbounded(
75 tx: mpsc::UnboundedSender<String>,
76 worker_id: impl Into<String>,
77 ) -> Self {
78 Self {
79 tx: MailboxSender::Unbounded(tx),
80 worker_id: worker_id.into(),
81 }
82 }
83
84 pub async fn send(&self, message: impl Into<String>) -> Result<(), SendError> {
90 let msg = message.into();
91 match &self.tx {
92 MailboxSender::Bounded(tx) => tx
93 .send(msg)
94 .await
95 .map_err(|_| SendError::Closed(self.worker_id.clone())),
96 MailboxSender::Unbounded(tx) => tx
97 .send(msg)
98 .map_err(|_| SendError::Closed(self.worker_id.clone())),
99 }
100 }
101
102 pub fn try_send(&self, message: impl Into<String>) -> Result<(), TrySendError> {
108 let msg = message.into();
109 match &self.tx {
110 MailboxSender::Bounded(tx) => tx.try_send(msg).map_err(|e| match e {
111 mpsc::error::TrySendError::Full(_) => TrySendError::Full,
112 mpsc::error::TrySendError::Closed(_) => {
113 TrySendError::Closed(self.worker_id.clone())
114 }
115 }),
116 MailboxSender::Unbounded(tx) => tx
117 .send(msg)
118 .map_err(|_| TrySendError::Closed(self.worker_id.clone())),
119 }
120 }
121
122 #[must_use]
124 pub fn worker_id(&self) -> &str {
125 &self.worker_id
126 }
127
128 #[must_use]
130 pub fn is_open(&self) -> bool {
131 match &self.tx {
132 MailboxSender::Bounded(tx) => !tx.is_closed(),
133 MailboxSender::Unbounded(tx) => !tx.is_closed(),
134 }
135 }
136}
137
138pub struct Mailbox {
140 rx: MailboxReceiver,
141}
142
143enum MailboxReceiver {
144 Bounded(mpsc::Receiver<String>),
145 Unbounded(mpsc::UnboundedReceiver<String>),
146}
147
148impl Mailbox {
149 pub(crate) fn new_bounded(rx: mpsc::Receiver<String>) -> Self {
150 Self {
151 rx: MailboxReceiver::Bounded(rx),
152 }
153 }
154
155 pub(crate) fn new_unbounded(rx: mpsc::UnboundedReceiver<String>) -> Self {
156 Self {
157 rx: MailboxReceiver::Unbounded(rx),
158 }
159 }
160
161 pub async fn recv(&mut self) -> Option<String> {
163 match &mut self.rx {
164 MailboxReceiver::Bounded(rx) => rx.recv().await,
165 MailboxReceiver::Unbounded(rx) => rx.recv().await,
166 }
167 }
168
169 pub fn try_recv(&mut self) -> Result<String, TryRecvError> {
175 match &mut self.rx {
176 MailboxReceiver::Bounded(rx) => rx.try_recv().map_err(|e| match e {
177 mpsc::error::TryRecvError::Empty => TryRecvError::Empty,
178 mpsc::error::TryRecvError::Disconnected => TryRecvError::Disconnected,
179 }),
180 MailboxReceiver::Unbounded(rx) => rx.try_recv().map_err(|e| match e {
181 mpsc::error::TryRecvError::Empty => TryRecvError::Empty,
182 mpsc::error::TryRecvError::Disconnected => TryRecvError::Disconnected,
183 }),
184 }
185 }
186}
187
188#[must_use]
190pub fn mailbox(config: MailboxConfig) -> (MailboxHandle, Mailbox) {
191 match config {
192 MailboxConfig::Unbounded => {
193 let (tx, rx) = mpsc::unbounded_channel();
194 (
195 MailboxHandle::new_unbounded(tx, "unnamed"),
196 Mailbox::new_unbounded(rx),
197 )
198 }
199 MailboxConfig::Bounded { capacity } => {
200 let (tx, rx) = mpsc::channel(capacity);
201 (
202 MailboxHandle::new_bounded(tx, "unnamed"),
203 Mailbox::new_bounded(rx),
204 )
205 }
206 }
207}
208
209pub fn mailbox_named(
211 config: MailboxConfig,
212 worker_id_input: impl Into<String>,
213) -> (MailboxHandle, Mailbox) {
214 let worker_id = worker_id_input.into();
215 match config {
216 MailboxConfig::Unbounded => {
217 let (tx, rx) = mpsc::unbounded_channel();
218 (
219 MailboxHandle::new_unbounded(tx, worker_id),
220 Mailbox::new_unbounded(rx),
221 )
222 }
223 MailboxConfig::Bounded { capacity } => {
224 let (tx, rx) = mpsc::channel(capacity);
225 (
226 MailboxHandle::new_bounded(tx, worker_id),
227 Mailbox::new_bounded(rx),
228 )
229 }
230 }
231}
232
233#[derive(Debug, Clone, thiserror::Error)]
235pub enum SendError {
236 #[error("worker '{0}' mailbox is closed")]
238 Closed(String),
239}
240
241#[derive(Debug, Clone, thiserror::Error)]
243pub enum TrySendError {
244 #[error("mailbox is full")]
246 Full,
247 #[error("worker '{0}' mailbox is closed")]
249 Closed(String),
250}
251
252#[derive(Debug, Clone, thiserror::Error)]
254pub enum TryRecvError {
255 #[error("mailbox is empty")]
257 Empty,
258 #[error("mailbox is disconnected")]
260 Disconnected,
261}