Skip to main content

commonware_broadcast/buffered/
engine.rs

1use super::{metrics, Config, Mailbox, Message};
2use commonware_actor::mailbox;
3use commonware_codec::Codec;
4use commonware_cryptography::{Digestible, PublicKey};
5use commonware_macros::select_loop;
6use commonware_p2p::{
7    utils::codec::{wrap, WrappedSender},
8    Provider, Receiver, Recipients, Sender,
9};
10use commonware_runtime::{
11    spawn_cell,
12    telemetry::metrics::{status::Status, GaugeExt},
13    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
14};
15use commonware_utils::{
16    channel::{fallible::OneshotExt, oneshot},
17    ordered::Set,
18};
19use std::collections::{BTreeMap, VecDeque};
20use tracing::{debug, error, trace, warn};
21
/// A pending `subscribe` request parked until its message arrives.
///
/// Waiters are stored per digest in `Engine::waiters` and are resolved the next
/// time a message with that digest is inserted.
struct Waiter<M> {
    /// One-shot channel used to deliver the message to the subscriber.
    responder: oneshot::Sender<M>,
}
27
/// Outcome of attempting to buffer a message (received from a peer or sent locally).
enum InsertMessageResult {
    /// The digest was newly recorded in the peer's deque.
    Inserted,
    /// The digest was already in the peer's deque; only its position was refreshed.
    Duplicate,
    /// The peer is not in the latest primary peer set, so nothing was cached.
    Ineligible,
}
34
/// Instance of the main engine for the module.
///
/// It is responsible for:
/// - Broadcasting messages to the network
/// - Receiving messages from the network
/// - Storing messages in the cache
/// - Responding to requests from the application
///
/// Type parameters: `E` is the runtime context, `P` the peer public key type,
/// `M` the message type (digestible and encodable), and `D` the peer set provider.
pub struct Engine<E, P, M, D>
where
    E: BufferPooler + Clock + Spawner + Metrics,
    P: PublicKey,
    M: Digestible + Codec,
    D: Provider<PublicKey = P>,
{
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    /// Runtime context; handed to `spawn_cell!` in `start` and used to drive the run loop.
    context: ContextCell<E>,

    ////////////////////////////////////////
    // Configuration
    ////////////////////////////////////////
    /// My public key; locally broadcast messages are cached under this key.
    public_key: P,

    /// Whether messages are sent as priority
    priority: bool,

    /// Number of messages to cache per peer
    deque_size: usize,

    /// Configuration for decoding messages
    codec_config: M::Cfg,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////
    /// The mailbox for receiving messages.
    mailbox_receiver: mailbox::Receiver<Message<P, M>>,

    /// Pending `subscribe` requests from the application, grouped by the digest they await.
    waiters: BTreeMap<M::Digest, Vec<Waiter<M>>>,

    /// Provider for peer set changes.
    peer_provider: D,

    ////////////////////////////////////////
    // Cache
    ////////////////////////////////////////
    /// All cached messages by digest.
    items: BTreeMap<M::Digest, M>,

    /// An LRU cache of the latest received digests from each peer (most recent at the front).
    ///
    /// This is used to limit the number of digests stored per peer.
    /// At most `deque_size` digests are stored per peer. This value is expected to be small, so
    /// membership checks are done in linear time.
    deques: BTreeMap<P, VecDeque<M::Digest>>,

    /// The number of times each digest (globally unique) exists in one of the deques.
    ///
    /// Multiple peers can send the same message and we only want to store
    /// the message once. An entry is removed (together with the cached message)
    /// as soon as its count reaches zero.
    counts: BTreeMap<M::Digest, usize>,

    /// Latest primary peer set allowed to keep buffered messages resident.
    latest_primary_peers: Set<P>,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////
    /// Metrics
    metrics: metrics::Metrics<P>,
}
109
110impl<E, P, M, D> Engine<E, P, M, D>
111where
112    E: BufferPooler + Clock + Spawner + Metrics,
113    P: PublicKey,
114    M: Digestible + Codec,
115    D: Provider<PublicKey = P>,
116{
117    /// Creates a new engine with the given context and configuration.
118    /// Returns the engine and a mailbox for sending messages to the engine.
119    pub fn new(context: E, cfg: Config<P, M::Cfg, D>) -> (Self, Mailbox<P, M>) {
120        let (mailbox_sender, mailbox_receiver) =
121            mailbox::new(context.child("mailbox"), cfg.mailbox_size);
122        let mailbox = Mailbox::<P, M>::new(mailbox_sender);
123
124        let metrics = metrics::Metrics::init(&context);
125
126        let result = Self {
127            context: ContextCell::new(context),
128            public_key: cfg.public_key,
129            priority: cfg.priority,
130            deque_size: cfg.deque_size,
131            codec_config: cfg.codec_config,
132            mailbox_receiver,
133            waiters: BTreeMap::new(),
134            deques: BTreeMap::new(),
135            items: BTreeMap::new(),
136            counts: BTreeMap::new(),
137            latest_primary_peers: Set::default(),
138            peer_provider: cfg.peer_provider,
139            metrics,
140        };
141
142        (result, mailbox)
143    }
144
    /// Starts the engine with the given network.
    ///
    /// Consumes the engine and hands its run loop (`run`) to `spawn_cell!`, returning the
    /// resulting [`Handle`]. `network` is the (sender, receiver) pair used to exchange
    /// messages with peers.
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(network))
    }
