1use std::{
5 error::Error,
6 fmt::Display,
7 sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TrySendError},
8};
9
10use crate::{Envelope, Handler, Message, Service};
11
12use tokio::sync::mpsc as tokio_mpsc;
13
/// Errors reported by the mailbox types when delivering a message or
/// waiting for its reply.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MailboxError {
    /// The message could not be sent because the channel reported that its
    /// receiving side is disconnected/closed.
    SendChannelClosed,
    /// The message was sent, but receiving the reply failed.
    NoResponse,
    /// A non-blocking send on a bounded channel failed because the channel
    /// was full.
    SendChannelFull,
}

impl Display for MailboxError {
    /// Writes a short, variant-specific description of the failure.
    ///
    /// The two send failures keep the historical `error sending message`
    /// prefix; `NoResponse` is described separately because in that case the
    /// send itself succeeded.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::SendChannelClosed => "error sending message: channel closed",
            Self::SendChannelFull => "error sending message: channel full",
            Self::NoResponse => "error waiting for reply: no response received",
        };
        f.write_str(description)
    }
}

impl Error for MailboxError {}
30
31pub struct Mailbox<S: Service> {
32 sender: Sender<Envelope<S>>,
33}
34
35impl<S: Service> Mailbox<S> {
36 pub fn create() -> (Self, Receiver<Envelope<S>>) {
37 let (sender, receiver) = channel();
38 (Mailbox { sender }, receiver)
39 }
40
41 pub fn send_and_forget<M>(&self, message: M) -> Result<(), MailboxError>
42 where
43 M: Message,
44 S: Handler<M>,
45 {
46 self.sender
47 .send(Envelope::wrap(message))
48 .map_err(|_e| MailboxError::SendChannelClosed)
49 }
50
51 pub fn send_and_wait_for_reply<M>(&self, message: M) -> Result<M::Reply, MailboxError>
52 where
53 M: Message,
54 S: Handler<M>,
55 {
56 let (envelope, ack_receiver) = Envelope::wrap_with_reply(message);
57
58 self.sender
59 .send(envelope)
60 .map_err(|_e| MailboxError::SendChannelClosed)?;
61
62 ack_receiver.recv().map_err(|_e| MailboxError::NoResponse)
63 }
64}
65
66impl<S: Service> Clone for Mailbox<S> {
67 fn clone(&self) -> Self {
68 Mailbox {
69 sender: self.sender.clone(),
70 }
71 }
72}
73
74pub struct BoundedMailbox<S: Service> {
75 sender: SyncSender<Envelope<S>>,
76}
77
78impl<S: Service> BoundedMailbox<S> {
79 pub fn create(channel_size: usize) -> (Self, Receiver<Envelope<S>>) {
80 let (sender, receiver) = sync_channel(channel_size);
81 (BoundedMailbox { sender }, receiver)
82 }
83
84 pub fn send_and_forget<M>(&self, message: M) -> Result<(), MailboxError>
85 where
86 M: Message,
87 S: Handler<M>,
88 {
89 self.sender
90 .try_send(Envelope::wrap(message))
91 .map_err(|e| match e {
92 std::sync::mpsc::TrySendError::Full(_) => MailboxError::SendChannelFull,
93 std::sync::mpsc::TrySendError::Disconnected(_) => MailboxError::SendChannelClosed,
94 })
95 }
96
97 pub fn send_and_wait_for_reply<M>(&self, message: M) -> Result<M::Reply, MailboxError>
98 where
99 M: Message,
100 S: Handler<M>,
101 {
102 let (envelope, ack_receiver) = Envelope::wrap_with_reply(message);
103
104 self.sender.try_send(envelope).map_err(|e| match e {
105 TrySendError::Full(_) => MailboxError::SendChannelFull,
106 TrySendError::Disconnected(_) => MailboxError::SendChannelClosed,
107 })?;
108
109 ack_receiver.recv().map_err(|_e| MailboxError::NoResponse)
110 }
111}
112
113impl<S: Service> Clone for BoundedMailbox<S> {
114 fn clone(&self) -> Self {
115 BoundedMailbox {
116 sender: self.sender.clone(),
117 }
118 }
119}
120
121pub struct BoundedTaskMailbox<S: Service> {
122 sender: tokio_mpsc::Sender<Envelope<S>>,
123}
124
125impl<S: Service> BoundedTaskMailbox<S> {
126 pub fn create(channel_size: usize) -> (Self, tokio_mpsc::Receiver<Envelope<S>>) {
127 let (sender, receiver) = tokio_mpsc::channel(channel_size);
128 (BoundedTaskMailbox { sender }, receiver)
129 }
130
131 pub fn send_and_forget<M>(&self, message: M) -> Result<(), MailboxError>
132 where
133 M: Message,
134 S: Handler<M>,
135 {
136 self.sender
137 .try_send(Envelope::wrap(message))
138 .map_err(|e| match e {
139 tokio_mpsc::error::TrySendError::Full(_) => MailboxError::SendChannelFull,
140 tokio_mpsc::error::TrySendError::Closed(_) => MailboxError::SendChannelClosed,
141 })
142 }
143
144 pub fn send_and_wait_for_reply<M>(&self, message: M) -> Result<M::Reply, MailboxError>
145 where
146 M: Message,
147 S: Handler<M>,
148 {
149 let (envelope, ack_receiver) = Envelope::wrap_with_reply(message);
150
151 self.sender.try_send(envelope).map_err(|e| match e {
152 tokio_mpsc::error::TrySendError::Full(_) => MailboxError::SendChannelFull,
153 tokio_mpsc::error::TrySendError::Closed(_) => MailboxError::SendChannelClosed,
154 })?;
155
156 ack_receiver.recv().map_err(|_e| MailboxError::NoResponse)
157 }
158}
159
160impl<S: Service> Clone for BoundedTaskMailbox<S> {
161 fn clone(&self) -> Self {
162 BoundedTaskMailbox {
163 sender: self.sender.clone(),
164 }
165 }
166}