commonware_broadcast/buffered/
ingress.rs1use crate::Broadcaster;
2use commonware_actor::{
3 mailbox::{Overflow, Policy, Sender},
4 Feedback,
5};
6use commonware_codec::Codec;
7use commonware_cryptography::{Digestible, PublicKey};
8use commonware_p2p::Recipients;
9use commonware_utils::channel::oneshot;
10use std::collections::VecDeque;
11
12pub(crate) enum Message<P: PublicKey, M: Digestible> {
14 Broadcast {
16 recipients: Recipients<P>,
17 message: M,
18 },
19
20 Subscribe {
26 digest: M::Digest,
27 responder: oneshot::Sender<M>,
28 },
29
30 Get {
32 digest: M::Digest,
33 responder: oneshot::Sender<Option<M>>,
34 },
35}
36
37impl<P: PublicKey, M: Digestible> Message<P, M> {
38 fn response_closed(&self) -> bool {
39 match self {
40 Self::Subscribe { responder, .. } => responder.is_closed(),
41 Self::Get { responder, .. } => responder.is_closed(),
42 Self::Broadcast { .. } => false,
43 }
44 }
45}
46
47pub(crate) struct Pending<P: PublicKey, M: Digestible>(VecDeque<Message<P, M>>);
48
49impl<P: PublicKey, M: Digestible> Default for Pending<P, M> {
50 fn default() -> Self {
51 Self(VecDeque::new())
52 }
53}
54
55impl<P: PublicKey, M: Digestible> Overflow<Message<P, M>> for Pending<P, M> {
56 fn is_empty(&self) -> bool {
57 self.0.is_empty()
58 }
59
60 fn drain<F>(&mut self, mut push: F)
61 where
62 F: FnMut(Message<P, M>) -> Option<Message<P, M>>,
63 {
64 while let Some(message) = self.0.pop_front() {
65 if message.response_closed() {
66 continue;
67 }
68
69 if let Some(message) = push(message) {
70 self.0.push_front(message);
71 break;
72 }
73 }
74 }
75}
76
77impl<P: PublicKey, M: Digestible> Policy for Message<P, M> {
78 type Overflow = Pending<P, M>;
79
80 fn handle(overflow: &mut Self::Overflow, message: Self) {
81 if message.response_closed() {
82 return;
83 }
84
85 overflow.0.push_back(message);
86 }
87}
88
89#[derive(Clone)]
91pub struct Mailbox<P: PublicKey, M: Digestible + Codec> {
92 sender: Sender<Message<P, M>>,
93}
94
95impl<P: PublicKey, M: Digestible + Codec> Mailbox<P, M> {
96 pub(super) const fn new(sender: Sender<Message<P, M>>) -> Self {
97 Self { sender }
98 }
99
100 pub fn subscribe(&self, digest: M::Digest) -> oneshot::Receiver<M> {
108 let (responder, receiver) = oneshot::channel();
109 let _ = self
110 .sender
111 .enqueue(Message::Subscribe { digest, responder });
112 receiver
113 }
114
115 pub fn subscribe_prepared(&self, digest: M::Digest, responder: oneshot::Sender<M>) {
123 let _ = self
124 .sender
125 .enqueue(Message::Subscribe { digest, responder });
126 }
127
128 pub async fn get(&self, digest: M::Digest) -> Option<M> {
132 let (responder, receiver) = oneshot::channel();
133 let _ = self.sender.enqueue(Message::Get { digest, responder });
134 receiver.await.unwrap_or_default()
135 }
136}
137
138impl<P: PublicKey, M: Digestible + Codec> Broadcaster for Mailbox<P, M> {
139 type Recipients = Recipients<P>;
140 type Message = M;
141
142 fn broadcast(&self, recipients: Self::Recipients, message: Self::Message) -> Feedback {
146 self.sender.enqueue(Message::Broadcast {
147 recipients,
148 message,
149 })
150 }
151}