// commonware_broadcast/buffered/engine.rs
1use super::{metrics, Config, Mailbox, Message};
2use crate::buffered::metrics::SequencerLabel;
3use commonware_codec::Codec;
4use commonware_cryptography::{Committable, Digest, Digestible};
5use commonware_macros::select;
6use commonware_p2p::{
7 utils::codec::{wrap, WrappedSender},
8 Receiver, Recipients, Sender,
9};
10use commonware_runtime::{
11 telemetry::metrics::status::{CounterExt, Status},
12 Clock, Handle, Metrics, Spawner,
13};
14use commonware_utils::Array;
15use futures::{
16 channel::{mpsc, oneshot},
17 StreamExt,
18};
19use std::collections::{HashMap, VecDeque};
20use tracing::{debug, error, trace, warn};
21
/// A pending subscription: a responder parked until a message matching the
/// stored filters arrives (or the cache already holds one).
struct Waiter<P, Dd, M> {
    /// If set, only accept the message when it was received from this peer.
    peer: Option<P>,

    /// If set, only accept the message with exactly this digest.
    digest: Option<Dd>,

    /// The responder to send the matching message to.
    responder: oneshot::Sender<M>,
}
33
/// A (commitment, digest) pair that identifies a cached message.
///
/// Used as the per-peer deque entry so eviction can locate the message in
/// both the `items` map (keyed by commitment) and the `counts` map (keyed by
/// digest).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct Pair<Dc, Dd> {
    /// The commitment of the message.
    commitment: Dc,

    /// The digest of the message.
    digest: Dd,
}
43
/// Instance of the main engine for the module.
///
/// It is responsible for:
/// - Broadcasting messages to the network
/// - Receiving messages from the network
/// - Storing messages in the cache
/// - Responding to requests from the application
pub struct Engine<
    E: Clock + Spawner + Metrics,
    P: Array,
    Dc: Digest,
    Dd: Digest,
    M: Committable<Dc> + Digestible<Dd> + Codec,
> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    /// Runtime context used for spawning the run loop and shutdown signaling.
    context: E,

    ////////////////////////////////////////
    // Configuration
    ////////////////////////////////////////
    /// My public key (used to cache our own broadcasts under our identity)
    public_key: P,

    /// Whether messages are sent as priority
    priority: bool,

    /// Number of messages to cache per peer
    deque_size: usize,

    /// Configuration for decoding messages
    codec_config: M::Cfg,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////
    /// The mailbox for receiving messages.
    mailbox_receiver: mpsc::Receiver<Message<P, Dc, Dd, M>>,

    /// Pending requests from the application, keyed by commitment.
    waiters: HashMap<Dc, Vec<Waiter<P, Dd, M>>>,

    ////////////////////////////////////////
    // Cache
    ////////////////////////////////////////
    /// All cached messages by commitment and digest.
    ///
    /// We store messages outside of the deques to minimize memory usage
    /// when receiving duplicate messages.
    items: HashMap<Dc, HashMap<Dd, M>>,

    /// A LRU cache of the latest received identities and digests from each peer.
    ///
    /// This is used to limit the number of digests stored per peer.
    /// At most `deque_size` digests are stored per peer. This value is expected to be small, so
    /// membership checks are done in linear time.
    deques: HashMap<P, VecDeque<Pair<Dc, Dd>>>,

    /// The number of times each digest (globally unique) exists in one of the deques.
    ///
    /// Multiple peers can send the same message and we only want to store
    /// the message once; a message is dropped from `items` when its count
    /// returns to zero.
    counts: HashMap<Dd, usize>,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////
    /// Metrics
    metrics: metrics::Metrics,
}
115
116impl<
117 E: Clock + Spawner + Metrics,
118 P: Array,
119 Dc: Digest,
120 Dd: Digest,
121 M: Committable<Dc> + Digestible<Dd> + Codec,
122 > Engine<E, P, Dc, Dd, M>
123{
124 /// Creates a new engine with the given context and configuration.
125 /// Returns the engine and a mailbox for sending messages to the engine.
126 pub fn new(context: E, cfg: Config<P, M::Cfg>) -> (Self, Mailbox<P, Dc, Dd, M>) {
127 let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
128 let mailbox = Mailbox::<P, Dc, Dd, M>::new(mailbox_sender);
129 let metrics = metrics::Metrics::init(context.clone());
130
131 let result = Self {
132 context,
133 public_key: cfg.public_key,
134 priority: cfg.priority,
135 deque_size: cfg.deque_size,
136 codec_config: cfg.codec_config,
137 mailbox_receiver,
138 waiters: HashMap::new(),
139 deques: HashMap::new(),
140 items: HashMap::new(),
141 counts: HashMap::new(),
142 metrics,
143 };
144
145 (result, mailbox)
146 }
147
    /// Starts the engine with the given network.
    ///
    /// Consumes the engine and spawns its run loop on the runtime, returning
    /// a handle that resolves when the loop exits.
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        self.context.spawn_ref()(self.run(network))
    }
