commonware_broadcast/buffered/
engine.rs

1use super::{metrics, Config, Mailbox, Message};
2use crate::buffered::metrics::SequencerLabel;
3use commonware_codec::Codec;
4use commonware_cryptography::{Committable, Digestible, PublicKey};
5use commonware_macros::select;
6use commonware_p2p::{
7    utils::codec::{wrap, WrappedSender},
8    Receiver, Recipients, Sender,
9};
10use commonware_runtime::{
11    spawn_cell,
12    telemetry::metrics::status::{CounterExt, GaugeExt, Status},
13    Clock, ContextCell, Handle, Metrics, Spawner,
14};
15use commonware_utils::channels::fallible::OneshotExt;
16use futures::{
17    channel::{mpsc, oneshot},
18    StreamExt,
19};
20use std::collections::{BTreeMap, VecDeque};
21use tracing::{debug, error, trace, warn};
22
/// A responder waiting for a message.
///
/// Created by a `subscribe` request when no cached message matches; fulfilled
/// (or discarded) later when a matching message is inserted into the cache.
struct Waiter<P, Dd, M> {
    /// The peer sending the message.
    ///
    /// `None` means a message from any peer is acceptable.
    peer: Option<P>,

    /// The digest of the message.
    ///
    /// `None` means any digest under the awaited commitment is acceptable.
    digest: Option<Dd>,

