commonware_broadcast/buffered/engine.rs

use super::{metrics, Config, Mailbox, Message};
use crate::buffered::metrics::SequencerLabel;
use commonware_codec::Codec;
use commonware_cryptography::{Digestible, PublicKey};
use commonware_macros::select_loop;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Provider, Receiver, Recipients, Sender,
};
use commonware_runtime::{
    spawn_cell,
    telemetry::metrics::status::{CounterExt, GaugeExt, Status},
    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
};
use commonware_utils::{
    channel::{fallible::OneshotExt, mpsc, oneshot},
    ordered::Set,
};
use std::collections::{BTreeMap, VecDeque};
use tracing::{debug, error, trace, warn};

/// A responder waiting for a message.
struct Waiter<M> {
    /// The responder to send the message to.
    responder: oneshot::Sender<M>,
}

/// Result of inserting an incoming or locally broadcast message into the cache
/// (inserted, duplicate, or ineligible).
enum InsertMessageResult {
    Inserted,
    Duplicate,
    Ineligible,
}

/// Instance of the main engine for the module.
///
/// It is responsible for:
/// - Broadcasting messages to the network
/// - Receiving messages from the network
/// - Storing messages in the cache
/// - Responding to requests from the application
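///
/// # Example
///
/// A minimal startup sketch, assuming a concrete runtime `context`, a populated
/// `cfg`, and p2p `sender`/`receiver` handles are already in scope:
///
/// ```ignore
/// let (engine, mailbox) = Engine::new(context, cfg);
/// let handle = engine.start((sender, receiver));
/// // The application drives the engine through `mailbox`.
/// ```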
pub struct Engine<E, P, M, D>
where
    E: BufferPooler + Clock + Spawner + Metrics,
    P: PublicKey,
    M: Digestible + Codec,
    D: Provider<PublicKey = P>,
{
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    context: ContextCell<E>,

    ////////////////////////////////////////
    // Configuration
    ////////////////////////////////////////
    /// My public key
    public_key: P,

    /// Whether messages are sent as priority
    priority: bool,

    /// Number of messages to cache per peer
    deque_size: usize,

    /// Configuration for decoding messages
    codec_config: M::Cfg,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////
    /// The mailbox for receiving messages.
    mailbox_receiver: mpsc::Receiver<Message<P, M>>,

    /// Pending requests from the application.
    waiters: BTreeMap<M::Digest, Vec<Waiter<M>>>,

    /// Provider for peer set changes.
    peer_provider: D,

    ////////////////////////////////////////
    // Cache
    ////////////////////////////////////////
    /// All cached messages by digest.
    items: BTreeMap<M::Digest, M>,

    /// An LRU cache of the latest received digests from each peer.
    ///
    /// This is used to limit the number of digests stored per peer.
    /// At most `deque_size` digests are stored per peer. This value is expected to be small, so
    /// membership checks are done in linear time.
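    ///
    /// For example, with `deque_size = 2`, receiving digests `a`, `b`, then `c`
    /// from a single peer evicts `a` from that peer's deque (and drops the
    /// cached message itself if no other peer's deque still references it).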
    deques: BTreeMap<P, VecDeque<M::Digest>>,

    /// The number of times each (globally unique) digest appears across the deques.
    ///
    /// Multiple peers can send the same message, and we only want to store
    /// the message once.
    counts: BTreeMap<M::Digest, usize>,

    /// Latest primary peer set allowed to keep buffered messages resident.
    latest_primary_peers: Set<P>,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////
    /// Metrics
    metrics: metrics::Metrics,
}

impl<E, P, M, D> Engine<E, P, M, D>
where
    E: BufferPooler + Clock + Spawner + Metrics,
    P: PublicKey,
    M: Digestible + Codec,
    D: Provider<PublicKey = P>,
{
    /// Creates a new engine with the given context and configuration.
    /// Returns the engine and a mailbox for sending messages to the engine.
    pub fn new(context: E, cfg: Config<P, M::Cfg, D>) -> (Self, Mailbox<P, M>) {
        let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
        let mailbox = Mailbox::<P, M>::new(mailbox_sender);

        // TODO(#1833): Metrics should use the post-start context
        let metrics = metrics::Metrics::init(context.clone());

        let result = Self {
            context: ContextCell::new(context),
            public_key: cfg.public_key,
            priority: cfg.priority,
            deque_size: cfg.deque_size,
            codec_config: cfg.codec_config,
            mailbox_receiver,
            waiters: BTreeMap::new(),
            deques: BTreeMap::new(),
            items: BTreeMap::new(),
            counts: BTreeMap::new(),
            latest_primary_peers: Set::default(),
            peer_provider: cfg.peer_provider,
            metrics,
        };

        (result, mailbox)
    }

    /// Starts the engine with the given network.
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
        let (mut sender, mut receiver) = wrap(
            self.codec_config.clone(),
            self.context.network_buffer_pool().clone(),
            network.0,
            network.1,
        );
        let peer_set_subscription = &mut self.peer_provider.subscribe().await;

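        // `on_start` runs before each loop iteration (pruning dropped waiters
        // and refreshing the waiter gauge); `on_stopped` runs on shutdown.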
        select_loop! {
            self.context,
            on_start => {
                // Cleanup waiters
                self.cleanup_waiters();
                let _ = self.metrics.waiters.try_set(self.waiters.len());
            },
            on_stopped => {
                debug!("shutdown");
            },
            // Handle peer set subscription messages
            Some(update) = peer_set_subscription.recv() else {
                debug!("peer set subscription closed");
                break;
            } => {
                // Evict by latest primary only; see buffered module docs.
                self.update_latest_primary_peers(update.latest.primary);
            },
            // Handle mailbox messages
            Some(msg) = self.mailbox_receiver.recv() else {
                error!("mailbox receiver failed");
                break;
            } => match msg {
                Message::Broadcast {
                    recipients,
                    message,
                    responder,
                } => {
                    trace!("mailbox: broadcast");
                    self.handle_broadcast(&mut sender, recipients, message, responder)
                        .await;
                }
                Message::Subscribe { digest, responder } => {
                    trace!("mailbox: subscribe");
                    self.handle_subscribe(digest, responder);
                }
                Message::Get { digest, responder } => {
                    trace!("mailbox: get");
                    self.handle_get(digest, responder);
                }
            },
            // Handle incoming messages
            msg = receiver.recv() => {
                // Error handling
                let (peer, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        error!(?err, "receiver failed");
                        break;
                    }
                };

                // Decode the message
                let msg = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        warn!(?err, ?peer, "failed to decode message");
                        self.metrics.receive.inc(Status::Invalid);
                        continue;
                    }
                };

                trace!(?peer, "network");
                self.metrics
                    .peer
                    .get_or_create(&SequencerLabel::from(&peer))
                    .inc();
                self.handle_network(peer, msg);
            },
        }
    }

    ////////////////////////////////////////
    // Handling
    ////////////////////////////////////////

    /// Handles a `broadcast` request from the application.
    async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
        &mut self,
        sender: &mut WrappedSender<Sr, M>,
        recipients: Recipients<P>,
        msg: M,
        responder: oneshot::Sender<Vec<P>>,
    ) {
        // Store the message; continue even if it was already stored
        let digest = msg.digest();
        let _ = self.insert_message(self.public_key.clone(), digest, msg.clone());

        // Broadcast the message to the network
        let sent_to = sender
            .send(recipients, msg, self.priority)
            .await
            .unwrap_or_else(|err| {
                error!(?err, "failed to send message");
                vec![]
            });
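        // Report which peers the message was actually sent to; the lossy send
        // silently ignores a requester that has already dropped its receiver.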
        responder.send_lossy(sent_to);
    }

    /// Handles a `subscribe` request from the application.
    ///
    /// If the message is already in the cache, the responder is immediately sent the message.
    /// Otherwise, the responder is stored in the waiters list.
    fn handle_subscribe(&mut self, digest: M::Digest, responder: oneshot::Sender<M>) {
        // Check if the message is already in the cache
        if let Some(item) = self.items.get(&digest).cloned() {
            self.respond_subscribe(responder, item);
            return;
        }

        // Store the responder
        self.waiters
            .entry(digest)
            .or_default()
            .push(Waiter { responder });
    }

    /// Handles a `get` request from the application.
    fn handle_get(&mut self, digest: M::Digest, responder: oneshot::Sender<Option<M>>) {
        let item = self.items.get(&digest).cloned();
        self.respond_get(responder, item);
    }

    /// Handles a message that was received from a peer.
    fn handle_network(&mut self, peer: P, msg: M) {
        let digest = msg.digest();
        match self.insert_message(peer.clone(), digest, msg) {
            InsertMessageResult::Inserted => {
                self.metrics.receive.inc(Status::Success);
            }
            InsertMessageResult::Duplicate => {
                debug!(?peer, "message already stored");
                self.metrics.receive.inc(Status::Dropped);
            }
            InsertMessageResult::Ineligible => {
                debug!(?peer, "message from peer outside latest.primary not cached");
                self.metrics.receive.inc(Status::Dropped);
            }
        }
    }

    ////////////////////////////////////////
    // Cache Management
    ////////////////////////////////////////

    /// Inserts a message into the cache.
    ///
    /// Waiters are notified even when the sender is not eligible to have the
    /// message kept resident in the cache.
    fn insert_message(&mut self, peer: P, digest: M::Digest, msg: M) -> InsertMessageResult {
        // Send the message to the waiters, if any
        if let Some(waiters) = self.waiters.remove(&digest) {
            for waiter in waiters {
                self.respond_subscribe(waiter.responder, msg.clone());
            }
        }

        // Only peers listed in `latest.primary` may buffer
        if self.latest_primary_peers.position(&peer).is_none() {
            return InsertMessageResult::Ineligible;
        }

        // Get the relevant deque for the peer
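        // (capacity is `deque_size + 1`: `push_front` below may briefly hold
        // one extra digest before the overflow check pops the oldest entry)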
        let deque = self
            .deques
            .entry(peer)
            .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));

        // If the message is already in the deque, move it to the front and return early
        if let Some(i) = deque.iter().position(|d| *d == digest) {
            if i != 0 {
                let v = deque.remove(i).unwrap(); // Must exist
                deque.push_front(v);
            }
            return InsertMessageResult::Duplicate;
        };

        // - Insert the digest into the peer cache
        // - Increment the item count
        // - Insert the message if-and-only-if the new item count is 1
        deque.push_front(digest);
        let count = self
            .counts
            .entry(digest)
            .and_modify(|c| *c = c.checked_add(1).unwrap())
            .or_insert(1);
        if *count == 1 {
            let existing = self.items.insert(digest, msg);
            assert!(existing.is_none());
        }

        // If the cache is full...
        if deque.len() > self.deque_size {
            // Remove the oldest item from the peer cache
            // Decrement the item count
            // Remove the message if-and-only-if the new item count is 0
            let stale = deque.pop_back().unwrap();
            decrement_digest_refcount(&mut self.counts, &mut self.items, &stale);
        }

        InsertMessageResult::Inserted
    }

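    /// Replaces the tracked primary peer set, evicting all cached digests held
    /// by peers that are no longer members.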
    fn update_latest_primary_peers(&mut self, peers: Set<P>) {
        for (peer, deque) in self
            .deques
            .extract_if(.., |peer, _| peers.position(peer).is_none())
        {
            debug!(?peer, digests = deque.len(), "evicting peer outside latest primary");
            for digest in deque {
                decrement_digest_refcount(&mut self.counts, &mut self.items, &digest);
            }
        }
        self.latest_primary_peers = peers;
    }

    ////////////////////////////////////////
    // Utilities
    ////////////////////////////////////////

    /// Remove all waiters that have dropped receivers.
    fn cleanup_waiters(&mut self) {
        self.waiters.retain(|_, waiters| {
            let initial_len = waiters.len();
            waiters.retain(|waiter| !waiter.responder.is_closed());
            let dropped_count = initial_len - waiters.len();

            // Increment metrics for each dropped waiter
            for _ in 0..dropped_count {
                self.metrics.get.inc(Status::Dropped);
            }

            !waiters.is_empty()
        });
    }

    /// Respond to a waiter with a message.
    /// Increments the appropriate metric based on the result.
    fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
        self.metrics.subscribe.inc(if responder.send_lossy(msg) {
            Status::Success
        } else {
            Status::Dropped
        });
    }

    /// Respond to a get request.
    /// Increments the appropriate metric based on the result.
    fn respond_get(&mut self, responder: oneshot::Sender<Option<M>>, msg: Option<M>) {
        let found = msg.is_some();
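        // `Failure` here means the request was answered but the digest was not
        // cached; `Dropped` means the requester's receiver was already gone.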
        self.metrics.get.inc(if responder.send_lossy(msg) {
            if found {
                Status::Success
            } else {
                Status::Failure
            }
        } else {
            Status::Dropped
        });
    }
}

/// Decrement a digest refcount and evict it from cache when no references remain.
fn decrement_digest_refcount<D: Ord, M>(
    counts: &mut BTreeMap<D, usize>,
    items: &mut BTreeMap<D, M>,
    digest: &D,
) {
    let should_remove = {
        let count = counts.get_mut(digest).expect("count must exist");
        *count = count.checked_sub(1).expect("count must be > 0");
        *count == 0
    };
    if should_remove {
        let existing = counts.remove(digest);
        assert!(existing == Some(0));
        items.remove(digest);
    }
}