commonware_consensus/aggregation/
engine.rs

//! Engine for the aggregation module: collects acks (partial signatures) over indexed digests
//! and recovers threshold signatures (certificates) once a quorum is reached.

use super::{
    metrics,
    safe_tip::SafeTip,
    types::{Ack, Activity, Epoch, Error, Index, Item, TipAck},
    Config,
};
use crate::{aggregation::types::Certificate, Automaton, Monitor, Reporter, ThresholdSupervisor};
use commonware_cryptography::{
    bls12381::primitives::{group, ops::threshold_signature_recover, variant::Variant},
    Digest, PublicKey,
};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Blocker, Receiver, Recipients, Sender,
};
use commonware_runtime::{
    buffer::PoolRef,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, Status},
    },
    Clock, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::variable::{Config as JConfig, Journal};
use commonware_utils::{futures::Pool as FuturesPool, quorum_from_slice, PrioritySet};
use futures::{
    future::{self, Either},
    pin_mut, StreamExt,
};
use std::{
    cmp::max,
    collections::BTreeMap,
    num::NonZeroUsize,
    time::{Duration, SystemTime},
};
use tracing::{debug, error, info, trace, warn};

/// An entry for an index that does not yet have a threshold signature.
enum Pending<V: Variant, D: Digest> {
    /// The automaton has not yet provided the digest for this index.
    /// The signatures may have arbitrary digests.
    Unverified(BTreeMap<Epoch, BTreeMap<u32, Ack<V, D>>>),

    /// Verified by the automaton. Now stores the digest.
    Verified(D, BTreeMap<Epoch, BTreeMap<u32, Ack<V, D>>>),
}

/// The type returned by the `digest_requests` pool, used by the automaton to indicate which
/// digest is associated with the given index.
struct DigestRequest<D: Digest, E: Clock> {
    /// The index in question.
    index: Index,

    /// The result of the verification.
    result: Result<D, Error>,

    /// Records the time taken to get the digest.
    timer: histogram::Timer<E>,
}

/// Instance of the engine.
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics,
    P: PublicKey,
    V: Variant,
    D: Digest,
    A: Automaton<Context = Index, Digest = D> + Clone,
    Z: Reporter<Activity = Activity<V, D>>,
    M: Monitor<Index = Epoch>,
    B: Blocker<PublicKey = P>,
    TSu: ThresholdSupervisor<
        Index = Epoch,
        PublicKey = P,
        Polynomial = Vec<V::Public>,
        Share = group::Share,
    >,
> {
    // ---------- Interfaces ----------
    context: E,
    automaton: A,
    monitor: M,
    validators: TSu,
    reporter: Z,
    blocker: B,

    // ---------- Namespace Constants ----------
    /// The namespace used when signing and verifying acks.
    namespace: Vec<u8>,

    // Pruning
    /// A tuple representing the epochs to keep in memory.
    /// The first element is the number of old epochs to keep.
    /// The second element is the number of future epochs to accept.
    ///
    /// For example, if the current epoch is 10, and the bounds are (1, 2), then
    /// epochs 9, 10, 11, and 12 are kept (and accepted);
    /// all others are pruned or rejected.
    epoch_bounds: (u64, u64),

    /// The number of indices to process concurrently.
    window: u64,

    /// Number of indices to track below the tip when collecting acks and/or pruning.
    activity_timeout: u64,

    // Messaging
    /// Pool of pending futures to request a digest from the automaton.
    digest_requests: FuturesPool<DigestRequest<D, E>>,

    // State
    /// The current epoch.
    epoch: Epoch,

    /// The current tip.
    tip: Index,

    /// Tracks the tips of all validators.
    safe_tip: SafeTip<P>,

    /// The keys represent the set of all `Index` values for which we are attempting to form a
    /// threshold signature, but do not yet have one. Values may be [Pending::Unverified] or
    /// [Pending::Verified], depending on whether the automaton has verified the digest or not.
    pending: BTreeMap<Index, Pending<V, D>>,

    /// A map of indices to their certificates (threshold signatures), cached in memory in case
    /// they need to be sent to other peers.
    confirmed: BTreeMap<Index, Certificate<V, D>>,

    // ---------- Rebroadcasting ----------
    /// The frequency at which to rebroadcast pending indices.
    rebroadcast_timeout: Duration,

    /// A set of deadlines for rebroadcasting `Index` values that do not have a threshold signature.
    rebroadcast_deadlines: PrioritySet<Index, SystemTime>,

    // ---------- Journal ----------
    /// Journal for storing acks signed by this node.
    journal: Option<Journal<E, Activity<V, D>>>,
    journal_partition: String,
    journal_write_buffer: NonZeroUsize,
    journal_replay_buffer: NonZeroUsize,
    journal_heights_per_section: u64,
    journal_compression: Option<u8>,
    journal_buffer_pool: PoolRef,

    // ---------- Network ----------
    /// Whether to send acks as priority messages.
    priority_acks: bool,

    // ---------- Metrics ----------
    /// Metrics
    metrics: metrics::Metrics<E>,
}

impl<
        E: Clock + Spawner + Storage + Metrics,
        P: PublicKey,
        V: Variant,
        D: Digest,
        A: Automaton<Context = Index, Digest = D> + Clone,
        Z: Reporter<Activity = Activity<V, D>>,
        M: Monitor<Index = Epoch>,
        B: Blocker<PublicKey = P>,
        TSu: ThresholdSupervisor<
            Index = Epoch,
            PublicKey = P,
            Polynomial = Vec<V::Public>,
            Share = group::Share,
        >,
    > Engine<E, P, V, D, A, Z, M, B, TSu>
{
    /// Creates a new engine with the given context and configuration.
    pub fn new(context: E, cfg: Config<P, V, D, A, Z, M, B, TSu>) -> Self {
        let metrics = metrics::Metrics::init(context.clone());

        Self {
            context,
            automaton: cfg.automaton,
            reporter: cfg.reporter,
            monitor: cfg.monitor,
            validators: cfg.validators,
            blocker: cfg.blocker,
            namespace: cfg.namespace,
            epoch_bounds: cfg.epoch_bounds,
            window: cfg.window.into(),
            activity_timeout: cfg.activity_timeout,
            epoch: 0,
            tip: 0,
            safe_tip: SafeTip::default(),
            digest_requests: FuturesPool::default(),
            pending: BTreeMap::new(),
            confirmed: BTreeMap::new(),
            rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
            rebroadcast_deadlines: PrioritySet::new(),
            journal: None,
            journal_partition: cfg.journal_partition,
            journal_write_buffer: cfg.journal_write_buffer,
            journal_replay_buffer: cfg.journal_replay_buffer,
            journal_heights_per_section: cfg.journal_heights_per_section.into(),
            journal_compression: cfg.journal_compression,
            journal_buffer_pool: cfg.journal_buffer_pool,
            priority_acks: cfg.priority_acks,
            metrics,
        }
    }

    /// Runs the engine until the context is stopped.
    ///
    /// The engine will handle:
    /// - Requesting and processing digests from the automaton
    /// - Timeouts
    ///   - Refreshing the Epoch
    ///   - Rebroadcasting Acks
    /// - Messages from the network:
    ///   - Acks from other validators
    pub fn start(
        mut self,
        network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>),
    ) -> Handle<()> {
        self.context.spawn_ref()(self.run(network))
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, network: (impl Sender<PublicKey = P>, impl Receiver<PublicKey = P>)) {
        let (mut sender, mut receiver) = wrap((), network.0, network.1);
        let mut shutdown = self.context.stopped();

        // Initialize the epoch
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Initialize Journal
        let journal_cfg = JConfig {
            partition: self.journal_partition.clone(),
            compression: self.journal_compression,
            codec_config: (),
            buffer_pool: self.journal_buffer_pool.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::init(self.context.with_label("journal"), journal_cfg)
            .await
            .expect("init failed");
        let unverified_indices = self.replay(&journal).await;
        self.journal = Some(journal);

        // Request digests for unverified indices
        for index in unverified_indices {
            trace!(index, "requesting digest for unverified index from replay");
            self.get_digest(index);
        }

        // Initialize the tip manager
        self.safe_tip.init(
            self.validators
                .participants(self.epoch)
                .expect("unknown participants"),
        );

        loop {
            self.metrics.tip.set(self.tip as i64);

            // Propose a new digest if we are processing less than the window
            let next = self.next();
            if next < self.tip + self.window {
                trace!(next, "requesting new digest");
                assert!(self
                    .pending
                    .insert(next, Pending::Unverified(BTreeMap::new()))
                    .is_none());
                self.get_digest(next);
                continue;
            }

            // Get the rebroadcast deadline for the next index
            let rebroadcast = match self.rebroadcast_deadlines.peek() {
                Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };
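            // With no pending deadline, `future::pending()` never resolves, so the rebroadcast
            // arm below never fires.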

            // Process the next event
            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    break;
                },

                // Handle refresh epoch deadline
                epoch = epoch_updates.next() => {
                    // Error handling
                    let Some(epoch) = epoch else {
                        error!("epoch subscription failed");
                        break;
                    };

                    // Refresh the epoch
                    debug!(current=self.epoch, new=epoch, "refresh epoch");
                    assert!(epoch >= self.epoch);
                    self.epoch = epoch;

                    // Update the tip manager
                    self.safe_tip.reconcile(self.validators.participants(epoch).unwrap());

                    // Update data structures by purging old epochs
                    let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
                    self.pending.iter_mut().for_each(|(_, pending)| {
                        match pending {
                            Pending::Unverified(acks) => {
                                acks.retain(|epoch, _| *epoch >= min_epoch);
                            }
                            Pending::Verified(_, acks) => {
                                acks.retain(|epoch, _| *epoch >= min_epoch);
                            }
                        }
                    });

                    continue;
                },

                // Sign a new ack
                request = self.digest_requests.next_completed() => {
                    let DigestRequest { index, result, timer } = request;
                    drop(timer); // Record metric. Explicitly reference timer to avoid lint warning.
                    match result {
                        Err(err) => {
                            warn!(?err, ?index, "automaton returned error");
                            self.metrics.digest.inc(Status::Dropped);
                        }
                        Ok(digest) => {
                            if let Err(err) = self.handle_digest(index, digest, &mut sender).await {
                                debug!(?err, ?index, "handle_digest failed");
                                continue;
                            }
                        }
                    }
                },

                // Handle incoming acks
                msg = receiver.recv() => {
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let TipAck { ack, tip } = match msg {
                        Ok(peer_ack) => peer_ack,
                        Err(err) => {
                            warn!(?err, ?sender, "ack decode failed, blocking peer");
                            self.blocker.block(sender).await;
                            continue;
                        }
                    };

                    // Update the tip manager
                    if self.safe_tip.update(sender.clone(), tip).is_some() {
                        // Fast-forward our tip if needed
                        let safe_tip = self.safe_tip.get();
                        if safe_tip > self.tip {
                            self.fast_forward_tip(safe_tip).await;
                        }
                    }

                    // Validate that we need to process the ack
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        if err.blockable() {
                            warn!(?sender, ?err, "blocking peer for validation failure");
                            self.blocker.block(sender).await;
                        } else {
                            debug!(?sender, ?err, "ack validate failed");
                        }
                        continue;
                    };

                    // Handle the ack
                    if let Err(err) = self.handle_ack(&ack).await {
                        debug!(?err, ?sender, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }

                    // Update the metrics
                    debug!(?sender, epoch=ack.epoch, index=ack.item.index, "ack");
                    guard.set(Status::Success);
                },

                // Rebroadcast
                _ = rebroadcast => {
                    // Get the next index to rebroadcast
                    let (index, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline");
                    trace!("rebroadcasting: index {}", index);
                    if let Err(err) = self.handle_rebroadcast(index, &mut sender).await {
                        warn!(?err, ?index, "rebroadcast failed");
                    };
                }
            }
        }

        // Close journal on shutdown
        if let Some(journal) = self.journal.take() {
            journal
                .close()
                .await
                .expect("unable to close aggregation journal");
        }
    }

    // ---------- Handling ----------

    /// Handles a digest returned by the automaton.
    async fn handle_digest(
        &mut self,
        index: Index,
        digest: D,
        sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
    ) -> Result<(), Error> {
        // Entry must be `Pending::Unverified`, or return early
        if !matches!(self.pending.get(&index), Some(Pending::Unverified(_))) {
            return Err(Error::AckIndex(index));
        };

        // Move the entry to `Pending::Verified`
        let Some(Pending::Unverified(acks)) = self.pending.remove(&index) else {
            panic!("Pending::Unverified entry not found");
        };
        self.pending
            .insert(index, Pending::Verified(digest, BTreeMap::new()));

        // Handle each `ack` as if it was received over the network. This inserts the values into
        // the new map, and may form a threshold signature if enough acks are present.
        // Only process acks that match the verified digest.
        for epoch_acks in acks.values() {
            for epoch_ack in epoch_acks.values() {
                // Drop acks that don't match the verified digest
                if epoch_ack.item.digest != digest {
                    continue;
                }

                // Handle the ack
                let _ = self.handle_ack(epoch_ack).await;
            }
            // Break early if a threshold signature was formed
            if self.confirmed.contains_key(&index) {
                break;
            }
        }

        // Sign my own ack
        let ack = self.sign_ack(index, digest).await?;

        // Set the rebroadcast deadline for this index
        self.rebroadcast_deadlines
            .put(index, self.context.current() + self.rebroadcast_timeout);

        // Handle ack as if it was received over the network
        let _ = self.handle_ack(&ack).await;

        // Send ack over the network.
        self.broadcast(ack, sender).await?;

        Ok(())
    }

    /// Handles an ack
    ///
    /// Returns an error if the ack is invalid, or can be ignored
    /// (e.g. already exists, threshold already exists, is outside the epoch bounds, etc.).
    async fn handle_ack(&mut self, ack: &Ack<V, D>) -> Result<(), Error> {
        // Get the quorum
        let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
            return Err(Error::UnknownEpoch(ack.epoch));
        };
        let quorum = quorum_from_slice(polynomial);

        // Get the acks and check digest consistency
        let acks_by_epoch = match self.pending.get_mut(&ack.item.index) {
            None => {
                // If the index is not in the pending pool, it may be confirmed
                // (i.e. we have a threshold signature for it).
                return Err(Error::AckIndex(ack.item.index));
            }
            Some(Pending::Unverified(acks)) => acks,
            Some(Pending::Verified(digest, acks)) => {
                // If we have a verified digest, ensure the ack matches it
                if ack.item.digest != *digest {
                    return Err(Error::AckDigest(ack.item.index));
                }
                acks
            }
        };

        // Add the partial signature (if not already present)
        let acks = acks_by_epoch.entry(ack.epoch).or_default();
        if acks.contains_key(&ack.signature.index) {
            return Ok(());
        }
        acks.insert(ack.signature.index, ack.clone());

        // If there exists a quorum of acks with the same digest (or for the verified digest if it exists), form a threshold signature
        let partials = acks
            .values()
            .filter(|a| a.item.digest == ack.item.digest)
            .map(|ack| &ack.signature)
            .collect::<Vec<_>>();
        if partials.len() >= (quorum as usize) {
            let item = ack.item.clone();
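            // Recover the group (threshold) signature by interpolating a quorum of partial signatures.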
            let threshold = threshold_signature_recover::<V, _>(quorum, partials)
                .expect("Failed to recover threshold signature");
            self.metrics.threshold.inc();
            self.handle_threshold(item, threshold).await;
        }

        Ok(())
    }

    /// Handles a threshold signature.
    async fn handle_threshold(&mut self, item: Item<D>, threshold: V::Signature) {
        // Check if we already have the threshold
        let index = item.index;
        if self.confirmed.contains_key(&index) {
            return;
        }

        // Store the threshold
        let certificate = Certificate {
            item,
            signature: threshold,
        };
        self.confirmed.insert(index, certificate.clone());

        // Journal and notify the automaton
        let certified = Activity::Certified(certificate);
        self.record(certified.clone()).await;
        self.sync(index).await;
        self.reporter.report(certified).await;

        // Increase the tip if needed
        if index == self.tip {
            // Compute the next tip
            let mut new_tip = index.saturating_add(1);
            while self.confirmed.contains_key(&new_tip) && new_tip < Index::MAX {
                new_tip = new_tip.saturating_add(1);
            }

            // If the next tip is larger, try to fast-forward the tip (may not be possible)
            if new_tip > self.tip {
                self.fast_forward_tip(new_tip).await;
            }
        }
    }

    /// Handles a rebroadcast request for the given index.
    async fn handle_rebroadcast(
        &mut self,
        index: Index,
        sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
    ) -> Result<(), Error> {
        let Some(Pending::Verified(digest, acks)) = self.pending.get(&index) else {
            // The index may already be confirmed; continue silently if so
            return Ok(());
        };

        // Get our signature
        let Some(share) = self.validators.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };
        let ack = acks
            .get(&self.epoch)
            .and_then(|acks| acks.get(&share.index).cloned());
        let ack = match ack {
            Some(ack) => ack,
            None => self.sign_ack(index, *digest).await?,
        };

        // Reinsert the index with a new deadline
        self.rebroadcast_deadlines
            .put(index, self.context.current() + self.rebroadcast_timeout);

        // Broadcast the ack to all peers
        self.broadcast(ack, sender).await
    }

    // ---------- Validation ----------

    /// Takes a raw ack (from `sender`) received over the p2p network and validates it.
    ///
    /// Returns `Ok(())` if the ack is valid and should be processed.
    /// Returns an error if the ack is invalid or can be ignored.
    fn validate_ack(&self, ack: &Ack<V, D>, sender: &P) -> Result<(), Error> {
        // Validate epoch
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
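            // e.g., with epoch 10 and bounds (1, 2): accept epochs 9 through 12 inclusive.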
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // Validate sender
        let Some(sig_index) = self.validators.is_participant(ack.epoch, sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if sig_index != ack.signature.index {
            return Err(Error::PeerMismatch);
        }

        // Reject acks too far below the tip; we keep collecting acks within `activity_timeout`
        // of the tip in case we don't yet have a threshold signature for them
        let activity_threshold = self.tip.saturating_sub(self.activity_timeout);
        if ack.item.index < activity_threshold {
            return Err(Error::AckThresholded(ack.item.index));
        }

        // If the index is above the tip (and the window), ignore for now
        if ack.item.index >= self.tip + self.window {
            return Err(Error::AckIndex(ack.item.index));
        }

        // Validate that we don't already have the ack
        if self.confirmed.contains_key(&ack.item.index) {
            return Err(Error::AckThresholded(ack.item.index));
        }
        let have_ack = match self.pending.get(&ack.item.index) {
            None => false,
            Some(Pending::Unverified(epoch_map)) => epoch_map
                .get(&ack.epoch)
                .is_some_and(|acks| acks.contains_key(&ack.signature.index)),
            Some(Pending::Verified(digest, epoch_map)) => {
                // While we check this in the `handle_ack` function, checking early here avoids an
                // unnecessary signature check.
                if ack.item.digest != *digest {
                    return Err(Error::AckDigest(ack.item.index));
                }
                epoch_map
                    .get(&ack.epoch)
                    .is_some_and(|acks| acks.contains_key(&ack.signature.index))
            }
        };
        if have_ack {
            return Err(Error::AckDuplicate(sender.to_string(), ack.item.index));
        }

        // Validate partial signature
        let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
            return Err(Error::UnknownEpoch(ack.epoch));
        };
        if !ack.verify(&self.namespace, polynomial) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }

    // ---------- Helpers ----------

    /// Requests the digest from the automaton.
    ///
    /// `self.pending` must already contain the index.
    fn get_digest(&mut self, index: Index) {
        assert!(self.pending.contains_key(&index));
        let mut automaton = self.automaton.clone();
        let timer = self.metrics.digest_duration.timer();
        self.digest_requests.push(async move {
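            // `propose` returns a receiver that resolves once the automaton provides the digest
            // (or fails if the request is canceled).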
            let receiver = automaton.propose(index).await;
            let result = receiver.await.map_err(Error::AppProposeCanceled);
            DigestRequest {
                index,
                result,
                timer,
            }
        });
    }

    /// Signs an ack for the given index and digest. Stores the ack in the journal and returns it.
    /// Returns an error if the share is unknown at the current epoch.
    async fn sign_ack(&mut self, index: Index, digest: D) -> Result<Ack<V, D>, Error> {
        let Some(share) = self.validators.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };

        // Sign the item
        let item = Item { index, digest };
        let ack = Ack::sign(&self.namespace, self.epoch, share, item);

        // Journal the ack
        self.record(Activity::Ack(ack.clone())).await;
        self.sync(index).await;

        Ok(ack)
    }

    /// Broadcasts an ack to all peers with the appropriate priority.
    ///
    /// Returns an error if the sender returns an error.
    async fn broadcast(
        &mut self,
        ack: Ack<V, D>,
        sender: &mut WrappedSender<impl Sender<PublicKey = P>, TipAck<V, D>>,
    ) -> Result<(), Error> {
        sender
            .send(
                Recipients::All,
                TipAck { ack, tip: self.tip },
                self.priority_acks,
            )
            .await
            .map_err(|err| {
                warn!(?err, "failed to send ack");
                Error::UnableToSendMessage
            })?;
        Ok(())
    }

    /// Returns the next index that we should process. This is the minimum index for
    /// which we do not have a digest or an outstanding request to the automaton for the digest.
    fn next(&self) -> Index {
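        // One past the highest pending/confirmed index (or 0 if there is none); the next index
        // to process is the maximum of these and the current tip.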
        let max_pending = self
            .pending
            .last_key_value()
            .map(|(k, _)| k.saturating_add(1))
            .unwrap_or_default();
        let max_confirmed = self
            .confirmed
            .last_key_value()
            .map(|(k, _)| k.saturating_add(1))
            .unwrap_or_default();
        max(self.tip, max(max_pending, max_confirmed))
    }

    /// Increases the tip to the given value, pruning stale entries.
    ///
    /// # Panics
    ///
    /// Panics if the given tip is less-than-or-equal-to the current tip.
    async fn fast_forward_tip(&mut self, tip: Index) {
        assert!(tip > self.tip);

        // Prune data structures with buffer to prevent losing certificates
        let activity_threshold = tip.saturating_sub(self.activity_timeout);
        self.pending.retain(|index, _| *index >= activity_threshold);
        self.confirmed
            .retain(|index, _| *index >= activity_threshold);

        // Add tip to journal
        self.record(Activity::Tip(tip)).await;
        self.sync(tip).await;
        self.reporter.report(Activity::Tip(tip)).await;

        // Prune journal with buffer, ignoring errors
        let section = self.get_journal_section(activity_threshold);
        let journal = self.journal.as_mut().expect("journal must be initialized");
        let _ = journal.prune(section).await;

        // Update the tip
        self.tip = tip;
    }

    // ---------- Journal ----------

    /// Returns the section of the journal for the given `index`.
    fn get_journal_section(&self, index: Index) -> u64 {
        index / self.journal_heights_per_section
    }

    /// Replays the journal, updating the state of the engine.
    /// Returns a list of unverified pending indices that need digest requests.
    async fn replay(&mut self, journal: &Journal<E, Activity<V, D>>) -> Vec<Index> {
        let mut tip = Index::default();
        let mut certified = Vec::new();
        let mut acks = Vec::new();
        let stream = journal
            .replay(0, 0, self.journal_replay_buffer)
            .await
            .expect("replay failed");
        pin_mut!(stream);
        while let Some(msg) = stream.next().await {
            let (_, _, _, activity) = msg.expect("replay failed");
            match activity {
                Activity::Tip(index) => {
                    tip = max(tip, index);
                    self.reporter.report(Activity::Tip(index)).await;
                }
                Activity::Certified(certificate) => {
                    certified.push(certificate.clone());
                    self.reporter.report(Activity::Certified(certificate)).await;
                }
                Activity::Ack(ack) => {
                    acks.push(ack.clone());
                    self.reporter.report(Activity::Ack(ack)).await;
                }
            }
        }

        // Update the tip to the highest index in the journal
        self.tip = tip;
        let activity_threshold = tip.saturating_sub(self.activity_timeout);

        // Add certified items
        certified
            .iter()
            .filter(|certificate| certificate.item.index >= activity_threshold)
            .for_each(|certificate| {
                self.confirmed
                    .insert(certificate.item.index, certificate.clone());
            });

        // Group acks by index
        let mut acks_by_index: BTreeMap<Index, Vec<Ack<V, D>>> = BTreeMap::new();
        for ack in acks {
            if ack.item.index >= activity_threshold && !self.confirmed.contains_key(&ack.item.index)
            {
                acks_by_index.entry(ack.item.index).or_default().push(ack);
            }
        }

        // Process each index's acks
        let mut unverified = Vec::new();
        for (index, mut acks_group) in acks_by_index {
            // Check if we have our own ack (which means we've verified the digest)
            let our_share = self.validators.share(self.epoch);
            let our_digest = if let Some(share) = our_share {
                acks_group
                    .iter()
                    .find(|ack| ack.epoch == self.epoch && ack.signature.index == share.index)
                    .map(|ack| ack.item.digest)
            } else {
                None
            };

            // If our_digest exists, delete everything from acks_group that doesn't match it
            if let Some(digest) = our_digest {
                acks_group.retain(|other| other.item.digest == digest);
            }

            // Create a new epoch map
            let mut epoch_map = BTreeMap::new();
            for ack in acks_group {
                epoch_map
                    .entry(ack.epoch)
                    .or_insert_with(BTreeMap::new)
                    .insert(ack.signature.index, ack);
            }

            // Insert as Verified if we have our own ack (meaning we verified the digest),
            // otherwise as Unverified
            match our_digest {
                Some(digest) => {
                    self.pending
                        .insert(index, Pending::Verified(digest, epoch_map));

                    // If we've already generated an ack and it isn't yet confirmed, mark for immediate rebroadcast
                    self.rebroadcast_deadlines
                        .put(index, self.context.current());
                }
                None => {
                    self.pending.insert(index, Pending::Unverified(epoch_map));

                    // Add to unverified indices
                    unverified.push(index);
                }
            }
        }

        // After replay, ensure we have all indices from tip to next in pending or confirmed
        // to handle the case where we restart and some indices have no acks yet
        let next = self.next();
        for index in self.tip..next {
            // If we already have the index in pending or confirmed, skip
            if self.pending.contains_key(&index) || self.confirmed.contains_key(&index) {
                continue;
            }

            // Add missing index to pending
            self.pending
                .insert(index, Pending::Unverified(BTreeMap::new()));
            unverified.push(index);
        }
        info!(self.tip, next, ?unverified, "replayed journal");

        unverified
    }

    /// Appends an activity to the journal.
    async fn record(&mut self, activity: Activity<V, D>) {
        let index = match activity {
            Activity::Ack(ref ack) => ack.item.index,
            Activity::Certified(ref certificate) => certificate.item.index,
            Activity::Tip(index) => index,
        };
        let section = self.get_journal_section(index);
        self.journal
            .as_mut()
            .expect("journal must be initialized")
            .append(section, activity)
            .await
            .expect("unable to append to journal");
    }

    /// Syncs the journal section containing `index`, ensuring all data is written to disk.
    async fn sync(&mut self, index: Index) {
        let section = self.get_journal_section(index);
        let journal = self.journal.as_mut().expect("journal must be initialized");
        journal.sync(section).await.expect("unable to sync journal");
    }
}