1use super::{
4 metrics,
5 safe_tip::SafeTip,
6 types::{Ack, Activity, Epoch, Error, Index, Item, TipAck},
7 Config,
8};
9use crate::{aggregation::types::Certificate, Automaton, Monitor, Reporter, ThresholdSupervisor};
10use commonware_cryptography::{
11 bls12381::primitives::{group, ops::threshold_signature_recover, variant::Variant},
12 Digest, PublicKey,
13};
14use commonware_macros::select;
15use commonware_p2p::{
16 utils::codec::{wrap, WrappedSender},
17 Blocker, Receiver, Recipients, Sender,
18};
19use commonware_runtime::{
20 buffer::PoolRef,
21 telemetry::metrics::{
22 histogram,
23 status::{CounterExt, Status},
24 },
25 Clock, Handle, Metrics, Spawner, Storage,
26};
27use commonware_storage::journal::variable::{Config as JConfig, Journal};
28use commonware_utils::{futures::Pool as FuturesPool, quorum_from_slice, PrioritySet};
29use futures::{
30 future::{self, Either},
31 pin_mut, StreamExt,
32};
33use std::{
34 cmp::max,
35 collections::BTreeMap,
36 num::NonZeroUsize,
37 time::{Duration, SystemTime},
38};
39use tracing::{debug, error, info, trace, warn};
40
/// Acks collected for a single index, grouped by epoch and then by the
/// signer's partial-signature index.
enum Pending<V: Variant, D: Digest> {
    /// A digest has been requested from the automaton for this index but has
    /// not been handled yet, so acks are buffered without being compared
    /// against a locally known digest.
    Unverified(BTreeMap<Epoch, BTreeMap<u32, Ack<V, D>>>),

    /// The automaton has produced the digest stored in the first field; acks
    /// carrying a different digest are rejected for this index.
    Verified(D, BTreeMap<Epoch, BTreeMap<u32, Ack<V, D>>>),
}
50
/// Outcome of a single digest request issued to the automaton.
struct DigestRequest<D: Digest, E: Clock> {
    /// Index the digest was requested for.
    index: Index,

    /// Digest returned by the automaton, or the error hit while waiting for it.
    result: Result<D, Error>,

    /// Timer taken from the `digest_duration` metric when the request was
    /// issued; it is dropped as soon as the completed request is handled.
    timer: histogram::Timer<E>,
}
63
/// Collects signed acks for a sequence of indices, recovers a threshold
/// signature for each index once a quorum agrees on its digest, and journals
/// the resulting activity.
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics,
    P: PublicKey,
    V: Variant,
    D: Digest,
    A: Automaton<Context = Index, Digest = D> + Clone,
    Z: Reporter<Activity = Activity<V, D>>,
    M: Monitor<Index = Epoch>,
    B: Blocker<PublicKey = P>,
    TSu: ThresholdSupervisor<
        Index = Epoch,
        PublicKey = P,
        Polynomial = Vec<V::Public>,
        Share = group::Share,
    >,
