commonware_consensus/marshal/
mod.rs

1//! Ordered delivery of finalized blocks.
2//!
3//! # Architecture
4//!
5//! The core of the module is the [actor::Actor]. It marshals the finalized blocks into order by:
6//!
7//! - Receiving uncertified blocks from a broadcast mechanism
8//! - Receiving notarizations and finalizations from consensus
9//! - Reconstructing a total order of finalized blocks
10//! - Providing a backfill mechanism for missing blocks
11//!
//! The actor interacts with five main components:
13//! - [crate::Reporter]: Receives ordered, finalized blocks at-least-once
14//! - [crate::simplex]: Provides consensus messages
15//! - Application: Provides verified blocks
16//! - [commonware_broadcast::buffered]: Provides uncertified blocks received from the network
17//! - [commonware_resolver::Resolver]: Provides a backfill mechanism for missing blocks
18//!
19//! # Design
20//!
21//! ## Delivery
22//!
23//! The actor will deliver a block to the reporter at-least-once. The reporter should be prepared to
24//! handle duplicate deliveries. However the blocks will be in order.
25//!
26//! ## Finalization
27//!
28//! The actor uses a view-based model to track the state of the chain. Each view corresponds
29//! to a potential block in the chain. The actor will only finalize a block (and its ancestors)
30//! if it has a corresponding finalization from consensus.
31//!
32//! ## Backfill
33//!
34//! The actor provides a backfill mechanism for missing blocks. If the actor notices a gap in its
35//! knowledge of finalized blocks, it will request the missing blocks from its peers. This ensures
36//! that the actor can catch up to the rest of the network if it falls behind.
37//!
38//! ## Storage
39//!
40//! The actor uses a combination of prunable and immutable storage to store blocks and
41//! finalizations. Prunable storage is used to store data that is only needed for a short
42//! period of time, such as unverified blocks or notarizations. Immutable storage is used to
43//! store data that needs to be persisted indefinitely, such as finalized blocks. This allows
44//! the actor to keep its storage footprint small while still providing a full history of the
45//! chain.
46//!
47//! ## Limitations and Future Work
48//!
49//! - Only works with [crate::simplex] rather than general consensus.
50//! - Assumes at-most one notarization per view, incompatible with some consensus protocols.
51//! - No state sync supported. Will attempt to sync every block in the history of the chain.
52//! - Stores the entire history of the chain, which requires indefinite amounts of disk space.
53//! - Uses [`broadcast::buffered`](`commonware_broadcast::buffered`) for broadcasting and receiving
54//!   uncertified blocks from the network.
55
56pub mod actor;
57pub use actor::Actor;
58pub mod cache;
59pub mod config;
60pub use config::Config;
61pub mod finalizer;
62pub use finalizer::Finalizer;
63pub mod ingress;
64pub use ingress::mailbox::Mailbox;
65pub mod resolver;
66
67use crate::{simplex::signing_scheme::Scheme, types::Epoch, Block};
68use std::sync::Arc;
69
70/// Supplies the signing scheme the marshal should use for a given epoch.
71pub trait SchemeProvider: Clone + Send + Sync + 'static {
72    /// The signing scheme to provide.
73    type Scheme: Scheme;
74
75    /// Return the signing scheme that corresponds to `epoch`.
76    fn scheme(&self, epoch: Epoch) -> Option<Arc<Self::Scheme>>;
77}
78
79/// An update reported to the application, either a new finalized tip or a finalized block.
80///
81/// Finalized tips are reported as soon as known, whether or not we hold all blocks up to that height.
82/// Finalized blocks are reported to the application in monotonically increasing order (no gaps permitted).
83#[derive(Clone, Debug)]
84pub enum Update<B: Block> {
85    /// A new finalized tip.
86    Tip(u64, B::Commitment),
87    /// A new finalized block.
88    Block(B),
89}
90
91#[cfg(test)]
92pub mod mocks;
93
94#[cfg(test)]
95mod tests {
96    use super::{
97        actor,
98        config::Config,
99        mocks::{application::Application, block::Block},
100        resolver::p2p as resolver,
101        SchemeProvider,
102    };
103    use crate::{
104        marshal::ingress::mailbox::Identifier,
105        simplex::{
106            mocks::fixtures::{bls12381_threshold, Fixture},
107            signing_scheme::bls12381_threshold,
108            types::{Activity, Finalization, Finalize, Notarization, Notarize, Proposal},
109        },
110        types::{Epoch, Round},
111        utils, Block as _, Reporter,
112    };
113    use commonware_broadcast::buffered;
114    use commonware_cryptography::{
115        bls12381::primitives::variant::MinPk,
116        ed25519::PublicKey,
117        sha256::{Digest as Sha256Digest, Sha256},
118        Digestible, Hasher as _,
119    };
120    use commonware_macros::test_traced;
121    use commonware_p2p::{
122        simulated::{self, Link, Network, Oracle},
123        utils::requester,
124        Manager,
125    };
126    use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner};
127    use commonware_utils::{NZUsize, NZU64};
128    use governor::Quota;
129    use rand::{seq::SliceRandom, Rng};
130    use std::{
131        collections::BTreeMap,
132        marker::PhantomData,
133        num::{NonZeroU32, NonZeroUsize},
134        sync::Arc,
135        time::Duration,
136    };
137
138    type D = Sha256Digest;
139    type B = Block<D>;
140    type K = PublicKey;
141    type V = MinPk;
142    type S = bls12381_threshold::Scheme<K, V>;
143    type P = ConstantSchemeProvider;
144
145    #[derive(Clone)]
146    struct ConstantSchemeProvider(Arc<S>);
147    impl SchemeProvider for ConstantSchemeProvider {
148        type Scheme = S;
149
150        fn scheme(&self, _: Epoch) -> Option<Arc<S>> {
151            Some(self.0.clone())
152        }
153    }
154    impl From<S> for ConstantSchemeProvider {
155        fn from(scheme: S) -> Self {
156            Self(Arc::new(scheme))
157        }
158    }
159
160    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
161    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
162    const NAMESPACE: &[u8] = b"test";
163    const NUM_VALIDATORS: u32 = 4;
164    const QUORUM: u32 = 3;
165    const NUM_BLOCKS: u64 = 160;
166    const BLOCKS_PER_EPOCH: u64 = 20;
167    const LINK: Link = Link {
168        latency: Duration::from_millis(10),
169        jitter: Duration::from_millis(1),
170        success_rate: 1.0,
171    };
172    const UNRELIABLE_LINK: Link = Link {
173        latency: Duration::from_millis(200),
174        jitter: Duration::from_millis(50),
175        success_rate: 0.7,
176    };
177
178    async fn setup_validator(
179        context: deterministic::Context,
180        oracle: &mut Oracle<K>,
181        validator: K,
182        scheme_provider: P,
183    ) -> (
184        Application<B>,
185        crate::marshal::ingress::mailbox::Mailbox<S, B>,
186    ) {
187        let config = Config {
188            scheme_provider,
189            epoch_length: BLOCKS_PER_EPOCH,
190            mailbox_size: 100,
191            namespace: NAMESPACE.to_vec(),
192            view_retention_timeout: 10,
193            max_repair: 10,
194            block_codec_config: (),
195            partition_prefix: format!("validator-{}", validator.clone()),
196            prunable_items_per_section: NZU64!(10),
197            replay_buffer: NZUsize!(1024),
198            write_buffer: NZUsize!(1024),
199            freezer_table_initial_size: 64,
200            freezer_table_resize_frequency: 10,
201            freezer_table_resize_chunk_size: 10,
202            freezer_journal_target_size: 1024,
203            freezer_journal_compression: None,
204            freezer_journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
205            immutable_items_per_section: NZU64!(10),
206            _marker: PhantomData,
207        };
208
209        // Create the resolver
210        let mut control = oracle.control(validator.clone());
211        let backfill = control.register(1).await.unwrap();
212        let resolver_cfg = resolver::Config {
213            public_key: validator.clone(),
214            manager: oracle.clone(),
215            mailbox_size: config.mailbox_size,
216            requester_config: requester::Config {
217                me: Some(validator.clone()),
218                rate_limit: Quota::per_second(NonZeroU32::new(5).unwrap()),
219                initial: Duration::from_secs(1),
220                timeout: Duration::from_secs(2),
221            },
222            fetch_retry_timeout: Duration::from_millis(100),
223            priority_requests: false,
224            priority_responses: false,
225        };
226        let resolver = resolver::init(&context, resolver_cfg, backfill);
227
228        // Create a buffered broadcast engine and get its mailbox
229        let broadcast_config = buffered::Config {
230            public_key: validator.clone(),
231            mailbox_size: config.mailbox_size,
232            deque_size: 10,
233            priority: false,
234            codec_config: (),
235        };
236        let (broadcast_engine, buffer) = buffered::Engine::new(context.clone(), broadcast_config);
237        let network = control.register(2).await.unwrap();
238        broadcast_engine.start(network);
239
240        let (actor, mailbox) = actor::Actor::init(context.clone(), config).await;
241        let application = Application::<B>::default();
242
243        // Start the application
244        actor.start(application.clone(), buffer, resolver);
245
246        (application, mailbox)
247    }
248
249    fn make_finalization(proposal: Proposal<D>, schemes: &[S], quorum: u32) -> Finalization<S, D> {
250        // Generate proposal signature
251        let finalizes: Vec<_> = schemes
252            .iter()
253            .take(quorum as usize)
254            .map(|scheme| Finalize::sign(scheme, NAMESPACE, proposal.clone()).unwrap())
255            .collect();
256
257        // Generate certificate signatures
258        Finalization::from_finalizes(&schemes[0], &finalizes).unwrap()
259    }
260
261    fn make_notarization(proposal: Proposal<D>, schemes: &[S], quorum: u32) -> Notarization<S, D> {
262        // Generate proposal signature
263        let notarizes: Vec<_> = schemes
264            .iter()
265            .take(quorum as usize)
266            .map(|scheme| Notarize::sign(scheme, NAMESPACE, proposal.clone()).unwrap())
267            .collect();
268
269        // Generate certificate signatures
270        Notarization::from_notarizes(&schemes[0], &notarizes).unwrap()
271    }
272
273    fn setup_network(
274        context: deterministic::Context,
275        tracked_peer_sets: Option<usize>,
276    ) -> Oracle<K> {
277        let (network, oracle) = Network::new(
278            context.with_label("network"),
279            simulated::Config {
280                max_size: 1024 * 1024,
281                disconnect_on_block: true,
282                tracked_peer_sets,
283            },
284        );
285        network.start();
286        oracle
287    }
288
289    async fn setup_network_links(oracle: &mut Oracle<K>, peers: &[K], link: Link) {
290        for p1 in peers.iter() {
291            for p2 in peers.iter() {
292                if p2 == p1 {
293                    continue;
294                }
295                oracle
296                    .add_link(p1.clone(), p2.clone(), link.clone())
297                    .await
298                    .unwrap();
299            }
300        }
301    }
302
303    #[test_traced("WARN")]
304    fn test_finalize_good_links() {
305        for seed in 0..5 {
306            let result1 = finalize(seed, LINK);
307            let result2 = finalize(seed, LINK);
308
309            // Ensure determinism
310            assert_eq!(result1, result2);
311        }
312    }
313
314    #[test_traced("WARN")]
315    fn test_finalize_bad_links() {
316        for seed in 0..5 {
317            let result1 = finalize(seed, UNRELIABLE_LINK);
318            let result2 = finalize(seed, UNRELIABLE_LINK);
319
320            // Ensure determinism
321            assert_eq!(result1, result2);
322        }
323    }
324
325    fn finalize(seed: u64, link: Link) -> String {
326        let runner = deterministic::Runner::new(
327            deterministic::Config::new()
328                .with_seed(seed)
329                .with_timeout(Some(Duration::from_secs(300))),
330        );
331        runner.start(|mut context| async move {
332            let mut oracle = setup_network(context.clone(), Some(3));
333            let Fixture {
334                participants,
335                schemes,
336                ..
337            } = bls12381_threshold::<V, _>(&mut context, NUM_VALIDATORS);
338
339            // Initialize applications and actors
340            let mut applications = BTreeMap::new();
341            let mut actors = Vec::new();
342
343            // Register the initial peer set.
344            oracle.update(0, participants.clone().into()).await;
345            for (i, validator) in participants.iter().enumerate() {
346                let (application, actor) = setup_validator(
347                    context.with_label(&format!("validator-{i}")),
348                    &mut oracle,
349                    validator.clone(),
350                    schemes[i].clone().into(),
351                )
352                .await;
353                applications.insert(validator.clone(), application);
354                actors.push(actor);
355            }
356
357            // Add links between all peers
358            setup_network_links(&mut oracle, &participants, link.clone()).await;
359
360            // Generate blocks, skipping the genesis block.
361            let mut blocks = Vec::<B>::new();
362            let mut parent = Sha256::hash(b"");
363            for i in 1..=NUM_BLOCKS {
364                let block = B::new::<Sha256>(parent, i, i);
365                parent = block.digest();
366                blocks.push(block);
367            }
368
369            // Broadcast and finalize blocks in random order
370            blocks.shuffle(&mut context);
371            for block in blocks.iter() {
372                // Skip genesis block
373                let height = block.height();
374                assert!(height > 0, "genesis block should not have been generated");
375
376                // Calculate the epoch and round for the block
377                let epoch = utils::epoch(BLOCKS_PER_EPOCH, height);
378                let round = Round::new(epoch, height);
379
380                // Broadcast block by one validator
381                let actor_index: usize = (height % (NUM_VALIDATORS as u64)) as usize;
382                let mut actor = actors[actor_index].clone();
383                actor.broadcast(block.clone()).await;
384                actor.verified(round, block.clone()).await;
385
386                // Wait for the block to be broadcast, but due to jitter, we may or may not receive
387                // the block before continuing.
388                context.sleep(link.latency).await;
389
390                // Notarize block by the validator that broadcasted it
391                let proposal = Proposal {
392                    round,
393                    parent: height.checked_sub(1).unwrap(),
394                    payload: block.digest(),
395                };
396                let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
397                actor
398                    .report(Activity::Notarization(notarization.clone()))
399                    .await;
400
401                // Finalize block by all validators
402                let fin = make_finalization(proposal, &schemes, QUORUM);
403                for actor in actors.iter_mut() {
404                    // Always finalize 1) the last block in each epoch 2) the last block in the chain.
405                    // Otherwise, finalize randomly.
406                    if height == NUM_BLOCKS
407                        || utils::is_last_block_in_epoch(BLOCKS_PER_EPOCH, epoch).is_some()
408                        || context.gen_bool(0.2)
409                    // 20% chance to finalize randomly
410                    {
411                        actor.report(Activity::Finalization(fin.clone())).await;
412                    }
413                }
414            }
415
416            // Check that all applications received all blocks.
417            let mut finished = false;
418            while !finished {
419                // Avoid a busy loop
420                context.sleep(Duration::from_secs(1)).await;
421
422                // If not all validators have finished, try again
423                if applications.len() != NUM_VALIDATORS as usize {
424                    continue;
425                }
426                finished = true;
427                for app in applications.values() {
428                    if app.blocks().len() != NUM_BLOCKS as usize {
429                        finished = false;
430                        break;
431                    }
432                    let Some((height, _)) = app.tip() else {
433                        finished = false;
434                        break;
435                    };
436                    if height < NUM_BLOCKS {
437                        finished = false;
438                        break;
439                    }
440                }
441            }
442
443            // Return state
444            context.auditor().state()
445        })
446    }
447
448    #[test_traced("WARN")]
449    fn test_subscribe_basic_block_delivery() {
450        let runner = deterministic::Runner::timed(Duration::from_secs(60));
451        runner.start(|mut context| async move {
452            let mut oracle = setup_network(context.clone(), None);
453            let Fixture {
454                participants,
455                schemes,
456                ..
457            } = bls12381_threshold::<V, _>(&mut context, NUM_VALIDATORS);
458
459            let mut actors = Vec::new();
460            for (i, validator) in participants.iter().enumerate() {
461                let (_application, actor) = setup_validator(
462                    context.with_label(&format!("validator-{i}")),
463                    &mut oracle,
464                    validator.clone(),
465                    schemes[i].clone().into(),
466                )
467                .await;
468                actors.push(actor);
469            }
470            let mut actor = actors[0].clone();
471
472            setup_network_links(&mut oracle, &participants, LINK).await;
473
474            let parent = Sha256::hash(b"");
475            let block = B::new::<Sha256>(parent, 1, 1);
476            let commitment = block.digest();
477
478            let subscription_rx = actor.subscribe(Some(Round::from((0, 1))), commitment).await;
479
480            actor.verified(Round::from((0, 1)), block.clone()).await;
481
482            let proposal = Proposal {
483                round: Round::new(0, 1),
484                parent: 0,
485                payload: commitment,
486            };
487            let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
488            actor.report(Activity::Notarization(notarization)).await;
489
490            let finalization = make_finalization(proposal, &schemes, QUORUM);
491            actor.report(Activity::Finalization(finalization)).await;
492
493            let received_block = subscription_rx.await.unwrap();
494            assert_eq!(received_block.digest(), block.digest());
495            assert_eq!(received_block.height(), 1);
496        })
497    }
498
499    #[test_traced("WARN")]
500    fn test_subscribe_multiple_subscriptions() {
501        let runner = deterministic::Runner::timed(Duration::from_secs(60));
502        runner.start(|mut context| async move {
503            let mut oracle = setup_network(context.clone(), None);
504            let Fixture {
505                participants,
506                schemes,
507                ..
508            } = bls12381_threshold::<V, _>(&mut context, NUM_VALIDATORS);
509
510            let mut actors = Vec::new();
511            for (i, validator) in participants.iter().enumerate() {
512                let (_application, actor) = setup_validator(
513                    context.with_label(&format!("validator-{i}")),
514                    &mut oracle,
515                    validator.clone(),
516                    schemes[i].clone().into(),
517                )
518                .await;
519                actors.push(actor);
520            }
521            let mut actor = actors[0].clone();
522
523            setup_network_links(&mut oracle, &participants, LINK).await;
524
525            let parent = Sha256::hash(b"");
526            let block1 = B::new::<Sha256>(parent, 1, 1);
527            let block2 = B::new::<Sha256>(block1.digest(), 2, 2);
528            let commitment1 = block1.digest();
529            let commitment2 = block2.digest();
530
531            let sub1_rx = actor
532                .subscribe(Some(Round::from((0, 1))), commitment1)
533                .await;
534            let sub2_rx = actor
535                .subscribe(Some(Round::from((0, 2))), commitment2)
536                .await;
537            let sub3_rx = actor
538                .subscribe(Some(Round::from((0, 1))), commitment1)
539                .await;
540
541            actor.verified(Round::from((0, 1)), block1.clone()).await;
542            actor.verified(Round::from((0, 2)), block2.clone()).await;
543
544            for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
545                let proposal = Proposal {
546                    round: Round::new(0, view),
547                    parent: view.checked_sub(1).unwrap(),
548                    payload: block.digest(),
549                };
550                let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
551                actor.report(Activity::Notarization(notarization)).await;
552
553                let finalization = make_finalization(proposal, &schemes, QUORUM);
554                actor.report(Activity::Finalization(finalization)).await;
555            }
556
557            let received1_sub1 = sub1_rx.await.unwrap();
558            let received2 = sub2_rx.await.unwrap();
559            let received1_sub3 = sub3_rx.await.unwrap();
560
561            assert_eq!(received1_sub1.digest(), block1.digest());
562            assert_eq!(received2.digest(), block2.digest());
563            assert_eq!(received1_sub3.digest(), block1.digest());
564            assert_eq!(received1_sub1.height(), 1);
565            assert_eq!(received2.height(), 2);
566            assert_eq!(received1_sub3.height(), 1);
567        })
568    }
569
570    #[test_traced("WARN")]
571    fn test_subscribe_canceled_subscriptions() {
572        let runner = deterministic::Runner::timed(Duration::from_secs(60));
573        runner.start(|mut context| async move {
574            let mut oracle = setup_network(context.clone(), None);
575            let Fixture {
576                participants,
577                schemes,
578                ..
579            } = bls12381_threshold::<V, _>(&mut context, NUM_VALIDATORS);
580
581            let mut actors = Vec::new();
582            for (i, validator) in participants.iter().enumerate() {
583                let (_application, actor) = setup_validator(
584                    context.with_label(&format!("validator-{i}")),
585                    &mut oracle,
586                    validator.clone(),
587                    schemes[i].clone().into(),
588                )
589                .await;
590                actors.push(actor);
591            }
592            let mut actor = actors[0].clone();
593
594            setup_network_links(&mut oracle, &participants, LINK).await;
595
596            let parent = Sha256::hash(b"");
597            let block1 = B::new::<Sha256>(parent, 1, 1);
598            let block2 = B::new::<Sha256>(block1.digest(), 2, 2);
599            let commitment1 = block1.digest();
600            let commitment2 = block2.digest();
601
602            let sub1_rx = actor
603                .subscribe(Some(Round::from((0, 1))), commitment1)
604                .await;
605            let sub2_rx = actor
606                .subscribe(Some(Round::from((0, 2))), commitment2)
607                .await;
608
609            drop(sub1_rx);
610
611            actor.verified(Round::from((0, 1)), block1.clone()).await;
612            actor.verified(Round::from((0, 2)), block2.clone()).await;
613
614            for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
615                let proposal = Proposal {
616                    round: Round::new(0, view),
617                    parent: view.checked_sub(1).unwrap(),
618                    payload: block.digest(),
619                };
620                let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
621                actor.report(Activity::Notarization(notarization)).await;
622
623                let finalization = make_finalization(proposal, &schemes, QUORUM);
624                actor.report(Activity::Finalization(finalization)).await;
625            }
626
627            let received2 = sub2_rx.await.unwrap();
628            assert_eq!(received2.digest(), block2.digest());
629            assert_eq!(received2.height(), 2);
630        })
631    }
632
633    #[test_traced("WARN")]
634    fn test_subscribe_blocks_from_different_sources() {
635        let runner = deterministic::Runner::timed(Duration::from_secs(60));
636        runner.start(|mut context| async move {
637            let mut oracle = setup_network(context.clone(), None);
638            let Fixture {
639                participants,
640                schemes,
641                ..
642            } = bls12381_threshold::<V, _>(&mut context, NUM_VALIDATORS);
643
644            let mut actors = Vec::new();
645            for (i, validator) in participants.iter().enumerate() {
646                let (_application, actor) = setup_validator(
647                    context.with_label(&format!("validator-{i}")),
648                    &mut oracle,
649                    validator.clone(),
650                    schemes[i].clone().into(),
651                )
652                .await;
653                actors.push(actor);
654            }
655            let mut actor = actors[0].clone();
656
657            setup_network_links(&mut oracle, &participants, LINK).await;
658
659            let parent = Sha256::hash(b"");
660            let block1 = B::new::<Sha256>(parent, 1, 1);
661            let block2 = B::new::<Sha256>(block1.digest(), 2, 2);
662            let block3 = B::new::<Sha256>(block2.digest(), 3, 3);
663            let block4 = B::new::<Sha256>(block3.digest(), 4, 4);
664            let block5 = B::new::<Sha256>(block4.digest(), 5, 5);
665
666            let sub1_rx = actor.subscribe(None, block1.digest()).await;
667            let sub2_rx = actor.subscribe(None, block2.digest()).await;
668            let sub3_rx = actor.subscribe(None, block3.digest()).await;
669            let sub4_rx = actor.subscribe(None, block4.digest()).await;
670            let sub5_rx = actor.subscribe(None, block5.digest()).await;
671
672            // Block1: Broadcasted by the actor
673            actor.broadcast(block1.clone()).await;
674            context.sleep(Duration::from_millis(20)).await;
675
676            // Block1: delivered
677            let received1 = sub1_rx.await.unwrap();
678            assert_eq!(received1.digest(), block1.digest());
679            assert_eq!(received1.height(), 1);
680
681            // Block2: Verified by the actor
682            actor.verified(Round::from((0, 2)), block2.clone()).await;
683
684            // Block2: delivered
685            let received2 = sub2_rx.await.unwrap();
686            assert_eq!(received2.digest(), block2.digest());
687            assert_eq!(received2.height(), 2);
688
689            // Block3: Notarized by the actor
690            let proposal3 = Proposal {
691                round: Round::new(0, 3),
692                parent: 2,
693                payload: block3.digest(),
694            };
695            let notarization3 = make_notarization(proposal3.clone(), &schemes, QUORUM);
696            actor.report(Activity::Notarization(notarization3)).await;
697            actor.verified(Round::from((0, 3)), block3.clone()).await;
698
699            // Block3: delivered
700            let received3 = sub3_rx.await.unwrap();
701            assert_eq!(received3.digest(), block3.digest());
702            assert_eq!(received3.height(), 3);
703
704            // Block4: Finalized by the actor
705            let finalization4 = make_finalization(
706                Proposal {
707                    round: Round::new(0, 4),
708                    parent: 3,
709                    payload: block4.digest(),
710                },
711                &schemes,
712                QUORUM,
713            );
714            actor.report(Activity::Finalization(finalization4)).await;
715            actor.verified(Round::from((0, 4)), block4.clone()).await;
716
717            // Block4: delivered
718            let received4 = sub4_rx.await.unwrap();
719            assert_eq!(received4.digest(), block4.digest());
720            assert_eq!(received4.height(), 4);
721
722            // Block5: Broadcasted by a remote node (different actor)
723            let remote_actor = &mut actors[1].clone();
724            remote_actor.broadcast(block5.clone()).await;
725            context.sleep(Duration::from_millis(20)).await;
726
727            // Block5: delivered
728            let received5 = sub5_rx.await.unwrap();
729            assert_eq!(received5.digest(), block5.digest());
730            assert_eq!(received5.height(), 5);
731        })
732    }
733
734    #[test_traced("WARN")]
735    fn test_get_info_basic_queries_present_and_missing() {
736        let runner = deterministic::Runner::timed(Duration::from_secs(60));
737        runner.start(|mut context| async move {
738            let mut oracle = setup_network(context.clone(), None);
739            let Fixture {
740                participants,
741                schemes,
742                ..
743            } = bls12381_threshold::<V, _>(&mut context, NUM_VALIDATORS);
744
745            // Single validator actor
746            let me = participants[0].clone();
747            let (_application, mut actor) = setup_validator(
748                context.with_label("validator-0"),
749                &mut oracle,
750                me,
751                schemes[0].clone().into(),
752            )
753            .await;
754
755            // Initially, no latest
756            assert!(actor.get_info(Identifier::Latest).await.is_none());
757
758            // Before finalization, specific height returns None
759            assert!(actor.get_info(1).await.is_none());
760
761            // Create and verify a block, then finalize it
762            let parent = Sha256::hash(b"");
763            let block = B::new::<Sha256>(parent, 1, 1);
764            let digest = block.digest();
765            let round = Round::new(0, 1);
766            actor.verified(round, block.clone()).await;
767
768            let proposal = Proposal {
769                round,
770                parent: 0,
771                payload: digest,
772            };
773            let finalization = make_finalization(proposal, &schemes, QUORUM);
774            actor.report(Activity::Finalization(finalization)).await;
775
776            // Latest should now be the finalized block
777            assert_eq!(actor.get_info(Identifier::Latest).await, Some((1, digest)));
778
779            // Height 1 now present
780            assert_eq!(actor.get_info(1).await, Some((1, digest)));
781
782            // Commitment should map to its height
783            assert_eq!(actor.get_info(&digest).await, Some((1, digest)));
784
785            // Missing height
786            assert!(actor.get_info(2).await.is_none());
787
788            // Missing commitment
789            let missing = Sha256::hash(b"missing");
790            assert!(actor.get_info(&missing).await.is_none());
791        })
792    }
793
794    #[test_traced("WARN")]
795    fn test_get_info_latest_progression_multiple_finalizations() {
796        let runner = deterministic::Runner::timed(Duration::from_secs(60));
797        runner.start(|mut context| async move {
798            let mut oracle = setup_network(context.clone(), None);
799            let Fixture {
800                participants,
801                schemes,
802                ..
803            } = bls12381_threshold::<V, _>(&mut context, NUM_VALIDATORS);
804
805            // Single validator actor
806            let me = participants[0].clone();
807            let (_application, mut actor) = setup_validator(
808                context.with_label("validator-0"),
809                &mut oracle,
810                me,
811                schemes[0].clone().into(),
812            )
813            .await;
814
815            // Initially none
816            assert!(actor.get_info(Identifier::Latest).await.is_none());
817
818            // Build and finalize heights 1..=3
819            let parent0 = Sha256::hash(b"");
820            let block1 = B::new::<Sha256>(parent0, 1, 1);
821            let d1 = block1.digest();
822            actor.verified(Round::new(0, 1), block1.clone()).await;
823            let f1 = make_finalization(
824                Proposal {
825                    round: Round::new(0, 1),
826                    parent: 0,
827                    payload: d1,
828                },
829                &schemes,
830                QUORUM,
831            );
832            actor.report(Activity::Finalization(f1)).await;
833            let latest = actor.get_info(Identifier::Latest).await;
834            assert_eq!(latest, Some((1, d1)));
835
836            let block2 = B::new::<Sha256>(d1, 2, 2);
837            let d2 = block2.digest();
838            actor.verified(Round::new(0, 2), block2.clone()).await;
839            let f2 = make_finalization(
840                Proposal {
841                    round: Round::new(0, 2),
842                    parent: 1,
843                    payload: d2,
844                },
845                &schemes,
846                QUORUM,
847            );
848            actor.report(Activity::Finalization(f2)).await;
849            let latest = actor.get_info(Identifier::Latest).await;
850            assert_eq!(latest, Some((2, d2)));
851
852            let block3 = B::new::<Sha256>(d2, 3, 3);
853            let d3 = block3.digest();
854            actor.verified(Round::new(0, 3), block3.clone()).await;
855            let f3 = make_finalization(
856                Proposal {
857                    round: Round::new(0, 3),
858                    parent: 2,
859                    payload: d3,
860                },
861                &schemes,
862                QUORUM,
863            );
864            actor.report(Activity::Finalization(f3)).await;
865            let latest = actor.get_info(Identifier::Latest).await;
866            assert_eq!(latest, Some((3, d3)));
867        })
868    }
869
870    #[test_traced("WARN")]
871    fn test_get_block_by_height_and_latest() {
872        let runner = deterministic::Runner::timed(Duration::from_secs(60));
873        runner.start(|mut context| async move {
874            let mut oracle = setup_network(context.clone(), None);
875            let Fixture {
876                participants,
877                schemes,
878                ..
879            } = bls12381_threshold::<V, _>(&mut context, NUM_VALIDATORS);
880
881            let me = participants[0].clone();
882            let (application, mut actor) = setup_validator(
883                context.with_label("validator-0"),
884                &mut oracle,
885                me,
886                schemes[0].clone().into(),
887            )
888            .await;
889
890            // Before any finalization, GetBlock::Latest should be None
891            let latest_block = actor.get_block(Identifier::Latest).await;
892            assert!(latest_block.is_none());
893            assert!(application.tip().is_none());
894
895            // Finalize a block at height 1
896            let parent = Sha256::hash(b"");
897            let block = B::new::<Sha256>(parent, 1, 1);
898            let commitment = block.digest();
899            let round = Round::new(0, 1);
900            actor.verified(round, block.clone()).await;
901            let proposal = Proposal {
902                round,
903                parent: 0,
904                payload: commitment,
905            };
906            let finalization = make_finalization(proposal, &schemes, QUORUM);
907            actor.report(Activity::Finalization(finalization)).await;
908
909            // Get by height
910            let by_height = actor.get_block(1).await.expect("missing block by height");
911            assert_eq!(by_height.height(), 1);
912            assert_eq!(by_height.digest(), commitment);
913            assert_eq!(application.tip(), Some((1, commitment)));
914
915            // Get by latest
916            let by_latest = actor
917                .get_block(Identifier::Latest)
918                .await
919                .expect("missing block by latest");
920            assert_eq!(by_latest.height(), 1);
921            assert_eq!(by_latest.digest(), commitment);
922
923            // Missing height
924            let by_height = actor.get_block(2).await;
925            assert!(by_height.is_none());
926        })
927    }
928
929    #[test_traced("WARN")]
930    fn test_get_block_by_commitment_from_sources_and_missing() {
931        let runner = deterministic::Runner::timed(Duration::from_secs(60));
932        runner.start(|mut context| async move {
933            let mut oracle = setup_network(context.clone(), None);
934            let Fixture {
935                participants,
936                schemes,
937                ..
938            } = bls12381_threshold::<V, _>(&mut context, NUM_VALIDATORS);
939
940            let me = participants[0].clone();
941            let (_application, mut actor) = setup_validator(
942                context.with_label("validator-0"),
943                &mut oracle,
944                me,
945                schemes[0].clone().into(),
946            )
947            .await;
948
949            // 1) From cache via verified
950            let parent = Sha256::hash(b"");
951            let ver_block = B::new::<Sha256>(parent, 1, 1);
952            let ver_commitment = ver_block.digest();
953            let round1 = Round::new(0, 1);
954            actor.verified(round1, ver_block.clone()).await;
955            let got = actor
956                .get_block(&ver_commitment)
957                .await
958                .expect("missing block from cache");
959            assert_eq!(got.digest(), ver_commitment);
960
961            // 2) From finalized archive
962            let fin_block = B::new::<Sha256>(ver_commitment, 2, 2);
963            let fin_commitment = fin_block.digest();
964            let round2 = Round::new(0, 2);
965            actor.verified(round2, fin_block.clone()).await;
966            let proposal = Proposal {
967                round: round2,
968                parent: 1,
969                payload: fin_commitment,
970            };
971            let finalization = make_finalization(proposal, &schemes, QUORUM);
972            actor.report(Activity::Finalization(finalization)).await;
973            let got = actor
974                .get_block(&fin_commitment)
975                .await
976                .expect("missing block from finalized archive");
977            assert_eq!(got.digest(), fin_commitment);
978            assert_eq!(got.height(), 2);
979
980            // 3) Missing commitment
981            let missing = Sha256::hash(b"definitely-missing");
982            let missing_block = actor.get_block(&missing).await;
983            assert!(missing_block.is_none());
984        })
985    }
986
987    #[test_traced("WARN")]
988    fn test_get_finalization_by_height() {
989        let runner = deterministic::Runner::timed(Duration::from_secs(60));
990        runner.start(|mut context| async move {
991            let mut oracle = setup_network(context.clone(), None);
992            let Fixture {
993                participants,
994                schemes,
995                ..
996            } = bls12381_threshold::<V, _>(&mut context, NUM_VALIDATORS);
997
998            let me = participants[0].clone();
999            let (_application, mut actor) = setup_validator(
1000                context.with_label("validator-0"),
1001                &mut oracle,
1002                me,
1003                schemes[0].clone().into(),
1004            )
1005            .await;
1006
1007            // Before any finalization, get_finalization should be None
1008            let finalization = actor.get_finalization(1).await;
1009            assert!(finalization.is_none());
1010
1011            // Finalize a block at height 1
1012            let parent = Sha256::hash(b"");
1013            let block = B::new::<Sha256>(parent, 1, 1);
1014            let commitment = block.digest();
1015            let round = Round::new(0, 1);
1016            actor.verified(round, block.clone()).await;
1017            let proposal = Proposal {
1018                round,
1019                parent: 0,
1020                payload: commitment,
1021            };
1022            let finalization = make_finalization(proposal, &schemes, QUORUM);
1023            actor.report(Activity::Finalization(finalization)).await;
1024
1025            // Get finalization by height
1026            let finalization = actor
1027                .get_finalization(1)
1028                .await
1029                .expect("missing finalization by height");
1030            assert_eq!(finalization.proposal.parent, 0);
1031            assert_eq!(finalization.proposal.round, Round::new(0, 1));
1032            assert_eq!(finalization.proposal.payload, commitment);
1033
1034            assert!(actor.get_finalization(2).await.is_none());
1035        })
1036    }
1037}