    /// The responder to send the message to.
    responder: oneshot::Sender<M>,
}
34
/// A pair of commitment and digest.
///
/// Used as the cache key for a single message: the commitment selects the
/// outer map entry and the digest selects the concrete message within it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct Pair<Dc, Dd> {
    /// The commitment of the message.
    commitment: Dc,

    /// The digest of the message.
    digest: Dd,
}
44
/// Instance of the main engine for the module.
///
/// It is responsible for:
/// - Broadcasting messages to the network
/// - Receiving messages from the network
/// - Storing messages in the cache
/// - Responding to requests from the application
pub struct Engine<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + Codec> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    // Runtime context, moved into the spawned task on `start`.
    context: ContextCell<E>,

    ////////////////////////////////////////
    // Configuration
    ////////////////////////////////////////
    /// My public key
    public_key: P,

    /// Whether messages are sent as priority
    priority: bool,

    /// Number of messages to cache per peer
    deque_size: usize,

    /// Configuration for decoding messages
    codec_config: M::Cfg,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////
    /// The mailbox for receiving messages.
    mailbox_receiver: mpsc::Receiver<Message<P, M>>,

    /// Pending requests from the application.
    ///
    /// Keyed by commitment; each entry holds every outstanding subscriber
    /// for that commitment (possibly with additional peer/digest filters).
    #[allow(clippy::type_complexity)]
    waiters: BTreeMap<M::Commitment, Vec<Waiter<P, M::Digest, M>>>,

    ////////////////////////////////////////
    // Cache
    ////////////////////////////////////////
    /// All cached messages by commitment and digest.
    ///
    /// We store messages outside of the deques to minimize memory usage
    /// when receiving duplicate messages.
    items: BTreeMap<M::Commitment, BTreeMap<M::Digest, M>>,

    /// A LRU cache of the latest received identities and digests from each peer.
    ///
    /// This is used to limit the number of digests stored per peer.
    /// At most `deque_size` digests are stored per peer. This value is expected to be small, so
    /// membership checks are done in linear time.
    #[allow(clippy::type_complexity)]
    deques: BTreeMap<P, VecDeque<Pair<M::Commitment, M::Digest>>>,

    /// The number of times each digest (globally unique) exists in one of the deques.
    ///
    /// Multiple peers can send the same message and we only want to store
    /// the message once.
    counts: BTreeMap<M::Digest, usize>,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////
    /// Metrics
    metrics: metrics::Metrics,
}
112
113impl<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + Codec>
114    Engine<E, P, M>
115{
116    /// Creates a new engine with the given context and configuration.
117    /// Returns the engine and a mailbox for sending messages to the engine.
118    pub fn new(context: E, cfg: Config<P, M::Cfg>) -> (Self, Mailbox<P, M>) {
119        let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
120        let mailbox = Mailbox::<P, M>::new(mailbox_sender);
121
122        // TODO(#1833): Metrics should use the post-start context
123        let metrics = metrics::Metrics::init(context.clone());
124
125        let result = Self {
126            context: ContextCell::new(context),
127            public_key: cfg.public_key,
128            priority: cfg.priority,
129            deque_size: cfg.deque_size,
130            codec_config: cfg.codec_config,
131            mailbox_receiver,
132            waiters: BTreeMap::new(),
133            deques: BTreeMap::new(),
134            items: BTreeMap::new(),
135            counts: BTreeMap::new(),
136            metrics,
137        };
138
139        (result, mailbox)
140    }
141
142    /// Starts the engine with the given network.
143    pub fn start(
144        mut self,
145        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
146    ) -> Handle<()> {
147        spawn_cell!(self.context, self.run(network).await)
148    }
149
150    /// Inner run loop called by `start`.
151    async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
152        let (mut sender, mut receiver) = wrap(self.codec_config.clone(), network.0, network.1);
153        let mut shutdown = self.context.stopped();
154
155        loop {
156            // Cleanup waiters
157            self.cleanup_waiters();
158            let _ = self.metrics.waiters.try_set(self.waiters.len());
159
160            select! {
161                // Handle shutdown signal
162                _ = &mut shutdown => {
163                    debug!("shutdown");
164                    break;
165                },
166
167                // Handle mailbox messages
168                mail = self.mailbox_receiver.next() => {
169                    let Some(msg) = mail else {
170                        error!("mailbox receiver failed");
171                        break;
172                    };
173                    match msg {
174                        Message::Broadcast{ recipients, message, responder } => {
175                            trace!("mailbox: broadcast");
176                            self.handle_broadcast(&mut sender, recipients, message, responder).await;
177                        }
178                        Message::Subscribe{ peer, commitment, digest, responder } => {
179                            trace!("mailbox: subscribe");
180                            self.handle_subscribe(peer, commitment, digest, responder).await;
181                        }
182                        Message::Get{ peer, commitment, digest, responder } => {
183                            trace!("mailbox: get");
184                            self.handle_get(peer, commitment, digest, responder).await;
185                        }
186                    }
187                },
188
189                // Handle incoming messages
190                msg = receiver.recv() => {
191                    // Error handling
192                    let (peer, msg) = match msg {
193                        Ok(r) => r,
194                        Err(err) => {
195                            error!(?err, "receiver failed");
196                            break;
197                        }
198                    };
199
200                    // Decode the message
201                    let msg = match msg {
202                        Ok(msg) => msg,
203                        Err(err) => {
204                            warn!(?err, ?peer, "failed to decode message");
205                            self.metrics.receive.inc(Status::Invalid);
206                            continue;
207                        }
208                    };
209
210                    trace!(?peer, "network");
211                    self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
212                    self.handle_network(peer, msg).await;
213                },
214            }
215        }
216    }
217
218    ////////////////////////////////////////
219    // Handling
220    ////////////////////////////////////////
221
222    /// Handles a `broadcast` request from the application.
223    async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
224        &mut self,
225        sender: &mut WrappedSender<Sr, M>,
226        recipients: Recipients<P>,
227        msg: M,
228        responder: oneshot::Sender<Vec<P>>,
229    ) {
230        // Store the message, continue even if it was already stored
231        let _ = self.insert_message(self.public_key.clone(), msg.clone());
232
233        // Broadcast the message to the network
234        let sent_to = sender
235            .send(recipients, msg, self.priority)
236            .await
237            .unwrap_or_else(|err| {
238                error!(?err, "failed to send message");
239                vec![]
240            });
241        responder.send_lossy(sent_to);
242    }
243
244    /// Searches through all maintained messages for a match.
245    fn find_messages(
246        &mut self,
247        peer: &Option<P>,
248        commitment: M::Commitment,
249        digest: Option<M::Digest>,
250        all: bool,
251    ) -> Vec<M> {
252        match peer {
253            // Only consider messages from the peer filter
254            Some(s) => self
255                .deques
256                .get(s)
257                .into_iter()
258                .flat_map(|dq| dq.iter())
259                .filter(|pair| pair.commitment == commitment)
260                .filter_map(|pair| {
261                    self.items
262                        .get(&pair.commitment)
263                        .and_then(|m| m.get(&pair.digest))
264                })
265                .cloned()
266                .collect(),
267
268            // Search by commitment
269            None => self.items.get(&commitment).map_or_else(
270                // If there are no messages for the commitment, return an empty vector
271                Vec::new,
272                |msgs| match digest {
273                    // If a digest is provided, return whatever matches it.
274                    Some(dg) => msgs.get(&dg).cloned().into_iter().collect(),
275
276                    // If no digest was provided, return `all` messages for the commitment.
277                    None if all => msgs.values().cloned().collect(),
278                    None => msgs.values().next().cloned().into_iter().collect(),
279                },
280            ),
281        }
282    }
283
284    /// Handles a `subscribe` request from the application.
285    ///
286    /// If the message is already in the cache, the responder is immediately sent the message.
287    /// Otherwise, the responder is stored in the waiters list.
288    async fn handle_subscribe(
289        &mut self,
290        peer: Option<P>,
291        commitment: M::Commitment,
292        digest: Option<M::Digest>,
293        responder: oneshot::Sender<M>,
294    ) {
295        // Check if the message is already in the cache
296        let mut items = self.find_messages(&peer, commitment, digest, false);
297        if let Some(item) = items.pop() {
298            self.respond_subscribe(responder, item);
299            return;
300        }
301
302        // Store the responder
303        self.waiters.entry(commitment).or_default().push(Waiter {
304            peer,
305            digest,
306            responder,
307        });
308    }
309
310    /// Handles a `get` request from the application.
311    async fn handle_get(
312        &mut self,
313        peer: Option<P>,
314        commitment: M::Commitment,
315        digest: Option<M::Digest>,
316        responder: oneshot::Sender<Vec<M>>,
317    ) {
318        let items = self.find_messages(&peer, commitment, digest, true);
319        self.respond_get(responder, items);
320    }
321
322    /// Handles a message that was received from a peer.
323    async fn handle_network(&mut self, peer: P, msg: M) {
324        if !self.insert_message(peer.clone(), msg) {
325            debug!(?peer, "message already stored");
326            self.metrics.receive.inc(Status::Dropped);
327            return;
328        }
329
330        self.metrics.receive.inc(Status::Success);
331    }
332
333    ////////////////////////////////////////
334    // Cache Management
335    ////////////////////////////////////////
336
337    /// Inserts a message into the cache.
338    ///
339    /// Returns `true` if the message was inserted, `false` if it was already present.
340    /// Updates the deque, item count, and message cache, potentially evicting an old message.
341    fn insert_message(&mut self, peer: P, msg: M) -> bool {
342        // Get the commitment and digest of the message
343        let pair = Pair {
344            commitment: msg.commitment(),
345            digest: msg.digest(),
346        };
347
348        // Send the message to the waiters, if any, ignoring errors (as the receiver may have dropped)
349        if let Some(mut waiters) = self.waiters.remove(&pair.commitment) {
350            let mut i = 0;
351            while i < waiters.len() {
352                // Get the peer and digest filters
353                let Waiter {
354                    peer: peer_filter,
355                    digest: digest_filter,
356                    responder: _,
357                } = &waiters[i];
358
359                // Keep the waiter if either filter does not match
360                if peer_filter.as_ref().is_some_and(|s| s != &peer)
361                    || digest_filter.is_some_and(|d| d != pair.digest)
362                {
363                    i += 1;
364                    continue;
365                }
366
367                // Filters match, so fulfill the subscription and drop the entry.
368                //
369                // The index `i` is intentionally not incremented here to check
370                // the element that was swapped into position `i`.
371                let responder = waiters.swap_remove(i).responder;
372                self.respond_subscribe(responder, msg.clone());
373            }
374
375            // Re-insert if any waiters remain for this commitment
376            if !waiters.is_empty() {
377                self.waiters.insert(pair.commitment, waiters);
378            }
379        }
380
381        // Get the relevant deque for the peer
382        let deque = self
383            .deques
384            .entry(peer)
385            .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));
386
387        // If the message is already in the deque, move it to the front and return early
388        if let Some(i) = deque.iter().position(|d| *d == pair) {
389            if i != 0 {
390                let v = deque.remove(i).unwrap(); // Must exist
391                deque.push_front(v);
392            }
393            return false;
394        };
395
396        // - Insert the message into the peer cache
397        // - Increment the item count
398        // - Insert the message if-and-only-if the new item count is 1
399        deque.push_front(pair);
400        let count = self
401            .counts
402            .entry(pair.digest)
403            .and_modify(|c| *c = c.checked_add(1).unwrap())
404            .or_insert(1);
405        if *count == 1 {
406            let existing = self
407                .items
408                .entry(pair.commitment)
409                .or_default()
410                .insert(pair.digest, msg);
411            assert!(existing.is_none());
412        }
413
414        // If the cache is full...
415        if deque.len() > self.deque_size {
416            // Remove the oldest item from the peer cache
417            // Decrement the item count
418            // Remove the message if-and-only-if the new item count is 0
419            let stale = deque.pop_back().unwrap();
420            let count = self
421                .counts
422                .entry(stale.digest)
423                .and_modify(|c| *c = c.checked_sub(1).unwrap())
424                .or_insert_with(|| unreachable!());
425            if *count == 0 {
426                let existing = self.counts.remove(&stale.digest);
427                assert!(existing == Some(0));
428                let identities = self.items.get_mut(&stale.commitment).unwrap();
429                identities.remove(&stale.digest); // Must have existed
430                if identities.is_empty() {
431                    self.items.remove(&stale.commitment);
432                }
433            }
434        }
435
436        true
437    }
438
439    ////////////////////////////////////////
440    // Utilities
441    ////////////////////////////////////////
442
443    /// Remove all waiters that have dropped receivers.
444    fn cleanup_waiters(&mut self) {
445        self.waiters.retain(|_, waiters| {
446            let initial_len = waiters.len();
447            waiters.retain(|waiter| !waiter.responder.is_canceled());
448            let dropped_count = initial_len - waiters.len();
449
450            // Increment metrics for each dropped waiter
451            for _ in 0..dropped_count {
452                self.metrics.get.inc(Status::Dropped);
453            }
454
455            !waiters.is_empty()
456        });
457    }
458
459    /// Respond to a waiter with a message.
460    /// Increments the appropriate metric based on the result.
461    fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
462        self.metrics.subscribe.inc(if responder.send_lossy(msg) {
463            Status::Success
464        } else {
465            Status::Dropped
466        });
467    }
468
469    /// Respond to a waiter with an optional message.
470    /// Increments the appropriate metric based on the result.
471    fn respond_get(&mut self, responder: oneshot::Sender<Vec<M>>, msg: Vec<M>) {
472        let found = !msg.is_empty();
473        self.metrics.get.inc(if responder.send_lossy(msg) {
474            if found {
475                Status::Success
476            } else {
477                Status::Failure
478            }
479        } else {
480            Status::Dropped
481        });
482    }
483}