commonware_consensus/ordered_broadcast/engine.rs

//! Engine for the ordered broadcast module.
//!
//! It is responsible for:
//! - Proposing nodes (if a sequencer)
//! - Signing chunks (if a validator)
//! - Tracking the latest chunk in each sequencer’s chain
//! - Recovering threshold signatures from partial signatures for each chunk
//! - Notifying other actors of new chunks and threshold signatures
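//!
//! # Example
//!
//! A minimal sketch of constructing and starting the engine, assuming a fully
//! populated [`Config`] and two pre-established network channels (the channel
//! names below are illustrative, not part of this module):
//!
//! ```ignore
//! // `context`, `cfg`, and the four channel halves are assumed to exist.
//! let engine = Engine::new(context, cfg);
//! let handle = engine.start(
//!     (chunk_sender, chunk_receiver), // carries `Node` messages
//!     (ack_sender, ack_receiver),     // carries `Ack` messages
//! );
//! ```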

use super::{
    metrics,
    types::{Ack, Activity, Chunk, Context, Epoch, Error, Lock, Node, Parent, Proposal},
    AckManager, Config, TipManager,
};
use crate::{Automaton, Monitor, Relay, Reporter, Supervisor, ThresholdSupervisor};
use commonware_cryptography::{
    bls12381::primitives::{group, poly, variant::Variant},
    Digest, PublicKey, Signer,
};
use commonware_macros::select;
use commonware_p2p::{
    utils::codec::{wrap, WrappedSender},
    Receiver, Recipients, Sender,
};
use commonware_runtime::{
    telemetry::metrics::{
        histogram,
        status::{CounterExt, Status},
    },
    Clock, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::{self, variable::Journal};
use commonware_utils::futures::Pool as FuturesPool;
use futures::{
    channel::oneshot,
    future::{self, Either},
    pin_mut, StreamExt,
};
use std::{
    collections::BTreeMap,
    marker::PhantomData,
    time::{Duration, SystemTime},
};
use tracing::{debug, error, info, warn};

/// Represents a pending verification request to the automaton.
struct Verify<C: PublicKey, D: Digest, E: Clock> {
    timer: histogram::Timer<E>,
    context: Context<C>,
    payload: D,
    result: Result<bool, Error>,
}

/// Instance of the engine.
pub struct Engine<
    E: Clock + Spawner + Storage + Metrics,
    C: Signer,
    V: Variant,
    D: Digest,
    A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
    R: Relay<Digest = D>,
    Z: Reporter<Activity = Activity<C::PublicKey, V, D>>,
    M: Monitor<Index = Epoch>,
    Su: Supervisor<Index = Epoch, PublicKey = C::PublicKey>,
    TSu: ThresholdSupervisor<
        Index = Epoch,
        PublicKey = C::PublicKey,
        Identity = V::Public,
        Polynomial = poly::Public<V>,
        Share = group::Share,
    >,
    NetS: Sender<PublicKey = C::PublicKey>,
    NetR: Receiver<PublicKey = C::PublicKey>,
> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    context: E,
    crypto: C,
    automaton: A,
    relay: R,
    monitor: M,
    sequencers: Su,
    validators: TSu,
    reporter: Z,

    ////////////////////////////////////////
    // Namespace Constants
    ////////////////////////////////////////

    // The namespace used when signing and verifying messages.
    namespace: Vec<u8>,

    ////////////////////////////////////////
    // Timeouts
    ////////////////////////////////////////

    // The configured timeout for rebroadcasting a chunk to all validators
    rebroadcast_timeout: Duration,
    rebroadcast_deadline: Option<SystemTime>,

    ////////////////////////////////////////
    // Pruning
    ////////////////////////////////////////

    // A tuple representing the epochs to keep in memory.
    // The first element is the number of old epochs to keep.
    // The second element is the number of future epochs to accept.
    //
    // For example, if the current epoch is 10, and the bounds are (1, 2), then
    // epochs 9, 10, 11, and 12 are kept (and accepted);
    // all others are pruned or rejected.
    epoch_bounds: (u64, u64),

    // The number of future heights to accept acks for.
    // This is used to prevent spam of acks for arbitrary heights.
    //
    // For example, if the current tip for a sequencer is at height 100,
    // and the height_bound is 10, then acks for heights 100-110 are accepted.
    height_bound: u64,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////

    // A stream of futures.
    //
    // Each future represents a verification request to the automaton
    // that will either time out or resolve with a boolean.
    //
    // There is no limit to the number of futures in this pool, so the automaton
    // can apply backpressure by dropping the verification requests if necessary.
    pending_verifies: FuturesPool<Verify<C::PublicKey, D, E>>,

    ////////////////////////////////////////
    // Storage
    ////////////////////////////////////////

    // The number of heights per journal section.
    journal_heights_per_section: u64,

    // The number of concurrent operations when replaying journals.
    journal_replay_concurrency: usize,

    // The number of bytes to buffer when replaying a journal.
    journal_replay_buffer: usize,

    // The size of the write buffer to use for each blob in the journal.
    journal_write_buffer: usize,

    // A prefix for the journal names.
    // The rest of the name is the hex-encoded public key of the relevant sequencer.
    journal_name_prefix: String,

    // Compression level for the journal.
    journal_compression: Option<u8>,

    // A map of sequencer public keys to their journals.
    #[allow(clippy::type_complexity)]
    journals: BTreeMap<C::PublicKey, Journal<E, Node<C::PublicKey, V, D>>>,

    ////////////////////////////////////////
    // State
    ////////////////////////////////////////

    // Tracks the current tip for each sequencer.
    // The tip is a `Node` which is comprised of a `Chunk` and,
    // if not the genesis chunk for that sequencer,
    // a threshold signature over the parent chunk.
    tip_manager: TipManager<C::PublicKey, V, D>,

    // Tracks the acknowledgements for chunks.
    // This is comprised of partial signatures or threshold signatures.
    ack_manager: AckManager<C::PublicKey, V, D>,

    // The current epoch.
    epoch: Epoch,

    ////////////////////////////////////////
    // Network
    ////////////////////////////////////////

    // Whether to send proposals as priority messages.
    priority_proposals: bool,

    // Whether to send acks as priority messages.
    priority_acks: bool,

    // The network sender and receiver types.
    _phantom: PhantomData<(NetS, NetR)>,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////

    // The engine's metrics.
    metrics: metrics::Metrics<E>,

    // Timer started at my most recent proposal; dropped (recording the
    // end-to-end duration) once the corresponding threshold signature is recovered.
    propose_timer: Option<histogram::Timer<E>>,
}
202
203impl<
204        E: Clock + Spawner + Storage + Metrics,
205        C: Signer,
206        V: Variant,
207        D: Digest,
208        A: Automaton<Context = Context<C::PublicKey>, Digest = D> + Clone,
209        R: Relay<Digest = D>,
210        Z: Reporter<Activity = Activity<C::PublicKey, V, D>>,
211        M: Monitor<Index = Epoch>,
212        Su: Supervisor<Index = Epoch, PublicKey = C::PublicKey>,
213        TSu: ThresholdSupervisor<
214            Index = Epoch,
215            PublicKey = C::PublicKey,
216            Identity = V::Public,
217            Polynomial = poly::Public<V>,
218            Share = group::Share,
219        >,
220        NetS: Sender<PublicKey = C::PublicKey>,
221        NetR: Receiver<PublicKey = C::PublicKey>,
222    > Engine<E, C, V, D, A, R, Z, M, Su, TSu, NetS, NetR>
223{
224    /// Creates a new engine with the given context and configuration.
225    pub fn new(context: E, cfg: Config<C, V, D, A, R, Z, M, Su, TSu>) -> Self {
226        let metrics = metrics::Metrics::init(context.clone());
227
228        Self {
229            context,
230            crypto: cfg.crypto,
231            automaton: cfg.automaton,
232            relay: cfg.relay,
233            reporter: cfg.reporter,
234            monitor: cfg.monitor,
235            sequencers: cfg.sequencers,
236            validators: cfg.validators,
237            namespace: cfg.namespace,
238            rebroadcast_timeout: cfg.rebroadcast_timeout,
239            rebroadcast_deadline: None,
240            epoch_bounds: cfg.epoch_bounds,
241            height_bound: cfg.height_bound,
242            pending_verifies: FuturesPool::default(),
243            journal_heights_per_section: cfg.journal_heights_per_section,
244            journal_replay_concurrency: cfg.journal_replay_concurrency,
245            journal_replay_buffer: cfg.journal_replay_buffer,
246            journal_write_buffer: cfg.journal_write_buffer,
247            journal_name_prefix: cfg.journal_name_prefix,
248            journal_compression: cfg.journal_compression,
249            journals: BTreeMap::new(),
250            tip_manager: TipManager::<C::PublicKey, V, D>::new(),
251            ack_manager: AckManager::<C::PublicKey, V, D>::new(),
252            epoch: 0,
253            priority_proposals: cfg.priority_proposals,
254            priority_acks: cfg.priority_acks,
255            _phantom: PhantomData,
256            metrics,
257            propose_timer: None,
258        }
259    }
260
261    /// Runs the engine until the context is stopped.
262    ///
263    /// The engine will handle:
264    /// - Requesting and processing proposals from the application
265    /// - Timeouts
266    ///   - Refreshing the Epoch
267    ///   - Rebroadcasting Proposals
268    /// - Messages from the network:
269    ///   - Nodes
270    ///   - Acks
271    pub fn start(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) -> Handle<()> {
272        self.context.spawn_ref()(self.run(chunk_network, ack_network))
273    }
274
275    /// Inner run loop called by `start`.
276    async fn run(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) {
277        let (mut node_sender, mut node_receiver) = wrap((), chunk_network.0, chunk_network.1);
278        let (mut ack_sender, mut ack_receiver) = wrap((), ack_network.0, ack_network.1);
279        let mut shutdown = self.context.stopped();
280
281        // Tracks if there is an outstanding proposal request to the automaton.
282        let mut pending: Option<(Context<C::PublicKey>, oneshot::Receiver<D>)> = None;
283
284        // Initialize the epoch
285        let (latest, mut epoch_updates) = self.monitor.subscribe().await;
286        self.epoch = latest;
287
288        // Before starting on the main loop, initialize my own sequencer journal
289        // and attempt to rebroadcast if necessary.
290        self.journal_prepare(&self.crypto.public_key()).await;
291        if let Err(err) = self.rebroadcast(&mut node_sender).await {
292            // Rebroadcasting may return a non-critical error, so log the error and continue.
293            info!(?err, "initial rebroadcast failed");
294        }
295
296        loop {
297            // Request a new proposal if necessary
298            if pending.is_none() {
299                if let Some(context) = self.should_propose() {
300                    let receiver = self.automaton.propose(context.clone()).await;
301                    pending = Some((context, receiver));
302                }
303            }
304
305            // Create deadline futures.
306            //
307            // If the deadline is None, the future will never resolve.
308            let rebroadcast = match self.rebroadcast_deadline {
309                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
310                None => Either::Right(future::pending()),
311            };
312            let propose = match &mut pending {
313                Some((_context, receiver)) => Either::Left(receiver),
314                None => Either::Right(futures::future::pending()),
315            };
316
317            // Process the next event
318            select! {
319                // Handle shutdown signal
320                _ = &mut shutdown => {
321                    debug!("shutdown");
322                    break;
323                },
324
325                // Handle refresh epoch deadline
326                epoch = epoch_updates.next() => {
327                    // Error handling
328                    let Some(epoch) = epoch else {
329                        error!("epoch subscription failed");
330                        break;
331                    };
332
333                    // Refresh the epoch
334                    debug!(current=self.epoch, new=epoch, "refresh epoch");
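                    // Epochs reported by the monitor must be monotonically increasing.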
                    assert!(epoch >= self.epoch);
                    self.epoch = epoch;
                    continue;
                },

                // Handle rebroadcast deadline
                _ = rebroadcast => {
                    debug!(epoch = self.epoch, sender=?self.crypto.public_key(), "rebroadcast");
                    if let Err(err) = self.rebroadcast(&mut node_sender).await {
                        info!(?err, "rebroadcast failed");
                        continue;
                    }
                },

                // Propose a new chunk
                receiver = propose => {
                    // Clear the pending proposal
                    let (context, _) = pending.take().unwrap();
                    debug!(height = context.height, "propose");

                    // Error handling for dropped proposals
                    let Ok(payload) = receiver else {
                        warn!(?context, "automaton dropped proposal");
                        continue;
                    };

                    // Propose the chunk
                    if let Err(err) = self.propose(context.clone(), payload, &mut node_sender).await {
                        warn!(?err, ?context, "propose new failed");
                        continue;
                    }
                },

                // Handle incoming nodes
                msg = node_receiver.recv() => {
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            error!(?err, "node receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.nodes.guard(Status::Invalid);
                    let node = match msg {
                        Ok(node) => node,
                        Err(err) => {
                            warn!(?err, ?sender, "node decode failed");
                            continue;
                        }
                    };
                    let result = match self.validate_node(&node, &sender) {
                        Ok(result) => result,
                        Err(err) => {
                            warn!(?err, ?sender, "node validate failed");
                            continue;
                        }
                    };

                    // Initialize journal for sequencer if it does not exist
                    self.journal_prepare(&sender).await;

                    // Handle the parent threshold signature
                    if let Some(parent_chunk) = result {
                        let parent = node.parent.as_ref().unwrap();
                        self.handle_threshold(&parent_chunk, parent.epoch, parent.signature).await;
                    }

                    // Process the node
                    //
                    // Note, this node may be a duplicate. If it is, we will attempt to verify it and vote
                    // on it again (our original vote may have been lost).
                    self.handle_node(&node).await;
                    debug!(?sender, height=node.chunk.height, "node");
                    guard.set(Status::Success);
                },

                // Handle incoming acks
                msg = ack_receiver.recv() => {
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let ack = match msg {
                        Ok(ack) => ack,
                        Err(err) => {
                            warn!(?err, ?sender, "ack decode failed");
                            continue;
                        }
                    };
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        warn!(?err, ?sender, "ack validate failed");
                        continue;
                    };
                    if let Err(err) = self.handle_ack(&ack).await {
                        warn!(?err, ?sender, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }
                    debug!(?sender, epoch=ack.epoch, sequencer=?ack.chunk.sequencer, height=ack.chunk.height, "ack");
                    guard.set(Status::Success);
                },

                // Handle completed verification futures.
                verify = self.pending_verifies.next_completed() => {
                    let Verify { timer, context, payload, result } = verify;
                    drop(timer); // Record metric. Explicitly reference timer to avoid lint warning
                    match result {
                        Err(err) => {
                            warn!(?err, ?context, "verify returned error");
                            self.metrics.verify.inc(Status::Dropped);
                        }
                        Ok(false) => {
                            warn!(?context, "verify returned false");
                            self.metrics.verify.inc(Status::Failure);
                        }
                        Ok(true) => {
                            debug!(?context, "verified");
                            self.metrics.verify.inc(Status::Success);
                            if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await {
                                warn!(?err, ?context, ?payload, "verified handle failed");
                            }
                        },
                    }
                },
            }
        }

        // Close all journals, regardless of how we exit the loop
        self.pending_verifies.cancel_all();
        while let Some((_, journal)) = self.journals.pop_first() {
            journal.close().await.expect("unable to close journal");
        }
    }

    ////////////////////////////////////////
    // Handling
    ////////////////////////////////////////

    /// Handles a verified message from the automaton.
    ///
    /// This is called when the automaton has verified a payload.
    /// The chunk will be signed if it matches the current tip.
    async fn handle_app_verified(
        &mut self,
        context: &Context<C::PublicKey>,
        payload: &D,
        ack_sender: &mut WrappedSender<NetS, Ack<C::PublicKey, V, D>>,
    ) -> Result<(), Error> {
        // Get the tip
        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
            return Err(Error::AppVerifiedNoTip);
        };

        // Return early if the height does not match
        if tip.chunk.height != context.height {
            return Err(Error::AppVerifiedHeightMismatch);
        }

        // Return early if the payload does not match
        if tip.chunk.payload != *payload {
            return Err(Error::AppVerifiedPayloadMismatch);
        }

        // Emit the activity
        self.reporter
            .report(Activity::Tip(Proposal::new(
                tip.chunk.clone(),
                tip.signature.clone(),
            )))
            .await;

        // Construct partial signature (if a validator)
        let Some(share) = self.validators.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };
        let ack = Ack::sign(&self.namespace, share, tip.chunk.clone(), self.epoch);

        // Sync the journal to prevent ever acking two conflicting chunks at
        // the same height, even if the node crashes and restarts.
        self.journal_sync(&context.sequencer, context.height).await;

        // The recipients are all the validators in the epoch and the sequencer.
        // The sequencer may or may not be a validator.
        let recipients = {
            let Some(validators) = self.validators.participants(self.epoch) else {
                return Err(Error::UnknownValidators(self.epoch));
            };
            let mut recipients = validators.clone();
            if self
                .validators
                .is_participant(self.epoch, &tip.chunk.sequencer)
                .is_none()
            {
                recipients.push(tip.chunk.sequencer.clone());
            }
            recipients
        };

        // Handle the ack internally
        self.handle_ack(&ack).await?;

        // Send the ack to the network
        ack_sender
            .send(Recipients::Some(recipients), ack, self.priority_acks)
            .await
            .map_err(|_| Error::UnableToSendMessage)?;

        Ok(())
    }

    /// Handles a threshold signature, either carried by a `Node` received from
    /// the network or recovered locally from partial signatures.
    ///
    /// The threshold must already be verified.
    /// If the threshold is new, it is stored and the proof is emitted to the committer.
    /// If the threshold is already known, it is ignored.
    async fn handle_threshold(
        &mut self,
        chunk: &Chunk<C::PublicKey, D>,
        epoch: Epoch,
        threshold: V::Signature,
    ) {
        // Set the threshold signature, returning early if it already exists
        if !self
            .ack_manager
            .add_threshold(&chunk.sequencer, chunk.height, epoch, threshold)
        {
            return;
        }

        // If the threshold is for my sequencer, record metric
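        // (dropping the propose timer records the elapsed end-to-end proposal latency)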
        if chunk.sequencer == self.crypto.public_key() {
            self.propose_timer.take();
        }

        // Emit the activity
        self.reporter
            .report(Activity::Lock(Lock::new(chunk.clone(), epoch, threshold)))
            .await;
    }

    /// Handles an ack.
    ///
    /// Returns an error if the ack is invalid, or can be ignored
    /// (e.g. already exists, threshold already exists, is outside the epoch bounds, etc.).
    async fn handle_ack(&mut self, ack: &Ack<C::PublicKey, V, D>) -> Result<(), Error> {
        // Get the quorum
        let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
            return Err(Error::UnknownPolynomial(ack.epoch));
        };
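        // The quorum is the minimum number of partial signatures needed to
        // recover a threshold signature under this epoch's polynomial.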
        let quorum = polynomial.required();

        // Add the partial signature. If a new threshold is formed, handle it.
        if let Some(threshold) = self.ack_manager.add_ack(ack, quorum) {
            debug!(epoch=ack.epoch, sequencer=?ack.chunk.sequencer, height=ack.chunk.height, "recovered threshold");
            self.metrics.threshold.inc();
            self.handle_threshold(&ack.chunk, ack.epoch, threshold)
                .await;
        }

        Ok(())
    }

    /// Handles a valid `Node` message, storing it as the tip.
    /// Alerts the automaton of the new node.
    /// Also appends the `Node` to the journal if it's new.
    async fn handle_node(&mut self, node: &Node<C::PublicKey, V, D>) {
        // Store the tip
        let is_new = self.tip_manager.put(node);

        // If the node is at a higher height than the previous tip...
        if is_new {
            // Update metrics for sequencer height
            self.metrics
                .sequencer_heights
                .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
                .set(node.chunk.height as i64);

            // Append to journal if the `Node` is new, making sure to sync the journal
            // to prevent sending two conflicting chunks to the automaton, even if
            // the node crashes and restarts.
            self.journal_append(node.clone()).await;
            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
                .await;
        }

        // Verify the chunk with the automaton
        let context = Context {
            sequencer: node.chunk.sequencer.clone(),
            height: node.chunk.height,
        };
        let payload = node.chunk.payload;
        let mut automaton = self.automaton.clone();
        let timer = self.metrics.verify_duration.timer();
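        // Verification runs concurrently in the futures pool; the main loop
        // consumes completed results via `next_completed`.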
        self.pending_verifies.push(async move {
            let receiver = automaton.verify(context.clone(), payload).await;
            let result = receiver.await.map_err(Error::AppVerifyCanceled);
            Verify {
                timer,
                context,
                payload,
                result,
            }
        });
    }

    ////////////////////////////////////////
    // Proposing
    ////////////////////////////////////////

    /// Returns a `Context` if the engine should request a proposal from the automaton.
    ///
    /// Should only be called if the engine is not already waiting for a proposal.
    fn should_propose(&self) -> Option<Context<C::PublicKey>> {
        let me = self.crypto.public_key();

        // Return `None` if I am not a sequencer in the current epoch
        self.sequencers.is_participant(self.epoch, &me)?;

        // Return the next context unless my current tip has no threshold signature
        match self.tip_manager.get(&me) {
            None => Some(Context {
                sequencer: me,
                height: 0,
            }),
            Some(tip) => self
                .ack_manager
                .get_threshold(&me, tip.chunk.height)
                .map(|_| Context {
                    sequencer: me,
                    height: tip.chunk.height.checked_add(1).unwrap(),
                }),
        }
    }

    /// Propose a new chunk to the network.
    ///
    /// The proposal is only successful if the parent `Chunk` and threshold signature are known.
    async fn propose(
        &mut self,
        context: Context<C::PublicKey>,
        payload: D,
        node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.propose.guard(Status::Dropped);
        let me = self.crypto.public_key();

        // Error-check context sequencer
        if context.sequencer != me {
            return Err(Error::ContextSequencer);
        }

        // Error-check that I am a sequencer in the current epoch
        if self.sequencers.is_participant(self.epoch, &me).is_none() {
            return Err(Error::IAmNotASequencer(self.epoch));
        }

        // Get parent Chunk and threshold signature
        let mut height = 0;
        let mut parent = None;
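        // If there is no tip, this is the sequencer's genesis chunk
        // (height 0 with no parent).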
        if let Some(tip) = self.tip_manager.get(&me) {
            // Get threshold, or, if it doesn't exist, return an error
            let Some((epoch, threshold)) = self.ack_manager.get_threshold(&me, tip.chunk.height)
            else {
                return Err(Error::MissingThreshold);
            };

            // Update height and parent
            height = tip.chunk.height + 1;
            parent = Some(Parent::new(tip.chunk.payload, epoch, threshold));
        }

        // Error-check context height
        if context.height != height {
            return Err(Error::ContextHeight);
        }

        // Construct new node
        let node = Node::sign(&self.namespace, &mut self.crypto, height, payload, parent);

        // Deal with the chunk as if it were received over the network
        self.handle_node(&node).await;

        // Sync the journal to prevent ever proposing two conflicting chunks
        // at the same height, even if the node crashes and restarts
        self.journal_sync(&me, height).await;

        // Record the start time of the proposal
        self.propose_timer = Some(self.metrics.e2e_duration.timer());

        // Broadcast to network
        if let Err(err) = self.broadcast(node, node_sender, self.epoch).await {
            guard.set(Status::Failure);
            return Err(err);
        };

        // Return success
        guard.set(Status::Success);
        Ok(())
    }

    /// Attempt to rebroadcast the highest-height chunk of this sequencer to all validators.
    ///
    /// This is only done if:
    /// - this instance is a sequencer in the current epoch.
    /// - this instance has a chunk to rebroadcast.
    /// - this instance has not yet collected the threshold signature for the chunk.
    async fn rebroadcast(
        &mut self,
        node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);

        // Unset the rebroadcast deadline
        self.rebroadcast_deadline = None;

        // Return if not a sequencer in the current epoch
        let me = self.crypto.public_key();
        if self.sequencers.is_participant(self.epoch, &me).is_none() {
            return Err(Error::IAmNotASequencer(self.epoch));
        }

        // Return if no chunk to rebroadcast
        let Some(tip) = self.tip_manager.get(&me) else {
            return Err(Error::NothingToRebroadcast);
        };

        // Return if threshold already collected
        if self
            .ack_manager
            .get_threshold(&me, tip.chunk.height)
            .is_some()
        {
            return Err(Error::AlreadyThresholded);
        }

        // Broadcast the message, which resets the rebroadcast deadline
        guard.set(Status::Failure);
        self.broadcast(tip, node_sender, self.epoch).await?;
        guard.set(Status::Success);
        Ok(())
    }

    /// Send a `Node` message to all validators in the given epoch.
    async fn broadcast(
        &mut self,
        node: Node<C::PublicKey, V, D>,
        node_sender: &mut WrappedSender<NetS, Node<C::PublicKey, V, D>>,
        epoch: Epoch,
    ) -> Result<(), Error> {
        // Get the validators for the epoch
        let Some(validators) = self.validators.participants(epoch) else {
            return Err(Error::UnknownValidators(epoch));
        };

        // Tell the relay to broadcast the full data
        self.relay.broadcast(node.chunk.payload).await;

        // Send the node to all validators
        node_sender
            .send(
                Recipients::Some(validators.clone()),
                node,
                self.priority_proposals,
            )
            .await
            .map_err(|_| Error::BroadcastFailed)?;

        // Set the rebroadcast deadline
        self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);

        Ok(())
    }

    ////////////////////////////////////////
    // Validation
    ////////////////////////////////////////

    /// Takes a raw `Node` (from sender) from the p2p network and validates it.
    ///
    /// If the `Node` is valid (and not already the tracked tip for the sender),
    /// returns the implied parent chunk and its threshold signature.
    /// Returns an error if the `Node` is invalid.
    fn validate_node(
        &mut self,
        node: &Node<C::PublicKey, V, D>,
        sender: &C::PublicKey,
    ) -> Result<Option<Chunk<C::PublicKey, D>>, Error> {
        // Verify the sender
        if node.chunk.sequencer != *sender {
            return Err(Error::PeerMismatch);
        }

        // Optimization: If the node is exactly equal to the tip,
        // don't perform further validation.
        if let Some(tip) = self.tip_manager.get(sender) {
            if tip == *node {
                return Ok(None);
            }
        }

        // Validate chunk
        self.validate_chunk(&node.chunk, self.epoch)?;

        // Verify the signature
        node.verify(&self.namespace, self.validators.identity())
            .map_err(|_| Error::InvalidNodeSignature)
    }

    /// Takes a raw ack (from sender) from the p2p network and validates it.
    ///
    /// Returns `Ok(())` if the ack is valid.
    /// Returns an error if the ack is invalid.
    fn validate_ack(
        &self,
        ack: &Ack<C::PublicKey, V, D>,
        sender: &C::PublicKey,
    ) -> Result<(), Error> {
        // Validate chunk
        self.validate_chunk(&ack.chunk, ack.epoch)?;

        // Validate sender
        let Some(index) = self.validators.is_participant(ack.epoch, sender) else {
            return Err(Error::UnknownValidator(ack.epoch, sender.to_string()));
        };
        if index != ack.signature.index {
            return Err(Error::PeerMismatch);
        }

        // Spam prevention: If the ack is for an epoch that is too old or too new, ignore.
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // Spam prevention: If the ack is for a height that is too old or too new, ignore.
        {
            let bound_lo = self
                .tip_manager
                .get(&ack.chunk.sequencer)
                .map(|t| t.chunk.height)
                .unwrap_or(0);
            let bound_hi = bound_lo + self.height_bound;
            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
                return Err(Error::AckHeightOutsideBounds(
                    ack.chunk.height,
                    bound_lo,
                    bound_hi,
                ));
            }
        }

        // Validate partial signature
        // Optimization: If the ack already exists, don't verify
        let Some(polynomial) = self.validators.polynomial(ack.epoch) else {
            return Err(Error::UnknownPolynomial(ack.epoch));
        };
        if !ack.verify(&self.namespace, polynomial) {
            return Err(Error::InvalidAckSignature);
        }

        Ok(())
    }

    /// Takes a raw chunk from the p2p network and validates it against the epoch.
    ///
    /// Returns `Ok(())` if the chunk is valid.
    /// Returns an error if the chunk is invalid.
    fn validate_chunk(&self, chunk: &Chunk<C::PublicKey, D>, epoch: Epoch) -> Result<(), Error> {
        // Verify sequencer
        if self
            .sequencers
            .is_participant(epoch, &chunk.sequencer)
            .is_none()
        {
            return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
        }

        // Verify height
        if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
            // Height must be at least the tip height
            match chunk.height.cmp(&tip.chunk.height) {
                std::cmp::Ordering::Less => {
                    return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
                }
                std::cmp::Ordering::Equal => {
                    // Ensure this matches the tip if the height is the same
                    if tip.chunk.payload != chunk.payload {
                        return Err(Error::ChunkMismatch(
                            chunk.sequencer.to_string(),
                            chunk.height,
                        ));
                    }
                }
                std::cmp::Ordering::Greater => {}
            }
        }

        Ok(())
    }

    ////////////////////////////////////////
    // Journal
    ////////////////////////////////////////

    /// Returns the section of the journal for the given height.
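    ///
    /// For example, with `journal_heights_per_section = 10`, heights 0..=9 map to
    /// section 0 and heights 10..=19 map to section 1.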
    fn get_journal_section(&self, height: u64) -> u64 {
        height / self.journal_heights_per_section
    }

    /// Ensures the journal exists and is initialized for the given sequencer.
    /// If the journal does not exist, it is created and replayed.
    /// Else, no action is taken.
    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
        // Return early if the journal already exists
        if self.journals.contains_key(sequencer) {
            return;
        }

        // Initialize journal
        let cfg = journal::variable::Config {
            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
            compression: self.journal_compression,
            codec_config: (),
            write_buffer: self.journal_write_buffer,
        };
        let journal =
            Journal::<_, Node<C::PublicKey, V, D>>::init(self.context.with_label("journal"), cfg)
                .await
                .expect("unable to init journal");

        // Replay journal
        {
            debug!(?sequencer, "journal replay begin");

            // Prepare the stream
            let stream = journal
                .replay(self.journal_replay_concurrency, self.journal_replay_buffer)
                .await
                .expect("unable to replay journal");
            pin_mut!(stream);

            // Read from the stream, which may be in arbitrary order.
            // Remember the highest node height
            let mut tip: Option<Node<C::PublicKey, V, D>> = None;
            let mut num_items = 0;
            while let Some(msg) = stream.next().await {
                let (_, _, _, node) = msg.expect("unable to read from journal");
                num_items += 1;
                let height = node.chunk.height;
                match tip {
                    None => {
                        tip = Some(node);
                    }
                    Some(ref t) => {
                        if height > t.chunk.height {
                            tip = Some(node);
                        }
                    }
                }
            }

            // Set the tip only once. The items from the journal may be in arbitrary order,
            // and the tip manager will panic if inserting tips out-of-order.
            if let Some(node) = tip.take() {
                let is_new = self.tip_manager.put(&node);
                assert!(is_new);
            }

            debug!(?sequencer, ?num_items, "journal replay end");
        }

        // Store journal
        self.journals.insert(sequencer.clone(), journal);
    }

    /// Write a `Node` to the appropriate journal, which contains the tip `Chunk` for the sequencer.
    ///
    /// To prevent ever writing two conflicting `Chunk`s at the same height,
    /// the journal must already be open and replayed.
    async fn journal_append(&mut self, node: Node<C::PublicKey, V, D>) {
        let section = self.get_journal_section(node.chunk.height);
        self.journals
            .get_mut(&node.chunk.sequencer)
            .expect("journal does not exist")
            .append(section, node)
            .await
            .expect("unable to append to journal");
    }

    /// Syncs (ensures all data is written to disk) and prunes the journal for the given sequencer and height.
    async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
        let section = self.get_journal_section(height);

        // Get journal
        let journal = self
            .journals
            .get_mut(sequencer)
            .expect("journal does not exist");

        // Sync journal
        journal.sync(section).await.expect("unable to sync journal");

        // Prune journal, ignoring errors
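        // (older sections only contain nodes below the current tip, so pruning
        // them bounds disk usage)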
        let _ = journal.prune(section).await;
    }
}