Skip to main content

commonware_consensus/ordered_broadcast/
engine.rs

//! Engine for the ordered broadcast module.
//!
//! It is responsible for:
//! - Proposing nodes (if a sequencer)
//! - Acknowledging verified chunks by signing acks over them (if a validator)
//! - Tracking the latest chunk in each sequencer's chain
//! - Assembling certificates from acks (votes) for each chunk
//! - Notifying other actors of new chunks and certificates
9
10use super::{
11    metrics, scheme,
12    types::{
13        Ack, Activity, Chunk, ChunkSigner, ChunkVerifier, Context, Error, Lock, Node, Parent,
14        Proposal, SequencersProvider,
15    },
16    AckManager, Config, TipManager,
17};
18use crate::{
19    types::{Epoch, EpochDelta, Height, HeightDelta},
20    Automaton, Monitor, Relay, Reporter,
21};
22use commonware_codec::Encode;
23use commonware_cryptography::{
24    certificate::{Provider, Scheme},
25    Digest, PublicKey, Signer,
26};
27use commonware_macros::select_loop;
28use commonware_p2p::{
29    utils::codec::{wrap, WrappedSender},
30    Receiver, Recipients, Sender,
31};
32use commonware_parallel::Strategy;
33use commonware_runtime::{
34    buffer::paged::CacheRef,
35    spawn_cell,
36    telemetry::metrics::{histogram, status::Status, GaugeExt},
37    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
38};
39use commonware_storage::journal::segmented::variable::{Config as JournalConfig, Journal};
40use commonware_utils::{channel::oneshot, futures::Pool as FuturesPool, ordered::Quorum};
41use futures::{
42    future::{self, Either},
43    pin_mut, StreamExt,
44};
45use rand_core::CryptoRngCore;
46use std::{
47    collections::BTreeMap,
48    num::{NonZeroU64, NonZeroUsize},
49    time::{Duration, SystemTime},
50};
51use tracing::{debug, error, info, warn};
52
53/// Represents a pending verification request to the automaton.
54struct Verify<C: PublicKey, D: Digest> {
55    timer: histogram::Timer,
56    context: Context<C>,
57    payload: D,
58    result: Result<bool, Error>,
59}
60
61/// Instance of the engine.
62pub struct Engine<
63    E: BufferPooler + Clock + Spawner + CryptoRngCore + Storage + Metrics,
64    C: Signer,
65    S: SequencersProvider<PublicKey = C::PublicKey>,
66    P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D>>,
67    D: Digest,
68    A: Automaton<Context = Context<C::PublicKey>, Digest = D>,
69    R: Relay<Digest = D, PublicKey = C::PublicKey, Plan = ()>,
70    Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
71    M: Monitor<Index = Epoch>,
72    T: Strategy,
73> {
74    ////////////////////////////////////////
75    // Interfaces
76    ////////////////////////////////////////
77    context: ContextCell<E>,
78    sequencer_signer: Option<ChunkSigner<C>>,
79    sequencers_provider: S,
80    validators_provider: P,
81    automaton: A,
82    relay: R,
83    monitor: M,
84    reporter: Z,
85    strategy: T,
86
87    ////////////////////////////////////////
88    // Namespace Constants
89    ////////////////////////////////////////
90
91    // Verifier for chunk signatures.
92    chunk_verifier: ChunkVerifier,
93
94    ////////////////////////////////////////
95    // Timeouts
96    ////////////////////////////////////////
97
98    // The configured timeout for rebroadcasting a chunk to all validators
99    rebroadcast_timeout: Duration,
100    rebroadcast_deadline: Option<SystemTime>,
101
102    ////////////////////////////////////////
103    // Pruning
104    ////////////////////////////////////////
105
106    // A tuple representing the epochs to keep in memory.
107    // The first element is the number of old epochs to keep.
108    // The second element is the number of future epochs to accept.
109    //
110    // For example, if the current epoch is 10, and the bounds are (1, 2), then
111    // epochs 9, 10, 11, and 12 are kept (and accepted);
112    // all others are pruned or rejected.
113    epoch_bounds: (EpochDelta, EpochDelta),
114
115    // The number of future heights to accept acks for.
116    // This is used to prevent spam of acks for arbitrary heights.
117    //
118    // For example, if the current tip for a sequencer is at height 100,
119    // and the height_bound is 10, then acks for heights 100-110 are accepted.
120    height_bound: HeightDelta,
121
122    ////////////////////////////////////////
123    // Messaging
124    ////////////////////////////////////////
125
126    // A stream of futures.
127    //
128    // Each future represents a verification request to the automaton
129    // that will either timeout or resolve with a boolean.
130    //
131    // There is no limit to the number of futures in this pool, so the automaton
132    // can apply backpressure by dropping the verification requests if necessary.
133    pending_verifies: FuturesPool<Verify<C::PublicKey, D>>,
134
135    ////////////////////////////////////////
136    // Storage
137    ////////////////////////////////////////
138
139    // The number of heights per each journal section.
140    journal_heights_per_section: NonZeroU64,
141
142    // The number of bytes to buffer when replaying a journal.
143    journal_replay_buffer: NonZeroUsize,
144
145    // The size of the write buffer to use for each blob in the journal.
146    journal_write_buffer: NonZeroUsize,
147
148    // A prefix for the journal names.
149    // The rest of the name is the hex-encoded public keys of the relevant sequencer.
150    journal_name_prefix: String,
151
152    // Compression level for the journal.
153    journal_compression: Option<u8>,
154
155    // Page cache for the journal.
156    journal_page_cache: CacheRef,
157
158    // A map of sequencer public keys to their journals.
159    #[allow(clippy::type_complexity)]
160    journals: BTreeMap<C::PublicKey, Journal<E, Node<C::PublicKey, P::Scheme, D>>>,
161
162    ////////////////////////////////////////
163    // State
164    ////////////////////////////////////////
165
166    // Tracks the current tip for each sequencer.
167    // The tip is a `Node` which is comprised of a `Chunk` and,
168    // if not the genesis chunk for that sequencer,
169    // a certificate over the parent chunk.
170    tip_manager: TipManager<C::PublicKey, P::Scheme, D>,
171
172    // Tracks the acknowledgements for chunks.
173    // This is comprised of votes or certificates.
174    ack_manager: AckManager<C::PublicKey, P::Scheme, D>,
175
176    // The current epoch.
177    epoch: Epoch,
178
179    ////////////////////////////////////////
180    // Network
181    ////////////////////////////////////////
182
183    // Whether to send proposals as priority messages.
184    priority_proposals: bool,
185
186    // Whether to send acks as priority messages.
187    priority_acks: bool,
188
189    ////////////////////////////////////////
190    // Metrics
191    ////////////////////////////////////////
192
193    // Metrics
194    metrics: metrics::Metrics<C::PublicKey>,
195
196    // The timer of my last new proposal
197    propose_timer: Option<histogram::Timer>,
198}
199
200impl<
201        E: BufferPooler + Clock + Spawner + CryptoRngCore + Storage + Metrics,
202        C: Signer,
203        S: SequencersProvider<PublicKey = C::PublicKey>,
204        P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D, PublicKey = C::PublicKey>>,
205        D: Digest,
206        A: Automaton<Context = Context<C::PublicKey>, Digest = D>,
207        R: Relay<Digest = D, PublicKey = C::PublicKey, Plan = ()>,
208        Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
209        M: Monitor<Index = Epoch>,
210        T: Strategy,
211    > Engine<E, C, S, P, D, A, R, Z, M, T>
212{
213    /// Creates a new engine with the given context and configuration.
214    pub fn new(context: E, cfg: Config<C, S, P, D, A, R, Z, M, T>) -> Self {
215        let metrics = metrics::Metrics::init(&context);
216
217        Self {
218            context: ContextCell::new(context),
219            sequencer_signer: cfg.sequencer_signer,
220            sequencers_provider: cfg.sequencers_provider,
221            validators_provider: cfg.validators_provider,
222            automaton: cfg.automaton,
223            relay: cfg.relay,
224            reporter: cfg.reporter,
225            monitor: cfg.monitor,
226            strategy: cfg.strategy,
227            chunk_verifier: cfg.chunk_verifier,
228            rebroadcast_timeout: cfg.rebroadcast_timeout,
229            rebroadcast_deadline: None,
230            epoch_bounds: cfg.epoch_bounds,
231            height_bound: cfg.height_bound,
232            pending_verifies: FuturesPool::default(),
233            journal_heights_per_section: cfg.journal_heights_per_section,
234            journal_replay_buffer: cfg.journal_replay_buffer,
235            journal_write_buffer: cfg.journal_write_buffer,
236            journal_name_prefix: cfg.journal_name_prefix,
237            journal_compression: cfg.journal_compression,
238            journal_page_cache: cfg.journal_page_cache,
239            journals: BTreeMap::new(),
240            tip_manager: TipManager::<C::PublicKey, P::Scheme, D>::new(),
241            ack_manager: AckManager::<C::PublicKey, P::Scheme, D>::new(),
242            epoch: Epoch::zero(),
243            priority_proposals: cfg.priority_proposals,
244            priority_acks: cfg.priority_acks,
245            metrics,
246            propose_timer: None,
247        }
248    }
249
250    /// Runs the engine until the context is stopped.
251    ///
252    /// The engine will handle:
253    /// - Requesting and processing proposals from the application
254    /// - Timeouts
255    ///   - Refreshing the Epoch
256    ///   - Rebroadcasting Proposals
257    /// - Messages from the network:
258    ///   - Nodes
259    ///   - Acks
260    pub fn start(
261        mut self,
262        chunk_network: (
263            impl Sender<PublicKey = C::PublicKey>,
264            impl Receiver<PublicKey = C::PublicKey>,
265        ),
266        ack_network: (
267            impl Sender<PublicKey = C::PublicKey>,
268            impl Receiver<PublicKey = C::PublicKey>,
269        ),
270    ) -> Handle<()> {
271        spawn_cell!(self.context, self.run(chunk_network, ack_network))
272    }
273
    /// Inner run loop called by `start`.
    ///
    /// Setup:
    /// - Wraps the ack channel with a codec.
    /// - Subscribes to epoch updates.
    /// - If this instance has a sequencer signer, prepares its own journal and
    ///   attempts an initial rebroadcast.
    ///
    /// It then multiplexes over:
    /// - epoch updates from the monitor,
    /// - the rebroadcast deadline,
    /// - the outstanding proposal request to the automaton (if any),
    /// - inbound `Node` messages,
    /// - inbound `Ack` messages, and
    /// - completed verification requests.
    ///
    /// The loop exits when the context stops, or when the epoch subscription or
    /// a network receiver fails. On exit, pending verifications are cancelled and
    /// every open journal is synced and dropped.
    ///
    /// # Panics
    ///
    /// Panics if the monitor reports an epoch lower than the current one, or if
    /// a journal fails to sync during shutdown.
    async fn run(
        mut self,
        chunk_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        ack_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) {
        // Nodes are sent pre-encoded and decoded manually with `Node::read_staged`,
        // so the chunk channel is used directly. Acks go through the codec wrapper
        // (with `()` as the codec config), so `ack_receiver` yields decode results.
        let mut node_sender = chunk_network.0;
        let mut node_receiver = chunk_network.1;
        let (mut ack_sender, mut ack_receiver) = wrap(
            (),
            self.context.network_buffer_pool().clone(),
            ack_network.0,
            ack_network.1,
        );

        // Tracks if there is an outstanding proposal request to the automaton.
        let mut pending: Option<(Context<C::PublicKey>, oneshot::Receiver<D>)> = None;

        // Initialize the epoch
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Before starting on the main loop, initialize my own sequencer journal
        // and attempt to rebroadcast if necessary.
        if let Some(ref signer) = self.sequencer_signer {
            self.journal_prepare(&signer.public_key()).await;
            if let Err(err) = self.rebroadcast(&mut node_sender) {
                // Rebroadcasting may return a non-critical error, so log the error and continue.
                info!(?err, "initial rebroadcast failed");
            }
        }

        // `on_start` defines the `rebroadcast` and `propose` futures that the
        // branches below select on (presumably rebuilt on every iteration).
        select_loop! {
            self.context,
            on_start => {
                // Request a new proposal if necessary
                if pending.is_none() {
                    if let Some(context) = self.should_propose() {
                        let receiver = self.automaton.propose(context.clone()).await;
                        pending = Some((context, receiver));
                    }
                }

                // Create deadline futures.
                //
                // If the deadline is None, the future will never resolve.
                let rebroadcast = match self.rebroadcast_deadline {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };
                let propose = match &mut pending {
                    Some((_context, receiver)) => Either::Left(receiver),
                    None => Either::Right(futures::future::pending()),
                };
            },
            on_stopped => {
                debug!("shutdown");
            },
            // Handle epoch updates from the monitor
            Some(epoch) = epoch_updates.recv() else {
                error!("epoch subscription failed");
                break;
            } => {
                // Refresh the epoch; the monitor must never move it backwards.
                debug!(current = %self.epoch, new = %epoch, "refresh epoch");
                assert!(epoch >= self.epoch);
                self.epoch = epoch;
                continue;
            },

            // Handle rebroadcast deadline
            _ = rebroadcast => {
                if let Some(ref signer) = self.sequencer_signer {
                    debug!(epoch = %self.epoch, sender = ?signer.public_key(), "rebroadcast");
                    if let Err(err) = self.rebroadcast(&mut node_sender) {
                        info!(?err, "rebroadcast failed");
                        continue;
                    }
                }
            },

            // Propose a new chunk
            receiver = propose => {
                // Clear the pending proposal. `propose` can only resolve when
                // `pending` is `Some` (otherwise it is `pending()`), so this
                // unwrap cannot fail.
                let (context, _) = pending.take().unwrap();
                debug!(height = %context.height, "propose");

                // Error handling for dropped proposals
                let Ok(payload) = receiver else {
                    warn!(?context, "automaton dropped proposal");
                    continue;
                };

                // Propose the chunk
                if let Err(err) = self
                    .propose(context.clone(), payload, &mut node_sender)
                    .await
                {
                    warn!(?err, ?context, "propose new failed");
                    continue;
                }
            },

            // Handle incoming nodes
            msg = node_receiver.recv() => {
                // Error handling
                let (sender, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        error!(?err, "node receiver failed");
                        break;
                    }
                };
                // The metric status stays `Invalid` unless upgraded to `Success` below.
                let mut guard = self.metrics.nodes.guard(Status::Invalid);

                // Decode using staged decoding with epoch-aware certificate bounds
                let node = match Node::read_staged(&mut msg.as_ref(), &self.validators_provider) {
                    Ok(node) => node,
                    Err(err) => {
                        debug!(?err, ?sender, "node decode failed");
                        continue;
                    }
                };
                let result = match self.validate_node(&node, &sender) {
                    Ok(result) => result,
                    Err(err) => {
                        debug!(?err, ?sender, "node validate failed");
                        continue;
                    }
                };

                // Initialize journal for sequencer if it does not exist.
                // NOTE(review): keyed by the network sender; this assumes
                // `validate_node` has checked that the sender is the chunk's
                // sequencer — confirm.
                self.journal_prepare(&sender).await;

                // Handle the parent certificate.
                // Assumes `validate_node` returns a parent chunk only when
                // `node.parent` is set, so the unwrap below cannot fail — confirm.
                if let Some(parent_chunk) = result {
                    let parent = node.parent.as_ref().unwrap();
                    self.handle_certificate(
                        &parent_chunk,
                        parent.epoch,
                        parent.certificate.clone(),
                    );
                }

                // Process the node
                //
                // Note, this node may be a duplicate. If it is, we will attempt to verify it and vote
                // on it again (our original vote may have been lost).
                self.handle_node(&node).await;
                debug!(?sender, height = %node.chunk.height, "node");
                guard.set(Status::Success);
            },

            // Handle incoming acks
            msg = ack_receiver.recv() => {
                // Error handling
                let (sender, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        warn!(?err, "ack receiver failed");
                        break;
                    }
                };
                // The metric status stays `Invalid` unless explicitly updated below.
                let mut guard = self.metrics.acks.guard(Status::Invalid);
                // The wrapped receiver has already attempted to decode the ack.
                let ack = match msg {
                    Ok(ack) => ack,
                    Err(err) => {
                        debug!(?err, ?sender, "ack decode failed");
                        continue;
                    }
                };
                if let Err(err) = self.validate_ack(&ack, &sender) {
                    debug!(?err, ?sender, "ack validate failed");
                    continue;
                };
                if let Err(err) = self.handle_ack(&ack) {
                    debug!(?err, ?sender, "ack handle failed");
                    guard.set(Status::Failure);
                    continue;
                }
                debug!(?sender, epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "ack");
                guard.set(Status::Success);
            },

            // Handle completed verification futures.
            verify = self.pending_verifies.next_completed() => {
                let Verify {
                    timer,
                    context,
                    payload,
                    result,
                } = verify;
                match result {
                    // The automaton dropped the response channel; the latency
                    // timer is not observed for this outcome.
                    Err(err) => {
                        warn!(?err, ?context, "verified returned error");
                        self.metrics.verify.inc(Status::Dropped);
                    }
                    Ok(false) => {
                        timer.observe(self.context.as_ref());
                        debug!(?context, "verified was false");
                        self.metrics.verify.inc(Status::Failure);
                    }
                    Ok(true) => {
                        timer.observe(self.context.as_ref());
                        debug!(?context, "verified");
                        self.metrics.verify.inc(Status::Success);
                        if let Err(err) = self
                            .handle_app_verified(&context, &payload, &mut ack_sender)
                            .await
                        {
                            debug!(?err, ?context, ?payload, "verified handle failed");
                        }
                    }
                }
            },
        }

        // Cancel outstanding verification requests, then sync and drop all
        // journals, regardless of how we exit the loop
        self.pending_verifies.cancel_all();
        while let Some((_, journal)) = self.journals.pop_first() {
            journal.sync_all().await.expect("unable to sync journal");
        }
    }
