commonware_consensus/ordered_broadcast/engine.rs

//! Engine for the module.
//!
//! It is responsible for:
//! - Proposing nodes (if a sequencer)
//! - Signing chunks (if a validator)
//! - Tracking the latest chunk in each sequencer's chain
//! - Assembling certificates from votes for each chunk
//! - Notifying other actors of new chunks and certificates
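//!
//! A minimal wiring sketch (hypothetical; the concrete runtime context, `Config`
//! contents, and p2p channel types are application-specific):
//!
//! ```ignore
//! // Construct the engine from a runtime context and a fully-populated `Config`.
//! let engine = Engine::new(context, config);
//!
//! // Start the engine with two p2p channels: one for `Node` messages (chunks) and
//! // one for `Ack` messages (votes). Each is a (`Sender`, `Receiver`) pair.
//! let handle = engine.start((node_sender, node_receiver), (ack_sender, ack_receiver));
//! ```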

use super::{
    metrics, scheme,
    types::{
        Ack, Activity, Chunk, ChunkSigner, ChunkVerifier, Context, Error, Lock, Node, Parent,
        Proposal, SequencersProvider,
    },
    AckManager, Config, TipManager,
};
use crate::{
    types::{Epoch, EpochDelta, Height, HeightDelta},
    Automaton, Monitor, Relay, Reporter,
};
use commonware_codec::Encode;
use commonware_cryptography::{
    certificate::{Provider, Scheme},
    Digest, PublicKey, Signer,
};
use commonware_macros::select_loop;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Receiver, Recipients, Sender,
};
use commonware_parallel::Strategy;
use commonware_runtime::{
    buffer::paged::CacheRef,
    spawn_cell,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, GaugeExt, Status},
    },
    Clock, ContextCell, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::segmented::variable::{Config as JournalConfig, Journal};
use commonware_utils::{channel::oneshot, futures::Pool as FuturesPool, ordered::Quorum};
use futures::{
    future::{self, Either},
    pin_mut, StreamExt,
};
use rand_core::CryptoRngCore;
use std::{
    collections::BTreeMap,
    num::{NonZeroU64, NonZeroUsize},
    time::{Duration, SystemTime},
};
use tracing::{debug, error, info, warn};

/// Represents a pending verification request to the automaton.
struct Verify<C: PublicKey, D: Digest, E: Clock> {
    timer: histogram::Timer<E>,
    context: Context<C>,
    payload: D,
    result: Result<bool, Error>,
}

/// Instance of the engine.
pub struct Engine<
    E: Clock + Spawner + CryptoRngCore + Storage + Metrics,
    C: Signer,
    S: SequencersProvider<PublicKey = C::PublicKey>,
    P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D>>,
    D: Digest,
    A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
    R: Relay<Digest = D>,
    Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
    M: Monitor<Index = Epoch>,
    T: Strategy,
> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    context: ContextCell<E>,
    sequencer_signer: Option<ChunkSigner<C>>,
    sequencers_provider: S,
    validators_provider: P,
    automaton: A,
    relay: R,
    monitor: M,
    reporter: Z,
    strategy: T,

    ////////////////////////////////////////
    // Namespace Constants
    ////////////////////////////////////////

    // Verifier for chunk signatures.
    chunk_verifier: ChunkVerifier,

    ////////////////////////////////////////
    // Timeouts
    ////////////////////////////////////////

    // The configured timeout for rebroadcasting a chunk to all validators
    rebroadcast_timeout: Duration,
    rebroadcast_deadline: Option<SystemTime>,

    ////////////////////////////////////////
    // Pruning
    ////////////////////////////////////////

    // A tuple representing the epochs to keep in memory.
    // The first element is the number of old epochs to keep.
    // The second element is the number of future epochs to accept.
    //
    // For example, if the current epoch is 10, and the bounds are (1, 2), then
    // epochs 9, 10, 11, and 12 are kept (and accepted);
    // all others are pruned or rejected.
    epoch_bounds: (EpochDelta, EpochDelta),

    // The number of future heights to accept acks for.
    // This is used to prevent spam of acks for arbitrary heights.
    //
    // For example, if the current tip for a sequencer is at height 100,
    // and the height_bound is 10, then acks for heights 100-110 are accepted.
    height_bound: HeightDelta,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////

    // A stream of futures.
    //
    // Each future represents a verification request to the automaton
    // that will either timeout or resolve with a boolean.
    //
    // There is no limit to the number of futures in this pool, so the automaton
    // can apply backpressure by dropping the verification requests if necessary.
    pending_verifies: FuturesPool<Verify<C::PublicKey, D, E>>,

    ////////////////////////////////////////
    // Storage
    ////////////////////////////////////////

    // The number of heights per each journal section.
    journal_heights_per_section: NonZeroU64,

    // The number of bytes to buffer when replaying a journal.
    journal_replay_buffer: NonZeroUsize,

    // The size of the write buffer to use for each blob in the journal.
    journal_write_buffer: NonZeroUsize,

    // A prefix for the journal names.
    // The rest of the name is the hex-encoded public key of the relevant sequencer.
    journal_name_prefix: String,

    // Compression level for the journal.
    journal_compression: Option<u8>,

    // Page cache for the journal.
    journal_page_cache: CacheRef,

    // A map of sequencer public keys to their journals.
    #[allow(clippy::type_complexity)]
    journals: BTreeMap<C::PublicKey, Journal<E, Node<C::PublicKey, P::Scheme, D>>>,

    ////////////////////////////////////////
    // State
    ////////////////////////////////////////

    // Tracks the current tip for each sequencer.
    // The tip is a `Node` which is comprised of a `Chunk` and,
    // if not the genesis chunk for that sequencer,
    // a certificate over the parent chunk.
    tip_manager: TipManager<C::PublicKey, P::Scheme, D>,

    // Tracks the acknowledgements for chunks.
    // This is comprised of votes or certificates.
    ack_manager: AckManager<C::PublicKey, P::Scheme, D>,

    // The current epoch.
    epoch: Epoch,

    ////////////////////////////////////////
    // Network
    ////////////////////////////////////////

    // Whether to send proposals as priority messages.
    priority_proposals: bool,

    // Whether to send acks as priority messages.
    priority_acks: bool,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////

    // Metrics
    metrics: metrics::Metrics<E>,

    // The timer of my last new proposal
    propose_timer: Option<histogram::Timer<E>>,
}

impl<
        E: Clock + Spawner + CryptoRngCore + Storage + Metrics,
        C: Signer,
        S: SequencersProvider<PublicKey = C::PublicKey>,
        P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D, PublicKey = C::PublicKey>>,
        D: Digest,
        A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
        R: Relay<Digest = D>,
        Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
        M: Monitor<Index = Epoch>,
        T: Strategy,
    > Engine<E, C, S, P, D, A, R, Z, M, T>
{
    /// Creates a new engine with the given context and configuration.
    pub fn new(context: E, cfg: Config<C, S, P, D, A, R, Z, M, T>) -> Self {
        // TODO(#1833): Metrics should use the post-start context
        let metrics = metrics::Metrics::init(context.clone());

        Self {
            context: ContextCell::new(context),
            sequencer_signer: cfg.sequencer_signer,
            sequencers_provider: cfg.sequencers_provider,
            validators_provider: cfg.validators_provider,
            automaton: cfg.automaton,
            relay: cfg.relay,
            reporter: cfg.reporter,
            monitor: cfg.monitor,
            strategy: cfg.strategy,
            chunk_verifier: cfg.chunk_verifier,
            rebroadcast_timeout: cfg.rebroadcast_timeout,
            rebroadcast_deadline: None,
            epoch_bounds: cfg.epoch_bounds,
            height_bound: cfg.height_bound,
            pending_verifies: FuturesPool::default(),
            journal_heights_per_section: cfg.journal_heights_per_section,
            journal_replay_buffer: cfg.journal_replay_buffer,
            journal_write_buffer: cfg.journal_write_buffer,
            journal_name_prefix: cfg.journal_name_prefix,
            journal_compression: cfg.journal_compression,
            journal_page_cache: cfg.journal_page_cache,
            journals: BTreeMap::new(),
            tip_manager: TipManager::<C::PublicKey, P::Scheme, D>::new(),
            ack_manager: AckManager::<C::PublicKey, P::Scheme, D>::new(),
            epoch: Epoch::zero(),
            priority_proposals: cfg.priority_proposals,
            priority_acks: cfg.priority_acks,
            metrics,
            propose_timer: None,
        }
    }

    /// Runs the engine until the context is stopped.
    ///
    /// The engine will handle:
    /// - Requesting and processing proposals from the application
    /// - Timeouts
    ///   - Refreshing the Epoch
    ///   - Rebroadcasting Proposals
    /// - Messages from the network:
    ///   - Nodes
    ///   - Acks
    pub fn start(
        mut self,
        chunk_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        ack_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(chunk_network, ack_network).await)
    }

    /// Inner run loop called by `start`.
    async fn run(
        mut self,
        chunk_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        ack_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) {
        let mut node_sender = chunk_network.0;
        let mut node_receiver = chunk_network.1;
        let (mut ack_sender, mut ack_receiver) = wrap((), ack_network.0, ack_network.1);

        // Tracks if there is an outstanding proposal request to the automaton.
        let mut pending: Option<(Context<C::PublicKey>, oneshot::Receiver<D>)> = None;

        // Initialize the epoch
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Before entering the main loop, initialize my own sequencer journal
        // and attempt to rebroadcast if necessary.
        if let Some(ref signer) = self.sequencer_signer {
            self.journal_prepare(&signer.public_key()).await;
            if let Err(err) = self.rebroadcast(&mut node_sender).await {
                // Rebroadcasting may return a non-critical error, so log the error and continue.
                info!(?err, "initial rebroadcast failed");
            }
        }

        select_loop! {
            self.context,
            on_start => {
                // Request a new proposal if necessary
                if pending.is_none() {
                    if let Some(context) = self.should_propose() {
                        let receiver = self.automaton.propose(context.clone()).await;
                        pending = Some((context, receiver));
                    }
                }

                // Create deadline futures.
                //
                // If the deadline is None, the future will never resolve.
                let rebroadcast = match self.rebroadcast_deadline {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };
                let propose = match &mut pending {
                    Some((_context, receiver)) => Either::Left(receiver),
                    None => Either::Right(futures::future::pending()),
                };
            },
            on_stopped => {
                debug!("shutdown");
            },
            // Handle epoch updates from the monitor
            Some(epoch) = epoch_updates.recv() else {
                error!("epoch subscription failed");
                break;
            } => {
                // Refresh the epoch
                debug!(current = %self.epoch, new = %epoch, "refresh epoch");
                assert!(epoch >= self.epoch);
                self.epoch = epoch;
                continue;
            },

            // Handle rebroadcast deadline
            _ = rebroadcast => {
                if let Some(ref signer) = self.sequencer_signer {
                    debug!(epoch = %self.epoch, sender = ?signer.public_key(), "rebroadcast");
                    if let Err(err) = self.rebroadcast(&mut node_sender).await {
                        info!(?err, "rebroadcast failed");
                        continue;
                    }
                }
            },

            // Propose a new chunk
            receiver = propose => {
                // Clear the pending proposal
                let (context, _) = pending.take().unwrap();
                debug!(height = %context.height, "propose");

                // Error handling for dropped proposals
                let Ok(payload) = receiver else {
                    warn!(?context, "automaton dropped proposal");
                    continue;
                };

                // Propose the chunk
                if let Err(err) = self
                    .propose(context.clone(), payload, &mut node_sender)
                    .await
                {
                    warn!(?err, ?context, "propose new failed");
                    continue;
                }
            },

            // Handle incoming nodes
            msg = node_receiver.recv() => {
                // Error handling
                let (sender, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        error!(?err, "node receiver failed");
                        break;
                    }
                };
                let mut guard = self.metrics.nodes.guard(Status::Invalid);

                // Decode using staged decoding with epoch-aware certificate bounds
                let node = match Node::read_staged(&mut msg.as_ref(), &self.validators_provider) {
                    Ok(node) => node,
                    Err(err) => {
                        debug!(?err, ?sender, "node decode failed");
                        continue;
                    }
                };
                let result = match self.validate_node(&node, &sender) {
                    Ok(result) => result,
                    Err(err) => {
                        debug!(?err, ?sender, "node validate failed");
                        continue;
                    }
                };

                // Initialize journal for sequencer if it does not exist
                self.journal_prepare(&sender).await;

                // Handle the parent certificate
                if let Some(parent_chunk) = result {
                    let parent = node.parent.as_ref().unwrap();
                    self.handle_certificate(
                        &parent_chunk,
                        parent.epoch,
                        parent.certificate.clone(),
                    )
                    .await;
                }

                // Process the node
                //
                // Note, this node may be a duplicate. If it is, we will attempt to verify it and vote
                // on it again (our original vote may have been lost).
                self.handle_node(&node).await;
                debug!(?sender, height = %node.chunk.height, "node");
                guard.set(Status::Success);
            },

            // Handle incoming acks
            msg = ack_receiver.recv() => {
                // Error handling
                let (sender, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        warn!(?err, "ack receiver failed");
                        break;
                    }
                };
                let mut guard = self.metrics.acks.guard(Status::Invalid);
                let ack = match msg {
                    Ok(ack) => ack,
                    Err(err) => {
                        debug!(?err, ?sender, "ack decode failed");
                        continue;
                    }
                };
                if let Err(err) = self.validate_ack(&ack, &sender) {
                    debug!(?err, ?sender, "ack validate failed");
                    continue;
                };
                if let Err(err) = self.handle_ack(&ack).await {
                    debug!(?err, ?sender, "ack handle failed");
                    guard.set(Status::Failure);
                    continue;
                }
                debug!(?sender, epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "ack");
                guard.set(Status::Success);
            },

            // Handle completed verification futures.
            verify = self.pending_verifies.next_completed() => {
                let Verify {
                    timer,
                    context,
                    payload,
                    result,
                } = verify;
                drop(timer); // Record metric. Explicitly reference timer to avoid lint warning.
                match result {
                    Err(err) => {
                        warn!(?err, ?context, "verified returned error");
                        self.metrics.verify.inc(Status::Dropped);
                    }
                    Ok(false) => {
                        debug!(?context, "verified was false");
                        self.metrics.verify.inc(Status::Failure);
                    }
                    Ok(true) => {
                        debug!(?context, "verified");
                        self.metrics.verify.inc(Status::Success);
                        if let Err(err) = self
                            .handle_app_verified(&context, &payload, &mut ack_sender)
                            .await
                        {
                            debug!(?err, ?context, ?payload, "verified handle failed");
                        }
                    }
                }
            },
        }

        // Sync and drop all journals, regardless of how we exit the loop
        self.pending_verifies.cancel_all();
        while let Some((_, journal)) = self.journals.pop_first() {
            journal.sync_all().await.expect("unable to sync journal");
        }
    }

    ////////////////////////////////////////
    // Handling
    ////////////////////////////////////////

    /// Handles a verified message from the automaton.
    ///
    /// This is called when the automaton has verified a payload.
    /// The chunk will be signed if it matches the current tip.
    async fn handle_app_verified(
        &mut self,
        context: &Context<C::PublicKey>,
        payload: &D,
        ack_sender: &mut WrappedSender<
            impl Sender<PublicKey = C::PublicKey>,
            Ack<C::PublicKey, P::Scheme, D>,
        >,
    ) -> Result<(), Error> {
        // Get the tip
        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
            return Err(Error::AppVerifiedNoTip);
        };

        // Return early if the height does not match
        if tip.chunk.height != context.height {
            return Err(Error::AppVerifiedHeightMismatch);
        }

        // Return early if the payload does not match
        if tip.chunk.payload != *payload {
            return Err(Error::AppVerifiedPayloadMismatch);
        }

        // Emit the activity
        self.reporter
            .report(Activity::Tip(Proposal::new(
                tip.chunk.clone(),
                tip.signature.clone(),
            )))
            .await;

        // Get the validator scheme for the current epoch
        let Some(scheme) = self.validators_provider.scoped(self.epoch) else {
            return Err(Error::UnknownScheme(self.epoch));
        };

        // Construct vote (if a validator)
        let Some(ack) = Ack::sign(scheme.as_ref(), tip.chunk.clone(), self.epoch) else {
            return Err(Error::NotSigner(self.epoch));
        };

        // Sync the journal to prevent ever acking two conflicting chunks at
        // the same height, even if the node crashes and restarts.
        self.journal_sync(&context.sequencer, context.height).await;

        // The recipients are all the validators in the epoch and the sequencer.
        // The sequencer may or may not be a validator.
        let recipients = {
            let validators = scheme.participants();
            let mut recipients = validators.iter().cloned().collect::<Vec<_>>();
            if !validators.iter().any(|v| v == &tip.chunk.sequencer) {
                recipients.push(tip.chunk.sequencer.clone());
            }
            recipients
        };

        // Handle the ack internally
        self.handle_ack(&ack).await?;

        // Send the ack to the network
        ack_sender
            .send(Recipients::Some(recipients), ack, self.priority_acks)
            .await
            .map_err(|_| Error::UnableToSendMessage)?;

        Ok(())
    }

    /// Handles a certificate, either received from a `Node` from the network or generated locally.
    ///
    /// The certificate must already be verified.
    /// If the certificate is new, it is stored and the proof is emitted to the committer.
    /// If the certificate is already known, it is ignored.
    async fn handle_certificate(
        &mut self,
        chunk: &Chunk<C::PublicKey, D>,
        epoch: Epoch,
        certificate: <P::Scheme as Scheme>::Certificate,
    ) {
        // Set the certificate, returning early if it already exists
        if !self.ack_manager.add_certificate(
            &chunk.sequencer,
            chunk.height,
            epoch,
            certificate.clone(),
        ) {
            return;
        }

        // If the certificate is for my sequencer, record metric
        if let Some(ref signer) = self.sequencer_signer {
            if chunk.sequencer == signer.public_key() {
                self.propose_timer.take();
            }
        }

        // Emit the activity
        self.reporter
            .report(Activity::Lock(Lock::new(chunk.clone(), epoch, certificate)))
            .await;
    }

    /// Handles an ack
    ///
    /// Returns an error if the ack is invalid, or can be ignored
    /// (e.g. already exists, certificate already exists, is outside the epoch bounds, etc.).
    async fn handle_ack(&mut self, ack: &Ack<C::PublicKey, P::Scheme, D>) -> Result<(), Error> {
        // Get the scheme for the ack's epoch
        let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
            return Err(Error::UnknownScheme(ack.epoch));
        };

        // Add the vote. If a new certificate is formed, handle it.
        if let Some(certificate) = self
            .ack_manager
            .add_ack(ack, scheme.as_ref(), &self.strategy)
        {
            debug!(epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "recovered certificate");
            self.metrics.certificates.inc();
            self.handle_certificate(&ack.chunk, ack.epoch, certificate)
                .await;
        }

        Ok(())
    }

    /// Handles a valid `Node` message, storing it as the tip.
    /// Alerts the automaton of the new node.
    /// Also appends the `Node` to the journal if it's new.
    async fn handle_node(&mut self, node: &Node<C::PublicKey, P::Scheme, D>) {
        // Store the tip
        let is_new = self.tip_manager.put(node);

        // If the node has a higher height than the previous tip...
        if is_new {
            // Update metrics for sequencer height
            let _ = self
                .metrics
                .sequencer_heights
                .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
                .try_set(node.chunk.height.get());

            // Append to journal if the `Node` is new, making sure to sync the journal
            // to prevent sending two conflicting chunks to the automaton, even if
            // the node crashes and restarts.
            self.journal_append(node.clone()).await;
            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
                .await;
        }

        // Verify the chunk with the automaton
        let context = Context {
            sequencer: node.chunk.sequencer.clone(),
            height: node.chunk.height,
        };
        let payload = node.chunk.payload;
        let mut automaton = self.automaton.clone();
        let timer = self.metrics.verify_duration.timer();
        self.pending_verifies.push(async move {
            let receiver = automaton.verify(context.clone(), payload).await;
            let result = receiver.await.map_err(Error::AppVerifyCanceled);
            Verify {
                timer,
                context,
                payload,
                result,
            }
        });
    }

    ////////////////////////////////////////
    // Proposing
    ////////////////////////////////////////

    /// Returns a `Context` if the engine should request a proposal from the automaton.
    ///
    /// Should only be called if the engine is not already waiting for a proposal.
    fn should_propose(&self) -> Option<Context<C::PublicKey>> {
        // Return `None` if we don't have a sequencer signer
        let me = self.sequencer_signer.as_ref()?.public_key();

        // Return `None` if I am not a sequencer in the current epoch
        self.sequencers_provider
            .sequencers(self.epoch)?
            .position(&me)?;

        // Return the next context unless my current tip has no certificate
        match self.tip_manager.get(&me) {
            None => Some(Context {
                sequencer: me,
                height: Height::zero(),
            }),
            Some(tip) => self
                .ack_manager
                .get_certificate(&me, tip.chunk.height)
                .map(|_| Context {
                    sequencer: me,
                    height: tip.chunk.height.next(),
                }),
        }
    }

    /// Propose a new chunk to the network.
    ///
    /// The proposal is only successful if the parent Chunk and certificate are known.
    async fn propose(
        &mut self,
        context: Context<C::PublicKey>,
        payload: D,
        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.propose.guard(Status::Dropped);
        let signer = self
            .sequencer_signer
            .as_mut()
            .ok_or(Error::IAmNotASequencer(self.epoch))?;
        let me = signer.public_key();

        // Error-check context sequencer
        if context.sequencer != me {
            return Err(Error::ContextSequencer);
        }

        // Error-check that I am a sequencer in the current epoch
        self.sequencers_provider
            .sequencers(self.epoch)
            .and_then(|s| s.position(&me))
            .ok_or(Error::IAmNotASequencer(self.epoch))?;

        // Get parent Chunk and certificate
        let mut height = Height::zero();
        let mut parent = None;
        if let Some(tip) = self.tip_manager.get(&me) {
            // Get certificate, or, if it doesn't exist, return an error
            let Some((epoch, certificate)) =
                self.ack_manager.get_certificate(&me, tip.chunk.height)
            else {
                return Err(Error::MissingCertificate);
            };

            // Update height and parent
            height = tip.chunk.height.next();
            parent = Some(Parent::new(tip.chunk.payload, epoch, certificate.clone()));
        }

        // Error-check context height
        if context.height != height {
            return Err(Error::ContextHeight);
        }

        // Construct new node
        let node = Node::sign(signer, height, payload, parent);

        // Deal with the chunk as if it were received over the network
        self.handle_node(&node).await;

        // Sync the journal to prevent ever proposing two conflicting chunks
        // at the same height, even if the node crashes and restarts
        self.journal_sync(&me, height).await;

        // Record the start time of the proposal
        self.propose_timer = Some(self.metrics.e2e_duration.timer());

        // Broadcast to network
        if let Err(err) = self.broadcast(node, node_sender, self.epoch).await {
            guard.set(Status::Failure);
            return Err(err);
        };

        // Return success
        guard.set(Status::Success);
        Ok(())
    }

    /// Attempt to rebroadcast the highest-height chunk of this sequencer to all validators.
    ///
    /// This is only done if:
    /// - this instance is a sequencer in the current epoch.
    /// - this instance has a chunk to rebroadcast.
    /// - this instance has not yet collected the certificate for the chunk.
    async fn rebroadcast(
        &mut self,
        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);

        // Unset the rebroadcast deadline
        self.rebroadcast_deadline = None;

        // Return if we don't have a sequencer signer
        let signer = self
            .sequencer_signer
            .as_ref()
            .ok_or(Error::IAmNotASequencer(self.epoch))?;
        let me = signer.public_key();

        // Return if not a sequencer in the current epoch
        self.sequencers_provider
            .sequencers(self.epoch)
            .and_then(|s| s.position(&me))
            .ok_or(Error::IAmNotASequencer(self.epoch))?;

        // Return if no chunk to rebroadcast
        let Some(tip) = self.tip_manager.get(&me) else {
            return Err(Error::NothingToRebroadcast);
        };

        // Return if certificate already collected
        if self
            .ack_manager
            .get_certificate(&me, tip.chunk.height)
            .is_some()
        {
            return Err(Error::AlreadyCertified);
        }

        // Broadcast the message, which resets the rebroadcast deadline
        guard.set(Status::Failure);
        self.broadcast(tip, node_sender, self.epoch).await?;
        guard.set(Status::Success);
        Ok(())
    }

    /// Send a `Node` message to all validators in the given epoch.
    async fn broadcast(
        &mut self,
        node: Node<C::PublicKey, P::Scheme, D>,
        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
        epoch: Epoch,
    ) -> Result<(), Error> {
        // Get the scheme for the epoch to access validators
        let Some(scheme) = self.validators_provider.scoped(epoch) else {
            return Err(Error::UnknownScheme(epoch));
        };
        let validators = scheme.participants();

        // Tell the relay to broadcast the full data
        self.relay.broadcast(node.chunk.payload).await;

        // Send the node to all validators
        node_sender
            .send(
                Recipients::Some(validators.iter().cloned().collect()),
                node.encode(),
                self.priority_proposals,
            )
            .await
            .map_err(|_| Error::BroadcastFailed)?;

        // Set the rebroadcast deadline
        self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);

        Ok(())
    }

    ////////////////////////////////////////
    // Validation
    ////////////////////////////////////////

    /// Takes a raw `Node` (from sender) from the p2p network and validates it.
    ///
    /// If valid (and not already the tracked tip for the sender), returns the implied
    /// parent chunk and its certificate.
    /// Else returns an error if the `Node` is invalid.
    fn validate_node(
        &mut self,
        node: &Node<C::PublicKey, P::Scheme, D>,
        sender: &C::PublicKey,
    ) -> Result<Option<Chunk<C::PublicKey, D>>, Error> {
        // Verify the sender
        if node.chunk.sequencer != *sender {
            return Err(Error::PeerMismatch);
        }

        // Optimization: If the node is exactly equal to the tip,
        // don't perform further validation.
        if let Some(tip) = self.tip_manager.get(sender) {
            if tip == *node {
                return Ok(None);
            }
        }

        // Validate chunk
        self.validate_chunk(&node.chunk, self.epoch)?;

        // Verify the node
        node.verify(
            &mut self.context,
            &self.chunk_verifier,
            &self.validators_provider,
            &self.strategy,
        )
    }

    /// Takes a raw ack (from sender) from the p2p network and validates it.
    ///
    /// Returns `Ok(())` if the ack is valid.
    /// Returns an error if the ack is invalid.
    fn validate_ack(
        &mut self,
        ack: &Ack<C::PublicKey, P::Scheme, D>,
        sender: &<P::Scheme as Scheme>::PublicKey,
    ) -> Result<(), Error> {
        // Validate chunk
        self.validate_chunk(&ack.chunk, ack.epoch)?;

        // Get the scheme for the epoch to validate the sender
        let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
            return Err(Error::UnknownScheme(ack.epoch));
        };

        // Validate sender is a participant and matches the vote signer
        let participants = scheme.participants();
        let Some(index) = participants.index(sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if index != ack.attestation.signer {
            return Err(Error::PeerMismatch);
        }

        // Spam prevention: If the ack is for an epoch that is too old or too new, ignore.
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // Spam prevention: If the ack is for a height that is too old or too new, ignore.
        {
            let bound_lo = self
                .tip_manager
                .get(&ack.chunk.sequencer)
                .map(|t| t.chunk.height)
                .unwrap_or(Height::zero());
            let bound_hi = bound_lo.saturating_add(self.height_bound);
            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
                return Err(Error::AckHeightOutsideBounds(
                    ack.chunk.height,
                    bound_lo,
                    bound_hi,
                ));
            }
        }

        // Validate the vote signature
        if !ack.verify(&mut self.context, scheme.as_ref(), &self.strategy) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }

    /// Takes a raw chunk from the p2p network and validates it against the epoch.
    ///
    /// Returns `Ok(())` if the chunk is valid.
    /// Returns an error if the chunk is invalid.
    fn validate_chunk(&self, chunk: &Chunk<C::PublicKey, D>, epoch: Epoch) -> Result<(), Error> {
        // Verify sequencer
        if self
            .sequencers_provider
            .sequencers(epoch)
            .and_then(|s| s.position(&chunk.sequencer))
            .is_none()
        {
            return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
        }

        // Verify height
        if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
            // Height must be at least the tip height
            match chunk.height.cmp(&tip.chunk.height) {
                std::cmp::Ordering::Less => {
                    return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
                }
                std::cmp::Ordering::Equal => {
                    // Ensure this matches the tip if the height is the same
                    if tip.chunk.payload != chunk.payload {
                        return Err(Error::ChunkMismatch(
                            chunk.sequencer.to_string(),
                            chunk.height,
                        ));
                    }
                }
                std::cmp::Ordering::Greater => {}
            }
        }

        Ok(())
    }

    ////////////////////////////////////////
    // Journal
    ////////////////////////////////////////

    /// Returns the section of the journal for the given height.
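    ///
    /// For example, with 10 heights per section (an illustrative value), heights 0-9
    /// map to section 0 and heights 10-19 map to section 1 (integer division).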
    const fn get_journal_section(&self, height: Height) -> u64 {
        height.get() / self.journal_heights_per_section.get()
    }

    /// Ensures the journal exists and is initialized for the given sequencer.
    /// If the journal does not exist, it is created and replayed.
    /// Else, no action is taken.
    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
        // Return early if the journal already exists
        if self.journals.contains_key(sequencer) {
            return;
        }

        // Initialize journal
        let cfg = JournalConfig {
            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
            compression: self.journal_compression,
            codec_config: P::Scheme::certificate_codec_config_unbounded(),
            page_cache: self.journal_page_cache.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::<_, Node<C::PublicKey, P::Scheme, D>>::init(
            self.context
                .with_label("journal")
                .with_attribute("sequencer", sequencer)
                .into_present(),
            cfg,
        )
        .await
        .expect("unable to init journal");

        // Replay journal
        {
            debug!(?sequencer, "journal replay begin");

            // Prepare the stream
            let stream = journal
                .replay(0, 0, self.journal_replay_buffer)
                .await
                .expect("unable to replay journal");
            pin_mut!(stream);

            // Read from the stream, which may be in arbitrary order.
            // Remember the highest node height
            let mut tip: Option<Node<C::PublicKey, P::Scheme, D>> = None;
            let mut num_items = 0;
            while let Some(msg) = stream.next().await {
                let (_, _, _, node) = msg.expect("unable to read from journal");
                num_items += 1;
                let height = node.chunk.height;
                match tip {
                    None => {
                        tip = Some(node);
                    }
                    Some(ref t) => {
                        if height > t.chunk.height {
                            tip = Some(node);
                        }
                    }
                }
            }

            // Set the tip only once. The items from the journal may be in arbitrary order,
            // and the tip manager will panic if inserting tips out-of-order.
            if let Some(node) = tip.take() {
                let is_new = self.tip_manager.put(&node);
                assert!(is_new);
            }

            debug!(?sequencer, ?num_items, "journal replay end");
        }

        // Store journal
        self.journals.insert(sequencer.clone(), journal);
    }

    /// Write a `Node` to the appropriate journal, which contains the tip `Chunk` for the sequencer.
    ///
    /// To prevent ever writing two conflicting `Chunk`s at the same height,
    /// the journal must already be open and replayed.
    async fn journal_append(&mut self, node: Node<C::PublicKey, P::Scheme, D>) {
        let section = self.get_journal_section(node.chunk.height);
        self.journals
            .get_mut(&node.chunk.sequencer)
            .expect("journal does not exist")
            .append(section, node)
            .await
            .expect("unable to append to journal");
    }

    /// Syncs (ensures all data is written to disk) and prunes the journal for the given sequencer and height.
    async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: Height) {
        let section = self.get_journal_section(height);

        // Get journal
        let journal = self
            .journals
            .get_mut(sequencer)
            .expect("journal does not exist");

        // Sync journal
        journal.sync(section).await.expect("unable to sync journal");

        // Prune journal, ignoring errors
        let _ = journal.prune(section).await;
    }
}