1use super::{
4 metrics,
5 safe_tip::SafeTip,
6 types::{Ack, Activity, Epoch, Error, Index, Item, TipAck},
7 Config,
8};
9use crate::{aggregation::types::Certificate, Automaton, Monitor, Reporter, ThresholdSupervisor};
10use commonware_cryptography::{
11 bls12381::primitives::{group, ops::threshold_signature_recover, variant::Variant},
12 Digest, PublicKey,
13};
14use commonware_macros::select;
15use commonware_p2p::{
16 utils::codec::{wrap, WrappedSender},
17 Blocker, Receiver, Recipients, Sender,
18};
19use commonware_runtime::{
20 buffer::PoolRef,
21 telemetry::metrics::{
22 histogram,
23 status::{CounterExt, Status},
24 },
25 Clock, Handle, Metrics, Spawner, Storage,
26};
27use commonware_storage::journal::variable::{Config as JConfig, Journal};
28use commonware_utils::{futures::Pool as FuturesPool, quorum_from_slice, PrioritySet};
29use futures::{
30 future::{self, Either},
31 pin_mut, StreamExt,
32};
33use std::{
34 cmp::max,
35 collections::BTreeMap,
36 num::NonZeroUsize,
37 time::{Duration, SystemTime},
38};
39use tracing::{debug, error, info, trace, warn};
40
/// Acks collected so far for a single index, keyed first by epoch and then by
/// the signer's partial-signature index.
enum Pending<V: Variant, D: Digest> {
    /// The local digest for this index is not known yet. Acks are buffered
    /// without comparing their digest against a local one.
    Unverified(BTreeMap<Epoch, BTreeMap<u32, Ack<V, D>>>),

    /// The local digest for this index is known (first field). Acks whose
    /// digest differs from it are rejected.
    Verified(D, BTreeMap<Epoch, BTreeMap<u32, Ack<V, D>>>),
}
50
/// Outcome of a digest request issued to the automaton for one index.
struct DigestRequest<D: Digest, E: Clock> {
    /// Index the digest was requested for.
    index: Index,

    /// Digest returned by the automaton, or the error that ended the request.
    result: Result<D, Error>,

    /// Histogram timer created when the request was issued; the engine drops
    /// it once the request completes.
    timer: histogram::Timer<E>,
}
63
/// Engine that collects acks (partial signatures) over per-index digests,
/// recovers threshold signatures once a quorum is reached, and advances a tip.
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics,
    P: PublicKey,
    V: Variant,
    D: Digest,
    A: Automaton<Context = Index, Digest = D> + Clone,
    Z: Reporter<Activity = Activity<V, D>>,
    M: Monitor<Index = Epoch>,
    B: Blocker<PublicKey = P>,
    TSu: ThresholdSupervisor<
        Index = Epoch,
        PublicKey = P,
        Polynomial = Vec<V::Public>,
        Share = group::Share,
    >,
