// commonware_consensus/aggregation/engine.rs

//! Engine for the aggregation module: collects validator acks for each height and
//! assembles them into certificates.
2
3use super::{
4    metrics,
5    safe_tip::SafeTip,
6    types::{Ack, Activity, Error, Item, TipAck},
7    Config,
8};
9use crate::{
10    aggregation::{scheme, types::Certificate},
11    types::{Epoch, EpochDelta, Height, HeightDelta, Participant},
12    Automaton, Monitor, Reporter,
13};
14use commonware_cryptography::{
15    certificate::{Provider, Scheme},
16    Digest,
17};
18use commonware_macros::select_loop;
19use commonware_p2p::{
20    utils::codec::{wrap, WrappedSender},
21    Blocker, Receiver, Recipients, Sender,
22};
23use commonware_parallel::Strategy;
24use commonware_runtime::{
25    buffer::paged::CacheRef,
26    spawn_cell,
27    telemetry::metrics::{
28        histogram,
29        status::{CounterExt, GaugeExt, Status},
30    },
31    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
32};
33use commonware_storage::journal::segmented::variable::{Config as JConfig, Journal};
34use commonware_utils::{futures::Pool as FuturesPool, ordered::Quorum, N3f1, PrioritySet};
35use futures::{
36    future::{self, Either},
37    pin_mut, StreamExt,
38};
39use rand_core::CryptoRngCore;
40use std::{
41    cmp::max,
42    collections::BTreeMap,
43    num::{NonZeroU64, NonZeroUsize},
44    sync::Arc,
45    time::{Duration, SystemTime},
46};
47use tracing::{debug, error, info, trace, warn};
48
/// An entry for a height that does not yet have a certificate.
///
/// Acks are bucketed by epoch, then by participant, so that stale epochs can be
/// purged and each participant contributes at most one ack per epoch.
enum Pending<S: Scheme, D: Digest> {
    /// The automaton has not yet provided the digest for this height.
    /// The signatures may have arbitrary digests.
    Unverified(BTreeMap<Epoch, BTreeMap<Participant, Ack<S, D>>>),

    /// Verified by the automaton. Now stores the digest.
    /// Only acks matching this digest are retained from here on.
    Verified(D, BTreeMap<Epoch, BTreeMap<Participant, Ack<S, D>>>),
}
58
/// The type returned by the `pending` pool, used by the application to return which digest is
/// associated with the given height.
struct DigestRequest<D: Digest, E: Clock> {
    /// The height in question.
    height: Height,

    /// The result of the verification.
    /// `Err` indicates the automaton canceled or failed the request.
    result: Result<D, Error>,

    /// Records the time taken to get the digest.
    /// Dropping the timer records the elapsed duration to the metric.
    timer: histogram::Timer<E>,
}
71
/// Instance of the engine.
///
/// Generic over the runtime (`E`), certificate-scheme provider (`P`), digest type (`D`),
/// automaton (`A`), reporter (`Z`), epoch monitor (`M`), peer blocker (`B`), and
/// parallelism strategy (`T`).
pub struct Engine<
    E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore,
    P: Provider<Scope = Epoch>,
    D: Digest,
    A: Automaton<Context = Height, Digest = D> + Clone,
    Z: Reporter<Activity = Activity<P::Scheme, D>>,
    M: Monitor<Index = Epoch>,
    B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
    T: Strategy,
