1pub mod actor;
66pub use actor::Actor;
67pub mod cache;
68pub mod config;
69pub use config::Config;
70pub mod ingress;
71pub use ingress::mailbox::Mailbox;
72pub mod resolver;
73pub mod store;
74
75use crate::Block;
76use commonware_utils::{acknowledgement::Exact, Acknowledgement};
77
/// An update value defined by the marshal module, generic over the block type `B` and an
/// acknowledgement type `A` (defaulting to [`Exact`]).
#[derive(Clone, Debug)]
pub enum Update<B: Block, A: Acknowledgement = Exact> {
    /// Carries a `u64` together with a block commitment.
    ///
    /// NOTE(review): presumably the height and commitment of the newest known tip — confirm
    /// against the code that emits this variant.
    Tip(u64, B::Commitment),
    /// Carries a block together with a value of the acknowledgement type `A`.
    ///
    /// NOTE(review): presumably the acknowledgement lets the receiver signal that the block
    /// was processed — confirm against the `Acknowledgement` trait documentation.
    Block(B, A),
}
96
97#[cfg(test)]
98pub mod mocks;
99
100#[cfg(test)]
101mod tests {
102 use super::{
103 actor,
104 config::Config,
105 mocks::{application::Application, block::Block},
106 resolver::p2p as resolver,
107 };
108 use crate::{
109 application::marshaled::Marshaled,
110 marshal::ingress::mailbox::{AncestorStream, Identifier},
111 simplex::{
112 scheme::bls12381_threshold,
113 types::{Activity, Context, Finalization, Finalize, Notarization, Notarize, Proposal},
114 },
115 types::{Epoch, Epocher, FixedEpocher, Round, View, ViewDelta},
116 Automaton, Block as _, Reporter, VerifyingApplication,
117 };
118 use commonware_broadcast::buffered;
119 use commonware_cryptography::{
120 bls12381::primitives::variant::MinPk,
121 certificate::{mocks::Fixture, ConstantProvider, Scheme as _},
122 ed25519::PublicKey,
123 sha256::{Digest as Sha256Digest, Sha256},
124 Committable, Digestible, Hasher as _,
125 };
126 use commonware_macros::test_traced;
127 use commonware_p2p::{
128 simulated::{self, Link, Network, Oracle},
129 Manager,
130 };
131 use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Quota, Runner};
132 use commonware_storage::archive::immutable;
133 use commonware_utils::{vec::NonEmptyVec, NZUsize, NZU64};
134 use futures::StreamExt;
135 use rand::{
136 seq::{IteratorRandom, SliceRandom},
137 Rng,
138 };
139 use std::{
140 collections::BTreeMap,
141 num::{NonZeroU32, NonZeroU64, NonZeroUsize},
142 time::{Duration, Instant},
143 };
144 use tracing::info;
145
    // Concrete types the tests instantiate the marshal with.
    type D = Sha256Digest;
    type B = Block<D>;
    type K = PublicKey;
    type V = MinPk;
    type S = bls12381_threshold::Scheme<K, V>;
    type P = ConstantProvider<S, Epoch>;

    /// Page size handed to `PoolRef::new` when building each validator's buffer pool.
    const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
    /// Cache size handed to `PoolRef::new` when building each validator's buffer pool.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
    /// Namespace used both in the marshal `Config` and when signing votes in the helpers.
    const NAMESPACE: &[u8] = b"test";
    /// Number of participants requested from the BLS threshold fixture.
    const NUM_VALIDATORS: u32 = 4;
    /// Number of signers the tests pass to `make_notarization` / `make_finalization`.
    const QUORUM: u32 = 3;
    /// Length of the block chain generated by the long-running tests.
    const NUM_BLOCKS: u64 = 160;
    /// Epoch length given to every `FixedEpocher` in this module.
    const BLOCKS_PER_EPOCH: NonZeroU64 = NZU64!(20);
    /// Link with 100ms latency, 1ms jitter and a success rate of 1.0.
    const LINK: Link = Link {
        latency: Duration::from_millis(100),
        jitter: Duration::from_millis(1),
        success_rate: 1.0,
    };
    /// Link with 200ms latency, 50ms jitter and a success rate of 0.7.
    const UNRELIABLE_LINK: Link = Link {
        latency: Duration::from_millis(200),
        jitter: Duration::from_millis(50),
        success_rate: 0.7,
    };
    /// Quota passed to every channel registration in `setup_validator`.
    const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX);
