commonware_consensus/aggregation/
engine.rs

//! Engine for the aggregation module.

use super::{
    metrics,
    safe_tip::SafeTip,
    types::{Ack, Activity, Error, Index, Item, TipAck},
    Config,
};
use crate::{
    aggregation::{scheme, types::Certificate},
    types::{Epoch, EpochDelta},
    Automaton, Monitor, Reporter,
};
use commonware_cryptography::{
    certificate::{Provider, Scheme},
    Digest,
};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Blocker, Receiver, Recipients, Sender,
};
use commonware_runtime::{
    buffer::PoolRef,
    spawn_cell,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, GaugeExt, Status},
    },
    Clock, ContextCell, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::segmented::variable::{Config as JConfig, Journal};
use commonware_utils::{futures::Pool as FuturesPool, ordered::Quorum, PrioritySet};
use futures::{
    future::{self, Either},
    pin_mut, StreamExt,
};
use std::{
    cmp::max,
    collections::BTreeMap,
    num::NonZeroUsize,
    sync::Arc,
    time::{Duration, SystemTime},
};
use tracing::{debug, error, info, trace, warn};

/// An entry for an index that does not yet have a certificate.
enum Pending<S: Scheme, D: Digest> {
    /// The automaton has not yet provided the digest for this index.
    /// The stored acks may reference arbitrary digests.
    Unverified(BTreeMap<Epoch, BTreeMap<u32, Ack<S, D>>>),

    /// The digest has been verified by the automaton and is now stored alongside the acks.
    Verified(D, BTreeMap<Epoch, BTreeMap<u32, Ack<S, D>>>),
}
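
// Lifecycle note (illustrative summary drawn from the code below): an index is inserted into
// `pending` as `Pending::Unverified` when the engine first requests its digest, upgraded to
// `Pending::Verified` once the automaton returns that digest, and pruned after a certificate is
// stored in `confirmed` and the tip moves past it by more than `activity_timeout`
// (see `handle_certificate` and `fast_forward_tip`).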

/// The type returned by the `digest_requests` pool, used by the automaton to report which digest
/// is associated with a given index.
struct DigestRequest<D: Digest, E: Clock> {
    /// The index in question.
    index: Index,

    /// The digest returned by the automaton, or an error if the request was canceled.
    result: Result<D, Error>,

    /// Records the time taken to get the digest.
    timer: histogram::Timer<E>,
}

/// Instance of the engine.
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics,
    P: Provider<Scope = Epoch>,
    D: Digest,
    A: Automaton<Context = Index, Digest = D> + Clone,
    Z: Reporter<Activity = Activity<P::Scheme, D>>,
    M: Monitor<Index = Epoch>,
    B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
> {
    // ---------- Interfaces ----------
    context: ContextCell<E>,
    automaton: A,
    monitor: M,
    provider: P,
    reporter: Z,
    blocker: B,

    // ---------- Namespace Constants ----------
    /// The namespace used when signing and verifying acks.
    namespace: Vec<u8>,

    // Pruning
    /// A tuple representing the epochs to keep in memory.
    /// The first element is the number of old epochs to keep.
    /// The second element is the number of future epochs to accept.
    ///
    /// For example, if the current epoch is 10, and the bounds are (1, 2), then
    /// epochs 9, 10, 11, and 12 are kept (and accepted);
    /// all others are pruned or rejected.
    epoch_bounds: (EpochDelta, EpochDelta),

    /// The maximum number of indices to process concurrently (ahead of the tip).
    window: u64,

    /// Number of indices to track below the tip when collecting acks and/or pruning.
    activity_timeout: u64,

    // Messaging
    /// Pool of pending futures to request a digest from the automaton.
    digest_requests: FuturesPool<DigestRequest<D, E>>,

    // State
    /// The current epoch.
    epoch: Epoch,

    /// The current tip.
    tip: Index,

    /// Tracks the tips of all validators.
    safe_tip: SafeTip<<P::Scheme as Scheme>::PublicKey>,

    /// The keys represent the set of all `Index` values for which we are attempting to form a
    /// certificate, but do not yet have one. Values may be [Pending::Unverified] or [Pending::Verified],
    /// depending on whether the automaton has verified the digest or not.
    pending: BTreeMap<Index, Pending<P::Scheme, D>>,

    /// A map of indices with a certificate. Cached in memory in case they need to be sent to other peers.
    confirmed: BTreeMap<Index, Certificate<P::Scheme, D>>,

    // ---------- Rebroadcasting ----------
    /// The frequency at which to rebroadcast pending indices.
    rebroadcast_timeout: Duration,

    /// A set of deadlines for rebroadcasting `Index` values that do not have a certificate.
    rebroadcast_deadlines: PrioritySet<Index, SystemTime>,

    // ---------- Journal ----------
    /// Journal for persisting activity (acks, certificates, and tip updates) recorded by this node.
    journal: Option<Journal<E, Activity<P::Scheme, D>>>,
    journal_partition: String,
    journal_write_buffer: NonZeroUsize,
    journal_replay_buffer: NonZeroUsize,
    journal_heights_per_section: u64,
    journal_compression: Option<u8>,
    journal_buffer_pool: PoolRef,

    // ---------- Network ----------
    /// Whether to send acks as priority messages.
    priority_acks: bool,

    // ---------- Metrics ----------
    /// Metrics for the engine.
    metrics: metrics::Metrics<E>,
}

impl<
        E: Clock + Spawner + Storage + Metrics,
        P: Provider<Scope = Epoch, Scheme: scheme::Scheme<D>>,
        D: Digest,
        A: Automaton<Context = Index, Digest = D> + Clone,
        Z: Reporter<Activity = Activity<P::Scheme, D>>,
        M: Monitor<Index = Epoch>,
        B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
    > Engine<E, P, D, A, Z, M, B>
{
    /// Creates a new engine with the given context and configuration.
    pub fn new(context: E, cfg: Config<P, D, A, Z, M, B>) -> Self {
        // TODO(#1833): Metrics should use the post-start context
        let metrics = metrics::Metrics::init(context.clone());

        Self {
            context: ContextCell::new(context),
            automaton: cfg.automaton,
            reporter: cfg.reporter,
            monitor: cfg.monitor,
            provider: cfg.provider,
            blocker: cfg.blocker,
            namespace: cfg.namespace,
            epoch_bounds: cfg.epoch_bounds,
            window: cfg.window.into(),
            activity_timeout: cfg.activity_timeout,
            epoch: Epoch::zero(),
            tip: 0,
            safe_tip: SafeTip::default(),
            digest_requests: FuturesPool::default(),
            pending: BTreeMap::new(),
            confirmed: BTreeMap::new(),
            rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
            rebroadcast_deadlines: PrioritySet::new(),
            journal: None,
            journal_partition: cfg.journal_partition,
            journal_write_buffer: cfg.journal_write_buffer,
            journal_replay_buffer: cfg.journal_replay_buffer,
            journal_heights_per_section: cfg.journal_heights_per_section.into(),
            journal_compression: cfg.journal_compression,
            journal_buffer_pool: cfg.journal_buffer_pool,
            priority_acks: cfg.priority_acks,
            metrics,
        }
    }

    /// Gets the scheme for a given epoch, returning an error if unavailable.
    fn scheme(&self, epoch: Epoch) -> Result<Arc<P::Scheme>, Error> {
        self.provider
            .scoped(epoch)
            .ok_or(Error::UnknownEpoch(epoch))
    }

    /// Runs the engine until the context is stopped.
    ///
    /// The engine will handle:
    /// - Requesting and processing digests from the automaton
    /// - Timeouts
    ///   - Refreshing the Epoch
    ///   - Rebroadcasting Acks
    /// - Messages from the network:
    ///   - Acks from other validators
    pub fn start(
        mut self,
        network: (
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            impl Receiver<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

    /// Inner run loop called by `start`.
    async fn run(
        mut self,
        network: (
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            impl Receiver<PublicKey = <P::Scheme as Scheme>::PublicKey>,
        ),
    ) {
        let (mut sender, mut receiver) = wrap((), network.0, network.1);
        let mut shutdown = self.context.stopped();

        // Initialize the epoch
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Initialize Journal
        let journal_cfg = JConfig {
            partition: self.journal_partition.clone(),
            compression: self.journal_compression,
            codec_config: P::Scheme::certificate_codec_config_unbounded(),
            buffer_pool: self.journal_buffer_pool.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::init(
            self.context.with_label("journal").into_present(),
            journal_cfg,
        )
        .await
        .expect("init failed");
        let unverified_indices = self.replay(&journal).await;
        self.journal = Some(journal);

        // Request digests for unverified indices
        for index in unverified_indices {
            trace!(index, "requesting digest for unverified index from replay");
            self.get_digest(index);
        }

        // Initialize the tip manager
        let scheme = self
            .scheme(self.epoch)
            .expect("current epoch scheme must exist");
        self.safe_tip.init(scheme.participants());

        loop {
            let _ = self.metrics.tip.try_set(self.tip);

            // Propose a new digest if we are processing fewer indices than the window allows
            let next = self.next();

            // Underflow safe: next >= self.tip is guaranteed by next()
            if next - self.tip < self.window {
                trace!(next, "requesting new digest");
                assert!(self
                    .pending
                    .insert(next, Pending::Unverified(BTreeMap::new()))
                    .is_none());
                self.get_digest(next);
                continue;
            }
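            // Illustrative (hypothetical values): with `tip = 5` and `window = 4`, the branch
            // above requests digests for indices 5, 6, 7, and 8 (one per loop iteration) before
            // the engine falls through to wait on network or timer events below.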

            // Get the earliest rebroadcast deadline, if any
            let rebroadcast = match self.rebroadcast_deadlines.peek() {
                Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            // Process the next event
            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    break;
                },

                // Handle an epoch update from the monitor
                epoch = epoch_updates.next() => {
                    // Error handling
                    let Some(epoch) = epoch else {
                        error!("epoch subscription failed");
                        break;
                    };

                    // Refresh the epoch
                    debug!(current = %self.epoch, new = %epoch, "refresh epoch");
                    assert!(epoch >= self.epoch);
                    self.epoch = epoch;

                    // Update the tip manager
                    let scheme = self.scheme(self.epoch)
                        .expect("current epoch scheme must exist");
                    self.safe_tip.reconcile(scheme.participants());

                    // Update data structures by purging old epochs
                    let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
                    self.pending.iter_mut().for_each(|(_, pending)| {
                        match pending {
                            Pending::Unverified(acks) => {
                                acks.retain(|epoch, _| *epoch >= min_epoch);
                            }
                            Pending::Verified(_, acks) => {
                                acks.retain(|epoch, _| *epoch >= min_epoch);
                            }
                        }
                    });
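                    // Illustrative (hypothetical values): if the epoch advances from 10 to 12
                    // with `epoch_bounds = (1, 2)`, `min_epoch` is 11, so acks recorded under
                    // epochs 10 and below are discarded from every pending entry.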

                    continue;
                },

                // Handle a completed digest request from the automaton
                request = self.digest_requests.next_completed() => {
                    let DigestRequest { index, result, timer } = request;
                    drop(timer); // Dropping the timer records the metric (and avoids an unused-variable lint).
                    match result {
                        Err(err) => {
                            warn!(?err, ?index, "automaton returned error");
                            self.metrics.digest.inc(Status::Dropped);
                        }
                        Ok(digest) => {
                            if let Err(err) = self.handle_digest(index, digest, &mut sender).await {
                                debug!(?err, ?index, "handle_digest failed");
                                continue;
                            }
                        }
                    }
                },

                // Handle incoming acks
                msg = receiver.recv() => {
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let TipAck { ack, tip } = match msg {
                        Ok(peer_ack) => peer_ack,
                        Err(err) => {
                            warn!(?err, ?sender, "ack decode failed, blocking peer");
                            self.blocker.block(sender).await;
                            continue;
                        }
                    };

                    // Update the tip manager
                    if self.safe_tip.update(sender.clone(), tip).is_some() {
                        // Fast-forward our tip if needed
                        let safe_tip = self.safe_tip.get();
                        if safe_tip > self.tip {
                            self.fast_forward_tip(safe_tip).await;
                        }
                    }

                    // Validate that we need to process the ack
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        if err.blockable() {
                            warn!(?sender, ?err, "blocking peer for validation failure");
                            self.blocker.block(sender).await;
                        } else {
                            debug!(?sender, ?err, "ack validate failed");
                        }
                        continue;
                    }

                    // Handle the ack
                    if let Err(err) = self.handle_ack(&ack).await {
                        debug!(?err, ?sender, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }

                    // Update the metrics
                    debug!(?sender, epoch = %ack.epoch, index = ack.item.index, "ack");
                    guard.set(Status::Success);
                },

                // Rebroadcast
                _ = rebroadcast => {
                    // Get the next index to rebroadcast
                    let (index, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline");
                    trace!(index, "rebroadcasting");
                    if let Err(err) = self.handle_rebroadcast(index, &mut sender).await {
                        warn!(?err, ?index, "rebroadcast failed");
                    }
                }
            }
        }

        // Close journal on shutdown
        if let Some(journal) = self.journal.take() {
            journal
                .close()
                .await
                .expect("unable to close aggregation journal");
        }
    }

    // ---------- Handling ----------

    /// Handles a digest returned by the automaton.
    async fn handle_digest(
        &mut self,
        index: Index,
        digest: D,
        sender: &mut WrappedSender<
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            TipAck<P::Scheme, D>,
        >,
    ) -> Result<(), Error> {
        // Entry must be `Pending::Unverified`, or return early
        if !matches!(self.pending.get(&index), Some(Pending::Unverified(_))) {
            return Err(Error::AckIndex(index));
        }

        // Move the entry to `Pending::Verified`
        let Some(Pending::Unverified(acks)) = self.pending.remove(&index) else {
            panic!("Pending::Unverified entry not found");
        };
        self.pending
            .insert(index, Pending::Verified(digest, BTreeMap::new()));

        // Handle each `ack` as if it were received over the network. This inserts the values into
        // the new map, and may form a certificate if enough acks are present. Only process acks
        // that match the verified digest.
        for epoch_acks in acks.values() {
            for epoch_ack in epoch_acks.values() {
                // Drop acks that don't match the verified digest
                if epoch_ack.item.digest != digest {
                    continue;
                }

                // Handle the ack
                let _ = self.handle_ack(epoch_ack).await;
            }
            // Break early if a certificate was formed
            if self.confirmed.contains_key(&index) {
                break;
            }
        }

        // Sign my own ack
        let ack = self.sign_ack(index, digest).await?;

        // Set the rebroadcast deadline for this index
        self.rebroadcast_deadlines
            .put(index, self.context.current() + self.rebroadcast_timeout);

        // Handle the ack as if it were received over the network
        let _ = self.handle_ack(&ack).await;

        // Send the ack over the network.
        self.broadcast(ack, sender).await?;

        Ok(())
    }

    /// Handles an ack.
    ///
    /// Returns an error if the ack is invalid, or can be ignored (e.g. already exists, certificate
    /// already exists, is outside the epoch bounds, etc.).
    async fn handle_ack(&mut self, ack: &Ack<P::Scheme, D>) -> Result<(), Error> {
        // Get the quorum (from scheme participants for the ack's epoch)
        let scheme = self.scheme(ack.epoch)?;
        let quorum = scheme.participants().quorum();

        // Get the acks and check digest consistency
        let acks_by_epoch = match self.pending.get_mut(&ack.item.index) {
            None => {
                // If the index is not in the pending pool, it may be confirmed
                // (i.e. we have a certificate for it).
                return Err(Error::AckIndex(ack.item.index));
            }
            Some(Pending::Unverified(acks)) => acks,
            Some(Pending::Verified(digest, acks)) => {
                // If we have a verified digest, ensure the ack matches it
                if ack.item.digest != *digest {
                    return Err(Error::AckDigest(ack.item.index));
                }
                acks
            }
        };

        // Add the attestation (if not already present)
        let acks = acks_by_epoch.entry(ack.epoch).or_default();
        if acks.contains_key(&ack.attestation.signer) {
            return Ok(());
        }
        acks.insert(ack.attestation.signer, ack.clone());

        // If a quorum of acks share the same digest (the verified digest, if one exists), form a
        // certificate
        let filtered = acks
            .values()
            .filter(|a| a.item.digest == ack.item.digest)
            .collect::<Vec<_>>();
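        // Illustrative (hypothetical values): assuming a 2f+1 quorum over 4 participants,
        // `quorum` is 3, so the branch below assembles a certificate once 3 acks carry the same
        // digest for this item and epoch.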
        if filtered.len() >= quorum as usize {
            if let Some(certificate) = Certificate::from_acks(&*scheme, filtered) {
                self.metrics.certificates.inc();
                self.handle_certificate(certificate).await;
            }
        }

        Ok(())
    }

    /// Handles a certificate.
    async fn handle_certificate(&mut self, certificate: Certificate<P::Scheme, D>) {
        // Check if we already have the certificate
        let index = certificate.item.index;
        if self.confirmed.contains_key(&index) {
            return;
        }

        // Store the certificate
        self.confirmed.insert(index, certificate.clone());

        // Journal the certificate and notify the reporter
        let certified = Activity::Certified(certificate);
        self.record(certified.clone()).await;
        self.sync(index).await;
        self.reporter.report(certified).await;

        // Increase the tip if needed
        if index == self.tip {
            // Compute the next tip
            let mut new_tip = index.saturating_add(1);
            while self.confirmed.contains_key(&new_tip) && new_tip < Index::MAX {
                new_tip = new_tip.saturating_add(1);
            }

            // If the next tip is larger, try to fast-forward the tip (may not be possible)
            if new_tip > self.tip {
                self.fast_forward_tip(new_tip).await;
            }
        }
    }

    /// Handles a rebroadcast request for the given index.
    async fn handle_rebroadcast(
        &mut self,
        index: Index,
        sender: &mut WrappedSender<
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            TipAck<P::Scheme, D>,
        >,
    ) -> Result<(), Error> {
        let Some(Pending::Verified(digest, acks)) = self.pending.get(&index) else {
            // The index may already be confirmed; continue silently if so
            return Ok(());
        };

        // Get our signature
        let scheme = self.scheme(self.epoch)?;
        let Some(signer) = scheme.me() else {
            return Err(Error::NotSigner(self.epoch));
        };
        let ack = acks
            .get(&self.epoch)
            .and_then(|acks| acks.get(&signer).cloned());
        let ack = match ack {
            Some(ack) => ack,
            None => self.sign_ack(index, *digest).await?,
        };

        // Reinsert the index with a new deadline
        self.rebroadcast_deadlines
            .put(index, self.context.current() + self.rebroadcast_timeout);

        // Broadcast the ack to all peers
        self.broadcast(ack, sender).await
    }

    // ---------- Validation ----------

    /// Takes a raw ack (from `sender`) received over the p2p network and validates it.
    ///
    /// Returns an error if the ack is invalid.
    fn validate_ack(
        &self,
        ack: &Ack<P::Scheme, D>,
        sender: &<P::Scheme as Scheme>::PublicKey,
    ) -> Result<(), Error> {
        // Validate epoch
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }
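        // Illustrative (hypothetical values): with `epoch = 10` and `epoch_bounds = (1, 2)`,
        // acks for epochs 9 through 12 pass this check; anything older or newer is rejected.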

        // Validate that the sender matches the signer
        let scheme = self.scheme(ack.epoch)?;
        let participants = scheme.participants();
        let Some(signer) = participants.index(sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if signer != ack.attestation.signer {
            return Err(Error::PeerMismatch);
        }

        // Keep collecting acks slightly below the tip (down to the activity threshold) in case we
        // don't yet have a certificate; reject anything older
        let activity_threshold = self.tip.saturating_sub(self.activity_timeout);
        if ack.item.index < activity_threshold {
            return Err(Error::AckCertified(ack.item.index));
        }

        // If the index is at or beyond the window above the tip, ignore it for now
        if ack.item.index.saturating_sub(self.tip) >= self.window {
            return Err(Error::AckIndex(ack.item.index));
        }

        // Validate that we don't already have the ack
        if self.confirmed.contains_key(&ack.item.index) {
            return Err(Error::AckCertified(ack.item.index));
        }
        let have_ack = match self.pending.get(&ack.item.index) {
            None => false,
            Some(Pending::Unverified(epoch_map)) => epoch_map
                .get(&ack.epoch)
                .is_some_and(|acks| acks.contains_key(&ack.attestation.signer)),
            Some(Pending::Verified(digest, epoch_map)) => {
                // While we check this in the `handle_ack` function, checking early here avoids an
                // unnecessary signature check.
                if ack.item.digest != *digest {
                    return Err(Error::AckDigest(ack.item.index));
                }
                epoch_map
                    .get(&ack.epoch)
                    .is_some_and(|acks| acks.contains_key(&ack.attestation.signer))
            }
        };
        if have_ack {
            return Err(Error::AckDuplicate(sender.to_string(), ack.item.index));
        }

        // Validate signature
        if !ack.verify(&*scheme, &self.namespace) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }

    // ---------- Helpers ----------

    /// Requests the digest from the automaton.
    ///
    /// `pending` must already contain the index.
    fn get_digest(&mut self, index: Index) {
        assert!(self.pending.contains_key(&index));
        let mut automaton = self.automaton.clone();
        let timer = self.metrics.digest_duration.timer();
        self.digest_requests.push(async move {
            let receiver = automaton.propose(index).await;
            let result = receiver.await.map_err(Error::AppProposeCanceled);
            DigestRequest {
                index,
                result,
                timer,
            }
        });
    }

    /// Signs an ack for the given index and digest, stores it in the journal, and returns it.
    /// Returns an error if we are not a signer at the current epoch.
    async fn sign_ack(&mut self, index: Index, digest: D) -> Result<Ack<P::Scheme, D>, Error> {
        let scheme = self.scheme(self.epoch)?;
        if scheme.me().is_none() {
            return Err(Error::NotSigner(self.epoch));
        }

        // Sign the item
        let item = Item { index, digest };
        let ack = Ack::sign(&*scheme, &self.namespace, self.epoch, item)
            .ok_or(Error::NotSigner(self.epoch))?;

        // Journal the ack
        self.record(Activity::Ack(ack.clone())).await;
        self.sync(index).await;

        Ok(ack)
    }

    /// Broadcasts an ack to all peers with the appropriate priority.
    ///
    /// Returns an error if the sender returns an error.
    async fn broadcast(
        &mut self,
        ack: Ack<P::Scheme, D>,
        sender: &mut WrappedSender<
            impl Sender<PublicKey = <P::Scheme as Scheme>::PublicKey>,
            TipAck<P::Scheme, D>,
        >,
    ) -> Result<(), Error> {
        sender
            .send(
                Recipients::All,
                TipAck { ack, tip: self.tip },
                self.priority_acks,
            )
            .await
            .map_err(|err| {
                warn!(?err, "failed to send ack");
                Error::UnableToSendMessage
            })?;
        Ok(())
    }

    /// Returns the next index that we should process. This is the minimum index for
    /// which we do not have a digest or an outstanding request to the automaton for the digest.
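    ///
    /// Illustrative example (hypothetical values): with `tip = 10`, pending keys `{10, 11}`, and
    /// confirmed keys `{12}`, this returns `max(10, max(12, 13)) = 13`.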
    fn next(&self) -> Index {
        let max_pending = self
            .pending
            .last_key_value()
            .map(|(k, _)| k.saturating_add(1))
            .unwrap_or_default();
        let max_confirmed = self
            .confirmed
            .last_key_value()
            .map(|(k, _)| k.saturating_add(1))
            .unwrap_or_default();
        max(self.tip, max(max_pending, max_confirmed))
    }

    /// Increases the tip to the given value, pruning stale entries.
    ///
    /// # Panics
    ///
    /// Panics if the given tip is less than or equal to the current tip.
    async fn fast_forward_tip(&mut self, tip: Index) {
        assert!(tip > self.tip);

        // Prune data structures, keeping a buffer below the tip to avoid losing certificates
        let activity_threshold = tip.saturating_sub(self.activity_timeout);
        self.pending.retain(|index, _| *index >= activity_threshold);
        self.confirmed
            .retain(|index, _| *index >= activity_threshold);
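        // Illustrative (hypothetical values): with `tip = 100` and `activity_timeout = 10`,
        // entries for indices below 90 are dropped while more recent ones are retained in case
        // late acks still arrive.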

        // Add tip to journal
        self.record(Activity::Tip(tip)).await;
        self.sync(tip).await;
        self.reporter.report(Activity::Tip(tip)).await;

        // Prune the journal with the same buffer, ignoring errors
        let section = self.get_journal_section(activity_threshold);
        let journal = self.journal.as_mut().expect("journal must be initialized");
        let _ = journal.prune(section).await;

        // Update the tip
        self.tip = tip;
    }

    // ---------- Journal ----------

    /// Returns the section of the journal for the given `index`.
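    ///
    /// Illustrative example (hypothetical value): with `journal_heights_per_section = 16`,
    /// index `35` maps to section `2`.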
    const fn get_journal_section(&self, index: Index) -> u64 {
        index / self.journal_heights_per_section
    }

    /// Replays the journal, updating the state of the engine.
    /// Returns a list of unverified pending indices that need digest requests.
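    ///
    /// Acks are regrouped by index and epoch; an index is restored as [Pending::Verified] only if
    /// our own ack for the current epoch is found (implying the digest was verified before
    /// shutdown), otherwise it is restored as [Pending::Unverified].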
    async fn replay(&mut self, journal: &Journal<E, Activity<P::Scheme, D>>) -> Vec<Index> {
        let mut tip = Index::default();
        let mut certified = Vec::new();
        let mut acks = Vec::new();
        let stream = journal
            .replay(0, 0, self.journal_replay_buffer)
            .await
            .expect("replay failed");
        pin_mut!(stream);
        while let Some(msg) = stream.next().await {
            let (_, _, _, activity) = msg.expect("replay failed");
            match activity {
                Activity::Tip(index) => {
                    tip = max(tip, index);
                    self.reporter.report(Activity::Tip(index)).await;
                }
                Activity::Certified(certificate) => {
                    certified.push(certificate.clone());
                    self.reporter.report(Activity::Certified(certificate)).await;
                }
                Activity::Ack(ack) => {
                    acks.push(ack.clone());
                    self.reporter.report(Activity::Ack(ack)).await;
                }
            }
        }

        // Update the tip to the highest index in the journal
        self.tip = tip;
        let activity_threshold = tip.saturating_sub(self.activity_timeout);

        // Add certified items
        certified
            .iter()
            .filter(|certificate| certificate.item.index >= activity_threshold)
            .for_each(|certificate| {
                self.confirmed
                    .insert(certificate.item.index, certificate.clone());
            });

        // Group acks by index
        let mut acks_by_index: BTreeMap<Index, Vec<Ack<P::Scheme, D>>> = BTreeMap::new();
        for ack in acks {
            if ack.item.index >= activity_threshold && !self.confirmed.contains_key(&ack.item.index)
            {
                acks_by_index.entry(ack.item.index).or_default().push(ack);
            }
        }

        // Process each index's acks
        let mut unverified = Vec::new();
        for (index, mut acks_group) in acks_by_index {
            // Check if we have our own ack (which means we've verified the digest)
            let current_scheme = self.scheme(self.epoch).ok();
            let our_signer = current_scheme.as_ref().and_then(|s| s.me());
            let our_digest = our_signer.and_then(|signer| {
                acks_group
                    .iter()
                    .find(|ack| ack.epoch == self.epoch && ack.attestation.signer == signer)
                    .map(|ack| ack.item.digest)
            });

            // If we know our own digest, drop any acks that don't match it
            if let Some(digest) = our_digest {
                acks_group.retain(|other| other.item.digest == digest);
            }

            // Create a new epoch map
            let mut epoch_map = BTreeMap::new();
            for ack in acks_group {
                epoch_map
                    .entry(ack.epoch)
                    .or_insert_with(BTreeMap::new)
                    .insert(ack.attestation.signer, ack);
            }

            // Insert as Verified if we have our own ack (meaning we verified the digest),
            // otherwise as Unverified
            match our_digest {
                Some(digest) => {
                    self.pending
                        .insert(index, Pending::Verified(digest, epoch_map));

                    // If we've already generated an ack and it isn't yet confirmed, mark the
                    // index for immediate rebroadcast
                    self.rebroadcast_deadlines
                        .put(index, self.context.current());
                }
                None => {
                    self.pending.insert(index, Pending::Unverified(epoch_map));

                    // Add to unverified indices
                    unverified.push(index);
                }
            }
        }

        // After replay, ensure we have all indices from tip to next in pending or confirmed
        // to handle the case where we restart and some indices have no acks yet
        let next = self.next();
        for index in self.tip..next {
            // If we already have the index in pending or confirmed, skip
            if self.pending.contains_key(&index) || self.confirmed.contains_key(&index) {
                continue;
            }

            // Add missing index to pending
            self.pending
                .insert(index, Pending::Unverified(BTreeMap::new()));
            unverified.push(index);
        }
        info!(self.tip, next, ?unverified, "replayed journal");

        unverified
    }

    /// Appends an activity to the journal.
    async fn record(&mut self, activity: Activity<P::Scheme, D>) {
        let index = match activity {
            Activity::Ack(ref ack) => ack.item.index,
            Activity::Certified(ref certificate) => certificate.item.index,
            Activity::Tip(index) => index,
        };
        let section = self.get_journal_section(index);
        self.journal
            .as_mut()
            .expect("journal must be initialized")
            .append(section, activity)
            .await
            .expect("unable to append to journal");
    }

    /// Syncs the journal section for the given `index`, ensuring all data is written to disk.
    async fn sync(&mut self, index: Index) {
        let section = self.get_journal_section(index);
        let journal = self.journal.as_mut().expect("journal must be initialized");
        journal.sync(section).await.expect("unable to sync journal");
    }
}