distant_net/client/channel/
mailbox.rs1use std::collections::HashMap;
2use std::sync::{Arc, Weak};
3use std::time::Duration;
4
5use async_trait::async_trait;
6use tokio::sync::{mpsc, Mutex, RwLock};
7use tokio::{io, time};
8
9use crate::common::{Id, Response, UntypedResponse};
10
11#[derive(Clone, Debug)]
12pub struct PostOffice<T> {
13 mailboxes: Arc<Mutex<HashMap<Id, mpsc::Sender<T>>>>,
14 default_box: Arc<RwLock<Option<mpsc::Sender<T>>>>,
15}
16
17impl<T> Default for PostOffice<T>
18where
19 T: Send + 'static,
20{
21 fn default() -> Self {
23 Self::new(Duration::from_secs(60))
24 }
25}
26
27impl<T> PostOffice<T>
28where
29 T: Send + 'static,
30{
31 pub fn new(cleanup: Duration) -> Self {
34 let mailboxes = Arc::new(Mutex::new(HashMap::new()));
35 let mref = Arc::downgrade(&mailboxes);
36
37 tokio::spawn(async move {
39 while let Some(m) = Weak::upgrade(&mref) {
40 m.lock()
41 .await
42 .retain(|_id, tx: &mut mpsc::Sender<T>| !tx.is_closed());
43
44 drop(m);
47
48 time::sleep(cleanup).await;
50 }
51 });
52
53 Self {
54 mailboxes,
55 default_box: Arc::new(RwLock::new(None)),
56 }
57 }
58
59 pub async fn make_mailbox(&self, id: Id, buffer: usize) -> Mailbox<T> {
62 let (tx, rx) = mpsc::channel(buffer);
63 self.mailboxes.lock().await.insert(id.clone(), tx);
64
65 Mailbox {
66 id,
67 rx: Box::new(rx),
68 }
69 }
70
71 pub async fn deliver(&self, id: &Id, value: T) -> bool {
74 if let Some(tx) = self.mailboxes.lock().await.get_mut(id) {
75 let success = tx.send(value).await.is_ok();
76
77 if !success {
79 self.mailboxes.lock().await.remove(id);
80 }
81
82 success
83 } else if let Some(tx) = self.default_box.read().await.as_ref() {
84 tx.send(value).await.is_ok()
85 } else {
86 false
87 }
88 }
89
90 pub async fn assign_default_mailbox(&self, buffer: usize) -> Mailbox<T> {
93 let (tx, rx) = mpsc::channel(buffer);
94 *self.default_box.write().await = Some(tx);
95
96 Mailbox {
97 id: "".to_string(),
98 rx: Box::new(rx),
99 }
100 }
101
102 pub async fn remove_default_mailbox(&self) {
105 *self.default_box.write().await = None;
106 }
107
108 pub async fn has_default_mailbox(&self) -> bool {
111 self.default_box.read().await.is_some()
112 }
113
114 pub async fn cancel(&self, id: &Id) {
116 self.mailboxes.lock().await.remove(id);
117 }
118
119 pub async fn cancel_many(&self, ids: impl Iterator<Item = &Id>) {
121 let mut lock = self.mailboxes.lock().await;
122 for id in ids {
123 lock.remove(id);
124 }
125 }
126
127 pub async fn cancel_all(&self) {
129 self.mailboxes.lock().await.clear();
130 }
131}
132
133impl<T> PostOffice<Response<T>>
134where
135 T: Send + 'static,
136{
137 pub async fn deliver_response(&self, res: Response<T>) -> bool {
140 self.deliver(&res.origin_id.clone(), res).await
141 }
142}
143
144impl PostOffice<UntypedResponse<'static>> {
145 pub async fn deliver_untyped_response(&self, res: UntypedResponse<'static>) -> bool {
148 self.deliver(&res.origin_id.clone().into_owned(), res).await
149 }
150}
151
152pub enum MailboxTryNextError {
154 Empty,
155 Closed,
156}
157
158#[async_trait]
159trait MailboxReceiver: Send + Sync {
160 type Output;
161
162 fn try_recv(&mut self) -> Result<Self::Output, MailboxTryNextError>;
163
164 async fn recv(&mut self) -> Option<Self::Output>;
165
166 fn close(&mut self);
167}
168
169#[async_trait]
170impl<T: Send> MailboxReceiver for mpsc::Receiver<T> {
171 type Output = T;
172
173 fn try_recv(&mut self) -> Result<Self::Output, MailboxTryNextError> {
174 match mpsc::Receiver::try_recv(self) {
175 Ok(x) => Ok(x),
176 Err(mpsc::error::TryRecvError::Empty) => Err(MailboxTryNextError::Empty),
177 Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxTryNextError::Closed),
178 }
179 }
180
181 async fn recv(&mut self) -> Option<Self::Output> {
182 mpsc::Receiver::recv(self).await
183 }
184
185 fn close(&mut self) {
186 mpsc::Receiver::close(self)
187 }
188}
189
190struct MappedMailboxReceiver<T, U> {
191 rx: Box<dyn MailboxReceiver<Output = T>>,
192 f: Box<dyn Fn(T) -> U + Send + Sync>,
193}
194
195#[async_trait]
196impl<T: Send, U: Send> MailboxReceiver for MappedMailboxReceiver<T, U> {
197 type Output = U;
198
199 fn try_recv(&mut self) -> Result<Self::Output, MailboxTryNextError> {
200 match self.rx.try_recv() {
201 Ok(x) => Ok((self.f)(x)),
202 Err(x) => Err(x),
203 }
204 }
205
206 async fn recv(&mut self) -> Option<Self::Output> {
207 let value = self.rx.recv().await?;
208 Some((self.f)(value))
209 }
210
211 fn close(&mut self) {
212 self.rx.close()
213 }
214}
215
216struct MappedOptMailboxReceiver<T, U> {
217 rx: Box<dyn MailboxReceiver<Output = T>>,
218 f: Box<dyn Fn(T) -> Option<U> + Send + Sync>,
219}
220
221#[async_trait]
222impl<T: Send, U: Send> MailboxReceiver for MappedOptMailboxReceiver<T, U> {
223 type Output = U;
224
225 fn try_recv(&mut self) -> Result<Self::Output, MailboxTryNextError> {
226 match self.rx.try_recv() {
227 Ok(x) => match (self.f)(x) {
228 Some(x) => Ok(x),
229 None => Err(MailboxTryNextError::Empty),
230 },
231 Err(x) => Err(x),
232 }
233 }
234
235 async fn recv(&mut self) -> Option<Self::Output> {
236 loop {
239 let value = self.rx.recv().await?;
240 if let Some(x) = (self.f)(value) {
241 return Some(x);
242 }
243 }
244 }
245
246 fn close(&mut self) {
247 self.rx.close()
248 }
249}
250
251pub struct Mailbox<T> {
253 id: Id,
255
256 rx: Box<dyn MailboxReceiver<Output = T>>,
258}
259
260impl<T> Mailbox<T> {
261 pub fn id(&self) -> &Id {
263 &self.id
264 }
265
266 pub fn try_next(&mut self) -> Result<T, MailboxTryNextError> {
268 self.rx.try_recv()
269 }
270
271 pub async fn next(&mut self) -> Option<T> {
273 self.rx.recv().await
274 }
275
276 pub async fn next_timeout(&mut self, duration: Duration) -> io::Result<Option<T>> {
278 time::timeout(duration, self.next())
279 .await
280 .map_err(|x| io::Error::new(io::ErrorKind::TimedOut, x))
281 }
282
283 pub fn close(&mut self) {
287 self.rx.close()
288 }
289}
290
291impl<T: Send + 'static> Mailbox<T> {
292 pub fn map<U: Send + 'static>(self, f: impl Fn(T) -> U + Send + Sync + 'static) -> Mailbox<U> {
294 Mailbox {
295 id: self.id,
296 rx: Box::new(MappedMailboxReceiver {
297 rx: self.rx,
298 f: Box::new(f),
299 }),
300 }
301 }
302
303 pub fn map_opt<U: Send + 'static>(
306 self,
307 f: impl Fn(T) -> Option<U> + Send + Sync + 'static,
308 ) -> Mailbox<U> {
309 Mailbox {
310 id: self.id,
311 rx: Box::new(MappedOptMailboxReceiver {
312 rx: self.rx,
313 f: Box::new(f),
314 }),
315 }
316 }
317}