commonware_broadcast/buffered/engine.rs

1use super::{metrics, Config, Mailbox, Message};
2use crate::buffered::metrics::SequencerLabel;
3use bytes::Bytes;
4use commonware_codec::Codec;
5use commonware_cryptography::{Digest, Digestible};
6use commonware_macros::select;
7use commonware_p2p::{Receiver, Recipients, Sender};
8use commonware_runtime::{
9    telemetry::metrics::status::{CounterExt, Status},
10    Clock, Handle, Metrics, Spawner,
11};
12use commonware_utils::Array;
13use futures::{
14    channel::{mpsc, oneshot},
15    StreamExt,
16};
17use std::{
18    collections::{HashMap, VecDeque},
19    marker::PhantomData,
20};
21use tracing::{debug, error, trace, warn};
22
23/// Instance of the main engine for the module.
24///
25/// It is responsible for:
26/// - Broadcasting messages to the network
27/// - Receiving messages from the network
28/// - Storing messages in the cache
29/// - Responding to requests from the application
30pub struct Engine<
31    E: Clock + Spawner + Metrics,
32    P: Array,
33    D: Digest,
34    M: Digestible<D> + Codec,
35    NetS: Sender<PublicKey = P>,
36    NetR: Receiver<PublicKey = P>,
37> {
38    ////////////////////////////////////////
39    // Interfaces
40    ////////////////////////////////////////
41    context: E,
42    _phantom: PhantomData<(NetS, NetR)>,
43
44    ////////////////////////////////////////
45    // Configuration
46    ////////////////////////////////////////
47    /// My public key
48    public_key: P,
49
50    /// Whether messages are sent as priority
51    priority: bool,
52
53    /// Number of messages to cache per sender
54    deque_size: usize,
55
56    ////////////////////////////////////////
57    // Messaging
58    ////////////////////////////////////////
59    /// The mailbox for receiving messages.
60    mailbox_receiver: mpsc::Receiver<Message<D, M>>,
61
62    /// Pending requests from the application.
63    waiters: HashMap<D, Vec<oneshot::Sender<M>>>,
64
65    ////////////////////////////////////////
66    // Cache
67    ////////////////////////////////////////
68    /// All cached messages by digest.
69    items: HashMap<D, M>,
70
71    /// A LRU cache of the latest received digests from each peer.
72    ///
73    /// This is used to limit the number of digests stored per peer.
74    /// At most `deque_size` digests are stored per peer. This value is expected to be small, so
75    /// membership checks are done in linear time.
76    deques: HashMap<P, VecDeque<D>>,
77
78    /// The number of times each digest exists in one of the deques.
79    ///
80    /// This is because multiple peers can send the same message.
81    counts: HashMap<D, usize>,
82
83    ////////////////////////////////////////
84    // Metrics
85    ////////////////////////////////////////
86    /// Metrics
87    metrics: metrics::Metrics,
88}
89
90impl<
91        E: Clock + Spawner + Metrics,
92        P: Array,
93        D: Digest,
94        M: Digestible<D> + Codec,
95        NetS: Sender<PublicKey = P>,
96        NetR: Receiver<PublicKey = P>,
97    > Engine<E, P, D, M, NetS, NetR>
98{
99    /// Creates a new engine with the given context and configuration.
100    /// Returns the engine and a mailbox for sending messages to the engine.
101    pub fn new(context: E, cfg: Config<P>) -> (Self, Mailbox<D, M>) {
102        let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
103        let mailbox = Mailbox::<D, M>::new(mailbox_sender);
104        let metrics = metrics::Metrics::init(context.clone());
105
106        let result = Self {
107            context,
108            _phantom: PhantomData,
109            public_key: cfg.public_key,
110            priority: cfg.priority,
111            deque_size: cfg.deque_size,
112            mailbox_receiver,
113            waiters: HashMap::new(),
114            deques: HashMap::new(),
115            items: HashMap::new(),
116            counts: HashMap::new(),
117            metrics,
118        };
119
120        (result, mailbox)
121    }
122
123    /// Starts the engine with the given network.
124    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
125        self.context.spawn_ref()(self.run(network))
126    }
127
128    /// Inner run loop called by `start`.
129    async fn run(mut self, network: (NetS, NetR)) {
130        let (mut net_sender, mut net_receiver) = network;
131        let mut shutdown = self.context.stopped();
132
133        loop {
134            // Cleanup waiters
135            self.cleanup_waiters();
136            self.metrics.waiters.set(self.waiters.len() as i64);
137
138            select! {
139                // Handle shutdown signal
140                _ = &mut shutdown => {
141                    debug!("shutdown");
142                },
143
144                // Handle mailbox messages
145                mail = self.mailbox_receiver.next() => {
146                    let Some(msg) = mail else {
147                        error!("mailbox receiver failed");
148                        break;
149                    };
150                    match msg {
151                        Message::Broadcast{ message } => {
152                            trace!("mailbox: broadcast");
153                            self.handle_broadcast(&mut net_sender, message).await;
154                        }
155                        Message::Get{ digest, responder } => {
156                            trace!("mailbox: get");
157                            self.handle_get(digest, responder).await;
158                        }
159                    }
160                },
161
162                // Handle incoming messages
163                msg = net_receiver.recv() => {
164                    // Error handling
165                    let (peer, msg) = match msg {
166                        Ok(r) => r,
167                        Err(err) => {
168                            error!(?err, "receiver failed");
169                            break;
170                        }
171                    };
172
173                    // Decode the message
174                    let message = match M::decode(msg) {
175                        Ok(message) => message,
176                        Err(err) => {
177                            warn!(?err, ?peer, "failed to decode message");
178                            self.metrics.receive.inc(Status::Invalid);
179                            continue;
180                        }
181                    };
182
183                    trace!(?peer, "network");
184                    self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
185                    self.handle_network(peer, message).await;
186                },
187            }
188        }
189    }
190
191    ////////////////////////////////////////
192    // Handling
193    ////////////////////////////////////////
194
195    /// Handles a `broadcast` request from the application.
196    async fn handle_broadcast(&mut self, net_sender: &mut NetS, msg: M) {
197        // Store the message, continue even if it was already stored
198        let _ = self.insert_message(self.public_key.clone(), msg.clone());
199
200        // Broadcast the message to the network
201        let recipients = Recipients::All;
202        let msg = Bytes::from(msg.encode());
203        if let Err(err) = net_sender.send(recipients, msg, self.priority).await {
204            warn!(?err, "failed to send message");
205        }
206    }
207
208    /// Handles a `get` request from the application.
209    ///
210    /// If the message is already in the cache, the responder is immediately sent the message.
211    /// Otherwise, the responder is stored in the waiters list.
212    async fn handle_get(&mut self, digest: D, responder: oneshot::Sender<M>) {
213        // Check if the message is already in the cache
214        if let Some(msg) = self.items.get(&digest) {
215            self.respond(responder, msg.clone());
216            return;
217        }
218
219        // Store the responder
220        self.waiters.entry(digest).or_default().push(responder);
221    }
222
223    /// Handles a message that was received from a peer.
224    async fn handle_network(&mut self, peer: P, msg: M) {
225        if !self.insert_message(peer.clone(), msg) {
226            debug!(?peer, "message already stored");
227            self.metrics.receive.inc(Status::Dropped);
228            return;
229        }
230
231        self.metrics.receive.inc(Status::Success);
232    }
233
234    ////////////////////////////////////////
235    // Cache Management
236    ////////////////////////////////////////
237
238    /// Inserts a message into the cache.
239    ///
240    /// Returns `true` if the message was inserted, `false` if it was already present.
241    /// Updates the deque, item count, and message cache, potentially evicting an old message.
242    fn insert_message(&mut self, peer: P, msg: M) -> bool {
243        let digest = msg.digest();
244
245        // Send the message to the waiters, if any, ignoring errors (as the receiver may have dropped)
246        if let Some(responders) = self.waiters.remove(&digest) {
247            for responder in responders {
248                self.respond(responder, msg.clone());
249            }
250        }
251
252        // Get the relevant deque for the peer
253        let deque = self
254            .deques
255            .entry(peer)
256            .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));
257
258        // If the message is already in the deque, move it to the front and return early
259        if let Some(i) = deque.iter().position(|d| *d == digest) {
260            if i != 0 {
261                deque.remove(i).unwrap(); // Must exist
262                deque.push_front(digest);
263            }
264            return false;
265        };
266
267        // - Insert the message into the peer cache
268        // - Increment the item count
269        // - Insert the message if-and-only-if the new item count is 1
270        deque.push_front(digest);
271        let count = self
272            .counts
273            .entry(digest)
274            .and_modify(|c| *c = c.checked_add(1).unwrap())
275            .or_insert(1);
276        if *count == 1 {
277            let existing = self.items.insert(digest, msg);
278            assert!(existing.is_none());
279        }
280
281        // If the cache is full...
282        if deque.len() > self.deque_size {
283            // Remove the oldest digest from the peer cache
284            // Decrement the item count
285            // Remove the message if-and-only-if the new item count is 0
286            let stale = deque.pop_back().unwrap();
287            let count = self
288                .counts
289                .entry(stale)
290                .and_modify(|c| *c = c.checked_sub(1).unwrap())
291                .or_insert_with(|| unreachable!());
292            if *count == 0 {
293                let existing = self.counts.remove(&stale);
294                assert!(existing == Some(0));
295                self.items.remove(&stale).unwrap(); // Must have existed
296            }
297        }
298
299        true
300    }
301
302    ////////////////////////////////////////
303    // Utilities
304    ////////////////////////////////////////
305
306    /// Remove all waiters that have dropped receivers.
307    fn cleanup_waiters(&mut self) {
308        self.waiters.retain(|_, waiters| {
309            let initial_len = waiters.len();
310            waiters.retain(|waiter| !waiter.is_canceled());
311            let dropped_count = initial_len - waiters.len();
312
313            // Increment metrics for each dropped waiter
314            for _ in 0..dropped_count {
315                self.metrics.get.inc(Status::Dropped);
316            }
317
318            !waiters.is_empty()
319        });
320    }
321
322    /// Respond to a waiter with a message.
323    /// Increments the appropriate metric based on the result.
324    fn respond(&mut self, responder: oneshot::Sender<M>, msg: M) {
325        let result = responder.send(msg);
326        self.metrics.get.inc(match result {
327            Ok(_) => Status::Success,
328            Err(_) => Status::Dropped,
329        });
330    }
331}