155
156 /// Inner run loop called by `start`.
157 async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
158 let (mut sender, mut receiver) = wrap(self.codec_config.clone(), network.0, network.1);
159 let mut shutdown = self.context.stopped();
160
161 loop {
162 // Cleanup waiters
163 self.cleanup_waiters();
164 self.metrics.waiters.set(self.waiters.len() as i64);
165
166 select! {
167 // Handle shutdown signal
168 _ = &mut shutdown => {
169 debug!("shutdown");
170 },
171
172 // Handle mailbox messages
173 mail = self.mailbox_receiver.next() => {
174 let Some(msg) = mail else {
175 error!("mailbox receiver failed");
176 break;
177 };
178 match msg {
179 Message::Broadcast{ recipients, message, responder } => {
180 trace!("mailbox: broadcast");
181 self.handle_broadcast(&mut sender, recipients, message, responder).await;
182 }
183 Message::Subscribe{ peer, commitment, digest, responder } => {
184 trace!("mailbox: subscribe");
185 self.handle_subscribe(peer, commitment, digest, responder).await;
186 }
187 Message::Get{ peer, commitment, digest, responder } => {
188 trace!("mailbox: get");
189 self.handle_get(peer, commitment, digest, responder).await;
190 }
191 }
192 },
193
194 // Handle incoming messages
195 msg = receiver.recv() => {
196 // Error handling
197 let (peer, msg) = match msg {
198 Ok(r) => r,
199 Err(err) => {
200 error!(?err, "receiver failed");
201 break;
202 }
203 };
204
205 // Decode the message
206 let msg = match msg {
207 Ok(msg) => msg,
208 Err(err) => {
209 warn!(?err, ?peer, "failed to decode message");
210 self.metrics.receive.inc(Status::Invalid);
211 continue;
212 }
213 };
214
215 trace!(?peer, "network");
216 self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
217 self.handle_network(peer, msg).await;
218 },
219 }
220 }
221 }
222
223 ////////////////////////////////////////
224 // Handling
225 ////////////////////////////////////////
226
227 /// Handles a `broadcast` request from the application.
228 async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
229 &mut self,
230 sender: &mut WrappedSender<Sr, M>,
231 recipients: Recipients<P>,
232 msg: M,
233 responder: oneshot::Sender<Vec<P>>,
234 ) {
235 // Store the message, continue even if it was already stored
236 let _ = self.insert_message(self.public_key.clone(), msg.clone());
237
238 // Broadcast the message to the network
239 let sent_to = sender
240 .send(recipients, msg, self.priority)
241 .await
242 .unwrap_or_else(|err| {
243 error!(?err, "failed to send message");
244 vec![]
245 });
246 let _ = responder.send(sent_to);
247 }
248
249 /// Searches through all maintained messages for a match.
250 fn find_messages(
251 &mut self,
252 peer: &Option<P>,
253 commitment: Dc,
254 digest: Option<Dd>,
255 all: bool,
256 ) -> Vec<M> {
257 match peer {
258 // Only consider messages from the peer filter
259 Some(s) => self
260 .deques
261 .get(s)
262 .into_iter()
263 .flat_map(|dq| dq.iter())
264 .filter(|pair| pair.commitment == commitment)
265 .filter_map(|pair| {
266 self.items
267 .get(&pair.commitment)
268 .and_then(|m| m.get(&pair.digest))
269 })
270 .cloned()
271 .collect(),
272
273 // Search by commitment
274 None => match self.items.get(&commitment) {
275 // If there are no messages for the commitment, return an empty vector
276 None => Vec::new(),
277
278 // If there are messages, return the ones that match the digest filter
279 Some(msgs) => match digest {
280 // If a digest is provided, return whatever matches it.
281 Some(dg) => msgs.get(&dg).cloned().into_iter().collect(),
282
283 // If no digest was provided, return `all` messages for the commitment.
284 None if all => msgs.values().cloned().collect(),
285 None => msgs.values().next().cloned().into_iter().collect(),
286 },
287 },
288 }
289 }
290
291 /// Handles a `subscribe` request from the application.
292 ///
293 /// If the message is already in the cache, the responder is immediately sent the message.
294 /// Otherwise, the responder is stored in the waiters list.
295 async fn handle_subscribe(
296 &mut self,
297 peer: Option<P>,
298 commitment: Dc,
299 digest: Option<Dd>,
300 responder: oneshot::Sender<M>,
301 ) {
302 // Check if the message is already in the cache
303 let mut items = self.find_messages(&peer, commitment, digest, false);
304 if let Some(item) = items.pop() {
305 self.respond_subscribe(responder, item);
306 return;
307 }
308
309 // Store the responder
310 self.waiters.entry(commitment).or_default().push(Waiter {
311 peer,
312 digest,
313 responder,
314 });
315 }
316
317 /// Handles a `get` request from the application.
318 async fn handle_get(
319 &mut self,
320 peer: Option<P>,
321 commitment: Dc,
322 digest: Option<Dd>,
323 responder: oneshot::Sender<Vec<M>>,
324 ) {
325 let items = self.find_messages(&peer, commitment, digest, true);
326 self.respond_get(responder, items);
327 }
328
329 /// Handles a message that was received from a peer.
330 async fn handle_network(&mut self, peer: P, msg: M) {
331 if !self.insert_message(peer.clone(), msg) {
332 debug!(?peer, "message already stored");
333 self.metrics.receive.inc(Status::Dropped);
334 return;
335 }
336
337 self.metrics.receive.inc(Status::Success);
338 }
339
340 ////////////////////////////////////////
341 // Cache Management
342 ////////////////////////////////////////
343
    /// Inserts a message into the cache.
    ///
    /// Returns `true` if the message was inserted, `false` if it was already present.
    /// Updates the deque, item count, and message cache, potentially evicting an old message.
    /// Also fulfills any waiters whose filters match the message.
    fn insert_message(&mut self, peer: P, msg: M) -> bool {
        // Identify the message by its (commitment, digest) pair
        let pair = Pair {
            commitment: msg.commitment(),
            digest: msg.digest(),
        };

        // Send the message to the waiters, if any, ignoring errors (as the receiver may have dropped)
        if let Some(mut waiters) = self.waiters.remove(&pair.commitment) {
            let mut i = 0;
            while i < waiters.len() {
                // Get the peer and digest filters
                let Waiter {
                    peer: peer_filter,
                    digest: digest_filter,
                    responder: _,
                } = &waiters[i];

                // Keep the waiter if either filter does not match
                if peer_filter.as_ref().is_some_and(|s| s != &peer)
                    || digest_filter.is_some_and(|d| d != pair.digest)
                {
                    i += 1;
                    continue;
                }

                // Filters match, so fulfill the subscription and drop the entry.
                //
                // The index `i` is intentionally not incremented here to check
                // the element that was swapped into position `i`.
                let responder = waiters.swap_remove(i).responder;
                self.respond_subscribe(responder, msg.clone());
            }

            // Re-insert if any waiters remain for this commitment
            if !waiters.is_empty() {
                self.waiters.insert(pair.commitment, waiters);
            }
        }

        // Get the relevant deque for the peer.
        // Capacity is `deque_size + 1` because a push may briefly exceed
        // `deque_size` before the eviction below runs.
        let deque = self
            .deques
            .entry(peer)
            .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));

        // If the message is already in the deque, move it to the front
        // (LRU refresh) and return early: nothing new was stored.
        if let Some(i) = deque.iter().position(|d| *d == pair) {
            if i != 0 {
                let v = deque.remove(i).unwrap(); // Must exist
                deque.push_front(v);
            }
            return false;
        };

        // - Insert the message into the peer cache
        // - Increment the item count
        // - Insert the message if-and-only-if the new item count is 1
        //   (other peers may already have contributed the same digest)
        deque.push_front(pair);
        let count = self
            .counts
            .entry(pair.digest)
            .and_modify(|c| *c = c.checked_add(1).unwrap())
            .or_insert(1);
        if *count == 1 {
            let existing = self
                .items
                .entry(pair.commitment)
                .or_default()
                .insert(pair.digest, msg);
            assert!(existing.is_none());
        }

        // If the cache is full...
        if deque.len() > self.deque_size {
            // Remove the oldest item from the peer cache
            // Decrement the item count
            // Remove the message if-and-only-if the new item count is 0
            let stale = deque.pop_back().unwrap();
            let count = self
                .counts
                .entry(stale.digest)
                .and_modify(|c| *c = c.checked_sub(1).unwrap())
                .or_insert_with(|| unreachable!());
            if *count == 0 {
                // No deque references the digest anymore: drop the count
                // entry and the message itself.
                let existing = self.counts.remove(&stale.digest);
                assert!(existing == Some(0));
                let identities = self.items.get_mut(&stale.commitment).unwrap();
                identities.remove(&stale.digest); // Must have existed
                if identities.is_empty() {
                    self.items.remove(&stale.commitment);
                }
            }
        }

        true
    }
