commonware_consensus/marshal/
mod.rs

1//! Ordered delivery of finalized blocks.
2//!
3//! # Architecture
4//!
5//! The core of the module is the [actor::Actor]. It marshals the finalized blocks into order by:
6//!
7//! - Receiving uncertified blocks from a broadcast mechanism
8//! - Receiving notarizations and finalizations from consensus
9//! - Reconstructing a total order of finalized blocks
10//! - Providing a backfill mechanism for missing blocks
11//!
//! The actor interacts with five main components:
13//! - [crate::Reporter]: Receives ordered, finalized blocks at-least-once
14//! - [crate::threshold_simplex]: Provides consensus messages
15//! - Application: Provides verified blocks
16//! - [commonware_broadcast::buffered]: Provides uncertified blocks received from the network
17//! - [commonware_resolver::p2p]: Provides a backfill mechanism for missing blocks
18//!
19//! # Design
20//!
21//! ## Delivery
22//!
//! The actor will deliver each block to the reporter at-least-once, so the reporter should be
//! prepared to handle duplicate deliveries. Blocks are, however, always delivered in order.
25//!
26//! ## Finalization
27//!
28//! The actor uses a view-based model to track the state of the chain. Each view corresponds
29//! to a potential block in the chain. The actor will only finalize a block (and its ancestors)
30//! if it has a corresponding finalization from consensus.
31//!
32//! ## Backfill
33//!
34//! The actor provides a backfill mechanism for missing blocks. If the actor notices a gap in its
35//! knowledge of finalized blocks, it will request the missing blocks from its peers. This ensures
36//! that the actor can catch up to the rest of the network if it falls behind.
37//!
38//! ## Storage
39//!
40//! The actor uses a combination of prunable and immutable storage to store blocks and
41//! finalizations. Prunable storage is used to store data that is only needed for a short
42//! period of time, such as unverified blocks or notarizations. Immutable storage is used to
43//! store data that needs to be persisted indefinitely, such as finalized blocks. This allows
44//! the actor to keep its storage footprint small while still providing a full history of the
45//! chain.
46//!
47//! ## Limitations and Future Work
48//!
49//! - Only works with [crate::threshold_simplex] rather than general consensus.
50//! - Assumes at-most one notarization per view, incompatible with some consensus protocols.
51//! - No state sync supported. Will attempt to sync every block in the history of the chain.
52//! - Stores the entire history of the chain, which requires indefinite amounts of disk space.
53//! - Uses [`resolver::p2p`](`commonware_resolver::p2p`) for backfilling rather than a general
54//!   [`Resolver`](`commonware_resolver::Resolver`).
55//! - Uses [`broadcast::buffered`](`commonware_broadcast::buffered`) for broadcasting and receiving
56//!   uncertified blocks from the network.
57
58pub mod actor;
59pub use actor::Actor;
60pub mod config;
61pub use config::Config;
62pub mod finalizer;
63pub use finalizer::Finalizer;
64pub mod ingress;
65pub use ingress::mailbox::Mailbox;
66
67#[cfg(test)]
68pub mod mocks;
69
70#[cfg(test)]
71mod tests {
72    use super::{
73        actor,
74        config::Config,
75        mocks::{application::Application, block::Block},
76    };
77    use crate::{
78        threshold_simplex::types::{
79            finalize_namespace, notarize_namespace, seed_namespace, view_message, Activity,
80            Finalization, Notarization, Proposal,
81        },
82        Block as _, Reporter,
83    };
84    use commonware_broadcast::buffered;
85    use commonware_codec::Encode;
86    use commonware_cryptography::{
87        bls12381::{
88            dkg::ops::generate_shares,
89            primitives::{
90                group::Share,
91                ops::{partial_sign_message, threshold_signature_recover},
92                poly,
93                variant::{MinPk, Variant},
94            },
95        },
96        ed25519::{PrivateKey, PublicKey},
97        sha256::{Digest as Sha256Digest, Sha256},
98        Digestible, Hasher as _, PrivateKeyExt as _, Signer as _,
99    };
100    use commonware_macros::test_traced;
101    use commonware_p2p::simulated::{self, Link, Network, Oracle};
102    use commonware_resolver::p2p as resolver;
103    use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner};
104    use commonware_utils::{NZUsize, NZU64};
105    use governor::Quota;
106    use rand::{seq::SliceRandom, Rng};
107    use std::{
108        collections::BTreeMap,
109        num::{NonZeroU32, NonZeroUsize},
110        time::Duration,
111    };
112
    // Concrete types shared by every test in this module.

    // Digest type (SHA-256) used for block digests and proposal payloads.
    type D = Sha256Digest;
    // Mock block parameterized by the digest type.
    type B = Block<D>;
    // ed25519 public key identifying a peer on the simulated network.
    type P = PublicKey;
    // BLS12-381 variant used for all threshold signatures in these tests.
    type V = MinPk;
    // Threshold key share used to produce partial signatures.
    type Sh = Share;
    // ed25519 private key held by a validator.
    type E = PrivateKey;

    // Arguments passed to `PoolRef::new` for the freezer journal buffer pool.
    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    // Namespace given to the actor config and used to derive the signing namespaces.
    const NAMESPACE: &[u8] = b"test";
    // Number of validators (and shares) created for each test.
    const NUM_VALIDATORS: u32 = 4;
    // Threshold passed to `generate_shares` and used when recovering threshold signatures.
    const QUORUM: u32 = 3;
    // Number of non-genesis blocks generated by `finalize`.
    const NUM_BLOCKS: u64 = 100;
