commonware_broadcast/linked/signer/actor.rs

use super::{metrics, AckManager, Config, Mailbox, Message, TipManager};
use crate::{
    linked::{namespace, parsed, prover::Prover, serializer, Context, Epoch},
    Application, Collector, ThresholdCoordinator,
};
use commonware_cryptography::{
    bls12381::primitives::{
        group::{self},
        ops,
        poly::{self},
    },
    Scheme,
};
use commonware_macros::select;
use commonware_p2p::{Receiver, Recipients, Sender};
use commonware_runtime::{
    telemetry::{
        histogram,
        status::{CounterExt, Status},
    },
    Blob, Clock, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::journal::{self, variable::Journal};
use commonware_utils::{futures::Pool as FuturesPool, Array};
use futures::{
    channel::{mpsc, oneshot},
    future::{self, Either},
    pin_mut, StreamExt,
};
use std::{
    collections::BTreeMap,
    marker::PhantomData,
    time::{Duration, SystemTime},
};
use thiserror::Error;
use tracing::{debug, error, info, warn};

/// Represents a pending verification request to the application.
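///
/// Holds the latency timer for the request, the context and payload under
/// verification, and the application's verdict (or an error if the
/// verification was canceled).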
struct Verify<C: Scheme, D: Array, E: Clock> {
    timer: histogram::Timer<E>,
    context: Context<C::PublicKey>,
    payload: D,
    result: Result<bool, Error>,
}

/// The actor that implements the `Broadcaster` trait.
pub struct Actor<
    B: Blob,
    E: Clock + Spawner + Storage<B> + Metrics,
    C: Scheme,
    D: Array,
    A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
    Z: Collector<Digest = D>,
    S: ThresholdCoordinator<
        Index = Epoch,
        Share = group::Share,
        Identity = poly::Public,
        PublicKey = C::PublicKey,
    >,
    NetS: Sender<PublicKey = C::PublicKey>,
    NetR: Receiver<PublicKey = C::PublicKey>,
> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    context: E,
    crypto: C,
    coordinator: S,
    application: A,
    collector: Z,
    _sender: PhantomData<NetS>,
    _receiver: PhantomData<NetR>,

    ////////////////////////////////////////
    // Namespace Constants
    ////////////////////////////////////////

    // The namespace for chunk signatures.
    chunk_namespace: Vec<u8>,

    // The namespace for ack signatures.
    ack_namespace: Vec<u8>,

    ////////////////////////////////////////
    // Timeouts
    ////////////////////////////////////////

    // The configured timeout for refreshing the epoch
    refresh_epoch_timeout: Duration,
    refresh_epoch_deadline: Option<SystemTime>,

    // The configured timeout for rebroadcasting a chunk to all signers
    rebroadcast_timeout: Duration,
    rebroadcast_deadline: Option<SystemTime>,

    ////////////////////////////////////////
    // Pruning
    ////////////////////////////////////////

    // A tuple representing the epochs to keep in memory.
    // The first element is the number of old epochs to keep.
    // The second element is the number of future epochs to accept.
    //
    // For example, if the current epoch is 10, and the bounds are (1, 2), then
    // epochs 9, 10, 11, and 12 are kept (and accepted);
    // all others are pruned or rejected.
    epoch_bounds: (u64, u64),

    // The number of future heights to accept acks for.
    // This is used to prevent spam of acks for arbitrary heights.
    //
    // For example, if the current tip for a sequencer is at height 100,
    // and the height_bound is 10, then acks for heights 100-110 are accepted.
    height_bound: u64,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////

    // A stream of futures.
    // Each future represents a verification request to the application
    // that will either time out or resolve with a boolean.
    #[allow(clippy::type_complexity)]
    pending_verifies: FuturesPool<Verify<C, D, E>>,

    // The maximum number of items in `pending_verifies`.
    verify_concurrent: usize,

    // The mailbox for receiving messages (primarily from the application).
    mailbox_receiver: mpsc::Receiver<Message<D>>,

    ////////////////////////////////////////
    // Storage
    ////////////////////////////////////////

    // The number of heights per journal section.
    journal_heights_per_section: u64,

    // The number of concurrent operations when replaying journals.
    journal_replay_concurrency: usize,

    // A prefix for the journal names.
    // The rest of the name is the hex-encoded public key of the relevant sequencer.
    journal_name_prefix: String,

    // A map of sequencer public keys to their journals.
    journals: BTreeMap<C::PublicKey, Journal<B, E>>,

    ////////////////////////////////////////
    // State
    ////////////////////////////////////////

    // Tracks the current tip for each sequencer.
    // The tip is a `Node` which is comprised of a `Chunk` and,
    // if not the genesis chunk for that sequencer,
    // a threshold signature over the parent chunk.
    tip_manager: TipManager<C, D>,

    // Tracks the acknowledgements for chunks.
    // This is comprised of partial signatures or threshold signatures.
    ack_manager: AckManager<D, C::PublicKey>,

    // The current epoch.
    epoch: Epoch,

    ////////////////////////////////////////
    // Metrics
    ////////////////////////////////////////

    // Metrics
    metrics: metrics::Metrics<E>,

    // The timer measuring the latency of the most recent `broadcast_new`.
    broadcast_timer: Option<histogram::Timer<E>>,
}

impl<
        B: Blob,
        E: Clock + Spawner + Storage<B> + Metrics,
        C: Scheme,
        D: Array,
        A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
        Z: Collector<Digest = D>,
        S: ThresholdCoordinator<
            Index = Epoch,
            Share = group::Share,
            Identity = poly::Public,
            PublicKey = C::PublicKey,
        >,
        NetS: Sender<PublicKey = C::PublicKey>,
        NetR: Receiver<PublicKey = C::PublicKey>,
    > Actor<B, E, C, D, A, Z, S, NetS, NetR>
{
    /// Creates a new actor with the given context and configuration.
    /// Returns the actor and a mailbox for sending messages to the actor.
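    ///
    /// # Example
    ///
    /// A minimal sketch, assuming an already-configured runtime `context`,
    /// a `cfg`, and pre-established network channel pairs (names here are
    /// hypothetical):
    ///
    /// ```ignore
    /// let (actor, mailbox) = Actor::new(context, cfg);
    /// let handle = actor.start((chunk_sender, chunk_receiver), (ack_sender, ack_receiver));
    /// // `mailbox` can now be used by the application to request broadcasts.
    /// ```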
    pub fn new(context: E, cfg: Config<C, D, A, Z, S>) -> (Self, Mailbox<D>) {
        let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
        let mailbox = Mailbox::new(mailbox_sender);
        let metrics = metrics::Metrics::init(context.clone());

        let result = Self {
            context,
            crypto: cfg.crypto,
            _sender: PhantomData,
            _receiver: PhantomData,
            coordinator: cfg.coordinator,
            application: cfg.application,
            collector: cfg.collector,
            chunk_namespace: namespace::chunk(&cfg.namespace),
            ack_namespace: namespace::ack(&cfg.namespace),
            refresh_epoch_timeout: cfg.refresh_epoch_timeout,
            refresh_epoch_deadline: None,
            rebroadcast_timeout: cfg.rebroadcast_timeout,
            rebroadcast_deadline: None,
            epoch_bounds: cfg.epoch_bounds,
            height_bound: cfg.height_bound,
            pending_verifies: FuturesPool::default(),
            verify_concurrent: cfg.verify_concurrent,
            mailbox_receiver,
            journal_heights_per_section: cfg.journal_heights_per_section,
            journal_replay_concurrency: cfg.journal_replay_concurrency,
            journal_name_prefix: cfg.journal_name_prefix,
            journals: BTreeMap::new(),
            tip_manager: TipManager::<C, D>::new(),
            ack_manager: AckManager::<D, C::PublicKey>::new(),
            epoch: 0,
            metrics,
            broadcast_timer: None,
        };

        (result, mailbox)
    }

    /// Runs the actor until the context is stopped.
    ///
    /// The actor will handle:
    /// - Timeouts
    ///   - Refreshing the Epoch
    ///   - Rebroadcasting Nodes
    /// - Mailbox messages from the application:
    ///   - Broadcast requests
    /// - Messages from the network:
    ///   - Nodes
    ///   - Acks
    pub fn start(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) -> Handle<()> {
        self.context.spawn_ref()(self.run(chunk_network, ack_network))
    }

    /// Inner run loop called by `start`.
    async fn run(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) {
        let (mut node_sender, mut node_receiver) = chunk_network;
        let (mut ack_sender, mut ack_receiver) = ack_network;
        let mut shutdown = self.context.stopped();

        // Before starting on the main loop, initialize my own sequencer journal
        // and attempt to rebroadcast if necessary.
        self.refresh_epoch();
        self.journal_prepare(&self.crypto.public_key()).await;
        if let Err(err) = self.rebroadcast(&mut node_sender).await {
            // Rebroadcasting may return a non-critical error, so log the error and continue.
            info!(?err, "initial rebroadcast failed");
        }

        loop {
            // Enter the epoch
            self.refresh_epoch();

            // Create deadline futures.
            // If the deadline is None, the future will never resolve.
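            // `Either` gives both arms (a real sleep vs. a never-resolving
            // `future::pending()`) a single concrete type that the select
            // below can poll uniformly.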
            let refresh_epoch = match self.refresh_epoch_deadline {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };
            let rebroadcast = match self.rebroadcast_deadline {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    self.pending_verifies.cancel_all();
                    while let Some((_, journal)) = self.journals.pop_first() {
                        journal.close().await.expect("unable to close journal");
                    }
                    return;
                },

                // Handle refresh epoch deadline
                _ = refresh_epoch => {
                    debug!("refresh epoch");
                    // Simply continue; the epoch will be refreshed on the next iteration.
                    continue;
                },

                // Handle rebroadcast deadline
                _ = rebroadcast => {
                    debug!("rebroadcast");
                    if let Err(err) = self.rebroadcast(&mut node_sender).await {
                        info!(?err, "rebroadcast failed");
                        continue;
                    }
                },

                // Handle incoming nodes
                msg = node_receiver.recv() => {
                    debug!("node network");
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            error!(?err, "node receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.nodes.guard(Status::Invalid);
                    let node = match parsed::Node::<C, D>::decode(&msg) {
                        Ok(node) => node,
                        Err(err) => {
                            warn!(?err, ?sender, "node decode failed");
                            continue;
                        }
                    };
                    if let Err(err) = self.validate_node(&node, &sender) {
                        warn!(?err, ?node, ?sender, "node validate failed");
                        continue;
                    };

                    // Initialize journal for sequencer if it does not exist
                    self.journal_prepare(&sender).await;

                    // Handle the parent threshold signature
                    if let Some(parent) = node.parent.as_ref() {
                        self.handle_threshold(&node.chunk, parent.epoch, parent.threshold).await;
                    }

                    // Process the new node
                    self.handle_node(&node).await;
                    guard.set(Status::Success);
                },

                // Handle incoming acks
                msg = ack_receiver.recv() => {
                    debug!("ack network");
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let mut guard = self.metrics.acks.guard(Status::Invalid);
                    let ack = match parsed::Ack::decode(&msg) {
                        Ok(ack) => ack,
                        Err(err) => {
                            warn!(?err, ?sender, "ack decode failed");
                            continue;
                        }
                    };
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        warn!(?err, ?ack, ?sender, "ack validate failed");
                        continue;
                    };
                    if let Err(err) = self.handle_ack(&ack).await {
                        warn!(?err, ?ack, "ack handle failed");
                        guard.set(Status::Failure);
                        continue;
                    }
                    guard.set(Status::Success);
                },

                // Handle completed verification futures.
                verify = self.pending_verifies.next_completed() => {
                    let Verify { timer, context, payload, result } = verify;
                    drop(timer); // Record metric. Explicitly reference timer to avoid lint warning
                    match result {
                        Err(err) => {
                            warn!(?err, ?context, ?payload, "verified returned error");
                            self.metrics.verify.inc(Status::Dropped);
                        }
                        Ok(false) => {
                            warn!(?context, ?payload, "verified was false");
                            self.metrics.verify.inc(Status::Failure);
                        }
                        Ok(true) => {
                            debug!(?context, ?payload, "verified");
                            self.metrics.verify.inc(Status::Success);
                            if let Err(err) = self.handle_app_verified(&context, &payload, &mut ack_sender).await {
                                warn!(?err, ?context, ?payload, "verified handle failed");
                            }
                        },
                    }
                },

                // Handle mailbox messages
                mail = self.mailbox_receiver.next() => {
                    let Some(msg) = mail else {
                        error!("mailbox receiver failed");
                        break;
                    };
                    match msg {
                        Message::Broadcast{ payload, result } => {
                            debug!("broadcast");
                            if self.coordinator.is_sequencer(self.epoch, &self.crypto.public_key()).is_none() {
                                warn!(epoch=?self.epoch, ?payload, "not a sequencer");
                                continue;
                            }

                            // Broadcast the message
                            if let Err(err) = self.broadcast_new(payload, result, &mut node_sender).await {
                                warn!(?err, "broadcast new failed");
                                continue;
                            }
                        }
                    }
                }
            }
        }
    }

    ////////////////////////////////////////
    // Handling
    ////////////////////////////////////////

    /// Handles a verified message from the application.
    ///
    /// This is called when the application has verified a payload.
    /// The chunk will be signed if it matches the current tip.
    async fn handle_app_verified(
        &mut self,
        context: &Context<C::PublicKey>,
        payload: &D,
        ack_sender: &mut NetS,
    ) -> Result<(), Error> {
        // Get the tip
        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
            return Err(Error::AppVerifiedNoTip);
        };

        // Return early if the height does not match
        if tip.chunk.height != context.height {
            return Err(Error::AppVerifiedHeightMismatch);
        }

        // Return early if the payload does not match
        if tip.chunk.payload != *payload {
            return Err(Error::AppVerifiedPayloadMismatch);
        }

        // Construct partial signature
        let Some(share) = self.coordinator.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };
        let partial = ops::partial_sign_message(
            share,
            Some(&self.ack_namespace),
            &serializer::ack(&tip.chunk, self.epoch),
        );

        // Sync the journal to prevent ever acking two conflicting chunks at
        // the same height, even if the node crashes and restarts.
        self.journal_sync(&context.sequencer, context.height).await;

        // The recipients are all the signers in the epoch and the sequencer.
        // The sequencer may or may not be a signer.
        let recipients = {
            let Some(signers) = self.coordinator.signers(self.epoch) else {
                return Err(Error::UnknownSigners(self.epoch));
            };
            let mut recipients = signers.clone();
            if self
                .coordinator
                .is_signer(self.epoch, &tip.chunk.sequencer)
                .is_none()
            {
                recipients.push(tip.chunk.sequencer.clone());
            }
            recipients
        };

        // Send the ack to the network
        let ack = parsed::Ack {
            chunk: tip.chunk,
            epoch: self.epoch,
            partial,
        };
        ack_sender
            .send(Recipients::Some(recipients), ack.encode().into(), false)
            .await
            .map_err(|_| Error::UnableToSendMessage)?;

        // Handle the ack internally
        self.handle_ack(&ack).await?;

        Ok(())
    }

    /// Handles a threshold, either received from a `Node` from the network or generated locally.
    ///
    /// The threshold must already be verified.
    /// If the threshold is new, it is stored and the proof is emitted to the collector.
    /// If the threshold is already known, it is ignored.
    async fn handle_threshold(
        &mut self,
        chunk: &parsed::Chunk<D, C::PublicKey>,
        epoch: Epoch,
        threshold: group::Signature,
    ) {
        // Set the threshold signature, returning early if it already exists
        if !self
            .ack_manager
            .add_threshold(&chunk.sequencer, chunk.height, epoch, threshold)
        {
            return;
        }

        // If the threshold is for my sequencer, record the metric
        if chunk.sequencer == self.crypto.public_key() {
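            // Dropping the timer records the elapsed end-to-end duration
            // in the `e2e_duration` histogram.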
            self.broadcast_timer.take();
        }

        // Emit the proof
        let context = Context {
            sequencer: chunk.sequencer.clone(),
            height: chunk.height,
        };
        let proof =
            Prover::<C, D>::serialize_threshold(&context, &chunk.payload, epoch, &threshold);
        self.collector
            .acknowledged(proof, chunk.payload.clone())
            .await;
    }

    /// Handles an ack
    ///
    /// Returns an error if the ack is invalid, or can be ignored
    /// (e.g. already exists, threshold already exists, is outside the epoch bounds, etc.).
    async fn handle_ack(&mut self, ack: &parsed::Ack<D, C::PublicKey>) -> Result<(), Error> {
        // Get the quorum
        let Some(identity) = self.coordinator.identity(ack.epoch) else {
            return Err(Error::UnknownIdentity(ack.epoch));
        };
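        // The quorum is the number of partial signatures needed before a
        // threshold signature can be recovered for this epoch's identity.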
        let quorum = identity.required();

        // Add the partial signature. If a new threshold is formed, handle it.
        if let Some(threshold) = self.ack_manager.add_ack(ack, quorum) {
            self.metrics.threshold.inc();
            self.handle_threshold(&ack.chunk, ack.epoch, threshold)
                .await;
        }

        Ok(())
    }

    /// Handles a valid `Node` message, storing it as the tip.
    /// Alerts the application of the new node.
    /// Also appends the `Node` to the journal if it's new.
    async fn handle_node(&mut self, node: &parsed::Node<C, D>) {
        // Store the tip
        let is_new = self.tip_manager.put(node);

        // If the node is at a higher height than the previous tip...
        if is_new {
            // Update metrics for sequencer height
            self.metrics
                .sequencer_heights
                .get_or_create(&metrics::SequencerLabel::from(&node.chunk.sequencer))
                .set(node.chunk.height as i64);

            // Append to journal if the `Node` is new, making sure to sync the journal
            // to prevent sending two conflicting chunks to the application, even if
            // the node crashes and restarts.
            self.journal_append(node).await;
            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
                .await;
        }

        // Drop the node if there are too many pending verifies
        let n = self.pending_verifies.len();
        if n >= self.verify_concurrent {
            warn!(?n, "too many pending verifies");
            return;
        }

        // Verify the chunk with the application
        let context = Context {
            sequencer: node.chunk.sequencer.clone(),
            height: node.chunk.height,
        };
        let payload = node.chunk.payload.clone();
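        // Clone the application handle so the verification future can own it
        // while running concurrently with this loop.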
        let mut application = self.application.clone();
        let timer = self.metrics.verify_duration.timer();
        self.pending_verifies.push(async move {
            let receiver = application.verify(context.clone(), payload.clone()).await;
            let result = receiver.await.map_err(Error::AppVerifyCanceled);
            Verify {
                timer,
                context,
                payload,
                result,
            }
        });
    }

    ////////////////////////////////////////
    // Broadcasting
    ////////////////////////////////////////

    /// Broadcast a message to the network.
    ///
    /// The result is returned to the caller via the provided channel.
    /// The broadcast is only successful if the parent Chunk and threshold signature are known.
    async fn broadcast_new(
        &mut self,
        payload: D,
        result: oneshot::Sender<bool>,
        node_sender: &mut NetS,
    ) -> Result<(), Error> {
        let mut guard = self.metrics.new_broadcast.guard(Status::Dropped);
        let me = self.crypto.public_key();

        // Get parent Chunk and threshold signature
        let mut height = 0;
        let mut parent = None;
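        // If no tip exists, this is the sequencer's genesis chunk:
        // it is broadcast at height 0 with no parent.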
        if let Some(tip) = self.tip_manager.get(&me) {
            // Get threshold, or, if it doesn't exist, return an error
            let Some((epoch, threshold)) = self.ack_manager.get_threshold(&me, tip.chunk.height)
            else {
                let _ = result.send(false);
                return Err(Error::NoThresholdForTip(tip.chunk.height));
            };

            // Update height and parent
            height = tip.chunk.height + 1;
            parent = Some(parsed::Parent {
                payload: tip.chunk.payload,
                threshold,
                epoch,
            });
        }

        // Construct new node
        let chunk = parsed::Chunk {
            sequencer: me.clone(),
            height,
            payload,
        };
        let signature = self
            .crypto
            .sign(Some(&self.chunk_namespace), &serializer::chunk(&chunk));
        let node = parsed::Node::<C, D> {
            chunk,
            signature,
            parent,
        };

        // Deal with the chunk as if it were received over the network
        self.handle_node(&node).await;

        // Sync the journal to prevent ever broadcasting two conflicting chunks
        // at the same height, even if the node crashes and restarts
        self.journal_sync(&me, height).await;

        // Record the start time of the broadcast
        self.broadcast_timer = Some(self.metrics.e2e_duration.timer());

        // Broadcast to network
        if let Err(err) = self.broadcast(&node, node_sender, self.epoch).await {
            let _ = result.send(false);
            guard.set(Status::Failure);
            return Err(err);
        };

        // Return success
        let _ = result.send(true);
        guard.set(Status::Success);
        Ok(())
    }

    /// Attempt to rebroadcast the highest-height chunk of this sequencer to all signers.
    ///
    /// This is only done if:
    /// - this instance is the sequencer for the current epoch.
    /// - this instance has a chunk to rebroadcast.
    /// - this instance has not yet collected the threshold signature for the chunk.
    async fn rebroadcast(&mut self, node_sender: &mut NetS) -> Result<(), Error> {
        let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);

        // Unset the rebroadcast deadline
        self.rebroadcast_deadline = None;

        // Return if not a sequencer in the current epoch
        let me = self.crypto.public_key();
        if self.coordinator.is_sequencer(self.epoch, &me).is_none() {
            return Err(Error::IAmNotASequencer(self.epoch));
        }

        // Return if no chunk to rebroadcast
        let Some(tip) = self.tip_manager.get(&me) else {
            return Err(Error::NothingToRebroadcast);
        };

        // Return if threshold already collected
        if self
            .ack_manager
            .get_threshold(&me, tip.chunk.height)
            .is_some()
        {
            return Err(Error::AlreadyBroadcast);
        }

        // Broadcast the message, which resets the rebroadcast deadline
        guard.set(Status::Failure);
        self.broadcast(&tip, node_sender, self.epoch).await?;
        guard.set(Status::Success);
        Ok(())
    }

    /// Send a `Node` message to all signers in the given epoch.
    async fn broadcast(
        &mut self,
        node: &parsed::Node<C, D>,
        node_sender: &mut NetS,
        epoch: Epoch,
    ) -> Result<(), Error> {
        // Send the node to all signers
        let Some(signers) = self.coordinator.signers(epoch) else {
            return Err(Error::UnknownSigners(epoch));
        };
        node_sender
            .send(
                Recipients::Some(signers.clone()),
                node.encode().into(),
                false,
            )
            .await
            .map_err(|_| Error::BroadcastFailed)?;

        // Set the rebroadcast deadline
        self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);

        Ok(())
    }

    ////////////////////////////////////////
    // Validation
    ////////////////////////////////////////

    /// Takes a raw `Node` (from sender) from the p2p network and validates it.
    ///
    /// As part of validation, verifies the implied parent chunk and its threshold signature.
    /// Returns an error if the `Node` is invalid.
    fn validate_node(
        &mut self,
        node: &parsed::Node<C, D>,
        sender: &C::PublicKey,
    ) -> Result<(), Error> {
        // Verify the sender
        if node.chunk.sequencer != *sender {
            return Err(Error::PeerMismatch);
        }

        // Optimization: If the node is exactly equal to the tip,
        // don't perform any further validation.
        if let Some(tip) = self.tip_manager.get(sender) {
            if tip == *node {
                return Ok(());
            }
        }

        // Validate chunk
        self.validate_chunk(&node.chunk, self.epoch)?;

        // Verify the signature
        if !C::verify(
            Some(&self.chunk_namespace),
            &serializer::chunk(&node.chunk),
            sender,
            &node.signature,
        ) {
            return Err(Error::InvalidNodeSignature);
        }

        // Verify no parent
        if node.chunk.height == 0 {
            if node.parent.is_some() {
                return Err(Error::GenesisChunkMustNotHaveParent);
            }
            return Ok(());
        }

        // Verify parent
        let Some(parent) = &node.parent else {
            return Err(Error::NodeMissingParent);
        };
        let parent_chunk = parsed::Chunk {
            sequencer: sender.clone(),
            height: node.chunk.height.checked_sub(1).unwrap(),
            payload: parent.payload.clone(),
        };

        // Verify parent threshold signature
        let Some(identity) = self.coordinator.identity(parent.epoch) else {
            return Err(Error::UnknownIdentity(parent.epoch));
        };
        let public_key = poly::public(identity);
        ops::verify_message(
            &public_key,
            Some(&self.ack_namespace),
            &serializer::ack(&parent_chunk, parent.epoch),
            &parent.threshold,
        )
        .map_err(|_| Error::InvalidThresholdSignature)?;

        Ok(())
    }

    /// Takes a raw ack (from sender) from the p2p network and validates it.
    ///
    /// Returns `Ok(())` if the ack is valid.
    /// Returns an error if the ack is invalid.
    fn validate_ack(
        &self,
        ack: &parsed::Ack<D, C::PublicKey>,
        sender: &C::PublicKey,
    ) -> Result<(), Error> {
        // Validate chunk
        self.validate_chunk(&ack.chunk, ack.epoch)?;

        // Validate sender
        let Some(signer_index) = self.coordinator.is_signer(ack.epoch, sender) else {
            return Err(Error::UnknownSigner(ack.epoch, sender.to_string()));
        };
        if signer_index != ack.partial.index {
            return Err(Error::PeerMismatch);
        }

        // Spam prevention: If the ack is for an epoch that is too old or too new, ignore.
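        // For example, with current epoch 10 and epoch_bounds (1, 2),
        // only acks for epochs 9 through 12 are accepted.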
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // Spam prevention: If the ack is for a height that is too old or too new, ignore.
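        // For example, with tip height 100 and height_bound 10,
        // only acks for heights 100 through 110 are accepted.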
        {
            let bound_lo = self
                .tip_manager
                .get(&ack.chunk.sequencer)
                .map(|t| t.chunk.height)
                .unwrap_or(0);
            let bound_hi = bound_lo + self.height_bound;
            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
                return Err(Error::AckHeightOutsideBounds(
                    ack.chunk.height,
                    bound_lo,
                    bound_hi,
                ));
            }
        }

        // Validate partial signature
        // Optimization: If the ack already exists, don't verify
        let Some(identity) = self.coordinator.identity(ack.epoch) else {
            return Err(Error::UnknownIdentity(ack.epoch));
        };
        ops::partial_verify_message(
            identity,
            Some(&self.ack_namespace),
            &serializer::ack(&ack.chunk, ack.epoch),
            &ack.partial,
        )
        .map_err(|_| Error::InvalidPartialSignature)?;

        Ok(())
    }

    /// Takes a raw chunk from the p2p network and validates it against the epoch.
    ///
    /// Returns `Ok(())` if the chunk is valid.
    /// Returns an error if the chunk is invalid.
    fn validate_chunk(
        &self,
        chunk: &parsed::Chunk<D, C::PublicKey>,
        epoch: Epoch,
    ) -> Result<(), Error> {
        // Verify sequencer
        if self
            .coordinator
            .is_sequencer(epoch, &chunk.sequencer)
            .is_none()
        {
            return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
        }

        // Verify height
        if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
            // Height must be at least the tip height
            match chunk.height.cmp(&tip.chunk.height) {
                std::cmp::Ordering::Less => {
                    return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
                }
                std::cmp::Ordering::Equal => {
                    // Ensure this matches the tip if the height is the same
                    if tip.chunk.payload != chunk.payload {
                        return Err(Error::ChunkMismatch(
                            chunk.sequencer.to_string(),
                            chunk.height,
                        ));
                    }
                }
                std::cmp::Ordering::Greater => {}
            }
        }

        Ok(())
    }

    ////////////////////////////////////////
    // Journal
    ////////////////////////////////////////

    /// Returns the section of the journal for the given height.
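    ///
    /// For example, with `journal_heights_per_section = 10`,
    /// heights 0 through 9 map to section 0 and height 25 maps to section 2.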
    fn get_journal_section(&self, height: u64) -> u64 {
        height / self.journal_heights_per_section
    }

    /// Ensures the journal exists and is initialized for the given sequencer.
    /// If the journal does not exist, it is created and replayed.
    /// Otherwise, no action is taken.
    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
        // Return early if the journal already exists
        if self.journals.contains_key(sequencer) {
            return;
        }

        // Initialize journal
        let cfg = journal::variable::Config {
            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
        };
        let mut journal = Journal::init(self.context.clone(), cfg)
            .await
            .expect("unable to init journal");

        // Replay journal
        {
            debug!(?sequencer, "journal replay begin");

            // Prepare the stream
            let stream = journal
                .replay(self.journal_replay_concurrency, None)
                .await
                .expect("unable to replay journal");
            pin_mut!(stream);

            // Read from the stream, which may be in arbitrary order.
            // Remember the highest node height
            let mut tip: Option<parsed::Node<C, D>> = None;
            let mut num_items = 0;
            while let Some(msg) = stream.next().await {
                num_items += 1;
                let (_, _, _, msg) = msg.expect("unable to decode journal message");
                let node = parsed::Node::<C, D>::decode(&msg)
                    .expect("journal message is unexpected format");
                let height = node.chunk.height;
                match tip {
                    None => {
                        tip = Some(node);
                    }
                    Some(ref t) => {
                        if height > t.chunk.height {
                            tip = Some(node);
                        }
                    }
                }
            }

            // Set the tip only once. The items from the journal may be in arbitrary order,
            // and the tip manager will panic if inserting tips out-of-order.
            if let Some(node) = tip.take() {
                let is_new = self.tip_manager.put(&node);
                assert!(is_new);
            }

            debug!(?sequencer, ?num_items, "journal replay end");
        }

        // Store journal
        self.journals.insert(sequencer.clone(), journal);
    }

    /// Write a `Node` to the appropriate journal, which contains the tip `Chunk` for the sequencer.
    ///
    /// To prevent ever writing two conflicting `Chunk`s at the same height,
    /// the journal must already be open and replayed.
    async fn journal_append(&mut self, node: &parsed::Node<C, D>) {
        let section = self.get_journal_section(node.chunk.height);
        self.journals
            .get_mut(&node.chunk.sequencer)
            .expect("journal does not exist")
            .append(section, node.encode().into())
            .await
            .expect("unable to append to journal");
    }

    /// Syncs (ensures all data is written to disk) and prunes the journal for the given sequencer and height.
    async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
        let section = self.get_journal_section(height);

        // Get journal
        let journal = self
            .journals
            .get_mut(sequencer)
            .expect("journal does not exist");

        // Sync journal
        journal.sync(section).await.expect("unable to sync journal");

        // Prune journal, ignoring errors
        let _ = journal.prune(section).await;
    }

    ////////////////////////////////////////
    // Epoch
    ////////////////////////////////////////

    /// Updates the epoch to the value of the coordinator, and sets the refresh epoch deadline.
    fn refresh_epoch(&mut self) {
        // Set the refresh epoch deadline
        self.refresh_epoch_deadline = Some(self.context.current() + self.refresh_epoch_timeout);

        // Ensure epoch is not before the current epoch
        let epoch = self.coordinator.index();
        assert!(epoch >= self.epoch);

        // Update the epoch
        self.epoch = epoch;
    }
}

/// Errors that can occur when running the actor.
#[derive(Error, Debug)]
enum Error {
    // Application Verification Errors
    #[error("Application verify error: {0}")]
    AppVerifyCanceled(oneshot::Canceled),
    #[error("Application verified no tip")]
    AppVerifiedNoTip,
    #[error("Application verified height mismatch")]
    AppVerifiedHeightMismatch,
    #[error("Application verified payload mismatch")]
    AppVerifiedPayloadMismatch,

    // P2P Errors
    #[error("Unable to send message")]
    UnableToSendMessage,

    // Broadcast errors
    #[error("Already broadcast")]
    AlreadyBroadcast,
    #[error("I am not a sequencer in epoch {0}")]
    IAmNotASequencer(u64),
    #[error("Nothing to rebroadcast")]
    NothingToRebroadcast,
    #[error("Broadcast failed")]
    BroadcastFailed,
1073    #[error("No threshold for tip")]
1074    NoThresholdForTip(u64),

    // Proto Malformed Errors
    #[error("Genesis chunk must not have a parent")]
    GenesisChunkMustNotHaveParent,
    #[error("Node missing parent")]
    NodeMissingParent,

    // Epoch Errors
    #[error("Unknown identity at epoch {0}")]
    UnknownIdentity(u64),
    #[error("Unknown signers at epoch {0}")]
    UnknownSigners(u64),
    #[error("Epoch {0} has no sequencer {1}")]
    UnknownSequencer(u64, String),
    #[error("Epoch {0} has no signer {1}")]
    UnknownSigner(u64, String),
    #[error("Unknown share at epoch {0}")]
    UnknownShare(u64),

    // Peer Errors
    #[error("Peer mismatch")]
    PeerMismatch,

    // Signature Errors
    #[error("Invalid threshold signature")]
    InvalidThresholdSignature,
    #[error("Invalid partial signature")]
    InvalidPartialSignature,
    #[error("Invalid node signature")]
    InvalidNodeSignature,

    // Ignorable Message Errors
    #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
    AckEpochOutsideBounds(u64, u64, u64),
    #[error("Invalid ack height {0} outside bounds {1} - {2}")]
    AckHeightOutsideBounds(u64, u64, u64),
    #[error("Chunk height {0} lower than tip height {1}")]
    ChunkHeightTooLow(u64, u64),

    // Slashable Errors
    #[error("Chunk mismatch from sender {0} with height {1}")]
    ChunkMismatch(String, u64),
}
1117}