commonware_broadcast/buffered/
engine.rs

1use super::{metrics, Config, Mailbox, Message};
2use crate::buffered::metrics::SequencerLabel;
3use commonware_codec::Codec;
4use commonware_cryptography::{Committable, Digestible};
5use commonware_macros::select;
6use commonware_p2p::{
7    utils::codec::{wrap, WrappedSender},
8    Receiver, Recipients, Sender,
9};
10use commonware_runtime::{
11    telemetry::metrics::status::{CounterExt, Status},
12    Clock, Handle, Metrics, Spawner,
13};
14use commonware_utils::Array;
15use futures::{
16    channel::{mpsc, oneshot},
17    StreamExt,
18};
19use std::collections::{HashMap, VecDeque};
20use tracing::{debug, error, trace, warn};
21
/// A responder waiting for a message.
///
/// Created by a `Subscribe` request and fulfilled by `insert_message` when a
/// message matching both filters arrives (or dropped if the receiver is canceled).
struct Waiter<P, Dd, M> {
    /// The peer sending the message.
    /// `None` matches a message from any peer.
    peer: Option<P>,

    /// The digest of the message.
    /// `None` matches any digest.
    digest: Option<Dd>,

    /// The responder to send the message to.
    responder: oneshot::Sender<M>,
}
33
/// A pair of commitment and digest.
///
/// Used as the per-peer deque entry that keys into the `items` cache
/// (`items[commitment][digest]`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct Pair<Dc, Dd> {
    /// The commitment of the message.
    commitment: Dc,

    /// The digest of the message.
    digest: Dd,
}
43
/// Instance of the main engine for the module.
///
/// It is responsible for:
/// - Broadcasting messages to the network
/// - Receiving messages from the network
/// - Storing messages in the cache
/// - Responding to requests from the application
pub struct Engine<E: Clock + Spawner + Metrics, P: Array, M: Committable + Digestible + Codec> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    // Runtime context used for spawning the run loop and shutdown signaling.
    context: E,

    ////////////////////////////////////////
    // Configuration
    ////////////////////////////////////////
    /// My public key
    public_key: P,

    /// Whether messages are sent as priority
    priority: bool,

    /// Number of messages to cache per peer
    deque_size: usize,

    /// Configuration for decoding messages
    codec_config: M::Cfg,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////
    /// The mailbox for receiving messages.
    mailbox_receiver: mpsc::Receiver<Message<P, M>>,

    /// Pending requests from the application.
    ///
    /// Keyed by commitment; each waiter may further filter by peer and digest.
    #[allow(clippy::type_complexity)]
    waiters: HashMap<M::Commitment, Vec<Waiter<P, M::Digest, M>>>,

    ////////////////////////////////////////
    // Cache
    ////////////////////////////////////////
    /// All cached messages by commitment and digest.
    ///
    /// We store messages outside of the deques to minimize memory usage
    /// when receiving duplicate messages.
    items: HashMap<M::Commitment, HashMap<M::Digest, M>>,

    /// A LRU cache of the latest received identities and digests from each peer.
    ///
    /// This is used to limit the number of digests stored per peer.
    /// At most `deque_size` digests are stored per peer. This value is expected to be small, so
    /// membership checks are done in linear time.
    #[allow(clippy::type_complexity)]
    deques: HashMap<P, VecDeque<Pair<M::Commitment, M::Digest>>>,

    /// The number of times each digest (globally unique) exists in one of the deques.
    ///
    /// Multiple peers can send the same message and we only want to store
    /// the message once.
    ///
    /// An entry is removed from `items` only when its count drops to zero.
    counts: HashMap<M::Digest, usize>,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////
    /// Metrics
    metrics: metrics::Metrics,
}
111
112impl<E: Clock + Spawner + Metrics, P: Array, M: Committable + Digestible + Codec> Engine<E, P, M> {
113    /// Creates a new engine with the given context and configuration.
114    /// Returns the engine and a mailbox for sending messages to the engine.
115    pub fn new(context: E, cfg: Config<P, M::Cfg>) -> (Self, Mailbox<P, M>) {
116        let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
117        let mailbox = Mailbox::<P, M>::new(mailbox_sender);
118        let metrics = metrics::Metrics::init(context.clone());
119
120        let result = Self {
121            context,
122            public_key: cfg.public_key,
123            priority: cfg.priority,
124            deque_size: cfg.deque_size,
125            codec_config: cfg.codec_config,
126            mailbox_receiver,
127            waiters: HashMap::new(),
128            deques: HashMap::new(),
129            items: HashMap::new(),
130            counts: HashMap::new(),
131            metrics,
132        };
133
134        (result, mailbox)
135    }
136
    /// Starts the engine with the given network.
    ///
    /// Spawns the inner `run` loop on the runtime and returns a handle that
    /// resolves when the loop exits.
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        self.context.spawn_ref()(self.run(network))
    }
