commonware_broadcast/buffered/
engine.rs

1use super::{metrics, Config, Mailbox, Message};
2use crate::buffered::metrics::SequencerLabel;
3use bytes::Bytes;
4use commonware_codec::{Codec, Config as CodecCfg};
5use commonware_cryptography::{Digest, Digestible};
6use commonware_macros::select;
7use commonware_p2p::{Receiver, Recipients, Sender};
8use commonware_runtime::{
9    telemetry::metrics::status::{CounterExt, Status},
10    Clock, Handle, Metrics, Spawner,
11};
12use commonware_utils::Array;
13use futures::{
14    channel::{mpsc, oneshot},
15    StreamExt,
16};
17use std::{
18    collections::{HashMap, VecDeque},
19    marker::PhantomData,
20};
21use tracing::{debug, error, trace, warn};
22
/// Instance of the main engine for the module.
///
/// It is responsible for:
/// - Broadcasting messages to the network
/// - Receiving messages from the network
/// - Storing messages in the cache
/// - Responding to requests from the application
///
/// An engine is created with `new` (which also returns the application-facing mailbox) and
/// is consumed by `start`, which runs its event loop on the runtime context.
pub struct Engine<
    E: Clock + Spawner + Metrics,
    P: Array,
    D: Digest,
    Cfg: CodecCfg,
    M: Digestible<D> + Codec<Cfg>,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    /// Runtime context, used to spawn the run loop, observe the stop signal, and
    /// register metrics.
    context: E,

    /// Ties the network sender/receiver type parameters to the engine. The actual network
    /// handles are not stored here; they are passed to `start`.
    _phantom: PhantomData<(NetS, NetR)>,

    ////////////////////////////////////////
    // Configuration
    ////////////////////////////////////////
    /// My public key
    ///
    /// Locally broadcast messages are cached under this key, as if they had been received
    /// from this peer.
    public_key: P,

    /// Whether messages are sent as priority
    priority: bool,

    /// Number of messages to cache per sender
    deque_size: usize,

    /// Configuration for decoding messages
    decode_config: Cfg,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////
    /// The mailbox for receiving messages.
    mailbox_receiver: mpsc::Receiver<Message<D, M>>,

    /// Pending requests from the application.
    ///
    /// Keyed by the requested digest; every responder registered for a digest is answered
    /// when a message with that digest is next inserted into the cache.
    waiters: HashMap<D, Vec<oneshot::Sender<M>>>,

    ////////////////////////////////////////
    // Cache
    ////////////////////////////////////////
    /// All cached messages by digest.
    ///
    /// A message stays here for as long as at least one peer's deque references its digest.
    items: HashMap<D, M>,

    /// An LRU cache of the latest received digests from each peer.
    ///
    /// This is used to limit the number of digests stored per peer.
    /// At most `deque_size` digests are stored per peer. This value is expected to be small, so
    /// membership checks are done in linear time.
    ///
    /// The front of each deque holds the most recently seen digest; eviction pops from the back.
    deques: HashMap<P, VecDeque<D>>,

    /// The number of times each digest exists in one of the deques.
    ///
    /// This is because multiple peers can send the same message.
    /// When a digest's count drops to zero, its entry is removed here and the message is
    /// removed from `items`.
    counts: HashMap<D, usize>,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////
    /// Metrics
    metrics: metrics::Metrics,
}
93
94impl<
95        E: Clock + Spawner + Metrics,
96        P: Array,
97        D: Digest,
98        Cfg: CodecCfg,
99        M: Digestible<D> + Codec<Cfg>,
100        NetS: Sender<PublicKey = P>,
101        NetR: Receiver<PublicKey = P>,
102    > Engine<E, P, D, Cfg, M, NetS, NetR>
103{
104    /// Creates a new engine with the given context and configuration.
105    /// Returns the engine and a mailbox for sending messages to the engine.
106    pub fn new(context: E, cfg: Config<Cfg, P>) -> (Self, Mailbox<D, M>) {
107        let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
108        let mailbox = Mailbox::<D, M>::new(mailbox_sender);
109        let metrics = metrics::Metrics::init(context.clone());
110
111        let result = Self {
112            context,
113            _phantom: PhantomData,
114            public_key: cfg.public_key,
115            priority: cfg.priority,
116            deque_size: cfg.deque_size,
117            decode_config: cfg.decode_config,
118            mailbox_receiver,
119            waiters: HashMap::new(),
120            deques: HashMap::new(),
121            items: HashMap::new(),
122            counts: HashMap::new(),
123            metrics,
124        };
125
126        (result, mailbox)
127    }
128
129    /// Starts the engine with the given network.
130    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
131        self.context.spawn_ref()(self.run(network))
132    }
133
134    /// Inner run loop called by `start`.
135    async fn run(mut self, network: (NetS, NetR)) {
136        let (mut net_sender, mut net_receiver) = network;
137        let mut shutdown = self.context.stopped();
138
139        loop {
140            // Cleanup waiters
141            self.cleanup_waiters();
142            self.metrics.waiters.set(self.waiters.len() as i64);
143
144            select! {
145                // Handle shutdown signal
146                _ = &mut shutdown => {
147                    debug!("shutdown");
148                },
149
150                // Handle mailbox messages
151                mail = self.mailbox_receiver.next() => {
152                    let Some(msg) = mail else {
153                        error!("mailbox receiver failed");
154                        break;
155                    };
156                    match msg {
157                        Message::Broadcast{ message } => {
158                            trace!("mailbox: broadcast");
159                            self.handle_broadcast(&mut net_sender, message).await;
160                        }
161                        Message::Get{ digest, responder } => {
162                            trace!("mailbox: get");
163                            self.handle_get(digest, responder).await;
164                        }
165                    }
166                },
167
168                // Handle incoming messages
169                msg = net_receiver.recv() => {
170                    // Error handling
171                    let (peer, msg) = match msg {
172                        Ok(r) => r,
173                        Err(err) => {
174                            error!(?err, "receiver failed");
175                            break;
176                        }
177                    };
178
179                    // Decode the message
180                    let message = match M::decode_cfg(msg, &self.decode_config) {
181                        Ok(message) => message,
182                        Err(err) => {
183                            warn!(?err, ?peer, "failed to decode message");
184                            self.metrics.receive.inc(Status::Invalid);
185                            continue;
186                        }
187                    };
188
189                    trace!(?peer, "network");
190                    self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
191                    self.handle_network(peer, message).await;
192                },
193            }
194        }
195    }
196
197    ////////////////////////////////////////
198    // Handling
199    ////////////////////////////////////////
200
201    /// Handles a `broadcast` request from the application.
202    async fn handle_broadcast(&mut self, net_sender: &mut NetS, msg: M) {
203        // Store the message, continue even if it was already stored
204        let _ = self.insert_message(self.public_key.clone(), msg.clone());
205
206        // Broadcast the message to the network
207        let recipients = Recipients::All;
208        let msg = Bytes::from(msg.encode());
209        if let Err(err) = net_sender.send(recipients, msg, self.priority).await {
210            warn!(?err, "failed to send message");
211        }
212    }
213
214    /// Handles a `get` request from the application.
215    ///
216    /// If the message is already in the cache, the responder is immediately sent the message.
217    /// Otherwise, the responder is stored in the waiters list.
218    async fn handle_get(&mut self, digest: D, responder: oneshot::Sender<M>) {
219        // Check if the message is already in the cache
220        if let Some(msg) = self.items.get(&digest) {
221            self.respond(responder, msg.clone());
222            return;
223        }
224
225        // Store the responder
226        self.waiters.entry(digest).or_default().push(responder);
227    }
228
229    /// Handles a message that was received from a peer.
230    async fn handle_network(&mut self, peer: P, msg: M) {
231        if !self.insert_message(peer.clone(), msg) {
232            debug!(?peer, "message already stored");
233            self.metrics.receive.inc(Status::Dropped);
234            return;
235        }
236
237        self.metrics.receive.inc(Status::Success);
238    }
239
240    ////////////////////////////////////////
241    // Cache Management
242    ////////////////////////////////////////
243
244    /// Inserts a message into the cache.
245    ///
246    /// Returns `true` if the message was inserted, `false` if it was already present.
247    /// Updates the deque, item count, and message cache, potentially evicting an old message.
248    fn insert_message(&mut self, peer: P, msg: M) -> bool {
249        let digest = msg.digest();
250
251        // Send the message to the waiters, if any, ignoring errors (as the receiver may have dropped)
252        if let Some(responders) = self.waiters.remove(&digest) {
253            for responder in responders {
254                self.respond(responder, msg.clone());
255            }
256        }
257
258        // Get the relevant deque for the peer
259        let deque = self
260            .deques
261            .entry(peer)
262            .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));
263
264        // If the message is already in the deque, move it to the front and return early
265        if let Some(i) = deque.iter().position(|d| *d == digest) {
266            if i != 0 {
267                deque.remove(i).unwrap(); // Must exist
268                deque.push_front(digest);
269            }
270            return false;
271        };
272
273        // - Insert the message into the peer cache
274        // - Increment the item count
275        // - Insert the message if-and-only-if the new item count is 1
276        deque.push_front(digest);
277        let count = self
278            .counts
279            .entry(digest)
280            .and_modify(|c| *c = c.checked_add(1).unwrap())
281            .or_insert(1);
282        if *count == 1 {
283            let existing = self.items.insert(digest, msg);
284            assert!(existing.is_none());
285        }
286
287        // If the cache is full...
288        if deque.len() > self.deque_size {
289            // Remove the oldest digest from the peer cache
290            // Decrement the item count
291            // Remove the message if-and-only-if the new item count is 0
292            let stale = deque.pop_back().unwrap();
293            let count = self
294                .counts
295                .entry(stale)
296                .and_modify(|c| *c = c.checked_sub(1).unwrap())
297                .or_insert_with(|| unreachable!());
298            if *count == 0 {
299                let existing = self.counts.remove(&stale);
300                assert!(existing == Some(0));
301                self.items.remove(&stale).unwrap(); // Must have existed
302            }
303        }
304
305        true
306    }
307
308    ////////////////////////////////////////
309    // Utilities
310    ////////////////////////////////////////
311
312    /// Remove all waiters that have dropped receivers.
313    fn cleanup_waiters(&mut self) {
314        self.waiters.retain(|_, waiters| {
315            let initial_len = waiters.len();
316            waiters.retain(|waiter| !waiter.is_canceled());
317            let dropped_count = initial_len - waiters.len();
318
319            // Increment metrics for each dropped waiter
320            for _ in 0..dropped_count {
321                self.metrics.get.inc(Status::Dropped);
322            }
323
324            !waiters.is_empty()
325        });
326    }
327
328    /// Respond to a waiter with a message.
329    /// Increments the appropriate metric based on the result.
330    fn respond(&mut self, responder: oneshot::Sender<M>, msg: M) {
331        let result = responder.send(msg);
332        self.metrics.get.inc(match result {
333            Ok(_) => Status::Success,
334            Err(_) => Status::Dropped,
335        });
336    }
337}