Skip to main content

commonware_consensus/ordered_broadcast/
engine.rs

1//! Engine for the module.
2//!
3//! It is responsible for:
4//! - Proposing nodes (if a sequencer)
5//! - Signing chunks (if a validator)
6//! - Tracking the latest chunk in each sequencer's chain
7//! - Assembling certificates from votes for each chunk
8//! - Notifying other actors of new chunks and certificates
9
10use super::{
11    metrics, scheme,
12    types::{
13        Ack, Activity, Chunk, ChunkSigner, ChunkVerifier, Context, Error, Lock, Node, Parent,
14        Proposal, SequencersProvider,
15    },
16    AckManager, Config, TipManager,
17};
18use crate::{
19    types::{Epoch, EpochDelta, Height, HeightDelta},
20    Automaton, Monitor, Relay, Reporter,
21};
22use commonware_codec::Encode;
23use commonware_cryptography::{
24    certificate::{Provider, Scheme},
25    Digest, PublicKey, Signer,
26};
27use commonware_macros::select_loop;
28use commonware_p2p::{
29    utils::codec::{wrap, WrappedSender},
30    Receiver, Recipients, Sender,
31};
32use commonware_parallel::Strategy;
33use commonware_runtime::{
34    buffer::paged::CacheRef,
35    spawn_cell,
36    telemetry::metrics::{
37        histogram,
38        status::{CounterExt, GaugeExt, Status},
39    },
40    BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
41};
42use commonware_storage::journal::segmented::variable::{Config as JournalConfig, Journal};
43use commonware_utils::{channel::oneshot, futures::Pool as FuturesPool, ordered::Quorum};
44use futures::{
45    future::{self, Either},
46    pin_mut, StreamExt,
47};
48use rand_core::CryptoRngCore;
49use std::{
50    collections::BTreeMap,
51    num::{NonZeroU64, NonZeroUsize},
52    time::{Duration, SystemTime},
53};
54use tracing::{debug, error, info, warn};
55
/// Represents a pending verification request to the automaton.
struct Verify<C: PublicKey, D: Digest, E: Clock> {
    // Measures how long verification took; the metric is recorded when the
    // timer is dropped (see the explicit `drop(timer)` in the main loop).
    timer: histogram::Timer<E>,
    // The (sequencer, height) context the payload was verified under.
    context: Context<C>,
    // The digest that was handed to the automaton for verification.
    payload: D,
    // The automaton's verdict: `Ok(true/false)`, or `Err` if the automaton
    // dropped the request (see `Error::AppVerifyCanceled` in `handle_node`).
    result: Result<bool, Error>,
}
63
/// Instance of the engine.
///
/// Generic over the runtime (`E`), local signer (`C`), sequencer provider
/// (`S`), per-epoch validator scheme provider (`P`), payload digest (`D`),
/// application automaton (`A`), payload relay (`R`), activity reporter (`Z`),
/// epoch monitor (`M`), and parallelism strategy (`T`).
pub struct Engine<
    E: BufferPooler + Clock + Spawner + CryptoRngCore + Storage + Metrics,
    C: Signer,
    S: SequencersProvider<PublicKey = C::PublicKey>,
    P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D>>,
    D: Digest,
    A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
    R: Relay<Digest = D>,
    Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
    M: Monitor<Index = Epoch>,
    T: Strategy,
> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////

    // Runtime context (clocks, spawning, storage, metrics, buffer pools).
    context: ContextCell<E>,
    // Present only if this instance may act as a sequencer; used to sign
    // newly proposed chunks.
    sequencer_signer: Option<ChunkSigner<C>>,
    // Yields the set of sequencers for a given epoch.
    sequencers_provider: S,
    // Yields the validator signing scheme for a given epoch.
    validators_provider: P,
    // Application hooks for proposing and verifying payloads.
    automaton: A,
    // Broadcasts full payload data alongside the consensus messages.
    relay: R,
    // Source of epoch updates (subscribed to in `run`).
    monitor: M,
    // Receives activity notifications (tips and locks).
    reporter: Z,
    // Parallelism strategy used when aggregating acks into certificates.
    strategy: T,

    ////////////////////////////////////////
    // Namespace Constants
    ////////////////////////////////////////

    // Verifier for chunk signatures.
    chunk_verifier: ChunkVerifier,

    ////////////////////////////////////////
    // Timeouts
    ////////////////////////////////////////

    // The configured timeout for rebroadcasting a chunk to all validators
    rebroadcast_timeout: Duration,
    // The next wall-clock instant at which to rebroadcast; `None` disables
    // the rebroadcast timer.
    rebroadcast_deadline: Option<SystemTime>,

    ////////////////////////////////////////
    // Pruning
    ////////////////////////////////////////

    // A tuple representing the epochs to keep in memory.
    // The first element is the number of old epochs to keep.
    // The second element is the number of future epochs to accept.
    //
    // For example, if the current epoch is 10, and the bounds are (1, 2), then
    // epochs 9, 10, 11, and 12 are kept (and accepted);
    // all others are pruned or rejected.
    epoch_bounds: (EpochDelta, EpochDelta),

    // The number of future heights to accept acks for.
    // This is used to prevent spam of acks for arbitrary heights.
    //
    // For example, if the current tip for a sequencer is at height 100,
    // and the height_bound is 10, then acks for heights 100-110 are accepted.
    height_bound: HeightDelta,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////

    // A stream of futures.
    //
    // Each future represents a verification request to the automaton
    // that will either timeout or resolve with a boolean.
    //
    // There is no limit to the number of futures in this pool, so the automaton
    // can apply backpressure by dropping the verification requests if necessary.
    pending_verifies: FuturesPool<Verify<C::PublicKey, D, E>>,

    ////////////////////////////////////////
    // Storage
    ////////////////////////////////////////

    // The number of heights per each journal section.
    journal_heights_per_section: NonZeroU64,

    // The number of bytes to buffer when replaying a journal.
    journal_replay_buffer: NonZeroUsize,

    // The size of the write buffer to use for each blob in the journal.
    journal_write_buffer: NonZeroUsize,

    // A prefix for the journal names.
    // The rest of the name is the hex-encoded public keys of the relevant sequencer.
    journal_name_prefix: String,

    // Compression level for the journal.
    journal_compression: Option<u8>,

    // Page cache for the journal.
    journal_page_cache: CacheRef,

    // A map of sequencer public keys to their journals.
    #[allow(clippy::type_complexity)]
    journals: BTreeMap<C::PublicKey, Journal<E, Node<C::PublicKey, P::Scheme, D>>>,

    ////////////////////////////////////////
    // State
    ////////////////////////////////////////

    // Tracks the current tip for each sequencer.
    // The tip is a `Node` which is comprised of a `Chunk` and,
    // if not the genesis chunk for that sequencer,
    // a certificate over the parent chunk.
    tip_manager: TipManager<C::PublicKey, P::Scheme, D>,

    // Tracks the acknowledgements for chunks.
    // This is comprised of votes or certificates.
    ack_manager: AckManager<C::PublicKey, P::Scheme, D>,

    // The current epoch.
    epoch: Epoch,

    ////////////////////////////////////////
    // Network
    ////////////////////////////////////////

    // Whether to send proposals as priority messages.
    priority_proposals: bool,

    // Whether to send acks as priority messages.
    priority_acks: bool,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////

    // Metrics
    metrics: metrics::Metrics<E>,

    // The timer of my last new proposal.
    // Dropped (recording the end-to-end latency) when a certificate for my
    // own chunk is observed in `handle_certificate`.
    propose_timer: Option<histogram::Timer<E>>,
}
202
203impl<
204        E: BufferPooler + Clock + Spawner + CryptoRngCore + Storage + Metrics,
205        C: Signer,
206        S: SequencersProvider<PublicKey = C::PublicKey>,
207        P: Provider<Scope = Epoch, Scheme: scheme::Scheme<C::PublicKey, D, PublicKey = C::PublicKey>>,
208        D: Digest,
209        A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
210        R: Relay<Digest = D>,
211        Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
212        M: Monitor<Index = Epoch>,
213        T: Strategy,
214    > Engine<E, C, S, P, D, A, R, Z, M, T>
215{
216    /// Creates a new engine with the given context and configuration.
217    pub fn new(context: E, cfg: Config<C, S, P, D, A, R, Z, M, T>) -> Self {
218        // TODO(#1833): Metrics should use the post-start context
219        let metrics = metrics::Metrics::init(context.clone());
220
221        Self {
222            context: ContextCell::new(context),
223            sequencer_signer: cfg.sequencer_signer,
224            sequencers_provider: cfg.sequencers_provider,
225            validators_provider: cfg.validators_provider,
226            automaton: cfg.automaton,
227            relay: cfg.relay,
228            reporter: cfg.reporter,
229            monitor: cfg.monitor,
230            strategy: cfg.strategy,
231            chunk_verifier: cfg.chunk_verifier,
232            rebroadcast_timeout: cfg.rebroadcast_timeout,
233            rebroadcast_deadline: None,
234            epoch_bounds: cfg.epoch_bounds,
235            height_bound: cfg.height_bound,
236            pending_verifies: FuturesPool::default(),
237            journal_heights_per_section: cfg.journal_heights_per_section,
238            journal_replay_buffer: cfg.journal_replay_buffer,
239            journal_write_buffer: cfg.journal_write_buffer,
240            journal_name_prefix: cfg.journal_name_prefix,
241            journal_compression: cfg.journal_compression,
242            journal_page_cache: cfg.journal_page_cache,
243            journals: BTreeMap::new(),
244            tip_manager: TipManager::<C::PublicKey, P::Scheme, D>::new(),
245            ack_manager: AckManager::<C::PublicKey, P::Scheme, D>::new(),
246            epoch: Epoch::zero(),
247            priority_proposals: cfg.priority_proposals,
248            priority_acks: cfg.priority_acks,
249            metrics,
250            propose_timer: None,
251        }
252    }
253
    /// Runs the engine until the context is stopped.
    ///
    /// The engine will handle:
    /// - Requesting and processing proposals from the application
    /// - Timeouts
    ///   - Refreshing the Epoch
    ///   - Rebroadcasting Proposals
    /// - Messages from the network:
    ///   - Nodes
    ///   - Acks
    ///
    /// `chunk_network` carries `Node` messages; `ack_network` carries `Ack`
    /// messages. Each is a (sender, receiver) pair keyed by validator public key.
    pub fn start(
        mut self,
        chunk_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        ack_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) -> Handle<()> {
        // Moves `self` into a task spawned via the engine's context cell; the
        // returned handle tracks completion of `run`.
        spawn_cell!(self.context, self.run(chunk_network, ack_network).await)
    }
277
    /// Inner run loop called by `start`.
    ///
    /// Subscribes to epoch updates, restores this instance's own sequencer
    /// journal (if any), then services proposals, rebroadcasts, incoming
    /// nodes/acks, and completed verification futures until shutdown. On exit,
    /// all journals are synced before being dropped.
    async fn run(
        mut self,
        chunk_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        ack_network: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
    ) {
        let mut node_sender = chunk_network.0;
        let mut node_receiver = chunk_network.1;
        // Wrap the raw ack channel with codec-aware send/receive.
        let (mut ack_sender, mut ack_receiver) = wrap(
            (),
            self.context.network_buffer_pool().clone(),
            ack_network.0,
            ack_network.1,
        );

        // Tracks if there is an outstanding proposal request to the automaton.
        let mut pending: Option<(Context<C::PublicKey>, oneshot::Receiver<D>)> = None;

        // Initialize the epoch
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Before starting on the main loop, initialize my own sequencer journal
        // and attempt to rebroadcast if necessary.
        if let Some(ref signer) = self.sequencer_signer {
            self.journal_prepare(&signer.public_key()).await;
            if let Err(err) = self.rebroadcast(&mut node_sender).await {
                // Rebroadcasting may return a non-critical error, so log the error and continue.
                info!(?err, "initial rebroadcast failed");
            }
        }

        // NOTE(review): `on_start` appears to run before each select round,
        // constructing the `rebroadcast`/`propose` futures referenced by the
        // arms below — confirm against the `select_loop!` macro docs.
        select_loop! {
            self.context,
            on_start => {
                // Request a new proposal if necessary
                if pending.is_none() {
                    if let Some(context) = self.should_propose() {
                        let receiver = self.automaton.propose(context.clone()).await;
                        pending = Some((context, receiver));
                    }
                }

                // Create deadline futures.
                //
                // If the deadline is None, the future will never resolve.
                let rebroadcast = match self.rebroadcast_deadline {
                    Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                    None => Either::Right(future::pending()),
                };
                let propose = match &mut pending {
                    Some((_context, receiver)) => Either::Left(receiver),
                    None => Either::Right(futures::future::pending()),
                };
            },
            on_stopped => {
                debug!("shutdown");
            },
            // Handle refresh epoch deadline
            Some(epoch) = epoch_updates.recv() else {
                error!("epoch subscription failed");
                break;
            } => {
                // Refresh the epoch
                debug!(current = %self.epoch, new = %epoch, "refresh epoch");
                // Epochs from the monitor must be monotonically non-decreasing.
                assert!(epoch >= self.epoch);
                self.epoch = epoch;
                continue;
            },

            // Handle rebroadcast deadline
            _ = rebroadcast => {
                if let Some(ref signer) = self.sequencer_signer {
                    debug!(epoch = %self.epoch, sender = ?signer.public_key(), "rebroadcast");
                    if let Err(err) = self.rebroadcast(&mut node_sender).await {
                        info!(?err, "rebroadcast failed");
                        continue;
                    }
                }
            },

            // Propose a new chunk
            receiver = propose => {
                // Clear the pending proposal
                let (context, _) = pending.take().unwrap();
                debug!(height = %context.height, "propose");

                // Error handling for dropped proposals
                let Ok(payload) = receiver else {
                    warn!(?context, "automaton dropped proposal");
                    continue;
                };

                // Propose the chunk
                if let Err(err) = self
                    .propose(context.clone(), payload, &mut node_sender)
                    .await
                {
                    warn!(?err, ?context, "propose new failed");
                    continue;
                }
            },

            // Handle incoming nodes
            msg = node_receiver.recv() => {
                // Error handling
                let (sender, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        error!(?err, "node receiver failed");
                        break;
                    }
                };
                // Guard defaults to Invalid; upgraded to Success at the end of the arm.
                let mut guard = self.metrics.nodes.guard(Status::Invalid);

                // Decode using staged decoding with epoch-aware certificate bounds
                let node = match Node::read_staged(&mut msg.as_ref(), &self.validators_provider) {
                    Ok(node) => node,
                    Err(err) => {
                        debug!(?err, ?sender, "node decode failed");
                        continue;
                    }
                };
                let result = match self.validate_node(&node, &sender) {
                    Ok(result) => result,
                    Err(err) => {
                        debug!(?err, ?sender, "node validate failed");
                        continue;
                    }
                };

                // Initialize journal for sequencer if it does not exist
                self.journal_prepare(&sender).await;

                // Handle the parent certificate
                if let Some(parent_chunk) = result {
                    // validate_node returning Some(parent_chunk) implies a parent
                    // is present, so this unwrap cannot fail.
                    let parent = node.parent.as_ref().unwrap();
                    self.handle_certificate(
                        &parent_chunk,
                        parent.epoch,
                        parent.certificate.clone(),
                    )
                    .await;
                }

                // Process the node
                //
                // Note, this node may be a duplicate. If it is, we will attempt to verify it and vote
                // on it again (our original vote may have been lost).
                self.handle_node(&node).await;
                debug!(?sender, height = %node.chunk.height, "node");
                guard.set(Status::Success);
            },

            // Handle incoming acks
            msg = ack_receiver.recv() => {
                // Error handling
                let (sender, msg) = match msg {
                    Ok(r) => r,
                    Err(err) => {
                        warn!(?err, "ack receiver failed");
                        break;
                    }
                };
                let mut guard = self.metrics.acks.guard(Status::Invalid);
                // The wrapped receiver yields a decode result per message.
                let ack = match msg {
                    Ok(ack) => ack,
                    Err(err) => {
                        debug!(?err, ?sender, "ack decode failed");
                        continue;
                    }
                };
                if let Err(err) = self.validate_ack(&ack, &sender) {
                    debug!(?err, ?sender, "ack validate failed");
                    continue;
                };
                if let Err(err) = self.handle_ack(&ack).await {
                    debug!(?err, ?sender, "ack handle failed");
                    guard.set(Status::Failure);
                    continue;
                }
                debug!(?sender, epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "ack");
                guard.set(Status::Success);
            },

            // Handle completed verification futures.
            verify = self.pending_verifies.next_completed() => {
                let Verify {
                    timer,
                    context,
                    payload,
                    result,
                } = verify;
                drop(timer); // Record metric. Explicitly reference timer to avoid lint warning.
                match result {
                    Err(err) => {
                        warn!(?err, ?context, "verified returned error");
                        self.metrics.verify.inc(Status::Dropped);
                    }
                    Ok(false) => {
                        debug!(?context, "verified was false");
                        self.metrics.verify.inc(Status::Failure);
                    }
                    Ok(true) => {
                        debug!(?context, "verified");
                        self.metrics.verify.inc(Status::Success);
                        if let Err(err) = self
                            .handle_app_verified(&context, &payload, &mut ack_sender)
                            .await
                        {
                            debug!(?err, ?context, ?payload, "verified handle failed");
                        }
                    }
                }
            },
        }

        // Sync and drop all journals, regardless of how we exit the loop
        self.pending_verifies.cancel_all();
        while let Some((_, journal)) = self.journals.pop_first() {
            journal.sync_all().await.expect("unable to sync journal");
        }
    }