144
145    /// Inner run loop called by `start`.
146    async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
147        let (mut sender, mut receiver) = wrap(self.codec_config.clone(), network.0, network.1);
148        let mut shutdown = self.context.stopped();
149
150        loop {
151            // Cleanup waiters
152            self.cleanup_waiters();
153            self.metrics.waiters.set(self.waiters.len() as i64);
154
155            select! {
156                // Handle shutdown signal
157                _ = &mut shutdown => {
158                    debug!("shutdown");
159                },
160
161                // Handle mailbox messages
162                mail = self.mailbox_receiver.next() => {
163                    let Some(msg) = mail else {
164                        error!("mailbox receiver failed");
165                        break;
166                    };
167                    match msg {
168                        Message::Broadcast{ recipients, message, responder } => {
169                            trace!("mailbox: broadcast");
170                            self.handle_broadcast(&mut sender, recipients, message, responder).await;
171                        }
172                        Message::Subscribe{ peer, commitment, digest, responder } => {
173                            trace!("mailbox: subscribe");
174                            self.handle_subscribe(peer, commitment, digest, responder).await;
175                        }
176                        Message::Get{ peer, commitment, digest, responder } => {
177                            trace!("mailbox: get");
178                            self.handle_get(peer, commitment, digest, responder).await;
179                        }
180                    }
181                },
182
183                // Handle incoming messages
184                msg = receiver.recv() => {
185                    // Error handling
186                    let (peer, msg) = match msg {
187                        Ok(r) => r,
188                        Err(err) => {
189                            error!(?err, "receiver failed");
190                            break;
191                        }
192                    };
193
194                    // Decode the message
195                    let msg = match msg {
196                        Ok(msg) => msg,
197                        Err(err) => {
198                            warn!(?err, ?peer, "failed to decode message");
199                            self.metrics.receive.inc(Status::Invalid);
200                            continue;
201                        }
202                    };
203
204                    trace!(?peer, "network");
205                    self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
206                    self.handle_network(peer, msg).await;
207                },
208            }
209        }
210    }
211
212    ////////////////////////////////////////
213    // Handling
214    ////////////////////////////////////////
215
216    /// Handles a `broadcast` request from the application.
217    async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
218        &mut self,
219        sender: &mut WrappedSender<Sr, M>,
220        recipients: Recipients<P>,
221        msg: M,
222        responder: oneshot::Sender<Vec<P>>,
223    ) {
224        // Store the message, continue even if it was already stored
225        let _ = self.insert_message(self.public_key.clone(), msg.clone());
226
227        // Broadcast the message to the network
228        let sent_to = sender
229            .send(recipients, msg, self.priority)
230            .await
231            .unwrap_or_else(|err| {
232                error!(?err, "failed to send message");
233                vec![]
234            });
235        let _ = responder.send(sent_to);
236    }
237
238    /// Searches through all maintained messages for a match.
239    fn find_messages(
240        &mut self,
241        peer: &Option<P>,
242        commitment: M::Commitment,
243        digest: Option<M::Digest>,
244        all: bool,
245    ) -> Vec<M> {
246        match peer {
247            // Only consider messages from the peer filter
248            Some(s) => self
249                .deques
250                .get(s)
251                .into_iter()
252                .flat_map(|dq| dq.iter())
253                .filter(|pair| pair.commitment == commitment)
254                .filter_map(|pair| {
255                    self.items
256                        .get(&pair.commitment)
257                        .and_then(|m| m.get(&pair.digest))
258                })
259                .cloned()
260                .collect(),
261
262            // Search by commitment
263            None => match self.items.get(&commitment) {
264                // If there are no messages for the commitment, return an empty vector
265                None => Vec::new(),
266
267                // If there are messages, return the ones that match the digest filter
268                Some(msgs) => match digest {
269                    // If a digest is provided, return whatever matches it.
270                    Some(dg) => msgs.get(&dg).cloned().into_iter().collect(),
271
272                    // If no digest was provided, return `all` messages for the commitment.
273                    None if all => msgs.values().cloned().collect(),
274                    None => msgs.values().next().cloned().into_iter().collect(),
275                },
276            },
277        }
278    }
279
280    /// Handles a `subscribe` request from the application.
281    ///
282    /// If the message is already in the cache, the responder is immediately sent the message.
283    /// Otherwise, the responder is stored in the waiters list.
284    async fn handle_subscribe(
285        &mut self,
286        peer: Option<P>,
287        commitment: M::Commitment,
288        digest: Option<M::Digest>,
289        responder: oneshot::Sender<M>,
290    ) {
291        // Check if the message is already in the cache
292        let mut items = self.find_messages(&peer, commitment, digest, false);
293        if let Some(item) = items.pop() {
294            self.respond_subscribe(responder, item);
295            return;
296        }
297
298        // Store the responder
299        self.waiters.entry(commitment).or_default().push(Waiter {
300            peer,
301            digest,
302            responder,
303        });
304    }
305
306    /// Handles a `get` request from the application.
307    async fn handle_get(
308        &mut self,
309        peer: Option<P>,
310        commitment: M::Commitment,
311        digest: Option<M::Digest>,
312        responder: oneshot::Sender<Vec<M>>,
313    ) {
314        let items = self.find_messages(&peer, commitment, digest, true);
315        self.respond_get(responder, items);
316    }
317
318    /// Handles a message that was received from a peer.
319    async fn handle_network(&mut self, peer: P, msg: M) {
320        if !self.insert_message(peer.clone(), msg) {
321            debug!(?peer, "message already stored");
322            self.metrics.receive.inc(Status::Dropped);
323            return;
324        }
325
326        self.metrics.receive.inc(Status::Success);
327    }
328
329    ////////////////////////////////////////
330    // Cache Management
331    ////////////////////////////////////////
332
    /// Inserts a message into the cache.
    ///
    /// Returns `true` if the message was inserted, `false` if it was already present.
    /// Updates the deque, item count, and message cache, potentially evicting an old message.
    ///
    /// Also fulfills any pending `subscribe` waiters whose peer/digest filters
    /// match the new message.
    fn insert_message(&mut self, peer: P, msg: M) -> bool {
        // Get the commitment and digest of the message
        let pair = Pair {
            commitment: msg.commitment(),
            digest: msg.digest(),
        };

        // Send the message to the waiters, if any, ignoring errors (as the receiver may have dropped)
        if let Some(mut waiters) = self.waiters.remove(&pair.commitment) {
            let mut i = 0;
            while i < waiters.len() {
                // Get the peer and digest filters
                let Waiter {
                    peer: peer_filter,
                    digest: digest_filter,
                    responder: _,
                } = &waiters[i];

                // Keep the waiter if either filter does not match
                if peer_filter.as_ref().is_some_and(|s| s != &peer)
                    || digest_filter.is_some_and(|d| d != pair.digest)
                {
                    i += 1;
                    continue;
                }

                // Filters match, so fulfill the subscription and drop the entry.
                //
                // The index `i` is intentionally not incremented here to check
                // the element that was swapped into position `i`.
                let responder = waiters.swap_remove(i).responder;
                self.respond_subscribe(responder, msg.clone());
            }

            // Re-insert if any waiters remain for this commitment
            if !waiters.is_empty() {
                self.waiters.insert(pair.commitment, waiters);
            }
        }

        // Get the relevant deque for the peer
        // (capacity +1 so the temporary overflow before eviction never reallocates)
        let deque = self
            .deques
            .entry(peer)
            .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));

        // If the message is already in the deque, move it to the front and return early
        if let Some(i) = deque.iter().position(|d| *d == pair) {
            if i != 0 {
                let v = deque.remove(i).unwrap(); // Must exist
                deque.push_front(v);
            }
            return false;
        };

        // - Insert the message into the peer cache
        // - Increment the item count
        // - Insert the message if-and-only-if the new item count is 1
        deque.push_front(pair);
        let count = self
            .counts
            .entry(pair.digest)
            .and_modify(|c| *c = c.checked_add(1).unwrap())
            .or_insert(1);
        if *count == 1 {
            // First reference to this digest anywhere: store the message itself.
            let existing = self
                .items
                .entry(pair.commitment)
                .or_default()
                .insert(pair.digest, msg);
            assert!(existing.is_none());
        }

        // If the cache is full...
        if deque.len() > self.deque_size {
            // Remove the oldest item from the peer cache
            // Decrement the item count
            // Remove the message if-and-only-if the new item count is 0
            let stale = deque.pop_back().unwrap();
            let count = self
                .counts
                .entry(stale.digest)
                .and_modify(|c| *c = c.checked_sub(1).unwrap())
                // Every deque entry has a count; reaching this closure is a bug.
                .or_insert_with(|| unreachable!());
            if *count == 0 {
                // No deque references the digest anymore: drop the count entry
                // and the stored message, pruning the commitment map if empty.
                let existing = self.counts.remove(&stale.digest);
                assert!(existing == Some(0));
                let identities = self.items.get_mut(&stale.commitment).unwrap();
                identities.remove(&stale.digest); // Must have existed
                if identities.is_empty() {
                    self.items.remove(&stale.commitment);
                }
            }
        }

        true
    }
