commonware_consensus/aggregation/engine.rs

//! Engine for the aggregation module.

use super::{
    metrics,
    safe_tip::SafeTip,
    types::{Ack, Activity, Epoch, Error, Index, Item, TipAck},
    Config,
};
use crate::{Automaton, Monitor, Reporter, ThresholdSupervisor};
use commonware_cryptography::{
    bls12381::primitives::{group, ops::threshold_signature_recover, poly, variant::Variant},
    Digest, PublicKey,
};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Blocker, Receiver, Recipients, Sender,
};
use commonware_runtime::{
    buffer::PoolRef,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, Status},
    },
    Clock, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::variable::{Config as JConfig, Journal};
use commonware_utils::{futures::Pool as FuturesPool, PrioritySet};
use futures::{
    future::{self, Either},
    pin_mut, StreamExt,
};
use std::{
    cmp::max,
    collections::{BTreeMap, HashMap},
    marker::PhantomData,
    num::NonZeroUsize,
    time::{Duration, SystemTime},
};
use tracing::{debug, error, trace, warn};
/// An entry for an index that does not yet have a threshold signature.
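///
/// An index enters `pending` as [`Pending::Unverified`] when its digest is
/// requested from the automaton (`get_digest`), becomes [`Pending::Verified`]
/// once the automaton returns the digest (`handle_digest`), and leaves
/// `pending` for `confirmed` once a threshold signature is recovered
/// (`handle_threshold`).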
enum Pending<V: Variant, D: Digest> {
    /// The automaton has not yet provided the digest for this index.
    /// The acks stored here may therefore be for arbitrary digests.
    Unverified(HashMap<Epoch, HashMap<u32, Ack<V, D>>>),

    /// Verified by the automaton. Now stores the digest.
    Verified(D, HashMap<Epoch, HashMap<u32, Ack<V, D>>>),
}

/// The type returned by the `digest_requests` pool, used to deliver the digest
/// that the automaton associates with the given index.
struct DigestRequest<D: Digest, E: Clock> {
    /// The index in question.
    index: Index,

    /// The digest returned by the automaton (or an error if the request failed).
    result: Result<D, Error>,

    /// Records the time taken to get the digest.
    timer: histogram::Timer<E>,
}

/// Instance of the engine.
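///
/// Constructed with [`Engine::new`] and driven by [`Engine::start`]. A
/// minimal sketch (assuming a configured runtime `context`, a [`Config`]
/// `cfg`, and a pair of network channels):
///
/// ```ignore
/// let engine = Engine::new(context, cfg);
/// let handle = engine.start((net_sender, net_receiver));
/// ```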
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics,
    P: PublicKey,
    V: Variant,
    D: Digest,
    A: Automaton<Context = Index, Digest = D> + Clone,
    Z: Reporter<Activity = Activity<V, D>>,
    M: Monitor<Index = Epoch>,
    B: Blocker<PublicKey = P>,
    TSu: ThresholdSupervisor<
        Index = Epoch,
        PublicKey = P,
        Share = group::Share,
        Identity = poly::Public<V>,
    >,
    NetS: Sender<PublicKey = P>,
    NetR: Receiver<PublicKey = P>,
> {
    // ---------- Interfaces ----------
    context: E,
    automaton: A,
    monitor: M,
    validators: TSu,
    reporter: Z,
    blocker: B,

    // ---------- Namespace Constants ----------
    /// The namespace used when signing (and verifying) acks.
    namespace: Vec<u8>,

    // ---------- Pruning ----------
    /// A tuple representing the epochs to keep in memory.
    /// The first element is the number of old epochs to keep.
    /// The second element is the number of future epochs to accept.
    ///
    /// For example, if the current epoch is 10 and the bounds are (1, 2), then
    /// epochs 9, 10, 11, and 12 are kept (and accepted);
    /// all others are pruned or rejected.
    epoch_bounds: (u64, u64),

    /// The number of indices to process concurrently.
    window: u64,

    // ---------- Messaging ----------
    /// Pool of pending futures to request a digest from the automaton.
    digest_requests: FuturesPool<DigestRequest<D, E>>,

    // ---------- State ----------
    /// The current epoch.
    epoch: Epoch,

    /// The current tip.
    tip: Index,

    /// Tracks the tips of all validators.
    safe_tip: SafeTip<P>,

    /// The keys represent the set of all `Index` values for which we are attempting to form a
    /// threshold signature, but do not yet have one. Values may be [Pending::Unverified] or
    /// [Pending::Verified], depending on whether the automaton has verified the digest or not.
    pending: BTreeMap<Index, Pending<V, D>>,

    /// A map of indices with a threshold signature, cached in memory in case
    /// they need to be sent to other peers.
    confirmed: BTreeMap<Index, (D, V::Signature)>,

    // ---------- Rebroadcasting ----------
    /// The frequency at which to rebroadcast pending indices.
    rebroadcast_timeout: Duration,

    /// A set of deadlines for rebroadcasting `Index` values that do not have a threshold signature.
    rebroadcast_deadlines: PrioritySet<Index, SystemTime>,

    // ---------- Journal ----------
    /// Journal for storing acks signed by this node.
    journal: Option<Journal<E, Activity<V, D>>>,
    journal_partition: String,
    journal_write_buffer: NonZeroUsize,
    journal_replay_buffer: NonZeroUsize,
    journal_heights_per_section: u64,
    journal_compression: Option<u8>,
    journal_buffer_pool: PoolRef,

    // ---------- Network ----------
    /// Whether to send acks as priority messages.
    priority_acks: bool,

    /// The network sender and receiver types.
    _phantom: PhantomData<(NetS, NetR)>,

    // ---------- Metrics ----------
    /// Metrics for the engine.
    metrics: metrics::Metrics<E>,
}

impl<
        E: Clock + Spawner + Storage + Metrics,
        P: PublicKey,
        V: Variant,
        D: Digest,
        A: Automaton<Context = Index, Digest = D> + Clone,
        Z: Reporter<Activity = Activity<V, D>>,
        M: Monitor<Index = Epoch>,
        B: Blocker<PublicKey = P>,
        TSu: ThresholdSupervisor<
            Index = Epoch,
            PublicKey = P,
            Share = group::Share,
            Identity = poly::Public<V>,
        >,
        NetS: Sender<PublicKey = P>,
        NetR: Receiver<PublicKey = P>,
    > Engine<E, P, V, D, A, Z, M, B, TSu, NetS, NetR>
{
    /// Creates a new engine with the given context and configuration.
    pub fn new(context: E, cfg: Config<P, V, D, A, Z, M, B, TSu>) -> Self {
        let metrics = metrics::Metrics::init(context.clone());

        Self {
            context,
            automaton: cfg.automaton,
            reporter: cfg.reporter,
            monitor: cfg.monitor,
            validators: cfg.validators,
            blocker: cfg.blocker,
            namespace: cfg.namespace,
            epoch_bounds: cfg.epoch_bounds,
            window: cfg.window.into(),
            epoch: 0,
            tip: 0,
            safe_tip: SafeTip::default(),
            digest_requests: FuturesPool::default(),
            pending: BTreeMap::new(),
            confirmed: BTreeMap::new(),
            rebroadcast_timeout: cfg.rebroadcast_timeout.into(),
            rebroadcast_deadlines: PrioritySet::new(),
            journal: None,
            journal_partition: cfg.journal_partition,
            journal_write_buffer: cfg.journal_write_buffer,
            journal_replay_buffer: cfg.journal_replay_buffer,
            journal_heights_per_section: cfg.journal_heights_per_section.into(),
            journal_compression: cfg.journal_compression,
            journal_buffer_pool: cfg.journal_buffer_pool,
            priority_acks: cfg.priority_acks,
            _phantom: PhantomData,
            metrics,
        }
    }

    /// Runs the engine until the context is stopped.
    ///
    /// The engine will handle:
    /// - Requesting and processing digests from the automaton
    /// - Timeouts
    ///   - Refreshing the Epoch
    ///   - Rebroadcasting Acks
    /// - Messages from the network:
    ///   - Acks from other validators
    pub fn start(mut self, network: (NetS, NetR)) -> Handle<()> {
        self.context.spawn_ref()(self.run(network))
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, (net_sender, net_receiver): (NetS, NetR)) {
        let (mut net_sender, mut net_receiver) = wrap((), net_sender, net_receiver);
        let mut shutdown = self.context.stopped();

        // Initialize the epoch
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Initialize the journal
        let journal_cfg = JConfig {
            partition: self.journal_partition.clone(),
            compression: self.journal_compression,
            codec_config: (),
            buffer_pool: self.journal_buffer_pool.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::init(self.context.with_label("journal"), journal_cfg)
            .await
            .expect("init failed");
        self.replay(&journal).await;
        self.journal = Some(journal);

        // Initialize the tip manager
        self.safe_tip.init(
            self.validators
                .participants(self.epoch)
                .expect("unknown participants"),
        );

        loop {
            self.metrics.tip.set(self.tip as i64);

            // Propose a new digest if we are processing less than the window
            let next = self.next();
            if next < self.tip + self.window {
                trace!("requesting new digest: index {}", next);
                self.get_digest(next);
                continue;
            }

            // Get the rebroadcast deadline for the next index
            let rebroadcast = match self.rebroadcast_deadlines.peek() {
                Some((_, &deadline)) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            // Process the next event
            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    break;
                },

                // Handle an epoch update from the monitor
                epoch = epoch_updates.next() => {
                    // Error handling
                    let Some(epoch) = epoch else {
                        error!("epoch subscription failed");
                        break;
                    };

                    // Refresh the epoch
                    debug!(current=self.epoch, new=epoch, "refresh epoch");
                    assert!(epoch >= self.epoch);
                    self.epoch = epoch;

                    // Update the tip manager
                    self.safe_tip.reconcile(self.validators.participants(epoch).unwrap());

                    // Update data structures by purging old epochs
                    let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
                    self.pending.iter_mut().for_each(|(_, pending)| {
                        match pending {
                            Pending::Unverified(acks) => {
                                acks.retain(|epoch, _| *epoch >= min_epoch);
                            }
                            Pending::Verified(_, acks) => {
                                acks.retain(|epoch, _| *epoch >= min_epoch);
                            }
                        }
                    });
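                    // Acks beyond the upper epoch bound are rejected on
                    // receipt (see `validate_ack`), so only the lower bound
                    // must be enforced here.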

                    continue;
                },

                // Handle a digest returned by the automaton
                request = self.digest_requests.next_completed() => {
                    let DigestRequest { index, result, timer } = request;
                    drop(timer); // Record metric. Explicitly reference timer to avoid lint warning.
                    match result {
                        Err(err) => {
                            warn!(?err, ?index, "automaton returned error");
                            self.metrics.digest.inc(Status::Dropped);
                        }
                        Ok(digest) => {
                            if let Err(err) = self.handle_digest(index, digest, &mut net_sender).await {
                                warn!(?err, ?index, "handle_digest failed");
                                continue;
                            }
                        }
                    }
                },

                // Handle incoming acks
                msg = net_receiver.recv() => {
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let TipAck { ack, tip } = match msg {
                        Ok(peer_ack) => peer_ack,
                        Err(err) => {
                            warn!(?err, ?sender, "ack decode failed, blocking peer");
                            self.blocker.block(sender).await;
                            continue;
                        }
                    };

                    // Update the tip manager
                    if self.safe_tip.update(sender.clone(), tip).is_some() {
                        // Fast-forward our tip if needed
                        let safe_tip = self.safe_tip.get();
                        if safe_tip > self.tip {
                            self.fast_forward_tip(safe_tip).await;
                        }
                    }

                    // Validate that we need to process the ack
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        if err.blockable() {
                            warn!(?sender, ?err, "blocking peer for validation failure");
                            self.blocker.block(sender).await;
                        } else {
                            debug!(?sender, ?err, "ack validate failed");
                        }
                        continue;
                    };

                    // Handle the ack
                    if let Err(err) = self.handle_ack(&ack).await {
                        warn!(?err, ?sender, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }

                    // Update the metrics
                    debug!(?sender, epoch=ack.epoch, index=ack.item.index, "ack");
                    guard.set(Status::Success);
                },

                // Rebroadcast
                _ = rebroadcast => {
                    // Get the next index to rebroadcast
                    let (index, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline");
                    trace!("rebroadcasting: index {}", index);
                    if let Err(err) = self.handle_rebroadcast(index, &mut net_sender).await {
                        warn!(?err, ?index, "rebroadcast failed");
                    };
                }
            }
        }

        // Close journal on shutdown
        if let Some(journal) = self.journal.take() {
            journal
                .close()
                .await
                .expect("unable to close aggregation journal");
        }
    }

    // ---------- Handling ----------

    /// Handles a digest returned by the automaton.
    async fn handle_digest(
        &mut self,
        index: Index,
        digest: D,
        sender: &mut WrappedSender<NetS, TipAck<V, D>>,
    ) -> Result<(), Error> {
        // Entry must be `Pending::Unverified`, or return early
        if !matches!(self.pending.get(&index), Some(Pending::Unverified(_))) {
            return Err(Error::AckIndex(index));
        };

        // Move the entry to `Pending::Verified`
        let Some(Pending::Unverified(acks)) = self.pending.remove(&index) else {
            panic!("Pending::Unverified entry not found");
        };
        self.pending
            .insert(index, Pending::Verified(digest, HashMap::new()));

        // Handle each `ack` as if it was received over the network. This inserts the values into
        // the new map, and may form a threshold signature if enough acks are present.
        for epoch_acks in acks.values() {
            for epoch_ack in epoch_acks.values() {
                let _ = self.handle_ack(epoch_ack).await; // Ignore any errors (e.g. invalid signature)
            }
            // Break early if a threshold signature was formed
            if self.confirmed.contains_key(&index) {
                break;
            }
        }

        // Sign my own ack
        let ack = self.sign_ack(index, digest).await?;

        // Set the rebroadcast deadline for this index
        self.set_rebroadcast_deadline(index);

        // Handle ack as if it was received over the network
        let _ = self.handle_ack(&ack).await; // Ignore any errors (e.g. threshold already exists)

        // Send ack over the network.
        self.broadcast(ack, sender).await?;

        Ok(())
    }

    /// Handles an ack.
    ///
    /// Returns an error if the ack is invalid or can be ignored
    /// (e.g. already exists, threshold already exists, is outside the epoch bounds, etc.).
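    ///
    /// If this ack provides the `quorum`-th distinct partial signature for the
    /// item at its epoch, a threshold signature is recovered and handled.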
    async fn handle_ack(&mut self, ack: &Ack<V, D>) -> Result<(), Error> {
        // Get the quorum
        let quorum = self.validators.identity().required();

        // Get the acks
        let acks_by_epoch = match self.pending.get_mut(&ack.item.index) {
            None => {
                // If the index is not in the pending pool, it may be confirmed
                // (i.e. we have a threshold signature for it).
                return Err(Error::AckIndex(ack.item.index));
            }
            Some(Pending::Unverified(acks)) => acks,
            Some(Pending::Verified(_, acks)) => acks,
        };

        // Add the partial signature, skipping duplicates. (Network acks are
        // de-duplicated in `validate_ack`, but this method is also called with
        // locally-signed and previously-stored acks.)
        let acks = acks_by_epoch.entry(ack.epoch).or_default();
        if acks.contains_key(&ack.signature.index) {
            return Ok(());
        }
        acks.insert(ack.signature.index, ack.clone());

        // If a new threshold is formed, handle it
        if acks.len() >= (quorum as usize) {
            let item = ack.item.clone();
            let partials = acks.values().map(|ack| &ack.signature);
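            // Recovery should succeed: at least `quorum` distinct partial
            // signatures are present, and each was verified (or locally
            // signed) before insertion.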
            let threshold = threshold_signature_recover::<V, _>(quorum, partials)
                .expect("Failed to recover threshold signature");
            self.metrics.threshold.inc();
            self.handle_threshold(item, threshold).await;
        }

        Ok(())
    }

    /// Handles a threshold signature.
    async fn handle_threshold(&mut self, item: Item<D>, threshold: V::Signature) {
        // Check if we already have the threshold
        let index = item.index;
        if self.confirmed.contains_key(&index) {
            return;
        }

        // Store the threshold
        self.confirmed.insert(index, (item.digest, threshold));

        // Journal and notify the reporter
        let recovered = Activity::Recovered(item, threshold);
        self.record(recovered.clone()).await;
        self.sync(index).await;
        self.reporter.report(recovered).await;

        // Increase the tip if needed
        if index == self.tip {
            // Compute the next tip
            let mut new_tip = index.saturating_add(1);
            while self.confirmed.contains_key(&new_tip) && new_tip < Index::MAX {
                new_tip = new_tip.saturating_add(1);
            }

            // If the next tip is larger, try to fast-forward the tip (may not be possible)
            if new_tip > self.tip {
                self.fast_forward_tip(new_tip).await;
            }
        }
    }

    /// Handles a rebroadcast request for the given index.
    async fn handle_rebroadcast(
        &mut self,
        index: Index,
        sender: &mut WrappedSender<NetS, TipAck<V, D>>,
    ) -> Result<(), Error> {
        let Some(Pending::Verified(digest, acks)) = self.pending.get(&index) else {
            // The index may already be confirmed; continue silently if so
            return Ok(());
        };

        // Get our signature
        let Some(share) = self.validators.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };
        let ack = acks
            .get(&self.epoch)
            .and_then(|acks| acks.get(&share.index).cloned());
        let ack = match ack {
            Some(ack) => ack,
            None => self.sign_ack(index, *digest).await?,
        };

        // Reinsert the index with a new deadline
        self.set_rebroadcast_deadline(index);

        // Broadcast the ack to all peers
        self.broadcast(ack, sender).await
    }

    // ---------- Validation ----------

    /// Takes a raw ack (from `sender`) from the p2p network and validates it.
    ///
    /// Returns `Ok(())` if the ack should be processed.
    /// Returns an error if the ack is invalid or can be ignored.
    fn validate_ack(&self, ack: &Ack<V, D>, sender: &P) -> Result<(), Error> {
        // Validate epoch
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // Validate sender
        let Some(sig_index) = self.validators.is_participant(ack.epoch, sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if sig_index != ack.signature.index {
            return Err(Error::PeerMismatch);
        }

        // Validate height
        if ack.item.index < self.tip {
            return Err(Error::AckThresholded(ack.item.index));
        }
        if ack.item.index >= self.tip + self.window {
            return Err(Error::AckIndex(ack.item.index));
        }

        // Validate that we don't already have the ack
        if self.confirmed.contains_key(&ack.item.index) {
            return Err(Error::AckThresholded(ack.item.index));
        }
        let have_ack = match self.pending.get(&ack.item.index) {
            None => false,
            Some(Pending::Unverified(epoch_map)) => epoch_map
                .get(&ack.epoch)
                .is_some_and(|acks| acks.contains_key(&ack.signature.index)),
            Some(Pending::Verified(_, epoch_map)) => epoch_map
                .get(&ack.epoch)
                .is_some_and(|acks| acks.contains_key(&ack.signature.index)),
        };
        if have_ack {
            return Err(Error::AckDuplicate(sender.to_string(), ack.item.index));
        }

        // Validate partial signature
        if !ack.verify(&self.namespace, self.validators.identity()) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }

    // ---------- Helpers ----------

    /// Sets `index` as pending and requests the digest from the automaton.
    fn get_digest(&mut self, index: Index) {
        assert!(self
            .pending
            .insert(index, Pending::Unverified(HashMap::new()))
            .is_none());

        let mut automaton = self.automaton.clone();
        let timer = self.metrics.digest_duration.timer();
        self.digest_requests.push(async move {
            let receiver = automaton.propose(index).await;
            let result = receiver.await.map_err(Error::AppProposeCanceled);
            DigestRequest {
                index,
                result,
                timer,
            }
        });
    }

    /// Sets the rebroadcast deadline for the given `index`.
    fn set_rebroadcast_deadline(&mut self, index: Index) {
        self.rebroadcast_deadlines
            .put(index, self.context.current() + self.rebroadcast_timeout);
    }

    /// Signs an ack for the given index and digest. Stores the ack in the journal and returns it.
    /// Returns an error if the share is unknown at the current epoch.
    async fn sign_ack(&mut self, index: Index, digest: D) -> Result<Ack<V, D>, Error> {
        let Some(share) = self.validators.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };

        // Sign the item
        let item = Item { index, digest };
        let ack = Ack::sign(&self.namespace, self.epoch, share, item);

        // Journal the ack
        self.record(Activity::Ack(ack.clone())).await;
        self.sync(index).await;

        Ok(ack)
    }

    /// Broadcasts an ack to all peers with the appropriate priority.
    ///
    /// Returns an error if the sender returns an error.
    async fn broadcast(
        &mut self,
        ack: Ack<V, D>,
        sender: &mut WrappedSender<NetS, TipAck<V, D>>,
    ) -> Result<(), Error> {
        sender
            .send(
                Recipients::All,
                TipAck { ack, tip: self.tip },
                self.priority_acks,
            )
            .await
            .map_err(|err| {
                warn!(?err, "failed to send ack");
                Error::UnableToSendMessage
            })?;
        Ok(())
    }

    /// Returns the next index that we should process. This is the minimum index for
    /// which we do not have a digest or an outstanding request to the automaton for the digest.
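    ///
    /// For example, if the tip is 5, the highest pending index is 7, and the
    /// highest confirmed index is 6, this returns 8.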
    fn next(&self) -> Index {
        let max_pending = self
            .pending
            .last_key_value()
            .map(|(k, _)| k.saturating_add(1))
            .unwrap_or_default();
        let max_confirmed = self
            .confirmed
            .last_key_value()
            .map(|(k, _)| k.saturating_add(1))
            .unwrap_or_default();
        max(self.tip, max(max_pending, max_confirmed))
    }

    /// Increases the tip to the given value, pruning stale entries.
    ///
    /// # Panics
    ///
    /// Panics if the given tip is less than or equal to the current tip.
    async fn fast_forward_tip(&mut self, tip: Index) {
        assert!(tip > self.tip);

        // Prune data structures
        self.pending.retain(|index, _| *index >= tip);
        self.confirmed.retain(|index, _| *index >= tip);

        // Add tip to journal
        self.record(Activity::Tip(tip)).await;
        self.sync(tip).await;
        self.reporter.report(Activity::Tip(tip)).await;

        // Prune journal, ignoring errors
        let section = self.get_journal_section(tip);
        let journal = self.journal.as_mut().expect("journal must be initialized");
        let _ = journal.prune(section).await;

        // Update the tip
        self.tip = tip;
    }

    // ---------- Journal ----------

    /// Returns the section of the journal for the given `index`.
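    ///
    /// For example, with `journal_heights_per_section` set to 10, indices
    /// 0 through 9 map to section 0 and indices 10 through 19 to section 1.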
    fn get_journal_section(&self, index: Index) -> u64 {
        index / self.journal_heights_per_section
    }

    /// Replays the journal, updating the state of the engine.
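    ///
    /// Restores the highest journaled tip, any recovered threshold signatures
    /// at or above that tip, and any journaled acks (at or above the tip) that
    /// have not yet resulted in a threshold signature.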
    async fn replay(&mut self, journal: &Journal<E, Activity<V, D>>) {
        let mut tip = Index::default();
        let mut recovered = Vec::new();
        let mut acks = Vec::new();
        let stream = journal
            .replay(self.journal_replay_buffer)
            .await
            .expect("replay failed");
        pin_mut!(stream);
        while let Some(msg) = stream.next().await {
            let (_, _, _, activity) = msg.expect("replay failed");
            match activity {
                Activity::Tip(index) => {
                    tip = max(tip, index);
                }
                Activity::Recovered(item, signature) => {
                    recovered.push((item, signature));
                }
                Activity::Ack(ack) => {
                    acks.push(ack);
                }
            }
        }

        // Update the tip to the highest index in the journal
        self.tip = tip;

        // Add recovered signatures
        recovered
            .iter()
            .filter(|(item, _)| item.index >= tip)
            .for_each(|(item, signature)| {
                self.confirmed.insert(item.index, (item.digest, *signature));
            });

        // Add any acks that haven't resulted in a threshold signature
        acks = acks
            .into_iter()
            .filter(|ack| ack.item.index >= tip && !self.confirmed.contains_key(&ack.item.index))
            .collect::<Vec<_>>();
        acks.iter().for_each(|ack| {
            assert!(self
                .pending
                .insert(
                    ack.item.index,
                    Pending::Verified(
                        ack.item.digest,
                        HashMap::from([(
                            ack.epoch,
                            HashMap::from([(ack.signature.index, ack.clone())]),
                        )]),
                    ),
                )
                .is_none());
            self.set_rebroadcast_deadline(ack.item.index);
        });
    }

    /// Appends an activity to the journal.
    async fn record(&mut self, activity: Activity<V, D>) {
        let index = match activity {
            Activity::Ack(ref ack) => ack.item.index,
            Activity::Recovered(ref item, _) => item.index,
            Activity::Tip(index) => index,
        };
        let section = self.get_journal_section(index);
        self.journal
            .as_mut()
            .expect("journal must be initialized")
            .append(section, activity)
            .await
            .expect("unable to append to journal");
    }

    /// Syncs the journal section containing `index` (ensuring its data is
    /// written to disk).
    async fn sync(&mut self, index: Index) {
        let section = self.get_journal_section(index);
        let journal = self.journal.as_mut().expect("journal must be initialized");
        journal.sync(section).await.expect("unable to sync journal");
    }
}