1use super::{
4 metrics,
5 safe_tip::SafeTip,
6 types::{Ack, Activity, Epoch, Error, Index, Item, TipAck},
7 Config,
8};
9use crate::{Automaton, Monitor, Reporter, ThresholdSupervisor};
10use commonware_cryptography::{
11 bls12381::primitives::{group, ops::threshold_signature_recover, poly, variant::Variant},
12 Digest, PublicKey,
13};
14use commonware_macros::select;
15use commonware_p2p::{
16 utils::codec::{wrap, WrappedSender},
17 Blocker, Receiver, Recipients, Sender,
18};
19use commonware_runtime::{
20 buffer::PoolRef,
21 telemetry::metrics::{
22 histogram,
23 status::{CounterExt, Status},
24 },
25 Clock, Handle, Metrics, Spawner, Storage,
26};
27use commonware_storage::journal::variable::{Config as JConfig, Journal};
28use commonware_utils::{futures::Pool as FuturesPool, PrioritySet};
29use futures::{
30 future::{self, Either},
31 pin_mut, StreamExt,
32};
33use std::{
34 cmp::max,
35 collections::{BTreeMap, HashMap},
36 marker::PhantomData,
37 num::NonZeroUsize,
38 time::{Duration, SystemTime},
39};
40use tracing::{debug, error, trace, warn};
41
42enum Pending<V: Variant, D: Digest> {
44 Unverified(HashMap<Epoch, HashMap<u32, Ack<V, D>>>),
47
48 Verified(D, HashMap<Epoch, HashMap<u32, Ack<V, D>>>),
50}
51
52struct DigestRequest<D: Digest, E: Clock> {
55 index: Index,
57
58 result: Result<D, Error>,
60
61 timer: histogram::Timer<E>,
63}
64
65pub struct Engine<
67 E: Clock + Spawner + Storage + Metrics,
68 P: PublicKey,
69 V: Variant,
70 D: Digest,
71 A: Automaton<Context = Index, Digest = D> + Clone,
72 Z: Reporter<Activity = Activity<V, D>>,
73 M: Monitor<Index = Epoch>,
74 B: Blocker<PublicKey = P>,
75 TSu: ThresholdSupervisor<
76 Index = Epoch,
77 PublicKey = P,
78 Share = group::Share,
79 Identity = poly::Public<V>,
80 >,
81 NetS: Sender<PublicKey = P>,
82 NetR: Receiver<PublicKey = P>,
83> {
84 context: E,
86 automaton: A,
87 monitor: M,
88 validators: TSu,
89 reporter: Z,
90 blocker: B,
91
92 namespace: Vec<u8>,
95
96 epoch_bounds: (u64, u64),
105
106 window: u64,
108
109 digest_requests: FuturesPool<DigestRequest<D, E>>,
112
113 epoch: Epoch,
116
117 tip: Index,
119
120 safe_tip: SafeTip<P>,
122
123 pending: BTreeMap<Index, Pending<V, D>>,
127
128 confirmed: BTreeMap<Index, (D, V::Signature)>,
130
131 rebroadcast_timeout: Duration,
134
135 rebroadcast_deadlines: PrioritySet<Index, SystemTime>,
137
138 journal: Option<Journal<E, Activity<V, D>>>,
141 journal_partition: String,
142 journal_write_buffer: NonZeroUsize,
143 journal_replay_buffer: NonZeroUsize,
144 journal_heights_per_section: u64,
145 journal_compression: Option<u8>,
146 journal_buffer_pool: PoolRef,
147
148 priority_acks: bool,
151
152 _phantom: PhantomData<(NetS, NetR)>,
154
155 metrics: metrics::Metrics<E>,
158}
159
160impl<
161 E: Clock + Spawner + Storage + Metrics,
162 P: PublicKey,
163 V: Variant,
164 D: Digest,
165 A: Automaton<Context = Index, Digest = D> + Clone,
166 Z: Reporter<Activity = Activity<V, D>>,
167 M: Monitor<Index = Epoch>,
168 B: Blocker<PublicKey = P>,
169 TSu: ThresholdSupervisor<
170 Index = Epoch,
171 PublicKey = P,
172 Share = group::Share,
173 Identity = poly::Public<V>,
174 >,
175 NetS: Sender<PublicKey = P>,
176 NetR: Receiver<PublicKey = P>,
177 > Engine<E, P, V, D, A, Z, M, B, TSu, NetS, NetR>
178{
179 pub fn new(context: E, cfg: Config<P, V, D, A, Z, M, B, TSu>) -> Self {
181 let metrics = metrics::Metrics::init(context.clone());
182
183 Self {
184 context,
185 automaton: cfg.automaton,
186 reporter: cfg.reporter,
187 monitor: cfg.monitor,
188 validators: cfg.validators,
189 blocker: cfg.blocker,
190 namespace: cfg.namespace,
191 epoch_bounds: cfg.epoch_bounds,
192 window: cfg.window.into(),
193 epoch: 0,
194 tip: 0,
195 safe_tip: SafeTip::default(),
196 digest_requests: FuturesPool::default(),
197 pending: BTreeMap::new(),
198 confirmed: BTreeMap::new(),
199 rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
200 rebroadcast_deadlines: PrioritySet::new(),
201 journal: None,
202 journal_partition: cfg.journal_partition,
203 journal_write_buffer: cfg.journal_write_buffer,
204 journal_replay_buffer: cfg.journal_replay_buffer,
205 journal_heights_per_section: cfg.journal_heights_per_section.into(),
206 journal_compression: cfg.journal_compression,
207 journal_buffer_pool: cfg.journal_buffer_pool,
208 priority_acks: cfg.priority_acks,
209 _phantom: PhantomData,
210 metrics,
211 }
212 }
213
214 pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
224 self.context.spawn_ref()(self.run(network))
225 }
226
227 async fn run(mut self, (net_sender, net_receiver): (NetS, NetR)) {
229 let (mut net_sender, mut net_receiver) = wrap((), net_sender, net_receiver);
230 let mut shutdown = self.context.stopped();
231
232 let (latest, mut epoch_updates) = self.monitor.subscribe().await;
234 self.epoch = latest;
235
236 let journal_cfg = JConfig {
238 partition: self.journal_partition.clone(),
239 compression: self.journal_compression,
240 codec_config: (),
241 buffer_pool: self.journal_buffer_pool.clone(),
242 write_buffer: self.journal_write_buffer,
243 };
244 let journal = Journal::init(self.context.with_label("journal"), journal_cfg)
245 .await
246 .expect("init failed");
247 self.replay(&journal).await;
248 self.journal = Some(journal);
249
250 self.safe_tip.init(
252 self.validators
253 .participants(self.epoch)
254 .expect("unknown participants"),
255 );
256
257 loop {
258 self.metrics.tip.set(self.tip as i64);
259
260 let next = self.next();
262 if next < self.tip + self.window {
263 trace!("requesting new digest: index {}", next);
264 self.get_digest(next);
265 continue;
266 }
267
268 let rebroadcast = match self.rebroadcast_deadlines.peek() {
270 Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
271 None => Either::Right(future::pending()),
272 };
273
274 select! {
276 _ = &mut shutdown => {
278 debug!("shutdown");
279 break;
280 },
281
282 epoch = epoch_updates.next() => {
284 let Some(epoch) = epoch else {
286 error!("epoch subscription failed");
287 break;
288 };
289
290 debug!(current=self.epoch, new=epoch, "refresh epoch");
292 assert!(epoch >= self.epoch);
293 self.epoch = epoch;
294
295 self.safe_tip.reconcile(self.validators.participants(epoch).unwrap());
297
298 let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
300 self.pending.iter_mut().for_each(|(_, pending)| {
301 match pending {
302 Pending::Unverified(acks) => {
303 acks.retain(|epoch, _| *epoch >= min_epoch);
304 }
305 Pending::Verified(_, acks) => {
306 acks.retain(|epoch, _| *epoch >= min_epoch);
307 }
308 }
309 });
310
311 continue;
312 },
313
314 request = self.digest_requests.next_completed() => {
316 let DigestRequest { index, result, timer } = request;
317 drop(timer); match result {
319 Err(err) => {
320 warn!(?err, ?index, "automaton returned error");
321 self.metrics.digest.inc(Status::Dropped);
322 }
323 Ok(digest) => {
324 if let Err(err) = self.handle_digest(index, digest, &mut net_sender).await {
325 warn!(?err, ?index, "handle_digest failed");
326 continue;
327 }
328 }
329 }
330 },
331
332 msg = net_receiver.recv() => {
334 let (sender, msg) = match msg {
336 Ok(r) => r,
337 Err(err) => {
338 warn!(?err, "ack receiver failed");
339 break;
340 }
341 };
342 let mut guard = self.metrics.acks.guard(Status::Invalid);
343 let TipAck { ack, tip } = match msg {
344 Ok(peer_ack) => peer_ack,
345 Err(err) => {
346 warn!(?err, ?sender, "ack decode failed, blocking peer");
347 self.blocker.block(sender).await;
348 continue;
349 }
350 };
351
352 if self.safe_tip.update(sender.clone(), tip).is_some() {
354 let safe_tip = self.safe_tip.get();
356 if safe_tip > self.tip {
357 self.fast_forward_tip(safe_tip).await;
358 }
359 }
360
361 if let Err(err) = self.validate_ack(&ack, &sender) {
363 if err.blockable() {
364 warn!(?sender, ?err, "blocking peer for validation failure");
365 self.blocker.block(sender).await;
366 } else {
367 debug!(?sender, ?err, "ack validate failed");
368 }
369 continue;
370 };
371
372 if let Err(err) = self.handle_ack(&ack).await {
374 warn!(?err, ?sender, "ack handle failed");
375 guard.set(Status::Failure);
376 continue;
377 }
378
379 debug!(?sender, epoch=ack.epoch, index=ack.item.index, "ack");
381 guard.set(Status::Success);
382 },
383
384 _ = rebroadcast => {
386 let (index, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline");
388 trace!("rebroadcasting: index {}", index);
389 if let Err(err) = self.handle_rebroadcast(index, &mut net_sender).await {
390 warn!(?err, ?index, "rebroadcast failed");
391 };
392 }
393 }
394 }
395
396 if let Some(journal) = self.journal.take() {
398 journal
399 .close()
400 .await
401 .expect("unable to close aggregation journal");
402 }
403 }
404
405 async fn handle_digest(
409 &mut self,
410 index: Index,
411 digest: D,
412 sender: &mut WrappedSender<NetS, TipAck<V, D>>,
413 ) -> Result<(), Error> {
414 if !matches!(self.pending.get(&index), Some(Pending::Unverified(_))) {
416 return Err(Error::AckIndex(index));
417 };
418
419 let Some(Pending::Unverified(acks)) = self.pending.remove(&index) else {
421 panic!("Pending::Unverified entry not found");
422 };
423 self.pending
424 .insert(index, Pending::Verified(digest, HashMap::new()));
425
426 for epoch_acks in acks.values() {
429 for epoch_ack in epoch_acks.values() {
430 let _ = self.handle_ack(epoch_ack).await; }
432 if self.confirmed.contains_key(&index) {
434 break;
435 }
436 }
437
438 let ack = self.sign_ack(index, digest).await?;
440
441 self.set_rebroadcast_deadline(index);
443
444 let _ = self.handle_ack(&ack).await; self.broadcast(ack, sender).await?;
449
450 Ok(())
451 }
452
453 async fn handle_ack(&mut self, ack: &Ack<V, D>) -> Result<(), Error> {
458 let quorum = self.validators.identity().required();
460
461 let acks_by_epoch = match self.pending.get_mut(&ack.item.index) {
463 None => {
464 return Err(Error::AckIndex(ack.item.index));
467 }
468 Some(Pending::Unverified(acks)) => acks,
469 Some(Pending::Verified(_, acks)) => acks,
470 };
471
472 let acks = acks_by_epoch.entry(ack.epoch).or_default();
476 if acks.contains_key(&ack.signature.index) {
477 return Ok(());
478 }
479 acks.insert(ack.signature.index, ack.clone());
480
481 if acks.len() >= (quorum as usize) {
483 let item = ack.item.clone();
484 let partials = acks.values().map(|ack| &ack.signature);
485 let threshold = threshold_signature_recover::<V, _>(quorum, partials)
486 .expect("Failed to recover threshold signature");
487 self.metrics.threshold.inc();
488 self.handle_threshold(item, threshold).await;
489 }
490
491 Ok(())
492 }
493
494 async fn handle_threshold(&mut self, item: Item<D>, threshold: V::Signature) {
496 let index = item.index;
498 if self.confirmed.contains_key(&index) {
499 return;
500 }
501
502 self.confirmed.insert(index, (item.digest, threshold));
504
505 let recovered = Activity::Recovered(item, threshold);
507 self.record(recovered.clone()).await;
508 self.sync(index).await;
509 self.reporter.report(recovered).await;
510
511 if index == self.tip {
513 let mut new_tip = index.saturating_add(1);
515 while self.confirmed.contains_key(&new_tip) && new_tip < Index::MAX {
516 new_tip = new_tip.saturating_add(1);
517 }
518
519 if new_tip > self.tip {
521 self.fast_forward_tip(new_tip).await;
522 }
523 }
524 }
525
526 async fn handle_rebroadcast(
528 &mut self,
529 index: Index,
530 sender: &mut WrappedSender<NetS, TipAck<V, D>>,
531 ) -> Result<(), Error> {
532 let Some(Pending::Verified(digest, acks)) = self.pending.get(&index) else {
533 return Ok(());
535 };
536
537 let Some(share) = self.validators.share(self.epoch) else {
539 return Err(Error::UnknownShare(self.epoch));
540 };
541 let ack = acks
542 .get(&self.epoch)
543 .and_then(|acks| acks.get(&share.index).cloned());
544 let ack = match ack {
545 Some(ack) => ack,
546 None => self.sign_ack(index, *digest).await?,
547 };
548
549 self.set_rebroadcast_deadline(index);
551
552 self.broadcast(ack, sender).await
554 }
555
556 fn validate_ack(&self, ack: &Ack<V, D>, sender: &P) -> Result<(), Error> {
563 {
565 let (eb_lo, eb_hi) = self.epoch_bounds;
566 let bound_lo = self.epoch.saturating_sub(eb_lo);
567 let bound_hi = self.epoch.saturating_add(eb_hi);
568 if ack.epoch < bound_lo || ack.epoch > bound_hi {
569 return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
570 }
571 }
572
573 let Some(sig_index) = self.validators.is_participant(ack.epoch, sender) else {
575 return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
576 };
577 if sig_index != ack.signature.index {
578 return Err(Error::PeerMismatch);
579 }
580
581 if ack.item.index < self.tip {
583 return Err(Error::AckThresholded(ack.item.index));
584 }
585 if ack.item.index >= self.tip + self.window {
586 return Err(Error::AckIndex(ack.item.index));
587 }
588
589 if self.confirmed.contains_key(&ack.item.index) {
591 return Err(Error::AckThresholded(ack.item.index));
592 }
593 let have_ack = match self.pending.get(&ack.item.index) {
594 None => false,
595 Some(Pending::Unverified(epoch_map)) => epoch_map
596 .get(&ack.epoch)
597 .is_some_and(|acks| acks.contains_key(&ack.signature.index)),
598 Some(Pending::Verified(_, epoch_map)) => epoch_map
599 .get(&ack.epoch)
600 .is_some_and(|acks| acks.contains_key(&ack.signature.index)),
601 };
602 if have_ack {
603 return Err(Error::AckDuplicate(sender.to_string(), ack.item.index));
604 }
605
606 if !ack.verify(&self.namespace, self.validators.identity()) {
608 return Err(Error::InvalidAckSignature);
609 }
610
611 Ok(())
612 }
613
614 fn get_digest(&mut self, index: Index) {
618 assert!(self
619 .pending
620 .insert(index, Pending::Unverified(HashMap::new()))
621 .is_none());
622
623 let mut automaton = self.automaton.clone();
624 let timer = self.metrics.digest_duration.timer();
625 self.digest_requests.push(async move {
626 let receiver = automaton.propose(index).await;
627 let result = receiver.await.map_err(Error::AppProposeCanceled);
628 DigestRequest {
629 index,
630 result,
631 timer,
632 }
633 });
634 }
635
636 fn set_rebroadcast_deadline(&mut self, index: Index) {
638 self.rebroadcast_deadlines
639 .put(index, self.context.current() + self.rebroadcast_timeout);
640 }
641
642 async fn sign_ack(&mut self, index: Index, digest: D) -> Result<Ack<V, D>, Error> {
645 let Some(share) = self.validators.share(self.epoch) else {
646 return Err(Error::UnknownShare(self.epoch));
647 };
648
649 let item = Item { index, digest };
651 let ack = Ack::sign(&self.namespace, self.epoch, share, item);
652
653 self.record(Activity::Ack(ack.clone())).await;
655 self.sync(index).await;
656
657 Ok(ack)
658 }
659
660 async fn broadcast(
664 &mut self,
665 ack: Ack<V, D>,
666 sender: &mut WrappedSender<NetS, TipAck<V, D>>,
667 ) -> Result<(), Error> {
668 sender
669 .send(
670 Recipients::All,
671 TipAck { ack, tip: self.tip },
672 self.priority_acks,
673 )
674 .await
675 .map_err(|err| {
676 warn!(?err, "failed to send ack");
677 Error::UnableToSendMessage
678 })?;
679 Ok(())
680 }
681
682 fn next(&self) -> Index {
685 let max_pending = self
686 .pending
687 .last_key_value()
688 .map(|(k, _)| k.saturating_add(1))
689 .unwrap_or_default();
690 let max_confirmed = self
691 .confirmed
692 .last_key_value()
693 .map(|(k, _)| k.saturating_add(1))
694 .unwrap_or_default();
695 max(self.tip, max(max_pending, max_confirmed))
696 }
697
698 async fn fast_forward_tip(&mut self, tip: Index) {
704 assert!(tip > self.tip);
705
706 self.pending.retain(|index, _| *index >= tip);
708 self.confirmed.retain(|index, _| *index >= tip);
709
710 self.record(Activity::Tip(tip)).await;
712 self.sync(tip).await;
713 self.reporter.report(Activity::Tip(tip)).await;
714
715 let section = self.get_journal_section(tip);
717 let journal = self.journal.as_mut().expect("journal must be initialized");
718 let _ = journal.prune(section).await;
719
720 self.tip = tip;
722 }
723
724 fn get_journal_section(&self, index: Index) -> u64 {
728 index / self.journal_heights_per_section
729 }
730
731 async fn replay(&mut self, journal: &Journal<E, Activity<V, D>>) {
733 let mut tip = Index::default();
734 let mut recovered = Vec::new();
735 let mut acks = Vec::new();
736 let stream = journal
737 .replay(self.journal_replay_buffer)
738 .await
739 .expect("replay failed");
740 pin_mut!(stream);
741 while let Some(msg) = stream.next().await {
742 let (_, _, _, activity) = msg.expect("replay failed");
743 match activity {
744 Activity::Tip(index) => {
745 tip = max(tip, index);
746 }
747 Activity::Recovered(item, signature) => {
748 recovered.push((item, signature));
749 }
750 Activity::Ack(ack) => {
751 acks.push(ack);
752 }
753 }
754 }
755 self.tip = tip;
757 recovered
759 .iter()
760 .filter(|(item, _)| item.index >= tip)
761 .for_each(|(item, signature)| {
762 self.confirmed.insert(item.index, (item.digest, *signature));
763 });
764 acks = acks
766 .into_iter()
767 .filter(|ack| ack.item.index >= tip && !self.confirmed.contains_key(&ack.item.index))
768 .collect::<Vec<_>>();
769 acks.iter().for_each(|ack| {
770 assert!(self
771 .pending
772 .insert(
773 ack.item.index,
774 Pending::Verified(
775 ack.item.digest,
776 HashMap::from([(
777 ack.epoch,
778 HashMap::from([(ack.signature.index, ack.clone())]),
779 )]),
780 ),
781 )
782 .is_none());
783 self.set_rebroadcast_deadline(ack.item.index);
784 });
785 }
786
787 async fn record(&mut self, activity: Activity<V, D>) {
789 let index = match activity {
790 Activity::Ack(ref ack) => ack.item.index,
791 Activity::Recovered(ref item, _) => item.index,
792 Activity::Tip(index) => index,
793 };
794 let section = self.get_journal_section(index);
795 self.journal
796 .as_mut()
797 .expect("journal must be initialized")
798 .append(section, activity)
799 .await
800 .expect("unable to append to journal");
801 }
802
803 async fn sync(&mut self, index: Index) {
805 let section = self.get_journal_section(index);
806 let journal = self.journal.as_mut().expect("journal must be initialized");
807 journal.sync(section).await.expect("unable to sync journal");
808 }
809}