171
172 async fn setup_validator(
173 context: deterministic::Context,
174 oracle: &mut Oracle<K, deterministic::Context>,
175 validator: K,
176 provider: P,
177 ) -> (
178 Application<B>,
179 crate::marshal::ingress::mailbox::Mailbox<S, B>,
180 u64,
181 ) {
182 let config = Config {
183 provider,
184 epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
185 mailbox_size: 100,
186 namespace: NAMESPACE.to_vec(),
187 view_retention_timeout: ViewDelta::new(10),
188 max_repair: NZUsize!(10),
189 block_codec_config: (),
190 partition_prefix: format!("validator-{}", validator.clone()),
191 prunable_items_per_section: NZU64!(10),
192 replay_buffer: NZUsize!(1024),
193 write_buffer: NZUsize!(1024),
194 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
195 };
196
197 let mut control = oracle.control(validator.clone());
199 let backfill = control.register(1, TEST_QUOTA).await.unwrap();
200 let resolver_cfg = resolver::Config {
201 public_key: validator.clone(),
202 manager: oracle.manager(),
203 blocker: control.clone(),
204 mailbox_size: config.mailbox_size,
205 initial: Duration::from_secs(1),
206 timeout: Duration::from_secs(2),
207 fetch_retry_timeout: Duration::from_millis(100),
208 priority_requests: false,
209 priority_responses: false,
210 };
211 let resolver = resolver::init(&context, resolver_cfg, backfill);
212
213 let broadcast_config = buffered::Config {
215 public_key: validator.clone(),
216 mailbox_size: config.mailbox_size,
217 deque_size: 10,
218 priority: false,
219 codec_config: (),
220 };
221 let (broadcast_engine, buffer) = buffered::Engine::new(context.clone(), broadcast_config);
222 let network = control.register(2, TEST_QUOTA).await.unwrap();
223 broadcast_engine.start(network);
224
225 let start = Instant::now();
227 let finalizations_by_height = immutable::Archive::init(
228 context.with_label("finalizations_by_height"),
229 immutable::Config {
230 metadata_partition: format!(
231 "{}-finalizations-by-height-metadata",
232 config.partition_prefix
233 ),
234 freezer_table_partition: format!(
235 "{}-finalizations-by-height-freezer-table",
236 config.partition_prefix
237 ),
238 freezer_table_initial_size: 64,
239 freezer_table_resize_frequency: 10,
240 freezer_table_resize_chunk_size: 10,
241 freezer_journal_partition: format!(
242 "{}-finalizations-by-height-freezer-journal",
243 config.partition_prefix
244 ),
245 freezer_journal_target_size: 1024,
246 freezer_journal_compression: None,
247 freezer_journal_buffer_pool: config.buffer_pool.clone(),
248 ordinal_partition: format!(
249 "{}-finalizations-by-height-ordinal",
250 config.partition_prefix
251 ),
252 items_per_section: NZU64!(10),
253 codec_config: S::certificate_codec_config_unbounded(),
254 replay_buffer: config.replay_buffer,
255 write_buffer: config.write_buffer,
256 },
257 )
258 .await
259 .expect("failed to initialize finalizations by height archive");
260 info!(elapsed = ?start.elapsed(), "restored finalizations by height archive");
261
262 let start = Instant::now();
264 let finalized_blocks = immutable::Archive::init(
265 context.with_label("finalized_blocks"),
266 immutable::Config {
267 metadata_partition: format!(
268 "{}-finalized_blocks-metadata",
269 config.partition_prefix
270 ),
271 freezer_table_partition: format!(
272 "{}-finalized_blocks-freezer-table",
273 config.partition_prefix
274 ),
275 freezer_table_initial_size: 64,
276 freezer_table_resize_frequency: 10,
277 freezer_table_resize_chunk_size: 10,
278 freezer_journal_partition: format!(
279 "{}-finalized_blocks-freezer-journal",
280 config.partition_prefix
281 ),
282 freezer_journal_target_size: 1024,
283 freezer_journal_compression: None,
284 freezer_journal_buffer_pool: config.buffer_pool.clone(),
285 ordinal_partition: format!("{}-finalized_blocks-ordinal", config.partition_prefix),
286 items_per_section: NZU64!(10),
287 codec_config: config.block_codec_config,
288 replay_buffer: config.replay_buffer,
289 write_buffer: config.write_buffer,
290 },
291 )
292 .await
293 .expect("failed to initialize finalized blocks archive");
294 info!(elapsed = ?start.elapsed(), "restored finalized blocks archive");
295
296 let (actor, mailbox, processed_height) = actor::Actor::init(
297 context.clone(),
298 finalizations_by_height,
299 finalized_blocks,
300 config,
301 )
302 .await;
303 let application = Application::<B>::default();
304
305 actor.start(application.clone(), buffer, resolver);
307
308 (application, mailbox, processed_height)
309 }
310
311 fn make_finalization(proposal: Proposal<D>, schemes: &[S], quorum: u32) -> Finalization<S, D> {
312 let finalizes: Vec<_> = schemes
314 .iter()
315 .take(quorum as usize)
316 .map(|scheme| Finalize::sign(scheme, NAMESPACE, proposal.clone()).unwrap())
317 .collect();
318
319 Finalization::from_finalizes(&schemes[0], &finalizes).unwrap()
321 }
322
323 fn make_notarization(proposal: Proposal<D>, schemes: &[S], quorum: u32) -> Notarization<S, D> {
324 let notarizes: Vec<_> = schemes
326 .iter()
327 .take(quorum as usize)
328 .map(|scheme| Notarize::sign(scheme, NAMESPACE, proposal.clone()).unwrap())
329 .collect();
330
331 Notarization::from_notarizes(&schemes[0], ¬arizes).unwrap()
333 }
334
335 fn setup_network(
336 context: deterministic::Context,
337 tracked_peer_sets: Option<usize>,
338 ) -> Oracle<K, deterministic::Context> {
339 let (network, oracle) = Network::new(
340 context.with_label("network"),
341 simulated::Config {
342 max_size: 1024 * 1024,
343 disconnect_on_block: true,
344 tracked_peer_sets,
345 },
346 );
347 network.start();
348 oracle
349 }
350
351 async fn setup_network_links(
352 oracle: &mut Oracle<K, deterministic::Context>,
353 peers: &[K],
354 link: Link,
355 ) {
356 for p1 in peers.iter() {
357 for p2 in peers.iter() {
358 if p2 == p1 {
359 continue;
360 }
361 let _ = oracle.add_link(p1.clone(), p2.clone(), link.clone()).await;
362 }
363 }
364 }
365
366 #[test_traced("WARN")]
367 fn test_finalize_good_links() {
368 for seed in 0..5 {
369 let result1 = finalize(seed, LINK, false);
370 let result2 = finalize(seed, LINK, false);
371
372 assert_eq!(result1, result2);
374 }
375 }
376
377 #[test_traced("WARN")]
378 fn test_finalize_bad_links() {
379 for seed in 0..5 {
380 let result1 = finalize(seed, UNRELIABLE_LINK, false);
381 let result2 = finalize(seed, UNRELIABLE_LINK, false);
382
383 assert_eq!(result1, result2);
385 }
386 }
387
388 #[test_traced("WARN")]
389 fn test_finalize_good_links_quorum_sees_finalization() {
390 for seed in 0..5 {
391 let result1 = finalize(seed, LINK, true);
392 let result2 = finalize(seed, LINK, true);
393
394 assert_eq!(result1, result2);
396 }
397 }
398
399 #[test_traced("DEBUG")]
400 fn test_finalize_bad_links_quorum_sees_finalization() {
401 for seed in 0..5 {
402 let result1 = finalize(seed, UNRELIABLE_LINK, true);
403 let result2 = finalize(seed, UNRELIABLE_LINK, true);
404
405 assert_eq!(result1, result2);
407 }
408 }
409
410 fn finalize(seed: u64, link: Link, quorum_sees_finalization: bool) -> String {
411 let runner = deterministic::Runner::new(
412 deterministic::Config::new()
413 .with_seed(seed)
414 .with_timeout(Some(Duration::from_secs(600))),
415 );
416 runner.start(|mut context| async move {
417 let mut oracle = setup_network(context.clone(), Some(3));
418 let Fixture {
419 participants,
420 schemes,
421 ..
422 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
423
424 let mut applications = BTreeMap::new();
426 let mut actors = Vec::new();
427
428 let mut manager = oracle.manager();
430 manager
431 .update(0, participants.clone().try_into().unwrap())
432 .await;
433 for (i, validator) in participants.iter().enumerate() {
434 let (application, actor, _processed_height) = setup_validator(
435 context.with_label(&format!("validator_{i}")),
436 &mut oracle,
437 validator.clone(),
438 ConstantProvider::new(schemes[i].clone()),
439 )
440 .await;
441 applications.insert(validator.clone(), application);
442 actors.push(actor);
443 }
444
445 setup_network_links(&mut oracle, &participants, link.clone()).await;
447
448 let mut blocks = Vec::<B>::new();
450 let mut parent = Sha256::hash(b"");
451 for i in 1..=NUM_BLOCKS {
452 let block = B::new::<Sha256>(parent, i, i);
453 parent = block.digest();
454 blocks.push(block);
455 }
456
457 let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
459 blocks.shuffle(&mut context);
460 for block in blocks.iter() {
461 let height = block.height();
463 assert!(height > 0, "genesis block should not have been generated");
464
465 let bounds = epocher.containing(height).unwrap();
467 let round = Round::new(bounds.epoch(), View::new(height));
468
469 let actor_index: usize = (height % (NUM_VALIDATORS as u64)) as usize;
471 let mut actor = actors[actor_index].clone();
472 actor.proposed(round, block.clone()).await;
473 actor.verified(round, block.clone()).await;
474
475 context.sleep(link.latency).await;
478
479 let proposal = Proposal {
481 round,
482 parent: View::new(height.checked_sub(1).unwrap()),
483 payload: block.digest(),
484 };
485 let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
486 actor
487 .report(Activity::Notarization(notarization.clone()))
488 .await;
489
490 let fin = make_finalization(proposal, &schemes, QUORUM);
493 if quorum_sees_finalization {
494 let do_finalize = context.gen_bool(0.2);
497 for (i, actor) in actors
498 .iter_mut()
499 .choose_multiple(&mut context, NUM_VALIDATORS as usize)
500 .iter_mut()
501 .enumerate()
502 {
503 if (do_finalize && i < QUORUM as usize)
504 || height == NUM_BLOCKS
505 || height == bounds.last()
506 {
507 actor.report(Activity::Finalization(fin.clone())).await;
508 }
509 }
510 } else {
511 for actor in actors.iter_mut() {
514 if context.gen_bool(0.2) || height == NUM_BLOCKS || height == bounds.last()
515 {
516 actor.report(Activity::Finalization(fin.clone())).await;
517 }
518 }
519 }
520 }
521
522 let mut finished = false;
524 while !finished {
525 context.sleep(Duration::from_secs(1)).await;
527
528 if applications.len() != NUM_VALIDATORS as usize {
530 continue;
531 }
532 finished = true;
533 for app in applications.values() {
534 if app.blocks().len() != NUM_BLOCKS as usize {
535 finished = false;
536 break;
537 }
538 let Some((height, _)) = app.tip() else {
539 finished = false;
540 break;
541 };
542 if height < NUM_BLOCKS {
543 finished = false;
544 break;
545 }
546 }
547 }
548
549 context.auditor().state()
551 })
552 }
553
554 #[test_traced("WARN")]
555 fn test_sync_height_floor() {
556 let runner = deterministic::Runner::new(
557 deterministic::Config::new()
558 .with_seed(0xFF)
559 .with_timeout(Some(Duration::from_secs(300))),
560 );
561 runner.start(|mut context| async move {
562 let mut oracle = setup_network(context.clone(), Some(3));
563 let Fixture {
564 participants,
565 schemes,
566 ..
567 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
568
569 let mut applications = BTreeMap::new();
571 let mut actors = Vec::new();
572
573 let mut manager = oracle.manager();
575 manager
576 .update(0, participants.clone().try_into().unwrap())
577 .await;
578 for (i, validator) in participants.iter().enumerate().skip(1) {
579 let (application, actor, _processed_height) = setup_validator(
580 context.with_label(&format!("validator_{i}")),
581 &mut oracle,
582 validator.clone(),
583 ConstantProvider::new(schemes[i].clone()),
584 )
585 .await;
586 applications.insert(validator.clone(), application);
587 actors.push(actor);
588 }
589
590 setup_network_links(&mut oracle, &participants[1..], LINK).await;
593
594 let mut blocks = Vec::<B>::new();
596 let mut parent = Sha256::hash(b"");
597 for i in 1..=NUM_BLOCKS {
598 let block = B::new::<Sha256>(parent, i, i);
599 parent = block.digest();
600 blocks.push(block);
601 }
602
603 let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
605 for block in blocks.iter() {
606 let height = block.height();
608 assert!(height > 0, "genesis block should not have been generated");
609
610 let bounds = epocher.containing(height).unwrap();
612 let round = Round::new(bounds.epoch(), View::new(height));
613
614 let actor_index: usize = (height % (applications.len() as u64)) as usize;
616 let mut actor = actors[actor_index].clone();
617 actor.proposed(round, block.clone()).await;
618 actor.verified(round, block.clone()).await;
619
620 context.sleep(LINK.latency).await;
623
624 let proposal = Proposal {
626 round,
627 parent: View::new(height.checked_sub(1).unwrap()),
628 payload: block.digest(),
629 };
630 let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
631 actor
632 .report(Activity::Notarization(notarization.clone()))
633 .await;
634
635 let fin = make_finalization(proposal, &schemes, QUORUM);
637 for actor in actors.iter_mut() {
638 actor.report(Activity::Finalization(fin.clone())).await;
639 }
640 }
641
642 let mut finished = false;
644 while !finished {
645 context.sleep(Duration::from_secs(1)).await;
647
648 finished = true;
650 for app in applications.values().skip(1) {
651 if app.blocks().len() != NUM_BLOCKS as usize {
652 finished = false;
653 break;
654 }
655 let Some((height, _)) = app.tip() else {
656 finished = false;
657 break;
658 };
659 if height < NUM_BLOCKS {
660 finished = false;
661 break;
662 }
663 }
664 }
665
666 let validator = participants.first().unwrap();
668 let (app, mut actor, _processed_height) = setup_validator(
669 context.with_label("validator_0"),
670 &mut oracle,
671 validator.clone(),
672 ConstantProvider::new(schemes[0].clone()),
673 )
674 .await;
675
676 setup_network_links(&mut oracle, &participants, LINK).await;
678
679 const NEW_SYNC_FLOOR: u64 = 100;
680 let second_actor = &mut actors[1];
681 let latest_finalization = second_actor.get_finalization(NUM_BLOCKS).await.unwrap();
682
683 actor.set_floor(NEW_SYNC_FLOOR).await;
685
686 actor
689 .report(Activity::Finalization(latest_finalization))
690 .await;
691
692 let mut finished = false;
694 while !finished {
695 context.sleep(Duration::from_secs(1)).await;
697
698 finished = true;
699 if app.blocks().len() != (NUM_BLOCKS - NEW_SYNC_FLOOR) as usize {
700 finished = false;
701 continue;
702 }
703 let Some((height, _)) = app.tip() else {
704 finished = false;
705 continue;
706 };
707 if height < NUM_BLOCKS {
708 finished = false;
709 continue;
710 }
711 }
712
713 for height in 1..=NUM_BLOCKS {
715 let block = actor.get_block(Identifier::Height(height)).await;
716 if height <= NEW_SYNC_FLOOR {
717 assert!(block.is_none());
718 } else {
719 assert_eq!(block.unwrap().height(), height);
720 }
721 }
722 })
723 }
724
725 #[test_traced("WARN")]
726 fn test_subscribe_basic_block_delivery() {
727 let runner = deterministic::Runner::timed(Duration::from_secs(60));
728 runner.start(|mut context| async move {
729 let mut oracle = setup_network(context.clone(), None);
730 let Fixture {
731 participants,
732 schemes,
733 ..
734 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
735
736 let mut actors = Vec::new();
737 for (i, validator) in participants.iter().enumerate() {
738 let (_application, actor, _processed_height) = setup_validator(
739 context.with_label(&format!("validator_{i}")),
740 &mut oracle,
741 validator.clone(),
742 ConstantProvider::new(schemes[i].clone()),
743 )
744 .await;
745 actors.push(actor);
746 }
747 let mut actor = actors[0].clone();
748
749 setup_network_links(&mut oracle, &participants, LINK).await;
750
751 let parent = Sha256::hash(b"");
752 let block = B::new::<Sha256>(parent, 1, 1);
753 let commitment = block.digest();
754
755 let subscription_rx = actor
756 .subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment)
757 .await;
758
759 actor
760 .verified(Round::new(Epoch::new(0), View::new(1)), block.clone())
761 .await;
762
763 let proposal = Proposal {
764 round: Round::new(Epoch::new(0), View::new(1)),
765 parent: View::new(0),
766 payload: commitment,
767 };
768 let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
769 actor.report(Activity::Notarization(notarization)).await;
770
771 let finalization = make_finalization(proposal, &schemes, QUORUM);
772 actor.report(Activity::Finalization(finalization)).await;
773
774 let received_block = subscription_rx.await.unwrap();
775 assert_eq!(received_block.digest(), block.digest());
776 assert_eq!(received_block.height(), 1);
777 })
778 }
779
780 #[test_traced("WARN")]
781 fn test_subscribe_multiple_subscriptions() {
782 let runner = deterministic::Runner::timed(Duration::from_secs(60));
783 runner.start(|mut context| async move {
784 let mut oracle = setup_network(context.clone(), None);
785 let Fixture {
786 participants,
787 schemes,
788 ..
789 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
790
791 let mut actors = Vec::new();
792 for (i, validator) in participants.iter().enumerate() {
793 let (_application, actor, _processed_height) = setup_validator(
794 context.with_label(&format!("validator_{i}")),
795 &mut oracle,
796 validator.clone(),
797 ConstantProvider::new(schemes[i].clone()),
798 )
799 .await;
800 actors.push(actor);
801 }
802 let mut actor = actors[0].clone();
803
804 setup_network_links(&mut oracle, &participants, LINK).await;
805
806 let parent = Sha256::hash(b"");
807 let block1 = B::new::<Sha256>(parent, 1, 1);
808 let block2 = B::new::<Sha256>(block1.digest(), 2, 2);
809 let commitment1 = block1.digest();
810 let commitment2 = block2.digest();
811
812 let sub1_rx = actor
813 .subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment1)
814 .await;
815 let sub2_rx = actor
816 .subscribe(Some(Round::new(Epoch::new(0), View::new(2))), commitment2)
817 .await;
818 let sub3_rx = actor
819 .subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment1)
820 .await;
821
822 actor
823 .verified(Round::new(Epoch::new(0), View::new(1)), block1.clone())
824 .await;
825 actor
826 .verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
827 .await;
828
829 for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
830 let view = View::new(view);
831 let proposal = Proposal {
832 round: Round::new(Epoch::zero(), view),
833 parent: view.previous().unwrap(),
834 payload: block.digest(),
835 };
836 let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
837 actor.report(Activity::Notarization(notarization)).await;
838
839 let finalization = make_finalization(proposal, &schemes, QUORUM);
840 actor.report(Activity::Finalization(finalization)).await;
841 }
842
843 let received1_sub1 = sub1_rx.await.unwrap();
844 let received2 = sub2_rx.await.unwrap();
845 let received1_sub3 = sub3_rx.await.unwrap();
846
847 assert_eq!(received1_sub1.digest(), block1.digest());
848 assert_eq!(received2.digest(), block2.digest());
849 assert_eq!(received1_sub3.digest(), block1.digest());
850 assert_eq!(received1_sub1.height(), 1);
851 assert_eq!(received2.height(), 2);
852 assert_eq!(received1_sub3.height(), 1);
853 })
854 }
855
856 #[test_traced("WARN")]
857 fn test_subscribe_canceled_subscriptions() {
858 let runner = deterministic::Runner::timed(Duration::from_secs(60));
859 runner.start(|mut context| async move {
860 let mut oracle = setup_network(context.clone(), None);
861 let Fixture {
862 participants,
863 schemes,
864 ..
865 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
866
867 let mut actors = Vec::new();
868 for (i, validator) in participants.iter().enumerate() {
869 let (_application, actor, _processed_height) = setup_validator(
870 context.with_label(&format!("validator_{i}")),
871 &mut oracle,
872 validator.clone(),
873 ConstantProvider::new(schemes[i].clone()),
874 )
875 .await;
876 actors.push(actor);
877 }
878 let mut actor = actors[0].clone();
879
880 setup_network_links(&mut oracle, &participants, LINK).await;
881
882 let parent = Sha256::hash(b"");
883 let block1 = B::new::<Sha256>(parent, 1, 1);
884 let block2 = B::new::<Sha256>(block1.digest(), 2, 2);
885 let commitment1 = block1.digest();
886 let commitment2 = block2.digest();
887
888 let sub1_rx = actor
889 .subscribe(Some(Round::new(Epoch::new(0), View::new(1))), commitment1)
890 .await;
891 let sub2_rx = actor
892 .subscribe(Some(Round::new(Epoch::new(0), View::new(2))), commitment2)
893 .await;
894
895 drop(sub1_rx);
896
897 actor
898 .verified(Round::new(Epoch::new(0), View::new(1)), block1.clone())
899 .await;
900 actor
901 .verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
902 .await;
903
904 for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
905 let view = View::new(view);
906 let proposal = Proposal {
907 round: Round::new(Epoch::zero(), view),
908 parent: view.previous().unwrap(),
909 payload: block.digest(),
910 };
911 let notarization = make_notarization(proposal.clone(), &schemes, QUORUM);
912 actor.report(Activity::Notarization(notarization)).await;
913
914 let finalization = make_finalization(proposal, &schemes, QUORUM);
915 actor.report(Activity::Finalization(finalization)).await;
916 }
917
918 let received2 = sub2_rx.await.unwrap();
919 assert_eq!(received2.digest(), block2.digest());
920 assert_eq!(received2.height(), 2);
921 })
922 }
923
924 #[test_traced("WARN")]
925 fn test_subscribe_blocks_from_different_sources() {
926 let runner = deterministic::Runner::timed(Duration::from_secs(60));
927 runner.start(|mut context| async move {
928 let mut oracle = setup_network(context.clone(), None);
929 let Fixture {
930 participants,
931 schemes,
932 ..
933 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
934
935 let mut actors = Vec::new();
936 for (i, validator) in participants.iter().enumerate() {
937 let (_application, actor, _processed_height) = setup_validator(
938 context.with_label(&format!("validator_{i}")),
939 &mut oracle,
940 validator.clone(),
941 ConstantProvider::new(schemes[i].clone()),
942 )
943 .await;
944 actors.push(actor);
945 }
946 let mut actor = actors[0].clone();
947
948 setup_network_links(&mut oracle, &participants, LINK).await;
949
950 let parent = Sha256::hash(b"");
951 let block1 = B::new::<Sha256>(parent, 1, 1);
952 let block2 = B::new::<Sha256>(block1.digest(), 2, 2);
953 let block3 = B::new::<Sha256>(block2.digest(), 3, 3);
954 let block4 = B::new::<Sha256>(block3.digest(), 4, 4);
955 let block5 = B::new::<Sha256>(block4.digest(), 5, 5);
956
957 let sub1_rx = actor.subscribe(None, block1.digest()).await;
958 let sub2_rx = actor.subscribe(None, block2.digest()).await;
959 let sub3_rx = actor.subscribe(None, block3.digest()).await;
960 let sub4_rx = actor.subscribe(None, block4.digest()).await;
961 let sub5_rx = actor.subscribe(None, block5.digest()).await;
962
963 actor
965 .proposed(Round::new(Epoch::zero(), View::new(1)), block1.clone())
966 .await;
967 context.sleep(Duration::from_millis(20)).await;
968
969 let received1 = sub1_rx.await.unwrap();
971 assert_eq!(received1.digest(), block1.digest());
972 assert_eq!(received1.height(), 1);
973
974 actor
976 .verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
977 .await;
978
979 let received2 = sub2_rx.await.unwrap();
981 assert_eq!(received2.digest(), block2.digest());
982 assert_eq!(received2.height(), 2);
983
984 let proposal3 = Proposal {
986 round: Round::new(Epoch::new(0), View::new(3)),
987 parent: View::new(2),
988 payload: block3.digest(),
989 };
990 let notarization3 = make_notarization(proposal3.clone(), &schemes, QUORUM);
991 actor.report(Activity::Notarization(notarization3)).await;
992 actor
993 .verified(Round::new(Epoch::new(0), View::new(3)), block3.clone())
994 .await;
995
996 let received3 = sub3_rx.await.unwrap();
998 assert_eq!(received3.digest(), block3.digest());
999 assert_eq!(received3.height(), 3);
1000
1001 let finalization4 = make_finalization(
1003 Proposal {
1004 round: Round::new(Epoch::new(0), View::new(4)),
1005 parent: View::new(3),
1006 payload: block4.digest(),
1007 },
1008 &schemes,
1009 QUORUM,
1010 );
1011 actor.report(Activity::Finalization(finalization4)).await;
1012 actor
1013 .verified(Round::new(Epoch::new(0), View::new(4)), block4.clone())
1014 .await;
1015
1016 let received4 = sub4_rx.await.unwrap();
1018 assert_eq!(received4.digest(), block4.digest());
1019 assert_eq!(received4.height(), 4);
1020
1021 let remote_actor = &mut actors[1].clone();
1023 remote_actor
1024 .proposed(Round::new(Epoch::zero(), View::new(5)), block5.clone())
1025 .await;
1026 context.sleep(Duration::from_millis(20)).await;
1027
1028 let received5 = sub5_rx.await.unwrap();
1030 assert_eq!(received5.digest(), block5.digest());
1031 assert_eq!(received5.height(), 5);
1032 })
1033 }
1034
1035 #[test_traced("WARN")]
1036 fn test_get_info_basic_queries_present_and_missing() {
1037 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1038 runner.start(|mut context| async move {
1039 let mut oracle = setup_network(context.clone(), None);
1040 let Fixture {
1041 participants,
1042 schemes,
1043 ..
1044 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1045
1046 let me = participants[0].clone();
1048 let (_application, mut actor, _processed_height) = setup_validator(
1049 context.with_label("validator_0"),
1050 &mut oracle,
1051 me,
1052 ConstantProvider::new(schemes[0].clone()),
1053 )
1054 .await;
1055
1056 assert!(actor.get_info(Identifier::Latest).await.is_none());
1058
1059 assert!(actor.get_info(1).await.is_none());
1061
1062 let parent = Sha256::hash(b"");
1064 let block = B::new::<Sha256>(parent, 1, 1);
1065 let digest = block.digest();
1066 let round = Round::new(Epoch::new(0), View::new(1));
1067 actor.verified(round, block.clone()).await;
1068
1069 let proposal = Proposal {
1070 round,
1071 parent: View::new(0),
1072 payload: digest,
1073 };
1074 let finalization = make_finalization(proposal, &schemes, QUORUM);
1075 actor.report(Activity::Finalization(finalization)).await;
1076
1077 assert_eq!(actor.get_info(Identifier::Latest).await, Some((1, digest)));
1079
1080 assert_eq!(actor.get_info(1).await, Some((1, digest)));
1082
1083 assert_eq!(actor.get_info(&digest).await, Some((1, digest)));
1085
1086 assert!(actor.get_info(2).await.is_none());
1088
1089 let missing = Sha256::hash(b"missing");
1091 assert!(actor.get_info(&missing).await.is_none());
1092 })
1093 }
1094
1095 #[test_traced("WARN")]
1096 fn test_get_info_latest_progression_multiple_finalizations() {
1097 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1098 runner.start(|mut context| async move {
1099 let mut oracle = setup_network(context.clone(), None);
1100 let Fixture {
1101 participants,
1102 schemes,
1103 ..
1104 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1105
1106 let me = participants[0].clone();
1108 let (_application, mut actor, _processed_height) = setup_validator(
1109 context.with_label("validator_0"),
1110 &mut oracle,
1111 me,
1112 ConstantProvider::new(schemes[0].clone()),
1113 )
1114 .await;
1115
1116 assert!(actor.get_info(Identifier::Latest).await.is_none());
1118
1119 let parent0 = Sha256::hash(b"");
1121 let block1 = B::new::<Sha256>(parent0, 1, 1);
1122 let d1 = block1.digest();
1123 actor
1124 .verified(Round::new(Epoch::new(0), View::new(1)), block1.clone())
1125 .await;
1126 let f1 = make_finalization(
1127 Proposal {
1128 round: Round::new(Epoch::new(0), View::new(1)),
1129 parent: View::new(0),
1130 payload: d1,
1131 },
1132 &schemes,
1133 QUORUM,
1134 );
1135 actor.report(Activity::Finalization(f1)).await;
1136 let latest = actor.get_info(Identifier::Latest).await;
1137 assert_eq!(latest, Some((1, d1)));
1138
1139 let block2 = B::new::<Sha256>(d1, 2, 2);
1140 let d2 = block2.digest();
1141 actor
1142 .verified(Round::new(Epoch::new(0), View::new(2)), block2.clone())
1143 .await;
1144 let f2 = make_finalization(
1145 Proposal {
1146 round: Round::new(Epoch::new(0), View::new(2)),
1147 parent: View::new(1),
1148 payload: d2,
1149 },
1150 &schemes,
1151 QUORUM,
1152 );
1153 actor.report(Activity::Finalization(f2)).await;
1154 let latest = actor.get_info(Identifier::Latest).await;
1155 assert_eq!(latest, Some((2, d2)));
1156
1157 let block3 = B::new::<Sha256>(d2, 3, 3);
1158 let d3 = block3.digest();
1159 actor
1160 .verified(Round::new(Epoch::new(0), View::new(3)), block3.clone())
1161 .await;
1162 let f3 = make_finalization(
1163 Proposal {
1164 round: Round::new(Epoch::new(0), View::new(3)),
1165 parent: View::new(2),
1166 payload: d3,
1167 },
1168 &schemes,
1169 QUORUM,
1170 );
1171 actor.report(Activity::Finalization(f3)).await;
1172 let latest = actor.get_info(Identifier::Latest).await;
1173 assert_eq!(latest, Some((3, d3)));
1174 })
1175 }
1176
1177 #[test_traced("WARN")]
1178 fn test_get_block_by_height_and_latest() {
1179 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1180 runner.start(|mut context| async move {
1181 let mut oracle = setup_network(context.clone(), None);
1182 let Fixture {
1183 participants,
1184 schemes,
1185 ..
1186 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1187
1188 let me = participants[0].clone();
1189 let (application, mut actor, _processed_height) = setup_validator(
1190 context.with_label("validator_0"),
1191 &mut oracle,
1192 me,
1193 ConstantProvider::new(schemes[0].clone()),
1194 )
1195 .await;
1196
1197 let latest_block = actor.get_block(Identifier::Latest).await;
1199 assert!(latest_block.is_none());
1200 assert!(application.tip().is_none());
1201
1202 let parent = Sha256::hash(b"");
1204 let block = B::new::<Sha256>(parent, 1, 1);
1205 let commitment = block.digest();
1206 let round = Round::new(Epoch::new(0), View::new(1));
1207 actor.verified(round, block.clone()).await;
1208 let proposal = Proposal {
1209 round,
1210 parent: View::new(0),
1211 payload: commitment,
1212 };
1213 let finalization = make_finalization(proposal, &schemes, QUORUM);
1214 actor.report(Activity::Finalization(finalization)).await;
1215
1216 let by_height = actor.get_block(1).await.expect("missing block by height");
1218 assert_eq!(by_height.height(), 1);
1219 assert_eq!(by_height.digest(), commitment);
1220 assert_eq!(application.tip(), Some((1, commitment)));
1221
1222 let by_latest = actor
1224 .get_block(Identifier::Latest)
1225 .await
1226 .expect("missing block by latest");
1227 assert_eq!(by_latest.height(), 1);
1228 assert_eq!(by_latest.digest(), commitment);
1229
1230 let by_height = actor.get_block(2).await;
1232 assert!(by_height.is_none());
1233 })
1234 }
1235
1236 #[test_traced("WARN")]
1237 fn test_get_block_by_commitment_from_sources_and_missing() {
1238 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1239 runner.start(|mut context| async move {
1240 let mut oracle = setup_network(context.clone(), None);
1241 let Fixture {
1242 participants,
1243 schemes,
1244 ..
1245 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1246
1247 let me = participants[0].clone();
1248 let (_application, mut actor, _processed_height) = setup_validator(
1249 context.with_label("validator_0"),
1250 &mut oracle,
1251 me,
1252 ConstantProvider::new(schemes[0].clone()),
1253 )
1254 .await;
1255
1256 let parent = Sha256::hash(b"");
1258 let ver_block = B::new::<Sha256>(parent, 1, 1);
1259 let ver_commitment = ver_block.digest();
1260 let round1 = Round::new(Epoch::new(0), View::new(1));
1261 actor.verified(round1, ver_block.clone()).await;
1262 let got = actor
1263 .get_block(&ver_commitment)
1264 .await
1265 .expect("missing block from cache");
1266 assert_eq!(got.digest(), ver_commitment);
1267
1268 let fin_block = B::new::<Sha256>(ver_commitment, 2, 2);
1270 let fin_commitment = fin_block.digest();
1271 let round2 = Round::new(Epoch::new(0), View::new(2));
1272 actor.verified(round2, fin_block.clone()).await;
1273 let proposal = Proposal {
1274 round: round2,
1275 parent: View::new(1),
1276 payload: fin_commitment,
1277 };
1278 let finalization = make_finalization(proposal, &schemes, QUORUM);
1279 actor.report(Activity::Finalization(finalization)).await;
1280 let got = actor
1281 .get_block(&fin_commitment)
1282 .await
1283 .expect("missing block from finalized archive");
1284 assert_eq!(got.digest(), fin_commitment);
1285 assert_eq!(got.height(), 2);
1286
1287 let missing = Sha256::hash(b"definitely-missing");
1289 let missing_block = actor.get_block(&missing).await;
1290 assert!(missing_block.is_none());
1291 })
1292 }
1293
1294 #[test_traced("WARN")]
1295 fn test_get_finalization_by_height() {
1296 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1297 runner.start(|mut context| async move {
1298 let mut oracle = setup_network(context.clone(), None);
1299 let Fixture {
1300 participants,
1301 schemes,
1302 ..
1303 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1304
1305 let me = participants[0].clone();
1306 let (_application, mut actor, _processed_height) = setup_validator(
1307 context.with_label("validator_0"),
1308 &mut oracle,
1309 me,
1310 ConstantProvider::new(schemes[0].clone()),
1311 )
1312 .await;
1313
1314 let finalization = actor.get_finalization(1).await;
1316 assert!(finalization.is_none());
1317
1318 let parent = Sha256::hash(b"");
1320 let block = B::new::<Sha256>(parent, 1, 1);
1321 let commitment = block.digest();
1322 let round = Round::new(Epoch::new(0), View::new(1));
1323 actor.verified(round, block.clone()).await;
1324 let proposal = Proposal {
1325 round,
1326 parent: View::new(0),
1327 payload: commitment,
1328 };
1329 let finalization = make_finalization(proposal, &schemes, QUORUM);
1330 actor.report(Activity::Finalization(finalization)).await;
1331
1332 let finalization = actor
1334 .get_finalization(1)
1335 .await
1336 .expect("missing finalization by height");
1337 assert_eq!(finalization.proposal.parent, View::new(0));
1338 assert_eq!(
1339 finalization.proposal.round,
1340 Round::new(Epoch::new(0), View::new(1))
1341 );
1342 assert_eq!(finalization.proposal.payload, commitment);
1343
1344 assert!(actor.get_finalization(2).await.is_none());
1345 })
1346 }
1347
1348 #[test_traced("WARN")]
1349 fn test_hint_finalized_triggers_fetch() {
1350 let runner = deterministic::Runner::new(
1351 deterministic::Config::new()
1352 .with_seed(42)
1353 .with_timeout(Some(Duration::from_secs(60))),
1354 );
1355 runner.start(|mut context| async move {
1356 let mut oracle = setup_network(context.clone(), Some(3));
1357 let Fixture {
1358 participants,
1359 schemes,
1360 ..
1361 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1362
1363 let mut manager = oracle.manager();
1365 manager
1366 .update(0, participants.clone().try_into().unwrap())
1367 .await;
1368
1369 let (app0, mut actor0, _) = setup_validator(
1371 context.with_label("validator_0"),
1372 &mut oracle,
1373 participants[0].clone(),
1374 ConstantProvider::new(schemes[0].clone()),
1375 )
1376 .await;
1377 let (_app1, mut actor1, _) = setup_validator(
1378 context.with_label("validator_1"),
1379 &mut oracle,
1380 participants[1].clone(),
1381 ConstantProvider::new(schemes[1].clone()),
1382 )
1383 .await;
1384
1385 setup_network_links(&mut oracle, &participants[..2], LINK).await;
1387
1388 let mut parent = Sha256::hash(b"");
1390 for i in 1..=5u64 {
1391 let block = B::new::<Sha256>(parent, i, i);
1392 let commitment = block.digest();
1393 let round = Round::new(Epoch::new(0), View::new(i));
1394
1395 actor0.verified(round, block.clone()).await;
1396 let proposal = Proposal {
1397 round,
1398 parent: View::new(i - 1),
1399 payload: commitment,
1400 };
1401 let finalization = make_finalization(proposal, &schemes, QUORUM);
1402 actor0.report(Activity::Finalization(finalization)).await;
1403
1404 parent = commitment;
1405 }
1406
1407 while app0.blocks().len() < 5 {
1409 context.sleep(Duration::from_millis(10)).await;
1410 }
1411
1412 assert!(actor1.get_finalization(5).await.is_none());
1414
1415 actor1
1417 .hint_finalized(5, NonEmptyVec::new(participants[0].clone()))
1418 .await;
1419
1420 while actor1.get_finalization(5).await.is_none() {
1422 context.sleep(Duration::from_millis(10)).await;
1423 }
1424
1425 let finalization = actor1
1427 .get_finalization(5)
1428 .await
1429 .expect("finalization should be fetched");
1430 assert_eq!(finalization.proposal.round.view(), View::new(5));
1431 })
1432 }
1433
1434 #[test_traced("WARN")]
1435 fn test_ancestry_stream() {
1436 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1437 runner.start(|mut context| async move {
1438 let mut oracle = setup_network(context.clone(), None);
1439 let Fixture {
1440 participants,
1441 schemes,
1442 ..
1443 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1444
1445 let me = participants[0].clone();
1446 let (_application, mut actor, _processed_height) = setup_validator(
1447 context.with_label("validator_0"),
1448 &mut oracle,
1449 me,
1450 ConstantProvider::new(schemes[0].clone()),
1451 )
1452 .await;
1453
1454 let mut parent = Sha256::hash(b"");
1456 for i in 1..=5 {
1457 let block = B::new::<Sha256>(parent, i, i);
1458 let commitment = block.digest();
1459 let round = Round::new(Epoch::new(0), View::new(i));
1460 actor.verified(round, block.clone()).await;
1461 let proposal = Proposal {
1462 round,
1463 parent: View::new(i - 1),
1464 payload: commitment,
1465 };
1466 let finalization = make_finalization(proposal, &schemes, QUORUM);
1467 actor.report(Activity::Finalization(finalization)).await;
1468
1469 parent = block.digest();
1470 }
1471
1472 let (_, commitment) = actor.get_info(Identifier::Latest).await.unwrap();
1474 let ancestry = actor.ancestry((None, commitment)).await.unwrap();
1475 let blocks = ancestry.collect::<Vec<_>>().await;
1476
1477 assert_eq!(blocks.len(), 5);
1479 (0..5).for_each(|i| {
1480 assert_eq!(blocks[i].height(), 5 - i as u64);
1481 });
1482 })
1483 }
1484
1485 #[test_traced("WARN")]
1486 fn test_marshaled_rejects_invalid_ancestry() {
1487 #[derive(Clone)]
1488 struct MockVerifyingApp {
1489 genesis: B,
1490 }
1491
1492 impl crate::Application<deterministic::Context> for MockVerifyingApp {
1493 type Block = B;
1494 type Context = Context<D, K>;
1495 type SigningScheme = S;
1496
1497 async fn genesis(&mut self) -> Self::Block {
1498 self.genesis.clone()
1499 }
1500
1501 async fn propose(
1502 &mut self,
1503 _context: (deterministic::Context, Self::Context),
1504 _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
1505 ) -> Option<Self::Block> {
1506 None
1507 }
1508 }
1509
1510 impl VerifyingApplication<deterministic::Context> for MockVerifyingApp {
1511 async fn verify(
1512 &mut self,
1513 _context: (deterministic::Context, Self::Context),
1514 _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
1515 ) -> bool {
1516 true
1518 }
1519 }
1520
1521 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1522 runner.start(|mut context| async move {
1523 let mut oracle = setup_network(context.clone(), None);
1524 let Fixture {
1525 participants,
1526 schemes,
1527 ..
1528 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1529
1530 let me = participants[0].clone();
1531 let (_base_app, marshal, _processed_height) = setup_validator(
1532 context.with_label("validator_0"),
1533 &mut oracle,
1534 me.clone(),
1535 ConstantProvider::new(schemes[0].clone()),
1536 )
1537 .await;
1538
1539 let genesis = B::new::<Sha256>(Sha256::hash(b""), 0, 0);
1541
1542 let mock_app = MockVerifyingApp {
1544 genesis: genesis.clone(),
1545 };
1546 let mut marshaled = Marshaled::new(
1547 context.clone(),
1548 mock_app,
1549 marshal.clone(),
1550 FixedEpocher::new(BLOCKS_PER_EPOCH),
1551 );
1552
1553 let honest_parent =
1560 B::new::<Sha256>(genesis.commitment(), BLOCKS_PER_EPOCH.get() + 1, 1000);
1561 let parent_commitment = honest_parent.commitment();
1562 let parent_round = Round::new(Epoch::new(1), View::new(21));
1563 marshal
1564 .clone()
1565 .verified(parent_round, honest_parent.clone())
1566 .await;
1567
1568 let malicious_block =
1572 B::new::<Sha256>(parent_commitment, BLOCKS_PER_EPOCH.get() + 15, 2000);
1573 let malicious_commitment = malicious_block.commitment();
1574 marshal
1575 .clone()
1576 .proposed(
1577 Round::new(Epoch::new(1), View::new(35)),
1578 malicious_block.clone(),
1579 )
1580 .await;
1581
1582 context.sleep(Duration::from_millis(10)).await;
1584
1585 let byzantine_context = Context {
1588 round: Round::new(Epoch::new(1), View::new(35)),
1589 leader: me.clone(),
1590 parent: (View::new(21), parent_commitment), };
1592
1593 let verify = marshaled
1600 .verify(byzantine_context, malicious_commitment)
1601 .await;
1602
1603 assert!(
1604 !verify.await.unwrap(),
1605 "Byzantine block with non-contiguous heights should be rejected"
1606 );
1607
1608 let malicious_block =
1612 B::new::<Sha256>(genesis.commitment(), BLOCKS_PER_EPOCH.get() + 2, 3000);
1613 let malicious_commitment = malicious_block.commitment();
1614 marshal
1615 .clone()
1616 .proposed(
1617 Round::new(Epoch::new(1), View::new(22)),
1618 malicious_block.clone(),
1619 )
1620 .await;
1621
1622 context.sleep(Duration::from_millis(10)).await;
1624
1625 let byzantine_context = Context {
1628 round: Round::new(Epoch::new(1), View::new(22)),
1629 leader: me.clone(),
1630 parent: (View::new(21), parent_commitment), };
1632
1633 let verify = marshaled
1641 .verify(byzantine_context, malicious_commitment)
1642 .await;
1643
1644 assert!(
1645 !verify.await.unwrap(),
1646 "Byzantine block with mismatched parent commitment should be rejected"
1647 );
1648 })
1649 }
1650
1651 #[test_traced("WARN")]
1652 fn test_finalize_same_height_different_views() {
1653 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1654 runner.start(|mut context| async move {
1655 let mut oracle = setup_network(context.clone(), None);
1656 let Fixture {
1657 participants,
1658 schemes,
1659 ..
1660 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1661
1662 let mut actors = Vec::new();
1664 for (i, validator) in participants.iter().enumerate().take(2) {
1665 let (_app, actor, _processed_height) = setup_validator(
1666 context.with_label(&format!("validator_{i}")),
1667 &mut oracle,
1668 validator.clone(),
1669 ConstantProvider::new(schemes[i].clone()),
1670 )
1671 .await;
1672 actors.push(actor);
1673 }
1674
1675 let parent = Sha256::hash(b"");
1677 let block = B::new::<Sha256>(parent, 1, 1);
1678 let commitment = block.digest();
1679
1680 actors[0]
1682 .verified(Round::new(Epoch::new(0), View::new(1)), block.clone())
1683 .await;
1684 actors[1]
1685 .verified(Round::new(Epoch::new(0), View::new(1)), block.clone())
1686 .await;
1687
1688 let proposal_v1 = Proposal {
1690 round: Round::new(Epoch::new(0), View::new(1)),
1691 parent: View::new(0),
1692 payload: commitment,
1693 };
1694 let notarization_v1 = make_notarization(proposal_v1.clone(), &schemes, QUORUM);
1695 let finalization_v1 = make_finalization(proposal_v1.clone(), &schemes, QUORUM);
1696 actors[0]
1697 .report(Activity::Notarization(notarization_v1.clone()))
1698 .await;
1699 actors[0]
1700 .report(Activity::Finalization(finalization_v1.clone()))
1701 .await;
1702
1703 let proposal_v2 = Proposal {
1707 round: Round::new(Epoch::new(0), View::new(2)), parent: View::new(0),
1709 payload: commitment, };
1711 let notarization_v2 = make_notarization(proposal_v2.clone(), &schemes, QUORUM);
1712 let finalization_v2 = make_finalization(proposal_v2.clone(), &schemes, QUORUM);
1713 actors[1]
1714 .report(Activity::Notarization(notarization_v2.clone()))
1715 .await;
1716 actors[1]
1717 .report(Activity::Finalization(finalization_v2.clone()))
1718 .await;
1719
1720 context.sleep(Duration::from_millis(100)).await;
1722
1723 let block0 = actors[0].get_block(1).await.unwrap();
1725 let block1 = actors[1].get_block(1).await.unwrap();
1726 assert_eq!(block0, block);
1727 assert_eq!(block1, block);
1728
1729 let fin0 = actors[0].get_finalization(1).await.unwrap();
1731 let fin1 = actors[1].get_finalization(1).await.unwrap();
1732
1733 assert_eq!(fin0.proposal.payload, block.commitment());
1735 assert_eq!(fin0.round().view(), View::new(1));
1736 assert_eq!(fin1.proposal.payload, block.commitment());
1737 assert_eq!(fin1.round().view(), View::new(2));
1738
1739 assert_eq!(actors[0].get_info(1).await, Some((1, commitment)));
1741 assert_eq!(actors[1].get_info(1).await, Some((1, commitment)));
1742
1743 actors[0]
1746 .report(Activity::Finalization(finalization_v2.clone()))
1747 .await;
1748 actors[1]
1749 .report(Activity::Finalization(finalization_v1.clone()))
1750 .await;
1751 context.sleep(Duration::from_millis(100)).await;
1752
1753 let fin0_after = actors[0].get_finalization(1).await.unwrap();
1755 assert_eq!(fin0_after.round().view(), View::new(1));
1756
1757 let fin0_after = actors[1].get_finalization(1).await.unwrap();
1759 assert_eq!(fin0_after.round().view(), View::new(2));
1760 })
1761 }
1762
1763 #[test_traced("WARN")]
1764 fn test_init_processed_height() {
1765 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1766 runner.start(|mut context| async move {
1767 let mut oracle = setup_network(context.clone(), None);
1768 let Fixture {
1769 participants,
1770 schemes,
1771 ..
1772 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1773
1774 let me = participants[0].clone();
1776 let (application, mut actor, initial_height) = setup_validator(
1777 context.with_label("validator_0"),
1778 &mut oracle,
1779 me.clone(),
1780 ConstantProvider::new(schemes[0].clone()),
1781 )
1782 .await;
1783 assert_eq!(initial_height, 0);
1784
1785 let mut parent = Sha256::hash(b"");
1787 let mut blocks = Vec::new();
1788 for i in 1..=3 {
1789 let block = B::new::<Sha256>(parent, i, i);
1790 let commitment = block.digest();
1791 let round = Round::new(Epoch::new(0), View::new(i));
1792
1793 actor.verified(round, block.clone()).await;
1794 let proposal = Proposal {
1795 round,
1796 parent: View::new(i - 1),
1797 payload: commitment,
1798 };
1799 let finalization = make_finalization(proposal, &schemes, QUORUM);
1800 actor.report(Activity::Finalization(finalization)).await;
1801
1802 blocks.push(block);
1803 parent = commitment;
1804 }
1805
1806 while application.blocks().len() < 3 {
1808 context.sleep(Duration::from_millis(10)).await;
1809 }
1810
1811 actor.set_floor(3).await;
1813 context.sleep(Duration::from_millis(10)).await;
1814
1815 assert_eq!(application.blocks().len(), 3);
1817 assert_eq!(application.tip(), Some((3, blocks[2].digest())));
1818
1819 let (_restart_application, _restart_actor, restart_height) = setup_validator(
1821 context.with_label("validator_0_restart"),
1822 &mut oracle,
1823 me,
1824 ConstantProvider::new(schemes[0].clone()),
1825 )
1826 .await;
1827
1828 assert_eq!(restart_height, 3);
1829 })
1830 }
1831
1832 #[test_traced("WARN")]
1833 fn test_marshaled_rejects_unsupported_epoch() {
1834 #[derive(Clone)]
1835 struct MockVerifyingApp {
1836 genesis: B,
1837 }
1838
1839 impl crate::Application<deterministic::Context> for MockVerifyingApp {
1840 type Block = B;
1841 type Context = Context<D, K>;
1842 type SigningScheme = S;
1843
1844 async fn genesis(&mut self) -> Self::Block {
1845 self.genesis.clone()
1846 }
1847
1848 async fn propose(
1849 &mut self,
1850 _context: (deterministic::Context, Self::Context),
1851 _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
1852 ) -> Option<Self::Block> {
1853 None
1854 }
1855 }
1856
1857 impl VerifyingApplication<deterministic::Context> for MockVerifyingApp {
1858 async fn verify(
1859 &mut self,
1860 _context: (deterministic::Context, Self::Context),
1861 _ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
1862 ) -> bool {
1863 true
1864 }
1865 }
1866
1867 #[derive(Clone)]
1868 struct LimitedEpocher {
1869 inner: FixedEpocher,
1870 max_epoch: u64,
1871 }
1872
1873 impl Epocher for LimitedEpocher {
1874 fn containing(&self, height: u64) -> Option<crate::types::EpochInfo> {
1875 let bounds = self.inner.containing(height)?;
1876 if bounds.epoch().get() > self.max_epoch {
1877 None
1878 } else {
1879 Some(bounds)
1880 }
1881 }
1882
1883 fn first(&self, epoch: Epoch) -> Option<u64> {
1884 if epoch.get() > self.max_epoch {
1885 None
1886 } else {
1887 self.inner.first(epoch)
1888 }
1889 }
1890
1891 fn last(&self, epoch: Epoch) -> Option<u64> {
1892 if epoch.get() > self.max_epoch {
1893 None
1894 } else {
1895 self.inner.last(epoch)
1896 }
1897 }
1898 }
1899
1900 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1901 runner.start(|mut context| async move {
1902 let mut oracle = setup_network(context.clone(), None);
1903 let Fixture {
1904 participants,
1905 schemes,
1906 ..
1907 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1908
1909 let me = participants[0].clone();
1910 let (_base_app, marshal, _processed_height) = setup_validator(
1911 context.with_label("validator_0"),
1912 &mut oracle,
1913 me.clone(),
1914 ConstantProvider::new(schemes[0].clone()),
1915 )
1916 .await;
1917
1918 let genesis = B::new::<Sha256>(Sha256::hash(b""), 0, 0);
1919
1920 let mock_app = MockVerifyingApp {
1921 genesis: genesis.clone(),
1922 };
1923 let limited_epocher = LimitedEpocher {
1924 inner: FixedEpocher::new(BLOCKS_PER_EPOCH),
1925 max_epoch: 0,
1926 };
1927 let mut marshaled =
1928 Marshaled::new(context.clone(), mock_app, marshal.clone(), limited_epocher);
1929
1930 let parent = B::new::<Sha256>(genesis.commitment(), 19, 1000);
1932 let parent_commitment = parent.commitment();
1933 let parent_round = Round::new(Epoch::new(0), View::new(19));
1934 marshal.clone().verified(parent_round, parent).await;
1935
1936 let block = B::new::<Sha256>(parent_commitment, 20, 2000);
1938 let block_commitment = block.commitment();
1939 marshal
1940 .clone()
1941 .proposed(Round::new(Epoch::new(1), View::new(20)), block)
1942 .await;
1943
1944 context.sleep(Duration::from_millis(10)).await;
1945
1946 let unsupported_context = Context {
1947 round: Round::new(Epoch::new(1), View::new(20)),
1948 leader: me.clone(),
1949 parent: (View::new(19), parent_commitment),
1950 };
1951
1952 let verify = marshaled
1953 .verify(unsupported_context, block_commitment)
1954 .await;
1955
1956 assert!(
1957 !verify.await.unwrap(),
1958 "Block in unsupported epoch should be rejected"
1959 );
1960 })
1961 }
1962
1963 #[test_traced("INFO")]
1964 fn test_broadcast_caches_block() {
1965 let runner = deterministic::Runner::timed(Duration::from_secs(60));
1966 runner.start(|mut context| async move {
1967 let mut oracle = setup_network(context.clone(), None);
1968 let Fixture {
1969 participants,
1970 schemes,
1971 ..
1972 } = bls12381_threshold::fixture::<V, _>(&mut context, NUM_VALIDATORS);
1973
1974 let (i, validator) = participants.iter().enumerate().next().unwrap();
1976 let mut actor = setup_validator(
1977 context.with_label(&format!("validator_{i}")),
1978 &mut oracle,
1979 validator.clone(),
1980 ConstantProvider::new(schemes[i].clone()),
1981 )
1982 .await
1983 .1;
1984
1985 let parent = Sha256::hash(b"");
1987 let block = B::new::<Sha256>(parent, 1, 1);
1988 let commitment = block.digest();
1989
1990 actor
1992 .proposed(Round::new(Epoch::new(0), View::new(1)), block.clone())
1993 .await;
1994
1995 actor
1998 .get_block(&commitment)
1999 .await
2000 .expect("block should be cached after broadcast");
2001
2002 let mut actor = setup_validator(
2004 context.with_label(&format!("validator_{i}")),
2005 &mut oracle,
2006 validator.clone(),
2007 ConstantProvider::new(schemes[i].clone()),
2008 )
2009 .await
2010 .1;
2011
2012 let notarization = make_notarization(
2016 Proposal {
2017 round: Round::new(Epoch::new(0), View::new(1)),
2018 parent: View::new(0),
2019 payload: commitment,
2020 },
2021 &schemes,
2022 QUORUM,
2023 );
2024 actor.report(Activity::Notarization(notarization)).await;
2025
2026 let fetched = actor
2028 .get_block(&commitment)
2029 .await
2030 .expect("block should be cached after broadcast");
2031 assert_eq!(fetched, block);
2032 });
2033 }
2034}