memfault_ssf/
mailbox.rs

1//
2// Copyright (c) Memfault, Inc.
3// See License.txt for details
4use std::{
5    error::Error,
6    fmt::Display,
7    sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender, TrySendError},
8};
9
10use crate::{Envelope, Handler, Message, Service};
11
12use tokio::sync::mpsc as tokio_mpsc;
13
14/// The only reason for a message to fail to send is if the receiver channel is closed.
15// An improvement would be to return the message back to the sender (the
16// channel does it but after we wrap it in an envelope, it's complicated...)
17#[derive(Debug)]
18pub enum MailboxError {
19    SendChannelClosed,
20    NoResponse,
21    SendChannelFull,
22}
23
24impl Display for MailboxError {
25    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26        write!(f, "error sending message")
27    }
28}
29impl Error for MailboxError {}
30
31pub struct Mailbox<S: Service> {
32    sender: Sender<Envelope<S>>,
33}
34
35impl<S: Service> Mailbox<S> {
36    pub fn create() -> (Self, Receiver<Envelope<S>>) {
37        let (sender, receiver) = channel();
38        (Mailbox { sender }, receiver)
39    }
40
41    pub fn send_and_forget<M>(&self, message: M) -> Result<(), MailboxError>
42    where
43        M: Message,
44        S: Handler<M>,
45    {
46        self.sender
47            .send(Envelope::wrap(message))
48            .map_err(|_e| MailboxError::SendChannelClosed)
49    }
50
51    pub fn send_and_wait_for_reply<M>(&self, message: M) -> Result<M::Reply, MailboxError>
52    where
53        M: Message,
54        S: Handler<M>,
55    {
56        let (envelope, ack_receiver) = Envelope::wrap_with_reply(message);
57
58        self.sender
59            .send(envelope)
60            .map_err(|_e| MailboxError::SendChannelClosed)?;
61
62        ack_receiver.recv().map_err(|_e| MailboxError::NoResponse)
63    }
64}
65
66impl<S: Service> Clone for Mailbox<S> {
67    fn clone(&self) -> Self {
68        Mailbox {
69            sender: self.sender.clone(),
70        }
71    }
72}
73
74pub struct BoundedMailbox<S: Service> {
75    sender: SyncSender<Envelope<S>>,
76}
77
78impl<S: Service> BoundedMailbox<S> {
79    pub fn create(channel_size: usize) -> (Self, Receiver<Envelope<S>>) {
80        let (sender, receiver) = sync_channel(channel_size);
81        (BoundedMailbox { sender }, receiver)
82    }
83
84    pub fn send_and_forget<M>(&self, message: M) -> Result<(), MailboxError>
85    where
86        M: Message,
87        S: Handler<M>,
88    {
89        self.sender
90            .try_send(Envelope::wrap(message))
91            .map_err(|e| match e {
92                std::sync::mpsc::TrySendError::Full(_) => MailboxError::SendChannelFull,
93                std::sync::mpsc::TrySendError::Disconnected(_) => MailboxError::SendChannelClosed,
94            })
95    }
96
97    pub fn send_and_wait_for_reply<M>(&self, message: M) -> Result<M::Reply, MailboxError>
98    where
99        M: Message,
100        S: Handler<M>,
101    {
102        let (envelope, ack_receiver) = Envelope::wrap_with_reply(message);
103
104        self.sender.try_send(envelope).map_err(|e| match e {
105            TrySendError::Full(_) => MailboxError::SendChannelFull,
106            TrySendError::Disconnected(_) => MailboxError::SendChannelClosed,
107        })?;
108
109        ack_receiver.recv().map_err(|_e| MailboxError::NoResponse)
110    }
111}
112
113impl<S: Service> Clone for BoundedMailbox<S> {
114    fn clone(&self) -> Self {
115        BoundedMailbox {
116            sender: self.sender.clone(),
117        }
118    }
119}
120
121pub struct BoundedTaskMailbox<S: Service> {
122    sender: tokio_mpsc::Sender<Envelope<S>>,
123}
124
125impl<S: Service> BoundedTaskMailbox<S> {
126    pub fn create(channel_size: usize) -> (Self, tokio_mpsc::Receiver<Envelope<S>>) {
127        let (sender, receiver) = tokio_mpsc::channel(channel_size);
128        (BoundedTaskMailbox { sender }, receiver)
129    }
130
131    pub fn send_and_forget<M>(&self, message: M) -> Result<(), MailboxError>
132    where
133        M: Message,
134        S: Handler<M>,
135    {
136        self.sender
137            .try_send(Envelope::wrap(message))
138            .map_err(|e| match e {
139                tokio_mpsc::error::TrySendError::Full(_) => MailboxError::SendChannelFull,
140                tokio_mpsc::error::TrySendError::Closed(_) => MailboxError::SendChannelClosed,
141            })
142    }
143
144    pub fn send_and_wait_for_reply<M>(&self, message: M) -> Result<M::Reply, MailboxError>
145    where
146        M: Message,
147        S: Handler<M>,
148    {
149        let (envelope, ack_receiver) = Envelope::wrap_with_reply(message);
150
151        self.sender.try_send(envelope).map_err(|e| match e {
152            tokio_mpsc::error::TrySendError::Full(_) => MailboxError::SendChannelFull,
153            tokio_mpsc::error::TrySendError::Closed(_) => MailboxError::SendChannelClosed,
154        })?;
155
156        ack_receiver.recv().map_err(|_e| MailboxError::NoResponse)
157    }
158}
159
160impl<S: Service> Clone for BoundedTaskMailbox<S> {
161    fn clone(&self) -> Self {
162        BoundedTaskMailbox {
163            sender: self.sender.clone(),
164        }
165    }
166}