commonware_consensus/ordered_broadcast/
engine.rs

//! Engine for the ordered broadcast module.
//!
//! It is responsible for:
//! - Proposing nodes (if a sequencer)
//! - Signing chunks (if a validator)
//! - Tracking the latest chunk in each sequencer's chain
//! - Recovering threshold signatures from partial signatures for each chunk
//! - Notifying other actors of new chunks and threshold signatures
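//!
//! # Example
//!
//! A minimal sketch of wiring up the engine (assuming a runtime `context`, a
//! populated `Config`, and the two `(Sender, Receiver)` network pairs are
//! constructed elsewhere):
//!
//! ```ignore
//! let engine = Engine::new(context, cfg);
//! let handle = engine.start(chunk_network, ack_network);
//! ```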

use super::{
    metrics,
    types::{Ack, Activity, Chunk, Context, Epoch, Error, Lock, Node, Parent, Proposal},
    AckManager, Config, TipManager,
};
use crate::{Automaton, Monitor, Relay, Reporter, Supervisor, ThresholdSupervisor};
use commonware_cryptography::{
    bls12381::primitives::{group, poly, variant::Variant},
    Digest, PublicKey, Signer,
};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Receiver, Recipients, Sender,
};
use commonware_runtime::{
    telemetry::metrics::{
        histogram,
        status::{CounterExt, Status},
    },
    Clock, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::{self, variable::Journal};
use commonware_utils::futures::Pool as FuturesPool;
use futures::{
    channel::oneshot,
    future::{self, Either},
    pin_mut, StreamExt,
};
use std::{
    collections::BTreeMap,
    marker::PhantomData,
    time::{Duration, SystemTime},
};
use tracing::{debug, error, info, warn};

/// Represents a pending verification request to the automaton.
struct Verify<C: PublicKey, D: Digest, E: Clock> {
    timer: histogram::Timer<E>,
    context: Context<C>,
    payload: D,
    result: Result<bool, Error>,
}

/// Instance of the engine.
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics,
    C: Signer,
    V: Variant,
    D: Digest,
    A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
    R: Relay<Digest = D>,
    Z: Reporter<Activity = Activity<C::PublicKey, V, D>>,
    M: Monitor<Index = Epoch>,
    Su: Supervisor<Index = Epoch, PublicKey = C::PublicKey>,
    TSu: ThresholdSupervisor<
        Index = Epoch,
        PublicKey = C::PublicKey,
        Identity = V::Public,
        Polynomial = poly::Public<V>,
        Share = group::Share,
    >,
    NetS: Sender<PublicKey = C::PublicKey>,
    NetR: Receiver<PublicKey = C::PublicKey>,
> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    context: E,
    crypto: C,
    automaton: A,
    relay: R,
    monitor: M,
    sequencers: Su,
    validators: TSu,
    reporter: Z,

    ////////////////////////////////////////
    // Namespace Constants
    ////////////////////////////////////////

    // The namespace used for signatures.
    namespace: Vec<u8>,

    ////////////////////////////////////////
    // Timeouts
    ////////////////////////////////////////

    // The configured timeout for rebroadcasting a chunk to all validators
    rebroadcast_timeout: Duration,
    rebroadcast_deadline: Option<SystemTime>,

    ////////////////////////////////////////
    // Pruning
    ////////////////////////////////////////

    // A tuple representing the epochs to keep in memory.
    // The first element is the number of old epochs to keep.
    // The second element is the number of future epochs to accept.
    //
    // For example, if the current epoch is 10, and the bounds are (1, 2), then
    // epochs 9, 10, 11, and 12 are kept (and accepted);
    // all others are pruned or rejected.
    epoch_bounds: (u64, u64),

    // The number of future heights to accept acks for.
    // This is used to prevent spam of acks for arbitrary heights.
    //
    // For example, if the current tip for a sequencer is at height 100,
    // and the height_bound is 10, then acks for heights 100-110 are accepted.
    height_bound: u64,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////

    // A stream of futures.
    //
    // Each future represents a verification request to the automaton
    // that will either time out or resolve with a boolean.
    //
    // There is no limit to the number of futures in this pool, so the automaton
    // can apply backpressure by dropping the verification requests if necessary.
    pending_verifies: FuturesPool<Verify<C::PublicKey, D, E>>,

    ////////////////////////////////////////
    // Storage
    ////////////////////////////////////////

    // The number of heights per journal section.
    journal_heights_per_section: u64,

    // The number of bytes to buffer when replaying a journal.
    journal_replay_buffer: usize,

    // The size of the write buffer to use for each blob in the journal.
    journal_write_buffer: usize,

    // A prefix for the journal names.
    // The rest of the name is the hex-encoded public key of the relevant sequencer.
    journal_name_prefix: String,

    // Compression level for the journal.
    journal_compression: Option<u8>,

    // A map of sequencer public keys to their journals.
    #[allow(clippy::type_complexity)]
    journals: BTreeMap<C::PublicKey, Journal<E, Node<C::PublicKey, V, D>>>,

    ////////////////////////////////////////
    // State
    ////////////////////////////////////////

    // Tracks the current tip for each sequencer.
    // The tip is a `Node` which is comprised of a `Chunk` and,
    // if not the genesis chunk for that sequencer,
    // a threshold signature over the parent chunk.
    tip_manager: TipManager<C::PublicKey, V, D>,

    // Tracks the acknowledgements for chunks.
    // This is comprised of partial signatures or threshold signatures.
    ack_manager: AckManager<C::PublicKey, V, D>,

    // The current epoch.
    epoch: Epoch,

    ////////////////////////////////////////
    // Network
    ////////////////////////////////////////

    // Whether to send proposals as priority messages.
    priority_proposals: bool,

    // Whether to send acks as priority messages.
    priority_acks: bool,

    // The network sender and receiver types.
    _phantom: PhantomData<(NetS, NetR)>,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////

    // Metrics
    metrics: metrics::Metrics<E>,

    // The timer for my most recent proposal, used to measure end-to-end latency
    propose_timer: Option<histogram::Timer<E>>,
}

impl<
        E: Clock + Spawner + Storage + Metrics,
        C: Signer,
        V: Variant,
        D: Digest,
        A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
        R: Relay<Digest = D>,
        Z: Reporter<Activity = Activity<C::PublicKey, V, D>>,
        M: Monitor<Index = Epoch>,
        Su: Supervisor<Index = Epoch, PublicKey = C::PublicKey>,
        TSu: ThresholdSupervisor<
            Index = Epoch,
            PublicKey = C::PublicKey,
            Identity = V::Public,
            Polynomial = poly::Public<V>,
            Share = group::Share,
        >,
        NetS: Sender<PublicKey = C::PublicKey>,
        NetR: Receiver<PublicKey = C::PublicKey>,
    > Engine<E, C, V, D, A, R, Z, M, Su, TSu, NetS, NetR>
{
    /// Creates a new engine with the given context and configuration.
    pub fn new(context: E, cfg: Config<C, V, D, A, R, Z, M, Su, TSu>) -> Self {
        let metrics = metrics::Metrics::init(context.clone());

        Self {
            context,
            crypto: cfg.crypto,
            automaton: cfg.automaton,
            relay: cfg.relay,
            reporter: cfg.reporter,
            monitor: cfg.monitor,
            sequencers: cfg.sequencers,
            validators: cfg.validators,
            namespace: cfg.namespace,
            rebroadcast_timeout: cfg.rebroadcast_timeout,
            rebroadcast_deadline: None,
            epoch_bounds: cfg.epoch_bounds,
            height_bound: cfg.height_bound,
            pending_verifies: FuturesPool::default(),
            journal_heights_per_section: cfg.journal_heights_per_section,
            journal_replay_buffer: cfg.journal_replay_buffer,
            journal_write_buffer: cfg.journal_write_buffer,
            journal_name_prefix: cfg.journal_name_prefix,
            journal_compression: cfg.journal_compression,
            journals: BTreeMap::new(),
            tip_manager: TipManager::<C::PublicKey, V, D>::new(),
            ack_manager: AckManager::<C::PublicKey, V, D>::new(),
            epoch: 0,
            priority_proposals: cfg.priority_proposals,
            priority_acks: cfg.priority_acks,
            _phantom: PhantomData,
            metrics,
            propose_timer: None,
        }
    }

    /// Runs the engine until the context is stopped.
    ///
    /// The engine will handle:
    /// - Requesting and processing proposals from the application
    /// - Timeouts
    ///   - Refreshing the Epoch
    ///   - Rebroadcasting Proposals
    /// - Messages from the network:
    ///   - Nodes
    ///   - Acks
    pub fn start(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) -> Handle<()> {
        self.context.spawn_ref()(self.run(chunk_network, ack_network))
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) {
        let (mut node_sender, mut node_receiver) = wrap((), chunk_network.0, chunk_network.1);
        let (mut ack_sender, mut ack_receiver) = wrap((), ack_network.0, ack_network.1);
        let mut shutdown = self.context.stopped();

        // Tracks if there is an outstanding proposal request to the automaton.
        let mut pending: Option<(Context<C::PublicKey>, oneshot::Receiver<D>)> = None;

        // Initialize the epoch
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Before starting on the main loop, initialize my own sequencer journal
        // and attempt to rebroadcast if necessary.
        self.journal_prepare(&self.crypto.public_key()).await;
        if let Err(err) = self.rebroadcast(&mut node_sender).await {
            // Rebroadcasting may return a non-critical error, so log the error and continue.
            info!(?err, "initial rebroadcast failed");
        }

        loop {
            // Request a new proposal if necessary
            if pending.is_none() {
                if let Some(context) = self.should_propose() {
                    let receiver = self.automaton.propose(context.clone()).await;
                    pending = Some((context, receiver));
                }
            }

            // Create deadline futures.
            //
            // If the deadline is None, the future will never resolve.
            let rebroadcast = match self.rebroadcast_deadline {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };
            let propose = match &mut pending {
                Some((_context, receiver)) => Either::Left(receiver),
                None => Either::Right(future::pending()),
            };

            // Process the next event
            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    break;
                },

                // Handle refresh epoch deadline
                epoch = epoch_updates.next() => {
                    // Error handling
                    let Some(epoch) = epoch else {
                        error!("epoch subscription failed");
                        break;
                    };

                    // Refresh the epoch
                    debug!(current=self.epoch, new=epoch, "refresh epoch");
                    assert!(epoch >= self.epoch);
                    self.epoch = epoch;
                    continue;
                },

                // Handle rebroadcast deadline
                _ = rebroadcast => {
                    debug!(epoch = self.epoch, sender=?self.crypto.public_key(), "rebroadcast");
                    if let Err(err) = self.rebroadcast(&mut node_sender).await {
                        info!(?err, "rebroadcast failed");
                        continue;
                    }
                },

                // Propose a new chunk
                receiver = propose => {
                    // Clear the pending proposal
                    let (context, _) = pending.take().unwrap();
                    debug!(height = context.height, "propose");

                    // Error handling for dropped proposals
                    let Ok(payload) = receiver else {
                        warn!(?context, "automaton dropped proposal");
                        continue;
                    };

                    // Propose the chunk
                    if let Err(err) = self.propose(context.clone(), payload, &mut node_sender).await {
                        warn!(?err, ?context, "propose new failed");
                        continue;
                    }
                },

                // Handle incoming nodes
                msg = node_receiver.recv() => {
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            error!(?err, "node receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.nodes.guard(Status::Invalid);
                    let node = match msg {
                        Ok(node) => node,
                        Err(err) => {
                            debug!(?err, ?sender, "node decode failed");
                            continue;
                        }
                    };
                    let result = match self.validate_node(&node, &sender) {
                        Ok(result) => result,
                        Err(err) => {
                            debug!(?err, ?sender, "node validate failed");
                            continue;
                        }
                    };

                    // Initialize journal for sequencer if it does not exist
                    self.journal_prepare(&sender).await;

                    // Handle the parent threshold signature
                    if let Some(parent_chunk) = result {
                        let parent = node.parent.as_ref().unwrap();
                        self.handle_threshold(&parent_chunk, parent.epoch, parent.signature).await;
                    }

                    // Process the node
                    //
                    // Note, this node may be a duplicate. If it is, we will attempt to verify it and vote
                    // on it again (our original vote may have been lost).
                    self.handle_node(&node).await;
                    debug!(?sender, height=node.chunk.height, "node");
                    guard.set(Status::Success);
                },

                // Handle incoming acks
                msg = ack_receiver.recv() => {
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let ack = match msg {
                        Ok(ack) => ack,
                        Err(err) => {
                            debug!(?err, ?sender, "ack decode failed");
                            continue;
                        }
                    };
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        debug!(?err, ?sender, "ack validate failed");
                        continue;
                    }
                    if let Err(err) = self.handle_ack(&ack).await {
                        debug!(?err, ?sender, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }
                    debug!(?sender, epoch=ack.epoch, sequencer=?ack.chunk.sequencer, height=ack.chunk.height, "ack");
                    guard.set(Status::Success);
                },

                // Handle completed verification futures.
                verify = self.pending_verifies.next_completed() => {
                    let Verify { timer, context, payload, result } = verify;
                    drop(timer); // Record metric. Explicitly reference timer to avoid lint warning.
                    match result {
                        Err(err) => {
                            warn!(?err, ?context, "verified returned error");
                            self.metrics.verify.inc(Status::Dropped);
                        }
                        Ok(false) => {
                            debug!(?context, "verified was false");
                            self.metrics.verify.inc(Status::Failure);
                        }
                        Ok(true) => {
                            debug!(?context, "verified");
                            self.metrics.verify.inc(Status::Success);
                            if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await {
                                debug!(?err, ?context, ?payload, "verified handle failed");
                            }
                        },
                    }
                },
            }
        }

        // Close all journals, regardless of how we exit the loop
        self.pending_verifies.cancel_all();
        while let Some((_, journal)) = self.journals.pop_first() {
            journal.close().await.expect("unable to close journal");
        }
    }

    ////////////////////////////////////////
    // Handling
    ////////////////////////////////////////

    /// Handles a verified message from the automaton.
    ///
    /// This is called when the automaton has verified a payload.
    /// The chunk will be signed if it matches the current tip.
    async fn handle_app_verified(
        &mut self,
        context: &Context<C::PublicKey>,
        payload: &D,
        ack_sender: &mut WrappedSender<NetS, Ack<C::PublicKey, V, D>>,
    ) -> Result<(), Error> {
        // Get the tip
        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
            return Err(Error::AppVerifiedNoTip);
        };

        // Return early if the height does not match
        if tip.chunk.height != context.height {
            return Err(Error::AppVerifiedHeightMismatch);
        }

        // Return early if the payload does not match
        if tip.chunk.payload != *payload {
            return Err(Error::AppVerifiedPayloadMismatch);
        }

        // Emit the activity
        self.reporter
            .report(Activity::Tip(Proposal::new(
                tip.chunk.clone(),
                tip.signature.clone(),
            )))
            .await;

        // Construct partial signature (if a validator)
        let Some(share) = self.validators.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };
        let ack = Ack::sign(&self.namespace, share, tip.chunk.clone(), self.epoch);

        // Sync the journal to prevent ever acking two conflicting chunks at
        // the same height, even if the node crashes and restarts.
        self.journal_sync(&context.sequencer, context.height).await;

        // The recipients are all the validators in the epoch and the sequencer.
        // The sequencer may or may not be a validator.
        let recipients = {
            let Some(validators) = self.validators.participants(self.epoch) else {
                return Err(Error::UnknownValidators(self.epoch));
            };
            let mut recipients = validators.clone();
            if self
                .validators
                .is_participant(self.epoch, &tip.chunk.sequencer)
                .is_none()
            {
                recipients.push(tip.chunk.sequencer.clone());
            }
            recipients
        };

        // Handle the ack internally
        self.handle_ack(&ack).await?;

        // Send the ack to the network
        ack_sender
            .send(Recipients::Some(recipients), ack, self.priority_acks)
            .await
            .map_err(|_| Error::UnableToSendMessage)?;

        Ok(())
    }

    /// Handles a threshold signature, either received in a `Node` from the network
    /// or recovered locally from partial signatures.
    ///
    /// The threshold must already be verified.
    /// If the threshold is new, it is stored and a `Lock` activity is reported.
    /// If the threshold is already known, it is ignored.
    async fn handle_threshold(
        &mut self,
        chunk: &Chunk<C::PublicKey, D>,
        epoch: Epoch,
        threshold: V::Signature,
    ) {
        // Set the threshold signature, returning early if it already exists
        if !self
            .ack_manager
            .add_threshold(&chunk.sequencer, chunk.height, epoch, threshold)
        {
            return;
        }

        // If the threshold is for my sequencer, record metric
        if chunk.sequencer == self.crypto.public_key() {
            self.propose_timer.take();
        }

        // Emit the activity
        self.reporter
            .report(Activity::Lock(Lock::new(chunk.clone(), epoch, threshold)))
            .await;
    }

    /// Handles an ack
    ///
    /// Returns an error if the ack is invalid, or can be ignored
    /// (e.g. already exists, threshold already exists, is outside the epoch bounds, etc.).
    async fn handle_ack(&mut self, ack: &Ack<C::PublicKey, V, D>) -> Result<(), Error> {
        // Get the quorum
        let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
            return Err(Error::UnknownPolynomial(ack.epoch));
        };
        let quorum = polynomial.required();

        // Add the partial signature. If a new threshold is formed, handle it.
        if let Some(threshold) = self.ack_manager.add_ack(ack, quorum) {
            debug!(epoch=ack.epoch, sequencer=?ack.chunk.sequencer, height=ack.chunk.height, "recovered threshold");
            self.metrics.threshold.inc();
            self.handle_threshold(&ack.chunk, ack.epoch, threshold)
                .await;
        }

        Ok(())
    }

    /// Handles a valid `Node` message, storing it as the tip.
    /// Alerts the automaton of the new node.
    /// Also appends the `Node` to the journal if it's new.
    async fn handle_node(&mut self, node: &Node<C::PublicKey, V, D>) {
        // Store the tip
        let is_new = self.tip_manager.put(node);

        // If the node is at a higher height than the previous tip...
        if is_new {
            // Update metrics for sequencer height
            self.metrics
                .sequencer_heights
                .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
                .set(node.chunk.height as i64);

            // Append to journal if the `Node` is new, making sure to sync the journal
            // to prevent sending two conflicting chunks to the automaton, even if
            // the node crashes and restarts.
            self.journal_append(node.clone()).await;
            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
                .await;
        }

        // Verify the chunk with the automaton
        let context = Context {
            sequencer: node.chunk.sequencer.clone(),
            height: node.chunk.height,
        };
        let payload = node.chunk.payload;
        let mut automaton = self.automaton.clone();
        let timer = self.metrics.verify_duration.timer();
        self.pending_verifies.push(async move {
            let receiver = automaton.verify(context.clone(), payload).await;
            let result = receiver.await.map_err(Error::AppVerifyCanceled);
            Verify {
                timer,
                context,
                payload,
                result,
            }
        });
    }

    ////////////////////////////////////////
    // Proposing
    ////////////////////////////////////////

    /// Returns a `Context` if the engine should request a proposal from the automaton.
    ///
    /// Should only be called if the engine is not already waiting for a proposal.
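    ///
    /// For example, with no tracked tip the returned context has height 0 (genesis);
    /// with a tip at height `h` whose threshold signature is already known, the
    /// returned context has height `h + 1`.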
    fn should_propose(&self) -> Option<Context<C::PublicKey>> {
        let me = self.crypto.public_key();

        // Return `None` if I am not a sequencer in the current epoch
        self.sequencers.is_participant(self.epoch, &me)?;

        // Return the next context unless my current tip has no threshold signature
        match self.tip_manager.get(&me) {
            None => Some(Context {
                sequencer: me,
                height: 0,
            }),
            Some(tip) => self
                .ack_manager
                .get_threshold(&me, tip.chunk.height)
                .map(|_| Context {
                    sequencer: me,
                    height: tip.chunk.height.checked_add(1).unwrap(),
                }),
        }
    }

    /// Propose a new chunk and broadcast it to all validators.
    ///
    /// Unless this is the genesis chunk for this sequencer, the proposal only
    /// succeeds if the parent `Chunk` and its threshold signature are known.
    async fn propose(
        &mut self,
        context: Context<C::PublicKey>,
        payload: D,
        node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.propose.guard(Status::Dropped);
        let me = self.crypto.public_key();

        // Error-check context sequencer
        if context.sequencer != me {
            return Err(Error::ContextSequencer);
        }

        // Error-check that I am a sequencer in the current epoch
        if self.sequencers.is_participant(self.epoch, &me).is_none() {
            return Err(Error::IAmNotASequencer(self.epoch));
        }

        // Get parent Chunk and threshold signature
        let mut height = 0;
        let mut parent = None;
        if let Some(tip) = self.tip_manager.get(&me) {
            // Get threshold, or, if it doesn't exist, return an error
            let Some((epoch, threshold)) = self.ack_manager.get_threshold(&me, tip.chunk.height)
            else {
                return Err(Error::MissingThreshold);
            };

            // Update height and parent
            height = tip.chunk.height + 1;
            parent = Some(Parent::new(tip.chunk.payload, epoch, threshold));
        }

        // Error-check context height
        if context.height != height {
            return Err(Error::ContextHeight);
        }

        // Construct new node
        let node = Node::sign(&self.namespace, &mut self.crypto, height, payload, parent);

        // Deal with the chunk as if it were received over the network
        self.handle_node(&node).await;

        // Sync the journal to prevent ever proposing two conflicting chunks
        // at the same height, even if the node crashes and restarts
        self.journal_sync(&me, height).await;

        // Record the start time of the proposal
        self.propose_timer = Some(self.metrics.e2e_duration.timer());

        // Broadcast to network
        if let Err(err) = self.broadcast(node, node_sender, self.epoch).await {
            guard.set(Status::Failure);
            return Err(err);
        }

        // Return success
        guard.set(Status::Success);
        Ok(())
    }

    /// Attempt to rebroadcast the highest-height chunk of this sequencer to all validators.
    ///
    /// This is only done if:
    /// - this instance is the sequencer for the current epoch.
    /// - this instance has a chunk to rebroadcast.
    /// - this instance has not yet collected the threshold signature for the chunk.
    async fn rebroadcast(
        &mut self,
        node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);

        // Unset the rebroadcast deadline
        self.rebroadcast_deadline = None;

        // Return if not a sequencer in the current epoch
        let me = self.crypto.public_key();
        if self.sequencers.is_participant(self.epoch, &me).is_none() {
            return Err(Error::IAmNotASequencer(self.epoch));
        }

        // Return if no chunk to rebroadcast
        let Some(tip) = self.tip_manager.get(&me) else {
            return Err(Error::NothingToRebroadcast);
        };

        // Return if threshold already collected
        if self
            .ack_manager
            .get_threshold(&me, tip.chunk.height)
            .is_some()
        {
            return Err(Error::AlreadyThresholded);
        }

        // Broadcast the message, which resets the rebroadcast deadline
        guard.set(Status::Failure);
        self.broadcast(tip, node_sender, self.epoch).await?;
        guard.set(Status::Success);
        Ok(())
    }

    /// Send a `Node` message to all validators in the given epoch.
    async fn broadcast(
        &mut self,
        node: Node<C::PublicKey, V, D>,
        node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
        epoch: Epoch,
    ) -> Result<(), Error> {
        // Get the validators for the epoch
        let Some(validators) = self.validators.participants(epoch) else {
            return Err(Error::UnknownValidators(epoch));
        };

        // Tell the relay to broadcast the full data
        self.relay.broadcast(node.chunk.payload).await;

        // Send the node to all validators
        node_sender
            .send(
                Recipients::Some(validators.clone()),
                node,
                self.priority_proposals,
            )
            .await
            .map_err(|_| Error::BroadcastFailed)?;

        // Set the rebroadcast deadline
        self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);

        Ok(())
    }

    ////////////////////////////////////////
    // Validation
    ////////////////////////////////////////

    /// Takes a raw `Node` (from sender) from the p2p network and validates it.
    ///
    /// If valid (and not already the tracked tip for the sender), returns the
    /// parent chunk implied by the `Node` (the chunk covered by its parent threshold signature).
    /// Returns an error if the `Node` is invalid.
    fn validate_node(
        &mut self,
        node: &Node<C::PublicKey, V, D>,
        sender: &C::PublicKey,
    ) -> Result<Option<Chunk<C::PublicKey, D>>, Error> {
        // Verify the sender
        if node.chunk.sequencer != *sender {
            return Err(Error::PeerMismatch);
        }

        // Optimization: If the node is exactly equal to the tip,
        // don't perform further validation.
        if let Some(tip) = self.tip_manager.get(sender) {
            if tip == *node {
                return Ok(None);
            }
        }

        // Validate chunk
        self.validate_chunk(&node.chunk, self.epoch)?;

        // Verify the signature
        node.verify(&self.namespace, self.validators.identity())
            .map_err(|_| Error::InvalidNodeSignature)
    }

    /// Takes a raw ack (from sender) from the p2p network and validates it.
    ///
    /// Returns `Ok(())` if the ack is valid.
    /// Returns an error if the ack is invalid.
    fn validate_ack(
        &self,
        ack: &Ack<C::PublicKey, V, D>,
        sender: &C::PublicKey,
    ) -> Result<(), Error> {
        // Validate chunk
        self.validate_chunk(&ack.chunk, ack.epoch)?;

        // Validate sender
        let Some(index) = self.validators.is_participant(ack.epoch, sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if index != ack.signature.index {
            return Err(Error::PeerMismatch);
        }

        // Spam prevention: If the ack is for an epoch that is too old or too new, ignore.
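        // For example, with current epoch 10 and epoch_bounds (1, 2),
        // acks for epochs 9 through 12 are accepted.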
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // Spam prevention: If the ack is for a height that is too old or too new, ignore.
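        // For example, with a tip at height 100 and height_bound 10,
        // acks for heights 100 through 110 are accepted.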
        {
            let bound_lo = self
                .tip_manager
                .get(&ack.chunk.sequencer)
                .map(|t| t.chunk.height)
                .unwrap_or(0);
            let bound_hi = bound_lo + self.height_bound;
            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
                return Err(Error::AckHeightOutsideBounds(
                    ack.chunk.height,
                    bound_lo,
                    bound_hi,
                ));
            }
        }

        // Validate partial signature
        // Optimization: If the ack already exists, don't verify
        let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
            return Err(Error::UnknownPolynomial(ack.epoch));
        };
        if !ack.verify(&self.namespace, polynomial) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }

    /// Takes a raw chunk from the p2p network and validates it against the epoch.
    ///
    /// Returns `Ok(())` if the chunk is valid.
    /// Returns an error if the chunk is invalid.
    fn validate_chunk(&self, chunk: &Chunk<C::PublicKey, D>, epoch: Epoch) -> Result<(), Error> {
        // Verify sequencer
        if self
            .sequencers
            .is_participant(epoch, &chunk.sequencer)
            .is_none()
        {
            return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
        }

        // Verify height
        if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
            // Height must be at least the tip height
            match chunk.height.cmp(&tip.chunk.height) {
                std::cmp::Ordering::Less => {
                    return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
                }
                std::cmp::Ordering::Equal => {
                    // Ensure this matches the tip if the height is the same
                    if tip.chunk.payload != chunk.payload {
                        return Err(Error::ChunkMismatch(
                            chunk.sequencer.to_string(),
                            chunk.height,
                        ));
                    }
                }
                std::cmp::Ordering::Greater => {}
            }
        }

        Ok(())
    }

    ////////////////////////////////////////
    // Journal
    ////////////////////////////////////////

    /// Returns the section of the journal for the given height.
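    ///
    /// For example, with `journal_heights_per_section = 10`, heights 0..=9 map
    /// to section 0 and heights 10..=19 map to section 1.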
    fn get_journal_section(&self, height: u64) -> u64 {
        height / self.journal_heights_per_section
    }

    /// Ensures the journal exists and is initialized for the given sequencer.
    /// If the journal does not exist, it is created and replayed.
    /// Else, no action is taken.
    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
        // Return early if the journal already exists
        if self.journals.contains_key(sequencer) {
            return;
        }

        // Initialize journal
        let cfg = journal::variable::Config {
            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
            compression: self.journal_compression,
            codec_config: (),
            write_buffer: self.journal_write_buffer,
        };
        let journal =
            Journal::<_, Node<C::PublicKey, V, D>>::init(self.context.with_label("journal"), cfg)
                .await
                .expect("unable to init journal");

        // Replay journal
        {
            debug!(?sequencer, "journal replay begin");

            // Prepare the stream
            let stream = journal
                .replay(self.journal_replay_buffer)
                .await
                .expect("unable to replay journal");
            pin_mut!(stream);

            // Read from the stream, which may be in arbitrary order.
            // Remember the highest node height
            let mut tip: Option<Node<C::PublicKey, V, D>> = None;
            let mut num_items = 0;
            while let Some(msg) = stream.next().await {
                let (_, _, _, node) = msg.expect("unable to read from journal");
                num_items += 1;
                let height = node.chunk.height;
                match tip {
                    None => {
                        tip = Some(node);
                    }
                    Some(ref t) => {
                        if height > t.chunk.height {
                            tip = Some(node);
                        }
                    }
                }
            }

            // Set the tip only once. The items from the journal may be in arbitrary order,
            // and the tip manager will panic if inserting tips out-of-order.
            if let Some(node) = tip.take() {
                let is_new = self.tip_manager.put(&node);
                assert!(is_new);
            }

            debug!(?sequencer, ?num_items, "journal replay end");
        }

        // Store journal
        self.journals.insert(sequencer.clone(), journal);
    }

    /// Write a `Node` to the appropriate journal, which contains the tip `Chunk` for the sequencer.
    ///
    /// To prevent ever writing two conflicting `Chunk`s at the same height,
    /// the journal must already be open and replayed.
    async fn journal_append(&mut self, node: Node<C::PublicKey, V, D>) {
        let section = self.get_journal_section(node.chunk.height);
        self.journals
            .get_mut(&node.chunk.sequencer)
            .expect("journal does not exist")
            .append(section, node)
            .await
            .expect("unable to append to journal");
    }

    /// Syncs (ensures all data is written to disk) and prunes the journal for the given sequencer and height.
    async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
        let section = self.get_journal_section(height);

        // Get journal
        let journal = self
            .journals
            .get_mut(sequencer)
            .expect("journal does not exist");

        // Sync journal
        journal.sync(section).await.expect("unable to sync journal");

        // Prune journal, ignoring errors
        let _ = journal.prune(section).await;
    }
}
1047}