503
504    ////////////////////////////////////////
505    // Handling
506    ////////////////////////////////////////
507
508    /// Handles a verified message from the automaton.
509    ///
510    /// This is called when the automaton has verified a payload.
511    /// The chunk will be signed if it matches the current tip.
512    async fn handle_app_verified(
513        &mut self,
514        context: &Context<C::PublicKey>,
515        payload: &D,
516        ack_sender: &mut WrappedSender<
517            impl Sender<PublicKey = C::PublicKey>,
518            Ack<C::PublicKey, P::Scheme, D>,
519        >,
520    ) -> Result<(), Error> {
521        // Get the tip
522        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
523            return Err(Error::AppVerifiedNoTip);
524        };
525
526        // Return early if the height does not match
527        if tip.chunk.height != context.height {
528            return Err(Error::AppVerifiedHeightMismatch);
529        }
530
531        // Return early if the payload does not match
532        if tip.chunk.payload != *payload {
533            return Err(Error::AppVerifiedPayloadMismatch);
534        }
535
536        // Emit the activity
537        self.reporter.report(Activity::Tip(Proposal::new(
538            tip.chunk.clone(),
539            tip.signature.clone(),
540        )));
541
542        // Get the validator scheme for the current epoch
543        let Some(scheme) = self.validators_provider.scoped(self.epoch) else {
544            return Err(Error::UnknownScheme(self.epoch));
545        };
546
547        // Construct vote (if a validator)
548        let Some(ack) = Ack::sign(scheme.as_ref(), tip.chunk.clone(), self.epoch) else {
549            return Err(Error::NotSigner(self.epoch));
550        };
551
552        // Sync the journal to prevent ever acking two conflicting chunks at
553        // the same height, even if the node crashes and restarts.
554        self.journal_sync(&context.sequencer, context.height).await;
555
556        // The recipients are all the validators in the epoch and the sequencer.
557        // The sequencer may or may not be a validator.
558        let recipients = {
559            let validators = scheme.participants();
560            let mut recipients = validators.iter().cloned().collect::<Vec<_>>();
561            if !validators.iter().any(|v| v == &tip.chunk.sequencer) {
562                recipients.push(tip.chunk.sequencer.clone());
563            }
564            recipients
565        };
566
567        // Handle the ack internally
568        self.handle_ack(&ack)?;
569
570        // Send the ack to the network
571        ack_sender.send(Recipients::Some(recipients), ack, self.priority_acks);
572
573        Ok(())
574    }
575
576    /// Handles a certificate, either received from a `Node` from the network or generated locally.
577    ///
578    /// The certificate must already be verified.
579    /// If the certificate is new, it is stored and the proof is emitted to the committer.
580    /// If the certificate is already known, it is ignored.
581    fn handle_certificate(
582        &mut self,
583        chunk: &Chunk<C::PublicKey, D>,
584        epoch: Epoch,
585        certificate: <P::Scheme as Scheme>::Certificate,
586    ) {
587        // Set the certificate, returning early if it already exists
588        if !self.ack_manager.add_certificate(
589            &chunk.sequencer,
590            chunk.height,
591            epoch,
592            certificate.clone(),
593        ) {
594            return;
595        }
596
597        // If the certificate is for my sequencer, record metric
598        if let Some(ref signer) = self.sequencer_signer {
599            if chunk.sequencer == signer.public_key() {
600                if let Some(timer) = self.propose_timer.take() {
601                    timer.observe(self.context.as_ref());
602                }
603            }
604        }
605
606        // Emit the activity
607        self.reporter
608            .report(Activity::Lock(Lock::new(chunk.clone(), epoch, certificate)));
609    }
610
611    /// Handles an ack
612    ///
613    /// Returns an error if the ack is invalid, or can be ignored
614    /// (e.g. already exists, certificate already exists, is outside the epoch bounds, etc.).
615    fn handle_ack(&mut self, ack: &Ack<C::PublicKey, P::Scheme, D>) -> Result<(), Error> {
616        // Get the scheme for the ack's epoch
617        let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
618            return Err(Error::UnknownScheme(ack.epoch));
619        };
620
621        // Add the vote. If a new certificate is formed, handle it.
622        if let Some(certificate) = self
623            .ack_manager
624            .add_ack(ack, scheme.as_ref(), &self.strategy)
625        {
626            debug!(epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "recovered certificate");
627            self.metrics.certificates.inc();
628            self.handle_certificate(&ack.chunk, ack.epoch, certificate);
629        }
630
631        Ok(())
632    }
633
634    /// Handles a valid `Node` message, storing it as the tip.
635    /// Alerts the automaton of the new node.
636    /// Also appends the `Node` to the journal if it's new.
637    async fn handle_node(&mut self, node: &Node<C::PublicKey, P::Scheme, D>) {
638        // Store the tip
639        let is_new = self.tip_manager.put(node);
640
641        // If a higher height than the previous tip...
642        if is_new {
643            // Update metrics for sequencer height
644            let _ = self
645                .metrics
646                .sequencer_heights
647                .get_or_create_by(&node.chunk.sequencer)
648                .try_set(node.chunk.height.get());
649
650            // Append to journal if the `Node` is new, making sure to sync the journal
651            // to prevent sending two conflicting chunks to the automaton, even if
652            // the node crashes and restarts.
653            self.journal_append(node.clone()).await;
654            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
655                .await;
656        }
657
658        // Verify the chunk with the automaton
659        let context = Context {
660            sequencer: node.chunk.sequencer.clone(),
661            height: node.chunk.height,
662        };
663        let payload = node.chunk.payload;
664        let mut automaton = self.automaton.clone();
665        let timer = self.metrics.verify_duration.timer(self.context.as_ref());
666        self.pending_verifies.push(async move {
667            let receiver = automaton.verify(context.clone(), payload).await;
668            let result = receiver.await.map_err(Error::AppVerifyCanceled);
669            Verify {
670                timer,
671                context,
672                payload,
673                result,
674            }
675        });
676    }
677
678    ////////////////////////////////////////
679    // Proposing
680    ////////////////////////////////////////
681
682    /// Returns a `Context` if the engine should request a proposal from the automaton.
683    ///
684    /// Should only be called if the engine is not already waiting for a proposal.
685    fn should_propose(&self) -> Option<Context<C::PublicKey>> {
686        // Return `None` if we don't have a sequencer signer
687        let me = self.sequencer_signer.as_ref()?.public_key();
688
689        // Return `None` if I am not a sequencer in the current epoch
690        self.sequencers_provider
691            .sequencers(self.epoch)?
692            .position(&me)?;
693
694        // Return the next context unless my current tip has no certificate
695        match self.tip_manager.get(&me) {
696            None => Some(Context {
697                sequencer: me,
698                height: Height::zero(),
699            }),
700            Some(tip) => self
701                .ack_manager
702                .get_certificate(&me, tip.chunk.height)
703                .map(|_| Context {
704                    sequencer: me,
705                    height: tip.chunk.height.next(),
706                }),
707        }
708    }
709
710    /// Propose a new chunk to the network.
711    ///
712    /// The result is returned to the caller via the provided channel.
713    /// The proposal is only successful if the parent Chunk and certificate are known.
714    async fn propose(
715        &mut self,
716        context: Context<C::PublicKey>,
717        payload: D,
718        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
719    ) -> Result<(), Error> {
720        let mut guard = self.metrics.propose.guard(Status::Dropped);
721        let signer = self
722            .sequencer_signer
723            .as_mut()
724            .ok_or(Error::IAmNotASequencer(self.epoch))?;
725        let me = signer.public_key();
726
727        // Error-check context sequencer
728        if context.sequencer != me {
729            return Err(Error::ContextSequencer);
730        }
731
732        // Error-check that I am a sequencer in the current epoch
733        self.sequencers_provider
734            .sequencers(self.epoch)
735            .and_then(|s| s.position(&me))
736            .ok_or(Error::IAmNotASequencer(self.epoch))?;
737
738        // Get parent Chunk and certificate
739        let mut height = Height::zero();
740        let mut parent = None;
741        if let Some(tip) = self.tip_manager.get(&me) {
742            // Get certificate, or, if it doesn't exist, return an error
743            let Some((epoch, certificate)) =
744                self.ack_manager.get_certificate(&me, tip.chunk.height)
745            else {
746                return Err(Error::MissingCertificate);
747            };
748
749            // Update height and parent
750            height = tip.chunk.height.next();
751            parent = Some(Parent::new(tip.chunk.payload, epoch, certificate.clone()));
752        }
753
754        // Error-check context height
755        if context.height != height {
756            return Err(Error::ContextHeight);
757        }
758
759        // Construct new node
760        let node = Node::sign(signer, height, payload, parent);
761
762        // Deal with the chunk as if it were received over the network
763        self.handle_node(&node).await;
764
765        // Sync the journal to prevent ever proposing two conflicting chunks
766        // at the same height, even if the node crashes and restarts
767        self.journal_sync(&me, height).await;
768
769        // Record the start time of the proposal
770        self.propose_timer = Some(self.metrics.e2e_duration.timer(self.context.as_ref()));
771
772        // Broadcast to network
773        if let Err(err) = self.broadcast(node, node_sender, self.epoch) {
774            guard.set(Status::Failure);
775            return Err(err);
776        };
777
778        // Return success
779        guard.set(Status::Success);
780        Ok(())
781    }
782
783    /// Attempt to rebroadcast the highest-height chunk of this sequencer to all validators.
784    ///
785    /// This is only done if:
786    /// - this instance is the sequencer for the current epoch.
787    /// - this instance has a chunk to rebroadcast.
788    /// - this instance has not yet collected the certificate for the chunk.
789    fn rebroadcast(
790        &mut self,
791        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
792    ) -> Result<(), Error> {
793        let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);
794
795        // Unset the rebroadcast deadline
796        self.rebroadcast_deadline = None;
797
798        // Return if we don't have a sequencer signer
799        let signer = self
800            .sequencer_signer
801            .as_ref()
802            .ok_or(Error::IAmNotASequencer(self.epoch))?;
803        let me = signer.public_key();
804
805        // Return if not a sequencer in the current epoch
806        self.sequencers_provider
807            .sequencers(self.epoch)
808            .and_then(|s| s.position(&me))
809            .ok_or(Error::IAmNotASequencer(self.epoch))?;
810
811        // Return if no chunk to rebroadcast
812        let Some(tip) = self.tip_manager.get(&me) else {
813            return Err(Error::NothingToRebroadcast);
814        };
815
816        // Return if certificate already collected
817        if self
818            .ack_manager
819            .get_certificate(&me, tip.chunk.height)
820            .is_some()
821        {
822            return Err(Error::AlreadyCertified);
823        }
824
825        // Broadcast the message, which resets the rebroadcast deadline
826        guard.set(Status::Failure);
827        self.broadcast(tip, node_sender, self.epoch)?;
828        guard.set(Status::Success);
829        Ok(())
830    }
831
832    /// Send a  `Node` message to all validators in the given epoch.
833    fn broadcast(
834        &mut self,
835        node: Node<C::PublicKey, P::Scheme, D>,
836        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
837        epoch: Epoch,
838    ) -> Result<(), Error> {
839        // Get the scheme for the epoch to access validators
840        let Some(scheme) = self.validators_provider.scoped(epoch) else {
841            return Err(Error::UnknownScheme(epoch));
842        };
843        let validators = scheme.participants();
844
845        // Tell the relay to broadcast the full data
846        let _ = self.relay.broadcast(node.chunk.payload, ());
847
848        // Send the node to all validators
849        node_sender.send(
850            Recipients::Some(validators.iter().cloned().collect()),
851            node.encode(),
852            self.priority_proposals,
853        );
854
855        // Set the rebroadcast deadline
856        self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);
857
858        Ok(())
859    }
860
861    ////////////////////////////////////////
862    // Validation
863    ////////////////////////////////////////
864
865    /// Takes a raw `Node` (from sender) from the p2p network and validates it.
866    ///
867    /// If valid (and not already the tracked tip for the sender), returns the implied
868    /// parent chunk and its certificate.
869    /// Else returns an error if the `Node` is invalid.
870    fn validate_node(
871        &mut self,
872        node: &Node<C::PublicKey, P::Scheme, D>,
873        sender: &C::PublicKey,
874    ) -> Result<Option<Chunk<C::PublicKey, D>>, Error> {
875        // Verify the sender
876        if node.chunk.sequencer != *sender {
877            return Err(Error::PeerMismatch);
878        }
879
880        // Optimization: If the node is exactly equal to the tip,
881        // don't perform further validation.
882        if let Some(tip) = self.tip_manager.get(sender) {
883            if tip == *node {
884                return Ok(None);
885            }
886        }
887
888        // Validate chunk
889        self.validate_chunk(&node.chunk, self.epoch)?;
890
891        // Verify the node
892        node.verify(
893            self.context.as_mut(),
894            &self.chunk_verifier,
895            &self.validators_provider,
896            &self.strategy,
897        )
898    }
899
900    /// Takes a raw ack (from sender) from the p2p network and validates it.
901    ///
902    /// Returns the chunk, epoch, and vote if the ack is valid.
903    /// Returns an error if the ack is invalid.
904    fn validate_ack(
905        &mut self,
906        ack: &Ack<C::PublicKey, P::Scheme, D>,
907        sender: &<P::Scheme as Scheme>::PublicKey,
908    ) -> Result<(), Error> {
909        // Validate chunk
910        self.validate_chunk(&ack.chunk, ack.epoch)?;
911
912        // Get the scheme for the epoch to validate the sender
913        let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
914            return Err(Error::UnknownScheme(ack.epoch));
915        };
916
917        // Validate sender is a participant and matches the vote signer
918        let participants = scheme.participants();
919        let Some(index) = participants.index(sender) else {
920            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
921        };
922        if index != ack.attestation.signer {
923            return Err(Error::PeerMismatch);
924        }
925
926        // Spam prevention: If the ack is for an epoch that is too old or too new, ignore.
927        {
928            let (eb_lo, eb_hi) = self.epoch_bounds;
929            let bound_lo = self.epoch.saturating_sub(eb_lo);
930            let bound_hi = self.epoch.saturating_add(eb_hi);
931            if ack.epoch < bound_lo || ack.epoch > bound_hi {
932                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
933            }
934        }
935
936        // Spam prevention: If the ack is for a height that is too old or too new, ignore.
937        {
938            let bound_lo = self
939                .tip_manager
940                .get(&ack.chunk.sequencer)
941                .map(|t| t.chunk.height)
942                .unwrap_or(Height::zero());
943            let bound_hi = bound_lo.saturating_add(self.height_bound);
944            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
945                return Err(Error::AckHeightOutsideBounds(
946                    ack.chunk.height,
947                    bound_lo,
948                    bound_hi,
949                ));
950            }
951        }
952
953        // Validate the vote signature
954        if !ack.verify(self.context.as_mut(), scheme.as_ref(), &self.strategy) {
955            return Err(Error::InvalidAckSignature);
956        }
957
958        Ok(())
959    }
960
961    /// Takes a raw chunk from the p2p network and validates it against the epoch.
962    ///
963    /// Returns the chunk if the chunk is valid.
964    /// Returns an error if the chunk is invalid.
965    fn validate_chunk(&self, chunk: &Chunk<C::PublicKey, D>, epoch: Epoch) -> Result<(), Error> {
966        // Verify sequencer
967        if self
968            .sequencers_provider
969            .sequencers(epoch)
970            .and_then(|s| s.position(&chunk.sequencer))
971            .is_none()
972        {
973            return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
974        }
975
976        // Verify height
977        if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
978            // Height must be at least the tip height
979            match chunk.height.cmp(&tip.chunk.height) {
980                std::cmp::Ordering::Less => {
981                    return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
982                }
983                std::cmp::Ordering::Equal => {
984                    // Ensure this matches the tip if the height is the same
985                    if tip.chunk.payload != chunk.payload {
986                        return Err(Error::ChunkMismatch(
987                            chunk.sequencer.to_string(),
988                            chunk.height,
989                        ));
990                    }
991                }
992                std::cmp::Ordering::Greater => {}
993            }
994        }
995
996        Ok(())
997    }
998
999    ////////////////////////////////////////
1000    // Journal
1001    ////////////////////////////////////////
1002
1003    /// Returns the section of the journal for the given height.
1004    const fn get_journal_section(&self, height: Height) -> u64 {
1005        height.get() / self.journal_heights_per_section.get()
1006    }
1007
1008    /// Ensures the journal exists and is initialized for the given sequencer.
1009    /// If the journal does not exist, it is created and replayed.
1010    /// Else, no action is taken.
1011    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
1012        // Return early if the journal already exists
1013        if self.journals.contains_key(sequencer) {
1014            return;
1015        }
1016
1017        // Initialize journal
1018        let cfg = JournalConfig {
1019            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
1020            compression: self.journal_compression,
1021            codec_config: P::Scheme::certificate_codec_config_unbounded(),
1022            page_cache: self.journal_page_cache.clone(),
1023            write_buffer: self.journal_write_buffer,
1024        };
1025        let journal = Journal::<_, Node<C::PublicKey, P::Scheme, D>>::init(
1026            self.context
1027                .child("journal")
1028                .with_attribute("sequencer", sequencer),
1029            cfg,
1030        )
1031        .await
1032        .expect("unable to init journal");
1033
1034        // Replay journal
1035        {
1036            debug!(?sequencer, "journal replay begin");
1037
1038            // Prepare the stream
1039            let stream = journal
1040                .replay(0, 0, self.journal_replay_buffer)
1041                .await
1042                .expect("unable to replay journal");
1043            pin_mut!(stream);
1044
1045            // Read from the stream, which may be in arbitrary order.
1046            // Remember the highest node height
1047            let mut tip: Option<Node<C::PublicKey, P::Scheme, D>> = None;
1048            let mut num_items = 0;
1049            while let Some(msg) = stream.next().await {
1050                let (_, _, _, node) = msg.expect("unable to read from journal");
1051                num_items += 1;
1052                let height = node.chunk.height;
1053                match tip {
1054                    None => {
1055                        tip = Some(node);
1056                    }
1057                    Some(ref t) => {
1058                        if height > t.chunk.height {
1059                            tip = Some(node);
1060                        }
1061                    }
1062                }
1063            }
1064
1065            // Set the tip only once. The items from the journal may be in arbitrary order,
1066            // and the tip manager will panic if inserting tips out-of-order.
1067            if let Some(node) = tip.take() {
1068                let is_new = self.tip_manager.put(&node);
1069                assert!(is_new);
1070            }
1071
1072            debug!(?sequencer, ?num_items, "journal replay end");
1073        }
1074
1075        // Store journal
1076        self.journals.insert(sequencer.clone(), journal);
1077    }
1078
1079    /// Write a `Node` to the appropriate journal, which contains the tip `Chunk` for the sequencer.
1080    ///
1081    /// To prevent ever writing two conflicting `Chunk`s at the same height,
1082    /// the journal must already be open and replayed.
1083    async fn journal_append(&mut self, node: Node<C::PublicKey, P::Scheme, D>) {
1084        let section = self.get_journal_section(node.chunk.height);
1085        self.journals
1086            .get_mut(&node.chunk.sequencer)
1087            .expect("journal does not exist")
1088            .append(section, &node)
1089            .await
1090            .expect("unable to append to journal");
1091    }
1092
1093    /// Syncs (ensures all data is written to disk) and prunes the journal for the given sequencer and height.
1094    async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: Height) {
1095        let section = self.get_journal_section(height);
1096
1097        // Get journal
1098        let journal = self
1099            .journals
1100            .get_mut(sequencer)
1101            .expect("journal does not exist");
1102
1103        // Sync journal
1104        journal.sync(section).await.expect("unable to sync journal");
1105
1106        // Prune journal, ignoring errors
1107        let _ = journal.prune(section).await;
1108    }
1109}