commonware_broadcast/buffered/
ingress.rs

1use crate::Broadcaster;
2use commonware_codec::Codec;
3use commonware_cryptography::{Committable, Digest, Digestible};
4use commonware_p2p::Recipients;
5use commonware_utils::Array;
6use futures::{
7    channel::{mpsc, oneshot},
8    SinkExt,
9};
10
11/// Message types that can be sent to the `Mailbox`
12pub enum Message<P: Array, Dc: Digest, Dd: Digest, M: Committable<Dc> + Digestible<Dd>> {
13    /// Broadcast a [`Message`](crate::Broadcaster::Message) to the network.
14    ///
15    /// The responder will be sent a list of peers that received the message.
16    Broadcast {
17        recipients: Recipients<P>,
18        message: M,
19        responder: oneshot::Sender<Vec<P>>,
20    },
21
22    /// Subscribe to receive a message by digest.
23    ///
24    /// The responder will be sent the first message for an commitment when it is available; either
25    /// instantly (if cached) or when it is received from the network. The request can be canceled
26    /// by dropping the responder.
27    Subscribe {
28        peer: Option<P>,
29        commitment: Dc,
30        digest: Option<Dd>,
31        responder: oneshot::Sender<M>,
32    },
33
34    /// Get all messages for an commitment.
35    Get {
36        peer: Option<P>,
37        commitment: Dc,
38        digest: Option<Dd>,
39        responder: oneshot::Sender<Vec<M>>,
40    },
41}
42
43/// Ingress mailbox for [`Engine`](super::Engine).
44#[derive(Clone)]
45pub struct Mailbox<P: Array, Dc: Digest, Dd: Digest, M: Committable<Dc> + Digestible<Dd> + Codec> {
46    sender: mpsc::Sender<Message<P, Dc, Dd, M>>,
47}
48
49impl<P: Array, Dc: Digest, Dd: Digest, M: Committable<Dc> + Digestible<Dd> + Codec>
50    Mailbox<P, Dc, Dd, M>
51{
52    pub(super) fn new(sender: mpsc::Sender<Message<P, Dc, Dd, M>>) -> Self {
53        Self { sender }
54    }
55}
56
57impl<P: Array, Dc: Digest, Dd: Digest, M: Committable<Dc> + Digestible<Dd> + Codec>
58    Mailbox<P, Dc, Dd, M>
59{
60    /// Subscribe to a message by peer (optionally), commitment, and digest (optionally).
61    ///
62    /// The responder will be sent the first message for an commitment when it is available; either
63    /// instantly (if cached) or when it is received from the network. The request can be canceled
64    /// by dropping the responder.
65    pub async fn subscribe(
66        &mut self,
67        peer: Option<P>,
68        commitment: Dc,
69        digest: Option<Dd>,
70    ) -> oneshot::Receiver<M> {
71        let (responder, receiver) = oneshot::channel();
72        self.sender
73            .send(Message::Subscribe {
74                peer,
75                commitment,
76                digest,
77                responder,
78            })
79            .await
80            .expect("mailbox closed");
81        receiver
82    }
83
84    /// Subscribe to a message by peer (optionally), commitment, and digest (optionally) with an
85    /// externally prepared responder.
86    ///
87    /// The responder will be sent the first message for an commitment when it is available; either
88    /// instantly (if cached) or when it is received from the network. The request can be canceled
89    /// by dropping the responder.
90    pub async fn subscribe_prepared(
91        &mut self,
92        peer: Option<P>,
93        commitment: Dc,
94        digest: Option<Dd>,
95        responder: oneshot::Sender<M>,
96    ) {
97        self.sender
98            .send(Message::Subscribe {
99                peer,
100                commitment,
101                digest,
102                responder,
103            })
104            .await
105            .expect("mailbox closed");
106    }
107
108    /// Get all messages for an commitment.
109    pub async fn get(&mut self, peer: Option<P>, commitment: Dc, digest: Option<Dd>) -> Vec<M> {
110        let (responder, receiver) = oneshot::channel();
111        self.sender
112            .send(Message::Get {
113                peer,
114                commitment,
115                digest,
116                responder,
117            })
118            .await
119            .expect("mailbox closed");
120        receiver.await.expect("mailbox closed")
121    }
122}
123
124impl<P: Array, Dc: Digest, Dd: Digest, M: Committable<Dc> + Digestible<Dd> + Codec> Broadcaster
125    for Mailbox<P, Dc, Dd, M>
126{
127    type Recipients = Recipients<P>;
128    type Message = M;
129    type Response = Vec<P>;
130
131    async fn broadcast(
132        &mut self,
133        recipients: Self::Recipients,
134        message: Self::Message,
135    ) -> oneshot::Receiver<Self::Response> {
136        let (sender, receiver) = oneshot::channel();
137        self.sender
138            .send(Message::Broadcast {
139                recipients,
140                message,
141                responder: sender,
142            })
143            .await
144            .expect("mailbox closed");
145        receiver
146    }
147}