commonware_broadcast/buffered/
ingress.rs

1use crate::Broadcaster;
2use commonware_codec::Codec;
3use commonware_cryptography::{Committable, Digestible, PublicKey};
4use commonware_p2p::Recipients;
5use commonware_utils::channels::fallible::AsyncFallibleExt;
6use futures::channel::{mpsc, oneshot};
7
8/// Message types that can be sent to the `Mailbox`
9pub enum Message<P: PublicKey, M: Committable + Digestible> {
10    /// Broadcast a [crate::Broadcaster::Message] to the network.
11    ///
12    /// The responder will be sent a list of peers that received the message.
13    Broadcast {
14        recipients: Recipients<P>,
15        message: M,
16        responder: oneshot::Sender<Vec<P>>,
17    },
18
19    /// Subscribe to receive a message by digest.
20    ///
21    /// The responder will be sent the first message for an commitment when it is available; either
22    /// instantly (if cached) or when it is received from the network. The request can be canceled
23    /// by dropping the responder.
24    Subscribe {
25        peer: Option<P>,
26        commitment: M::Commitment,
27        digest: Option<M::Digest>,
28        responder: oneshot::Sender<M>,
29    },
30
31    /// Get all messages for an commitment.
32    Get {
33        peer: Option<P>,
34        commitment: M::Commitment,
35        digest: Option<M::Digest>,
36        responder: oneshot::Sender<Vec<M>>,
37    },
38}
39
40/// Ingress mailbox for [super::Engine].
41#[derive(Clone)]
42pub struct Mailbox<P: PublicKey, M: Committable + Digestible + Codec> {
43    sender: mpsc::Sender<Message<P, M>>,
44}
45
46impl<P: PublicKey, M: Committable + Digestible + Codec> Mailbox<P, M> {
47    pub(super) const fn new(sender: mpsc::Sender<Message<P, M>>) -> Self {
48        Self { sender }
49    }
50
51    /// Subscribe to a message by peer (optionally), commitment, and digest (optionally).
52    ///
53    /// The responder will be sent the first message for an commitment when it is available; either
54    /// instantly (if cached) or when it is received from the network. The request can be canceled
55    /// by dropping the responder.
56    ///
57    /// If the engine has shut down, the returned receiver will resolve to `Canceled`.
58    pub async fn subscribe(
59        &mut self,
60        peer: Option<P>,
61        commitment: M::Commitment,
62        digest: Option<M::Digest>,
63    ) -> oneshot::Receiver<M> {
64        let (responder, receiver) = oneshot::channel();
65        self.sender
66            .send_lossy(Message::Subscribe {
67                peer,
68                commitment,
69                digest,
70                responder,
71            })
72            .await;
73        receiver
74    }
75
76    /// Subscribe to a message by peer (optionally), commitment, and digest (optionally) with an
77    /// externally prepared responder.
78    ///
79    /// The responder will be sent the first message for an commitment when it is available; either
80    /// instantly (if cached) or when it is received from the network. The request can be canceled
81    /// by dropping the responder.
82    ///
83    /// If the engine has shut down, this is a no-op.
84    pub async fn subscribe_prepared(
85        &mut self,
86        peer: Option<P>,
87        commitment: M::Commitment,
88        digest: Option<M::Digest>,
89        responder: oneshot::Sender<M>,
90    ) {
91        self.sender
92            .send_lossy(Message::Subscribe {
93                peer,
94                commitment,
95                digest,
96                responder,
97            })
98            .await;
99    }
100
101    /// Get all messages for an commitment.
102    ///
103    /// If the engine has shut down, returns an empty vector.
104    pub async fn get(
105        &mut self,
106        peer: Option<P>,
107        commitment: M::Commitment,
108        digest: Option<M::Digest>,
109    ) -> Vec<M> {
110        self.sender
111            .request(|responder| Message::Get {
112                peer,
113                commitment,
114                digest,
115                responder,
116            })
117            .await
118            .unwrap_or_default()
119    }
120}
121
122impl<P: PublicKey, M: Committable + Digestible + Codec> Broadcaster for Mailbox<P, M> {
123    type Recipients = Recipients<P>;
124    type Message = M;
125    type Response = Vec<P>;
126
127    /// Broadcast a message to recipients.
128    ///
129    /// If the engine has shut down, the returned receiver will resolve to `Canceled`.
130    async fn broadcast(
131        &mut self,
132        recipients: Self::Recipients,
133        message: Self::Message,
134    ) -> oneshot::Receiver<Self::Response> {
135        let (sender, receiver) = oneshot::channel();
136        self.sender
137            .send_lossy(Message::Broadcast {
138                recipients,
139                message,
140                responder: sender,
141            })
142            .await;
143        receiver
144    }
145}