1use tokio::sync::mpsc;
7
8#[derive(Debug, Clone, Copy)]
10pub enum MailboxConfig {
11 Unbounded,
13 Bounded {
15 capacity: usize,
17 },
18}
19
20impl Default for MailboxConfig {
21 fn default() -> Self {
22 Self::Bounded { capacity: 100 }
23 }
24}
25
26impl MailboxConfig {
27 pub const fn unbounded() -> Self {
29 Self::Unbounded
30 }
31
32 #[must_use]
34 pub fn bounded(capacity: usize) -> Self {
35 assert!(capacity > 0, "mailbox capacity must be > 0");
36 Self::Bounded { capacity }
37 }
38}
39
40#[derive(Clone)]
42pub struct MailboxHandle {
43 tx: MailboxSender,
44 worker_id: String,
45}
46
47enum MailboxSender {
48 Bounded(mpsc::Sender<String>),
49 Unbounded(mpsc::UnboundedSender<String>),
50}
51
52impl Clone for MailboxSender {
53 fn clone(&self) -> Self {
54 match self {
55 Self::Bounded(tx) => Self::Bounded(tx.clone()),
56 Self::Unbounded(tx) => Self::Unbounded(tx.clone()),
57 }
58 }
59}
60
61impl MailboxHandle {
62 pub(crate) fn new_bounded(tx: mpsc::Sender<String>, worker_id: impl Into<String>) -> Self {
63 Self {
64 tx: MailboxSender::Bounded(tx),
65 worker_id: worker_id.into(),
66 }
67 }
68
69 pub(crate) fn new_unbounded(
70 tx: mpsc::UnboundedSender<String>,
71 worker_id: impl Into<String>,
72 ) -> Self {
73 Self {
74 tx: MailboxSender::Unbounded(tx),
75 worker_id: worker_id.into(),
76 }
77 }
78
79 pub async fn send(&self, message: impl Into<String>) -> Result<(), SendError> {
85 let msg = message.into();
86 match &self.tx {
87 MailboxSender::Bounded(tx) => tx
88 .send(msg)
89 .await
90 .map_err(|_| SendError::Closed(self.worker_id.clone())),
91 MailboxSender::Unbounded(tx) => tx
92 .send(msg)
93 .map_err(|_| SendError::Closed(self.worker_id.clone())),
94 }
95 }
96
97 pub fn try_send(&self, message: impl Into<String>) -> Result<(), TrySendError> {
103 let msg = message.into();
104 match &self.tx {
105 MailboxSender::Bounded(tx) => tx.try_send(msg).map_err(|e| match e {
106 mpsc::error::TrySendError::Full(_) => TrySendError::Full,
107 mpsc::error::TrySendError::Closed(_) => {
108 TrySendError::Closed(self.worker_id.clone())
109 }
110 }),
111 MailboxSender::Unbounded(tx) => tx
112 .send(msg)
113 .map_err(|_| TrySendError::Closed(self.worker_id.clone())),
114 }
115 }
116
117 #[must_use]
119 pub fn worker_id(&self) -> &str {
120 &self.worker_id
121 }
122
123 #[must_use]
125 pub fn is_open(&self) -> bool {
126 match &self.tx {
127 MailboxSender::Bounded(tx) => !tx.is_closed(),
128 MailboxSender::Unbounded(tx) => !tx.is_closed(),
129 }
130 }
131}
132
133pub struct Mailbox {
135 rx: MailboxReceiver,
136}
137
138enum MailboxReceiver {
139 Bounded(mpsc::Receiver<String>),
140 Unbounded(mpsc::UnboundedReceiver<String>),
141}
142
143impl Mailbox {
144 pub(crate) fn new_bounded(rx: mpsc::Receiver<String>) -> Self {
145 Self {
146 rx: MailboxReceiver::Bounded(rx),
147 }
148 }
149
150 pub(crate) fn new_unbounded(rx: mpsc::UnboundedReceiver<String>) -> Self {
151 Self {
152 rx: MailboxReceiver::Unbounded(rx),
153 }
154 }
155
156 pub async fn recv(&mut self) -> Option<String> {
158 match &mut self.rx {
159 MailboxReceiver::Bounded(rx) => rx.recv().await,
160 MailboxReceiver::Unbounded(rx) => rx.recv().await,
161 }
162 }
163
164 pub fn try_recv(&mut self) -> Result<String, TryRecvError> {
170 match &mut self.rx {
171 MailboxReceiver::Bounded(rx) => rx.try_recv().map_err(|e| match e {
172 mpsc::error::TryRecvError::Empty => TryRecvError::Empty,
173 mpsc::error::TryRecvError::Disconnected => TryRecvError::Disconnected,
174 }),
175 MailboxReceiver::Unbounded(rx) => rx.try_recv().map_err(|e| match e {
176 mpsc::error::TryRecvError::Empty => TryRecvError::Empty,
177 mpsc::error::TryRecvError::Disconnected => TryRecvError::Disconnected,
178 }),
179 }
180 }
181}
182
183#[must_use]
185pub fn mailbox(config: MailboxConfig) -> (MailboxHandle, Mailbox) {
186 match config {
187 MailboxConfig::Unbounded => {
188 let (tx, rx) = mpsc::unbounded_channel();
189 (
190 MailboxHandle::new_unbounded(tx, "unnamed"),
191 Mailbox::new_unbounded(rx),
192 )
193 }
194 MailboxConfig::Bounded { capacity } => {
195 let (tx, rx) = mpsc::channel(capacity);
196 (
197 MailboxHandle::new_bounded(tx, "unnamed"),
198 Mailbox::new_bounded(rx),
199 )
200 }
201 }
202}
203
204pub fn mailbox_named(
206 config: MailboxConfig,
207 worker_id_input: impl Into<String>,
208) -> (MailboxHandle, Mailbox) {
209 let worker_id = worker_id_input.into();
210 match config {
211 MailboxConfig::Unbounded => {
212 let (tx, rx) = mpsc::unbounded_channel();
213 (
214 MailboxHandle::new_unbounded(tx, worker_id),
215 Mailbox::new_unbounded(rx),
216 )
217 }
218 MailboxConfig::Bounded { capacity } => {
219 let (tx, rx) = mpsc::channel(capacity);
220 (
221 MailboxHandle::new_bounded(tx, worker_id),
222 Mailbox::new_bounded(rx),
223 )
224 }
225 }
226}
227
228#[derive(Debug, Clone, thiserror::Error)]
230pub enum SendError {
231 #[error("worker '{0}' mailbox is closed")]
233 Closed(String),
234}
235
236#[derive(Debug, Clone, thiserror::Error)]
238pub enum TrySendError {
239 #[error("mailbox is full")]
241 Full,
242 #[error("worker '{0}' mailbox is closed")]
244 Closed(String),
245}
246
247#[derive(Debug, Clone, thiserror::Error)]
249pub enum TryRecvError {
250 #[error("mailbox is empty")]
252 Empty,
253 #[error("mailbox is disconnected")]
255 Disconnected,
256}