commonware_broadcast/buffered/
ingress.rs

1use crate::Broadcaster;
2use commonware_codec::Codec;
3use commonware_cryptography::{Digest, Digestible};
4use futures::{
5    channel::{mpsc, oneshot},
6    SinkExt,
7};
8
/// Message types that can be sent to the `Mailbox`.
///
/// `D` is the type of the digest used to look a message up (see [`Message::Get`]) and `M` is the
/// type of the message itself.
pub enum Message<D, M> {
    /// Broadcast a [`Message`](crate::Broadcaster::Message) to the network.
    Broadcast { message: M },

    /// Get a message by digest.
    ///
    /// The responder will be sent the message when it is available; either instantly (if cached) or
    /// when it is received from the network. The request can be canceled by dropping the responder.
    Get {
        /// Digest identifying the requested message.
        digest: D,
        /// Sending half of the one-shot channel on which the message is delivered.
        responder: oneshot::Sender<M>,
    },
}
23
24/// Ingress mailbox for [`Engine`](super::Engine).
25#[derive(Clone)]
26pub struct Mailbox<D: Digest, M: Digestible<D>> {
27    sender: mpsc::Sender<Message<D, M>>,
28}
29
30impl<D: Digest, M: Digestible<D>> Mailbox<D, M> {
31    pub(super) fn new(sender: mpsc::Sender<Message<D, M>>) -> Self {
32        Self { sender }
33    }
34}
35
36impl<D: Digest, M: Digestible<D>> Mailbox<D, M> {
37    /// Get a message by digest.
38    pub async fn get(&mut self, digest: D) -> oneshot::Receiver<M> {
39        let (sender, receiver) = oneshot::channel();
40        self.sender
41            .send(Message::Get {
42                digest,
43                responder: sender,
44            })
45            .await
46            .expect("mailbox closed");
47        receiver
48    }
49}
50
51impl<D: Digest, M: Codec + Digestible<D>> Broadcaster for Mailbox<D, M> {
52    type Message = M;
53
54    async fn broadcast(&mut self, message: Self::Message) {
55        self.sender
56            .send(Message::Broadcast { message })
57            .await
58            .expect("mailbox closed");
59    }
60}