1use super::{metrics, Config, Mailbox, Message};
2use crate::buffered::metrics::SequencerLabel;
3use commonware_codec::Codec;
4use commonware_cryptography::{Committable, Digestible, PublicKey};
5use commonware_macros::select_loop;
6use commonware_p2p::{
7 utils::codec::{wrap, WrappedSender},
8 Receiver, Recipients, Sender,
9};
10use commonware_runtime::{
11 spawn_cell,
12 telemetry::metrics::status::{CounterExt, GaugeExt, Status},
13 Clock, ContextCell, Handle, Metrics, Spawner,
14};
15use commonware_utils::channel::{fallible::OneshotExt, mpsc, oneshot};
16use std::collections::{BTreeMap, VecDeque};
17use tracing::{debug, error, trace, warn};
18
19struct Waiter<P, Dd, M> {
21 peer: Option<P>,
23
24 digest: Option<Dd>,
26
27 responder: oneshot::Sender<M>,
29}
30
/// The (commitment, digest) identity of a message, as tracked in per-peer deques.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct Pair<C, D> {
    /// Commitment of the message.
    commitment: C,

    /// Digest of the message.
    digest: D,
}
40
41pub struct Engine<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + Codec> {
49 context: ContextCell<E>,
53
54 public_key: P,
59
60 priority: bool,
62
63 deque_size: usize,
65
66 codec_config: M::Cfg,
68
69 mailbox_receiver: mpsc::Receiver<Message<P, M>>,
74
75 #[allow(clippy::type_complexity)]
77 waiters: BTreeMap<M::Commitment, Vec<Waiter<P, M::Digest, M>>>,
78
79 items: BTreeMap<M::Commitment, BTreeMap<M::Digest, M>>,
87
88 #[allow(clippy::type_complexity)]
94 deques: BTreeMap<P, VecDeque<Pair<M::Commitment, M::Digest>>>,
95
96 counts: BTreeMap<M::Digest, usize>,
101
102 metrics: metrics::Metrics,
107}
108
109impl<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + Codec>
110 Engine<E, P, M>
111{
112 pub fn new(context: E, cfg: Config<P, M::Cfg>) -> (Self, Mailbox<P, M>) {
115 let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
116 let mailbox = Mailbox::<P, M>::new(mailbox_sender);
117
118 let metrics = metrics::Metrics::init(context.clone());
120
121 let result = Self {
122 context: ContextCell::new(context),
123 public_key: cfg.public_key,
124 priority: cfg.priority,
125 deque_size: cfg.deque_size,
126 codec_config: cfg.codec_config,
127 mailbox_receiver,
128 waiters: BTreeMap::new(),
129 deques: BTreeMap::new(),
130 items: BTreeMap::new(),
131 counts: BTreeMap::new(),
132 metrics,
133 };
134
135 (result, mailbox)
136 }
137
138 pub fn start(
140 mut self,
141 network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
142 ) -> Handle<()> {
143 spawn_cell!(self.context, self.run(network).await)
144 }
145
146 async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
148 let (mut sender, mut receiver) = wrap(self.codec_config.clone(), network.0, network.1);
149
150 select_loop! {
151 self.context,
152 on_start => {
153 self.cleanup_waiters();
155 let _ = self.metrics.waiters.try_set(self.waiters.len());
156 },
157 on_stopped => {
158 debug!("shutdown");
159 },
160 Some(msg) = self.mailbox_receiver.recv() else {
162 error!("mailbox receiver failed");
163 break;
164 } => match msg {
165 Message::Broadcast {
166 recipients,
167 message,
168 responder,
169 } => {
170 trace!("mailbox: broadcast");
171 self.handle_broadcast(&mut sender, recipients, message, responder)
172 .await;
173 }
174 Message::Subscribe {
175 peer,
176 commitment,
177 digest,
178 responder,
179 } => {
180 trace!("mailbox: subscribe");
181 self.handle_subscribe(peer, commitment, digest, responder)
182 .await;
183 }
184 Message::Get {
185 peer,
186 commitment,
187 digest,
188 responder,
189 } => {
190 trace!("mailbox: get");
191 self.handle_get(peer, commitment, digest, responder).await;
192 }
193 },
194 msg = receiver.recv() => {
196 let (peer, msg) = match msg {
198 Ok(r) => r,
199 Err(err) => {
200 error!(?err, "receiver failed");
201 break;
202 }
203 };
204
205 let msg = match msg {
207 Ok(msg) => msg,
208 Err(err) => {
209 warn!(?err, ?peer, "failed to decode message");
210 self.metrics.receive.inc(Status::Invalid);
211 continue;
212 }
213 };
214
215 trace!(?peer, "network");
216 self.metrics
217 .peer
218 .get_or_create(&SequencerLabel::from(&peer))
219 .inc();
220 self.handle_network(peer, msg).await;
221 },
222 }
223 }
224
225 async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
231 &mut self,
232 sender: &mut WrappedSender<Sr, M>,
233 recipients: Recipients<P>,
234 msg: M,
235 responder: oneshot::Sender<Vec<P>>,
236 ) {
237 let _ = self.insert_message(self.public_key.clone(), msg.clone());
239
240 let sent_to = sender
242 .send(recipients, msg, self.priority)
243 .await
244 .unwrap_or_else(|err| {
245 error!(?err, "failed to send message");
246 vec![]
247 });
248 responder.send_lossy(sent_to);
249 }
250
251 fn find_messages(
253 &mut self,
254 peer: &Option<P>,
255 commitment: M::Commitment,
256 digest: Option<M::Digest>,
257 all: bool,
258 ) -> Vec<M> {
259 match peer {
260 Some(s) => self
262 .deques
263 .get(s)
264 .into_iter()
265 .flat_map(|dq| dq.iter())
266 .filter(|pair| pair.commitment == commitment)
267 .filter_map(|pair| {
268 self.items
269 .get(&pair.commitment)
270 .and_then(|m| m.get(&pair.digest))
271 })
272 .cloned()
273 .collect(),
274
275 None => self.items.get(&commitment).map_or_else(
277 Vec::new,
279 |msgs| match digest {
280 Some(dg) => msgs.get(&dg).cloned().into_iter().collect(),
282
283 None if all => msgs.values().cloned().collect(),
285 None => msgs.values().next().cloned().into_iter().collect(),
286 },
287 ),
288 }
289 }
290
291 async fn handle_subscribe(
296 &mut self,
297 peer: Option<P>,
298 commitment: M::Commitment,
299 digest: Option<M::Digest>,
300 responder: oneshot::Sender<M>,
301 ) {
302 let mut items = self.find_messages(&peer, commitment, digest, false);
304 if let Some(item) = items.pop() {
305 self.respond_subscribe(responder, item);
306 return;
307 }
308
309 self.waiters.entry(commitment).or_default().push(Waiter {
311 peer,
312 digest,
313 responder,
314 });
315 }
316
317 async fn handle_get(
319 &mut self,
320 peer: Option<P>,
321 commitment: M::Commitment,
322 digest: Option<M::Digest>,
323 responder: oneshot::Sender<Vec<M>>,
324 ) {
325 let items = self.find_messages(&peer, commitment, digest, true);
326 self.respond_get(responder, items);
327 }
328
329 async fn handle_network(&mut self, peer: P, msg: M) {
331 if !self.insert_message(peer.clone(), msg) {
332 debug!(?peer, "message already stored");
333 self.metrics.receive.inc(Status::Dropped);
334 return;
335 }
336
337 self.metrics.receive.inc(Status::Success);
338 }
339
340 fn insert_message(&mut self, peer: P, msg: M) -> bool {
349 let pair = Pair {
351 commitment: msg.commitment(),
352 digest: msg.digest(),
353 };
354
355 if let Some(mut waiters) = self.waiters.remove(&pair.commitment) {
357 let mut i = 0;
358 while i < waiters.len() {
359 let Waiter {
361 peer: peer_filter,
362 digest: digest_filter,
363 responder: _,
364 } = &waiters[i];
365
366 if peer_filter.as_ref().is_some_and(|s| s != &peer)
368 || digest_filter.is_some_and(|d| d != pair.digest)
369 {
370 i += 1;
371 continue;
372 }
373
374 let responder = waiters.swap_remove(i).responder;
379 self.respond_subscribe(responder, msg.clone());
380 }
381
382 if !waiters.is_empty() {
384 self.waiters.insert(pair.commitment, waiters);
385 }
386 }
387
388 let deque = self
390 .deques
391 .entry(peer)
392 .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));
393
394 if let Some(i) = deque.iter().position(|d| *d == pair) {
396 if i != 0 {
397 let v = deque.remove(i).unwrap(); deque.push_front(v);
399 }
400 return false;
401 };
402
403 deque.push_front(pair);
407 let count = self
408 .counts
409 .entry(pair.digest)
410 .and_modify(|c| *c = c.checked_add(1).unwrap())
411 .or_insert(1);
412 if *count == 1 {
413 let existing = self
414 .items
415 .entry(pair.commitment)
416 .or_default()
417 .insert(pair.digest, msg);
418 assert!(existing.is_none());
419 }
420
421 if deque.len() > self.deque_size {
423 let stale = deque.pop_back().unwrap();
427 let count = self
428 .counts
429 .entry(stale.digest)
430 .and_modify(|c| *c = c.checked_sub(1).unwrap())
431 .or_insert_with(|| unreachable!());
432 if *count == 0 {
433 let existing = self.counts.remove(&stale.digest);
434 assert!(existing == Some(0));
435 let identities = self.items.get_mut(&stale.commitment).unwrap();
436 identities.remove(&stale.digest); if identities.is_empty() {
438 self.items.remove(&stale.commitment);
439 }
440 }
441 }
442
443 true
444 }
445
446 fn cleanup_waiters(&mut self) {
452 self.waiters.retain(|_, waiters| {
453 let initial_len = waiters.len();
454 waiters.retain(|waiter| !waiter.responder.is_closed());
455 let dropped_count = initial_len - waiters.len();
456
457 for _ in 0..dropped_count {
459 self.metrics.get.inc(Status::Dropped);
460 }
461
462 !waiters.is_empty()
463 });
464 }
465
466 fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
469 self.metrics.subscribe.inc(if responder.send_lossy(msg) {
470 Status::Success
471 } else {
472 Status::Dropped
473 });
474 }
475
476 fn respond_get(&mut self, responder: oneshot::Sender<Vec<M>>, msg: Vec<M>) {
479 let found = !msg.is_empty();
480 self.metrics.get.inc(if responder.send_lossy(msg) {
481 if found {
482 Status::Success
483 } else {
484 Status::Failure
485 }
486 } else {
487 Status::Dropped
488 });
489 }
490}