1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::time::Duration;

use async_trait::async_trait;
use tokio::sync::{mpsc, Mutex, RwLock};
use tokio::{io, time};

use crate::common::{Id, Response, UntypedResponse};

#[derive(Clone, Debug)]
pub struct PostOffice<T> {
    mailboxes: Arc<Mutex<HashMap<Id, mpsc::Sender<T>>>>,
    default_box: Arc<RwLock<Option<mpsc::Sender<T>>>>,
}

impl<T> Default for PostOffice<T>
where
    T: Send + 'static,
{
    /// Creates a new postoffice with a cleanup interval of 60s
    fn default() -> Self {
        Self::new(Duration::from_secs(60))
    }
}

impl<T> PostOffice<T>
where
    T: Send + 'static,
{
    /// Creates a new post office that delivers to mailboxes, cleaning up orphaned mailboxes
    /// waiting `cleanup` time inbetween attempts
    pub fn new(cleanup: Duration) -> Self {
        let mailboxes = Arc::new(Mutex::new(HashMap::new()));
        let mref = Arc::downgrade(&mailboxes);

        // Spawn a task that will clean up orphaned mailboxes every minute
        tokio::spawn(async move {
            while let Some(m) = Weak::upgrade(&mref) {
                m.lock()
                    .await
                    .retain(|_id, tx: &mut mpsc::Sender<T>| !tx.is_closed());

                // NOTE: Must drop the reference before sleeping, otherwise we block
                //       access to the mailbox map elsewhere and deadlock!
                drop(m);

                // Wait a minute before trying again
                time::sleep(cleanup).await;
            }
        });

        Self {
            mailboxes,
            default_box: Arc::new(RwLock::new(None)),
        }
    }

    /// Creates a new mailbox using the given id and buffer size for maximum values that
    /// can be queued in the mailbox
    pub async fn make_mailbox(&self, id: Id, buffer: usize) -> Mailbox<T> {
        let (tx, rx) = mpsc::channel(buffer);
        self.mailboxes.lock().await.insert(id.clone(), tx);

        Mailbox {
            id,
            rx: Box::new(rx),
        }
    }

    /// Delivers some value to appropriate mailbox, returning false if no mailbox is found
    /// for the specified id or if the mailbox is no longer receiving values
    pub async fn deliver(&self, id: &Id, value: T) -> bool {
        if let Some(tx) = self.mailboxes.lock().await.get_mut(id) {
            let success = tx.send(value).await.is_ok();

            // If failed, we want to remove the mailbox sender as it is no longer valid
            if !success {
                self.mailboxes.lock().await.remove(id);
            }

            success
        } else if let Some(tx) = self.default_box.read().await.as_ref() {
            tx.send(value).await.is_ok()
        } else {
            false
        }
    }

    /// Creates a new default mailbox that will be used whenever no mailbox is found to deliver
    /// mail. This will replace any existing default mailbox.
    pub async fn assign_default_mailbox(&self, buffer: usize) -> Mailbox<T> {
        let (tx, rx) = mpsc::channel(buffer);
        *self.default_box.write().await = Some(tx);

        Mailbox {
            id: "".to_string(),
            rx: Box::new(rx),
        }
    }

    /// Removes the default mailbox such that any mail without a matching mailbox will be dropped
    /// instead of being delivered to a default mailbox.
    pub async fn remove_default_mailbox(&self) {
        *self.default_box.write().await = None;
    }

    /// Returns true if the post office is using a default mailbox for all mail that does not map
    /// to another mailbox.
    pub async fn has_default_mailbox(&self) -> bool {
        self.default_box.read().await.is_some()
    }

    /// Cancels delivery to the mailbox with the specified `id`.
    pub async fn cancel(&self, id: &Id) {
        self.mailboxes.lock().await.remove(id);
    }

    /// Cancels delivery to the mailboxes with the specified `id`s.
    pub async fn cancel_many(&self, ids: impl Iterator<Item = &Id>) {
        let mut lock = self.mailboxes.lock().await;
        for id in ids {
            lock.remove(id);
        }
    }

    /// Cancels delivery to all mailboxes.
    pub async fn cancel_all(&self) {
        self.mailboxes.lock().await.clear();
    }
}

impl<T> PostOffice<Response<T>>
where
    T: Send + 'static,
{
    /// Delivers some response to appropriate mailbox, returning false if no mailbox is found
    /// for the response's origin or if the mailbox is no longer receiving values
    pub async fn deliver_response(&self, res: Response<T>) -> bool {
        self.deliver(&res.origin_id.clone(), res).await
    }
}

impl PostOffice<UntypedResponse<'static>> {
    /// Delivers some response to appropriate mailbox, returning false if no mailbox is found
    /// for the response's origin or if the mailbox is no longer receiving values
    pub async fn deliver_untyped_response(&self, res: UntypedResponse<'static>) -> bool {
        self.deliver(&res.origin_id.clone().into_owned(), res).await
    }
}

/// Error encountered when invoking [`try_recv`] for [`MailboxReceiver`].
pub enum MailboxTryNextError {
    Empty,
    Closed,
}

