commonware_broadcast/buffered/
ingress.rs

1use crate::Broadcaster;
2use commonware_codec::Codec;
3use commonware_cryptography::{Committable, Digestible};
4use commonware_p2p::Recipients;
5use commonware_utils::Array;
6use futures::{
7    channel::{mpsc, oneshot},
8    SinkExt,
9};
10
11/// Message types that can be sent to the `Mailbox`
12pub enum Message<P: Array, M: Committable + Digestible> {
13    /// Broadcast a [`Message`](crate::Broadcaster::Message) to the network.
14    ///
15    /// The responder will be sent a list of peers that received the message.
16    Broadcast {
17        recipients: Recipients<P>,
18        message: M,
19        responder: oneshot::Sender<Vec<P>>,
20    },
21
22    /// Subscribe to receive a message by digest.
23    ///
24    /// The responder will be sent the first message for an commitment when it is available; either
25    /// instantly (if cached) or when it is received from the network. The request can be canceled
26    /// by dropping the responder.
27    Subscribe {
28        peer: Option<P>,
29        commitment: M::Commitment,
30        digest: Option<M::Digest>,
31        responder: oneshot::Sender<M>,
32    },
33
34    /// Get all messages for an commitment.
35    Get {
36        peer: Option<P>,
37        commitment: M::Commitment,
38        digest: Option<M::Digest>,
39        responder: oneshot::Sender<Vec<M>>,
40    },
41}
42
43/// Ingress mailbox for [`Engine`](super::Engine).
44#[derive(Clone)]
45pub struct Mailbox<P: Array, M: Committable + Digestible + Codec> {
46    sender: mpsc::Sender<Message<P, M>>,
47}
48
49impl<P: Array, M: Committable + Digestible + Codec> Mailbox<P, M> {
50    pub(super) fn new(sender: mpsc::Sender<Message<P, M>>) -> Self {
51        Self { sender }
52    }
53}
54
55impl<P: Array, M: Committable + Digestible + Codec> Mailbox<P, M> {
56    /// Subscribe to a message by peer (optionally), commitment, and digest (optionally).
57    ///
58    /// The responder will be sent the first message for an commitment when it is available; either
59    /// instantly (if cached) or when it is received from the network. The request can be canceled
60    /// by dropping the responder.
61    pub async fn subscribe(
62        &mut self,
63        peer: Option<P>,
64        commitment: M::Commitment,
65        digest: Option<M::Digest>,
66    ) -> oneshot::Receiver<M> {
67        let (responder, receiver) = oneshot::channel();
68        self.sender
69            .send(Message::Subscribe {
70                peer,
71                commitment,
72                digest,
73                responder,
74            })
75            .await
76            .expect("mailbox closed");
77        receiver
78    }
79
80    /// Subscribe to a message by peer (optionally), commitment, and digest (optionally) with an
81    /// externally prepared responder.
82    ///
83    /// The responder will be sent the first message for an commitment when it is available; either
84    /// instantly (if cached) or when it is received from the network. The request can be canceled
85    /// by dropping the responder.
86    pub async fn subscribe_prepared(
87        &mut self,
88        peer: Option<P>,
89        commitment: M::Commitment,
90        digest: Option<M::Digest>,
91        responder: oneshot::Sender<M>,
92    ) {
93        self.sender
94            .send(Message::Subscribe {
95                peer,
96                commitment,
97                digest,
98                responder,
99            })
100            .await
101            .expect("mailbox closed");
102    }
103
104    /// Get all messages for an commitment.
105    pub async fn get(
106        &mut self,
107        peer: Option<P>,
108        commitment: M::Commitment,
109        digest: Option<M::Digest>,
110    ) -> Vec<M> {
111        let (responder, receiver) = oneshot::channel();
112        self.sender
113            .send(Message::Get {
114                peer,
115                commitment,
116                digest,
117                responder,
118            })
119            .await
120            .expect("mailbox closed");
121        receiver.await.expect("mailbox closed")
122    }
123}
124
125impl<P: Array, M: Committable + Digestible + Codec> Broadcaster for Mailbox<P, M> {
126    type Recipients = Recipients<P>;
127    type Message = M;
128    type Response = Vec<P>;
129
130    async fn broadcast(
131        &mut self,
132        recipients: Self::Recipients,
133        message: Self::Message,
134    ) -> oneshot::Receiver<Self::Response> {
135        let (sender, receiver) = oneshot::channel();
136        self.sender
137            .send(Message::Broadcast {
138                recipients,
139                message,
140                responder: sender,
141            })
142            .await
143            .expect("mailbox closed");
144        receiver
145    }
146}