//! Buffered broadcast engine (`commonware_broadcast/buffered/engine.rs`).

1use super::{metrics, Config, Mailbox, Message};
2use crate::buffered::metrics::SequencerLabel;
3use commonware_codec::Codec;
4use commonware_cryptography::{Digestible, PublicKey};
5use commonware_macros::select_loop;
6use commonware_p2p::{
7    utils::codec::{wrap, WrappedSender},
8    Provider, Receiver, Recipients, Sender,
9};
10use commonware_runtime::{
11    spawn_cell,
12    telemetry::metrics::status::{CounterExt, GaugeExt, Status},
13    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
14};
15use commonware_utils::{
16    channel::{fallible::OneshotExt, mpsc, oneshot},
17    ordered::Set,
18};
19use std::collections::{BTreeMap, VecDeque};
20use tracing::{debug, error, trace, warn};
21
/// A responder waiting for a message.
///
/// Registered by `Engine::handle_subscribe` when the requested digest is not
/// yet cached; fulfilled (or dropped) once a matching message is inserted.
struct Waiter<M> {
    /// The responder to send the message to.
    responder: oneshot::Sender<M>,
}
27
/// Instance of the main engine for the module.
///
/// It is responsible for:
/// - Broadcasting messages to the network
/// - Receiving messages from the network
/// - Storing messages in the cache
/// - Responding to requests from the application
pub struct Engine<E, P, M, D>
where
    E: BufferPooler + Clock + Spawner + Metrics,
    P: PublicKey,
    M: Digestible + Codec,
    D: Provider<PublicKey = P>,
{
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    /// Runtime context used to spawn and drive the engine's task.
    context: ContextCell<E>,

    ////////////////////////////////////////
    // Configuration
    ////////////////////////////////////////
    /// My public key
    public_key: P,

    /// Whether messages are sent as priority
    priority: bool,

    /// Number of messages to cache per peer
    deque_size: usize,

    /// Configuration for decoding messages
    codec_config: M::Cfg,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////
    /// The mailbox for receiving messages.
    mailbox_receiver: mpsc::Receiver<Message<P, M>>,

    /// Pending requests from the application.
    ///
    /// Keyed by digest; all waiters for a digest are resolved together when a
    /// matching message is inserted (see `insert_message`).
    waiters: BTreeMap<M::Digest, Vec<Waiter<M>>>,

    /// Provider for peer set changes.
    peer_provider: D,

    ////////////////////////////////////////
    // Cache
    ////////////////////////////////////////
    /// All cached messages by digest.
    ///
    /// An entry exists iff its digest has a nonzero refcount in `counts`.
    items: BTreeMap<M::Digest, M>,

    /// A LRU cache of the latest received digests from each peer.
    ///
    /// This is used to limit the number of digests stored per peer.
    /// At most `deque_size` digests are stored per peer. This value is expected to be small, so
    /// membership checks are done in linear time.
    deques: BTreeMap<P, VecDeque<M::Digest>>,

    /// The number of times each digest (globally unique) exists in one of the deques.
    ///
    /// Multiple peers can send the same message and we only want to store
    /// the message once.
    counts: BTreeMap<M::Digest, usize>,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////
    /// Metrics
    metrics: metrics::Metrics,
}
99
100impl<E, P, M, D> Engine<E, P, M, D>
101where
102    E: BufferPooler + Clock + Spawner + Metrics,
103    P: PublicKey,
104    M: Digestible + Codec,
105    D: Provider<PublicKey = P>,
106{
107    /// Creates a new engine with the given context and configuration.
108    /// Returns the engine and a mailbox for sending messages to the engine.
109    pub fn new(context: E, cfg: Config<P, M::Cfg, D>) -> (Self, Mailbox<P, M>) {
110        let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
111        let mailbox = Mailbox::<P, M>::new(mailbox_sender);
112
113        // TODO(#1833): Metrics should use the post-start context
114        let metrics = metrics::Metrics::init(context.clone());
115
116        let result = Self {
117            context: ContextCell::new(context),
118            public_key: cfg.public_key,
119            priority: cfg.priority,
120            deque_size: cfg.deque_size,
121            codec_config: cfg.codec_config,
122            mailbox_receiver,
123            waiters: BTreeMap::new(),
124            deques: BTreeMap::new(),
125            items: BTreeMap::new(),
126            counts: BTreeMap::new(),
127            peer_provider: cfg.peer_provider,
128            metrics,
129        };
130
131        (result, mailbox)
132    }
133
    /// Starts the engine with the given network.
    ///
    /// Consumes the engine and spawns its run loop on the runtime context,
    /// returning a handle to the spawned task.
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }
141
    /// Inner run loop called by `start`.
    ///
    /// Multiplexes between application mailbox requests, inbound network
    /// messages, and peer-set updates until shutdown or a channel failure.
    async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
        // Wrap the raw network channels so messages are encoded/decoded with
        // the configured codec and the runtime's shared buffer pool.
        let (mut sender, mut receiver) = wrap(
            self.codec_config.clone(),
            self.context.network_buffer_pool().clone(),
            network.0,
            network.1,
        );
        let peer_set_subscription = &mut self.peer_provider.subscribe().await;

        select_loop! {
            self.context,
            on_start => {
                // Cleanup waiters
                self.cleanup_waiters();
                let _ = self.metrics.waiters.try_set(self.waiters.len());
            },
            on_stopped => {
                debug!("shutdown");
            },
            // Handle mailbox messages
            Some(msg) = self.mailbox_receiver.recv() else {
                error!("mailbox receiver failed");
                break;
            } => match msg {
                Message::Broadcast {
                    recipients,
                    message,
                    responder,
                } => {
                    trace!("mailbox: broadcast");
                    self.handle_broadcast(&mut sender, recipients, message, responder)
                        .await;
                }
                Message::Subscribe { digest, responder } => {
                    trace!("mailbox: subscribe");
                    self.handle_subscribe(digest, responder);
                }
                Message::Get { digest, responder } => {
                    trace!("mailbox: get");
                    self.handle_get(digest, responder);
                }
            },
            // Handle incoming messages
            msg = receiver.recv() => {
                // Error handling
                let (peer, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        error!(?err, "receiver failed");
                        break;
                    }
                };

                // Decode the message
                let msg = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        warn!(?err, ?peer, "failed to decode message");
                        self.metrics.receive.inc(Status::Invalid);
                        continue;
                    }
                };

                trace!(?peer, "network");
                self.metrics
                    .peer
                    .get_or_create(&SequencerLabel::from(&peer))
                    .inc();
                self.handle_network(peer, msg);
            },
            // Handle peer-set changes: drop cached state for peers no longer tracked
            Some((_, _, tracked_peers)) = peer_set_subscription.recv() else {
                debug!("peer set subscription closed");
                break;
            } => {
                self.evict_untracked_peers(&tracked_peers);
            },
        }
    }