#[async_trait]
trait MailboxReceiver: Send + Sync {
    type Output;

    fn try_recv(&mut self) -> Result<Self::Output, MailboxTryNextError>;

    async fn recv(&mut self) -> Option<Self::Output>;

    fn close(&mut self);
}

#[async_trait]
impl<T: Send> MailboxReceiver for mpsc::Receiver<T> {
    type Output = T;

    fn try_recv(&mut self) -> Result<Self::Output, MailboxTryNextError> {
        match mpsc::Receiver::try_recv(self) {
            Ok(x) => Ok(x),
            Err(mpsc::error::TryRecvError::Empty) => Err(MailboxTryNextError::Empty),
            Err(mpsc::error::TryRecvError::Disconnected) => Err(MailboxTryNextError::Closed),
        }
    }

    async fn recv(&mut self) -> Option<Self::Output> {
        mpsc::Receiver::recv(self).await
    }

    fn close(&mut self) {
        mpsc::Receiver::close(self)
    }
}

struct MappedMailboxReceiver<T, U> {
    rx: Box<dyn MailboxReceiver<Output = T>>,
    f: Box<dyn Fn(T) -> U + Send + Sync>,
}

#[async_trait]
impl<T: Send, U: Send> MailboxReceiver for MappedMailboxReceiver<T, U> {
    type Output = U;

    fn try_recv(&mut self) -> Result<Self::Output, MailboxTryNextError> {
        match self.rx.try_recv() {
            Ok(x) => Ok((self.f)(x)),
            Err(x) => Err(x),
        }
    }

    async fn recv(&mut self) -> Option<Self::Output> {
        let value = self.rx.recv().await?;
        Some((self.f)(value))
    }

    fn close(&mut self) {
        self.rx.close()
    }
}

struct MappedOptMailboxReceiver<T, U> {
    rx: Box<dyn MailboxReceiver<Output = T>>,
    f: Box<dyn Fn(T) -> Option<U> + Send + Sync>,
}

#[async_trait]
impl<T: Send, U: Send> MailboxReceiver for MappedOptMailboxReceiver<T, U> {
    type Output = U;

    fn try_recv(&mut self) -> Result<Self::Output, MailboxTryNextError> {
        match self.rx.try_recv() {
            Ok(x) => match (self.f)(x) {
                Some(x) => Ok(x),
                None => Err(MailboxTryNextError::Empty),
            },
            Err(x) => Err(x),
        }
    }

    async fn recv(&mut self) -> Option<Self::Output> {
        // Continually receive a new value and convert it to Option<U>
        // until Option<U> == Some(U) or we receive None from our inner receiver
        loop {
            let value = self.rx.recv().await?;
            if let Some(x) = (self.f)(value) {
                return Some(x);
            }
        }
    }

    fn close(&mut self) {
        self.rx.close()
    }
}

/// Represents a destination for responses
pub struct Mailbox<T> {
    /// Represents id associated with the mailbox
    id: Id,

    /// Underlying mailbox storage
    rx: Box<dyn MailboxReceiver<Output = T>>,
}

impl<T> Mailbox<T> {
    /// Represents id associated with the mailbox
    pub fn id(&self) -> &Id {
        &self.id
    }

    /// Tries to receive the next value in mailbox without blocking or waiting async
    pub fn try_next(&mut self) -> Result<T, MailboxTryNextError> {
        self.rx.try_recv()
    }

    /// Receives next value in mailbox
    pub async fn next(&mut self) -> Option<T> {
        self.rx.recv().await
    }

    /// Receives next value in mailbox, waiting up to duration before timing out
    pub async fn next_timeout(&mut self, duration: Duration) -> io::Result<Option<T>> {
        time::timeout(duration, self.next())
            .await
            .map_err(|x| io::Error::new(io::ErrorKind::TimedOut, x))
    }

    /// Closes the mailbox such that it will not receive any more values
    ///
    /// Any values already in the mailbox will still be returned via `next`
    pub fn close(&mut self) {
        self.rx.close()
    }
}

impl<T: Send + 'static> Mailbox<T> {
    /// Maps the results of each mailbox value into a new type `U`
    pub fn map<U: Send + 'static>(self, f: impl Fn(T) -> U + Send + Sync + 'static) -> Mailbox<U> {
        Mailbox {
            id: self.id,
            rx: Box::new(MappedMailboxReceiver {
                rx: self.rx,
                f: Box::new(f),
            }),
        }
    }

    /// Maps the results of each mailbox value into a new type `U` by returning an `Option<U>`
    /// where the option is `None` in the case that `T` cannot be converted into `U`
    pub fn map_opt<U: Send + 'static>(
        self,
        f: impl Fn(T) -> Option<U> + Send + Sync + 'static,
    ) -> Mailbox<U> {
        Mailbox {
            id: self.id,
            rx: Box::new(MappedOptMailboxReceiver {
                rx: self.rx,
                f: Box::new(f),
            }),
        }
    }
}