commonware_consensus/aggregation/engine.rs

//! Engine for the aggregation module.

use super::{
    metrics,
    safe_tip::SafeTip,
    types::{Ack, Activity, Epoch, Error, Index, Item, TipAck},
    Config,
};
use crate::{aggregation::types::Certificate, Automaton, Monitor, Reporter, ThresholdSupervisor};
use commonware_cryptography::{
    bls12381::primitives::{group, ops::threshold_signature_recover, variant::Variant},
    Digest, PublicKey,
};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Blocker, Receiver, Recipients, Sender,
};
use commonware_runtime::{
    buffer::PoolRef,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, Status},
    },
    Clock, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::variable::{Config as JConfig, Journal};
use commonware_utils::{futures::Pool as FuturesPool, quorum_from_slice, PrioritySet};
use futures::{
    future::{self, Either},
    pin_mut, StreamExt,
};
use std::{
    cmp::max,
    collections::BTreeMap,
    num::NonZeroUsize,
    time::{Duration, SystemTime},
};
use tracing::{debug, error, info, trace, warn};

/// An entry for an index that does not yet have a threshold signature.
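///
/// An entry starts as [Pending::Unverified] when the digest is requested from the automaton
/// (`get_digest`), becomes [Pending::Verified] once the automaton returns the digest
/// (`handle_digest`), and is pruned once the tip advances past it (`fast_forward_tip`).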
enum Pending<V: Variant, D: Digest> {
    /// The automaton has not yet provided the digest for this index, so the
    /// stored acks may reference arbitrary digests.
    Unverified(BTreeMap<Epoch, BTreeMap<u32, Ack<V, D>>>),

    /// Verified by the automaton. Now stores the digest.
    Verified(D, BTreeMap<Epoch, BTreeMap<u32, Ack<V, D>>>),
}

/// The type returned by the `digest_requests` pool, used by the automaton to report which digest
/// is associated with the given index.
struct DigestRequest<D: Digest, E: Clock> {
    /// The index in question.
    index: Index,

    /// The result of the verification.
    result: Result<D, Error>,

    /// Records the time taken to get the digest.
    timer: histogram::Timer<E>,
}

/// Instance of the engine.
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics,
    P: PublicKey,
    V: Variant,
    D: Digest,
    A: Automaton<Context = Index, Digest = D> + Clone,
    Z: Reporter<Activity = Activity<V, D>>,
    M: Monitor<Index = Epoch>,
    B: Blocker<PublicKey = P>,
    TSu: ThresholdSupervisor<
        Index = Epoch,
        PublicKey = P,
        Polynomial = Vec<V::Public>,
        Share = group::Share,
    >,
> {
    // ---------- Interfaces ----------
    context: E,
    automaton: A,
    monitor: M,
    validators: TSu,
    reporter: Z,
    blocker: B,

    // ---------- Namespace Constants ----------
    /// The namespace used when signing and verifying acks.
    namespace: Vec<u8>,

    // ---------- Pruning ----------
    /// A tuple representing the epochs to keep in memory.
    /// The first element is the number of old epochs to keep.
    /// The second element is the number of future epochs to accept.
    ///
    /// For example, if the current epoch is 10, and the bounds are (1, 2), then
    /// epochs 9, 10, 11, and 12 are kept (and accepted);
    /// all others are pruned or rejected.
    epoch_bounds: (u64, u64),

    /// The maximum number of indices to process concurrently, starting at the tip.
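    ///
    /// For example, with a tip of 100 and a window of 10, digests are requested for
    /// indices 100 through 109.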
    window: u64,

    /// Number of indices to track below the tip when collecting acks and/or pruning.
    activity_timeout: u64,

    // ---------- Messaging ----------
    /// Pool of pending futures to request a digest from the automaton.
    digest_requests: FuturesPool<DigestRequest<D, E>>,

    // ---------- State ----------
    /// The current epoch.
    epoch: Epoch,

    /// The current tip.
    tip: Index,

    /// Tracks the tips of all validators.
    safe_tip: SafeTip<P>,

    /// The keys represent the set of all `Index` values for which we are attempting to form a
    /// threshold signature, but do not yet have one. Values may be [Pending::Unverified] or
    /// [Pending::Verified], depending on whether the automaton has verified the digest or not.
    pending: BTreeMap<Index, Pending<V, D>>,

    /// A map of indices with a threshold signature, cached in memory in case they need to be sent
    /// to other peers.
    confirmed: BTreeMap<Index, Certificate<V, D>>,

    // ---------- Rebroadcasting ----------
    /// The frequency at which to rebroadcast pending indices.
    rebroadcast_timeout: Duration,

    /// A set of deadlines for rebroadcasting `Index` values that do not have a threshold signature.
    rebroadcast_deadlines: PrioritySet<Index, SystemTime>,

    // ---------- Journal ----------
    /// Journal for storing acks signed by this node.
    journal: Option<Journal<E, Activity<V, D>>>,
    journal_partition: String,
    journal_write_buffer: NonZeroUsize,
    journal_replay_buffer: NonZeroUsize,
    journal_heights_per_section: u64,
    journal_compression: Option<u8>,
    journal_buffer_pool: PoolRef,

    // ---------- Network ----------
    /// Whether to send acks as priority messages.
    priority_acks: bool,

    // ---------- Metrics ----------
    /// Metrics
    metrics: metrics::Metrics<E>,
}

impl<
        E: Clock + Spawner + Storage + Metrics,
        P: PublicKey,
        V: Variant,
        D: Digest,
        A: Automaton<Context = Index, Digest = D> + Clone,
        Z: Reporter<Activity = Activity<V, D>>,
        M: Monitor<Index = Epoch>,
        B: Blocker<PublicKey = P>,
        TSu: ThresholdSupervisor<
            Index = Epoch,
            PublicKey = P,
            Polynomial = Vec<V::Public>,
            Share = group::Share,
        >,
    > Engine<E, P, V, D, A, Z, M, B, TSu>
{
    /// Creates a new engine with the given context and configuration.
    pub fn new(context: E, cfg: Config<P, V, D, A, Z, M, B, TSu>) -> Self {
        let metrics = metrics::Metrics::init(context.clone());

        Self {
            context,
            automaton: cfg.automaton,
            reporter: cfg.reporter,
            monitor: cfg.monitor,
            validators: cfg.validators,
            blocker: cfg.blocker,
            namespace: cfg.namespace,
            epoch_bounds: cfg.epoch_bounds,
            window: cfg.window.into(),
            activity_timeout: cfg.activity_timeout,
            epoch: 0,
            tip: 0,
            safe_tip: SafeTip::default(),
            digest_requests: FuturesPool::default(),
            pending: BTreeMap::new(),
            confirmed: BTreeMap::new(),
            rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
            rebroadcast_deadlines: PrioritySet::new(),
            journal: None,
            journal_partition: cfg.journal_partition,
            journal_write_buffer: cfg.journal_write_buffer,
            journal_replay_buffer: cfg.journal_replay_buffer,
            journal_heights_per_section: cfg.journal_heights_per_section.into(),
            journal_compression: cfg.journal_compression,
            journal_buffer_pool: cfg.journal_buffer_pool,
            priority_acks: cfg.priority_acks,
            metrics,
        }
    }

    /// Runs the engine until the context is stopped.
    ///
    /// The engine will handle:
    /// - Requesting and processing digests from the automaton
    /// - Timeouts
    ///   - Refreshing the Epoch
    ///   - Rebroadcasting Acks
    /// - Messages from the network:
    ///   - Acks from other validators
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        self.context.spawn_ref()(self.run(network))
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
        let (mut sender, mut receiver) = wrap((), network.0, network.1);
        let mut shutdown = self.context.stopped();

        // Initialize the epoch
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Initialize Journal
        let journal_cfg = JConfig {
            partition: self.journal_partition.clone(),
            compression: self.journal_compression,
            codec_config: (),
            buffer_pool: self.journal_buffer_pool.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::init(self.context.with_label("journal"), journal_cfg)
            .await
            .expect("init failed");
        self.replay(&journal).await;
        self.journal = Some(journal);

        // Initialize the tip manager
        self.safe_tip.init(
            self.validators
                .participants(self.epoch)
                .expect("unknown participants"),
        );

        loop {
            self.metrics.tip.set(self.tip as i64);

            // Request a new digest if we are processing fewer indices than the window allows
            let next = self.next();
            if next < self.tip + self.window {
                trace!("requesting new digest: index {}", next);
                self.get_digest(next);
                continue;
            }

            // Get the rebroadcast deadline for the next index
            let rebroadcast = match self.rebroadcast_deadlines.peek() {
                Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            // Process the next event
            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    break;
                },

                // Handle epoch updates from the monitor
                epoch = epoch_updates.next() => {
                    // Error handling
                    let Some(epoch) = epoch else {
                        error!("epoch subscription failed");
                        break;
                    };

                    // Refresh the epoch
                    debug!(current=self.epoch, new=epoch, "refresh epoch");
                    assert!(epoch >= self.epoch);
                    self.epoch = epoch;

                    // Update the tip manager
                    self.safe_tip.reconcile(self.validators.participants(epoch).unwrap());

                    // Update data structures by purging old epochs
                    let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
                    self.pending.iter_mut().for_each(|(_, pending)| {
                        match pending {
                            Pending::Unverified(acks) => {
                                acks.retain(|epoch, _| *epoch >= min_epoch);
                            }
                            Pending::Verified(_, acks) => {
                                acks.retain(|epoch, _| *epoch >= min_epoch);
                            }
                        }
                    });

                    continue;
                },

                // Sign a new ack
                request = self.digest_requests.next_completed() => {
                    let DigestRequest { index, result, timer } = request;
                    drop(timer); // Record metric. Explicitly reference timer to avoid lint warning.
                    match result {
                        Err(err) => {
                            warn!(?err, ?index, "automaton returned error");
                            self.metrics.digest.inc(Status::Dropped);
                        }
                        Ok(digest) => {
                            if let Err(err) = self.handle_digest(index, digest, &mut sender).await {
                                debug!(?err, ?index, "handle_digest failed");
                                continue;
                            }
                        }
                    }
                },

                // Handle incoming acks
                msg = receiver.recv() => {
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let TipAck { ack, tip } = match msg {
                        Ok(peer_ack) => peer_ack,
                        Err(err) => {
                            warn!(?err, ?sender, "ack decode failed, blocking peer");
                            self.blocker.block(sender).await;
                            continue;
                        }
                    };

                    // Update the tip manager
                    if self.safe_tip.update(sender.clone(), tip).is_some() {
                        // Fast-forward our tip if needed
                        let safe_tip = self.safe_tip.get();
                        if safe_tip > self.tip {
                            self.fast_forward_tip(safe_tip).await;
                        }
                    }

                    // Validate that we need to process the ack
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        if err.blockable() {
                            warn!(?sender, ?err, "blocking peer for validation failure");
                            self.blocker.block(sender).await;
                        } else {
                            debug!(?sender, ?err, "ack validate failed");
                        }
                        continue;
                    };

                    // Handle the ack
                    if let Err(err) = self.handle_ack(&ack).await {
                        debug!(?err, ?sender, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }

                    // Update the metrics
                    debug!(?sender, epoch=ack.epoch, index=ack.item.index, "ack");
                    guard.set(Status::Success);
                },

                // Rebroadcast
                _ = rebroadcast => {
                    // Get the next index to rebroadcast
                    let (index, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline");
                    trace!("rebroadcasting: index {}", index);
                    if let Err(err) = self.handle_rebroadcast(index, &mut sender).await {
                        warn!(?err, ?index, "rebroadcast failed");
                    };
                }
            }
        }

        // Close journal on shutdown
        if let Some(journal) = self.journal.take() {
            journal
                .close()
                .await
                .expect("unable to close aggregation journal");
        }
    }

    // ---------- Handling ----------

    /// Handles a digest returned by the automaton.
    async fn handle_digest(
        &mut self,
        index: Index,
        digest: D,
        sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
    ) -> Result<(), Error> {
        // Entry must be `Pending::Unverified`, or return early
        if !matches!(self.pending.get(&index), Some(Pending::Unverified(_))) {
            return Err(Error::AckIndex(index));
        };

        // Move the entry to `Pending::Verified`
        let Some(Pending::Unverified(acks)) = self.pending.remove(&index) else {
            panic!("Pending::Unverified entry not found");
        };
        self.pending
            .insert(index, Pending::Verified(digest, BTreeMap::new()));

        // Handle each `ack` as if it was received over the network. This inserts the values into
        // the new map, and may form a threshold signature if enough acks are present.
        // Only process acks that match the verified digest.
        for epoch_acks in acks.values() {
            for epoch_ack in epoch_acks.values() {
                // Drop acks that don't match the verified digest
                if epoch_ack.item.digest != digest {
                    continue;
                }

                // Handle the ack
                let _ = self.handle_ack(epoch_ack).await;
            }
            // Break early if a threshold signature was formed
            if self.confirmed.contains_key(&index) {
                break;
            }
        }

        // Sign my own ack
        let ack = self.sign_ack(index, digest).await?;

        // Set the rebroadcast deadline for this index
        self.rebroadcast_deadlines
            .put(index, self.context.current() + self.rebroadcast_timeout);

        // Handle ack as if it was received over the network
        let _ = self.handle_ack(&ack).await;

        // Send ack over the network.
        self.broadcast(ack, sender).await?;

        Ok(())
    }

    /// Handles an ack.
    ///
    /// Returns an error if the ack is invalid, or can be ignored
    /// (e.g. already exists, threshold already exists, is outside the epoch bounds, etc.).
    async fn handle_ack(&mut self, ack: &Ack<V, D>) -> Result<(), Error> {
        // Get the quorum
        let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
            return Err(Error::UnknownEpoch(ack.epoch));
        };
        let quorum = quorum_from_slice(polynomial);

        // Get the acks and check digest consistency
        let acks_by_epoch = match self.pending.get_mut(&ack.item.index) {
            None => {
                // If the index is not in the pending pool, it may be confirmed
                // (i.e. we have a threshold signature for it).
                return Err(Error::AckIndex(ack.item.index));
            }
            Some(Pending::Unverified(acks)) => acks,
            Some(Pending::Verified(digest, acks)) => {
                // If we have a verified digest, ensure the ack matches it
                if ack.item.digest != *digest {
                    return Err(Error::AckDigest(ack.item.index));
                }
                acks
            }
        };

        // Add the partial signature (if not already present)
        let acks = acks_by_epoch.entry(ack.epoch).or_default();
        if acks.contains_key(&ack.signature.index) {
            return Ok(());
        }
        acks.insert(ack.signature.index, ack.clone());

        // If a quorum of acks exists for the same digest (the verified digest, if known),
        // form a threshold signature
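        // For example, if the quorum for this epoch is 3, then once 3 acks over the same item
        // have been collected, their partial signatures can be recovered into a threshold signature.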
        let partials = acks
            .values()
            .filter(|a| a.item.digest == ack.item.digest)
            .map(|ack| &ack.signature)
            .collect::<Vec<_>>();
        if partials.len() >= (quorum as usize) {
            let item = ack.item.clone();
            let threshold = threshold_signature_recover::<V, _>(quorum, partials)
                .expect("Failed to recover threshold signature");
            self.metrics.threshold.inc();
            self.handle_threshold(item, threshold).await;
        }

        Ok(())
    }

    /// Handles a threshold signature.
    async fn handle_threshold(&mut self, item: Item<D>, threshold: V::Signature) {
        // Check if we already have the threshold
        let index = item.index;
        if self.confirmed.contains_key(&index) {
            return;
        }

        // Store the threshold
        let certificate = Certificate {
            item,
            signature: threshold,
        };
        self.confirmed.insert(index, certificate.clone());

        // Journal and notify the automaton
        let certified = Activity::Certified(certificate);
        self.record(certified.clone()).await;
        self.sync(index).await;
        self.reporter.report(certified).await;

        // Increase the tip if needed
        if index == self.tip {
            // Compute the next tip
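            // For example, if the tip is 5 and indices 6 and 7 are already confirmed,
            // the tip can be fast-forwarded to 8.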
            let mut new_tip = index.saturating_add(1);
            while self.confirmed.contains_key(&new_tip) && new_tip < Index::MAX {
                new_tip = new_tip.saturating_add(1);
            }

            // If the next tip is larger, try to fast-forward the tip (may not be possible)
            if new_tip > self.tip {
                self.fast_forward_tip(new_tip).await;
            }
        }
    }

    /// Handles a rebroadcast request for the given index.
    async fn handle_rebroadcast(
        &mut self,
        index: Index,
        sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
    ) -> Result<(), Error> {
        let Some(Pending::Verified(digest, acks)) = self.pending.get(&index) else {
            // The index may already be confirmed; continue silently if so
            return Ok(());
        };

        // Get our signature
        let Some(share) = self.validators.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };
        let ack = acks
            .get(&self.epoch)
            .and_then(|acks| acks.get(&share.index).cloned());
        let ack = match ack {
            Some(ack) => ack,
            None => self.sign_ack(index, *digest).await?,
        };

        // Reinsert the index with a new deadline
        self.rebroadcast_deadlines
            .put(index, self.context.current() + self.rebroadcast_timeout);

        // Broadcast the ack to all peers
        self.broadcast(ack, sender).await
    }

    // ---------- Validation ----------

    /// Validates a raw ack received from `sender` over the p2p network.
    ///
    /// Returns `Ok(())` if the ack should be processed.
    /// Returns an error if the ack is invalid or can be ignored.
    fn validate_ack(&self, ack: &Ack<V, D>, sender: &P) -> Result<(), Error> {
        // Validate epoch
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
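            // For example, with a current epoch of 10 and epoch_bounds of (1, 2),
            // acks for epochs 9 through 12 are accepted.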
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // Validate sender
        let Some(sig_index) = self.validators.is_participant(ack.epoch, sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if sig_index != ack.signature.index {
            return Err(Error::PeerMismatch);
        }

        // Reject acks that are too far below the tip (we still collect acks within the
        // activity timeout in case we don't yet have a threshold signature)
        let activity_threshold = self.tip.saturating_sub(self.activity_timeout);
        if ack.item.index < activity_threshold {
            return Err(Error::AckThresholded(ack.item.index));
        }

        // If the index is at or beyond the tip plus the window, ignore for now
        if ack.item.index >= self.tip + self.window {
            return Err(Error::AckIndex(ack.item.index));
        }

        // Validate that we don't already have the ack
        if self.confirmed.contains_key(&ack.item.index) {
            return Err(Error::AckThresholded(ack.item.index));
        }
        let have_ack = match self.pending.get(&ack.item.index) {
            None => false,
            Some(Pending::Unverified(epoch_map)) => epoch_map
                .get(&ack.epoch)
                .is_some_and(|acks| acks.contains_key(&ack.signature.index)),
            Some(Pending::Verified(digest, epoch_map)) => {
                // While we check this in the `handle_ack` function, checking early here avoids an
                // unnecessary signature check.
                if ack.item.digest != *digest {
                    return Err(Error::AckDigest(ack.item.index));
                }
                epoch_map
                    .get(&ack.epoch)
                    .is_some_and(|acks| acks.contains_key(&ack.signature.index))
            }
        };
        if have_ack {
            return Err(Error::AckDuplicate(sender.to_string(), ack.item.index));
        }

        // Validate partial signature
        let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
            return Err(Error::UnknownEpoch(ack.epoch));
        };
        if !ack.verify(&self.namespace, polynomial) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }

    // ---------- Helpers ----------

    /// Sets `index` as pending and requests the digest from the automaton.
    fn get_digest(&mut self, index: Index) {
        assert!(self
            .pending
            .insert(index, Pending::Unverified(BTreeMap::new()))
            .is_none());

        let mut automaton = self.automaton.clone();
        let timer = self.metrics.digest_duration.timer();
        self.digest_requests.push(async move {
            let receiver = automaton.propose(index).await;
            let result = receiver.await.map_err(Error::AppProposeCanceled);
            DigestRequest {
                index,
                result,
                timer,
            }
        });
    }

    /// Signs an ack for the given index and digest. Stores the ack in the journal and returns it.
    /// Returns an error if the share is unknown at the current epoch.
    async fn sign_ack(&mut self, index: Index, digest: D) -> Result<Ack<V, D>, Error> {
        let Some(share) = self.validators.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };

        // Sign the item
        let item = Item { index, digest };
        let ack = Ack::sign(&self.namespace, self.epoch, share, item);

        // Journal the ack
        self.record(Activity::Ack(ack.clone())).await;
        self.sync(index).await;

        Ok(ack)
    }

    /// Broadcasts an ack to all peers with the appropriate priority.
    ///
    /// Returns an error if the sender returns an error.
    async fn broadcast(
        &mut self,
        ack: Ack<V, D>,
        sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
    ) -> Result<(), Error> {
        sender
            .send(
                Recipients::All,
                TipAck { ack, tip: self.tip },
                self.priority_acks,
            )
            .await
            .map_err(|err| {
                warn!(?err, "failed to send ack");
                Error::UnableToSendMessage
            })?;
        Ok(())
    }

    /// Returns the next index that we should process. This is the minimum index for
    /// which we do not have a digest or an outstanding request to the automaton for the digest.
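    ///
    /// For example, if the tip is 10, the highest pending index is 14, and the highest
    /// confirmed index is 12, the next index to request is 15.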
    fn next(&self) -> Index {
        let max_pending = self
            .pending
            .last_key_value()
            .map(|(k, _)| k.saturating_add(1))
            .unwrap_or_default();
        let max_confirmed = self
            .confirmed
            .last_key_value()
            .map(|(k, _)| k.saturating_add(1))
            .unwrap_or_default();
        max(self.tip, max(max_pending, max_confirmed))
    }

    /// Increases the tip to the given value, pruning stale entries.
    ///
    /// # Panics
    ///
    /// Panics if the given tip is less than or equal to the current tip.
    async fn fast_forward_tip(&mut self, tip: Index) {
        assert!(tip > self.tip);

        // Prune data structures with a buffer to prevent losing certificates
        let activity_threshold = tip.saturating_sub(self.activity_timeout);
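        // For example, with an activity timeout of 10 and a new tip of 100,
        // entries for indices below 90 are dropped.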
        self.pending.retain(|index, _| *index >= activity_threshold);
        self.confirmed
            .retain(|index, _| *index >= activity_threshold);

        // Add tip to journal
        self.record(Activity::Tip(tip)).await;
        self.sync(tip).await;
        self.reporter.report(Activity::Tip(tip)).await;

        // Prune journal with buffer, ignoring errors
        let section = self.get_journal_section(activity_threshold);
        let journal = self.journal.as_mut().expect("journal must be initialized");
        let _ = journal.prune(section).await;

        // Update the tip
        self.tip = tip;
    }

    // ---------- Journal ----------

    /// Returns the section of the journal for the given `index`.
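    ///
    /// For example, with 10 heights per section, indices 0 through 9 map to section 0,
    /// indices 10 through 19 map to section 1, and so on.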
    fn get_journal_section(&self, index: Index) -> u64 {
        index / self.journal_heights_per_section
    }

    /// Replays the journal, updating the state of the engine.
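    ///
    /// Restores the tip, any certificates above the activity threshold, and any acks
    /// for indices that are not yet confirmed, reporting each replayed activity to the reporter.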
    async fn replay(&mut self, journal: &Journal<E, Activity<V, D>>) {
        let mut tip = Index::default();
        let mut certified = Vec::new();
        let mut acks = Vec::new();
        let stream = journal
            .replay(0, 0, self.journal_replay_buffer)
            .await
            .expect("replay failed");
        pin_mut!(stream);
        while let Some(msg) = stream.next().await {
            let (_, _, _, activity) = msg.expect("replay failed");
            match activity {
                Activity::Tip(index) => {
                    tip = max(tip, index);
                    self.reporter.report(Activity::Tip(index)).await;
                }
                Activity::Certified(certificate) => {
                    certified.push(certificate.clone());
                    self.reporter.report(Activity::Certified(certificate)).await;
                }
                Activity::Ack(ack) => {
                    acks.push(ack.clone());
                    self.reporter.report(Activity::Ack(ack)).await;
                }
            }
        }

        // Update the tip to the highest index in the journal
        self.tip = tip;
        let activity_threshold = tip.saturating_sub(self.activity_timeout);

        // Add certified items
        certified
            .iter()
            .filter(|certificate| certificate.item.index >= activity_threshold)
            .for_each(|certificate| {
                self.confirmed
                    .insert(certificate.item.index, certificate.clone());
            });

        // Group acks by index
        let mut acks_by_index: BTreeMap<Index, Vec<Ack<V, D>>> = BTreeMap::new();
        for ack in acks {
            if ack.item.index >= activity_threshold && !self.confirmed.contains_key(&ack.item.index)
            {
                acks_by_index.entry(ack.item.index).or_default().push(ack);
            }
        }

        // Process each index's acks
        for (index, mut acks_group) in acks_by_index {
            // Check if we have our own ack (which means we've verified the digest)
            let our_share = self.validators.share(self.epoch);
            let our_digest = if let Some(share) = our_share {
                acks_group
                    .iter()
                    .find(|ack| ack.epoch == self.epoch && ack.signature.index == share.index)
                    .map(|ack| ack.item.digest)
            } else {
                None
            };

            // If our_digest exists, delete everything from acks_group that doesn't match it
            if let Some(digest) = our_digest {
                acks_group.retain(|other| other.item.digest == digest);
            }

            // Create a new epoch map
            let mut epoch_map = BTreeMap::new();
            for ack in acks_group {
                epoch_map
                    .entry(ack.epoch)
                    .or_insert_with(BTreeMap::new)
                    .insert(ack.signature.index, ack);
            }

            // Insert as Verified if we have our own ack (meaning we verified the digest),
            // otherwise as Unverified
            match our_digest {
                Some(digest) => {
                    self.pending
                        .insert(index, Pending::Verified(digest, epoch_map));

                    // If we've already generated an ack and it isn't yet confirmed, mark for immediate rebroadcast
                    self.rebroadcast_deadlines
                        .put(index, self.context.current());
                }
                None => {
                    self.pending.insert(index, Pending::Unverified(epoch_map));
                }
            }
        }

        info!(self.tip, next = self.next(), "replayed journal");
    }

    /// Appends an activity to the journal.
    async fn record(&mut self, activity: Activity<V, D>) {
        let index = match activity {
            Activity::Ack(ref ack) => ack.item.index,
            Activity::Certified(ref certificate) => certificate.item.index,
            Activity::Tip(index) => index,
        };
        let section = self.get_journal_section(index);
        self.journal
            .as_mut()
            .expect("journal must be initialized")
            .append(section, activity)
            .await
            .expect("unable to append to journal");
    }

    /// Syncs (ensures all data is written to disk).
    async fn sync(&mut self, index: Index) {
        let section = self.get_journal_section(index);
        let journal = self.journal.as_mut().expect("journal must be initialized");
        journal.sync(section).await.expect("unable to sync journal");
    }
}