127
128    async fn setup_validator(
129        context: deterministic::Context,
130        oracle: &mut Oracle<P>,
131        coordinator: resolver::mocks::Coordinator<P>,
132        secret: E,
133        identity: <V as Variant>::Public,
134    ) -> (
135        Application<B>,
136        crate::marshal::ingress::mailbox::Mailbox<V, B>,
137    ) {
138        let config = Config {
139            public_key: secret.public_key(),
140            identity,
141            coordinator,
142            mailbox_size: 100,
143            backfill_quota: Quota::per_second(NonZeroU32::new(5).unwrap()),
144            namespace: NAMESPACE.to_vec(),
145            view_retention_timeout: 10,
146            max_repair: 10,
147            codec_config: (),
148            partition_prefix: format!("validator-{}", secret.public_key()),
149            prunable_items_per_section: NZU64!(10),
150            replay_buffer: NZUsize!(1024),
151            write_buffer: NZUsize!(1024),
152            freezer_table_initial_size: 64,
153            freezer_table_resize_frequency: 10,
154            freezer_table_resize_chunk_size: 10,
155            freezer_journal_target_size: 1024,
156            freezer_journal_compression: None,
157            freezer_journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
158            immutable_items_per_section: NZU64!(10),
159        };
160
161        let (actor, mailbox) = actor::Actor::init(context.clone(), config).await;
162        let application = Application::<B>::default();
163
164        // Create a buffered broadcast engine and get its mailbox
165        let broadcast_config = buffered::Config {
166            public_key: secret.public_key(),
167            mailbox_size: 100,
168            deque_size: 10,
169            priority: false,
170            codec_config: (),
171        };
172        let (broadcast_engine, buffer) = buffered::Engine::new(context.clone(), broadcast_config);
173        let network = oracle.register(secret.public_key(), 1).await.unwrap();
174        broadcast_engine.start(network);
175
176        // Start the actor
177        let backfill = oracle.register(secret.public_key(), 2).await.unwrap();
178        actor.start(application.clone(), buffer, backfill);
179
180        (application, mailbox)
181    }
182
183    fn make_finalization(proposal: Proposal<D>, shares: &[Sh], quorum: u32) -> Finalization<V, D> {
184        let proposal_msg = proposal.encode();
185
186        // Generate proposal signature
187        let proposal_partials: Vec<_> = shares
188            .iter()
189            .take(quorum as usize)
190            .map(|s| {
191                partial_sign_message::<V>(s, Some(&finalize_namespace(NAMESPACE)), &proposal_msg)
192            })
193            .collect();
194        let proposal_signature =
195            threshold_signature_recover::<V, _>(quorum, &proposal_partials).unwrap();
196
197        // Generate seed signature (for the view number)
198        let seed_msg = view_message(proposal.view);
199        let seed_partials: Vec<_> = shares
200            .iter()
201            .take(quorum as usize)
202            .map(|s| partial_sign_message::<V>(s, Some(&seed_namespace(NAMESPACE)), &seed_msg))
203            .collect();
204        let seed_signature = threshold_signature_recover::<V, _>(quorum, &seed_partials).unwrap();
205
206        Finalization {
207            proposal,
208            proposal_signature,
209            seed_signature,
210        }
211    }
212
213    fn make_notarization(proposal: Proposal<D>, shares: &[Sh], quorum: u32) -> Notarization<V, D> {
214        let proposal_msg = proposal.encode();
215
216        // Generate proposal signature
217        let proposal_partials: Vec<_> = shares
218            .iter()
219            .take(quorum as usize)
220            .map(|s| {
221                partial_sign_message::<V>(s, Some(&notarize_namespace(NAMESPACE)), &proposal_msg)
222            })
223            .collect();
224        let proposal_signature =
225            threshold_signature_recover::<V, _>(quorum, &proposal_partials).unwrap();
226
227        // Generate seed signature (for the view number)
228        let seed_msg = view_message(proposal.view);
229        let seed_partials: Vec<_> = shares
230            .iter()
231            .take(quorum as usize)
232            .map(|s| partial_sign_message::<V>(s, Some(&seed_namespace(NAMESPACE)), &seed_msg))
233            .collect();
234        let seed_signature = threshold_signature_recover::<V, _>(quorum, &seed_partials).unwrap();
235
236        Notarization {
237            proposal,
238            proposal_signature,
239            seed_signature,
240        }
241    }
242
243    fn setup_network(context: deterministic::Context) -> Oracle<P> {
244        let (network, oracle) = Network::new(
245            context.with_label("network"),
246            simulated::Config {
247                max_size: 1024 * 1024,
248            },
249        );
250        network.start();
251        oracle
252    }
253
254    fn setup_validators_and_shares(
255        context: &mut deterministic::Context,
256    ) -> (Vec<E>, Vec<P>, <V as Variant>::Public, Vec<Sh>) {
257        let mut schemes = (0..NUM_VALIDATORS)
258            .map(|i| PrivateKey::from_seed(i as u64))
259            .collect::<Vec<_>>();
260        schemes.sort_by_key(|s| s.public_key());
261        let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
262
263        let (identity, shares) = generate_shares::<_, V>(context, None, NUM_VALIDATORS, QUORUM);
264        let identity = *poly::public::<V>(&identity);
265
266        (schemes, peers, identity, shares)
267    }
268
269    async fn setup_network_links(oracle: &mut Oracle<P>, peers: &[P], link: Link) {
270        for p1 in peers.iter() {
271            for p2 in peers.iter() {
272                if p2 == p1 {
273                    continue;
274                }
275                oracle
276                    .add_link(p1.clone(), p2.clone(), link.clone())
277                    .await
278                    .unwrap();
279            }
280        }
281    }
282
283    #[test_traced("WARN")]
284    fn test_finalize_good_links() {
285        let link = Link {
286            latency: Duration::from_millis(100),
287            jitter: Duration::from_millis(1),
288            success_rate: 1.0,
289        };
290        for seed in 0..5 {
291            let result1 = finalize(seed, link.clone());
292            let result2 = finalize(seed, link.clone());
293
294            // Ensure determinism
295            assert_eq!(result1, result2);
296        }
297    }
298
299    #[test_traced("WARN")]
300    fn test_finalize_bad_links() {
301        let link = Link {
302            latency: Duration::from_millis(200),
303            jitter: Duration::from_millis(50),
304            success_rate: 0.7,
305        };
306        for seed in 0..5 {
307            let result1 = finalize(seed, link.clone());
308            let result2 = finalize(seed, link.clone());
309
310            // Ensure determinism
311            assert_eq!(result1, result2);
312        }
313    }
314
315    fn finalize(seed: u64, link: Link) -> String {
316        let runner = deterministic::Runner::new(
317            deterministic::Config::new()
318                .with_seed(seed)
319                .with_timeout(Some(Duration::from_secs(300))),
320        );
321        runner.start(|mut context| async move {
322            let mut oracle = setup_network(context.clone());
323            let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context);
324
325            // Initialize applications and actors
326            let mut applications = BTreeMap::new();
327            let mut actors = Vec::new();
328            let coordinator = resolver::mocks::Coordinator::new(peers.clone());
329
330            for (i, secret) in schemes.iter().enumerate() {
331                let (application, actor) = setup_validator(
332                    context.with_label(&format!("validator-{i}")),
333                    &mut oracle,
334                    coordinator.clone(),
335                    secret.clone(),
336                    identity,
337                )
338                .await;
339                applications.insert(peers[i].clone(), application);
340                actors.push(actor);
341            }
342
343            // Add links between all peers
344            setup_network_links(&mut oracle, &peers, link.clone()).await;
345
346            // Generate blocks, skipping the genesis block.
347            let mut blocks = Vec::<B>::new();
348            let mut parent = Sha256::hash(b"");
349            for i in 1..=NUM_BLOCKS {
350                let block = B::new::<Sha256>(parent, i, i);
351                parent = block.digest();
352                blocks.push(block);
353            }
354
355            // Broadcast and finalize blocks in random order
356            blocks.shuffle(&mut context);
357            for block in blocks.iter() {
358                // Skip genesis block
359                let height = block.height();
360                assert!(height > 0, "genesis block should not have been generated");
361
362                // Broadcast block by one validator
363                let actor_index: usize = (height % (NUM_VALIDATORS as u64)) as usize;
364                let mut actor = actors[actor_index].clone();
365                actor.broadcast(block.clone()).await;
366                actor.verified(height, block.clone()).await;
367
368                // Wait for the block to be broadcast, but due to jitter, we may or may not receive
369                // the block before continuing.
370                context.sleep(link.latency).await;
371
372                // Notarize block by the validator that broadcasted it
373                let proposal = Proposal {
374                    view: height,
375                    parent: height.checked_sub(1).unwrap(),
376                    payload: block.digest(),
377                };
378                let notarization = make_notarization(proposal.clone(), &shares, QUORUM);
379                actor
380                    .report(Activity::Notarization(notarization.clone()))
381                    .await;
382
383                // Finalize block by all validators
384                let fin = make_finalization(proposal, &shares, QUORUM);
385                for actor in actors.iter_mut() {
386                    // Always finalize the last block. Otherwise, finalize randomly.
387                    if height == NUM_BLOCKS || context.gen_bool(0.2) {
388                        actor.report(Activity::Finalization(fin.clone())).await;
389                    }
390                }
391            }
392
393            // Check that all applications received all blocks.
394            let mut finished = false;
395            while !finished {
396                // Avoid a busy loop
397                context.sleep(Duration::from_secs(1)).await;
398
399                // If not all validators have finished, try again
400                if applications.len() != NUM_VALIDATORS as usize {
401                    continue;
402                }
403                finished = true;
404                for app in applications.values() {
405                    if app.blocks().len() != NUM_BLOCKS as usize {
406                        finished = false;
407                        break;
408                    }
409                }
410            }
411
412            // Return state
413            context.auditor().state()
414        })
415    }
416
417    #[test_traced("WARN")]
418    fn test_subscribe_basic_block_delivery() {
419        let runner = deterministic::Runner::timed(Duration::from_secs(60));
420        runner.start(|mut context| async move {
421            let mut oracle = setup_network(context.clone());
422            let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context);
423            let coordinator = resolver::mocks::Coordinator::new(peers.clone());
424
425            let mut actors = Vec::new();
426            for (i, secret) in schemes.iter().enumerate() {
427                let (_application, actor) = setup_validator(
428                    context.with_label(&format!("validator-{i}")),
429                    &mut oracle,
430                    coordinator.clone(),
431                    secret.clone(),
432                    identity,
433                )
434                .await;
435                actors.push(actor);
436            }
437            let mut actor = actors[0].clone();
438
439            let link = Link {
440                latency: Duration::from_millis(10),
441                jitter: Duration::from_millis(1),
442                success_rate: 1.0,
443            };
444            setup_network_links(&mut oracle, &peers, link).await;
445
446            let parent = Sha256::hash(b"");
447            let block = B::new::<Sha256>(parent, 1, 1);
448            let commitment = block.digest();
449
450            let subscription_rx = actor.subscribe(Some(1), commitment).await;
451
452            actor.verified(1, block.clone()).await;
453
454            let proposal = Proposal {
455                view: 1,
456                parent: 0,
457                payload: commitment,
458            };
459            let notarization = make_notarization(proposal.clone(), &shares, QUORUM);
460            actor.report(Activity::Notarization(notarization)).await;
461
462            let finalization = make_finalization(proposal, &shares, QUORUM);
463            actor.report(Activity::Finalization(finalization)).await;
464
465            let received_block = subscription_rx.await.unwrap();
466            assert_eq!(received_block.digest(), block.digest());
467            assert_eq!(received_block.height(), 1);
468        })
469    }
470
471    #[test_traced("WARN")]
472    fn test_subscribe_multiple_subscriptions() {
473        let runner = deterministic::Runner::timed(Duration::from_secs(60));
474        runner.start(|mut context| async move {
475            let mut oracle = setup_network(context.clone());
476            let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context);
477            let coordinator = resolver::mocks::Coordinator::new(peers.clone());
478
479            let mut actors = Vec::new();
480            for (i, secret) in schemes.iter().enumerate() {
481                let (_application, actor) = setup_validator(
482                    context.with_label(&format!("validator-{i}")),
483                    &mut oracle,
484                    coordinator.clone(),
485                    secret.clone(),
486                    identity,
487                )
488                .await;
489                actors.push(actor);
490            }
491            let mut actor = actors[0].clone();
492
493            let link = Link {
494                latency: Duration::from_millis(10),
495                jitter: Duration::from_millis(1),
496                success_rate: 1.0,
497            };
498            setup_network_links(&mut oracle, &peers, link).await;
499
500            let parent = Sha256::hash(b"");
501            let block1 = B::new::<Sha256>(parent, 1, 1);
502            let block2 = B::new::<Sha256>(block1.digest(), 2, 2);
503            let commitment1 = block1.digest();
504            let commitment2 = block2.digest();
505
506            let sub1_rx = actor.subscribe(Some(1), commitment1).await;
507            let sub2_rx = actor.subscribe(Some(2), commitment2).await;
508            let sub3_rx = actor.subscribe(Some(1), commitment1).await;
509
510            actor.verified(1, block1.clone()).await;
511            actor.verified(2, block2.clone()).await;
512
513            for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
514                let proposal = Proposal {
515                    view,
516                    parent: view.checked_sub(1).unwrap(),
517                    payload: block.digest(),
518                };
519                let notarization = make_notarization(proposal.clone(), &shares, QUORUM);
520                actor.report(Activity::Notarization(notarization)).await;
521
522                let finalization = make_finalization(proposal, &shares, QUORUM);
523                actor.report(Activity::Finalization(finalization)).await;
524            }
525
526            let received1_sub1 = sub1_rx.await.unwrap();
527            let received2 = sub2_rx.await.unwrap();
528            let received1_sub3 = sub3_rx.await.unwrap();
529
530            assert_eq!(received1_sub1.digest(), block1.digest());
531            assert_eq!(received2.digest(), block2.digest());
532            assert_eq!(received1_sub3.digest(), block1.digest());
533            assert_eq!(received1_sub1.height(), 1);
534            assert_eq!(received2.height(), 2);
535            assert_eq!(received1_sub3.height(), 1);
536        })
537    }
538
539    #[test_traced("WARN")]
540    fn test_subscribe_canceled_subscriptions() {
541        let runner = deterministic::Runner::timed(Duration::from_secs(60));
542        runner.start(|mut context| async move {
543            let mut oracle = setup_network(context.clone());
544            let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context);
545            let coordinator = resolver::mocks::Coordinator::new(peers.clone());
546
547            let mut actors = Vec::new();
548            for (i, secret) in schemes.iter().enumerate() {
549                let (_application, actor) = setup_validator(
550                    context.with_label(&format!("validator-{i}")),
551                    &mut oracle,
552                    coordinator.clone(),
553                    secret.clone(),
554                    identity,
555                )
556                .await;
557                actors.push(actor);
558            }
559            let mut actor = actors[0].clone();
560
561            let link = Link {
562                latency: Duration::from_millis(10),
563                jitter: Duration::from_millis(1),
564                success_rate: 1.0,
565            };
566            setup_network_links(&mut oracle, &peers, link).await;
567
568            let parent = Sha256::hash(b"");
569            let block1 = B::new::<Sha256>(parent, 1, 1);
570            let block2 = B::new::<Sha256>(block1.digest(), 2, 2);
571            let commitment1 = block1.digest();
572            let commitment2 = block2.digest();
573
574            let sub1_rx = actor.subscribe(Some(1), commitment1).await;
575            let sub2_rx = actor.subscribe(Some(2), commitment2).await;
576
577            drop(sub1_rx);
578
579            actor.verified(1, block1.clone()).await;
580            actor.verified(2, block2.clone()).await;
581
582            for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
583                let proposal = Proposal {
584                    view,
585                    parent: view.checked_sub(1).unwrap(),
586                    payload: block.digest(),
587                };
588                let notarization = make_notarization(proposal.clone(), &shares, QUORUM);
589                actor.report(Activity::Notarization(notarization)).await;
590
591                let finalization = make_finalization(proposal, &shares, QUORUM);
592                actor.report(Activity::Finalization(finalization)).await;
593            }
594
595            let received2 = sub2_rx.await.unwrap();
596            assert_eq!(received2.digest(), block2.digest());
597            assert_eq!(received2.height(), 2);
598        })
599    }
600
601    #[test_traced("WARN")]
602    fn test_subscribe_blocks_from_different_sources() {
603        let runner = deterministic::Runner::timed(Duration::from_secs(60));
604        runner.start(|mut context| async move {
605            let mut oracle = setup_network(context.clone());
606            let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context);
607            let coordinator = resolver::mocks::Coordinator::new(peers.clone());
608
609            let mut actors = Vec::new();
610            for (i, secret) in schemes.iter().enumerate() {
611                let (_application, actor) = setup_validator(
612                    context.with_label(&format!("validator-{i}")),
613                    &mut oracle,
614                    coordinator.clone(),
615                    secret.clone(),
616                    identity,
617                )
618                .await;
619                actors.push(actor);
620            }
621            let mut actor = actors[0].clone();
622
623            let link = Link {
624                latency: Duration::from_millis(10),
625                jitter: Duration::from_millis(1),
626                success_rate: 1.0,
627            };
628            setup_network_links(&mut oracle, &peers, link).await;
629
630            let parent = Sha256::hash(b"");
631            let block1 = B::new::<Sha256>(parent, 1, 1);
632            let block2 = B::new::<Sha256>(block1.digest(), 2, 2);
633            let block3 = B::new::<Sha256>(block2.digest(), 3, 3);
634            let block4 = B::new::<Sha256>(block3.digest(), 4, 4);
635            let block5 = B::new::<Sha256>(block4.digest(), 5, 5);
636
637            let sub1_rx = actor.subscribe(Some(1), block1.digest()).await;
638            let sub2_rx = actor.subscribe(Some(2), block2.digest()).await;
639            let sub3_rx = actor.subscribe(Some(3), block3.digest()).await;
640            let sub4_rx = actor.subscribe(Some(4), block4.digest()).await;
641            let sub5_rx = actor.subscribe(Some(5), block5.digest()).await;
642
643            // Block1: Broadcasted by the actor
644            actor.broadcast(block1.clone()).await;
645            context.sleep(Duration::from_millis(20)).await;
646
647            // Block1: delivered
648            let received1 = sub1_rx.await.unwrap();
649            assert_eq!(received1.digest(), block1.digest());
650            assert_eq!(received1.height(), 1);
651
652            // Block2: Verified by the actor
653            actor.verified(2, block2.clone()).await;
654
655            // Block2: delivered
656            let received2 = sub2_rx.await.unwrap();
657            assert_eq!(received2.digest(), block2.digest());
658            assert_eq!(received2.height(), 2);
659
660            // Block3: Notarized by the actor
661            let proposal3 = Proposal {
662                view: 3,
663                parent: 2,
664                payload: block3.digest(),
665            };
666            let notarization3 = make_notarization(proposal3.clone(), &shares, QUORUM);
667            actor.report(Activity::Notarization(notarization3)).await;
668            actor.verified(3, block3.clone()).await;
669
670            // Block3: delivered
671            let received3 = sub3_rx.await.unwrap();
672            assert_eq!(received3.digest(), block3.digest());
673            assert_eq!(received3.height(), 3);
674
675            // Block4: Finalized by the actor
676            let finalization4 = make_finalization(
677                Proposal {
678                    view: 4,
679                    parent: 3,
680                    payload: block4.digest(),
681                },
682                &shares,
683                QUORUM,
684            );
685            actor.report(Activity::Finalization(finalization4)).await;
686            actor.verified(4, block4.clone()).await;
687
688            // Block4: delivered
689            let received4 = sub4_rx.await.unwrap();
690            assert_eq!(received4.digest(), block4.digest());
691            assert_eq!(received4.height(), 4);
692
693            // Block5: Broadcasted by a remote node (different actor)
694            let remote_actor = &mut actors[1].clone();
695            remote_actor.broadcast(block5.clone()).await;
696            context.sleep(Duration::from_millis(20)).await;
697
698            // Block5: delivered
699            let received5 = sub5_rx.await.unwrap();
700            assert_eq!(received5.digest(), block5.digest());
701            assert_eq!(received5.height(), 5);
702        })
703    }
704}