commonware_broadcast/buffered/engine.rs

use super::{metrics, Config, Mailbox, Message};
use crate::buffered::metrics::SequencerLabel;
use commonware_codec::Codec;
use commonware_cryptography::{Committable, Digest, Digestible};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Receiver, Recipients, Sender,
};
use commonware_runtime::{
    telemetry::metrics::status::{CounterExt, Status},
    Clock, Handle, Metrics, Spawner,
};
use commonware_utils::Array;
use futures::{
    channel::{mpsc, oneshot},
    StreamExt,
};
use std::collections::{HashMap, VecDeque};
use tracing::{debug, error, trace, warn};

/// A responder waiting for a message.
struct Waiter<P, Dd, M> {
    /// The peer sending the message.
    peer: Option<P>,

    /// The digest of the message.
    digest: Option<Dd>,

    /// The responder to send the message to.
    responder: oneshot::Sender<M>,
}

/// A pair of commitment and digest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct Pair<Dc, Dd> {
    /// The commitment of the message.
    commitment: Dc,

    /// The digest of the message.
    digest: Dd,
}

/// Instance of the main engine for the module.
///
/// It is responsible for:
/// - Broadcasting messages to the network
/// - Receiving messages from the network
/// - Storing messages in the cache
/// - Responding to requests from the application
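///
/// A minimal usage sketch (hypothetical names: `context`, `config`, `sender`,
/// and `receiver` are assumed to come from the application's runtime and p2p
/// setup):
///
/// ```ignore
/// let (engine, mailbox) = Engine::new(context, config);
/// let handle = engine.start((sender, receiver));
/// // The application talks to the engine through `mailbox` (broadcast,
/// // subscribe, and get requests); `handle` resolves when the engine stops.
/// ```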
pub struct Engine<
    E: Clock + Spawner + Metrics,
    P: Array,
    Dc: Digest,
    Dd: Digest,
    M: Committable<Dc> + Digestible<Dd> + Codec,
> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    context: E,

    ////////////////////////////////////////
    // Configuration
    ////////////////////////////////////////
    /// My public key
    public_key: P,

    /// Whether messages are sent as priority
    priority: bool,

    /// Number of messages to cache per peer
    deque_size: usize,

    /// Configuration for decoding messages
    codec_config: M::Cfg,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////
    /// The mailbox for receiving messages.
    mailbox_receiver: mpsc::Receiver<Message<P, Dc, Dd, M>>,

    /// Pending requests from the application.
    waiters: HashMap<Dc, Vec<Waiter<P, Dd, M>>>,

    ////////////////////////////////////////
    // Cache
    ////////////////////////////////////////
    /// All cached messages by commitment and digest.
    ///
    /// We store messages outside of the deques to minimize memory usage
    /// when receiving duplicate messages.
    items: HashMap<Dc, HashMap<Dd, M>>,

    /// An LRU cache of the most recently received commitment/digest pairs from each peer.
    ///
    /// This is used to limit the number of digests stored per peer.
    /// At most `deque_size` pairs are stored per peer. This value is expected to be small, so
    /// membership checks are done in linear time.
    deques: HashMap<P, VecDeque<Pair<Dc, Dd>>>,

    /// The number of times each digest (globally unique) exists in one of the deques.
    ///
    /// Multiple peers can send the same message and we only want to store
    /// the message once.
    counts: HashMap<Dd, usize>,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////
    /// Metrics
    metrics: metrics::Metrics,
}

impl<
        E: Clock + Spawner + Metrics,
        P: Array,
        Dc: Digest,
        Dd: Digest,
        M: Committable<Dc> + Digestible<Dd> + Codec,
    > Engine<E, P, Dc, Dd, M>
{
    /// Creates a new engine with the given context and configuration.
    /// Returns the engine and a mailbox for sending messages to the engine.
    pub fn new(context: E, cfg: Config<P, M::Cfg>) -> (Self, Mailbox<P, Dc, Dd, M>) {
        let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
        let mailbox = Mailbox::<P, Dc, Dd, M>::new(mailbox_sender);
        let metrics = metrics::Metrics::init(context.clone());

        let result = Self {
            context,
            public_key: cfg.public_key,
            priority: cfg.priority,
            deque_size: cfg.deque_size,
            codec_config: cfg.codec_config,
            mailbox_receiver,
            waiters: HashMap::new(),
            deques: HashMap::new(),
            items: HashMap::new(),
            counts: HashMap::new(),
            metrics,
        };

        (result, mailbox)
    }

    /// Starts the engine with the given network.
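    ///
    /// Spawns the engine's run loop on the runtime and returns a [`Handle`] to
    /// the spawned task.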
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        self.context.spawn_ref()(self.run(network))
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
        let (mut sender, mut receiver) = wrap(self.codec_config.clone(), network.0, network.1);
        let mut shutdown = self.context.stopped();

        loop {
            // Cleanup waiters
            self.cleanup_waiters();
            self.metrics.waiters.set(self.waiters.len() as i64);

            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    return;
                },

                // Handle mailbox messages
                mail = self.mailbox_receiver.next() => {
                    let Some(msg) = mail else {
                        error!("mailbox receiver failed");
                        break;
                    };
                    match msg {
                        Message::Broadcast{ recipients, message, responder } => {
                            trace!("mailbox: broadcast");
                            self.handle_broadcast(&mut sender, recipients, message, responder).await;
                        }
                        Message::Subscribe{ peer, commitment, digest, responder } => {
                            trace!("mailbox: subscribe");
                            self.handle_subscribe(peer, commitment, digest, responder).await;
                        }
                        Message::Get{ peer, commitment, digest, responder } => {
                            trace!("mailbox: get");
                            self.handle_get(peer, commitment, digest, responder).await;
                        }
                    }
                },

                // Handle incoming messages
                msg = receiver.recv() => {
                    // Error handling
                    let (peer, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            error!(?err, "receiver failed");
                            break;
                        }
                    };

                    // Decode the message
                    let msg = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            warn!(?err, ?peer, "failed to decode message");
                            self.metrics.receive.inc(Status::Invalid);
                            continue;
                        }
                    };

                    trace!(?peer, "network");
                    self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
                    self.handle_network(peer, msg).await;
                },
            }
        }
    }

    ////////////////////////////////////////
    // Handling
    ////////////////////////////////////////

    /// Handles a `broadcast` request from the application.
    async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
        &mut self,
        sender: &mut WrappedSender<Sr, M>,
        recipients: Recipients<P>,
        msg: M,
        responder: oneshot::Sender<Vec<P>>,
    ) {
        // Store the message, continue even if it was already stored
        let _ = self.insert_message(self.public_key.clone(), msg.clone());

        // Broadcast the message to the network
        let sent_to = sender
            .send(recipients, msg, self.priority)
            .await
            .unwrap_or_else(|err| {
                error!(?err, "failed to send message");
                vec![]
            });
        let _ = responder.send(sent_to);
    }

    /// Searches through all maintained messages for a match.
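    ///
    /// If `peer` is provided, only messages recorded in that peer's deque are
    /// considered. `digest` optionally restricts the match to a single digest,
    /// and `all` controls whether every matching message or only one is
    /// returned when searching by commitment alone.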
    fn find_messages(
        &mut self,
        peer: &Option<P>,
        commitment: Dc,
        digest: Option<Dd>,
        all: bool,
    ) -> Vec<M> {
        match peer {
            // Only consider messages from the peer filter
            Some(s) => self
                .deques
                .get(s)
                .into_iter()
                .flat_map(|dq| dq.iter())
                .filter(|pair| {
                    pair.commitment == commitment
                        && digest.map_or(true, |d| d == pair.digest)
                })
                .filter_map(|pair| {
                    self.items
                        .get(&pair.commitment)
                        .and_then(|m| m.get(&pair.digest))
                })
                .cloned()
                .collect(),

            // Search by commitment
            None => match self.items.get(&commitment) {
                // If there are no messages for the commitment, return an empty vector
                None => Vec::new(),

                // If there are messages, return the ones that match the digest filter
                Some(msgs) => match digest {
                    // If a digest is provided, return whatever matches it.
                    Some(dg) => msgs.get(&dg).cloned().into_iter().collect(),

                    // If no digest was provided, return `all` messages for the commitment.
                    None if all => msgs.values().cloned().collect(),
                    None => msgs.values().next().cloned().into_iter().collect(),
                },
            },
        }
    }

    /// Handles a `subscribe` request from the application.
    ///
    /// If the message is already in the cache, the responder is immediately sent the message.
    /// Otherwise, the responder is stored in the waiters list.
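    ///
    /// A stored waiter is fulfilled later by `insert_message` when a matching
    /// message arrives, or discarded by `cleanup_waiters` if the receiver is
    /// dropped in the meantime.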
    async fn handle_subscribe(
        &mut self,
        peer: Option<P>,
        commitment: Dc,
        digest: Option<Dd>,
        responder: oneshot::Sender<M>,
    ) {
        // Check if the message is already in the cache
        let mut items = self.find_messages(&peer, commitment, digest, false);
        if let Some(item) = items.pop() {
            self.respond_subscribe(responder, item);
            return;
        }

        // Store the responder
        self.waiters.entry(commitment).or_default().push(Waiter {
            peer,
            digest,
            responder,
        });
    }

    /// Handles a `get` request from the application.
    async fn handle_get(
        &mut self,
        peer: Option<P>,
        commitment: Dc,
        digest: Option<Dd>,
        responder: oneshot::Sender<Vec<M>>,
    ) {
        let items = self.find_messages(&peer, commitment, digest, true);
        self.respond_get(responder, items);
    }

    /// Handles a message that was received from a peer.
    async fn handle_network(&mut self, peer: P, msg: M) {
        if !self.insert_message(peer.clone(), msg) {
            debug!(?peer, "message already stored");
            self.metrics.receive.inc(Status::Dropped);
            return;
        }

        self.metrics.receive.inc(Status::Success);
    }

    ////////////////////////////////////////
    // Cache Management
    ////////////////////////////////////////

    /// Inserts a message into the cache.
    ///
    /// Returns `true` if the message was inserted, `false` if it was already present.
    /// Updates the deque, item count, and message cache, potentially evicting an old message.
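    ///
    /// Invariant: a message is kept in `items` exactly as long as its digest is
    /// referenced by at least one peer's deque (tracked via `counts`); when the
    /// last referencing deque entry is evicted, the message is dropped.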
    fn insert_message(&mut self, peer: P, msg: M) -> bool {
        // Get the commitment and digest of the message
        let pair = Pair {
            commitment: msg.commitment(),
            digest: msg.digest(),
        };

        // Send the message to the waiters, if any, ignoring errors (as the receiver may have dropped)
        if let Some(mut waiters) = self.waiters.remove(&pair.commitment) {
            let mut i = 0;
            while i < waiters.len() {
                // Get the peer and digest filters
                let Waiter {
                    peer: peer_filter,
                    digest: digest_filter,
                    responder: _,
                } = &waiters[i];

                // Keep the waiter if either filter does not match
                if peer_filter.as_ref().is_some_and(|s| s != &peer)
                    || digest_filter.is_some_and(|d| d != pair.digest)
                {
                    i += 1;
                    continue;
                }

                // Filters match, so fulfill the subscription and drop the entry.
                //
                // The index `i` is intentionally not incremented here to check
                // the element that was swapped into position `i`.
                let responder = waiters.swap_remove(i).responder;
                self.respond_subscribe(responder, msg.clone());
            }

            // Re-insert if any waiters remain for this commitment
            if !waiters.is_empty() {
                self.waiters.insert(pair.commitment, waiters);
            }
        }

        // Get the relevant deque for the peer
        let deque = self
            .deques
            .entry(peer)
            .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));

        // If the message is already in the deque, move it to the front and return early
        if let Some(i) = deque.iter().position(|d| *d == pair) {
            if i != 0 {
                let v = deque.remove(i).unwrap(); // Must exist
                deque.push_front(v);
            }
            return false;
        }

        // - Insert the message into the peer cache
        // - Increment the item count
        // - Insert the message if-and-only-if the new item count is 1
        deque.push_front(pair);
        let count = self
            .counts
            .entry(pair.digest)
            .and_modify(|c| *c = c.checked_add(1).unwrap())
            .or_insert(1);
        if *count == 1 {
            let existing = self
                .items
                .entry(pair.commitment)
                .or_default()
                .insert(pair.digest, msg);
            assert!(existing.is_none());
        }

        // If the cache is full...
        if deque.len() > self.deque_size {
            // - Remove the oldest item from the peer cache
            // - Decrement the item count
            // - Remove the message if-and-only-if the new item count is 0
            let stale = deque.pop_back().unwrap();
            let count = self
                .counts
                .entry(stale.digest)
                .and_modify(|c| *c = c.checked_sub(1).unwrap())
                .or_insert_with(|| unreachable!());
            if *count == 0 {
                let existing = self.counts.remove(&stale.digest);
                assert!(existing == Some(0));
                let identities = self.items.get_mut(&stale.commitment).unwrap();
                identities.remove(&stale.digest); // Must have existed
                if identities.is_empty() {
                    self.items.remove(&stale.commitment);
                }
            }
        }

        true
    }

    ////////////////////////////////////////
    // Utilities
    ////////////////////////////////////////

    /// Remove all waiters that have dropped receivers.
    fn cleanup_waiters(&mut self) {
        self.waiters.retain(|_, waiters| {
            let initial_len = waiters.len();
            waiters.retain(|waiter| !waiter.responder.is_canceled());
            let dropped_count = initial_len - waiters.len();

            // Increment metrics for each dropped waiter (waiters are created by `subscribe`)
            for _ in 0..dropped_count {
                self.metrics.subscribe.inc(Status::Dropped);
            }

            !waiters.is_empty()
        });
    }

    /// Respond to a `subscribe` request with a message.
    /// Increments the appropriate metric based on the result.
    fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
        let result = responder.send(msg);
        self.metrics.subscribe.inc(match result {
            Ok(_) => Status::Success,
            Err(_) => Status::Dropped,
        });
    }

    /// Respond to a `get` request with the matching messages (possibly none).
    /// Increments the appropriate metric based on the result.
    fn respond_get(&mut self, responder: oneshot::Sender<Vec<M>>, msg: Vec<M>) {
        let found = !msg.is_empty();
        let result = responder.send(msg);
        self.metrics.get.inc(match result {
            Ok(_) if found => Status::Success,
            Ok(_) => Status::Failure,
            Err(_) => Status::Dropped,
        });
    }
}