commonware_consensus/aggregation/engine.rs

//! Engine for the aggregation module.

use super::{
    metrics,
    safe_tip::SafeTip,
    types::{Ack, Activity, Error, Index, Item, TipAck},
    Config,
};
use crate::{
    aggregation::types::Certificate, types::Epoch, Automaton, Monitor, Reporter,
    ThresholdSupervisor,
};
use commonware_cryptography::{
    bls12381::primitives::{group, ops::threshold_signature_recover, variant::Variant},
    Digest, PublicKey,
};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Blocker, Receiver, Recipients, Sender,
};
use commonware_runtime::{
    buffer::PoolRef,
    spawn_cell,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, Status},
    },
    Clock, ContextCell, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::segmented::variable::{Config as JConfig, Journal};
use commonware_utils::{futures::Pool as FuturesPool, quorum_from_slice, PrioritySet};
use futures::{
    future::{self, Either},
    pin_mut, StreamExt,
};
use std::{
    cmp::max,
    collections::BTreeMap,
    num::NonZeroUsize,
    time::{Duration, SystemTime},
};
use tracing::{debug, error, info, trace, warn};

/// An entry for an index that does not yet have a threshold signature.
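///
/// Entries start as [Pending::Unverified] and transition to [Pending::Verified] once the
/// automaton provides the digest for the index (see `handle_digest`).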
enum Pending<V: Variant, D: Digest> {
    /// The automaton has not yet provided the digest for this index.
    /// The signatures may have arbitrary digests.
    Unverified(BTreeMap<Epoch, BTreeMap<u32, Ack<V, D>>>),

    /// Verified by the automaton. Now stores the digest.
    Verified(D, BTreeMap<Epoch, BTreeMap<u32, Ack<V, D>>>),
}

/// The type returned by the `digest_requests` pool, used by the automaton to return which
/// digest is associated with the given index.
struct DigestRequest<D: Digest, E: Clock> {
    /// The index in question.
    index: Index,

    /// The result of the verification.
    result: Result<D, Error>,

    /// Records the time taken to get the digest.
    timer: histogram::Timer<E>,
}

/// Instance of the engine.
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics,
    P: PublicKey,
    V: Variant,
    D: Digest,
    A: Automaton<Context = Index, Digest = D> + Clone,
    Z: Reporter<Activity = Activity<V, D>>,
    M: Monitor<Index = Epoch>,
    B: Blocker<PublicKey = P>,
    TSu: ThresholdSupervisor<
        Index = Epoch,
        PublicKey = P,
        Polynomial = Vec<V::Public>,
        Share = group::Share,
    >,
> {
    // ---------- Interfaces ----------
    context: ContextCell<E>,
    automaton: A,
    monitor: M,
    validators: TSu,
    reporter: Z,
    blocker: B,

    // ---------- Namespace Constants ----------
    /// The namespace used when signing and verifying acks.
    namespace: Vec<u8>,

    // Pruning
    /// A tuple representing the epochs to keep in memory.
    /// The first element is the number of old epochs to keep.
    /// The second element is the number of future epochs to accept.
    ///
    /// For example, if the current epoch is 10, and the bounds are (1, 2), then
    /// epochs 9, 10, 11, and 12 are kept (and accepted);
    /// all others are pruned or rejected.
    epoch_bounds: (u64, u64),

    /// The maximum number of indices (starting at the tip) to process concurrently.
    window: u64,

    /// Number of indices to track below the tip when collecting acks and/or pruning.
    activity_timeout: u64,

    // Messaging
    /// Pool of pending futures to request a digest from the automaton.
    digest_requests: FuturesPool<DigestRequest<D, E>>,

    // State
    /// The current epoch.
    epoch: Epoch,

    /// The current tip.
    tip: Index,

    /// Tracks the tips of all validators.
    safe_tip: SafeTip<P>,

    /// The keys represent the set of all `Index` values for which we are attempting to form a
    /// threshold signature, but do not yet have one. Values may be [Pending::Unverified] or
    /// [Pending::Verified], depending on whether the automaton has verified the digest or not.
    pending: BTreeMap<Index, Pending<V, D>>,

    /// A map of indices with a threshold signature, cached in memory in case they need to be sent to other peers.
    confirmed: BTreeMap<Index, Certificate<V, D>>,

    // ---------- Rebroadcasting ----------
    /// The frequency at which to rebroadcast pending indices.
    rebroadcast_timeout: Duration,

    /// A set of deadlines for rebroadcasting `Index` values that do not have a threshold signature.
    rebroadcast_deadlines: PrioritySet<Index, SystemTime>,

    // ---------- Journal ----------
    /// Journal for storing acks signed by this node.
    journal: Option<Journal<E, Activity<V, D>>>,
    journal_partition: String,
    journal_write_buffer: NonZeroUsize,
    journal_replay_buffer: NonZeroUsize,
    journal_heights_per_section: u64,
    journal_compression: Option<u8>,
    journal_buffer_pool: PoolRef,

    // ---------- Network ----------
    /// Whether to send acks as priority messages.
    priority_acks: bool,

    // ---------- Metrics ----------
    /// Metrics
    metrics: metrics::Metrics<E>,
}

impl<
        E: Clock + Spawner + Storage + Metrics,
        P: PublicKey,
        V: Variant,
        D: Digest,
        A: Automaton<Context = Index, Digest = D> + Clone,
        Z: Reporter<Activity = Activity<V, D>>,
        M: Monitor<Index = Epoch>,
        B: Blocker<PublicKey = P>,
        TSu: ThresholdSupervisor<
            Index = Epoch,
            PublicKey = P,
            Polynomial = Vec<V::Public>,
            Share = group::Share,
        >,
    > Engine<E, P, V, D, A, Z, M, B, TSu>
{
    /// Creates a new engine with the given context and configuration.
    pub fn new(context: E, cfg: Config<P, V, D, A, Z, M, B, TSu>) -> Self {
        // TODO(#1833): Metrics should use the post-start context
        let metrics = metrics::Metrics::init(context.clone());

        Self {
            context: ContextCell::new(context),
            automaton: cfg.automaton,
            reporter: cfg.reporter,
            monitor: cfg.monitor,
            validators: cfg.validators,
            blocker: cfg.blocker,
            namespace: cfg.namespace,
            epoch_bounds: cfg.epoch_bounds,
            window: cfg.window.into(),
            activity_timeout: cfg.activity_timeout,
            epoch: 0,
            tip: 0,
            safe_tip: SafeTip::default(),
            digest_requests: FuturesPool::default(),
            pending: BTreeMap::new(),
            confirmed: BTreeMap::new(),
            rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
            rebroadcast_deadlines: PrioritySet::new(),
            journal: None,
            journal_partition: cfg.journal_partition,
            journal_write_buffer: cfg.journal_write_buffer,
            journal_replay_buffer: cfg.journal_replay_buffer,
            journal_heights_per_section: cfg.journal_heights_per_section.into(),
            journal_compression: cfg.journal_compression,
            journal_buffer_pool: cfg.journal_buffer_pool,
            priority_acks: cfg.priority_acks,
            metrics,
        }
    }

    /// Runs the engine until the context is stopped.
    ///
    /// The engine will handle:
    /// - Requesting and processing digests from the automaton
    /// - Timeouts
    ///   - Refreshing the Epoch
    ///   - Rebroadcasting Acks
    /// - Messages from the network:
    ///   - Acks from other validators
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(network).await)
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
        let (mut sender, mut receiver) = wrap((), network.0, network.1);
        let mut shutdown = self.context.stopped();

        // Initialize the epoch
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Initialize Journal
        let journal_cfg = JConfig {
            partition: self.journal_partition.clone(),
            compression: self.journal_compression,
            codec_config: (),
            buffer_pool: self.journal_buffer_pool.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::init(self.context.with_label("journal").into(), journal_cfg)
            .await
            .expect("init failed");
        let unverified_indices = self.replay(&journal).await;
        self.journal = Some(journal);

        // Request digests for unverified indices
        for index in unverified_indices {
            trace!(index, "requesting digest for unverified index from replay");
            self.get_digest(index);
        }

        // Initialize the tip manager
        self.safe_tip.init(
            self.validators
                .participants(self.epoch)
                .expect("unknown participants"),
        );

        loop {
            self.metrics.tip.set(self.tip as i64);

            // Propose a new digest if we are processing less than the window
            let next = self.next();
            if next < self.tip + self.window {
                trace!(next, "requesting new digest");
                assert!(self
                    .pending
                    .insert(next, Pending::Unverified(BTreeMap::new()))
                    .is_none());
                self.get_digest(next);
                continue;
            }

            // Get the rebroadcast deadline for the next index
            let rebroadcast = match self.rebroadcast_deadlines.peek() {
                Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            // Process the next event
            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    break;
                },

                // Handle epoch updates from the monitor
                epoch = epoch_updates.next() => {
                    // Error handling
                    let Some(epoch) = epoch else {
                        error!("epoch subscription failed");
                        break;
                    };

                    // Refresh the epoch
                    debug!(current=self.epoch, new=epoch, "refresh epoch");
                    assert!(epoch >= self.epoch);
                    self.epoch = epoch;

                    // Update the tip manager
                    self.safe_tip.reconcile(self.validators.participants(epoch).unwrap());

                    // Update data structures by purging old epochs
                    let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
                    self.pending.iter_mut().for_each(|(_, pending)| {
                        match pending {
                            Pending::Unverified(acks) => {
                                acks.retain(|epoch, _| *epoch >= min_epoch);
                            }
                            Pending::Verified(_, acks) => {
                                acks.retain(|epoch, _| *epoch >= min_epoch);
                            }
                        }
                    });

                    continue;
                },

                // Sign a new ack
                request = self.digest_requests.next_completed() => {
                    let DigestRequest { index, result, timer } = request;
                    drop(timer); // Record metric. Explicitly reference timer to avoid lint warning.
                    match result {
                        Err(err) => {
                            warn!(?err, ?index, "automaton returned error");
                            self.metrics.digest.inc(Status::Dropped);
                        }
                        Ok(digest) => {
                            if let Err(err) = self.handle_digest(index, digest, &mut sender).await {
                                debug!(?err, ?index, "handle_digest failed");
                                continue;
                            }
                        }
                    }
                },

                // Handle incoming acks
                msg = receiver.recv() => {
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let TipAck { ack, tip } = match msg {
                        Ok(peer_ack) => peer_ack,
                        Err(err) => {
                            warn!(?err, ?sender, "ack decode failed, blocking peer");
                            self.blocker.block(sender).await;
                            continue;
                        }
                    };

                    // Update the tip manager
                    if self.safe_tip.update(sender.clone(), tip).is_some() {
                        // Fast-forward our tip if needed
                        let safe_tip = self.safe_tip.get();
                        if safe_tip > self.tip {
                            self.fast_forward_tip(safe_tip).await;
                        }
                    }

                    // Validate that we need to process the ack
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        if err.blockable() {
                            warn!(?sender, ?err, "blocking peer for validation failure");
                            self.blocker.block(sender).await;
                        } else {
                            debug!(?sender, ?err, "ack validate failed");
                        }
                        continue;
                    };

                    // Handle the ack
                    if let Err(err) = self.handle_ack(&ack).await {
                        debug!(?err, ?sender, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }

                    // Update the metrics
                    debug!(?sender, epoch=ack.epoch, index=ack.item.index, "ack");
                    guard.set(Status::Success);
                },

                // Rebroadcast
                _ = rebroadcast => {
                    // Get the next index to rebroadcast
                    let (index, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline");
                    trace!(index, "rebroadcasting");
                    if let Err(err) = self.handle_rebroadcast(index, &mut sender).await {
                        warn!(?err, ?index, "rebroadcast failed");
                    };
                }
            }
        }

        // Close journal on shutdown
        if let Some(journal) = self.journal.take() {
            journal
                .close()
                .await
                .expect("unable to close aggregation journal");
        }
    }

    // ---------- Handling ----------

    /// Handles a digest returned by the automaton.
    async fn handle_digest(
        &mut self,
        index: Index,
        digest: D,
        sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
    ) -> Result<(), Error> {
        // Entry must be `Pending::Unverified`, or return early
        if !matches!(self.pending.get(&index), Some(Pending::Unverified(_))) {
            return Err(Error::AckIndex(index));
        };

        // Move the entry to `Pending::Verified`
        let Some(Pending::Unverified(acks)) = self.pending.remove(&index) else {
            panic!("Pending::Unverified entry not found");
        };
        self.pending
            .insert(index, Pending::Verified(digest, BTreeMap::new()));

        // Handle each `ack` as if it was received over the network. This inserts the values into
        // the new map, and may form a threshold signature if enough acks are present.
        // Only process acks that match the verified digest.
        for epoch_acks in acks.values() {
            for epoch_ack in epoch_acks.values() {
                // Drop acks that don't match the verified digest
                if epoch_ack.item.digest != digest {
                    continue;
                }

                // Handle the ack
                let _ = self.handle_ack(epoch_ack).await;
            }
            // Break early if a threshold signature was formed
            if self.confirmed.contains_key(&index) {
                break;
            }
        }

        // Sign my own ack
        let ack = self.sign_ack(index, digest).await?;

        // Set the rebroadcast deadline for this index
        self.rebroadcast_deadlines
            .put(index, self.context.current() + self.rebroadcast_timeout);

        // Handle ack as if it was received over the network
        let _ = self.handle_ack(&ack).await;

        // Send ack over the network.
        self.broadcast(ack, sender).await?;

        Ok(())
    }

    /// Handles an ack
    ///
    /// Returns an error if the ack is invalid, or can be ignored
    /// (e.g. already exists, threshold already exists, is outside the epoch bounds, etc.).
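    ///
    /// If the ack completes a quorum of partial signatures over the same digest, a threshold
    /// signature is recovered and passed to `handle_threshold`.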
    async fn handle_ack(&mut self, ack: &Ack<V, D>) -> Result<(), Error> {
        // Get the quorum
        let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
            return Err(Error::UnknownEpoch(ack.epoch));
        };
        let quorum = quorum_from_slice(polynomial);

        // Get the acks and check digest consistency
        let acks_by_epoch = match self.pending.get_mut(&ack.item.index) {
            None => {
                // If the index is not in the pending pool, it may be confirmed
                // (i.e. we have a threshold signature for it).
                return Err(Error::AckIndex(ack.item.index));
            }
            Some(Pending::Unverified(acks)) => acks,
            Some(Pending::Verified(digest, acks)) => {
                // If we have a verified digest, ensure the ack matches it
                if ack.item.digest != *digest {
                    return Err(Error::AckDigest(ack.item.index));
                }
                acks
            }
        };

        // Add the partial signature (if not already present)
        let acks = acks_by_epoch.entry(ack.epoch).or_default();
        if acks.contains_key(&ack.signature.index) {
            return Ok(());
        }
        acks.insert(ack.signature.index, ack.clone());

        // If there exists a quorum of acks with the same digest (or for the verified digest if
        // it exists), form a threshold signature
        let partials = acks
            .values()
            .filter(|a| a.item.digest == ack.item.digest)
            .map(|ack| &ack.signature)
            .collect::<Vec<_>>();
        if partials.len() >= (quorum as usize) {
            let item = ack.item.clone();
            let threshold = threshold_signature_recover::<V, _>(quorum, partials)
                .expect("Failed to recover threshold signature");
            self.metrics.threshold.inc();
            self.handle_threshold(item, threshold).await;
        }

        Ok(())
    }

    /// Handles a threshold signature.
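    ///
    /// Stores the certificate, journals and reports it, and advances the tip if the item is at
    /// the current tip.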
    async fn handle_threshold(&mut self, item: Item<D>, threshold: V::Signature) {
        // Check if we already have the threshold
        let index = item.index;
        if self.confirmed.contains_key(&index) {
            return;
        }

        // Store the threshold
        let certificate = Certificate {
            item,
            signature: threshold,
        };
        self.confirmed.insert(index, certificate.clone());

        // Journal and notify the reporter
        let certified = Activity::Certified(certificate);
        self.record(certified.clone()).await;
        self.sync(index).await;
        self.reporter.report(certified).await;

        // Increase the tip if needed
        if index == self.tip {
            // Compute the next tip
            let mut new_tip = index.saturating_add(1);
            while self.confirmed.contains_key(&new_tip) && new_tip < Index::MAX {
                new_tip = new_tip.saturating_add(1);
            }

            // If the next tip is larger, try to fast-forward the tip (may not be possible)
            if new_tip > self.tip {
                self.fast_forward_tip(new_tip).await;
            }
        }
    }

    /// Handles a rebroadcast request for the given index.
    async fn handle_rebroadcast(
        &mut self,
        index: Index,
        sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
    ) -> Result<(), Error> {
        let Some(Pending::Verified(digest, acks)) = self.pending.get(&index) else {
            // The index may already be confirmed; continue silently if so
            return Ok(());
        };

        // Get our signature
        let Some(share) = self.validators.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };
        let ack = acks
            .get(&self.epoch)
            .and_then(|acks| acks.get(&share.index).cloned());
        let ack = match ack {
            Some(ack) => ack,
            None => self.sign_ack(index, *digest).await?,
        };

        // Reinsert the index with a new deadline
        self.rebroadcast_deadlines
            .put(index, self.context.current() + self.rebroadcast_timeout);

        // Broadcast the ack to all peers
        self.broadcast(ack, sender).await
    }

    // ---------- Validation ----------

    /// Validates a raw ack received from `sender` over the p2p network.
    ///
    /// Returns `Ok(())` if the ack is valid and should be processed.
    /// Returns an error if the ack is invalid or can be ignored.
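    ///
    /// Validation checks the epoch bounds, that the sender is a participant signing with its own
    /// share index, that the index is within the activity window, that the ack is not a
    /// duplicate, and that the partial signature verifies.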
    fn validate_ack(&self, ack: &Ack<V, D>, sender: &P) -> Result<(), Error> {
        // Validate epoch
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // Validate sender
        let Some(sig_index) = self.validators.is_participant(ack.epoch, sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if sig_index != ack.signature.index {
            return Err(Error::PeerMismatch);
        }

        // Continue collecting acks below the tip (within `activity_timeout`), but reject anything older
        let activity_threshold = self.tip.saturating_sub(self.activity_timeout);
        if ack.item.index < activity_threshold {
            return Err(Error::AckThresholded(ack.item.index));
        }

        // If the index is at or beyond the tip plus the window, ignore for now
        if ack.item.index >= self.tip + self.window {
            return Err(Error::AckIndex(ack.item.index));
        }

        // Validate that we don't already have the ack
        if self.confirmed.contains_key(&ack.item.index) {
            return Err(Error::AckThresholded(ack.item.index));
        }
        let have_ack = match self.pending.get(&ack.item.index) {
            None => false,
            Some(Pending::Unverified(epoch_map)) => epoch_map
                .get(&ack.epoch)
                .is_some_and(|acks| acks.contains_key(&ack.signature.index)),
            Some(Pending::Verified(digest, epoch_map)) => {
                // While we check this in the `handle_ack` function, checking early here avoids an
                // unnecessary signature check.
                if ack.item.digest != *digest {
                    return Err(Error::AckDigest(ack.item.index));
                }
                epoch_map
                    .get(&ack.epoch)
                    .is_some_and(|acks| acks.contains_key(&ack.signature.index))
            }
        };
        if have_ack {
            return Err(Error::AckDuplicate(sender.to_string(), ack.item.index));
        }

        // Validate partial signature
        let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
            return Err(Error::UnknownEpoch(ack.epoch));
        };
        if !ack.verify(&self.namespace, polynomial) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }

    // ---------- Helpers ----------

    /// Requests the digest from the automaton.
    ///
    /// Pending must contain the index.
    fn get_digest(&mut self, index: Index) {
        assert!(self.pending.contains_key(&index));
        let mut automaton = self.automaton.clone();
        let timer = self.metrics.digest_duration.timer();
        self.digest_requests.push(async move {
            let receiver = automaton.propose(index).await;
            let result = receiver.await.map_err(Error::AppProposeCanceled);
            DigestRequest {
                index,
                result,
                timer,
            }
        });
    }

    /// Signs an ack for the given index and digest. Stores the ack in the journal and returns it.
    /// Returns an error if the share is unknown at the current epoch.
    async fn sign_ack(&mut self, index: Index, digest: D) -> Result<Ack<V, D>, Error> {
        let Some(share) = self.validators.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };

        // Sign the item
        let item = Item { index, digest };
        let ack = Ack::sign(&self.namespace, self.epoch, share, item);

        // Journal the ack
        self.record(Activity::Ack(ack.clone())).await;
        self.sync(index).await;

        Ok(ack)
    }

    /// Broadcasts an ack to all peers with the appropriate priority.
    ///
    /// Returns an error if the sender returns an error.
    async fn broadcast(
        &mut self,
        ack: Ack<V, D>,
        sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
    ) -> Result<(), Error> {
        sender
            .send(
                Recipients::All,
                TipAck { ack, tip: self.tip },
                self.priority_acks,
            )
            .await
            .map_err(|err| {
                warn!(?err, "failed to send ack");
                Error::UnableToSendMessage
            })?;
        Ok(())
    }

    /// Returns the next index that we should process. This is the minimum index for
    /// which we do not have a digest or an outstanding request to the automaton for the digest.
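    ///
    /// For example, if the tip is 5, the highest pending index is 6, and the highest confirmed
    /// index is 7, the next index to process is 8.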
    fn next(&self) -> Index {
        let max_pending = self
            .pending
            .last_key_value()
            .map(|(k, _)| k.saturating_add(1))
            .unwrap_or_default();
        let max_confirmed = self
            .confirmed
            .last_key_value()
            .map(|(k, _)| k.saturating_add(1))
            .unwrap_or_default();
        max(self.tip, max(max_pending, max_confirmed))
    }

    /// Increases the tip to the given value, pruning stale entries.
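    /// Entries within `activity_timeout` of the new tip are retained so that late acks and
    /// certificates are not lost.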
    ///
    /// # Panics
    ///
    /// Panics if the given tip is less than or equal to the current tip.
    async fn fast_forward_tip(&mut self, tip: Index) {
        assert!(tip > self.tip);

        // Prune data structures with buffer to prevent losing certificates
        let activity_threshold = tip.saturating_sub(self.activity_timeout);
        self.pending.retain(|index, _| *index >= activity_threshold);
        self.confirmed
            .retain(|index, _| *index >= activity_threshold);

        // Add tip to journal
        self.record(Activity::Tip(tip)).await;
        self.sync(tip).await;
        self.reporter.report(Activity::Tip(tip)).await;

        // Prune journal with buffer, ignoring errors
        let section = self.get_journal_section(activity_threshold);
        let journal = self.journal.as_mut().expect("journal must be initialized");
        let _ = journal.prune(section).await;

        // Update the tip
        self.tip = tip;
    }

    // ---------- Journal ----------

    /// Returns the section of the journal for the given `index`.
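    ///
    /// For example, with `journal_heights_per_section = 10`, indices 0..=9 map to section 0 and
    /// indices 10..=19 map to section 1.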
    fn get_journal_section(&self, index: Index) -> u64 {
        index / self.journal_heights_per_section
    }

    /// Replays the journal, updating the state of the engine.
    /// Returns a list of unverified pending indices that need digest requests.
    async fn replay(&mut self, journal: &Journal<E, Activity<V, D>>) -> Vec<Index> {
        let mut tip = Index::default();
        let mut certified = Vec::new();
        let mut acks = Vec::new();
        let stream = journal
            .replay(0, 0, self.journal_replay_buffer)
            .await
            .expect("replay failed");
        pin_mut!(stream);
        while let Some(msg) = stream.next().await {
            let (_, _, _, activity) = msg.expect("replay failed");
            match activity {
                Activity::Tip(index) => {
                    tip = max(tip, index);
                    self.reporter.report(Activity::Tip(index)).await;
                }
                Activity::Certified(certificate) => {
                    certified.push(certificate.clone());
                    self.reporter.report(Activity::Certified(certificate)).await;
                }
                Activity::Ack(ack) => {
                    acks.push(ack.clone());
                    self.reporter.report(Activity::Ack(ack)).await;
                }
            }
        }

        // Update the tip to the highest index in the journal
        self.tip = tip;
        let activity_threshold = tip.saturating_sub(self.activity_timeout);

        // Add certified items
        certified
            .iter()
            .filter(|certificate| certificate.item.index >= activity_threshold)
            .for_each(|certificate| {
                self.confirmed
                    .insert(certificate.item.index, certificate.clone());
            });

        // Group acks by index
        let mut acks_by_index: BTreeMap<Index, Vec<Ack<V, D>>> = BTreeMap::new();
        for ack in acks {
            if ack.item.index >= activity_threshold && !self.confirmed.contains_key(&ack.item.index)
            {
                acks_by_index.entry(ack.item.index).or_default().push(ack);
            }
        }

        // Process each index's acks
        let mut unverified = Vec::new();
        for (index, mut acks_group) in acks_by_index {
            // Check if we have our own ack (which means we've verified the digest)
            let our_share = self.validators.share(self.epoch);
            let our_digest = if let Some(share) = our_share {
                acks_group
                    .iter()
                    .find(|ack| ack.epoch == self.epoch && ack.signature.index == share.index)
                    .map(|ack| ack.item.digest)
            } else {
                None
            };

            // If our_digest exists, delete everything from acks_group that doesn't match it
            if let Some(digest) = our_digest {
                acks_group.retain(|other| other.item.digest == digest);
            }

            // Create a new epoch map
            let mut epoch_map = BTreeMap::new();
            for ack in acks_group {
                epoch_map
                    .entry(ack.epoch)
                    .or_insert_with(BTreeMap::new)
                    .insert(ack.signature.index, ack);
            }

            // Insert as Verified if we have our own ack (meaning we verified the digest),
            // otherwise as Unverified
            match our_digest {
                Some(digest) => {
                    self.pending
                        .insert(index, Pending::Verified(digest, epoch_map));

                    // If we've already generated an ack and it isn't yet confirmed, mark for immediate rebroadcast
                    self.rebroadcast_deadlines
                        .put(index, self.context.current());
                }
                None => {
                    self.pending.insert(index, Pending::Unverified(epoch_map));

                    // Add to unverified indices
                    unverified.push(index);
                }
            }
        }

        // After replay, ensure we have all indices from tip to next in pending or confirmed
        // to handle the case where we restart and some indices have no acks yet
        let next = self.next();
        for index in self.tip..next {
            // If we already have the index in pending or confirmed, skip
            if self.pending.contains_key(&index) || self.confirmed.contains_key(&index) {
                continue;
            }

            // Add missing index to pending
            self.pending
                .insert(index, Pending::Unverified(BTreeMap::new()));
            unverified.push(index);
        }
        info!(self.tip, next, ?unverified, "replayed journal");

        unverified
    }

    /// Appends an activity to the journal.
    async fn record(&mut self, activity: Activity<V, D>) {
        let index = match activity {
            Activity::Ack(ref ack) => ack.item.index,
            Activity::Certified(ref certificate) => certificate.item.index,
            Activity::Tip(index) => index,
        };
        let section = self.get_journal_section(index);
        self.journal
            .as_mut()
            .expect("journal must be initialized")
            .append(section, activity)
            .await
            .expect("unable to append to journal");
    }

    /// Syncs the journal section containing `index` (ensures all data is written to disk).
    async fn sync(&mut self, index: Index) {
        let section = self.get_journal_section(index);
        let journal = self.journal.as_mut().expect("journal must be initialized");
        journal.sync(section).await.expect("unable to sync journal");
    }
}