commonware_broadcast/buffered/
ingress.rs

1use crate::Broadcaster;
2use commonware_codec::Codec;
3use commonware_cryptography::{Committable, Digestible, PublicKey};
4use commonware_p2p::Recipients;
5use futures::{
6    channel::{mpsc, oneshot},
7    SinkExt,
8};
9
10/// Message types that can be sent to the `Mailbox`
11pub enum Message<P: PublicKey, M: Committable + Digestible> {
12    /// Broadcast a [crate::Broadcaster::Message] to the network.
13    ///
14    /// The responder will be sent a list of peers that received the message.
15    Broadcast {
16        recipients: Recipients<P>,
17        message: M,
18        responder: oneshot::Sender<Vec<P>>,
19    },
20
21    /// Subscribe to receive a message by digest.
22    ///
23    /// The responder will be sent the first message for an commitment when it is available; either
24    /// instantly (if cached) or when it is received from the network. The request can be canceled
25    /// by dropping the responder.
26    Subscribe {
27        peer: Option<P>,
28        commitment: M::Commitment,
29        digest: Option<M::Digest>,
30        responder: oneshot::Sender<M>,
31    },
32
33    /// Get all messages for an commitment.
34    Get {
35        peer: Option<P>,
36        commitment: M::Commitment,
37        digest: Option<M::Digest>,
38        responder: oneshot::Sender<Vec<M>>,
39    },
40}
41
42/// Ingress mailbox for [super::Engine].
43#[derive(Clone)]
44pub struct Mailbox<P: PublicKey, M: Committable + Digestible + Codec> {
45    sender: mpsc::Sender<Message<P, M>>,
46}
47
48impl<P: PublicKey, M: Committable + Digestible + Codec> Mailbox<P, M> {
49    pub(super) fn new(sender: mpsc::Sender<Message<P, M>>) -> Self {
50        Self { sender }
51    }
52}
53
54impl<P: PublicKey, M: Committable + Digestible + Codec> Mailbox<P, M> {
55    /// Subscribe to a message by peer (optionally), commitment, and digest (optionally).
56    ///
57    /// The responder will be sent the first message for an commitment when it is available; either
58    /// instantly (if cached) or when it is received from the network. The request can be canceled
59    /// by dropping the responder.
60    pub async fn subscribe(
61        &mut self,
62        peer: Option<P>,
63        commitment: M::Commitment,
64        digest: Option<M::Digest>,
65    ) -> oneshot::Receiver<M> {
66        let (responder, receiver) = oneshot::channel();
67        self.sender
68            .send(Message::Subscribe {
69                peer,
70                commitment,
71                digest,
72                responder,
73            })
74            .await
75            .expect("mailbox closed");
76        receiver
77    }
78
79    /// Subscribe to a message by peer (optionally), commitment, and digest (optionally) with an
80    /// externally prepared responder.
81    ///
82    /// The responder will be sent the first message for an commitment when it is available; either
83    /// instantly (if cached) or when it is received from the network. The request can be canceled
84    /// by dropping the responder.
85    pub async fn subscribe_prepared(
86        &mut self,
87        peer: Option<P>,
88        commitment: M::Commitment,
89        digest: Option<M::Digest>,
90        responder: oneshot::Sender<M>,
91    ) {
92        self.sender
93            .send(Message::Subscribe {
94                peer,
95                commitment,
96                digest,
97                responder,
98            })
99            .await
100            .expect("mailbox closed");
101    }
102
103    /// Get all messages for an commitment.
104    pub async fn get(
105        &mut self,
106        peer: Option<P>,
107        commitment: M::Commitment,
108        digest: Option<M::Digest>,
109    ) -> Vec<M> {
110        let (responder, receiver) = oneshot::channel();
111        self.sender
112            .send(Message::Get {
113                peer,
114                commitment,
115                digest,
116                responder,
117            })
118            .await
119            .expect("mailbox closed");
120        receiver.await.expect("mailbox closed")
121    }
122}
123
124impl<P: PublicKey, M: Committable + Digestible + Codec> Broadcaster for Mailbox<P, M> {
125    type Recipients = Recipients<P>;
126    type Message = M;
127    type Response = Vec<P>;
128
129    async fn broadcast(
130        &mut self,
131        recipients: Self::Recipients,
132        message: Self::Message,
133    ) -> oneshot::Receiver<Self::Response> {
134        let (sender, receiver) = oneshot::channel();
135        self.sender
136            .send(Message::Broadcast {
137                recipients,
138                message,
139                responder: sender,
140            })
141            .await
142            .expect("mailbox closed");
143        receiver
144    }
145}