507
508    ////////////////////////////////////////
509    // Handling
510    ////////////////////////////////////////
511
512    /// Handles a verified message from the automaton.
513    ///
514    /// This is called when the automaton has verified a payload.
515    /// The chunk will be signed if it matches the current tip.
516    async fn handle_app_verified(
517        &mut self,
518        context: &Context<C::PublicKey>,
519        payload: &D,
520        ack_sender: &mut WrappedSender<
521            impl Sender<PublicKey = C::PublicKey>,
522            Ack<C::PublicKey, P::Scheme, D>,
523        >,
524    ) -> Result<(), Error> {
525        // Get the tip
526        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
527            return Err(Error::AppVerifiedNoTip);
528        };
529
530        // Return early if the height does not match
531        if tip.chunk.height != context.height {
532            return Err(Error::AppVerifiedHeightMismatch);
533        }
534
535        // Return early if the payload does not match
536        if tip.chunk.payload != *payload {
537            return Err(Error::AppVerifiedPayloadMismatch);
538        }
539
540        // Emit the activity
541        self.reporter
542            .report(Activity::Tip(Proposal::new(
543                tip.chunk.clone(),
544                tip.signature.clone(),
545            )))
546            .await;
547
548        // Get the validator scheme for the current epoch
549        let Some(scheme) = self.validators_provider.scoped(self.epoch) else {
550            return Err(Error::UnknownScheme(self.epoch));
551        };
552
553        // Construct vote (if a validator)
554        let Some(ack) = Ack::sign(scheme.as_ref(), tip.chunk.clone(), self.epoch) else {
555            return Err(Error::NotSigner(self.epoch));
556        };
557
558        // Sync the journal to prevent ever acking two conflicting chunks at
559        // the same height, even if the node crashes and restarts.
560        self.journal_sync(&context.sequencer, context.height).await;
561
562        // The recipients are all the validators in the epoch and the sequencer.
563        // The sequencer may or may not be a validator.
564        let recipients = {
565            let validators = scheme.participants();
566            let mut recipients = validators.iter().cloned().collect::<Vec<_>>();
567            if !validators.iter().any(|v| v == &tip.chunk.sequencer) {
568                recipients.push(tip.chunk.sequencer.clone());
569            }
570            recipients
571        };
572
573        // Handle the ack internally
574        self.handle_ack(&ack).await?;
575
576        // Send the ack to the network
577        ack_sender
578            .send(Recipients::Some(recipients), ack, self.priority_acks)
579            .await
580            .map_err(|_| Error::UnableToSendMessage)?;
581
582        Ok(())
583    }
584
585    /// Handles a certificate, either received from a `Node` from the network or generated locally.
586    ///
587    /// The certificate must already be verified.
588    /// If the certificate is new, it is stored and the proof is emitted to the committer.
589    /// If the certificate is already known, it is ignored.
590    async fn handle_certificate(
591        &mut self,
592        chunk: &Chunk<C::PublicKey, D>,
593        epoch: Epoch,
594        certificate: <P::Scheme as Scheme>::Certificate,
595    ) {
596        // Set the certificate, returning early if it already exists
597        if !self.ack_manager.add_certificate(
598            &chunk.sequencer,
599            chunk.height,
600            epoch,
601            certificate.clone(),
602        ) {
603            return;
604        }
605
606        // If the certificate is for my sequencer, record metric
607        if let Some(ref signer) = self.sequencer_signer {
608            if chunk.sequencer == signer.public_key() {
609                self.propose_timer.take();
610            }
611        }
612
613        // Emit the activity
614        self.reporter
615            .report(Activity::Lock(Lock::new(chunk.clone(), epoch, certificate)))
616            .await;
617    }
618
619    /// Handles an ack
620    ///
621    /// Returns an error if the ack is invalid, or can be ignored
622    /// (e.g. already exists, certificate already exists, is outside the epoch bounds, etc.).
623    async fn handle_ack(&mut self, ack: &Ack<C::PublicKey, P::Scheme, D>) -> Result<(), Error> {
624        // Get the scheme for the ack's epoch
625        let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
626            return Err(Error::UnknownScheme(ack.epoch));
627        };
628
629        // Add the vote. If a new certificate is formed, handle it.
630        if let Some(certificate) = self
631            .ack_manager
632            .add_ack(ack, scheme.as_ref(), &self.strategy)
633        {
634            debug!(epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "recovered certificate");
635            self.metrics.certificates.inc();
636            self.handle_certificate(&ack.chunk, ack.epoch, certificate)
637                .await;
638        }
639
640        Ok(())
641    }
642
643    /// Handles a valid `Node` message, storing it as the tip.
644    /// Alerts the automaton of the new node.
645    /// Also appends the `Node` to the journal if it's new.
646    async fn handle_node(&mut self, node: &Node<C::PublicKey, P::Scheme, D>) {
647        // Store the tip
648        let is_new = self.tip_manager.put(node);
649
650        // If a higher height than the previous tip...
651        if is_new {
652            // Update metrics for sequencer height
653            let _ = self
654                .metrics
655                .sequencer_heights
656                .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
657                .try_set(node.chunk.height.get());
658
659            // Append to journal if the `Node` is new, making sure to sync the journal
660            // to prevent sending two conflicting chunks to the automaton, even if
661            // the node crashes and restarts.
662            self.journal_append(node.clone()).await;
663            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
664                .await;
665        }
666
667        // Verify the chunk with the automaton
668        let context = Context {
669            sequencer: node.chunk.sequencer.clone(),
670            height: node.chunk.height,
671        };
672        let payload = node.chunk.payload;
673        let mut automaton = self.automaton.clone();
674        let timer = self.metrics.verify_duration.timer();
675        self.pending_verifies.push(async move {
676            let receiver = automaton.verify(context.clone(), payload).await;
677            let result = receiver.await.map_err(Error::AppVerifyCanceled);
678            Verify {
679                timer,
680                context,
681                payload,
682                result,
683            }
684        });
685    }
686
687    ////////////////////////////////////////
688    // Proposing
689    ////////////////////////////////////////
690
691    /// Returns a `Context` if the engine should request a proposal from the automaton.
692    ///
693    /// Should only be called if the engine is not already waiting for a proposal.
694    fn should_propose(&self) -> Option<Context<C::PublicKey>> {
695        // Return `None` if we don't have a sequencer signer
696        let me = self.sequencer_signer.as_ref()?.public_key();
697
698        // Return `None` if I am not a sequencer in the current epoch
699        self.sequencers_provider
700            .sequencers(self.epoch)?
701            .position(&me)?;
702
703        // Return the next context unless my current tip has no certificate
704        match self.tip_manager.get(&me) {
705            None => Some(Context {
706                sequencer: me,
707                height: Height::zero(),
708            }),
709            Some(tip) => self
710                .ack_manager
711                .get_certificate(&me, tip.chunk.height)
712                .map(|_| Context {
713                    sequencer: me,
714                    height: tip.chunk.height.next(),
715                }),
716        }
717    }
718
719    /// Propose a new chunk to the network.
720    ///
721    /// The result is returned to the caller via the provided channel.
722    /// The proposal is only successful if the parent Chunk and certificate are known.
723    async fn propose(
724        &mut self,
725        context: Context<C::PublicKey>,
726        payload: D,
727        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
728    ) -> Result<(), Error> {
729        let mut guard = self.metrics.propose.guard(Status::Dropped);
730        let signer = self
731            .sequencer_signer
732            .as_mut()
733            .ok_or(Error::IAmNotASequencer(self.epoch))?;
734        let me = signer.public_key();
735
736        // Error-check context sequencer
737        if context.sequencer != me {
738            return Err(Error::ContextSequencer);
739        }
740
741        // Error-check that I am a sequencer in the current epoch
742        self.sequencers_provider
743            .sequencers(self.epoch)
744            .and_then(|s| s.position(&me))
745            .ok_or(Error::IAmNotASequencer(self.epoch))?;
746
747        // Get parent Chunk and certificate
748        let mut height = Height::zero();
749        let mut parent = None;
750        if let Some(tip) = self.tip_manager.get(&me) {
751            // Get certificate, or, if it doesn't exist, return an error
752            let Some((epoch, certificate)) =
753                self.ack_manager.get_certificate(&me, tip.chunk.height)
754            else {
755                return Err(Error::MissingCertificate);
756            };
757
758            // Update height and parent
759            height = tip.chunk.height.next();
760            parent = Some(Parent::new(tip.chunk.payload, epoch, certificate.clone()));
761        }
762
763        // Error-check context height
764        if context.height != height {
765            return Err(Error::ContextHeight);
766        }
767
768        // Construct new node
769        let node = Node::sign(signer, height, payload, parent);
770
771        // Deal with the chunk as if it were received over the network
772        self.handle_node(&node).await;
773
774        // Sync the journal to prevent ever proposing two conflicting chunks
775        // at the same height, even if the node crashes and restarts
776        self.journal_sync(&me, height).await;
777
778        // Record the start time of the proposal
779        self.propose_timer = Some(self.metrics.e2e_duration.timer());
780
781        // Broadcast to network
782        if let Err(err) = self.broadcast(node, node_sender, self.epoch).await {
783            guard.set(Status::Failure);
784            return Err(err);
785        };
786
787        // Return success
788        guard.set(Status::Success);
789        Ok(())
790    }
791
792    /// Attempt to rebroadcast the highest-height chunk of this sequencer to all validators.
793    ///
794    /// This is only done if:
795    /// - this instance is the sequencer for the current epoch.
796    /// - this instance has a chunk to rebroadcast.
797    /// - this instance has not yet collected the certificate for the chunk.
798    async fn rebroadcast(
799        &mut self,
800        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
801    ) -> Result<(), Error> {
802        let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);
803
804        // Unset the rebroadcast deadline
805        self.rebroadcast_deadline = None;
806
807        // Return if we don't have a sequencer signer
808        let signer = self
809            .sequencer_signer
810            .as_ref()
811            .ok_or(Error::IAmNotASequencer(self.epoch))?;
812        let me = signer.public_key();
813
814        // Return if not a sequencer in the current epoch
815        self.sequencers_provider
816            .sequencers(self.epoch)
817            .and_then(|s| s.position(&me))
818            .ok_or(Error::IAmNotASequencer(self.epoch))?;
819
820        // Return if no chunk to rebroadcast
821        let Some(tip) = self.tip_manager.get(&me) else {
822            return Err(Error::NothingToRebroadcast);
823        };
824
825        // Return if certificate already collected
826        if self
827            .ack_manager
828            .get_certificate(&me, tip.chunk.height)
829            .is_some()
830        {
831            return Err(Error::AlreadyCertified);
832        }
833
834        // Broadcast the message, which resets the rebroadcast deadline
835        guard.set(Status::Failure);
836        self.broadcast(tip, node_sender, self.epoch).await?;
837        guard.set(Status::Success);
838        Ok(())
839    }
840
841    /// Send a  `Node` message to all validators in the given epoch.
842    async fn broadcast(
843        &mut self,
844        node: Node<C::PublicKey, P::Scheme, D>,
845        node_sender: &mut impl Sender<PublicKey = C::PublicKey>,
846        epoch: Epoch,
847    ) -> Result<(), Error> {
848        // Get the scheme for the epoch to access validators
849        let Some(scheme) = self.validators_provider.scoped(epoch) else {
850            return Err(Error::UnknownScheme(epoch));
851        };
852        let validators = scheme.participants();
853
854        // Tell the relay to broadcast the full data
855        self.relay.broadcast(node.chunk.payload).await;
856
857        // Send the node to all validators
858        node_sender
859            .send(
860                Recipients::Some(validators.iter().cloned().collect()),
861                node.encode(),
862                self.priority_proposals,
863            )
864            .await
865            .map_err(|_| Error::BroadcastFailed)?;
866
867        // Set the rebroadcast deadline
868        self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);
869
870        Ok(())
871    }
872
873    ////////////////////////////////////////
874    // Validation
875    ////////////////////////////////////////
876
877    /// Takes a raw `Node` (from sender) from the p2p network and validates it.
878    ///
879    /// If valid (and not already the tracked tip for the sender), returns the implied
880    /// parent chunk and its certificate.
881    /// Else returns an error if the `Node` is invalid.
882    fn validate_node(
883        &mut self,
884        node: &Node<C::PublicKey, P::Scheme, D>,
885        sender: &C::PublicKey,
886    ) -> Result<Option<Chunk<C::PublicKey, D>>, Error> {
887        // Verify the sender
888        if node.chunk.sequencer != *sender {
889            return Err(Error::PeerMismatch);
890        }
891
892        // Optimization: If the node is exactly equal to the tip,
893        // don't perform further validation.
894        if let Some(tip) = self.tip_manager.get(sender) {
895            if tip == *node {
896                return Ok(None);
897            }
898        }
899
900        // Validate chunk
901        self.validate_chunk(&node.chunk, self.epoch)?;
902
903        // Verify the node
904        node.verify(
905            &mut self.context,
906            &self.chunk_verifier,
907            &self.validators_provider,
908            &self.strategy,
909        )
910    }
911
912    /// Takes a raw ack (from sender) from the p2p network and validates it.
913    ///
914    /// Returns the chunk, epoch, and vote if the ack is valid.
915    /// Returns an error if the ack is invalid.
916    fn validate_ack(
917        &mut self,
918        ack: &Ack<C::PublicKey, P::Scheme, D>,
919        sender: &<P::Scheme as Scheme>::PublicKey,
920    ) -> Result<(), Error> {
921        // Validate chunk
922        self.validate_chunk(&ack.chunk, ack.epoch)?;
923
924        // Get the scheme for the epoch to validate the sender
925        let Some(scheme) = self.validators_provider.scoped(ack.epoch) else {
926            return Err(Error::UnknownScheme(ack.epoch));
927        };
928
929        // Validate sender is a participant and matches the vote signer
930        let participants = scheme.participants();
931        let Some(index) = participants.index(sender) else {
932            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
933        };
934        if index != ack.attestation.signer {
935            return Err(Error::PeerMismatch);
936        }
937
938        // Spam prevention: If the ack is for an epoch that is too old or too new, ignore.
939        {
940            let (eb_lo, eb_hi) = self.epoch_bounds;
941            let bound_lo = self.epoch.saturating_sub(eb_lo);
942            let bound_hi = self.epoch.saturating_add(eb_hi);
943            if ack.epoch < bound_lo || ack.epoch > bound_hi {
944                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
945            }
946        }
947
948        // Spam prevention: If the ack is for a height that is too old or too new, ignore.
949        {
950            let bound_lo = self
951                .tip_manager
952                .get(&ack.chunk.sequencer)
953                .map(|t| t.chunk.height)
954                .unwrap_or(Height::zero());
955            let bound_hi = bound_lo.saturating_add(self.height_bound);
956            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
957                return Err(Error::AckHeightOutsideBounds(
958                    ack.chunk.height,
959                    bound_lo,
960                    bound_hi,
961                ));
962            }
963        }
964
965        // Validate the vote signature
966        if !ack.verify(&mut self.context, scheme.as_ref(), &self.strategy) {
967            return Err(Error::InvalidAckSignature);
968        }
969
970        Ok(())
971    }
972
973    /// Takes a raw chunk from the p2p network and validates it against the epoch.
974    ///
975    /// Returns the chunk if the chunk is valid.
976    /// Returns an error if the chunk is invalid.
977    fn validate_chunk(&self, chunk: &Chunk<C::PublicKey, D>, epoch: Epoch) -> Result<(), Error> {
978        // Verify sequencer
979        if self
980            .sequencers_provider
981            .sequencers(epoch)
982            .and_then(|s| s.position(&chunk.sequencer))
983            .is_none()
984        {
985            return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
986        }
987
988        // Verify height
989        if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
990            // Height must be at least the tip height
991            match chunk.height.cmp(&tip.chunk.height) {
992                std::cmp::Ordering::Less => {
993                    return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
994                }
995                std::cmp::Ordering::Equal => {
996                    // Ensure this matches the tip if the height is the same
997                    if tip.chunk.payload != chunk.payload {
998                        return Err(Error::ChunkMismatch(
999                            chunk.sequencer.to_string(),
1000                            chunk.height,
1001                        ));
1002                    }
1003                }
1004                std::cmp::Ordering::Greater => {}
1005            }
1006        }
1007
1008        Ok(())
1009    }
1010
1011    ////////////////////////////////////////
1012    // Journal
1013    ////////////////////////////////////////
1014
1015    /// Returns the section of the journal for the given height.
1016    const fn get_journal_section(&self, height: Height) -> u64 {
1017        height.get() / self.journal_heights_per_section.get()
1018    }
1019
1020    /// Ensures the journal exists and is initialized for the given sequencer.
1021    /// If the journal does not exist, it is created and replayed.
1022    /// Else, no action is taken.
1023    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
1024        // Return early if the journal already exists
1025        if self.journals.contains_key(sequencer) {
1026            return;
1027        }
1028
1029        // Initialize journal
1030        let cfg = JournalConfig {
1031            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
1032            compression: self.journal_compression,
1033            codec_config: P::Scheme::certificate_codec_config_unbounded(),
1034            page_cache: self.journal_page_cache.clone(),
1035            write_buffer: self.journal_write_buffer,
1036        };
1037        let journal = Journal::<_, Node<C::PublicKey, P::Scheme, D>>::init(
1038            self.context
1039                .with_label("journal")
1040                .with_attribute("sequencer", sequencer)
1041                .into_present(),
1042            cfg,
1043        )
1044        .await
1045        .expect("unable to init journal");
1046
1047        // Replay journal
1048        {
1049            debug!(?sequencer, "journal replay begin");
1050
1051            // Prepare the stream
1052            let stream = journal
1053                .replay(0, 0, self.journal_replay_buffer)
1054                .await
1055                .expect("unable to replay journal");
1056            pin_mut!(stream);
1057
1058            // Read from the stream, which may be in arbitrary order.
1059            // Remember the highest node height
1060            let mut tip: Option<Node<C::PublicKey, P::Scheme, D>> = None;
1061            let mut num_items = 0;
1062            while let Some(msg) = stream.next().await {
1063                let (_, _, _, node) = msg.expect("unable to read from journal");
1064                num_items += 1;
1065                let height = node.chunk.height;
1066                match tip {
1067                    None => {
1068                        tip = Some(node);
1069                    }
1070                    Some(ref t) => {
1071                        if height > t.chunk.height {
1072                            tip = Some(node);
1073                        }
1074                    }
1075                }
1076            }
1077
1078            // Set the tip only once. The items from the journal may be in arbitrary order,
1079            // and the tip manager will panic if inserting tips out-of-order.
1080            if let Some(node) = tip.take() {
1081                let is_new = self.tip_manager.put(&node);
1082                assert!(is_new);
1083            }
1084
1085            debug!(?sequencer, ?num_items, "journal replay end");
1086        }
1087
1088        // Store journal
1089        self.journals.insert(sequencer.clone(), journal);
1090    }
1091
1092    /// Write a `Node` to the appropriate journal, which contains the tip `Chunk` for the sequencer.
1093    ///
1094    /// To prevent ever writing two conflicting `Chunk`s at the same height,
1095    /// the journal must already be open and replayed.
1096    async fn journal_append(&mut self, node: Node<C::PublicKey, P::Scheme, D>) {
1097        let section = self.get_journal_section(node.chunk.height);
1098        self.journals
1099            .get_mut(&node.chunk.sequencer)
1100            .expect("journal does not exist")
1101            .append(section, &node)
1102            .await
1103            .expect("unable to append to journal");
1104    }
1105
1106    /// Syncs (ensures all data is written to disk) and prunes the journal for the given sequencer and height.
1107    async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: Height) {
1108        let section = self.get_journal_section(height);
1109
1110        // Get journal
1111        let journal = self
1112            .journals
1113            .get_mut(sequencer)
1114            .expect("journal does not exist");
1115
1116        // Sync journal
1117        journal.sync(section).await.expect("unable to sync journal");
1118
1119        // Prune journal, ignoring errors
1120        let _ = journal.prune(section).await;
1121    }
1122}