152
    /// Inner run loop called by `start`.
    ///
    /// Wraps the raw network halves with the message codec, subscribes to peer set
    /// updates, and then services three event sources: peer set updates, mailbox
    /// requests from the application, and messages arriving from the network.
    /// The loop exits when the peer set subscription closes, the mailbox closes,
    /// or the network receiver returns an error.
    async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
        // Attach the codec so the sender/receiver work with `M` instead of raw bytes.
        let (mut sender, mut receiver) = wrap(
            self.codec_config.clone(),
            self.context.network_buffer_pool().clone(),
            network.0,
            network.1,
        );
        let mut peer_set_subscription = self.peer_provider.subscribe().await;

        select_loop! {
            self.context,
            // NOTE(review): `on_start` presumably runs at the top of every loop
            // iteration — confirm against the `select_loop!` documentation.
            on_start => {
                // Cleanup waiters
                self.cleanup_waiters();
                // `waiters.len()` is the number of digests with at least one waiter,
                // not the total number of waiters.
                let _ = self.metrics.waiters.try_set(self.waiters.len());
            },
            on_stopped => {
                debug!("shutdown");
            },
            // Handle peer set subscription messages
            Some(update) = peer_set_subscription.recv() else {
                debug!("peer set subscription closed");
                break;
            } => {
                // Evict by latest primary only; see buffered module docs.
                self.update_latest_primary_peers(update.latest.primary);
            },
            // Handle mailbox messages
            Some(msg) = self.mailbox_receiver.recv() else {
                error!("mailbox receiver failed");
                break;
            } => match msg {
                Message::Broadcast {
                    recipients,
                    message,
                } => {
                    trace!("mailbox: broadcast");
                    self.handle_broadcast(&mut sender, recipients, message);
                }
                Message::Subscribe { digest, responder } => {
                    trace!("mailbox: subscribe");
                    self.handle_subscribe(digest, responder);
                }
                Message::Get { digest, responder } => {
                    trace!("mailbox: get");
                    self.handle_get(digest, responder);
                }
            },
            // Handle incoming messages
            msg = receiver.recv() => {
                // Error handling: a receiver error is fatal for the engine.
                let (peer, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        error!(?err, "receiver failed");
                        break;
                    }
                };

                // Decode the message; an undecodable payload is counted and skipped.
                let msg = match msg {
                    Ok(msg) => msg,
                    Err(err) => {
                        warn!(?err, ?peer, "failed to decode message");
                        self.metrics.receive.inc(Status::Invalid);
                        continue;
                    }
                };

                trace!(?peer, "network");
                // Per-peer receive counter (only successfully decoded messages reach here).
                self.metrics.peer.get_or_create_by(&peer).inc();
                self.handle_network(peer, msg);
            },
        }
    }
