Skip to main content

commonware_broadcast/buffered/
ingress.rs

1use crate::Broadcaster;
2use commonware_actor::{
3    mailbox::{Overflow, Policy, Sender},
4    Feedback,
5};
6use commonware_codec::Codec;
7use commonware_cryptography::{Digestible, PublicKey};
8use commonware_p2p::Recipients;
9use commonware_utils::channel::oneshot;
10use std::collections::VecDeque;
11
12/// Message types that can be sent to the `Mailbox`
13pub(crate) enum Message<P: PublicKey, M: Digestible> {
14    /// Broadcast a [crate::Broadcaster::Message] to the network.
15    Broadcast {
16        recipients: Recipients<P>,
17        message: M,
18    },
19
20    /// Subscribe to receive a message by digest.
21    ///
22    /// The responder will be sent the message when it is available; either
23    /// instantly (if cached) or when it is received from the network. The request can be canceled
24    /// by dropping the responder.
25    Subscribe {
26        digest: M::Digest,
27        responder: oneshot::Sender<M>,
28    },
29
30    /// Get a message by digest.
31    Get {
32        digest: M::Digest,
33        responder: oneshot::Sender<Option<M>>,
34    },
35}
36
37impl<P: PublicKey, M: Digestible> Message<P, M> {
38    fn response_closed(&self) -> bool {
39        match self {
40            Self::Subscribe { responder, .. } => responder.is_closed(),
41            Self::Get { responder, .. } => responder.is_closed(),
42            Self::Broadcast { .. } => false,
43        }
44    }
45}
46
47pub(crate) struct Pending<P: PublicKey, M: Digestible>(VecDeque<Message<P, M>>);
48
49impl<P: PublicKey, M: Digestible> Default for Pending<P, M> {
50    fn default() -> Self {
51        Self(VecDeque::new())
52    }
53}
54
55impl<P: PublicKey, M: Digestible> Overflow<Message<P, M>> for Pending<P, M> {
56    fn is_empty(&self) -> bool {
57        self.0.is_empty()
58    }
59
60    fn drain<F>(&mut self, mut push: F)
61    where
62        F: FnMut(Message<P, M>) -> Option<Message<P, M>>,
63    {
64        while let Some(message) = self.0.pop_front() {
65            if message.response_closed() {
66                continue;
67            }
68
69            if let Some(message) = push(message) {
70                self.0.push_front(message);
71                break;
72            }
73        }
74    }
75}
76
77impl<P: PublicKey, M: Digestible> Policy for Message<P, M> {
78    type Overflow = Pending<P, M>;
79
80    fn handle(overflow: &mut Self::Overflow, message: Self) {
81        if message.response_closed() {
82            return;
83        }
84
85        overflow.0.push_back(message);
86    }
87}
88
89/// Ingress mailbox for [super::Engine].
90#[derive(Clone)]
91pub struct Mailbox<P: PublicKey, M: Digestible + Codec> {
92    sender: Sender<Message<P, M>>,
93}
94
95impl<P: PublicKey, M: Digestible + Codec> Mailbox<P, M> {
96    pub(super) const fn new(sender: Sender<Message<P, M>>) -> Self {
97        Self { sender }
98    }
99
100    /// Subscribe to a message by digest.
101    ///
102    /// The responder will be sent the message when it is available; either
103    /// instantly (if cached) or when it is received from the network. The request can be canceled
104    /// by dropping the responder.
105    ///
106    /// If the engine has shut down, the returned receiver will resolve to `Canceled`.
107    pub fn subscribe(&self, digest: M::Digest) -> oneshot::Receiver<M> {
108        let (responder, receiver) = oneshot::channel();
109        let _ = self
110            .sender
111            .enqueue(Message::Subscribe { digest, responder });
112        receiver
113    }
114
115    /// Subscribe to a message by digest with an externally prepared responder.
116    ///
117    /// The responder will be sent the message when it is available; either
118    /// instantly (if cached) or when it is received from the network. The request can be canceled
119    /// by dropping the responder.
120    ///
121    /// If the engine has shut down, this is a no-op.
122    pub fn subscribe_prepared(&self, digest: M::Digest, responder: oneshot::Sender<M>) {
123        let _ = self
124            .sender
125            .enqueue(Message::Subscribe { digest, responder });
126    }
127
128    /// Get a message by digest.
129    ///
130    /// If the engine has shut down, returns `None`.
131    pub async fn get(&self, digest: M::Digest) -> Option<M> {
132        let (responder, receiver) = oneshot::channel();
133        let _ = self.sender.enqueue(Message::Get { digest, responder });
134        receiver.await.unwrap_or_default()
135    }
136}
137
138impl<P: PublicKey, M: Digestible + Codec> Broadcaster for Mailbox<P, M> {
139    type Recipients = Recipients<P>;
140    type Message = M;
141
142    /// Broadcast a message to recipients.
143    ///
144    /// If the engine has shut down, returns [`Feedback::Closed`].
145    fn broadcast(&self, recipients: Self::Recipients, message: Self::Message) -> Feedback {
146        self.sender.enqueue(Message::Broadcast {
147            recipients,
148            message,
149        })
150    }
151}