> {
    /// Runtime context: used to spawn the main loop, read the clock, sleep
    /// until rebroadcast deadlines and open the journal.
    context: E,
    /// Asked (via `propose`) for the digest of each index.
    automaton: A,
    /// Source of the current epoch and of subsequent epoch updates.
    monitor: M,
    /// Provides the participants, polynomial and local share for an epoch.
    validators: TSu,
    /// Receives every `Activity` (acks, certificates, tips), both live and
    /// during journal replay.
    reporter: Z,
    /// Used to block peers whose messages fail decoding or blockable validation.
    blocker: B,

    /// Namespace passed to ack signing and ack verification.
    namespace: Vec<u8>,

    /// `(below, above)`: how many epochs below and above the current epoch an
    /// ack's epoch may be and still be accepted. The first component is also
    /// used to purge buffered acks from older epochs on an epoch change.
    epoch_bounds: (u64, u64),

    /// Number of indices starting at the tip that are worked on concurrently:
    /// new digests are requested while the next index is below `tip + window`,
    /// and acks at or beyond `tip + window` are rejected.
    window: u64,

    /// How far below the tip state is retained: indices below
    /// `tip - activity_timeout` are pruned and acks for them are rejected.
    activity_timeout: u64,

    /// In-flight digest requests to the automaton.
    digest_requests: FuturesPool<DigestRequest<D, E>>,

    /// Current epoch, as last reported by the monitor.
    epoch: Epoch,

    /// Current tip index. It advances past contiguously confirmed indices and
    /// can be fast-forwarded to the safe tip derived from peers' reported tips.
    tip: Index,

    /// Tracks the tips reported by peers alongside their acks.
    safe_tip: SafeTip<P>,

    /// Per-index ack collection state for indices still being worked on.
    pending: BTreeMap<Index, Pending<V, D>>,

    /// Certificates recovered for indices, kept until pruned by tip advancement.
    confirmed: BTreeMap<Index, Certificate<V, D>>,

    /// Delay between broadcasting an ack for an index and rebroadcasting it.
    rebroadcast_timeout: Duration,

    /// Next rebroadcast deadline for each index with a locally signed ack.
    rebroadcast_deadlines: PrioritySet<Index, SystemTime>,

    /// Journal of recorded activity; `None` until `run` initializes it and
    /// taken again when the main loop exits.
    journal: Option<Journal<E, Activity<V, D>>>,
    /// Storage partition the journal is opened in.
    journal_partition: String,
    /// Write buffer size handed to the journal configuration.
    journal_write_buffer: NonZeroUsize,
    /// Buffer size used when replaying the journal at startup.
    journal_replay_buffer: NonZeroUsize,
    /// Number of indices per journal section (`section = index / this`).
    journal_heights_per_section: u64,
    /// Optional compression setting handed to the journal configuration.
    journal_compression: Option<u8>,
    /// Buffer pool handed to the journal configuration.
    journal_buffer_pool: PoolRef,

    /// Passed as the final argument of every ack `send`.
    /// NOTE(review): presumably selects priority delivery — confirm against the p2p `Sender` API.
    priority_acks: bool,

    /// Metrics for the tip, digest requests, acks and recovered thresholds.
    metrics: metrics::Metrics<E>,
}
156
157impl<
158 E: Clock + Spawner + Storage + Metrics,
159 P: PublicKey,
160 V: Variant,
161 D: Digest,
162 A: Automaton<Context = Index, Digest = D> + Clone,
163 Z: Reporter<Activity = Activity<V, D>>,
164 M: Monitor<Index = Epoch>,
165 B: Blocker<PublicKey = P>,
166 TSu: ThresholdSupervisor<
167 Index = Epoch,
168 PublicKey = P,
169 Polynomial = Vec<V::Public>,
170 Share = group::Share,
171 >,
172 > Engine<E, P, V, D, A, Z, M, B, TSu>
173{
174 pub fn new(context: E, cfg: Config<P, V, D, A, Z, M, B, TSu>) -> Self {
176 let metrics = metrics::Metrics::init(context.clone());
177
178 Self {
179 context,
180 automaton: cfg.automaton,
181 reporter: cfg.reporter,
182 monitor: cfg.monitor,
183 validators: cfg.validators,
184 blocker: cfg.blocker,
185 namespace: cfg.namespace,
186 epoch_bounds: cfg.epoch_bounds,
187 window: cfg.window.into(),
188 activity_timeout: cfg.activity_timeout,
189 epoch: 0,
190 tip: 0,
191 safe_tip: SafeTip::default(),
192 digest_requests: FuturesPool::default(),
193 pending: BTreeMap::new(),
194 confirmed: BTreeMap::new(),
195 rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
196 rebroadcast_deadlines: PrioritySet::new(),
197 journal: None,
198 journal_partition: cfg.journal_partition,
199 journal_write_buffer: cfg.journal_write_buffer,
200 journal_replay_buffer: cfg.journal_replay_buffer,
201 journal_heights_per_section: cfg.journal_heights_per_section.into(),
202 journal_compression: cfg.journal_compression,
203 journal_buffer_pool: cfg.journal_buffer_pool,
204 priority_acks: cfg.priority_acks,
205 metrics,
206 }
207 }
208
/// Consumes the engine and spawns its main loop on the runtime context.
///
/// `network` is the `(sender, receiver)` pair used to exchange acks with
/// peers. Returns the handle of the spawned task.
pub fn start(
    mut self,
    network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
) -> Handle<()> {
    // The spawn closure is obtained first, then `self` is moved into the
    // future it runs.
    self.context.spawn_ref()(self.run(network))
}
224
/// Main loop of the engine.
///
/// Subscribes to epoch updates, opens and replays the journal, then loops:
/// requesting digests while the window has room, and otherwise waiting on
/// shutdown, epoch updates, completed digest requests, incoming acks and
/// rebroadcast deadlines. The loop exits on shutdown, when the epoch
/// subscription ends, or when the ack receiver fails; the journal is closed
/// on the way out.
///
/// # Panics
///
/// Panics if the journal cannot be initialized or closed, if the
/// participants of the current epoch are unknown, or if the monitor reports
/// an epoch lower than the current one.
async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
    // Wrap the raw channel in the codec helpers; the wrapped sender carries `TipAck` messages.
    let (mut sender, mut receiver) = wrap((), network.0, network.1);
    let mut shutdown = self.context.stopped();

    // Start from the latest epoch known to the monitor.
    let (latest, mut epoch_updates) = self.monitor.subscribe().await;
    self.epoch = latest;

    // Open the journal and rebuild in-memory state from it before storing it on `self`.
    let journal_cfg = JConfig {
        partition: self.journal_partition.clone(),
        compression: self.journal_compression,
        codec_config: (),
        buffer_pool: self.journal_buffer_pool.clone(),
        write_buffer: self.journal_write_buffer,
    };
    let journal = Journal::init(self.context.with_label("journal"), journal_cfg)
        .await
        .expect("init failed");
    self.replay(&journal).await;
    self.journal = Some(journal);

    // Seed the safe-tip tracker with the participants of the current epoch.
    self.safe_tip.init(
        self.validators
            .participants(self.epoch)
            .expect("unknown participants"),
    );

    loop {
        self.metrics.tip.set(self.tip as i64);

        // Request a digest for the next index while it is still inside the window.
        // NOTE(review): `self.tip + self.window` is unchecked u64 addition.
        let next = self.next();
        if next < self.tip + self.window {
            trace!("requesting new digest: index {}", next);
            self.get_digest(next);
            continue;
        }

        // Sleep until the earliest rebroadcast deadline, or never if there is none.
        let rebroadcast = match self.rebroadcast_deadlines.peek() {
            Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
            None => Either::Right(future::pending()),
        };

        select! {
            // Shutdown requested by the runtime.
            _ = &mut shutdown => {
                debug!("shutdown");
                break;
            },

            // The monitor reported a new epoch.
            epoch = epoch_updates.next() => {
                // A closed subscription is treated as fatal.
                let Some(epoch) = epoch else {
                    error!("epoch subscription failed");
                    break;
                };

                // Epochs must never go backwards.
                debug!(current=self.epoch, new=epoch, "refresh epoch");
                assert!(epoch >= self.epoch);
                self.epoch = epoch;

                // Align the safe-tip tracker with the new epoch's participants.
                // NOTE(review): panics without a message if the participants are unknown.
                self.safe_tip.reconcile(self.validators.participants(epoch).unwrap());

                // Drop buffered acks from epochs that fell below the lower epoch bound.
                let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
                self.pending.iter_mut().for_each(|(_, pending)| {
                    match pending {
                        Pending::Unverified(acks) => {
                            acks.retain(|epoch, _| *epoch >= min_epoch);
                        }
                        Pending::Verified(_, acks) => {
                            acks.retain(|epoch, _| *epoch >= min_epoch);
                        }
                    }
                });

                continue;
            },

            // The automaton finished (or failed) a digest request.
            request = self.digest_requests.next_completed() => {
                let DigestRequest { index, result, timer } = request;
                // Dropping the timer here ends the measurement for this request.
                drop(timer);
                match result {
                    Err(err) => {
                        warn!(?err, ?index, "automaton returned error");
                        self.metrics.digest.inc(Status::Dropped);
                    }
                    Ok(digest) => {
                        if let Err(err) = self.handle_digest(index, digest, &mut sender).await {
                            debug!(?err, ?index, "handle_digest failed");
                            continue;
                        }
                    }
                }
            },

            // A message arrived from a peer.
            msg = receiver.recv() => {
                // A receiver failure is fatal.
                let (sender, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        warn!(?err, "ack receiver failed");
                        break;
                    }
                };
                // The guard starts as `Invalid` and is only changed by the
                // `Failure`/`Success` paths below.
                let mut guard = self.metrics.acks.guard(Status::Invalid);
                let TipAck { ack, tip } = match msg {
                    Ok(peer_ack) => peer_ack,
                    Err(err) => {
                        // Undecodable messages get the peer blocked.
                        warn!(?err, ?sender, "ack decode failed, blocking peer");
                        self.blocker.block(sender).await;
                        continue;
                    }
                };

                // Record the peer's tip; if that changed anything, fast-forward
                // our own tip to the safe tip when it is ahead of us.
                if self.safe_tip.update(sender.clone(), tip).is_some() {
                    let safe_tip = self.safe_tip.get();
                    if safe_tip > self.tip {
                        self.fast_forward_tip(safe_tip).await;
                    }
                }

                // Decide whether the ack needs processing; block the peer only
                // when the error says so.
                if let Err(err) = self.validate_ack(&ack, &sender) {
                    if err.blockable() {
                        warn!(?sender, ?err, "blocking peer for validation failure");
                        self.blocker.block(sender).await;
                    } else {
                        debug!(?sender, ?err, "ack validate failed");
                    }
                    continue;
                };

                // Store the ack and possibly recover a threshold signature.
                if let Err(err) = self.handle_ack(&ack).await {
                    debug!(?err, ?sender, "ack handle failed");
                    guard.set(Status::Failure);
                    continue;
                }

                debug!(?sender, epoch=ack.epoch, index=ack.item.index, "ack");
                guard.set(Status::Success);
            },

            // The earliest rebroadcast deadline elapsed.
            _ = rebroadcast => {
                // The sleep future only exists when `peek` returned a deadline.
                let (index, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline");
                trace!("rebroadcasting: index {}", index);
                if let Err(err) = self.handle_rebroadcast(index, &mut sender).await {
                    warn!(?err, ?index, "rebroadcast failed");
                };
            }
        }
    }

    // Close the journal before the task finishes.
    if let Some(journal) = self.journal.take() {
        journal
            .close()
            .await
            .expect("unable to close aggregation journal");
    }
}
402
403 async fn handle_digest(
407 &mut self,
408 index: Index,
409 digest: D,
410 sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
411 ) -> Result<(), Error> {
412 if !matches!(self.pending.get(&index), Some(Pending::Unverified(_))) {
414 return Err(Error::AckIndex(index));
415 };
416
417 let Some(Pending::Unverified(acks)) = self.pending.remove(&index) else {
419 panic!("Pending::Unverified entry not found");
420 };
421 self.pending
422 .insert(index, Pending::Verified(digest, BTreeMap::new()));
423
424 for epoch_acks in acks.values() {
428 for epoch_ack in epoch_acks.values() {
429 if epoch_ack.item.digest != digest {
431 continue;
432 }
433
434 let _ = self.handle_ack(epoch_ack).await;
436 }
437 if self.confirmed.contains_key(&index) {
439 break;
440 }
441 }
442
443 let ack = self.sign_ack(index, digest).await?;
445
446 self.rebroadcast_deadlines
448 .put(index, self.context.current() + self.rebroadcast_timeout);
449
450 let _ = self.handle_ack(&ack).await;
452
453 self.broadcast(ack, sender).await?;
455
456 Ok(())
457 }
458
459 async fn handle_ack(&mut self, ack: &Ack<V, D>) -> Result<(), Error> {
464 let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
466 return Err(Error::UnknownEpoch(ack.epoch));
467 };
468 let quorum = quorum_from_slice(polynomial);
469
470 let acks_by_epoch = match self.pending.get_mut(&ack.item.index) {
472 None => {
473 return Err(Error::AckIndex(ack.item.index));
476 }
477 Some(Pending::Unverified(acks)) => acks,
478 Some(Pending::Verified(digest, acks)) => {
479 if ack.item.digest != *digest {
481 return Err(Error::AckDigest(ack.item.index));
482 }
483 acks
484 }
485 };
486
487 let acks = acks_by_epoch.entry(ack.epoch).or_default();
489 if acks.contains_key(&ack.signature.index) {
490 return Ok(());
491 }
492 acks.insert(ack.signature.index, ack.clone());
493
494 let partials = acks
496 .values()
497 .filter(|a| a.item.digest == ack.item.digest)
498 .map(|ack| &ack.signature)
499 .collect::<Vec<_>>();
500 if partials.len() >= (quorum as usize) {
501 let item = ack.item.clone();
502 let threshold = threshold_signature_recover::<V, _>(quorum, partials)
503 .expect("Failed to recover threshold signature");
504 self.metrics.threshold.inc();
505 self.handle_threshold(item, threshold).await;
506 }
507
508 Ok(())
509 }
510
511 async fn handle_threshold(&mut self, item: Item<D>, threshold: V::Signature) {
513 let index = item.index;
515 if self.confirmed.contains_key(&index) {
516 return;
517 }
518
519 let certificate = Certificate {
521 item,
522 signature: threshold,
523 };
524 self.confirmed.insert(index, certificate.clone());
525
526 let certified = Activity::Certified(certificate);
528 self.record(certified.clone()).await;
529 self.sync(index).await;
530 self.reporter.report(certified).await;
531
532 if index == self.tip {
534 let mut new_tip = index.saturating_add(1);
536 while self.confirmed.contains_key(&new_tip) && new_tip < Index::MAX {
537 new_tip = new_tip.saturating_add(1);
538 }
539
540 if new_tip > self.tip {
542 self.fast_forward_tip(new_tip).await;
543 }
544 }
545 }
546
547 async fn handle_rebroadcast(
549 &mut self,
550 index: Index,
551 sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
552 ) -> Result<(), Error> {
553 let Some(Pending::Verified(digest, acks)) = self.pending.get(&index) else {
554 return Ok(());
556 };
557
558 let Some(share) = self.validators.share(self.epoch) else {
560 return Err(Error::UnknownShare(self.epoch));
561 };
562 let ack = acks
563 .get(&self.epoch)
564 .and_then(|acks| acks.get(&share.index).cloned());
565 let ack = match ack {
566 Some(ack) => ack,
567 None => self.sign_ack(index, *digest).await?,
568 };
569
570 self.rebroadcast_deadlines
572 .put(index, self.context.current() + self.rebroadcast_timeout);
573
574 self.broadcast(ack, sender).await
576 }
577
578 fn validate_ack(&self, ack: &Ack<V, D>, sender: &P) -> Result<(), Error> {
585 {
587 let (eb_lo, eb_hi) = self.epoch_bounds;
588 let bound_lo = self.epoch.saturating_sub(eb_lo);
589 let bound_hi = self.epoch.saturating_add(eb_hi);
590 if ack.epoch < bound_lo || ack.epoch > bound_hi {
591 return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
592 }
593 }
594
595 let Some(sig_index) = self.validators.is_participant(ack.epoch, sender) else {
597 return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
598 };
599 if sig_index != ack.signature.index {
600 return Err(Error::PeerMismatch);
601 }
602
603 let activity_threshold = self.tip.saturating_sub(self.activity_timeout);
605 if ack.item.index < activity_threshold {
606 return Err(Error::AckThresholded(ack.item.index));
607 }
608
609 if ack.item.index >= self.tip + self.window {
611 return Err(Error::AckIndex(ack.item.index));
612 }
613
614 if self.confirmed.contains_key(&ack.item.index) {
616 return Err(Error::AckThresholded(ack.item.index));
617 }
618 let have_ack = match self.pending.get(&ack.item.index) {
619 None => false,
620 Some(Pending::Unverified(epoch_map)) => epoch_map
621 .get(&ack.epoch)
622 .is_some_and(|acks| acks.contains_key(&ack.signature.index)),
623 Some(Pending::Verified(digest, epoch_map)) => {
624 if ack.item.digest != *digest {
627 return Err(Error::AckDigest(ack.item.index));
628 }
629 epoch_map
630 .get(&ack.epoch)
631 .is_some_and(|acks| acks.contains_key(&ack.signature.index))
632 }
633 };
634 if have_ack {
635 return Err(Error::AckDuplicate(sender.to_string(), ack.item.index));
636 }
637
638 let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
640 return Err(Error::UnknownEpoch(ack.epoch));
641 };
642 if !ack.verify(&self.namespace, polynomial) {
643 return Err(Error::InvalidAckSignature);
644 }
645
646 Ok(())
647 }
648
649 fn get_digest(&mut self, index: Index) {
653 assert!(self
654 .pending
655 .insert(index, Pending::Unverified(BTreeMap::new()))
656 .is_none());
657
658 let mut automaton = self.automaton.clone();
659 let timer = self.metrics.digest_duration.timer();
660 self.digest_requests.push(async move {
661 let receiver = automaton.propose(index).await;
662 let result = receiver.await.map_err(Error::AppProposeCanceled);
663 DigestRequest {
664 index,
665 result,
666 timer,
667 }
668 });
669 }
670
671 async fn sign_ack(&mut self, index: Index, digest: D) -> Result<Ack<V, D>, Error> {
674 let Some(share) = self.validators.share(self.epoch) else {
675 return Err(Error::UnknownShare(self.epoch));
676 };
677
678 let item = Item { index, digest };
680 let ack = Ack::sign(&self.namespace, self.epoch, share, item);
681
682 self.record(Activity::Ack(ack.clone())).await;
684 self.sync(index).await;
685
686 Ok(ack)
687 }
688
689 async fn broadcast(
693 &mut self,
694 ack: Ack<V, D>,
695 sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
696 ) -> Result<(), Error> {
697 sender
698 .send(
699 Recipients::All,
700 TipAck { ack, tip: self.tip },
701 self.priority_acks,
702 )
703 .await
704 .map_err(|err| {
705 warn!(?err, "failed to send ack");
706 Error::UnableToSendMessage
707 })?;
708 Ok(())
709 }
710
711 fn next(&self) -> Index {
714 let max_pending = self
715 .pending
716 .last_key_value()
717 .map(|(k, _)| k.saturating_add(1))
718 .unwrap_or_default();
719 let max_confirmed = self
720 .confirmed
721 .last_key_value()
722 .map(|(k, _)| k.saturating_add(1))
723 .unwrap_or_default();
724 max(self.tip, max(max_pending, max_confirmed))
725 }
726
727 async fn fast_forward_tip(&mut self, tip: Index) {
733 assert!(tip > self.tip);
734
735 let activity_threshold = tip.saturating_sub(self.activity_timeout);
737 self.pending.retain(|index, _| *index >= activity_threshold);
738 self.confirmed
739 .retain(|index, _| *index >= activity_threshold);
740
741 self.record(Activity::Tip(tip)).await;
743 self.sync(tip).await;
744 self.reporter.report(Activity::Tip(tip)).await;
745
746 let section = self.get_journal_section(activity_threshold);
748 let journal = self.journal.as_mut().expect("journal must be initialized");
749 let _ = journal.prune(section).await;
750
751 self.tip = tip;
753 }
754
755 fn get_journal_section(&self, index: Index) -> u64 {
759 index / self.journal_heights_per_section
760 }
761
762 async fn replay(&mut self, journal: &Journal<E, Activity<V, D>>) {
764 let mut tip = Index::default();
765 let mut certified = Vec::new();
766 let mut acks = Vec::new();
767 let stream = journal
768 .replay(0, 0, self.journal_replay_buffer)
769 .await
770 .expect("replay failed");
771 pin_mut!(stream);
772 while let Some(msg) = stream.next().await {
773 let (_, _, _, activity) = msg.expect("replay failed");
774 match activity {
775 Activity::Tip(index) => {
776 tip = max(tip, index);
777 self.reporter.report(Activity::Tip(index)).await;
778 }
779 Activity::Certified(certificate) => {
780 certified.push(certificate.clone());
781 self.reporter.report(Activity::Certified(certificate)).await;
782 }
783 Activity::Ack(ack) => {
784 acks.push(ack.clone());
785 self.reporter.report(Activity::Ack(ack)).await;
786 }
787 }
788 }
789
790 self.tip = tip;
792 let activity_threshold = tip.saturating_sub(self.activity_timeout);
793
794 certified
796 .iter()
797 .filter(|certificate| certificate.item.index >= activity_threshold)
798 .for_each(|certificate| {
799 self.confirmed
800 .insert(certificate.item.index, certificate.clone());
801 });
802
803 let mut acks_by_index: BTreeMap<Index, Vec<Ack<V, D>>> = BTreeMap::new();
805 for ack in acks {
806 if ack.item.index >= activity_threshold && !self.confirmed.contains_key(&ack.item.index)
807 {
808 acks_by_index.entry(ack.item.index).or_default().push(ack);
809 }
810 }
811
812 for (index, mut acks_group) in acks_by_index {
814 let our_share = self.validators.share(self.epoch);
816 let our_digest = if let Some(share) = our_share {
817 acks_group
818 .iter()
819 .find(|ack| ack.epoch == self.epoch && ack.signature.index == share.index)
820 .map(|ack| ack.item.digest)
821 } else {
822 None
823 };
824
825 if let Some(digest) = our_digest {
827 acks_group.retain(|other| other.item.digest == digest);
828 }
829
830 let mut epoch_map = BTreeMap::new();
832 for ack in acks_group {
833 epoch_map
834 .entry(ack.epoch)
835 .or_insert_with(BTreeMap::new)
836 .insert(ack.signature.index, ack);
837 }
838
839 match our_digest {
842 Some(digest) => {
843 self.pending
844 .insert(index, Pending::Verified(digest, epoch_map));
845
846 self.rebroadcast_deadlines
848 .put(index, self.context.current());
849 }
850 None => {
851 self.pending.insert(index, Pending::Unverified(epoch_map));
852 }
853 }
854 }
855
856 info!(self.tip, next = self.next(), "replayed journal");
857 }
858
859 async fn record(&mut self, activity: Activity<V, D>) {
861 let index = match activity {
862 Activity::Ack(ref ack) => ack.item.index,
863 Activity::Certified(ref certificate) => certificate.item.index,
864 Activity::Tip(index) => index,
865 };
866 let section = self.get_journal_section(index);
867 self.journal
868 .as_mut()
869 .expect("journal must be initialized")
870 .append(section, activity)
871 .await
872 .expect("unable to append to journal");
873 }
874
875 async fn sync(&mut self, index: Index) {
877 let section = self.get_journal_section(index);
878 let journal = self.journal.as_mut().expect("journal must be initialized");
879 journal.sync(section).await.expect("unable to sync journal");
880 }
881}