Skip to main content

commonware_consensus/aggregation/
engine.rs

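//! Engine for the aggregation module: requests a digest per height from the automaton,
//! exchanges acks with peers, and forms certificates from a quorum of matching acks.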
2
3use super::{
4    metrics,
5    safe_tip::SafeTip,
6    types::{Ack, Activity, Error, Item, TipAck},
7    Config,
8};
9use crate::{
10    aggregation::{scheme, types::Certificate},
11    types::{Epoch, EpochDelta, Height, HeightDelta, Participant},
12    Automaton, Monitor, Reporter,
13};
14use commonware_cryptography::{
15    certificate::{Provider, Scheme},
16    Digest,
17};
18use commonware_macros::select_loop;
19use commonware_p2p::{
20    utils::codec::{wrap, WrappedSender},
21    Blocker, Receiver, Recipients, Sender,
22};
23use commonware_parallel::Strategy;
24use commonware_runtime::{
25    buffer::paged::CacheRef,
26    spawn_cell,
27    telemetry::metrics::{histogram, status::Status, GaugeExt},
28    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
29};
30use commonware_storage::journal::segmented::variable::{Config as JConfig, Journal};
31use commonware_utils::{futures::Pool as FuturesPool, ordered::Quorum, N3f1, PrioritySet};
32use futures::{
33    future::{self, Either},
34    pin_mut, StreamExt,
35};
36use rand_core::CryptoRngCore;
37use std::{
38    cmp::max,
39    collections::BTreeMap,
40    num::{NonZeroU64, NonZeroUsize},
41    sync::Arc,
42    time::{Duration, SystemTime},
43};
44use tracing::{debug, error, info, trace, warn};
45
46/// An entry for a height that does not yet have a certificate.
47enum Pending<S: Scheme, D: Digest> {
48    /// The automaton has not yet provided the digest for this height.
49    /// The signatures may have arbitrary digests.
50    Unverified(BTreeMap<Epoch, BTreeMap<Participant, Ack<S, D>>>),
51
52    /// Verified by the automaton. Now stores the digest.
53    Verified(D, BTreeMap<Epoch, BTreeMap<Participant, Ack<S, D>>>),
54}
55
56/// The type returned by the `pending` pool, used by the application to return which digest is
57/// associated with the given height.
58struct DigestRequest<D: Digest> {
59    /// The height in question.
60    height: Height,
61
62    /// The result of the verification.
63    result: Result<D, Error>,
64
65    /// Records the time taken to get the digest.
66    timer: histogram::Timer,
67}
68
69/// Instance of the engine.
70pub struct Engine<
71    E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore,
72    P: Provider<Scope = Epoch>,
73    D: Digest,
74    A: Automaton<Context = Height, Digest = D>,
75    Z: Reporter<Activity = Activity<P::Scheme, D>>,
76    M: Monitor<Index = Epoch>,
77    B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
78    T: Strategy,
79> {
80    // ---------- Interfaces ----------
81    context: ContextCell<E>,
82    automaton: A,
83    monitor: M,
84    provider: P,
85    reporter: Z,
86    blocker: B,
87    strategy: T,
88
89    // Pruning
90    /// A tuple representing the epochs to keep in memory.
91    /// The first element is the number of old epochs to keep.
92    /// The second element is the number of future epochs to accept.
93    ///
94    /// For example, if the current epoch is 10, and the bounds are (1, 2), then
95    /// epochs 9, 10, 11, and 12 are kept (and accepted);
96    /// all others are pruned or rejected.
97    epoch_bounds: (EpochDelta, EpochDelta),
98
99    /// The concurrent number of chunks to process.
100    window: HeightDelta,
101
102    /// Number of heights to track below the tip when collecting acks and/or pruning.
103    activity_timeout: HeightDelta,
104
105    // Messaging
106    /// Pool of pending futures to request a digest from the automaton.
107    digest_requests: FuturesPool<DigestRequest<D>>,
108
109    // State
110    /// The current epoch.
111    epoch: Epoch,
112
113    /// The current tip.
114    tip: Height,
115
116    /// Tracks the tips of all validators.
117    safe_tip: SafeTip<<P::Scheme as Scheme>::PublicKey>,
118
119    /// The keys represent the set of all `Height` values for which we are attempting to form a
120    /// certificate, but do not yet have one. Values may be [Pending::Unverified] or [Pending::Verified],
121    /// depending on whether the automaton has verified the digest or not.
122    pending: BTreeMap<Height, Pending<P::Scheme, D>>,
123
124    /// A map of heights with a certificate. Cached in memory if needed to send to other peers.
125    confirmed: BTreeMap<Height, Certificate<P::Scheme, D>>,
126
127    // ---------- Rebroadcasting ----------
128    /// The frequency at which to rebroadcast pending heights.
129    rebroadcast_timeout: Duration,
130
131    /// A set of deadlines for rebroadcasting `Height` values that do not have a certificate.
132    rebroadcast_deadlines: PrioritySet<Height, SystemTime>,
133
134    // ---------- Journal ----------
135    /// Journal for storing acks signed by this node.
136    journal: Option<Journal<E, Activity<P::Scheme, D>>>,
137    journal_partition: String,
138    journal_write_buffer: NonZeroUsize,
139    journal_replay_buffer: NonZeroUsize,
140    journal_heights_per_section: NonZeroU64,
141    journal_compression: Option<u8>,
142    journal_page_cache: CacheRef,
143
144    // ---------- Network ----------
145    /// Whether to send acks as priority messages.
146    priority_acks: bool,
147
148    // ---------- Metrics ----------
149    /// Metrics
150    metrics: metrics::Metrics,
151}
152
153impl<
154        E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore,
155        P: Provider<Scope = Epoch, Scheme: scheme::Scheme<D>>,
156        D: Digest,
157        A: Automaton<Context = Height, Digest = D>,
158        Z: Reporter<Activity = Activity<P::Scheme, D>>,
159        M: Monitor<Index = Epoch>,
160        B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
161        T: Strategy,
162    > Engine<E, P, D, A, Z, M, B, T>
163{
164    /// Creates a new engine with the given context and configuration.
165    pub fn new(context: E, cfg: Config<P, D, A, Z, M, B, T>) -> Self {
166        let metrics = metrics::Metrics::init(&context);
167
168        Self {
169            context: ContextCell::new(context),
170            automaton: cfg.automaton,
171            reporter: cfg.reporter,
172            monitor: cfg.monitor,
173            provider: cfg.provider,
174            blocker: cfg.blocker,
175            strategy: cfg.strategy,
176            epoch_bounds: cfg.epoch_bounds,
177            window: HeightDelta::new(cfg.window.into()),
178            activity_timeout: cfg.activity_timeout,
179            epoch: Epoch::zero(),
180            tip: Height::zero(),
181            safe_tip: SafeTip::default(),
182            digest_requests: FuturesPool::default(),
183            pending: BTreeMap::new(),
184            confirmed: BTreeMap::new(),
185            rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
186            rebroadcast_deadlines: PrioritySet::new(),
187            journal: None,
188            journal_partition: cfg.journal_partition,
189            journal_write_buffer: cfg.journal_write_buffer,
190            journal_replay_buffer: cfg.journal_replay_buffer,
191            journal_heights_per_section: cfg.journal_heights_per_section,
192            journal_compression: cfg.journal_compression,
193            journal_page_cache: cfg.journal_page_cache,
194            priority_acks: cfg.priority_acks,
195            metrics,
196        }
197    }
198
199    /// Gets the scheme for a given epoch, returning an error if unavailable.
200    fn scheme(&self, epoch: Epoch) -> Result<Arc<P::Scheme>, Error> {
201        self.provider
202            .scoped(epoch)
203            .ok_or(Error::UnknownEpoch(epoch))
204    }
205
    /// Spawns the engine's run loop on its context and returns the task handle.
    ///
    /// `network` is the (sender, receiver) pair used to exchange acks with peers; the run loop
    /// wraps both into typed `TipAck` channels. Once running, the engine handles:
    /// - Requesting and processing digests from the automaton
    /// - Epoch updates delivered by the monitor subscription
    /// - Rebroadcasting this node's acks when their deadlines expire
    /// - Acks (tagged with the sender's tip) received from other validators
    ///
    /// The loop ends when the context is stopped, the epoch subscription closes, or the receiver
    /// fails.
    pub fn start(
        mut self,
        network: (
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            impl Receiver<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(network))
    }
224
    /// Inner run loop called by `start`.
    ///
    /// Before entering the event loop it:
    /// 1. wraps the raw network pair into typed `TipAck` sender/receiver channels,
    /// 2. subscribes to the monitor and adopts the latest epoch,
    /// 3. opens the journal, replays it to rebuild in-memory state, and re-requests digests for
    ///    the heights replay reports as unverified,
    /// 4. seeds the safe-tip tracker with the current epoch's participants.
    ///
    /// The loop exits when the context is stopped, the epoch subscription ends, or the receiver
    /// returns an error. The journal is then synced.
    ///
    /// # Panics
    ///
    /// Panics if the journal cannot be initialized or synced, or if the provider has no scheme
    /// for the current epoch.
    async fn run(
        mut self,
        network: (
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            impl Receiver<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        ),
    ) {
        // NOTE(review): the leading `()` is presumably the codec config for `TipAck` — confirm.
        let (mut sender, mut receiver) = wrap(
            (),
            self.context.network_buffer_pool().clone(),
            network.0,
            network.1,
        );

        // Initialize the epoch
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Initialize Journal
        let journal_cfg = JConfig {
            partition: self.journal_partition.clone(),
            compression: self.journal_compression,
            codec_config: P::Scheme::certificate_codec_config_unbounded(),
            page_cache: self.journal_page_cache.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::init(self.context.child("journal"), journal_cfg)
            .await
            .expect("init failed");
        // Replay before storing the journal so the rebuilt state is in place before any event.
        let unverified_heights = self.replay(&journal).await;
        self.journal = Some(journal);

        // Request digests for unverified heights
        for height in unverified_heights {
            trace!(%height, "requesting digest for unverified height from replay");
            self.get_digest(height);
        }

        // Initialize the tip manager
        let scheme = self
            .scheme(self.epoch)
            .expect("current epoch scheme must exist");
        self.safe_tip.init(scheme.participants());

        select_loop! {
            self.context,
            // Presumably executed before each select: the `rebroadcast` future defined here is
            // awaited by the rebroadcast branch below.
            on_start => {
                let _ = self.metrics.tip.try_set(self.tip.get());

                // Propose a new digest if we are processing less than the window
                let next = self.next();

                // Underflow safe: next >= self.tip is guaranteed by next()
                if next.delta_from(self.tip).unwrap() < self.window {
                    trace!(%next, "requesting new digest");
                    assert!(self
                        .pending
                        .insert(next, Pending::Unverified(BTreeMap::new()))
                        .is_none());
                    self.get_digest(next);
                    // Go around again so the window keeps filling before waiting on events.
                    continue;
                }

                // Get the rebroadcast deadline for the next height
                // (sleep until the earliest deadline, or never resolve if none is scheduled)
                let rebroadcast = match self.rebroadcast_deadlines.peek() {
                    Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };
            },
            on_stopped => {
                debug!("shutdown");
            },
            // Handle refresh epoch deadline
            // (epoch updates arrive on the monitor subscription; a closed channel ends the loop)
            Some(epoch) = epoch_updates.recv() else {
                error!("epoch subscription failed");
                break;
            } => {
                // Refresh the epoch
                debug!(current = %self.epoch, new = %epoch, "refresh epoch");
                // Epochs must never move backwards.
                assert!(epoch >= self.epoch);
                self.epoch = epoch;

                // Update the tip manager
                let scheme = self
                    .scheme(self.epoch)
                    .expect("current epoch scheme must exist");
                self.safe_tip.reconcile(scheme.participants());

                // Update data structures by purging old epochs
                let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
                self.pending
                    .iter_mut()
                    .for_each(|(_, pending)| match pending {
                        self::Pending::Unverified(acks) => {
                            acks.retain(|epoch, _| *epoch >= min_epoch);
                        }
                        self::Pending::Verified(_, acks) => {
                            acks.retain(|epoch, _| *epoch >= min_epoch);
                        }
                    });

                continue;
            },

            // Sign a new ack
            request = self.digest_requests.next_completed() => {
                let DigestRequest {
                    height,
                    result,
                    timer,
                } = request;
                match result {
                    Err(err) => {
                        // NOTE(review): the height stays `Pending::Unverified` and this loop never
                        // re-requests it (`next()` only moves past it); confirm that is intended.
                        warn!(?err, %height, "automaton returned error");
                        self.metrics.digest.inc(Status::Dropped);
                    }
                    Ok(digest) => {
                        // Latency is only observed for successful requests.
                        timer.observe(self.context.as_ref());
                        if let Err(err) = self.handle_digest(height, digest, &mut sender).await {
                            debug!(?err, %height, "handle_digest failed");
                            continue;
                        }
                    }
                }
            },

            // Handle incoming acks
            msg = receiver.recv() => {
                // Error handling
                let (sender, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        warn!(?err, "ack receiver failed");
                        break;
                    }
                };
                // The guard starts as `Invalid` and is overwritten with the final outcome below.
                let mut guard = self.metrics.acks.guard(Status::Invalid);
                let TipAck { ack, tip } = match msg {
                    Ok(peer_ack) => peer_ack,
                    Err(err) => {
                        commonware_p2p::block!(self.blocker, sender, ?err, "ack decode failed");
                        continue;
                    }
                };

                // Update the tip manager
                // (the peer's reported tip is recorded before its ack is validated)
                if self.safe_tip.update(sender.clone(), tip).is_some() {
                    // Fast-forward our tip if needed
                    let safe_tip = self.safe_tip.get();
                    if safe_tip > self.tip {
                        self.fast_forward_tip(safe_tip).await;
                    }
                }

                // Validate that we need to process the ack
                if let Err(err) = self.validate_ack(&ack, &sender) {
                    if err.blockable() {
                        commonware_p2p::block!(
                            self.blocker,
                            sender,
                            ?err,
                            "ack validation failure"
                        );
                    } else {
                        debug!(?sender, ?err, "ack validate failed");
                    }
                    continue;
                };

                // Handle the ack
                if let Err(err) = self.handle_ack(&ack).await {
                    debug!(?err, ?sender, "ack handle failed");
                    guard.set(Status::Failure);
                    continue;
                }

                // Update the metrics
                debug!(?sender, epoch = %ack.epoch, height = %ack.item.height, "ack");
                guard.set(Status::Success);
            },

            // Rebroadcast
            _ = rebroadcast => {
                // Get the next height to rebroadcast
                let (height, _) = self
                    .rebroadcast_deadlines
                    .pop()
                    .expect("no rebroadcast deadline");
                trace!(%height, "rebroadcasting");
                if let Err(err) = self.handle_rebroadcast(height, &mut sender).await {
                    warn!(?err, %height, "rebroadcast failed");
                };
            },
        }

        // Close journal on shutdown
        if let Some(journal) = self.journal.take() {
            journal
                .sync_all()
                .await
                .expect("unable to close aggregation journal");
        }
    }
429
430    // ---------- Handling ----------
431
432    /// Handles a digest returned by the automaton.
433    async fn handle_digest(
434        &mut self,
435        height: Height,
436        digest: D,
437        sender: &mut WrappedSender<
438            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
439            TipAck<P::Scheme, D>,
440        >,
441    ) -> Result<(), Error> {
442        // Entry must be `Pending::Unverified`, or return early
443        if !matches!(self.pending.get(&height), Some(Pending::Unverified(_))) {
444            return Err(Error::AckHeight(height));
445        };
446
447        // Move the entry to `Pending::Verified`
448        let Some(Pending::Unverified(acks)) = self.pending.remove(&height) else {
449            panic!("Pending::Unverified entry not found");
450        };
451        self.pending
452            .insert(height, Pending::Verified(digest, BTreeMap::new()));
453
454        // Handle each `ack` as if it was received over the network. This inserts the values into
455        // the new map, and may form a certificate if enough acks are present. Only process acks
456        // that match the verified digest.
457        for epoch_acks in acks.values() {
458            for epoch_ack in epoch_acks.values() {
459                // Drop acks that don't match the verified digest
460                if epoch_ack.item.digest != digest {
461                    continue;
462                }
463
464                // Handle the ack
465                let _ = self.handle_ack(epoch_ack).await;
466            }
467            // Break early if a certificate was formed
468            if self.confirmed.contains_key(&height) {
469                break;
470            }
471        }
472
473        // Sign my own ack
474        let ack = self.sign_ack(height, digest).await?;
475
476        // Set the rebroadcast deadline for this height
477        self.rebroadcast_deadlines
478            .put(height, self.context.current() + self.rebroadcast_timeout);
479
480        // Handle ack as if it was received over the network
481        let _ = self.handle_ack(&ack).await;
482
483        // Send ack over the network.
484        self.broadcast(ack, sender);
485
486        Ok(())
487    }
488
489    /// Handles an ack.
490    ///
491    /// Returns an error if the ack is invalid, or can be ignored (e.g. already exists, certificate
492    /// already exists, is outside the epoch bounds, etc.).
493    async fn handle_ack(&mut self, ack: &Ack<P::Scheme, D>) -> Result<(), Error> {
494        // Get the quorum (from scheme participants for the ack's epoch)
495        let scheme = self.scheme(ack.epoch)?;
496        let quorum = scheme.participants().quorum::<N3f1>();
497
498        // Get the acks and check digest consistency
499        let acks_by_epoch = match self.pending.get_mut(&ack.item.height) {
500            None => {
501                // If the height is not in the pending pool, it may be confirmed
502                // (i.e. we have a certificate for it).
503                return Err(Error::AckHeight(ack.item.height));
504            }
505            Some(Pending::Unverified(acks)) => acks,
506            Some(Pending::Verified(digest, acks)) => {
507                // If we have a verified digest, ensure the ack matches it
508                if ack.item.digest != *digest {
509                    return Err(Error::AckDigest(ack.item.height));
510                }
511                acks
512            }
513        };
514
515        // Add the attestation (if not already present)
516        let acks = acks_by_epoch.entry(ack.epoch).or_default();
517        if acks.contains_key(&ack.attestation.signer) {
518            return Ok(());
519        }
520        acks.insert(ack.attestation.signer, ack.clone());
521
522        // If there exists a quorum of acks with the same digest (or for the verified digest if it exists), form a certificate
523        let filtered = acks
524            .values()
525            .filter(|a| a.item.digest == ack.item.digest)
526            .collect::<Vec<_>>();
527        if filtered.len() >= quorum as usize {
528            if let Some(certificate) = Certificate::from_acks(&*scheme, filtered, &self.strategy) {
529                self.metrics.certificates.inc();
530                self.handle_certificate(certificate).await;
531            }
532        }
533
534        Ok(())
535    }
536
537    /// Handles a certificate.
538    async fn handle_certificate(&mut self, certificate: Certificate<P::Scheme, D>) {
539        // Check if we already have the certificate
540        let height = certificate.item.height;
541        if self.confirmed.contains_key(&height) {
542            return;
543        }
544
545        // Store the certificate
546        self.confirmed.insert(height, certificate.clone());
547
548        // Journal and notify the automaton
549        let certified = Activity::Certified(certificate);
550        self.record(certified.clone()).await;
551        self.sync(height).await;
552        self.reporter.report(certified);
553
554        // Increase the tip if needed
555        if height == self.tip {
556            // Compute the next tip
557            let mut new_tip = height.next();
558            while self.confirmed.contains_key(&new_tip) && new_tip.get() < u64::MAX {
559                new_tip = new_tip.next();
560            }
561
562            // If the next tip is larger, try to fast-forward the tip (may not be possible)
563            if new_tip > self.tip {
564                self.fast_forward_tip(new_tip).await;
565            }
566        }
567    }
568
569    /// Handles a rebroadcast request for the given height.
570    async fn handle_rebroadcast(
571        &mut self,
572        height: Height,
573        sender: &mut WrappedSender<
574            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
575            TipAck<P::Scheme, D>,
576        >,
577    ) -> Result<(), Error> {
578        let Some(Pending::Verified(digest, acks)) = self.pending.get(&height) else {
579            // The height may already be confirmed; continue silently if so
580            return Ok(());
581        };
582
583        // Get our signature
584        let scheme = self.scheme(self.epoch)?;
585        let Some(signer) = scheme.me() else {
586            return Err(Error::NotSigner(self.epoch));
587        };
588        let ack = acks
589            .get(&self.epoch)
590            .and_then(|acks| acks.get(&signer).cloned());
591        let ack = match ack {
592            Some(ack) => ack,
593            None => self.sign_ack(height, *digest).await?,
594        };
595
596        // Reinsert the height with a new deadline
597        self.rebroadcast_deadlines
598            .put(height, self.context.current() + self.rebroadcast_timeout);
599
600        // Broadcast the ack to all peers
601        self.broadcast(ack, sender);
602
603        Ok(())
604    }
605
606    // ---------- Validation ----------
607
608    /// Takes a raw ack (from sender) from the p2p network and validates it.
609    ///
610    /// Returns an error if the ack is invalid.
611    fn validate_ack(
612        &mut self,
613        ack: &Ack<P::Scheme, D>,
614        sender: &<P::Scheme as Scheme>::PublicKey,
615    ) -> Result<(), Error> {
616        // Validate epoch
617        {
618            let (eb_lo, eb_hi) = self.epoch_bounds;
619            let bound_lo = self.epoch.saturating_sub(eb_lo);
620            let bound_hi = self.epoch.saturating_add(eb_hi);
621            if ack.epoch < bound_lo || ack.epoch > bound_hi {
622                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
623            }
624        }
625
626        // Validate sender matches the signer
627        let scheme = self.scheme(ack.epoch)?;
628        let participants = scheme.participants();
629        let Some(signer) = participants.index(sender) else {
630            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
631        };
632        if signer != ack.attestation.signer {
633            return Err(Error::PeerMismatch);
634        }
635
636        // Collect acks below the tip (if we don't yet have a certificate)
637        let activity_threshold = self.tip.saturating_sub(self.activity_timeout);
638        if ack.item.height < activity_threshold {
639            return Err(Error::AckCertified(ack.item.height));
640        }
641
642        // If the height is above the tip (and the window), ignore for now
643        if ack
644            .item
645            .height
646            .delta_from(self.tip)
647            .is_some_and(|d| d >= self.window)
648        {
649            return Err(Error::AckHeight(ack.item.height));
650        }
651
652        // Validate that we don't already have the ack
653        if self.confirmed.contains_key(&ack.item.height) {
654            return Err(Error::AckCertified(ack.item.height));
655        }
656        let have_ack = match self.pending.get(&ack.item.height) {
657            None => false,
658            Some(Pending::Unverified(epoch_map)) => epoch_map
659                .get(&ack.epoch)
660                .is_some_and(|acks| acks.contains_key(&ack.attestation.signer)),
661            Some(Pending::Verified(digest, epoch_map)) => {
662                // While we check this in the `handle_ack` function, checking early here avoids an
663                // unnecessary signature check.
664                if ack.item.digest != *digest {
665                    return Err(Error::AckDigest(ack.item.height));
666                }
667                epoch_map
668                    .get(&ack.epoch)
669                    .is_some_and(|acks| acks.contains_key(&ack.attestation.signer))
670            }
671        };
672        if have_ack {
673            return Err(Error::AckDuplicate(sender.to_string(), ack.item.height));
674        }
675
676        // Validate signature
677        if !ack.verify(self.context.as_mut(), &*scheme, &self.strategy) {
678            return Err(Error::InvalidAckSignature);
679        }
680
681        Ok(())
682    }
683
684    // ---------- Helpers ----------
685
686    /// Requests the digest from the automaton.
687    ///
688    /// Pending must contain the height.
689    fn get_digest(&mut self, height: Height) {
690        assert!(self.pending.contains_key(&height));
691        let mut automaton = self.automaton.clone();
692        let timer = self.metrics.digest_duration.timer(self.context.as_ref());
693        self.digest_requests.push(async move {
694            let receiver = automaton.propose(height).await;
695            let result = receiver.await.map_err(Error::AppProposeCanceled);
696            DigestRequest {
697                height,
698                result,
699                timer,
700            }
701        });
702    }
703
704    /// Signs an ack for the given height, and digest. Stores the ack in the journal and returns it.
705    /// Returns an error if the share is unknown at the current epoch.
706    async fn sign_ack(&mut self, height: Height, digest: D) -> Result<Ack<P::Scheme, D>, Error> {
707        let scheme = self.scheme(self.epoch)?;
708        if scheme.me().is_none() {
709            return Err(Error::NotSigner(self.epoch));
710        }
711
712        // Sign the item
713        let item = Item { height, digest };
714        let ack = Ack::sign(&*scheme, self.epoch, item).ok_or(Error::NotSigner(self.epoch))?;
715
716        // Journal the ack
717        self.record(Activity::Ack(ack.clone())).await;
718        self.sync(height).await;
719
720        Ok(ack)
721    }
722
723    /// Broadcasts an ack to all peers with the appropriate priority.
724    fn broadcast(
725        &mut self,
726        ack: Ack<P::Scheme, D>,
727        sender: &mut WrappedSender<
728            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
729            TipAck<P::Scheme, D>,
730        >,
731    ) {
732        sender.send(
733            Recipients::All,
734            TipAck { ack, tip: self.tip },
735            self.priority_acks,
736        );
737    }
738
739    /// Returns the next height that we should process. This is the minimum height for
740    /// which we do not have a digest or an outstanding request to the automaton for the digest.
741    fn next(&self) -> Height {
742        let max_pending = self
743            .pending
744            .last_key_value()
745            .map(|(k, _)| k.next())
746            .unwrap_or_default();
747        let max_confirmed = self
748            .confirmed
749            .last_key_value()
750            .map(|(k, _)| k.next())
751            .unwrap_or_default();
752        max(self.tip, max(max_pending, max_confirmed))
753    }
754
755    /// Increases the tip to the given value, pruning stale entries.
756    ///
757    /// # Panics
758    ///
759    /// Panics if the given tip is less-than-or-equal-to the current tip.
760    async fn fast_forward_tip(&mut self, tip: Height) {
761        assert!(tip > self.tip);
762
763        // Prune data structures with buffer to prevent losing certificates
764        let activity_threshold = tip.saturating_sub(self.activity_timeout);
765        self.pending
766            .retain(|height, _| *height >= activity_threshold);
767        self.confirmed
768            .retain(|height, _| *height >= activity_threshold);
769
770        // Add tip to journal
771        self.record(Activity::Tip(tip)).await;
772        self.sync(tip).await;
773        self.reporter.report(Activity::Tip(tip));
774
775        // Prune journal with buffer, ignoring errors
776        let section = self.get_journal_section(activity_threshold);
777        let journal = self.journal.as_mut().expect("journal must be initialized");
778        let _ = journal.prune(section).await;
779
780        // Update the tip
781        self.tip = tip;
782    }
783
784    // ---------- Journal ----------
785
786    /// Returns the section of the journal for the given `height`.
787    const fn get_journal_section(&self, height: Height) -> u64 {
788        height.get() / self.journal_heights_per_section.get()
789    }
790
791    /// Replays the journal, updating the state of the engine.
792    /// Returns a list of unverified pending heights that need digest requests.
793    async fn replay(&mut self, journal: &Journal<E, Activity<P::Scheme, D>>) -> Vec<Height> {
794        let mut tip = Height::default();
795        let mut certified = Vec::new();
796        let mut acks = Vec::new();
797        let stream = journal
798            .replay(0, 0, self.journal_replay_buffer)
799            .await
800            .expect("replay failed");
801        pin_mut!(stream);
802        while let Some(msg) = stream.next().await {
803            let (_, _, _, activity) = msg.expect("replay failed");
804            match activity {
805                Activity::Tip(height) => {
806                    tip = max(tip, height);
807                    self.reporter.report(Activity::Tip(height));
808                }
809                Activity::Certified(certificate) => {
810                    certified.push(certificate.clone());
811                    self.reporter.report(Activity::Certified(certificate));
812                }
813                Activity::Ack(ack) => {
814                    acks.push(ack.clone());
815                    self.reporter.report(Activity::Ack(ack));
816                }
817            }
818        }
819
820        // Update the tip to the highest height in the journal
821        self.tip = tip;
822        let activity_threshold = tip.saturating_sub(self.activity_timeout);
823
824        // Add certified items
825        certified
826            .iter()
827            .filter(|certificate| certificate.item.height >= activity_threshold)
828            .for_each(|certificate| {
829                self.confirmed
830                    .insert(certificate.item.height, certificate.clone());
831            });
832
833        // Group acks by height
834        let mut acks_by_height: BTreeMap<Height, Vec<Ack<P::Scheme, D>>> = BTreeMap::new();
835        for ack in acks {
836            if ack.item.height >= activity_threshold
837                && !self.confirmed.contains_key(&ack.item.height)
838            {
839                acks_by_height.entry(ack.item.height).or_default().push(ack);
840            }
841        }
842
843        // Process each height's acks
844        let mut unverified = Vec::new();
845        for (height, mut acks_group) in acks_by_height {
846            // Check if we have our own ack (which means we've verified the digest)
847            let current_scheme = self.scheme(self.epoch).ok();
848            let our_signer = current_scheme.as_ref().and_then(|s| s.me());
849            let our_digest = our_signer.and_then(|signer| {
850                acks_group
851                    .iter()
852                    .find(|ack| ack.epoch == self.epoch && ack.attestation.signer == signer)
853                    .map(|ack| ack.item.digest)
854            });
855
856            // If our_digest exists, delete everything from acks_group that doesn't match it
857            if let Some(digest) = our_digest {
858                acks_group.retain(|other| other.item.digest == digest);
859            }
860
861            // Create a new epoch map
862            let mut epoch_map = BTreeMap::new();
863            for ack in acks_group {
864                epoch_map
865                    .entry(ack.epoch)
866                    .or_insert_with(BTreeMap::new)
867                    .insert(ack.attestation.signer, ack);
868            }
869
870            // Insert as Verified if we have our own ack (meaning we verified the digest),
871            // otherwise as Unverified
872            match our_digest {
873                Some(digest) => {
874                    self.pending
875                        .insert(height, Pending::Verified(digest, epoch_map));
876
877                    // If we've already generated an ack and it isn't yet confirmed, mark for immediate rebroadcast
878                    self.rebroadcast_deadlines
879                        .put(height, self.context.current());
880                }
881                None => {
882                    self.pending.insert(height, Pending::Unverified(epoch_map));
883
884                    // Add to unverified heights
885                    unverified.push(height);
886                }
887            }
888        }
889
890        // After replay, ensure we have all heights from tip to next in pending or confirmed
891        // to handle the case where we restart and some heights have no acks yet
892        let next = self.next();
893        for height in Height::range(self.tip, next) {
894            // If we already have the height in pending or confirmed, skip
895            if self.pending.contains_key(&height) || self.confirmed.contains_key(&height) {
896                continue;
897            }
898
899            // Add missing height to pending
900            self.pending
901                .insert(height, Pending::Unverified(BTreeMap::new()));
902            unverified.push(height);
903        }
904        info!(tip = %self.tip, %next, ?unverified, "replayed journal");
905
906        unverified
907    }
908
909    /// Appends an activity to the journal.
910    async fn record(&mut self, activity: Activity<P::Scheme, D>) {
911        let height = match activity {
912            Activity::Ack(ref ack) => ack.item.height,
913            Activity::Certified(ref certificate) => certificate.item.height,
914            Activity::Tip(h) => h,
915        };
916        let section = self.get_journal_section(height);
917        self.journal
918            .as_mut()
919            .expect("journal must be initialized")
920            .append(section, &activity)
921            .await
922            .expect("unable to append to journal");
923    }
924
925    /// Syncs (ensures all data is written to disk).
926    async fn sync(&mut self, height: Height) {
927        let section = self.get_journal_section(height);
928        let journal = self.journal.as_mut().expect("journal must be initialized");
929        journal.sync(section).await.expect("unable to sync journal");
930    }
931}