445
446 ////////////////////////////////////////
447 // Utilities
448 ////////////////////////////////////////
449
450 /// Remove all waiters that have dropped receivers.
451 fn cleanup_waiters(&mut self) {
452 self.waiters.retain(|_, waiters| {
453 let initial_len = waiters.len();
454 waiters.retain(|waiter| !waiter.responder.is_canceled());
455 let dropped_count = initial_len - waiters.len();
456
457 // Increment metrics for each dropped waiter
458 for _ in 0..dropped_count {
459 self.metrics.get.inc(Status::Dropped);
460 }
461
462 !waiters.is_empty()
463 });
464 }
465
466 /// Respond to a waiter with a message.
467 /// Increments the appropriate metric based on the result.
468 fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
469 let result = responder.send(msg);
470 self.metrics.subscribe.inc(match result {
471 Ok(_) => Status::Success,
472 Err(_) => Status::Dropped,
473 });
474 }
475
476 /// Respond to a waiter with an optional message.
477 /// Increments the appropriate metric based on the result.
478 fn respond_get(&mut self, responder: oneshot::Sender<Vec<M>>, msg: Vec<M>) {
479 let found = !msg.is_empty();
480 let result = responder.send(msg);
481 self.metrics.get.inc(match result {
482 Ok(_) if found => Status::Success,
483 Ok(_) => Status::Failure,
484 Err(_) => Status::Dropped,
485 });
486 }
487}