commonware_broadcast/buffered/ingress.rs
1use crate::Broadcaster;
2use commonware_codec::Codec;
3use commonware_cryptography::{Committable, Digestible, PublicKey};
4use commonware_p2p::Recipients;
5use commonware_utils::channel::{fallible::AsyncFallibleExt, mpsc, oneshot};
6
7/// Message types that can be sent to the `Mailbox`
8pub enum Message<P: PublicKey, M: Committable + Digestible> {
9 /// Broadcast a [crate::Broadcaster::Message] to the network.
10 ///
11 /// The responder will be sent a list of peers that received the message.
12 Broadcast {
13 recipients: Recipients<P>,
14 message: M,
15 responder: oneshot::Sender<Vec<P>>,
16 },
17
18 /// Subscribe to receive a message by digest.
19 ///
20 /// The responder will be sent the first message for an commitment when it is available; either
21 /// instantly (if cached) or when it is received from the network. The request can be canceled
22 /// by dropping the responder.
23 Subscribe {
24 peer: Option<P>,
25 commitment: M::Commitment,
26 digest: Option<M::Digest>,
27 responder: oneshot::Sender<M>,
28 },
29
30 /// Get all messages for an commitment.
31 Get {
32 peer: Option<P>,
33 commitment: M::Commitment,
34 digest: Option<M::Digest>,
35 responder: oneshot::Sender<Vec<M>>,
36 },
37}
38
39/// Ingress mailbox for [super::Engine].
40#[derive(Clone)]
41pub struct Mailbox<P: PublicKey, M: Committable + Digestible + Codec> {
42 sender: mpsc::Sender<Message<P, M>>,
43}
44
45impl<P: PublicKey, M: Committable + Digestible + Codec> Mailbox<P, M> {
46 pub(super) const fn new(sender: mpsc::Sender<Message<P, M>>) -> Self {
47 Self { sender }
48 }
49
50 /// Subscribe to a message by peer (optionally), commitment, and digest (optionally).
51 ///
52 /// The responder will be sent the first message for an commitment when it is available; either
53 /// instantly (if cached) or when it is received from the network. The request can be canceled
54 /// by dropping the responder.
55 ///
56 /// If the engine has shut down, the returned receiver will resolve to `Canceled`.
57 pub async fn subscribe(
58 &mut self,
59 peer: Option<P>,
60 commitment: M::Commitment,
61 digest: Option<M::Digest>,
62 ) -> oneshot::Receiver<M> {
63 let (responder, receiver) = oneshot::channel();
64 self.sender
65 .send_lossy(Message::Subscribe {
66 peer,
67 commitment,
68 digest,
69 responder,
70 })
71 .await;
72 receiver
73 }
74
75 /// Subscribe to a message by peer (optionally), commitment, and digest (optionally) with an
76 /// externally prepared responder.
77 ///
78 /// The responder will be sent the first message for an commitment when it is available; either
79 /// instantly (if cached) or when it is received from the network. The request can be canceled
80 /// by dropping the responder.
81 ///
82 /// If the engine has shut down, this is a no-op.
83 pub async fn subscribe_prepared(
84 &mut self,
85 peer: Option<P>,
86 commitment: M::Commitment,
87 digest: Option<M::Digest>,
88 responder: oneshot::Sender<M>,
89 ) {
90 self.sender
91 .send_lossy(Message::Subscribe {
92 peer,
93 commitment,
94 digest,
95 responder,
96 })
97 .await;
98 }
99
100 /// Get all messages for an commitment.
101 ///
102 /// If the engine has shut down, returns an empty vector.
103 pub async fn get(
104 &mut self,
105 peer: Option<P>,
106 commitment: M::Commitment,
107 digest: Option<M::Digest>,
108 ) -> Vec<M> {
109 self.sender
110 .request(|responder| Message::Get {
111 peer,
112 commitment,
113 digest,
114 responder,
115 })
116 .await
117 .unwrap_or_default()
118 }
119}
120
121impl<P: PublicKey, M: Committable + Digestible + Codec> Broadcaster for Mailbox<P, M> {
122 type Recipients = Recipients<P>;
123 type Message = M;
124 type Response = Vec<P>;
125
126 /// Broadcast a message to recipients.
127 ///
128 /// If the engine has shut down, the returned receiver will resolve to `Canceled`.
129 async fn broadcast(
130 &mut self,
131 recipients: Self::Recipients,
132 message: Self::Message,
133 ) -> oneshot::Receiver<Self::Response> {
134 let (sender, receiver) = oneshot::channel();
135 self.sender
136 .send_lossy(Message::Broadcast {
137 recipients,
138 message,
139 responder: sender,
140 })
141 .await;
142 receiver
143 }
144}