commonware_consensus/aggregation/engine.rs

//! Engine for the aggregation module.

use super::{
    metrics,
    safe_tip::SafeTip,
    types::{Ack, Activity, Epoch, Error, Index, Item, TipAck},
    Config,
};
use crate::{Automaton, Monitor, Reporter, ThresholdSupervisor};
use commonware_cryptography::{
    bls12381::primitives::{group, ops::threshold_signature_recover, poly, variant::Variant},
    Digest, PublicKey,
};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Blocker, Receiver, Recipients, Sender,
};
use commonware_runtime::{
    telemetry::metrics::{
        histogram,
        status::{CounterExt, Status},
    },
    Clock, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::variable::{Config as JConfig, Journal};
use commonware_utils::{futures::Pool as FuturesPool, PrioritySet};
use futures::{
    future::{self, Either},
    pin_mut, StreamExt,
};
use std::{
    cmp::max,
    collections::{BTreeMap, HashMap},
    marker::PhantomData,
    time::{Duration, SystemTime},
};
use tracing::{debug, error, trace, warn};

/// An entry for an index that does not yet have a threshold signature.
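///
/// An entry starts as [Pending::Unverified] while the digest is being requested from the
/// automaton and becomes [Pending::Verified] once the digest is known. Once a threshold
/// signature is recovered, the result is stored in `confirmed` and the entry is pruned
/// when the tip advances past its index.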
enum Pending<V: Variant, D: Digest> {
    /// The automaton has not yet provided the digest for this index.
    /// The signatures may have arbitrary digests.
    Unverified(HashMap<Epoch, HashMap<u32, Ack<V, D>>>),

    /// Verified by the automaton. Now stores the digest.
    Verified(D, HashMap<Epoch, HashMap<u32, Ack<V, D>>>),
}

/// The type returned by the `digest_requests` pool, used to deliver the digest that the
/// automaton associates with a given index.
struct DigestRequest<D: Digest, E: Clock> {
    /// The index in question.
    index: Index,

    /// The result of the verification.
    result: Result<D, Error>,

    /// Records the time taken to get the digest.
    timer: histogram::Timer<E>,
}

/// Instance of the engine.
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics,
    P: PublicKey,
    V: Variant,
    D: Digest,
    A: Automaton<Context = Index, Digest = D> + Clone,
    Z: Reporter<Activity = Activity<V, D>>,
    M: Monitor<Index = Epoch>,
    B: Blocker<PublicKey = P>,
    TSu: ThresholdSupervisor<
        Index = Epoch,
        PublicKey = P,
        Share = group::Share,
        Identity = poly::Public<V>,
    >,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    // ---------- Interfaces ----------
    context: E,
    automaton: A,
    monitor: M,
    validators: TSu,
    reporter: Z,
    blocker: B,

    // ---------- Namespace Constants ----------
    /// The namespace used when signing and verifying acks.
    namespace: Vec<u8>,

    // Pruning
    /// A tuple representing the epochs to keep in memory.
    /// The first element is the number of old epochs to keep.
    /// The second element is the number of future epochs to accept.
    ///
    /// For example, if the current epoch is 10, and the bounds are (1, 2), then
    /// epochs 9, 10, 11, and 12 are kept (and accepted);
    /// all others are pruned or rejected.
    epoch_bounds: (u64, u64),

    /// The maximum number of indices to process concurrently.
    window: u64,

    // Messaging
    /// Pool of pending futures to request a digest from the automaton.
    digest_requests: FuturesPool<DigestRequest<D, E>>,

    // State
    /// The current epoch.
    epoch: Epoch,

    /// The current tip.
    tip: Index,

    /// Tracks the tips of all validators.
    safe_tip: SafeTip<P>,

    /// The keys represent the set of all `Index` values for which we are attempting to form a
    /// threshold signature, but do not yet have one. Values may be [Pending::Unverified] or
    /// [Pending::Verified], depending on whether the automaton has verified the digest or not.
    pending: BTreeMap<Index, Pending<V, D>>,

    /// A map of indices with a threshold signature. Cached in memory if needed to send to other peers.
    confirmed: BTreeMap<Index, (D, V::Signature)>,

    // ---------- Rebroadcasting ----------
    /// The frequency at which to rebroadcast pending indices.
    rebroadcast_timeout: Duration,

    /// A set of deadlines for rebroadcasting `Index` values that do not have a threshold signature.
    rebroadcast_deadlines: PrioritySet<Index, SystemTime>,

    // ---------- Journal ----------
    /// Journal for storing acks signed by this node.
    journal: Option<Journal<E, Activity<V, D>>>,
    journal_partition: String,
    journal_write_buffer: usize,
    journal_replay_buffer: usize,
    journal_heights_per_section: u64,
    journal_compression: Option<u8>,

    // ---------- Network ----------
    /// Whether to send acks as priority messages.
    priority_acks: bool,

    /// The network sender and receiver types.
    _phantom: PhantomData<(NetS, NetR)>,

    // ---------- Metrics ----------
    /// Metrics
    metrics: metrics::Metrics<E>,
}

impl<
        E: Clock + Spawner + Storage + Metrics,
        P: PublicKey,
        V: Variant,
        D: Digest,
        A: Automaton<Context = Index, Digest = D> + Clone,
        Z: Reporter<Activity = Activity<V, D>>,
        M: Monitor<Index = Epoch>,
        B: Blocker<PublicKey = P>,
        TSu: ThresholdSupervisor<
            Index = Epoch,
            PublicKey = P,
            Share = group::Share,
            Identity = poly::Public<V>,
        >,
        NetS: Sender<PublicKey = P>,
        NetR: Receiver<PublicKey = P>,
    > Engine<E, P, V, D, A, Z, M, B, TSu, NetS, NetR>
{
    /// Creates a new engine with the given context and configuration.
    pub fn new(context: E, cfg: Config<P, V, D, A, Z, M, B, TSu>) -> Self {
        let metrics = metrics::Metrics::init(context.clone());

        Self {
            context,
            automaton: cfg.automaton,
            reporter: cfg.reporter,
            monitor: cfg.monitor,
            validators: cfg.validators,
            blocker: cfg.blocker,
            namespace: cfg.namespace,
            epoch_bounds: cfg.epoch_bounds,
            window: cfg.window.into(),
            epoch: 0,
            tip: 0,
            safe_tip: SafeTip::default(),
            digest_requests: FuturesPool::default(),
            pending: BTreeMap::new(),
            confirmed: BTreeMap::new(),
            rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
            rebroadcast_deadlines: PrioritySet::new(),
            journal: None,
            journal_partition: cfg.journal_partition,
            journal_write_buffer: cfg.journal_write_buffer.into(),
            journal_replay_buffer: cfg.journal_replay_buffer.into(),
            journal_heights_per_section: cfg.journal_heights_per_section.into(),
            journal_compression: cfg.journal_compression,
            priority_acks: cfg.priority_acks,
            _phantom: PhantomData,
            metrics,
        }
    }

    /// Runs the engine until the context is stopped.
    ///
    /// The engine will handle:
    /// - Requesting and processing digests from the automaton
    /// - Timeouts
    ///   - Refreshing the Epoch
    ///   - Rebroadcasting Acks
    /// - Messages from the network:
    ///   - Acks from other validators
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        self.context.spawn_ref()(self.run(network))
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, (net_sender, net_receiver): (NetS, NetR)) {
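        // Wrap the raw channels in the message codec so that everything sent and received on
        // them is encoded/decoded as a `TipAck` (the `()` is the codec configuration).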
        let (mut net_sender, mut net_receiver) = wrap((), net_sender, net_receiver);
        let mut shutdown = self.context.stopped();

        // Initialize the epoch
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Initialize Journal
        let journal_cfg = JConfig {
            partition: self.journal_partition.clone(),
            compression: self.journal_compression,
            codec_config: (),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::init(self.context.with_label("journal"), journal_cfg)
            .await
            .expect("init failed");
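        // Replaying the journal restores the tip, confirmed thresholds, and pending acks
        // recorded before a restart (see `replay` below).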
        self.replay(&journal).await;
        self.journal = Some(journal);

        // Initialize the tip manager
        self.safe_tip.init(
            self.validators
                .participants(self.epoch)
                .expect("unknown participants"),
        );

        loop {
            self.metrics.tip.set(self.tip as i64);

            // Propose a new digest if we are processing less than the window
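            // (e.g. with tip = 10 and window = 4, digests are requested for indices 10..=13)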
            let next = self.next();
            if next < self.tip + self.window {
                trace!("requesting new digest: index {}", next);
                self.get_digest(next);
                continue;
            }

            // Get the rebroadcast deadline for the next index
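            // (if there are no deadlines, `future::pending()` never resolves, so the
            // rebroadcast arm of the select below simply stays idle)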
            let rebroadcast = match self.rebroadcast_deadlines.peek() {
                Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            // Process the next event
            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    break;
                },

                // Handle refresh epoch deadline
                epoch = epoch_updates.next() => {
                    // Error handling
                    let Some(epoch) = epoch else {
                        error!("epoch subscription failed");
                        break;
                    };

                    // Refresh the epoch
                    debug!(current=self.epoch, new=epoch, "refresh epoch");
                    assert!(epoch >= self.epoch);
                    self.epoch = epoch;

                    // Update the tip manager
                    self.safe_tip.reconcile(self.validators.participants(epoch).unwrap());

                    // Update data structures by purging old epochs
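                    // (e.g. with current epoch 10 and epoch_bounds.0 = 1, acks from epochs
                    // below 9 are purged)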
                    let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
                    self.pending.iter_mut().for_each(|(_, pending)| {
                        match pending {
                            Pending::Unverified(acks) => {
                                acks.retain(|epoch, _| *epoch >= min_epoch);
                            }
                            Pending::Verified(_, acks) => {
                                acks.retain(|epoch, _| *epoch >= min_epoch);
                            }
                        }
                    });

                    continue;
                },

                // Sign a new ack
                request = self.digest_requests.next_completed() => {
                    let DigestRequest { index, result, timer } = request;
                    drop(timer); // Record metric. Explicitly reference timer to avoid lint warning.
                    match result {
                        Err(err) => {
                            warn!(?err, ?index, "automaton returned error");
                            self.metrics.digest.inc(Status::Dropped);
                        }
                        Ok(digest) => {
                            if let Err(err) = self.handle_digest(index, digest, &mut net_sender).await {
                                warn!(?err, ?index, "handle_digest failed");
                                continue;
                            }
                        }
                    }
                },

                // Handle incoming acks
                msg = net_receiver.recv() => {
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let TipAck { ack, tip } = match msg {
                        Ok(peer_ack) => peer_ack,
                        Err(err) => {
                            warn!(?err, ?sender, "ack decode failed, blocking peer");
                            self.blocker.block(sender).await;
                            continue;
                        }
                    };

                    // Update the tip manager
                    if self.safe_tip.update(sender.clone(), tip).is_some() {
                        // Fast-forward our tip if needed
                        let safe_tip = self.safe_tip.get();
                        if safe_tip > self.tip {
                            self.fast_forward_tip(safe_tip).await;
                        }
                    }

                    // Validate that we need to process the ack
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        if err.blockable() {
                            warn!(?sender, ?err, "blocking peer for validation failure");
                            self.blocker.block(sender).await;
                        } else {
                            debug!(?sender, ?err, "ack validate failed");
                        }
                        continue;
                    };

                    // Handle the ack
                    if let Err(err) = self.handle_ack(&ack).await {
                        warn!(?err, ?sender, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }

                    // Update the metrics
                    debug!(?sender, epoch=ack.epoch, index=ack.item.index, "ack");
                    guard.set(Status::Success);
                },

                // Rebroadcast
                _ = rebroadcast => {
                    // Get the next index to rebroadcast
                    let (index, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline");
                    trace!("rebroadcasting: index {}", index);
                    if let Err(err) = self.handle_rebroadcast(index, &mut net_sender).await {
                        warn!(?err, ?index, "rebroadcast failed");
                    };
                }
            }
        }

        // Close journal on shutdown
        if let Some(journal) = self.journal.take() {
            journal
                .close()
                .await
                .expect("unable to close aggregation journal");
        }
    }

    // ---------- Handling ----------

    /// Handles a digest returned by the automaton.
    async fn handle_digest(
        &mut self,
        index: Index,
        digest: D,
        sender: &mut WrappedSender<NetS, TipAck<V, D>>,
    ) -> Result<(), Error> {
        // Entry must be `Pending::Unverified`, or return early
        if !matches!(self.pending.get(&index), Some(Pending::Unverified(_))) {
            return Err(Error::AckIndex(index));
        };

        // Move the entry to `Pending::Verified`
        let Some(Pending::Unverified(acks)) = self.pending.remove(&index) else {
            panic!("Pending::Unverified entry not found");
        };
        self.pending
            .insert(index, Pending::Verified(digest, HashMap::new()));

        // Handle each `ack` as if it was received over the network. This inserts the values into
        // the new map, and may form a threshold signature if enough acks are present.
        for epoch_acks in acks.values() {
            for epoch_ack in epoch_acks.values() {
                let _ = self.handle_ack(epoch_ack).await; // Ignore any errors (e.g. invalid signature)
            }
            // Break early if a threshold signature was formed
            if self.confirmed.contains_key(&index) {
                break;
            }
        }

        // Sign my own ack
        let ack = self.sign_ack(index, digest).await?;

        // Set the rebroadcast deadline for this index
        self.set_rebroadcast_deadline(index);

        // Handle ack as if it was received over the network
        let _ = self.handle_ack(&ack).await; // Ignore any errors (e.g. threshold already exists)

        // Send ack over the network.
        self.broadcast(ack, sender).await?;

        Ok(())
    }

    /// Handles an ack
    ///
    /// Returns an error if the ack is invalid, or can be ignored
    /// (e.g. already exists, threshold already exists, is outside the epoch bounds, etc.).
    async fn handle_ack(&mut self, ack: &Ack<V, D>) -> Result<(), Error> {
        // Get the quorum
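        // (`required()` is the minimum number of partial signatures needed to recover a
        // threshold signature under the shared identity)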
        let quorum = self.validators.identity().required();

        // Get the acks
        let acks_by_epoch = match self.pending.get_mut(&ack.item.index) {
            None => {
                // If the index is not in the pending pool, it may be confirmed
                // (i.e. we have a threshold signature for it).
                return Err(Error::AckIndex(ack.item.index));
            }
            Some(Pending::Unverified(acks)) => acks,
            Some(Pending::Verified(_, acks)) => acks,
        };

        // We already checked that we don't have the ack

        // Add the partial signature
        let acks = acks_by_epoch.entry(ack.epoch).or_default();
        if acks.contains_key(&ack.signature.index) {
            return Ok(());
        }
        acks.insert(ack.signature.index, ack.clone());

        // If a new threshold is formed, handle it
        if acks.len() >= (quorum as usize) {
            let item = ack.item.clone();
            let partials = acks.values().map(|ack| &ack.signature);
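            // Recovery interpolates the partials into a single signature; each partial was
            // already verified when its ack was validated (or was produced locally).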
            let threshold = threshold_signature_recover::<V, _>(quorum, partials)
                .expect("Failed to recover threshold signature");
            self.metrics.threshold.inc();
            self.handle_threshold(item, threshold).await;
        }

        Ok(())
    }

    /// Handles a threshold signature.
    async fn handle_threshold(&mut self, item: Item<D>, threshold: V::Signature) {
        // Check if we already have the threshold
        let index = item.index;
        if self.confirmed.contains_key(&index) {
            return;
        }

        // Store the threshold
        self.confirmed.insert(index, (item.digest, threshold));

        // Journal and notify the reporter
        let recovered = Activity::Recovered(item, threshold);
        self.record(recovered.clone()).await;
        self.sync(index).await;
        self.reporter.report(recovered).await;

        // Increase the tip if needed
        if index == self.tip {
            // Compute the next tip
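            // (e.g. if the tip was 5 and thresholds already exist for 6 and 7, the next tip is 8)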
            let mut new_tip = index.saturating_add(1);
            while self.confirmed.contains_key(&new_tip) && new_tip < Index::MAX {
                new_tip = new_tip.saturating_add(1);
            }

            // If the next tip is larger, try to fast-forward the tip (may not be possible)
            if new_tip > self.tip {
                self.fast_forward_tip(new_tip).await;
            }
        }
    }

    /// Handles a rebroadcast request for the given index.
    async fn handle_rebroadcast(
        &mut self,
        index: Index,
        sender: &mut WrappedSender<NetS, TipAck<V, D>>,
    ) -> Result<(), Error> {
        let Some(Pending::Verified(digest, acks)) = self.pending.get(&index) else {
            // The index may already be confirmed; continue silently if so
            return Ok(());
        };

        // Get our signature
        let Some(share) = self.validators.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };
        let ack = acks
            .get(&self.epoch)
            .and_then(|acks| acks.get(&share.index).cloned());
        let ack = match ack {
            Some(ack) => ack,
            None => self.sign_ack(index, *digest).await?,
        };

        // Reinsert the index with a new deadline
        self.set_rebroadcast_deadline(index);

        // Broadcast the ack to all peers
        self.broadcast(ack, sender).await
    }

    // ---------- Validation ----------

    /// Takes a raw ack received from `sender` over the p2p network and validates it.
    ///
    /// Returns `Ok(())` if the ack should be processed.
    /// Returns an error if the ack is invalid or can be ignored.
    fn validate_ack(&self, ack: &Ack<V, D>, sender: &P) -> Result<(), Error> {
        // Validate epoch
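        // (e.g. with current epoch 10 and epoch_bounds (1, 2), only epochs 9..=12 are accepted)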
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // Validate sender
        let Some(sig_index) = self.validators.is_participant(ack.epoch, sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if sig_index != ack.signature.index {
            return Err(Error::PeerMismatch);
        }

        // Validate height
        if ack.item.index < self.tip {
            return Err(Error::AckThresholded(ack.item.index));
        }
        if ack.item.index >= self.tip + self.window {
            return Err(Error::AckIndex(ack.item.index));
        }

        // Validate that we don't already have the ack
        if self.confirmed.contains_key(&ack.item.index) {
            return Err(Error::AckThresholded(ack.item.index));
        }
        let have_ack = match self.pending.get(&ack.item.index) {
            None => false,
            Some(Pending::Unverified(epoch_map)) => epoch_map
                .get(&ack.epoch)
                .is_some_and(|acks| acks.contains_key(&ack.signature.index)),
            Some(Pending::Verified(_, epoch_map)) => epoch_map
                .get(&ack.epoch)
                .is_some_and(|acks| acks.contains_key(&ack.signature.index)),
        };
        if have_ack {
            return Err(Error::AckDuplicate(sender.to_string(), ack.item.index));
        }

        // Validate partial signature
        if !ack.verify(&self.namespace, self.validators.identity()) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }

    // ---------- Helpers ----------

    /// Sets `index` as pending and requests the digest from the automaton.
    fn get_digest(&mut self, index: Index) {
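        // The run loop only requests indices it has never seen (see `next`), so this insert
        // must never replace an existing entry.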
        assert!(self
            .pending
            .insert(index, Pending::Unverified(HashMap::new()))
            .is_none());

        let mut automaton = self.automaton.clone();
        let timer = self.metrics.digest_duration.timer();
        self.digest_requests.push(async move {
            let receiver = automaton.propose(index).await;
            let result = receiver.await.map_err(Error::AppProposeCanceled);
            DigestRequest {
                index,
                result,
                timer,
            }
        });
    }

    /// Sets the rebroadcast deadline for the given `index`.
    fn set_rebroadcast_deadline(&mut self, index: Index) {
        self.rebroadcast_deadlines
            .put(index, self.context.current() + self.rebroadcast_timeout);
    }

    /// Signs an ack for the given index and digest. Stores the ack in the journal and returns it.
    /// Returns an error if the share is unknown at the current epoch.
    async fn sign_ack(&mut self, index: Index, digest: D) -> Result<Ack<V, D>, Error> {
        let Some(share) = self.validators.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };

        // Sign the item
        let item = Item { index, digest };
        let ack = Ack::sign(&self.namespace, self.epoch, share, item);

        // Journal the ack
        self.record(Activity::Ack(ack.clone())).await;
        self.sync(index).await;

        Ok(ack)
    }

    /// Broadcasts an ack to all peers with the appropriate priority.
    ///
    /// Returns an error if the sender returns an error.
    async fn broadcast(
        &mut self,
        ack: Ack<V, D>,
        sender: &mut WrappedSender<NetS, TipAck<V, D>>,
    ) -> Result<(), Error> {
        sender
            .send(
                Recipients::All,
                TipAck { ack, tip: self.tip },
                self.priority_acks,
            )
            .await
            .map_err(|err| {
                warn!(?err, "failed to send ack");
                Error::UnableToSendMessage
            })?;
        Ok(())
    }

    /// Returns the next index that we should process. This is the minimum index for
    /// which we do not have a digest or an outstanding request to the automaton for the digest.
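    ///
    /// For example, if the tip is 10, the highest pending index is 14, and the highest
    /// confirmed index is 12, the next index to request is 15.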
    fn next(&self) -> Index {
        let max_pending = self
            .pending
            .last_key_value()
            .map(|(k, _)| k.saturating_add(1))
            .unwrap_or_default();
        let max_confirmed = self
            .confirmed
            .last_key_value()
            .map(|(k, _)| k.saturating_add(1))
            .unwrap_or_default();
        max(self.tip, max(max_pending, max_confirmed))
    }

    /// Increases the tip to the given value, pruning stale entries.
    ///
    /// # Panics
    ///
    /// Panics if the given tip is less-than-or-equal-to the current tip.
    async fn fast_forward_tip(&mut self, tip: Index) {
        assert!(tip > self.tip);

        // Prune data structures
        self.pending.retain(|index, _| *index >= tip);
        self.confirmed.retain(|index, _| *index >= tip);

        // Add tip to journal
        self.record(Activity::Tip(tip)).await;
        self.sync(tip).await;
        self.reporter.report(Activity::Tip(tip)).await;

        // Prune journal, ignoring errors
        let section = self.get_journal_section(tip);
        let journal = self.journal.as_mut().expect("journal must be initialized");
        let _ = journal.prune(section).await;

        // Update the tip
        self.tip = tip;
    }

    // ---------- Journal ----------

    /// Returns the section of the journal for the given `index`.
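    ///
    /// For example, with `journal_heights_per_section = 10`, indices 0..=9 map to section 0
    /// and index 42 maps to section 4.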
    fn get_journal_section(&self, index: Index) -> u64 {
        index / self.journal_heights_per_section
    }

    /// Replays the journal, updating the state of the engine.
    async fn replay(&mut self, journal: &Journal<E, Activity<V, D>>) {
        let mut tip = Index::default();
        let mut recovered = Vec::new();
        let mut acks = Vec::new();
        let stream = journal
            .replay(self.journal_replay_buffer)
            .await
            .expect("replay failed");
        pin_mut!(stream);
        while let Some(msg) = stream.next().await {
            let (_, _, _, activity) = msg.expect("replay failed");
            match activity {
                Activity::Tip(index) => {
                    tip = max(tip, index);
                }
                Activity::Recovered(item, signature) => {
                    recovered.push((item, signature));
                }
                Activity::Ack(ack) => {
                    acks.push(ack);
                }
            }
        }
        // Update the tip to the highest index in the journal
        self.tip = tip;
        // Add recovered signatures
        recovered
            .iter()
            .filter(|(item, _)| item.index >= tip)
            .for_each(|(item, signature)| {
                self.confirmed.insert(item.index, (item.digest, *signature));
            });
        // Add any acks that haven't resulted in a threshold signature
        acks = acks
            .into_iter()
            .filter(|ack| ack.item.index >= tip && !self.confirmed.contains_key(&ack.item.index))
            .collect::<Vec<_>>();
        acks.iter().for_each(|ack| {
            assert!(self
                .pending
                .insert(
                    ack.item.index,
                    Pending::Verified(
                        ack.item.digest,
                        HashMap::from([(
                            ack.epoch,
                            HashMap::from([(ack.signature.index, ack.clone())]),
                        )]),
                    ),
                )
                .is_none());
            self.set_rebroadcast_deadline(ack.item.index);
        });
    }

    /// Appends an activity to the journal.
    async fn record(&mut self, activity: Activity<V, D>) {
        let index = match activity {
            Activity::Ack(ref ack) => ack.item.index,
            Activity::Recovered(ref item, _) => item.index,
            Activity::Tip(index) => index,
        };
        let section = self.get_journal_section(index);
        self.journal
            .as_mut()
            .expect("journal must be initialized")
            .append(section, activity)
            .await
            .expect("unable to append to journal");
    }

    /// Syncs (ensures all data is written to disk).
    async fn sync(&mut self, index: Index) {
        let section = self.get_journal_section(index);
        let journal = self.journal.as_mut().expect("journal must be initialized");
        journal.sync(section).await.expect("unable to sync journal");
    }
}