commonware_broadcast/buffered/
engine.rs1use super::{metrics, Config, Mailbox, Message};
2use crate::buffered::metrics::SequencerLabel;
3use commonware_codec::{Codec, Config as CodecCfg};
4use commonware_cryptography::{Digest, Digestible};
5use commonware_macros::select;
6use commonware_p2p::{
7 utils::codec::{wrap, WrappedSender},
8 Receiver, Recipients, Sender,
9};
10use commonware_runtime::{
11 telemetry::metrics::status::{CounterExt, Status},
12 Clock, Handle, Metrics, Spawner,
13};
14use commonware_utils::Array;
15use futures::{
16 channel::{mpsc, oneshot},
17 StreamExt,
18};
19use std::collections::{HashMap, VecDeque};
20use tracing::{debug, error, trace, warn};
21
22pub struct Engine<
30 E: Clock + Spawner + Metrics,
31 P: Array,
32 D: Digest,
33 Cfg: CodecCfg,
34 M: Digestible<D> + Codec<Cfg>,
35> {
36 context: E,
40
41 public_key: P,
46
47 priority: bool,
49
50 deque_size: usize,
52
53 codec_config: Cfg,
55
56 mailbox_receiver: mpsc::Receiver<Message<P, D, M>>,
61
62 waiters: HashMap<D, Vec<oneshot::Sender<M>>>,
64
65 items: HashMap<D, M>,
70
71 deques: HashMap<P, VecDeque<D>>,
77
78 counts: HashMap<D, usize>,
82
83 metrics: metrics::Metrics,
88}
89
90impl<
91 E: Clock + Spawner + Metrics,
92 P: Array,
93 D: Digest,
94 Cfg: CodecCfg,
95 M: Digestible<D> + Codec<Cfg>,
96 > Engine<E, P, D, Cfg, M>
97{
98 pub fn new(context: E, cfg: Config<Cfg, P>) -> (Self, Mailbox<P, D, M>) {
101 let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
102 let mailbox = Mailbox::<P, D, M>::new(mailbox_sender);
103 let metrics = metrics::Metrics::init(context.clone());
104
105 let result = Self {
106 context,
107 public_key: cfg.public_key,
108 priority: cfg.priority,
109 deque_size: cfg.deque_size,
110 codec_config: cfg.codec_config,
111 mailbox_receiver,
112 waiters: HashMap::new(),
113 deques: HashMap::new(),
114 items: HashMap::new(),
115 counts: HashMap::new(),
116 metrics,
117 };
118
119 (result, mailbox)
120 }
121
122 pub fn start(
124 mut self,
125 network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
126 ) -> Handle<()> {
127 self.context.spawn_ref()(self.run(network))
128 }
129
130 async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
132 let (mut sender, mut receiver) = wrap(self.codec_config.clone(), network.0, network.1);
133 let mut shutdown = self.context.stopped();
134
135 loop {
136 self.cleanup_waiters();
138 self.metrics.waiters.set(self.waiters.len() as i64);
139
140 select! {
141 _ = &mut shutdown => {
143 debug!("shutdown");
144 },
145
146 mail = self.mailbox_receiver.next() => {
148 let Some(msg) = mail else {
149 error!("mailbox receiver failed");
150 break;
151 };
152 match msg {
153 Message::Broadcast{ message, responder } => {
154 trace!("mailbox: broadcast");
155 self.handle_broadcast(&mut sender, message, responder).await;
156 }
157 Message::Subscribe{ digest, responder } => {
158 trace!("mailbox: subscribe");
159 self.handle_subscribe(digest, responder).await;
160 }
161 Message::Get{ digest, responder } => {
162 trace!("mailbox: get");
163 self.handle_get(digest, responder).await;
164 }
165 }
166 },
167
168 msg = receiver.recv() => {
170 let (peer, msg) = match msg {
172 Ok(r) => r,
173 Err(err) => {
174 error!(?err, "receiver failed");
175 break;
176 }
177 };
178
179 let msg = match msg {
181 Ok(msg) => msg,
182 Err(err) => {
183 warn!(?err, ?peer, "failed to decode message");
184 self.metrics.receive.inc(Status::Invalid);
185 continue;
186 }
187 };
188
189 trace!(?peer, "network");
190 self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
191 self.handle_network(peer, msg).await;
192 },
193 }
194 }
195 }
196
197 async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
203 &mut self,
204 sender: &mut WrappedSender<Sr, Cfg, M>,
205 msg: M,
206 responder: oneshot::Sender<Vec<P>>,
207 ) {
208 let _ = self.insert_message(self.public_key.clone(), msg.clone());
210
211 let recipients = Recipients::All;
213 let sent_to = sender
214 .send(recipients, msg, self.priority)
215 .await
216 .unwrap_or_else(|err| {
217 error!(?err, "failed to send message");
218 vec![]
219 });
220 let _ = responder.send(sent_to);
221 }
222
223 async fn handle_subscribe(&mut self, digest: D, responder: oneshot::Sender<M>) {
228 if let Some(msg) = self.items.get(&digest) {
230 self.respond_subscribe(responder, msg.clone());
231 return;
232 }
233
234 self.waiters.entry(digest).or_default().push(responder);
236 }
237
238 async fn handle_get(&mut self, digest: D, responder: oneshot::Sender<Option<M>>) {
240 let item = self.items.get(&digest).cloned();
241 self.respond_get(responder, item);
242 }
243
244 async fn handle_network(&mut self, peer: P, msg: M) {
246 if !self.insert_message(peer.clone(), msg) {
247 debug!(?peer, "message already stored");
248 self.metrics.receive.inc(Status::Dropped);
249 return;
250 }
251
252 self.metrics.receive.inc(Status::Success);
253 }
254
255 fn insert_message(&mut self, peer: P, msg: M) -> bool {
264 let digest = msg.digest();
265
266 if let Some(responders) = self.waiters.remove(&digest) {
268 for responder in responders {
269 self.respond_subscribe(responder, msg.clone());
270 }
271 }
272
273 let deque = self
275 .deques
276 .entry(peer)
277 .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));
278
279 if let Some(i) = deque.iter().position(|d| *d == digest) {
281 if i != 0 {
282 deque.remove(i).unwrap(); deque.push_front(digest);
284 }
285 return false;
286 };
287
288 deque.push_front(digest);
292 let count = self
293 .counts
294 .entry(digest)
295 .and_modify(|c| *c = c.checked_add(1).unwrap())
296 .or_insert(1);
297 if *count == 1 {
298 let existing = self.items.insert(digest, msg);
299 assert!(existing.is_none());
300 }
301
302 if deque.len() > self.deque_size {
304 let stale = deque.pop_back().unwrap();
308 let count = self
309 .counts
310 .entry(stale)
311 .and_modify(|c| *c = c.checked_sub(1).unwrap())
312 .or_insert_with(|| unreachable!());
313 if *count == 0 {
314 let existing = self.counts.remove(&stale);
315 assert!(existing == Some(0));
316 self.items.remove(&stale).unwrap(); }
318 }
319
320 true
321 }
322
323 fn cleanup_waiters(&mut self) {
329 self.waiters.retain(|_, waiters| {
330 let initial_len = waiters.len();
331 waiters.retain(|waiter| !waiter.is_canceled());
332 let dropped_count = initial_len - waiters.len();
333
334 for _ in 0..dropped_count {
336 self.metrics.get.inc(Status::Dropped);
337 }
338
339 !waiters.is_empty()
340 });
341 }
342
343 fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
346 let result = responder.send(msg);
347 self.metrics.subscribe.inc(match result {
348 Ok(_) => Status::Success,
349 Err(_) => Status::Dropped,
350 });
351 }
352
353 fn respond_get(&mut self, responder: oneshot::Sender<Option<M>>, msg: Option<M>) {
356 let found = msg.is_some();
357 let result = responder.send(msg);
358 self.metrics.get.inc(match result {
359 Ok(_) if found => Status::Success,
360 Ok(_) => Status::Failure,
361 Err(_) => Status::Dropped,
362 });
363 }
364}