// commonware_broadcast/buffered/engine.rs

1use super::{metrics, Config, Mailbox, Message};
2use crate::buffered::metrics::SequencerLabel;
3use commonware_codec::{Codec, Config as CodecCfg};
4use commonware_cryptography::{Digest, Digestible};
5use commonware_macros::select;
6use commonware_p2p::{
7    utils::codec::{wrap, WrappedSender},
8    Receiver, Recipients, Sender,
9};
10use commonware_runtime::{
11    telemetry::metrics::status::{CounterExt, Status},
12    Clock, Handle, Metrics, Spawner,
13};
14use commonware_utils::Array;
15use futures::{
16    channel::{mpsc, oneshot},
17    StreamExt,
18};
19use std::collections::{HashMap, VecDeque};
20use tracing::{debug, error, trace, warn};
21
/// Instance of the main engine for the module.
///
/// It is responsible for:
/// - Broadcasting messages to the network
/// - Receiving messages from the network
/// - Storing messages in the cache
/// - Responding to requests from the application
pub struct Engine<
    E: Clock + Spawner + Metrics,
    P: Array,
    D: Digest,
    Cfg: CodecCfg,
    M: Digestible<D> + Codec<Cfg>,
> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    /// Runtime context providing clock, spawning, and metrics registration.
    context: E,

    ////////////////////////////////////////
    // Configuration
    ////////////////////////////////////////
    /// My public key
    public_key: P,

    /// Whether messages are sent with priority over other network traffic
    priority: bool,

    /// Maximum number of messages to cache per sender
    deque_size: usize,

    /// Configuration used when decoding received messages
    codec_config: Cfg,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////
    /// The mailbox for receiving messages from the application.
    mailbox_receiver: mpsc::Receiver<Message<P, D, M>>,

    /// Pending `subscribe` requests from the application, keyed by the digest
    /// they are waiting for. Resolved (and removed) when a matching message
    /// arrives; canceled entries are pruned each loop iteration.
    waiters: HashMap<D, Vec<oneshot::Sender<M>>>,

    ////////////////////////////////////////
    // Cache
    ////////////////////////////////////////
    /// All cached messages by digest.
    items: HashMap<D, M>,

    /// An LRU cache of the latest received digests from each peer.
    ///
    /// This is used to limit the number of digests stored per peer.
    /// At most `deque_size` digests are stored per peer. This value is expected to be small, so
    /// membership checks are done in linear time.
    deques: HashMap<P, VecDeque<D>>,

    /// The number of times each digest exists in one of the deques.
    ///
    /// This is needed because multiple peers can send the same message; an
    /// item is only dropped from `items` when its count reaches zero.
    counts: HashMap<D, usize>,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////
    /// Metrics
    metrics: metrics::Metrics,
}
89
90impl<
91        E: Clock + Spawner + Metrics,
92        P: Array,
93        D: Digest,
94        Cfg: CodecCfg,
95        M: Digestible<D> + Codec<Cfg>,
96    > Engine<E, P, D, Cfg, M>
97{
98    /// Creates a new engine with the given context and configuration.
99    /// Returns the engine and a mailbox for sending messages to the engine.
100    pub fn new(context: E, cfg: Config<Cfg, P>) -> (Self, Mailbox<P, D, M>) {
101        let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
102        let mailbox = Mailbox::<P, D, M>::new(mailbox_sender);
103        let metrics = metrics::Metrics::init(context.clone());
104
105        let result = Self {
106            context,
107            public_key: cfg.public_key,
108            priority: cfg.priority,
109            deque_size: cfg.deque_size,
110            codec_config: cfg.codec_config,
111            mailbox_receiver,
112            waiters: HashMap::new(),
113            deques: HashMap::new(),
114            items: HashMap::new(),
115            counts: HashMap::new(),
116            metrics,
117        };
118
119        (result, mailbox)
120    }
121
122    /// Starts the engine with the given network.
123    pub fn start(
124        mut self,
125        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
126    ) -> Handle<()> {
127        self.context.spawn_ref()(self.run(network))
128    }
129
130    /// Inner run loop called by `start`.
131    async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
132        let (mut sender, mut receiver) = wrap(self.codec_config.clone(), network.0, network.1);
133        let mut shutdown = self.context.stopped();
134
135        loop {
136            // Cleanup waiters
137            self.cleanup_waiters();
138            self.metrics.waiters.set(self.waiters.len() as i64);
139
140            select! {
141                // Handle shutdown signal
142                _ = &mut shutdown => {
143                    debug!("shutdown");
144                },
145
146                // Handle mailbox messages
147                mail = self.mailbox_receiver.next() => {
148                    let Some(msg) = mail else {
149                        error!("mailbox receiver failed");
150                        break;
151                    };
152                    match msg {
153                        Message::Broadcast{ message, responder } => {
154                            trace!("mailbox: broadcast");
155                            self.handle_broadcast(&mut sender, message, responder).await;
156                        }
157                        Message::Subscribe{ digest, responder } => {
158                            trace!("mailbox: subscribe");
159                            self.handle_subscribe(digest, responder).await;
160                        }
161                        Message::Get{ digest, responder } => {
162                            trace!("mailbox: get");
163                            self.handle_get(digest, responder).await;
164                        }
165                    }
166                },
167
168                // Handle incoming messages
169                msg = receiver.recv() => {
170                    // Error handling
171                    let (peer, msg) = match msg {
172                        Ok(r) => r,
173                        Err(err) => {
174                            error!(?err, "receiver failed");
175                            break;
176                        }
177                    };
178
179                    // Decode the message
180                    let msg = match msg {
181                        Ok(msg) => msg,
182                        Err(err) => {
183                            warn!(?err, ?peer, "failed to decode message");
184                            self.metrics.receive.inc(Status::Invalid);
185                            continue;
186                        }
187                    };
188
189                    trace!(?peer, "network");
190                    self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
191                    self.handle_network(peer, msg).await;
192                },
193            }
194        }
195    }
196
197    ////////////////////////////////////////
198    // Handling
199    ////////////////////////////////////////
200
201    /// Handles a `broadcast` request from the application.
202    async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
203        &mut self,
204        sender: &mut WrappedSender<Sr, Cfg, M>,
205        msg: M,
206        responder: oneshot::Sender<Vec<P>>,
207    ) {
208        // Store the message, continue even if it was already stored
209        let _ = self.insert_message(self.public_key.clone(), msg.clone());
210
211        // Broadcast the message to the network
212        let recipients = Recipients::All;
213        let sent_to = sender
214            .send(recipients, msg, self.priority)
215            .await
216            .unwrap_or_else(|err| {
217                error!(?err, "failed to send message");
218                vec![]
219            });
220        let _ = responder.send(sent_to);
221    }
222
223    /// Handles a `subscribe` request from the application.
224    ///
225    /// If the message is already in the cache, the responder is immediately sent the message.
226    /// Otherwise, the responder is stored in the waiters list.
227    async fn handle_subscribe(&mut self, digest: D, responder: oneshot::Sender<M>) {
228        // Check if the message is already in the cache
229        if let Some(msg) = self.items.get(&digest) {
230            self.respond_subscribe(responder, msg.clone());
231            return;
232        }
233
234        // Store the responder
235        self.waiters.entry(digest).or_default().push(responder);
236    }
237
238    /// Handles a `get` request from the application.
239    async fn handle_get(&mut self, digest: D, responder: oneshot::Sender<Option<M>>) {
240        let item = self.items.get(&digest).cloned();
241        self.respond_get(responder, item);
242    }
243
244    /// Handles a message that was received from a peer.
245    async fn handle_network(&mut self, peer: P, msg: M) {
246        if !self.insert_message(peer.clone(), msg) {
247            debug!(?peer, "message already stored");
248            self.metrics.receive.inc(Status::Dropped);
249            return;
250        }
251
252        self.metrics.receive.inc(Status::Success);
253    }
254
255    ////////////////////////////////////////
256    // Cache Management
257    ////////////////////////////////////////
258
259    /// Inserts a message into the cache.
260    ///
261    /// Returns `true` if the message was inserted, `false` if it was already present.
262    /// Updates the deque, item count, and message cache, potentially evicting an old message.
263    fn insert_message(&mut self, peer: P, msg: M) -> bool {
264        let digest = msg.digest();
265
266        // Send the message to the waiters, if any, ignoring errors (as the receiver may have dropped)
267        if let Some(responders) = self.waiters.remove(&digest) {
268            for responder in responders {
269                self.respond_subscribe(responder, msg.clone());
270            }
271        }
272
273        // Get the relevant deque for the peer
274        let deque = self
275            .deques
276            .entry(peer)
277            .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));
278
279        // If the message is already in the deque, move it to the front and return early
280        if let Some(i) = deque.iter().position(|d| *d == digest) {
281            if i != 0 {
282                deque.remove(i).unwrap(); // Must exist
283                deque.push_front(digest);
284            }
285            return false;
286        };
287
288        // - Insert the message into the peer cache
289        // - Increment the item count
290        // - Insert the message if-and-only-if the new item count is 1
291        deque.push_front(digest);
292        let count = self
293            .counts
294            .entry(digest)
295            .and_modify(|c| *c = c.checked_add(1).unwrap())
296            .or_insert(1);
297        if *count == 1 {
298            let existing = self.items.insert(digest, msg);
299            assert!(existing.is_none());
300        }
301
302        // If the cache is full...
303        if deque.len() > self.deque_size {
304            // Remove the oldest digest from the peer cache
305            // Decrement the item count
306            // Remove the message if-and-only-if the new item count is 0
307            let stale = deque.pop_back().unwrap();
308            let count = self
309                .counts
310                .entry(stale)
311                .and_modify(|c| *c = c.checked_sub(1).unwrap())
312                .or_insert_with(|| unreachable!());
313            if *count == 0 {
314                let existing = self.counts.remove(&stale);
315                assert!(existing == Some(0));
316                self.items.remove(&stale).unwrap(); // Must have existed
317            }
318        }
319
320        true
321    }
322
323    ////////////////////////////////////////
324    // Utilities
325    ////////////////////////////////////////
326
327    /// Remove all waiters that have dropped receivers.
328    fn cleanup_waiters(&mut self) {
329        self.waiters.retain(|_, waiters| {
330            let initial_len = waiters.len();
331            waiters.retain(|waiter| !waiter.is_canceled());
332            let dropped_count = initial_len - waiters.len();
333
334            // Increment metrics for each dropped waiter
335            for _ in 0..dropped_count {
336                self.metrics.get.inc(Status::Dropped);
337            }
338
339            !waiters.is_empty()
340        });
341    }
342
343    /// Respond to a waiter with a message.
344    /// Increments the appropriate metric based on the result.
345    fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
346        let result = responder.send(msg);
347        self.metrics.subscribe.inc(match result {
348            Ok(_) => Status::Success,
349            Err(_) => Status::Dropped,
350        });
351    }
352
353    /// Respond to a waiter with an optional message.
354    /// Increments the appropriate metric based on the result.
355    fn respond_get(&mut self, responder: oneshot::Sender<Option<M>>, msg: Option<M>) {
356        let found = msg.is_some();
357        let result = responder.send(msg);
358        self.metrics.get.inc(match result {
359            Ok(_) if found => Status::Success,
360            Ok(_) => Status::Failure,
361            Err(_) => Status::Dropped,
362        });
363    }
364}