commonware_broadcast/buffered/
ingress.rs

1use crate::Broadcaster;
2use commonware_codec::{Codec, Config};
3use commonware_cryptography::{Digest, Digestible};
4use commonware_utils::Array;
5use futures::{
6    channel::{mpsc, oneshot},
7    SinkExt,
8};
9
10/// Message types that can be sent to the `Mailbox`
11pub enum Message<P: Array, D: Digest, M: Digestible<D>> {
12    /// Broadcast a [`Message`](crate::Broadcaster::Message) to the network.
13    ///
14    /// The responder will be sent a list of peers that received the message.
15    Broadcast {
16        message: M,
17        responder: oneshot::Sender<Vec<P>>,
18    },
19
20    /// Subscribe to receive a message by digest.
21    ///
22    /// The responder will be sent the message when it is available; either instantly (if cached) or
23    /// when it is received from the network. The request can be canceled by dropping the responder.
24    Subscribe {
25        digest: D,
26        responder: oneshot::Sender<M>,
27    },
28
29    /// Get a message by digest.
30    Get {
31        digest: D,
32        responder: oneshot::Sender<Option<M>>,
33    },
34}
35
36/// Ingress mailbox for [`Engine`](super::Engine).
37#[derive(Clone)]
38pub struct Mailbox<P: Array, D: Digest, M: Digestible<D>> {
39    sender: mpsc::Sender<Message<P, D, M>>,
40}
41
42impl<P: Array, D: Digest, M: Digestible<D>> Mailbox<P, D, M> {
43    pub(super) fn new(sender: mpsc::Sender<Message<P, D, M>>) -> Self {
44        Self { sender }
45    }
46}
47
48impl<P: Array, D: Digest, M: Digestible<D>> Mailbox<P, D, M> {
49    /// Subscribe to a message by digest.
50    ///
51    /// The responder will be sent the message when it is available; either instantly (if cached) or
52    /// when it is received from the network. The request can be canceled by dropping the responder.
53    pub async fn subscribe(&mut self, digest: D) -> oneshot::Receiver<M> {
54        let (sender, receiver) = oneshot::channel();
55        self.sender
56            .send(Message::Subscribe {
57                digest,
58                responder: sender,
59            })
60            .await
61            .expect("mailbox closed");
62        receiver
63    }
64
65    /// Subscribe to a message by digest with an externally prepared sender.
66    ///
67    /// The responder will be sent the message when it is available; either instantly (if cached) or
68    /// when it is received from the network. The request can be canceled by dropping the responder.
69    pub async fn subscribe_prepared(&mut self, digest: D, sender: oneshot::Sender<M>) {
70        self.sender
71            .send(Message::Subscribe {
72                digest,
73                responder: sender,
74            })
75            .await
76            .expect("mailbox closed");
77    }
78
79    /// Get a message by digest.
80    pub async fn get(&mut self, digest: D) -> Option<M> {
81        let (sender, receiver) = oneshot::channel();
82        self.sender
83            .send(Message::Get {
84                digest,
85                responder: sender,
86            })
87            .await
88            .expect("mailbox closed");
89        receiver.await.unwrap_or(None)
90    }
91}
92
93impl<Cfg: Config, P: Array, D: Digest, M: Codec<Cfg> + Digestible<D>> Broadcaster<Cfg>
94    for Mailbox<P, D, M>
95{
96    type Message = M;
97    type Response = Vec<P>;
98
99    async fn broadcast(&mut self, message: Self::Message) -> oneshot::Receiver<Vec<P>> {
100        let (sender, receiver) = oneshot::channel();
101        self.sender
102            .send(Message::Broadcast {
103                message,
104                responder: sender,
105            })
106            .await
107            .expect("mailbox closed");
108        receiver
109    }
110}