434
435    ////////////////////////////////////////
436    // Utilities
437    ////////////////////////////////////////
438
439    /// Remove all waiters that have dropped receivers.
440    fn cleanup_waiters(&mut self) {
441        self.waiters.retain(|_, waiters| {
442            let initial_len = waiters.len();
443            waiters.retain(|waiter| !waiter.responder.is_canceled());
444            let dropped_count = initial_len - waiters.len();
445
446            // Increment metrics for each dropped waiter
447            for _ in 0..dropped_count {
448                self.metrics.get.inc(Status::Dropped);
449            }
450
451            !waiters.is_empty()
452        });
453    }
454
455    /// Respond to a waiter with a message.
456    /// Increments the appropriate metric based on the result.
457    fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
458        let result = responder.send(msg);
459        self.metrics.subscribe.inc(match result {
460            Ok(_) => Status::Success,
461            Err(_) => Status::Dropped,
462        });
463    }
464
465    /// Respond to a waiter with an optional message.
466    /// Increments the appropriate metric based on the result.
467    fn respond_get(&mut self, responder: oneshot::Sender<Vec<M>>, msg: Vec<M>) {
468        let found = !msg.is_empty();
469        let result = responder.send(msg);
470        self.metrics.get.inc(match result {
471            Ok(_) if found => Status::Success,
472            Ok(_) => Status::Failure,
473            Err(_) => Status::Dropped,
474        });
475    }
476}