Skip to main content

commonware_broadcast/buffered/
ingress.rs

1use crate::Broadcaster;
2use commonware_codec::Codec;
3use commonware_cryptography::{Digestible, PublicKey};
4use commonware_p2p::Recipients;
5use commonware_utils::channel::{fallible::AsyncFallibleExt, mpsc, oneshot};
6
7/// Message types that can be sent to the `Mailbox`
8pub enum Message<P: PublicKey, M: Digestible> {
9    /// Broadcast a [crate::Broadcaster::Message] to the network.
10    ///
11    /// The responder will be sent a list of peers that received the message.
12    Broadcast {
13        recipients: Recipients<P>,
14        message: M,
15        responder: oneshot::Sender<Vec<P>>,
16    },
17
18    /// Subscribe to receive a message by digest.
19    ///
20    /// The responder will be sent the message when it is available; either
21    /// instantly (if cached) or when it is received from the network. The request can be canceled
22    /// by dropping the responder.
23    Subscribe {
24        digest: M::Digest,
25        responder: oneshot::Sender<M>,
26    },
27
28    /// Get a message by digest.
29    Get {
30        digest: M::Digest,
31        responder: oneshot::Sender<Option<M>>,
32    },
33}
34
35/// Ingress mailbox for [super::Engine].
36#[derive(Clone)]
37pub struct Mailbox<P: PublicKey, M: Digestible + Codec> {
38    sender: mpsc::Sender<Message<P, M>>,
39}
40
41impl<P: PublicKey, M: Digestible + Codec> Mailbox<P, M> {
42    pub(super) const fn new(sender: mpsc::Sender<Message<P, M>>) -> Self {
43        Self { sender }
44    }
45
46    /// Subscribe to a message by digest.
47    ///
48    /// The responder will be sent the message when it is available; either
49    /// instantly (if cached) or when it is received from the network. The request can be canceled
50    /// by dropping the responder.
51    ///
52    /// If the engine has shut down, the returned receiver will resolve to `Canceled`.
53    pub async fn subscribe(&self, digest: M::Digest) -> oneshot::Receiver<M> {
54        let (responder, receiver) = oneshot::channel();
55        self.sender
56            .send_lossy(Message::Subscribe { digest, responder })
57            .await;
58        receiver
59    }
60
61    /// Subscribe to a message by digest with an externally prepared responder.
62    ///
63    /// The responder will be sent the message when it is available; either
64    /// instantly (if cached) or when it is received from the network. The request can be canceled
65    /// by dropping the responder.
66    ///
67    /// If the engine has shut down, this is a no-op.
68    pub async fn subscribe_prepared(&self, digest: M::Digest, responder: oneshot::Sender<M>) {
69        self.sender
70            .send_lossy(Message::Subscribe { digest, responder })
71            .await;
72    }
73
74    /// Get a message by digest.
75    ///
76    /// If the engine has shut down, returns `None`.
77    pub async fn get(&self, digest: M::Digest) -> Option<M> {
78        self.sender
79            .request(|responder| Message::Get { digest, responder })
80            .await
81            .unwrap_or_default()
82    }
83}
84
85impl<P: PublicKey, M: Digestible + Codec> Broadcaster for Mailbox<P, M> {
86    type Recipients = Recipients<P>;
87    type Message = M;
88    type Response = Vec<P>;
89
90    /// Broadcast a message to recipients.
91    ///
92    /// If the engine has shut down, the returned receiver will resolve to `Canceled`.
93    async fn broadcast(
94        &self,
95        recipients: Self::Recipients,
96        message: Self::Message,
97    ) -> oneshot::Receiver<Self::Response> {
98        let (sender, receiver) = oneshot::channel();
99        self.sender
100            .send_lossy(Message::Broadcast {
101                recipients,
102                message,
103                responder: sender,
104            })
105            .await;
106        receiver
107    }
108}