1use super::{
4 metrics,
5 safe_tip::SafeTip,
6 types::{Ack, Activity, Error, Index, Item, TipAck},
7 Config,
8};
9use crate::{
10 aggregation::types::Certificate, types::Epoch, Automaton, Monitor, Reporter,
11 ThresholdSupervisor,
12};
13use commonware_cryptography::{
14 bls12381::primitives::{group, ops::threshold_signature_recover, variant::Variant},
15 Digest, PublicKey,
16};
17use commonware_macros::select;
18use commonware_p2p::{
19 utils::codec::{wrap, WrappedSender},
20 Blocker, Receiver, Recipients, Sender,
21};
22use commonware_runtime::{
23 buffer::PoolRef,
24 spawn_cell,
25 telemetry::metrics::{
26 histogram,
27 status::{CounterExt, Status},
28 },
29 Clock, ContextCell, Handle, Metrics, Spawner, Storage,
30};
31use commonware_storage::journal::segmented::variable::{Config as JConfig, Journal};
32use commonware_utils::{futures::Pool as FuturesPool, quorum_from_slice, PrioritySet};
33use futures::{
34 future::{self, Either},
35 pin_mut, StreamExt,
36};
37use std::{
38 cmp::max,
39 collections::BTreeMap,
40 num::NonZeroUsize,
41 time::{Duration, SystemTime},
42};
43use tracing::{debug, error, info, trace, warn};
44
/// Acks collected for a single index, grouped by epoch and then by the signer's
/// partial-signature index.
enum Pending<V: Variant, D: Digest> {
    /// No digest has been accepted for this index yet, so acks are buffered regardless of
    /// the digest they carry.
    Unverified(BTreeMap<Epoch, BTreeMap<u32, Ack<V, D>>>),

    /// A digest has been accepted for this index; acks carrying a different digest are
    /// rejected.
    Verified(D, BTreeMap<Epoch, BTreeMap<u32, Ack<V, D>>>),
}
54
/// Outcome of asking the automaton to propose a digest for an index.
struct DigestRequest<D: Digest, E: Clock> {
    /// Index the digest was requested for.
    index: Index,

    /// Digest returned by the automaton, or the error produced while waiting for it.
    result: Result<D, Error>,

    /// Histogram timer created when the request was issued; the run loop drops it once the
    /// request completes.
    timer: histogram::Timer<E>,
}
67
/// Aggregation engine: requests digests from the automaton, signs and exchanges acks with
/// peers, and recovers threshold signatures (certificates) once a quorum of matching acks
/// has been collected for an index.
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics,
    P: PublicKey,
    V: Variant,
    D: Digest,
    A: Automaton<Context = Index, Digest = D> + Clone,
    Z: Reporter<Activity = Activity<V, D>>,
    M: Monitor<Index = Epoch>,
    B: Blocker<PublicKey = P>,
    TSu: ThresholdSupervisor<
        Index = Epoch,
        PublicKey = P,
        Polynomial = Vec<V::Public>,
        Share = group::Share,
    >,
