Skip to main content

commonware_consensus/marshal/
mod.rs

1//! Ordered delivery of finalized blocks.
2//!
3//! # Architecture
4//!
5//! The core of the module is the [actor::Actor]. It marshals the finalized blocks into order by:
6//!
7//! - Receiving uncertified blocks from a broadcast mechanism
8//! - Receiving notarizations and finalizations from consensus
9//! - Reconstructing a total order of finalized blocks
10//! - Providing a backfill mechanism for missing blocks
11//!
12//! The actor interacts with four main components:
13//! - [crate::Reporter]: Receives ordered, finalized blocks at-least-once
14//! - [crate::simplex]: Provides consensus messages
15//! - Application: Provides verified blocks
16//! - [commonware_broadcast::buffered]: Provides uncertified blocks received from the network
17//! - [commonware_resolver::Resolver]: Provides a backfill mechanism for missing blocks
18//!
19//! # Design
20//!
21//! ## Delivery
22//!
23//! The actor will deliver a block to the reporter at-least-once. The reporter should be prepared to
24//! handle duplicate deliveries. However the blocks will be in order.
25//!
26//! ## Finalization
27//!
28//! The actor uses a view-based model to track the state of the chain. Each view corresponds
29//! to a potential block in the chain. The actor will only finalize a block (and its ancestors)
30//! if it has a corresponding finalization from consensus.
31//!
32//! _It is possible that there may exist multiple finalizations for the same block in different views. Marshal
33//! only concerns itself with verifying a valid finalization exists for a block, not that a specific finalization
34//! exists. This means different Marshals may have different finalizations for the same block persisted to disk._
35//!
36//! ## Backfill
37//!
38//! The actor provides a backfill mechanism for missing blocks. If the actor notices a gap in its
39//! knowledge of finalized blocks, it will request the missing blocks from its peers. This ensures
40//! that the actor can catch up to the rest of the network if it falls behind.
41//!
42//! ## Storage
43//!
44//! The actor uses a combination of internal and external ([`store::Certificates`], [`store::Blocks`]) storage
45//! to store blocks and finalizations. Internal storage is used to store data that is only needed for a short
46//! period of time, such as unverified blocks or notarizations. External storage is used to
47//! store data that needs to be persisted indefinitely, such as finalized blocks.
48//!
49//! Marshal will store all blocks after a configurable starting height (or, floor) onward.
50//! This allows for state sync from a specific height rather than from genesis. When
51//! updating the starting height, marshal will attempt to prune blocks in external storage
52//! that are no longer needed, if the backing [`store::Blocks`] supports pruning.
53//!
54//! _Setting a configurable starting height will prevent others from backfilling blocks below said height. This
55//! feature is only recommended for applications that support state sync (i.e., those that don't require full
56//! block history to participate in consensus)._
57//!
58//! ## Limitations and Future Work
59//!
60//! - Only works with [crate::simplex] rather than general consensus.
61//! - Assumes at-most one notarization per view, incompatible with some consensus protocols.
62//! - Uses [`broadcast::buffered`](`commonware_broadcast::buffered`) for broadcasting and receiving
63//!   uncertified blocks from the network.
64
65pub mod actor;
66pub use actor::Actor;
67pub mod cache;
68pub mod config;
69pub use config::Config;
70pub mod ingress;
71pub use ingress::mailbox::Mailbox;
72pub mod resolver;
73pub mod store;
74
75use crate::{
76    types::{Height, Round},
77    Block,
78};
79use commonware_utils::{acknowledgement::Exact, Acknowledgement};
80
81/// An update reported to the application, either a new finalized tip or a finalized block.
82///
83/// Finalized tips are reported as soon as known, whether or not we hold all blocks up to that height.
84/// Finalized blocks are reported to the application in monotonically increasing order (no gaps permitted).
85#[derive(Clone, Debug)]
86pub enum Update<B: Block, A: Acknowledgement = Exact> {
87    /// A new finalized tip and the finalization round.
88    Tip(Round, Height, B::Commitment),
89    /// A new finalized block and an [Acknowledgement] for the application to signal once processed.
90    ///
91    /// To ensure all blocks are delivered at least once, marshal waits to mark a block as delivered
92    /// until the application explicitly acknowledges the update. If the [Acknowledgement] is dropped before
93    /// handling, marshal will exit (assuming the application is shutting down).
94    ///
95    /// Because the [Acknowledgement] is clonable, the application can pass [Update] to multiple consumers
96    /// (and marshal will only consider the block delivered once all consumers have acknowledged it).
97    Block(B, A),
98}
99
100#[cfg(test)]
101pub mod mocks;
102
103#[cfg(test)]
104mod tests {
105    use super::{
106        actor,
107        config::Config,
108        mocks::{application::Application, block::Block},
109        resolver::p2p as resolver,
110    };
111    use crate::{
112        application::marshaled::Marshaled,
113        marshal::ingress::mailbox::{AncestorStream, Identifier},
114        simplex::{
115            scheme::bls12381_threshold::vrf as bls12381_threshold_vrf,
116            types::{Activity, Context, Finalization, Finalize, Notarization, Notarize, Proposal},
117        },
118        types::{Epoch, Epocher, FixedEpocher, Height, Round, View, ViewDelta},
119        Automaton, CertifiableAutomaton, Heightable, Reporter, VerifyingApplication,
120    };
121    use commonware_broadcast::buffered;
122    use commonware_cryptography::{
123        bls12381::primitives::variant::MinPk,
124        certificate::{mocks::Fixture, ConstantProvider, Scheme as _},
125        ed25519::{PrivateKey, PublicKey},
126        sha256::{Digest as Sha256Digest, Sha256},
127        Committable, Digestible, Hasher as _, Signer,
128    };
129    use commonware_macros::{select, test_traced};
130    use commonware_p2p::{
131        simulated::{self, Link, Network, Oracle},
132        Manager,
133    };
134    use commonware_parallel::Sequential;
135    use commonware_runtime::{
136        buffer::paged::CacheRef, deterministic, Clock, Metrics, Quota, Runner,
137    };
138    use commonware_storage::{
139        archive::{immutable, prunable},
140        translator::EightCap,
141    };
142    use commonware_utils::{vec::NonEmptyVec, NZUsize, NZU16, NZU64};
143    use futures::StreamExt;
144    use rand::{
145        seq::{IteratorRandom, SliceRandom},
146        Rng,
147    };
148    use std::{
149        collections::BTreeMap,
150        num::{NonZeroU16, NonZeroU32, NonZeroU64, NonZeroUsize},
151        time::{Duration, Instant},
152    };
153    use tracing::info;
154
155    type D = Sha256Digest;
156    type K = PublicKey;
157    type Ctx = crate::simplex::types::Context<D, K>;
158    type B = Block<D, Ctx>;
159    type V = MinPk;
160    type S = bls12381_threshold_vrf::Scheme<K, V>;
161    type P = ConstantProvider<S, Epoch>;
162
163    /// Default leader key for tests.
164    fn default_leader() -> K {
165        PrivateKey::from_seed(0).public_key()
166    }
167
168    /// Create a test block with a derived context.
169    ///
170    /// The context is constructed with:
171    /// - Round: epoch 0, view = height
172    /// - Leader: default (all zeros)
173    /// - Parent: (view = height - 1, commitment = parent)
174    fn make_block(parent: D, height: Height, timestamp: u64) -> B {
175        let parent_view = height
176            .previous()
177            .map(|h| View::new(h.get()))
178            .unwrap_or(View::zero());
179        let context = Ctx {
180            round: Round::new(Epoch::zero(), View::new(height.get())),
181            leader: default_leader(),
182            parent: (parent_view, parent),
183        };
184        B::new::<Sha256>(context, parent, height, timestamp)
185    }
186
187    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
188    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
189    const NAMESPACE: &[u8] = b"test";
190    const NUM_VALIDATORS: u32 = 4;
191    const QUORUM: u32 = 3;
192    const NUM_BLOCKS: u64 = 160;
193    const BLOCKS_PER_EPOCH: NonZeroU64 = NZU64!(20);
194    const LINK: Link = Link {
195        latency: Duration::from_millis(100),
196        jitter: Duration::from_millis(1),
197        success_rate: 1.0,
198    };
199    const UNRELIABLE_LINK: Link = Link {
200        latency: Duration::from_millis(200),
201        jitter: Duration::from_millis(50),
202        success_rate: 0.7,
203    };
204    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
205
206    async fn setup_validator(
207        context: deterministic::Context,
208        oracle: &mut Oracle<K, deterministic::Context>,
209        validator: K,
210        provider: P,
211    ) -> (
212        Application<B>,
213        crate::marshal::ingress::mailbox::Mailbox<S, B>,
214        Height,
215    ) {
216        let config = Config {
217            provider,
218            epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
219            mailbox_size: 100,
220            view_retention_timeout: ViewDelta::new(10),
221            max_repair: NZUsize!(10),
222            block_codec_config: (),
223            partition_prefix: format!("validator-{}", validator.clone()),
224            prunable_items_per_section: NZU64!(10),
225            replay_buffer: NZUsize!(1024),
226            key_write_buffer: NZUsize!(1024),
227            value_write_buffer: NZUsize!(1024),
228            page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
229            strategy: Sequential,
230        };
231
232        // Create the resolver
233        let control = oracle.control(validator.clone());
234        let backfill = control.register(1, TEST_QUOTA).await.unwrap();
235        let resolver_cfg = resolver::Config {
236            public_key: validator.clone(),
237            provider: oracle.manager(),
238            blocker: control.clone(),
239            mailbox_size: config.mailbox_size,
240            initial: Duration::from_secs(1),
241            timeout: Duration::from_secs(2),
242            fetch_retry_timeout: Duration::from_millis(100),
243            priority_requests: false,
244            priority_responses: false,
245        };
246        let resolver = resolver::init(&context, resolver_cfg, backfill);
247
248        // Create a buffered broadcast engine and get its mailbox
249        let broadcast_config = buffered::Config {
250            public_key: validator.clone(),
251            mailbox_size: config.mailbox_size,
252            deque_size: 10,
253            priority: false,
254            codec_config: (),
255        };
256        let (broadcast_engine, buffer) = buffered::Engine::new(context.clone(), broadcast_config);
257        let network = control.register(2, TEST_QUOTA).await.unwrap();
258        broadcast_engine.start(network);
259
260        // Initialize finalizations by height
261        let start = Instant::now();
262        let finalizations_by_height = immutable::Archive::init(
263            context.with_label("finalizations_by_height"),
264            immutable::Config {
265                metadata_partition: format!(
266                    "{}-finalizations-by-height-metadata",
267                    config.partition_prefix
268                ),
269                freezer_table_partition: format!(
270                    "{}-finalizations-by-height-freezer-table",
271                    config.partition_prefix
272                ),
273                freezer_table_initial_size: 64,
274                freezer_table_resize_frequency: 10,
275                freezer_table_resize_chunk_size: 10,
276                freezer_key_partition: format!(
277                    "{}-finalizations-by-height-freezer-key",
278                    config.partition_prefix
279                ),
280                freezer_key_page_cache: config.page_cache.clone(),
281                freezer_value_partition: format!(
282                    "{}-finalizations-by-height-freezer-value",
283                    config.partition_prefix
284                ),
285                freezer_value_target_size: 1024,
286                freezer_value_compression: None,
287                ordinal_partition: format!(
288                    "{}-finalizations-by-height-ordinal",
289                    config.partition_prefix
290                ),
291                items_per_section: NZU64!(10),
292                codec_config: S::certificate_codec_config_unbounded(),
293                replay_buffer: config.replay_buffer,
294                freezer_key_write_buffer: config.key_write_buffer,
295                freezer_value_write_buffer: config.value_write_buffer,
296                ordinal_write_buffer: config.key_write_buffer,
297            },
298        )
299        .await
300        .expect("failed to initialize finalizations by height archive");
301        info!(elapsed = ?start.elapsed(), "restored finalizations by height archive");
302
303        // Initialize finalized blocks
304        let start = Instant::now();
305        let finalized_blocks = immutable::Archive::init(
306            context.with_label("finalized_blocks"),
307            immutable::Config {
308                metadata_partition: format!(
309                    "{}-finalized_blocks-metadata",
310                    config.partition_prefix
311                ),
312                freezer_table_partition: format!(
313                    "{}-finalized_blocks-freezer-table",
314                    config.partition_prefix
315                ),
316                freezer_table_initial_size: 64,
317                freezer_table_resize_frequency: 10,
318                freezer_table_resize_chunk_size: 10,
319                freezer_key_partition: format!(
320                    "{}-finalized_blocks-freezer-key",
321                    config.partition_prefix
322                ),
323                freezer_key_page_cache: config.page_cache.clone(),
324                freezer_value_partition: format!(
325                    "{}-finalized_blocks-freezer-value",
326                    config.partition_prefix
327                ),
328                freezer_value_target_size: 1024,
329                freezer_value_compression: None,
330                ordinal_partition: format!("{}-finalized_blocks-ordinal", config.partition_prefix),
331                items_per_section: NZU64!(10),
332                codec_config: config.block_codec_config,
333                replay_buffer: config.replay_buffer,
334                freezer_key_write_buffer: config.key_write_buffer,
335                freezer_value_write_buffer: config.value_write_buffer,
336                ordinal_write_buffer: config.key_write_buffer,
337            },
338        )
339        .await
340        .expect("failed to initialize finalized blocks archive");
341        info!(elapsed = ?start.elapsed(), "restored finalized blocks archive");
342
343        let (actor, mailbox, processed_height) = actor::Actor::init(
344            context.clone(),
345            finalizations_by_height,
346            finalized_blocks,
347            config,
348        )
349        .await;
350        let application = Application::<B>::default();
351
352        // Start the application
353        actor.start(application.clone(), buffer, resolver);
354
355        (application, mailbox, processed_height)
356    }
357
358    fn make_finalization(proposal: Proposal<D>, schemes: &[S], quorum: u32) -> Finalization<S, D> {
359        // Generate proposal signature
360        let finalizes: Vec<_> = schemes
361            .iter()
362            .take(quorum as usize)
363            .map(|scheme| Finalize::sign(scheme, proposal.clone()).unwrap())
364            .collect();
365
366        // Generate certificate signatures
367        Finalization::from_finalizes(&schemes[0], &finalizes, &Sequential).unwrap()
368    }
369
370    fn make_notarization(proposal: Proposal<D>, schemes: &[S], quorum: u32) -> Notarization<S, D> {
371        // Generate proposal signature
372        let notarizes: Vec<_> = schemes
373            .iter()
374            .take(quorum as usize)
375            .map(|scheme| Notarize::sign(scheme, proposal.clone()).unwrap())
376            .collect();
377
378        // Generate certificate signatures
379        Notarization::from_notarizes(&schemes[0], &notarizes, &Sequential).unwrap()
380    }
381
382    fn setup_network(
383        context: deterministic::Context,
384        tracked_peer_sets: Option<usize>,
385    ) -> Oracle<K, deterministic::Context> {
386        let (network, oracle) = Network::new(
387            context.with_label("network"),
388            simulated::Config {
389                max_size: 1024 * 1024,
390                disconnect_on_block: true,
391                tracked_peer_sets,
392            },
393        );
394        network.start();
395        oracle
396    }
397
398    async fn setup_network_links(
399        oracle: &mut Oracle<K, deterministic::Context>,
400        peers: &[K],
401        link: Link,
402    ) {
403        for p1 in peers.iter() {
404            for p2 in peers.iter() {
405                if p2 == p1 {
406                    continue;
407                }
408                let _ = oracle.add_link(p1.clone(), p2.clone(), link.clone()).await;
409            }
410        }
411    }
412
413    #[test_traced("WARN")]
414    fn test_finalize_good_links() {
415        for seed in 0..5 {
416            let result1 = finalize(seed, LINK, false);
417            let result2 = finalize(seed, LINK, false);
418
419            // Ensure determinism
420            assert_eq!(result1, result2);
421        }
422    }
423
424    #[test_traced("WARN")]
425    fn test_finalize_bad_links() {
426        for seed in 0..5 {
427            let result1 = finalize(seed, UNRELIABLE_LINK, false);
428            let result2 = finalize(seed, UNRELIABLE_LINK, false);
429
430            // Ensure determinism
431            assert_eq!(result1, result2);
432        }
433    }
434
435    #[test_traced("WARN")]
436    fn test_finalize_good_links_quorum_sees_finalization() {
437        for seed in 0..5 {
438            let result1 = finalize(seed, LINK, true);
439            let result2 = finalize(seed, LINK, true);
440
441            // Ensure determinism
442            assert_eq!(result1, result2);
443        }
444    }
445
446    #[test_traced("DEBUG")]
447    fn test_finalize_bad_links_quorum_sees_finalization() {
448        for seed in 0..5 {
449            let result1 = finalize(seed, UNRELIABLE_LINK, true);
450            let result2 = finalize(seed, UNRELIABLE_LINK, true);
451
452            // Ensure determinism
453            assert_eq!(result1, result2);
454        }
455    }
456
457    fn finalize(seed: u64, link: Link, quorum_sees_finalization: bool) -> String {
458        let runner = deterministic::Runner::new(
459            deterministic::Config::new()
460                .with_seed(seed)
461                .with_timeout(Some(Duration::from_secs(600))),
462        );
463        runner.start(|mut context| async move {
464            let mut oracle = setup_network(context.clone(), Some(3));
465            let Fixture {
466                participants,
467                schemes,
468                ..
469            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
470
471            // Initialize applications and actors
472            let mut applications = BTreeMap::new();
473            let mut actors = Vec::new();
474
475            // Register the initial peer set.
476            let mut manager = oracle.manager();
477            manager
478                .track(0, participants.clone().try_into().unwrap())
479                .await;
480            for (i, validator) in participants.iter().enumerate() {
481                let (application, actor, _processed_height) = setup_validator(
482                    context.with_label(&format!("validator_{i}")),
483                    &mut oracle,
484                    validator.clone(),
485                    ConstantProvider::new(schemes[i].clone()),
486                )
487                .await;
488                applications.insert(validator.clone(), application);
489                actors.push(actor);
490            }
491
492            // Add links between all peers
493            setup_network_links(&mut oracle, &participants, link.clone()).await;
494
495            // Generate blocks, skipping the genesis block.
496            let mut blocks = Vec::<B>::new();
497            let mut parent = Sha256::hash(b"");
498            for i in 1..=NUM_BLOCKS {
499                let block = make_block(parent, Height::new(i), i);
500                parent = block.digest();
501                blocks.push(block);
502            }
503
504            // Broadcast and finalize blocks in random order
505            let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
506            blocks.shuffle(&mut context);
507            for block in blocks.iter() {
508                // Skip genesis block
509                let height = block.height();
510                assert!(
511                    !height.is_zero(),
512                    "genesis block should not have been generated"
513                );
514
515                // Calculate the epoch and round for the block
516                let bounds = epocher.containing(height).unwrap();
517                let round = Round::new(bounds.epoch(), View::new(height.get()));
518
519                // Broadcast block by one validator
520                let actor_index: usize = (height.get() % (NUM_VALIDATORS as u64)) as usize;
521                let mut actor = actors[actor_index].clone();
522                actor.proposed(round, block.clone()).await;
523                actor.verified(round, block.clone()).await;
524
525                // Wait for the block to be broadcast, but due to jitter, we may or may not receive
526                // the block before continuing.
527                context.sleep(link.latency).await;
528
529                // Notarize block by the validator that broadcasted it
530                let proposal = Proposal {
531                    round,
532                    parent: View::new(height.previous().unwrap().get()),
533                    payload: block.digest(),
534                };
535                let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
536                actor
537                    .report(Activity::Notarization(notarization.clone()))
538                    .await;
539
540                // Finalize block by all validators
541                // Always finalize 1) the last block in each epoch 2) the last block in the chain.
542                let fin = make_finalization(proposal, &schemes, QUORUM);
543                if quorum_sees_finalization {
544                    // If `quorum_sees_finalization` is set, ensure at least `QUORUM` sees a finalization 20%
545                    // of the time.
546                    let do_finalize = context.gen_bool(0.2);
547                    for (i, actor) in actors
548                        .iter_mut()
549                        .choose_multiple(&mut context, NUM_VALIDATORS as usize)
550                        .iter_mut()
551                        .enumerate()
552                    {
553                        if (do_finalize && i < QUORUM as usize)
554                            || height == Height::new(NUM_BLOCKS)
555                            || height == bounds.last()
556                        {
557                            actor.report(Activity::Finalization(fin.clone())).await;
558                        }
559                    }
560                } else {
561                    // If `quorum_sees_finalization` is not set, finalize randomly with a 20% chance for each
562                    // individual participant.
563                    for actor in actors.iter_mut() {
564                        if context.gen_bool(0.2)
565                            || height == Height::new(NUM_BLOCKS)
566                            || height == bounds.last()
567                        {
568                            actor.report(Activity::Finalization(fin.clone())).await;
569                        }
570                    }
571                }
572            }
573
574            // Check that all applications received all blocks.
575            let mut finished = false;
576            while !finished {
577                // Avoid a busy loop
578                context.sleep(Duration::from_secs(1)).await;
579
580                // If not all validators have finished, try again
581                if applications.len() != NUM_VALIDATORS as usize {
582                    continue;
583                }
584                finished = true;
585                for app in applications.values() {
586                    if app.blocks().len() != NUM_BLOCKS as usize {
587                        finished = false;
588                        break;
589                    }
590                    let Some((height, _)) = app.tip() else {
591                        finished = false;
592                        break;
593                    };
594                    if height < Height::new(NUM_BLOCKS) {
595                        finished = false;
596                        break;
597                    }
598                }
599            }
600
601            // Return state
602            context.auditor().state()
603        })
604    }
605
606    #[test_traced("WARN")]
607    fn test_sync_height_floor() {
608        let runner = deterministic::Runner::new(
609            deterministic::Config::new()
610                .with_seed(0xFF)
611                .with_timeout(Some(Duration::from_secs(300))),
612        );
613        runner.start(|mut context| async move {
614            let mut oracle = setup_network(context.clone(), Some(3));
615            let Fixture {
616                participants,
617                schemes,
618                ..
619            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
620
621            // Initialize applications and actors
622            let mut applications = BTreeMap::new();
623            let mut actors = Vec::new();
624
625            // Register the initial peer set.
626            let mut manager = oracle.manager();
627            manager
628                .track(0, participants.clone().try_into().unwrap())
629                .await;
630            for (i, validator) in participants.iter().enumerate().skip(1) {
631                let (application, actor, _processed_height) = setup_validator(
632                    context.with_label(&format!("validator_{i}")),
633                    &mut oracle,
634                    validator.clone(),
635                    ConstantProvider::new(schemes[i].clone()),
636                )
637                .await;
638                applications.insert(validator.clone(), application);
639                actors.push(actor);
640            }
641
642            // Add links between all peers except for the first, to guarantee
643            // the first peer does not receive any blocks during broadcast.
644            setup_network_links(&mut oracle, &participants[1..], LINK).await;
645
646            // Generate blocks, skipping the genesis block.
647            let mut blocks = Vec::<B>::new();
648            let mut parent = Sha256::hash(b"");
649            for i in 1..=NUM_BLOCKS {
650                let block = make_block(parent, Height::new(i), i);
651                parent = block.digest();
652                blocks.push(block);
653            }
654
655            // Broadcast and finalize blocks
656            let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
657            for block in blocks.iter() {
658                // Skip genesis block
659                let height = block.height();
660                assert!(
661                    !height.is_zero(),
662                    "genesis block should not have been generated"
663                );
664
665                // Calculate the epoch and round for the block
666                let bounds = epocher.containing(height).unwrap();
667                let round = Round::new(bounds.epoch(), View::new(height.get()));
668
669                // Broadcast block by one validator
670                let actor_index: usize = (height.get() % (applications.len() as u64)) as usize;
671                let mut actor = actors[actor_index].clone();
672                actor.proposed(round, block.clone()).await;
673                actor.verified(round, block.clone()).await;
674
675                // Wait for the block to be broadcast, but due to jitter, we may or may not receive
676                // the block before continuing.
677                context.sleep(LINK.latency).await;
678
679                // Notarize block by the validator that broadcasted it
680                let proposal = Proposal {
681                    round,
682                    parent: View::new(height.previous().unwrap().get()),
683                    payload: block.digest(),
684                };
685                let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
686                actor
687                    .report(Activity::Notarization(notarization.clone()))
688                    .await;
689
690                // Finalize block by all validators except for the first.
691                let fin = make_finalization(proposal, &schemes, QUORUM);
692                for actor in actors.iter_mut() {
693                    actor.report(Activity::Finalization(fin.clone())).await;
694                }
695            }
696
697            // Check that all applications (except for the first) received all blocks.
698            let mut finished = false;
699            while !finished {
700                // Avoid a busy loop
701                context.sleep(Duration::from_secs(1)).await;
702
703                // If not all validators have finished, try again
704                finished = true;
705                for app in applications.values().skip(1) {
706                    if app.blocks().len() != NUM_BLOCKS as usize {
707                        finished = false;
708                        break;
709                    }
710                    let Some((height, _)) = app.tip() else {
711                        finished = false;
712                        break;
713                    };
714                    if height < Height::new(NUM_BLOCKS) {
715                        finished = false;
716                        break;
717                    }
718                }
719            }
720
721            // Create the first validator now that all blocks have been finalized by the others.
722            let validator = participants.first().unwrap();
723            let (app, mut actor, _processed_height) = setup_validator(
724                context.with_label("validator_0"),
725                &mut oracle,
726                validator.clone(),
727                ConstantProvider::new(schemes[0].clone()),
728            )
729            .await;
730
731            // Add links between all peers, including the first.
732            setup_network_links(&mut oracle, &participants, LINK).await;
733
734            const NEW_SYNC_FLOOR: u64 = 100;
735            let second_actor = &mut actors[1];
736            let latest_finalization = second_actor
737                .get_finalization(Height::new(NUM_BLOCKS))
738                .await
739                .unwrap();
740
741            // Set the sync height floor of the first actor to block #100.
742            actor.set_floor(Height::new(NEW_SYNC_FLOOR)).await;
743
744            // Notify the first actor of the latest finalization to the first actor to trigger backfill.
745            // The sync should only reach the sync height floor.
746            actor
747                .report(Activity::Finalization(latest_finalization))
748                .await;
749
750            // Wait until the first actor has backfilled to the sync height floor.
751            let mut finished = false;
752            while !finished {
753                // Avoid a busy loop
754                context.sleep(Duration::from_secs(1)).await;
755
756                finished = true;
757                if app.blocks().len() != (NUM_BLOCKS - NEW_SYNC_FLOOR) as usize {
758                    finished = false;
759                    continue;
760                }
761                let Some((height, _)) = app.tip() else {
762                    finished = false;
763                    continue;
764                };
765                if height < Height::new(NUM_BLOCKS) {
766                    finished = false;
767                    continue;
768                }
769            }
770
771            // Check that the first actor has blocks from NEW_SYNC_FLOOR onward, but not before.
772            for height in 1..=NUM_BLOCKS {
773                let block = actor
774                    .get_block(Identifier::Height(Height::new(height)))
775                    .await;
776                if height <= NEW_SYNC_FLOOR {
777                    assert!(block.is_none());
778                } else {
779                    assert_eq!(block.unwrap().height(), Height::new(height));
780                }
781            }
782        })
783    }
784
785    #[test_traced("WARN")]
786    fn test_prune_finalized_archives() {
787        let runner = deterministic::Runner::new(
788            deterministic::Config::new().with_timeout(Some(Duration::from_secs(120))),
789        );
790        runner.start(|mut context| async move {
791            let oracle = setup_network(context.clone(), None);
792            let Fixture {
793                participants,
794                schemes,
795                ..
796            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
797
798            let validator = participants[0].clone();
799            let partition_prefix = format!("prune-test-{}", validator.clone());
800            let page_cache = CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE);
801            let control = oracle.control(validator.clone());
802
803            // Closure to initialize marshal with prunable archives
804            let init_marshal = |label: &str| {
805                let ctx = context.with_label(label);
806                let validator = validator.clone();
807                let schemes = schemes.clone();
808                let partition_prefix = partition_prefix.clone();
809                let page_cache = page_cache.clone();
810                let control = control.clone();
811                let oracle_manager = oracle.manager();
812                async move {
813                    let provider = ConstantProvider::new(schemes[0].clone());
814                    let config = Config {
815                        provider,
816                        epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
817                        mailbox_size: 100,
818                        view_retention_timeout: ViewDelta::new(10),
819                        max_repair: NZUsize!(10),
820                        block_codec_config: (),
821                        partition_prefix: partition_prefix.clone(),
822                        prunable_items_per_section: NZU64!(10),
823                        replay_buffer: NZUsize!(1024),
824                        key_write_buffer: NZUsize!(1024),
825                        value_write_buffer: NZUsize!(1024),
826                        page_cache: page_cache.clone(),
827                        strategy: Sequential,
828                    };
829
830                    // Create resolver
831                    let backfill = control.register(0, TEST_QUOTA).await.unwrap();
832                    let resolver_cfg = resolver::Config {
833                        public_key: validator.clone(),
834                        provider: oracle_manager,
835                        blocker: control.clone(),
836                        mailbox_size: config.mailbox_size,
837                        initial: Duration::from_secs(1),
838                        timeout: Duration::from_secs(2),
839                        fetch_retry_timeout: Duration::from_millis(100),
840                        priority_requests: false,
841                        priority_responses: false,
842                    };
843                    let resolver = resolver::init(&ctx, resolver_cfg, backfill);
844
845                    // Create buffered broadcast engine
846                    let broadcast_config = buffered::Config {
847                        public_key: validator.clone(),
848                        mailbox_size: config.mailbox_size,
849                        deque_size: 10,
850                        priority: false,
851                        codec_config: (),
852                    };
853                    let (broadcast_engine, buffer) =
854                        buffered::Engine::new(ctx.clone(), broadcast_config);
855                    let network = control.register(1, TEST_QUOTA).await.unwrap();
856                    broadcast_engine.start(network);
857
858                    // Initialize prunable archives
859                    let finalizations_by_height = prunable::Archive::init(
860                        ctx.with_label("finalizations_by_height"),
861                        prunable::Config {
862                            translator: EightCap,
863                            key_partition: format!(
864                                "{}-finalizations-by-height-key",
865                                partition_prefix
866                            ),
867                            key_page_cache: page_cache.clone(),
868                            value_partition: format!(
869                                "{}-finalizations-by-height-value",
870                                partition_prefix
871                            ),
872                            compression: None,
873                            codec_config: S::certificate_codec_config_unbounded(),
874                            items_per_section: NZU64!(10),
875                            key_write_buffer: config.key_write_buffer,
876                            value_write_buffer: config.value_write_buffer,
877                            replay_buffer: config.replay_buffer,
878                        },
879                    )
880                    .await
881                    .expect("failed to initialize finalizations by height archive");
882
883                    let finalized_blocks = prunable::Archive::init(
884                        ctx.with_label("finalized_blocks"),
885                        prunable::Config {
886                            translator: EightCap,
887                            key_partition: format!("{}-finalized-blocks-key", partition_prefix),
888                            key_page_cache: page_cache.clone(),
889                            value_partition: format!("{}-finalized-blocks-value", partition_prefix),
890                            compression: None,
891                            codec_config: config.block_codec_config,
892                            items_per_section: NZU64!(10),
893                            key_write_buffer: config.key_write_buffer,
894                            value_write_buffer: config.value_write_buffer,
895                            replay_buffer: config.replay_buffer,
896                        },
897                    )
898                    .await
899                    .expect("failed to initialize finalized blocks archive");
900
901                    let (actor, mailbox, _processed_height) = actor::Actor::init(
902                        ctx.clone(),
903                        finalizations_by_height,
904                        finalized_blocks,
905                        config,
906                    )
907                    .await;
908                    let application = Application::<B>::default();
909                    actor.start(application.clone(), buffer, resolver);
910
911                    (mailbox, application)
912                }
913            };
914
915            // Initial setup
916            let (mut mailbox, application) = init_marshal("init").await;
917
918            // Finalize blocks 1-20
919            let mut parent = Sha256::hash(b"");
920            let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
921            for i in 1..=20u64 {
922                let block = make_block(parent, Height::new(i), i);
923                let commitment = block.digest();
924                let bounds = epocher.containing(Height::new(i)).unwrap();
925                let round = Round::new(bounds.epoch(), View::new(i));
926
927                mailbox.verified(round, block.clone()).await;
928                let proposal = Proposal {
929                    round,
930                    parent: View::new(i - 1),
931                    payload: commitment,
932                };
933                let finalization = make_finalization(proposal, &schemes, QUORUM);
934                mailbox.report(Activity::Finalization(finalization)).await;
935
936                parent = commitment;
937            }
938
939            // Wait for application to process all blocks
940            // After this, last_processed_height will be 20
941            while application.blocks().len() < 20 {
942                context.sleep(Duration::from_millis(10)).await;
943            }
944
945            // Verify all blocks are accessible before pruning
946            for i in 1..=20u64 {
947                assert!(
948                    mailbox.get_block(Height::new(i)).await.is_some(),
949                    "block {i} should exist before pruning"
950                );
951                assert!(
952                    mailbox.get_finalization(Height::new(i)).await.is_some(),
953                    "finalization {i} should exist before pruning"
954                );
955            }
956
957            // All blocks should still be accessible (prune was ignored)
958            mailbox.prune(Height::new(25)).await;
959            context.sleep(Duration::from_millis(50)).await;
960            for i in 1..=20u64 {
961                assert!(
962                    mailbox.get_block(Height::new(i)).await.is_some(),
963                    "block {i} should still exist after pruning above floor"
964                );
965            }
966
967            // Pruning at height 10 should prune blocks below 10 (heights 1-9)
968            mailbox.prune(Height::new(10)).await;
969            context.sleep(Duration::from_millis(100)).await;
970            for i in 1..10u64 {
971                assert!(
972                    mailbox.get_block(Height::new(i)).await.is_none(),
973                    "block {i} should be pruned"
974                );
975                assert!(
976                    mailbox.get_finalization(Height::new(i)).await.is_none(),
977                    "finalization {i} should be pruned"
978                );
979            }
980
981            // Blocks at or above prune height (10-20) should still be accessible
982            for i in 10..=20u64 {
983                assert!(
984                    mailbox.get_block(Height::new(i)).await.is_some(),
985                    "block {i} should still exist after pruning"
986                );
987                assert!(
988                    mailbox.get_finalization(Height::new(i)).await.is_some(),
989                    "finalization {i} should still exist after pruning"
990                );
991            }
992
993            // Pruning at height 20 should prune blocks 10-19
994            mailbox.prune(Height::new(20)).await;
995            context.sleep(Duration::from_millis(100)).await;
996            for i in 10..20u64 {
997                assert!(
998                    mailbox.get_block(Height::new(i)).await.is_none(),
999                    "block {i} should be pruned after second prune"
1000                );
1001                assert!(
1002                    mailbox.get_finalization(Height::new(i)).await.is_none(),
1003                    "finalization {i} should be pruned after second prune"
1004                );
1005            }
1006
1007            // Block 20 should still be accessible
1008            assert!(
1009                mailbox.get_block(Height::new(20)).await.is_some(),
1010                "block 20 should still exist"
1011            );
1012            assert!(
1013                mailbox.get_finalization(Height::new(20)).await.is_some(),
1014                "finalization 20 should still exist"
1015            );
1016
1017            // Restart to verify pruning persisted to storage (not just in-memory)
1018            drop(mailbox);
1019            let (mut mailbox, _application) = init_marshal("restart").await;
1020
1021            // Verify blocks 1-19 are still pruned after restart
1022            for i in 1..20u64 {
1023                assert!(
1024                    mailbox.get_block(Height::new(i)).await.is_none(),
1025                    "block {i} should still be pruned after restart"
1026                );
1027                assert!(
1028                    mailbox.get_finalization(Height::new(i)).await.is_none(),
1029                    "finalization {i} should still be pruned after restart"
1030                );
1031            }
1032
1033            // Verify block 20 persisted correctly after restart
1034            assert!(
1035                mailbox.get_block(Height::new(20)).await.is_some(),
1036                "block 20 should still exist after restart"
1037            );
1038            assert!(
1039                mailbox.get_finalization(Height::new(20)).await.is_some(),
1040                "finalization 20 should still exist after restart"
1041            );
1042        })
1043    }
1044
1045    #[test_traced("WARN")]
1046    fn test_subscribe_basic_block_delivery() {
1047        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1048        runner.start(|mut context| async move {
1049            let mut oracle = setup_network(context.clone(), None);
1050            let Fixture {
1051                participants,
1052                schemes,
1053                ..
1054            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1055
1056            let mut actors = Vec::new();
1057            for (i, validator) in participants.iter().enumerate() {
1058                let (_application, actor, _processed_height) = setup_validator(
1059                    context.with_label(&format!("validator_{i}")),
1060                    &mut oracle,
1061                    validator.clone(),
1062                    ConstantProvider::new(schemes[i].clone()),
1063                )
1064                .await;
1065                actors.push(actor);
1066            }
1067            let mut actor = actors[0].clone();
1068
1069            setup_network_links(&mut oracle, &participants, LINK).await;
1070
1071            let parent = Sha256::hash(b"");
1072            let block = make_block(parent, Height::new(1), 1);
1073            let commitment = block.digest();
1074
1075            let subscription_rx = actor
1076                .subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment)
1077                .await;
1078
1079            actor
1080                .verified(Round::new(Epoch::new(0), View::new(1)), block.clone())
1081                .await;
1082
1083            let proposal = Proposal {
1084                round: Round::new(Epoch::new(0), View::new(1)),
1085                parent: View::new(0),
1086                payload: commitment,
1087            };
1088            let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
1089            actor.report(Activity::Notarization(notarization)).await;
1090
1091            let finalization = make_finalization(proposal, &schemes, QUORUM);
1092            actor.report(Activity::Finalization(finalization)).await;
1093
1094            let received_block = subscription_rx.await.unwrap();
1095            assert_eq!(received_block.digest(), block.digest());
1096            assert_eq!(received_block.height(), Height::new(1));
1097        })
1098    }
1099
1100    #[test_traced("WARN")]
1101    fn test_subscribe_multiple_subscriptions() {
1102        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1103        runner.start(|mut context| async move {
1104            let mut oracle = setup_network(context.clone(), None);
1105            let Fixture {
1106                participants,
1107                schemes,
1108                ..
1109            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1110
1111            let mut actors = Vec::new();
1112            for (i, validator) in participants.iter().enumerate() {
1113                let (_application, actor, _processed_height) = setup_validator(
1114                    context.with_label(&format!("validator_{i}")),
1115                    &mut oracle,
1116                    validator.clone(),
1117                    ConstantProvider::new(schemes[i].clone()),
1118                )
1119                .await;
1120                actors.push(actor);
1121            }
1122            let mut actor = actors[0].clone();
1123
1124            setup_network_links(&mut oracle, &participants, LINK).await;
1125
1126            let parent = Sha256::hash(b"");
1127            let block1 = make_block(parent, Height::new(1), 1);
1128            let block2 = make_block(block1.digest(), Height::new(2), 2);
1129            let commitment1 = block1.digest();
1130            let commitment2 = block2.digest();
1131
1132            let sub1_rx = actor
1133                .subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment1)
1134                .await;
1135            let sub2_rx = actor
1136                .subscribe(Some(Round::new(Epoch::new(0), View::new(2))), commitment2)
1137                .await;
1138            let sub3_rx = actor
1139                .subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment1)
1140                .await;
1141
1142            actor
1143                .verified(Round::new(Epoch::new(0), View::new(1)), block1.clone())
1144                .await;
1145            actor
1146                .verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
1147                .await;
1148
1149            for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
1150                let view = View::new(view);
1151                let proposal = Proposal {
1152                    round: Round::new(Epoch::zero(), view),
1153                    parent: view.previous().unwrap(),
1154                    payload: block.digest(),
1155                };
1156                let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
1157                actor.report(Activity::Notarization(notarization)).await;
1158
1159                let finalization = make_finalization(proposal, &schemes, QUORUM);
1160                actor.report(Activity::Finalization(finalization)).await;
1161            }
1162
1163            let received1_sub1 = sub1_rx.await.unwrap();
1164            let received2 = sub2_rx.await.unwrap();
1165            let received1_sub3 = sub3_rx.await.unwrap();
1166
1167            assert_eq!(received1_sub1.digest(), block1.digest());
1168            assert_eq!(received2.digest(), block2.digest());
1169            assert_eq!(received1_sub3.digest(), block1.digest());
1170            assert_eq!(received1_sub1.height(), Height::new(1));
1171            assert_eq!(received2.height(), Height::new(2));
1172            assert_eq!(received1_sub3.height(), Height::new(1));
1173        })
1174    }
1175
1176    #[test_traced("WARN")]
1177    fn test_subscribe_canceled_subscriptions() {
1178        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1179        runner.start(|mut context| async move {
1180            let mut oracle = setup_network(context.clone(), None);
1181            let Fixture {
1182                participants,
1183                schemes,
1184                ..
1185            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1186
1187            let mut actors = Vec::new();
1188            for (i, validator) in participants.iter().enumerate() {
1189                let (_application, actor, _processed_height) = setup_validator(
1190                    context.with_label(&format!("validator_{i}")),
1191                    &mut oracle,
1192                    validator.clone(),
1193                    ConstantProvider::new(schemes[i].clone()),
1194                )
1195                .await;
1196                actors.push(actor);
1197            }
1198            let mut actor = actors[0].clone();
1199
1200            setup_network_links(&mut oracle, &participants, LINK).await;
1201
1202            let parent = Sha256::hash(b"");
1203            let block1 = make_block(parent, Height::new(1), 1);
1204            let block2 = make_block(block1.digest(), Height::new(2), 2);
1205            let commitment1 = block1.digest();
1206            let commitment2 = block2.digest();
1207
1208            let sub1_rx = actor
1209                .subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment1)
1210                .await;
1211            let sub2_rx = actor
1212                .subscribe(Some(Round::new(Epoch::new(0), View::new(2))), commitment2)
1213                .await;
1214
1215            drop(sub1_rx);
1216
1217            actor
1218                .verified(Round::new(Epoch::new(0), View::new(1)), block1.clone())
1219                .await;
1220            actor
1221                .verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
1222                .await;
1223
1224            for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
1225                let view = View::new(view);
1226                let proposal = Proposal {
1227                    round: Round::new(Epoch::zero(), view),
1228                    parent: view.previous().unwrap(),
1229                    payload: block.digest(),
1230                };
1231                let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
1232                actor.report(Activity::Notarization(notarization)).await;
1233
1234                let finalization = make_finalization(proposal, &schemes, QUORUM);
1235                actor.report(Activity::Finalization(finalization)).await;
1236            }
1237
1238            let received2 = sub2_rx.await.unwrap();
1239            assert_eq!(received2.digest(), block2.digest());
1240            assert_eq!(received2.height(), Height::new(2));
1241        })
1242    }
1243
1244    #[test_traced("WARN")]
1245    fn test_subscribe_blocks_from_different_sources() {
1246        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1247        runner.start(|mut context| async move {
1248            let mut oracle = setup_network(context.clone(), None);
1249            let Fixture {
1250                participants,
1251                schemes,
1252                ..
1253            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1254
1255            let mut actors = Vec::new();
1256            for (i, validator) in participants.iter().enumerate() {
1257                let (_application, actor, _processed_height) = setup_validator(
1258                    context.with_label(&format!("validator_{i}")),
1259                    &mut oracle,
1260                    validator.clone(),
1261                    ConstantProvider::new(schemes[i].clone()),
1262                )
1263                .await;
1264                actors.push(actor);
1265            }
1266            let mut actor = actors[0].clone();
1267
1268            setup_network_links(&mut oracle, &participants, LINK).await;
1269
1270            let parent = Sha256::hash(b"");
1271            let block1 = make_block(parent, Height::new(1), 1);
1272            let block2 = make_block(block1.digest(), Height::new(2), 2);
1273            let block3 = make_block(block2.digest(), Height::new(3), 3);
1274            let block4 = make_block(block3.digest(), Height::new(4), 4);
1275            let block5 = make_block(block4.digest(), Height::new(5), 5);
1276
1277            let sub1_rx = actor.subscribe(None, block1.digest()).await;
1278            let sub2_rx = actor.subscribe(None, block2.digest()).await;
1279            let sub3_rx = actor.subscribe(None, block3.digest()).await;
1280            let sub4_rx = actor.subscribe(None, block4.digest()).await;
1281            let sub5_rx = actor.subscribe(None, block5.digest()).await;
1282
1283            // Block1: Broadcasted by the actor
1284            actor
1285                .proposed(Round::new(Epoch::zero(), View::new(1)), block1.clone())
1286                .await;
1287            context.sleep(Duration::from_millis(20)).await;
1288
1289            // Block1: delivered
1290            let received1 = sub1_rx.await.unwrap();
1291            assert_eq!(received1.digest(), block1.digest());
1292            assert_eq!(received1.height(), Height::new(1));
1293
1294            // Block2: Verified by the actor
1295            actor
1296                .verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
1297                .await;
1298
1299            // Block2: delivered
1300            let received2 = sub2_rx.await.unwrap();
1301            assert_eq!(received2.digest(), block2.digest());
1302            assert_eq!(received2.height(), Height::new(2));
1303
1304            // Block3: Notarized by the actor
1305            let proposal3 = Proposal {
1306                round: Round::new(Epoch::new(0), View::new(3)),
1307                parent: View::new(2),
1308                payload: block3.digest(),
1309            };
1310            let notarization3 = make_notarization(proposal3.clone(), &schemes, QUORUM);
1311            actor.report(Activity::Notarization(notarization3)).await;
1312            actor
1313                .verified(Round::new(Epoch::new(0), View::new(3)), block3.clone())
1314                .await;
1315
1316            // Block3: delivered
1317            let received3 = sub3_rx.await.unwrap();
1318            assert_eq!(received3.digest(), block3.digest());
1319            assert_eq!(received3.height(), Height::new(3));
1320
1321            // Block4: Finalized by the actor
1322            let finalization4 = make_finalization(
1323                Proposal {
1324                    round: Round::new(Epoch::new(0), View::new(4)),
1325                    parent: View::new(3),
1326                    payload: block4.digest(),
1327                },
1328                &schemes,
1329                QUORUM,
1330            );
1331            actor.report(Activity::Finalization(finalization4)).await;
1332            actor
1333                .verified(Round::new(Epoch::new(0), View::new(4)), block4.clone())
1334                .await;
1335
1336            // Block4: delivered
1337            let received4 = sub4_rx.await.unwrap();
1338            assert_eq!(received4.digest(), block4.digest());
1339            assert_eq!(received4.height(), Height::new(4));
1340
1341            // Block5: Broadcasted by a remote node (different actor)
1342            let remote_actor = &mut actors[1].clone();
1343            remote_actor
1344                .proposed(Round::new(Epoch::zero(), View::new(5)), block5.clone())
1345                .await;
1346            context.sleep(Duration::from_millis(20)).await;
1347
1348            // Block5: delivered
1349            let received5 = sub5_rx.await.unwrap();
1350            assert_eq!(received5.digest(), block5.digest());
1351            assert_eq!(received5.height(), Height::new(5));
1352        })
1353    }
1354
1355    #[test_traced("WARN")]
1356    fn test_get_info_basic_queries_present_and_missing() {
1357        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1358        runner.start(|mut context| async move {
1359            let mut oracle = setup_network(context.clone(), None);
1360            let Fixture {
1361                participants,
1362                schemes,
1363                ..
1364            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1365
1366            // Single validator actor
1367            let me = participants[0].clone();
1368            let (_application, mut actor, _processed_height) = setup_validator(
1369                context.with_label("validator_0"),
1370                &mut oracle,
1371                me,
1372                ConstantProvider::new(schemes[0].clone()),
1373            )
1374            .await;
1375
1376            // Initially, no latest
1377            assert!(actor.get_info(Identifier::Latest).await.is_none());
1378
1379            // Before finalization, specific height returns None
1380            assert!(actor.get_info(Height::new(1)).await.is_none());
1381
1382            // Create and verify a block, then finalize it
1383            let parent = Sha256::hash(b"");
1384            let block = make_block(parent, Height::new(1), 1);
1385            let digest = block.digest();
1386            let round = Round::new(Epoch::new(0), View::new(1));
1387            actor.verified(round, block.clone()).await;
1388
1389            let proposal = Proposal {
1390                round,
1391                parent: View::new(0),
1392                payload: digest,
1393            };
1394            let finalization = make_finalization(proposal, &schemes, QUORUM);
1395            actor.report(Activity::Finalization(finalization)).await;
1396
1397            // Latest should now be the finalized block
1398            assert_eq!(
1399                actor.get_info(Identifier::Latest).await,
1400                Some((Height::new(1), digest))
1401            );
1402
1403            // Height 1 now present
1404            assert_eq!(
1405                actor.get_info(Height::new(1)).await,
1406                Some((Height::new(1), digest))
1407            );
1408
1409            // Commitment should map to its height
1410            assert_eq!(
1411                actor.get_info(&digest).await,
1412                Some((Height::new(1), digest))
1413            );
1414
1415            // Missing height
1416            assert!(actor.get_info(Height::new(2)).await.is_none());
1417
1418            // Missing commitment
1419            let missing = Sha256::hash(b"missing");
1420            assert!(actor.get_info(&missing).await.is_none());
1421        })
1422    }
1423
1424    #[test_traced("WARN")]
1425    fn test_get_info_latest_progression_multiple_finalizations() {
1426        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1427        runner.start(|mut context| async move {
1428            let mut oracle = setup_network(context.clone(), None);
1429            let Fixture {
1430                participants,
1431                schemes,
1432                ..
1433            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1434
1435            // Single validator actor
1436            let me = participants[0].clone();
1437            let (_application, mut actor, _processed_height) = setup_validator(
1438                context.with_label("validator_0"),
1439                &mut oracle,
1440                me,
1441                ConstantProvider::new(schemes[0].clone()),
1442            )
1443            .await;
1444
1445            // Initially none
1446            assert!(actor.get_info(Identifier::Latest).await.is_none());
1447
1448            // Build and finalize heights 1..=3
1449            let parent0 = Sha256::hash(b"");
1450            let block1 = make_block(parent0, Height::new(1), 1);
1451            let d1 = block1.digest();
1452            actor
1453                .verified(Round::new(Epoch::new(0), View::new(1)), block1.clone())
1454                .await;
1455            let f1 = make_finalization(
1456                Proposal {
1457                    round: Round::new(Epoch::new(0), View::new(1)),
1458                    parent: View::new(0),
1459                    payload: d1,
1460                },
1461                &schemes,
1462                QUORUM,
1463            );
1464            actor.report(Activity::Finalization(f1)).await;
1465            let latest = actor.get_info(Identifier::Latest).await;
1466            assert_eq!(latest, Some((Height::new(1), d1)));
1467
1468            let block2 = make_block(d1, Height::new(2), 2);
1469            let d2 = block2.digest();
1470            actor
1471                .verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
1472                .await;
1473            let f2 = make_finalization(
1474                Proposal {
1475                    round: Round::new(Epoch::new(0), View::new(2)),
1476                    parent: View::new(1),
1477                    payload: d2,
1478                },
1479                &schemes,
1480                QUORUM,
1481            );
1482            actor.report(Activity::Finalization(f2)).await;
1483            let latest = actor.get_info(Identifier::Latest).await;
1484            assert_eq!(latest, Some((Height::new(2), d2)));
1485
1486            let block3 = make_block(d2, Height::new(3), 3);
1487            let d3 = block3.digest();
1488            actor
1489                .verified(Round::new(Epoch::new(0), View::new(3)), block3.clone())
1490                .await;
1491            let f3 = make_finalization(
1492                Proposal {
1493                    round: Round::new(Epoch::new(0), View::new(3)),
1494                    parent: View::new(2),
1495                    payload: d3,
1496                },
1497                &schemes,
1498                QUORUM,
1499            );
1500            actor.report(Activity::Finalization(f3)).await;
1501            let latest = actor.get_info(Identifier::Latest).await;
1502            assert_eq!(latest, Some((Height::new(3), d3)));
1503        })
1504    }
1505
1506    #[test_traced("WARN")]
1507    fn test_get_block_by_height_and_latest() {
1508        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1509        runner.start(|mut context| async move {
1510            let mut oracle = setup_network(context.clone(), None);
1511            let Fixture {
1512                participants,
1513                schemes,
1514                ..
1515            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1516
1517            let me = participants[0].clone();
1518            let (application, mut actor, _processed_height) = setup_validator(
1519                context.with_label("validator_0"),
1520                &mut oracle,
1521                me,
1522                ConstantProvider::new(schemes[0].clone()),
1523            )
1524            .await;
1525
1526            // Before any finalization, GetBlock::Latest should be None
1527            let latest_block = actor.get_block(Identifier::Latest).await;
1528            assert!(latest_block.is_none());
1529            assert!(application.tip().is_none());
1530
1531            // Finalize a block at height 1
1532            let parent = Sha256::hash(b"");
1533            let block = make_block(parent, Height::new(1), 1);
1534            let commitment = block.digest();
1535            let round = Round::new(Epoch::new(0), View::new(1));
1536            actor.verified(round, block.clone()).await;
1537            let proposal = Proposal {
1538                round,
1539                parent: View::new(0),
1540                payload: commitment,
1541            };
1542            let finalization = make_finalization(proposal, &schemes, QUORUM);
1543            actor.report(Activity::Finalization(finalization)).await;
1544
1545            // Get by height
1546            let by_height = actor
1547                .get_block(Height::new(1))
1548                .await
1549                .expect("missing block by height");
1550            assert_eq!(by_height.height(), Height::new(1));
1551            assert_eq!(by_height.digest(), commitment);
1552            assert_eq!(application.tip(), Some((Height::new(1), commitment)));
1553
1554            // Get by latest
1555            let by_latest = actor
1556                .get_block(Identifier::Latest)
1557                .await
1558                .expect("missing block by latest");
1559            assert_eq!(by_latest.height(), Height::new(1));
1560            assert_eq!(by_latest.digest(), commitment);
1561
1562            // Missing height
1563            let by_height = actor.get_block(Height::new(2)).await;
1564            assert!(by_height.is_none());
1565        })
1566    }
1567
1568    #[test_traced("WARN")]
1569    fn test_get_block_by_commitment_from_sources_and_missing() {
1570        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1571        runner.start(|mut context| async move {
1572            let mut oracle = setup_network(context.clone(), None);
1573            let Fixture {
1574                participants,
1575                schemes,
1576                ..
1577            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1578
1579            let me = participants[0].clone();
1580            let (_application, mut actor, _processed_height) = setup_validator(
1581                context.with_label("validator_0"),
1582                &mut oracle,
1583                me,
1584                ConstantProvider::new(schemes[0].clone()),
1585            )
1586            .await;
1587
1588            // 1) From cache via verified
1589            let parent = Sha256::hash(b"");
1590            let ver_block = make_block(parent, Height::new(1), 1);
1591            let ver_commitment = ver_block.digest();
1592            let round1 = Round::new(Epoch::new(0), View::new(1));
1593            actor.verified(round1, ver_block.clone()).await;
1594            let got = actor
1595                .get_block(&ver_commitment)
1596                .await
1597                .expect("missing block from cache");
1598            assert_eq!(got.digest(), ver_commitment);
1599
1600            // 2) From finalized archive
1601            let fin_block = make_block(ver_commitment, Height::new(2), 2);
1602            let fin_commitment = fin_block.digest();
1603            let round2 = Round::new(Epoch::new(0), View::new(2));
1604            actor.verified(round2, fin_block.clone()).await;
1605            let proposal = Proposal {
1606                round: round2,
1607                parent: View::new(1),
1608                payload: fin_commitment,
1609            };
1610            let finalization = make_finalization(proposal, &schemes, QUORUM);
1611            actor.report(Activity::Finalization(finalization)).await;
1612            let got = actor
1613                .get_block(&fin_commitment)
1614                .await
1615                .expect("missing block from finalized archive");
1616            assert_eq!(got.digest(), fin_commitment);
1617            assert_eq!(got.height(), Height::new(2));
1618
1619            // 3) Missing commitment
1620            let missing = Sha256::hash(b"definitely-missing");
1621            let missing_block = actor.get_block(&missing).await;
1622            assert!(missing_block.is_none());
1623        })
1624    }
1625
1626    #[test_traced("WARN")]
1627    fn test_get_finalization_by_height() {
1628        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1629        runner.start(|mut context| async move {
1630            let mut oracle = setup_network(context.clone(), None);
1631            let Fixture {
1632                participants,
1633                schemes,
1634                ..
1635            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1636
1637            let me = participants[0].clone();
1638            let (_application, mut actor, _processed_height) = setup_validator(
1639                context.with_label("validator_0"),
1640                &mut oracle,
1641                me,
1642                ConstantProvider::new(schemes[0].clone()),
1643            )
1644            .await;
1645
1646            // Before any finalization, get_finalization should be None
1647            let finalization = actor.get_finalization(Height::new(1)).await;
1648            assert!(finalization.is_none());
1649
1650            // Finalize a block at height 1
1651            let parent = Sha256::hash(b"");
1652            let block = make_block(parent, Height::new(1), 1);
1653            let commitment = block.digest();
1654            let round = Round::new(Epoch::new(0), View::new(1));
1655            actor.verified(round, block.clone()).await;
1656            let proposal = Proposal {
1657                round,
1658                parent: View::new(0),
1659                payload: commitment,
1660            };
1661            let finalization = make_finalization(proposal, &schemes, QUORUM);
1662            actor.report(Activity::Finalization(finalization)).await;
1663
1664            // Get finalization by height
1665            let finalization = actor
1666                .get_finalization(Height::new(1))
1667                .await
1668                .expect("missing finalization by height");
1669            assert_eq!(finalization.proposal.parent, View::new(0));
1670            assert_eq!(
1671                finalization.proposal.round,
1672                Round::new(Epoch::new(0), View::new(1))
1673            );
1674            assert_eq!(finalization.proposal.payload, commitment);
1675
1676            assert!(actor.get_finalization(Height::new(2)).await.is_none());
1677        })
1678    }
1679
1680    #[test_traced("WARN")]
1681    fn test_hint_finalized_triggers_fetch() {
1682        let runner = deterministic::Runner::new(
1683            deterministic::Config::new()
1684                .with_seed(42)
1685                .with_timeout(Some(Duration::from_secs(60))),
1686        );
1687        runner.start(|mut context| async move {
1688            let mut oracle = setup_network(context.clone(), Some(3));
1689            let Fixture {
1690                participants,
1691                schemes,
1692                ..
1693            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1694
1695            // Register the initial peer set
1696            let mut manager = oracle.manager();
1697            manager
1698                .track(0, participants.clone().try_into().unwrap())
1699                .await;
1700
1701            // Set up two validators
1702            let (app0, mut actor0, _) = setup_validator(
1703                context.with_label("validator_0"),
1704                &mut oracle,
1705                participants[0].clone(),
1706                ConstantProvider::new(schemes[0].clone()),
1707            )
1708            .await;
1709            let (_app1, mut actor1, _) = setup_validator(
1710                context.with_label("validator_1"),
1711                &mut oracle,
1712                participants[1].clone(),
1713                ConstantProvider::new(schemes[1].clone()),
1714            )
1715            .await;
1716
1717            // Add links between validators
1718            setup_network_links(&mut oracle, &participants[..2], LINK).await;
1719
1720            // Validator 0: Create and finalize blocks 1-5
1721            let mut parent = Sha256::hash(b"");
1722            for i in 1..=5u64 {
1723                let block = make_block(parent, Height::new(i), i);
1724                let commitment = block.digest();
1725                let round = Round::new(Epoch::new(0), View::new(i));
1726
1727                actor0.verified(round, block.clone()).await;
1728                let proposal = Proposal {
1729                    round,
1730                    parent: View::new(i - 1),
1731                    payload: commitment,
1732                };
1733                let finalization = make_finalization(proposal, &schemes, QUORUM);
1734                actor0.report(Activity::Finalization(finalization)).await;
1735
1736                parent = commitment;
1737            }
1738
1739            // Wait for validator 0 to process all blocks
1740            while app0.blocks().len() < 5 {
1741                context.sleep(Duration::from_millis(10)).await;
1742            }
1743
1744            // Validator 1 should not have block 5 yet
1745            assert!(actor1.get_finalization(Height::new(5)).await.is_none());
1746
1747            // Validator 1: hint that block 5 is finalized, targeting validator 0
1748            actor1
1749                .hint_finalized(Height::new(5), NonEmptyVec::new(participants[0].clone()))
1750                .await;
1751
1752            // Wait for the fetch to complete
1753            while actor1.get_finalization(Height::new(5)).await.is_none() {
1754                context.sleep(Duration::from_millis(10)).await;
1755            }
1756
1757            // Verify validator 1 now has the finalization
1758            let finalization = actor1
1759                .get_finalization(Height::new(5))
1760                .await
1761                .expect("finalization should be fetched");
1762            assert_eq!(finalization.proposal.round.view(), View::new(5));
1763        })
1764    }
1765
1766    #[test_traced("WARN")]
1767    fn test_ancestry_stream() {
1768        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1769        runner.start(|mut context| async move {
1770            let mut oracle = setup_network(context.clone(), None);
1771            let Fixture {
1772                participants,
1773                schemes,
1774                ..
1775            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1776
1777            let me = participants[0].clone();
1778            let (_application, mut actor, _processed_height) = setup_validator(
1779                context.with_label("validator_0"),
1780                &mut oracle,
1781                me,
1782                ConstantProvider::new(schemes[0].clone()),
1783            )
1784            .await;
1785
1786            // Finalize blocks at heights 1-5
1787            let mut parent = Sha256::hash(b"");
1788            for i in 1..=5 {
1789                let block = make_block(parent, Height::new(i), i);
1790                let commitment = block.digest();
1791                let round = Round::new(Epoch::new(0), View::new(i));
1792                actor.verified(round, block.clone()).await;
1793                let proposal = Proposal {
1794                    round,
1795                    parent: View::new(i - 1),
1796                    payload: commitment,
1797                };
1798                let finalization = make_finalization(proposal, &schemes, QUORUM);
1799                actor.report(Activity::Finalization(finalization)).await;
1800
1801                parent = block.digest();
1802            }
1803
1804            // Stream from latest -> height 1
1805            let (_, commitment) = actor.get_info(Identifier::Latest).await.unwrap();
1806            let ancestry = actor.ancestry((None, commitment)).await.unwrap();
1807            let blocks = ancestry.collect::<Vec<_>>().await;
1808
1809            // Ensure correct delivery order: 5,4,3,2,1
1810            assert_eq!(blocks.len(), 5);
1811            (0..5).for_each(|i| {
1812                assert_eq!(blocks[i].height(), Height::new(5 - i as u64));
1813            });
1814        })
1815    }
1816
1817    #[test_traced("WARN")]
1818    fn test_marshaled_rejects_invalid_ancestry() {
1819        #[derive(Clone)]
1820        struct MockVerifyingApp {
1821            genesis: B,
1822        }
1823
1824        impl crate::Application<deterministic::Context> for MockVerifyingApp {
1825            type Block = B;
1826            type Context = Context<D, K>;
1827            type SigningScheme = S;
1828
1829            async fn genesis(&mut self) -> Self::Block {
1830                self.genesis.clone()
1831            }
1832
1833            async fn propose(
1834                &mut self,
1835                _context: (deterministic::Context, Self::Context),
1836                _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
1837            ) -> Option<Self::Block> {
1838                None
1839            }
1840        }
1841
1842        impl VerifyingApplication<deterministic::Context> for MockVerifyingApp {
1843            async fn verify(
1844                &mut self,
1845                _context: (deterministic::Context, Self::Context),
1846                _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
1847            ) -> bool {
1848                // Ancestry verification occurs entirely in `Marshaled`.
1849                true
1850            }
1851        }
1852
1853        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1854        runner.start(|mut context| async move {
1855            let mut oracle = setup_network(context.clone(), None);
1856            let Fixture {
1857                participants,
1858                schemes,
1859                ..
1860            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1861
1862            let me = participants[0].clone();
1863            let (_base_app, marshal, _processed_height) = setup_validator(
1864                context.with_label("validator_0"),
1865                &mut oracle,
1866                me.clone(),
1867                ConstantProvider::new(schemes[0].clone()),
1868            )
1869            .await;
1870
1871            // Create genesis block
1872            let genesis = make_block(Sha256::hash(b""), Height::zero(), 0);
1873
1874            // Wrap with Marshaled verifier
1875            let mock_app = MockVerifyingApp {
1876                genesis: genesis.clone(),
1877            };
1878            let mut marshaled = Marshaled::new(
1879                context.clone(),
1880                mock_app,
1881                marshal.clone(),
1882                FixedEpocher::new(BLOCKS_PER_EPOCH),
1883            );
1884
1885            // Test case 1: Non-contiguous height
1886            //
1887            // We need both blocks in the same epoch.
1888            // With BLOCKS_PER_EPOCH=20: epoch 0 is heights 0-19, epoch 1 is heights 20-39
1889            //
1890            // Store honest parent at height 21 (epoch 1)
1891            let honest_parent = make_block(
1892                genesis.commitment(),
1893                Height::new(BLOCKS_PER_EPOCH.get() + 1),
1894                1000,
1895            );
1896            let parent_commitment = honest_parent.commitment();
1897            let parent_round = Round::new(Epoch::new(1), View::new(21));
1898            marshal
1899                .clone()
1900                .verified(parent_round, honest_parent.clone())
1901                .await;
1902
1903            // Byzantine proposer broadcasts malicious block at height 35
1904            // In reality this would come via buffered broadcast, but for test simplicity
1905            // we call broadcast() directly which makes it available for subscription
1906            let malicious_block = make_block(
1907                parent_commitment,
1908                Height::new(BLOCKS_PER_EPOCH.get() + 15),
1909                2000,
1910            );
1911            let malicious_commitment = malicious_block.commitment();
1912            marshal
1913                .clone()
1914                .proposed(
1915                    Round::new(Epoch::new(1), View::new(35)),
1916                    malicious_block.clone(),
1917                )
1918                .await;
1919
1920            // Small delay to ensure broadcast is processed
1921            context.sleep(Duration::from_millis(10)).await;
1922
1923            // Consensus determines parent should be block at height 21
1924            // and calls verify on the Marshaled automaton with a block at height 35
1925            let byzantine_round = Round::new(Epoch::new(1), View::new(35));
1926            let byzantine_context = Context {
1927                round: byzantine_round,
1928                leader: me.clone(),
1929                parent: (View::new(21), parent_commitment), // Consensus says parent is at height 21
1930            };
1931
1932            // Marshaled.certify() should reject the malicious block
1933            // The Marshaled verifier will:
1934            // 1. Fetch honest_parent (height 21) from marshal based on context.parent
1935            // 2. Fetch malicious_block (height 35) from marshal based on digest
1936            // 3. Validate height is contiguous (fail)
1937            // 4. Return false
1938            let _ = marshaled
1939                .verify(byzantine_context, malicious_commitment)
1940                .await
1941                .await;
1942            let verify = marshaled
1943                .certify(byzantine_round, malicious_commitment)
1944                .await;
1945
1946            assert!(
1947                !verify.await.unwrap(),
1948                "Byzantine block with non-contiguous heights should be rejected"
1949            );
1950
1951            // Test case 2: Mismatched parent commitment
1952            //
1953            // Create another malicious block with correct height but invalid parent commitment
1954            let malicious_block = make_block(
1955                genesis.commitment(),
1956                Height::new(BLOCKS_PER_EPOCH.get() + 2),
1957                3000,
1958            );
1959            let malicious_commitment = malicious_block.commitment();
1960            marshal
1961                .clone()
1962                .proposed(
1963                    Round::new(Epoch::new(1), View::new(22)),
1964                    malicious_block.clone(),
1965                )
1966                .await;
1967
1968            // Small delay to ensure broadcast is processed
1969            context.sleep(Duration::from_millis(10)).await;
1970
1971            // Consensus determines parent should be block at height 21
1972            // and calls verify on the Marshaled automaton with a block at height 22
1973            let byzantine_round = Round::new(Epoch::new(1), View::new(22));
1974            let byzantine_context = Context {
1975                round: byzantine_round,
1976                leader: me.clone(),
1977                parent: (View::new(21), parent_commitment), // Consensus says parent is at height 21
1978            };
1979
1980            // Marshaled.certify() should reject the malicious block
1981            // The Marshaled verifier will:
1982            // 1. Fetch honest_parent (height 21) from marshal based on context.parent
1983            // 2. Fetch malicious_block (height 22) from marshal based on digest
1984            // 3. Validate height is contiguous
1985            // 3. Validate parent commitment matches (fail)
1986            // 4. Return false
1987            let _ = marshaled
1988                .verify(byzantine_context, malicious_commitment)
1989                .await
1990                .await;
1991            let verify = marshaled
1992                .certify(byzantine_round, malicious_commitment)
1993                .await;
1994
1995            assert!(
1996                !verify.await.unwrap(),
1997                "Byzantine block with mismatched parent commitment should be rejected"
1998            );
1999        })
2000    }
2001
2002    #[test_traced("WARN")]
2003    fn test_finalize_same_height_different_views() {
2004        let runner = deterministic::Runner::timed(Duration::from_secs(60));
2005        runner.start(|mut context| async move {
2006            let mut oracle = setup_network(context.clone(), None);
2007            let Fixture {
2008                participants,
2009                schemes,
2010                ..
2011            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2012
2013            // Set up two validators
2014            let mut actors = Vec::new();
2015            for (i, validator) in participants.iter().enumerate().take(2) {
2016                let (_app, actor, _processed_height) = setup_validator(
2017                    context.with_label(&format!("validator_{i}")),
2018                    &mut oracle,
2019                    validator.clone(),
2020                    ConstantProvider::new(schemes[i].clone()),
2021                )
2022                .await;
2023                actors.push(actor);
2024            }
2025
2026            // Create block at height 1
2027            let parent = Sha256::hash(b"");
2028            let block = make_block(parent, Height::new(1), 1);
2029            let commitment = block.digest();
2030
2031            // Both validators verify the block
2032            actors[0]
2033                .verified(Round::new(Epoch::new(0), View::new(1)), block.clone())
2034                .await;
2035            actors[1]
2036                .verified(Round::new(Epoch::new(0), View::new(1)), block.clone())
2037                .await;
2038
2039            // Validator 0: Finalize with view 1
2040            let proposal_v1 = Proposal {
2041                round: Round::new(Epoch::new(0), View::new(1)),
2042                parent: View::new(0),
2043                payload: commitment,
2044            };
2045            let notarization_v1 = make_notarization(proposal_v1.clone(), &schemes, QUORUM);
2046            let finalization_v1 = make_finalization(proposal_v1.clone(), &schemes, QUORUM);
2047            actors[0]
2048                .report(Activity::Notarization(notarization_v1.clone()))
2049                .await;
2050            actors[0]
2051                .report(Activity::Finalization(finalization_v1.clone()))
2052                .await;
2053
2054            // Validator 1: Finalize with view 2 (simulates receiving finalization from different view)
2055            // This could happen during epoch transitions where the same block gets finalized
2056            // with different views by different validators.
2057            let proposal_v2 = Proposal {
2058                round: Round::new(Epoch::new(0), View::new(2)), // Different view
2059                parent: View::new(0),
2060                payload: commitment, // Same block
2061            };
2062            let notarization_v2 = make_notarization(proposal_v2.clone(), &schemes, QUORUM);
2063            let finalization_v2 = make_finalization(proposal_v2.clone(), &schemes, QUORUM);
2064            actors[1]
2065                .report(Activity::Notarization(notarization_v2.clone()))
2066                .await;
2067            actors[1]
2068                .report(Activity::Finalization(finalization_v2.clone()))
2069                .await;
2070
2071            // Wait for finalization processing
2072            context.sleep(Duration::from_millis(100)).await;
2073
2074            // Verify both validators stored the block correctly
2075            let block0 = actors[0].get_block(Height::new(1)).await.unwrap();
2076            let block1 = actors[1].get_block(Height::new(1)).await.unwrap();
2077            assert_eq!(block0, block);
2078            assert_eq!(block1, block);
2079
2080            // Verify both validators have finalizations stored
2081            let fin0 = actors[0].get_finalization(Height::new(1)).await.unwrap();
2082            let fin1 = actors[1].get_finalization(Height::new(1)).await.unwrap();
2083
2084            // Verify the finalizations have the expected different views
2085            assert_eq!(fin0.proposal.payload, block.commitment());
2086            assert_eq!(fin0.round().view(), View::new(1));
2087            assert_eq!(fin1.proposal.payload, block.commitment());
2088            assert_eq!(fin1.round().view(), View::new(2));
2089
2090            // Both validators can retrieve block by height
2091            assert_eq!(
2092                actors[0].get_info(Height::new(1)).await,
2093                Some((Height::new(1), commitment))
2094            );
2095            assert_eq!(
2096                actors[1].get_info(Height::new(1)).await,
2097                Some((Height::new(1), commitment))
2098            );
2099
2100            // Test that a validator receiving BOTH finalizations handles it correctly
2101            // (the second one should be ignored since archive ignores duplicates for same height)
2102            actors[0]
2103                .report(Activity::Finalization(finalization_v2.clone()))
2104                .await;
2105            actors[1]
2106                .report(Activity::Finalization(finalization_v1.clone()))
2107                .await;
2108            context.sleep(Duration::from_millis(100)).await;
2109
2110            // Validator 0 should still have the original finalization (v1)
2111            let fin0_after = actors[0].get_finalization(Height::new(1)).await.unwrap();
2112            assert_eq!(fin0_after.round().view(), View::new(1));
2113
2114            // Validator 1 should still have the original finalization (v2)
2115            let fin0_after = actors[1].get_finalization(Height::new(1)).await.unwrap();
2116            assert_eq!(fin0_after.round().view(), View::new(2));
2117        })
2118    }
2119
2120    #[test_traced("WARN")]
2121    fn test_init_processed_height() {
2122        let runner = deterministic::Runner::timed(Duration::from_secs(60));
2123        runner.start(|mut context| async move {
2124            let mut oracle = setup_network(context.clone(), None);
2125            let Fixture {
2126                participants,
2127                schemes,
2128                ..
2129            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2130
2131            // Test 1: Fresh init should return processed height 0
2132            let me = participants[0].clone();
2133            let (application, mut actor, initial_height) = setup_validator(
2134                context.with_label("validator_0"),
2135                &mut oracle,
2136                me.clone(),
2137                ConstantProvider::new(schemes[0].clone()),
2138            )
2139            .await;
2140            assert_eq!(initial_height, Height::zero());
2141
2142            // Process multiple blocks (1, 2, 3)
2143            let mut parent = Sha256::hash(b"");
2144            let mut blocks = Vec::new();
2145            for i in 1..=3 {
2146                let block = make_block(parent, Height::new(i), i);
2147                let commitment = block.digest();
2148                let round = Round::new(Epoch::new(0), View::new(i));
2149
2150                actor.verified(round, block.clone()).await;
2151                let proposal = Proposal {
2152                    round,
2153                    parent: View::new(i - 1),
2154                    payload: commitment,
2155                };
2156                let finalization = make_finalization(proposal, &schemes, QUORUM);
2157                actor.report(Activity::Finalization(finalization)).await;
2158
2159                blocks.push(block);
2160                parent = commitment;
2161            }
2162
2163            // Wait for application to process all blocks
2164            while application.blocks().len() < 3 {
2165                context.sleep(Duration::from_millis(10)).await;
2166            }
2167
2168            // Set marshal's processed height to 3
2169            actor.set_floor(Height::new(3)).await;
2170            context.sleep(Duration::from_millis(10)).await;
2171
2172            // Verify application received all blocks
2173            assert_eq!(application.blocks().len(), 3);
2174            assert_eq!(
2175                application.tip(),
2176                Some((Height::new(3), blocks[2].digest()))
2177            );
2178
2179            // Test 2: Restart with marshal processed height = 3
2180            let (_restart_application, _restart_actor, restart_height) = setup_validator(
2181                context.with_label("validator_0_restart"),
2182                &mut oracle,
2183                me,
2184                ConstantProvider::new(schemes[0].clone()),
2185            )
2186            .await;
2187
2188            assert_eq!(restart_height, Height::new(3));
2189        })
2190    }
2191
2192    #[test_traced("WARN")]
2193    fn test_marshaled_rejects_unsupported_epoch() {
2194        #[derive(Clone)]
2195        struct MockVerifyingApp {
2196            genesis: B,
2197        }
2198
2199        impl crate::Application<deterministic::Context> for MockVerifyingApp {
2200            type Block = B;
2201            type Context = Context<D, K>;
2202            type SigningScheme = S;
2203
2204            async fn genesis(&mut self) -> Self::Block {
2205                self.genesis.clone()
2206            }
2207
2208            async fn propose(
2209                &mut self,
2210                _context: (deterministic::Context, Self::Context),
2211                _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
2212            ) -> Option<Self::Block> {
2213                None
2214            }
2215        }
2216
2217        impl VerifyingApplication<deterministic::Context> for MockVerifyingApp {
2218            async fn verify(
2219                &mut self,
2220                _context: (deterministic::Context, Self::Context),
2221                _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
2222            ) -> bool {
2223                true
2224            }
2225        }
2226
2227        #[derive(Clone)]
2228        struct LimitedEpocher {
2229            inner: FixedEpocher,
2230            max_epoch: u64,
2231        }
2232
2233        impl Epocher for LimitedEpocher {
2234            fn containing(&self, height: Height) -> Option<crate::types::EpochInfo> {
2235                let bounds = self.inner.containing(height)?;
2236                if bounds.epoch().get() > self.max_epoch {
2237                    None
2238                } else {
2239                    Some(bounds)
2240                }
2241            }
2242
2243            fn first(&self, epoch: Epoch) -> Option<Height> {
2244                if epoch.get() > self.max_epoch {
2245                    None
2246                } else {
2247                    self.inner.first(epoch)
2248                }
2249            }
2250
2251            fn last(&self, epoch: Epoch) -> Option<Height> {
2252                if epoch.get() > self.max_epoch {
2253                    None
2254                } else {
2255                    self.inner.last(epoch)
2256                }
2257            }
2258        }
2259
2260        let runner = deterministic::Runner::timed(Duration::from_secs(60));
2261        runner.start(|mut context| async move {
2262            let mut oracle = setup_network(context.clone(), None);
2263            let Fixture {
2264                participants,
2265                schemes,
2266                ..
2267            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2268
2269            let me = participants[0].clone();
2270            let (_base_app, marshal, _processed_height) = setup_validator(
2271                context.with_label("validator_0"),
2272                &mut oracle,
2273                me.clone(),
2274                ConstantProvider::new(schemes[0].clone()),
2275            )
2276            .await;
2277
2278            let genesis = make_block(Sha256::hash(b""), Height::zero(), 0);
2279
2280            let mock_app = MockVerifyingApp {
2281                genesis: genesis.clone(),
2282            };
2283            let limited_epocher = LimitedEpocher {
2284                inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
2285                max_epoch: 0,
2286            };
2287            let mut marshaled =
2288                Marshaled::new(context.clone(), mock_app, marshal.clone(), limited_epocher);
2289
2290            // Create a parent block at height 19 (last block in epoch 0, which is supported)
2291            let parent = make_block(genesis.commitment(), Height::new(19), 1000);
2292            let parent_commitment = parent.commitment();
2293            let parent_round = Round::new(Epoch::new(0), View::new(19));
2294            marshal.clone().verified(parent_round, parent).await;
2295
2296            // Create a block at height 20 (first block in epoch 1, which is NOT supported)
2297            let block = make_block(parent_commitment, Height::new(20), 2000);
2298            let block_commitment = block.commitment();
2299            marshal
2300                .clone()
2301                .proposed(Round::new(Epoch::new(1), View::new(20)), block)
2302                .await;
2303
2304            context.sleep(Duration::from_millis(10)).await;
2305
2306            let unsupported_round = Round::new(Epoch::new(1), View::new(20));
2307            let unsupported_context = Context {
2308                round: unsupported_round,
2309                leader: me.clone(),
2310                parent: (View::new(19), parent_commitment),
2311            };
2312
2313            let verify_result = marshaled
2314                .verify(unsupported_context, block_commitment)
2315                .await
2316                .await;
2317
2318            assert!(
2319                !verify_result.unwrap(),
2320                "Block in unsupported epoch should be rejected"
2321            );
2322        })
2323    }
2324
2325    /// Regression test for verification task cleanup.
2326    ///
2327    /// Verifies that certifying blocks out of order works correctly. When multiple
2328    /// blocks are verified at different views, certifying a higher-view block should
2329    /// not interfere with certifying a lower-view block that was verified earlier.
2330    ///
2331    /// Scenario:
2332    /// 1. Verify block A at view V
2333    /// 2. Verify block B at view V+K
2334    /// 3. Certify block B at view V+K
2335    /// 4. Certify block A at view V - should succeed
2336    #[test_traced("INFO")]
2337    fn test_certify_lower_view_after_higher_view() {
2338        #[derive(Clone)]
2339        struct MockVerifyingApp {
2340            genesis: B,
2341        }
2342
2343        impl crate::Application<deterministic::Context> for MockVerifyingApp {
2344            type Block = B;
2345            type Context = Context<D, K>;
2346            type SigningScheme = S;
2347
2348            async fn genesis(&mut self) -> Self::Block {
2349                self.genesis.clone()
2350            }
2351
2352            async fn propose(
2353                &mut self,
2354                _context: (deterministic::Context, Self::Context),
2355                _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
2356            ) -> Option<Self::Block> {
2357                None
2358            }
2359        }
2360
2361        impl VerifyingApplication<deterministic::Context> for MockVerifyingApp {
2362            async fn verify(
2363                &mut self,
2364                _context: (deterministic::Context, Self::Context),
2365                _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
2366            ) -> bool {
2367                true
2368            }
2369        }
2370
2371        let runner = deterministic::Runner::timed(Duration::from_secs(60));
2372        runner.start(|mut context| async move {
2373            let mut oracle = setup_network(context.clone(), None);
2374            let Fixture {
2375                participants,
2376                schemes,
2377                ..
2378            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2379
2380            let me = participants[0].clone();
2381            let (_base_app, marshal, _processed_height) = setup_validator(
2382                context.with_label("validator_0"),
2383                &mut oracle,
2384                me.clone(),
2385                ConstantProvider::new(schemes[0].clone()),
2386            )
2387            .await;
2388
2389            let genesis = make_block(Sha256::hash(b""), Height::zero(), 0);
2390
2391            let mock_app = MockVerifyingApp {
2392                genesis: genesis.clone(),
2393            };
2394            let mut marshaled = Marshaled::new(
2395                context.clone(),
2396                mock_app,
2397                marshal.clone(),
2398                FixedEpocher::new(BLOCKS_PER_EPOCH),
2399            );
2400
2401            // Create parent block at height 1
2402            let parent = make_block(genesis.commitment(), Height::new(1), 100);
2403            let parent_commitment = parent.commitment();
2404            let parent_round = Round::new(Epoch::new(0), View::new(1));
2405            marshal.clone().verified(parent_round, parent).await;
2406
2407            // Block A at view 5 (height 2) - create with context matching what verify will receive
2408            let round_a = Round::new(Epoch::new(0), View::new(5));
2409            let context_a = Context {
2410                round: round_a,
2411                leader: me.clone(),
2412                parent: (View::new(1), parent_commitment),
2413            };
2414            let block_a =
2415                B::new::<Sha256>(context_a.clone(), parent_commitment, Height::new(2), 200);
2416            let commitment_a = block_a.commitment();
2417            marshal.clone().proposed(round_a, block_a).await;
2418
2419            // Block B at view 10 (height 2, different block same height - could happen with
2420            // different proposers or re-proposals)
2421            let round_b = Round::new(Epoch::new(0), View::new(10));
2422            let context_b = Context {
2423                round: round_b,
2424                leader: me.clone(),
2425                parent: (View::new(1), parent_commitment),
2426            };
2427            let block_b =
2428                B::new::<Sha256>(context_b.clone(), parent_commitment, Height::new(2), 300);
2429            let commitment_b = block_b.commitment();
2430            marshal.clone().proposed(round_b, block_b).await;
2431
2432            context.sleep(Duration::from_millis(10)).await;
2433
2434            // Step 1: Verify block A at view 5
2435            let _ = marshaled.verify(context_a, commitment_a).await.await;
2436
2437            // Step 2: Verify block B at view 10
2438            let _ = marshaled.verify(context_b, commitment_b).await.await;
2439
2440            // Step 3: Certify block B at view 10 FIRST
2441            let certify_b = marshaled.certify(round_b, commitment_b).await;
2442            assert!(
2443                certify_b.await.unwrap(),
2444                "Block B certification should succeed"
2445            );
2446
2447            // Step 4: Certify block A at view 5 - should succeed
2448            let certify_a = marshaled.certify(round_a, commitment_a).await;
2449
2450            // Use select with timeout to detect never-resolving receiver
2451            select! {
2452                result = certify_a => {
2453                    assert!(result.unwrap(), "Block A certification should succeed");
2454                },
2455                _ = context.sleep(Duration::from_secs(5)) => {
2456                    panic!("Block A certification timed out");
2457                },
2458            }
2459        })
2460    }
2461
2462    /// Regression test for re-proposal validation in optimistic_verify.
2463    ///
2464    /// Verifies that:
2465    /// 1. Valid re-proposals at epoch boundaries are accepted
2466    /// 2. Invalid re-proposals (not at epoch boundary) are rejected
2467    ///
2468    /// A re-proposal occurs when the parent commitment equals the block being verified,
2469    /// meaning the same block is being proposed again in a new view.
2470    #[test_traced("INFO")]
2471    fn test_marshaled_reproposal_validation() {
2472        #[derive(Clone)]
2473        struct MockVerifyingApp {
2474            genesis: B,
2475        }
2476
2477        impl crate::Application<deterministic::Context> for MockVerifyingApp {
2478            type Block = B;
2479            type Context = Context<D, K>;
2480            type SigningScheme = S;
2481
2482            async fn genesis(&mut self) -> Self::Block {
2483                self.genesis.clone()
2484            }
2485
2486            async fn propose(
2487                &mut self,
2488                _context: (deterministic::Context, Self::Context),
2489                _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
2490            ) -> Option<Self::Block> {
2491                None
2492            }
2493        }
2494
2495        impl VerifyingApplication<deterministic::Context> for MockVerifyingApp {
2496            async fn verify(
2497                &mut self,
2498                _context: (deterministic::Context, Self::Context),
2499                _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
2500            ) -> bool {
2501                true
2502            }
2503        }
2504
2505        let runner = deterministic::Runner::timed(Duration::from_secs(60));
2506        runner.start(|mut context| async move {
2507            let mut oracle = setup_network(context.clone(), None);
2508            let Fixture {
2509                participants,
2510                schemes,
2511                ..
2512            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2513
2514            let me = participants[0].clone();
2515            let (_base_app, marshal, _processed_height) = setup_validator(
2516                context.with_label("validator_0"),
2517                &mut oracle,
2518                me.clone(),
2519                ConstantProvider::new(schemes[0].clone()),
2520            )
2521            .await;
2522
2523            let genesis = make_block(Sha256::hash(b""), Height::zero(), 0);
2524
2525            let mock_app = MockVerifyingApp {
2526                genesis: genesis.clone(),
2527            };
2528            let mut marshaled = Marshaled::new(
2529                context.clone(),
2530                mock_app,
2531                marshal.clone(),
2532                FixedEpocher::new(BLOCKS_PER_EPOCH),
2533            );
2534
2535            // Build a chain up to the epoch boundary (height 19 is the last block in epoch 0
2536            // with BLOCKS_PER_EPOCH=20, since epoch 0 covers heights 0-19)
2537            let mut parent = genesis.commitment();
2538            let mut last_view = View::zero();
2539            for i in 1..BLOCKS_PER_EPOCH.get() {
2540                let round = Round::new(Epoch::new(0), View::new(i));
2541                let ctx = Context {
2542                    round,
2543                    leader: me.clone(),
2544                    parent: (last_view, parent),
2545                };
2546                let block = B::new::<Sha256>(ctx.clone(), parent, Height::new(i), i * 100);
2547                marshal.clone().verified(round, block.clone()).await;
2548                parent = block.commitment();
2549                last_view = View::new(i);
2550            }
2551
2552            // Create the epoch boundary block (height 19, last block in epoch 0)
2553            let boundary_height = Height::new(BLOCKS_PER_EPOCH.get() - 1);
2554            let boundary_round = Round::new(Epoch::new(0), View::new(boundary_height.get()));
2555            let boundary_context = Context {
2556                round: boundary_round,
2557                leader: me.clone(),
2558                parent: (last_view, parent),
2559            };
2560            let boundary_block = B::new::<Sha256>(
2561                boundary_context.clone(),
2562                parent,
2563                boundary_height,
2564                boundary_height.get() * 100,
2565            );
2566            let boundary_commitment = boundary_block.commitment();
2567            marshal
2568                .clone()
2569                .verified(boundary_round, boundary_block.clone())
2570                .await;
2571
2572            // Make the boundary block available for subscription
2573            marshal
2574                .clone()
2575                .proposed(boundary_round, boundary_block.clone())
2576                .await;
2577
2578            context.sleep(Duration::from_millis(10)).await;
2579
2580            // Test 1: Valid re-proposal at epoch boundary should be accepted
2581            // Re-proposal context: parent commitment equals the block being verified
2582            // Re-proposals happen within the same epoch when the parent is the last block
2583            let reproposal_round = Round::new(Epoch::new(0), View::new(20));
2584            let reproposal_context = Context {
2585                round: reproposal_round,
2586                leader: me.clone(),
2587                parent: (View::new(boundary_height.get()), boundary_commitment), // Parent IS the boundary block
2588            };
2589
2590            // Call verify (which calls optimistic_verify internally via Automaton trait)
2591            let verify_result = marshaled
2592                .verify(reproposal_context.clone(), boundary_commitment)
2593                .await
2594                .await;
2595            assert!(
2596                verify_result.unwrap(),
2597                "Valid re-proposal at epoch boundary should be accepted"
2598            );
2599
2600            // Test 2: Invalid re-proposal (not at epoch boundary) should be rejected
2601            // Create a block at height 10 (not at epoch boundary)
2602            let non_boundary_height = Height::new(10);
2603            let non_boundary_round = Round::new(Epoch::new(0), View::new(10));
2604            let non_boundary_context = Context {
2605                round: non_boundary_round,
2606                leader: me.clone(),
2607                parent: (View::new(9), parent),
2608            };
2609            let non_boundary_block = B::new::<Sha256>(
2610                non_boundary_context.clone(),
2611                parent,
2612                non_boundary_height,
2613                1000,
2614            );
2615            let non_boundary_commitment = non_boundary_block.commitment();
2616
2617            // Make the non-boundary block available
2618            marshal
2619                .clone()
2620                .proposed(non_boundary_round, non_boundary_block.clone())
2621                .await;
2622
2623            context.sleep(Duration::from_millis(10)).await;
2624
2625            // Attempt to re-propose the non-boundary block
2626            let invalid_reproposal_round = Round::new(Epoch::new(0), View::new(15));
2627            let invalid_reproposal_context = Context {
2628                round: invalid_reproposal_round,
2629                leader: me.clone(),
2630                parent: (View::new(10), non_boundary_commitment),
2631            };
2632
2633            let verify_result = marshaled
2634                .verify(invalid_reproposal_context, non_boundary_commitment)
2635                .await
2636                .await;
2637            assert!(
2638                !verify_result.unwrap(),
2639                "Invalid re-proposal (not at epoch boundary) should be rejected"
2640            );
2641
2642            // Test 3: Re-proposal with mismatched epoch should be rejected
2643            // This is a regression test - re-proposals must be in the same epoch as the block.
2644            let cross_epoch_reproposal_round = Round::new(Epoch::new(1), View::new(20));
2645            let cross_epoch_reproposal_context = Context {
2646                round: cross_epoch_reproposal_round,
2647                leader: me.clone(),
2648                parent: (View::new(boundary_height.get()), boundary_commitment),
2649            };
2650
2651            let verify_result = marshaled
2652                .verify(cross_epoch_reproposal_context, boundary_commitment)
2653                .await
2654                .await;
2655            assert!(
2656                !verify_result.unwrap(),
2657                "Re-proposal with mismatched epoch should be rejected"
2658            );
2659
2660            // Test 4: Certify-only path for re-proposal (no prior verify call)
2661            // This tests the crash recovery scenario where a validator needs to certify
2662            // a re-proposal without having called verify first.
2663            let certify_only_round = Round::new(Epoch::new(0), View::new(21));
2664            let certify_result = marshaled
2665                .certify(certify_only_round, boundary_commitment)
2666                .await
2667                .await;
2668            assert!(
2669                certify_result.unwrap(),
2670                "Certify-only path for re-proposal should succeed"
2671            );
2672
2673            // Test 5: Certify-only path for a normal block (no prior verify call)
2674            // Build a normal block (not at epoch boundary) and test certify without verify.
2675            // Use genesis as the parent since we don't have finalized blocks at other heights.
2676            let normal_height = Height::new(1);
2677            let normal_round = Round::new(Epoch::new(0), View::new(100));
2678            let genesis_commitment = genesis.commitment();
2679
2680            let normal_context = Context {
2681                round: normal_round,
2682                leader: me.clone(),
2683                parent: (View::zero(), genesis_commitment),
2684            };
2685            let normal_block = B::new::<Sha256>(
2686                normal_context.clone(),
2687                genesis_commitment,
2688                normal_height,
2689                500,
2690            );
2691            let normal_commitment = normal_block.commitment();
2692            marshal
2693                .clone()
2694                .proposed(normal_round, normal_block.clone())
2695                .await;
2696
2697            context.sleep(Duration::from_millis(10)).await;
2698
2699            // Certify without calling verify first
2700            let certify_result = marshaled
2701                .certify(normal_round, normal_commitment)
2702                .await
2703                .await;
2704            assert!(
2705                certify_result.unwrap(),
2706                "Certify-only path for normal block should succeed"
2707            );
2708        })
2709    }
2710
2711    #[test_traced("INFO")]
2712    fn test_broadcast_caches_block() {
2713        let runner = deterministic::Runner::timed(Duration::from_secs(60));
2714        runner.start(|mut context| async move {
2715            let mut oracle = setup_network(context.clone(), None);
2716            let Fixture {
2717                participants,
2718                schemes,
2719                ..
2720            } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2721
2722            // Set up one validator
2723            let (i, validator) = participants.iter().enumerate().next().unwrap();
2724            let mut actor = setup_validator(
2725                context.with_label(&format!("validator_{i}")),
2726                &mut oracle,
2727                validator.clone(),
2728                ConstantProvider::new(schemes[i].clone()),
2729            )
2730            .await
2731            .1;
2732
2733            // Create block at height 1
2734            let parent = Sha256::hash(b"");
2735            let block = make_block(parent, Height::new(1), 1);
2736            let commitment = block.digest();
2737
2738            // Broadcast the block
2739            actor
2740                .proposed(Round::new(Epoch::new(0), View::new(1)), block.clone())
2741                .await;
2742
2743            // Ensure the block is cached and retrievable; This should hit the in-memory cache
2744            // via `buffered::Mailbox`.
2745            actor
2746                .get_block(&commitment)
2747                .await
2748                .expect("block should be cached after broadcast");
2749
2750            // Restart marshal, removing any in-memory cache
2751            let mut actor = setup_validator(
2752                context.with_label(&format!("validator_{i}_restart")),
2753                &mut oracle,
2754                validator.clone(),
2755                ConstantProvider::new(schemes[i].clone()),
2756            )
2757            .await
2758            .1;
2759
2760            // Put a notarization into the cache to re-initialize the ephemeral cache for the
2761            // first epoch. Without this, the marshal cannot determine the epoch of the block being fetched,
2762            // so it won't look to restore the cache for the epoch.
2763            let notarization = make_notarization(
2764                Proposal {
2765                    round: Round::new(Epoch::new(0), View::new(1)),
2766                    parent: View::new(0),
2767                    payload: commitment,
2768                },
2769                &schemes,
2770                QUORUM,
2771            );
2772            actor.report(Activity::Notarization(notarization)).await;
2773
2774            // Ensure the block is cached and retrievable
2775            let fetched = actor
2776                .get_block(&commitment)
2777                .await
2778                .expect("block should be cached after broadcast");
2779            assert_eq!(fetched, block);
2780        });
2781    }
2782}