commonware_consensus/marshal/
mod.rs

1//! Ordered delivery of finalized blocks.
2//!
3//! # Architecture
4//!
5//! The core of the module is the [actor::Actor]. It marshals the finalized blocks into order by:
6//!
7//! - Receiving uncertified blocks from a broadcast mechanism
8//! - Receiving notarizations and finalizations from consensus
9//! - Reconstructing a total order of finalized blocks
10//! - Providing a backfill mechanism for missing blocks
11//!
12//! The actor interacts with four main components:
13//! - [crate::Reporter]: Receives ordered, finalized blocks at-least-once
14//! - [crate::simplex]: Provides consensus messages
15//! - Application: Provides verified blocks
16//! - [commonware_broadcast::buffered]: Provides uncertified blocks received from the network
17//! - [commonware_resolver::Resolver]: Provides a backfill mechanism for missing blocks
18//!
19//! # Design
20//!
21//! ## Delivery
22//!
23//! The actor will deliver a block to the reporter at-least-once. The reporter should be prepared to
24//! handle duplicate deliveries. However the blocks will be in order.
25//!
26//! ## Finalization
27//!
28//! The actor uses a view-based model to track the state of the chain. Each view corresponds
29//! to a potential block in the chain. The actor will only finalize a block (and its ancestors)
30//! if it has a corresponding finalization from consensus.
31//!
32//! _It is possible that there may exist multiple finalizations for the same block in different views. Marshal
33//! only concerns itself with verifying a valid finalization exists for a block, not that a specific finalization
34//! exists. This means different Marshals may have different finalizations for the same block persisted to disk._
35//!
36//! ## Backfill
37//!
38//! The actor provides a backfill mechanism for missing blocks. If the actor notices a gap in its
39//! knowledge of finalized blocks, it will request the missing blocks from its peers. This ensures
40//! that the actor can catch up to the rest of the network if it falls behind.
41//!
42//! ## Storage
43//!
44//! The actor uses a combination of internal and external ([`store::Certificates`], [`store::Blocks`]) storage
45//! to store blocks and finalizations. Internal storage is used to store data that is only needed for a short
46//! period of time, such as unverified blocks or notarizations. External storage is used to
47//! store data that needs to be persisted indefinitely, such as finalized blocks.
48//!
49//! Marshal will store all blocks after a configurable starting height (or, floor) onward.
50//! This allows for state sync from a specific height rather than from genesis. When
51//! updating the starting height, marshal will attempt to prune blocks in external storage
52//! that are no longer needed.
53//!
54//! _Setting a configurable starting height will prevent others from backfilling blocks below said height. This
55//! feature is only recommended for applications that support state sync (i.e., those that don't require full
56//! block history to participate in consensus)._
57//!
58//! ## Limitations and Future Work
59//!
60//! - Only works with [crate::simplex] rather than general consensus.
61//! - Assumes at-most one notarization per view, incompatible with some consensus protocols.
62//! - Uses [`broadcast::buffered`](`commonware_broadcast::buffered`) for broadcasting and receiving
63//!   uncertified blocks from the network.
64
65pub mod actor;
66pub use actor::Actor;
67pub mod cache;
68pub mod config;
69pub use config::Config;
70pub mod ingress;
71pub use ingress::mailbox::Mailbox;
72pub mod resolver;
73pub mod store;
74
75use crate::Block;
76use commonware_utils::{acknowledgement::Exact, Acknowledgement};
77
78/// An update reported to the application, either a new finalized tip or a finalized block.
79///
80/// Finalized tips are reported as soon as known, whether or not we hold all blocks up to that height.
81/// Finalized blocks are reported to the application in monotonically increasing order (no gaps permitted).
82#[derive(Clone, Debug)]
83pub enum Update<B: Block, A: Acknowledgement = Exact> {
84    /// A new finalized tip.
85    Tip(u64, B::Commitment),
86    /// A new finalized block and an [Acknowledgement] for the application to signal once processed.
87    ///
88    /// To ensure all blocks are delivered at least once, marshal waits to mark a block as delivered
89    /// until the application explicitly acknowledges the update. If the [Acknowledgement] is dropped before
90    /// handling, marshal will exit (assuming the application is shutting down).
91    ///
92    /// Because the [Acknowledgement] is clonable, the application can pass [Update] to multiple consumers
93    /// (and marshal will only consider the block delivered once all consumers have acknowledged it).
94    Block(B, A),
95}
96
97#[cfg(test)]
98pub mod mocks;
99
100#[cfg(test)]
101mod tests {
102    use super::{
103        actor,
104        config::Config,
105        mocks::{application::Application, block::Block},
106        resolver::p2p as resolver,
107    };
108    use crate::{
109        application::marshaled::Marshaled,
110        marshal::ingress::mailbox::{AncestorStream, Identifier},
111        simplex::{
112            scheme::bls12381_threshold,
113            types::{Activity, Context, Finalization, Finalize, Notarization, Notarize, Proposal},
114        },
115        types::{Epoch, Epocher, FixedEpocher, Round, View, ViewDelta},
116        Automaton, Block as _, Reporter, VerifyingApplication,
117    };
118    use commonware_broadcast::buffered;
119    use commonware_cryptography::{
120        bls12381::primitives::variant::MinPk,
121        certificate::{mocks::Fixture, ConstantProvider, Scheme as _},
122        ed25519::PublicKey,
123        sha256::{Digest as Sha256Digest, Sha256},
124        Committable, Digestible, Hasher as _,
125    };
126    use commonware_macros::test_traced;
127    use commonware_p2p::{
128        simulated::{self, Link, Network, Oracle},
129        Manager,
130    };
131    use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Quota, Runner};
132    use commonware_storage::archive::immutable;
133    use commonware_utils::{vec::NonEmptyVec, NZUsize, NZU64};
134    use futures::StreamExt;
135    use rand::{
136        seq::{IteratorRandom, SliceRandom},
137        Rng,
138    };
139    use std::{
140        collections::BTreeMap,
141        num::{NonZeroU32, NonZeroU64, NonZeroUsize},
142        time::{Duration, Instant},
143    };
144    use tracing::info;
145
146    type D = Sha256Digest;
147    type B = Block<D>;
148    type K = PublicKey;
149    type V = MinPk;
150    type S = bls12381_threshold::Scheme<K, V>;
151    type P = ConstantProvider<S, Epoch>;
152
153    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
154    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
155    const NAMESPACE: &[u8] = b"test";
156    const NUM_VALIDATORS: u32 = 4;
157    const QUORUM: u32 = 3;
158    const NUM_BLOCKS: u64 = 160;
159    const BLOCKS_PER_EPOCH: NonZeroU64 = NZU64!(20);
160    const LINK: Link = Link {
161        latency: Duration::from_millis(100),
162        jitter: Duration::from_millis(1),
163        success_rate: 1.0,
164    };
165    const UNRELIABLE_LINK: Link = Link {
166        latency: Duration::from_millis(200),
167        jitter: Duration::from_millis(50),
168        success_rate: 0.7,
169    };
170    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
171
172    async fn setup_validator(
173        context: deterministic::Context,
174        oracle: &mut Oracle<K, deterministic::Context>,
175        validator: K,
176        provider: P,
177    ) -> (
178        Application<B>,
179        crate::marshal::ingress::mailbox::Mailbox<S, B>,
180        u64,
181    ) {
182        let config = Config {
183            provider,
184            epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
185            mailbox_size: 100,
186            namespace: NAMESPACE.to_vec(),
187            view_retention_timeout: ViewDelta::new(10),
188            max_repair: NZUsize!(10),
189            block_codec_config: (),
190            partition_prefix: format!("validator-{}", validator.clone()),
191            prunable_items_per_section: NZU64!(10),
192            replay_buffer: NZUsize!(1024),
193            write_buffer: NZUsize!(1024),
194            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
195        };
196
197        // Create the resolver
198        let mut control = oracle.control(validator.clone());
199        let backfill = control.register(1, TEST_QUOTA).await.unwrap();
200        let resolver_cfg = resolver::Config {
201            public_key: validator.clone(),
202            manager: oracle.manager(),
203            blocker: control.clone(),
204            mailbox_size: config.mailbox_size,
205            initial: Duration::from_secs(1),
206            timeout: Duration::from_secs(2),
207            fetch_retry_timeout: Duration::from_millis(100),
208            priority_requests: false,
209            priority_responses: false,
210        };
211        let resolver = resolver::init(&context, resolver_cfg, backfill);
212
213        // Create a buffered broadcast engine and get its mailbox
214        let broadcast_config = buffered::Config {
215            public_key: validator.clone(),
216            mailbox_size: config.mailbox_size,
217            deque_size: 10,
218            priority: false,
219            codec_config: (),
220        };
221        let (broadcast_engine, buffer) = buffered::Engine::new(context.clone(), broadcast_config);
222        let network = control.register(2, TEST_QUOTA).await.unwrap();
223        broadcast_engine.start(network);
224
225        // Initialize finalizations by height
226        let start = Instant::now();
227        let finalizations_by_height = immutable::Archive::init(
228            context.with_label("finalizations_by_height"),
229            immutable::Config {
230                metadata_partition: format!(
231                    "{}-finalizations-by-height-metadata",
232                    config.partition_prefix
233                ),
234                freezer_table_partition: format!(
235                    "{}-finalizations-by-height-freezer-table",
236                    config.partition_prefix
237                ),
238                freezer_table_initial_size: 64,
239                freezer_table_resize_frequency: 10,
240                freezer_table_resize_chunk_size: 10,
241                freezer_journal_partition: format!(
242                    "{}-finalizations-by-height-freezer-journal",
243                    config.partition_prefix
244                ),
245                freezer_journal_target_size: 1024,
246                freezer_journal_compression: None,
247                freezer_journal_buffer_pool: config.buffer_pool.clone(),
248                ordinal_partition: format!(
249                    "{}-finalizations-by-height-ordinal",
250                    config.partition_prefix
251                ),
252                items_per_section: NZU64!(10),
253                codec_config: S::certificate_codec_config_unbounded(),
254                replay_buffer: config.replay_buffer,
255                write_buffer: config.write_buffer,
256            },
257        )
258        .await
259        .expect("failed to initialize finalizations by height archive");
260        info!(elapsed = ?start.elapsed(), "restored finalizations by height archive");
261
262        // Initialize finalized blocks
263        let start = Instant::now();
264        let finalized_blocks = immutable::Archive::init(
265            context.with_label("finalized_blocks"),
266            immutable::Config {
267                metadata_partition: format!(
268                    "{}-finalized_blocks-metadata",
269                    config.partition_prefix
270                ),
271                freezer_table_partition: format!(
272                    "{}-finalized_blocks-freezer-table",
273                    config.partition_prefix
274                ),
275                freezer_table_initial_size: 64,
276                freezer_table_resize_frequency: 10,
277                freezer_table_resize_chunk_size: 10,
278                freezer_journal_partition: format!(
279                    "{}-finalized_blocks-freezer-journal",
280                    config.partition_prefix
281                ),
282                freezer_journal_target_size: 1024,
283                freezer_journal_compression: None,
284                freezer_journal_buffer_pool: config.buffer_pool.clone(),
285                ordinal_partition: format!("{}-finalized_blocks-ordinal", config.partition_prefix),
286                items_per_section: NZU64!(10),
287                codec_config: config.block_codec_config,
288                replay_buffer: config.replay_buffer,
289                write_buffer: config.write_buffer,
290            },
291        )
292        .await
293        .expect("failed to initialize finalized blocks archive");
294        info!(elapsed = ?start.elapsed(), "restored finalized blocks archive");
295
296        let (actor, mailbox, processed_height) = actor::Actor::init(
297            context.clone(),
298            finalizations_by_height,
299            finalized_blocks,
300            config,
301        )
302        .await;
303        let application = Application::<B>::default();
304
305        // Start the application
306        actor.start(application.clone(), buffer, resolver);
307
308        (application, mailbox, processed_height)
309    }
310
311    fn make_finalization(proposal: Proposal<D>, schemes: &[S], quorum: u32) -> Finalization<S, D> {
312        // Generate proposal signature
313        let finalizes: Vec<_> = schemes
314            .iter()
315            .take(quorum as usize)
316            .map(|scheme| Finalize::sign(scheme, NAMESPACE, proposal.clone()).unwrap())
317            .collect();
318
319        // Generate certificate signatures
320        Finalization::from_finalizes(&schemes[0], &finalizes).unwrap()
321    }
322
323    fn make_notarization(proposal: Proposal<D>, schemes: &[S], quorum: u32) -> Notarization<S, D> {
324        // Generate proposal signature
325        let notarizes: Vec<_> = schemes
326            .iter()
327            .take(quorum as usize)
328            .map(|scheme| Notarize::sign(scheme, NAMESPACE, proposal.clone()).unwrap())
329            .collect();
330
331        // Generate certificate signatures
332        Notarization::from_notarizes(&schemes[0], &notarizes).unwrap()
333    }
334
335    fn setup_network(
336        context: deterministic::Context,
337        tracked_peer_sets: Option<usize>,
338    ) -> Oracle<K, deterministic::Context> {
339        let (network, oracle) = Network::new(
340            context.with_label("network"),
341            simulated::Config {
342                max_size: 1024 * 1024,
343                disconnect_on_block: true,
344                tracked_peer_sets,
345            },
346        );
347        network.start();
348        oracle
349    }
350
351    async fn setup_network_links(
352        oracle: &mut Oracle<K, deterministic::Context>,
353        peers: &[K],
354        link: Link,
355    ) {
356        for p1 in peers.iter() {
357            for p2 in peers.iter() {
358                if p2 == p1 {
359                    continue;
360                }
361                let _ = oracle.add_link(p1.clone(), p2.clone(), link.clone()).await;
362            }
363        }
364    }
365
366    #[test_traced("WARN")]
367    fn test_finalize_good_links() {
368        for seed in 0..5 {
369            let result1 = finalize(seed, LINK, false);
370            let result2 = finalize(seed, LINK, false);
371
372            // Ensure determinism
373            assert_eq!(result1, result2);
374        }
375    }
376
377    #[test_traced("WARN")]
378    fn test_finalize_bad_links() {
379        for seed in 0..5 {
380            let result1 = finalize(seed, UNRELIABLE_LINK, false);
381            let result2 = finalize(seed, UNRELIABLE_LINK, false);
382
383            // Ensure determinism
384            assert_eq!(result1, result2);
385        }
386    }
387
388    #[test_traced("WARN")]
389    fn test_finalize_good_links_quorum_sees_finalization() {
390        for seed in 0..5 {
391            let result1 = finalize(seed, LINK, true);
392            let result2 = finalize(seed, LINK, true);
393
394            // Ensure determinism
395            assert_eq!(result1, result2);
396        }
397    }
398
399    #[test_traced("DEBUG")]
400    fn test_finalize_bad_links_quorum_sees_finalization() {
401        for seed in 0..5 {
402            let result1 = finalize(seed, UNRELIABLE_LINK, true);
403            let result2 = finalize(seed, UNRELIABLE_LINK, true);
404
405            // Ensure determinism
406            assert_eq!(result1, result2);
407        }
408    }
409
410    fn finalize(seed: u64, link: Link, quorum_sees_finalization: bool) -> String {
411        let runner = deterministic::Runner::new(
412            deterministic::Config::new()
413                .with_seed(seed)
414                .with_timeout(Some(Duration::from_secs(600))),
415        );
416        runner.start(|mut context| async move {
417            let mut oracle = setup_network(context.clone(), Some(3));
418            let Fixture {
419                participants,
420                schemes,
421                ..
422            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
423
424            // Initialize applications and actors
425            let mut applications = BTreeMap::new();
426            let mut actors = Vec::new();
427
428            // Register the initial peer set.
429            let mut manager = oracle.manager();
430            manager
431                .update(0, participants.clone().try_into().unwrap())
432                .await;
433            for (i, validator) in participants.iter().enumerate() {
434                let (application, actor, _processed_height) = setup_validator(
435                    context.with_label(&format!("validator_{i}")),
436                    &mut oracle,
437                    validator.clone(),
438                    ConstantProvider::new(schemes[i].clone()),
439                )
440                .await;
441                applications.insert(validator.clone(), application);
442                actors.push(actor);
443            }
444
445            // Add links between all peers
446            setup_network_links(&mut oracle, &participants, link.clone()).await;
447
448            // Generate blocks, skipping the genesis block.
449            let mut blocks = Vec::<B>::new();
450            let mut parent = Sha256::hash(b"");
451            for i in 1..=NUM_BLOCKS {
452                let block = B::new::<Sha256>(parent, i, i);
453                parent = block.digest();
454                blocks.push(block);
455            }
456
457            // Broadcast and finalize blocks in random order
458            let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
459            blocks.shuffle(&mut context);
460            for block in blocks.iter() {
461                // Skip genesis block
462                let height = block.height();
463                assert!(height > 0, "genesis block should not have been generated");
464
465                // Calculate the epoch and round for the block
466                let bounds = epocher.containing(height).unwrap();
467                let round = Round::new(bounds.epoch(), View::new(height));
468
469                // Broadcast block by one validator
470                let actor_index: usize = (height % (NUM_VALIDATORS as u64)) as usize;
471                let mut actor = actors[actor_index].clone();
472                actor.proposed(round, block.clone()).await;
473                actor.verified(round, block.clone()).await;
474
475                // Wait for the block to be broadcast, but due to jitter, we may or may not receive
476                // the block before continuing.
477                context.sleep(link.latency).await;
478
479                // Notarize block by the validator that broadcasted it
480                let proposal = Proposal {
481                    round,
482                    parent: View::new(height.checked_sub(1).unwrap()),
483                    payload: block.digest(),
484                };
485                let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
486                actor
487                    .report(Activity::Notarization(notarization.clone()))
488                    .await;
489
490                // Finalize block by all validators
491                // Always finalize 1) the last block in each epoch 2) the last block in the chain.
492                let fin = make_finalization(proposal, &schemes, QUORUM);
493                if quorum_sees_finalization {
494                    // If `quorum_sees_finalization` is set, ensure at least `QUORUM` sees a finalization 20%
495                    // of the time.
496                    let do_finalize = context.gen_bool(0.2);
497                    for (i, actor) in actors
498                        .iter_mut()
499                        .choose_multiple(&mut context, NUM_VALIDATORS as usize)
500                        .iter_mut()
501                        .enumerate()
502                    {
503                        if (do_finalize && i < QUORUM as usize)
504                            || height == NUM_BLOCKS
505                            || height == bounds.last()
506                        {
507                            actor.report(Activity::Finalization(fin.clone())).await;
508                        }
509                    }
510                } else {
511                    // If `quorum_sees_finalization` is not set, finalize randomly with a 20% chance for each
512                    // individual participant.
513                    for actor in actors.iter_mut() {
514                        if context.gen_bool(0.2) || height == NUM_BLOCKS || height == bounds.last()
515                        {
516                            actor.report(Activity::Finalization(fin.clone())).await;
517                        }
518                    }
519                }
520            }
521
522            // Check that all applications received all blocks.
523            let mut finished = false;
524            while !finished {
525                // Avoid a busy loop
526                context.sleep(Duration::from_secs(1)).await;
527
528                // If not all validators have finished, try again
529                if applications.len() != NUM_VALIDATORS as usize {
530                    continue;
531                }
532                finished = true;
533                for app in applications.values() {
534                    if app.blocks().len() != NUM_BLOCKS as usize {
535                        finished = false;
536                        break;
537                    }
538                    let Some((height, _)) = app.tip() else {
539                        finished = false;
540                        break;
541                    };
542                    if height < NUM_BLOCKS {
543                        finished = false;
544                        break;
545                    }
546                }
547            }
548
549            // Return state
550            context.auditor().state()
551        })
552    }
553
554    #[test_traced("WARN")]
555    fn test_sync_height_floor() {
556        let runner = deterministic::Runner::new(
557            deterministic::Config::new()
558                .with_seed(0xFF)
559                .with_timeout(Some(Duration::from_secs(300))),
560        );
561        runner.start(|mut context| async move {
562            let mut oracle = setup_network(context.clone(), Some(3));
563            let Fixture {
564                participants,
565                schemes,
566                ..
567            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
568
569            // Initialize applications and actors
570            let mut applications = BTreeMap::new();
571            let mut actors = Vec::new();
572
573            // Register the initial peer set.
574            let mut manager = oracle.manager();
575            manager
576                .update(0, participants.clone().try_into().unwrap())
577                .await;
578            for (i, validator) in participants.iter().enumerate().skip(1) {
579                let (application, actor, _processed_height) = setup_validator(
580                    context.with_label(&format!("validator_{i}")),
581                    &mut oracle,
582                    validator.clone(),
583                    ConstantProvider::new(schemes[i].clone()),
584                )
585                .await;
586                applications.insert(validator.clone(), application);
587                actors.push(actor);
588            }
589
590            // Add links between all peers except for the first, to guarantee
591            // the first peer does not receive any blocks during broadcast.
592            setup_network_links(&mut oracle, &participants[1..], LINK).await;
593
594            // Generate blocks, skipping the genesis block.
595            let mut blocks = Vec::<B>::new();
596            let mut parent = Sha256::hash(b"");
597            for i in 1..=NUM_BLOCKS {
598                let block = B::new::<Sha256>(parent, i, i);
599                parent = block.digest();
600                blocks.push(block);
601            }
602
603            // Broadcast and finalize blocks
604            let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
605            for block in blocks.iter() {
606                // Skip genesis block
607                let height = block.height();
608                assert!(height > 0, "genesis block should not have been generated");
609
610                // Calculate the epoch and round for the block
611                let bounds = epocher.containing(height).unwrap();
612                let round = Round::new(bounds.epoch(), View::new(height));
613
614                // Broadcast block by one validator
615                let actor_index: usize = (height % (applications.len() as u64)) as usize;
616                let mut actor = actors[actor_index].clone();
617                actor.proposed(round, block.clone()).await;
618                actor.verified(round, block.clone()).await;
619
620                // Wait for the block to be broadcast, but due to jitter, we may or may not receive
621                // the block before continuing.
622                context.sleep(LINK.latency).await;
623
624                // Notarize block by the validator that broadcasted it
625                let proposal = Proposal {
626                    round,
627                    parent: View::new(height.checked_sub(1).unwrap()),
628                    payload: block.digest(),
629                };
630                let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
631                actor
632                    .report(Activity::Notarization(notarization.clone()))
633                    .await;
634
635                // Finalize block by all validators except for the first.
636                let fin = make_finalization(proposal, &schemes, QUORUM);
637                for actor in actors.iter_mut() {
638                    actor.report(Activity::Finalization(fin.clone())).await;
639                }
640            }
641
642            // Check that all applications (except for the first) received all blocks.
643            let mut finished = false;
644            while !finished {
645                // Avoid a busy loop
646                context.sleep(Duration::from_secs(1)).await;
647
648                // If not all validators have finished, try again
649                finished = true;
650                for app in applications.values().skip(1) {
651                    if app.blocks().len() != NUM_BLOCKS as usize {
652                        finished = false;
653                        break;
654                    }
655                    let Some((height, _)) = app.tip() else {
656                        finished = false;
657                        break;
658                    };
659                    if height < NUM_BLOCKS {
660                        finished = false;
661                        break;
662                    }
663                }
664            }
665
666            // Create the first validator now that all blocks have been finalized by the others.
667            let validator = participants.first().unwrap();
668            let (app, mut actor, _processed_height) = setup_validator(
669                context.with_label("validator_0"),
670                &mut oracle,
671                validator.clone(),
672                ConstantProvider::new(schemes[0].clone()),
673            )
674            .await;
675
676            // Add links between all peers, including the first.
677            setup_network_links(&mut oracle, &participants, LINK).await;
678
679            const NEW_SYNC_FLOOR: u64 = 100;
680            let second_actor = &mut actors[1];
681            let latest_finalization = second_actor.get_finalization(NUM_BLOCKS).await.unwrap();
682
683            // Set the sync height floor of the first actor to block #100.
684            actor.set_floor(NEW_SYNC_FLOOR).await;
685
686            // Notify the first actor of the latest finalization to the first actor to trigger backfill.
687            // The sync should only reach the sync height floor.
688            actor
689                .report(Activity::Finalization(latest_finalization))
690                .await;
691
692            // Wait until the first actor has backfilled to the sync height floor.
693            let mut finished = false;
694            while !finished {
695                // Avoid a busy loop
696                context.sleep(Duration::from_secs(1)).await;
697
698                finished = true;
699                if app.blocks().len() != (NUM_BLOCKS - NEW_SYNC_FLOOR) as usize {
700                    finished = false;
701                    continue;
702                }
703                let Some((height, _)) = app.tip() else {
704                    finished = false;
705                    continue;
706                };
707                if height < NUM_BLOCKS {
708                    finished = false;
709                    continue;
710                }
711            }
712
713            // Check that the first actor has blocks from NEW_SYNC_FLOOR onward, but not before.
714            for height in 1..=NUM_BLOCKS {
715                let block = actor.get_block(Identifier::Height(height)).await;
716                if height <= NEW_SYNC_FLOOR {
717                    assert!(block.is_none());
718                } else {
719                    assert_eq!(block.unwrap().height(), height);
720                }
721            }
722        })
723    }
724
725    #[test_traced("WARN")]
726    fn test_subscribe_basic_block_delivery() {
727        let runner = deterministic::Runner::timed(Duration::from_secs(60));
728        runner.start(|mut context| async move {
729            let mut oracle = setup_network(context.clone(), None);
730            let Fixture {
731                participants,
732                schemes,
733                ..
734            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
735
736            let mut actors = Vec::new();
737            for (i, validator) in participants.iter().enumerate() {
738                let (_application, actor, _processed_height) = setup_validator(
739                    context.with_label(&format!("validator_{i}")),
740                    &mut oracle,
741                    validator.clone(),
742                    ConstantProvider::new(schemes[i].clone()),
743                )
744                .await;
745                actors.push(actor);
746            }
747            let mut actor = actors[0].clone();
748
749            setup_network_links(&mut oracle, &participants, LINK).await;
750
751            let parent = Sha256::hash(b"");
752            let block = B::new::<Sha256>(parent, 1, 1);
753            let commitment = block.digest();
754
755            let subscription_rx = actor
756                .subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment)
757                .await;
758
759            actor
760                .verified(Round::new(Epoch::new(0), View::new(1)), block.clone())
761                .await;
762
763            let proposal = Proposal {
764                round: Round::new(Epoch::new(0), View::new(1)),
765                parent: View::new(0),
766                payload: commitment,
767            };
768            let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
769            actor.report(Activity::Notarization(notarization)).await;
770
771            let finalization = make_finalization(proposal, &schemes, QUORUM);
772            actor.report(Activity::Finalization(finalization)).await;
773
774            let received_block = subscription_rx.await.unwrap();
775            assert_eq!(received_block.digest(), block.digest());
776            assert_eq!(received_block.height(), 1);
777        })
778    }
779
780    #[test_traced("WARN")]
781    fn test_subscribe_multiple_subscriptions() {
782        let runner = deterministic::Runner::timed(Duration::from_secs(60));
783        runner.start(|mut context| async move {
784            let mut oracle = setup_network(context.clone(), None);
785            let Fixture {
786                participants,
787                schemes,
788                ..
789            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
790
791            let mut actors = Vec::new();
792            for (i, validator) in participants.iter().enumerate() {
793                let (_application, actor, _processed_height) = setup_validator(
794                    context.with_label(&format!("validator_{i}")),
795                    &mut oracle,
796                    validator.clone(),
797                    ConstantProvider::new(schemes[i].clone()),
798                )
799                .await;
800                actors.push(actor);
801            }
802            let mut actor = actors[0].clone();
803
804            setup_network_links(&mut oracle, &participants, LINK).await;
805
806            let parent = Sha256::hash(b"");
807            let block1 = B::new::<Sha256>(parent, 1, 1);
808            let block2 = B::new::<Sha256>(block1.digest(), 2, 2);
809            let commitment1 = block1.digest();
810            let commitment2 = block2.digest();
811
812            let sub1_rx = actor
813                .subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment1)
814                .await;
815            let sub2_rx = actor
816                .subscribe(Some(Round::new(Epoch::new(0), View::new(2))), commitment2)
817                .await;
818            let sub3_rx = actor
819                .subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment1)
820                .await;
821
822            actor
823                .verified(Round::new(Epoch::new(0), View::new(1)), block1.clone())
824                .await;
825            actor
826                .verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
827                .await;
828
829            for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
830                let view = View::new(view);
831                let proposal = Proposal {
832                    round: Round::new(Epoch::zero(), view),
833                    parent: view.previous().unwrap(),
834                    payload: block.digest(),
835                };
836                let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
837                actor.report(Activity::Notarization(notarization)).await;
838
839                let finalization = make_finalization(proposal, &schemes, QUORUM);
840                actor.report(Activity::Finalization(finalization)).await;
841            }
842
843            let received1_sub1 = sub1_rx.await.unwrap();
844            let received2 = sub2_rx.await.unwrap();
845            let received1_sub3 = sub3_rx.await.unwrap();
846
847            assert_eq!(received1_sub1.digest(), block1.digest());
848            assert_eq!(received2.digest(), block2.digest());
849            assert_eq!(received1_sub3.digest(), block1.digest());
850            assert_eq!(received1_sub1.height(), 1);
851            assert_eq!(received2.height(), 2);
852            assert_eq!(received1_sub3.height(), 1);
853        })
854    }
855
856    #[test_traced("WARN")]
857    fn test_subscribe_canceled_subscriptions() {
858        let runner = deterministic::Runner::timed(Duration::from_secs(60));
859        runner.start(|mut context| async move {
860            let mut oracle = setup_network(context.clone(), None);
861            let Fixture {
862                participants,
863                schemes,
864                ..
865            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
866
867            let mut actors = Vec::new();
868            for (i, validator) in participants.iter().enumerate() {
869                let (_application, actor, _processed_height) = setup_validator(
870                    context.with_label(&format!("validator_{i}")),
871                    &mut oracle,
872                    validator.clone(),
873                    ConstantProvider::new(schemes[i].clone()),
874                )
875                .await;
876                actors.push(actor);
877            }
878            let mut actor = actors[0].clone();
879
880            setup_network_links(&mut oracle, &participants, LINK).await;
881
882            let parent = Sha256::hash(b"");
883            let block1 = B::new::<Sha256>(parent, 1, 1);
884            let block2 = B::new::<Sha256>(block1.digest(), 2, 2);
885            let commitment1 = block1.digest();
886            let commitment2 = block2.digest();
887
888            let sub1_rx = actor
889                .subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment1)
890                .await;
891            let sub2_rx = actor
892                .subscribe(Some(Round::new(Epoch::new(0), View::new(2))), commitment2)
893                .await;
894
895            drop(sub1_rx);
896
897            actor
898                .verified(Round::new(Epoch::new(0), View::new(1)), block1.clone())
899                .await;
900            actor
901                .verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
902                .await;
903
904            for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
905                let view = View::new(view);
906                let proposal = Proposal {
907                    round: Round::new(Epoch::zero(), view),
908                    parent: view.previous().unwrap(),
909                    payload: block.digest(),
910                };
911                let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
912                actor.report(Activity::Notarization(notarization)).await;
913
914                let finalization = make_finalization(proposal, &schemes, QUORUM);
915                actor.report(Activity::Finalization(finalization)).await;
916            }
917
918            let received2 = sub2_rx.await.unwrap();
919            assert_eq!(received2.digest(), block2.digest());
920            assert_eq!(received2.height(), 2);
921        })
922    }
923
924    #[test_traced("WARN")]
925    fn test_subscribe_blocks_from_different_sources() {
926        let runner = deterministic::Runner::timed(Duration::from_secs(60));
927        runner.start(|mut context| async move {
928            let mut oracle = setup_network(context.clone(), None);
929            let Fixture {
930                participants,
931                schemes,
932                ..
933            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
934
935            let mut actors = Vec::new();
936            for (i, validator) in participants.iter().enumerate() {
937                let (_application, actor, _processed_height) = setup_validator(
938                    context.with_label(&format!("validator_{i}")),
939                    &mut oracle,
940                    validator.clone(),
941                    ConstantProvider::new(schemes[i].clone()),
942                )
943                .await;
944                actors.push(actor);
945            }
946            let mut actor = actors[0].clone();
947
948            setup_network_links(&mut oracle, &participants, LINK).await;
949
950            let parent = Sha256::hash(b"");
951            let block1 = B::new::<Sha256>(parent, 1, 1);
952            let block2 = B::new::<Sha256>(block1.digest(), 2, 2);
953            let block3 = B::new::<Sha256>(block2.digest(), 3, 3);
954            let block4 = B::new::<Sha256>(block3.digest(), 4, 4);
955            let block5 = B::new::<Sha256>(block4.digest(), 5, 5);
956
957            let sub1_rx = actor.subscribe(None, block1.digest()).await;
958            let sub2_rx = actor.subscribe(None, block2.digest()).await;
959            let sub3_rx = actor.subscribe(None, block3.digest()).await;
960            let sub4_rx = actor.subscribe(None, block4.digest()).await;
961            let sub5_rx = actor.subscribe(None, block5.digest()).await;
962
963            // Block1: Broadcasted by the actor
964            actor
965                .proposed(Round::new(Epoch::zero(), View::new(1)), block1.clone())
966                .await;
967            context.sleep(Duration::from_millis(20)).await;
968
969            // Block1: delivered
970            let received1 = sub1_rx.await.unwrap();
971            assert_eq!(received1.digest(), block1.digest());
972            assert_eq!(received1.height(), 1);
973
974            // Block2: Verified by the actor
975            actor
976                .verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
977                .await;
978
979            // Block2: delivered
980            let received2 = sub2_rx.await.unwrap();
981            assert_eq!(received2.digest(), block2.digest());
982            assert_eq!(received2.height(), 2);
983
984            // Block3: Notarized by the actor
985            let proposal3 = Proposal {
986                round: Round::new(Epoch::new(0), View::new(3)),
987                parent: View::new(2),
988                payload: block3.digest(),
989            };
990            let notarization3 = make_notarization(proposal3.clone(), &schemes, QUORUM);
991            actor.report(Activity::Notarization(notarization3)).await;
992            actor
993                .verified(Round::new(Epoch::new(0), View::new(3)), block3.clone())
994                .await;
995
996            // Block3: delivered
997            let received3 = sub3_rx.await.unwrap();
998            assert_eq!(received3.digest(), block3.digest());
999            assert_eq!(received3.height(), 3);
1000
1001            // Block4: Finalized by the actor
1002            let finalization4 = make_finalization(
1003                Proposal {
1004                    round: Round::new(Epoch::new(0), View::new(4)),
1005                    parent: View::new(3),
1006                    payload: block4.digest(),
1007                },
1008                &schemes,
1009                QUORUM,
1010            );
1011            actor.report(Activity::Finalization(finalization4)).await;
1012            actor
1013                .verified(Round::new(Epoch::new(0), View::new(4)), block4.clone())
1014                .await;
1015
1016            // Block4: delivered
1017            let received4 = sub4_rx.await.unwrap();
1018            assert_eq!(received4.digest(), block4.digest());
1019            assert_eq!(received4.height(), 4);
1020
1021            // Block5: Broadcasted by a remote node (different actor)
1022            let remote_actor = &mut actors[1].clone();
1023            remote_actor
1024                .proposed(Round::new(Epoch::zero(), View::new(5)), block5.clone())
1025                .await;
1026            context.sleep(Duration::from_millis(20)).await;
1027
1028            // Block5: delivered
1029            let received5 = sub5_rx.await.unwrap();
1030            assert_eq!(received5.digest(), block5.digest());
1031            assert_eq!(received5.height(), 5);
1032        })
1033    }
1034
1035    #[test_traced("WARN")]
1036    fn test_get_info_basic_queries_present_and_missing() {
1037        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1038        runner.start(|mut context| async move {
1039            let mut oracle = setup_network(context.clone(), None);
1040            let Fixture {
1041                participants,
1042                schemes,
1043                ..
1044            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1045
1046            // Single validator actor
1047            let me = participants[0].clone();
1048            let (_application, mut actor, _processed_height) = setup_validator(
1049                context.with_label("validator_0"),
1050                &mut oracle,
1051                me,
1052                ConstantProvider::new(schemes[0].clone()),
1053            )
1054            .await;
1055
1056            // Initially, no latest
1057            assert!(actor.get_info(Identifier::Latest).await.is_none());
1058
1059            // Before finalization, specific height returns None
1060            assert!(actor.get_info(1).await.is_none());
1061
1062            // Create and verify a block, then finalize it
1063            let parent = Sha256::hash(b"");
1064            let block = B::new::<Sha256>(parent, 1, 1);
1065            let digest = block.digest();
1066            let round = Round::new(Epoch::new(0), View::new(1));
1067            actor.verified(round, block.clone()).await;
1068
1069            let proposal = Proposal {
1070                round,
1071                parent: View::new(0),
1072                payload: digest,
1073            };
1074            let finalization = make_finalization(proposal, &schemes, QUORUM);
1075            actor.report(Activity::Finalization(finalization)).await;
1076
1077            // Latest should now be the finalized block
1078            assert_eq!(actor.get_info(Identifier::Latest).await, Some((1, digest)));
1079
1080            // Height 1 now present
1081            assert_eq!(actor.get_info(1).await, Some((1, digest)));
1082
1083            // Commitment should map to its height
1084            assert_eq!(actor.get_info(&digest).await, Some((1, digest)));
1085
1086            // Missing height
1087            assert!(actor.get_info(2).await.is_none());
1088
1089            // Missing commitment
1090            let missing = Sha256::hash(b"missing");
1091            assert!(actor.get_info(&missing).await.is_none());
1092        })
1093    }
1094
1095    #[test_traced("WARN")]
1096    fn test_get_info_latest_progression_multiple_finalizations() {
1097        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1098        runner.start(|mut context| async move {
1099            let mut oracle = setup_network(context.clone(), None);
1100            let Fixture {
1101                participants,
1102                schemes,
1103                ..
1104            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1105
1106            // Single validator actor
1107            let me = participants[0].clone();
1108            let (_application, mut actor, _processed_height) = setup_validator(
1109                context.with_label("validator_0"),
1110                &mut oracle,
1111                me,
1112                ConstantProvider::new(schemes[0].clone()),
1113            )
1114            .await;
1115
1116            // Initially none
1117            assert!(actor.get_info(Identifier::Latest).await.is_none());
1118
1119            // Build and finalize heights 1..=3
1120            let parent0 = Sha256::hash(b"");
1121            let block1 = B::new::<Sha256>(parent0, 1, 1);
1122            let d1 = block1.digest();
1123            actor
1124                .verified(Round::new(Epoch::new(0), View::new(1)), block1.clone())
1125                .await;
1126            let f1 = make_finalization(
1127                Proposal {
1128                    round: Round::new(Epoch::new(0), View::new(1)),
1129                    parent: View::new(0),
1130                    payload: d1,
1131                },
1132                &schemes,
1133                QUORUM,
1134            );
1135            actor.report(Activity::Finalization(f1)).await;
1136            let latest = actor.get_info(Identifier::Latest).await;
1137            assert_eq!(latest, Some((1, d1)));
1138
1139            let block2 = B::new::<Sha256>(d1, 2, 2);
1140            let d2 = block2.digest();
1141            actor
1142                .verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
1143                .await;
1144            let f2 = make_finalization(
1145                Proposal {
1146                    round: Round::new(Epoch::new(0), View::new(2)),
1147                    parent: View::new(1),
1148                    payload: d2,
1149                },
1150                &schemes,
1151                QUORUM,
1152            );
1153            actor.report(Activity::Finalization(f2)).await;
1154            let latest = actor.get_info(Identifier::Latest).await;
1155            assert_eq!(latest, Some((2, d2)));
1156
1157            let block3 = B::new::<Sha256>(d2, 3, 3);
1158            let d3 = block3.digest();
1159            actor
1160                .verified(Round::new(Epoch::new(0), View::new(3)), block3.clone())
1161                .await;
1162            let f3 = make_finalization(
1163                Proposal {
1164                    round: Round::new(Epoch::new(0), View::new(3)),
1165                    parent: View::new(2),
1166                    payload: d3,
1167                },
1168                &schemes,
1169                QUORUM,
1170            );
1171            actor.report(Activity::Finalization(f3)).await;
1172            let latest = actor.get_info(Identifier::Latest).await;
1173            assert_eq!(latest, Some((3, d3)));
1174        })
1175    }
1176
1177    #[test_traced("WARN")]
1178    fn test_get_block_by_height_and_latest() {
1179        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1180        runner.start(|mut context| async move {
1181            let mut oracle = setup_network(context.clone(), None);
1182            let Fixture {
1183                participants,
1184                schemes,
1185                ..
1186            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1187
1188            let me = participants[0].clone();
1189            let (application, mut actor, _processed_height) = setup_validator(
1190                context.with_label("validator_0"),
1191                &mut oracle,
1192                me,
1193                ConstantProvider::new(schemes[0].clone()),
1194            )
1195            .await;
1196
1197            // Before any finalization, GetBlock::Latest should be None
1198            let latest_block = actor.get_block(Identifier::Latest).await;
1199            assert!(latest_block.is_none());
1200            assert!(application.tip().is_none());
1201
1202            // Finalize a block at height 1
1203            let parent = Sha256::hash(b"");
1204            let block = B::new::<Sha256>(parent, 1, 1);
1205            let commitment = block.digest();
1206            let round = Round::new(Epoch::new(0), View::new(1));
1207            actor.verified(round, block.clone()).await;
1208            let proposal = Proposal {
1209                round,
1210                parent: View::new(0),
1211                payload: commitment,
1212            };
1213            let finalization = make_finalization(proposal, &schemes, QUORUM);
1214            actor.report(Activity::Finalization(finalization)).await;
1215
1216            // Get by height
1217            let by_height = actor.get_block(1).await.expect("missing block by height");
1218            assert_eq!(by_height.height(), 1);
1219            assert_eq!(by_height.digest(), commitment);
1220            assert_eq!(application.tip(), Some((1, commitment)));
1221
1222            // Get by latest
1223            let by_latest = actor
1224                .get_block(Identifier::Latest)
1225                .await
1226                .expect("missing block by latest");
1227            assert_eq!(by_latest.height(), 1);
1228            assert_eq!(by_latest.digest(), commitment);
1229
1230            // Missing height
1231            let by_height = actor.get_block(2).await;
1232            assert!(by_height.is_none());
1233        })
1234    }
1235
1236    #[test_traced("WARN")]
1237    fn test_get_block_by_commitment_from_sources_and_missing() {
1238        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1239        runner.start(|mut context| async move {
1240            let mut oracle = setup_network(context.clone(), None);
1241            let Fixture {
1242                participants,
1243                schemes,
1244                ..
1245            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1246
1247            let me = participants[0].clone();
1248            let (_application, mut actor, _processed_height) = setup_validator(
1249                context.with_label("validator_0"),
1250                &mut oracle,
1251                me,
1252                ConstantProvider::new(schemes[0].clone()),
1253            )
1254            .await;
1255
1256            // 1) From cache via verified
1257            let parent = Sha256::hash(b"");
1258            let ver_block = B::new::<Sha256>(parent, 1, 1);
1259            let ver_commitment = ver_block.digest();
1260            let round1 = Round::new(Epoch::new(0), View::new(1));
1261            actor.verified(round1, ver_block.clone()).await;
1262            let got = actor
1263                .get_block(&ver_commitment)
1264                .await
1265                .expect("missing block from cache");
1266            assert_eq!(got.digest(), ver_commitment);
1267
1268            // 2) From finalized archive
1269            let fin_block = B::new::<Sha256>(ver_commitment, 2, 2);
1270            let fin_commitment = fin_block.digest();
1271            let round2 = Round::new(Epoch::new(0), View::new(2));
1272            actor.verified(round2, fin_block.clone()).await;
1273            let proposal = Proposal {
1274                round: round2,
1275                parent: View::new(1),
1276                payload: fin_commitment,
1277            };
1278            let finalization = make_finalization(proposal, &schemes, QUORUM);
1279            actor.report(Activity::Finalization(finalization)).await;
1280            let got = actor
1281                .get_block(&fin_commitment)
1282                .await
1283                .expect("missing block from finalized archive");
1284            assert_eq!(got.digest(), fin_commitment);
1285            assert_eq!(got.height(), 2);
1286
1287            // 3) Missing commitment
1288            let missing = Sha256::hash(b"definitely-missing");
1289            let missing_block = actor.get_block(&missing).await;
1290            assert!(missing_block.is_none());
1291        })
1292    }
1293
1294    #[test_traced("WARN")]
1295    fn test_get_finalization_by_height() {
1296        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1297        runner.start(|mut context| async move {
1298            let mut oracle = setup_network(context.clone(), None);
1299            let Fixture {
1300                participants,
1301                schemes,
1302                ..
1303            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1304
1305            let me = participants[0].clone();
1306            let (_application, mut actor, _processed_height) = setup_validator(
1307                context.with_label("validator_0"),
1308                &mut oracle,
1309                me,
1310                ConstantProvider::new(schemes[0].clone()),
1311            )
1312            .await;
1313
1314            // Before any finalization, get_finalization should be None
1315            let finalization = actor.get_finalization(1).await;
1316            assert!(finalization.is_none());
1317
1318            // Finalize a block at height 1
1319            let parent = Sha256::hash(b"");
1320            let block = B::new::<Sha256>(parent, 1, 1);
1321            let commitment = block.digest();
1322            let round = Round::new(Epoch::new(0), View::new(1));
1323            actor.verified(round, block.clone()).await;
1324            let proposal = Proposal {
1325                round,
1326                parent: View::new(0),
1327                payload: commitment,
1328            };
1329            let finalization = make_finalization(proposal, &schemes, QUORUM);
1330            actor.report(Activity::Finalization(finalization)).await;
1331
1332            // Get finalization by height
1333            let finalization = actor
1334                .get_finalization(1)
1335                .await
1336                .expect("missing finalization by height");
1337            assert_eq!(finalization.proposal.parent, View::new(0));
1338            assert_eq!(
1339                finalization.proposal.round,
1340                Round::new(Epoch::new(0), View::new(1))
1341            );
1342            assert_eq!(finalization.proposal.payload, commitment);
1343
1344            assert!(actor.get_finalization(2).await.is_none());
1345        })
1346    }
1347
1348    #[test_traced("WARN")]
1349    fn test_hint_finalized_triggers_fetch() {
1350        let runner = deterministic::Runner::new(
1351            deterministic::Config::new()
1352                .with_seed(42)
1353                .with_timeout(Some(Duration::from_secs(60))),
1354        );
1355        runner.start(|mut context| async move {
1356            let mut oracle = setup_network(context.clone(), Some(3));
1357            let Fixture {
1358                participants,
1359                schemes,
1360                ..
1361            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1362
1363            // Register the initial peer set
1364            let mut manager = oracle.manager();
1365            manager
1366                .update(0, participants.clone().try_into().unwrap())
1367                .await;
1368
1369            // Set up two validators
1370            let (app0, mut actor0, _) = setup_validator(
1371                context.with_label("validator_0"),
1372                &mut oracle,
1373                participants[0].clone(),
1374                ConstantProvider::new(schemes[0].clone()),
1375            )
1376            .await;
1377            let (_app1, mut actor1, _) = setup_validator(
1378                context.with_label("validator_1"),
1379                &mut oracle,
1380                participants[1].clone(),
1381                ConstantProvider::new(schemes[1].clone()),
1382            )
1383            .await;
1384
1385            // Add links between validators
1386            setup_network_links(&mut oracle, &participants[..2], LINK).await;
1387
1388            // Validator 0: Create and finalize blocks 1-5
1389            let mut parent = Sha256::hash(b"");
1390            for i in 1..=5u64 {
1391                let block = B::new::<Sha256>(parent, i, i);
1392                let commitment = block.digest();
1393                let round = Round::new(Epoch::new(0), View::new(i));
1394
1395                actor0.verified(round, block.clone()).await;
1396                let proposal = Proposal {
1397                    round,
1398                    parent: View::new(i - 1),
1399                    payload: commitment,
1400                };
1401                let finalization = make_finalization(proposal, &schemes, QUORUM);
1402                actor0.report(Activity::Finalization(finalization)).await;
1403
1404                parent = commitment;
1405            }
1406
1407            // Wait for validator 0 to process all blocks
1408            while app0.blocks().len() < 5 {
1409                context.sleep(Duration::from_millis(10)).await;
1410            }
1411
1412            // Validator 1 should not have block 5 yet
1413            assert!(actor1.get_finalization(5).await.is_none());
1414
1415            // Validator 1: hint that block 5 is finalized, targeting validator 0
1416            actor1
1417                .hint_finalized(5, NonEmptyVec::new(participants[0].clone()))
1418                .await;
1419
1420            // Wait for the fetch to complete
1421            while actor1.get_finalization(5).await.is_none() {
1422                context.sleep(Duration::from_millis(10)).await;
1423            }
1424
1425            // Verify validator 1 now has the finalization
1426            let finalization = actor1
1427                .get_finalization(5)
1428                .await
1429                .expect("finalization should be fetched");
1430            assert_eq!(finalization.proposal.round.view(), View::new(5));
1431        })
1432    }
1433
1434    #[test_traced("WARN")]
1435    fn test_ancestry_stream() {
1436        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1437        runner.start(|mut context| async move {
1438            let mut oracle = setup_network(context.clone(), None);
1439            let Fixture {
1440                participants,
1441                schemes,
1442                ..
1443            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1444
1445            let me = participants[0].clone();
1446            let (_application, mut actor, _processed_height) = setup_validator(
1447                context.with_label("validator_0"),
1448                &mut oracle,
1449                me,
1450                ConstantProvider::new(schemes[0].clone()),
1451            )
1452            .await;
1453
1454            // Finalize blocks at heights 1-5
1455            let mut parent = Sha256::hash(b"");
1456            for i in 1..=5 {
1457                let block = B::new::<Sha256>(parent, i, i);
1458                let commitment = block.digest();
1459                let round = Round::new(Epoch::new(0), View::new(i));
1460                actor.verified(round, block.clone()).await;
1461                let proposal = Proposal {
1462                    round,
1463                    parent: View::new(i - 1),
1464                    payload: commitment,
1465                };
1466                let finalization = make_finalization(proposal, &schemes, QUORUM);
1467                actor.report(Activity::Finalization(finalization)).await;
1468
1469                parent = block.digest();
1470            }
1471
1472            // Stream from latest -> height 1
1473            let (_, commitment) = actor.get_info(Identifier::Latest).await.unwrap();
1474            let ancestry = actor.ancestry((None, commitment)).await.unwrap();
1475            let blocks = ancestry.collect::<Vec<_>>().await;
1476
1477            // Ensure correct delivery order: 5,4,3,2,1
1478            assert_eq!(blocks.len(), 5);
1479            (0..5).for_each(|i| {
1480                assert_eq!(blocks[i].height(), 5 - i as u64);
1481            });
1482        })
1483    }
1484
1485    #[test_traced("WARN")]
1486    fn test_marshaled_rejects_invalid_ancestry() {
1487        #[derive(Clone)]
1488        struct MockVerifyingApp {
1489            genesis: B,
1490        }
1491
1492        impl crate::Application<deterministic::Context> for MockVerifyingApp {
1493            type Block = B;
1494            type Context = Context<D, K>;
1495            type SigningScheme = S;
1496
1497            async fn genesis(&mut self) -> Self::Block {
1498                self.genesis.clone()
1499            }
1500
1501            async fn propose(
1502                &mut self,
1503                _context: (deterministic::Context, Self::Context),
1504                _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
1505            ) -> Option<Self::Block> {
1506                None
1507            }
1508        }
1509
1510        impl VerifyingApplication<deterministic::Context> for MockVerifyingApp {
1511            async fn verify(
1512                &mut self,
1513                _context: (deterministic::Context, Self::Context),
1514                _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
1515            ) -> bool {
1516                // Ancestry verification occurs entirely in `Marshaled`.
1517                true
1518            }
1519        }
1520
1521        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1522        runner.start(|mut context| async move {
1523            let mut oracle = setup_network(context.clone(), None);
1524            let Fixture {
1525                participants,
1526                schemes,
1527                ..
1528            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1529
1530            let me = participants[0].clone();
1531            let (_base_app, marshal, _processed_height) = setup_validator(
1532                context.with_label("validator_0"),
1533                &mut oracle,
1534                me.clone(),
1535                ConstantProvider::new(schemes[0].clone()),
1536            )
1537            .await;
1538
1539            // Create genesis block
1540            let genesis = B::new::<Sha256>(Sha256::hash(b""), 0, 0);
1541
1542            // Wrap with Marshaled verifier
1543            let mock_app = MockVerifyingApp {
1544                genesis: genesis.clone(),
1545            };
1546            let mut marshaled = Marshaled::new(
1547                context.clone(),
1548                mock_app,
1549                marshal.clone(),
1550                FixedEpocher::new(BLOCKS_PER_EPOCH),
1551            );
1552
1553            // Test case 1: Non-contiguous height
1554            //
1555            // We need both blocks in the same epoch.
1556            // With BLOCKS_PER_EPOCH=20: epoch 0 is heights 0-19, epoch 1 is heights 20-39
1557            //
1558            // Store honest parent at height 21 (epoch 1)
1559            let honest_parent =
1560                B::new::<Sha256>(genesis.commitment(), BLOCKS_PER_EPOCH.get() + 1, 1000);
1561            let parent_commitment = honest_parent.commitment();
1562            let parent_round = Round::new(Epoch::new(1), View::new(21));
1563            marshal
1564                .clone()
1565                .verified(parent_round, honest_parent.clone())
1566                .await;
1567
1568            // Byzantine proposer broadcasts malicious block at height 35
1569            // In reality this would come via buffered broadcast, but for test simplicity
1570            // we call broadcast() directly which makes it available for subscription
1571            let malicious_block =
1572                B::new::<Sha256>(parent_commitment, BLOCKS_PER_EPOCH.get() + 15, 2000);
1573            let malicious_commitment = malicious_block.commitment();
1574            marshal
1575                .clone()
1576                .proposed(
1577                    Round::new(Epoch::new(1), View::new(35)),
1578                    malicious_block.clone(),
1579                )
1580                .await;
1581
1582            // Small delay to ensure broadcast is processed
1583            context.sleep(Duration::from_millis(10)).await;
1584
1585            // Consensus determines parent should be block at height 21
1586            // and calls verify on the Marshaled automaton with a block at height 35
1587            let byzantine_context = Context {
1588                round: Round::new(Epoch::new(1), View::new(35)),
1589                leader: me.clone(),
1590                parent: (View::new(21), parent_commitment), // Consensus says parent is at height 21
1591            };
1592
1593            // Marshaled.verify() should reject the malicious block
1594            // The Marshaled verifier will:
1595            // 1. Fetch honest_parent (height 21) from marshal based on context.parent
1596            // 2. Fetch malicious_block (height 35) from marshal based on digest
1597            // 3. Validate height is contiguous (fail)
1598            // 4. Return false
1599            let verify = marshaled
1600                .verify(byzantine_context, malicious_commitment)
1601                .await;
1602
1603            assert!(
1604                !verify.await.unwrap(),
1605                "Byzantine block with non-contiguous heights should be rejected"
1606            );
1607
1608            // Test case 2: Mismatched parent commitment
1609            //
1610            // Create another malicious block with correct height but invalid parent commitment
1611            let malicious_block =
1612                B::new::<Sha256>(genesis.commitment(), BLOCKS_PER_EPOCH.get() + 2, 3000);
1613            let malicious_commitment = malicious_block.commitment();
1614            marshal
1615                .clone()
1616                .proposed(
1617                    Round::new(Epoch::new(1), View::new(22)),
1618                    malicious_block.clone(),
1619                )
1620                .await;
1621
1622            // Small delay to ensure broadcast is processed
1623            context.sleep(Duration::from_millis(10)).await;
1624
1625            // Consensus determines parent should be block at height 21
1626            // and calls verify on the Marshaled automaton with a block at height 22
1627            let byzantine_context = Context {
1628                round: Round::new(Epoch::new(1), View::new(22)),
1629                leader: me.clone(),
1630                parent: (View::new(21), parent_commitment), // Consensus says parent is at height 21
1631            };
1632
1633            // Marshaled.verify() should reject the malicious block
1634            // The Marshaled verifier will:
1635            // 1. Fetch honest_parent (height 21) from marshal based on context.parent
1636            // 2. Fetch malicious_block (height 22) from marshal based on digest
1637            // 3. Validate height is contiguous
1638            // 3. Validate parent commitment matches (fail)
1639            // 4. Return false
1640            let verify = marshaled
1641                .verify(byzantine_context, malicious_commitment)
1642                .await;
1643
1644            assert!(
1645                !verify.await.unwrap(),
1646                "Byzantine block with mismatched parent commitment should be rejected"
1647            );
1648        })
1649    }
1650
1651    #[test_traced("WARN")]
1652    fn test_finalize_same_height_different_views() {
1653        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1654        runner.start(|mut context| async move {
1655            let mut oracle = setup_network(context.clone(), None);
1656            let Fixture {
1657                participants,
1658                schemes,
1659                ..
1660            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1661
1662            // Set up two validators
1663            let mut actors = Vec::new();
1664            for (i, validator) in participants.iter().enumerate().take(2) {
1665                let (_app, actor, _processed_height) = setup_validator(
1666                    context.with_label(&format!("validator_{i}")),
1667                    &mut oracle,
1668                    validator.clone(),
1669                    ConstantProvider::new(schemes[i].clone()),
1670                )
1671                .await;
1672                actors.push(actor);
1673            }
1674
1675            // Create block at height 1
1676            let parent = Sha256::hash(b"");
1677            let block = B::new::<Sha256>(parent, 1, 1);
1678            let commitment = block.digest();
1679
1680            // Both validators verify the block
1681            actors[0]
1682                .verified(Round::new(Epoch::new(0), View::new(1)), block.clone())
1683                .await;
1684            actors[1]
1685                .verified(Round::new(Epoch::new(0), View::new(1)), block.clone())
1686                .await;
1687
1688            // Validator 0: Finalize with view 1
1689            let proposal_v1 = Proposal {
1690                round: Round::new(Epoch::new(0), View::new(1)),
1691                parent: View::new(0),
1692                payload: commitment,
1693            };
1694            let notarization_v1 = make_notarization(proposal_v1.clone(), &schemes, QUORUM);
1695            let finalization_v1 = make_finalization(proposal_v1.clone(), &schemes, QUORUM);
1696            actors[0]
1697                .report(Activity::Notarization(notarization_v1.clone()))
1698                .await;
1699            actors[0]
1700                .report(Activity::Finalization(finalization_v1.clone()))
1701                .await;
1702
1703            // Validator 1: Finalize with view 2 (simulates receiving finalization from different view)
1704            // This could happen during epoch transitions where the same block gets finalized
1705            // with different views by different validators.
1706            let proposal_v2 = Proposal {
1707                round: Round::new(Epoch::new(0), View::new(2)), // Different view
1708                parent: View::new(0),
1709                payload: commitment, // Same block
1710            };
1711            let notarization_v2 = make_notarization(proposal_v2.clone(), &schemes, QUORUM);
1712            let finalization_v2 = make_finalization(proposal_v2.clone(), &schemes, QUORUM);
1713            actors[1]
1714                .report(Activity::Notarization(notarization_v2.clone()))
1715                .await;
1716            actors[1]
1717                .report(Activity::Finalization(finalization_v2.clone()))
1718                .await;
1719
1720            // Wait for finalization processing
1721            context.sleep(Duration::from_millis(100)).await;
1722
1723            // Verify both validators stored the block correctly
1724            let block0 = actors[0].get_block(1).await.unwrap();
1725            let block1 = actors[1].get_block(1).await.unwrap();
1726            assert_eq!(block0, block);
1727            assert_eq!(block1, block);
1728
1729            // Verify both validators have finalizations stored
1730            let fin0 = actors[0].get_finalization(1).await.unwrap();
1731            let fin1 = actors[1].get_finalization(1).await.unwrap();
1732
1733            // Verify the finalizations have the expected different views
1734            assert_eq!(fin0.proposal.payload, block.commitment());
1735            assert_eq!(fin0.round().view(), View::new(1));
1736            assert_eq!(fin1.proposal.payload, block.commitment());
1737            assert_eq!(fin1.round().view(), View::new(2));
1738
1739            // Both validators can retrieve block by height
1740            assert_eq!(actors[0].get_info(1).await, Some((1, commitment)));
1741            assert_eq!(actors[1].get_info(1).await, Some((1, commitment)));
1742
1743            // Test that a validator receiving BOTH finalizations handles it correctly
1744            // (the second one should be ignored since archive ignores duplicates for same height)
1745            actors[0]
1746                .report(Activity::Finalization(finalization_v2.clone()))
1747                .await;
1748            actors[1]
1749                .report(Activity::Finalization(finalization_v1.clone()))
1750                .await;
1751            context.sleep(Duration::from_millis(100)).await;
1752
1753            // Validator 0 should still have the original finalization (v1)
1754            let fin0_after = actors[0].get_finalization(1).await.unwrap();
1755            assert_eq!(fin0_after.round().view(), View::new(1));
1756
1757            // Validator 1 should still have the original finalization (v2)
1758            let fin0_after = actors[1].get_finalization(1).await.unwrap();
1759            assert_eq!(fin0_after.round().view(), View::new(2));
1760        })
1761    }
1762
1763    #[test_traced("WARN")]
1764    fn test_init_processed_height() {
1765        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1766        runner.start(|mut context| async move {
1767            let mut oracle = setup_network(context.clone(), None);
1768            let Fixture {
1769                participants,
1770                schemes,
1771                ..
1772            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1773
1774            // Test 1: Fresh init should return processed height 0
1775            let me = participants[0].clone();
1776            let (application, mut actor, initial_height) = setup_validator(
1777                context.with_label("validator_0"),
1778                &mut oracle,
1779                me.clone(),
1780                ConstantProvider::new(schemes[0].clone()),
1781            )
1782            .await;
1783            assert_eq!(initial_height, 0);
1784
1785            // Process multiple blocks (1, 2, 3)
1786            let mut parent = Sha256::hash(b"");
1787            let mut blocks = Vec::new();
1788            for i in 1..=3 {
1789                let block = B::new::<Sha256>(parent, i, i);
1790                let commitment = block.digest();
1791                let round = Round::new(Epoch::new(0), View::new(i));
1792
1793                actor.verified(round, block.clone()).await;
1794                let proposal = Proposal {
1795                    round,
1796                    parent: View::new(i - 1),
1797                    payload: commitment,
1798                };
1799                let finalization = make_finalization(proposal, &schemes, QUORUM);
1800                actor.report(Activity::Finalization(finalization)).await;
1801
1802                blocks.push(block);
1803                parent = commitment;
1804            }
1805
1806            // Wait for application to process all blocks
1807            while application.blocks().len() < 3 {
1808                context.sleep(Duration::from_millis(10)).await;
1809            }
1810
1811            // Set marshal's processed height to 3
1812            actor.set_floor(3).await;
1813            context.sleep(Duration::from_millis(10)).await;
1814
1815            // Verify application received all blocks
1816            assert_eq!(application.blocks().len(), 3);
1817            assert_eq!(application.tip(), Some((3, blocks[2].digest())));
1818
1819            // Test 2: Restart with marshal processed height = 3
1820            let (_restart_application, _restart_actor, restart_height) = setup_validator(
1821                context.with_label("validator_0_restart"),
1822                &mut oracle,
1823                me,
1824                ConstantProvider::new(schemes[0].clone()),
1825            )
1826            .await;
1827
1828            assert_eq!(restart_height, 3);
1829        })
1830    }
1831
1832    #[test_traced("WARN")]
1833    fn test_marshaled_rejects_unsupported_epoch() {
1834        #[derive(Clone)]
1835        struct MockVerifyingApp {
1836            genesis: B,
1837        }
1838
1839        impl crate::Application<deterministic::Context> for MockVerifyingApp {
1840            type Block = B;
1841            type Context = Context<D, K>;
1842            type SigningScheme = S;
1843
1844            async fn genesis(&mut self) -> Self::Block {
1845                self.genesis.clone()
1846            }
1847
1848            async fn propose(
1849                &mut self,
1850                _context: (deterministic::Context, Self::Context),
1851                _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
1852            ) -> Option<Self::Block> {
1853                None
1854            }
1855        }
1856
1857        impl VerifyingApplication<deterministic::Context> for MockVerifyingApp {
1858            async fn verify(
1859                &mut self,
1860                _context: (deterministic::Context, Self::Context),
1861                _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
1862            ) -> bool {
1863                true
1864            }
1865        }
1866
1867        #[derive(Clone)]
1868        struct LimitedEpocher {
1869            inner: FixedEpocher,
1870            max_epoch: u64,
1871        }
1872
1873        impl Epocher for LimitedEpocher {
1874            fn containing(&self, height: u64) -> Option<crate::types::EpochInfo> {
1875                let bounds = self.inner.containing(height)?;
1876                if bounds.epoch().get() > self.max_epoch {
1877                    None
1878                } else {
1879                    Some(bounds)
1880                }
1881            }
1882
1883            fn first(&self, epoch: Epoch) -> Option<u64> {
1884                if epoch.get() > self.max_epoch {
1885                    None
1886                } else {
1887                    self.inner.first(epoch)
1888                }
1889            }
1890
1891            fn last(&self, epoch: Epoch) -> Option<u64> {
1892                if epoch.get() > self.max_epoch {
1893                    None
1894                } else {
1895                    self.inner.last(epoch)
1896                }
1897            }
1898        }
1899
1900        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1901        runner.start(|mut context| async move {
1902            let mut oracle = setup_network(context.clone(), None);
1903            let Fixture {
1904                participants,
1905                schemes,
1906                ..
1907            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1908
1909            let me = participants[0].clone();
1910            let (_base_app, marshal, _processed_height) = setup_validator(
1911                context.with_label("validator_0"),
1912                &mut oracle,
1913                me.clone(),
1914                ConstantProvider::new(schemes[0].clone()),
1915            )
1916            .await;
1917
1918            let genesis = B::new::<Sha256>(Sha256::hash(b""), 0, 0);
1919
1920            let mock_app = MockVerifyingApp {
1921                genesis: genesis.clone(),
1922            };
1923            let limited_epocher = LimitedEpocher {
1924                inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
1925                max_epoch: 0,
1926            };
1927            let mut marshaled =
1928                Marshaled::new(context.clone(), mock_app, marshal.clone(), limited_epocher);
1929
1930            // Create a parent block at height 19 (last block in epoch 0, which is supported)
1931            let parent = B::new::<Sha256>(genesis.commitment(), 19, 1000);
1932            let parent_commitment = parent.commitment();
1933            let parent_round = Round::new(Epoch::new(0), View::new(19));
1934            marshal.clone().verified(parent_round, parent).await;
1935
1936            // Create a block at height 20 (first block in epoch 1, which is NOT supported)
1937            let block = B::new::<Sha256>(parent_commitment, 20, 2000);
1938            let block_commitment = block.commitment();
1939            marshal
1940                .clone()
1941                .proposed(Round::new(Epoch::new(1), View::new(20)), block)
1942                .await;
1943
1944            context.sleep(Duration::from_millis(10)).await;
1945
1946            let unsupported_context = Context {
1947                round: Round::new(Epoch::new(1), View::new(20)),
1948                leader: me.clone(),
1949                parent: (View::new(19), parent_commitment),
1950            };
1951
1952            let verify = marshaled
1953                .verify(unsupported_context, block_commitment)
1954                .await;
1955
1956            assert!(
1957                !verify.await.unwrap(),
1958                "Block in unsupported epoch should be rejected"
1959            );
1960        })
1961    }
1962
1963    #[test_traced("INFO")]
1964    fn test_broadcast_caches_block() {
1965        let runner = deterministic::Runner::timed(Duration::from_secs(60));
1966        runner.start(|mut context| async move {
1967            let mut oracle = setup_network(context.clone(), None);
1968            let Fixture {
1969                participants,
1970                schemes,
1971                ..
1972            } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1973
1974            // Set up one validator
1975            let (i, validator) = participants.iter().enumerate().next().unwrap();
1976            let mut actor = setup_validator(
1977                context.with_label(&format!("validator_{i}")),
1978                &mut oracle,
1979                validator.clone(),
1980                ConstantProvider::new(schemes[i].clone()),
1981            )
1982            .await
1983            .1;
1984
1985            // Create block at height 1
1986            let parent = Sha256::hash(b"");
1987            let block = B::new::<Sha256>(parent, 1, 1);
1988            let commitment = block.digest();
1989
1990            // Broadcast the block
1991            actor
1992                .proposed(Round::new(Epoch::new(0), View::new(1)), block.clone())
1993                .await;
1994
1995            // Ensure the block is cached and retrievable; This should hit the in-memory cache
1996            // via `buffered::Mailbox`.
1997            actor
1998                .get_block(&commitment)
1999                .await
2000                .expect("block should be cached after broadcast");
2001
2002            // Restart marshal, removing any in-memory cache
2003            let mut actor = setup_validator(
2004                context.with_label(&format!("validator_{i}")),
2005                &mut oracle,
2006                validator.clone(),
2007                ConstantProvider::new(schemes[i].clone()),
2008            )
2009            .await
2010            .1;
2011
2012            // Put a notarization into the cache to re-initialize the ephemeral cache for the
2013            // first epoch. Without this, the marshal cannot determine the epoch of the block being fetched,
2014            // so it won't look to restore the cache for the epoch.
2015            let notarization = make_notarization(
2016                Proposal {
2017                    round: Round::new(Epoch::new(0), View::new(1)),
2018                    parent: View::new(0),
2019                    payload: commitment,
2020                },
2021                &schemes,
2022                QUORUM,
2023            );
2024            actor.report(Activity::Notarization(notarization)).await;
2025
2026            // Ensure the block is cached and retrievable
2027            let fetched = actor
2028                .get_block(&commitment)
2029                .await
2030                .expect("block should be cached after broadcast");
2031            assert_eq!(fetched, block);
2032        });
2033    }
2034}