221
222    ////////////////////////////////////////
223    // Handling
224    ////////////////////////////////////////
225
226    /// Handles a `broadcast` request from the application.
227    async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
228        &mut self,
229        sender: &mut WrappedSender<Sr, M>,
230        recipients: Recipients<P>,
231        msg: M,
232        responder: oneshot::Sender<Vec<P>>,
233    ) {
234        // Store the message, continue even if it was already stored
235        let _ = self.insert_message(self.public_key.clone(), msg.clone());
236
237        // Broadcast the message to the network
238        let sent_to = sender
239            .send(recipients, msg, self.priority)
240            .await
241            .unwrap_or_else(|err| {
242                error!(?err, "failed to send message");
243                vec![]
244            });
245        responder.send_lossy(sent_to);
246    }
247
248    /// Handles a `subscribe` request from the application.
249    ///
250    /// If the message is already in the cache, the responder is immediately sent the message.
251    /// Otherwise, the responder is stored in the waiters list.
252    fn handle_subscribe(&mut self, digest: M::Digest, responder: oneshot::Sender<M>) {
253        // Check if the message is already in the cache
254        if let Some(item) = self.items.get(&digest).cloned() {
255            self.respond_subscribe(responder, item);
256            return;
257        }
258
259        // Store the responder
260        self.waiters
261            .entry(digest)
262            .or_default()
263            .push(Waiter { responder });
264    }
265
266    /// Handles a `get` request from the application.
267    fn handle_get(&mut self, digest: M::Digest, responder: oneshot::Sender<Option<M>>) {
268        let item = self.items.get(&digest).cloned();
269        self.respond_get(responder, item);
270    }
271
272    /// Handles a message that was received from a peer.
273    fn handle_network(&mut self, peer: P, msg: M) {
274        if !self.insert_message(peer.clone(), msg) {
275            debug!(?peer, "message already stored");
276            self.metrics.receive.inc(Status::Dropped);
277            return;
278        }
279
280        self.metrics.receive.inc(Status::Success);
281    }
282
283    ////////////////////////////////////////
284    // Cache Management
285    ////////////////////////////////////////
286
    /// Inserts a message into the cache.
    ///
    /// Returns `true` if the message was inserted, `false` if it was already present.
    /// Updates the deque, item count, and message cache, potentially evicting an old message.
    fn insert_message(&mut self, peer: P, msg: M) -> bool {
        let digest = msg.digest();

        // Send the message to the waiters, if any
        if let Some(waiters) = self.waiters.remove(&digest) {
            for waiter in waiters {
                self.respond_subscribe(waiter.responder, msg.clone());
            }
        }

        // Get the relevant deque for the peer
        let deque = self
            .deques
            .entry(peer)
            // Capacity is `deque_size + 1`: the deque briefly holds one extra
            // digest below before the eviction step trims it back.
            .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));

        // If the message is already in the deque, move it to the front and return early.
        // Linear scan is acceptable: `deque_size` is expected to be small.
        if let Some(i) = deque.iter().position(|d| *d == digest) {
            if i != 0 {
                let v = deque.remove(i).unwrap(); // Must exist
                deque.push_front(v);
            }
            return false;
        };

        // - Insert the digest into the peer cache
        // - Increment the item count
        // - Insert the message if-and-only-if the new item count is 1
        deque.push_front(digest);
        let count = self
            .counts
            .entry(digest)
            .and_modify(|c| *c = c.checked_add(1).unwrap())
            .or_insert(1);
        if *count == 1 {
            // First reference anywhere: the message must not already be cached.
            let existing = self.items.insert(digest, msg);
            assert!(existing.is_none());
        }

        // If the cache is full...
        if deque.len() > self.deque_size {
            // Remove the oldest item from the peer cache
            // Decrement the item count
            // Remove the message if-and-only-if the new item count is 0
            let stale = deque.pop_back().unwrap();
            decrement_digest_refcount(&mut self.counts, &mut self.items, &stale);
        }

        true
    }
