1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use crate::{
address::{Address, Message},
errors::ReceiveError,
};
use futures::{channel::mpsc, StreamExt};
use std::future::Future;
pub const DEFAULT_CAPACITY: usize = 128;
#[derive(Debug)]
pub struct Mailbox<Input> {
stopped: bool,
receiver: mpsc::Receiver<Message<Input>>,
address: Address<Input>,
}
impl<Input> Default for Mailbox<Input> {
fn default() -> Self {
Self::new()
}
}
impl<Input> Mailbox<Input> {
pub fn new() -> Self {
Self::with_capacity(DEFAULT_CAPACITY)
}
pub fn with_capacity(capacity: usize) -> Self {
let (sender, receiver) = mpsc::channel(capacity);
let address = Address::new(sender);
let stopped = false;
Self {
stopped,
receiver,
address,
}
}
pub fn address(&self) -> Address<Input> {
self.address.clone()
}
pub async fn receive(&mut self) -> Result<Input, ReceiveError> {
if self.stopped {
return Err(ReceiveError::Stopped);
}
if let Some(message) = self.receiver.next().await {
match message {
Message::Message(input) => Ok(input),
Message::StopRequest => Err(ReceiveError::Stopped),
}
} else {
Err(ReceiveError::AllSendersDisconnected)
}
}
pub async fn run_with<F, Fut>(mut self, mut handler: F) -> Result<(), ReceiveError>
where
F: FnMut(Input) -> Fut,
Fut: Future<Output = ()>,
{
self.stopped = false;
while let Some(message) = self.receiver.next().await {
match message {
Message::Message(data) => {
handler(data).await;
}
Message::StopRequest => {
return Ok(());
}
}
}
Err(ReceiveError::AllSendersDisconnected)
}
pub fn resume(&mut self) {
self.stopped = false;
}
}