commonware_broadcast/buffered/engine.rs

use super::{metrics, Config, Mailbox, Message};
use crate::buffered::metrics::SequencerLabel;
use commonware_codec::Codec;
use commonware_cryptography::{Committable, Digestible, PublicKey};
use commonware_macros::select_loop;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Receiver, Recipients, Sender,
};
use commonware_runtime::{
    spawn_cell,
    telemetry::metrics::status::{CounterExt, GaugeExt, Status},
    Clock, ContextCell, Handle, Metrics, Spawner,
};
use commonware_utils::channel::{fallible::OneshotExt, mpsc, oneshot};
use std::collections::{BTreeMap, VecDeque};
use tracing::{debug, error, trace, warn};

/// A responder waiting for a message.
struct Waiter<P, Dd, M> {
    /// The peer sending the message.
    peer: Option<P>,

    /// The digest of the message.
    digest: Option<Dd>,

    /// The responder to send the message to.
    responder: oneshot::Sender<M>,
}

/// A pair of commitment and digest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct Pair<Dc, Dd> {
    /// The commitment of the message.
    commitment: Dc,

    /// The digest of the message.
    digest: Dd,
}

/// Instance of the main engine for the module.
///
/// It is responsible for:
/// - Broadcasting messages to the network
/// - Receiving messages from the network
/// - Storing messages in the cache
/// - Responding to requests from the application
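///
/// # Example
///
/// A minimal sketch of wiring the engine up (assuming a runtime `context`,
/// a populated [`Config`] `cfg`, and a `(sender, receiver)` pair obtained
/// from `commonware_p2p`):
///
/// ```ignore
/// let (engine, mailbox) = Engine::new(context, cfg);
/// let handle = engine.start((sender, receiver));
/// // `mailbox` can now be used to send `Broadcast`, `Subscribe`, and
/// // `Get` requests to the running engine.
/// ```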
pub struct Engine<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + Codec> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    context: ContextCell<E>,

    ////////////////////////////////////////
    // Configuration
    ////////////////////////////////////////
    /// My public key
    public_key: P,

    /// Whether messages are sent as priority
    priority: bool,

    /// Number of messages to cache per peer
    deque_size: usize,

    /// Configuration for decoding messages
    codec_config: M::Cfg,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////
    /// The mailbox for receiving messages.
    mailbox_receiver: mpsc::Receiver<Message<P, M>>,

    /// Pending requests from the application.
    #[allow(clippy::type_complexity)]
    waiters: BTreeMap<M::Commitment, Vec<Waiter<P, M::Digest, M>>>,

    ////////////////////////////////////////
    // Cache
    ////////////////////////////////////////
    /// All cached messages by commitment and digest.
    ///
    /// We store messages outside of the deques to minimize memory usage
    /// when receiving duplicate messages.
    items: BTreeMap<M::Commitment, BTreeMap<M::Digest, M>>,

    /// An LRU cache of the latest received commitment/digest pairs from each peer.
    ///
    /// This limits the number of entries stored per peer: at most `deque_size`
    /// pairs are kept. This value is expected to be small, so membership
    /// checks are done in linear time.
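    ///
    /// For example, with `deque_size = 2`, a third distinct message from the
    /// same peer causes that peer's oldest entry to be evicted (see
    /// `insert_message`).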
    #[allow(clippy::type_complexity)]
    deques: BTreeMap<P, VecDeque<Pair<M::Commitment, M::Digest>>>,

    /// The number of times each digest (globally unique) appears across all deques.
    ///
    /// Multiple peers can send the same message and we only want to store
    /// the message once.
    counts: BTreeMap<M::Digest, usize>,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////
    /// Metrics
    metrics: metrics::Metrics,
}

impl<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + Codec>
    Engine<E, P, M>
{
    /// Creates a new engine with the given context and configuration.
    /// Returns the engine and a mailbox for sending messages to the engine.
    pub fn new(context: E, cfg: Config<P, M::Cfg>) -> (Self, Mailbox<P, M>) {
        let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
        let mailbox = Mailbox::<P, M>::new(mailbox_sender);

        // TODO(#1833): Metrics should use the post-start context
        let metrics = metrics::Metrics::init(context.clone());

        let result = Self {
            context: ContextCell::new(context),
            public_key: cfg.public_key,
            priority: cfg.priority,
            deque_size: cfg.deque_size,
            codec_config: cfg.codec_config,
            mailbox_receiver,
            waiters: BTreeMap::new(),
            deques: BTreeMap::new(),
            items: BTreeMap::new(),
            counts: BTreeMap::new(),
            metrics,
        };

        (result, mailbox)
    }

    /// Starts the engine with the given network.
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
        let (mut sender, mut receiver) = wrap(self.codec_config.clone(), network.0, network.1);

        select_loop! {
            self.context,
            on_start => {
                // Cleanup waiters
                self.cleanup_waiters();
                let _ = self.metrics.waiters.try_set(self.waiters.len());
            },
            on_stopped => {
                debug!("shutdown");
            },
            // Handle mailbox messages
            Some(msg) = self.mailbox_receiver.recv() else {
                error!("mailbox receiver failed");
                break;
            } => match msg {
                Message::Broadcast {
                    recipients,
                    message,
                    responder,
                } => {
                    trace!("mailbox: broadcast");
                    self.handle_broadcast(&mut sender, recipients, message, responder)
                        .await;
                }
                Message::Subscribe {
                    peer,
                    commitment,
                    digest,
                    responder,
                } => {
                    trace!("mailbox: subscribe");
                    self.handle_subscribe(peer, commitment, digest, responder)
                        .await;
                }
                Message::Get {
                    peer,
                    commitment,
                    digest,
                    responder,
                } => {
                    trace!("mailbox: get");
                    self.handle_get(peer, commitment, digest, responder).await;
                }
            },
            // Handle incoming messages
            msg = receiver.recv() => {
                // Error handling
                let (peer, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        error!(?err, "receiver failed");
                        break;
                    }
                };

                // Decode the message
                let msg = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        warn!(?err, ?peer, "failed to decode message");
                        self.metrics.receive.inc(Status::Invalid);
                        continue;
                    }
                };

                trace!(?peer, "network");
                self.metrics
                    .peer
                    .get_or_create(&SequencerLabel::from(&peer))
                    .inc();
                self.handle_network(peer, msg).await;
            },
        }
    }

    ////////////////////////////////////////
    // Handling
    ////////////////////////////////////////

    /// Handles a `broadcast` request from the application.
    async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
        &mut self,
        sender: &mut WrappedSender<Sr, M>,
        recipients: Recipients<P>,
        msg: M,
        responder: oneshot::Sender<Vec<P>>,
    ) {
        // Store the message, continuing even if it was already stored
        let _ = self.insert_message(self.public_key.clone(), msg.clone());

        // Broadcast the message to the network
        let sent_to = sender
            .send(recipients, msg, self.priority)
            .await
            .unwrap_or_else(|err| {
                error!(?err, "failed to send message");
                vec![]
            });
        responder.send_lossy(sent_to);
    }

    /// Searches the cache for messages matching the given filters.
    ///
    /// If `peer` is provided, only messages received from that peer are
    /// considered. If `digest` is provided, only messages with that digest
    /// are returned. When searching by commitment alone, `all` controls
    /// whether every matching message is returned or at most one.
    fn find_messages(
        &mut self,
        peer: &Option<P>,
        commitment: M::Commitment,
        digest: Option<M::Digest>,
        all: bool,
    ) -> Vec<M> {
        match peer {
            // Only consider messages from the peer filter
            Some(s) => self
                .deques
                .get(s)
                .into_iter()
                .flat_map(|dq| dq.iter())
                .filter(|pair| pair.commitment == commitment)
                // Apply the digest filter, if any
                .filter(|pair| digest.as_ref().map_or(true, |d| pair.digest == *d))
                .filter_map(|pair| {
                    self.items
                        .get(&pair.commitment)
                        .and_then(|m| m.get(&pair.digest))
                })
                .cloned()
                .collect(),

            // Search by commitment
            None => self.items.get(&commitment).map_or_else(
                // If there are no messages for the commitment, return an empty vector
                Vec::new,
                |msgs| match digest {
                    // If a digest is provided, return whatever matches it.
                    Some(dg) => msgs.get(&dg).cloned().into_iter().collect(),

                    // If no digest was provided, return all messages for the commitment...
                    None if all => msgs.values().cloned().collect(),

                    // ...or at most one if `all` is false.
                    None => msgs.values().next().cloned().into_iter().collect(),
                },
            ),
        }
    }

    /// Handles a `subscribe` request from the application.
    ///
    /// If the message is already in the cache, the responder is immediately sent the message.
    /// Otherwise, the responder is stored in the waiters list.
    async fn handle_subscribe(
        &mut self,
        peer: Option<P>,
        commitment: M::Commitment,
        digest: Option<M::Digest>,
        responder: oneshot::Sender<M>,
    ) {
        // Check if the message is already in the cache
        let mut items = self.find_messages(&peer, commitment, digest, false);
        if let Some(item) = items.pop() {
            self.respond_subscribe(responder, item);
            return;
        }

        // Store the responder
        self.waiters.entry(commitment).or_default().push(Waiter {
            peer,
            digest,
            responder,
        });
    }

    /// Handles a `get` request from the application.
    async fn handle_get(
        &mut self,
        peer: Option<P>,
        commitment: M::Commitment,
        digest: Option<M::Digest>,
        responder: oneshot::Sender<Vec<M>>,
    ) {
        let items = self.find_messages(&peer, commitment, digest, true);
        self.respond_get(responder, items);
    }

    /// Handles a message that was received from a peer.
    async fn handle_network(&mut self, peer: P, msg: M) {
        if !self.insert_message(peer.clone(), msg) {
            debug!(?peer, "message already stored");
            self.metrics.receive.inc(Status::Dropped);
            return;
        }

        self.metrics.receive.inc(Status::Success);
    }

    ////////////////////////////////////////
    // Cache Management
    ////////////////////////////////////////

    /// Inserts a message into the cache.
    ///
    /// Returns `true` if the message was inserted, `false` if it was already present.
    /// Updates the deque, item count, and message cache, potentially evicting an old message.
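    ///
    /// Messages are reference-counted across deques: if peers `A` and `B`
    /// each deliver the same message, the body is stored once in `items`
    /// while `counts` records two references, and the body is only removed
    /// once both deques have evicted their entry for it.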
    fn insert_message(&mut self, peer: P, msg: M) -> bool {
        // Get the commitment and digest of the message
        let pair = Pair {
            commitment: msg.commitment(),
            digest: msg.digest(),
        };

        // Send the message to the waiters, if any, ignoring errors (as the receiver may have dropped)
        if let Some(mut waiters) = self.waiters.remove(&pair.commitment) {
            let mut i = 0;
            while i < waiters.len() {
                // Get the peer and digest filters
                let Waiter {
                    peer: peer_filter,
                    digest: digest_filter,
                    responder: _,
                } = &waiters[i];

                // Keep the waiter if either filter does not match
                if peer_filter.as_ref().is_some_and(|s| s != &peer)
                    || digest_filter.is_some_and(|d| d != pair.digest)
                {
                    i += 1;
                    continue;
                }

                // Filters match, so fulfill the subscription and drop the entry.
                //
                // The index `i` is intentionally not incremented here to check
                // the element that was swapped into position `i`.
                let responder = waiters.swap_remove(i).responder;
                self.respond_subscribe(responder, msg.clone());
            }

            // Re-insert if any waiters remain for this commitment
            if !waiters.is_empty() {
                self.waiters.insert(pair.commitment, waiters);
            }
        }

        // Get the relevant deque for the peer
        let deque = self
            .deques
            .entry(peer)
            .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));

        // If the message is already in the deque, move it to the front and return early
        if let Some(i) = deque.iter().position(|d| *d == pair) {
            if i != 0 {
                let v = deque.remove(i).unwrap(); // Must exist
                deque.push_front(v);
            }
            return false;
        }

        // - Insert the message into the peer cache
        // - Increment the item count
        // - Insert the message if-and-only-if the new item count is 1
        deque.push_front(pair);
        let count = self
            .counts
            .entry(pair.digest)
            .and_modify(|c| *c = c.checked_add(1).unwrap())
            .or_insert(1);
        if *count == 1 {
            let existing = self
                .items
                .entry(pair.commitment)
                .or_default()
                .insert(pair.digest, msg);
            assert!(existing.is_none());
        }

        // If the cache is full...
        if deque.len() > self.deque_size {
            // - Remove the oldest item from the peer cache
            // - Decrement the item count
            // - Remove the message if-and-only-if the new item count is 0
            let stale = deque.pop_back().unwrap();
            let count = self
                .counts
                .entry(stale.digest)
                .and_modify(|c| *c = c.checked_sub(1).unwrap())
                .or_insert_with(|| unreachable!());
            if *count == 0 {
                let existing = self.counts.remove(&stale.digest);
                assert!(existing == Some(0));
                let identities = self.items.get_mut(&stale.commitment).unwrap();
                identities.remove(&stale.digest); // Must have existed
                if identities.is_empty() {
                    self.items.remove(&stale.commitment);
                }
            }
        }

        true
    }

    ////////////////////////////////////////
    // Utilities
    ////////////////////////////////////////

    /// Remove all waiters that have dropped receivers.
    fn cleanup_waiters(&mut self) {
        self.waiters.retain(|_, waiters| {
            let initial_len = waiters.len();
            waiters.retain(|waiter| !waiter.responder.is_closed());
            let dropped_count = initial_len - waiters.len();

            // Increment the subscribe metric for each dropped waiter
            // (waiters are only created by `handle_subscribe`)
            for _ in 0..dropped_count {
                self.metrics.subscribe.inc(Status::Dropped);
            }

            !waiters.is_empty()
        });
    }

    /// Respond to a waiter with a message.
    /// Increments the appropriate metric based on the result.
    fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
        self.metrics.subscribe.inc(if responder.send_lossy(msg) {
            Status::Success
        } else {
            Status::Dropped
        });
    }

    /// Respond to a `get` request with the matching messages.
    /// Increments the appropriate metric based on the result.
    fn respond_get(&mut self, responder: oneshot::Sender<Vec<M>>, msg: Vec<M>) {
        let found = !msg.is_empty();
        self.metrics.get.inc(if responder.send_lossy(msg) {
            if found {
                Status::Success
            } else {
                Status::Failure
            }
        } else {
            Status::Dropped
        });
    }
}