1use super::{
4 metrics,
5 safe_tip::SafeTip,
6 types::{Ack, Activity, Error, Index, Item, TipAck},
7 Config,
8};
9use crate::{
10 aggregation::{scheme, types::Certificate},
11 types::{Epoch, EpochDelta},
12 Automaton, Monitor, Reporter,
13};
14use commonware_cryptography::{
15 certificate::{Provider, Scheme},
16 Digest,
17};
18use commonware_macros::select;
19use commonware_p2p::{
20 utils::codec::{wrap, WrappedSender},
21 Blocker, Receiver, Recipients, Sender,
22};
23use commonware_runtime::{
24 buffer::PoolRef,
25 spawn_cell,
26 telemetry::metrics::{
27 histogram,
28 status::{CounterExt, GaugeExt, Status},
29 },
30 Clock, ContextCell, Handle, Metrics, Spawner, Storage,
31};
32use commonware_storage::journal::segmented::variable::{Config as JConfig, Journal};
33use commonware_utils::{futures::Pool as FuturesPool, ordered::Quorum, PrioritySet};
34use futures::{
35 future::{self, Either},
36 pin_mut, StreamExt,
37};
38use std::{
39 cmp::max,
40 collections::BTreeMap,
41 num::NonZeroUsize,
42 sync::Arc,
43 time::{Duration, SystemTime},
44};
45use tracing::{debug, error, info, trace, warn};
46
47enum Pending<S: Scheme, D: Digest> {
49 Unverified(BTreeMap<Epoch, BTreeMap<u32, Ack<S, D>>>),
52
53 Verified(D, BTreeMap<Epoch, BTreeMap<u32, Ack<S, D>>>),
55}
56
57struct DigestRequest<D: Digest, E: Clock> {
60 index: Index,
62
63 result: Result<D, Error>,
65
66 timer: histogram::Timer<E>,
68}
69
70pub struct Engine<
72 E: Clock + Spawner + Storage + Metrics,
73 P: Provider<Scope = Epoch>,
74 D: Digest,
75 A: Automaton<Context = Index, Digest = D> + Clone,
76 Z: Reporter<Activity = Activity<P::Scheme, D>>,
77 M: Monitor<Index = Epoch>,
78 B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
79> {
80 context: ContextCell<E>,
82 automaton: A,
83 monitor: M,
84 provider: P,
85 reporter: Z,
86 blocker: B,
87
88 namespace: Vec<u8>,
91
92 epoch_bounds: (EpochDelta, EpochDelta),
101
102 window: u64,
104
105 activity_timeout: u64,
107
108 digest_requests: FuturesPool<DigestRequest<D, E>>,
111
112 epoch: Epoch,
115
116 tip: Index,
118
119 safe_tip: SafeTip<<P::Scheme as Scheme>::PublicKey>,
121
122 pending: BTreeMap<Index, Pending<P::Scheme, D>>,
126
127 confirmed: BTreeMap<Index, Certificate<P::Scheme, D>>,
129
130 rebroadcast_timeout: Duration,
133
134 rebroadcast_deadlines: PrioritySet<Index, SystemTime>,
136
137 journal: Option<Journal<E, Activity<P::Scheme, D>>>,
140 journal_partition: String,
141 journal_write_buffer: NonZeroUsize,
142 journal_replay_buffer: NonZeroUsize,
143 journal_heights_per_section: u64,
144 journal_compression: Option<u8>,
145 journal_buffer_pool: PoolRef,
146
147 priority_acks: bool,
150
151 metrics: metrics::Metrics<E>,
154}
155
156impl<
157 E: Clock + Spawner + Storage + Metrics,
158 P: Provider<Scope = Epoch, Scheme: scheme::Scheme<D>>,
159 D: Digest,
160 A: Automaton<Context = Index, Digest = D> + Clone,
161 Z: Reporter<Activity = Activity<P::Scheme, D>>,
162 M: Monitor<Index = Epoch>,
163 B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
164 > Engine<E, P, D, A, Z, M, B>
165{
166 pub fn new(context: E, cfg: Config<P, D, A, Z, M, B>) -> Self {
168 let metrics = metrics::Metrics::init(context.clone());
170
171 Self {
172 context: ContextCell::new(context),
173 automaton: cfg.automaton,
174 reporter: cfg.reporter,
175 monitor: cfg.monitor,
176 provider: cfg.provider,
177 blocker: cfg.blocker,
178 namespace: cfg.namespace,
179 epoch_bounds: cfg.epoch_bounds,
180 window: cfg.window.into(),
181 activity_timeout: cfg.activity_timeout,
182 epoch: Epoch::zero(),
183 tip: 0,
184 safe_tip: SafeTip::default(),
185 digest_requests: FuturesPool::default(),
186 pending: BTreeMap::new(),
187 confirmed: BTreeMap::new(),
188 rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
189 rebroadcast_deadlines: PrioritySet::new(),
190 journal: None,
191 journal_partition: cfg.journal_partition,
192 journal_write_buffer: cfg.journal_write_buffer,
193 journal_replay_buffer: cfg.journal_replay_buffer,
194 journal_heights_per_section: cfg.journal_heights_per_section.into(),
195 journal_compression: cfg.journal_compression,
196 journal_buffer_pool: cfg.journal_buffer_pool,
197 priority_acks: cfg.priority_acks,
198 metrics,
199 }
200 }
201
202 fn scheme(&self, epoch: Epoch) -> Result<Arc<P::Scheme>, Error> {
204 self.provider
205 .scoped(epoch)
206 .ok_or(Error::UnknownEpoch(epoch))
207 }
208
209 pub fn start(
219 mut self,
220 network: (
221 impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
222 impl Receiver<PublicKey = <P::Scheme as Scheme>::PublicKey>,
223 ),
224 ) -> Handle<()> {
225 spawn_cell!(self.context, self.run(network).await)
226 }
227
228 async fn run(
230 mut self,
231 network: (
232 impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
233 impl Receiver<PublicKey = <P::Scheme as Scheme>::PublicKey>,
234 ),
235 ) {
236 let (mut sender, mut receiver) = wrap((), network.0, network.1);
237 let mut shutdown = self.context.stopped();
238
239 let (latest, mut epoch_updates) = self.monitor.subscribe().await;
241 self.epoch = latest;
242
243 let journal_cfg = JConfig {
245 partition: self.journal_partition.clone(),
246 compression: self.journal_compression,
247 codec_config: P::Scheme::certificate_codec_config_unbounded(),
248 buffer_pool: self.journal_buffer_pool.clone(),
249 write_buffer: self.journal_write_buffer,
250 };
251 let journal = Journal::init(
252 self.context.with_label("journal").into_present(),
253 journal_cfg,
254 )
255 .await
256 .expect("init failed");
257 let unverified_indices = self.replay(&journal).await;
258 self.journal = Some(journal);
259
260 for index in unverified_indices {
262 trace!(index, "requesting digest for unverified index from replay");
263 self.get_digest(index);
264 }
265
266 let scheme = self
268 .scheme(self.epoch)
269 .expect("current epoch scheme must exist");
270 self.safe_tip.init(scheme.participants());
271
272 loop {
273 let _ = self.metrics.tip.try_set(self.tip);
274
275 let next = self.next();
277
278 if next - self.tip < self.window {
280 trace!(next, "requesting new digest");
281 assert!(self
282 .pending
283 .insert(next, Pending::Unverified(BTreeMap::new()))
284 .is_none());
285 self.get_digest(next);
286 continue;
287 }
288
289 let rebroadcast = match self.rebroadcast_deadlines.peek() {
291 Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
292 None => Either::Right(future::pending()),
293 };
294
295 select! {
297 _ = &mut shutdown => {
299 debug!("shutdown");
300 break;
301 },
302
303 epoch = epoch_updates.next() => {
305 let Some(epoch) = epoch else {
307 error!("epoch subscription failed");
308 break;
309 };
310
311 debug!(current = %self.epoch, new = %epoch, "refresh epoch");
313 assert!(epoch >= self.epoch);
314 self.epoch = epoch;
315
316 let scheme = self.scheme(self.epoch)
318 .expect("current epoch scheme must exist");
319 self.safe_tip.reconcile(scheme.participants());
320
321 let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
323 self.pending.iter_mut().for_each(|(_, pending)| {
324 match pending {
325 Pending::Unverified(acks) => {
326 acks.retain(|epoch, _| *epoch >= min_epoch);
327 }
328 Pending::Verified(_, acks) => {
329 acks.retain(|epoch, _| *epoch >= min_epoch);
330 }
331 }
332 });
333
334 continue;
335 },
336
337 request = self.digest_requests.next_completed() => {
339 let DigestRequest { index, result, timer } = request;
340 drop(timer); match result {
342 Err(err) => {
343 warn!(?err, ?index, "automaton returned error");
344 self.metrics.digest.inc(Status::Dropped);
345 }
346 Ok(digest) => {
347 if let Err(err) = self.handle_digest(index, digest, &mut sender).await {
348 debug!(?err, ?index, "handle_digest failed");
349 continue;
350 }
351 }
352 }
353 },
354
355 msg = receiver.recv() => {
357 let (sender, msg) = match msg {
359 Ok(r) => r,
360 Err(err) => {
361 warn!(?err, "ack receiver failed");
362 break;
363 }
364 };
365 let mut guard = self.metrics.acks.guard(Status::Invalid);
366 let TipAck { ack, tip } = match msg {
367 Ok(peer_ack) => peer_ack,
368 Err(err) => {
369 warn!(?err, ?sender, "ack decode failed, blocking peer");
370 self.blocker.block(sender).await;
371 continue;
372 }
373 };
374
375 if self.safe_tip.update(sender.clone(), tip).is_some() {
377 let safe_tip = self.safe_tip.get();
379 if safe_tip > self.tip {
380 self.fast_forward_tip(safe_tip).await;
381 }
382 }
383
384 if let Err(err) = self.validate_ack(&ack, &sender) {
386 if err.blockable() {
387 warn!(?sender, ?err, "blocking peer for validation failure");
388 self.blocker.block(sender).await;
389 } else {
390 debug!(?sender, ?err, "ack validate failed");
391 }
392 continue;
393 };
394
395 if let Err(err) = self.handle_ack(&ack).await {
397 debug!(?err, ?sender, "ack handle failed");
398 guard.set(Status::Failure);
399 continue;
400 }
401
402 debug!(?sender, epoch = %ack.epoch, index = ack.item.index, "ack");
404 guard.set(Status::Success);
405 },
406
407 _ = rebroadcast => {
409 let (index, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline");
411 trace!("rebroadcasting: index {}", index);
412 if let Err(err) = self.handle_rebroadcast(index, &mut sender).await {
413 warn!(?err, ?index, "rebroadcast failed");
414 };
415 }
416 }
417 }
418
419 if let Some(journal) = self.journal.take() {
421 journal
422 .close()
423 .await
424 .expect("unable to close aggregation journal");
425 }
426 }
427
428 async fn handle_digest(
432 &mut self,
433 index: Index,
434 digest: D,
435 sender: &mut WrappedSender<
436 impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
437 TipAck<P::Scheme, D>,
438 >,
439 ) -> Result<(), Error> {
440 if !matches!(self.pending.get(&index), Some(Pending::Unverified(_))) {
442 return Err(Error::AckIndex(index));
443 };
444
445 let Some(Pending::Unverified(acks)) = self.pending.remove(&index) else {
447 panic!("Pending::Unverified entry not found");
448 };
449 self.pending
450 .insert(index, Pending::Verified(digest, BTreeMap::new()));
451
452 for epoch_acks in acks.values() {
456 for epoch_ack in epoch_acks.values() {
457 if epoch_ack.item.digest != digest {
459 continue;
460 }
461
462 let _ = self.handle_ack(epoch_ack).await;
464 }
465 if self.confirmed.contains_key(&index) {
467 break;
468 }
469 }
470
471 let ack = self.sign_ack(index, digest).await?;
473
474 self.rebroadcast_deadlines
476 .put(index, self.context.current() + self.rebroadcast_timeout);
477
478 let _ = self.handle_ack(&ack).await;
480
481 self.broadcast(ack, sender).await?;
483
484 Ok(())
485 }
486
487 async fn handle_ack(&mut self, ack: &Ack<P::Scheme, D>) -> Result<(), Error> {
492 let scheme = self.scheme(ack.epoch)?;
494 let quorum = scheme.participants().quorum();
495
496 let acks_by_epoch = match self.pending.get_mut(&ack.item.index) {
498 None => {
499 return Err(Error::AckIndex(ack.item.index));
502 }
503 Some(Pending::Unverified(acks)) => acks,
504 Some(Pending::Verified(digest, acks)) => {
505 if ack.item.digest != *digest {
507 return Err(Error::AckDigest(ack.item.index));
508 }
509 acks
510 }
511 };
512
513 let acks = acks_by_epoch.entry(ack.epoch).or_default();
515 if acks.contains_key(&ack.attestation.signer) {
516 return Ok(());
517 }
518 acks.insert(ack.attestation.signer, ack.clone());
519
520 let filtered = acks
522 .values()
523 .filter(|a| a.item.digest == ack.item.digest)
524 .collect::<Vec<_>>();
525 if filtered.len() >= quorum as usize {
526 if let Some(certificate) = Certificate::from_acks(&*scheme, filtered) {
527 self.metrics.certificates.inc();
528 self.handle_certificate(certificate).await;
529 }
530 }
531
532 Ok(())
533 }
534
535 async fn handle_certificate(&mut self, certificate: Certificate<P::Scheme, D>) {
537 let index = certificate.item.index;
539 if self.confirmed.contains_key(&index) {
540 return;
541 }
542
543 self.confirmed.insert(index, certificate.clone());
545
546 let certified = Activity::Certified(certificate);
548 self.record(certified.clone()).await;
549 self.sync(index).await;
550 self.reporter.report(certified).await;
551
552 if index == self.tip {
554 let mut new_tip = index.saturating_add(1);
556 while self.confirmed.contains_key(&new_tip) && new_tip < Index::MAX {
557 new_tip = new_tip.saturating_add(1);
558 }
559
560 if new_tip > self.tip {
562 self.fast_forward_tip(new_tip).await;
563 }
564 }
565 }
566
567 async fn handle_rebroadcast(
569 &mut self,
570 index: Index,
571 sender: &mut WrappedSender<
572 impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
573 TipAck<P::Scheme, D>,
574 >,
575 ) -> Result<(), Error> {
576 let Some(Pending::Verified(digest, acks)) = self.pending.get(&index) else {
577 return Ok(());
579 };
580
581 let scheme = self.scheme(self.epoch)?;
583 let Some(signer) = scheme.me() else {
584 return Err(Error::NotSigner(self.epoch));
585 };
586 let ack = acks
587 .get(&self.epoch)
588 .and_then(|acks| acks.get(&signer).cloned());
589 let ack = match ack {
590 Some(ack) => ack,
591 None => self.sign_ack(index, *digest).await?,
592 };
593
594 self.rebroadcast_deadlines
596 .put(index, self.context.current() + self.rebroadcast_timeout);
597
598 self.broadcast(ack, sender).await
600 }
601
602 fn validate_ack(
608 &self,
609 ack: &Ack<P::Scheme, D>,
610 sender: &<P::Scheme as Scheme>::PublicKey,
611 ) -> Result<(), Error> {
612 {
614 let (eb_lo, eb_hi) = self.epoch_bounds;
615 let bound_lo = self.epoch.saturating_sub(eb_lo);
616 let bound_hi = self.epoch.saturating_add(eb_hi);
617 if ack.epoch < bound_lo || ack.epoch > bound_hi {
618 return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
619 }
620 }
621
622 let scheme = self.scheme(ack.epoch)?;
624 let participants = scheme.participants();
625 let Some(signer) = participants.index(sender) else {
626 return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
627 };
628 if signer != ack.attestation.signer {
629 return Err(Error::PeerMismatch);
630 }
631
632 let activity_threshold = self.tip.saturating_sub(self.activity_timeout);
634 if ack.item.index < activity_threshold {
635 return Err(Error::AckCertified(ack.item.index));
636 }
637
638 if ack.item.index.saturating_sub(self.tip) >= self.window {
640 return Err(Error::AckIndex(ack.item.index));
641 }
642
643 if self.confirmed.contains_key(&ack.item.index) {
645 return Err(Error::AckCertified(ack.item.index));
646 }
647 let have_ack = match self.pending.get(&ack.item.index) {
648 None => false,
649 Some(Pending::Unverified(epoch_map)) => epoch_map
650 .get(&ack.epoch)
651 .is_some_and(|acks| acks.contains_key(&ack.attestation.signer)),
652 Some(Pending::Verified(digest, epoch_map)) => {
653 if ack.item.digest != *digest {
656 return Err(Error::AckDigest(ack.item.index));
657 }
658 epoch_map
659 .get(&ack.epoch)
660 .is_some_and(|acks| acks.contains_key(&ack.attestation.signer))
661 }
662 };
663 if have_ack {
664 return Err(Error::AckDuplicate(sender.to_string(), ack.item.index));
665 }
666
667 if !ack.verify(&*scheme, &self.namespace) {
669 return Err(Error::InvalidAckSignature);
670 }
671
672 Ok(())
673 }
674
675 fn get_digest(&mut self, index: Index) {
681 assert!(self.pending.contains_key(&index));
682 let mut automaton = self.automaton.clone();
683 let timer = self.metrics.digest_duration.timer();
684 self.digest_requests.push(async move {
685 let receiver = automaton.propose(index).await;
686 let result = receiver.await.map_err(Error::AppProposeCanceled);
687 DigestRequest {
688 index,
689 result,
690 timer,
691 }
692 });
693 }
694
695 async fn sign_ack(&mut self, index: Index, digest: D) -> Result<Ack<P::Scheme, D>, Error> {
698 let scheme = self.scheme(self.epoch)?;
699 if scheme.me().is_none() {
700 return Err(Error::NotSigner(self.epoch));
701 }
702
703 let item = Item { index, digest };
705 let ack = Ack::sign(&*scheme, &self.namespace, self.epoch, item)
706 .ok_or(Error::NotSigner(self.epoch))?;
707
708 self.record(Activity::Ack(ack.clone())).await;
710 self.sync(index).await;
711
712 Ok(ack)
713 }
714
715 async fn broadcast(
719 &mut self,
720 ack: Ack<P::Scheme, D>,
721 sender: &mut WrappedSender<
722 impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
723 TipAck<P::Scheme, D>,
724 >,
725 ) -> Result<(), Error> {
726 sender
727 .send(
728 Recipients::All,
729 TipAck { ack, tip: self.tip },
730 self.priority_acks,
731 )
732 .await
733 .map_err(|err| {
734 warn!(?err, "failed to send ack");
735 Error::UnableToSendMessage
736 })?;
737 Ok(())
738 }
739
740 fn next(&self) -> Index {
743 let max_pending = self
744 .pending
745 .last_key_value()
746 .map(|(k, _)| k.saturating_add(1))
747 .unwrap_or_default();
748 let max_confirmed = self
749 .confirmed
750 .last_key_value()
751 .map(|(k, _)| k.saturating_add(1))
752 .unwrap_or_default();
753 max(self.tip, max(max_pending, max_confirmed))
754 }
755
756 async fn fast_forward_tip(&mut self, tip: Index) {
762 assert!(tip > self.tip);
763
764 let activity_threshold = tip.saturating_sub(self.activity_timeout);
766 self.pending.retain(|index, _| *index >= activity_threshold);
767 self.confirmed
768 .retain(|index, _| *index >= activity_threshold);
769
770 self.record(Activity::Tip(tip)).await;
772 self.sync(tip).await;
773 self.reporter.report(Activity::Tip(tip)).await;
774
775 let section = self.get_journal_section(activity_threshold);
777 let journal = self.journal.as_mut().expect("journal must be initialized");
778 let _ = journal.prune(section).await;
779
780 self.tip = tip;
782 }
783
784 const fn get_journal_section(&self, index: Index) -> u64 {
788 index / self.journal_heights_per_section
789 }
790
791 async fn replay(&mut self, journal: &Journal<E, Activity<P::Scheme, D>>) -> Vec<Index> {
794 let mut tip = Index::default();
795 let mut certified = Vec::new();
796 let mut acks = Vec::new();
797 let stream = journal
798 .replay(0, 0, self.journal_replay_buffer)
799 .await
800 .expect("replay failed");
801 pin_mut!(stream);
802 while let Some(msg) = stream.next().await {
803 let (_, _, _, activity) = msg.expect("replay failed");
804 match activity {
805 Activity::Tip(index) => {
806 tip = max(tip, index);
807 self.reporter.report(Activity::Tip(index)).await;
808 }
809 Activity::Certified(certificate) => {
810 certified.push(certificate.clone());
811 self.reporter.report(Activity::Certified(certificate)).await;
812 }
813 Activity::Ack(ack) => {
814 acks.push(ack.clone());
815 self.reporter.report(Activity::Ack(ack)).await;
816 }
817 }
818 }
819
820 self.tip = tip;
822 let activity_threshold = tip.saturating_sub(self.activity_timeout);
823
824 certified
826 .iter()
827 .filter(|certificate| certificate.item.index >= activity_threshold)
828 .for_each(|certificate| {
829 self.confirmed
830 .insert(certificate.item.index, certificate.clone());
831 });
832
833 let mut acks_by_index: BTreeMap<Index, Vec<Ack<P::Scheme, D>>> = BTreeMap::new();
835 for ack in acks {
836 if ack.item.index >= activity_threshold && !self.confirmed.contains_key(&ack.item.index)
837 {
838 acks_by_index.entry(ack.item.index).or_default().push(ack);
839 }
840 }
841
842 let mut unverified = Vec::new();
844 for (index, mut acks_group) in acks_by_index {
845 let current_scheme = self.scheme(self.epoch).ok();
847 let our_signer = current_scheme.as_ref().and_then(|s| s.me());
848 let our_digest = our_signer.and_then(|signer| {
849 acks_group
850 .iter()
851 .find(|ack| ack.epoch == self.epoch && ack.attestation.signer == signer)
852 .map(|ack| ack.item.digest)
853 });
854
855 if let Some(digest) = our_digest {
857 acks_group.retain(|other| other.item.digest == digest);
858 }
859
860 let mut epoch_map = BTreeMap::new();
862 for ack in acks_group {
863 epoch_map
864 .entry(ack.epoch)
865 .or_insert_with(BTreeMap::new)
866 .insert(ack.attestation.signer, ack);
867 }
868
869 match our_digest {
872 Some(digest) => {
873 self.pending
874 .insert(index, Pending::Verified(digest, epoch_map));
875
876 self.rebroadcast_deadlines
878 .put(index, self.context.current());
879 }
880 None => {
881 self.pending.insert(index, Pending::Unverified(epoch_map));
882
883 unverified.push(index);
885 }
886 }
887 }
888
889 let next = self.next();
892 for index in self.tip..next {
893 if self.pending.contains_key(&index) || self.confirmed.contains_key(&index) {
895 continue;
896 }
897
898 self.pending
900 .insert(index, Pending::Unverified(BTreeMap::new()));
901 unverified.push(index);
902 }
903 info!(self.tip, next, ?unverified, "replayed journal");
904
905 unverified
906 }
907
908 async fn record(&mut self, activity: Activity<P::Scheme, D>) {
910 let index = match activity {
911 Activity::Ack(ref ack) => ack.item.index,
912 Activity::Certified(ref certificate) => certificate.item.index,
913 Activity::Tip(index) => index,
914 };
915 let section = self.get_journal_section(index);
916 self.journal
917 .as_mut()
918 .expect("journal must be initialized")
919 .append(section, activity)
920 .await
921 .expect("unable to append to journal");
922 }
923
924 async fn sync(&mut self, index: Index) {
926 let section = self.get_journal_section(index);
927 let journal = self.journal.as_mut().expect("journal must be initialized");
928 journal.sync(section).await.expect("unable to sync journal");
929 }
930}