229
230    ////////////////////////////////////////
231    // Handling
232    ////////////////////////////////////////
233
234    /// Handles a `broadcast` request from the application.
235    fn handle_broadcast<Sr: Sender<PublicKey = P>>(
236        &mut self,
237        sender: &mut WrappedSender<Sr, M>,
238        recipients: Recipients<P>,
239        msg: M,
240    ) {
241        // Store the message, continue even if it was already stored
242        let digest = msg.digest();
243        let _ = self.insert_message(self.public_key.clone(), digest, msg.clone());
244
245        // Broadcast the message to the network
246        sender.send(recipients, msg, self.priority);
247    }
248
249    /// Handles a `subscribe` request from the application.
250    ///
251    /// If the message is already in the cache, the responder is immediately sent the message.
252    /// Otherwise, the responder is stored in the waiters list.
253    fn handle_subscribe(&mut self, digest: M::Digest, responder: oneshot::Sender<M>) {
254        // Check if the message is already in the cache
255        if let Some(item) = self.items.get(&digest).cloned() {
256            self.respond_subscribe(responder, item);
257            return;
258        }
259
260        // Store the responder
261        self.waiters
262            .entry(digest)
263            .or_default()
264            .push(Waiter { responder });
265    }
266
267    /// Handles a `get` request from the application.
268    fn handle_get(&mut self, digest: M::Digest, responder: oneshot::Sender<Option<M>>) {
269        let item = self.items.get(&digest).cloned();
270        self.respond_get(responder, item);
271    }
272
273    /// Handles a message that was received from a peer.
274    fn handle_network(&mut self, peer: P, msg: M) {
275        let digest = msg.digest();
276        match self.insert_message(peer.clone(), digest, msg) {
277            InsertMessageResult::Inserted => {
278                self.metrics.receive.inc(Status::Success);
279            }
280            InsertMessageResult::Duplicate => {
281                debug!(?peer, "message already stored");
282                self.metrics.receive.inc(Status::Dropped);
283            }
284            InsertMessageResult::Ineligible => {
285                debug!(?peer, "message from peer outside latest.primary not cached");
286                self.metrics.receive.inc(Status::Dropped);
287            }
288        }
289    }
290
291    ////////////////////////////////////////
292    // Cache Management
293    ////////////////////////////////////////
294
295    /// Inserts a message into the cache.
296    ///
297    /// Waiters are notified even when a sender is not eligible to keep a
298    /// buffered cache entry resident.
299    fn insert_message(&mut self, peer: P, digest: M::Digest, msg: M) -> InsertMessageResult {
300        // Send the message to the waiters, if any
301        if let Some(waiters) = self.waiters.remove(&digest) {
302            for waiter in waiters {
303                self.respond_subscribe(waiter.responder, msg.clone());
304            }
305        }
306
307        // Only peers listed in `latest.primary` may buffer
308        if self.latest_primary_peers.position(&peer).is_none() {
309            return InsertMessageResult::Ineligible;
310        }
311
312        // Get the relevant deque for the peer
313        let deque = self
314            .deques
315            .entry(peer)
316            .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));
317
318        // If the message is already in the deque, move it to the front and return early
319        if let Some(i) = deque.iter().position(|d| *d == digest) {
320            if i != 0 {
321                let v = deque.remove(i).unwrap(); // Must exist
322                deque.push_front(v);
323            }
324            return InsertMessageResult::Duplicate;
325        };
326
327        // - Insert the digest into the peer cache
328        // - Increment the item count
329        // - Insert the message if-and-only-if the new item count is 1
330        deque.push_front(digest);
331        let count = self
332            .counts
333            .entry(digest)
334            .and_modify(|c| *c = c.checked_add(1).unwrap())
335            .or_insert(1);
336        if *count == 1 {
337            let existing = self.items.insert(digest, msg);
338            assert!(existing.is_none());
339        }
340
341        // If the cache is full...
342        if deque.len() > self.deque_size {
343            // Remove the oldest item from the peer cache
344            // Decrement the item count
345            // Remove the message if-and-only-if the new item count is 0
346            let stale = deque.pop_back().unwrap();
347            decrement_digest_refcount(&mut self.counts, &mut self.items, &stale);
348        }
349
350        InsertMessageResult::Inserted
351    }
352
353    fn update_latest_primary_peers(&mut self, peers: Set<P>) {
354        for (peer, deque) in self
355            .deques
356            .extract_if(.., |peer, _| peers.position(peer).is_none())
357        {
358            debug!(?peer, digests = deque.len(), "evicting disconnected peer");
359            for digest in deque {
360                decrement_digest_refcount(&mut self.counts, &mut self.items, &digest);
361            }
362        }
363        self.latest_primary_peers = peers;
364    }
365
366    ////////////////////////////////////////
367    // Utilities
368    ////////////////////////////////////////
369
370    /// Remove all waiters that have dropped receivers.
371    fn cleanup_waiters(&mut self) {
372        self.waiters.retain(|_, waiters| {
373            let initial_len = waiters.len();
374            waiters.retain(|waiter| !waiter.responder.is_closed());
375            let dropped_count = initial_len - waiters.len();
376
377            // Increment metrics for each dropped waiter
378            for _ in 0..dropped_count {
379                self.metrics.get.inc(Status::Dropped);
380            }
381
382            !waiters.is_empty()
383        });
384    }
385
386    /// Respond to a waiter with a message.
387    /// Increments the appropriate metric based on the result.
388    fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
389        self.metrics.subscribe.inc(if responder.send_lossy(msg) {
390            Status::Success
391        } else {
392            Status::Dropped
393        });
394    }
395
396    /// Respond to a get request.
397    /// Increments the appropriate metric based on the result.
398    fn respond_get(&mut self, responder: oneshot::Sender<Option<M>>, msg: Option<M>) {
399        let found = msg.is_some();
400        self.metrics.get.inc(if responder.send_lossy(msg) {
401            if found {
402                Status::Success
403            } else {
404                Status::Failure
405            }
406        } else {
407            Status::Dropped
408        });
409    }
410}
411
/// Releases one reference to `digest`.
///
/// The count stored in `counts` is decremented by one. When it reaches zero the
/// entry is removed from `counts` and the matching message is removed from `items`.
///
/// # Panics
///
/// Panics if `digest` has no entry in `counts`, or if its count is already zero.
fn decrement_digest_refcount<D: Ord, M>(
    counts: &mut BTreeMap<D, usize>,
    items: &mut BTreeMap<D, M>,
    digest: &D,
) {
    let remaining = counts.get_mut(digest).expect("count must exist");
    *remaining = remaining.checked_sub(1).expect("count must be > 0");

    // Still referenced elsewhere: nothing to evict
    if *remaining != 0 {
        return;
    }

    // Last reference released: forget both the counter and the cached message
    let removed = counts.remove(digest);
    assert!(removed == Some(0));
    items.remove(digest);
}