commonware_broadcast/buffered/engine.rs
use super::{metrics, Config, Mailbox, Message};
use crate::buffered::metrics::SequencerLabel;
use bytes::Bytes;
use commonware_codec::Codec;
use commonware_cryptography::{Digest, Digestible};
use commonware_macros::select;
use commonware_p2p::{Receiver, Recipients, Sender};
use commonware_runtime::{
    telemetry::metrics::status::{CounterExt, Status},
    Clock, Handle, Metrics, Spawner,
};
use commonware_utils::Array;
use futures::{
    channel::{mpsc, oneshot},
    StreamExt,
};
use std::{
    collections::{HashMap, VecDeque},
    marker::PhantomData,
};
use tracing::{debug, error, trace, warn};

/// Instance of the main engine for the module.
///
/// It is responsible for:
/// - Broadcasting messages to the network
/// - Receiving messages from the network
/// - Storing messages in the cache
/// - Responding to requests from the application
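///
/// A minimal usage sketch (illustrative only; the `context`, `cfg`, network
/// halves, `message`, and `digest` values are placeholders, and the mailbox
/// helpers are assumed to wrap [`Message::Broadcast`] and [`Message::Get`]):
///
/// ```ignore
/// // Construct the engine and take its mailbox.
/// let (engine, mut mailbox) = Engine::new(context, cfg);
///
/// // Hand the engine the network sender/receiver pair and let it run.
/// engine.start((net_sender, net_receiver));
///
/// // Broadcast a message, then fetch a message by digest.
/// mailbox.broadcast(message).await;
/// let msg = mailbox.get(digest).await;
/// ```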
pub struct Engine<
    E: Clock + Spawner + Metrics,
    P: Array,
    D: Digest,
    M: Digestible<D> + Codec,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    context: E,
    _phantom: PhantomData<(NetS, NetR)>,

    ////////////////////////////////////////
    // Configuration
    ////////////////////////////////////////
    /// My public key
    public_key: P,

    /// Whether messages are sent as priority
    priority: bool,

    /// Number of messages to cache per sender
    deque_size: usize,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////
    /// The mailbox for receiving messages.
    mailbox_receiver: mpsc::Receiver<Message<D, M>>,

    /// Pending requests from the application.
    waiters: HashMap<D, Vec<oneshot::Sender<M>>>,

    ////////////////////////////////////////
    // Cache
    ////////////////////////////////////////
    /// All cached messages by digest.
    items: HashMap<D, M>,

    /// An LRU cache of the latest received digests from each peer.
    ///
    /// This limits the number of digests stored per peer: at most `deque_size`
    /// digests are kept for each. That value is expected to be small, so
    /// membership checks are done in linear time.
    deques: HashMap<P, VecDeque<D>>,

    /// The number of deques in which each digest currently appears.
    ///
    /// A count can exceed one because multiple peers may send the same message.
    counts: HashMap<D, usize>,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////
    /// Metrics
    metrics: metrics::Metrics,
}

impl<
        E: Clock + Spawner + Metrics,
        P: Array,
        D: Digest,
        M: Digestible<D> + Codec,
        NetS: Sender<PublicKey = P>,
        NetR: Receiver<PublicKey = P>,
    > Engine<E, P, D, M, NetS, NetR>
{
    /// Creates a new engine with the given context and configuration.
    /// Returns the engine and a mailbox for sending messages to the engine.
    pub fn new(context: E, cfg: Config<P>) -> (Self, Mailbox<D, M>) {
        let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
        let mailbox = Mailbox::<D, M>::new(mailbox_sender);
        let metrics = metrics::Metrics::init(context.clone());

        let result = Self {
            context,
            _phantom: PhantomData,
            public_key: cfg.public_key,
            priority: cfg.priority,
            deque_size: cfg.deque_size,
            mailbox_receiver,
            waiters: HashMap::new(),
            deques: HashMap::new(),
            items: HashMap::new(),
            counts: HashMap::new(),
            metrics,
        };

        (result, mailbox)
    }

    /// Starts the engine with the given network.
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        self.context.spawn_ref()(self.run(network))
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, network: (NetS, NetR)) {
        let (mut net_sender, mut net_receiver) = network;
        let mut shutdown = self.context.stopped();

        loop {
            // Cleanup waiters
            self.cleanup_waiters();
            self.metrics.waiters.set(self.waiters.len() as i64);

            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    return;
                },

                // Handle mailbox messages
                mail = self.mailbox_receiver.next() => {
                    let Some(msg) = mail else {
                        error!("mailbox receiver failed");
                        break;
                    };
                    match msg {
                        Message::Broadcast { message } => {
                            trace!("mailbox: broadcast");
                            self.handle_broadcast(&mut net_sender, message).await;
                        }
                        Message::Get { digest, responder } => {
                            trace!("mailbox: get");
                            self.handle_get(digest, responder).await;
                        }
                    }
                },

                // Handle incoming messages
                msg = net_receiver.recv() => {
                    // Error handling
                    let (peer, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            error!(?err, "receiver failed");
                            break;
                        }
                    };

                    // Decode the message
                    let message = match M::decode(msg) {
                        Ok(message) => message,
                        Err(err) => {
                            warn!(?err, ?peer, "failed to decode message");
                            self.metrics.receive.inc(Status::Invalid);
                            continue;
                        }
                    };

                    trace!(?peer, "network");
                    self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
                    self.handle_network(peer, message).await;
                },
            }
        }
    }

    ////////////////////////////////////////
    // Handling
    ////////////////////////////////////////

    /// Handles a `broadcast` request from the application.
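    ///
    /// The message is cached locally (under our own public key) before being
    /// sent, so it can immediately satisfy pending and future `get` requests.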
    async fn handle_broadcast(&mut self, net_sender: &mut NetS, msg: M) {
        // Store the message, continue even if it was already stored
        let _ = self.insert_message(self.public_key.clone(), msg.clone());

        // Broadcast the message to the network
        let recipients = Recipients::All;
        let msg = Bytes::from(msg.encode());
        if let Err(err) = net_sender.send(recipients, msg, self.priority).await {
            warn!(?err, "failed to send message");
        }
    }

    /// Handles a `get` request from the application.
    ///
    /// If the message is already in the cache, the responder is immediately sent the message.
    /// Otherwise, the responder is stored in the waiters list.
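    ///
    /// A waiter whose receiver is later dropped is not a leak: `cleanup_waiters`
    /// prunes canceled responders on every iteration of the run loop.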
    async fn handle_get(&mut self, digest: D, responder: oneshot::Sender<M>) {
        // Check if the message is already in the cache
        if let Some(msg) = self.items.get(&digest) {
            self.respond(responder, msg.clone());
            return;
        }

        // Store the responder
        self.waiters.entry(digest).or_default().push(responder);
    }

    /// Handles a message that was received from a peer.
    async fn handle_network(&mut self, peer: P, msg: M) {
        if !self.insert_message(peer.clone(), msg) {
            debug!(?peer, "message already stored");
            self.metrics.receive.inc(Status::Dropped);
            return;
        }

        self.metrics.receive.inc(Status::Success);
    }

    ////////////////////////////////////////
    // Cache Management
    ////////////////////////////////////////

    /// Inserts a message into the cache.
    ///
    /// Returns `true` if the message was inserted, `false` if it was already present.
    /// Updates the deque, item count, and message cache, potentially evicting an old message.
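    ///
    /// As a worked example (illustrative of the logic below, not extra
    /// behavior): if peers `A` and `B` each deliver the message with digest
    /// `d`, then `d` appears in both peers' deques, `counts[d] == 2`, and
    /// `items` stores the message once. When `A`'s deque evicts `d`, the count
    /// drops to 1 and the message remains cached; only when `B`'s deque also
    /// evicts `d` does the count reach 0 and the message leave `items`.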
    fn insert_message(&mut self, peer: P, msg: M) -> bool {
        let digest = msg.digest();

        // Send the message to the waiters, if any, ignoring errors (as the receiver may have dropped)
        if let Some(responders) = self.waiters.remove(&digest) {
            for responder in responders {
                self.respond(responder, msg.clone());
            }
        }

        // Get the relevant deque for the peer
        let deque = self
            .deques
            .entry(peer)
            .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));

        // If the message is already in the deque, move it to the front and return early
        if let Some(i) = deque.iter().position(|d| *d == digest) {
            if i != 0 {
                deque.remove(i).unwrap(); // Must exist
                deque.push_front(digest);
            }
            return false;
        }

        // - Insert the message into the peer cache
        // - Increment the item count
        // - Insert the message if-and-only-if the new item count is 1
        deque.push_front(digest);
        let count = self
            .counts
            .entry(digest)
            .and_modify(|c| *c = c.checked_add(1).unwrap())
            .or_insert(1);
        if *count == 1 {
            let existing = self.items.insert(digest, msg);
            assert!(existing.is_none());
        }

        // If the cache is full...
        if deque.len() > self.deque_size {
            // Remove the oldest digest from the peer cache
            // Decrement the item count
            // Remove the message if-and-only-if the new item count is 0
            let stale = deque.pop_back().unwrap();
            let count = self
                .counts
                .entry(stale)
                .and_modify(|c| *c = c.checked_sub(1).unwrap())
                .or_insert_with(|| unreachable!());
            if *count == 0 {
                let existing = self.counts.remove(&stale);
                assert!(existing == Some(0));
                self.items.remove(&stale).unwrap(); // Must have existed
            }
        }

        true
    }

    ////////////////////////////////////////
    // Utilities
    ////////////////////////////////////////

    /// Removes all waiters whose receivers have been dropped.
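    ///
    /// Called at the top of every `run` loop iteration, before the `waiters`
    /// metric is updated.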
    fn cleanup_waiters(&mut self) {
        self.waiters.retain(|_, waiters| {
            let initial_len = waiters.len();
            waiters.retain(|waiter| !waiter.is_canceled());
            let dropped_count = initial_len - waiters.len();

            // Increment metrics for each dropped waiter
            for _ in 0..dropped_count {
                self.metrics.get.inc(Status::Dropped);
            }

            !waiters.is_empty()
        });
    }

    /// Responds to a waiter with a message.
    /// Increments the appropriate metric based on the result.
    fn respond(&mut self, responder: oneshot::Sender<M>, msg: M) {
        let result = responder.send(msg);
        self.metrics.get.inc(match result {
            Ok(_) => Status::Success,
            Err(_) => Status::Dropped,
        });
    }
}
331}