1use super::{metrics, Config, Mailbox, Message};
2use crate::buffered::metrics::SequencerLabel;
3use commonware_codec::Codec;
4use commonware_cryptography::{Committable, Digestible, PublicKey};
5use commonware_macros::select;
6use commonware_p2p::{
7 utils::codec::{wrap, WrappedSender},
8 Receiver, Recipients, Sender,
9};
10use commonware_runtime::{
11 telemetry::metrics::status::{CounterExt, Status},
12 Clock, Handle, Metrics, Spawner,
13};
14use futures::{
15 channel::{mpsc, oneshot},
16 StreamExt,
17};
18use std::collections::{HashMap, VecDeque};
19use tracing::{debug, error, trace, warn};
20
21struct Waiter<P, Dd, M> {
23 peer: Option<P>,
25
26 digest: Option<Dd>,
28
29 responder: oneshot::Sender<M>,
31}
32
/// Identifies one cached message by its commitment and its digest.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct Pair<C, D> {
    /// Commitment of the message.
    commitment: C,

    /// Digest of the message.
    digest: D,
}
42
43pub struct Engine<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + Codec> {
51 context: E,
55
56 public_key: P,
61
62 priority: bool,
64
65 deque_size: usize,
67
68 codec_config: M::Cfg,
70
71 mailbox_receiver: mpsc::Receiver<Message<P, M>>,
76
77 #[allow(clippy::type_complexity)]
79 waiters: HashMap<M::Commitment, Vec<Waiter<P, M::Digest, M>>>,
80
81 items: HashMap<M::Commitment, HashMap<M::Digest, M>>,
89
90 #[allow(clippy::type_complexity)]
96 deques: HashMap<P, VecDeque<Pair<M::Commitment, M::Digest>>>,
97
98 counts: HashMap<M::Digest, usize>,
103
104 metrics: metrics::Metrics,
109}
110
111impl<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + Codec>
112 Engine<E, P, M>
113{
114 pub fn new(context: E, cfg: Config<P, M::Cfg>) -> (Self, Mailbox<P, M>) {
117 let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
118 let mailbox = Mailbox::<P, M>::new(mailbox_sender);
119 let metrics = metrics::Metrics::init(context.clone());
120
121 let result = Self {
122 context,
123 public_key: cfg.public_key,
124 priority: cfg.priority,
125 deque_size: cfg.deque_size,
126 codec_config: cfg.codec_config,
127 mailbox_receiver,
128 waiters: HashMap::new(),
129 deques: HashMap::new(),
130 items: HashMap::new(),
131 counts: HashMap::new(),
132 metrics,
133 };
134
135 (result, mailbox)
136 }
137
138 pub fn start(
140 mut self,
141 network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
142 ) -> Handle<()> {
143 self.context.spawn_ref()(self.run(network))
144 }
145
146 async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
148 let (mut sender, mut receiver) = wrap(self.codec_config.clone(), network.0, network.1);
149 let mut shutdown = self.context.stopped();
150
151 loop {
152 self.cleanup_waiters();
154 self.metrics.waiters.set(self.waiters.len() as i64);
155
156 select! {
157 _ = &mut shutdown => {
159 debug!("shutdown");
160 },
161
162 mail = self.mailbox_receiver.next() => {
164 let Some(msg) = mail else {
165 error!("mailbox receiver failed");
166 break;
167 };
168 match msg {
169 Message::Broadcast{ recipients, message, responder } => {
170 trace!("mailbox: broadcast");
171 self.handle_broadcast(&mut sender, recipients, message, responder).await;
172 }
173 Message::Subscribe{ peer, commitment, digest, responder } => {
174 trace!("mailbox: subscribe");
175 self.handle_subscribe(peer, commitment, digest, responder).await;
176 }
177 Message::Get{ peer, commitment, digest, responder } => {
178 trace!("mailbox: get");
179 self.handle_get(peer, commitment, digest, responder).await;
180 }
181 }
182 },
183
184 msg = receiver.recv() => {
186 let (peer, msg) = match msg {
188 Ok(r) => r,
189 Err(err) => {
190 error!(?err, "receiver failed");
191 break;
192 }
193 };
194
195 let msg = match msg {
197 Ok(msg) => msg,
198 Err(err) => {
199 warn!(?err, ?peer, "failed to decode message");
200 self.metrics.receive.inc(Status::Invalid);
201 continue;
202 }
203 };
204
205 trace!(?peer, "network");
206 self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
207 self.handle_network(peer, msg).await;
208 },
209 }
210 }
211 }
212
213 async fn handle_broadcast<Sr: Sender<PublicKey = P>>(
219 &mut self,
220 sender: &mut WrappedSender<Sr, M>,
221 recipients: Recipients<P>,
222 msg: M,
223 responder: oneshot::Sender<Vec<P>>,
224 ) {
225 let _ = self.insert_message(self.public_key.clone(), msg.clone());
227
228 let sent_to = sender
230 .send(recipients, msg, self.priority)
231 .await
232 .unwrap_or_else(|err| {
233 error!(?err, "failed to send message");
234 vec![]
235 });
236 let _ = responder.send(sent_to);
237 }
238
239 fn find_messages(
241 &mut self,
242 peer: &Option<P>,
243 commitment: M::Commitment,
244 digest: Option<M::Digest>,
245 all: bool,
246 ) -> Vec<M> {
247 match peer {
248 Some(s) => self
250 .deques
251 .get(s)
252 .into_iter()
253 .flat_map(|dq| dq.iter())
254 .filter(|pair| pair.commitment == commitment)
255 .filter_map(|pair| {
256 self.items
257 .get(&pair.commitment)
258 .and_then(|m| m.get(&pair.digest))
259 })
260 .cloned()
261 .collect(),
262
263 None => match self.items.get(&commitment) {
265 None => Vec::new(),
267
268 Some(msgs) => match digest {
270 Some(dg) => msgs.get(&dg).cloned().into_iter().collect(),
272
273 None if all => msgs.values().cloned().collect(),
275 None => msgs.values().next().cloned().into_iter().collect(),
276 },
277 },
278 }
279 }
280
281 async fn handle_subscribe(
286 &mut self,
287 peer: Option<P>,
288 commitment: M::Commitment,
289 digest: Option<M::Digest>,
290 responder: oneshot::Sender<M>,
291 ) {
292 let mut items = self.find_messages(&peer, commitment, digest, false);
294 if let Some(item) = items.pop() {
295 self.respond_subscribe(responder, item);
296 return;
297 }
298
299 self.waiters.entry(commitment).or_default().push(Waiter {
301 peer,
302 digest,
303 responder,
304 });
305 }
306
307 async fn handle_get(
309 &mut self,
310 peer: Option<P>,
311 commitment: M::Commitment,
312 digest: Option<M::Digest>,
313 responder: oneshot::Sender<Vec<M>>,
314 ) {
315 let items = self.find_messages(&peer, commitment, digest, true);
316 self.respond_get(responder, items);
317 }
318
319 async fn handle_network(&mut self, peer: P, msg: M) {
321 if !self.insert_message(peer.clone(), msg) {
322 debug!(?peer, "message already stored");
323 self.metrics.receive.inc(Status::Dropped);
324 return;
325 }
326
327 self.metrics.receive.inc(Status::Success);
328 }
329
330 fn insert_message(&mut self, peer: P, msg: M) -> bool {
339 let pair = Pair {
341 commitment: msg.commitment(),
342 digest: msg.digest(),
343 };
344
345 if let Some(mut waiters) = self.waiters.remove(&pair.commitment) {
347 let mut i = 0;
348 while i < waiters.len() {
349 let Waiter {
351 peer: peer_filter,
352 digest: digest_filter,
353 responder: _,
354 } = &waiters[i];
355
356 if peer_filter.as_ref().is_some_and(|s| s != &peer)
358 || digest_filter.is_some_and(|d| d != pair.digest)
359 {
360 i += 1;
361 continue;
362 }
363
364 let responder = waiters.swap_remove(i).responder;
369 self.respond_subscribe(responder, msg.clone());
370 }
371
372 if !waiters.is_empty() {
374 self.waiters.insert(pair.commitment, waiters);
375 }
376 }
377
378 let deque = self
380 .deques
381 .entry(peer)
382 .or_insert_with(|| VecDeque::with_capacity(self.deque_size + 1));
383
384 if let Some(i) = deque.iter().position(|d| *d == pair) {
386 if i != 0 {
387 let v = deque.remove(i).unwrap(); deque.push_front(v);
389 }
390 return false;
391 };
392
393 deque.push_front(pair);
397 let count = self
398 .counts
399 .entry(pair.digest)
400 .and_modify(|c| *c = c.checked_add(1).unwrap())
401 .or_insert(1);
402 if *count == 1 {
403 let existing = self
404 .items
405 .entry(pair.commitment)
406 .or_default()
407 .insert(pair.digest, msg);
408 assert!(existing.is_none());
409 }
410
411 if deque.len() > self.deque_size {
413 let stale = deque.pop_back().unwrap();
417 let count = self
418 .counts
419 .entry(stale.digest)
420 .and_modify(|c| *c = c.checked_sub(1).unwrap())
421 .or_insert_with(|| unreachable!());
422 if *count == 0 {
423 let existing = self.counts.remove(&stale.digest);
424 assert!(existing == Some(0));
425 let identities = self.items.get_mut(&stale.commitment).unwrap();
426 identities.remove(&stale.digest); if identities.is_empty() {
428 self.items.remove(&stale.commitment);
429 }
430 }
431 }
432
433 true
434 }
435
436 fn cleanup_waiters(&mut self) {
442 self.waiters.retain(|_, waiters| {
443 let initial_len = waiters.len();
444 waiters.retain(|waiter| !waiter.responder.is_canceled());
445 let dropped_count = initial_len - waiters.len();
446
447 for _ in 0..dropped_count {
449 self.metrics.get.inc(Status::Dropped);
450 }
451
452 !waiters.is_empty()
453 });
454 }
455
456 fn respond_subscribe(&mut self, responder: oneshot::Sender<M>, msg: M) {
459 let result = responder.send(msg);
460 self.metrics.subscribe.inc(match result {
461 Ok(_) => Status::Success,
462 Err(_) => Status::Dropped,
463 });
464 }
465
466 fn respond_get(&mut self, responder: oneshot::Sender<Vec<M>>, msg: Vec<M>) {
469 let found = !msg.is_empty();
470 let result = responder.send(msg);
471 self.metrics.get.inc(match result {
472 Ok(_) if found => Status::Success,
473 Ok(_) => Status::Failure,
474 Err(_) => Status::Dropped,
475 });
476 }
477}