1use super::{
4 metrics,
5 safe_tip::SafeTip,
6 types::{Ack, Activity, Epoch, Error, Index, Item, TipAck},
7 Config,
8};
9use crate::{Automaton, Monitor, Reporter, ThresholdSupervisor};
10use commonware_cryptography::{
11 bls12381::primitives::{group, ops::threshold_signature_recover, poly, variant::Variant},
12 Digest, PublicKey,
13};
14use commonware_macros::select;
15use commonware_p2p::{
16 utils::codec::{wrap, WrappedSender},
17 Blocker, Receiver, Recipients, Sender,
18};
19use commonware_runtime::{
20 telemetry::metrics::{
21 histogram,
22 status::{CounterExt, Status},
23 },
24 Clock, Handle, Metrics, Spawner, Storage,
25};
26use commonware_storage::journal::variable::{Config as JConfig, Journal};
27use commonware_utils::{futures::Pool as FuturesPool, PrioritySet};
28use futures::{
29 future::{self, Either},
30 pin_mut, StreamExt,
31};
32use std::{
33 cmp::max,
34 collections::{BTreeMap, HashMap},
35 marker::PhantomData,
36 time::{Duration, SystemTime},
37};
38use tracing::{debug, error, trace, warn};
39
/// Acks collected for a single index, tagged by whether the local digest for that index is
/// already known.
///
/// In both variants the outer map is keyed by the ack's epoch and the inner map by the index of
/// the partial signature (`ack.signature.index`).
enum Pending<V: Variant, D: Digest> {
    /// Created when a digest is requested from the automaton; holds acks that arrive before the
    /// automaton has answered.
    Unverified(HashMap<Epoch, HashMap<u32, Ack<V, D>>>),

    /// The digest returned by the automaton for this index, together with the acks collected
    /// for it so far.
    Verified(D, HashMap<Epoch, HashMap<u32, Ack<V, D>>>),
}
49
/// Outcome of one digest request made to the automaton, as yielded by the request pool.
struct DigestRequest<D: Digest, E: Clock> {
    /// The index the digest was requested for.
    index: Index,

    /// The digest returned by the automaton, or the error if the request was canceled.
    result: Result<D, Error>,

