commonware_broadcast/linked/signer/
actor.rs

use super::{AckManager, Config, Mailbox, Message, TipManager};
use crate::{
    linked::{namespace, parsed, prover::Prover, serializer, Context, Epoch},
    Application, Collector, ThresholdCoordinator,
};
use commonware_cryptography::{
    bls12381::primitives::{
        group::{self},
        ops,
        poly::{self},
    },
    Scheme,
};
use commonware_macros::select;
use commonware_p2p::{Receiver, Recipients, Sender};
use commonware_runtime::{Blob, Clock, Handle, Metrics, Spawner, Storage};
use commonware_storage::journal::{self, variable::Journal};
use commonware_utils::Array;
use futures::{
    channel::{mpsc, oneshot},
    future::{self, Either},
    pin_mut,
    stream::FuturesUnordered,
    StreamExt,
};
use std::{
    collections::BTreeMap,
    future::Future,
    marker::PhantomData,
    pin::Pin,
    time::{Duration, SystemTime},
};
use thiserror::Error;
use tracing::{debug, error, info, warn};

/// A future representing a request to the application to verify a payload.
type VerifyFuture<D, P> =
    Pin<Box<dyn Future<Output = (Context<P>, D, Result<bool, Error>)> + Send>>;

/// The actor that implements the `Broadcaster` trait.
pub struct Actor<
    B: Blob,
    E: Clock + Spawner + Storage<B> + Metrics,
    C: Scheme,
    D: Array,
    A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
    Z: Collector<Digest = D>,
    S: ThresholdCoordinator<
        Index = Epoch,
        Share = group::Share,
        Identity = poly::Public,
        PublicKey = C::PublicKey,
    >,
    NetS: Sender<PublicKey = C::PublicKey>,
    NetR: Receiver<PublicKey = C::PublicKey>,
> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    context: E,
    crypto: C,
    coordinator: S,
    application: A,
    collector: Z,
    _sender: PhantomData<NetS>,
    _receiver: PhantomData<NetR>,

    ////////////////////////////////////////
    // Namespace Constants
    ////////////////////////////////////////

    // The namespace for chunk signatures.
    chunk_namespace: Vec<u8>,

    // The namespace for ack signatures.
    ack_namespace: Vec<u8>,

    ////////////////////////////////////////
    // Timeouts
    ////////////////////////////////////////

    // The configured timeout for refreshing the epoch.
    refresh_epoch_timeout: Duration,
    refresh_epoch_deadline: Option<SystemTime>,

    // The configured timeout for rebroadcasting a chunk to all signers.
    rebroadcast_timeout: Duration,
    rebroadcast_deadline: Option<SystemTime>,

    ////////////////////////////////////////
    // Pruning
    ////////////////////////////////////////

    // A tuple representing the epochs to keep in memory.
    // The first element is the number of old epochs to keep.
    // The second element is the number of future epochs to accept.
    //
    // For example, if the current epoch is 10, and the bounds are (1, 2), then
    // epochs 9, 10, 11, and 12 are kept (and accepted);
    // all others are pruned or rejected.
    epoch_bounds: (u64, u64),

    // The number of future heights to accept acks for.
    // This is used to prevent spam of acks for arbitrary heights.
    //
    // For example, if the current tip for a sequencer is at height 100,
    // and the height_bound is 10, then acks for heights 100 through 110 are accepted.
    height_bound: u64,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////

    // A stream of futures.
    // Each future represents a verification request to the application
    // that will either time out or resolve with a boolean.
    pending_verifies: FuturesUnordered<VerifyFuture<D, C::PublicKey>>,

    // The maximum number of items in `pending_verifies`.
    pending_verify_size: usize,

    // The mailbox for receiving messages (primarily from the application).
    mailbox_receiver: mpsc::Receiver<Message<D>>,

    ////////////////////////////////////////
    // Storage
    ////////////////////////////////////////

    // The number of heights per journal section.
    journal_heights_per_section: u64,

    // The number of concurrent operations when replaying journals.
    journal_replay_concurrency: usize,

    // A prefix for the journal names.
    // The rest of the name is the hex-encoded public key of the relevant sequencer.
    journal_name_prefix: String,

    // A map of sequencer public keys to their journals.
    journals: BTreeMap<C::PublicKey, Journal<B, E>>,

    ////////////////////////////////////////
    // State
    ////////////////////////////////////////

    // Tracks the current tip for each sequencer.
    // The tip is a `Node`, which comprises a `Chunk` and,
    // if not the genesis chunk for that sequencer,
    // a threshold signature over the parent chunk.
    tip_manager: TipManager<C, D>,

    // Tracks the acknowledgements for chunks,
    // in the form of partial signatures or threshold signatures.
    ack_manager: AckManager<D, C::PublicKey>,

    // The current epoch.
    epoch: Epoch,
}

impl<
        B: Blob,
        E: Clock + Spawner + Storage<B> + Metrics,
        C: Scheme,
        D: Array,
        A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
        Z: Collector<Digest = D>,
        S: ThresholdCoordinator<
            Index = Epoch,
            Share = group::Share,
            Identity = poly::Public,
            PublicKey = C::PublicKey,
        >,
        NetS: Sender<PublicKey = C::PublicKey>,
        NetR: Receiver<PublicKey = C::PublicKey>,
    > Actor<B, E, C, D, A, Z, S, NetS, NetR>
{
    /// Creates a new actor with the given context and configuration.
    /// Returns the actor and a mailbox for sending messages to the actor.
    pub fn new(context: E, cfg: Config<C, D, A, Z, S>) -> (Self, Mailbox<D>) {
        let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
        let mailbox = Mailbox::new(mailbox_sender);

        // Initialize the pending verification futures.
        let pending_verifies = FuturesUnordered::new();
        {
            // Insert a dummy future (that never resolves) to prevent the stream from being empty.
            // If the stream were empty, `select_next_some()` would resolve instantly,
            // front-running all other branches in the `select!` macro.
            let dummy: VerifyFuture<D, C::PublicKey> = Box::pin(async {
                future::pending::<(Context<C::PublicKey>, D, Result<bool, Error>)>().await
            });
            pending_verifies.push(dummy);
        }

        let result = Self {
            context,
            crypto: cfg.crypto,
            _sender: PhantomData,
            _receiver: PhantomData,
            coordinator: cfg.coordinator,
            application: cfg.application,
            collector: cfg.collector,
            chunk_namespace: namespace::chunk(&cfg.namespace),
            ack_namespace: namespace::ack(&cfg.namespace),
            refresh_epoch_timeout: cfg.refresh_epoch_timeout,
            refresh_epoch_deadline: None,
            rebroadcast_timeout: cfg.rebroadcast_timeout,
            rebroadcast_deadline: None,
            epoch_bounds: cfg.epoch_bounds,
            height_bound: cfg.height_bound,
            pending_verifies,
            pending_verify_size: cfg.pending_verify_size,
            mailbox_receiver,
            journal_heights_per_section: cfg.journal_heights_per_section,
            journal_replay_concurrency: cfg.journal_replay_concurrency,
            journal_name_prefix: cfg.journal_name_prefix,
            journals: BTreeMap::new(),
            tip_manager: TipManager::<C, D>::new(),
            ack_manager: AckManager::<D, C::PublicKey>::new(),
            epoch: 0,
        };

        (result, mailbox)
    }

    /// Runs the actor until the context is stopped.
    ///
    /// The actor will handle:
    /// - Timeouts
    ///   - Refreshing the Epoch
    ///   - Rebroadcasting Nodes
    /// - Mailbox messages from the application:
    ///   - Broadcast requests
    /// - Messages from the network:
    ///   - Nodes
    ///   - Acks
    pub fn start(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) -> Handle<()> {
        self.context.spawn_ref()(self.run(chunk_network, ack_network))
    }
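
    // Illustrative usage (a sketch; the variable names are assumptions, not
    // part of this module): the caller keeps the mailbox for issuing broadcast
    // requests and hands both network channel pairs to `start`:
    //
    //   let (actor, mailbox) = Actor::new(context, cfg);
    //   let handle = actor.start(
    //       (node_sender, node_receiver), // chunk network
    //       (ack_sender, ack_receiver),   // ack network
    //   );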

    /// Inner run loop called by `start`.
    async fn run(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) {
        let (mut node_sender, mut node_receiver) = chunk_network;
        let (mut ack_sender, mut ack_receiver) = ack_network;
        let mut shutdown = self.context.stopped();

        // Before starting the main loop, initialize my own sequencer journal
        // and attempt to rebroadcast if necessary.
        self.refresh_epoch();
        self.journal_prepare(&self.crypto.public_key()).await;
        if let Err(err) = self.rebroadcast(&mut node_sender).await {
            // Rebroadcasting may return a non-critical error, so log the error and continue.
            info!(?err, "initial rebroadcast failed");
        }

        loop {
            // Enter the epoch
            self.refresh_epoch();

            // Create deadline futures.
            // If the deadline is None, the future will never resolve.
            let refresh_epoch = match self.refresh_epoch_deadline {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };
            let rebroadcast = match self.rebroadcast_deadline {
                Some(deadline) => Either::Left(self.context.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    while let Some((_, journal)) = self.journals.pop_first() {
                        journal.close().await.expect("unable to close journal");
                    }
                    return;
                },

                // Handle refresh epoch deadline
                _ = refresh_epoch => {
                    debug!("refresh epoch");
                    // Simply continue; the epoch will be refreshed on the next iteration.
                    continue;
                },

                // Handle rebroadcast deadline
                _ = rebroadcast => {
                    debug!("rebroadcast");
                    if let Err(err) = self.rebroadcast(&mut node_sender).await {
                        info!(?err, "rebroadcast failed");
                        continue;
                    }
                },

                // Handle incoming nodes
                msg = node_receiver.recv() => {
                    debug!("node network");
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            error!(?err, "node receiver failed");
                            break;
                        }
                    };
                    let node = match parsed::Node::<C, D>::decode(&msg) {
                        Ok(node) => node,
                        Err(err) => {
                            warn!(?err, ?sender, "node decode failed");
                            continue;
                        }
                    };
                    if let Err(err) = self.validate_node(&node, &sender) {
                        warn!(?err, ?node, ?sender, "node validate failed");
                        continue;
                    };

                    // Initialize journal for sequencer if it does not exist
                    self.journal_prepare(&sender).await;

                    // Handle the parent threshold signature
                    if let Some(parent) = node.parent.as_ref() {
                        self.handle_threshold(&node.chunk, parent.epoch, parent.threshold).await;
                    }

                    // Process the new node
                    self.handle_node(&node).await;
                },

                // Handle incoming acks
                msg = ack_receiver.recv() => {
                    debug!("ack network");
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let ack = match parsed::Ack::decode(&msg) {
                        Ok(ack) => ack,
                        Err(err) => {
                            warn!(?err, ?sender, "ack decode failed");
                            continue;
                        }
                    };
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        warn!(?err, ?ack, ?sender, "ack validate failed");
                        continue;
                    };
                    if let Err(err) = self.handle_ack(&ack).await {
                        warn!(?err, ?ack, "ack handle failed");
                        continue;
                    }
                },

                // Handle completed verification futures.
                maybe_verified = self.pending_verifies.select_next_some() => {
                    let (context, digest, verify_result) = maybe_verified;
                    match verify_result {
                        Ok(true) => {
                            debug!(?context, ?digest, "verified");
                            if let Err(err) = self.handle_app_verified(&context, &digest, &mut ack_sender).await {
                                warn!(?err, ?context, ?digest, "verified handle failed");
                            }
                        },
                        Ok(false) => {
                            warn!(?context, ?digest, "verification returned false");
                        },
                        Err(err) => {
                            warn!(?err, ?context, ?digest, "verification returned error");
                        },
                    }
                },

                // Handle mailbox messages
                mail = self.mailbox_receiver.next() => {
                    let Some(msg) = mail else {
                        error!("mailbox receiver failed");
                        break;
                    };
                    match msg {
                        Message::Broadcast{ payload, result } => {
                            debug!("broadcast");
                            if self.coordinator.is_sequencer(self.epoch, &self.crypto.public_key()).is_none() {
                                warn!(epoch=?self.epoch, ?payload, "not a sequencer");
                                continue;
                            }

                            // Broadcast the message
                            if let Err(err) = self.broadcast_new(payload, result, &mut node_sender).await {
                                warn!(?err, "broadcast new failed");
                                continue;
                            }
                        }
                    }
                }
            }
        }
    }

    ////////////////////////////////////////
    // Handling
    ////////////////////////////////////////

    /// Handles a verified message from the application.
    ///
    /// This is called when the application has verified a payload.
    /// The chunk will be signed if it matches the current tip.
    async fn handle_app_verified(
        &mut self,
        context: &Context<C::PublicKey>,
        payload: &D,
        ack_sender: &mut NetS,
    ) -> Result<(), Error> {
        // Get the tip
        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
            return Err(Error::AppVerifiedNoTip);
        };

        // Return early if the height does not match
        if tip.chunk.height != context.height {
            return Err(Error::AppVerifiedHeightMismatch);
        }

        // Return early if the payload digest does not match
        if tip.chunk.payload != *payload {
            return Err(Error::AppVerifiedPayloadMismatch);
        }

        // Construct partial signature
        let Some(share) = self.coordinator.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };
        let partial = ops::partial_sign_message(
            share,
            Some(&self.ack_namespace),
            &serializer::ack(&tip.chunk, self.epoch),
        );

        // Sync the journal to prevent ever acking two conflicting chunks at
        // the same height, even if the node crashes and restarts.
        self.journal_sync(&context.sequencer, context.height).await;

        // The recipients are all the signers in the epoch and the sequencer.
        // The sequencer may or may not be a signer.
        let recipients = {
            let Some(signers) = self.coordinator.signers(self.epoch) else {
                return Err(Error::UnknownSigners(self.epoch));
            };
            let mut recipients = signers.clone();
            if self
                .coordinator
                .is_signer(self.epoch, &tip.chunk.sequencer)
                .is_none()
            {
                recipients.push(tip.chunk.sequencer.clone());
            }
            recipients
        };

        // Send the ack to the network
        let ack = parsed::Ack {
            chunk: tip.chunk,
            epoch: self.epoch,
            partial,
        };
        ack_sender
            .send(Recipients::Some(recipients), ack.encode().into(), false)
            .await
            .map_err(|_| Error::UnableToSendMessage)?;

        // Handle the ack internally
        self.handle_ack(&ack).await?;

        Ok(())
    }
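
    // Illustrative recipients example for `handle_app_verified` (assumed
    // values): with signers {s1, s2, s3} and a sequencer q that is not a
    // signer, the ack goes to {s1, s2, s3, q}; if q is itself a signer,
    // the signer set is used as-is.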

    /// Handles a threshold, either received from a `Node` from the network or generated locally.
    ///
    /// The threshold must already be verified.
    /// If the threshold is new, it is stored and the proof is emitted to the collector.
    /// If the threshold is already known, it is ignored.
    async fn handle_threshold(
        &mut self,
        chunk: &parsed::Chunk<D, C::PublicKey>,
        epoch: Epoch,
        threshold: group::Signature,
    ) {
        // Set the threshold signature, returning early if it already exists
        if !self
            .ack_manager
            .add_threshold(&chunk.sequencer, chunk.height, epoch, threshold)
        {
            return;
        }

        // Emit the proof
        let context = Context {
            sequencer: chunk.sequencer.clone(),
            height: chunk.height,
        };
        let proof =
            Prover::<C, D>::serialize_threshold(&context, &chunk.payload, epoch, &threshold);
        self.collector
            .acknowledged(proof, chunk.payload.clone())
            .await;
    }

    /// Handles an ack.
    ///
    /// Returns an error if the ack is invalid or can be ignored
    /// (e.g. it already exists, the threshold already exists, or it is outside the epoch bounds).
    async fn handle_ack(&mut self, ack: &parsed::Ack<D, C::PublicKey>) -> Result<(), Error> {
        // Get the quorum
        let Some(identity) = self.coordinator.identity(ack.epoch) else {
            return Err(Error::UnknownIdentity(ack.epoch));
        };
        let quorum = identity.required();

        // Add the partial signature. If a new threshold is formed, handle it.
        if let Some(threshold) = self.ack_manager.add_ack(ack, quorum) {
            // Handle the threshold signature
            self.handle_threshold(&ack.chunk, ack.epoch, threshold)
                .await;
        }

        Ok(())
    }
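
    // Quorum illustration (assumed numbers): with a threshold identity that
    // requires 3 partial signatures, `add_ack` returns `Some(threshold)` on
    // the third distinct valid partial for a (sequencer, height, epoch), and
    // `None` for acks received before that point or after the threshold exists.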

    /// Handles a valid `Node` message, storing it as the tip.
    /// Alerts the application of the new node.
    /// Also appends the `Node` to the journal if it's new.
    async fn handle_node(&mut self, node: &parsed::Node<C, D>) {
        // Store the tip
        let is_new = self.tip_manager.put(node);

        // Append to journal if the `Node` is new, making sure to sync the journal
        // to prevent sending two conflicting chunks to the application, even if
        // the node crashes and restarts.
        if is_new {
            self.journal_append(node).await;
            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
                .await;
        }

        // Skip sending the verification request to the application
        // if the number of pending requests is too high.
        // We use > instead of >= because the stream always holds one dummy future.
        if self.pending_verifies.len() > self.pending_verify_size {
            warn!(n=?self.pending_verifies.len(), "too many pending verifies");
            return;
        }

        // Verify the chunk with the application
        let context = Context {
            sequencer: node.chunk.sequencer.clone(),
            height: node.chunk.height,
        };
        let payload = node.chunk.payload.clone();
        let mut app_clone = self.application.clone();
        let verify_future = Box::pin(async move {
            let receiver = app_clone.verify(context.clone(), payload.clone()).await;
            let result = receiver.await.map_err(Error::AppVerifyCanceled);
            (context, payload, result)
        });

        // Save the verification future
        self.pending_verifies.push(verify_future);
    }

    ////////////////////////////////////////
    // Broadcasting
    ////////////////////////////////////////

    /// Broadcast a message to the network.
    ///
    /// The result is returned to the caller via the provided channel.
    /// For non-genesis chunks, the broadcast only succeeds if the parent
    /// `Chunk` and its threshold signature are known.
    async fn broadcast_new(
        &mut self,
        payload: D,
        result: oneshot::Sender<bool>,
        node_sender: &mut NetS,
    ) -> Result<(), Error> {
        let me = self.crypto.public_key();

        // Get parent Chunk and threshold signature
        let mut height = 0;
        let mut parent = None;
        if let Some(tip) = self.tip_manager.get(&me) {
            // Get the threshold, or, if it doesn't exist, return an error
            let Some((epoch, threshold)) = self.ack_manager.get_threshold(&me, tip.chunk.height)
            else {
                let _ = result.send(false);
                return Err(Error::NoThresholdForTip(tip.chunk.height));
            };

            // Update height and parent
            height = tip.chunk.height + 1;
            parent = Some(parsed::Parent {
                payload: tip.chunk.payload,
                threshold,
                epoch,
            });
        }

        // Construct new node
        let chunk = parsed::Chunk {
            sequencer: me.clone(),
            height,
            payload,
        };
        let signature = self
            .crypto
            .sign(Some(&self.chunk_namespace), &serializer::chunk(&chunk));
        let node = parsed::Node::<C, D> {
            chunk,
            signature,
            parent,
        };

        // Deal with the chunk as if it were received over the network
        self.handle_node(&node).await;

        // Sync the journal to prevent ever broadcasting two conflicting chunks
        // at the same height, even if the node crashes and restarts
        self.journal_sync(&me, height).await;

        // Broadcast to network
        if let Err(err) = self.broadcast(&node, node_sender, self.epoch).await {
            let _ = result.send(false);
            return Err(err);
        };

        // Return success
        let _ = result.send(true);
        Ok(())
    }
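
    // Chain shape produced by `broadcast_new` (illustrative): the first chunk a
    // sequencer broadcasts has height 0 and no parent; each subsequent chunk at
    // height h + 1 embeds the parent's payload along with the epoch and
    // threshold signature that acknowledged the chunk at height h.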

    /// Attempt to rebroadcast the highest-height chunk of this sequencer to all signers.
    ///
    /// This is only done if:
    /// - this instance is a sequencer for the current epoch.
    /// - this instance has a chunk to rebroadcast.
    /// - this instance has not yet collected the threshold signature for the chunk.
    async fn rebroadcast(&mut self, node_sender: &mut NetS) -> Result<(), Error> {
        // Unset the rebroadcast deadline
        self.rebroadcast_deadline = None;

        // Return if not a sequencer in the current epoch
        let me = self.crypto.public_key();
        if self.coordinator.is_sequencer(self.epoch, &me).is_none() {
            return Err(Error::IAmNotASequencer(self.epoch));
        }

        // Return if there is no chunk to rebroadcast
        let Some(tip) = self.tip_manager.get(&me) else {
            return Err(Error::NothingToRebroadcast);
        };

        // Return if the threshold is already collected
        if self
            .ack_manager
            .get_threshold(&me, tip.chunk.height)
            .is_some()
        {
            return Err(Error::AlreadyBroadcast);
        }

        // Broadcast the message, which resets the rebroadcast deadline
        self.broadcast(&tip, node_sender, self.epoch).await?;

        Ok(())
    }

    /// Send a `Node` message to all signers in the given epoch.
    async fn broadcast(
        &mut self,
        node: &parsed::Node<C, D>,
        node_sender: &mut NetS,
        epoch: Epoch,
    ) -> Result<(), Error> {
        // Send the node to all signers
        let Some(signers) = self.coordinator.signers(epoch) else {
            return Err(Error::UnknownSigners(epoch));
        };
        node_sender
            .send(
                Recipients::Some(signers.clone()),
                node.encode().into(),
                false,
            )
            .await
            .map_err(|_| Error::BroadcastFailed)?;

        // Set the rebroadcast deadline
        self.rebroadcast_deadline = Some(self.context.current() + self.rebroadcast_timeout);

        Ok(())
    }

    ////////////////////////////////////////
    // Validation
    ////////////////////////////////////////

    /// Takes a raw `Node` (from sender) from the p2p network and validates it.
    ///
    /// This includes validating the chunk, the node signature, and (for
    /// non-genesis chunks) the parent's threshold signature.
    /// Returns an error if the `Node` is invalid.
    fn validate_node(
        &mut self,
        node: &parsed::Node<C, D>,
        sender: &C::PublicKey,
    ) -> Result<(), Error> {
        // Verify the sender
        if node.chunk.sequencer != *sender {
            return Err(Error::PeerMismatch);
        }

        // Optimization: If the node is exactly equal to the tip,
        // don't perform any further validation.
        if let Some(tip) = self.tip_manager.get(sender) {
            if tip == *node {
                return Ok(());
            }
        }

        // Validate chunk
        self.validate_chunk(&node.chunk, self.epoch)?;

        // Verify the signature
        if !C::verify(
            Some(&self.chunk_namespace),
            &serializer::chunk(&node.chunk),
            sender,
            &node.signature,
        ) {
            return Err(Error::InvalidNodeSignature);
        }

        // A genesis chunk (height 0) must not have a parent
        if node.chunk.height == 0 {
            if node.parent.is_some() {
                return Err(Error::GenesisChunkMustNotHaveParent);
            }
            return Ok(());
        }

        // A non-genesis chunk must have a parent
        let Some(parent) = &node.parent else {
            return Err(Error::NodeMissingParent);
        };
        let parent_chunk = parsed::Chunk {
            sequencer: sender.clone(),
            height: node.chunk.height.checked_sub(1).unwrap(),
            payload: parent.payload.clone(),
        };

        // Verify the parent threshold signature
        let Some(identity) = self.coordinator.identity(parent.epoch) else {
            return Err(Error::UnknownIdentity(parent.epoch));
        };
        let public_key = poly::public(identity);
        ops::verify_message(
            &public_key,
            Some(&self.ack_namespace),
            &serializer::ack(&parent_chunk, parent.epoch),
            &parent.threshold,
        )
        .map_err(|_| Error::InvalidThresholdSignature)?;

        Ok(())
    }

    /// Takes a raw ack (from sender) from the p2p network and validates it.
    ///
    /// This includes validating the chunk, the sender, the epoch and height
    /// bounds, and the partial signature.
    /// Returns an error if the ack is invalid.
    fn validate_ack(
        &self,
        ack: &parsed::Ack<D, C::PublicKey>,
        sender: &C::PublicKey,
    ) -> Result<(), Error> {
        // Validate chunk
        self.validate_chunk(&ack.chunk, ack.epoch)?;

        // Validate sender
        let Some(signer_index) = self.coordinator.is_signer(ack.epoch, sender) else {
            return Err(Error::UnknownSigner(ack.epoch, sender.to_string()));
        };
        if signer_index != ack.partial.index {
            return Err(Error::PeerMismatch);
        }

        // Spam prevention: If the ack is for an epoch that is too old or too new, ignore it.
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // Spam prevention: If the ack is for a height that is too old or too new, ignore it.
        {
            let bound_lo = self
                .tip_manager
                .get(&ack.chunk.sequencer)
                .map(|t| t.chunk.height)
                .unwrap_or(0);
            let bound_hi = bound_lo + self.height_bound;
            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
                return Err(Error::AckHeightOutsideBounds(
                    ack.chunk.height,
                    bound_lo,
                    bound_hi,
                ));
            }
        }

        // Validate the partial signature
        // Optimization: If the ack already exists, don't verify
        let Some(identity) = self.coordinator.identity(ack.epoch) else {
            return Err(Error::UnknownIdentity(ack.epoch));
        };
        ops::partial_verify_message(
            identity,
            Some(&self.ack_namespace),
            &serializer::ack(&ack.chunk, ack.epoch),
            &ack.partial,
        )
        .map_err(|_| Error::InvalidPartialSignature)?;

        Ok(())
    }
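
    // Worked example for the spam-prevention windows above (assumed values):
    // with `epoch_bounds = (1, 2)` and a current epoch of 10, acks are accepted
    // for epochs 9..=12; with `height_bound = 10` and a tip at height 100,
    // acks are accepted for heights 100..=110.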

    /// Takes a raw chunk from the p2p network and validates it against the epoch.
    ///
    /// Returns an error if the chunk is invalid.
    fn validate_chunk(
        &self,
        chunk: &parsed::Chunk<D, C::PublicKey>,
        epoch: Epoch,
    ) -> Result<(), Error> {
        // Verify the sequencer
        if self
            .coordinator
            .is_sequencer(epoch, &chunk.sequencer)
            .is_none()
        {
            return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
        }

        // Verify the height
        if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
            // The height must be at least the tip height
            match chunk.height.cmp(&tip.chunk.height) {
                std::cmp::Ordering::Less => {
                    return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
                }
                std::cmp::Ordering::Equal => {
                    // Ensure the chunk matches the tip if the height is the same
                    if tip.chunk.payload != chunk.payload {
                        return Err(Error::ChunkMismatch(
                            chunk.sequencer.to_string(),
                            chunk.height,
                        ));
                    }
                }
                std::cmp::Ordering::Greater => {}
            }
        }

        Ok(())
    }

    ////////////////////////////////////////
    // Journal
    ////////////////////////////////////////

    /// Returns the section of the journal for the given height.
    fn get_journal_section(&self, height: u64) -> u64 {
        height / self.journal_heights_per_section
    }
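
    // For example (illustrative): with `journal_heights_per_section = 10`,
    // heights 0..=9 map to section 0, heights 10..=19 to section 1, and so on.
    // `journal_sync` at height 25 would therefore sync (and prune against)
    // section 2.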

    /// Ensures the journal exists and is initialized for the given sequencer.
    /// If the journal does not exist, it is created and replayed.
    /// Otherwise, no action is taken.
    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
        // Return early if the journal already exists
        if self.journals.contains_key(sequencer) {
            return;
        }

        // Initialize journal
        let cfg = journal::variable::Config {
            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
        };
        let mut journal = Journal::init(self.context.clone(), cfg)
            .await
            .expect("unable to init journal");

        // Replay journal
        {
            debug!(?sequencer, "journal replay begin");

            // Prepare the stream
            let stream = journal
                .replay(self.journal_replay_concurrency, None)
                .await
                .expect("unable to replay journal");
            pin_mut!(stream);

            // Read from the stream; the items may arrive in arbitrary order.
            // Remember the node with the highest height.
            let mut tip: Option<parsed::Node<C, D>> = None;
            let mut num_items = 0;
            while let Some(msg) = stream.next().await {
                num_items += 1;
                let (_, _, _, msg) = msg.expect("unable to decode journal message");
                let node = parsed::Node::<C, D>::decode(&msg)
                    .expect("journal message is unexpected format");
                let height = node.chunk.height;
                match tip {
                    None => {
                        tip = Some(node);
                    }
                    Some(ref t) => {
                        if height > t.chunk.height {
                            tip = Some(node);
                        }
                    }
                }
            }

            // Set the tip only once. The items from the journal may be in arbitrary order,
            // and the tip manager will panic if tips are inserted out of order.
            if let Some(node) = tip.take() {
                let is_new = self.tip_manager.put(&node);
                assert!(is_new);
            }

            debug!(?sequencer, ?num_items, "journal replay end");
        }

        // Store journal
        self.journals.insert(sequencer.clone(), journal);
    }

    /// Writes a `Node` to the appropriate journal, which stores the tip `Chunk` for the sequencer.
    ///
    /// To prevent ever writing two conflicting `Chunk`s at the same height,
    /// the journal must already be open and replayed.
    async fn journal_append(&mut self, node: &parsed::Node<C, D>) {
        let section = self.get_journal_section(node.chunk.height);
        self.journals
            .get_mut(&node.chunk.sequencer)
            .expect("journal does not exist")
            .append(section, node.encode().into())
            .await
            .expect("unable to append to journal");
    }

    /// Syncs (ensures all data is written to disk) and prunes the journal for the given sequencer and height.
    async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
        let section = self.get_journal_section(height);

        // Get the journal
        let journal = self
            .journals
            .get_mut(sequencer)
            .expect("journal does not exist");

        // Sync the journal
        journal.sync(section).await.expect("unable to sync journal");

        // Prune the journal, ignoring errors
        let _ = journal.prune(section).await;
    }

    ////////////////////////////////////////
    // Epoch
    ////////////////////////////////////////

    /// Updates the epoch to the coordinator's current value, and sets the refresh epoch deadline.
    fn refresh_epoch(&mut self) {
        // Set the refresh epoch deadline
        self.refresh_epoch_deadline = Some(self.context.current() + self.refresh_epoch_timeout);

        // Ensure the epoch does not move backwards
        let epoch = self.coordinator.index();
        assert!(epoch >= self.epoch);

        // Update the epoch
        self.epoch = epoch;
    }
}

/// Errors that can occur when running the actor.
#[derive(Error, Debug)]
enum Error {
    // Application Verification Errors
    #[error("Application verify error: {0}")]
    AppVerifyCanceled(oneshot::Canceled),
    #[error("Application verified no tip")]
    AppVerifiedNoTip,
    #[error("Application verified height mismatch")]
    AppVerifiedHeightMismatch,
    #[error("Application verified payload mismatch")]
    AppVerifiedPayloadMismatch,

    // P2P Errors
    #[error("Unable to send message")]
    UnableToSendMessage,

    // Broadcast Errors
    #[error("Already broadcast")]
    AlreadyBroadcast,
    #[error("I am not a sequencer in epoch {0}")]
    IAmNotASequencer(u64),
    #[error("Nothing to rebroadcast")]
    NothingToRebroadcast,
    #[error("Broadcast failed")]
    BroadcastFailed,
    #[error("No threshold for tip at height {0}")]
    NoThresholdForTip(u64),

    // Proto Malformed Errors
    #[error("Genesis chunk must not have a parent")]
    GenesisChunkMustNotHaveParent,
    #[error("Node missing parent")]
    NodeMissingParent,

    // Epoch Errors
    #[error("Unknown identity at epoch {0}")]
    UnknownIdentity(u64),
    #[error("Unknown signers at epoch {0}")]
    UnknownSigners(u64),
    #[error("Epoch {0} has no sequencer {1}")]
    UnknownSequencer(u64, String),
    #[error("Epoch {0} has no signer {1}")]
    UnknownSigner(u64, String),
    #[error("Unknown share at epoch {0}")]
    UnknownShare(u64),

    // Peer Errors
    #[error("Peer mismatch")]
    PeerMismatch,

    // Signature Errors
    #[error("Invalid threshold signature")]
    InvalidThresholdSignature,
    #[error("Invalid partial signature")]
    InvalidPartialSignature,
    #[error("Invalid node signature")]
    InvalidNodeSignature,

    // Ignorable Message Errors
    #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
    AckEpochOutsideBounds(u64, u64, u64),
    #[error("Invalid ack height {0} outside bounds {1} - {2}")]
    AckHeightOutsideBounds(u64, u64, u64),
    #[error("Chunk height {0} lower than tip height {1}")]
    ChunkHeightTooLow(u64, u64),

    // Slashable Errors
    #[error("Chunk mismatch from sender {0} with height {1}")]
    ChunkMismatch(String, u64),
}
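
#[cfg(test)]
mod illustration {
    // A minimal sketch (not exercising the actor itself) of the pure arithmetic
    // used above: the epoch/height acceptance windows in `validate_ack` and the
    // height-to-section mapping in `get_journal_section`. All values are assumed.

    #[test]
    fn ack_windows() {
        // Epoch window: current epoch 10 with bounds (1, 2) accepts epochs 9..=12.
        let (eb_lo, eb_hi) = (1u64, 2u64);
        let epoch = 10u64;
        let (lo, hi) = (epoch.saturating_sub(eb_lo), epoch.saturating_add(eb_hi));
        assert_eq!((lo, hi), (9, 12));

        // Height window: tip at height 100 with height_bound 10 accepts heights 100..=110.
        let (tip, height_bound) = (100u64, 10u64);
        let (lo, hi) = (tip, tip + height_bound);
        assert_eq!((lo, hi), (100, 110));
    }

    #[test]
    fn journal_sections() {
        // With 10 heights per section, heights map to sections by integer division.
        let heights_per_section = 10u64;
        let section = |height: u64| height / heights_per_section;
        assert_eq!(section(0), 0);
        assert_eq!(section(9), 0);
        assert_eq!(section(10), 1);
        assert_eq!(section(25), 2);
    }
}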