commonware_broadcast/linked/signer/actor.rs

use super::{AckManager, Config, Mailbox, Message, TipManager};
use crate::{
    linked::{namespace, parsed, prover::Prover, serializer, Context, Epoch},
    Application, Collector, ThresholdCoordinator,
};
use commonware_cryptography::{
    bls12381::primitives::{
        group::{self},
        ops,
        poly::{self},
    },
    Array, Scheme,
};
use commonware_macros::select;
use commonware_p2p::{Receiver, Recipients, Sender};
use commonware_runtime::{Blob, Clock, Spawner, Storage};
use commonware_storage::journal::{self, variable::Journal};
use futures::{
    channel::{mpsc, oneshot},
    future::{self, Either},
    pin_mut,
    stream::FuturesUnordered,
    StreamExt,
};
use prometheus_client::registry::Registry;
use std::{
    collections::BTreeMap,
    future::Future,
    marker::PhantomData,
    pin::Pin,
    sync::{Arc, Mutex},
    time::{Duration, SystemTime},
};
use thiserror::Error;
use tracing::{debug, error, info, warn};

/// A future representing a request to the application to verify a payload.
type VerifyFuture<D, P> =
    Pin<Box<dyn Future<Output = (Context<P>, D, Result<bool, Error>)> + Send>>;

/// The actor that implements the `Broadcaster` trait.
pub struct Actor<
    B: Blob,
    E: Clock + Spawner + Storage<B>,
    C: Scheme,
    D: Array,
    A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
    Z: Collector<Digest = D>,
    S: ThresholdCoordinator<
        Index = Epoch,
        Share = group::Share,
        Identity = poly::Public,
        PublicKey = C::PublicKey,
    >,
    NetS: Sender<PublicKey = C::PublicKey>,
    NetR: Receiver<PublicKey = C::PublicKey>,
> {
    ////////////////////////////////////////
    // Interfaces
    ////////////////////////////////////////
    runtime: E,
    crypto: C,
    coordinator: S,
    application: A,
    collector: Z,
    _sender: PhantomData<NetS>,
    _receiver: PhantomData<NetR>,

    ////////////////////////////////////////
    // Namespace Constants
    ////////////////////////////////////////

    // The namespace for chunk signatures.
    chunk_namespace: Vec<u8>,

    // The namespace for ack signatures.
    ack_namespace: Vec<u8>,

    ////////////////////////////////////////
    // Timeouts
    ////////////////////////////////////////

    // The configured timeout for refreshing the epoch.
    refresh_epoch_timeout: Duration,
    refresh_epoch_deadline: Option<SystemTime>,

    // The configured timeout for rebroadcasting a chunk to all signers.
    rebroadcast_timeout: Duration,
    rebroadcast_deadline: Option<SystemTime>,

    ////////////////////////////////////////
    // Pruning
    ////////////////////////////////////////

    // A tuple representing the epochs to keep in memory.
    // The first element is the number of old epochs to keep.
    // The second element is the number of future epochs to accept.
    //
    // For example, if the current epoch is 10, and the bounds are (1, 2), then
    // epochs 9, 10, 11, and 12 are kept (and accepted);
    // all others are pruned or rejected.
    epoch_bounds: (u64, u64),

    // The number of future heights to accept acks for.
    // This is used to prevent spam of acks for arbitrary heights.
    //
    // For example, if the current tip for a sequencer is at height 100,
    // and the height_bound is 10, then acks for heights 100-110 are accepted.
    height_bound: u64,

    ////////////////////////////////////////
    // Messaging
    ////////////////////////////////////////

    // A stream of futures.
    // Each future represents a verification request to the application
    // that will either time out or resolve with a boolean.
    pending_verifies: FuturesUnordered<VerifyFuture<D, C::PublicKey>>,

    // The maximum number of items in `pending_verifies`.
    pending_verify_size: usize,

    // The mailbox for receiving messages (primarily from the application).
    mailbox_receiver: mpsc::Receiver<Message<D>>,

    ////////////////////////////////////////
    // Storage
    ////////////////////////////////////////

    // The number of heights per journal section.
    journal_heights_per_section: u64,

    // The number of concurrent operations when replaying journals.
    journal_replay_concurrency: usize,

    // A prefix for the journal names.
    // The rest of the name is the hex-encoded public key of the relevant sequencer.
    journal_name_prefix: String,

    // A map of sequencer public keys to their journals.
    journals: BTreeMap<C::PublicKey, Journal<B, E>>,

    ////////////////////////////////////////
    // State
    ////////////////////////////////////////

    // Tracks the current tip for each sequencer.
    // The tip is a `Node` that consists of a `Chunk` and,
    // if not the genesis chunk for that sequencer,
    // a threshold signature over the parent chunk.
    tip_manager: TipManager<C, D>,

    // Tracks the acknowledgements for chunks.
    // These consist of partial signatures or threshold signatures.
    ack_manager: AckManager<D, C::PublicKey>,

    // The current epoch.
    epoch: Epoch,
}

impl<
        B: Blob,
        E: Clock + Spawner + Storage<B>,
        C: Scheme,
        D: Array,
        A: Application<Context = Context<C::PublicKey>, Digest = D> + Clone,
        Z: Collector<Digest = D>,
        S: ThresholdCoordinator<
            Index = Epoch,
            Share = group::Share,
            Identity = poly::Public,
            PublicKey = C::PublicKey,
        >,
        NetS: Sender<PublicKey = C::PublicKey>,
        NetR: Receiver<PublicKey = C::PublicKey>,
    > Actor<B, E, C, D, A, Z, S, NetS, NetR>
{
    /// Creates a new actor with the given runtime and configuration.
    /// Returns the actor and a mailbox for sending messages to the actor.
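    ///
    /// A minimal construction sketch (illustrative only: `runtime` and `cfg` are
    /// assumed to be built elsewhere, and the variable names are not part of this API):
    ///
    /// ```ignore
    /// // `cfg` is a fully-populated `Config` (crypto, coordinator, application,
    /// // collector, namespace, timeouts, bounds, journal settings, ...).
    /// let (actor, mailbox) = Actor::new(runtime, cfg);
    /// // `mailbox` is handed to the application so it can send `Message`s
    /// // (e.g. broadcast requests) to the actor.
    /// ```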
    pub fn new(runtime: E, cfg: Config<C, D, A, Z, S>) -> (Self, Mailbox<D>) {
        let (mailbox_sender, mailbox_receiver) = mpsc::channel(cfg.mailbox_size);
        let mailbox = Mailbox::new(mailbox_sender);

        // Initialize the pending verification futures.
        let pending_verifies = FuturesUnordered::new();
        {
            // Inserts a dummy future (that never resolves) to prevent the stream from being empty.
            // `select_next_some()` never yields `None`; if the stream were empty, its branch
            // would complete instantly, front-running all other branches in the `select!` macro.
            let dummy: VerifyFuture<D, C::PublicKey> = Box::pin(async {
                future::pending::<(Context<C::PublicKey>, D, Result<bool, Error>)>().await
            });
            pending_verifies.push(dummy);
        }

        let result = Self {
            runtime,
            crypto: cfg.crypto,
            _sender: PhantomData,
            _receiver: PhantomData,
            coordinator: cfg.coordinator,
            application: cfg.application,
            collector: cfg.collector,
            chunk_namespace: namespace::chunk(&cfg.namespace),
            ack_namespace: namespace::ack(&cfg.namespace),
            refresh_epoch_timeout: cfg.refresh_epoch_timeout,
            refresh_epoch_deadline: None,
            rebroadcast_timeout: cfg.rebroadcast_timeout,
            rebroadcast_deadline: None,
            epoch_bounds: cfg.epoch_bounds,
            height_bound: cfg.height_bound,
            pending_verifies,
            pending_verify_size: cfg.pending_verify_size,
            mailbox_receiver,
            journal_heights_per_section: cfg.journal_heights_per_section,
            journal_replay_concurrency: cfg.journal_replay_concurrency,
            journal_name_prefix: cfg.journal_name_prefix,
            journals: BTreeMap::new(),
            tip_manager: TipManager::<C, D>::new(),
            ack_manager: AckManager::<D, C::PublicKey>::new(),
            epoch: 0,
        };

        (result, mailbox)
    }

    /// Runs the actor until the runtime is stopped.
    ///
    /// The actor will handle:
    /// - Timeouts
    ///   - Refreshing the Epoch
    ///   - Rebroadcasting Nodes
    /// - Mailbox messages from the application:
    ///   - Broadcast requests
    ///   - Ack requests
    /// - Messages from the network:
    ///   - Nodes
    ///   - Acks
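    ///
    /// A wiring sketch (the two channel pairs are assumed to come from the p2p
    /// layer; the names are illustrative):
    ///
    /// ```ignore
    /// // Each network is a (Sender, Receiver) pair registered for that message type.
    /// actor
    ///     .run((node_sender, node_receiver), (ack_sender, ack_receiver))
    ///     .await;
    /// ```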
    pub async fn run(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) {
        let (mut node_sender, mut node_receiver) = chunk_network;
        let (mut ack_sender, mut ack_receiver) = ack_network;
        let mut shutdown = self.runtime.stopped();

        // Before starting the main runtime loop, initialize my own sequencer journal
        // and attempt to rebroadcast if necessary.
        self.refresh_epoch();
        self.journal_prepare(&self.crypto.public_key()).await;
        if let Err(err) = self.rebroadcast(&mut node_sender).await {
            // Rebroadcasting may return a non-critical error, so log the error and continue.
            info!(?err, "initial rebroadcast failed");
        }

        loop {
            // Enter the epoch
            self.refresh_epoch();

            // Create deadline futures.
            // If the deadline is None, the future will never resolve.
            let refresh_epoch = match self.refresh_epoch_deadline {
                Some(deadline) => Either::Left(self.runtime.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };
            let rebroadcast = match self.rebroadcast_deadline {
                Some(deadline) => Either::Left(self.runtime.sleep_until(deadline)),
                None => Either::Right(future::pending()),
            };

            select! {
                // Handle shutdown signal
                _ = &mut shutdown => {
                    debug!("shutdown");
                    while let Some((_, journal)) = self.journals.pop_first() {
                        journal.close().await.expect("unable to close journal");
                    }
                    return;
                },

                // Handle refresh epoch deadline
                _ = refresh_epoch => {
                    debug!("refresh epoch");
                    // Simply continue; the epoch will be refreshed on the next iteration.
                    continue;
                },

                // Handle rebroadcast deadline
                _ = rebroadcast => {
                    debug!("rebroadcast");
                    if let Err(err) = self.rebroadcast(&mut node_sender).await {
                        info!(?err, "rebroadcast failed");
                        continue;
                    }
                },

                // Handle incoming nodes
                msg = node_receiver.recv() => {
                    debug!("node network");
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            error!(?err, "node receiver failed");
                            break;
                        }
                    };
                    let node = match parsed::Node::<C, D>::decode(&msg) {
                        Ok(node) => node,
                        Err(err) => {
                            warn!(?err, ?sender, "node decode failed");
                            continue;
                        }
                    };
                    if let Err(err) = self.validate_node(&node, &sender) {
                        warn!(?err, ?node, ?sender, "node validate failed");
                        continue;
                    };

                    // Initialize journal for sequencer if it does not exist
                    self.journal_prepare(&sender).await;

                    // Handle the parent threshold signature
                    if let Some(parent) = node.parent.as_ref() {
                        self.handle_threshold(&node.chunk, parent.epoch, parent.threshold).await;
                    }

                    // Process the new node
                    self.handle_node(&node).await;
                },

                // Handle incoming acks
                msg = ack_receiver.recv() => {
                    debug!("ack network");
                    // Error handling
                    let (sender, msg) = match msg {
                        Ok(r) => r,
                        Err(err) => {
                            warn!(?err, "ack receiver failed");
                            break;
                        }
                    };
                    let ack = match parsed::Ack::decode(&msg) {
                        Ok(ack) => ack,
                        Err(err) => {
                            warn!(?err, ?sender, "ack decode failed");
                            continue;
                        }
                    };
                    if let Err(err) = self.validate_ack(&ack, &sender) {
                        warn!(?err, ?ack, ?sender, "ack validate failed");
                        continue;
                    };
                    if let Err(err) = self.handle_ack(&ack).await {
                        warn!(?err, ?ack, "ack handle failed");
                        continue;
                    }
                },

                // Handle completed verification futures.
                maybe_verified = self.pending_verifies.select_next_some() => {
                    let (context, digest, verify_result) = maybe_verified;
                    match verify_result {
                        Ok(true) => {
                            debug!(?context, ?digest, "verified");
                            if let Err(err) = self.handle_app_verified(&context, &digest, &mut ack_sender).await {
                                warn!(?err, ?context, ?digest, "verified handle failed");
                            }
                        },
                        Ok(false) => {
                            warn!(?context, ?digest, "verified was false");
                        },
                        Err(err) => {
                            warn!(?err, ?context, ?digest, "verified returned error");
                        },
                    }
                },

                // Handle mailbox messages
                mail = self.mailbox_receiver.next() => {
                    let Some(msg) = mail else {
                        error!("mailbox receiver failed");
                        break;
                    };
                    match msg {
                        Message::Broadcast{ payload, result } => {
                            debug!("broadcast");
                            if self.coordinator.is_sequencer(self.epoch, &self.crypto.public_key()).is_none() {
                                warn!(epoch=?self.epoch, ?payload, "not a sequencer");
                                continue;
                            }

                            // Broadcast the message
                            if let Err(err) = self.broadcast_new(payload, result, &mut node_sender).await {
                                warn!(?err, "broadcast new failed");
                                continue;
                            }
                        }
                    }
                }
            }
        }
    }

    ////////////////////////////////////////
    // Handling
    ////////////////////////////////////////

    /// Handles a verified message from the application.
    ///
    /// This is called when the application has verified a payload.
    /// The chunk will be signed if it matches the current tip.
    async fn handle_app_verified(
        &mut self,
        context: &Context<C::PublicKey>,
        payload: &D,
        ack_sender: &mut NetS,
    ) -> Result<(), Error> {
        // Get the tip
        let Some(tip) = self.tip_manager.get(&context.sequencer) else {
            return Err(Error::AppVerifiedNoTip);
        };

        // Return early if the height does not match
        if tip.chunk.height != context.height {
            return Err(Error::AppVerifiedHeightMismatch);
        }

        // Return early if the payload digest does not match
        if tip.chunk.payload != *payload {
            return Err(Error::AppVerifiedPayloadMismatch);
        }

        // Construct partial signature
        let Some(share) = self.coordinator.share(self.epoch) else {
            return Err(Error::UnknownShare(self.epoch));
        };
        let partial = ops::partial_sign_message(
            share,
            Some(&self.ack_namespace),
            &serializer::ack(&tip.chunk, self.epoch),
        );

        // Sync the journal to prevent ever acking two conflicting chunks at
        // the same height, even if the node crashes and restarts.
        self.journal_sync(&context.sequencer, context.height).await;

        // The recipients are all the signers in the epoch and the sequencer.
        // The sequencer may or may not be a signer.
        let recipients = {
            let Some(signers) = self.coordinator.signers(self.epoch) else {
                return Err(Error::UnknownSigners(self.epoch));
            };
            let mut recipients = signers.clone();
            if self
                .coordinator
                .is_signer(self.epoch, &tip.chunk.sequencer)
                .is_none()
            {
                recipients.push(tip.chunk.sequencer.clone());
            }
            recipients
        };

        // Send the ack to the network
        let ack = parsed::Ack {
            chunk: tip.chunk,
            epoch: self.epoch,
            partial,
        };
        ack_sender
            .send(Recipients::Some(recipients), ack.encode().into(), false)
            .await
            .map_err(|_| Error::UnableToSendMessage)?;

        // Handle the ack internally
        self.handle_ack(&ack).await?;

        Ok(())
    }

    /// Handles a threshold, either received from a `Node` from the network or generated locally.
    ///
    /// The threshold must already be verified.
    /// If the threshold is new, it is stored and the proof is emitted to the collector.
    /// If the threshold is already known, it is ignored.
    async fn handle_threshold(
        &mut self,
        chunk: &parsed::Chunk<D, C::PublicKey>,
        epoch: Epoch,
        threshold: group::Signature,
    ) {
        // Set the threshold signature, returning early if it already exists
        if !self
            .ack_manager
            .add_threshold(&chunk.sequencer, chunk.height, epoch, threshold)
        {
            return;
        }

        // Emit the proof
        let context = Context {
            sequencer: chunk.sequencer.clone(),
            height: chunk.height,
        };
        let proof =
            Prover::<C, D>::serialize_threshold(&context, &chunk.payload, epoch, &threshold);
        self.collector
            .acknowledged(proof, chunk.payload.clone())
            .await;
    }

    /// Handles an ack
    ///
    /// Returns an error if the ack is invalid, or can be ignored
    /// (e.g. already exists, threshold already exists, is outside the epoch bounds, etc.).
    async fn handle_ack(&mut self, ack: &parsed::Ack<D, C::PublicKey>) -> Result<(), Error> {
        // Get the quorum
        let Some(identity) = self.coordinator.identity(ack.epoch) else {
            return Err(Error::UnknownIdentity(ack.epoch));
        };
        let quorum = identity.required();

        // Add the partial signature. If a new threshold is formed, handle it.
        if let Some(threshold) = self.ack_manager.add_ack(ack, quorum) {
            // Handle the threshold signature
            self.handle_threshold(&ack.chunk, ack.epoch, threshold)
                .await;
        }

        Ok(())
    }

    /// Handles a valid `Node` message, storing it as the tip.
    /// Alerts the application of the new node.
    /// Also appends the `Node` to the journal if it's new.
    async fn handle_node(&mut self, node: &parsed::Node<C, D>) {
        // Store the tip
        let is_new = self.tip_manager.put(node);

        // Append to journal if the `Node` is new, making sure to sync the journal
        // to prevent sending two conflicting chunks to the application, even if
        // the node crashes and restarts.
        if is_new {
            self.journal_append(node).await;
            self.journal_sync(&node.chunk.sequencer, node.chunk.height)
                .await;
        }

        // Ignore sending the verification request to the application
        // if the number of pending requests is too high.
        // We use > instead of >= because the stream always has one dummy future.
        if self.pending_verifies.len() > self.pending_verify_size {
            warn!(n=?self.pending_verifies.len(), "too many pending verifies");
            return;
        }

        // Verify the chunk with the application
        let context = Context {
            sequencer: node.chunk.sequencer.clone(),
            height: node.chunk.height,
        };
        let payload = node.chunk.payload.clone();
        let mut app_clone = self.application.clone();
        let verify_future = Box::pin(async move {
            let receiver = app_clone.verify(context.clone(), payload.clone()).await;
            let result = receiver.await.map_err(Error::AppVerifyCanceled);
            (context, payload, result)
        });

        // Save the verification future
        self.pending_verifies.push(verify_future);
    }

    ////////////////////////////////////////
    // Broadcasting
    ////////////////////////////////////////

    /// Broadcast a message to the network.
    ///
    /// The result is returned to the caller via the provided channel.
    /// The broadcast is only successful if the parent Chunk and threshold signature are known.
    async fn broadcast_new(
        &mut self,
        payload: D,
        result: oneshot::Sender<bool>,
        node_sender: &mut NetS,
    ) -> Result<(), Error> {
        let me = self.crypto.public_key();

        // Get parent Chunk and threshold signature
        let mut height = 0;
        let mut parent = None;
        if let Some(tip) = self.tip_manager.get(&me) {
            // Get threshold, or, if it doesn't exist, return an error
            let Some((epoch, threshold)) = self.ack_manager.get_threshold(&me, tip.chunk.height)
            else {
                let _ = result.send(false);
                return Err(Error::NoThresholdForTip(tip.chunk.height));
            };

            // Update height and parent
            height = tip.chunk.height + 1;
            parent = Some(parsed::Parent {
                payload: tip.chunk.payload,
                threshold,
                epoch,
            });
        }

        // Construct new node
        let chunk = parsed::Chunk {
            sequencer: me.clone(),
            height,
            payload,
        };
        let signature = self
            .crypto
            .sign(Some(&self.chunk_namespace), &serializer::chunk(&chunk));
        let node = parsed::Node::<C, D> {
            chunk,
            signature,
            parent,
        };

        // Deal with the chunk as if it were received over the network
        self.handle_node(&node).await;

        // Sync the journal to prevent ever broadcasting two conflicting chunks
        // at the same height, even if the node crashes and restarts
        self.journal_sync(&me, height).await;

        // Broadcast to network
        if let Err(err) = self.broadcast(&node, node_sender, self.epoch).await {
            let _ = result.send(false);
            return Err(err);
        };

        // Return success
        let _ = result.send(true);
        Ok(())
    }

    /// Attempt to rebroadcast the highest-height chunk of this sequencer to all signers.
    ///
    /// This is only done if:
    /// - this instance is the sequencer for the current epoch.
    /// - this instance has a chunk to rebroadcast.
    /// - this instance has not yet collected the threshold signature for the chunk.
    async fn rebroadcast(&mut self, node_sender: &mut NetS) -> Result<(), Error> {
        // Unset the rebroadcast deadline
        self.rebroadcast_deadline = None;

        // Return if not a sequencer in the current epoch
        let me = self.crypto.public_key();
        if self.coordinator.is_sequencer(self.epoch, &me).is_none() {
            return Err(Error::IAmNotASequencer(self.epoch));
        }

        // Return if no chunk to rebroadcast
        let Some(tip) = self.tip_manager.get(&me) else {
            return Err(Error::NothingToRebroadcast);
        };

        // Return if threshold already collected
        if self
            .ack_manager
            .get_threshold(&me, tip.chunk.height)
            .is_some()
        {
            return Err(Error::AlreadyBroadcast);
        }

        // Broadcast the message, which resets the rebroadcast deadline
        self.broadcast(&tip, node_sender, self.epoch).await?;

        Ok(())
    }

    /// Send a `Node` message to all signers in the given epoch.
    async fn broadcast(
        &mut self,
        node: &parsed::Node<C, D>,
        node_sender: &mut NetS,
        epoch: Epoch,
    ) -> Result<(), Error> {
        // Send the node to all signers
        let Some(signers) = self.coordinator.signers(epoch) else {
            return Err(Error::UnknownSigners(epoch));
        };
        node_sender
            .send(
                Recipients::Some(signers.clone()),
                node.encode().into(),
                false,
            )
            .await
            .map_err(|_| Error::BroadcastFailed)?;

        // Set the rebroadcast deadline
        self.rebroadcast_deadline = Some(self.runtime.current() + self.rebroadcast_timeout);

        Ok(())
    }

    ////////////////////////////////////////
    // Validation
    ////////////////////////////////////////

    /// Takes a raw `Node` (from sender) from the p2p network and validates it.
    ///
    /// Returns `Ok(())` if the `Node` is valid, including its implied parent chunk
    /// and threshold signature (if present); returns an error otherwise.
    fn validate_node(
        &mut self,
        node: &parsed::Node<C, D>,
        sender: &C::PublicKey,
    ) -> Result<(), Error> {
        // Verify the sender
        if node.chunk.sequencer != *sender {
            return Err(Error::PeerMismatch);
        }

        // Optimization: If the node is exactly equal to the tip,
        // don't perform any further validation.
        if let Some(tip) = self.tip_manager.get(sender) {
            if tip == *node {
                return Ok(());
            }
        }

        // Validate chunk
        self.validate_chunk(&node.chunk, self.epoch)?;

        // Verify the signature
        if !C::verify(
            Some(&self.chunk_namespace),
            &serializer::chunk(&node.chunk),
            sender,
            &node.signature,
        ) {
            return Err(Error::InvalidNodeSignature);
        }

        // Verify no parent
        if node.chunk.height == 0 {
            if node.parent.is_some() {
                return Err(Error::GenesisChunkMustNotHaveParent);
            }
            return Ok(());
        }

        // Verify parent
        let Some(parent) = &node.parent else {
            return Err(Error::NodeMissingParent);
        };
        let parent_chunk = parsed::Chunk {
            sequencer: sender.clone(),
            height: node.chunk.height.checked_sub(1).unwrap(),
            payload: parent.payload.clone(),
        };

        // Verify parent threshold signature
        let Some(identity) = self.coordinator.identity(parent.epoch) else {
            return Err(Error::UnknownIdentity(parent.epoch));
        };
        let public_key = poly::public(identity);
        ops::verify_message(
            &public_key,
            Some(&self.ack_namespace),
            &serializer::ack(&parent_chunk, parent.epoch),
            &parent.threshold,
        )
        .map_err(|_| Error::InvalidThresholdSignature)?;

        Ok(())
    }

    /// Takes a raw ack (from sender) from the p2p network and validates it.
    ///
    /// Returns `Ok(())` if the ack is valid.
    /// Returns an error if the ack is invalid.
    fn validate_ack(
        &self,
        ack: &parsed::Ack<D, C::PublicKey>,
        sender: &C::PublicKey,
    ) -> Result<(), Error> {
        // Validate chunk
        self.validate_chunk(&ack.chunk, ack.epoch)?;

        // Validate sender
        let Some(signer_index) = self.coordinator.is_signer(ack.epoch, sender) else {
            return Err(Error::UnknownSigner(ack.epoch, sender.to_string()));
        };
        if signer_index != ack.partial.index {
            return Err(Error::PeerMismatch);
        }

        // Spam prevention: If the ack is for an epoch that is too old or too new, ignore.
        {
            let (eb_lo, eb_hi) = self.epoch_bounds;
            let bound_lo = self.epoch.saturating_sub(eb_lo);
            let bound_hi = self.epoch.saturating_add(eb_hi);
            if ack.epoch < bound_lo || ack.epoch > bound_hi {
                return Err(Error::AckEpochOutsideBounds(ack.epoch, bound_lo, bound_hi));
            }
        }

        // Spam prevention: If the ack is for a height that is too old or too new, ignore.
        {
            let bound_lo = self
                .tip_manager
                .get(&ack.chunk.sequencer)
                .map(|t| t.chunk.height)
                .unwrap_or(0);
            let bound_hi = bound_lo + self.height_bound;
            if ack.chunk.height < bound_lo || ack.chunk.height > bound_hi {
                return Err(Error::AckHeightOutsideBounds(
                    ack.chunk.height,
                    bound_lo,
                    bound_hi,
                ));
            }
        }

        // Validate partial signature
        // Optimization: If the ack already exists, don't verify
        let Some(identity) = self.coordinator.identity(ack.epoch) else {
            return Err(Error::UnknownIdentity(ack.epoch));
        };
        ops::partial_verify_message(
            identity,
            Some(&self.ack_namespace),
            &serializer::ack(&ack.chunk, ack.epoch),
            &ack.partial,
        )
        .map_err(|_| Error::InvalidPartialSignature)?;

        Ok(())
    }

    /// Takes a raw chunk from the p2p network and validates it against the epoch.
    ///
    /// Returns `Ok(())` if the chunk is valid.
    /// Returns an error if the chunk is invalid.
    fn validate_chunk(
        &self,
        chunk: &parsed::Chunk<D, C::PublicKey>,
        epoch: Epoch,
    ) -> Result<(), Error> {
        // Verify sequencer
        if self
            .coordinator
            .is_sequencer(epoch, &chunk.sequencer)
            .is_none()
        {
            return Err(Error::UnknownSequencer(epoch, chunk.sequencer.to_string()));
        }

        // Verify height
        if let Some(tip) = self.tip_manager.get(&chunk.sequencer) {
            // Height must be at least the tip height
            match chunk.height.cmp(&tip.chunk.height) {
                std::cmp::Ordering::Less => {
                    return Err(Error::ChunkHeightTooLow(chunk.height, tip.chunk.height));
                }
                std::cmp::Ordering::Equal => {
                    // Ensure this matches the tip if the height is the same
                    if tip.chunk.payload != chunk.payload {
                        return Err(Error::ChunkMismatch(
                            chunk.sequencer.to_string(),
                            chunk.height,
                        ));
                    }
                }
                std::cmp::Ordering::Greater => {}
            }
        }

        Ok(())
    }

    ////////////////////////////////////////
    // Journal
    ////////////////////////////////////////

    /// Returns the section of the journal for the given height.
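    ///
    /// A worked example (illustrative numbers only), with
    /// `journal_heights_per_section = 10`:
    ///
    /// ```ignore
    /// // 25 / 10 == 2, so height 25 lands in section 2;
    /// // heights 0..=9 land in section 0.
    /// assert_eq!(actor.get_journal_section(25), 2);
    /// ```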
    fn get_journal_section(&self, height: u64) -> u64 {
        height / self.journal_heights_per_section
    }

    /// Ensures the journal exists and is initialized for the given sequencer.
    /// If the journal does not exist, it is created and replayed.
    /// Otherwise, no action is taken.
    async fn journal_prepare(&mut self, sequencer: &C::PublicKey) {
        // Return early if the journal already exists
        if self.journals.contains_key(sequencer) {
            return;
        }

        // Initialize journal
        let cfg = journal::variable::Config {
            registry: Arc::new(Mutex::new(Registry::default())),
            partition: format!("{}{}", &self.journal_name_prefix, sequencer),
        };
        let mut journal = Journal::init(self.runtime.clone(), cfg)
            .await
            .expect("unable to init journal");

        // Replay journal
        {
            debug!(?sequencer, "journal replay begin");

            // Prepare the stream
            let stream = journal
                .replay(self.journal_replay_concurrency, None)
                .await
                .expect("unable to replay journal");
            pin_mut!(stream);

            // Read from the stream, which may yield items in arbitrary order.
            // Remember the node with the highest height.
            let mut tip: Option<parsed::Node<C, D>> = None;
            let mut num_items = 0;
            while let Some(msg) = stream.next().await {
                num_items += 1;
                let (_, _, _, msg) = msg.expect("unable to decode journal message");
                let node = parsed::Node::<C, D>::decode(&msg)
                    .expect("journal message is unexpected format");
                let height = node.chunk.height;
                match tip {
                    None => {
                        tip = Some(node);
                    }
                    Some(ref t) => {
                        if height > t.chunk.height {
                            tip = Some(node);
                        }
                    }
                }
            }

            // Set the tip only once. The items from the journal may be in arbitrary order,
            // and the tip manager will panic if inserting tips out-of-order.
            if let Some(node) = tip.take() {
                let is_new = self.tip_manager.put(&node);
                assert!(is_new);
            }

            debug!(?sequencer, ?num_items, "journal replay end");
        }

        // Store journal
        self.journals.insert(sequencer.clone(), journal);
    }

    /// Writes a `Node` to the appropriate journal, which contains the tip `Chunk` for the sequencer.
    ///
    /// To prevent ever writing two conflicting `Chunk`s at the same height,
    /// the journal must already be open and replayed.
    async fn journal_append(&mut self, node: &parsed::Node<C, D>) {
        let section = self.get_journal_section(node.chunk.height);
        self.journals
            .get_mut(&node.chunk.sequencer)
            .expect("journal does not exist")
            .append(section, node.encode().into())
            .await
            .expect("unable to append to journal");
    }

    /// Syncs (ensures all data is written to disk) and prunes the journal for the given sequencer and height.
    async fn journal_sync(&mut self, sequencer: &C::PublicKey, height: u64) {
        let section = self.get_journal_section(height);

        // Get journal
        let journal = self
            .journals
            .get_mut(sequencer)
            .expect("journal does not exist");

        // Sync journal
        journal.sync(section).await.expect("unable to sync journal");

        // Prune journal, ignoring errors
        let _ = journal.prune(section).await;
    }

    ////////////////////////////////////////
    // Epoch
    ////////////////////////////////////////

    /// Updates the epoch to the value of the coordinator, and sets the refresh epoch deadline.
    fn refresh_epoch(&mut self) {
        // Set the refresh epoch deadline
        self.refresh_epoch_deadline = Some(self.runtime.current() + self.refresh_epoch_timeout);

        // Ensure epoch is not before the current epoch
        let epoch = self.coordinator.index();
        assert!(epoch >= self.epoch);

        // Update the epoch
        self.epoch = epoch;
    }
}

/// Errors that can occur when running the actor.
#[derive(Error, Debug)]
enum Error {
    // Application Verification Errors
    #[error("Application verify error: {0}")]
    AppVerifyCanceled(oneshot::Canceled),
    #[error("Application verified no tip")]
    AppVerifiedNoTip,
    #[error("Application verified height mismatch")]
    AppVerifiedHeightMismatch,
    #[error("Application verified payload mismatch")]
    AppVerifiedPayloadMismatch,

    // P2P Errors
    #[error("Unable to send message")]
    UnableToSendMessage,

    // Broadcast Errors
    #[error("Already broadcast")]
    AlreadyBroadcast,
    #[error("I am not a sequencer in epoch {0}")]
    IAmNotASequencer(u64),
    #[error("Nothing to rebroadcast")]
    NothingToRebroadcast,
    #[error("Broadcast failed")]
    BroadcastFailed,
    #[error("No threshold for tip at height {0}")]
    NoThresholdForTip(u64),

    // Proto Malformed Errors
    #[error("Genesis chunk must not have a parent")]
    GenesisChunkMustNotHaveParent,
    #[error("Node missing parent")]
    NodeMissingParent,

    // Epoch Errors
    #[error("Unknown identity at epoch {0}")]
    UnknownIdentity(u64),
    #[error("Unknown signers at epoch {0}")]
    UnknownSigners(u64),
    #[error("Epoch {0} has no sequencer {1}")]
    UnknownSequencer(u64, String),
    #[error("Epoch {0} has no signer {1}")]
    UnknownSigner(u64, String),
    #[error("Unknown share at epoch {0}")]
    UnknownShare(u64),

    // Peer Errors
    #[error("Peer mismatch")]
    PeerMismatch,

    // Signature Errors
    #[error("Invalid threshold signature")]
    InvalidThresholdSignature,
    #[error("Invalid partial signature")]
    InvalidPartialSignature,
    #[error("Invalid node signature")]
    InvalidNodeSignature,

    // Ignorable Message Errors
    #[error("Invalid ack epoch {0} outside bounds {1} - {2}")]
    AckEpochOutsideBounds(u64, u64, u64),
    #[error("Invalid ack height {0} outside bounds {1} - {2}")]
    AckHeightOutsideBounds(u64, u64, u64),
    #[error("Chunk height {0} lower than tip height {1}")]
    ChunkHeightTooLow(u64, u64),

    // Slashable Errors
    #[error("Chunk mismatch from sender {0} with height {1}")]
    ChunkMismatch(String, u64),
}
1073}