commonware_consensus/ordered_broadcast/
engine.rs

//! Engine for the ordered broadcast module.
//!
//! It is responsible for:
//! - Proposing nodes (if a sequencer)
//! - Signing chunks (if a validator)
//! - Tracking the latest chunk in each sequencer’s chain
//! - Recovering threshold signatures from partial signatures for each chunk
//! - Notifying other actors of new chunks and threshold signatures
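//!
//! # Example
//!
//! A minimal wiring sketch, not a runnable test: the concrete type parameters,
//! `context`, `cfg`, and the two channel pairs are placeholders for whatever
//! the embedding application provides.
//!
//! ```ignore
//! // Construct the engine from a runtime context and its configuration.
//! let engine = Engine::new(context.clone(), cfg);
//!
//! // Hand it one (sender, receiver) pair for nodes and one for acks; the
//! // returned handle resolves when the context is stopped.
//! let handle = engine.start((chunk_sender, chunk_receiver), (ack_sender, ack_receiver));
//! handle.await;
//! ```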

use super::{
    metrics,
    types::{Ack, Activity, Chunk, Context, Epoch, Error, Lock, Node, Parent, Proposal},
    AckManager, Config, TipManager,
};
use crate::{Automaton, Monitor, Relay, Reporter, Supervisor, ThresholdSupervisor};
use commonware_cryptography::{
    bls12381::primitives::{group, poly, variant::Variant},
    Digest, PublicKey, Signer,
};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Receiver, Recipients, Sender,
};
use commonware_runtime::{
    buffer::PoolRef,
    telemetry::metrics::{
        histogram,
        status::{CounterExt, Status},
    },
    Clock, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::{self, variable::Journal};
use commonware_utils::futures::Pool as FuturesPool;
use futures::{
    channel::oneshot,
    future::{self, Either},
    pin_mut, StreamExt,
};
use std::{
    collections::BTreeMap,
    marker::PhantomData,
    num::NonZeroUsize,
    time::{Duration, SystemTime},
};
use tracing::{debug, error, info, warn};

/// A verification request to the automaton, together with its resolved result.
struct Verify<C: PublicKey, D: Digest, E: Clock> {
    timer: histogram::Timer<E>,
    context: Context<C>,
    payload: D,
    result: Result<bool, Error>,
}

/// Instance of the engine.
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics,
    C: Signer,
    V: Variant,
    D: Digest,
    A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
    R: Relay<Digest = D>,
    Z: Reporter<Activity = Activity<C::PublicKey, V, D>>,
    M: Monitor<Index = Epoch>,
    Su: Supervisor<Index = Epoch, PublicKey = C::PublicKey>,
    TSu: ThresholdSupervisor<
        Index = Epoch,
        PublicKey = C::PublicKey,
        Identity = V::Public,
        Polynomial = poly::Public<V>,
        Share = group::Share,
    >,
    NetS: Sender<PublicKey = C::PublicKey>,
    NetR: Receiver<PublicKey = C::PublicKey>,
> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    context: E,
    crypto: C,
    automaton: A,
    relay: R,
    monitor: M,
    sequencers: Su,
    validators: TSu,
    reporter: Z,

    ////////////////////////////////////////
    // Namespace Constants
    ////////////////////////////////////////

    // The namespace used for signatures.
    namespace: Vec<u8>,

    ////////////////////////////////////////
    // Timeouts
    ////////////////////////////////////////

    // The configured timeout for rebroadcasting a chunk to all validators.
    rebroadcast_timeout: Duration,
    rebroadcast_deadline: Option<SystemTime>,

    ////////////////////////////////////////
    // Pruning
    ////////////////////////////////////////

    // A tuple representing the epochs to keep in memory.
    // The first element is the number of old epochs to keep.
    // The second element is the number of future epochs to accept.
    //
    // For example, if the current epoch is 10, and the bounds are (1, 2), then
    // epochs 9, 10, 11, and 12 are kept (and accepted);
    // all others are pruned or rejected.
    epoch_bounds: (u64, u64),

    // The number of future heights to accept acks for.
    // This is used to prevent spam of acks for arbitrary heights.
    //
    // For example, if the current tip for a sequencer is at height 100,
    // and the height_bound is 10, then acks for heights 100-110 are accepted.
    height_bound: u64,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////

    // A stream of futures.
    //
    // Each future represents a verification request to the automaton
    // that will either time out or resolve with a boolean.
    //
    // There is no limit to the number of futures in this pool, so the automaton
    // can apply backpressure by dropping the verification requests if necessary.
    pending_verifies: FuturesPool<Verify<C::PublicKey, D, E>>,

    ////////////////////////////////////////
    // Storage
    ////////////////////////////////////////

    // The number of heights per journal section.
    journal_heights_per_section: u64,

    // The number of bytes to buffer when replaying a journal.
    journal_replay_buffer: NonZeroUsize,

    // The size of the write buffer to use for each blob in the journal.
    journal_write_buffer: NonZeroUsize,

    // A prefix for the journal names.
    // The rest of the name is the hex-encoded public key of the relevant sequencer.
    journal_name_prefix: String,

    // Compression level for the journal.
    journal_compression: Option<u8>,

    // Buffer pool for the journal.
    journal_buffer_pool: PoolRef,

    // A map of sequencer public keys to their journals.
    #[allow(clippy::type_complexity)]
    journals: BTreeMap<C::PublicKey, Journal<E, Node<C::PublicKey, V, D>>>,

    ////////////////////////////////////////
    // State
    ////////////////////////////////////////

    // Tracks the current tip for each sequencer.
    // The tip is a `Node` which is comprised of a `Chunk` and,
    // if not the genesis chunk for that sequencer,
    // a threshold signature over the parent chunk.
    tip_manager: TipManager<C::PublicKey, V, D>,

    // Tracks the acknowledgements for chunks.
    // This is comprised of partial signatures or threshold signatures.
    ack_manager: AckManager<C::PublicKey, V, D>,

    // The current epoch.
    epoch: Epoch,

    ////////////////////////////////////////
    // Network
    ////////////////////////////////////////

    // Whether to send proposals as priority messages.
    priority_proposals: bool,

    // Whether to send acks as priority messages.
    priority_acks: bool,

    // The network sender and receiver types.
    _phantom: PhantomData<(NetS, NetR)>,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////

    // Metrics
    metrics: metrics::Metrics<E>,

    // Timer started at my latest proposal; dropped (recording the end-to-end
    // latency metric) once its threshold signature is recovered.
    propose_timer: Option<histogram::Timer<E>>,
}

impl<
        E: Clock + Spawner + Storage + Metrics,
        C: Signer,
        V: Variant,
        D: Digest,
        A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
        R: Relay<Digest = D>,
        Z: Reporter<Activity = Activity<C::PublicKey, V, D>>,
        M: Monitor<Index = Epoch>,
        Su: Supervisor<Index = Epoch, PublicKey = C::PublicKey>,
        TSu: ThresholdSupervisor<
            Index = Epoch,
            PublicKey = C::PublicKey,
            Identity = V::Public,
            Polynomial = poly::Public<V>,
            Share = group::Share,
        >,
        NetS: Sender<PublicKey = C::PublicKey>,
        NetR: Receiver<PublicKey = C::PublicKey>,
    > Engine<E, C, V, D, A, R, Z, M, Su, TSu, NetS, NetR>
{
    /// Creates a new engine with the given context and configuration.
    pub fn new(context: E, cfg: Config<C, V, D, A, R, Z, M, Su, TSu>) -> Self {
        let metrics = metrics::Metrics::init(context.clone());

        Self {
            context,
            crypto: cfg.crypto,
            automaton: cfg.automaton,
            relay: cfg.relay,
            reporter: cfg.reporter,
            monitor: cfg.monitor,
            sequencers: cfg.sequencers,
            validators: cfg.validators,
            namespace: cfg.namespace,
            rebroadcast_timeout: cfg.rebroadcast_timeout,
            rebroadcast_deadline: None,
            epoch_bounds: cfg.epoch_bounds,
            height_bound: cfg.height_bound,
            pending_verifies: FuturesPool::default(),
            journal_heights_per_section: cfg.journal_heights_per_section,
            journal_replay_buffer: cfg.journal_replay_buffer,
            journal_write_buffer: cfg.journal_write_buffer,
            journal_name_prefix: cfg.journal_name_prefix,
            journal_compression: cfg.journal_compression,
            journal_buffer_pool: cfg.journal_buffer_pool,
            journals: BTreeMap::new(),
            tip_manager: TipManager::<C::PublicKey, V, D>::new(),
            ack_manager: AckManager::<C::PublicKey, V, D>::new(),
            epoch: 0,
            priority_proposals: cfg.priority_proposals,
            priority_acks: cfg.priority_acks,
            _phantom: PhantomData,
            metrics,
            propose_timer: None,
        }
    }

    /// Runs the engine until the context is stopped.
    ///
    /// The engine will handle:
    /// - Requesting and processing proposals from the application
    /// - Timeouts
    ///   - Refreshing the Epoch
    ///   - Rebroadcasting Proposals
    /// - Messages from the network:
    ///   - Nodes
    ///   - Acks
    pub fn start(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) -> Handle<()> {
        self.context.spawn_ref()(self.run(chunk_network, ack_network))
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) {
        let (mut node_sender, mut node_receiver) = wrap((), chunk_network.0, chunk_network.1);
        let (mut ack_sender, mut ack_receiver) = wrap((), ack_network.0, ack_network.1);
        let mut shutdown = self.context.stopped();

        // Tracks if there is an outstanding proposal request to the automaton.
        let mut pending: Option<(Context<C::PublicKey>, oneshot::Receiver<D>)> = None;

        // Initialize the epoch
        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
        self.epoch = latest;

        // Before starting on the main loop, initialize my own sequencer journal
        // and attempt to rebroadcast if necessary.
        self.journal_prepare(&self.crypto.public_key()).await;
        if let Err(err) = self.rebroadcast(&mut node_sender).await {
            // Rebroadcasting may return a non-critical error, so log the error and continue.
            info!(?err, "initial rebroadcast failed");
        }

        loop {
            // Request a new proposal if necessary
            if pending.is_none() {
                if let Some(context) = self.should_propose() {
                    let receiver = self.automaton.propose(context.clone()).await;
                    pending = Some((context, receiver));
                }
            }

            // Create deadline futures.
            //
            // If the deadline is None, the future will never resolve.
            let rebroadcast = match self.rebroadcast_deadline {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };
            let propose = match &mut pending {
                Some((_context, receiver)) => Either::Left(receiver),
                None => Either::Right(future::pending()),
            };

            // Process the next event
            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    break;
                },

                // Handle epoch updates
                epoch = epoch_updates.next() => {
                    // Error handling
                    let Some(epoch) = epoch else {
                        error!("epoch subscription failed");
                        break;
                    };

                    // Refresh the epoch
                    debug!(current=self.epoch, new=epoch, "refresh epoch");
                    assert!(epoch >= self.epoch);
                    self.epoch = epoch;
                    continue;
                },

                // Handle rebroadcast deadline
                _ = rebroadcast => {
                    debug!(epoch = self.epoch, sender=?self.crypto.public_key(), "rebroadcast");
                    if let Err(err) = self.rebroadcast(&mut node_sender).await {
                        info!(?err, "rebroadcast failed");
                        continue;
                    }
                },

                // Propose a new chunk
                receiver = propose => {
                    // Clear the pending proposal
                    let (context, _) = pending.take().unwrap();
                    debug!(height = context.height, "propose");

                    // Error handling for dropped proposals
                    let Ok(payload) = receiver else {
                        warn!(?context, "automaton dropped proposal");
                        continue;
                    };

                    // Propose the chunk
                    if let Err(err) = self.propose(context.clone(), payload, &mut node_sender).await {
365                        warn!(?err, ?context, "propose new failed");
                        continue;
                    }
                },

                // Handle incoming nodes
                msg = node_receiver.recv() => {
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            error!(?err, "node receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.nodes.guard(Status::Invalid);
                    let node = match msg {
                        Ok(node) => node,
                        Err(err) => {
                            debug!(?err, ?sender, "node decode failed");
                            continue;
                        }
                    };
                    let result = match self.validate_node(&node, &sender) {
                        Ok(result) => result,
                        Err(err) => {
                            debug!(?err, ?sender, "node validate failed");
                            continue;
                        }
                    };

                    // Initialize journal for sequencer if it does not exist
                    self.journal_prepare(&sender).await;

                    // Handle the parent threshold signature
                    if let Some(parent_chunk) = result {
                        let parent = node.parent.as_ref().unwrap();
                        self.handle_threshold(&parent_chunk, parent.epoch, parent.signature).await;
                    }

                    // Process the node
                    //
                    // Note, this node may be a duplicate. If it is, we will attempt to verify it and vote
                    // on it again (our original vote may have been lost).
                    self.handle_node(&node).await;
                    debug!(?sender, height=node.chunk.height, "node");
                    guard.set(Status::Success);
                },

                // Handle incoming acks
                msg = ack_receiver.recv() => {
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let ack = match msg {
                        Ok(ack) => ack,
                        Err(err) => {
                            debug!(?err, ?sender, "ack decode failed");
                            continue;
                        }
                    };
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        debug!(?err, ?sender, "ack validate failed");
                        continue;
                    }
                    if let Err(err) = self.handle_ack(&ack).await {
                        debug!(?err, ?sender, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }
                    debug!(?sender, epoch=ack.epoch, sequencer=?ack.chunk.sequencer, height=ack.chunk.height, "ack");
                    guard.set(Status::Success);
                },

                // Handle completed verification futures.
                verify = self.pending_verifies.next_completed() => {
                    let Verify { timer, context, payload, result } = verify;
                    drop(timer); // Record metric. Explicitly reference timer to avoid lint warning.
                    match result {
                        Err(err) => {
                            warn!(?err, ?context, "verified returned error");
                            self.metrics.verify.inc(Status::Dropped);
                        }
                        Ok(false) => {
                            debug!(?context, "verified was false");
                            self.metrics.verify.inc(Status::Failure);
                        }
                        Ok(true) => {
                            debug!(?context, "verified");
                            self.metrics.verify.inc(Status::Success);
                            if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await {
                                debug!(?err, ?context, ?payload, "verified handle failed");
                            }
                        },
                    }
                },
            }
        }

        // Regardless of how we exit the loop, cancel any outstanding
        // verification requests and close all journals
        self.pending_verifies.cancel_all();
        while let Some((_, journal)) = self.journals.pop_first() {
            journal.close().await.expect("unable to close journal");
        }
    }

    ////////////////////////////////////////
    // Handling
    ////////////////////////////////////////

    /// Handles a verified message from the automaton.
    ///
    /// This is called when the automaton has verified a payload.
    /// The chunk will be signed if it matches the current tip.
    async fn handle_app_verified(
        &mut self,
        context: &Context<C::PublicKey>,
        payload: &D,
        ack_sender: &mut WrappedSender<NetS, Ack<C::PublicKey, V, D>>,
    ) -> Result<(), Error> {
        // Get the tip
        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
            return Err(Error::AppVerifiedNoTip);
        };

        // Return early if the height does not match
        if tip.chunk.height != context.height {
            return Err(Error::AppVerifiedHeightMismatch);
        }

        // Return early if the payload does not match
        if tip.chunk.payload != *payload {
            return Err(Error::AppVerifiedPayloadMismatch);
        }

        // Emit the activity
        self.reporter
            .report(Activity::Tip(Proposal::new(
                tip.chunk.clone(),
                tip.signature.clone(),
            )))
            .await;

        // Construct partial signature (if a validator)
        let Some(share) = self.validators.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };
        let ack = Ack::sign(&self.namespace, share, tip.chunk.clone(), self.epoch);

        // Sync the journal to prevent ever acking two conflicting chunks at
        // the same height, even if the node crashes and restarts.
        self.journal_sync(&context.sequencer, context.height).await;

        // The recipients are all the validators in the epoch and the sequencer.
        // The sequencer may or may not be a validator.
        let recipients = {
            let Some(validators) = self.validators.participants(self.epoch) else {
                return Err(Error::UnknownValidators(self.epoch));
            };
            let mut recipients = validators.clone();
            if self
                .validators
                .is_participant(self.epoch, &tip.chunk.sequencer)
                .is_none()
            {
                recipients.push(tip.chunk.sequencer.clone());
            }
            recipients
        };

        // Handle the ack internally
        self.handle_ack(&ack).await?;

        // Send the ack to the network
        ack_sender
            .send(Recipients::Some(recipients), ack, self.priority_acks)
            .await
            .map_err(|_| Error::UnableToSendMessage)?;

        Ok(())
    }

    /// Handles a threshold, either received from a `Node` from the network or generated locally.
    ///
    /// The threshold must already be verified.
    /// If the threshold is new, it is stored and a `Lock` activity is reported.
    /// If the threshold is already known, it is ignored.
    async fn handle_threshold(
        &mut self,
        chunk: &Chunk<C::PublicKey, D>,
        epoch: Epoch,
        threshold: V::Signature,
    ) {
        // Set the threshold signature, returning early if it already exists
        if !self
            .ack_manager
            .add_threshold(&chunk.sequencer, chunk.height, epoch, threshold)
        {
            return;
        }

        // If the threshold is for my own chunk, stop the proposal timer
        // (dropping it records the end-to-end metric)
        if chunk.sequencer == self.crypto.public_key() {
            self.propose_timer.take();
        }

        // Emit the activity
        self.reporter
            .report(Activity::Lock(Lock::new(chunk.clone(), epoch, threshold)))
            .await;
    }

    /// Handles an ack
    ///
    /// Returns an error if the ack is invalid, or can be ignored
    /// (e.g. already exists, threshold already exists, is outside the epoch bounds, etc.).
    async fn handle_ack(&mut self, ack: &Ack<C::PublicKey, V, D>) -> Result<(), Error> {
        // Get the quorum
        let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
            return Err(Error::UnknownPolynomial(ack.epoch));
        };
        let quorum = polynomial.required();

        // Add the partial signature. If a new threshold is formed, handle it.
        if let Some(threshold) = self.ack_manager.add_ack(ack, quorum) {
            debug!(epoch=ack.epoch, sequencer=?ack.chunk.sequencer, height=ack.chunk.height, "recovered threshold");
            self.metrics.threshold.inc();
            self.handle_threshold(&ack.chunk, ack.epoch, threshold)
                .await;
        }

        Ok(())
    }

    /// Handles a valid `Node` message, storing it as the tip.
    /// Alerts the automaton of the new node.
    /// Also appends the `Node` to the journal if it's new.
    async fn handle_node(&mut self, node: &Node<C::PublicKey, V, D>) {
        // Store the tip
        let is_new = self.tip_manager.put(node);

        // If the node is at a higher height than the previous tip...
        if is_new {
            // Update metrics for sequencer height
            self.metrics
                .sequencer_heights
                .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
                .set(node.chunk.height as i64);

            // Append to journal if the `Node` is new, making sure to sync the journal
            // to prevent sending two conflicting chunks to the automaton, even if
            // the node crashes and restarts.
            self.journal_append(node.clone()).await;
            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
                .await;
        }

        // Verify the chunk with the automaton
        let context = Context {
            sequencer: node.chunk.sequencer.clone(),
            height: node.chunk.height,
        };
        let payload = node.chunk.payload;
        let mut automaton = self.automaton.clone();
        let timer = self.metrics.verify_duration.timer();
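        // Queue the verification without blocking the event loop; the future
        // resolves in `pending_verifies` and is handled via `next_completed()`.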
        self.pending_verifies.push(async move {
            let receiver = automaton.verify(context.clone(), payload).await;
            let result = receiver.await.map_err(Error::AppVerifyCanceled);
            Verify {
                timer,
                context,
                payload,
                result,
            }
        });
    }

    ////////////////////////////////////////
    // Proposing
    ////////////////////////////////////////

    /// Returns a `Context` if the engine should request a proposal from the automaton.
    ///
    /// Should only be called if the engine is not already waiting for a proposal.
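    ///
    /// For example, with no tip yet the returned context is at height 0; with a
    /// tip at height `h` whose threshold signature is known, it is at height
    /// `h + 1`; otherwise `None` is returned.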
    fn should_propose(&self) -> Option<Context<C::PublicKey>> {
        let me = self.crypto.public_key();

        // Return `None` if I am not a sequencer in the current epoch
        self.sequencers.is_participant(self.epoch, &me)?;

        // Otherwise, return the context for the next height, but only if my
        // current tip has a threshold signature
        match self.tip_manager.get(&me) {
            None => Some(Context {
                sequencer: me,
                height: 0,
            }),
            Some(tip) => self
                .ack_manager
                .get_threshold(&me, tip.chunk.height)
                .map(|_| Context {
                    sequencer: me,
                    height: tip.chunk.height.checked_add(1).unwrap(),
                }),
        }
    }

    /// Propose a new chunk to the network.
    ///
    /// The proposal only succeeds if the parent `Chunk` and its threshold
    /// signature are known; the signed `Node` is then broadcast to all validators.
    async fn propose(
        &mut self,
        context: Context<C::PublicKey>,
        payload: D,
        node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.propose.guard(Status::Dropped);
        let me = self.crypto.public_key();

        // Error-check context sequencer
        if context.sequencer != me {
            return Err(Error::ContextSequencer);
        }

        // Error-check that I am a sequencer in the current epoch
        if self.sequencers.is_participant(self.epoch, &me).is_none() {
            return Err(Error::IAmNotASequencer(self.epoch));
        }

        // Get parent Chunk and threshold signature
        let mut height = 0;
        let mut parent = None;
        if let Some(tip) = self.tip_manager.get(&me) {
            // Get threshold, or, if it doesn't exist, return an error
            let Some((epoch, threshold)) = self.ack_manager.get_threshold(&me, tip.chunk.height)
            else {
                return Err(Error::MissingThreshold);
            };

            // Update height and parent
            height = tip.chunk.height + 1;
            parent = Some(Parent::new(tip.chunk.payload, epoch, threshold));
        }

        // Error-check context height
        if context.height != height {
            return Err(Error::ContextHeight);
        }

        // Construct new node
        let node = Node::sign(&self.namespace, &mut self.crypto, height, payload, parent);

        // Deal with the chunk as if it were received over the network
        self.handle_node(&node).await;

        // Sync the journal to prevent ever proposing two conflicting chunks
        // at the same height, even if the node crashes and restarts
        self.journal_sync(&me, height).await;

        // Record the start time of the proposal
        self.propose_timer = Some(self.metrics.e2e_duration.timer());

        // Broadcast to network
        if let Err(err) = self.broadcast(node, node_sender, self.epoch).await {
            guard.set(Status::Failure);
            return Err(err);
        }

        // Return success
        guard.set(Status::Success);
        Ok(())
    }

    /// Attempt to rebroadcast the highest-height chunk of this sequencer to all validators.
    ///
    /// This is only done if:
    /// - this instance is the sequencer for the current epoch.
    /// - this instance has a chunk to rebroadcast.
    /// - this instance has not yet collected the threshold signature for the chunk.
    async fn rebroadcast(
        &mut self,
        node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);

        // Unset the rebroadcast deadline
        self.rebroadcast_deadline = None;

        // Return if not a sequencer in the current epoch
        let me = self.crypto.public_key();
        if self.sequencers.is_participant(self.epoch, &me).is_none() {
            return Err(Error::IAmNotASequencer(self.epoch));
        }

        // Return if no chunk to rebroadcast
        let Some(tip) = self.tip_manager.get(&me) else {
            return Err(Error::NothingToRebroadcast);
        };

        // Return if threshold already collected
        if self
            .ack_manager
            .get_threshold(&me, tip.chunk.height)
            .is_some()
        {
            return Err(Error::AlreadyThresholded);
        }

        // Broadcast the message, which resets the rebroadcast deadline
        guard.set(Status::Failure);
        self.broadcast(tip, node_sender, self.epoch).await?;
        guard.set(Status::Success);
        Ok(())
    }

    /// Send a `Node` message to all validators in the given epoch.
    async fn broadcast(
        &mut self,
        node: Node<C::PublicKey, V, D>,
        node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
        epoch: Epoch,
    ) -> Result<(), Error> {
        // Get the validators for the epoch
        let Some(validators) = self.validators.participants(epoch) else {
            return Err(Error::UnknownValidators(epoch));
        };

        // Tell the relay to broadcast the full data
        self.relay.broadcast(node.chunk.payload).await;

        // Send the node to all validators
        node_sender
            .send(
                Recipients::Some(validators.clone()),
                node,
                self.priority_proposals,
            )
            .await
            .map_err(|_| Error::BroadcastFailed)?;

        // Set the rebroadcast deadline
        self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);

        Ok(())
    }

    ////////////////////////////////////////
    // Validation
    ////////////////////////////////////////

    /// Takes a raw `Node` (from sender) from the p2p network and validates it.
    ///
    /// If valid (and not already the tracked tip for the sender), returns the implied
    /// parent chunk and its threshold signature.
    /// Returns an error if the `Node` is invalid.
    fn validate_node(
        &mut self,
        node: &Node<C::PublicKey, V, D>,
        sender: &C::PublicKey,
    ) -> Result<Option<Chunk<C::PublicKey, D>>, Error> {
        // Verify the sender
        if node.chunk.sequencer != *sender {
            return Err(Error::PeerMismatch);
        }

        // Optimization: If the node is exactly equal to the tip,
        // don't perform further validation.
        if let Some(tip) = self.tip_manager.get(sender) {
            if tip == *node {
                return Ok(None);
            }
        }

        // Validate chunk
        self.validate_chunk(&node.chunk, self.epoch)?;

        // Verify the signature
        node.verify(&self.namespace, self.validators.identity())
            .map_err(|_| Error::InvalidNodeSignature)
    }

    /// Takes a raw ack (from sender) from the p2p network and validates it.
    ///
    /// Returns `Ok(())` if the ack is valid.
    /// Returns an error if the ack is invalid.
    fn validate_ack(
        &self,
        ack: &Ack<C::PublicKey, V, D>,
        sender: &C::PublicKey,
    ) -> Result<(), Error> {
        // Validate chunk
        self.validate_chunk(&ack.chunk, ack.epoch)?;

        // Validate sender
        let Some(index) = self.validators.is_participant(ack.epoch, sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if index != ack.signature.index {
            return Err(Error::PeerMismatch);
        }

        // Spam prevention: If the ack is for an epoch that is too old or too new, ignore.
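        // For example, with epoch 10 and epoch_bounds (1, 2), acks for epochs
        // 9 through 12 are accepted.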
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // Spam prevention: If the ack is for a height that is too old or too new, ignore.
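        // For example, with a tip at height 100 and height_bound 10, acks for
        // heights 100 through 110 are accepted.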
        {
            let bound_lo = self
                .tip_manager
                .get(&ack.chunk.sequencer)
                .map(|t| t.chunk.height)
                .unwrap_or(0);
            let bound_hi = bound_lo + self.height_bound;
            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
                return Err(Error::AckHeightOutsideBounds(
                    ack.chunk.height,
                    bound_lo,
                    bound_hi,
                ));
            }
        }

        // Validate partial signature
        // Optimization: If the ack already exists, don't verify
        let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
            return Err(Error::UnknownPolynomial(ack.epoch));
        };
        if !ack.verify(&self.namespace, polynomial) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }

    /// Takes a raw chunk from the p2p network and validates it against the epoch.
    ///
    /// Returns `Ok(())` if the chunk is valid.
    /// Returns an error if the chunk is invalid.
    fn validate_chunk(&self, chunk: &Chunk<C::PublicKey, D>, epoch: Epoch) -> Result<(), Error> {
        // Verify sequencer
        if self
            .sequencers
            .is_participant(epoch, &chunk.sequencer)
            .is_none()
        {
            return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
        }

        // Verify height
        if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
            // Height must be at least the tip height
            match chunk.height.cmp(&tip.chunk.height) {
                std::cmp::Ordering::Less => {
                    return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
                }
                std::cmp::Ordering::Equal => {
                    // Ensure this matches the tip if the height is the same
                    if tip.chunk.payload != chunk.payload {
                        return Err(Error::ChunkMismatch(
                            chunk.sequencer.to_string(),
                            chunk.height,
                        ));
                    }
                }
                std::cmp::Ordering::Greater => {}
            }
        }

        Ok(())
    }

    ////////////////////////////////////////
    // Journal
    ////////////////////////////////////////

    /// Returns the section of the journal for the given height.
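    ///
    /// For example, with `journal_heights_per_section = 10`, heights 0..=9 map
    /// to section 0 and height 25 maps to section 2.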
    fn get_journal_section(&self, height: u64) -> u64 {
        height / self.journal_heights_per_section
    }

    /// Ensures the journal exists and is initialized for the given sequencer.
    /// If the journal does not exist, it is created and replayed.
    /// Otherwise, no action is taken.
    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
        // Return early if the journal already exists
        if self.journals.contains_key(sequencer) {
            return;
        }

        // Initialize journal
        let cfg = journal::variable::Config {
            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
            compression: self.journal_compression,
            codec_config: (),
            buffer_pool: self.journal_buffer_pool.clone(),
            write_buffer: self.journal_write_buffer,
        };
        let journal =
            Journal::<_, Node<C::PublicKey, V, D>>::init(self.context.with_label("journal"), cfg)
                .await
                .expect("unable to init journal");

        // Replay journal
        {
            debug!(?sequencer, "journal replay begin");

            // Prepare the stream
            let stream = journal
                .replay(0, 0, self.journal_replay_buffer)
                .await
                .expect("unable to replay journal");
            pin_mut!(stream);

            // Read from the stream, which may be in arbitrary order.
            // Remember the highest node height
            let mut tip: Option<Node<C::PublicKey, V, D>> = None;
            let mut num_items = 0;
            while let Some(msg) = stream.next().await {
                let (_, _, _, node) = msg.expect("unable to read from journal");
                num_items += 1;
                let height = node.chunk.height;
                match tip {
                    None => {
                        tip = Some(node);
                    }
                    Some(ref t) => {
                        if height > t.chunk.height {
                            tip = Some(node);
                        }
                    }
                }
            }

            // Set the tip only once. The items from the journal may be in arbitrary order,
            // and the tip manager will panic if inserting tips out-of-order.
            if let Some(node) = tip.take() {
                let is_new = self.tip_manager.put(&node);
                assert!(is_new);
            }

            debug!(?sequencer, ?num_items, "journal replay end");
        }

        // Store journal
        self.journals.insert(sequencer.clone(), journal);
    }

    /// Write a `Node` to the appropriate journal, which contains the tip `Chunk` for the sequencer.
    ///
    /// To prevent ever writing two conflicting `Chunk`s at the same height,
    /// the journal must already be open and replayed.
    async fn journal_append(&mut self, node: Node<C::PublicKey, V, D>) {
        let section = self.get_journal_section(node.chunk.height);
        self.journals
            .get_mut(&node.chunk.sequencer)
            .expect("journal does not exist")
            .append(section, node)
            .await
            .expect("unable to append to journal");
    }

    /// Syncs (ensures all data is written to disk) and prunes the journal for the given sequencer and height.
    async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
        let section = self.get_journal_section(height);

        // Get journal
        let journal = self
            .journals
            .get_mut(sequencer)
            .expect("journal does not exist");

        // Sync journal
        journal.sync(section).await.expect("unable to sync journal");

        // Prune journal, ignoring errors
        let _ = journal.prune(section).await;
    }
}