commonware_consensus/ordered_broadcast/engine.rs

1//! Engine for the ordered broadcast module.
2//!
3//! The engine is responsible for:
4//! - Proposing nodes (if a sequencer)
5//! - Signing chunks (if a validator)
6//! - Tracking the latest chunk in each sequencer's chain
7//! - Assembling certificates from votes for each chunk
8//! - Notifying other actors of new chunks and certificates
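//!
//! A minimal wiring sketch (the `context`, `config`, and channel halves below are
//! placeholders for whatever the embedding application provides):
//!
//! ```ignore
//! let engine = Engine::new(context, config);
//! let handle = engine.start(
//!     (chunk_sender, chunk_receiver), // network for `Node` messages
//!     (ack_sender, ack_receiver),     // network for `Ack` messages
//! );
//! ```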
9
10use super::{
11    metrics, scheme,
12    types::{
13        Ack, Activity, Chunk, Context, Error, Lock, Node, Parent, Proposal, SequencersProvider,
14    },
15    AckManager, Config, TipManager,
16};
17use crate::{
18    types::{Epoch, EpochDelta},
19    Automaton, Monitor, Relay, Reporter,
20};
21use commonware_codec::Encode;
22use commonware_cryptography::{
23    certificate::{Provider, Scheme},
24    Digest, PublicKey, Signer,
25};
26use commonware_macros::select;
27use commonware_p2p::{
28    utils::codec::{wrap, WrappedSender},
29    Receiver, Recipients, Sender,
30};
31use commonware_runtime::{
32    buffer::PoolRef,
33    spawn_cell,
34    telemetry::metrics::{
35        histogram,
36        status::{CounterExt, GaugeExt, Status},
37    },
38    Clock, ContextCell, Handle, Metrics, Spawner, Storage,
39};
40use commonware_storage::journal::segmented::variable::{Config as JournalConfig, Journal};
41use commonware_utils::futures::Pool as FuturesPool;
42use futures::{
43    channel::oneshot,
44    future::{self, Either},
45    pin_mut, StreamExt,
46};
47use rand::{CryptoRng, Rng};
48use std::{
49    collections::BTreeMap,
50    num::NonZeroUsize,
51    time::{Duration, SystemTime},
52};
53use tracing::{debug, error, info, warn};
54
55/// Represents a pending verification request to the automaton.
56struct Verify<C: PublicKey, D: Digest, E: Clock> {
57    timer: histogram::Timer<E>,
58    context: Context<C>,
59    payload: D,
60    result: Result<bool, Error>,
61}
62
63/// Instance of the engine.
64pub struct Engine<
65    E: Clock + Spawner + Rng + CryptoRng + Storage + Metrics,
66    C: Signer,
67    S: SequencersProvider<PublicKey = C::PublicKey>,
68    P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D>>,
69    D: Digest,
70    A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
71    R: Relay<Digest = D>,
72    Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
73    M: Monitor<Index = Epoch>,
74> {
75    ////////////////////////////////////////
76    // Interfaces
77    ////////////////////////////////////////
78    context: ContextCell<E>,
79    sequencer_signer: Option<C>,
80    sequencers_provider: S,
81    validators_provider: P,
82    automaton: A,
83    relay: R,
84    monitor: M,
85    reporter: Z,
86
87    ////////////////////////////////////////
88    // Namespace Constants
89    ////////////////////////////////////////
90
91    // The namespace used when signing and verifying messages.
92    namespace: Vec<u8>,
93
94    ////////////////////////////////////////
95    // Timeouts
96    ////////////////////////////////////////
97
98    // The configured timeout for rebroadcasting a chunk to all validators
99    rebroadcast_timeout: Duration,
100    rebroadcast_deadline: Option<SystemTime>,
101
102    ////////////////////////////////////////
103    // Pruning
104    ////////////////////////////////////////
105
106    // A tuple representing the epochs to keep in memory.
107    // The first element is the number of old epochs to keep.
108    // The second element is the number of future epochs to accept.
109    //
110    // For example, if the current epoch is 10, and the bounds are (1, 2), then
111    // epochs 9, 10, 11, and 12 are kept (and accepted);
112    // all others are pruned or rejected.
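    //
    // Sketch of the acceptance check applied in `validate_ack`:
    //
    //   let bound_lo = self.epoch.saturating_sub(eb_lo);
    //   let bound_hi = self.epoch.saturating_add(eb_hi);
    //   // accept only if bound_lo <= ack.epoch <= bound_hi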
113    epoch_bounds: (EpochDelta, EpochDelta),
114
115    // The number of future heights to accept acks for.
116    // This is used to prevent spam of acks for arbitrary heights.
117    //
118    // For example, if the current tip for a sequencer is at height 100,
119    // and the height_bound is 10, then acks for heights 100-110 are accepted.
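    //
    // Sketch of the acceptance check applied in `validate_ack`:
    //
    //   let bound_lo = tip_height; // the sequencer's current tip height (0 if none)
    //   let bound_hi = bound_lo + self.height_bound;
    //   // accept only if bound_lo <= ack.chunk.height <= bound_hi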
120    height_bound: u64,
121
122    ////////////////////////////////////////
123    // Messaging
124    ////////////////////////////////////////
125
126    // A stream of futures.
127    //
128    // Each future represents a verification request to the automaton
129    // that will either timeout or resolve with a boolean.
130    //
131    // There is no limit to the number of futures in this pool, so the automaton
132    // can apply backpressure by dropping the verification requests if necessary.
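    //
    // Futures are pushed in `handle_node` (one per `automaton.verify` call) and are
    // drained by the `pending_verifies.next_completed()` arm of the main select loop.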
133    pending_verifies: FuturesPool<Verify<C::PublicKey, D, E>>,
134
135    ////////////////////////////////////////
136    // Storage
137    ////////////////////////////////////////
138
139    // The number of heights per journal section.
140    journal_heights_per_section: u64,
141
142    // The number of bytes to buffer when replaying a journal.
143    journal_replay_buffer: NonZeroUsize,
144
145    // The size of the write buffer to use for each blob in the journal.
146    journal_write_buffer: NonZeroUsize,
147
148    // A prefix for the journal names.
149    // The rest of the name is the hex-encoded public key of the relevant sequencer.
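    // For example, with an (illustrative) prefix of "ordered_broadcast_", a sequencer's
    // journal partition is named "ordered_broadcast_<sequencer key>"; see `journal_prepare`.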
150    journal_name_prefix: String,
151
152    // Compression level for the journal.
153    journal_compression: Option<u8>,
154
155    // Buffer pool for the journal.
156    journal_buffer_pool: PoolRef,
157
158    // A map of sequencer public keys to their journals.
159    #[allow(clippy::type_complexity)]
160    journals: BTreeMap<C::PublicKey, Journal<E, Node<C::PublicKey, P::Scheme, D>>>,
161
162    ////////////////////////////////////////
163    // State
164    ////////////////////////////////////////
165
166    // Tracks the current tip for each sequencer.
167    // The tip is a `Node`, which consists of a `Chunk` and,
168    // if not the genesis chunk for that sequencer,
169    // a certificate over the parent chunk.
170    tip_manager: TipManager<C::PublicKey, P::Scheme, D>,
171
172    // Tracks the acknowledgements for chunks.
173    // An acknowledgement is either a vote or a certificate.
174    ack_manager: AckManager<C::PublicKey, P::Scheme, D>,
175
176    // The current epoch.
177    epoch: Epoch,
178
179    ////////////////////////////////////////
180    // Network
181    ////////////////////////////////////////
182
183    // Whether to send proposals as priority messages.
184    priority_proposals: bool,
185
186    // Whether to send acks as priority messages.
187    priority_acks: bool,
188
189    ////////////////////////////////////////
190    // Metrics
191    ////////////////////////////////////////
192
193    // Metrics
194    metrics: metrics::Metrics<E>,
195
196    // Timer started at my most recent proposal, used to measure end-to-end latency.
197    propose_timer: Option<histogram::Timer<E>>,
198}
199
200impl<
201        E: Clock + Spawner + Rng + CryptoRng + Storage + Metrics,
202        C: Signer,
203        S: SequencersProvider<PublicKey = C::PublicKey>,
204        P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D, PublicKey = C::PublicKey>>,
205        D: Digest,
206        A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
207        R: Relay<Digest = D>,
208        Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
209        M: Monitor<Index = Epoch>,
210    > Engine<E, C, S, P, D, A, R, Z, M>
211{
212    /// Creates a new engine with the given context and configuration.
213    pub fn new(context: E, cfg: Config<C, S, P, D, A, R, Z, M>) -> Self {
214        // TODO(#1833): Metrics should use the post-start context
215        let metrics = metrics::Metrics::init(context.clone());
216
217        Self {
218            context: ContextCell::new(context),
219            sequencer_signer: cfg.sequencer_signer,
220            sequencers_provider: cfg.sequencers_provider,
221            validators_provider: cfg.validators_provider,
222            automaton: cfg.automaton,
223            relay: cfg.relay,
224            reporter: cfg.reporter,
225            monitor: cfg.monitor,
226            namespace: cfg.namespace,
227            rebroadcast_timeout: cfg.rebroadcast_timeout,
228            rebroadcast_deadline: None,
229            epoch_bounds: cfg.epoch_bounds,
230            height_bound: cfg.height_bound,
231            pending_verifies: FuturesPool::default(),
232            journal_heights_per_section: cfg.journal_heights_per_section,
233            journal_replay_buffer: cfg.journal_replay_buffer,
234            journal_write_buffer: cfg.journal_write_buffer,
235            journal_name_prefix: cfg.journal_name_prefix,
236            journal_compression: cfg.journal_compression,
237            journal_buffer_pool: cfg.journal_buffer_pool,
238            journals: BTreeMap::new(),
239            tip_manager: TipManager::<C::PublicKey, P::Scheme, D>::new(),
240            ack_manager: AckManager::<C::PublicKey, P::Scheme, D>::new(),
241            epoch: Epoch::zero(),
242            priority_proposals: cfg.priority_proposals,
243            priority_acks: cfg.priority_acks,
244            metrics,
245            propose_timer: None,
246        }
247    }
248
249    /// Runs the engine until the context is stopped.
250    ///
251    /// The engine will handle:
252    /// - Requesting and processing proposals from the application
253    /// - Timeouts
254    ///   - Refreshing the Epoch
255    ///   - Rebroadcasting Proposals
256    /// - Messages from the network:
257    ///   - Nodes
258    ///   - Acks
259    pub fn start(
260        mut self,
261        chunk_network: (
262            impl Sender<PublicKey = C::PublicKey>,
263            impl Receiver<PublicKey = C::PublicKey>,
264        ),
265        ack_network: (
266            impl Sender<PublicKey = C::PublicKey>,
267            impl Receiver<PublicKey = C::PublicKey>,
268        ),
269    ) -> Handle<()> {
270        spawn_cell!(self.context, self.run(chunk_network, ack_network).await)
271    }
272
273    /// Inner run loop called by `start`.
274    async fn run(
275        mut self,
276        chunk_network: (
277            impl Sender<PublicKey = C::PublicKey>,
278            impl Receiver<PublicKey = C::PublicKey>,
279        ),
280        ack_network: (
281            impl Sender<PublicKey = C::PublicKey>,
282            impl Receiver<PublicKey = C::PublicKey>,
283        ),
284    ) {
285        let mut node_sender = chunk_network.0;
286        let mut node_receiver = chunk_network.1;
287        let (mut ack_sender, mut ack_receiver) = wrap((), ack_network.0, ack_network.1);
288        let mut shutdown = self.context.stopped();
289
290        // Tracks if there is an outstanding proposal request to the automaton.
291        let mut pending: Option<(Context<C::PublicKey>, oneshot::Receiver<D>)> = None;
292
293        // Initialize the epoch
294        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
295        self.epoch = latest;
296
297        // Before starting on the main loop, initialize my own sequencer journal
298        // and attempt to rebroadcast if necessary.
299        if let Some(ref signer) = self.sequencer_signer {
300            self.journal_prepare(&signer.public_key()).await;
301            if let Err(err) = self.rebroadcast(&mut node_sender).await {
302                // Rebroadcasting may return a non-critical error, so log the error and continue.
303                info!(?err, "initial rebroadcast failed");
304            }
305        }
306
307        loop {
308            // Request a new proposal if necessary
309            if pending.is_none() {
310                if let Some(context) = self.should_propose() {
311                    let receiver = self.automaton.propose(context.clone()).await;
312                    pending = Some((context, receiver));
313                }
314            }
315
316            // Create deadline futures.
317            //
318            // If the deadline is None, the future will never resolve.
319            let rebroadcast = match self.rebroadcast_deadline {
320                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
321                None => Either::Right(future::pending()),
322            };
323            let propose = match &mut pending {
324                Some((_context, receiver)) => Either::Left(receiver),
325                None => Either::Right(futures::future::pending()),
326            };
327
328            // Process the next event
329            select! {
330                // Handle shutdown signal
331                _ = &mut shutdown => {
332                    debug!("shutdown");
333                    break;
334                },
335
336                // Handle epoch updates from the monitor
337                epoch = epoch_updates.next() => {
338                    // Error handling
339                    let Some(epoch) = epoch else {
340                        error!("epoch subscription failed");
341                        break;
342                    };
343
344                    // Refresh the epoch
345                    debug!(current = %self.epoch, new = %epoch, "refresh epoch");
346                    assert!(epoch >= self.epoch);
347                    self.epoch = epoch;
348                    continue;
349                },
350
351                // Handle rebroadcast deadline
352                _ = rebroadcast => {
353                    if let Some(ref signer) = self.sequencer_signer {
354                        debug!(epoch = %self.epoch, sender = ?signer.public_key(), "rebroadcast");
355                        if let Err(err) = self.rebroadcast(&mut node_sender).await {
356                            info!(?err, "rebroadcast failed");
357                            continue;
358                        }
359                    }
360                },
361
362                // Propose a new chunk
363                receiver = propose => {
364                    // Clear the pending proposal
365                    let (context, _) = pending.take().unwrap();
366                    debug!(height = context.height, "propose");
367
368                    // Error handling for dropped proposals
369                    let Ok(payload) = receiver else {
370                        warn!(?context, "automaton dropped proposal");
371                        continue;
372                    };
373
374                    // Propose the chunk
375                    if let Err(err) = self.propose(context.clone(), payload, &mut node_sender).await {
376                        warn!(?err, ?context, "propose new failed");
377                        continue;
378                    }
379                },
380
381                // Handle incoming nodes
382                msg = node_receiver.recv() => {
383                    // Error handling
384                    let (sender, msg) = match msg {
385                        Ok(r) => r,
386                        Err(err) => {
387                            error!(?err, "node receiver failed");
388                            break;
389                        }
390                    };
391                    let mut guard = self.metrics.nodes.guard(Status::Invalid);
392
393                    // Decode using staged decoding with epoch-aware certificate bounds
394                    let node = match Node::read_staged(&mut msg.as_ref(), &self.validators_provider) {
395                        Ok(node) => node,
396                        Err(err) => {
397                            debug!(?err, ?sender, "node decode failed");
398                            continue;
399                        }
400                    };
401                    let result = match self.validate_node(&node, &sender) {
402                        Ok(result) => result,
403                        Err(err) => {
404                            debug!(?err, ?sender, "node validate failed");
405                            continue;
406                        }
407                    };
408
409                    // Initialize journal for sequencer if it does not exist
410                    self.journal_prepare(&sender).await;
411
412                    // Handle the parent certificate
413                    if let Some(parent_chunk) = result {
414                        let parent = node.parent.as_ref().unwrap();
415                        self.handle_certificate(&parent_chunk, parent.epoch, parent.certificate.clone()).await;
416                    }
417
418                    // Process the node
419                    //
420                    // Note, this node may be a duplicate. If it is, we will attempt to verify it and vote
421                    // on it again (our original vote may have been lost).
422                    self.handle_node(&node).await;
423                    debug!(?sender, height=node.chunk.height, "node");
424                    guard.set(Status::Success);
425                },
426
427                // Handle incoming acks
428                msg = ack_receiver.recv() => {
429                    // Error handling
430                    let (sender, msg) = match msg {
431                        Ok(r) => r,
432                        Err(err) => {
433                            warn!(?err, "ack receiver failed");
434                            break;
435                        }
436                    };
437                    let mut guard = self.metrics.acks.guard(Status::Invalid);
438                    let ack = match msg {
439                        Ok(ack) => ack,
440                        Err(err) => {
441                            debug!(?err, ?sender, "ack decode failed");
442                            continue;
443                        }
444                    };
445                    if let Err(err) = self.validate_ack(&ack, &sender) {
446                        debug!(?err, ?sender, "ack validate failed");
447                        continue;
448                    };
449                    if let Err(err) = self.handle_ack(&ack).await {
450                        debug!(?err, ?sender, "ack handle failed");
451                        guard.set(Status::Failure);
452                        continue;
453                    }
454                    debug!(?sender, epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = ack.chunk.height, "ack");
455                    guard.set(Status::Success);
456                },
457
458                // Handle completed verification futures.
459                verify = self.pending_verifies.next_completed() => {
460                    let Verify { timer, context, payload, result } = verify;
461                    drop(timer); // Record metric. Explicitly reference timer to avoid lint warning.
462                    match result {
463                        Err(err) => {
464                            warn!(?err, ?context, "verified returned error");
465                            self.metrics.verify.inc(Status::Dropped);
466                        }
467                        Ok(false) => {
468                            debug!(?context, "verified was false");
469                            self.metrics.verify.inc(Status::Failure);
470                        }
471                        Ok(true) => {
472                            debug!(?context, "verified");
473                            self.metrics.verify.inc(Status::Success);
474                            if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await {
475                                debug!(?err, ?context, ?payload, "verified handle failed");
476                            }
477                        },
478                    }
479                },
480            }
481        }
482
483        // Close all journals, regardless of how we exit the loop
484        self.pending_verifies.cancel_all();
485        while let Some((_, journal)) = self.journals.pop_first() {
486            journal.close().await.expect("unable to close journal");
487        }
488    }
489
490    ////////////////////////////////////////
491    // Handling
492    ////////////////////////////////////////
493
494    /// Handles a verified message from the automaton.
495    ///
496    /// This is called when the automaton has verified a payload.
497    /// The chunk will be signed if it matches the current tip.
498    async fn handle_app_verified(
499        &mut self,
500        context: &Context<C::PublicKey>,
501        payload: &D,
502        ack_sender: &mut WrappedSender<
503            impl Sender<PublicKey = C::PublicKey>,
504            Ack<C::PublicKey, P::Scheme, D>,
505        >,
506    ) -> Result<(), Error> {
507        // Get the tip
508        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
509            return Err(Error::AppVerifiedNoTip);
510        };
511
512        // Return early if the height does not match
513        if tip.chunk.height != context.height {
514            return Err(Error::AppVerifiedHeightMismatch);
515        }
516
517        // Return early if the payload does not match
518        if tip.chunk.payload != *payload {
519            return Err(Error::AppVerifiedPayloadMismatch);
520        }
521
522        // Emit the activity
523        self.reporter
524            .report(Activity::Tip(Proposal::new(
525                tip.chunk.clone(),
526                tip.signature.clone(),
527            )))
528            .await;
529
530        // Get the validator scheme for the current epoch
531        let Some(scheme) = self.validators_provider.scoped(self.epoch) else {
532            return Err(Error::UnknownScheme(self.epoch));
533        };
534
535        // Construct vote (if a validator)
536        let Some(ack) = Ack::sign(
537            &self.namespace,
538            scheme.as_ref(),
539            tip.chunk.clone(),
540            self.epoch,
541        ) else {
542            return Err(Error::NotSigner(self.epoch));
543        };
544
545        // Sync the journal to prevent ever acking two conflicting chunks at
546        // the same height, even if the node crashes and restarts.
547        self.journal_sync(&context.sequencer, context.height).await;
548
549        // The recipients are all the validators in the epoch and the sequencer.
550        // The sequencer may or may not be a validator.
551        let recipients = {
552            let validators = scheme.participants();
553            let mut recipients = validators.iter().cloned().collect::<Vec<_>>();
554            if !validators.iter().any(|v| v == &tip.chunk.sequencer) {
555                recipients.push(tip.chunk.sequencer.clone());
556            }
557            recipients
558        };
559
560        // Handle the ack internally
561        self.handle_ack(&ack).await?;
562
563        // Send the ack to the network
564        ack_sender
565            .send(Recipients::Some(recipients), ack, self.priority_acks)
566            .await
567            .map_err(|_| Error::UnableToSendMessage)?;
568
569        Ok(())
570    }
571
572    /// Handles a certificate, either received in a `Node` from the network or assembled locally from acks.
573    ///
574    /// The certificate must already be verified.
575    /// If the certificate is new, it is stored and a `Lock` activity is reported.
576    /// If the certificate is already known, it is ignored.
577    async fn handle_certificate(
578        &mut self,
579        chunk: &Chunk<C::PublicKey, D>,
580        epoch: Epoch,
581        certificate: <P::Scheme as Scheme>::Certificate,
582    ) {
583        // Set the certificate, returning early if it already exists
584        if !self.ack_manager.add_certificate(
585            &chunk.sequencer,
586            chunk.height,
587            epoch,
588            certificate.clone(),
589        ) {
590            return;
591        }
592
593        // If the certificate is for my own chunk, stop the proposal timer (recording the e2e metric)
594        if let Some(ref signer) = self.sequencer_signer {
595            if chunk.sequencer == signer.public_key() {
596                self.propose_timer.take();
597            }
598        }
599
600        // Emit the activity
601        self.reporter
602            .report(Activity::Lock(Lock::new(chunk.clone(), epoch, certificate)))
603            .await;
604    }
605
606    /// Handles an ack.
607    ///
608    /// Returns an error if the scheme for the ack's epoch is unknown. If the ack completes
609    /// a certificate, the certificate is handled; otherwise the ack is accumulated.
610    async fn handle_ack(&mut self, ack: &Ack<C::PublicKey, P::Scheme, D>) -> Result<(), Error> {
611        // Get the scheme for the ack's epoch
612        let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
613            return Err(Error::UnknownScheme(ack.epoch));
614        };
615
616        // Add the vote. If a new certificate is formed, handle it.
617        if let Some(certificate) = self.ack_manager.add_ack(ack, scheme.as_ref()) {
618            debug!(epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = ack.chunk.height, "recovered certificate");
619            self.metrics.certificates.inc();
620            self.handle_certificate(&ack.chunk, ack.epoch, certificate)
621                .await;
622        }
623
624        Ok(())
625    }
626
627    /// Handles a valid `Node` message, storing it as the tip.
628    /// Requests verification of the node's payload from the automaton.
629    /// Also appends the `Node` to the journal if it's new.
630    async fn handle_node(&mut self, node: &Node<C::PublicKey, P::Scheme, D>) {
631        // Store the tip
632        let is_new = self.tip_manager.put(node);
633
634        // If the node is new (i.e., it updated the tip)...
635        if is_new {
636            // Update metrics for sequencer height
637            let _ = self
638                .metrics
639                .sequencer_heights
640                .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
641                .try_set(node.chunk.height);
642
643            // Append to journal if the `Node` is new, making sure to sync the journal
644            // to prevent sending two conflicting chunks to the automaton, even if
645            // the node crashes and restarts.
646            self.journal_append(node.clone()).await;
647            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
648                .await;
649        }
650
651        // Verify the chunk with the automaton
652        let context = Context {
653            sequencer: node.chunk.sequencer.clone(),
654            height: node.chunk.height,
655        };
656        let payload = node.chunk.payload;
657        let mut automaton = self.automaton.clone();
658        let timer = self.metrics.verify_duration.timer();
659        self.pending_verifies.push(async move {
660            let receiver = automaton.verify(context.clone(), payload).await;
661            let result = receiver.await.map_err(Error::AppVerifyCanceled);
662            Verify {
663                timer,
664                context,
665                payload,
666                result,
667            }
668        });
669    }
670
671    ////////////////////////////////////////
672    // Proposing
673    ////////////////////////////////////////
674
675    /// Returns a `Context` if the engine should request a proposal from the automaton.
676    ///
677    /// Should only be called if the engine is not already waiting for a proposal.
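    ///
    /// The proposed height is `tip.height + 1` once the certificate for the current tip
    /// has been collected, or `0` if this sequencer has no tip yet.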
678    fn should_propose(&self) -> Option<Context<C::PublicKey>> {
679        // Return `None` if we don't have a sequencer signer
680        let me = self.sequencer_signer.as_ref()?.public_key();
681
682        // Return `None` if I am not a sequencer in the current epoch
683        self.sequencers_provider
684            .sequencers(self.epoch)?
685            .position(&me)?;
686
687        // Return the next context unless my current tip has no certificate
688        match self.tip_manager.get(&me) {
689            None => Some(Context {
690                sequencer: me,
691                height: 0,
692            }),
693            Some(tip) => self
694                .ack_manager
695                .get_certificate(&me, tip.chunk.height)
696                .map(|_| Context {
697                    sequencer: me,
698                    height: tip.chunk.height.checked_add(1).unwrap(),
699                }),
700        }
701    }
702
703    /// Propose a new chunk to the network.
704    ///
705    /// The chunk is broadcast to all validators via the provided sender.
706    /// The proposal is only successful if the parent Chunk and certificate are known.
707    async fn propose(
708        &mut self,
709        context: Context<C::PublicKey>,
710        payload: D,
711        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
712    ) -> Result<(), Error> {
713        let mut guard = self.metrics.propose.guard(Status::Dropped);
714        let signer = self
715            .sequencer_signer
716            .as_mut()
717            .ok_or(Error::IAmNotASequencer(self.epoch))?;
718        let me = signer.public_key();
719
720        // Error-check context sequencer
721        if context.sequencer != me {
722            return Err(Error::ContextSequencer);
723        }
724
725        // Error-check that I am a sequencer in the current epoch
726        self.sequencers_provider
727            .sequencers(self.epoch)
728            .and_then(|s| s.position(&me))
729            .ok_or(Error::IAmNotASequencer(self.epoch))?;
730
731        // Get parent Chunk and certificate
732        let mut height = 0;
733        let mut parent = None;
734        if let Some(tip) = self.tip_manager.get(&me) {
735            // Get certificate, or, if it doesn't exist, return an error
736            let Some((epoch, certificate)) =
737                self.ack_manager.get_certificate(&me, tip.chunk.height)
738            else {
739                return Err(Error::MissingCertificate);
740            };
741
742            // Update height and parent
743            height = tip.chunk.height + 1;
744            parent = Some(Parent::new(tip.chunk.payload, epoch, certificate.clone()));
745        }
746
747        // Error-check context height
748        if context.height != height {
749            return Err(Error::ContextHeight);
750        }
751
752        // Construct new node
753        let node = Node::sign(&self.namespace, signer, height, payload, parent);
754
755        // Deal with the chunk as if it were received over the network
756        self.handle_node(&node).await;
757
758        // Sync the journal to prevent ever proposing two conflicting chunks
759        // at the same height, even if the node crashes and restarts
760        self.journal_sync(&me, height).await;
761
762        // Record the start time of the proposal
763        self.propose_timer = Some(self.metrics.e2e_duration.timer());
764
765        // Broadcast to network
766        if let Err(err) = self.broadcast(node, node_sender, self.epoch).await {
767            guard.set(Status::Failure);
768            return Err(err);
769        };
770
771        // Return success
772        guard.set(Status::Success);
773        Ok(())
774    }
775
776    /// Attempt to rebroadcast the highest-height chunk of this sequencer to all validators.
777    ///
778    /// This is only done if:
779    /// - this instance is a sequencer in the current epoch.
780    /// - this instance has a chunk to rebroadcast.
781    /// - this instance has not yet collected the certificate for the chunk.
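    ///
    /// Rebroadcasting is triggered by the `rebroadcast_deadline` timer; the deadline is
    /// re-armed by `broadcast` each time a node is sent.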
782    async fn rebroadcast(
783        &mut self,
784        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
785    ) -> Result<(), Error> {
786        let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);
787
788        // Unset the rebroadcast deadline
789        self.rebroadcast_deadline = None;
790
791        // Return if we don't have a sequencer signer
792        let signer = self
793            .sequencer_signer
794            .as_ref()
795            .ok_or(Error::IAmNotASequencer(self.epoch))?;
796        let me = signer.public_key();
797
798        // Return if not a sequencer in the current epoch
799        self.sequencers_provider
800            .sequencers(self.epoch)
801            .and_then(|s| s.position(&me))
802            .ok_or(Error::IAmNotASequencer(self.epoch))?;
803
804        // Return if no chunk to rebroadcast
805        let Some(tip) = self.tip_manager.get(&me) else {
806            return Err(Error::NothingToRebroadcast);
807        };
808
809        // Return if certificate already collected
810        if self
811            .ack_manager
812            .get_certificate(&me, tip.chunk.height)
813            .is_some()
814        {
815            return Err(Error::AlreadyCertified);
816        }
817
818        // Broadcast the message, which resets the rebroadcast deadline
819        guard.set(Status::Failure);
820        self.broadcast(tip, node_sender, self.epoch).await?;
821        guard.set(Status::Success);
822        Ok(())
823    }
824
825    /// Send a `Node` message to all validators in the given epoch.
826    async fn broadcast(
827        &mut self,
828        node: Node<C::PublicKey, P::Scheme, D>,
829        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
830        epoch: Epoch,
831    ) -> Result<(), Error> {
832        // Get the scheme for the epoch to access validators
833        let Some(scheme) = self.validators_provider.scoped(epoch) else {
834            return Err(Error::UnknownScheme(epoch));
835        };
836        let validators = scheme.participants();
837
838        // Tell the relay to broadcast the full data
839        self.relay.broadcast(node.chunk.payload).await;
840
841        // Send the node to all validators
842        node_sender
843            .send(
844                Recipients::Some(validators.iter().cloned().collect()),
845                node.encode().into(),
846                self.priority_proposals,
847            )
848            .await
849            .map_err(|_| Error::BroadcastFailed)?;
850
851        // Set the rebroadcast deadline
852        self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);
853
854        Ok(())
855    }
856
857    ////////////////////////////////////////
858    // Validation
859    ////////////////////////////////////////
860
861    /// Takes a raw `Node` (from sender) from the p2p network and validates it.
862    ///
863    /// If valid (and not already the tracked tip for the sender), returns the implied
864    /// parent chunk (whose certificate is carried in the node itself).
865    /// Returns an error if the `Node` is invalid.
866    fn validate_node(
867        &mut self,
868        node: &Node<C::PublicKey, P::Scheme, D>,
869        sender: &C::PublicKey,
870    ) -> Result<Option<Chunk<C::PublicKey, D>>, Error> {
871        // Verify the sender
872        if node.chunk.sequencer != *sender {
873            return Err(Error::PeerMismatch);
874        }
875
876        // Optimization: If the node is exactly equal to the tip,
877        // don't perform further validation.
878        if let Some(tip) = self.tip_manager.get(sender) {
879            if tip == *node {
880                return Ok(None);
881            }
882        }
883
884        // Validate chunk
885        self.validate_chunk(&node.chunk, self.epoch)?;
886
887        // Verify the node
888        node.verify(
889            &mut self.context,
890            &self.namespace,
891            &self.validators_provider,
892        )
893    }
894
895    /// Takes a raw ack (from sender) from the p2p network and validates it.
896    ///
897    /// Returns `Ok(())` if the ack is valid.
898    /// Returns an error if the ack is invalid.
899    fn validate_ack(
900        &self,
901        ack: &Ack<C::PublicKey, P::Scheme, D>,
902        sender: &<P::Scheme as Scheme>::PublicKey,
903    ) -> Result<(), Error> {
904        // Validate chunk
905        self.validate_chunk(&ack.chunk, ack.epoch)?;
906
907        // Get the scheme for the epoch to validate the sender
908        let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
909            return Err(Error::UnknownScheme(ack.epoch));
910        };
911
912        // Validate sender is a participant and matches the vote signer
913        let participants = scheme.participants();
914        let Some(index) = participants.iter().position(|p| p == sender) else {
915            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
916        };
917        if index as u32 != ack.attestation.signer {
918            return Err(Error::PeerMismatch);
919        }
920
921        // Spam prevention: If the ack is for an epoch that is too old or too new, ignore.
922        {
923            let (eb_lo, eb_hi) = self.epoch_bounds;
924            let bound_lo = self.epoch.saturating_sub(eb_lo);
925            let bound_hi = self.epoch.saturating_add(eb_hi);
926            if ack.epoch < bound_lo || ack.epoch > bound_hi {
927                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
928            }
929        }
930
931        // Spam prevention: If the ack is for a height that is too old or too new, ignore.
932        {
933            let bound_lo = self
934                .tip_manager
935                .get(&ack.chunk.sequencer)
936                .map(|t| t.chunk.height)
937                .unwrap_or(0);
938            let bound_hi = bound_lo + self.height_bound;
939            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
940                return Err(Error::AckHeightOutsideBounds(
941                    ack.chunk.height,
942                    bound_lo,
943                    bound_hi,
944                ));
945            }
946        }
947
948        // Validate the vote signature
949        if !ack.verify(&self.namespace, scheme.as_ref()) {
950            return Err(Error::InvalidAckSignature);
951        }
952
953        Ok(())
954    }
955
956    /// Takes a raw chunk from the p2p network and validates it against the epoch.
957    ///
958    /// Returns `Ok(())` if the chunk is valid.
959    /// Returns an error if the chunk is invalid.
960    fn validate_chunk(&self, chunk: &Chunk<C::PublicKey, D>, epoch: Epoch) -> Result<(), Error> {
961        // Verify sequencer
962        if self
963            .sequencers_provider
964            .sequencers(epoch)
965            .and_then(|s| s.position(&chunk.sequencer))
966            .is_none()
967        {
968            return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
969        }
970
971        // Verify height
972        if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
973            // Height must be at least the tip height
974            match chunk.height.cmp(&tip.chunk.height) {
975                std::cmp::Ordering::Less => {
976                    return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
977                }
978                std::cmp::Ordering::Equal => {
979                    // Ensure this matches the tip if the height is the same
980                    if tip.chunk.payload != chunk.payload {
981                        return Err(Error::ChunkMismatch(
982                            chunk.sequencer.to_string(),
983                            chunk.height,
984                        ));
985                    }
986                }
987                std::cmp::Ordering::Greater => {}
988            }
989        }
990
991        Ok(())
992    }
993
994    ////////////////////////////////////////
995    // Journal
996    ////////////////////////////////////////
997
998    /// Returns the section of the journal for the given height.
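    ///
    /// For example, with `journal_heights_per_section = 10`, heights `0..=9` map to
    /// section `0` and heights `10..=19` map to section `1` (the value `10` is only
    /// illustrative; the actual value comes from the configuration).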
999    const fn get_journal_section(&self, height: u64) -> u64 {
1000        height / self.journal_heights_per_section
1001    }
1002
1003    /// Ensures the journal exists and is initialized for the given sequencer.
1004    /// If the journal does not exist, it is created and replayed.
1005    /// Otherwise, no action is taken.
1006    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
1007        // Return early if the journal already exists
1008        if self.journals.contains_key(sequencer) {
1009            return;
1010        }
1011
1012        // Initialize journal
1013        let cfg = JournalConfig {
1014            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
1015            compression: self.journal_compression,
1016            codec_config: P::Scheme::certificate_codec_config_unbounded(),
1017            buffer_pool: self.journal_buffer_pool.clone(),
1018            write_buffer: self.journal_write_buffer,
1019        };
1020        let journal = Journal::<_, Node<C::PublicKey, P::Scheme, D>>::init(
1021            self.context.with_label("journal").into_present(),
1022            cfg,
1023        )
1024        .await
1025        .expect("unable to init journal");
1026
1027        // Replay journal
1028        {
1029            debug!(?sequencer, "journal replay begin");
1030
1031            // Prepare the stream
1032            let stream = journal
1033                .replay(0, 0, self.journal_replay_buffer)
1034                .await
1035                .expect("unable to replay journal");
1036            pin_mut!(stream);
1037
1038            // Read from the stream, which may be in arbitrary order.
1039            // Remember the highest node height
1040            let mut tip: Option<Node<C::PublicKey, P::Scheme, D>> = None;
1041            let mut num_items = 0;
1042            while let Some(msg) = stream.next().await {
1043                let (_, _, _, node) = msg.expect("unable to read from journal");
1044                num_items += 1;
1045                let height = node.chunk.height;
1046                match tip {
1047                    None => {
1048                        tip = Some(node);
1049                    }
1050                    Some(ref t) => {
1051                        if height > t.chunk.height {
1052                            tip = Some(node);
1053                        }
1054                    }
1055                }
1056            }
1057
1058            // Set the tip only once. The items from the journal may be in arbitrary order,
1059            // and the tip manager will panic if inserting tips out-of-order.
1060            if let Some(node) = tip.take() {
1061                let is_new = self.tip_manager.put(&node);
1062                assert!(is_new);
1063            }
1064
1065            debug!(?sequencer, ?num_items, "journal replay end");
1066        }
1067
1068        // Store journal
1069        self.journals.insert(sequencer.clone(), journal);
1070    }
1071
1072    /// Write a `Node` (the new tip for its sequencer) to the appropriate journal.
1073    ///
1074    /// To prevent ever writing two conflicting `Chunk`s at the same height,
1075    /// the journal must already be open and replayed.
1076    async fn journal_append(&mut self, node: Node<C::PublicKey, P::Scheme, D>) {
1077        let section = self.get_journal_section(node.chunk.height);
1078        self.journals
1079            .get_mut(&node.chunk.sequencer)
1080            .expect("journal does not exist")
1081            .append(section, node)
1082            .await
1083            .expect("unable to append to journal");
1084    }
1085
1086    /// Syncs (ensures all data is written to disk) and prunes the journal for the given sequencer and height.
1087    async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
1088        let section = self.get_journal_section(height);
1089
1090        // Get journal
1091        let journal = self
1092            .journals
1093            .get_mut(sequencer)
1094            .expect("journal does not exist");
1095
1096        // Sync journal
1097        journal.sync(section).await.expect("unable to sync journal");
1098
1099        // Prune journal, ignoring errors
1100        let _ = journal.prune(section).await;
1101    }
1102}