    /// Histogram timer created when the request was issued; the engine drops it once the
    /// request completes.
    timer: histogram::Timer<E>,
}
62
/// State of the aggregation engine: it requests digests from the automaton, signs and
/// broadcasts acks for them, collects acks from peers, and recovers threshold signatures.
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics,
    P: PublicKey,
    V: Variant,
    D: Digest,
    A: Automaton<Context = Index, Digest = D> + Clone,
    Z: Reporter<Activity = Activity<V, D>>,
    M: Monitor<Index = Epoch>,
    B: Blocker<PublicKey = P>,
    TSu: ThresholdSupervisor<
        Index = Epoch,
        PublicKey = P,
        Share = group::Share,
        Identity = poly::Public<V>,
    >,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    // Runtime context: used to spawn the engine, read the clock, sleep, and open the journal.
    context: E,
    // Cloned for every digest request; asked to `propose` a digest for an index.
    automaton: A,
    // Source of the current epoch and of subsequent epoch updates.
    monitor: M,
    // Supplies participants, the local share, and the group identity per epoch.
    validators: TSu,
    // Receives `Recovered` and `Tip` activities.
    reporter: Z,
    // Used to block peers that send undecodable or blockable-invalid acks.
    blocker: B,

    // Namespace passed to ack signing and ack verification.
    namespace: Vec<u8>,

    // (below, above): an ack is accepted only if its epoch lies within
    // [epoch - below, epoch + above] (saturating). `below` also bounds which buffered acks
    // survive an epoch change.
    epoch_bounds: (u64, u64),

    // Digests are requested (and acks accepted) only for indices below `tip + window`.
    window: u64,

    // In-flight digest requests to the automaton.
    digest_requests: FuturesPool<DigestRequest<D, E>>,

    // Current epoch, as last reported by the monitor.
    epoch: Epoch,

    // Current tip. Entries below it are dropped from `pending`/`confirmed` and acks below it
    // are rejected.
    tip: Index,

    // Tracks the tips reported by peers; when its value exceeds `tip`, the engine
    // fast-forwards.
    safe_tip: SafeTip<P>,

    // Per-index acks that have not yet produced a threshold signature.
    pending: BTreeMap<Index, Pending<V, D>>,

    // Per-index digest and recovered threshold signature, for indices at or above the tip.
    confirmed: BTreeMap<Index, (D, V::Signature)>,

    // Delay added to the current time when scheduling a rebroadcast.
    rebroadcast_timeout: Duration,

    // Next rebroadcast time per index.
    rebroadcast_deadlines: PrioritySet<Index, SystemTime>,

    // `None` until `run` opens the journal; taken and closed when `run` exits.
    journal: Option<Journal<E, Activity<V, D>>>,
    // Journal configuration captured from `Config`.
    journal_partition: String,
    journal_write_buffer: usize,
    journal_replay_buffer: usize,
    // Divisor that maps an index to its journal section.
    journal_heights_per_section: u64,
    journal_compression: Option<u8>,

    // Passed as the priority flag when sending acks.
    priority_acks: bool,

    // Ties the network sender/receiver types to the engine without storing them.
    _phantom: PhantomData<(NetS, NetR)>,

    // Engine metrics (tip gauge, digest/ack status counters, threshold counter, timers).
    metrics: metrics::Metrics<E>,
}
156
157impl<
158 E: Clock + Spawner + Storage + Metrics,
159 P: PublicKey,
160 V: Variant,
161 D: Digest,
162 A: Automaton<Context = Index, Digest = D> + Clone,
163 Z: Reporter<Activity = Activity<V, D>>,
164 M: Monitor<Index = Epoch>,
165 B: Blocker<PublicKey = P>,
166 TSu: ThresholdSupervisor<
167 Index = Epoch,
168 PublicKey = P,
169 Share = group::Share,
170 Identity = poly::Public<V>,
171 >,
172 NetS: Sender<PublicKey = P>,
173 NetR: Receiver<PublicKey = P>,
174 > Engine<E, P, V, D, A, Z, M, B, TSu, NetS, NetR>
175{
176 pub fn new(context: E, cfg: Config<P, V, D, A, Z, M, B, TSu>) -> Self {
178 let metrics = metrics::Metrics::init(context.clone());
179
180 Self {
181 context,
182 automaton: cfg.automaton,
183 reporter: cfg.reporter,
184 monitor: cfg.monitor,
185 validators: cfg.validators,
186 blocker: cfg.blocker,
187 namespace: cfg.namespace,
188 epoch_bounds: cfg.epoch_bounds,
189 window: cfg.window.into(),
190 epoch: 0,
191 tip: 0,
192 safe_tip: SafeTip::default(),
193 digest_requests: FuturesPool::default(),
194 pending: BTreeMap::new(),
195 confirmed: BTreeMap::new(),
196 rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
197 rebroadcast_deadlines: PrioritySet::new(),
198 journal: None,
199 journal_partition: cfg.journal_partition,
200 journal_write_buffer: cfg.journal_write_buffer.into(),
201 journal_replay_buffer: cfg.journal_replay_buffer.into(),
202 journal_heights_per_section: cfg.journal_heights_per_section.into(),
203 journal_compression: cfg.journal_compression,
204 priority_acks: cfg.priority_acks,
205 _phantom: PhantomData,
206 metrics,
207 }
208 }
209
210 pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
220 self.context.spawn_ref()(self.run(network))
221 }
222
223 async fn run(mut self, (net_sender, net_receiver): (NetS, NetR)) {
225 let (mut net_sender, mut net_receiver) = wrap((), net_sender, net_receiver);
226 let mut shutdown = self.context.stopped();
227
228 let (latest, mut epoch_updates) = self.monitor.subscribe().await;
230 self.epoch = latest;
231
232 let journal_cfg = JConfig {
234 partition: self.journal_partition.clone(),
235 compression: self.journal_compression,
236 codec_config: (),
237 write_buffer: self.journal_write_buffer,
238 };
239 let journal = Journal::init(self.context.with_label("journal"), journal_cfg)
240 .await
241 .expect("init failed");
242 self.replay(&journal).await;
243 self.journal = Some(journal);
244
245 self.safe_tip.init(
247 self.validators
248 .participants(self.epoch)
249 .expect("unknown participants"),
250 );
251
252 loop {
253 self.metrics.tip.set(self.tip as i64);
254
255 let next = self.next();
257 if next < self.tip + self.window {
258 trace!("requesting new digest: index {}", next);
259 self.get_digest(next);
260 continue;
261 }
262
263 let rebroadcast = match self.rebroadcast_deadlines.peek() {
265 Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
266 None => Either::Right(future::pending()),
267 };
268
269 select! {
271 _ = &mut shutdown => {
273 debug!("shutdown");
274 break;
275 },
276
277 epoch = epoch_updates.next() => {
279 let Some(epoch) = epoch else {
281 error!("epoch subscription failed");
282 break;
283 };
284
285 debug!(current=self.epoch, new=epoch, "refresh epoch");
287 assert!(epoch >= self.epoch);
288 self.epoch = epoch;
289
290 self.safe_tip.reconcile(self.validators.participants(epoch).unwrap());
292
293 let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
295 self.pending.iter_mut().for_each(|(_, pending)| {
296 match pending {
297 Pending::Unverified(acks) => {
298 acks.retain(|epoch, _| *epoch >= min_epoch);
299 }
300 Pending::Verified(_, acks) => {
301 acks.retain(|epoch, _| *epoch >= min_epoch);
302 }
303 }
304 });
305
306 continue;
307 },
308
309 request = self.digest_requests.next_completed() => {
311 let DigestRequest { index, result, timer } = request;
312 drop(timer); match result {
314 Err(err) => {
315 warn!(?err, ?index, "automaton returned error");
316 self.metrics.digest.inc(Status::Dropped);
317 }
318 Ok(digest) => {
319 if let Err(err) = self.handle_digest(index, digest, &mut net_sender).await {
320 warn!(?err, ?index, "handle_digest failed");
321 continue;
322 }
323 }
324 }
325 },
326
327 msg = net_receiver.recv() => {
329 let (sender, msg) = match msg {
331 Ok(r) => r,
332 Err(err) => {
333 warn!(?err, "ack receiver failed");
334 break;
335 }
336 };
337 let mut guard = self.metrics.acks.guard(Status::Invalid);
338 let TipAck { ack, tip } = match msg {
339 Ok(peer_ack) => peer_ack,
340 Err(err) => {
341 warn!(?err, ?sender, "ack decode failed, blocking peer");
342 self.blocker.block(sender).await;
343 continue;
344 }
345 };
346
347 if self.safe_tip.update(sender.clone(), tip).is_some() {
349 let safe_tip = self.safe_tip.get();
351 if safe_tip > self.tip {
352 self.fast_forward_tip(safe_tip).await;
353 }
354 }
355
356 if let Err(err) = self.validate_ack(&ack, &sender) {
358 if err.blockable() {
359 warn!(?sender, ?err, "blocking peer for validation failure");
360 self.blocker.block(sender).await;
361 } else {
362 debug!(?sender, ?err, "ack validate failed");
363 }
364 continue;
365 };
366
367 if let Err(err) = self.handle_ack(&ack).await {
369 warn!(?err, ?sender, "ack handle failed");
370 guard.set(Status::Failure);
371 continue;
372 }
373
374 debug!(?sender, epoch=ack.epoch, index=ack.item.index, "ack");
376 guard.set(Status::Success);
377 },
378
379 _ = rebroadcast => {
381 let (index, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline");
383 trace!("rebroadcasting: index {}", index);
384 if let Err(err) = self.handle_rebroadcast(index, &mut net_sender).await {
385 warn!(?err, ?index, "rebroadcast failed");
386 };
387 }
388 }
389 }
390
391 if let Some(journal) = self.journal.take() {
393 journal
394 .close()
395 .await
396 .expect("unable to close aggregation journal");
397 }
398 }
399
400 async fn handle_digest(
404 &mut self,
405 index: Index,
406 digest: D,
407 sender: &mut WrappedSender<NetS, TipAck<V, D>>,
408 ) -> Result<(), Error> {
409 if !matches!(self.pending.get(&index), Some(Pending::Unverified(_))) {
411 return Err(Error::AckIndex(index));
412 };
413
414 let Some(Pending::Unverified(acks)) = self.pending.remove(&index) else {
416 panic!("Pending::Unverified entry not found");
417 };
418 self.pending
419 .insert(index, Pending::Verified(digest, HashMap::new()));
420
421 for epoch_acks in acks.values() {
424 for epoch_ack in epoch_acks.values() {
425 let _ = self.handle_ack(epoch_ack).await; }
427 if self.confirmed.contains_key(&index) {
429 break;
430 }
431 }
432
433 let ack = self.sign_ack(index, digest).await?;
435
436 self.set_rebroadcast_deadline(index);
438
439 let _ = self.handle_ack(&ack).await; self.broadcast(ack, sender).await?;
444
445 Ok(())
446 }
447
448 async fn handle_ack(&mut self, ack: &Ack<V, D>) -> Result<(), Error> {
453 let quorum = self.validators.identity().required();
455
456 let acks_by_epoch = match self.pending.get_mut(&ack.item.index) {
458 None => {
459 return Err(Error::AckIndex(ack.item.index));
462 }
463 Some(Pending::Unverified(acks)) => acks,
464 Some(Pending::Verified(_, acks)) => acks,
465 };
466
467 let acks = acks_by_epoch.entry(ack.epoch).or_default();
471 if acks.contains_key(&ack.signature.index) {
472 return Ok(());
473 }
474 acks.insert(ack.signature.index, ack.clone());
475
476 if acks.len() >= (quorum as usize) {
478 let item = ack.item.clone();
479 let partials = acks.values().map(|ack| &ack.signature);
480 let threshold = threshold_signature_recover::<V, _>(quorum, partials)
481 .expect("Failed to recover threshold signature");
482 self.metrics.threshold.inc();
483 self.handle_threshold(item, threshold).await;
484 }
485
486 Ok(())
487 }
488
489 async fn handle_threshold(&mut self, item: Item<D>, threshold: V::Signature) {
491 let index = item.index;
493 if self.confirmed.contains_key(&index) {
494 return;
495 }
496
497 self.confirmed.insert(index, (item.digest, threshold));
499
500 let recovered = Activity::Recovered(item, threshold);
502 self.record(recovered.clone()).await;
503 self.sync(index).await;
504 self.reporter.report(recovered).await;
505
506 if index == self.tip {
508 let mut new_tip = index.saturating_add(1);
510 while self.confirmed.contains_key(&new_tip) && new_tip < Index::MAX {
511 new_tip = new_tip.saturating_add(1);
512 }
513
514 if new_tip > self.tip {
516 self.fast_forward_tip(new_tip).await;
517 }
518 }
519 }
520
521 async fn handle_rebroadcast(
523 &mut self,
524 index: Index,
525 sender: &mut WrappedSender<NetS, TipAck<V, D>>,
526 ) -> Result<(), Error> {
527 let Some(Pending::Verified(digest, acks)) = self.pending.get(&index) else {
528 return Ok(());
530 };
531
532 let Some(share) = self.validators.share(self.epoch) else {
534 return Err(Error::UnknownShare(self.epoch));
535 };
536 let ack = acks
537 .get(&self.epoch)
538 .and_then(|acks| acks.get(&share.index).cloned());
539 let ack = match ack {
540 Some(ack) => ack,
541 None => self.sign_ack(index, *digest).await?,
542 };
543
544 self.set_rebroadcast_deadline(index);
546
547 self.broadcast(ack, sender).await
549 }
550
551 fn validate_ack(&self, ack: &Ack<V, D>, sender: &P) -> Result<(), Error> {
558 {
560 let (eb_lo, eb_hi) = self.epoch_bounds;
561 let bound_lo = self.epoch.saturating_sub(eb_lo);
562 let bound_hi = self.epoch.saturating_add(eb_hi);
563 if ack.epoch < bound_lo || ack.epoch > bound_hi {
564 return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
565 }
566 }
567
568 let Some(sig_index) = self.validators.is_participant(ack.epoch, sender) else {
570 return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
571 };
572 if sig_index != ack.signature.index {
573 return Err(Error::PeerMismatch);
574 }
575
576 if ack.item.index < self.tip {
578 return Err(Error::AckThresholded(ack.item.index));
579 }
580 if ack.item.index >= self.tip + self.window {
581 return Err(Error::AckIndex(ack.item.index));
582 }
583
584 if self.confirmed.contains_key(&ack.item.index) {
586 return Err(Error::AckThresholded(ack.item.index));
587 }
588 let have_ack = match self.pending.get(&ack.item.index) {
589 None => false,
590 Some(Pending::Unverified(epoch_map)) => epoch_map
591 .get(&ack.epoch)
592 .is_some_and(|acks| acks.contains_key(&ack.signature.index)),
593 Some(Pending::Verified(_, epoch_map)) => epoch_map
594 .get(&ack.epoch)
595 .is_some_and(|acks| acks.contains_key(&ack.signature.index)),
596 };
597 if have_ack {
598 return Err(Error::AckDuplicate(sender.to_string(), ack.item.index));
599 }
600
601 if !ack.verify(&self.namespace, self.validators.identity()) {
603 return Err(Error::InvalidAckSignature);
604 }
605
606 Ok(())
607 }
608
609 fn get_digest(&mut self, index: Index) {
613 assert!(self
614 .pending
615 .insert(index, Pending::Unverified(HashMap::new()))
616 .is_none());
617
618 let mut automaton = self.automaton.clone();
619 let timer = self.metrics.digest_duration.timer();
620 self.digest_requests.push(async move {
621 let receiver = automaton.propose(index).await;
622 let result = receiver.await.map_err(Error::AppProposeCanceled);
623 DigestRequest {
624 index,
625 result,
626 timer,
627 }
628 });
629 }
630
631 fn set_rebroadcast_deadline(&mut self, index: Index) {
633 self.rebroadcast_deadlines
634 .put(index, self.context.current() + self.rebroadcast_timeout);
635 }
636
637 async fn sign_ack(&mut self, index: Index, digest: D) -> Result<Ack<V, D>, Error> {
640 let Some(share) = self.validators.share(self.epoch) else {
641 return Err(Error::UnknownShare(self.epoch));
642 };
643
644 let item = Item { index, digest };
646 let ack = Ack::sign(&self.namespace, self.epoch, share, item);
647
648 self.record(Activity::Ack(ack.clone())).await;
650 self.sync(index).await;
651
652 Ok(ack)
653 }
654
655 async fn broadcast(
659 &mut self,
660 ack: Ack<V, D>,
661 sender: &mut WrappedSender<NetS, TipAck<V, D>>,
662 ) -> Result<(), Error> {
663 sender
664 .send(
665 Recipients::All,
666 TipAck { ack, tip: self.tip },
667 self.priority_acks,
668 )
669 .await
670 .map_err(|err| {
671 warn!(?err, "failed to send ack");
672 Error::UnableToSendMessage
673 })?;
674 Ok(())
675 }
676
677 fn next(&self) -> Index {
680 let max_pending = self
681 .pending
682 .last_key_value()
683 .map(|(k, _)| k.saturating_add(1))
684 .unwrap_or_default();
685 let max_confirmed = self
686 .confirmed
687 .last_key_value()
688 .map(|(k, _)| k.saturating_add(1))
689 .unwrap_or_default();
690 max(self.tip, max(max_pending, max_confirmed))
691 }
692
693 async fn fast_forward_tip(&mut self, tip: Index) {
699 assert!(tip > self.tip);
700
701 self.pending.retain(|index, _| *index >= tip);
703 self.confirmed.retain(|index, _| *index >= tip);
704
705 self.record(Activity::Tip(tip)).await;
707 self.sync(tip).await;
708 self.reporter.report(Activity::Tip(tip)).await;
709
710 let section = self.get_journal_section(tip);
712 let journal = self.journal.as_mut().expect("journal must be initialized");
713 let _ = journal.prune(section).await;
714
715 self.tip = tip;
717 }
718
719 fn get_journal_section(&self, index: Index) -> u64 {
723 index / self.journal_heights_per_section
724 }
725
726 async fn replay(&mut self, journal: &Journal<E, Activity<V, D>>) {
728 let mut tip = Index::default();
729 let mut recovered = Vec::new();
730 let mut acks = Vec::new();
731 let stream = journal
732 .replay(self.journal_replay_buffer)
733 .await
734 .expect("replay failed");
735 pin_mut!(stream);
736 while let Some(msg) = stream.next().await {
737 let (_, _, _, activity) = msg.expect("replay failed");
738 match activity {
739 Activity::Tip(index) => {
740 tip = max(tip, index);
741 }
742 Activity::Recovered(item, signature) => {
743 recovered.push((item, signature));
744 }
745 Activity::Ack(ack) => {
746 acks.push(ack);
747 }
748 }
749 }
750 self.tip = tip;
752 recovered
754 .iter()
755 .filter(|(item, _)| item.index >= tip)
756 .for_each(|(item, signature)| {
757 self.confirmed.insert(item.index, (item.digest, *signature));
758 });
759 acks = acks
761 .into_iter()
762 .filter(|ack| ack.item.index >= tip && !self.confirmed.contains_key(&ack.item.index))
763 .collect::<Vec<_>>();
764 acks.iter().for_each(|ack| {
765 assert!(self
766 .pending
767 .insert(
768 ack.item.index,
769 Pending::Verified(
770 ack.item.digest,
771 HashMap::from([(
772 ack.epoch,
773 HashMap::from([(ack.signature.index, ack.clone())]),
774 )]),
775 ),
776 )
777 .is_none());
778 self.set_rebroadcast_deadline(ack.item.index);
779 });
780 }
781
782 async fn record(&mut self, activity: Activity<V, D>) {
784 let index = match activity {
785 Activity::Ack(ref ack) => ack.item.index,
786 Activity::Recovered(ref item, _) => item.index,
787 Activity::Tip(index) => index,
788 };
789 let section = self.get_journal_section(index);
790 self.journal
791 .as_mut()
792 .expect("journal must be initialized")
793 .append(section, activity)
794 .await
795 .expect("unable to append to journal");
796 }
797
798 async fn sync(&mut self, index: Index) {
800 let section = self.get_journal_section(index);
801 let journal = self.journal.as_mut().expect("journal must be initialized");
802 journal.sync(section).await.expect("unable to sync journal");
803 }
804}