commonware_broadcast/linked/
engine.rs

//! Engine for the broadcast module.
//!
//! It is responsible for:
//! - Broadcasting nodes (if a sequencer)
//! - Signing chunks (if a validator)
//! - Tracking the latest chunk in each sequencer’s chain
//! - Recovering threshold signatures from partial signatures for each chunk
//! - Notifying other actors of new chunks and threshold signatures
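//!
//! # Example
//!
//! A minimal sketch of wiring the engine, assuming `context` implements the
//! required runtime traits, `cfg` is a fully-populated [`Config`], and the two
//! channel pairs were registered with the p2p layer (all bindings here are
//! hypothetical):
//!
//! ```ignore
//! let (engine, mailbox) = Engine::new(context.clone(), cfg);
//! let handle = engine.start(
//!     (chunk_sender, chunk_receiver),
//!     (ack_sender, ack_receiver),
//! );
//! // The application can now request broadcasts through `mailbox`.
//! ```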

use super::{metrics, AckManager, Config, Mailbox, Message, TipManager};
use crate::{
    linked::{namespace, parsed, prover::Prover, serializer, Context, Epoch},
    Application, Collector, ThresholdCoordinator,
};
use commonware_cryptography::{
    bls12381::primitives::{
        group::{self},
        ops,
        poly::{self},
    },
    Scheme,
};
use commonware_macros::select;
use commonware_p2p::{Receiver, Recipients, Sender};
use commonware_runtime::{
    telemetry::{
        histogram,
        status::{CounterExt, Status},
    },
    Blob, Clock, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::{self, variable::Journal};
use commonware_utils::{futures::Pool as FuturesPool, Array};
use futures::{
    channel::{mpsc, oneshot},
    future::{self, Either},
    pin_mut, StreamExt,
};
use std::{
    collections::BTreeMap,
    marker::PhantomData,
    time::{Duration, SystemTime},
};
use thiserror::Error;
use tracing::{debug, error, info, warn};

/// Represents a pending verification request to the application.
struct Verify<C: Scheme, D: Array, E: Clock> {
    timer: histogram::Timer<E>,
    context: Context<C::PublicKey>,
    payload: D,
    result: Result<bool, Error>,
}

/// Instance of the `linked` broadcast engine.
pub struct Engine<
    B: Blob,
    E: Clock + Spawner + Storage<B> + Metrics,
    C: Scheme,
    D: Array,
    A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
    Z: Collector<Digest = D>,
    S: ThresholdCoordinator<
        Index = Epoch,
        Share = group::Share,
        Identity = poly::Public,
        PublicKey = C::PublicKey,
    >,
    NetS: Sender<PublicKey = C::PublicKey>,
    NetR: Receiver<PublicKey = C::PublicKey>,
> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    context: E,
    crypto: C,
    coordinator: S,
    application: A,
    collector: Z,
    _sender: PhantomData<NetS>,
    _receiver: PhantomData<NetR>,

    ////////////////////////////////////////
    // Namespace Constants
    ////////////////////////////////////////

    // The namespace for chunk signatures.
    chunk_namespace: Vec<u8>,

    // The namespace for ack signatures.
    ack_namespace: Vec<u8>,

    ////////////////////////////////////////
    // Timeouts
    ////////////////////////////////////////

    // The configured timeout for refreshing the epoch
    refresh_epoch_timeout: Duration,
    refresh_epoch_deadline: Option<SystemTime>,

    // The configured timeout for rebroadcasting a chunk to all signers
    rebroadcast_timeout: Duration,
    rebroadcast_deadline: Option<SystemTime>,

    ////////////////////////////////////////
    // Pruning
    ////////////////////////////////////////

    // A tuple representing the epochs to keep in memory.
    // The first element is the number of old epochs to keep.
    // The second element is the number of future epochs to accept.
    //
    // For example, if the current epoch is 10, and the bounds are (1, 2), then
    // epochs 9, 10, 11, and 12 are kept (and accepted);
    // all others are pruned or rejected.
    epoch_bounds: (u64, u64),

    // The number of future heights to accept acks for.
    // This is used to prevent spam of acks for arbitrary heights.
    //
    // For example, if the current tip for a sequencer is at height 100,
    // and the height_bound is 10, then acks for heights 100-110 are accepted.
    height_bound: u64,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////

    // A stream of futures.
    // Each future represents a verification request to the application
    // that will either timeout or resolve with a boolean.
    #[allow(clippy::type_complexity)]
    pending_verifies: FuturesPool<Verify<C, D, E>>,

    // The maximum number of items in `pending_verifies`.
    verify_concurrent: usize,

    // The mailbox for receiving messages (primarily from the application).
    mailbox_receiver: mpsc::Receiver<Message<D>>,

    ////////////////////////////////////////
    // Storage
    ////////////////////////////////////////

    // The number of heights per journal section.
    journal_heights_per_section: u64,

    // The number of concurrent operations when replaying journals.
    journal_replay_concurrency: usize,

    // A prefix for the journal names.
    // The rest of the name is the hex-encoded public key of the relevant sequencer.
    journal_name_prefix: String,

    // A map of sequencer public keys to their journals.
    journals: BTreeMap<C::PublicKey, Journal<B, E>>,

    ////////////////////////////////////////
    // State
    ////////////////////////////////////////

    // Tracks the current tip for each sequencer.
    // The tip is a `Node` which is comprised of a `Chunk` and,
    // if not the genesis chunk for that sequencer,
    // a threshold signature over the parent chunk.
    tip_manager: TipManager<C, D>,

    // Tracks the acknowledgements for chunks.
    // This is comprised of partial signatures or threshold signatures.
    ack_manager: AckManager<D, C::PublicKey>,

    // The current epoch.
    epoch: Epoch,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////

    // Metrics
    metrics: metrics::Metrics<E>,

    // Timer for the last `broadcast_new`, recording end-to-end latency
    // once the threshold signature for that chunk is collected.
    broadcast_timer: Option<histogram::Timer<E>>,
}

impl<
        B: Blob,
        E: Clock + Spawner + Storage<B> + Metrics,
        C: Scheme,
        D: Array,
        A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
        Z: Collector<Digest = D>,
        S: ThresholdCoordinator<
            Index = Epoch,
            Share = group::Share,
            Identity = poly::Public,
            PublicKey = C::PublicKey,
        >,
        NetS: Sender<PublicKey = C::PublicKey>,
        NetR: Receiver<PublicKey = C::PublicKey>,
    > Engine<B, E, C, D, A, Z, S, NetS, NetR>
{
    /// Creates a new engine with the given context and configuration.
    /// Returns the engine and a mailbox for sending messages to the engine.
    pub fn new(context: E, cfg: Config<C, D, A, Z, S>) -> (Self, Mailbox<D>) {
        let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
        let mailbox = Mailbox::new(mailbox_sender);
        let metrics = metrics::Metrics::init(context.clone());

        let result = Self {
            context,
            crypto: cfg.crypto,
            _sender: PhantomData,
            _receiver: PhantomData,
            coordinator: cfg.coordinator,
            application: cfg.application,
            collector: cfg.collector,
            chunk_namespace: namespace::chunk(&cfg.namespace),
            ack_namespace: namespace::ack(&cfg.namespace),
            refresh_epoch_timeout: cfg.refresh_epoch_timeout,
            refresh_epoch_deadline: None,
            rebroadcast_timeout: cfg.rebroadcast_timeout,
            rebroadcast_deadline: None,
            epoch_bounds: cfg.epoch_bounds,
            height_bound: cfg.height_bound,
            pending_verifies: FuturesPool::default(),
            verify_concurrent: cfg.verify_concurrent,
            mailbox_receiver,
            journal_heights_per_section: cfg.journal_heights_per_section,
            journal_replay_concurrency: cfg.journal_replay_concurrency,
            journal_name_prefix: cfg.journal_name_prefix,
            journals: BTreeMap::new(),
            tip_manager: TipManager::<C, D>::new(),
            ack_manager: AckManager::<D, C::PublicKey>::new(),
            epoch: 0,
            metrics,
            broadcast_timer: None,
        };

        (result, mailbox)
    }

    /// Runs the engine until the context is stopped.
    ///
    /// The engine will handle:
    /// - Timeouts
    ///   - Refreshing the Epoch
    ///   - Rebroadcasting Nodes
    /// - Mailbox messages from the application:
    ///   - Broadcast requests
    ///   - Ack requests
    /// - Messages from the network:
    ///   - Nodes
    ///   - Acks
    pub fn start(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) -> Handle<()> {
        self.context.spawn_ref()(self.run(chunk_network, ack_network))
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) {
        let (mut node_sender, mut node_receiver) = chunk_network;
        let (mut ack_sender, mut ack_receiver) = ack_network;
        let mut shutdown = self.context.stopped();

        // Before starting on the main loop, initialize my own sequencer journal
        // and attempt to rebroadcast if necessary.
        self.refresh_epoch();
        self.journal_prepare(&self.crypto.public_key()).await;
        if let Err(err) = self.rebroadcast(&mut node_sender).await {
            // Rebroadcasting may return a non-critical error, so log the error and continue.
            info!(?err, "initial rebroadcast failed");
        }

        loop {
            // Enter the epoch
            self.refresh_epoch();

            // Create deadline futures.
            // If the deadline is None, the future will never resolve.
            let refresh_epoch = match self.refresh_epoch_deadline {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };
            let rebroadcast = match self.rebroadcast_deadline {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    self.pending_verifies.cancel_all();
                    while let Some((_, journal)) = self.journals.pop_first() {
                        journal.close().await.expect("unable to close journal");
                    }
                    return;
                },

                // Handle refresh epoch deadline
                _ = refresh_epoch => {
                    debug!("refresh epoch");
                    // Simply continue; the epoch will be refreshed on the next iteration.
                    continue;
                },

                // Handle rebroadcast deadline
                _ = rebroadcast => {
                    debug!("rebroadcast");
                    if let Err(err) = self.rebroadcast(&mut node_sender).await {
                        info!(?err, "rebroadcast failed");
                        continue;
                    }
                },

                // Handle incoming nodes
                msg = node_receiver.recv() => {
                    debug!("node network");
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            error!(?err, "node receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.nodes.guard(Status::Invalid);
                    let node = match parsed::Node::<C, D>::decode(&msg) {
                        Ok(node) => node,
                        Err(err) => {
                            warn!(?err, ?sender, "node decode failed");
                            continue;
                        }
                    };
                    if let Err(err) = self.validate_node(&node, &sender) {
                        warn!(?err, ?node, ?sender, "node validate failed");
                        continue;
                    };

                    // Initialize journal for sequencer if it does not exist
                    self.journal_prepare(&sender).await;

                    // Handle the parent threshold signature
                    if let Some(parent) = node.parent.as_ref() {
                        self.handle_threshold(&node.chunk, parent.epoch, parent.threshold).await;
                    }

                    // Process the new node
                    self.handle_node(&node).await;
                    guard.set(Status::Success);
                },

                // Handle incoming acks
                msg = ack_receiver.recv() => {
                    debug!("ack network");
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let ack = match parsed::Ack::decode(&msg) {
                        Ok(ack) => ack,
                        Err(err) => {
                            warn!(?err, ?sender, "ack decode failed");
                            continue;
                        }
                    };
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        warn!(?err, ?ack, ?sender, "ack validate failed");
                        continue;
                    };
                    if let Err(err) = self.handle_ack(&ack).await {
                        warn!(?err, ?ack, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }
                    guard.set(Status::Success);
                },

                // Handle completed verification futures.
                verify = self.pending_verifies.next_completed() => {
                    let Verify { timer, context, payload, result } = verify;
                    drop(timer); // Record metric. Explicitly reference timer to avoid lint warning
                    match result {
                        Err(err) => {
                            warn!(?err, ?context, ?payload, "verified returned error");
                            self.metrics.verify.inc(Status::Dropped);
                        }
                        Ok(false) => {
                            warn!(?context, ?payload, "verified was false");
                            self.metrics.verify.inc(Status::Failure);
                        }
                        Ok(true) => {
                            debug!(?context, ?payload, "verified");
                            self.metrics.verify.inc(Status::Success);
                            if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await {
                                warn!(?err, ?context, ?payload, "verified handle failed");
                            }
                        },
                    }
                },

                // Handle mailbox messages
                mail = self.mailbox_receiver.next() => {
                    let Some(msg) = mail else {
                        error!("mailbox receiver failed");
                        break;
                    };
                    match msg {
                        Message::Broadcast{ payload, result } => {
                            debug!("broadcast");
                            if self.coordinator.is_sequencer(self.epoch, &self.crypto.public_key()).is_none() {
                                warn!(epoch=?self.epoch, ?payload, "not a sequencer");
                                continue;
                            }

                            // Broadcast the message
                            if let Err(err) = self.broadcast_new(payload, result, &mut node_sender).await {
                                warn!(?err, "broadcast new failed");
                                continue;
                            }
                        }
                    }
                }
            }
        }
    }

    ////////////////////////////////////////
    // Handling
    ////////////////////////////////////////

    /// Handles a verified message from the application.
    ///
    /// This is called when the application has verified a payload.
    /// The chunk will be signed if it matches the current tip.
    async fn handle_app_verified(
        &mut self,
        context: &Context<C::PublicKey>,
        payload: &D,
        ack_sender: &mut NetS,
    ) -> Result<(), Error> {
        // Get the tip
        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
            return Err(Error::AppVerifiedNoTip);
        };

        // Return early if the height does not match
        if tip.chunk.height != context.height {
            return Err(Error::AppVerifiedHeightMismatch);
        }

        // Return early if the payload does not match
        if tip.chunk.payload != *payload {
            return Err(Error::AppVerifiedPayloadMismatch);
        }

        // Construct partial signature
        let Some(share) = self.coordinator.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };
        let partial = ops::partial_sign_message(
            share,
            Some(&self.ack_namespace),
            &serializer::ack(&tip.chunk, self.epoch),
        );

        // Sync the journal to prevent ever acking two conflicting chunks at
        // the same height, even if the node crashes and restarts.
        self.journal_sync(&context.sequencer, context.height).await;

        // The recipients are all the signers in the epoch and the sequencer.
        // The sequencer may or may not be a signer.
        let recipients = {
            let Some(signers) = self.coordinator.signers(self.epoch) else {
                return Err(Error::UnknownSigners(self.epoch));
            };
            let mut recipients = signers.clone();
            if self
                .coordinator
                .is_signer(self.epoch, &tip.chunk.sequencer)
                .is_none()
            {
                recipients.push(tip.chunk.sequencer.clone());
            }
            recipients
        };

        // Send the ack to the network
        let ack = parsed::Ack {
            chunk: tip.chunk,
            epoch: self.epoch,
            partial,
        };
        ack_sender
            .send(Recipients::Some(recipients), ack.encode().into(), false)
            .await
            .map_err(|_| Error::UnableToSendMessage)?;

        // Handle the ack internally
        self.handle_ack(&ack).await?;

        Ok(())
    }

    /// Handles a threshold signature, either received in a `Node` from the network or recovered locally from partial signatures.
    ///
    /// The threshold must already be verified.
    /// If the threshold is new, it is stored and the proof is emitted to the collector.
    /// If the threshold is already known, it is ignored.
    async fn handle_threshold(
        &mut self,
        chunk: &parsed::Chunk<D, C::PublicKey>,
        epoch: Epoch,
        threshold: group::Signature,
    ) {
        // Set the threshold signature, returning early if it already exists
        if !self
            .ack_manager
            .add_threshold(&chunk.sequencer, chunk.height, epoch, threshold)
        {
            return;
        }

        // If the threshold is for my sequencer, record the metric
        if chunk.sequencer == self.crypto.public_key() {
            self.broadcast_timer.take();
        }

        // Emit the proof
        let context = Context {
            sequencer: chunk.sequencer.clone(),
            height: chunk.height,
        };
        let proof =
            Prover::<C, D>::serialize_threshold(&context, &chunk.payload, epoch, &threshold);
        self.collector
            .acknowledged(proof, chunk.payload.clone())
            .await;
    }

    /// Handles an ack.
    ///
    /// Returns an error if the ack is invalid, or can be ignored
    /// (e.g. already exists, threshold already exists, is outside the epoch bounds, etc.).
    async fn handle_ack(&mut self, ack: &parsed::Ack<D, C::PublicKey>) -> Result<(), Error> {
        // Get the quorum
        let Some(identity) = self.coordinator.identity(ack.epoch) else {
            return Err(Error::UnknownIdentity(ack.epoch));
        };
        let quorum = identity.required();

        // Add the partial signature. If a new threshold is formed, handle it.
        if let Some(threshold) = self.ack_manager.add_ack(ack, quorum) {
            self.metrics.threshold.inc();
            self.handle_threshold(&ack.chunk, ack.epoch, threshold)
                .await;
        }

        Ok(())
    }

    /// Handles a valid `Node` message, storing it as the tip.
    /// Alerts the application of the new node.
    /// Also appends the `Node` to the journal if it's new.
    async fn handle_node(&mut self, node: &parsed::Node<C, D>) {
        // Store the tip
        let is_new = self.tip_manager.put(node);

        // If the node is new (i.e. at a higher height than the previous tip)...
        if is_new {
            // Update metrics for sequencer height
            self.metrics
                .sequencer_heights
                .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
                .set(node.chunk.height as i64);

            // Append to journal if the `Node` is new, making sure to sync the journal
            // to prevent sending two conflicting chunks to the application, even if
            // the node crashes and restarts.
            self.journal_append(node).await;
            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
                .await;
        }

        // Drop the node if there are too many pending verifies
        let n = self.pending_verifies.len();
        if n >= self.verify_concurrent {
            warn!(?n, "too many pending verifies");
            return;
        }

        // Verify the chunk with the application
        let context = Context {
            sequencer: node.chunk.sequencer.clone(),
            height: node.chunk.height,
        };
        let payload = node.chunk.payload.clone();
        let mut application = self.application.clone();
        let timer = self.metrics.verify_duration.timer();
        self.pending_verifies.push(async move {
            let receiver = application.verify(context.clone(), payload.clone()).await;
            let result = receiver.await.map_err(Error::AppVerifyCanceled);
            Verify {
                timer,
                context,
                payload,
                result,
            }
        });
    }

    ////////////////////////////////////////
    // Broadcasting
    ////////////////////////////////////////

    /// Broadcast a message to the network.
    ///
    /// The result is returned to the caller via the provided channel.
    /// The broadcast is only successful if the parent Chunk and threshold signature are known.
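    ///
    /// For example (illustrative): if the current tip has height 1, the new
    /// chunk is built at height 2 with a `Parent` carrying the tip's payload
    /// and the threshold signature recovered over it; a genesis chunk
    /// (height 0) carries no parent.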
    async fn broadcast_new(
        &mut self,
        payload: D,
        result: oneshot::Sender<bool>,
        node_sender: &mut NetS,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.new_broadcast.guard(Status::Dropped);
        let me = self.crypto.public_key();

        // Get parent Chunk and threshold signature
        let mut height = 0;
        let mut parent = None;
        if let Some(tip) = self.tip_manager.get(&me) {
            // Get threshold, or, if it doesn't exist, return an error
            let Some((epoch, threshold)) = self.ack_manager.get_threshold(&me, tip.chunk.height)
            else {
                let _ = result.send(false);
                return Err(Error::NoThresholdForTip(tip.chunk.height));
            };

            // Update height and parent
            height = tip.chunk.height + 1;
            parent = Some(parsed::Parent {
                payload: tip.chunk.payload,
                threshold,
                epoch,
            });
        }

        // Construct new node
        let chunk = parsed::Chunk {
            sequencer: me.clone(),
            height,
            payload,
        };
        let signature = self
            .crypto
            .sign(Some(&self.chunk_namespace), &serializer::chunk(&chunk));
        let node = parsed::Node::<C, D> {
            chunk,
            signature,
            parent,
        };

        // Deal with the chunk as if it were received over the network
        self.handle_node(&node).await;

        // Sync the journal to prevent ever broadcasting two conflicting chunks
        // at the same height, even if the node crashes and restarts
        self.journal_sync(&me, height).await;

        // Record the start time of the broadcast
        self.broadcast_timer = Some(self.metrics.e2e_duration.timer());

        // Broadcast to network
        if let Err(err) = self.broadcast(&node, node_sender, self.epoch).await {
            let _ = result.send(false);
            guard.set(Status::Failure);
            return Err(err);
        };

        // Return success
        let _ = result.send(true);
        guard.set(Status::Success);
        Ok(())
    }

    /// Attempt to rebroadcast the highest-height chunk of this sequencer to all signers.
    ///
    /// This is only done if:
    /// - this instance is the sequencer for the current epoch.
    /// - this instance has a chunk to rebroadcast.
    /// - this instance has not yet collected the threshold signature for the chunk.
    async fn rebroadcast(&mut self, node_sender: &mut NetS) -> Result<(), Error> {
        let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);

        // Unset the rebroadcast deadline
        self.rebroadcast_deadline = None;

        // Return if not a sequencer in the current epoch
        let me = self.crypto.public_key();
        if self.coordinator.is_sequencer(self.epoch, &me).is_none() {
            return Err(Error::IAmNotASequencer(self.epoch));
        }

        // Return if no chunk to rebroadcast
        let Some(tip) = self.tip_manager.get(&me) else {
            return Err(Error::NothingToRebroadcast);
        };

        // Return if threshold already collected
        if self
            .ack_manager
            .get_threshold(&me, tip.chunk.height)
            .is_some()
        {
            return Err(Error::AlreadyBroadcast);
        }

        // Broadcast the message, which resets the rebroadcast deadline
        guard.set(Status::Failure);
        self.broadcast(&tip, node_sender, self.epoch).await?;
        guard.set(Status::Success);
        Ok(())
    }

    /// Send a `Node` message to all signers in the given epoch.
    async fn broadcast(
        &mut self,
        node: &parsed::Node<C, D>,
        node_sender: &mut NetS,
        epoch: Epoch,
    ) -> Result<(), Error> {
        // Send the node to all signers
        let Some(signers) = self.coordinator.signers(epoch) else {
            return Err(Error::UnknownSigners(epoch));
        };
        node_sender
            .send(
                Recipients::Some(signers.clone()),
                node.encode().into(),
                false,
            )
            .await
            .map_err(|_| Error::BroadcastFailed)?;

        // Set the rebroadcast deadline
        self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);

        Ok(())
    }

    ////////////////////////////////////////
    // Validation
    ////////////////////////////////////////

    /// Takes a raw `Node` (from sender) from the p2p network and validates it,
    /// including the threshold signature over the implied parent chunk (if any).
    ///
    /// Returns `Ok(())` if the `Node` is valid; otherwise returns an error.
    fn validate_node(
        &mut self,
        node: &parsed::Node<C, D>,
        sender: &C::PublicKey,
    ) -> Result<(), Error> {
        // Verify the sender
        if node.chunk.sequencer != *sender {
            return Err(Error::PeerMismatch);
        }

        // Optimization: If the node is exactly equal to the tip,
        // don't perform any further validation.
        if let Some(tip) = self.tip_manager.get(sender) {
            if tip == *node {
                return Ok(());
            }
        }

        // Validate chunk
        self.validate_chunk(&node.chunk, self.epoch)?;

        // Verify the signature
        if !C::verify(
            Some(&self.chunk_namespace),
            &serializer::chunk(&node.chunk),
            sender,
            &node.signature,
        ) {
            return Err(Error::InvalidNodeSignature);
        }

        // Verify no parent
        if node.chunk.height == 0 {
            if node.parent.is_some() {
                return Err(Error::GenesisChunkMustNotHaveParent);
            }
            return Ok(());
        }

        // Verify parent
        let Some(parent) = &node.parent else {
            return Err(Error::NodeMissingParent);
        };
        let parent_chunk = parsed::Chunk {
            sequencer: sender.clone(),
            height: node.chunk.height.checked_sub(1).unwrap(),
            payload: parent.payload.clone(),
        };

        // Verify parent threshold signature
        let Some(identity) = self.coordinator.identity(parent.epoch) else {
            return Err(Error::UnknownIdentity(parent.epoch));
        };
        let public_key = poly::public(identity);
        ops::verify_message(
            &public_key,
            Some(&self.ack_namespace),
            &serializer::ack(&parent_chunk, parent.epoch),
            &parent.threshold,
        )
        .map_err(|_| Error::InvalidThresholdSignature)?;

        Ok(())
    }

    /// Takes a raw ack (from sender) from the p2p network and validates it.
    ///
    /// Returns `Ok(())` if the ack (including its partial signature) is valid;
    /// otherwise returns an error.
    fn validate_ack(
        &self,
        ack: &parsed::Ack<D, C::PublicKey>,
        sender: &C::PublicKey,
    ) -> Result<(), Error> {
        // Validate chunk
        self.validate_chunk(&ack.chunk, ack.epoch)?;

        // Validate sender
        let Some(signer_index) = self.coordinator.is_signer(ack.epoch, sender) else {
            return Err(Error::UnknownSigner(ack.epoch, sender.to_string()));
        };
        if signer_index != ack.partial.index {
            return Err(Error::PeerMismatch);
        }

        // Spam prevention: If the ack is for an epoch that is too old or too new, ignore.
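        // For example (matching the `epoch_bounds` docs above): with epoch 10
        // and epoch_bounds (1, 2), acks for epochs 9..=12 are accepted.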
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // Spam prevention: If the ack is for a height that is too old or too new, ignore.
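        // For example (matching the `height_bound` docs above): with a tip at
        // height 100 and height_bound 10, acks for heights 100..=110 are accepted.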
        {
            let bound_lo = self
                .tip_manager
                .get(&ack.chunk.sequencer)
                .map(|t| t.chunk.height)
                .unwrap_or(0);
            let bound_hi = bound_lo + self.height_bound;
            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
                return Err(Error::AckHeightOutsideBounds(
                    ack.chunk.height,
                    bound_lo,
                    bound_hi,
                ));
            }
        }

        // Validate partial signature
        // Optimization: If the ack already exists, don't verify
        let Some(identity) = self.coordinator.identity(ack.epoch) else {
            return Err(Error::UnknownIdentity(ack.epoch));
        };
        ops::partial_verify_message(
            identity,
            Some(&self.ack_namespace),
            &serializer::ack(&ack.chunk, ack.epoch),
            &ack.partial,
        )
        .map_err(|_| Error::InvalidPartialSignature)?;

        Ok(())
    }

    /// Takes a raw chunk from the p2p network and validates it against the epoch.
    ///
    /// Returns `Ok(())` if the chunk is valid; otherwise returns an error.
    fn validate_chunk(
        &self,
        chunk: &parsed::Chunk<D, C::PublicKey>,
        epoch: Epoch,
    ) -> Result<(), Error> {
        // Verify sequencer
        if self
            .coordinator
            .is_sequencer(epoch, &chunk.sequencer)
            .is_none()
        {
            return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
        }

        // Verify height
        if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
            // Height must be at least the tip height
            match chunk.height.cmp(&tip.chunk.height) {
                std::cmp::Ordering::Less => {
                    return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
                }
                std::cmp::Ordering::Equal => {
                    // Ensure this matches the tip if the height is the same
                    if tip.chunk.payload != chunk.payload {
                        return Err(Error::ChunkMismatch(
                            chunk.sequencer.to_string(),
                            chunk.height,
                        ));
                    }
                }
                std::cmp::Ordering::Greater => {}
            }
        }

        Ok(())
    }

    ////////////////////////////////////////
    // Journal
    ////////////////////////////////////////

    /// Returns the section of the journal for the given height.
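    ///
    /// For example, with `journal_heights_per_section = 100` (an illustrative
    /// value), heights 0..=99 map to section 0 and height 250 maps to section 2.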
    fn get_journal_section(&self, height: u64) -> u64 {
        height / self.journal_heights_per_section
    }

    /// Ensures the journal exists and is initialized for the given sequencer.
    /// If the journal does not exist, it is created and replayed.
    /// Else, no action is taken.
    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
        // Return early if the journal already exists
        if self.journals.contains_key(sequencer) {
            return;
        }

        // Initialize journal
        let cfg = journal::variable::Config {
            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
        };
        let mut journal = Journal::init(self.context.clone(), cfg)
            .await
            .expect("unable to init journal");

        // Replay journal
        {
            debug!(?sequencer, "journal replay begin");

            // Prepare the stream
            let stream = journal
                .replay(self.journal_replay_concurrency, None)
                .await
                .expect("unable to replay journal");
            pin_mut!(stream);

            // Read from the stream, which may yield items in arbitrary order,
            // and remember the node with the highest height.
            let mut tip: Option<parsed::Node<C, D>> = None;
            let mut num_items = 0;
            while let Some(msg) = stream.next().await {
                num_items += 1;
                let (_, _, _, msg) = msg.expect("unable to decode journal message");
                let node = parsed::Node::<C, D>::decode(&msg)
                    .expect("journal message is unexpected format");
                let height = node.chunk.height;
                match tip {
                    None => {
                        tip = Some(node);
                    }
                    Some(ref t) => {
                        if height > t.chunk.height {
                            tip = Some(node);
                        }
                    }
                }
            }

            // Set the tip only once. The items from the journal may be in arbitrary order,
            // and the tip manager will panic if inserting tips out-of-order.
            if let Some(node) = tip.take() {
                let is_new = self.tip_manager.put(&node);
                assert!(is_new);
            }

            debug!(?sequencer, ?num_items, "journal replay end");
        }

        // Store journal
        self.journals.insert(sequencer.clone(), journal);
    }

    /// Write a `Node` to the appropriate journal, which contains the tip `Chunk` for the sequencer.
    ///
    /// To prevent ever writing two conflicting `Chunk`s at the same height,
    /// the journal must already be open and replayed.
    async fn journal_append(&mut self, node: &parsed::Node<C, D>) {
        let section = self.get_journal_section(node.chunk.height);
        self.journals
            .get_mut(&node.chunk.sequencer)
            .expect("journal does not exist")
            .append(section, node.encode().into())
            .await
            .expect("unable to append to journal");
    }

    /// Syncs (ensures all data is written to disk) and prunes the journal for the given sequencer and height.
    async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
        let section = self.get_journal_section(height);

        // Get journal
        let journal = self
            .journals
            .get_mut(sequencer)
            .expect("journal does not exist");

        // Sync journal
        journal.sync(section).await.expect("unable to sync journal");

        // Prune journal, ignoring errors
        let _ = journal.prune(section).await;
    }

    ////////////////////////////////////////
    // Epoch
    ////////////////////////////////////////

    /// Updates the epoch to the value of the coordinator, and sets the refresh epoch deadline.
    fn refresh_epoch(&mut self) {
        // Set the refresh epoch deadline
        self.refresh_epoch_deadline = Some(self.context.current() + self.refresh_epoch_timeout);

        // Ensure epoch is not before the current epoch
        let epoch = self.coordinator.index();
        assert!(epoch >= self.epoch);

        // Update the epoch
        self.epoch = epoch;
    }
}

/// Errors that can occur when running the engine.
#[derive(Error, Debug)]
enum Error {
    // Application Verification Errors
    #[error("Application verify error: {0}")]
    AppVerifyCanceled(oneshot::Canceled),
    #[error("Application verified no tip")]
    AppVerifiedNoTip,
    #[error("Application verified height mismatch")]
    AppVerifiedHeightMismatch,
    #[error("Application verified payload mismatch")]
    AppVerifiedPayloadMismatch,

    // P2P Errors
    #[error("Unable to send message")]
    UnableToSendMessage,

    // Broadcast errors
    #[error("Already broadcast")]
    AlreadyBroadcast,
    #[error("I am not a sequencer in epoch {0}")]
    IAmNotASequencer(u64),
    #[error("Nothing to rebroadcast")]
    NothingToRebroadcast,
    #[error("Broadcast failed")]
    BroadcastFailed,
    #[error("No threshold for tip at height {0}")]
    NoThresholdForTip(u64),

    // Proto Malformed Errors
    #[error("Genesis chunk must not have a parent")]
    GenesisChunkMustNotHaveParent,
    #[error("Node missing parent")]
    NodeMissingParent,

    // Epoch Errors
    #[error("Unknown identity at epoch {0}")]
    UnknownIdentity(u64),
    #[error("Unknown signers at epoch {0}")]
    UnknownSigners(u64),
    #[error("Epoch {0} has no sequencer {1}")]
    UnknownSequencer(u64, String),
    #[error("Epoch {0} has no signer {1}")]
    UnknownSigner(u64, String),
    #[error("Unknown share at epoch {0}")]
    UnknownShare(u64),

    // Peer Errors
    #[error("Peer mismatch")]
    PeerMismatch,

    // Signature Errors
    #[error("Invalid threshold signature")]
    InvalidThresholdSignature,
    #[error("Invalid partial signature")]
    InvalidPartialSignature,
    #[error("Invalid node signature")]
    InvalidNodeSignature,

    // Ignorable Message Errors
    #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
    AckEpochOutsideBounds(u64, u64, u64),
    #[error("Invalid ack height {0} outside bounds {1} - {2}")]
    AckHeightOutsideBounds(u64, u64, u64),
    #[error("Chunk height {0} lower than tip height {1}")]
    ChunkHeightTooLow(u64, u64),

    // Slashable Errors
    #[error("Chunk mismatch from sender {0} with height {1}")]
    ChunkMismatch(String, u64),
}