commonware_consensus/aggregation/

//! Engine for the aggregation module.
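//!
//! A minimal usage sketch (assuming a fully-populated `Config` and a registered p2p
//! channel; names are illustrative):
//!
//! ```ignore
//! let engine = Engine::new(context, config);
//! let _handle = engine.start((sender, receiver));
//! ```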

use super::{
    metrics,
    safe_tip::SafeTip,
    types::{Ack, Activity, Error, Item, TipAck},
    Config,
};
use crate::{
    aggregation::{scheme, types::Certificate},
    types::{Epoch, EpochDelta, Height, HeightDelta, Participant},
    Automaton, Monitor, Reporter,
};
use commonware_cryptography::{
    certificate::{Provider, Scheme},
    Digest,
};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Blocker, Receiver, Recipients, Sender,
};
use commonware_parallel::Strategy;
use commonware_runtime::{
    buffer::PoolRef,
    spawn_cell,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, GaugeExt, Status},
    },
    Clock, ContextCell, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::segmented::variable::{Config as JConfig, Journal};
use commonware_utils::{futures::Pool as FuturesPool, ordered::Quorum, N3f1, PrioritySet};
use futures::{
    future::{self, Either},
    pin_mut, StreamExt,
};
use rand_core::CryptoRngCore;
use std::{
    cmp::max,
    collections::BTreeMap,
    num::{NonZeroU64, NonZeroUsize},
    sync::Arc,
    time::{Duration, SystemTime},
};
use tracing::{debug, error, info, trace, warn};

/// An entry for a height that does not yet have a certificate.
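///
/// Entries start as [Pending::Unverified] and move to [Pending::Verified] once the
/// automaton provides the digest for the height (see `handle_digest`).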
enum Pending<S: Scheme, D: Digest> {
    /// The automaton has not yet provided the digest for this height.
    /// The collected acks may reference arbitrary digests.
    Unverified(BTreeMap<Epoch, BTreeMap<Participant, Ack<S, D>>>),

    /// Verified by the automaton. Stores the verified digest.
    Verified(D, BTreeMap<Epoch, BTreeMap<Participant, Ack<S, D>>>),
}

/// The type returned by the `digest_requests` pool, used by the automaton to report which
/// digest is associated with the given height.
struct DigestRequest<D: Digest, E: Clock> {
    /// The height in question.
    height: Height,

    /// The result of the verification.
    result: Result<D, Error>,

    /// Records the time taken to get the digest.
    timer: histogram::Timer<E>,
}

/// Instance of the engine.
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics + CryptoRngCore,
    P: Provider<Scope = Epoch>,
    D: Digest,
    A: Automaton<Context = Height, Digest = D> + Clone,
    Z: Reporter<Activity = Activity<P::Scheme, D>>,
    M: Monitor<Index = Epoch>,
    B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
    T: Strategy,
> {
    // ---------- Interfaces ----------
    context: ContextCell<E>,
    automaton: A,
    monitor: M,
    provider: P,
    reporter: Z,
    blocker: B,
    strategy: T,

    // Pruning
    /// A tuple representing the epochs to keep in memory.
    /// The first element is the number of old epochs to keep.
    /// The second element is the number of future epochs to accept.
    ///
    /// For example, if the current epoch is 10, and the bounds are (1, 2), then
    /// epochs 9, 10, 11, and 12 are kept (and accepted);
    /// all others are pruned or rejected.
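    ///
    /// The bounds are applied with saturating arithmetic (see `validate_ack`):
    /// accepted epochs are `epoch.saturating_sub(bounds.0)..=epoch.saturating_add(bounds.1)`.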
    epoch_bounds: (EpochDelta, EpochDelta),

    /// The number of heights to process concurrently.
    window: HeightDelta,

    /// Number of heights to track below the tip when collecting acks and/or pruning.
    activity_timeout: HeightDelta,

    // Messaging
    /// Pool of pending futures to request a digest from the automaton.
    digest_requests: FuturesPool<DigestRequest<D, E>>,

    // State
    /// The current epoch.
    epoch: Epoch,

    /// The current tip.
    tip: Height,

    /// Tracks the tips of all validators.
    safe_tip: SafeTip<<P::Scheme as Scheme>::PublicKey>,

    /// The keys represent the set of all `Height` values for which we are attempting to form a
    /// certificate, but do not yet have one. Values may be [Pending::Unverified] or [Pending::Verified],
    /// depending on whether the automaton has verified the digest or not.
    pending: BTreeMap<Height, Pending<P::Scheme, D>>,

    /// A map of heights to their certificates, cached in memory in case they need to be sent to other peers.
    confirmed: BTreeMap<Height, Certificate<P::Scheme, D>>,

    // ---------- Rebroadcasting ----------
    /// The frequency at which to rebroadcast pending heights.
    rebroadcast_timeout: Duration,

    /// A set of deadlines for rebroadcasting `Height` values that do not have a certificate.
    rebroadcast_deadlines: PrioritySet<Height, SystemTime>,

    // ---------- Journal ----------
    /// Journal for storing this node's activities: acks it has signed, certificates, and tip updates.
    journal: Option<Journal<E, Activity<P::Scheme, D>>>,
    journal_partition: String,
    journal_write_buffer: NonZeroUsize,
    journal_replay_buffer: NonZeroUsize,
    journal_heights_per_section: NonZeroU64,
    journal_compression: Option<u8>,
    journal_buffer_pool: PoolRef,

    // ---------- Network ----------
    /// Whether to send acks as priority messages.
    priority_acks: bool,

    // ---------- Metrics ----------
    /// Metrics
    metrics: metrics::Metrics<E>,
}

impl<
        E: Clock + Spawner + Storage + Metrics + CryptoRngCore,
        P: Provider<Scope = Epoch, Scheme: scheme::Scheme<D>>,
        D: Digest,
        A: Automaton<Context = Height, Digest = D> + Clone,
        Z: Reporter<Activity = Activity<P::Scheme, D>>,
        M: Monitor<Index = Epoch>,
        B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        T: Strategy,
    > Engine<E, P, D, A, Z, M, B, T>
{
    /// Creates a new engine with the given context and configuration.
    pub fn new(context: E, cfg: Config<P, D, A, Z, M, B, T>) -> Self {
        // TODO(#1833): Metrics should use the post-start context
        let metrics = metrics::Metrics::init(context.clone());

        Self {
            context: ContextCell::new(context),
            automaton: cfg.automaton,
            reporter: cfg.reporter,
            monitor: cfg.monitor,
            provider: cfg.provider,
            blocker: cfg.blocker,
            strategy: cfg.strategy,
            epoch_bounds: cfg.epoch_bounds,
            window: HeightDelta::new(cfg.window.into()),
            activity_timeout: cfg.activity_timeout,
            epoch: Epoch::zero(),
            tip: Height::zero(),
            safe_tip: SafeTip::default(),
            digest_requests: FuturesPool::default(),
            pending: BTreeMap::new(),
            confirmed: BTreeMap::new(),
            rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
            rebroadcast_deadlines: PrioritySet::new(),
            journal: None,
            journal_partition: cfg.journal_partition,
            journal_write_buffer: cfg.journal_write_buffer,
            journal_replay_buffer: cfg.journal_replay_buffer,
            journal_heights_per_section: cfg.journal_heights_per_section,
            journal_compression: cfg.journal_compression,
            journal_buffer_pool: cfg.journal_buffer_pool,
            priority_acks: cfg.priority_acks,
            metrics,
        }
    }

    /// Gets the scheme for a given epoch, returning an error if unavailable.
    fn scheme(&self, epoch: Epoch) -> Result<Arc<P::Scheme>, Error> {
        self.provider
            .scoped(epoch)
            .ok_or(Error::UnknownEpoch(epoch))
    }

    /// Runs the engine until the context is stopped.
    ///
    /// The engine will handle:
    /// - Requesting and processing digests from the automaton
    /// - Timeouts
    ///   - Refreshing the Epoch
    ///   - Rebroadcasting Acks
    /// - Messages from the network:
    ///   - Acks from other validators
    pub fn start(
        mut self,
        network: (
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            impl Receiver<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

    /// Inner run loop called by `start`.
    async fn run(
        mut self,
        network: (
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            impl Receiver<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        ),
    ) {
        let (mut sender, mut receiver) = wrap((), network.0, network.1);
        let mut shutdown = self.context.stopped();

        // Initialize the epoch
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Initialize Journal
        let journal_cfg = JConfig {
            partition: self.journal_partition.clone(),
            compression: self.journal_compression,
            codec_config: P::Scheme::certificate_codec_config_unbounded(),
            buffer_pool: self.journal_buffer_pool.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::init(
            self.context.with_label("journal").into_present(),
            journal_cfg,
        )
        .await
        .expect("init failed");
        let unverified_heights = self.replay(&journal).await;
        self.journal = Some(journal);

        // Request digests for unverified heights
        for height in unverified_heights {
            trace!(%height, "requesting digest for unverified height from replay");
            self.get_digest(height);
        }

        // Initialize the tip manager
        let scheme = self
            .scheme(self.epoch)
            .expect("current epoch scheme must exist");
        self.safe_tip.init(scheme.participants());

        loop {
            let _ = self.metrics.tip.try_set(self.tip.get());

            // Propose a new digest if we are processing less than the window
            let next = self.next();

            // Underflow safe: next >= self.tip is guaranteed by next()
            if next.delta_from(self.tip).unwrap() < self.window {
                trace!(%next, "requesting new digest");
                assert!(self
                    .pending
                    .insert(next, Pending::Unverified(BTreeMap::new()))
                    .is_none());
                self.get_digest(next);
                continue;
            }

            // Get the rebroadcast deadline for the next height
            let rebroadcast = match self.rebroadcast_deadlines.peek() {
                Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            // Process the next event
            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    break;
                },

                // Handle refresh epoch deadline
                epoch = epoch_updates.next() => {
                    // Error handling
                    let Some(epoch) = epoch else {
                        error!("epoch subscription failed");
                        break;
                    };

                    // Refresh the epoch
                    debug!(current = %self.epoch, new = %epoch, "refresh epoch");
                    assert!(epoch >= self.epoch);
                    self.epoch = epoch;

                    // Update the tip manager
                    let scheme = self.scheme(self.epoch)
                        .expect("current epoch scheme must exist");
                    self.safe_tip.reconcile(scheme.participants());

                    // Update data structures by purging old epochs
                    let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
                    self.pending.iter_mut().for_each(|(_, pending)| {
                        match pending {
                            Pending::Unverified(acks) => {
                                acks.retain(|epoch, _| *epoch >= min_epoch);
                            }
                            Pending::Verified(_, acks) => {
                                acks.retain(|epoch, _| *epoch >= min_epoch);
                            }
                        }
                    });

                    continue;
                },

                // Sign a new ack
                request = self.digest_requests.next_completed() => {
                    let DigestRequest { height, result, timer } = request;
                    drop(timer); // Record metric. Explicitly reference timer to avoid lint warning.
                    match result {
                        Err(err) => {
                            warn!(?err, %height, "automaton returned error");
                            self.metrics.digest.inc(Status::Dropped);
                        }
                        Ok(digest) => {
                            if let Err(err) = self.handle_digest(height, digest, &mut sender).await {
                                debug!(?err, %height, "handle_digest failed");
                                continue;
                            }
                        }
                    }
                },

                // Handle incoming acks
                msg = receiver.recv() => {
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let TipAck { ack, tip } = match msg {
                        Ok(peer_ack) => peer_ack,
                        Err(err) => {
                            warn!(?err, ?sender, "ack decode failed, blocking peer");
                            self.blocker.block(sender).await;
                            continue;
                        }
                    };

                    // Update the tip manager
                    if self.safe_tip.update(sender.clone(), tip).is_some() {
                        // Fast-forward our tip if needed
                        let safe_tip = self.safe_tip.get();
                        if safe_tip > self.tip {
                            self.fast_forward_tip(safe_tip).await;
                        }
                    }

                    // Validate that we need to process the ack
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        if err.blockable() {
                            warn!(?sender, ?err, "blocking peer for validation failure");
                            self.blocker.block(sender).await;
                        } else {
                            debug!(?sender, ?err, "ack validate failed");
                        }
                        continue;
                    };

                    // Handle the ack
                    if let Err(err) = self.handle_ack(&ack).await {
                        debug!(?err, ?sender, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }

                    // Update the metrics
                    debug!(?sender, epoch = %ack.epoch, height = %ack.item.height, "ack");
                    guard.set(Status::Success);
                },

                // Rebroadcast
                _ = rebroadcast => {
                    // Get the next height to rebroadcast
                    let (height, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline");
                    trace!(%height, "rebroadcasting");
                    if let Err(err) = self.handle_rebroadcast(height, &mut sender).await {
                        warn!(?err, %height, "rebroadcast failed");
                    };
                }
            }
        }

        // Close journal on shutdown
        if let Some(journal) = self.journal.take() {
            journal
                .sync_all()
                .await
                .expect("unable to close aggregation journal");
        }
    }

    // ---------- Handling ----------

    /// Handles a digest returned by the automaton.
    async fn handle_digest(
        &mut self,
        height: Height,
        digest: D,
        sender: &mut WrappedSender<
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            TipAck<P::Scheme, D>,
        >,
    ) -> Result<(), Error> {
        // Entry must be `Pending::Unverified`, or return early
        if !matches!(self.pending.get(&height), Some(Pending::Unverified(_))) {
            return Err(Error::AckHeight(height));
        };

        // Move the entry to `Pending::Verified`
        let Some(Pending::Unverified(acks)) = self.pending.remove(&height) else {
            panic!("Pending::Unverified entry not found");
        };
        self.pending
            .insert(height, Pending::Verified(digest, BTreeMap::new()));

        // Handle each `ack` as if it was received over the network. This inserts the values into
        // the new map, and may form a certificate if enough acks are present. Only process acks
        // that match the verified digest.
        for epoch_acks in acks.values() {
            for epoch_ack in epoch_acks.values() {
                // Drop acks that don't match the verified digest
                if epoch_ack.item.digest != digest {
                    continue;
                }

                // Handle the ack
                let _ = self.handle_ack(epoch_ack).await;
            }
            // Break early if a certificate was formed
            if self.confirmed.contains_key(&height) {
                break;
            }
        }

        // Sign my own ack
        let ack = self.sign_ack(height, digest).await?;

        // Set the rebroadcast deadline for this height
        self.rebroadcast_deadlines
            .put(height, self.context.current() + self.rebroadcast_timeout);

        // Handle ack as if it was received over the network
        let _ = self.handle_ack(&ack).await;

        // Send ack over the network.
        self.broadcast(ack, sender).await?;

        Ok(())
    }

    /// Handles an ack.
    ///
    /// Returns an error if the ack is invalid or can be ignored (e.g. it already exists, a
    /// certificate already exists, or it is outside the epoch bounds).
    async fn handle_ack(&mut self, ack: &Ack<P::Scheme, D>) -> Result<(), Error> {
        // Get the quorum (from scheme participants for the ack's epoch)
        let scheme = self.scheme(ack.epoch)?;
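        // Assumption: `N3f1` encodes the n >= 3f+1 Byzantine fault model, under which the
        // quorum is typically 2f+1 matching acks.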
        let quorum = scheme.participants().quorum::<N3f1>();

        // Get the acks and check digest consistency
        let acks_by_epoch = match self.pending.get_mut(&ack.item.height) {
            None => {
                // If the height is not in the pending pool, it may be confirmed
                // (i.e. we have a certificate for it).
                return Err(Error::AckHeight(ack.item.height));
            }
            Some(Pending::Unverified(acks)) => acks,
            Some(Pending::Verified(digest, acks)) => {
                // If we have a verified digest, ensure the ack matches it
                if ack.item.digest != *digest {
                    return Err(Error::AckDigest(ack.item.height));
                }
                acks
            }
        };

        // Add the attestation (if not already present)
        let acks = acks_by_epoch.entry(ack.epoch).or_default();
        if acks.contains_key(&ack.attestation.signer) {
            return Ok(());
        }
        acks.insert(ack.attestation.signer, ack.clone());

        // If there exists a quorum of acks with the same digest (or for the verified digest if
        // it exists), form a certificate
        let filtered = acks
            .values()
            .filter(|a| a.item.digest == ack.item.digest)
            .collect::<Vec<_>>();
        if filtered.len() >= quorum as usize {
            if let Some(certificate) = Certificate::from_acks(&*scheme, filtered, &self.strategy) {
                self.metrics.certificates.inc();
                self.handle_certificate(certificate).await;
            }
        }

        Ok(())
    }

    /// Handles a certificate.
    async fn handle_certificate(&mut self, certificate: Certificate<P::Scheme, D>) {
        // Check if we already have the certificate
        let height = certificate.item.height;
        if self.confirmed.contains_key(&height) {
            return;
        }

        // Store the certificate
        self.confirmed.insert(height, certificate.clone());

        // Journal the certificate and notify the reporter
        let certified = Activity::Certified(certificate);
        self.record(certified.clone()).await;
        self.sync(height).await;
        self.reporter.report(certified).await;

        // Increase the tip if needed
        if height == self.tip {
            // Compute the next tip
            let mut new_tip = height.next();
            while self.confirmed.contains_key(&new_tip) && new_tip.get() < u64::MAX {
                new_tip = new_tip.next();
            }

            // If the next tip is larger, try to fast-forward the tip (may not be possible)
            if new_tip > self.tip {
                self.fast_forward_tip(new_tip).await;
            }
        }
    }

    /// Handles a rebroadcast request for the given height.
    async fn handle_rebroadcast(
        &mut self,
        height: Height,
        sender: &mut WrappedSender<
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            TipAck<P::Scheme, D>,
        >,
    ) -> Result<(), Error> {
        let Some(Pending::Verified(digest, acks)) = self.pending.get(&height) else {
            // The height may already be confirmed; continue silently if so
            return Ok(());
        };

        // Get our signature
        let scheme = self.scheme(self.epoch)?;
        let Some(signer) = scheme.me() else {
            return Err(Error::NotSigner(self.epoch));
        };
        let ack = acks
            .get(&self.epoch)
            .and_then(|acks| acks.get(&signer).cloned());
        let ack = match ack {
            Some(ack) => ack,
            None => self.sign_ack(height, *digest).await?,
        };

        // Reinsert the height with a new deadline
        self.rebroadcast_deadlines
            .put(height, self.context.current() + self.rebroadcast_timeout);

        // Broadcast the ack to all peers
        self.broadcast(ack, sender).await
    }

    // ---------- Validation ----------

    /// Takes a raw ack (from sender) from the p2p network and validates it.
    ///
    /// Returns an error if the ack is invalid.
    fn validate_ack(
        &mut self,
        ack: &Ack<P::Scheme, D>,
        sender: &<P::Scheme as Scheme>::PublicKey,
    ) -> Result<(), Error> {
        // Validate epoch
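        // (e.g. with current epoch 10 and bounds (1, 2), epochs 9 through 12 are accepted)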
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // Validate sender matches the signer
        let scheme = self.scheme(ack.epoch)?;
        let participants = scheme.participants();
        let Some(signer) = participants.index(sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if signer != ack.attestation.signer {
            return Err(Error::PeerMismatch);
        }

        // Reject acks below the activity threshold; heights between the threshold and the tip
        // are still collected in case we don't yet have a certificate for them
        let activity_threshold = self.tip.saturating_sub(self.activity_timeout);
        if ack.item.height < activity_threshold {
            return Err(Error::AckCertified(ack.item.height));
        }

        // If the height is at or beyond the window above the tip, ignore it for now
        if ack
            .item
            .height
            .delta_from(self.tip)
            .is_some_and(|d| d >= self.window)
        {
            return Err(Error::AckHeight(ack.item.height));
        }

        // Validate that we don't already have the ack
        if self.confirmed.contains_key(&ack.item.height) {
            return Err(Error::AckCertified(ack.item.height));
        }
        let have_ack = match self.pending.get(&ack.item.height) {
            None => false,
            Some(Pending::Unverified(epoch_map)) => epoch_map
                .get(&ack.epoch)
                .is_some_and(|acks| acks.contains_key(&ack.attestation.signer)),
            Some(Pending::Verified(digest, epoch_map)) => {
                // While we check this in the `handle_ack` function, checking early here avoids an
                // unnecessary signature check.
                if ack.item.digest != *digest {
                    return Err(Error::AckDigest(ack.item.height));
                }
                epoch_map
                    .get(&ack.epoch)
                    .is_some_and(|acks| acks.contains_key(&ack.attestation.signer))
            }
        };
        if have_ack {
            return Err(Error::AckDuplicate(sender.to_string(), ack.item.height));
        }

        // Validate signature
        if !ack.verify(&mut self.context, &*scheme, &self.strategy) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }

    // ---------- Helpers ----------

    /// Requests the digest for `height` from the automaton.
    ///
    /// `pending` must already contain an entry for the height.
    fn get_digest(&mut self, height: Height) {
        assert!(self.pending.contains_key(&height));
        let mut automaton = self.automaton.clone();
        let timer = self.metrics.digest_duration.timer();
        self.digest_requests.push(async move {
            let receiver = automaton.propose(height).await;
            let result = receiver.await.map_err(Error::AppProposeCanceled);
            DigestRequest {
                height,
                result,
                timer,
            }
        });
    }

    /// Signs an ack for the given height and digest, stores it in the journal, and returns it.
    /// Returns an error if we are not a signer at the current epoch.
    async fn sign_ack(&mut self, height: Height, digest: D) -> Result<Ack<P::Scheme, D>, Error> {
        let scheme = self.scheme(self.epoch)?;
        if scheme.me().is_none() {
            return Err(Error::NotSigner(self.epoch));
        }

        // Sign the item
        let item = Item { height, digest };
        let ack = Ack::sign(&*scheme, self.epoch, item).ok_or(Error::NotSigner(self.epoch))?;

        // Journal the ack
        self.record(Activity::Ack(ack.clone())).await;
        self.sync(height).await;

        Ok(ack)
    }

    /// Broadcasts an ack to all peers with the appropriate priority.
    ///
    /// Returns an error if the sender returns an error.
    async fn broadcast(
        &mut self,
        ack: Ack<P::Scheme, D>,
        sender: &mut WrappedSender<
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            TipAck<P::Scheme, D>,
        >,
    ) -> Result<(), Error> {
        sender
            .send(
                Recipients::All,
                TipAck { ack, tip: self.tip },
                self.priority_acks,
            )
            .await
            .map_err(|err| {
                warn!(?err, "failed to send ack");
                Error::UnableToSendMessage
            })?;
        Ok(())
    }

    /// Returns the next height that we should process. This is the minimum height for
    /// which we do not have a digest or an outstanding request to the automaton for the digest.
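    ///
    /// For example, with tip 5, pending entries at heights 5 and 6, and a confirmed entry
    /// at height 7, the next height is 8.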
    fn next(&self) -> Height {
        let max_pending = self
            .pending
            .last_key_value()
            .map(|(k, _)| k.next())
            .unwrap_or_default();
        let max_confirmed = self
            .confirmed
            .last_key_value()
            .map(|(k, _)| k.next())
            .unwrap_or_default();
        max(self.tip, max(max_pending, max_confirmed))
    }

    /// Increases the tip to the given value, pruning stale entries.
    ///
    /// # Panics
    ///
    /// Panics if the given tip is less than or equal to the current tip.
    async fn fast_forward_tip(&mut self, tip: Height) {
        assert!(tip > self.tip);

        // Prune data structures with buffer to prevent losing certificates
        let activity_threshold = tip.saturating_sub(self.activity_timeout);
        self.pending
            .retain(|height, _| *height >= activity_threshold);
        self.confirmed
            .retain(|height, _| *height >= activity_threshold);

        // Add tip to journal
        self.record(Activity::Tip(tip)).await;
        self.sync(tip).await;
        self.reporter.report(Activity::Tip(tip)).await;

        // Prune journal with buffer, ignoring errors
        let section = self.get_journal_section(activity_threshold);
        let journal = self.journal.as_mut().expect("journal must be initialized");
        let _ = journal.prune(section).await;

        // Update the tip
        self.tip = tip;
    }

    // ---------- Journal ----------

    /// Returns the section of the journal for the given `height`.
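    ///
    /// For example, with 10 heights per section, heights 0 through 9 map to section 0 and
    /// height 25 maps to section 2.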
    const fn get_journal_section(&self, height: Height) -> u64 {
        height.get() / self.journal_heights_per_section.get()
    }

    /// Replays the journal, updating the state of the engine.
    /// Returns a list of unverified pending heights that need digest requests.
    async fn replay(&mut self, journal: &Journal<E, Activity<P::Scheme, D>>) -> Vec<Height> {
        let mut tip = Height::default();
        let mut certified = Vec::new();
        let mut acks = Vec::new();
        let stream = journal
            .replay(0, 0, self.journal_replay_buffer)
            .await
            .expect("replay failed");
        pin_mut!(stream);
        while let Some(msg) = stream.next().await {
            let (_, _, _, activity) = msg.expect("replay failed");
            match activity {
                Activity::Tip(height) => {
                    tip = max(tip, height);
                    self.reporter.report(Activity::Tip(height)).await;
                }
                Activity::Certified(certificate) => {
                    certified.push(certificate.clone());
                    self.reporter.report(Activity::Certified(certificate)).await;
                }
                Activity::Ack(ack) => {
                    acks.push(ack.clone());
                    self.reporter.report(Activity::Ack(ack)).await;
                }
            }
        }

        // Update the tip to the highest height in the journal
        self.tip = tip;
        let activity_threshold = tip.saturating_sub(self.activity_timeout);

        // Add certified items
        certified
            .iter()
            .filter(|certificate| certificate.item.height >= activity_threshold)
            .for_each(|certificate| {
                self.confirmed
                    .insert(certificate.item.height, certificate.clone());
            });

        // Group acks by height
        let mut acks_by_height: BTreeMap<Height, Vec<Ack<P::Scheme, D>>> = BTreeMap::new();
        for ack in acks {
            if ack.item.height >= activity_threshold
                && !self.confirmed.contains_key(&ack.item.height)
            {
                acks_by_height.entry(ack.item.height).or_default().push(ack);
            }
        }

        // Process each height's acks
        let mut unverified = Vec::new();
        for (height, mut acks_group) in acks_by_height {
            // Check if we have our own ack (which means we've verified the digest)
            let current_scheme = self.scheme(self.epoch).ok();
            let our_signer = current_scheme.as_ref().and_then(|s| s.me());
            let our_digest = our_signer.and_then(|signer| {
                acks_group
                    .iter()
                    .find(|ack| ack.epoch == self.epoch && ack.attestation.signer == signer)
                    .map(|ack| ack.item.digest)
            });

            // If our_digest exists, delete everything from acks_group that doesn't match it
            if let Some(digest) = our_digest {
                acks_group.retain(|other| other.item.digest == digest);
            }

            // Create a new epoch map
            let mut epoch_map = BTreeMap::new();
            for ack in acks_group {
                epoch_map
                    .entry(ack.epoch)
                    .or_insert_with(BTreeMap::new)
                    .insert(ack.attestation.signer, ack);
            }

            // Insert as Verified if we have our own ack (meaning we verified the digest),
            // otherwise as Unverified
            match our_digest {
                Some(digest) => {
                    self.pending
                        .insert(height, Pending::Verified(digest, epoch_map));

                    // If we've already generated an ack and it isn't yet confirmed, mark for
                    // immediate rebroadcast
                    self.rebroadcast_deadlines
                        .put(height, self.context.current());
                }
                None => {
                    self.pending.insert(height, Pending::Unverified(epoch_map));

                    // Add to unverified heights
                    unverified.push(height);
                }
            }
        }

        // After replay, ensure we have all heights from tip to next in pending or confirmed
        // to handle the case where we restart and some heights have no acks yet
        let next = self.next();
        for height in Height::range(self.tip, next) {
            // If we already have the height in pending or confirmed, skip
            if self.pending.contains_key(&height) || self.confirmed.contains_key(&height) {
                continue;
            }

            // Add missing height to pending
            self.pending
                .insert(height, Pending::Unverified(BTreeMap::new()));
            unverified.push(height);
        }
        info!(tip = %self.tip, %next, ?unverified, "replayed journal");

        unverified
    }

    /// Appends an activity to the journal.
    async fn record(&mut self, activity: Activity<P::Scheme, D>) {
        let height = match activity {
            Activity::Ack(ref ack) => ack.item.height,
            Activity::Certified(ref certificate) => certificate.item.height,
            Activity::Tip(h) => h,
        };
        let section = self.get_journal_section(height);
        self.journal
            .as_mut()
            .expect("journal must be initialized")
            .append(section, activity)
            .await
            .expect("unable to append to journal");
    }

    /// Syncs the journal section containing `height` (ensures all data is written to disk).
    async fn sync(&mut self, height: Height) {
        let section = self.get_journal_section(height);
        let journal = self.journal.as_mut().expect("journal must be initialized");
        journal.sync(section).await.expect("unable to sync journal");
    }
}
937}