commonware_consensus/ordered_broadcast/
engine.rs

//! Engine for the ordered broadcast module.
//!
//! It is responsible for:
//! - Proposing nodes (if a sequencer)
//! - Signing chunks (if a validator)
//! - Tracking the latest chunk in each sequencer's chain
//! - Assembling certificates from votes for each chunk
//! - Notifying other actors of new chunks and certificates
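//!
//! # Example
//!
//! A minimal sketch of wiring the engine, assuming a runtime `context` that satisfies the
//! engine's trait bounds, a fully populated `cfg: Config<...>`, and two `(Sender, Receiver)`
//! pairs already registered with the p2p layer (all of which are application-specific and
//! purely illustrative here):
//!
//! ```ignore
//! let engine = Engine::new(context.clone(), cfg);
//!
//! // One channel carries `Node` (chunk) messages, the other carries `Ack` messages.
//! let handle = engine.start(
//!     (node_sender, node_receiver),
//!     (ack_sender, ack_receiver),
//! );
//! ```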

use super::{
    metrics, scheme,
    types::{
        Ack, Activity, Chunk, ChunkSigner, ChunkVerifier, Context, Error, Lock, Node, Parent,
        Proposal, SequencersProvider,
    },
    AckManager, Config, TipManager,
};
use crate::{
    types::{Epoch, EpochDelta, Height, HeightDelta},
    Automaton, Monitor, Relay, Reporter,
};
use commonware_codec::Encode;
use commonware_cryptography::{
    certificate::{Provider, Scheme},
    Digest, PublicKey, Signer,
};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Receiver, Recipients, Sender,
};
use commonware_parallel::Strategy;
use commonware_runtime::{
    buffer::PoolRef,
    spawn_cell,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, GaugeExt, Status},
    },
    Clock, ContextCell, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::segmented::variable::{Config as JournalConfig, Journal};
use commonware_utils::{futures::Pool as FuturesPool, ordered::Quorum};
use futures::{
    channel::oneshot,
    future::{self, Either},
    pin_mut, StreamExt,
};
use rand_core::CryptoRngCore;
use std::{
    collections::BTreeMap,
    num::{NonZeroU64, NonZeroUsize},
    time::{Duration, SystemTime},
};
use tracing::{debug, error, info, warn};

/// Represents a pending verification request to the automaton.
struct Verify<C: PublicKey, D: Digest, E: Clock> {
    timer: histogram::Timer<E>,
    context: Context<C>,
    payload: D,
    result: Result<bool, Error>,
}

/// Instance of the engine.
pub struct Engine<
    E: Clock + Spawner + CryptoRngCore + Storage + Metrics,
    C: Signer,
    S: SequencersProvider<PublicKey = C::PublicKey>,
    P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D>>,
    D: Digest,
    A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
    R: Relay<Digest = D>,
    Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
    M: Monitor<Index = Epoch>,
    T: Strategy,
> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    context: ContextCell<E>,
    sequencer_signer: Option<ChunkSigner<C>>,
    sequencers_provider: S,
    validators_provider: P,
    automaton: A,
    relay: R,
    monitor: M,
    reporter: Z,
    strategy: T,

    ////////////////////////////////////////
    // Namespace Constants
    ////////////////////////////////////////

    // Verifier for chunk signatures.
    chunk_verifier: ChunkVerifier,

    ////////////////////////////////////////
    // Timeouts
    ////////////////////////////////////////

    // The configured timeout for rebroadcasting a chunk to all validators.
    rebroadcast_timeout: Duration,
    rebroadcast_deadline: Option<SystemTime>,

    ////////////////////////////////////////
    // Pruning
    ////////////////////////////////////////

    // A tuple representing the epochs to keep in memory.
    // The first element is the number of old epochs to keep.
    // The second element is the number of future epochs to accept.
    //
    // For example, if the current epoch is 10, and the bounds are (1, 2), then
    // epochs 9, 10, 11, and 12 are kept (and accepted);
    // all others are pruned or rejected.
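    //
    // A minimal sketch of how these bounds are applied (mirroring the spam check in
    // `validate_ack`, with the example values above and plain integers standing in for
    // the `Epoch`/`EpochDelta` types):
    //   let lo = epoch.saturating_sub(1); // 10 - 1 = 9
    //   let hi = epoch.saturating_add(2); // 10 + 2 = 12
    //   // acks whose epoch falls outside lo..=hi are rejected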
    epoch_bounds: (EpochDelta, EpochDelta),

    // The number of future heights to accept acks for.
    // This is used to prevent spam of acks for arbitrary heights.
    //
    // For example, if the current tip for a sequencer is at height 100,
    // and the height_bound is 10, then acks for heights 100-110 are accepted.
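    //
    // A minimal sketch of how this bound is applied (mirroring the spam check in
    // `validate_ack`, with the example values above and plain integers standing in for
    // the `Height`/`HeightDelta` types):
    //   let lo = tip_height;                      // 100
    //   let hi = lo.saturating_add(height_bound); // 100 + 10 = 110
    //   // acks whose height falls outside lo..=hi are rejected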
    height_bound: HeightDelta,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////

    // A stream of futures.
    //
    // Each future represents a verification request to the automaton
    // that will either time out or resolve with a boolean.
    //
    // There is no limit to the number of futures in this pool, so the automaton
    // can apply backpressure by dropping the verification requests if necessary.
    pending_verifies: FuturesPool<Verify<C::PublicKey, D, E>>,

    ////////////////////////////////////////
    // Storage
    ////////////////////////////////////////

    // The number of heights per journal section.
    journal_heights_per_section: NonZeroU64,

    // The number of bytes to buffer when replaying a journal.
    journal_replay_buffer: NonZeroUsize,

    // The size of the write buffer to use for each blob in the journal.
    journal_write_buffer: NonZeroUsize,

    // A prefix for the journal names.
    // The rest of the name is the hex-encoded public key of the relevant sequencer.
    journal_name_prefix: String,

    // Compression level for the journal.
    journal_compression: Option<u8>,

    // Buffer pool for the journal.
    journal_buffer_pool: PoolRef,

    // A map of sequencer public keys to their journals.
    #[allow(clippy::type_complexity)]
    journals: BTreeMap<C::PublicKey, Journal<E, Node<C::PublicKey, P::Scheme, D>>>,

    ////////////////////////////////////////
    // State
    ////////////////////////////////////////

    // Tracks the current tip for each sequencer.
    // The tip is a `Node`, which comprises a `Chunk` and,
    // if not the genesis chunk for that sequencer,
    // a certificate over the parent chunk.
    tip_manager: TipManager<C::PublicKey, P::Scheme, D>,

    // Tracks the acknowledgements for chunks.
    // These consist of votes or certificates.
    ack_manager: AckManager<C::PublicKey, P::Scheme, D>,

    // The current epoch.
    epoch: Epoch,

    ////////////////////////////////////////
    // Network
    ////////////////////////////////////////

    // Whether to send proposals as priority messages.
    priority_proposals: bool,

    // Whether to send acks as priority messages.
    priority_acks: bool,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////

    // Metrics
    metrics: metrics::Metrics<E>,

    // The timer for my most recent proposal (dropped to record end-to-end latency
    // once its certificate is assembled).
    propose_timer: Option<histogram::Timer<E>>,
}

impl<
        E: Clock + Spawner + CryptoRngCore + Storage + Metrics,
        C: Signer,
        S: SequencersProvider<PublicKey = C::PublicKey>,
        P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D, PublicKey = C::PublicKey>>,
        D: Digest,
        A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
        R: Relay<Digest = D>,
        Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
        M: Monitor<Index = Epoch>,
        T: Strategy,
    > Engine<E, C, S, P, D, A, R, Z, M, T>
{
    /// Creates a new engine with the given context and configuration.
    pub fn new(context: E, cfg: Config<C, S, P, D, A, R, Z, M, T>) -> Self {
        // TODO(#1833): Metrics should use the post-start context
        let metrics = metrics::Metrics::init(context.clone());

        Self {
            context: ContextCell::new(context),
            sequencer_signer: cfg.sequencer_signer,
            sequencers_provider: cfg.sequencers_provider,
            validators_provider: cfg.validators_provider,
            automaton: cfg.automaton,
            relay: cfg.relay,
            reporter: cfg.reporter,
            monitor: cfg.monitor,
            strategy: cfg.strategy,
            chunk_verifier: cfg.chunk_verifier,
            rebroadcast_timeout: cfg.rebroadcast_timeout,
            rebroadcast_deadline: None,
            epoch_bounds: cfg.epoch_bounds,
            height_bound: cfg.height_bound,
            pending_verifies: FuturesPool::default(),
            journal_heights_per_section: cfg.journal_heights_per_section,
            journal_replay_buffer: cfg.journal_replay_buffer,
            journal_write_buffer: cfg.journal_write_buffer,
            journal_name_prefix: cfg.journal_name_prefix,
            journal_compression: cfg.journal_compression,
            journal_buffer_pool: cfg.journal_buffer_pool,
            journals: BTreeMap::new(),
            tip_manager: TipManager::<C::PublicKey, P::Scheme, D>::new(),
            ack_manager: AckManager::<C::PublicKey, P::Scheme, D>::new(),
            epoch: Epoch::zero(),
            priority_proposals: cfg.priority_proposals,
            priority_acks: cfg.priority_acks,
            metrics,
            propose_timer: None,
        }
    }

    /// Runs the engine until the context is stopped.
    ///
    /// The engine will handle:
    /// - Requesting and processing proposals from the application
    /// - Timeouts
    ///   - Refreshing the Epoch
    ///   - Rebroadcasting Proposals
    /// - Messages from the network:
    ///   - Nodes
    ///   - Acks
    pub fn start(
        mut self,
        chunk_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        ack_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) -> Handle<()> {
        spawn_cell!(self.context, self.run(chunk_network, ack_network).await)
    }

    /// Inner run loop called by `start`.
    async fn run(
        mut self,
        chunk_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        ack_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) {
        let mut node_sender = chunk_network.0;
        let mut node_receiver = chunk_network.1;
        let (mut ack_sender, mut ack_receiver) = wrap((), ack_network.0, ack_network.1);
        let mut shutdown = self.context.stopped();

        // Tracks if there is an outstanding proposal request to the automaton.
        let mut pending: Option<(Context<C::PublicKey>, oneshot::Receiver<D>)> = None;

        // Initialize the epoch
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Before entering the main loop, initialize my own sequencer journal
        // and attempt to rebroadcast if necessary.
        if let Some(ref signer) = self.sequencer_signer {
            self.journal_prepare(&signer.public_key()).await;
            if let Err(err) = self.rebroadcast(&mut node_sender).await {
                // Rebroadcasting may return a non-critical error, so log the error and continue.
                info!(?err, "initial rebroadcast failed");
            }
        }

        loop {
            // Request a new proposal if necessary
            if pending.is_none() {
                if let Some(context) = self.should_propose() {
                    let receiver = self.automaton.propose(context.clone()).await;
                    pending = Some((context, receiver));
                }
            }

            // Create deadline futures.
            //
            // If the deadline is None, the future will never resolve.
            let rebroadcast = match self.rebroadcast_deadline {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };
            let propose = match &mut pending {
                Some((_context, receiver)) => Either::Left(receiver),
                None => Either::Right(futures::future::pending()),
            };

            // Process the next event
            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    break;
                },

                // Handle epoch updates from the monitor
                epoch = epoch_updates.next() => {
                    // Error handling
                    let Some(epoch) = epoch else {
                        error!("epoch subscription failed");
                        break;
                    };

                    // Refresh the epoch
                    debug!(current = %self.epoch, new = %epoch, "refresh epoch");
                    assert!(epoch >= self.epoch);
                    self.epoch = epoch;
                    continue;
                },

                // Handle rebroadcast deadline
                _ = rebroadcast => {
                    if let Some(ref signer) = self.sequencer_signer {
                        debug!(epoch = %self.epoch, sender = ?signer.public_key(), "rebroadcast");
                        if let Err(err) = self.rebroadcast(&mut node_sender).await {
                            info!(?err, "rebroadcast failed");
                            continue;
                        }
                    }
                },

                // Propose a new chunk
                receiver = propose => {
                    // Clear the pending proposal
                    let (context, _) = pending.take().unwrap();
                    debug!(height = %context.height, "propose");

                    // Error handling for dropped proposals
                    let Ok(payload) = receiver else {
                        warn!(?context, "automaton dropped proposal");
                        continue;
                    };

                    // Propose the chunk
                    if let Err(err) = self.propose(context.clone(), payload, &mut node_sender).await {
                        warn!(?err, ?context, "propose new failed");
                        continue;
                    }
                },

                // Handle incoming nodes
                msg = node_receiver.recv() => {
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            error!(?err, "node receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.nodes.guard(Status::Invalid);

                    // Decode using staged decoding with epoch-aware certificate bounds
                    let node = match Node::read_staged(&mut msg.as_ref(), &self.validators_provider) {
                        Ok(node) => node,
                        Err(err) => {
                            debug!(?err, ?sender, "node decode failed");
                            continue;
                        }
                    };
                    let result = match self.validate_node(&node, &sender) {
                        Ok(result) => result,
                        Err(err) => {
                            debug!(?err, ?sender, "node validate failed");
                            continue;
                        }
                    };

                    // Initialize journal for sequencer if it does not exist
                    self.journal_prepare(&sender).await;

                    // Handle the parent certificate
                    if let Some(parent_chunk) = result {
                        let parent = node.parent.as_ref().unwrap();
                        self.handle_certificate(&parent_chunk, parent.epoch, parent.certificate.clone()).await;
                    }

                    // Process the node
                    //
                    // Note, this node may be a duplicate. If it is, we will attempt to verify it and vote
                    // on it again (our original vote may have been lost).
                    self.handle_node(&node).await;
                    debug!(?sender, height = %node.chunk.height, "node");
                    guard.set(Status::Success);
                },

                // Handle incoming acks
                msg = ack_receiver.recv() => {
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let ack = match msg {
                        Ok(ack) => ack,
                        Err(err) => {
                            debug!(?err, ?sender, "ack decode failed");
                            continue;
                        }
                    };
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        debug!(?err, ?sender, "ack validate failed");
                        continue;
                    };
                    if let Err(err) = self.handle_ack(&ack).await {
                        debug!(?err, ?sender, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }
                    debug!(?sender, epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "ack");
                    guard.set(Status::Success);
                },

                // Handle completed verification futures.
                verify = self.pending_verifies.next_completed() => {
                    let Verify { timer, context, payload, result } = verify;
                    drop(timer); // Record metric. Explicitly reference timer to avoid lint warning.
                    match result {
                        Err(err) => {
                            warn!(?err, ?context, "verified returned error");
                            self.metrics.verify.inc(Status::Dropped);
                        }
                        Ok(false) => {
                            debug!(?context, "verified was false");
                            self.metrics.verify.inc(Status::Failure);
                        }
                        Ok(true) => {
                            debug!(?context, "verified");
                            self.metrics.verify.inc(Status::Success);
                            if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await {
                                debug!(?err, ?context, ?payload, "verified handle failed");
                            }
                        },
                    }
                },
            }
        }

        // Sync and drop all journals, regardless of how we exit the loop
        self.pending_verifies.cancel_all();
        while let Some((_, journal)) = self.journals.pop_first() {
            journal.sync_all().await.expect("unable to sync journal");
        }
    }

    ////////////////////////////////////////
    // Handling
    ////////////////////////////////////////

    /// Handles a verified message from the automaton.
    ///
    /// This is called when the automaton has verified a payload.
    /// The chunk will be signed if it matches the current tip.
    async fn handle_app_verified(
        &mut self,
        context: &Context<C::PublicKey>,
        payload: &D,
        ack_sender: &mut WrappedSender<
            impl Sender<PublicKey = C::PublicKey>,
            Ack<C::PublicKey, P::Scheme, D>,
        >,
    ) -> Result<(), Error> {
        // Get the tip
        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
            return Err(Error::AppVerifiedNoTip);
        };

        // Return early if the height does not match
        if tip.chunk.height != context.height {
            return Err(Error::AppVerifiedHeightMismatch);
        }

        // Return early if the payload does not match
        if tip.chunk.payload != *payload {
            return Err(Error::AppVerifiedPayloadMismatch);
        }

        // Emit the activity
        self.reporter
            .report(Activity::Tip(Proposal::new(
                tip.chunk.clone(),
                tip.signature.clone(),
            )))
            .await;

        // Get the validator scheme for the current epoch
        let Some(scheme) = self.validators_provider.scoped(self.epoch) else {
            return Err(Error::UnknownScheme(self.epoch));
        };

        // Construct vote (if a validator)
        let Some(ack) = Ack::sign(scheme.as_ref(), tip.chunk.clone(), self.epoch) else {
            return Err(Error::NotSigner(self.epoch));
        };

        // Sync the journal to prevent ever acking two conflicting chunks at
        // the same height, even if the node crashes and restarts.
        self.journal_sync(&context.sequencer, context.height).await;

        // The recipients are all the validators in the epoch and the sequencer.
        // The sequencer may or may not be a validator.
        let recipients = {
            let validators = scheme.participants();
            let mut recipients = validators.iter().cloned().collect::<Vec<_>>();
            if !validators.iter().any(|v| v == &tip.chunk.sequencer) {
                recipients.push(tip.chunk.sequencer.clone());
            }
            recipients
        };

        // Handle the ack internally
        self.handle_ack(&ack).await?;

        // Send the ack to the network
        ack_sender
            .send(Recipients::Some(recipients), ack, self.priority_acks)
            .await
            .map_err(|_| Error::UnableToSendMessage)?;

        Ok(())
    }

    /// Handles a certificate, either received in a `Node` from the network or generated locally.
    ///
    /// The certificate must already be verified.
    /// If the certificate is new, it is stored and the proof is emitted to the committer.
    /// If the certificate is already known, it is ignored.
    async fn handle_certificate(
        &mut self,
        chunk: &Chunk<C::PublicKey, D>,
        epoch: Epoch,
        certificate: <P::Scheme as Scheme>::Certificate,
    ) {
        // Set the certificate, returning early if it already exists
        if !self.ack_manager.add_certificate(
            &chunk.sequencer,
            chunk.height,
            epoch,
            certificate.clone(),
        ) {
            return;
        }

        // If the certificate is for my sequencer, record metric
        if let Some(ref signer) = self.sequencer_signer {
            if chunk.sequencer == signer.public_key() {
                self.propose_timer.take();
            }
        }

        // Emit the activity
        self.reporter
            .report(Activity::Lock(Lock::new(chunk.clone(), epoch, certificate)))
            .await;
    }

    /// Handles an ack.
    ///
    /// Returns an error if the ack is invalid, or can be ignored
    /// (e.g. already exists, certificate already exists, is outside the epoch bounds, etc.).
    async fn handle_ack(&mut self, ack: &Ack<C::PublicKey, P::Scheme, D>) -> Result<(), Error> {
        // Get the scheme for the ack's epoch
        let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
            return Err(Error::UnknownScheme(ack.epoch));
        };

        // Add the vote. If a new certificate is formed, handle it.
        if let Some(certificate) = self
            .ack_manager
            .add_ack(ack, scheme.as_ref(), &self.strategy)
        {
            debug!(epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "recovered certificate");
            self.metrics.certificates.inc();
            self.handle_certificate(&ack.chunk, ack.epoch, certificate)
                .await;
        }

        Ok(())
    }

    /// Handles a valid `Node` message, storing it as the tip.
    /// Alerts the automaton of the new node.
    /// Also appends the `Node` to the journal if it's new.
    async fn handle_node(&mut self, node: &Node<C::PublicKey, P::Scheme, D>) {
        // Store the tip
        let is_new = self.tip_manager.put(node);

        // If the node is at a higher height than the previous tip...
        if is_new {
            // Update metrics for sequencer height
            let _ = self
                .metrics
                .sequencer_heights
                .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
                .try_set(node.chunk.height.get());

            // Append to journal if the `Node` is new, making sure to sync the journal
            // to prevent sending two conflicting chunks to the automaton, even if
            // the node crashes and restarts.
            self.journal_append(node.clone()).await;
            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
                .await;
        }

        // Verify the chunk with the automaton
        let context = Context {
            sequencer: node.chunk.sequencer.clone(),
            height: node.chunk.height,
        };
        let payload = node.chunk.payload;
        let mut automaton = self.automaton.clone();
        let timer = self.metrics.verify_duration.timer();
        self.pending_verifies.push(async move {
            let receiver = automaton.verify(context.clone(), payload).await;
            let result = receiver.await.map_err(Error::AppVerifyCanceled);
            Verify {
                timer,
                context,
                payload,
                result,
            }
        });
    }

    ////////////////////////////////////////
    // Proposing
    ////////////////////////////////////////

    /// Returns a `Context` if the engine should request a proposal from the automaton.
    ///
    /// Should only be called if the engine is not already waiting for a proposal.
    fn should_propose(&self) -> Option<Context<C::PublicKey>> {
        // Return `None` if we don't have a sequencer signer
        let me = self.sequencer_signer.as_ref()?.public_key();

        // Return `None` if I am not a sequencer in the current epoch
        self.sequencers_provider
            .sequencers(self.epoch)?
            .position(&me)?;

        // Return the next context, but only if my current tip (if any) already has a certificate
        match self.tip_manager.get(&me) {
            None => Some(Context {
                sequencer: me,
                height: Height::zero(),
            }),
            Some(tip) => self
                .ack_manager
                .get_certificate(&me, tip.chunk.height)
                .map(|_| Context {
                    sequencer: me,
                    height: tip.chunk.height.next(),
                }),
        }
    }

    /// Propose a new chunk to the network.
    ///
    /// The new node is broadcast to all validators via the provided sender.
    /// The proposal is only successful if the parent `Chunk` and certificate are known.
    async fn propose(
        &mut self,
        context: Context<C::PublicKey>,
        payload: D,
        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.propose.guard(Status::Dropped);
        let signer = self
            .sequencer_signer
            .as_mut()
            .ok_or(Error::IAmNotASequencer(self.epoch))?;
        let me = signer.public_key();

        // Error-check context sequencer
        if context.sequencer != me {
            return Err(Error::ContextSequencer);
        }

        // Error-check that I am a sequencer in the current epoch
        self.sequencers_provider
            .sequencers(self.epoch)
            .and_then(|s| s.position(&me))
            .ok_or(Error::IAmNotASequencer(self.epoch))?;

        // Get parent Chunk and certificate
        let mut height = Height::zero();
        let mut parent = None;
        if let Some(tip) = self.tip_manager.get(&me) {
            // Get certificate, or, if it doesn't exist, return an error
            let Some((epoch, certificate)) =
                self.ack_manager.get_certificate(&me, tip.chunk.height)
            else {
                return Err(Error::MissingCertificate);
            };

            // Update height and parent
            height = tip.chunk.height.next();
            parent = Some(Parent::new(tip.chunk.payload, epoch, certificate.clone()));
        }

        // Error-check context height
        if context.height != height {
            return Err(Error::ContextHeight);
        }

        // Construct new node
        let node = Node::sign(signer, height, payload, parent);

        // Deal with the chunk as if it were received over the network
        self.handle_node(&node).await;

        // Sync the journal to prevent ever proposing two conflicting chunks
        // at the same height, even if the node crashes and restarts
        self.journal_sync(&me, height).await;

        // Record the start time of the proposal
        self.propose_timer = Some(self.metrics.e2e_duration.timer());

        // Broadcast to network
        if let Err(err) = self.broadcast(node, node_sender, self.epoch).await {
            guard.set(Status::Failure);
            return Err(err);
        };

        // Return success
        guard.set(Status::Success);
        Ok(())
    }

    /// Attempt to rebroadcast the highest-height chunk of this sequencer to all validators.
    ///
    /// This is only done if:
    /// - this instance is a sequencer in the current epoch.
    /// - this instance has a chunk to rebroadcast.
    /// - this instance has not yet collected the certificate for the chunk.
    async fn rebroadcast(
        &mut self,
        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);

        // Unset the rebroadcast deadline
        self.rebroadcast_deadline = None;

        // Return if we don't have a sequencer signer
        let signer = self
            .sequencer_signer
            .as_ref()
            .ok_or(Error::IAmNotASequencer(self.epoch))?;
        let me = signer.public_key();

        // Return if not a sequencer in the current epoch
        self.sequencers_provider
            .sequencers(self.epoch)
            .and_then(|s| s.position(&me))
            .ok_or(Error::IAmNotASequencer(self.epoch))?;

        // Return if no chunk to rebroadcast
        let Some(tip) = self.tip_manager.get(&me) else {
            return Err(Error::NothingToRebroadcast);
        };

        // Return if certificate already collected
        if self
            .ack_manager
            .get_certificate(&me, tip.chunk.height)
            .is_some()
        {
            return Err(Error::AlreadyCertified);
        }

        // Broadcast the message, which resets the rebroadcast deadline
        guard.set(Status::Failure);
        self.broadcast(tip, node_sender, self.epoch).await?;
        guard.set(Status::Success);
        Ok(())
    }

    /// Send a `Node` message to all validators in the given epoch.
    async fn broadcast(
        &mut self,
        node: Node<C::PublicKey, P::Scheme, D>,
        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
        epoch: Epoch,
    ) -> Result<(), Error> {
        // Get the scheme for the epoch to access validators
        let Some(scheme) = self.validators_provider.scoped(epoch) else {
            return Err(Error::UnknownScheme(epoch));
        };
        let validators = scheme.participants();

        // Tell the relay to broadcast the full data
        self.relay.broadcast(node.chunk.payload).await;

        // Send the node to all validators
        node_sender
            .send(
                Recipients::Some(validators.iter().cloned().collect()),
                node.encode(),
                self.priority_proposals,
            )
            .await
            .map_err(|_| Error::BroadcastFailed)?;

        // Set the rebroadcast deadline
        self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);

        Ok(())
    }

    ////////////////////////////////////////
    // Validation
    ////////////////////////////////////////

    /// Takes a raw `Node` (from sender) from the p2p network and validates it.
    ///
    /// If valid (and not already the tracked tip for the sender), returns the implied
    /// parent chunk, if any.
    /// Returns an error if the `Node` is invalid.
    fn validate_node(
        &mut self,
        node: &Node<C::PublicKey, P::Scheme, D>,
        sender: &C::PublicKey,
    ) -> Result<Option<Chunk<C::PublicKey, D>>, Error> {
        // Verify the sender
        if node.chunk.sequencer != *sender {
            return Err(Error::PeerMismatch);
        }

        // Optimization: If the node is exactly equal to the tip,
        // don't perform further validation.
        if let Some(tip) = self.tip_manager.get(sender) {
            if tip == *node {
                return Ok(None);
            }
        }

        // Validate chunk
        self.validate_chunk(&node.chunk, self.epoch)?;

        // Verify the node
        node.verify(
            &mut self.context,
            &self.chunk_verifier,
            &self.validators_provider,
            &self.strategy,
        )
    }

    /// Takes a raw ack (from sender) from the p2p network and validates it.
    ///
    /// Returns `Ok(())` if the ack is valid.
    /// Returns an error if the ack is invalid.
    fn validate_ack(
        &mut self,
        ack: &Ack<C::PublicKey, P::Scheme, D>,
        sender: &<P::Scheme as Scheme>::PublicKey,
    ) -> Result<(), Error> {
        // Validate chunk
        self.validate_chunk(&ack.chunk, ack.epoch)?;

        // Get the scheme for the epoch to validate the sender
        let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
            return Err(Error::UnknownScheme(ack.epoch));
        };

        // Validate sender is a participant and matches the vote signer
        let participants = scheme.participants();
        let Some(index) = participants.index(sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if index != ack.attestation.signer {
            return Err(Error::PeerMismatch);
        }

        // Spam prevention: If the ack is for an epoch that is too old or too new, ignore.
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // Spam prevention: If the ack is for a height that is too old or too new, ignore.
        {
            let bound_lo = self
                .tip_manager
                .get(&ack.chunk.sequencer)
                .map(|t| t.chunk.height)
                .unwrap_or(Height::zero());
            let bound_hi = bound_lo.saturating_add(self.height_bound);
            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
                return Err(Error::AckHeightOutsideBounds(
                    ack.chunk.height,
                    bound_lo,
                    bound_hi,
                ));
            }
        }

        // Validate the vote signature
        if !ack.verify(&mut self.context, scheme.as_ref(), &self.strategy) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }

    /// Takes a raw chunk from the p2p network and validates it against the epoch.
    ///
    /// Returns `Ok(())` if the chunk is valid.
    /// Returns an error if the chunk is invalid.
    fn validate_chunk(&self, chunk: &Chunk<C::PublicKey, D>, epoch: Epoch) -> Result<(), Error> {
        // Verify sequencer
        if self
            .sequencers_provider
            .sequencers(epoch)
            .and_then(|s| s.position(&chunk.sequencer))
            .is_none()
        {
            return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
        }

        // Verify height
        if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
            // Height must be at least the tip height
            match chunk.height.cmp(&tip.chunk.height) {
                std::cmp::Ordering::Less => {
                    return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
                }
                std::cmp::Ordering::Equal => {
                    // Ensure this matches the tip if the height is the same
                    if tip.chunk.payload != chunk.payload {
                        return Err(Error::ChunkMismatch(
                            chunk.sequencer.to_string(),
                            chunk.height,
                        ));
                    }
                }
                std::cmp::Ordering::Greater => {}
            }
        }

        Ok(())
    }

    ////////////////////////////////////////
    // Journal
    ////////////////////////////////////////

    /// Returns the section of the journal for the given height.
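    ///
    /// For example (assuming `journal_heights_per_section = 10`), heights `0..=9` map to
    /// section `0`, heights `10..=19` map to section `1`, and so on.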
    const fn get_journal_section(&self, height: Height) -> u64 {
        height.get() / self.journal_heights_per_section.get()
    }

    /// Ensures the journal exists and is initialized for the given sequencer.
    /// If the journal does not exist, it is created and replayed.
    /// Otherwise, no action is taken.
    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
        // Return early if the journal already exists
        if self.journals.contains_key(sequencer) {
            return;
        }

        // Initialize journal
        let cfg = JournalConfig {
            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
            compression: self.journal_compression,
            codec_config: P::Scheme::certificate_codec_config_unbounded(),
            buffer_pool: self.journal_buffer_pool.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal = Journal::<_, Node<C::PublicKey, P::Scheme, D>>::init(
            self.context.with_label("journal").into_present(),
            cfg,
        )
        .await
        .expect("unable to init journal");

        // Replay journal
        {
            debug!(?sequencer, "journal replay begin");

            // Prepare the stream
            let stream = journal
                .replay(0, 0, self.journal_replay_buffer)
                .await
                .expect("unable to replay journal");
            pin_mut!(stream);

            // Read from the stream, whose items may arrive in arbitrary order,
            // and track the node with the highest height.
            let mut tip: Option<Node<C::PublicKey, P::Scheme, D>> = None;
            let mut num_items = 0;
            while let Some(msg) = stream.next().await {
                let (_, _, _, node) = msg.expect("unable to read from journal");
                num_items += 1;
                let height = node.chunk.height;
                match tip {
                    None => {
                        tip = Some(node);
                    }
                    Some(ref t) => {
                        if height > t.chunk.height {
                            tip = Some(node);
                        }
                    }
                }
            }

            // Set the tip only once. The items from the journal may be in arbitrary order,
            // and the tip manager will panic if inserting tips out-of-order.
            if let Some(node) = tip.take() {
                let is_new = self.tip_manager.put(&node);
                assert!(is_new);
            }

            debug!(?sequencer, ?num_items, "journal replay end");
        }

        // Store journal
        self.journals.insert(sequencer.clone(), journal);
    }

    /// Write a `Node` to the appropriate journal, which contains the tip `Chunk` for the sequencer.
    ///
    /// To prevent ever writing two conflicting `Chunk`s at the same height,
    /// the journal must already be open and replayed.
    async fn journal_append(&mut self, node: Node<C::PublicKey, P::Scheme, D>) {
        let section = self.get_journal_section(node.chunk.height);
        self.journals
            .get_mut(&node.chunk.sequencer)
            .expect("journal does not exist")
            .append(section, node)
            .await
            .expect("unable to append to journal");
    }

    /// Syncs (ensures all data is written to disk) and prunes the journal for the given sequencer and height.
    async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: Height) {
        let section = self.get_journal_section(height);

        // Get journal
        let journal = self
            .journals
            .get_mut(sequencer)
            .expect("journal does not exist");

        // Sync journal
        journal.sync(section).await.expect("unable to sync journal");

        // Prune journal, ignoring errors
        let _ = journal.prune(section).await;
    }
1107}