1use super::{metrics, Config, Mailbox, Message};
2use crate::buffered::metrics::SequencerLabel;
3use commonware_codec::Codec;
4use commonware_cryptography::{Committable, Digestible};
5use commonware_macros::select;
6use commonware_p2p::{
7 utils::codec::{wrap, WrappedSender},
8 Receiver, Recipients, Sender,
9};
10use commonware_runtime::{
11 telemetry::metrics::status::{CounterExt, Status},
12 Clock, Handle, Metrics, Spawner,
13};
14use commonware_utils::Array;
15use futures::{
16 channel::{mpsc, oneshot},
17 StreamExt,
18};
19use std::collections::{HashMap, VecDeque};
20use tracing::{debug, error, trace, warn};
21
22struct Waiter<P, Dd, M> {
24 peer: Option<P>,
26
27 digest: Option<Dd>,
29
30 responder: oneshot::Sender<M>,
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
36struct Pair<Dc, Dd> {
37 commitment: Dc,
39
40 digest: Dd,
42}
43
44pub struct Engine<E: Clock + Spawner + Metrics, P: Array, M: Committable + Digestible + Codec> {
52 context: E,
56
57 public_key: P,
62
63 priority: bool,
65
66 deque_size: usize,
68
69 codec_config: M::Cfg,
71
72 mailbox_receiver: mpsc::Receiver<Message<P, M>>,
77
78 #[allow(clippy::type_complexity)]
80 waiters: HashMap<M::Commitment, Vec<Waiter<P, M::Digest, M>>>,
81
82 items: HashMap<M::Commitment, HashMap<M::Digest, M>>,
90
91 #[allow(clippy::type_complexity)]
97 deques: HashMap<P, VecDeque<Pair<M::Commitment, M::Digest>>>,
98
99 counts: HashMap<M::Digest, usize>,
104
105 metrics: metrics::Metrics,
110}
111
112impl<E: Clock + Spawner + Metrics, P: Array, M: Committable + Digestible + Codec> Engine<E, P, M> {
113 pub fn new(context: E, cfg: Config<P, M::Cfg>) -> (Self, Mailbox<P, M>) {
116 let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
117 let mailbox = Mailbox::<P, M>::new(mailbox_sender);
118 let metrics = metrics::Metrics::init(context.clone());
119
120 let result = Self {
121 context,
122 public_key: cfg.public_key,
123 priority: cfg.priority,
124 deque_size: cfg.deque_size,
125 codec_config: cfg.codec_config,
126 mailbox_receiver,
127 waiters: HashMap::new(),
128 deques: HashMap::new(),
129 items: HashMap::new(),
130 counts: HashMap::new(),
131 metrics,
132 };
133
134 (result, mailbox)
135 }
136
137 pub fn start(
139 mut self,
140 network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
141 ) -> Handle<()> {
142 self.context.spawn_ref()(self.run(network))
143 }
144
145 async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
147 let (mut sender, mut receiver) = wrap(self.codec_config.clone(), network.0, network.1);
148 let mut shutdown = self.context.stopped();
149
150 loop {
151 self.cleanup_waiters();
153 self.metrics.waiters.set(self.waiters.len() as i64);
154
155 select! {
156 _ = &mut shutdown => {
158 debug!("shutdown");
159 },
160
161 mail = self.mailbox_receiver.next() => {
163 let Some(msg) = mail else {
164 error!("mailbox receiver failed");
165 break;
166 };
167 match msg {
168 Message::Broadcast{ recipients, message, responder } => {
169 trace!("mailbox: broadcast");
170 self.handle_broadcast(&mut sender, recipients, message, responder).await;
171 }
172 Message::Subscribe{ peer, commitment, digest, responder } => {
173 trace!("mailbox: subscribe");
174 self.handle_subscribe(peer, commitment, digest, responder).await;
175 }
176 Message::Get{ peer, commitment, digest, responder } => {
177 trace!("mailbox: get");
178 self.handle_get(peer, commitment, digest, responder).await;
179 }
180 }
181 },
182
183 msg = receiver.recv() => {
185 let (peer, msg) = match msg {
187 Ok(r) => r,
188 Err(err) => {
189 error!(?err, "receiver failed");
190 break;
191 }
192 };
193
194 let msg = match msg {
196 Ok(msg) => msg,
197 Err(err) => {
198 warn!(?err, ?peer, "failed to decode message");
199 self.metrics.receive.inc(Status::Invalid);
200 continue;
201 }
202 };
203
204 trace!(?peer, "network");
205 self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
206 self.handle_network(peer, msg).await;
207 },
208 }
209 }
210 }
211
212 async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
218 &mut self,
219 sender: &mut WrappedSender<Sr, M>,
220 recipients: Recipients<P>,
221 msg: M,
222 responder: oneshot::Sender<Vec<P>>,
223 ) {
224 let _ = self.insert_message(self.public_key.clone(), msg.clone());
226
227 let sent_to = sender
229 .send(recipients, msg, self.priority)
230 .await
231 .unwrap_or_else(|err| {
232 error!(?err, "failed to send message");
233 vec![]
234 });
235 let _ = responder.send(sent_to);
236 }
237
238 fn find_messages(
240 &mut self,
241 peer: &Option<P>,
242 commitment: M::Commitment,
243 digest: Option<M::Digest>,
244 all: bool,
245 ) -> Vec<M> {
246 match peer {
247 Some(s) => self
249 .deques
250 .get(s)
251 .into_iter()
252 .flat_map(|dq| dq.iter())
253 .filter(|pair| pair.commitment == commitment)
254 .filter_map(|pair| {
255 self.items
256 .get(&pair.commitment)
257 .and_then(|m| m.get(&pair.digest))
258 })
259 .cloned()
260 .collect(),
261
262 None => match self.items.get(&commitment) {
264 None => Vec::new(),
266
267 Some(msgs) => match digest {
269 Some(dg) => msgs.get(&dg).cloned().into_iter().collect(),
271
272 None if all => msgs.values().cloned().collect(),
274 None => msgs.values().next().cloned().into_iter().collect(),
275 },
276 },
277 }
278 }
279
280 async fn handle_subscribe(
285 &mut self,
286 peer: Option<P>,
287 commitment: M::Commitment,
288 digest: Option<M::Digest>,
289 responder: oneshot::Sender<M>,
290 ) {
291 let mut items = self.find_messages(&peer, commitment, digest, false);
293 if let Some(item) = items.pop() {
294 self.respond_subscribe(responder, item);
295 return;
296 }
297
298 self.waiters.entry(commitment).or_default().push(Waiter {
300 peer,
301 digest,
302 responder,
303 });
304 }
305
306 async fn handle_get(
308 &mut self,
309 peer: Option<P>,
310 commitment: M::Commitment,
311 digest: Option<M::Digest>,
312 responder: oneshot::Sender<Vec<M>>,
313 ) {
314 let items = self.find_messages(&peer, commitment, digest, true);
315 self.respond_get(responder, items);
316 }
317
318 async fn handle_network(&mut self, peer: P, msg: M) {
320 if !self.insert_message(peer.clone(), msg) {
321 debug!(?peer, "message already stored");
322 self.metrics.receive.inc(Status::Dropped);
323 return;
324 }
325
326 self.metrics.receive.inc(Status::Success);
327 }
328
329 fn insert_message(&mut self, peer: P, msg: M) -> bool {
338 let pair = Pair {
340 commitment: msg.commitment(),
341 digest: msg.digest(),
342 };
343
344 if let Some(mut waiters) = self.waiters.remove(&pair.commitment) {
346 let mut i = 0;
347 while i < waiters.len() {
348 let Waiter {
350 peer: peer_filter,
351 digest: digest_filter,
352 responder: _,
353 } = &waiters[i];
354
355 if peer_filter.as_ref().is_some_and(|s| s != &peer)
357 || digest_filter.is_some_and(|d| d != pair.digest)
358 {
359 i += 1;
360 continue;
361 }
362
363 let responder = waiters.swap_remove(i).responder;
368 self.respond_subscribe(responder, msg.clone());
369 }
370
371 if !waiters.is_empty() {
373 self.waiters.insert(pair.commitment, waiters);
374 }
375 }
376
377 let deque = self
379 .deques
380 .entry(peer)
381 .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));
382
383 if let Some(i) = deque.iter().position(|d| *d == pair) {
385 if i != 0 {
386 let v = deque.remove(i).unwrap(); deque.push_front(v);
388 }
389 return false;
390 };
391
392 deque.push_front(pair);
396 let count = self
397 .counts
398 .entry(pair.digest)
399 .and_modify(|c| *c = c.checked_add(1).unwrap())
400 .or_insert(1);
401 if *count == 1 {
402 let existing = self
403 .items
404 .entry(pair.commitment)
405 .or_default()
406 .insert(pair.digest, msg);
407 assert!(existing.is_none());
408 }
409
410 if deque.len() > self.deque_size {
412 let stale = deque.pop_back().unwrap();
416 let count = self
417 .counts
418 .entry(stale.digest)
419 .and_modify(|c| *c = c.checked_sub(1).unwrap())
420 .or_insert_with(|| unreachable!());
421 if *count == 0 {
422 let existing = self.counts.remove(&stale.digest);
423 assert!(existing == Some(0));
424 let identities = self.items.get_mut(&stale.commitment).unwrap();
425 identities.remove(&stale.digest); if identities.is_empty() {
427 self.items.remove(&stale.commitment);
428 }
429 }
430 }
431
432 true
433 }
434
435 fn cleanup_waiters(&mut self) {
441 self.waiters.retain(|_, waiters| {
442 let initial_len = waiters.len();
443 waiters.retain(|waiter| !waiter.responder.is_canceled());
444 let dropped_count = initial_len - waiters.len();
445
446 for _ in 0..dropped_count {
448 self.metrics.get.inc(Status::Dropped);
449 }
450
451 !waiters.is_empty()
452 });
453 }
454
455 fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
458 let result = responder.send(msg);
459 self.metrics.subscribe.inc(match result {
460 Ok(_) => Status::Success,
461 Err(_) => Status::Dropped,
462 });
463 }
464
465 fn respond_get(&mut self, responder: oneshot::Sender<Vec<M>>, msg: Vec<M>) {
468 let found = !msg.is_empty();
469 let result = responder.send(msg);
470 self.metrics.get.inc(match result {
471 Ok(_) if found => Status::Success,
472 Ok(_) => Status::Failure,
473 Err(_) => Status::Dropped,
474 });
475 }
476}