> {
    // ---------- Interfaces ----------
    /// Runtime context (spawning, clock, storage, metrics).
    context: E,
    /// Application asked to propose the digest for each index.
    automaton: A,
    /// Source of the current epoch and of epoch updates.
    monitor: M,
    /// Provides participants, polynomial and our share for each epoch.
    validators: TSu,
    /// Receives `Activity` notifications (acks, certificates, tips).
    reporter: Z,
    /// Used to block peers that send undecodable or blockably-invalid acks.
    blocker: B,

    // ---------- Configuration ----------
    /// Namespace passed to ack signing and verification.
    namespace: Vec<u8>,

    /// `(lo, hi)`: acks are accepted only for epochs within
    /// `[epoch - lo, epoch + hi]`; stored acks from epochs below `epoch - lo`
    /// are discarded when the epoch changes.
    epoch_bounds: (u64, u64),

    /// Number of indices, starting at the tip, that may be in flight at once.
    window: u64,

    /// Number of indices below the tip that are still tracked; anything older
    /// is pruned when the tip advances.
    activity_timeout: u64,

    // ---------- State ----------
    /// In-flight digest requests to the automaton.
    digest_requests: FuturesPool<DigestRequest<D, E>>,

    /// Current epoch.
    epoch: Epoch,

    /// Current tip index.
    tip: Index,

    /// Tracks the tips reported by peers; when its value exceeds `tip`, the
    /// engine fast-forwards to it.
    safe_tip: SafeTip<P>,

    /// Indices still collecting acks.
    pending: BTreeMap<Index, Pending<V, D>>,

    /// Indices for which a threshold signature has been recovered.
    confirmed: BTreeMap<Index, Certificate<V, D>>,

    // ---------- Rebroadcasting ----------
    /// Delay between successive broadcasts of our ack for an index.
    rebroadcast_timeout: Duration,

    /// Next rebroadcast deadline for each index we have acked.
    rebroadcast_deadlines: PrioritySet<Index, SystemTime>,

    // ---------- Journal ----------
    /// Journal of recorded activity; `None` until the engine is running.
    journal: Option<Journal<E, Activity<V, D>>>,
    /// Settings forwarded to the journal configuration when it is opened.
    journal_partition: String,
    journal_write_buffer: NonZeroUsize,
    /// Buffer size passed to journal replay.
    journal_replay_buffer: NonZeroUsize,
    /// Number of consecutive indices that share one journal section.
    journal_heights_per_section: u64,
    journal_compression: Option<u8>,
    journal_buffer_pool: PoolRef,

    // ---------- Network ----------
    /// Passed as the priority flag when sending acks.
    priority_acks: bool,

    // ---------- Metrics ----------
    /// Engine metrics (tip, acks, digests, thresholds, digest duration).
    metrics: metrics::Metrics<E>,
}
156
157impl<
158 E: Clock + Spawner + Storage + Metrics,
159 P: PublicKey,
160 V: Variant,
161 D: Digest,
162 A: Automaton<Context = Index, Digest = D> + Clone,
163 Z: Reporter<Activity = Activity<V, D>>,
164 M: Monitor<Index = Epoch>,
165 B: Blocker<PublicKey = P>,
166 TSu: ThresholdSupervisor<
167 Index = Epoch,
168 PublicKey = P,
169 Polynomial = Vec<V::Public>,
170 Share = group::Share,
171 >,
172 > Engine<E, P, V, D, A, Z, M, B, TSu>
173{
174 pub fn new(context: E, cfg: Config<P, V, D, A, Z, M, B, TSu>) -> Self {
176 let metrics = metrics::Metrics::init(context.clone());
177
178 Self {
179 context,
180 automaton: cfg.automaton,
181 reporter: cfg.reporter,
182 monitor: cfg.monitor,
183 validators: cfg.validators,
184 blocker: cfg.blocker,
185 namespace: cfg.namespace,
186 epoch_bounds: cfg.epoch_bounds,
187 window: cfg.window.into(),
188 activity_timeout: cfg.activity_timeout,
189 epoch: 0,
190 tip: 0,
191 safe_tip: SafeTip::default(),
192 digest_requests: FuturesPool::default(),
193 pending: BTreeMap::new(),
194 confirmed: BTreeMap::new(),
195 rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
196 rebroadcast_deadlines: PrioritySet::new(),
197 journal: None,
198 journal_partition: cfg.journal_partition,
199 journal_write_buffer: cfg.journal_write_buffer,
200 journal_replay_buffer: cfg.journal_replay_buffer,
201 journal_heights_per_section: cfg.journal_heights_per_section.into(),
202 journal_compression: cfg.journal_compression,
203 journal_buffer_pool: cfg.journal_buffer_pool,
204 priority_acks: cfg.priority_acks,
205 metrics,
206 }
207 }
208
209 pub fn start(
219 mut self,
220 network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
221 ) -> Handle<()> {
222 self.context.spawn_ref()(self.run(network))
223 }
224
225 async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
227 let (mut sender, mut receiver) = wrap((), network.0, network.1);
228 let mut shutdown = self.context.stopped();
229
230 let (latest, mut epoch_updates) = self.monitor.subscribe().await;
232 self.epoch = latest;
233
234 let journal_cfg = JConfig {
236 partition: self.journal_partition.clone(),
237 compression: self.journal_compression,
238 codec_config: (),
239 buffer_pool: self.journal_buffer_pool.clone(),
240 write_buffer: self.journal_write_buffer,
241 };
242 let journal = Journal::init(self.context.with_label("journal"), journal_cfg)
243 .await
244 .expect("init failed");
245 let unverified_indices = self.replay(&journal).await;
246 self.journal = Some(journal);
247
248 for index in unverified_indices {
250 trace!(index, "requesting digest for unverified index from replay");
251 self.get_digest(index);
252 }
253
254 self.safe_tip.init(
256 self.validators
257 .participants(self.epoch)
258 .expect("unknown participants"),
259 );
260
261 loop {
262 self.metrics.tip.set(self.tip as i64);
263
264 let next = self.next();
266 if next < self.tip + self.window {
267 trace!(next, "requesting new digest");
268 assert!(self
269 .pending
270 .insert(next, Pending::Unverified(BTreeMap::new()))
271 .is_none());
272 self.get_digest(next);
273 continue;
274 }
275
276 let rebroadcast = match self.rebroadcast_deadlines.peek() {
278 Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
279 None => Either::Right(future::pending()),
280 };
281
282 select! {
284 _ = &mut shutdown => {
286 debug!("shutdown");
287 break;
288 },
289
290 epoch = epoch_updates.next() => {
292 let Some(epoch) = epoch else {
294 error!("epoch subscription failed");
295 break;
296 };
297
298 debug!(current=self.epoch, new=epoch, "refresh epoch");
300 assert!(epoch >= self.epoch);
301 self.epoch = epoch;
302
303 self.safe_tip.reconcile(self.validators.participants(epoch).unwrap());
305
306 let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
308 self.pending.iter_mut().for_each(|(_, pending)| {
309 match pending {
310 Pending::Unverified(acks) => {
311 acks.retain(|epoch, _| *epoch >= min_epoch);
312 }
313 Pending::Verified(_, acks) => {
314 acks.retain(|epoch, _| *epoch >= min_epoch);
315 }
316 }
317 });
318
319 continue;
320 },
321
322 request = self.digest_requests.next_completed() => {
324 let DigestRequest { index, result, timer } = request;
325 drop(timer); match result {
327 Err(err) => {
328 warn!(?err, ?index, "automaton returned error");
329 self.metrics.digest.inc(Status::Dropped);
330 }
331 Ok(digest) => {
332 if let Err(err) = self.handle_digest(index, digest, &mut sender).await {
333 debug!(?err, ?index, "handle_digest failed");
334 continue;
335 }
336 }
337 }
338 },
339
340 msg = receiver.recv() => {
342 let (sender, msg) = match msg {
344 Ok(r) => r,
345 Err(err) => {
346 warn!(?err, "ack receiver failed");
347 break;
348 }
349 };
350 let mut guard = self.metrics.acks.guard(Status::Invalid);
351 let TipAck { ack, tip } = match msg {
352 Ok(peer_ack) => peer_ack,
353 Err(err) => {
354 warn!(?err, ?sender, "ack decode failed, blocking peer");
355 self.blocker.block(sender).await;
356 continue;
357 }
358 };
359
360 if self.safe_tip.update(sender.clone(), tip).is_some() {
362 let safe_tip = self.safe_tip.get();
364 if safe_tip > self.tip {
365 self.fast_forward_tip(safe_tip).await;
366 }
367 }
368
369 if let Err(err) = self.validate_ack(&ack, &sender) {
371 if err.blockable() {
372 warn!(?sender, ?err, "blocking peer for validation failure");
373 self.blocker.block(sender).await;
374 } else {
375 debug!(?sender, ?err, "ack validate failed");
376 }
377 continue;
378 };
379
380 if let Err(err) = self.handle_ack(&ack).await {
382 debug!(?err, ?sender, "ack handle failed");
383 guard.set(Status::Failure);
384 continue;
385 }
386
387 debug!(?sender, epoch=ack.epoch, index=ack.item.index, "ack");
389 guard.set(Status::Success);
390 },
391
392 _ = rebroadcast => {
394 let (index, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline");
396 trace!("rebroadcasting: index {}", index);
397 if let Err(err) = self.handle_rebroadcast(index, &mut sender).await {
398 warn!(?err, ?index, "rebroadcast failed");
399 };
400 }
401 }
402 }
403
404 if let Some(journal) = self.journal.take() {
406 journal
407 .close()
408 .await
409 .expect("unable to close aggregation journal");
410 }
411 }
412
413 async fn handle_digest(
417 &mut self,
418 index: Index,
419 digest: D,
420 sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
421 ) -> Result<(), Error> {
422 if !matches!(self.pending.get(&index), Some(Pending::Unverified(_))) {
424 return Err(Error::AckIndex(index));
425 };
426
427 let Some(Pending::Unverified(acks)) = self.pending.remove(&index) else {
429 panic!("Pending::Unverified entry not found");
430 };
431 self.pending
432 .insert(index, Pending::Verified(digest, BTreeMap::new()));
433
434 for epoch_acks in acks.values() {
438 for epoch_ack in epoch_acks.values() {
439 if epoch_ack.item.digest != digest {
441 continue;
442 }
443
444 let _ = self.handle_ack(epoch_ack).await;
446 }
447 if self.confirmed.contains_key(&index) {
449 break;
450 }
451 }
452
453 let ack = self.sign_ack(index, digest).await?;
455
456 self.rebroadcast_deadlines
458 .put(index, self.context.current() + self.rebroadcast_timeout);
459
460 let _ = self.handle_ack(&ack).await;
462
463 self.broadcast(ack, sender).await?;
465
466 Ok(())
467 }
468
469 async fn handle_ack(&mut self, ack: &Ack<V, D>) -> Result<(), Error> {
474 let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
476 return Err(Error::UnknownEpoch(ack.epoch));
477 };
478 let quorum = quorum_from_slice(polynomial);
479
480 let acks_by_epoch = match self.pending.get_mut(&ack.item.index) {
482 None => {
483 return Err(Error::AckIndex(ack.item.index));
486 }
487 Some(Pending::Unverified(acks)) => acks,
488 Some(Pending::Verified(digest, acks)) => {
489 if ack.item.digest != *digest {
491 return Err(Error::AckDigest(ack.item.index));
492 }
493 acks
494 }
495 };
496
497 let acks = acks_by_epoch.entry(ack.epoch).or_default();
499 if acks.contains_key(&ack.signature.index) {
500 return Ok(());
501 }
502 acks.insert(ack.signature.index, ack.clone());
503
504 let partials = acks
506 .values()
507 .filter(|a| a.item.digest == ack.item.digest)
508 .map(|ack| &ack.signature)
509 .collect::<Vec<_>>();
510 if partials.len() >= (quorum as usize) {
511 let item = ack.item.clone();
512 let threshold = threshold_signature_recover::<V, _>(quorum, partials)
513 .expect("Failed to recover threshold signature");
514 self.metrics.threshold.inc();
515 self.handle_threshold(item, threshold).await;
516 }
517
518 Ok(())
519 }
520
521 async fn handle_threshold(&mut self, item: Item<D>, threshold: V::Signature) {
523 let index = item.index;
525 if self.confirmed.contains_key(&index) {
526 return;
527 }
528
529 let certificate = Certificate {
531 item,
532 signature: threshold,
533 };
534 self.confirmed.insert(index, certificate.clone());
535
536 let certified = Activity::Certified(certificate);
538 self.record(certified.clone()).await;
539 self.sync(index).await;
540 self.reporter.report(certified).await;
541
542 if index == self.tip {
544 let mut new_tip = index.saturating_add(1);
546 while self.confirmed.contains_key(&new_tip) && new_tip < Index::MAX {
547 new_tip = new_tip.saturating_add(1);
548 }
549
550 if new_tip > self.tip {
552 self.fast_forward_tip(new_tip).await;
553 }
554 }
555 }
556
557 async fn handle_rebroadcast(
559 &mut self,
560 index: Index,
561 sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
562 ) -> Result<(), Error> {
563 let Some(Pending::Verified(digest, acks)) = self.pending.get(&index) else {
564 return Ok(());
566 };
567
568 let Some(share) = self.validators.share(self.epoch) else {
570 return Err(Error::UnknownShare(self.epoch));
571 };
572 let ack = acks
573 .get(&self.epoch)
574 .and_then(|acks| acks.get(&share.index).cloned());
575 let ack = match ack {
576 Some(ack) => ack,
577 None => self.sign_ack(index, *digest).await?,
578 };
579
580 self.rebroadcast_deadlines
582 .put(index, self.context.current() + self.rebroadcast_timeout);
583
584 self.broadcast(ack, sender).await
586 }
587
588 fn validate_ack(&self, ack: &Ack<V, D>, sender: &P) -> Result<(), Error> {
595 {
597 let (eb_lo, eb_hi) = self.epoch_bounds;
598 let bound_lo = self.epoch.saturating_sub(eb_lo);
599 let bound_hi = self.epoch.saturating_add(eb_hi);
600 if ack.epoch < bound_lo || ack.epoch > bound_hi {
601 return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
602 }
603 }
604
605 let Some(sig_index) = self.validators.is_participant(ack.epoch, sender) else {
607 return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
608 };
609 if sig_index != ack.signature.index {
610 return Err(Error::PeerMismatch);
611 }
612
613 let activity_threshold = self.tip.saturating_sub(self.activity_timeout);
615 if ack.item.index < activity_threshold {
616 return Err(Error::AckThresholded(ack.item.index));
617 }
618
619 if ack.item.index >= self.tip + self.window {
621 return Err(Error::AckIndex(ack.item.index));
622 }
623
624 if self.confirmed.contains_key(&ack.item.index) {
626 return Err(Error::AckThresholded(ack.item.index));
627 }
628 let have_ack = match self.pending.get(&ack.item.index) {
629 None => false,
630 Some(Pending::Unverified(epoch_map)) => epoch_map
631 .get(&ack.epoch)
632 .is_some_and(|acks| acks.contains_key(&ack.signature.index)),
633 Some(Pending::Verified(digest, epoch_map)) => {
634 if ack.item.digest != *digest {
637 return Err(Error::AckDigest(ack.item.index));
638 }
639 epoch_map
640 .get(&ack.epoch)
641 .is_some_and(|acks| acks.contains_key(&ack.signature.index))
642 }
643 };
644 if have_ack {
645 return Err(Error::AckDuplicate(sender.to_string(), ack.item.index));
646 }
647
648 let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
650 return Err(Error::UnknownEpoch(ack.epoch));
651 };
652 if !ack.verify(&self.namespace, polynomial) {
653 return Err(Error::InvalidAckSignature);
654 }
655
656 Ok(())
657 }
658
659 fn get_digest(&mut self, index: Index) {
665 assert!(self.pending.contains_key(&index));
666 let mut automaton = self.automaton.clone();
667 let timer = self.metrics.digest_duration.timer();
668 self.digest_requests.push(async move {
669 let receiver = automaton.propose(index).await;
670 let result = receiver.await.map_err(Error::AppProposeCanceled);
671 DigestRequest {
672 index,
673 result,
674 timer,
675 }
676 });
677 }
678
679 async fn sign_ack(&mut self, index: Index, digest: D) -> Result<Ack<V, D>, Error> {
682 let Some(share) = self.validators.share(self.epoch) else {
683 return Err(Error::UnknownShare(self.epoch));
684 };
685
686 let item = Item { index, digest };
688 let ack = Ack::sign(&self.namespace, self.epoch, share, item);
689
690 self.record(Activity::Ack(ack.clone())).await;
692 self.sync(index).await;
693
694 Ok(ack)
695 }
696
697 async fn broadcast(
701 &mut self,
702 ack: Ack<V, D>,
703 sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
704 ) -> Result<(), Error> {
705 sender
706 .send(
707 Recipients::All,
708 TipAck { ack, tip: self.tip },
709 self.priority_acks,
710 )
711 .await
712 .map_err(|err| {
713 warn!(?err, "failed to send ack");
714 Error::UnableToSendMessage
715 })?;
716 Ok(())
717 }
718
719 fn next(&self) -> Index {
722 let max_pending = self
723 .pending
724 .last_key_value()
725 .map(|(k, _)| k.saturating_add(1))
726 .unwrap_or_default();
727 let max_confirmed = self
728 .confirmed
729 .last_key_value()
730 .map(|(k, _)| k.saturating_add(1))
731 .unwrap_or_default();
732 max(self.tip, max(max_pending, max_confirmed))
733 }
734
735 async fn fast_forward_tip(&mut self, tip: Index) {
741 assert!(tip > self.tip);
742
743 let activity_threshold = tip.saturating_sub(self.activity_timeout);
745 self.pending.retain(|index, _| *index >= activity_threshold);
746 self.confirmed
747 .retain(|index, _| *index >= activity_threshold);
748
749 self.record(Activity::Tip(tip)).await;
751 self.sync(tip).await;
752 self.reporter.report(Activity::Tip(tip)).await;
753
754 let section = self.get_journal_section(activity_threshold);
756 let journal = self.journal.as_mut().expect("journal must be initialized");
757 let _ = journal.prune(section).await;
758
759 self.tip = tip;
761 }
762
763 fn get_journal_section(&self, index: Index) -> u64 {
767 index / self.journal_heights_per_section
768 }
769
770 async fn replay(&mut self, journal: &Journal<E, Activity<V, D>>) -> Vec<Index> {
773 let mut tip = Index::default();
774 let mut certified = Vec::new();
775 let mut acks = Vec::new();
776 let stream = journal
777 .replay(0, 0, self.journal_replay_buffer)
778 .await
779 .expect("replay failed");
780 pin_mut!(stream);
781 while let Some(msg) = stream.next().await {
782 let (_, _, _, activity) = msg.expect("replay failed");
783 match activity {
784 Activity::Tip(index) => {
785 tip = max(tip, index);
786 self.reporter.report(Activity::Tip(index)).await;
787 }
788 Activity::Certified(certificate) => {
789 certified.push(certificate.clone());
790 self.reporter.report(Activity::Certified(certificate)).await;
791 }
792 Activity::Ack(ack) => {
793 acks.push(ack.clone());
794 self.reporter.report(Activity::Ack(ack)).await;
795 }
796 }
797 }
798
799 self.tip = tip;
801 let activity_threshold = tip.saturating_sub(self.activity_timeout);
802
803 certified
805 .iter()
806 .filter(|certificate| certificate.item.index >= activity_threshold)
807 .for_each(|certificate| {
808 self.confirmed
809 .insert(certificate.item.index, certificate.clone());
810 });
811
812 let mut acks_by_index: BTreeMap<Index, Vec<Ack<V, D>>> = BTreeMap::new();
814 for ack in acks {
815 if ack.item.index >= activity_threshold && !self.confirmed.contains_key(&ack.item.index)
816 {
817 acks_by_index.entry(ack.item.index).or_default().push(ack);
818 }
819 }
820
821 let mut unverified = Vec::new();
823 for (index, mut acks_group) in acks_by_index {
824 let our_share = self.validators.share(self.epoch);
826 let our_digest = if let Some(share) = our_share {
827 acks_group
828 .iter()
829 .find(|ack| ack.epoch == self.epoch && ack.signature.index == share.index)
830 .map(|ack| ack.item.digest)
831 } else {
832 None
833 };
834
835 if let Some(digest) = our_digest {
837 acks_group.retain(|other| other.item.digest == digest);
838 }
839
840 let mut epoch_map = BTreeMap::new();
842 for ack in acks_group {
843 epoch_map
844 .entry(ack.epoch)
845 .or_insert_with(BTreeMap::new)
846 .insert(ack.signature.index, ack);
847 }
848
849 match our_digest {
852 Some(digest) => {
853 self.pending
854 .insert(index, Pending::Verified(digest, epoch_map));
855
856 self.rebroadcast_deadlines
858 .put(index, self.context.current());
859 }
860 None => {
861 self.pending.insert(index, Pending::Unverified(epoch_map));
862
863 unverified.push(index);
865 }
866 }
867 }
868
869 let next = self.next();
872 for index in self.tip..next {
873 if self.pending.contains_key(&index) || self.confirmed.contains_key(&index) {
875 continue;
876 }
877
878 self.pending
880 .insert(index, Pending::Unverified(BTreeMap::new()));
881 unverified.push(index);
882 }
883 info!(self.tip, next, ?unverified, "replayed journal");
884
885 unverified
886 }
887
888 async fn record(&mut self, activity: Activity<V, D>) {
890 let index = match activity {
891 Activity::Ack(ref ack) => ack.item.index,
892 Activity::Certified(ref certificate) => certificate.item.index,
893 Activity::Tip(index) => index,
894 };
895 let section = self.get_journal_section(index);
896 self.journal
897 .as_mut()
898 .expect("journal must be initialized")
899 .append(section, activity)
900 .await
901 .expect("unable to append to journal");
902 }
903
904 async fn sync(&mut self, index: Index) {
906 let section = self.get_journal_section(index);
907 let journal = self.journal.as_mut().expect("journal must be initialized");
908 journal.sync(section).await.expect("unable to sync journal");
909 }
910}