Skip to main content

commonware_broadcast/buffered/
ingress.rs

1use crate::Broadcaster;
2use commonware_codec::Codec;
3use commonware_cryptography::{Committable, Digestible, PublicKey};
4use commonware_p2p::Recipients;
5use commonware_utils::channel::{fallible::AsyncFallibleExt, mpsc, oneshot};
6
7/// Message types that can be sent to the `Mailbox`
8pub enum Message<P: PublicKey, M: Committable + Digestible> {
9    /// Broadcast a [crate::Broadcaster::Message] to the network.
10    ///
11    /// The responder will be sent a list of peers that received the message.
12    Broadcast {
13        recipients: Recipients<P>,
14        message: M,
15        responder: oneshot::Sender<Vec<P>>,
16    },
17
18    /// Subscribe to receive a message by digest.
19    ///
20    /// The responder will be sent the first message for an commitment when it is available; either
21    /// instantly (if cached) or when it is received from the network. The request can be canceled
22    /// by dropping the responder.
23    Subscribe {
24        peer: Option<P>,
25        commitment: M::Commitment,
26        digest: Option<M::Digest>,
27        responder: oneshot::Sender<M>,
28    },
29
30    /// Get all messages for an commitment.
31    Get {
32        peer: Option<P>,
33        commitment: M::Commitment,
34        digest: Option<M::Digest>,
35        responder: oneshot::Sender<Vec<M>>,
36    },
37}
38
39/// Ingress mailbox for [super::Engine].
40#[derive(Clone)]
41pub struct Mailbox<P: PublicKey, M: Committable + Digestible + Codec> {
42    sender: mpsc::Sender<Message<P, M>>,
43}
44
45impl<P: PublicKey, M: Committable + Digestible + Codec> Mailbox<P, M> {
46    pub(super) const fn new(sender: mpsc::Sender<Message<P, M>>) -> Self {
47        Self { sender }
48    }
49
50    /// Subscribe to a message by peer (optionally), commitment, and digest (optionally).
51    ///
52    /// The responder will be sent the first message for an commitment when it is available; either
53    /// instantly (if cached) or when it is received from the network. The request can be canceled
54    /// by dropping the responder.
55    ///
56    /// If the engine has shut down, the returned receiver will resolve to `Canceled`.
57    pub async fn subscribe(
58        &mut self,
59        peer: Option<P>,
60        commitment: M::Commitment,
61        digest: Option<M::Digest>,
62    ) -> oneshot::Receiver<M> {
63        let (responder, receiver) = oneshot::channel();
64        self.sender
65            .send_lossy(Message::Subscribe {
66                peer,
67                commitment,
68                digest,
69                responder,
70            })
71            .await;
72        receiver
73    }
74
75    /// Subscribe to a message by peer (optionally), commitment, and digest (optionally) with an
76    /// externally prepared responder.
77    ///
78    /// The responder will be sent the first message for an commitment when it is available; either
79    /// instantly (if cached) or when it is received from the network. The request can be canceled
80    /// by dropping the responder.
81    ///
82    /// If the engine has shut down, this is a no-op.
83    pub async fn subscribe_prepared(
84        &mut self,
85        peer: Option<P>,
86        commitment: M::Commitment,
87        digest: Option<M::Digest>,
88        responder: oneshot::Sender<M>,
89    ) {
90        self.sender
91            .send_lossy(Message::Subscribe {
92                peer,
93                commitment,
94                digest,
95                responder,
96            })
97            .await;
98    }
99
100    /// Get all messages for an commitment.
101    ///
102    /// If the engine has shut down, returns an empty vector.
103    pub async fn get(
104        &mut self,
105        peer: Option<P>,
106        commitment: M::Commitment,
107        digest: Option<M::Digest>,
108    ) -> Vec<M> {
109        self.sender
110            .request(|responder| Message::Get {
111                peer,
112                commitment,
113                digest,
114                responder,
115            })
116            .await
117            .unwrap_or_default()
118    }
119}
120
121impl<P: PublicKey, M: Committable + Digestible + Codec> Broadcaster for Mailbox<P, M> {
122    type Recipients = Recipients<P>;
123    type Message = M;
124    type Response = Vec<P>;
125
126    /// Broadcast a message to recipients.
127    ///
128    /// If the engine has shut down, the returned receiver will resolve to `Canceled`.
129    async fn broadcast(
130        &mut self,
131        recipients: Self::Recipients,
132        message: Self::Message,
133    ) -> oneshot::Receiver<Self::Response> {
134        let (sender, receiver) = oneshot::channel();
135        self.sender
136            .send_lossy(Message::Broadcast {
137                recipients,
138                message,
139                responder: sender,
140            })
141            .await;
142        receiver
143    }
144}