1use super::{metrics, Config, Mailbox, Message};
2use crate::buffered::metrics::SequencerLabel;
3use commonware_codec::Codec;
4use commonware_cryptography::{Digestible, PublicKey};
5use commonware_macros::select_loop;
6use commonware_p2p::{
7 utils::codec::{wrap, WrappedSender},
8 Provider, Receiver, Recipients, Sender,
9};
10use commonware_runtime::{
11 spawn_cell,
12 telemetry::metrics::status::{CounterExt, GaugeExt, Status},
13 BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner,
14};
15use commonware_utils::{
16 channel::{fallible::OneshotExt, mpsc, oneshot},
17 ordered::Set,
18};
19use std::collections::{BTreeMap, VecDeque};
20use tracing::{debug, error, trace, warn};
21
22struct Waiter<M> {
24 responder: oneshot::Sender<M>,
26}
27
/// Outcome of attempting to cache a message on behalf of a peer.
enum InsertMessageResult {
    /// The digest was newly added to the peer's deque.
    Inserted,
    /// The digest was already present in the peer's deque (it is moved to the
    /// front, but nothing new is stored).
    Duplicate,
    /// The peer is not part of the latest primary peer set, so nothing was
    /// cached for it.
    Ineligible,
}
34
35pub struct Engine<E, P, M, D>
43where
44 E: BufferPooler + Clock + Spawner + Metrics,
45 P: PublicKey,
46 M: Digestible + Codec,
47 D: Provider<PublicKey = P>,
48{
49 context: ContextCell<E>,
53
54 public_key: P,
59
60 priority: bool,
62
63 deque_size: usize,
65
66 codec_config: M::Cfg,
68
69 mailbox_receiver: mpsc::Receiver<Message<P, M>>,
74
75 waiters: BTreeMap<M::Digest, Vec<Waiter<M>>>,
77
78 peer_provider: D,
80
81 items: BTreeMap<M::Digest, M>,
86
87 deques: BTreeMap<P, VecDeque<M::Digest>>,
93
94 counts: BTreeMap<M::Digest, usize>,
99
100 latest_primary_peers: Set<P>,
102
103 metrics: metrics::Metrics,
108}
109
110impl<E, P, M, D> Engine<E, P, M, D>
111where
112 E: BufferPooler + Clock + Spawner + Metrics,
113 P: PublicKey,
114 M: Digestible + Codec,
115 D: Provider<PublicKey = P>,
116{
117 pub fn new(context: E, cfg: Config<P, M::Cfg, D>) -> (Self, Mailbox<P, M>) {
120 let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
121 let mailbox = Mailbox::<P, M>::new(mailbox_sender);
122
123 let metrics = metrics::Metrics::init(context.clone());
125
126 let result = Self {
127 context: ContextCell::new(context),
128 public_key: cfg.public_key,
129 priority: cfg.priority,
130 deque_size: cfg.deque_size,
131 codec_config: cfg.codec_config,
132 mailbox_receiver,
133 waiters: BTreeMap::new(),
134 deques: BTreeMap::new(),
135 items: BTreeMap::new(),
136 counts: BTreeMap::new(),
137 latest_primary_peers: Set::default(),
138 peer_provider: cfg.peer_provider,
139 metrics,
140 };
141
142 (result, mailbox)
143 }
144
145 pub fn start(
147 mut self,
148 network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
149 ) -> Handle<()> {
150 spawn_cell!(self.context, self.run(network).await)
151 }
152
153 async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
155 let (mut sender, mut receiver) = wrap(
156 self.codec_config.clone(),
157 self.context.network_buffer_pool().clone(),
158 network.0,
159 network.1,
160 );
161 let peer_set_subscription = &mut self.peer_provider.subscribe().await;
162
163 select_loop! {
164 self.context,
165 on_start => {
166 self.cleanup_waiters();
168 let _ = self.metrics.waiters.try_set(self.waiters.len());
169 },
170 on_stopped => {
171 debug!("shutdown");
172 },
173 Some(update) = peer_set_subscription.recv() else {
175 debug!("peer set subscription closed");
176 break;
177 } => {
178 self.update_latest_primary_peers(update.latest.primary);
180 },
181 Some(msg) = self.mailbox_receiver.recv() else {
183 error!("mailbox receiver failed");
184 break;
185 } => match msg {
186 Message::Broadcast {
187 recipients,
188 message,
189 responder,
190 } => {
191 trace!("mailbox: broadcast");
192 self.handle_broadcast(&mut sender, recipients, message, responder)
193 .await;
194 }
195 Message::Subscribe { digest, responder } => {
196 trace!("mailbox: subscribe");
197 self.handle_subscribe(digest, responder);
198 }
199 Message::Get { digest, responder } => {
200 trace!("mailbox: get");
201 self.handle_get(digest, responder);
202 }
203 },
204 msg = receiver.recv() => {
206 let (peer, msg) = match msg {
208 Ok(r) => r,
209 Err(err) => {
210 error!(?err, "receiver failed");
211 break;
212 }
213 };
214
215 let msg = match msg {
217 Ok(msg) => msg,
218 Err(err) => {
219 warn!(?err, ?peer, "failed to decode message");
220 self.metrics.receive.inc(Status::Invalid);
221 continue;
222 }
223 };
224
225 trace!(?peer, "network");
226 self.metrics
227 .peer
228 .get_or_create(&SequencerLabel::from(&peer))
229 .inc();
230 self.handle_network(peer, msg);
231 },
232 }
233 }
234
235 async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
241 &mut self,
242 sender: &mut WrappedSender<Sr, M>,
243 recipients: Recipients<P>,
244 msg: M,
245 responder: oneshot::Sender<Vec<P>>,
246 ) {
247 let digest = msg.digest();
249 let _ = self.insert_message(self.public_key.clone(), digest, msg.clone());
250
251 let sent_to = sender
253 .send(recipients, msg, self.priority)
254 .await
255 .unwrap_or_else(|err| {
256 error!(?err, "failed to send message");
257 vec![]
258 });
259 responder.send_lossy(sent_to);
260 }
261
262 fn handle_subscribe(&mut self, digest: M::Digest, responder: oneshot::Sender<M>) {
267 if let Some(item) = self.items.get(&digest).cloned() {
269 self.respond_subscribe(responder, item);
270 return;
271 }
272
273 self.waiters
275 .entry(digest)
276 .or_default()
277 .push(Waiter { responder });
278 }
279
280 fn handle_get(&mut self, digest: M::Digest, responder: oneshot::Sender<Option<M>>) {
282 let item = self.items.get(&digest).cloned();
283 self.respond_get(responder, item);
284 }
285
286 fn handle_network(&mut self, peer: P, msg: M) {
288 let digest = msg.digest();
289 match self.insert_message(peer.clone(), digest, msg) {
290 InsertMessageResult::Inserted => {
291 self.metrics.receive.inc(Status::Success);
292 }
293 InsertMessageResult::Duplicate => {
294 debug!(?peer, "message already stored");
295 self.metrics.receive.inc(Status::Dropped);
296 }
297 InsertMessageResult::Ineligible => {
298 debug!(?peer, "message from peer outside latest.primary not cached");
299 self.metrics.receive.inc(Status::Dropped);
300 }
301 }
302 }
303
304 fn insert_message(&mut self, peer: P, digest: M::Digest, msg: M) -> InsertMessageResult {
313 if let Some(waiters) = self.waiters.remove(&digest) {
315 for waiter in waiters {
316 self.respond_subscribe(waiter.responder, msg.clone());
317 }
318 }
319
320 if self.latest_primary_peers.position(&peer).is_none() {
322 return InsertMessageResult::Ineligible;
323 }
324
325 let deque = self
327 .deques
328 .entry(peer)
329 .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));
330
331 if let Some(i) = deque.iter().position(|d| *d == digest) {
333 if i != 0 {
334 let v = deque.remove(i).unwrap(); deque.push_front(v);
336 }
337 return InsertMessageResult::Duplicate;
338 };
339
340 deque.push_front(digest);
344 let count = self
345 .counts
346 .entry(digest)
347 .and_modify(|c| *c = c.checked_add(1).unwrap())
348 .or_insert(1);
349 if *count == 1 {
350 let existing = self.items.insert(digest, msg);
351 assert!(existing.is_none());
352 }
353
354 if deque.len() > self.deque_size {
356 let stale = deque.pop_back().unwrap();
360 decrement_digest_refcount(&mut self.counts, &mut self.items, &stale);
361 }
362
363 InsertMessageResult::Inserted
364 }
365
366 fn update_latest_primary_peers(&mut self, peers: Set<P>) {
367 for (peer, deque) in self
368 .deques
369 .extract_if(.., |peer, _| peers.position(peer).is_none())
370 {
371 debug!(?peer, digests = deque.len(), "evicting disconnected peer");
372 for digest in deque {
373 decrement_digest_refcount(&mut self.counts, &mut self.items, &digest);
374 }
375 }
376 self.latest_primary_peers = peers;
377 }
378
379 fn cleanup_waiters(&mut self) {
385 self.waiters.retain(|_, waiters| {
386 let initial_len = waiters.len();
387 waiters.retain(|waiter| !waiter.responder.is_closed());
388 let dropped_count = initial_len - waiters.len();
389
390 for _ in 0..dropped_count {
392 self.metrics.get.inc(Status::Dropped);
393 }
394
395 !waiters.is_empty()
396 });
397 }
398
399 fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
402 self.metrics.subscribe.inc(if responder.send_lossy(msg) {
403 Status::Success
404 } else {
405 Status::Dropped
406 });
407 }
408
409 fn respond_get(&mut self, responder: oneshot::Sender<Option<M>>, msg: Option<M>) {
412 let found = msg.is_some();
413 self.metrics.get.inc(if responder.send_lossy(msg) {
414 if found {
415 Status::Success
416 } else {
417 Status::Failure
418 }
419 } else {
420 Status::Dropped
421 });
422 }
423}
424
/// Releases one reference to `digest`.
///
/// The count stored in `counts` is decremented; when it reaches zero the
/// entry is removed from `counts` and the matching entry (if any) is removed
/// from `items`.
///
/// # Panics
///
/// Panics if `digest` has no entry in `counts` or if its count is already
/// zero.
fn decrement_digest_refcount<D: Ord, M>(
    counts: &mut BTreeMap<D, usize>,
    items: &mut BTreeMap<D, M>,
    digest: &D,
) {
    // Decrement in a nested scope so the mutable borrow of `counts` ends
    // before the entry is (possibly) removed below.
    let remaining = {
        let slot = counts.get_mut(digest).expect("count must exist");
        *slot = slot.checked_sub(1).expect("count must be > 0");
        *slot
    };

    // Still referenced elsewhere: keep both entries.
    if remaining != 0 {
        return;
    }

    // Last reference released: drop the bookkeeping and the stored item.
    let existing = counts.remove(digest);
    assert!(existing == Some(0));
    items.remove(digest);
}