> {
    // ---------- Interfaces ----------
    /// Runtime context (spawning, clock, storage, metrics, randomness).
    context: ContextCell<E>,
    /// Provides the digest for each height.
    automaton: A,
    /// Emits epoch updates.
    monitor: M,
    /// Supplies the certificate scheme for each epoch.
    provider: P,
    /// Receives activity notifications (acks, certificates, tip updates).
    reporter: Z,
    /// Blocks peers that send invalid or malicious messages.
    blocker: B,
    /// Parallelism strategy used for signature operations.
    strategy: T,

    // Pruning
    /// A tuple representing the epochs to keep in memory.
    /// The first element is the number of old epochs to keep.
    /// The second element is the number of future epochs to accept.
    ///
    /// For example, if the current epoch is 10, and the bounds are (1, 2), then
    /// epochs 9, 10, 11, and 12 are kept (and accepted);
    /// all others are pruned or rejected.
    epoch_bounds: (EpochDelta, EpochDelta),

    /// The concurrent number of chunks to process.
    window: HeightDelta,

    /// Number of heights to track below the tip when collecting acks and/or pruning.
    activity_timeout: HeightDelta,

    // Messaging
    /// Pool of pending futures to request a digest from the automaton.
    digest_requests: FuturesPool<DigestRequest<D, E>>,

    // State
    /// The current epoch.
    epoch: Epoch,

    /// The current tip.
    tip: Height,

    /// Tracks the tips of all validators.
    safe_tip: SafeTip<<P::Scheme as Scheme>::PublicKey>,

    /// The keys represent the set of all `Height` values for which we are attempting to form a
    /// certificate, but do not yet have one. Values may be [Pending::Unverified] or [Pending::Verified],
    /// depending on whether the automaton has verified the digest or not.
    pending: BTreeMap<Height, Pending<P::Scheme, D>>,

    /// A map of heights with a certificate. Cached in memory if needed to send to other peers.
    confirmed: BTreeMap<Height, Certificate<P::Scheme, D>>,

    // ---------- Rebroadcasting ----------
    /// The frequency at which to rebroadcast pending heights.
    rebroadcast_timeout: Duration,

    /// A set of deadlines for rebroadcasting `Height` values that do not have a certificate.
    rebroadcast_deadlines: PrioritySet<Height, SystemTime>,

    // ---------- Journal ----------
    /// Journal for storing acks signed by this node.
    /// `None` until initialized in `run`.
    journal: Option<Journal<E, Activity<P::Scheme, D>>>,
    /// Storage partition name for the journal.
    journal_partition: String,
    /// Write buffer size for journal appends.
    journal_write_buffer: NonZeroUsize,
    /// Read buffer size used when replaying the journal.
    journal_replay_buffer: NonZeroUsize,
    /// Number of heights grouped into one journal section (pruning granularity).
    journal_heights_per_section: NonZeroU64,
    /// Optional compression level for journal entries.
    journal_compression: Option<u8>,
    /// Page cache shared with the journal.
    journal_page_cache: CacheRef,

    // ---------- Network ----------
    /// Whether to send acks as priority messages.
    priority_acks: bool,

    // ---------- Metrics ----------
    /// Metrics
    metrics: metrics::Metrics<E>,
}
155
156impl<
157        E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore,
158        P: Provider<Scope = Epoch, Scheme: scheme::Scheme<D>>,
159        D: Digest,
160        A: Automaton<Context = Height, Digest = D> + Clone,
161        Z: Reporter<Activity = Activity<P::Scheme, D>>,
162        M: Monitor<Index = Epoch>,
163        B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
164        T: Strategy,
165    > Engine<E, P, D, A, Z, M, B, T>
166{
167    /// Creates a new engine with the given context and configuration.
168    pub fn new(context: E, cfg: Config<P, D, A, Z, M, B, T>) -> Self {
169        // TODO(#1833): Metrics should use the post-start context
170        let metrics = metrics::Metrics::init(context.clone());
171
172        Self {
173            context: ContextCell::new(context),
174            automaton: cfg.automaton,
175            reporter: cfg.reporter,
176            monitor: cfg.monitor,
177            provider: cfg.provider,
178            blocker: cfg.blocker,
179            strategy: cfg.strategy,
180            epoch_bounds: cfg.epoch_bounds,
181            window: HeightDelta::new(cfg.window.into()),
182            activity_timeout: cfg.activity_timeout,
183            epoch: Epoch::zero(),
184            tip: Height::zero(),
185            safe_tip: SafeTip::default(),
186            digest_requests: FuturesPool::default(),
187            pending: BTreeMap::new(),
188            confirmed: BTreeMap::new(),
189            rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
190            rebroadcast_deadlines: PrioritySet::new(),
191            journal: None,
192            journal_partition: cfg.journal_partition,
193            journal_write_buffer: cfg.journal_write_buffer,
194            journal_replay_buffer: cfg.journal_replay_buffer,
195            journal_heights_per_section: cfg.journal_heights_per_section,
196            journal_compression: cfg.journal_compression,
197            journal_page_cache: cfg.journal_page_cache,
198            priority_acks: cfg.priority_acks,
199            metrics,
200        }
201    }
202
203    /// Gets the scheme for a given epoch, returning an error if unavailable.
204    fn scheme(&self, epoch: Epoch) -> Result<Arc<P::Scheme>, Error> {
205        self.provider
206            .scoped(epoch)
207            .ok_or(Error::UnknownEpoch(epoch))
208    }
209
/// Runs the engine until the context is stopped.
///
/// The engine will handle:
/// - Requesting and processing digests from the automaton
/// - Timeouts
///   - Refreshing the Epoch
///   - Rebroadcasting Acks
/// - Messages from the network:
///   - Acks from other validators
pub fn start(
    mut self,
    network: (
        impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        impl Receiver<PublicKey = <P::Scheme as Scheme>::PublicKey>,
    ),
) -> Handle<()> {
    // Spawn the run loop on the engine's context; the handle resolves when it exits.
    spawn_cell!(self.context, self.run(network).await)
}
228
/// Inner run loop called by `start`.
///
/// Startup sequence: subscribe to epoch updates, initialize and replay the journal
/// (rebuilding tip/pending/confirmed state), re-request digests for unverified
/// heights, and seed the safe-tip tracker. Then loops over: epoch refreshes,
/// completed digest requests, incoming acks, and rebroadcast deadlines. On
/// shutdown the journal is synced before returning.
async fn run(
    mut self,
    network: (
        impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        impl Receiver<PublicKey = <P::Scheme as Scheme>::PublicKey>,
    ),
) {
    // Wrap the raw network endpoints with codec-aware sender/receiver.
    let (mut sender, mut receiver) = wrap(
        (),
        self.context.network_buffer_pool().clone(),
        network.0,
        network.1,
    );

    // Initialize the epoch
    let (latest, mut epoch_updates) = self.monitor.subscribe().await;
    self.epoch = latest;

    // Initialize Journal
    let journal_cfg = JConfig {
        partition: self.journal_partition.clone(),
        compression: self.journal_compression,
        codec_config: P::Scheme::certificate_codec_config_unbounded(),
        page_cache: self.journal_page_cache.clone(),
        write_buffer: self.journal_write_buffer,
    };
    let journal = Journal::init(
        self.context.with_label("journal").into_present(),
        journal_cfg,
    )
    .await
    .expect("init failed");
    // Rebuild in-memory state from persisted activity before accepting new input.
    let unverified_heights = self.replay(&journal).await;
    self.journal = Some(journal);

    // Request digests for unverified heights
    for height in unverified_heights {
        trace!(%height, "requesting digest for unverified height from replay");
        self.get_digest(height);
    }

    // Initialize the tip manager
    let scheme = self
        .scheme(self.epoch)
        .expect("current epoch scheme must exist");
    self.safe_tip.init(scheme.participants());

    select_loop! {
        self.context,
        on_start => {
            let _ = self.metrics.tip.try_set(self.tip.get());

            // Propose a new digest if we are processing less than the window
            let next = self.next();

            // Underflow safe: next >= self.tip is guaranteed by next()
            if next.delta_from(self.tip).unwrap() < self.window {
                trace!(%next, "requesting new digest");
                assert!(self
                    .pending
                    .insert(next, Pending::Unverified(BTreeMap::new()))
                    .is_none());
                self.get_digest(next);
                continue;
            }

            // Get the rebroadcast deadline for the next height
            let rebroadcast = match self.rebroadcast_deadlines.peek() {
                Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };
        },
        on_stopped => {
            debug!("shutdown");
        },
        // Handle refresh epoch deadline
        Some(epoch) = epoch_updates.recv() else {
            error!("epoch subscription failed");
            break;
        } => {
            // Refresh the epoch
            debug!(current = %self.epoch, new = %epoch, "refresh epoch");
            assert!(epoch >= self.epoch);
            self.epoch = epoch;

            // Update the tip manager
            let scheme = self
                .scheme(self.epoch)
                .expect("current epoch scheme must exist");
            self.safe_tip.reconcile(scheme.participants());

            // Update data structures by purging old epochs
            let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
            self.pending
                .iter_mut()
                .for_each(|(_, pending)| match pending {
                    self::Pending::Unverified(acks) => {
                        acks.retain(|epoch, _| *epoch >= min_epoch);
                    }
                    self::Pending::Verified(_, acks) => {
                        acks.retain(|epoch, _| *epoch >= min_epoch);
                    }
                });

            continue;
        },

        // Sign a new ack
        request = self.digest_requests.next_completed() => {
            let DigestRequest {
                height,
                result,
                timer,
            } = request;
            drop(timer); // Record metric. Explicitly reference timer to avoid lint warning.
            match result {
                Err(err) => {
                    warn!(?err, %height, "automaton returned error");
                    self.metrics.digest.inc(Status::Dropped);
                }
                Ok(digest) => {
                    if let Err(err) = self.handle_digest(height, digest, &mut sender).await {
                        debug!(?err, %height, "handle_digest failed");
                        continue;
                    }
                }
            }
        },

        // Handle incoming acks
        msg = receiver.recv() => {
            // Error handling
            let (sender, msg) = match msg {
                Ok(r) => r,
                Err(err) => {
                    warn!(?err, "ack receiver failed");
                    break;
                }
            };
            // Guard defaults to Invalid; upgraded below on failure/success.
            let mut guard = self.metrics.acks.guard(Status::Invalid);
            let TipAck { ack, tip } = match msg {
                Ok(peer_ack) => peer_ack,
                Err(err) => {
                    // Undecodable messages are blockable offenses.
                    commonware_p2p::block!(self.blocker, sender, ?err, "ack decode failed");
                    continue;
                }
            };

            // Update the tip manager
            if self.safe_tip.update(sender.clone(), tip).is_some() {
                // Fast-forward our tip if needed
                let safe_tip = self.safe_tip.get();
                if safe_tip > self.tip {
                    self.fast_forward_tip(safe_tip).await;
                }
            }

            // Validate that we need to process the ack
            if let Err(err) = self.validate_ack(&ack, &sender) {
                if err.blockable() {
                    commonware_p2p::block!(
                        self.blocker,
                        sender,
                        ?err,
                        "ack validation failure"
                    );
                } else {
                    debug!(?sender, ?err, "ack validate failed");
                }
                continue;
            };

            // Handle the ack
            if let Err(err) = self.handle_ack(&ack).await {
                debug!(?err, ?sender, "ack handle failed");
                guard.set(Status::Failure);
                continue;
            }

            // Update the metrics
            debug!(?sender, epoch = %ack.epoch, height = %ack.item.height, "ack");
            guard.set(Status::Success);
        },

        // Rebroadcast
        _ = rebroadcast => {
            // Get the next height to rebroadcast
            let (height, _) = self
                .rebroadcast_deadlines
                .pop()
                .expect("no rebroadcast deadline");
            trace!(%height, "rebroadcasting");
            if let Err(err) = self.handle_rebroadcast(height, &mut sender).await {
                warn!(?err, %height, "rebroadcast failed");
            };
        },
    }

    // Close journal on shutdown
    if let Some(journal) = self.journal.take() {
        journal
            .sync_all()
            .await
            .expect("unable to close aggregation journal");
    }
}
436
437    // ---------- Handling ----------
438
439    /// Handles a digest returned by the automaton.
440    async fn handle_digest(
441        &mut self,
442        height: Height,
443        digest: D,
444        sender: &mut WrappedSender<
445            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
446            TipAck<P::Scheme, D>,
447        >,
448    ) -> Result<(), Error> {
449        // Entry must be `Pending::Unverified`, or return early
450        if !matches!(self.pending.get(&height), Some(Pending::Unverified(_))) {
451            return Err(Error::AckHeight(height));
452        };
453
454        // Move the entry to `Pending::Verified`
455        let Some(Pending::Unverified(acks)) = self.pending.remove(&height) else {
456            panic!("Pending::Unverified entry not found");
457        };
458        self.pending
459            .insert(height, Pending::Verified(digest, BTreeMap::new()));
460
461        // Handle each `ack` as if it was received over the network. This inserts the values into
462        // the new map, and may form a certificate if enough acks are present. Only process acks
463        // that match the verified digest.
464        for epoch_acks in acks.values() {
465            for epoch_ack in epoch_acks.values() {
466                // Drop acks that don't match the verified digest
467                if epoch_ack.item.digest != digest {
468                    continue;
469                }
470
471                // Handle the ack
472                let _ = self.handle_ack(epoch_ack).await;
473            }
474            // Break early if a certificate was formed
475            if self.confirmed.contains_key(&height) {
476                break;
477            }
478        }
479
480        // Sign my own ack
481        let ack = self.sign_ack(height, digest).await?;
482
483        // Set the rebroadcast deadline for this height
484        self.rebroadcast_deadlines
485            .put(height, self.context.current() + self.rebroadcast_timeout);
486
487        // Handle ack as if it was received over the network
488        let _ = self.handle_ack(&ack).await;
489
490        // Send ack over the network.
491        self.broadcast(ack, sender).await?;
492
493        Ok(())
494    }
495
/// Handles an ack.
///
/// Inserts the ack into the pending entry for its height (rejecting digests that
/// conflict with a verified digest), then checks whether a quorum of acks with
/// the ack's digest exists and, if so, forms and handles a certificate.
///
/// Returns an error if the ack is invalid, or can be ignored (e.g. already exists, certificate
/// already exists, is outside the epoch bounds, etc.).
async fn handle_ack(&mut self, ack: &Ack<P::Scheme, D>) -> Result<(), Error> {
    // Get the quorum (from scheme participants for the ack's epoch)
    let scheme = self.scheme(ack.epoch)?;
    let quorum = scheme.participants().quorum::<N3f1>();

    // Get the acks and check digest consistency
    let acks_by_epoch = match self.pending.get_mut(&ack.item.height) {
        None => {
            // If the height is not in the pending pool, it may be confirmed
            // (i.e. we have a certificate for it).
            return Err(Error::AckHeight(ack.item.height));
        }
        Some(Pending::Unverified(acks)) => acks,
        Some(Pending::Verified(digest, acks)) => {
            // If we have a verified digest, ensure the ack matches it
            if ack.item.digest != *digest {
                return Err(Error::AckDigest(ack.item.height));
            }
            acks
        }
    };

    // Add the attestation (if not already present)
    let acks = acks_by_epoch.entry(ack.epoch).or_default();
    if acks.contains_key(&ack.attestation.signer) {
        // Duplicate from the same signer: not an error, just a no-op.
        return Ok(());
    }
    acks.insert(ack.attestation.signer, ack.clone());

    // If there exists a quorum of acks with the same digest (or for the verified digest if it exists), form a certificate
    let filtered = acks
        .values()
        .filter(|a| a.item.digest == ack.item.digest)
        .collect::<Vec<_>>();
    if filtered.len() >= quorum as usize {
        // `from_acks` may still fail (e.g. aggregation error); only count successes.
        if let Some(certificate) = Certificate::from_acks(&*scheme, filtered, &self.strategy) {
            self.metrics.certificates.inc();
            self.handle_certificate(certificate).await;
        }
    }

    Ok(())
}
543
/// Handles a certificate.
///
/// Stores the certificate (if new), journals and reports it, and advances the
/// tip past any contiguous run of confirmed heights starting at the old tip.
async fn handle_certificate(&mut self, certificate: Certificate<P::Scheme, D>) {
    // Check if we already have the certificate
    let height = certificate.item.height;
    if self.confirmed.contains_key(&height) {
        return;
    }

    // Store the certificate
    self.confirmed.insert(height, certificate.clone());

    // Journal and notify the automaton
    let certified = Activity::Certified(certificate);
    self.record(certified.clone()).await;
    self.sync(height).await;
    self.reporter.report(certified).await;

    // Increase the tip if needed
    if height == self.tip {
        // Compute the next tip: skip over consecutive confirmed heights.
        // The u64::MAX guard prevents overflow at the top of the height space.
        let mut new_tip = height.next();
        while self.confirmed.contains_key(&new_tip) && new_tip.get() < u64::MAX {
            new_tip = new_tip.next();
        }

        // If the next tip is larger, try to fast-forward the tip (may not be possible)
        if new_tip > self.tip {
            self.fast_forward_tip(new_tip).await;
        }
    }
}
575
/// Handles a rebroadcast request for the given height.
///
/// Re-sends our own ack for a verified-but-unconfirmed height (signing a fresh
/// one if we have not signed at the current epoch) and re-arms the deadline.
///
/// Returns an error if we are not a signer at the current epoch, signing fails,
/// or the broadcast fails.
async fn handle_rebroadcast(
    &mut self,
    height: Height,
    sender: &mut WrappedSender<
        impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        TipAck<P::Scheme, D>,
    >,
) -> Result<(), Error> {
    let Some(Pending::Verified(digest, acks)) = self.pending.get(&height) else {
        // The height may already be confirmed; continue silently if so
        return Ok(());
    };

    // Get our signature
    let scheme = self.scheme(self.epoch)?;
    let Some(signer) = scheme.me() else {
        return Err(Error::NotSigner(self.epoch));
    };
    // Reuse our existing ack for this epoch if present; otherwise sign anew.
    let ack = acks
        .get(&self.epoch)
        .and_then(|acks| acks.get(&signer).cloned());
    let ack = match ack {
        Some(ack) => ack,
        None => self.sign_ack(height, *digest).await?,
    };

    // Reinsert the height with a new deadline
    self.rebroadcast_deadlines
        .put(height, self.context.current() + self.rebroadcast_timeout);

    // Broadcast the ack to all peers
    self.broadcast(ack, sender).await
}
610
611    // ---------- Validation ----------
612
/// Takes a raw ack (from sender) from the p2p network and validates it.
///
/// Checks, in order: epoch bounds, sender/signer match, height bounds relative
/// to the tip and window, duplicates, digest consistency, and finally the
/// signature (last, since it is the most expensive check).
///
/// Returns an error if the ack is invalid.
fn validate_ack(
    &mut self,
    ack: &Ack<P::Scheme, D>,
    sender: &<P::Scheme as Scheme>::PublicKey,
) -> Result<(), Error> {
    // Validate epoch
    {
        let (eb_lo, eb_hi) = self.epoch_bounds;
        let bound_lo = self.epoch.saturating_sub(eb_lo);
        let bound_hi = self.epoch.saturating_add(eb_hi);
        if ack.epoch < bound_lo || ack.epoch > bound_hi {
            return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
        }
    }

    // Validate sender matches the signer
    let scheme = self.scheme(ack.epoch)?;
    let participants = scheme.participants();
    let Some(signer) = participants.index(sender) else {
        return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
    };
    if signer != ack.attestation.signer {
        // A peer must not relay acks attributed to another participant.
        return Err(Error::PeerMismatch);
    }

    // Collect acks below the tip (if we don't yet have a certificate)
    let activity_threshold = self.tip.saturating_sub(self.activity_timeout);
    if ack.item.height < activity_threshold {
        return Err(Error::AckCertified(ack.item.height));
    }

    // If the height is above the tip (and the window), ignore for now
    if ack
        .item
        .height
        .delta_from(self.tip)
        .is_some_and(|d| d >= self.window)
    {
        return Err(Error::AckHeight(ack.item.height));
    }

    // Validate that we don't already have the ack
    if self.confirmed.contains_key(&ack.item.height) {
        return Err(Error::AckCertified(ack.item.height));
    }
    let have_ack = match self.pending.get(&ack.item.height) {
        None => false,
        Some(Pending::Unverified(epoch_map)) => epoch_map
            .get(&ack.epoch)
            .is_some_and(|acks| acks.contains_key(&ack.attestation.signer)),
        Some(Pending::Verified(digest, epoch_map)) => {
            // While we check this in the `handle_ack` function, checking early here avoids an
            // unnecessary signature check.
            if ack.item.digest != *digest {
                return Err(Error::AckDigest(ack.item.height));
            }
            epoch_map
                .get(&ack.epoch)
                .is_some_and(|acks| acks.contains_key(&ack.attestation.signer))
        }
    };
    if have_ack {
        return Err(Error::AckDuplicate(sender.to_string(), ack.item.height));
    }

    // Validate signature (last: the most expensive check)
    if !ack.verify(&mut self.context, &*scheme, &self.strategy) {
        return Err(Error::InvalidAckSignature);
    }

    Ok(())
}
688
689    // ---------- Helpers ----------
690
691    /// Requests the digest from the automaton.
692    ///
693    /// Pending must contain the height.
694    fn get_digest(&mut self, height: Height) {
695        assert!(self.pending.contains_key(&height));
696        let mut automaton = self.automaton.clone();
697        let timer = self.metrics.digest_duration.timer();
698        self.digest_requests.push(async move {
699            let receiver = automaton.propose(height).await;
700            let result = receiver.await.map_err(Error::AppProposeCanceled);
701            DigestRequest {
702                height,
703                result,
704                timer,
705            }
706        });
707    }
708
709    /// Signs an ack for the given height, and digest. Stores the ack in the journal and returns it.
710    /// Returns an error if the share is unknown at the current epoch.
711    async fn sign_ack(&mut self, height: Height, digest: D) -> Result<Ack<P::Scheme, D>, Error> {
712        let scheme = self.scheme(self.epoch)?;
713        if scheme.me().is_none() {
714            return Err(Error::NotSigner(self.epoch));
715        }
716
717        // Sign the item
718        let item = Item { height, digest };
719        let ack = Ack::sign(&*scheme, self.epoch, item).ok_or(Error::NotSigner(self.epoch))?;
720
721        // Journal the ack
722        self.record(Activity::Ack(ack.clone())).await;
723        self.sync(height).await;
724
725        Ok(ack)
726    }
727
728    /// Broadcasts an ack to all peers with the appropriate priority.
729    ///
730    /// Returns an error if the sender returns an error.
731    async fn broadcast(
732        &mut self,
733        ack: Ack<P::Scheme, D>,
734        sender: &mut WrappedSender<
735            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
736            TipAck<P::Scheme, D>,
737        >,
738    ) -> Result<(), Error> {
739        sender
740            .send(
741                Recipients::All,
742                TipAck { ack, tip: self.tip },
743                self.priority_acks,
744            )
745            .await
746            .map_err(|err| {
747                warn!(?err, "failed to send ack");
748                Error::UnableToSendMessage
749            })?;
750        Ok(())
751    }
752
753    /// Returns the next height that we should process. This is the minimum height for
754    /// which we do not have a digest or an outstanding request to the automaton for the digest.
755    fn next(&self) -> Height {
756        let max_pending = self
757            .pending
758            .last_key_value()
759            .map(|(k, _)| k.next())
760            .unwrap_or_default();
761        let max_confirmed = self
762            .confirmed
763            .last_key_value()
764            .map(|(k, _)| k.next())
765            .unwrap_or_default();
766        max(self.tip, max(max_pending, max_confirmed))
767    }
768
/// Increases the tip to the given value, pruning stale entries.
///
/// Entries below `tip - activity_timeout` are pruned from the in-memory maps
/// and the journal; the buffer prevents losing certificates still being shared.
///
/// # Panics
///
/// Panics if the given tip is less-than-or-equal-to the current tip.
async fn fast_forward_tip(&mut self, tip: Height) {
    assert!(tip > self.tip);

    // Prune data structures with buffer to prevent losing certificates
    let activity_threshold = tip.saturating_sub(self.activity_timeout);
    self.pending
        .retain(|height, _| *height >= activity_threshold);
    self.confirmed
        .retain(|height, _| *height >= activity_threshold);

    // Add tip to journal
    self.record(Activity::Tip(tip)).await;
    self.sync(tip).await;
    self.reporter.report(Activity::Tip(tip)).await;

    // Prune journal with buffer, ignoring errors (pruning is best-effort)
    let section = self.get_journal_section(activity_threshold);
    let journal = self.journal.as_mut().expect("journal must be initialized");
    let _ = journal.prune(section).await;

    // Update the tip
    self.tip = tip;
}
797
798    // ---------- Journal ----------
799
/// Returns the section of the journal for the given `height`.
///
/// Heights are grouped into fixed-size sections (of `journal_heights_per_section`
/// heights each) so the journal can be pruned one section at a time.
const fn get_journal_section(&self, height: Height) -> u64 {
    height.get() / self.journal_heights_per_section.get()
}
804
805    /// Replays the journal, updating the state of the engine.
806    /// Returns a list of unverified pending heights that need digest requests.
807    async fn replay(&mut self, journal: &Journal<E, Activity<P::Scheme, D>>) -> Vec<Height> {
808        let mut tip = Height::default();
809        let mut certified = Vec::new();
810        let mut acks = Vec::new();
811        let stream = journal
812            .replay(0, 0, self.journal_replay_buffer)
813            .await
814            .expect("replay failed");
815        pin_mut!(stream);
816        while let Some(msg) = stream.next().await {
817            let (_, _, _, activity) = msg.expect("replay failed");
818            match activity {
819                Activity::Tip(height) => {
820                    tip = max(tip, height);
821                    self.reporter.report(Activity::Tip(height)).await;
822                }
823                Activity::Certified(certificate) => {
824                    certified.push(certificate.clone());
825                    self.reporter.report(Activity::Certified(certificate)).await;
826                }
827                Activity::Ack(ack) => {
828                    acks.push(ack.clone());
829                    self.reporter.report(Activity::Ack(ack)).await;
830                }
831            }
832        }
833
834        // Update the tip to the highest height in the journal
835        self.tip = tip;
836        let activity_threshold = tip.saturating_sub(self.activity_timeout);
837
838        // Add certified items
839        certified
840            .iter()
841            .filter(|certificate| certificate.item.height >= activity_threshold)
842            .for_each(|certificate| {
843                self.confirmed
844                    .insert(certificate.item.height, certificate.clone());
845            });
846
847        // Group acks by height
848        let mut acks_by_height: BTreeMap<Height, Vec<Ack<P::Scheme, D>>> = BTreeMap::new();
849        for ack in acks {
850            if ack.item.height >= activity_threshold
851                && !self.confirmed.contains_key(&ack.item.height)
852            {
853                acks_by_height.entry(ack.item.height).or_default().push(ack);
854            }
855        }
856
857        // Process each height's acks
858        let mut unverified = Vec::new();
859        for (height, mut acks_group) in acks_by_height {
860            // Check if we have our own ack (which means we've verified the digest)
861            let current_scheme = self.scheme(self.epoch).ok();
862            let our_signer = current_scheme.as_ref().and_then(|s| s.me());
863            let our_digest = our_signer.and_then(|signer| {
864                acks_group
865                    .iter()
866                    .find(|ack| ack.epoch == self.epoch && ack.attestation.signer == signer)
867                    .map(|ack| ack.item.digest)
868            });
869
870            // If our_digest exists, delete everything from acks_group that doesn't match it
871            if let Some(digest) = our_digest {
872                acks_group.retain(|other| other.item.digest == digest);
873            }
874
875            // Create a new epoch map
876            let mut epoch_map = BTreeMap::new();
877            for ack in acks_group {
878                epoch_map
879                    .entry(ack.epoch)
880                    .or_insert_with(BTreeMap::new)
881                    .insert(ack.attestation.signer, ack);
882            }
883
884            // Insert as Verified if we have our own ack (meaning we verified the digest),
885            // otherwise as Unverified
886            match our_digest {
887                Some(digest) => {
888                    self.pending
889                        .insert(height, Pending::Verified(digest, epoch_map));
890
891                    // If we've already generated an ack and it isn't yet confirmed, mark for immediate rebroadcast
892                    self.rebroadcast_deadlines
893                        .put(height, self.context.current());
894                }
895                None => {
896                    self.pending.insert(height, Pending::Unverified(epoch_map));
897
898                    // Add to unverified heights
899                    unverified.push(height);
900                }
901            }
902        }
903
904        // After replay, ensure we have all heights from tip to next in pending or confirmed
905        // to handle the case where we restart and some heights have no acks yet
906        let next = self.next();
907        for height in Height::range(self.tip, next) {
908            // If we already have the height in pending or confirmed, skip
909            if self.pending.contains_key(&height) || self.confirmed.contains_key(&height) {
910                continue;
911            }
912
913            // Add missing height to pending
914            self.pending
915                .insert(height, Pending::Unverified(BTreeMap::new()));
916            unverified.push(height);
917        }
918        info!(tip = %self.tip, %next, ?unverified, "replayed journal");
919
920        unverified
921    }
922
923    /// Appends an activity to the journal.
924    async fn record(&mut self, activity: Activity<P::Scheme, D>) {
925        let height = match activity {
926            Activity::Ack(ref ack) => ack.item.height,
927            Activity::Certified(ref certificate) => certificate.item.height,
928            Activity::Tip(h) => h,
929        };
930        let section = self.get_journal_section(height);
931        self.journal
932            .as_mut()
933            .expect("journal must be initialized")
934            .append(section, &activity)
935            .await
936            .expect("unable to append to journal");
937    }
938
939    /// Syncs (ensures all data is written to disk).
940    async fn sync(&mut self, height: Height) {
941        let section = self.get_journal_section(height);
942        let journal = self.journal.as_mut().expect("journal must be initialized");
943        journal.sync(section).await.expect("unable to sync journal");
944    }
945}