use super::{metrics, Config, Mailbox, Message};
use crate::buffered::metrics::SequencerLabel;
use commonware_codec::Codec;
use commonware_cryptography::{Committable, Digestible, PublicKey};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Receiver, Recipients, Sender,
};
use commonware_runtime::{
    spawn_cell,
    telemetry::metrics::status::{CounterExt, GaugeExt, Status},
    Clock, ContextCell, Handle, Metrics, Spawner,
};
use commonware_utils::channels::fallible::OneshotExt;
use futures::{
    channel::{mpsc, oneshot},
    StreamExt,
};
use std::collections::{BTreeMap, VecDeque};
use tracing::{debug, error, trace, warn};

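/// A pending `Subscribe` request from the application, waiting for a message
/// that matches its filters to arrive.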
struct Waiter<P, Dd, M> {
    /// If set, only accept messages received from this peer.
    peer: Option<P>,

    /// If set, only accept the message with this digest.
    digest: Option<Dd>,

    /// Used to send the matching message back to the subscriber.
    responder: oneshot::Sender<M>,
}

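/// Identifies a cached message by its commitment and digest.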
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct Pair<Dc, Dd> {
    /// The commitment of the message.
    commitment: Dc,

    /// The digest of the message.
    digest: Dd,
}

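/// Engine that drives the buffered broadcast primitive: it caches the most
/// recent messages received from each peer and serves application requests
/// for them.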
pub struct Engine<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + Codec> {
    /// Runtime context used to spawn the engine and observe shutdown.
    context: ContextCell<E>,

    /// The public key of this node, used when caching its own broadcasts.
    public_key: P,

    /// Whether messages are sent over the network with priority.
    priority: bool,

    /// The maximum number of message identifiers cached per peer.
    deque_size: usize,

    /// Configuration used to decode received messages.
    codec_config: M::Cfg,

    /// Receives messages from the [`Mailbox`].
    mailbox_receiver: mpsc::Receiver<Message<P, M>>,

    /// Pending subscriptions, keyed by commitment.
    #[allow(clippy::type_complexity)]
    waiters: BTreeMap<M::Commitment, Vec<Waiter<P, M::Digest, M>>>,

    /// All cached messages, keyed by commitment and then digest.
    items: BTreeMap<M::Commitment, BTreeMap<M::Digest, M>>,

    /// The most recently received message identifiers for each peer,
    /// newest at the front.
    #[allow(clippy::type_complexity)]
    deques: BTreeMap<P, VecDeque<Pair<M::Commitment, M::Digest>>>,

    /// The number of deque entries that reference each digest; a message is
    /// evicted from `items` when its count drops to zero.
    counts: BTreeMap<M::Digest, usize>,

    /// Metrics for the engine.
    metrics: metrics::Metrics,
}

impl<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + Codec>
    Engine<E, P, M>
{
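    /// Creates a new engine from the given `context` and `cfg`, returning the
    /// engine together with the [`Mailbox`] used to send it messages.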
    pub fn new(context: E, cfg: Config<P, M::Cfg>) -> (Self, Mailbox<P, M>) {
        // Create the mailbox channel that the application uses to reach the engine.
        let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
        let mailbox = Mailbox::<P, M>::new(mailbox_sender);

        // Register metrics against the runtime context.
        let metrics = metrics::Metrics::init(context.clone());

        let result = Self {
            context: ContextCell::new(context),
            public_key: cfg.public_key,
            priority: cfg.priority,
            deque_size: cfg.deque_size,
            codec_config: cfg.codec_config,
            mailbox_receiver,
            waiters: BTreeMap::new(),
            deques: BTreeMap::new(),
            items: BTreeMap::new(),
            counts: BTreeMap::new(),
            metrics,
        };

        (result, mailbox)
    }

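    /// Starts the engine with the given network sender and receiver, returning
    /// a handle to the spawned task.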
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

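    /// Inner run loop: processes application requests from the mailbox and
    /// messages from the network until shutdown or a channel failure.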
    async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
        // Wrap the network channels so messages are encoded and decoded automatically.
        let (mut sender, mut receiver) = wrap(self.codec_config.clone(), network.0, network.1);
        let mut shutdown = self.context.stopped();

        loop {
            // Remove waiters whose subscribers have gone away, then update the gauge.
            self.cleanup_waiters();
            let _ = self.metrics.waiters.try_set(self.waiters.len());

            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    break;
                },

                // Handle messages from the application
                mail = self.mailbox_receiver.next() => {
                    let Some(msg) = mail else {
                        error!("mailbox receiver failed");
                        break;
                    };
                    match msg {
                        Message::Broadcast{ recipients, message, responder } => {
                            trace!("mailbox: broadcast");
                            self.handle_broadcast(&mut sender, recipients, message, responder).await;
                        }
                        Message::Subscribe{ peer, commitment, digest, responder } => {
                            trace!("mailbox: subscribe");
                            self.handle_subscribe(peer, commitment, digest, responder).await;
                        }
                        Message::Get{ peer, commitment, digest, responder } => {
                            trace!("mailbox: get");
                            self.handle_get(peer, commitment, digest, responder).await;
                        }
                    }
                },

                // Handle messages from the network
                msg = receiver.recv() => {
                    // Break if the receiver failed
                    let (peer, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            error!(?err, "receiver failed");
                            break;
                        }
                    };

                    // Skip messages that fail to decode
                    let msg = match msg {
                        Ok(msg) => msg,
                        Err(err) => {
                            warn!(?err, ?peer, "failed to decode message");
                            self.metrics.receive.inc(Status::Invalid);
                            continue;
                        }
                    };

                    trace!(?peer, "network");
                    self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
                    self.handle_network(peer, msg).await;
                },
            }
        }
    }

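    /// Handles a `Broadcast` message from the application: caches the message
    /// locally, sends it to the requested recipients, and reports back which
    /// peers it was sent to.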
    async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
        &mut self,
        sender: &mut WrappedSender<Sr, M>,
        recipients: Recipients<P>,
        msg: M,
        responder: oneshot::Sender<Vec<P>>,
    ) {
        // Store the message locally, attributed to our own public key.
        let _ = self.insert_message(self.public_key.clone(), msg.clone());

        // Broadcast the message to the network and respond with the set of
        // recipients it was actually sent to.
        let sent_to = sender
            .send(recipients, msg, self.priority)
            .await
            .unwrap_or_else(|err| {
                error!(?err, "failed to send message");
                vec![]
            });
        responder.send_lossy(sent_to);
    }

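    /// Finds cached messages matching `commitment`. If `peer` is set, only
    /// messages received from that peer are considered; if `digest` is set,
    /// only the message with that digest. When no peer is given, `all`
    /// controls whether every match or just the first one is returned.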
    fn find_messages(
        &mut self,
        peer: &Option<P>,
        commitment: M::Commitment,
        digest: Option<M::Digest>,
        all: bool,
    ) -> Vec<M> {
        match peer {
            // Only consider messages received from the given peer, restricted
            // to the requested digest if one was provided.
            Some(s) => self
                .deques
                .get(s)
                .into_iter()
                .flat_map(|dq| dq.iter())
                .filter(|pair| pair.commitment == commitment)
                .filter(|pair| digest.is_none_or(|d| d == pair.digest))
                .filter_map(|pair| {
                    self.items
                        .get(&pair.commitment)
                        .and_then(|m| m.get(&pair.digest))
                })
                .cloned()
                .collect(),

            // Consider all cached messages for the commitment.
            None => self.items.get(&commitment).map_or_else(
                // No messages cached for the commitment.
                Vec::new,
                |msgs| match digest {
                    // Return the message with the matching digest, if any.
                    Some(dg) => msgs.get(&dg).cloned().into_iter().collect(),

                    // Return all messages, or just the first one.
                    None if all => msgs.values().cloned().collect(),
                    None => msgs.values().next().cloned().into_iter().collect(),
                },
            ),
        }
    }

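    /// Handles a `Subscribe` message from the application: responds immediately
    /// if a matching message is already cached, otherwise registers a waiter
    /// that is resolved when a matching message arrives.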
    async fn handle_subscribe(
        &mut self,
        peer: Option<P>,
        commitment: M::Commitment,
        digest: Option<M::Digest>,
        responder: oneshot::Sender<M>,
    ) {
        // Respond immediately if a matching message is already cached.
        let mut items = self.find_messages(&peer, commitment, digest, false);
        if let Some(item) = items.pop() {
            self.respond_subscribe(responder, item);
            return;
        }

        // Otherwise, register the responder to be notified when a match arrives.
        self.waiters.entry(commitment).or_default().push(Waiter {
            peer,
            digest,
            responder,
        });
    }

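    /// Handles a `Get` message from the application: responds with all cached
    /// messages that match the request (possibly none).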
    async fn handle_get(
        &mut self,
        peer: Option<P>,
        commitment: M::Commitment,
        digest: Option<M::Digest>,
        responder: oneshot::Sender<Vec<M>>,
    ) {
        let items = self.find_messages(&peer, commitment, digest, true);
        self.respond_get(responder, items);
    }

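    /// Handles a message received from `peer` over the network.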
    async fn handle_network(&mut self, peer: P, msg: M) {
        // Drop the message if it is already cached for this peer.
        if !self.insert_message(peer.clone(), msg) {
            debug!(?peer, "message already stored");
            self.metrics.receive.inc(Status::Dropped);
            return;
        }

        self.metrics.receive.inc(Status::Success);
    }

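    /// Inserts a message received from `peer` (or broadcast by ourselves) into
    /// the cache, resolving any matching waiters and evicting the oldest entry
    /// for the peer if its deque is full.
    ///
    /// Returns `false` if the message was already cached for this peer.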
    fn insert_message(&mut self, peer: P, msg: M) -> bool {
        // Compute the identifier of the message.
        let pair = Pair {
            commitment: msg.commitment(),
            digest: msg.digest(),
        };

        // Resolve any waiters that match this message.
        if let Some(mut waiters) = self.waiters.remove(&pair.commitment) {
            let mut i = 0;
            while i < waiters.len() {
                let Waiter {
                    peer: peer_filter,
                    digest: digest_filter,
                    responder: _,
                } = &waiters[i];

                // Skip waiters whose peer or digest filter does not match.
                if peer_filter.as_ref().is_some_and(|s| s != &peer)
                    || digest_filter.is_some_and(|d| d != pair.digest)
                {
                    i += 1;
                    continue;
                }

                // Remove the waiter and send it the message. `swap_remove` moves
                // another waiter into index `i`, so the index is not advanced.
                let responder = waiters.swap_remove(i).responder;
                self.respond_subscribe(responder, msg.clone());
            }

            // Re-insert any waiters that were not resolved.
            if !waiters.is_empty() {
                self.waiters.insert(pair.commitment, waiters);
            }
        }

        // Get the peer's deque, creating it if necessary.
        let deque = self
            .deques
            .entry(peer)
            .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));

        // If the message is already cached for this peer, move it to the front
        // of the deque and return early.
        if let Some(i) = deque.iter().position(|d| *d == pair) {
            if i != 0 {
                let v = deque.remove(i).unwrap();
                deque.push_front(v);
            }
            return false;
        }

        // Cache the message, storing it in `items` the first time its digest
        // is seen in any deque.
        deque.push_front(pair);
        let count = self
            .counts
            .entry(pair.digest)
            .and_modify(|c| *c = c.checked_add(1).unwrap())
            .or_insert(1);
        if *count == 1 {
            let existing = self
                .items
                .entry(pair.commitment)
                .or_default()
                .insert(pair.digest, msg);
            assert!(existing.is_none());
        }

        // If the deque is now too large, evict the oldest entry and drop the
        // corresponding message once no deque references its digest.
        if deque.len() > self.deque_size {
            let stale = deque.pop_back().unwrap();
            // The digest must already be counted since the stale entry was in a deque.
            let count = self
                .counts
                .entry(stale.digest)
                .and_modify(|c| *c = c.checked_sub(1).unwrap())
                .or_insert_with(|| unreachable!());
            if *count == 0 {
                let existing = self.counts.remove(&stale.digest);
                assert!(existing == Some(0));
                let identities = self.items.get_mut(&stale.commitment).unwrap();
                identities.remove(&stale.digest);
                if identities.is_empty() {
                    self.items.remove(&stale.commitment);
                }
            }
        }

        true
    }

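    /// Removes waiters whose subscribers have dropped the receiving end of
    /// their channel, recording each as a dropped subscription.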
    fn cleanup_waiters(&mut self) {
        self.waiters.retain(|_, waiters| {
            let initial_len = waiters.len();
            waiters.retain(|waiter| !waiter.responder.is_canceled());
            let dropped_count = initial_len - waiters.len();

            // Record each waiter that was dropped before it could be resolved.
            for _ in 0..dropped_count {
                self.metrics.subscribe.inc(Status::Dropped);
            }

            !waiters.is_empty()
        });
    }

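    /// Sends `msg` to a subscriber, recording whether the subscription was
    /// fulfilled or the subscriber had already gone away.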
    fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
        self.metrics.subscribe.inc(if responder.send_lossy(msg) {
            Status::Success
        } else {
            Status::Dropped
        });
    }

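    /// Sends the result of a `Get` request, recording whether any messages
    /// were found and whether the requester was still listening.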
    fn respond_get(&mut self, responder: oneshot::Sender<Vec<M>>, msg: Vec<M>) {
        let found = !msg.is_empty();
        self.metrics.get.inc(if responder.send_lossy(msg) {
            if found {
                Status::Success
            } else {
                Status::Failure
            }
        } else {
            Status::Dropped
        });
    }
}
483}