> {
    /// Runtime context (clock, spawner, storage, metrics).
    context: ContextCell<E>,
    /// Application that proposes a digest for each index; cloned for every request.
    automaton: A,
    /// Source of epoch updates.
    monitor: M,
    /// Supplies participants, the public polynomial and our share for each epoch.
    validators: TSu,
    /// Receives every `Activity` (acks, certificates, tip updates).
    reporter: Z,
    /// Used to block peers whose messages fail to decode or fail blockable validation.
    blocker: B,

    /// Namespace passed to ack signing and verification.
    namespace: Vec<u8>,

    /// `(lo, hi)`: acks are accepted only for epochs in
    /// `[epoch - lo, epoch + hi]`; `lo` also bounds which epochs are kept in `pending`.
    epoch_bounds: (u64, u64),

    /// Digests are requested, and acks accepted, only for indices below `tip + window`.
    window: u64,

    /// Number of indices below the tip that are retained; anything older is pruned from
    /// `pending`, `confirmed` and the journal, and acks for it are rejected.
    activity_timeout: u64,

    /// In-flight digest requests to the automaton.
    digest_requests: FuturesPool<DigestRequest<D, E>>,

    /// Current epoch, as last reported by the monitor.
    epoch: Epoch,

    /// Current tip. Restored from the journal on replay, advanced past consecutively
    /// confirmed indices, or fast-forwarded to the safe tip derived from peers.
    tip: Index,

    /// Tracks the tips reported by peers alongside their acks.
    safe_tip: SafeTip<P>,

    /// Acks collected per index that has not been pruned.
    pending: BTreeMap<Index, Pending<V, D>>,

    /// Certificates (recovered threshold signatures) per index.
    confirmed: BTreeMap<Index, Certificate<V, D>>,

    /// Delay between signing (or rebroadcasting) an ack and the next rebroadcast.
    rebroadcast_timeout: Duration,

    /// Next rebroadcast time for each index we have signed.
    rebroadcast_deadlines: PrioritySet<Index, SystemTime>,

    /// Journal of recorded activity; `None` until `run` initializes it.
    journal: Option<Journal<E, Activity<V, D>>>,
    /// Journal configuration: storage partition name.
    journal_partition: String,
    /// Journal configuration: write buffer size.
    journal_write_buffer: NonZeroUsize,
    /// Buffer size used when replaying the journal.
    journal_replay_buffer: NonZeroUsize,
    /// Number of indices stored in each journal section.
    journal_heights_per_section: u64,
    /// Journal configuration: optional compression setting.
    journal_compression: Option<u8>,
    /// Journal configuration: buffer pool.
    journal_buffer_pool: PoolRef,

    /// Passed as the priority flag when sending acks to peers.
    priority_acks: bool,

    /// Engine metrics (tip, digest requests, acks, thresholds).
    metrics: metrics::Metrics<E>,
}
160
161impl<
162 E: Clock + Spawner + Storage + Metrics,
163 P: PublicKey,
164 V: Variant,
165 D: Digest,
166 A: Automaton<Context = Index, Digest = D> + Clone,
167 Z: Reporter<Activity = Activity<V, D>>,
168 M: Monitor<Index = Epoch>,
169 B: Blocker<PublicKey = P>,
170 TSu: ThresholdSupervisor<
171 Index = Epoch,
172 PublicKey = P,
173 Polynomial = Vec<V::Public>,
174 Share = group::Share,
175 >,
176 > Engine<E, P, V, D, A, Z, M, B, TSu>
177{
178 pub fn new(context: E, cfg: Config<P, V, D, A, Z, M, B, TSu>) -> Self {
180 let metrics = metrics::Metrics::init(context.clone());
182
183 Self {
184 context: ContextCell::new(context),
185 automaton: cfg.automaton,
186 reporter: cfg.reporter,
187 monitor: cfg.monitor,
188 validators: cfg.validators,
189 blocker: cfg.blocker,
190 namespace: cfg.namespace,
191 epoch_bounds: cfg.epoch_bounds,
192 window: cfg.window.into(),
193 activity_timeout: cfg.activity_timeout,
194 epoch: 0,
195 tip: 0,
196 safe_tip: SafeTip::default(),
197 digest_requests: FuturesPool::default(),
198 pending: BTreeMap::new(),
199 confirmed: BTreeMap::new(),
200 rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
201 rebroadcast_deadlines: PrioritySet::new(),
202 journal: None,
203 journal_partition: cfg.journal_partition,
204 journal_write_buffer: cfg.journal_write_buffer,
205 journal_replay_buffer: cfg.journal_replay_buffer,
206 journal_heights_per_section: cfg.journal_heights_per_section.into(),
207 journal_compression: cfg.journal_compression,
208 journal_buffer_pool: cfg.journal_buffer_pool,
209 priority_acks: cfg.priority_acks,
210 metrics,
211 }
212 }
213
/// Starts the engine.
///
/// Hands `self.run(network)` to `spawn_cell!` together with the engine's context and
/// returns the resulting `Handle`. `network` is the (sender, receiver) pair used to
/// exchange acks with peers.
pub fn start(
    mut self,
    network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
) -> Handle<()> {
    spawn_cell!(self.context, self.run(network).await)
}
229
230 async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
232 let (mut sender, mut receiver) = wrap((), network.0, network.1);
233 let mut shutdown = self.context.stopped();
234
235 let (latest, mut epoch_updates) = self.monitor.subscribe().await;
237 self.epoch = latest;
238
239 let journal_cfg = JConfig {
241 partition: self.journal_partition.clone(),
242 compression: self.journal_compression,
243 codec_config: (),
244 buffer_pool: self.journal_buffer_pool.clone(),
245 write_buffer: self.journal_write_buffer,
246 };
247 let journal = Journal::init(self.context.with_label("journal").into(), journal_cfg)
248 .await
249 .expect("init failed");
250 let unverified_indices = self.replay(&journal).await;
251 self.journal = Some(journal);
252
253 for index in unverified_indices {
255 trace!(index, "requesting digest for unverified index from replay");
256 self.get_digest(index);
257 }
258
259 self.safe_tip.init(
261 self.validators
262 .participants(self.epoch)
263 .expect("unknown participants"),
264 );
265
266 loop {
267 self.metrics.tip.set(self.tip as i64);
268
269 let next = self.next();
271 if next < self.tip + self.window {
272 trace!(next, "requesting new digest");
273 assert!(self
274 .pending
275 .insert(next, Pending::Unverified(BTreeMap::new()))
276 .is_none());
277 self.get_digest(next);
278 continue;
279 }
280
281 let rebroadcast = match self.rebroadcast_deadlines.peek() {
283 Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
284 None => Either::Right(future::pending()),
285 };
286
287 select! {
289 _ = &mut shutdown => {
291 debug!("shutdown");
292 break;
293 },
294
295 epoch = epoch_updates.next() => {
297 let Some(epoch) = epoch else {
299 error!("epoch subscription failed");
300 break;
301 };
302
303 debug!(current=self.epoch, new=epoch, "refresh epoch");
305 assert!(epoch >= self.epoch);
306 self.epoch = epoch;
307
308 self.safe_tip.reconcile(self.validators.participants(epoch).unwrap());
310
311 let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
313 self.pending.iter_mut().for_each(|(_, pending)| {
314 match pending {
315 Pending::Unverified(acks) => {
316 acks.retain(|epoch, _| *epoch >= min_epoch);
317 }
318 Pending::Verified(_, acks) => {
319 acks.retain(|epoch, _| *epoch >= min_epoch);
320 }
321 }
322 });
323
324 continue;
325 },
326
327 request = self.digest_requests.next_completed() => {
329 let DigestRequest { index, result, timer } = request;
330 drop(timer); match result {
332 Err(err) => {
333 warn!(?err, ?index, "automaton returned error");
334 self.metrics.digest.inc(Status::Dropped);
335 }
336 Ok(digest) => {
337 if let Err(err) = self.handle_digest(index, digest, &mut sender).await {
338 debug!(?err, ?index, "handle_digest failed");
339 continue;
340 }
341 }
342 }
343 },
344
345 msg = receiver.recv() => {
347 let (sender, msg) = match msg {
349 Ok(r) => r,
350 Err(err) => {
351 warn!(?err, "ack receiver failed");
352 break;
353 }
354 };
355 let mut guard = self.metrics.acks.guard(Status::Invalid);
356 let TipAck { ack, tip } = match msg {
357 Ok(peer_ack) => peer_ack,
358 Err(err) => {
359 warn!(?err, ?sender, "ack decode failed, blocking peer");
360 self.blocker.block(sender).await;
361 continue;
362 }
363 };
364
365 if self.safe_tip.update(sender.clone(), tip).is_some() {
367 let safe_tip = self.safe_tip.get();
369 if safe_tip > self.tip {
370 self.fast_forward_tip(safe_tip).await;
371 }
372 }
373
374 if let Err(err) = self.validate_ack(&ack, &sender) {
376 if err.blockable() {
377 warn!(?sender, ?err, "blocking peer for validation failure");
378 self.blocker.block(sender).await;
379 } else {
380 debug!(?sender, ?err, "ack validate failed");
381 }
382 continue;
383 };
384
385 if let Err(err) = self.handle_ack(&ack).await {
387 debug!(?err, ?sender, "ack handle failed");
388 guard.set(Status::Failure);
389 continue;
390 }
391
392 debug!(?sender, epoch=ack.epoch, index=ack.item.index, "ack");
394 guard.set(Status::Success);
395 },
396
397 _ = rebroadcast => {
399 let (index, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline");
401 trace!("rebroadcasting: index {}", index);
402 if let Err(err) = self.handle_rebroadcast(index, &mut sender).await {
403 warn!(?err, ?index, "rebroadcast failed");
404 };
405 }
406 }
407 }
408
409 if let Some(journal) = self.journal.take() {
411 journal
412 .close()
413 .await
414 .expect("unable to close aggregation journal");
415 }
416 }
417
418 async fn handle_digest(
422 &mut self,
423 index: Index,
424 digest: D,
425 sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
426 ) -> Result<(), Error> {
427 if !matches!(self.pending.get(&index), Some(Pending::Unverified(_))) {
429 return Err(Error::AckIndex(index));
430 };
431
432 let Some(Pending::Unverified(acks)) = self.pending.remove(&index) else {
434 panic!("Pending::Unverified entry not found");
435 };
436 self.pending
437 .insert(index, Pending::Verified(digest, BTreeMap::new()));
438
439 for epoch_acks in acks.values() {
443 for epoch_ack in epoch_acks.values() {
444 if epoch_ack.item.digest != digest {
446 continue;
447 }
448
449 let _ = self.handle_ack(epoch_ack).await;
451 }
452 if self.confirmed.contains_key(&index) {
454 break;
455 }
456 }
457
458 let ack = self.sign_ack(index, digest).await?;
460
461 self.rebroadcast_deadlines
463 .put(index, self.context.current() + self.rebroadcast_timeout);
464
465 let _ = self.handle_ack(&ack).await;
467
468 self.broadcast(ack, sender).await?;
470
471 Ok(())
472 }
473
474 async fn handle_ack(&mut self, ack: &Ack<V, D>) -> Result<(), Error> {
479 let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
481 return Err(Error::UnknownEpoch(ack.epoch));
482 };
483 let quorum = quorum_from_slice(polynomial);
484
485 let acks_by_epoch = match self.pending.get_mut(&ack.item.index) {
487 None => {
488 return Err(Error::AckIndex(ack.item.index));
491 }
492 Some(Pending::Unverified(acks)) => acks,
493 Some(Pending::Verified(digest, acks)) => {
494 if ack.item.digest != *digest {
496 return Err(Error::AckDigest(ack.item.index));
497 }
498 acks
499 }
500 };
501
502 let acks = acks_by_epoch.entry(ack.epoch).or_default();
504 if acks.contains_key(&ack.signature.index) {
505 return Ok(());
506 }
507 acks.insert(ack.signature.index, ack.clone());
508
509 let partials = acks
511 .values()
512 .filter(|a| a.item.digest == ack.item.digest)
513 .map(|ack| &ack.signature)
514 .collect::<Vec<_>>();
515 if partials.len() >= (quorum as usize) {
516 let item = ack.item.clone();
517 let threshold = threshold_signature_recover::<V, _>(quorum, partials)
518 .expect("Failed to recover threshold signature");
519 self.metrics.threshold.inc();
520 self.handle_threshold(item, threshold).await;
521 }
522
523 Ok(())
524 }
525
526 async fn handle_threshold(&mut self, item: Item<D>, threshold: V::Signature) {
528 let index = item.index;
530 if self.confirmed.contains_key(&index) {
531 return;
532 }
533
534 let certificate = Certificate {
536 item,
537 signature: threshold,
538 };
539 self.confirmed.insert(index, certificate.clone());
540
541 let certified = Activity::Certified(certificate);
543 self.record(certified.clone()).await;
544 self.sync(index).await;
545 self.reporter.report(certified).await;
546
547 if index == self.tip {
549 let mut new_tip = index.saturating_add(1);
551 while self.confirmed.contains_key(&new_tip) && new_tip < Index::MAX {
552 new_tip = new_tip.saturating_add(1);
553 }
554
555 if new_tip > self.tip {
557 self.fast_forward_tip(new_tip).await;
558 }
559 }
560 }
561
562 async fn handle_rebroadcast(
564 &mut self,
565 index: Index,
566 sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
567 ) -> Result<(), Error> {
568 let Some(Pending::Verified(digest, acks)) = self.pending.get(&index) else {
569 return Ok(());
571 };
572
573 let Some(share) = self.validators.share(self.epoch) else {
575 return Err(Error::UnknownShare(self.epoch));
576 };
577 let ack = acks
578 .get(&self.epoch)
579 .and_then(|acks| acks.get(&share.index).cloned());
580 let ack = match ack {
581 Some(ack) => ack,
582 None => self.sign_ack(index, *digest).await?,
583 };
584
585 self.rebroadcast_deadlines
587 .put(index, self.context.current() + self.rebroadcast_timeout);
588
589 self.broadcast(ack, sender).await
591 }
592
593 fn validate_ack(&self, ack: &Ack<V, D>, sender: &P) -> Result<(), Error> {
600 {
602 let (eb_lo, eb_hi) = self.epoch_bounds;
603 let bound_lo = self.epoch.saturating_sub(eb_lo);
604 let bound_hi = self.epoch.saturating_add(eb_hi);
605 if ack.epoch < bound_lo || ack.epoch > bound_hi {
606 return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
607 }
608 }
609
610 let Some(sig_index) = self.validators.is_participant(ack.epoch, sender) else {
612 return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
613 };
614 if sig_index != ack.signature.index {
615 return Err(Error::PeerMismatch);
616 }
617
618 let activity_threshold = self.tip.saturating_sub(self.activity_timeout);
620 if ack.item.index < activity_threshold {
621 return Err(Error::AckThresholded(ack.item.index));
622 }
623
624 if ack.item.index >= self.tip + self.window {
626 return Err(Error::AckIndex(ack.item.index));
627 }
628
629 if self.confirmed.contains_key(&ack.item.index) {
631 return Err(Error::AckThresholded(ack.item.index));
632 }
633 let have_ack = match self.pending.get(&ack.item.index) {
634 None => false,
635 Some(Pending::Unverified(epoch_map)) => epoch_map
636 .get(&ack.epoch)
637 .is_some_and(|acks| acks.contains_key(&ack.signature.index)),
638 Some(Pending::Verified(digest, epoch_map)) => {
639 if ack.item.digest != *digest {
642 return Err(Error::AckDigest(ack.item.index));
643 }
644 epoch_map
645 .get(&ack.epoch)
646 .is_some_and(|acks| acks.contains_key(&ack.signature.index))
647 }
648 };
649 if have_ack {
650 return Err(Error::AckDuplicate(sender.to_string(), ack.item.index));
651 }
652
653 let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
655 return Err(Error::UnknownEpoch(ack.epoch));
656 };
657 if !ack.verify(&self.namespace, polynomial) {
658 return Err(Error::InvalidAckSignature);
659 }
660
661 Ok(())
662 }
663
664 fn get_digest(&mut self, index: Index) {
670 assert!(self.pending.contains_key(&index));
671 let mut automaton = self.automaton.clone();
672 let timer = self.metrics.digest_duration.timer();
673 self.digest_requests.push(async move {
674 let receiver = automaton.propose(index).await;
675 let result = receiver.await.map_err(Error::AppProposeCanceled);
676 DigestRequest {
677 index,
678 result,
679 timer,
680 }
681 });
682 }
683
684 async fn sign_ack(&mut self, index: Index, digest: D) -> Result<Ack<V, D>, Error> {
687 let Some(share) = self.validators.share(self.epoch) else {
688 return Err(Error::UnknownShare(self.epoch));
689 };
690
691 let item = Item { index, digest };
693 let ack = Ack::sign(&self.namespace, self.epoch, share, item);
694
695 self.record(Activity::Ack(ack.clone())).await;
697 self.sync(index).await;
698
699 Ok(ack)
700 }
701
702 async fn broadcast(
706 &mut self,
707 ack: Ack<V, D>,
708 sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
709 ) -> Result<(), Error> {
710 sender
711 .send(
712 Recipients::All,
713 TipAck { ack, tip: self.tip },
714 self.priority_acks,
715 )
716 .await
717 .map_err(|err| {
718 warn!(?err, "failed to send ack");
719 Error::UnableToSendMessage
720 })?;
721 Ok(())
722 }
723
724 fn next(&self) -> Index {
727 let max_pending = self
728 .pending
729 .last_key_value()
730 .map(|(k, _)| k.saturating_add(1))
731 .unwrap_or_default();
732 let max_confirmed = self
733 .confirmed
734 .last_key_value()
735 .map(|(k, _)| k.saturating_add(1))
736 .unwrap_or_default();
737 max(self.tip, max(max_pending, max_confirmed))
738 }
739
740 async fn fast_forward_tip(&mut self, tip: Index) {
746 assert!(tip > self.tip);
747
748 let activity_threshold = tip.saturating_sub(self.activity_timeout);
750 self.pending.retain(|index, _| *index >= activity_threshold);
751 self.confirmed
752 .retain(|index, _| *index >= activity_threshold);
753
754 self.record(Activity::Tip(tip)).await;
756 self.sync(tip).await;
757 self.reporter.report(Activity::Tip(tip)).await;
758
759 let section = self.get_journal_section(activity_threshold);
761 let journal = self.journal.as_mut().expect("journal must be initialized");
762 let _ = journal.prune(section).await;
763
764 self.tip = tip;
766 }
767
/// Returns the journal section that holds `index`: sections group
/// `journal_heights_per_section` consecutive indices (integer division).
fn get_journal_section(&self, index: Index) -> u64 {
    index / self.journal_heights_per_section
}
774
775 async fn replay(&mut self, journal: &Journal<E, Activity<V, D>>) -> Vec<Index> {
778 let mut tip = Index::default();
779 let mut certified = Vec::new();
780 let mut acks = Vec::new();
781 let stream = journal
782 .replay(0, 0, self.journal_replay_buffer)
783 .await
784 .expect("replay failed");
785 pin_mut!(stream);
786 while let Some(msg) = stream.next().await {
787 let (_, _, _, activity) = msg.expect("replay failed");
788 match activity {
789 Activity::Tip(index) => {
790 tip = max(tip, index);
791 self.reporter.report(Activity::Tip(index)).await;
792 }
793 Activity::Certified(certificate) => {
794 certified.push(certificate.clone());
795 self.reporter.report(Activity::Certified(certificate)).await;
796 }
797 Activity::Ack(ack) => {
798 acks.push(ack.clone());
799 self.reporter.report(Activity::Ack(ack)).await;
800 }
801 }
802 }
803
804 self.tip = tip;
806 let activity_threshold = tip.saturating_sub(self.activity_timeout);
807
808 certified
810 .iter()
811 .filter(|certificate| certificate.item.index >= activity_threshold)
812 .for_each(|certificate| {
813 self.confirmed
814 .insert(certificate.item.index, certificate.clone());
815 });
816
817 let mut acks_by_index: BTreeMap<Index, Vec<Ack<V, D>>> = BTreeMap::new();
819 for ack in acks {
820 if ack.item.index >= activity_threshold && !self.confirmed.contains_key(&ack.item.index)
821 {
822 acks_by_index.entry(ack.item.index).or_default().push(ack);
823 }
824 }
825
826 let mut unverified = Vec::new();
828 for (index, mut acks_group) in acks_by_index {
829 let our_share = self.validators.share(self.epoch);
831 let our_digest = if let Some(share) = our_share {
832 acks_group
833 .iter()
834 .find(|ack| ack.epoch == self.epoch && ack.signature.index == share.index)
835 .map(|ack| ack.item.digest)
836 } else {
837 None
838 };
839
840 if let Some(digest) = our_digest {
842 acks_group.retain(|other| other.item.digest == digest);
843 }
844
845 let mut epoch_map = BTreeMap::new();
847 for ack in acks_group {
848 epoch_map
849 .entry(ack.epoch)
850 .or_insert_with(BTreeMap::new)
851 .insert(ack.signature.index, ack);
852 }
853
854 match our_digest {
857 Some(digest) => {
858 self.pending
859 .insert(index, Pending::Verified(digest, epoch_map));
860
861 self.rebroadcast_deadlines
863 .put(index, self.context.current());
864 }
865 None => {
866 self.pending.insert(index, Pending::Unverified(epoch_map));
867
868 unverified.push(index);
870 }
871 }
872 }
873
874 let next = self.next();
877 for index in self.tip..next {
878 if self.pending.contains_key(&index) || self.confirmed.contains_key(&index) {
880 continue;
881 }
882
883 self.pending
885 .insert(index, Pending::Unverified(BTreeMap::new()));
886 unverified.push(index);
887 }
888 info!(self.tip, next, ?unverified, "replayed journal");
889
890 unverified
891 }
892
893 async fn record(&mut self, activity: Activity<V, D>) {
895 let index = match activity {
896 Activity::Ack(ref ack) => ack.item.index,
897 Activity::Certified(ref certificate) => certificate.item.index,
898 Activity::Tip(index) => index,
899 };
900 let section = self.get_journal_section(index);
901 self.journal
902 .as_mut()
903 .expect("journal must be initialized")
904 .append(section, activity)
905 .await
906 .expect("unable to append to journal");
907 }
908
909 async fn sync(&mut self, index: Index) {
911 let section = self.get_journal_section(index);
912 let journal = self.journal.as_mut().expect("journal must be initialized");
913 journal.sync(section).await.expect("unable to sync journal");
914 }
915}