1pub mod actor;
66pub use actor::Actor;
67pub mod cache;
68pub mod config;
69pub use config::Config;
70pub mod ingress;
71pub use ingress::mailbox::Mailbox;
72pub mod resolver;
73pub mod store;
74
75use crate::{
76 types::{Height, Round},
77 Block,
78};
79use commonware_utils::{acknowledgement::Exact, Acknowledgement};
80
81#[derive(Clone, Debug)]
86pub enum Update<B: Block, A: Acknowledgement = Exact> {
87 Tip(Round, Height, B::Commitment),
89 Block(B, A),
98}
99
100#[cfg(test)]
101pub mod mocks;
102
103#[cfg(test)]
104mod tests {
105 use super::{
106 actor,
107 config::Config,
108 mocks::{application::Application, block::Block},
109 resolver::p2p as resolver,
110 };
111 use crate::{
112 application::marshaled::Marshaled,
113 marshal::ingress::mailbox::{AncestorStream, Identifier},
114 simplex::{
115 scheme::bls12381_threshold::vrf as bls12381_threshold_vrf,
116 types::{Activity, Context, Finalization, Finalize, Notarization, Notarize, Proposal},
117 },
118 types::{Epoch, Epocher, FixedEpocher, Height, Round, View, ViewDelta},
119 Automaton, CertifiableAutomaton, Heightable, Reporter, VerifyingApplication,
120 };
121 use commonware_broadcast::buffered;
122 use commonware_cryptography::{
123 bls12381::primitives::variant::MinPk,
124 certificate::{mocks::Fixture, ConstantProvider, Scheme as _},
125 ed25519::{PrivateKey, PublicKey},
126 sha256::{Digest as Sha256Digest, Sha256},
127 Committable, Digestible, Hasher as _, Signer,
128 };
129 use commonware_macros::{select, test_traced};
130 use commonware_p2p::{
131 simulated::{self, Link, Network, Oracle},
132 Manager,
133 };
134 use commonware_parallel::Sequential;
135 use commonware_runtime::{
136 buffer::paged::CacheRef, deterministic, Clock, Metrics, Quota, Runner,
137 };
138 use commonware_storage::{
139 archive::{immutable, prunable},
140 translator::EightCap,
141 };
142 use commonware_utils::{vec::NonEmptyVec, NZUsize, NZU16, NZU64};
143 use futures::StreamExt;
144 use rand::{
145 seq::{IteratorRandom, SliceRandom},
146 Rng,
147 };
148 use std::{
149 collections::BTreeMap,
150 num::{NonZeroU16, NonZeroU32, NonZeroU64, NonZeroUsize},
151 time::{Duration, Instant},
152 };
153 use tracing::info;
154
// Concrete type parameters shared by the tests in this module.

/// Digest type used for block digests, parents, and proposal payloads.
type D = Sha256Digest;
/// Public key type used to identify validators and network peers.
type K = PublicKey;
/// Consensus context stored in every test block (round, leader, parent).
type Ctx = crate::simplex::types::Context<D, K>;
/// Mock block type carrying a [`Ctx`].
type B = Block<D, Ctx>;
/// BLS12-381 variant plugged into the signing scheme.
type V = MinPk;
/// Signing scheme used to build notarizations and finalizations.
type S = bls12381_threshold_vrf::Scheme<K, V>;
/// Scheme provider handed to the marshal configuration.
// NOTE(review): presumably returns the same scheme for every epoch — confirm.
type P = ConstantProvider<S, Epoch>;
162
163 fn default_leader() -> K {
165 PrivateKey::from_seed(0).public_key()
166 }
167
168 fn make_block(parent: D, height: Height, timestamp: u64) -> B {
175 let parent_view = height
176 .previous()
177 .map(|h| View::new(h.get()))
178 .unwrap_or(View::zero());
179 let context = Ctx {
180 round: Round::new(Epoch::zero(), View::new(height.get())),
181 leader: default_leader(),
182 parent: (parent_view, parent),
183 };
184 B::new::<Sha256>(context, parent, height, timestamp)
185 }
186
/// Page size handed to `CacheRef::new` when building the page cache.
const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
/// Second argument to `CacheRef::new` (the cache size).
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
/// Namespace passed to the signing-scheme fixture.
const NAMESPACE: &[u8] = b"test";
/// Number of validators generated by the fixture.
const NUM_VALIDATORS: u32 = 4;
/// Number of signers used when assembling notarizations and finalizations.
const QUORUM: u32 = 3;
/// Number of blocks generated by the multi-validator tests.
const NUM_BLOCKS: u64 = 160;
/// Epoch length handed to `FixedEpocher::new`.
const BLOCKS_PER_EPOCH: NonZeroU64 = NZU64!(20);
/// Link with 100ms latency, 1ms jitter, and a success rate of 1.0.
const LINK: Link = Link {
    latency: Duration::from_millis(100),
    jitter: Duration::from_millis(1),
    success_rate: 1.0,
};
/// Link with 200ms latency, 50ms jitter, and a success rate of 0.7.
const UNRELIABLE_LINK: Link = Link {
    latency: Duration::from_millis(200),
    jitter: Duration::from_millis(50),
    success_rate: 0.7,
};
/// Quota used for every channel registration (the largest per-second value).
const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
205
/// Builds and starts a full marshal stack for one validator.
///
/// The stack consists of a p2p resolver (registered on channel 1), a buffered
/// broadcast engine (registered on channel 2), two immutable archives
/// (finalizations by height and finalized blocks), and the marshal actor
/// driving a mock application.
///
/// Returns the mock application, the marshal mailbox, and the processed
/// height reported by `Actor::init`.
///
/// # Panics
///
/// Panics if a channel registration or an archive initialization fails.
async fn setup_validator(
    context: deterministic::Context,
    oracle: &mut Oracle<K, deterministic::Context>,
    validator: K,
    provider: P,
) -> (
    Application<B>,
    crate::marshal::ingress::mailbox::Mailbox<S, B>,
    Height,
) {
    // Marshal configuration; storage partitions are namespaced per validator.
    let config = Config {
        provider,
        epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
        mailbox_size: 100,
        view_retention_timeout: ViewDelta::new(10),
        max_repair: NZUsize!(10),
        block_codec_config: (),
        partition_prefix: format!("validator-{}", validator.clone()),
        prunable_items_per_section: NZU64!(10),
        replay_buffer: NZUsize!(1024),
        key_write_buffer: NZUsize!(1024),
        value_write_buffer: NZUsize!(1024),
        page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        strategy: Sequential,
    };

    // Create the resolver on channel 1.
    let control = oracle.control(validator.clone());
    let backfill = control.register(1, TEST_QUOTA).await.unwrap();
    let resolver_cfg = resolver::Config {
        public_key: validator.clone(),
        provider: oracle.manager(),
        blocker: control.clone(),
        mailbox_size: config.mailbox_size,
        initial: Duration::from_secs(1),
        timeout: Duration::from_secs(2),
        fetch_retry_timeout: Duration::from_millis(100),
        priority_requests: false,
        priority_responses: false,
    };
    let resolver = resolver::init(&context, resolver_cfg, backfill);

    // Create the buffered broadcast engine on channel 2 and start it.
    let broadcast_config = buffered::Config {
        public_key: validator.clone(),
        mailbox_size: config.mailbox_size,
        deque_size: 10,
        priority: false,
        codec_config: (),
    };
    let (broadcast_engine, buffer) = buffered::Engine::new(context.clone(), broadcast_config);
    let network = control.register(2, TEST_QUOTA).await.unwrap();
    broadcast_engine.start(network);

    // Initialize the finalizations-by-height archive (elapsed time is logged).
    let start = Instant::now();
    let finalizations_by_height = immutable::Archive::init(
        context.with_label("finalizations_by_height"),
        immutable::Config {
            metadata_partition: format!(
                "{}-finalizations-by-height-metadata",
                config.partition_prefix
            ),
            freezer_table_partition: format!(
                "{}-finalizations-by-height-freezer-table",
                config.partition_prefix
            ),
            freezer_table_initial_size: 64,
            freezer_table_resize_frequency: 10,
            freezer_table_resize_chunk_size: 10,
            freezer_key_partition: format!(
                "{}-finalizations-by-height-freezer-key",
                config.partition_prefix
            ),
            freezer_key_page_cache: config.page_cache.clone(),
            freezer_value_partition: format!(
                "{}-finalizations-by-height-freezer-value",
                config.partition_prefix
            ),
            freezer_value_target_size: 1024,
            freezer_value_compression: None,
            ordinal_partition: format!(
                "{}-finalizations-by-height-ordinal",
                config.partition_prefix
            ),
            items_per_section: NZU64!(10),
            codec_config: S::certificate_codec_config_unbounded(),
            replay_buffer: config.replay_buffer,
            freezer_key_write_buffer: config.key_write_buffer,
            freezer_value_write_buffer: config.value_write_buffer,
            ordinal_write_buffer: config.key_write_buffer,
        },
    )
    .await
    .expect("failed to initialize finalizations by height archive");
    info!(elapsed = ?start.elapsed(), "restored finalizations by height archive");

    // Initialize the finalized-blocks archive (elapsed time is logged).
    let start = Instant::now();
    let finalized_blocks = immutable::Archive::init(
        context.with_label("finalized_blocks"),
        immutable::Config {
            metadata_partition: format!(
                "{}-finalized_blocks-metadata",
                config.partition_prefix
            ),
            freezer_table_partition: format!(
                "{}-finalized_blocks-freezer-table",
                config.partition_prefix
            ),
            freezer_table_initial_size: 64,
            freezer_table_resize_frequency: 10,
            freezer_table_resize_chunk_size: 10,
            freezer_key_partition: format!(
                "{}-finalized_blocks-freezer-key",
                config.partition_prefix
            ),
            freezer_key_page_cache: config.page_cache.clone(),
            freezer_value_partition: format!(
                "{}-finalized_blocks-freezer-value",
                config.partition_prefix
            ),
            freezer_value_target_size: 1024,
            freezer_value_compression: None,
            ordinal_partition: format!("{}-finalized_blocks-ordinal", config.partition_prefix),
            items_per_section: NZU64!(10),
            codec_config: config.block_codec_config,
            replay_buffer: config.replay_buffer,
            freezer_key_write_buffer: config.key_write_buffer,
            freezer_value_write_buffer: config.value_write_buffer,
            ordinal_write_buffer: config.key_write_buffer,
        },
    )
    .await
    .expect("failed to initialize finalized blocks archive");
    info!(elapsed = ?start.elapsed(), "restored finalized blocks archive");

    // Build the marshal actor on top of the two archives.
    let (actor, mailbox, processed_height) = actor::Actor::init(
        context.clone(),
        finalizations_by_height,
        finalized_blocks,
        config,
    )
    .await;
    let application = Application::<B>::default();

    // Start the actor; the caller keeps a clone of the application for inspection.
    actor.start(application.clone(), buffer, resolver);

    (application, mailbox, processed_height)
}
357
358 fn make_finalization(proposal: Proposal<D>, schemes: &[S], quorum: u32) -> Finalization<S, D> {
359 let finalizes: Vec<_> = schemes
361 .iter()
362 .take(quorum as usize)
363 .map(|scheme| Finalize::sign(scheme, proposal.clone()).unwrap())
364 .collect();
365
366 Finalization::from_finalizes(&schemes[0], &finalizes, &Sequential).unwrap()
368 }
369
370 fn make_notarization(proposal: Proposal<D>, schemes: &[S], quorum: u32) -> Notarization<S, D> {
371 let notarizes: Vec<_> = schemes
373 .iter()
374 .take(quorum as usize)
375 .map(|scheme| Notarize::sign(scheme, proposal.clone()).unwrap())
376 .collect();
377
378 Notarization::from_notarizes(&schemes[0], ¬arizes, &Sequential).unwrap()
380 }
381
382 fn setup_network(
383 context: deterministic::Context,
384 tracked_peer_sets: Option<usize>,
385 ) -> Oracle<K, deterministic::Context> {
386 let (network, oracle) = Network::new(
387 context.with_label("network"),
388 simulated::Config {
389 max_size: 1024 * 1024,
390 disconnect_on_block: true,
391 tracked_peer_sets,
392 },
393 );
394 network.start();
395 oracle
396 }
397
398 async fn setup_network_links(
399 oracle: &mut Oracle<K, deterministic::Context>,
400 peers: &[K],
401 link: Link,
402 ) {
403 for p1 in peers.iter() {
404 for p2 in peers.iter() {
405 if p2 == p1 {
406 continue;
407 }
408 let _ = oracle.add_link(p1.clone(), p2.clone(), link.clone()).await;
409 }
410 }
411 }
412
413 #[test_traced("WARN")]
414 fn test_finalize_good_links() {
415 for seed in 0..5 {
416 let result1 = finalize(seed, LINK, false);
417 let result2 = finalize(seed, LINK, false);
418
419 assert_eq!(result1, result2);
421 }
422 }
423
424 #[test_traced("WARN")]
425 fn test_finalize_bad_links() {
426 for seed in 0..5 {
427 let result1 = finalize(seed, UNRELIABLE_LINK, false);
428 let result2 = finalize(seed, UNRELIABLE_LINK, false);
429
430 assert_eq!(result1, result2);
432 }
433 }
434
435 #[test_traced("WARN")]
436 fn test_finalize_good_links_quorum_sees_finalization() {
437 for seed in 0..5 {
438 let result1 = finalize(seed, LINK, true);
439 let result2 = finalize(seed, LINK, true);
440
441 assert_eq!(result1, result2);
443 }
444 }
445
446 #[test_traced("DEBUG")]
447 fn test_finalize_bad_links_quorum_sees_finalization() {
448 for seed in 0..5 {
449 let result1 = finalize(seed, UNRELIABLE_LINK, true);
450 let result2 = finalize(seed, UNRELIABLE_LINK, true);
451
452 assert_eq!(result1, result2);
454 }
455 }
456
/// Runs a full multi-validator simulation and returns the auditor state.
///
/// `NUM_VALIDATORS` validators are connected pairwise with `link`. A chain of
/// `NUM_BLOCKS` blocks is generated, shuffled, and fed to the validators out
/// of order; each block is proposed/verified by one validator and notarized,
/// and its finalization is reported to a randomly chosen subset of
/// validators (always to every validator for the final block and for the
/// last block of each epoch). The function returns once every application
/// has processed all `NUM_BLOCKS` blocks.
///
/// * `seed` - seed for the deterministic runtime.
/// * `link` - link characteristics applied between every pair of validators.
/// * `quorum_sees_finalization` - when `true`, a single random decision per
///   block determines whether the first `QUORUM` visited validators see the
///   finalization; when `false`, each validator decides independently.
///
/// The statement order (and therefore the order of RNG draws) directly
/// affects the returned state, so callers compare two runs of this function
/// to detect non-determinism.
fn finalize(seed: u64, link: Link, quorum_sees_finalization: bool) -> String {
    let runner = deterministic::Runner::new(
        deterministic::Config::new()
            .with_seed(seed)
            .with_timeout(Some(Duration::from_secs(600))),
    );
    runner.start(|mut context| async move {
        let mut oracle = setup_network(context.clone(), Some(3));
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);

        // Applications keyed by validator, and the marshal mailboxes in
        // participant order.
        let mut applications = BTreeMap::new();
        let mut actors = Vec::new();

        // Register the participant set with the network before starting
        // any validator.
        let mut manager = oracle.manager();
        manager
            .track(0, participants.clone().try_into().unwrap())
            .await;
        for (i, validator) in participants.iter().enumerate() {
            let (application, actor, _processed_height) = setup_validator(
                context.with_label(&format!("validator_{i}")),
                &mut oracle,
                validator.clone(),
                ConstantProvider::new(schemes[i].clone()),
            )
            .await;
            applications.insert(validator.clone(), application);
            actors.push(actor);
        }

        // Fully connect the validators.
        setup_network_links(&mut oracle, &participants, link.clone()).await;

        // Generate a chain of blocks at heights 1..=NUM_BLOCKS.
        let mut blocks = Vec::<B>::new();
        let mut parent = Sha256::hash(b"");
        for i in 1..=NUM_BLOCKS {
            let block = make_block(parent, Height::new(i), i);
            parent = block.digest();
            blocks.push(block);
        }

        // Deliver the blocks in a random order.
        let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
        blocks.shuffle(&mut context);
        for block in blocks.iter() {
            // Skip-genesis sanity check: heights start at 1.
            let height = block.height();
            assert!(
                !height.is_zero(),
                "genesis block should not have been generated"
            );

            // The view equals the height; the epoch is derived from it.
            let bounds = epocher.containing(height).unwrap();
            let round = Round::new(bounds.epoch(), View::new(height.get()));

            // Pick the validator that proposes/verifies this block.
            let actor_index: usize = (height.get() % (NUM_VALIDATORS as u64)) as usize;
            let mut actor = actors[actor_index].clone();
            actor.proposed(round, block.clone()).await;
            actor.verified(round, block.clone()).await;

            // Wait one link latency before reporting consensus activity.
            context.sleep(link.latency).await;

            // Notarize the block at the proposing validator.
            let proposal = Proposal {
                round,
                parent: View::new(height.previous().unwrap().get()),
                payload: block.digest(),
            };
            let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
            actor
                .report(Activity::Notarization(notarization.clone()))
                .await;

            // Report the finalization to some validators. The final block and
            // the last block of each epoch are always reported to everyone.
            let fin = make_finalization(proposal, &schemes, QUORUM);
            if quorum_sees_finalization {
                // One random decision per block, applied to the first QUORUM
                // validators in the order `choose_multiple` yields them.
                let do_finalize = context.gen_bool(0.2);
                for (i, actor) in actors
                    .iter_mut()
                    .choose_multiple(&mut context, NUM_VALIDATORS as usize)
                    .iter_mut()
                    .enumerate()
                {
                    if (do_finalize && i < QUORUM as usize)
                        || height == Height::new(NUM_BLOCKS)
                        || height == bounds.last()
                    {
                        actor.report(Activity::Finalization(fin.clone())).await;
                    }
                }
            } else {
                // Independent random decision per validator. The RNG draw
                // comes first in the condition, so it happens for every
                // validator regardless of the height checks.
                for actor in actors.iter_mut() {
                    if context.gen_bool(0.2)
                        || height == Height::new(NUM_BLOCKS)
                        || height == bounds.last()
                    {
                        actor.report(Activity::Finalization(fin.clone())).await;
                    }
                }
            }
        }

        // Poll until every application has processed the whole chain.
        let mut finished = false;
        while !finished {
            // Avoid a busy loop.
            context.sleep(Duration::from_secs(1)).await;

            // Every validator must have an application registered.
            if applications.len() != NUM_VALIDATORS as usize {
                continue;
            }
            finished = true;
            for app in applications.values() {
                if app.blocks().len() != NUM_BLOCKS as usize {
                    finished = false;
                    break;
                }
                let Some((height, _)) = app.tip() else {
                    finished = false;
                    break;
                };
                if height < Height::new(NUM_BLOCKS) {
                    finished = false;
                    break;
                }
            }
        }

        // The auditor state is what callers compare across runs.
        context.auditor().state()
    })
}
605
606 #[test_traced("WARN")]
607 fn test_sync_height_floor() {
608 let runner = deterministic::Runner::new(
609 deterministic::Config::new()
610 .with_seed(0xFF)
611 .with_timeout(Some(Duration::from_secs(300))),
612 );
613 runner.start(|mut context| async move {
614 let mut oracle = setup_network(context.clone(), Some(3));
615 let Fixture {
616 participants,
617 schemes,
618 ..
619 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
620
621 let mut applications = BTreeMap::new();
623 let mut actors = Vec::new();
624
625 let mut manager = oracle.manager();
627 manager
628 .track(0, participants.clone().try_into().unwrap())
629 .await;
630 for (i, validator) in participants.iter().enumerate().skip(1) {
631 let (application, actor, _processed_height) = setup_validator(
632 context.with_label(&format!("validator_{i}")),
633 &mut oracle,
634 validator.clone(),
635 ConstantProvider::new(schemes[i].clone()),
636 )
637 .await;
638 applications.insert(validator.clone(), application);
639 actors.push(actor);
640 }
641
642 setup_network_links(&mut oracle, &participants[1..], LINK).await;
645
646 let mut blocks = Vec::<B>::new();
648 let mut parent = Sha256::hash(b"");
649 for i in 1..=NUM_BLOCKS {
650 let block = make_block(parent, Height::new(i), i);
651 parent = block.digest();
652 blocks.push(block);
653 }
654
655 let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
657 for block in blocks.iter() {
658 let height = block.height();
660 assert!(
661 !height.is_zero(),
662 "genesis block should not have been generated"
663 );
664
665 let bounds = epocher.containing(height).unwrap();
667 let round = Round::new(bounds.epoch(), View::new(height.get()));
668
669 let actor_index: usize = (height.get() % (applications.len() as u64)) as usize;
671 let mut actor = actors[actor_index].clone();
672 actor.proposed(round, block.clone()).await;
673 actor.verified(round, block.clone()).await;
674
675 context.sleep(LINK.latency).await;
678
679 let proposal = Proposal {
681 round,
682 parent: View::new(height.previous().unwrap().get()),
683 payload: block.digest(),
684 };
685 let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
686 actor
687 .report(Activity::Notarization(notarization.clone()))
688 .await;
689
690 let fin = make_finalization(proposal, &schemes, QUORUM);
692 for actor in actors.iter_mut() {
693 actor.report(Activity::Finalization(fin.clone())).await;
694 }
695 }
696
697 let mut finished = false;
699 while !finished {
700 context.sleep(Duration::from_secs(1)).await;
702
703 finished = true;
705 for app in applications.values().skip(1) {
706 if app.blocks().len() != NUM_BLOCKS as usize {
707 finished = false;
708 break;
709 }
710 let Some((height, _)) = app.tip() else {
711 finished = false;
712 break;
713 };
714 if height < Height::new(NUM_BLOCKS) {
715 finished = false;
716 break;
717 }
718 }
719 }
720
721 let validator = participants.first().unwrap();
723 let (app, mut actor, _processed_height) = setup_validator(
724 context.with_label("validator_0"),
725 &mut oracle,
726 validator.clone(),
727 ConstantProvider::new(schemes[0].clone()),
728 )
729 .await;
730
731 setup_network_links(&mut oracle, &participants, LINK).await;
733
734 const NEW_SYNC_FLOOR: u64 = 100;
735 let second_actor = &mut actors[1];
736 let latest_finalization = second_actor
737 .get_finalization(Height::new(NUM_BLOCKS))
738 .await
739 .unwrap();
740
741 actor.set_floor(Height::new(NEW_SYNC_FLOOR)).await;
743
744 actor
747 .report(Activity::Finalization(latest_finalization))
748 .await;
749
750 let mut finished = false;
752 while !finished {
753 context.sleep(Duration::from_secs(1)).await;
755
756 finished = true;
757 if app.blocks().len() != (NUM_BLOCKS - NEW_SYNC_FLOOR) as usize {
758 finished = false;
759 continue;
760 }
761 let Some((height, _)) = app.tip() else {
762 finished = false;
763 continue;
764 };
765 if height < Height::new(NUM_BLOCKS) {
766 finished = false;
767 continue;
768 }
769 }
770
771 for height in 1..=NUM_BLOCKS {
773 let block = actor
774 .get_block(Identifier::Height(Height::new(height)))
775 .await;
776 if height <= NEW_SYNC_FLOOR {
777 assert!(block.is_none());
778 } else {
779 assert_eq!(block.unwrap().height(), Height::new(height));
780 }
781 }
782 })
783 }
784
/// Pruning a marshal backed by prunable archives removes finalized blocks and
/// finalizations below the pruned height, and the result survives a restart.
///
/// The test finalizes 20 blocks on a single validator, then:
/// 1. prunes to height 25 and expects all 20 blocks to remain,
/// 2. prunes to height 10 and expects heights 1..10 gone and 10..=20 present,
/// 3. prunes to height 20 and expects heights 10..20 gone and 20 present,
/// 4. restarts the marshal on the same partitions and expects the same state.
#[test_traced("WARN")]
fn test_prune_finalized_archives() {
    let runner = deterministic::Runner::new(
        deterministic::Config::new().with_timeout(Some(Duration::from_secs(120))),
    );
    runner.start(|mut context| async move {
        let oracle = setup_network(context.clone(), None);
        let Fixture {
            participants,
            schemes,
            ..
        } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);

        // Only the first participant runs a marshal in this test.
        let validator = participants[0].clone();
        let partition_prefix = format!("prune-test-{}", validator.clone());
        let page_cache = CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE);
        let control = oracle.control(validator.clone());

        // Builds a marshal stack backed by prunable archives. It is a closure
        // so the same partitions can be reopened under a different label to
        // simulate a restart.
        let init_marshal = |label: &str| {
            // Clone everything the async block needs so it owns its inputs.
            let ctx = context.with_label(label);
            let validator = validator.clone();
            let schemes = schemes.clone();
            let partition_prefix = partition_prefix.clone();
            let page_cache = page_cache.clone();
            let control = control.clone();
            let oracle_manager = oracle.manager();
            async move {
                let provider = ConstantProvider::new(schemes[0].clone());
                let config = Config {
                    provider,
                    epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
                    mailbox_size: 100,
                    view_retention_timeout: ViewDelta::new(10),
                    max_repair: NZUsize!(10),
                    block_codec_config: (),
                    partition_prefix: partition_prefix.clone(),
                    prunable_items_per_section: NZU64!(10),
                    replay_buffer: NZUsize!(1024),
                    key_write_buffer: NZUsize!(1024),
                    value_write_buffer: NZUsize!(1024),
                    page_cache: page_cache.clone(),
                    strategy: Sequential,
                };

                // Create the resolver on channel 0.
                let backfill = control.register(0, TEST_QUOTA).await.unwrap();
                let resolver_cfg = resolver::Config {
                    public_key: validator.clone(),
                    provider: oracle_manager,
                    blocker: control.clone(),
                    mailbox_size: config.mailbox_size,
                    initial: Duration::from_secs(1),
                    timeout: Duration::from_secs(2),
                    fetch_retry_timeout: Duration::from_millis(100),
                    priority_requests: false,
                    priority_responses: false,
                };
                let resolver = resolver::init(&ctx, resolver_cfg, backfill);

                // Create the buffered broadcast engine on channel 1.
                let broadcast_config = buffered::Config {
                    public_key: validator.clone(),
                    mailbox_size: config.mailbox_size,
                    deque_size: 10,
                    priority: false,
                    codec_config: (),
                };
                let (broadcast_engine, buffer) =
                    buffered::Engine::new(ctx.clone(), broadcast_config);
                let network = control.register(1, TEST_QUOTA).await.unwrap();
                broadcast_engine.start(network);

                // Prunable archive for finalizations, keyed by height.
                let finalizations_by_height = prunable::Archive::init(
                    ctx.with_label("finalizations_by_height"),
                    prunable::Config {
                        translator: EightCap,
                        key_partition: format!(
                            "{}-finalizations-by-height-key",
                            partition_prefix
                        ),
                        key_page_cache: page_cache.clone(),
                        value_partition: format!(
                            "{}-finalizations-by-height-value",
                            partition_prefix
                        ),
                        compression: None,
                        codec_config: S::certificate_codec_config_unbounded(),
                        items_per_section: NZU64!(10),
                        key_write_buffer: config.key_write_buffer,
                        value_write_buffer: config.value_write_buffer,
                        replay_buffer: config.replay_buffer,
                    },
                )
                .await
                .expect("failed to initialize finalizations by height archive");

                // Prunable archive for finalized blocks.
                let finalized_blocks = prunable::Archive::init(
                    ctx.with_label("finalized_blocks"),
                    prunable::Config {
                        translator: EightCap,
                        key_partition: format!("{}-finalized-blocks-key", partition_prefix),
                        key_page_cache: page_cache.clone(),
                        value_partition: format!("{}-finalized-blocks-value", partition_prefix),
                        compression: None,
                        codec_config: config.block_codec_config,
                        items_per_section: NZU64!(10),
                        key_write_buffer: config.key_write_buffer,
                        value_write_buffer: config.value_write_buffer,
                        replay_buffer: config.replay_buffer,
                    },
                )
                .await
                .expect("failed to initialize finalized blocks archive");

                // Build and start the marshal actor with a fresh mock application.
                let (actor, mailbox, _processed_height) = actor::Actor::init(
                    ctx.clone(),
                    finalizations_by_height,
                    finalized_blocks,
                    config,
                )
                .await;
                let application = Application::<B>::default();
                actor.start(application.clone(), buffer, resolver);

                (mailbox, application)
            }
        };

        // Initial start.
        let (mut mailbox, application) = init_marshal("init").await;

        // Verify and finalize blocks at heights 1..=20.
        let mut parent = Sha256::hash(b"");
        let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
        for i in 1..=20u64 {
            let block = make_block(parent, Height::new(i), i);
            let commitment = block.digest();
            let bounds = epocher.containing(Height::new(i)).unwrap();
            let round = Round::new(bounds.epoch(), View::new(i));

            mailbox.verified(round, block.clone()).await;
            let proposal = Proposal {
                round,
                parent: View::new(i - 1),
                payload: commitment,
            };
            let finalization = make_finalization(proposal, &schemes, QUORUM);
            mailbox.report(Activity::Finalization(finalization)).await;

            parent = commitment;
        }

        // Wait until the application has received all 20 blocks.
        while application.blocks().len() < 20 {
            context.sleep(Duration::from_millis(10)).await;
        }

        // Sanity check: everything is stored before any pruning.
        for i in 1..=20u64 {
            assert!(
                mailbox.get_block(Height::new(i)).await.is_some(),
                "block {i} should exist before pruning"
            );
            assert!(
                mailbox.get_finalization(Height::new(i)).await.is_some(),
                "finalization {i} should exist before pruning"
            );
        }

        // Pruning to a height above everything stored (25 > 20) must leave
        // all blocks in place.
        mailbox.prune(Height::new(25)).await;
        context.sleep(Duration::from_millis(50)).await;
        for i in 1..=20u64 {
            assert!(
                mailbox.get_block(Height::new(i)).await.is_some(),
                "block {i} should still exist after pruning above floor"
            );
        }

        // Prune to height 10: heights strictly below 10 must disappear.
        mailbox.prune(Height::new(10)).await;
        context.sleep(Duration::from_millis(100)).await;
        for i in 1..10u64 {
            assert!(
                mailbox.get_block(Height::new(i)).await.is_none(),
                "block {i} should be pruned"
            );
            assert!(
                mailbox.get_finalization(Height::new(i)).await.is_none(),
                "finalization {i} should be pruned"
            );
        }

        // Heights 10..=20 must be unaffected.
        for i in 10..=20u64 {
            assert!(
                mailbox.get_block(Height::new(i)).await.is_some(),
                "block {i} should still exist after pruning"
            );
            assert!(
                mailbox.get_finalization(Height::new(i)).await.is_some(),
                "finalization {i} should still exist after pruning"
            );
        }

        // Prune again, to height 20: heights 10..20 must disappear as well.
        mailbox.prune(Height::new(20)).await;
        context.sleep(Duration::from_millis(100)).await;
        for i in 10..20u64 {
            assert!(
                mailbox.get_block(Height::new(i)).await.is_none(),
                "block {i} should be pruned after second prune"
            );
            assert!(
                mailbox.get_finalization(Height::new(i)).await.is_none(),
                "finalization {i} should be pruned after second prune"
            );
        }

        // The block at the pruning height itself is retained.
        assert!(
            mailbox.get_block(Height::new(20)).await.is_some(),
            "block 20 should still exist"
        );
        assert!(
            mailbox.get_finalization(Height::new(20)).await.is_some(),
            "finalization 20 should still exist"
        );

        // Simulate a restart: drop the mailbox and reopen the same partitions
        // under a new label.
        drop(mailbox);
        let (mut mailbox, _application) = init_marshal("restart").await;

        // Pruned data must stay pruned after the restart.
        for i in 1..20u64 {
            assert!(
                mailbox.get_block(Height::new(i)).await.is_none(),
                "block {i} should still be pruned after restart"
            );
            assert!(
                mailbox.get_finalization(Height::new(i)).await.is_none(),
                "finalization {i} should still be pruned after restart"
            );
        }

        // Retained data must still be readable after the restart.
        assert!(
            mailbox.get_block(Height::new(20)).await.is_some(),
            "block 20 should still exist after restart"
        );
        assert!(
            mailbox.get_finalization(Height::new(20)).await.is_some(),
            "finalization 20 should still exist after restart"
        );
    })
}
1044
1045 #[test_traced("WARN")]
1046 fn test_subscribe_basic_block_delivery() {
1047 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1048 runner.start(|mut context| async move {
1049 let mut oracle = setup_network(context.clone(), None);
1050 let Fixture {
1051 participants,
1052 schemes,
1053 ..
1054 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1055
1056 let mut actors = Vec::new();
1057 for (i, validator) in participants.iter().enumerate() {
1058 let (_application, actor, _processed_height) = setup_validator(
1059 context.with_label(&format!("validator_{i}")),
1060 &mut oracle,
1061 validator.clone(),
1062 ConstantProvider::new(schemes[i].clone()),
1063 )
1064 .await;
1065 actors.push(actor);
1066 }
1067 let mut actor = actors[0].clone();
1068
1069 setup_network_links(&mut oracle, &participants, LINK).await;
1070
1071 let parent = Sha256::hash(b"");
1072 let block = make_block(parent, Height::new(1), 1);
1073 let commitment = block.digest();
1074
1075 let subscription_rx = actor
1076 .subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment)
1077 .await;
1078
1079 actor
1080 .verified(Round::new(Epoch::new(0), View::new(1)), block.clone())
1081 .await;
1082
1083 let proposal = Proposal {
1084 round: Round::new(Epoch::new(0), View::new(1)),
1085 parent: View::new(0),
1086 payload: commitment,
1087 };
1088 let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
1089 actor.report(Activity::Notarization(notarization)).await;
1090
1091 let finalization = make_finalization(proposal, &schemes, QUORUM);
1092 actor.report(Activity::Finalization(finalization)).await;
1093
1094 let received_block = subscription_rx.await.unwrap();
1095 assert_eq!(received_block.digest(), block.digest());
1096 assert_eq!(received_block.height(), Height::new(1));
1097 })
1098 }
1099
1100 #[test_traced("WARN")]
1101 fn test_subscribe_multiple_subscriptions() {
1102 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1103 runner.start(|mut context| async move {
1104 let mut oracle = setup_network(context.clone(), None);
1105 let Fixture {
1106 participants,
1107 schemes,
1108 ..
1109 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1110
1111 let mut actors = Vec::new();
1112 for (i, validator) in participants.iter().enumerate() {
1113 let (_application, actor, _processed_height) = setup_validator(
1114 context.with_label(&format!("validator_{i}")),
1115 &mut oracle,
1116 validator.clone(),
1117 ConstantProvider::new(schemes[i].clone()),
1118 )
1119 .await;
1120 actors.push(actor);
1121 }
1122 let mut actor = actors[0].clone();
1123
1124 setup_network_links(&mut oracle, &participants, LINK).await;
1125
1126 let parent = Sha256::hash(b"");
1127 let block1 = make_block(parent, Height::new(1), 1);
1128 let block2 = make_block(block1.digest(), Height::new(2), 2);
1129 let commitment1 = block1.digest();
1130 let commitment2 = block2.digest();
1131
1132 let sub1_rx = actor
1133 .subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment1)
1134 .await;
1135 let sub2_rx = actor
1136 .subscribe(Some(Round::new(Epoch::new(0), View::new(2))), commitment2)
1137 .await;
1138 let sub3_rx = actor
1139 .subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment1)
1140 .await;
1141
1142 actor
1143 .verified(Round::new(Epoch::new(0), View::new(1)), block1.clone())
1144 .await;
1145 actor
1146 .verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
1147 .await;
1148
1149 for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
1150 let view = View::new(view);
1151 let proposal = Proposal {
1152 round: Round::new(Epoch::zero(), view),
1153 parent: view.previous().unwrap(),
1154 payload: block.digest(),
1155 };
1156 let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
1157 actor.report(Activity::Notarization(notarization)).await;
1158
1159 let finalization = make_finalization(proposal, &schemes, QUORUM);
1160 actor.report(Activity::Finalization(finalization)).await;
1161 }
1162
1163 let received1_sub1 = sub1_rx.await.unwrap();
1164 let received2 = sub2_rx.await.unwrap();
1165 let received1_sub3 = sub3_rx.await.unwrap();
1166
1167 assert_eq!(received1_sub1.digest(), block1.digest());
1168 assert_eq!(received2.digest(), block2.digest());
1169 assert_eq!(received1_sub3.digest(), block1.digest());
1170 assert_eq!(received1_sub1.height(), Height::new(1));
1171 assert_eq!(received2.height(), Height::new(2));
1172 assert_eq!(received1_sub3.height(), Height::new(1));
1173 })
1174 }
1175
1176 #[test_traced("WARN")]
1177 fn test_subscribe_canceled_subscriptions() {
1178 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1179 runner.start(|mut context| async move {
1180 let mut oracle = setup_network(context.clone(), None);
1181 let Fixture {
1182 participants,
1183 schemes,
1184 ..
1185 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1186
1187 let mut actors = Vec::new();
1188 for (i, validator) in participants.iter().enumerate() {
1189 let (_application, actor, _processed_height) = setup_validator(
1190 context.with_label(&format!("validator_{i}")),
1191 &mut oracle,
1192 validator.clone(),
1193 ConstantProvider::new(schemes[i].clone()),
1194 )
1195 .await;
1196 actors.push(actor);
1197 }
1198 let mut actor = actors[0].clone();
1199
1200 setup_network_links(&mut oracle, &participants, LINK).await;
1201
1202 let parent = Sha256::hash(b"");
1203 let block1 = make_block(parent, Height::new(1), 1);
1204 let block2 = make_block(block1.digest(), Height::new(2), 2);
1205 let commitment1 = block1.digest();
1206 let commitment2 = block2.digest();
1207
1208 let sub1_rx = actor
1209 .subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment1)
1210 .await;
1211 let sub2_rx = actor
1212 .subscribe(Some(Round::new(Epoch::new(0), View::new(2))), commitment2)
1213 .await;
1214
1215 drop(sub1_rx);
1216
1217 actor
1218 .verified(Round::new(Epoch::new(0), View::new(1)), block1.clone())
1219 .await;
1220 actor
1221 .verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
1222 .await;
1223
1224 for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
1225 let view = View::new(view);
1226 let proposal = Proposal {
1227 round: Round::new(Epoch::zero(), view),
1228 parent: view.previous().unwrap(),
1229 payload: block.digest(),
1230 };
1231 let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
1232 actor.report(Activity::Notarization(notarization)).await;
1233
1234 let finalization = make_finalization(proposal, &schemes, QUORUM);
1235 actor.report(Activity::Finalization(finalization)).await;
1236 }
1237
1238 let received2 = sub2_rx.await.unwrap();
1239 assert_eq!(received2.digest(), block2.digest());
1240 assert_eq!(received2.height(), Height::new(2));
1241 })
1242 }
1243
1244 #[test_traced("WARN")]
1245 fn test_subscribe_blocks_from_different_sources() {
1246 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1247 runner.start(|mut context| async move {
1248 let mut oracle = setup_network(context.clone(), None);
1249 let Fixture {
1250 participants,
1251 schemes,
1252 ..
1253 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1254
1255 let mut actors = Vec::new();
1256 for (i, validator) in participants.iter().enumerate() {
1257 let (_application, actor, _processed_height) = setup_validator(
1258 context.with_label(&format!("validator_{i}")),
1259 &mut oracle,
1260 validator.clone(),
1261 ConstantProvider::new(schemes[i].clone()),
1262 )
1263 .await;
1264 actors.push(actor);
1265 }
1266 let mut actor = actors[0].clone();
1267
1268 setup_network_links(&mut oracle, &participants, LINK).await;
1269
1270 let parent = Sha256::hash(b"");
1271 let block1 = make_block(parent, Height::new(1), 1);
1272 let block2 = make_block(block1.digest(), Height::new(2), 2);
1273 let block3 = make_block(block2.digest(), Height::new(3), 3);
1274 let block4 = make_block(block3.digest(), Height::new(4), 4);
1275 let block5 = make_block(block4.digest(), Height::new(5), 5);
1276
1277 let sub1_rx = actor.subscribe(None, block1.digest()).await;
1278 let sub2_rx = actor.subscribe(None, block2.digest()).await;
1279 let sub3_rx = actor.subscribe(None, block3.digest()).await;
1280 let sub4_rx = actor.subscribe(None, block4.digest()).await;
1281 let sub5_rx = actor.subscribe(None, block5.digest()).await;
1282
1283 actor
1285 .proposed(Round::new(Epoch::zero(), View::new(1)), block1.clone())
1286 .await;
1287 context.sleep(Duration::from_millis(20)).await;
1288
1289 let received1 = sub1_rx.await.unwrap();
1291 assert_eq!(received1.digest(), block1.digest());
1292 assert_eq!(received1.height(), Height::new(1));
1293
1294 actor
1296 .verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
1297 .await;
1298
1299 let received2 = sub2_rx.await.unwrap();
1301 assert_eq!(received2.digest(), block2.digest());
1302 assert_eq!(received2.height(), Height::new(2));
1303
1304 let proposal3 = Proposal {
1306 round: Round::new(Epoch::new(0), View::new(3)),
1307 parent: View::new(2),
1308 payload: block3.digest(),
1309 };
1310 let notarization3 = make_notarization(proposal3.clone(), &schemes, QUORUM);
1311 actor.report(Activity::Notarization(notarization3)).await;
1312 actor
1313 .verified(Round::new(Epoch::new(0), View::new(3)), block3.clone())
1314 .await;
1315
1316 let received3 = sub3_rx.await.unwrap();
1318 assert_eq!(received3.digest(), block3.digest());
1319 assert_eq!(received3.height(), Height::new(3));
1320
1321 let finalization4 = make_finalization(
1323 Proposal {
1324 round: Round::new(Epoch::new(0), View::new(4)),
1325 parent: View::new(3),
1326 payload: block4.digest(),
1327 },
1328 &schemes,
1329 QUORUM,
1330 );
1331 actor.report(Activity::Finalization(finalization4)).await;
1332 actor
1333 .verified(Round::new(Epoch::new(0), View::new(4)), block4.clone())
1334 .await;
1335
1336 let received4 = sub4_rx.await.unwrap();
1338 assert_eq!(received4.digest(), block4.digest());
1339 assert_eq!(received4.height(), Height::new(4));
1340
1341 let remote_actor = &mut actors[1].clone();
1343 remote_actor
1344 .proposed(Round::new(Epoch::zero(), View::new(5)), block5.clone())
1345 .await;
1346 context.sleep(Duration::from_millis(20)).await;
1347
1348 let received5 = sub5_rx.await.unwrap();
1350 assert_eq!(received5.digest(), block5.digest());
1351 assert_eq!(received5.height(), Height::new(5));
1352 })
1353 }
1354
1355 #[test_traced("WARN")]
1356 fn test_get_info_basic_queries_present_and_missing() {
1357 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1358 runner.start(|mut context| async move {
1359 let mut oracle = setup_network(context.clone(), None);
1360 let Fixture {
1361 participants,
1362 schemes,
1363 ..
1364 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1365
1366 let me = participants[0].clone();
1368 let (_application, mut actor, _processed_height) = setup_validator(
1369 context.with_label("validator_0"),
1370 &mut oracle,
1371 me,
1372 ConstantProvider::new(schemes[0].clone()),
1373 )
1374 .await;
1375
1376 assert!(actor.get_info(Identifier::Latest).await.is_none());
1378
1379 assert!(actor.get_info(Height::new(1)).await.is_none());
1381
1382 let parent = Sha256::hash(b"");
1384 let block = make_block(parent, Height::new(1), 1);
1385 let digest = block.digest();
1386 let round = Round::new(Epoch::new(0), View::new(1));
1387 actor.verified(round, block.clone()).await;
1388
1389 let proposal = Proposal {
1390 round,
1391 parent: View::new(0),
1392 payload: digest,
1393 };
1394 let finalization = make_finalization(proposal, &schemes, QUORUM);
1395 actor.report(Activity::Finalization(finalization)).await;
1396
1397 assert_eq!(
1399 actor.get_info(Identifier::Latest).await,
1400 Some((Height::new(1), digest))
1401 );
1402
1403 assert_eq!(
1405 actor.get_info(Height::new(1)).await,
1406 Some((Height::new(1), digest))
1407 );
1408
1409 assert_eq!(
1411 actor.get_info(&digest).await,
1412 Some((Height::new(1), digest))
1413 );
1414
1415 assert!(actor.get_info(Height::new(2)).await.is_none());
1417
1418 let missing = Sha256::hash(b"missing");
1420 assert!(actor.get_info(&missing).await.is_none());
1421 })
1422 }
1423
1424 #[test_traced("WARN")]
1425 fn test_get_info_latest_progression_multiple_finalizations() {
1426 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1427 runner.start(|mut context| async move {
1428 let mut oracle = setup_network(context.clone(), None);
1429 let Fixture {
1430 participants,
1431 schemes,
1432 ..
1433 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1434
1435 let me = participants[0].clone();
1437 let (_application, mut actor, _processed_height) = setup_validator(
1438 context.with_label("validator_0"),
1439 &mut oracle,
1440 me,
1441 ConstantProvider::new(schemes[0].clone()),
1442 )
1443 .await;
1444
1445 assert!(actor.get_info(Identifier::Latest).await.is_none());
1447
1448 let parent0 = Sha256::hash(b"");
1450 let block1 = make_block(parent0, Height::new(1), 1);
1451 let d1 = block1.digest();
1452 actor
1453 .verified(Round::new(Epoch::new(0), View::new(1)), block1.clone())
1454 .await;
1455 let f1 = make_finalization(
1456 Proposal {
1457 round: Round::new(Epoch::new(0), View::new(1)),
1458 parent: View::new(0),
1459 payload: d1,
1460 },
1461 &schemes,
1462 QUORUM,
1463 );
1464 actor.report(Activity::Finalization(f1)).await;
1465 let latest = actor.get_info(Identifier::Latest).await;
1466 assert_eq!(latest, Some((Height::new(1), d1)));
1467
1468 let block2 = make_block(d1, Height::new(2), 2);
1469 let d2 = block2.digest();
1470 actor
1471 .verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
1472 .await;
1473 let f2 = make_finalization(
1474 Proposal {
1475 round: Round::new(Epoch::new(0), View::new(2)),
1476 parent: View::new(1),
1477 payload: d2,
1478 },
1479 &schemes,
1480 QUORUM,
1481 );
1482 actor.report(Activity::Finalization(f2)).await;
1483 let latest = actor.get_info(Identifier::Latest).await;
1484 assert_eq!(latest, Some((Height::new(2), d2)));
1485
1486 let block3 = make_block(d2, Height::new(3), 3);
1487 let d3 = block3.digest();
1488 actor
1489 .verified(Round::new(Epoch::new(0), View::new(3)), block3.clone())
1490 .await;
1491 let f3 = make_finalization(
1492 Proposal {
1493 round: Round::new(Epoch::new(0), View::new(3)),
1494 parent: View::new(2),
1495 payload: d3,
1496 },
1497 &schemes,
1498 QUORUM,
1499 );
1500 actor.report(Activity::Finalization(f3)).await;
1501 let latest = actor.get_info(Identifier::Latest).await;
1502 assert_eq!(latest, Some((Height::new(3), d3)));
1503 })
1504 }
1505
1506 #[test_traced("WARN")]
1507 fn test_get_block_by_height_and_latest() {
1508 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1509 runner.start(|mut context| async move {
1510 let mut oracle = setup_network(context.clone(), None);
1511 let Fixture {
1512 participants,
1513 schemes,
1514 ..
1515 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1516
1517 let me = participants[0].clone();
1518 let (application, mut actor, _processed_height) = setup_validator(
1519 context.with_label("validator_0"),
1520 &mut oracle,
1521 me,
1522 ConstantProvider::new(schemes[0].clone()),
1523 )
1524 .await;
1525
1526 let latest_block = actor.get_block(Identifier::Latest).await;
1528 assert!(latest_block.is_none());
1529 assert!(application.tip().is_none());
1530
1531 let parent = Sha256::hash(b"");
1533 let block = make_block(parent, Height::new(1), 1);
1534 let commitment = block.digest();
1535 let round = Round::new(Epoch::new(0), View::new(1));
1536 actor.verified(round, block.clone()).await;
1537 let proposal = Proposal {
1538 round,
1539 parent: View::new(0),
1540 payload: commitment,
1541 };
1542 let finalization = make_finalization(proposal, &schemes, QUORUM);
1543 actor.report(Activity::Finalization(finalization)).await;
1544
1545 let by_height = actor
1547 .get_block(Height::new(1))
1548 .await
1549 .expect("missing block by height");
1550 assert_eq!(by_height.height(), Height::new(1));
1551 assert_eq!(by_height.digest(), commitment);
1552 assert_eq!(application.tip(), Some((Height::new(1), commitment)));
1553
1554 let by_latest = actor
1556 .get_block(Identifier::Latest)
1557 .await
1558 .expect("missing block by latest");
1559 assert_eq!(by_latest.height(), Height::new(1));
1560 assert_eq!(by_latest.digest(), commitment);
1561
1562 let by_height = actor.get_block(Height::new(2)).await;
1564 assert!(by_height.is_none());
1565 })
1566 }
1567
1568 #[test_traced("WARN")]
1569 fn test_get_block_by_commitment_from_sources_and_missing() {
1570 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1571 runner.start(|mut context| async move {
1572 let mut oracle = setup_network(context.clone(), None);
1573 let Fixture {
1574 participants,
1575 schemes,
1576 ..
1577 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1578
1579 let me = participants[0].clone();
1580 let (_application, mut actor, _processed_height) = setup_validator(
1581 context.with_label("validator_0"),
1582 &mut oracle,
1583 me,
1584 ConstantProvider::new(schemes[0].clone()),
1585 )
1586 .await;
1587
1588 let parent = Sha256::hash(b"");
1590 let ver_block = make_block(parent, Height::new(1), 1);
1591 let ver_commitment = ver_block.digest();
1592 let round1 = Round::new(Epoch::new(0), View::new(1));
1593 actor.verified(round1, ver_block.clone()).await;
1594 let got = actor
1595 .get_block(&ver_commitment)
1596 .await
1597 .expect("missing block from cache");
1598 assert_eq!(got.digest(), ver_commitment);
1599
1600 let fin_block = make_block(ver_commitment, Height::new(2), 2);
1602 let fin_commitment = fin_block.digest();
1603 let round2 = Round::new(Epoch::new(0), View::new(2));
1604 actor.verified(round2, fin_block.clone()).await;
1605 let proposal = Proposal {
1606 round: round2,
1607 parent: View::new(1),
1608 payload: fin_commitment,
1609 };
1610 let finalization = make_finalization(proposal, &schemes, QUORUM);
1611 actor.report(Activity::Finalization(finalization)).await;
1612 let got = actor
1613 .get_block(&fin_commitment)
1614 .await
1615 .expect("missing block from finalized archive");
1616 assert_eq!(got.digest(), fin_commitment);
1617 assert_eq!(got.height(), Height::new(2));
1618
1619 let missing = Sha256::hash(b"definitely-missing");
1621 let missing_block = actor.get_block(&missing).await;
1622 assert!(missing_block.is_none());
1623 })
1624 }
1625
1626 #[test_traced("WARN")]
1627 fn test_get_finalization_by_height() {
1628 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1629 runner.start(|mut context| async move {
1630 let mut oracle = setup_network(context.clone(), None);
1631 let Fixture {
1632 participants,
1633 schemes,
1634 ..
1635 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1636
1637 let me = participants[0].clone();
1638 let (_application, mut actor, _processed_height) = setup_validator(
1639 context.with_label("validator_0"),
1640 &mut oracle,
1641 me,
1642 ConstantProvider::new(schemes[0].clone()),
1643 )
1644 .await;
1645
1646 let finalization = actor.get_finalization(Height::new(1)).await;
1648 assert!(finalization.is_none());
1649
1650 let parent = Sha256::hash(b"");
1652 let block = make_block(parent, Height::new(1), 1);
1653 let commitment = block.digest();
1654 let round = Round::new(Epoch::new(0), View::new(1));
1655 actor.verified(round, block.clone()).await;
1656 let proposal = Proposal {
1657 round,
1658 parent: View::new(0),
1659 payload: commitment,
1660 };
1661 let finalization = make_finalization(proposal, &schemes, QUORUM);
1662 actor.report(Activity::Finalization(finalization)).await;
1663
1664 let finalization = actor
1666 .get_finalization(Height::new(1))
1667 .await
1668 .expect("missing finalization by height");
1669 assert_eq!(finalization.proposal.parent, View::new(0));
1670 assert_eq!(
1671 finalization.proposal.round,
1672 Round::new(Epoch::new(0), View::new(1))
1673 );
1674 assert_eq!(finalization.proposal.payload, commitment);
1675
1676 assert!(actor.get_finalization(Height::new(2)).await.is_none());
1677 })
1678 }
1679
1680 #[test_traced("WARN")]
1681 fn test_hint_finalized_triggers_fetch() {
1682 let runner = deterministic::Runner::new(
1683 deterministic::Config::new()
1684 .with_seed(42)
1685 .with_timeout(Some(Duration::from_secs(60))),
1686 );
1687 runner.start(|mut context| async move {
1688 let mut oracle = setup_network(context.clone(), Some(3));
1689 let Fixture {
1690 participants,
1691 schemes,
1692 ..
1693 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1694
1695 let mut manager = oracle.manager();
1697 manager
1698 .track(0, participants.clone().try_into().unwrap())
1699 .await;
1700
1701 let (app0, mut actor0, _) = setup_validator(
1703 context.with_label("validator_0"),
1704 &mut oracle,
1705 participants[0].clone(),
1706 ConstantProvider::new(schemes[0].clone()),
1707 )
1708 .await;
1709 let (_app1, mut actor1, _) = setup_validator(
1710 context.with_label("validator_1"),
1711 &mut oracle,
1712 participants[1].clone(),
1713 ConstantProvider::new(schemes[1].clone()),
1714 )
1715 .await;
1716
1717 setup_network_links(&mut oracle, &participants[..2], LINK).await;
1719
1720 let mut parent = Sha256::hash(b"");
1722 for i in 1..=5u64 {
1723 let block = make_block(parent, Height::new(i), i);
1724 let commitment = block.digest();
1725 let round = Round::new(Epoch::new(0), View::new(i));
1726
1727 actor0.verified(round, block.clone()).await;
1728 let proposal = Proposal {
1729 round,
1730 parent: View::new(i - 1),
1731 payload: commitment,
1732 };
1733 let finalization = make_finalization(proposal, &schemes, QUORUM);
1734 actor0.report(Activity::Finalization(finalization)).await;
1735
1736 parent = commitment;
1737 }
1738
1739 while app0.blocks().len() < 5 {
1741 context.sleep(Duration::from_millis(10)).await;
1742 }
1743
1744 assert!(actor1.get_finalization(Height::new(5)).await.is_none());
1746
1747 actor1
1749 .hint_finalized(Height::new(5), NonEmptyVec::new(participants[0].clone()))
1750 .await;
1751
1752 while actor1.get_finalization(Height::new(5)).await.is_none() {
1754 context.sleep(Duration::from_millis(10)).await;
1755 }
1756
1757 let finalization = actor1
1759 .get_finalization(Height::new(5))
1760 .await
1761 .expect("finalization should be fetched");
1762 assert_eq!(finalization.proposal.round.view(), View::new(5));
1763 })
1764 }
1765
1766 #[test_traced("WARN")]
1767 fn test_ancestry_stream() {
1768 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1769 runner.start(|mut context| async move {
1770 let mut oracle = setup_network(context.clone(), None);
1771 let Fixture {
1772 participants,
1773 schemes,
1774 ..
1775 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1776
1777 let me = participants[0].clone();
1778 let (_application, mut actor, _processed_height) = setup_validator(
1779 context.with_label("validator_0"),
1780 &mut oracle,
1781 me,
1782 ConstantProvider::new(schemes[0].clone()),
1783 )
1784 .await;
1785
1786 let mut parent = Sha256::hash(b"");
1788 for i in 1..=5 {
1789 let block = make_block(parent, Height::new(i), i);
1790 let commitment = block.digest();
1791 let round = Round::new(Epoch::new(0), View::new(i));
1792 actor.verified(round, block.clone()).await;
1793 let proposal = Proposal {
1794 round,
1795 parent: View::new(i - 1),
1796 payload: commitment,
1797 };
1798 let finalization = make_finalization(proposal, &schemes, QUORUM);
1799 actor.report(Activity::Finalization(finalization)).await;
1800
1801 parent = block.digest();
1802 }
1803
1804 let (_, commitment) = actor.get_info(Identifier::Latest).await.unwrap();
1806 let ancestry = actor.ancestry((None, commitment)).await.unwrap();
1807 let blocks = ancestry.collect::<Vec<_>>().await;
1808
1809 assert_eq!(blocks.len(), 5);
1811 (0..5).for_each(|i| {
1812 assert_eq!(blocks[i].height(), Height::new(5 - i as u64));
1813 });
1814 })
1815 }
1816
1817 #[test_traced("WARN")]
1818 fn test_marshaled_rejects_invalid_ancestry() {
1819 #[derive(Clone)]
1820 struct MockVerifyingApp {
1821 genesis: B,
1822 }
1823
1824 impl crate::Application<deterministic::Context> for MockVerifyingApp {
1825 type Block = B;
1826 type Context = Context<D, K>;
1827 type SigningScheme = S;
1828
1829 async fn genesis(&mut self) -> Self::Block {
1830 self.genesis.clone()
1831 }
1832
1833 async fn propose(
1834 &mut self,
1835 _context: (deterministic::Context, Self::Context),
1836 _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
1837 ) -> Option<Self::Block> {
1838 None
1839 }
1840 }
1841
1842 impl VerifyingApplication<deterministic::Context> for MockVerifyingApp {
1843 async fn verify(
1844 &mut self,
1845 _context: (deterministic::Context, Self::Context),
1846 _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
1847 ) -> bool {
1848 true
1850 }
1851 }
1852
1853 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1854 runner.start(|mut context| async move {
1855 let mut oracle = setup_network(context.clone(), None);
1856 let Fixture {
1857 participants,
1858 schemes,
1859 ..
1860 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1861
1862 let me = participants[0].clone();
1863 let (_base_app, marshal, _processed_height) = setup_validator(
1864 context.with_label("validator_0"),
1865 &mut oracle,
1866 me.clone(),
1867 ConstantProvider::new(schemes[0].clone()),
1868 )
1869 .await;
1870
1871 let genesis = make_block(Sha256::hash(b""), Height::zero(), 0);
1873
1874 let mock_app = MockVerifyingApp {
1876 genesis: genesis.clone(),
1877 };
1878 let mut marshaled = Marshaled::new(
1879 context.clone(),
1880 mock_app,
1881 marshal.clone(),
1882 FixedEpocher::new(BLOCKS_PER_EPOCH),
1883 );
1884
1885 let honest_parent = make_block(
1892 genesis.commitment(),
1893 Height::new(BLOCKS_PER_EPOCH.get() + 1),
1894 1000,
1895 );
1896 let parent_commitment = honest_parent.commitment();
1897 let parent_round = Round::new(Epoch::new(1), View::new(21));
1898 marshal
1899 .clone()
1900 .verified(parent_round, honest_parent.clone())
1901 .await;
1902
1903 let malicious_block = make_block(
1907 parent_commitment,
1908 Height::new(BLOCKS_PER_EPOCH.get() + 15),
1909 2000,
1910 );
1911 let malicious_commitment = malicious_block.commitment();
1912 marshal
1913 .clone()
1914 .proposed(
1915 Round::new(Epoch::new(1), View::new(35)),
1916 malicious_block.clone(),
1917 )
1918 .await;
1919
1920 context.sleep(Duration::from_millis(10)).await;
1922
1923 let byzantine_round = Round::new(Epoch::new(1), View::new(35));
1926 let byzantine_context = Context {
1927 round: byzantine_round,
1928 leader: me.clone(),
1929 parent: (View::new(21), parent_commitment), };
1931
1932 let _ = marshaled
1939 .verify(byzantine_context, malicious_commitment)
1940 .await
1941 .await;
1942 let verify = marshaled
1943 .certify(byzantine_round, malicious_commitment)
1944 .await;
1945
1946 assert!(
1947 !verify.await.unwrap(),
1948 "Byzantine block with non-contiguous heights should be rejected"
1949 );
1950
1951 let malicious_block = make_block(
1955 genesis.commitment(),
1956 Height::new(BLOCKS_PER_EPOCH.get() + 2),
1957 3000,
1958 );
1959 let malicious_commitment = malicious_block.commitment();
1960 marshal
1961 .clone()
1962 .proposed(
1963 Round::new(Epoch::new(1), View::new(22)),
1964 malicious_block.clone(),
1965 )
1966 .await;
1967
1968 context.sleep(Duration::from_millis(10)).await;
1970
1971 let byzantine_round = Round::new(Epoch::new(1), View::new(22));
1974 let byzantine_context = Context {
1975 round: byzantine_round,
1976 leader: me.clone(),
1977 parent: (View::new(21), parent_commitment), };
1979
1980 let _ = marshaled
1988 .verify(byzantine_context, malicious_commitment)
1989 .await
1990 .await;
1991 let verify = marshaled
1992 .certify(byzantine_round, malicious_commitment)
1993 .await;
1994
1995 assert!(
1996 !verify.await.unwrap(),
1997 "Byzantine block with mismatched parent commitment should be rejected"
1998 );
1999 })
2000 }
2001
2002 #[test_traced("WARN")]
2003 fn test_finalize_same_height_different_views() {
2004 let runner = deterministic::Runner::timed(Duration::from_secs(60));
2005 runner.start(|mut context| async move {
2006 let mut oracle = setup_network(context.clone(), None);
2007 let Fixture {
2008 participants,
2009 schemes,
2010 ..
2011 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2012
2013 let mut actors = Vec::new();
2015 for (i, validator) in participants.iter().enumerate().take(2) {
2016 let (_app, actor, _processed_height) = setup_validator(
2017 context.with_label(&format!("validator_{i}")),
2018 &mut oracle,
2019 validator.clone(),
2020 ConstantProvider::new(schemes[i].clone()),
2021 )
2022 .await;
2023 actors.push(actor);
2024 }
2025
2026 let parent = Sha256::hash(b"");
2028 let block = make_block(parent, Height::new(1), 1);
2029 let commitment = block.digest();
2030
2031 actors[0]
2033 .verified(Round::new(Epoch::new(0), View::new(1)), block.clone())
2034 .await;
2035 actors[1]
2036 .verified(Round::new(Epoch::new(0), View::new(1)), block.clone())
2037 .await;
2038
2039 let proposal_v1 = Proposal {
2041 round: Round::new(Epoch::new(0), View::new(1)),
2042 parent: View::new(0),
2043 payload: commitment,
2044 };
2045 let notarization_v1 = make_notarization(proposal_v1.clone(), &schemes, QUORUM);
2046 let finalization_v1 = make_finalization(proposal_v1.clone(), &schemes, QUORUM);
2047 actors[0]
2048 .report(Activity::Notarization(notarization_v1.clone()))
2049 .await;
2050 actors[0]
2051 .report(Activity::Finalization(finalization_v1.clone()))
2052 .await;
2053
2054 let proposal_v2 = Proposal {
2058 round: Round::new(Epoch::new(0), View::new(2)), parent: View::new(0),
2060 payload: commitment, };
2062 let notarization_v2 = make_notarization(proposal_v2.clone(), &schemes, QUORUM);
2063 let finalization_v2 = make_finalization(proposal_v2.clone(), &schemes, QUORUM);
2064 actors[1]
2065 .report(Activity::Notarization(notarization_v2.clone()))
2066 .await;
2067 actors[1]
2068 .report(Activity::Finalization(finalization_v2.clone()))
2069 .await;
2070
2071 context.sleep(Duration::from_millis(100)).await;
2073
2074 let block0 = actors[0].get_block(Height::new(1)).await.unwrap();
2076 let block1 = actors[1].get_block(Height::new(1)).await.unwrap();
2077 assert_eq!(block0, block);
2078 assert_eq!(block1, block);
2079
2080 let fin0 = actors[0].get_finalization(Height::new(1)).await.unwrap();
2082 let fin1 = actors[1].get_finalization(Height::new(1)).await.unwrap();
2083
2084 assert_eq!(fin0.proposal.payload, block.commitment());
2086 assert_eq!(fin0.round().view(), View::new(1));
2087 assert_eq!(fin1.proposal.payload, block.commitment());
2088 assert_eq!(fin1.round().view(), View::new(2));
2089
2090 assert_eq!(
2092 actors[0].get_info(Height::new(1)).await,
2093 Some((Height::new(1), commitment))
2094 );
2095 assert_eq!(
2096 actors[1].get_info(Height::new(1)).await,
2097 Some((Height::new(1), commitment))
2098 );
2099
2100 actors[0]
2103 .report(Activity::Finalization(finalization_v2.clone()))
2104 .await;
2105 actors[1]
2106 .report(Activity::Finalization(finalization_v1.clone()))
2107 .await;
2108 context.sleep(Duration::from_millis(100)).await;
2109
2110 let fin0_after = actors[0].get_finalization(Height::new(1)).await.unwrap();
2112 assert_eq!(fin0_after.round().view(), View::new(1));
2113
2114 let fin0_after = actors[1].get_finalization(Height::new(1)).await.unwrap();
2116 assert_eq!(fin0_after.round().view(), View::new(2));
2117 })
2118 }
2119
2120 #[test_traced("WARN")]
2121 fn test_init_processed_height() {
2122 let runner = deterministic::Runner::timed(Duration::from_secs(60));
2123 runner.start(|mut context| async move {
2124 let mut oracle = setup_network(context.clone(), None);
2125 let Fixture {
2126 participants,
2127 schemes,
2128 ..
2129 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2130
2131 let me = participants[0].clone();
2133 let (application, mut actor, initial_height) = setup_validator(
2134 context.with_label("validator_0"),
2135 &mut oracle,
2136 me.clone(),
2137 ConstantProvider::new(schemes[0].clone()),
2138 )
2139 .await;
2140 assert_eq!(initial_height, Height::zero());
2141
2142 let mut parent = Sha256::hash(b"");
2144 let mut blocks = Vec::new();
2145 for i in 1..=3 {
2146 let block = make_block(parent, Height::new(i), i);
2147 let commitment = block.digest();
2148 let round = Round::new(Epoch::new(0), View::new(i));
2149
2150 actor.verified(round, block.clone()).await;
2151 let proposal = Proposal {
2152 round,
2153 parent: View::new(i - 1),
2154 payload: commitment,
2155 };
2156 let finalization = make_finalization(proposal, &schemes, QUORUM);
2157 actor.report(Activity::Finalization(finalization)).await;
2158
2159 blocks.push(block);
2160 parent = commitment;
2161 }
2162
2163 while application.blocks().len() < 3 {
2165 context.sleep(Duration::from_millis(10)).await;
2166 }
2167
2168 actor.set_floor(Height::new(3)).await;
2170 context.sleep(Duration::from_millis(10)).await;
2171
2172 assert_eq!(application.blocks().len(), 3);
2174 assert_eq!(
2175 application.tip(),
2176 Some((Height::new(3), blocks[2].digest()))
2177 );
2178
2179 let (_restart_application, _restart_actor, restart_height) = setup_validator(
2181 context.with_label("validator_0_restart"),
2182 &mut oracle,
2183 me,
2184 ConstantProvider::new(schemes[0].clone()),
2185 )
2186 .await;
2187
2188 assert_eq!(restart_height, Height::new(3));
2189 })
2190 }
2191
2192 #[test_traced("WARN")]
2193 fn test_marshaled_rejects_unsupported_epoch() {
2194 #[derive(Clone)]
2195 struct MockVerifyingApp {
2196 genesis: B,
2197 }
2198
2199 impl crate::Application<deterministic::Context> for MockVerifyingApp {
2200 type Block = B;
2201 type Context = Context<D, K>;
2202 type SigningScheme = S;
2203
2204 async fn genesis(&mut self) -> Self::Block {
2205 self.genesis.clone()
2206 }
2207
2208 async fn propose(
2209 &mut self,
2210 _context: (deterministic::Context, Self::Context),
2211 _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
2212 ) -> Option<Self::Block> {
2213 None
2214 }
2215 }
2216
2217 impl VerifyingApplication<deterministic::Context> for MockVerifyingApp {
2218 async fn verify(
2219 &mut self,
2220 _context: (deterministic::Context, Self::Context),
2221 _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
2222 ) -> bool {
2223 true
2224 }
2225 }
2226
2227 #[derive(Clone)]
2228 struct LimitedEpocher {
2229 inner: FixedEpocher,
2230 max_epoch: u64,
2231 }
2232
2233 impl Epocher for LimitedEpocher {
2234 fn containing(&self, height: Height) -> Option<crate::types::EpochInfo> {
2235 let bounds = self.inner.containing(height)?;
2236 if bounds.epoch().get() > self.max_epoch {
2237 None
2238 } else {
2239 Some(bounds)
2240 }
2241 }
2242
2243 fn first(&self, epoch: Epoch) -> Option<Height> {
2244 if epoch.get() > self.max_epoch {
2245 None
2246 } else {
2247 self.inner.first(epoch)
2248 }
2249 }
2250
2251 fn last(&self, epoch: Epoch) -> Option<Height> {
2252 if epoch.get() > self.max_epoch {
2253 None
2254 } else {
2255 self.inner.last(epoch)
2256 }
2257 }
2258 }
2259
2260 let runner = deterministic::Runner::timed(Duration::from_secs(60));
2261 runner.start(|mut context| async move {
2262 let mut oracle = setup_network(context.clone(), None);
2263 let Fixture {
2264 participants,
2265 schemes,
2266 ..
2267 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2268
2269 let me = participants[0].clone();
2270 let (_base_app, marshal, _processed_height) = setup_validator(
2271 context.with_label("validator_0"),
2272 &mut oracle,
2273 me.clone(),
2274 ConstantProvider::new(schemes[0].clone()),
2275 )
2276 .await;
2277
2278 let genesis = make_block(Sha256::hash(b""), Height::zero(), 0);
2279
2280 let mock_app = MockVerifyingApp {
2281 genesis: genesis.clone(),
2282 };
2283 let limited_epocher = LimitedEpocher {
2284 inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
2285 max_epoch: 0,
2286 };
2287 let mut marshaled =
2288 Marshaled::new(context.clone(), mock_app, marshal.clone(), limited_epocher);
2289
2290 let parent = make_block(genesis.commitment(), Height::new(19), 1000);
2292 let parent_commitment = parent.commitment();
2293 let parent_round = Round::new(Epoch::new(0), View::new(19));
2294 marshal.clone().verified(parent_round, parent).await;
2295
2296 let block = make_block(parent_commitment, Height::new(20), 2000);
2298 let block_commitment = block.commitment();
2299 marshal
2300 .clone()
2301 .proposed(Round::new(Epoch::new(1), View::new(20)), block)
2302 .await;
2303
2304 context.sleep(Duration::from_millis(10)).await;
2305
2306 let unsupported_round = Round::new(Epoch::new(1), View::new(20));
2307 let unsupported_context = Context {
2308 round: unsupported_round,
2309 leader: me.clone(),
2310 parent: (View::new(19), parent_commitment),
2311 };
2312
2313 let verify_result = marshaled
2314 .verify(unsupported_context, block_commitment)
2315 .await
2316 .await;
2317
2318 assert!(
2319 !verify_result.unwrap(),
2320 "Block in unsupported epoch should be rejected"
2321 );
2322 })
2323 }
2324
2325 #[test_traced("INFO")]
2337 fn test_certify_lower_view_after_higher_view() {
2338 #[derive(Clone)]
2339 struct MockVerifyingApp {
2340 genesis: B,
2341 }
2342
2343 impl crate::Application<deterministic::Context> for MockVerifyingApp {
2344 type Block = B;
2345 type Context = Context<D, K>;
2346 type SigningScheme = S;
2347
2348 async fn genesis(&mut self) -> Self::Block {
2349 self.genesis.clone()
2350 }
2351
2352 async fn propose(
2353 &mut self,
2354 _context: (deterministic::Context, Self::Context),
2355 _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
2356 ) -> Option<Self::Block> {
2357 None
2358 }
2359 }
2360
2361 impl VerifyingApplication<deterministic::Context> for MockVerifyingApp {
2362 async fn verify(
2363 &mut self,
2364 _context: (deterministic::Context, Self::Context),
2365 _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
2366 ) -> bool {
2367 true
2368 }
2369 }
2370
2371 let runner = deterministic::Runner::timed(Duration::from_secs(60));
2372 runner.start(|mut context| async move {
2373 let mut oracle = setup_network(context.clone(), None);
2374 let Fixture {
2375 participants,
2376 schemes,
2377 ..
2378 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2379
2380 let me = participants[0].clone();
2381 let (_base_app, marshal, _processed_height) = setup_validator(
2382 context.with_label("validator_0"),
2383 &mut oracle,
2384 me.clone(),
2385 ConstantProvider::new(schemes[0].clone()),
2386 )
2387 .await;
2388
2389 let genesis = make_block(Sha256::hash(b""), Height::zero(), 0);
2390
2391 let mock_app = MockVerifyingApp {
2392 genesis: genesis.clone(),
2393 };
2394 let mut marshaled = Marshaled::new(
2395 context.clone(),
2396 mock_app,
2397 marshal.clone(),
2398 FixedEpocher::new(BLOCKS_PER_EPOCH),
2399 );
2400
2401 let parent = make_block(genesis.commitment(), Height::new(1), 100);
2403 let parent_commitment = parent.commitment();
2404 let parent_round = Round::new(Epoch::new(0), View::new(1));
2405 marshal.clone().verified(parent_round, parent).await;
2406
2407 let round_a = Round::new(Epoch::new(0), View::new(5));
2409 let context_a = Context {
2410 round: round_a,
2411 leader: me.clone(),
2412 parent: (View::new(1), parent_commitment),
2413 };
2414 let block_a =
2415 B::new::<Sha256>(context_a.clone(), parent_commitment, Height::new(2), 200);
2416 let commitment_a = block_a.commitment();
2417 marshal.clone().proposed(round_a, block_a).await;
2418
2419 let round_b = Round::new(Epoch::new(0), View::new(10));
2422 let context_b = Context {
2423 round: round_b,
2424 leader: me.clone(),
2425 parent: (View::new(1), parent_commitment),
2426 };
2427 let block_b =
2428 B::new::<Sha256>(context_b.clone(), parent_commitment, Height::new(2), 300);
2429 let commitment_b = block_b.commitment();
2430 marshal.clone().proposed(round_b, block_b).await;
2431
2432 context.sleep(Duration::from_millis(10)).await;
2433
2434 let _ = marshaled.verify(context_a, commitment_a).await.await;
2436
2437 let _ = marshaled.verify(context_b, commitment_b).await.await;
2439
2440 let certify_b = marshaled.certify(round_b, commitment_b).await;
2442 assert!(
2443 certify_b.await.unwrap(),
2444 "Block B certification should succeed"
2445 );
2446
2447 let certify_a = marshaled.certify(round_a, commitment_a).await;
2449
2450 select! {
2452 result = certify_a => {
2453 assert!(result.unwrap(), "Block A certification should succeed");
2454 },
2455 _ = context.sleep(Duration::from_secs(5)) => {
2456 panic!("Block A certification timed out");
2457 },
2458 }
2459 })
2460 }
2461
2462 #[test_traced("INFO")]
2471 fn test_marshaled_reproposal_validation() {
2472 #[derive(Clone)]
2473 struct MockVerifyingApp {
2474 genesis: B,
2475 }
2476
2477 impl crate::Application<deterministic::Context> for MockVerifyingApp {
2478 type Block = B;
2479 type Context = Context<D, K>;
2480 type SigningScheme = S;
2481
2482 async fn genesis(&mut self) -> Self::Block {
2483 self.genesis.clone()
2484 }
2485
2486 async fn propose(
2487 &mut self,
2488 _context: (deterministic::Context, Self::Context),
2489 _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
2490 ) -> Option<Self::Block> {
2491 None
2492 }
2493 }
2494
2495 impl VerifyingApplication<deterministic::Context> for MockVerifyingApp {
2496 async fn verify(
2497 &mut self,
2498 _context: (deterministic::Context, Self::Context),
2499 _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
2500 ) -> bool {
2501 true
2502 }
2503 }
2504
2505 let runner = deterministic::Runner::timed(Duration::from_secs(60));
2506 runner.start(|mut context| async move {
2507 let mut oracle = setup_network(context.clone(), None);
2508 let Fixture {
2509 participants,
2510 schemes,
2511 ..
2512 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2513
2514 let me = participants[0].clone();
2515 let (_base_app, marshal, _processed_height) = setup_validator(
2516 context.with_label("validator_0"),
2517 &mut oracle,
2518 me.clone(),
2519 ConstantProvider::new(schemes[0].clone()),
2520 )
2521 .await;
2522
2523 let genesis = make_block(Sha256::hash(b""), Height::zero(), 0);
2524
2525 let mock_app = MockVerifyingApp {
2526 genesis: genesis.clone(),
2527 };
2528 let mut marshaled = Marshaled::new(
2529 context.clone(),
2530 mock_app,
2531 marshal.clone(),
2532 FixedEpocher::new(BLOCKS_PER_EPOCH),
2533 );
2534
2535 let mut parent = genesis.commitment();
2538 let mut last_view = View::zero();
2539 for i in 1..BLOCKS_PER_EPOCH.get() {
2540 let round = Round::new(Epoch::new(0), View::new(i));
2541 let ctx = Context {
2542 round,
2543 leader: me.clone(),
2544 parent: (last_view, parent),
2545 };
2546 let block = B::new::<Sha256>(ctx.clone(), parent, Height::new(i), i * 100);
2547 marshal.clone().verified(round, block.clone()).await;
2548 parent = block.commitment();
2549 last_view = View::new(i);
2550 }
2551
2552 let boundary_height = Height::new(BLOCKS_PER_EPOCH.get() - 1);
2554 let boundary_round = Round::new(Epoch::new(0), View::new(boundary_height.get()));
2555 let boundary_context = Context {
2556 round: boundary_round,
2557 leader: me.clone(),
2558 parent: (last_view, parent),
2559 };
2560 let boundary_block = B::new::<Sha256>(
2561 boundary_context.clone(),
2562 parent,
2563 boundary_height,
2564 boundary_height.get() * 100,
2565 );
2566 let boundary_commitment = boundary_block.commitment();
2567 marshal
2568 .clone()
2569 .verified(boundary_round, boundary_block.clone())
2570 .await;
2571
2572 marshal
2574 .clone()
2575 .proposed(boundary_round, boundary_block.clone())
2576 .await;
2577
2578 context.sleep(Duration::from_millis(10)).await;
2579
2580 let reproposal_round = Round::new(Epoch::new(0), View::new(20));
2584 let reproposal_context = Context {
2585 round: reproposal_round,
2586 leader: me.clone(),
2587 parent: (View::new(boundary_height.get()), boundary_commitment), };
2589
2590 let verify_result = marshaled
2592 .verify(reproposal_context.clone(), boundary_commitment)
2593 .await
2594 .await;
2595 assert!(
2596 verify_result.unwrap(),
2597 "Valid re-proposal at epoch boundary should be accepted"
2598 );
2599
2600 let non_boundary_height = Height::new(10);
2603 let non_boundary_round = Round::new(Epoch::new(0), View::new(10));
2604 let non_boundary_context = Context {
2605 round: non_boundary_round,
2606 leader: me.clone(),
2607 parent: (View::new(9), parent),
2608 };
2609 let non_boundary_block = B::new::<Sha256>(
2610 non_boundary_context.clone(),
2611 parent,
2612 non_boundary_height,
2613 1000,
2614 );
2615 let non_boundary_commitment = non_boundary_block.commitment();
2616
2617 marshal
2619 .clone()
2620 .proposed(non_boundary_round, non_boundary_block.clone())
2621 .await;
2622
2623 context.sleep(Duration::from_millis(10)).await;
2624
2625 let invalid_reproposal_round = Round::new(Epoch::new(0), View::new(15));
2627 let invalid_reproposal_context = Context {
2628 round: invalid_reproposal_round,
2629 leader: me.clone(),
2630 parent: (View::new(10), non_boundary_commitment),
2631 };
2632
2633 let verify_result = marshaled
2634 .verify(invalid_reproposal_context, non_boundary_commitment)
2635 .await
2636 .await;
2637 assert!(
2638 !verify_result.unwrap(),
2639 "Invalid re-proposal (not at epoch boundary) should be rejected"
2640 );
2641
2642 let cross_epoch_reproposal_round = Round::new(Epoch::new(1), View::new(20));
2645 let cross_epoch_reproposal_context = Context {
2646 round: cross_epoch_reproposal_round,
2647 leader: me.clone(),
2648 parent: (View::new(boundary_height.get()), boundary_commitment),
2649 };
2650
2651 let verify_result = marshaled
2652 .verify(cross_epoch_reproposal_context, boundary_commitment)
2653 .await
2654 .await;
2655 assert!(
2656 !verify_result.unwrap(),
2657 "Re-proposal with mismatched epoch should be rejected"
2658 );
2659
2660 let certify_only_round = Round::new(Epoch::new(0), View::new(21));
2664 let certify_result = marshaled
2665 .certify(certify_only_round, boundary_commitment)
2666 .await
2667 .await;
2668 assert!(
2669 certify_result.unwrap(),
2670 "Certify-only path for re-proposal should succeed"
2671 );
2672
2673 let normal_height = Height::new(1);
2677 let normal_round = Round::new(Epoch::new(0), View::new(100));
2678 let genesis_commitment = genesis.commitment();
2679
2680 let normal_context = Context {
2681 round: normal_round,
2682 leader: me.clone(),
2683 parent: (View::zero(), genesis_commitment),
2684 };
2685 let normal_block = B::new::<Sha256>(
2686 normal_context.clone(),
2687 genesis_commitment,
2688 normal_height,
2689 500,
2690 );
2691 let normal_commitment = normal_block.commitment();
2692 marshal
2693 .clone()
2694 .proposed(normal_round, normal_block.clone())
2695 .await;
2696
2697 context.sleep(Duration::from_millis(10)).await;
2698
2699 let certify_result = marshaled
2701 .certify(normal_round, normal_commitment)
2702 .await
2703 .await;
2704 assert!(
2705 certify_result.unwrap(),
2706 "Certify-only path for normal block should succeed"
2707 );
2708 })
2709 }
2710
2711 #[test_traced("INFO")]
2712 fn test_broadcast_caches_block() {
2713 let runner = deterministic::Runner::timed(Duration::from_secs(60));
2714 runner.start(|mut context| async move {
2715 let mut oracle = setup_network(context.clone(), None);
2716 let Fixture {
2717 participants,
2718 schemes,
2719 ..
2720 } = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2721
2722 let (i, validator) = participants.iter().enumerate().next().unwrap();
2724 let mut actor = setup_validator(
2725 context.with_label(&format!("validator_{i}")),
2726 &mut oracle,
2727 validator.clone(),
2728 ConstantProvider::new(schemes[i].clone()),
2729 )
2730 .await
2731 .1;
2732
2733 let parent = Sha256::hash(b"");
2735 let block = make_block(parent, Height::new(1), 1);
2736 let commitment = block.digest();
2737
2738 actor
2740 .proposed(Round::new(Epoch::new(0), View::new(1)), block.clone())
2741 .await;
2742
2743 actor
2746 .get_block(&commitment)
2747 .await
2748 .expect("block should be cached after broadcast");
2749
2750 let mut actor = setup_validator(
2752 context.with_label(&format!("validator_{i}_restart")),
2753 &mut oracle,
2754 validator.clone(),
2755 ConstantProvider::new(schemes[i].clone()),
2756 )
2757 .await
2758 .1;
2759
2760 let notarization = make_notarization(
2764 Proposal {
2765 round: Round::new(Epoch::new(0), View::new(1)),
2766 parent: View::new(0),
2767 payload: commitment,
2768 },
2769 &schemes,
2770 QUORUM,
2771 );
2772 actor.report(Activity::Notarization(notarization)).await;
2773
2774 let fetched = actor
2776 .get_block(&commitment)
2777 .await
2778 .expect("block should be cached after broadcast");
2779 assert_eq!(fetched, block);
2780 });
2781 }
2782}