341
342    fn evict_untracked_peers(&mut self, tracked_peers: &Set<P>) {
343        let tracked = tracked_peers.as_ref();
344        for (peer, deque) in self
345            .deques
346            .extract_if(.., |peer, _| !tracked.contains(peer))
347        {
348            debug!(?peer, digests = deque.len(), "evicting disconnected peer");
349            for digest in deque {
350                decrement_digest_refcount(&mut self.counts, &mut self.items, &digest);
351            }
352        }
353    }
354
355    ////////////////////////////////////////
356    // Utilities
357    ////////////////////////////////////////
358
359    /// Remove all waiters that have dropped receivers.
360    fn cleanup_waiters(&mut self) {
361        self.waiters.retain(|_, waiters| {
362            let initial_len = waiters.len();
363            waiters.retain(|waiter| !waiter.responder.is_closed());
364            let dropped_count = initial_len - waiters.len();
365
366            // Increment metrics for each dropped waiter
367            for _ in 0..dropped_count {
368                self.metrics.get.inc(Status::Dropped);
369            }
370
371            !waiters.is_empty()
372        });
373    }
374
375    /// Respond to a waiter with a message.
376    /// Increments the appropriate metric based on the result.
377    fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
378        self.metrics.subscribe.inc(if responder.send_lossy(msg) {
379            Status::Success
380        } else {
381            Status::Dropped
382        });
383    }
384
385    /// Respond to a get request.
386    /// Increments the appropriate metric based on the result.
387    fn respond_get(&mut self, responder: oneshot::Sender<Option<M>>, msg: Option<M>) {
388        let found = msg.is_some();
389        self.metrics.get.inc(if responder.send_lossy(msg) {
390            if found {
391                Status::Success
392            } else {
393                Status::Failure
394            }
395        } else {
396            Status::Dropped
397        });
398    }
399}
400
/// Decrement a digest refcount and evict it from cache when no references remain.
///
/// Panics if the digest has no refcount entry or its count is already zero —
/// both indicate a broken cache invariant.
fn decrement_digest_refcount<D: Ord, M>(
    counts: &mut BTreeMap<D, usize>,
    items: &mut BTreeMap<D, M>,
    digest: &D,
) {
    let remaining = {
        let c = counts.get_mut(digest).expect("count must exist");
        *c = c.checked_sub(1).expect("count must be > 0");
        *c
    };
    if remaining == 0 {
        // Last reference gone: drop both the refcount entry and the cached message.
        let existing = counts.remove(digest);
        assert!(existing == Some(0));
        items.remove(digest);
    }
}