distant_net/client/channel/
mailbox.rs1use std::collections::HashMap;
2use std::sync::{Arc, Weak};
3use std::time::Duration;
4
5use async_trait::async_trait;
6use tokio::sync::{mpsc, Mutex, RwLock};
7use tokio::{io, time};
8
9use crate::common::{Id, Response, UntypedResponse};
10
11#[derive(Clone, Debug)]
12pub struct PostOffice<T> {
13    mailboxes: Arc<Mutex<HashMap<Id, mpsc::Sender<T>>>>,
14    default_box: Arc<RwLock<Option<mpsc::Sender<T>>>>,
15}
16
17impl<T> Default for PostOffice<T>
18where
19    T: Send + 'static,
20{
21    fn default() -> Self {
23        Self::new(Duration::from_secs(60))
24    }
25}
26
27impl<T> PostOffice<T>
28where
29    T: Send + 'static,
30{
31    pub fn new(cleanup: Duration) -> Self {
34        let mailboxes = Arc::new(Mutex::new(HashMap::new()));
35        let mref = Arc::downgrade(&mailboxes);
36
37        tokio::spawn(async move {
39            while let Some(m) = Weak::upgrade(&mref) {
40                m.lock()
41                    .await
42                    .retain(|_id, tx: &mut mpsc::Sender<T>| !tx.is_closed());
43
44                drop(m);
47
48                time::sleep(cleanup).await;
50            }
51        });
52
53        Self {
54            mailboxes,
55            default_box: Arc::new(RwLock::new(None)),
56        }
57    }
58
59    pub async fn make_mailbox(&self, id: Id, buffer: usize) -> Mailbox<T> {
62        let (tx, rx) = mpsc::channel(buffer);
63        self.mailboxes.lock().await.insert(id.clone(), tx);
64
65        Mailbox {
66            id,
67            rx: Box::new(rx),
68        }
69    }
70
71    pub async fn deliver(&self, id: &Id, value: T) -> bool {
74        if let Some(tx) = self.mailboxes.lock().await.get_mut(id) {
75            let success = tx.send(value).await.is_ok();
76
77            if !success {
79                self.mailboxes.lock().await.remove(id);
80            }
81
82            success
83        } else if let Some(tx) = self.default_box.read().await.as_ref() {
84            tx.send(value).await.is_ok()
85        } else {
86            false
87        }
88    }
89
90    pub async fn assign_default_mailbox(&self, buffer: usize) -> Mailbox<T> {
93        let (tx, rx) = mpsc::channel(buffer);
94        *self.default_box.write().await = Some(tx);
95
96        Mailbox {
97            id: "".to_string(),
98            rx: Box::new(rx),
99        }
100    }
101
102    pub async fn remove_default_mailbox(&self) {
105        *self.default_box.write().await = None;
106    }
107
108    pub async fn has_default_mailbox(&self) -> bool {
111        self.default_box.read().await.is_some()
112    }
113
114    pub async fn cancel(&self, id: &Id) {
116        self.mailboxes.lock().await.remove(id);
117    }
118
119    pub async fn cancel_many(&self, ids: impl Iterator<Item = &Id>) {
121        let mut lock = self.mailboxes.lock().await;
122        for id in ids {
123            lock.remove(id);
124        }
125    }
126
127    pub async fn cancel_all(&self) {
129        self.mailboxes.lock().await.clear();
130    }
131}
132
133impl<T> PostOffice<Response<T>>
134where
135    T: Send + 'static,
136{
137    pub async fn deliver_response(&self, res: Response<T>) -> bool {
140        self.deliver(&res.origin_id.clone(), res).await
141    }
142}
143
144impl PostOffice<UntypedResponse<'static>> {
145    pub async fn deliver_untyped_response(&self, res: UntypedResponse<'static>) -> bool {
148        self.deliver(&res.origin_id.clone().into_owned(), res).await
149    }
150}
151
/// Reason a non-blocking read from a [`Mailbox`] produced no value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MailboxTryNextError {
    /// No value is available right now.
    Empty,

    /// The mailbox is closed and will not produce further values.
    Closed,
}
157
158#[async_trait]
159trait MailboxReceiver: Send + Sync {
160    type Output;
161
162    fn try_recv(&mut self) -> Result<Self::Output, MailboxTryNextError>;
163
164    async fn recv(&mut self) -> Option<Self::Output>;
165
166    fn close(&mut self);
167}
168
169#[async_trait]
170impl<T: Send> MailboxReceiver for mpsc::Receiver<T> {
171    type Output = T;
172
173    fn try_recv(&mut self) -> Result<Self::Output, MailboxTryNextError> {
174        match mpsc::Receiver::try_recv(self) {
175            Ok(x) => Ok(x),
176            Err(mpsc::error::TryRecvError::Empty) => Err(MailboxTryNextError::Empty),
177            Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxTryNextError::Closed),
178        }
179    }
180
181    async fn recv(&mut self) -> Option<Self::Output> {
182        mpsc::Receiver::recv(self).await
183    }
184
185    fn close(&mut self) {
186        mpsc::Receiver::close(self)
187    }
188}
189
190struct MappedMailboxReceiver<T, U> {
191    rx: Box<dyn MailboxReceiver<Output = T>>,
192    f: Box<dyn Fn(T) -> U + Send + Sync>,
193}
194
195#[async_trait]
196impl<T: Send, U: Send> MailboxReceiver for MappedMailboxReceiver<T, U> {
197    type Output = U;
198
199    fn try_recv(&mut self) -> Result<Self::Output, MailboxTryNextError> {
200        match self.rx.try_recv() {
201            Ok(x) => Ok((self.f)(x)),
202            Err(x) => Err(x),
203        }
204    }
205
206    async fn recv(&mut self) -> Option<Self::Output> {
207        let value = self.rx.recv().await?;
208        Some((self.f)(value))
209    }
210
211    fn close(&mut self) {
212        self.rx.close()
213    }
214}
215
216struct MappedOptMailboxReceiver<T, U> {
217    rx: Box<dyn MailboxReceiver<Output = T>>,
218    f: Box<dyn Fn(T) -> Option<U> + Send + Sync>,
219}
220
221#[async_trait]
222impl<T: Send, U: Send> MailboxReceiver for MappedOptMailboxReceiver<T, U> {
223    type Output = U;
224
225    fn try_recv(&mut self) -> Result<Self::Output, MailboxTryNextError> {
226        match self.rx.try_recv() {
227            Ok(x) => match (self.f)(x) {
228                Some(x) => Ok(x),
229                None => Err(MailboxTryNextError::Empty),
230            },
231            Err(x) => Err(x),
232        }
233    }
234
235    async fn recv(&mut self) -> Option<Self::Output> {
236        loop {
239            let value = self.rx.recv().await?;
240            if let Some(x) = (self.f)(value) {
241                return Some(x);
242            }
243        }
244    }
245
246    fn close(&mut self) {
247        self.rx.close()
248    }
249}
250
251pub struct Mailbox<T> {
253    id: Id,
255
256    rx: Box<dyn MailboxReceiver<Output = T>>,
258}
259
260impl<T> Mailbox<T> {
261    pub fn id(&self) -> &Id {
263        &self.id
264    }
265
266    pub fn try_next(&mut self) -> Result<T, MailboxTryNextError> {
268        self.rx.try_recv()
269    }
270
271    pub async fn next(&mut self) -> Option<T> {
273        self.rx.recv().await
274    }
275
276    pub async fn next_timeout(&mut self, duration: Duration) -> io::Result<Option<T>> {
278        time::timeout(duration, self.next())
279            .await
280            .map_err(|x| io::Error::new(io::ErrorKind::TimedOut, x))
281    }
282
283    pub fn close(&mut self) {
287        self.rx.close()
288    }
289}
290
291impl<T: Send + 'static> Mailbox<T> {
292    pub fn map<U: Send + 'static>(self, f: impl Fn(T) -> U + Send + Sync + 'static) -> Mailbox<U> {
294        Mailbox {
295            id: self.id,
296            rx: Box::new(MappedMailboxReceiver {
297                rx: self.rx,
298                f: Box::new(f),
299            }),
300        }
301    }
302
303    pub fn map_opt<U: Send + 'static>(
306        self,
307        f: impl Fn(T) -> Option<U> + Send + Sync + 'static,
308    ) -> Mailbox<U> {
309        Mailbox {
310            id: self.id,
311            rx: Box::new(MappedOptMailboxReceiver {
312                rx: self.rx,
313                f: Box::new(f),
314            }),
315        }
316    }
317}