commonware_broadcast/buffered/engine.rs

use super::{metrics, Config, Mailbox, Message};
use crate::buffered::metrics::SequencerLabel;
use commonware_codec::Codec;
use commonware_cryptography::{Committable, Digestible, PublicKey};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Receiver, Recipients, Sender,
};
use commonware_runtime::{
    telemetry::metrics::status::{CounterExt, Status},
    Clock, Handle, Metrics, Spawner,
};
use futures::{
    channel::{mpsc, oneshot},
    StreamExt,
};
use std::collections::{HashMap, VecDeque};
use tracing::{debug, error, trace, warn};

/// A responder waiting for a message.
struct Waiter<P, Dd, M> {
    /// If set, only accept a message sent by this peer.
    peer: Option<P>,

    /// If set, only accept a message with this digest.
    digest: Option<Dd>,

    /// The responder to send the message to.
    responder: oneshot::Sender<M>,
}

/// A pair of commitment and digest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct Pair<Dc, Dd> {
    /// The commitment of the message.
    commitment: Dc,

    /// The digest of the message.
    digest: Dd,
}

/// Instance of the main engine for the module.
///
/// It is responsible for:
/// - Broadcasting messages to the network
/// - Receiving messages from the network
/// - Storing messages in the cache
/// - Responding to requests from the application
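///
/// # Example
///
/// A minimal usage sketch, assuming an already-constructed runtime context,
/// `Config`, and p2p channel; the `broadcast` helper on the mailbox is an
/// assumption for illustration, not defined in this file:
///
/// ```ignore
/// let (engine, mut mailbox) = Engine::new(context, config);
/// engine.start((sender, receiver));
///
/// // Broadcast to all peers; resolves with the recipients actually sent to.
/// let sent_to = mailbox.broadcast(Recipients::All, message).await;
/// ```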
pub struct Engine<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + Codec> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    context: E,

    ////////////////////////////////////////
    // Configuration
    ////////////////////////////////////////
    /// My public key
    public_key: P,

    /// Whether messages are sent as priority
    priority: bool,

    /// Number of messages to cache per peer
    deque_size: usize,

    /// Configuration for decoding messages
    codec_config: M::Cfg,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////
    /// The mailbox for receiving messages.
    mailbox_receiver: mpsc::Receiver<Message<P, M>>,

    /// Pending requests from the application.
    #[allow(clippy::type_complexity)]
    waiters: HashMap<M::Commitment, Vec<Waiter<P, M::Digest, M>>>,

    ////////////////////////////////////////
    // Cache
    ////////////////////////////////////////
    /// All cached messages by commitment and digest.
    ///
    /// We store messages outside of the deques to minimize memory usage
    /// when receiving duplicate messages.
    items: HashMap<M::Commitment, HashMap<M::Digest, M>>,

    /// An LRU cache of the latest commitment/digest pairs received from each peer.
    ///
    /// This is used to limit the number of messages stored per peer.
    /// At most `deque_size` pairs are stored per peer. This value is expected to be small, so
    /// membership checks are done in linear time.
    #[allow(clippy::type_complexity)]
    deques: HashMap<P, VecDeque<Pair<M::Commitment, M::Digest>>>,

    /// The number of deque entries (across all peers) that reference each digest.
    ///
    /// Multiple peers can send the same message, but we only want to store
    /// the message once.
    counts: HashMap<M::Digest, usize>,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////
    /// Metrics
    metrics: metrics::Metrics,
}

impl<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + Codec>
    Engine<E, P, M>
{
    /// Creates a new engine with the given context and configuration.
    /// Returns the engine and a mailbox for sending messages to the engine.
    pub fn new(context: E, cfg: Config<P, M::Cfg>) -> (Self, Mailbox<P, M>) {
        let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
        let mailbox = Mailbox::<P, M>::new(mailbox_sender);
        let metrics = metrics::Metrics::init(context.clone());

        let result = Self {
            context,
            public_key: cfg.public_key,
            priority: cfg.priority,
            deque_size: cfg.deque_size,
            codec_config: cfg.codec_config,
            mailbox_receiver,
            waiters: HashMap::new(),
            deques: HashMap::new(),
            items: HashMap::new(),
            counts: HashMap::new(),
            metrics,
        };

        (result, mailbox)
    }

    /// Starts the engine with the given network.
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        self.context.spawn_ref()(self.run(network))
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
        let (mut sender, mut receiver) = wrap(self.codec_config.clone(), network.0, network.1);
        let mut shutdown = self.context.stopped();

        loop {
            // Cleanup waiters
            self.cleanup_waiters();
            self.metrics.waiters.set(self.waiters.len() as i64);

            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    return;
                },

                // Handle mailbox messages
                mail = self.mailbox_receiver.next() => {
                    let Some(msg) = mail else {
                        error!("mailbox receiver failed");
                        break;
                    };
                    match msg {
                        Message::Broadcast{ recipients, message, responder } => {
                            trace!("mailbox: broadcast");
                            self.handle_broadcast(&mut sender, recipients, message, responder).await;
                        }
                        Message::Subscribe{ peer, commitment, digest, responder } => {
                            trace!("mailbox: subscribe");
                            self.handle_subscribe(peer, commitment, digest, responder).await;
                        }
                        Message::Get{ peer, commitment, digest, responder } => {
                            trace!("mailbox: get");
                            self.handle_get(peer, commitment, digest, responder).await;
                        }
                    }
                },

                // Handle incoming messages
                msg = receiver.recv() => {
                    // Error handling
                    let (peer, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            error!(?err, "receiver failed");
                            break;
                        }
                    };

                    // Decode the message
                    let msg = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            warn!(?err, ?peer, "failed to decode message");
                            self.metrics.receive.inc(Status::Invalid);
                            continue;
                        }
                    };

                    trace!(?peer, "network");
                    self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
                    self.handle_network(peer, msg).await;
                },
            }
        }
    }

    ////////////////////////////////////////
    // Handling
    ////////////////////////////////////////

    /// Handles a `broadcast` request from the application.
    async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
        &mut self,
        sender: &mut WrappedSender<Sr, M>,
        recipients: Recipients<P>,
        msg: M,
        responder: oneshot::Sender<Vec<P>>,
    ) {
        // Store the message, continue even if it was already stored
        let _ = self.insert_message(self.public_key.clone(), msg.clone());

        // Broadcast the message to the network
        let sent_to = sender
            .send(recipients, msg, self.priority)
            .await
            .unwrap_or_else(|err| {
                error!(?err, "failed to send message");
                vec![]
            });
        let _ = responder.send(sent_to);
    }

    /// Searches through all maintained messages for a match.
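    ///
    /// Filter semantics, summarizing the match arms below:
    /// - `peer = Some(p)`: only messages currently in `p`'s deque that match
    ///   `commitment` (and `digest`, if provided).
    /// - `peer = None, digest = Some(d)`: the message with digest `d` under
    ///   `commitment`, if cached.
    /// - `peer = None, digest = None`: every message for `commitment` if
    ///   `all` is set, otherwise at most one.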
    fn find_messages(
        &mut self,
        peer: &Option<P>,
        commitment: M::Commitment,
        digest: Option<M::Digest>,
        all: bool,
    ) -> Vec<M> {
        match peer {
            // Only consider messages from the peer filter
            Some(s) => self
                .deques
                .get(s)
                .into_iter()
                .flat_map(|dq| dq.iter())
                .filter(|pair| pair.commitment == commitment)
                // Apply the digest filter, if provided
                .filter(|pair| digest.map_or(true, |d| d == pair.digest))
                .filter_map(|pair| {
                    self.items
                        .get(&pair.commitment)
                        .and_then(|m| m.get(&pair.digest))
                })
                .cloned()
                .collect(),

            // Search by commitment
            None => match self.items.get(&commitment) {
                // If there are no messages for the commitment, return an empty vector
                None => Vec::new(),

                // If there are messages, return the ones that match the digest filter
                Some(msgs) => match digest {
                    // If a digest is provided, return whatever matches it.
                    Some(dg) => msgs.get(&dg).cloned().into_iter().collect(),

                    // If no digest was provided, return `all` messages for the commitment.
                    None if all => msgs.values().cloned().collect(),
                    None => msgs.values().next().cloned().into_iter().collect(),
                },
            },
        }
    }

    /// Handles a `subscribe` request from the application.
    ///
    /// If the message is already in the cache, the responder is immediately sent the message.
    /// Otherwise, the responder is stored in the waiters list.
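    ///
    /// A caller-side sketch (assuming a corresponding `subscribe` helper on
    /// the mailbox; the names here are illustrative):
    ///
    /// ```ignore
    /// // Wait for any message matching the commitment, from any peer.
    /// let receiver = mailbox.subscribe(None, commitment, None).await;
    /// let msg = receiver.await.expect("engine dropped the responder");
    /// ```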
    async fn handle_subscribe(
        &mut self,
        peer: Option<P>,
        commitment: M::Commitment,
        digest: Option<M::Digest>,
        responder: oneshot::Sender<M>,
    ) {
        // Check if the message is already in the cache
        let mut items = self.find_messages(&peer, commitment, digest, false);
        if let Some(item) = items.pop() {
            self.respond_subscribe(responder, item);
            return;
        }

        // Store the responder
        self.waiters.entry(commitment).or_default().push(Waiter {
            peer,
            digest,
            responder,
        });
    }

    /// Handles a `get` request from the application.
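    ///
    /// A caller-side sketch (assuming a corresponding `get` helper on the
    /// mailbox; the names here are illustrative):
    ///
    /// ```ignore
    /// // Collect all cached messages for a commitment, from any peer.
    /// let msgs: Vec<_> = mailbox.get(None, commitment, None).await;
    /// ```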
    async fn handle_get(
        &mut self,
        peer: Option<P>,
        commitment: M::Commitment,
        digest: Option<M::Digest>,
        responder: oneshot::Sender<Vec<M>>,
    ) {
        let items = self.find_messages(&peer, commitment, digest, true);
        self.respond_get(responder, items);
    }

    /// Handles a message that was received from a peer.
    async fn handle_network(&mut self, peer: P, msg: M) {
        if !self.insert_message(peer.clone(), msg) {
            debug!(?peer, "message already stored");
            self.metrics.receive.inc(Status::Dropped);
            return;
        }

        self.metrics.receive.inc(Status::Success);
    }

    ////////////////////////////////////////
    // Cache Management
    ////////////////////////////////////////

    /// Inserts a message into the cache.
    ///
    /// Returns `true` if the message was inserted, `false` if it was already present.
    /// Updates the deque, item count, and message cache, potentially evicting an old message.
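    ///
    /// `counts` tracks how many deque entries (across all peers) reference
    /// each digest, and a message lives in `items` only while its count is
    /// nonzero. For example, if peers A and B both send message `m`, the
    /// count for `m`'s digest is 2 and `m` is stored once; evicting A's
    /// entry drops the count to 1, so the message stays cached.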
    fn insert_message(&mut self, peer: P, msg: M) -> bool {
        // Get the commitment and digest of the message
        let pair = Pair {
            commitment: msg.commitment(),
            digest: msg.digest(),
        };

        // Send the message to the waiters, if any, ignoring errors (as the receiver may have dropped)
        if let Some(mut waiters) = self.waiters.remove(&pair.commitment) {
            let mut i = 0;
            while i < waiters.len() {
                // Get the peer and digest filters
                let Waiter {
                    peer: peer_filter,
                    digest: digest_filter,
                    responder: _,
                } = &waiters[i];

                // Keep the waiter if either filter does not match
                if peer_filter.as_ref().is_some_and(|s| s != &peer)
                    || digest_filter.is_some_and(|d| d != pair.digest)
                {
                    i += 1;
                    continue;
                }

                // Filters match, so fulfill the subscription and drop the entry.
                //
                // The index `i` is intentionally not incremented here to check
                // the element that was swapped into position `i`.
                let responder = waiters.swap_remove(i).responder;
                self.respond_subscribe(responder, msg.clone());
            }

            // Re-insert if any waiters remain for this commitment
            if !waiters.is_empty() {
                self.waiters.insert(pair.commitment, waiters);
            }
        }

        // Get the relevant deque for the peer
        let deque = self
            .deques
            .entry(peer)
            .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));

        // If the message is already in the deque, move it to the front and return early
        if let Some(i) = deque.iter().position(|d| *d == pair) {
            if i != 0 {
                let v = deque.remove(i).unwrap(); // Must exist
                deque.push_front(v);
            }
            return false;
        };

        // - Insert the message into the peer cache
        // - Increment the item count
        // - Insert the message if-and-only-if the new item count is 1
        deque.push_front(pair);
        let count = self
            .counts
            .entry(pair.digest)
            .and_modify(|c| *c = c.checked_add(1).unwrap())
            .or_insert(1);
        if *count == 1 {
            let existing = self
                .items
                .entry(pair.commitment)
                .or_default()
                .insert(pair.digest, msg);
            assert!(existing.is_none());
        }

        // If the cache is full...
        if deque.len() > self.deque_size {
            // - Remove the oldest item from the peer cache
            // - Decrement the item count
            // - Remove the message if-and-only-if the new item count is 0
            let stale = deque.pop_back().unwrap();
            let count = self
                .counts
                .entry(stale.digest)
                .and_modify(|c| *c = c.checked_sub(1).unwrap())
                .or_insert_with(|| unreachable!());
            if *count == 0 {
                let existing = self.counts.remove(&stale.digest);
                assert!(existing == Some(0));
                let msgs = self.items.get_mut(&stale.commitment).unwrap();
                msgs.remove(&stale.digest); // Must have existed
                if msgs.is_empty() {
                    self.items.remove(&stale.commitment);
                }
            }
        }

        true
    }

    ////////////////////////////////////////
    // Utilities
    ////////////////////////////////////////

    /// Remove all waiters that have dropped receivers.
    fn cleanup_waiters(&mut self) {
        self.waiters.retain(|_, waiters| {
            let initial_len = waiters.len();
            waiters.retain(|waiter| !waiter.responder.is_canceled());
            let dropped_count = initial_len - waiters.len();

            // Increment metrics for each dropped waiter
            for _ in 0..dropped_count {
                self.metrics.subscribe.inc(Status::Dropped);
            }

            !waiters.is_empty()
        });
    }

    /// Respond to a waiter with a message.
    /// Increments the appropriate metric based on the result.
    fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
        let result = responder.send(msg);
        self.metrics.subscribe.inc(match result {
            Ok(_) => Status::Success,
            Err(_) => Status::Dropped,
        });
    }

    /// Respond to a `get` request with any matching messages.
    /// Increments the appropriate metric based on the result.
    fn respond_get(&mut self, responder: oneshot::Sender<Vec<M>>, msg: Vec<M>) {
        let found = !msg.is_empty();
        let result = responder.send(msg);
        self.metrics.get.inc(match result {
            Ok(_) if found => Status::Success,
            Ok(_) => Status::Failure,
            Err(_) => Status::Dropped,
        });
    }
}
477}