1pub mod actor;
59pub use actor::Actor;
60pub mod config;
61pub use config::Config;
62pub mod finalizer;
63pub use finalizer::Finalizer;
64pub mod ingress;
65pub use ingress::mailbox::Mailbox;
66
67#[cfg(test)]
68pub mod mocks;
69
70#[cfg(test)]
71mod tests {
72 use super::{
73 actor,
74 config::Config,
75 mocks::{application::Application, block::Block},
76 };
77 use crate::{
78 threshold_simplex::types::{
79 finalize_namespace, notarize_namespace, seed_namespace, view_message, Activity,
80 Finalization, Notarization, Proposal,
81 },
82 Block as _, Reporter,
83 };
84 use commonware_broadcast::buffered;
85 use commonware_codec::Encode;
86 use commonware_cryptography::{
87 bls12381::{
88 dkg::ops::generate_shares,
89 primitives::{
90 group::Share,
91 ops::{partial_sign_message, threshold_signature_recover},
92 poly,
93 variant::{MinPk, Variant},
94 },
95 },
96 ed25519::{PrivateKey, PublicKey},
97 sha256::{Digest as Sha256Digest, Sha256},
98 Digestible, Hasher as _, PrivateKeyExt as _, Signer as _,
99 };
100 use commonware_macros::test_traced;
101 use commonware_p2p::simulated::{self, Link, Network, Oracle};
102 use commonware_resolver::p2p as resolver;
103 use commonware_runtime::{buffer::PoolRef, deterministic, Clock, Metrics, Runner};
104 use commonware_utils::{NZUsize, NZU64};
105 use governor::Quota;
106 use rand::{seq::SliceRandom, Rng};
107 use std::{
108 collections::BTreeMap,
109 num::{NonZeroU32, NonZeroUsize},
110 time::Duration,
111 };
112
113 type D = Sha256Digest;
114 type B = Block<D>;
115 type P = PublicKey;
116 type V = MinPk;
117 type Sh = Share;
118 type E = PrivateKey;
119
120 const PAGE_SIZE: NonZeroUsize = NZUsize!(1024);
121 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
122
123 const NAMESPACE: &[u8] = b"test";
124 const NUM_VALIDATORS: u32 = 4;
125 const QUORUM: u32 = 3;
126 const NUM_BLOCKS: u64 = 100;
127
128 async fn setup_validator(
129 context: deterministic::Context,
130 oracle: &mut Oracle<P>,
131 coordinator: resolver::mocks::Coordinator<P>,
132 secret: E,
133 identity: <V as Variant>::Public,
134 ) -> (
135 Application<B>,
136 crate::marshal::ingress::mailbox::Mailbox<V, B>,
137 ) {
138 let config = Config {
139 public_key: secret.public_key(),
140 identity,
141 coordinator,
142 mailbox_size: 100,
143 backfill_quota: Quota::per_second(NonZeroU32::new(5).unwrap()),
144 namespace: NAMESPACE.to_vec(),
145 view_retention_timeout: 10,
146 max_repair: 10,
147 codec_config: (),
148 partition_prefix: format!("validator-{}", secret.public_key()),
149 prunable_items_per_section: NZU64!(10),
150 replay_buffer: NZUsize!(1024),
151 write_buffer: NZUsize!(1024),
152 freezer_table_initial_size: 64,
153 freezer_table_resize_frequency: 10,
154 freezer_table_resize_chunk_size: 10,
155 freezer_journal_target_size: 1024,
156 freezer_journal_compression: None,
157 freezer_journal_buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
158 immutable_items_per_section: NZU64!(10),
159 };
160
161 let (actor, mailbox) = actor::Actor::init(context.clone(), config).await;
162 let application = Application::<B>::default();
163
164 let broadcast_config = buffered::Config {
166 public_key: secret.public_key(),
167 mailbox_size: 100,
168 deque_size: 10,
169 priority: false,
170 codec_config: (),
171 };
172 let (broadcast_engine, buffer) = buffered::Engine::new(context.clone(), broadcast_config);
173 let network = oracle.register(secret.public_key(), 1).await.unwrap();
174 broadcast_engine.start(network);
175
176 let backfill = oracle.register(secret.public_key(), 2).await.unwrap();
178 actor.start(application.clone(), buffer, backfill);
179
180 (application, mailbox)
181 }
182
183 fn make_finalization(proposal: Proposal<D>, shares: &[Sh], quorum: u32) -> Finalization<V, D> {
184 let proposal_msg = proposal.encode();
185
186 let proposal_partials: Vec<_> = shares
188 .iter()
189 .take(quorum as usize)
190 .map(|s| {
191 partial_sign_message::<V>(s, Some(&finalize_namespace(NAMESPACE)), &proposal_msg)
192 })
193 .collect();
194 let proposal_signature =
195 threshold_signature_recover::<V, _>(quorum, &proposal_partials).unwrap();
196
197 let seed_msg = view_message(proposal.view);
199 let seed_partials: Vec<_> = shares
200 .iter()
201 .take(quorum as usize)
202 .map(|s| partial_sign_message::<V>(s, Some(&seed_namespace(NAMESPACE)), &seed_msg))
203 .collect();
204 let seed_signature = threshold_signature_recover::<V, _>(quorum, &seed_partials).unwrap();
205
206 Finalization {
207 proposal,
208 proposal_signature,
209 seed_signature,
210 }
211 }
212
213 fn make_notarization(proposal: Proposal<D>, shares: &[Sh], quorum: u32) -> Notarization<V, D> {
214 let proposal_msg = proposal.encode();
215
216 let proposal_partials: Vec<_> = shares
218 .iter()
219 .take(quorum as usize)
220 .map(|s| {
221 partial_sign_message::<V>(s, Some(¬arize_namespace(NAMESPACE)), &proposal_msg)
222 })
223 .collect();
224 let proposal_signature =
225 threshold_signature_recover::<V, _>(quorum, &proposal_partials).unwrap();
226
227 let seed_msg = view_message(proposal.view);
229 let seed_partials: Vec<_> = shares
230 .iter()
231 .take(quorum as usize)
232 .map(|s| partial_sign_message::<V>(s, Some(&seed_namespace(NAMESPACE)), &seed_msg))
233 .collect();
234 let seed_signature = threshold_signature_recover::<V, _>(quorum, &seed_partials).unwrap();
235
236 Notarization {
237 proposal,
238 proposal_signature,
239 seed_signature,
240 }
241 }
242
243 fn setup_network(context: deterministic::Context) -> Oracle<P> {
244 let (network, oracle) = Network::new(
245 context.with_label("network"),
246 simulated::Config {
247 max_size: 1024 * 1024,
248 },
249 );
250 network.start();
251 oracle
252 }
253
254 fn setup_validators_and_shares(
255 context: &mut deterministic::Context,
256 ) -> (Vec<E>, Vec<P>, <V as Variant>::Public, Vec<Sh>) {
257 let mut schemes = (0..NUM_VALIDATORS)
258 .map(|i| PrivateKey::from_seed(i as u64))
259 .collect::<Vec<_>>();
260 schemes.sort_by_key(|s| s.public_key());
261 let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
262
263 let (identity, shares) = generate_shares::<_, V>(context, None, NUM_VALIDATORS, QUORUM);
264 let identity = *poly::public::<V>(&identity);
265
266 (schemes, peers, identity, shares)
267 }
268
269 async fn setup_network_links(oracle: &mut Oracle<P>, peers: &[P], link: Link) {
270 for p1 in peers.iter() {
271 for p2 in peers.iter() {
272 if p2 == p1 {
273 continue;
274 }
275 oracle
276 .add_link(p1.clone(), p2.clone(), link.clone())
277 .await
278 .unwrap();
279 }
280 }
281 }
282
283 #[test_traced("WARN")]
284 fn test_finalize_good_links() {
285 let link = Link {
286 latency: Duration::from_millis(100),
287 jitter: Duration::from_millis(1),
288 success_rate: 1.0,
289 };
290 for seed in 0..5 {
291 let result1 = finalize(seed, link.clone());
292 let result2 = finalize(seed, link.clone());
293
294 assert_eq!(result1, result2);
296 }
297 }
298
299 #[test_traced("WARN")]
300 fn test_finalize_bad_links() {
301 let link = Link {
302 latency: Duration::from_millis(200),
303 jitter: Duration::from_millis(50),
304 success_rate: 0.7,
305 };
306 for seed in 0..5 {
307 let result1 = finalize(seed, link.clone());
308 let result2 = finalize(seed, link.clone());
309
310 assert_eq!(result1, result2);
312 }
313 }
314
315 fn finalize(seed: u64, link: Link) -> String {
316 let runner = deterministic::Runner::new(
317 deterministic::Config::new()
318 .with_seed(seed)
319 .with_timeout(Some(Duration::from_secs(300))),
320 );
321 runner.start(|mut context| async move {
322 let mut oracle = setup_network(context.clone());
323 let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context);
324
325 let mut applications = BTreeMap::new();
327 let mut actors = Vec::new();
328 let coordinator = resolver::mocks::Coordinator::new(peers.clone());
329
330 for (i, secret) in schemes.iter().enumerate() {
331 let (application, actor) = setup_validator(
332 context.with_label(&format!("validator-{i}")),
333 &mut oracle,
334 coordinator.clone(),
335 secret.clone(),
336 identity,
337 )
338 .await;
339 applications.insert(peers[i].clone(), application);
340 actors.push(actor);
341 }
342
343 setup_network_links(&mut oracle, &peers, link.clone()).await;
345
346 let mut blocks = Vec::<B>::new();
348 let mut parent = Sha256::hash(b"");
349 for i in 1..=NUM_BLOCKS {
350 let block = B::new::<Sha256>(parent, i, i);
351 parent = block.digest();
352 blocks.push(block);
353 }
354
355 blocks.shuffle(&mut context);
357 for block in blocks.iter() {
358 let height = block.height();
360 assert!(height > 0, "genesis block should not have been generated");
361
362 let actor_index: usize = (height % (NUM_VALIDATORS as u64)) as usize;
364 let mut actor = actors[actor_index].clone();
365 actor.broadcast(block.clone()).await;
366 actor.verified(height, block.clone()).await;
367
368 context.sleep(link.latency).await;
371
372 let proposal = Proposal {
374 view: height,
375 parent: height.checked_sub(1).unwrap(),
376 payload: block.digest(),
377 };
378 let notarization = make_notarization(proposal.clone(), &shares, QUORUM);
379 actor
380 .report(Activity::Notarization(notarization.clone()))
381 .await;
382
383 let fin = make_finalization(proposal, &shares, QUORUM);
385 for actor in actors.iter_mut() {
386 if height == NUM_BLOCKS || context.gen_bool(0.2) {
388 actor.report(Activity::Finalization(fin.clone())).await;
389 }
390 }
391 }
392
393 let mut finished = false;
395 while !finished {
396 context.sleep(Duration::from_secs(1)).await;
398
399 if applications.len() != NUM_VALIDATORS as usize {
401 continue;
402 }
403 finished = true;
404 for app in applications.values() {
405 if app.blocks().len() != NUM_BLOCKS as usize {
406 finished = false;
407 break;
408 }
409 }
410 }
411
412 context.auditor().state()
414 })
415 }
416
417 #[test_traced("WARN")]
418 fn test_subscribe_basic_block_delivery() {
419 let runner = deterministic::Runner::timed(Duration::from_secs(60));
420 runner.start(|mut context| async move {
421 let mut oracle = setup_network(context.clone());
422 let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context);
423 let coordinator = resolver::mocks::Coordinator::new(peers.clone());
424
425 let mut actors = Vec::new();
426 for (i, secret) in schemes.iter().enumerate() {
427 let (_application, actor) = setup_validator(
428 context.with_label(&format!("validator-{i}")),
429 &mut oracle,
430 coordinator.clone(),
431 secret.clone(),
432 identity,
433 )
434 .await;
435 actors.push(actor);
436 }
437 let mut actor = actors[0].clone();
438
439 let link = Link {
440 latency: Duration::from_millis(10),
441 jitter: Duration::from_millis(1),
442 success_rate: 1.0,
443 };
444 setup_network_links(&mut oracle, &peers, link).await;
445
446 let parent = Sha256::hash(b"");
447 let block = B::new::<Sha256>(parent, 1, 1);
448 let commitment = block.digest();
449
450 let subscription_rx = actor.subscribe(Some(1), commitment).await;
451
452 actor.verified(1, block.clone()).await;
453
454 let proposal = Proposal {
455 view: 1,
456 parent: 0,
457 payload: commitment,
458 };
459 let notarization = make_notarization(proposal.clone(), &shares, QUORUM);
460 actor.report(Activity::Notarization(notarization)).await;
461
462 let finalization = make_finalization(proposal, &shares, QUORUM);
463 actor.report(Activity::Finalization(finalization)).await;
464
465 let received_block = subscription_rx.await.unwrap();
466 assert_eq!(received_block.digest(), block.digest());
467 assert_eq!(received_block.height(), 1);
468 })
469 }
470
471 #[test_traced("WARN")]
472 fn test_subscribe_multiple_subscriptions() {
473 let runner = deterministic::Runner::timed(Duration::from_secs(60));
474 runner.start(|mut context| async move {
475 let mut oracle = setup_network(context.clone());
476 let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context);
477 let coordinator = resolver::mocks::Coordinator::new(peers.clone());
478
479 let mut actors = Vec::new();
480 for (i, secret) in schemes.iter().enumerate() {
481 let (_application, actor) = setup_validator(
482 context.with_label(&format!("validator-{i}")),
483 &mut oracle,
484 coordinator.clone(),
485 secret.clone(),
486 identity,
487 )
488 .await;
489 actors.push(actor);
490 }
491 let mut actor = actors[0].clone();
492
493 let link = Link {
494 latency: Duration::from_millis(10),
495 jitter: Duration::from_millis(1),
496 success_rate: 1.0,
497 };
498 setup_network_links(&mut oracle, &peers, link).await;
499
500 let parent = Sha256::hash(b"");
501 let block1 = B::new::<Sha256>(parent, 1, 1);
502 let block2 = B::new::<Sha256>(block1.digest(), 2, 2);
503 let commitment1 = block1.digest();
504 let commitment2 = block2.digest();
505
506 let sub1_rx = actor.subscribe(Some(1), commitment1).await;
507 let sub2_rx = actor.subscribe(Some(2), commitment2).await;
508 let sub3_rx = actor.subscribe(Some(1), commitment1).await;
509
510 actor.verified(1, block1.clone()).await;
511 actor.verified(2, block2.clone()).await;
512
513 for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
514 let proposal = Proposal {
515 view,
516 parent: view.checked_sub(1).unwrap(),
517 payload: block.digest(),
518 };
519 let notarization = make_notarization(proposal.clone(), &shares, QUORUM);
520 actor.report(Activity::Notarization(notarization)).await;
521
522 let finalization = make_finalization(proposal, &shares, QUORUM);
523 actor.report(Activity::Finalization(finalization)).await;
524 }
525
526 let received1_sub1 = sub1_rx.await.unwrap();
527 let received2 = sub2_rx.await.unwrap();
528 let received1_sub3 = sub3_rx.await.unwrap();
529
530 assert_eq!(received1_sub1.digest(), block1.digest());
531 assert_eq!(received2.digest(), block2.digest());
532 assert_eq!(received1_sub3.digest(), block1.digest());
533 assert_eq!(received1_sub1.height(), 1);
534 assert_eq!(received2.height(), 2);
535 assert_eq!(received1_sub3.height(), 1);
536 })
537 }
538
539 #[test_traced("WARN")]
540 fn test_subscribe_canceled_subscriptions() {
541 let runner = deterministic::Runner::timed(Duration::from_secs(60));
542 runner.start(|mut context| async move {
543 let mut oracle = setup_network(context.clone());
544 let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context);
545 let coordinator = resolver::mocks::Coordinator::new(peers.clone());
546
547 let mut actors = Vec::new();
548 for (i, secret) in schemes.iter().enumerate() {
549 let (_application, actor) = setup_validator(
550 context.with_label(&format!("validator-{i}")),
551 &mut oracle,
552 coordinator.clone(),
553 secret.clone(),
554 identity,
555 )
556 .await;
557 actors.push(actor);
558 }
559 let mut actor = actors[0].clone();
560
561 let link = Link {
562 latency: Duration::from_millis(10),
563 jitter: Duration::from_millis(1),
564 success_rate: 1.0,
565 };
566 setup_network_links(&mut oracle, &peers, link).await;
567
568 let parent = Sha256::hash(b"");
569 let block1 = B::new::<Sha256>(parent, 1, 1);
570 let block2 = B::new::<Sha256>(block1.digest(), 2, 2);
571 let commitment1 = block1.digest();
572 let commitment2 = block2.digest();
573
574 let sub1_rx = actor.subscribe(Some(1), commitment1).await;
575 let sub2_rx = actor.subscribe(Some(2), commitment2).await;
576
577 drop(sub1_rx);
578
579 actor.verified(1, block1.clone()).await;
580 actor.verified(2, block2.clone()).await;
581
582 for (view, block) in [(1, block1.clone()), (2, block2.clone())] {
583 let proposal = Proposal {
584 view,
585 parent: view.checked_sub(1).unwrap(),
586 payload: block.digest(),
587 };
588 let notarization = make_notarization(proposal.clone(), &shares, QUORUM);
589 actor.report(Activity::Notarization(notarization)).await;
590
591 let finalization = make_finalization(proposal, &shares, QUORUM);
592 actor.report(Activity::Finalization(finalization)).await;
593 }
594
595 let received2 = sub2_rx.await.unwrap();
596 assert_eq!(received2.digest(), block2.digest());
597 assert_eq!(received2.height(), 2);
598 })
599 }
600
601 #[test_traced("WARN")]
602 fn test_subscribe_blocks_from_different_sources() {
603 let runner = deterministic::Runner::timed(Duration::from_secs(60));
604 runner.start(|mut context| async move {
605 let mut oracle = setup_network(context.clone());
606 let (schemes, peers, identity, shares) = setup_validators_and_shares(&mut context);
607 let coordinator = resolver::mocks::Coordinator::new(peers.clone());
608
609 let mut actors = Vec::new();
610 for (i, secret) in schemes.iter().enumerate() {
611 let (_application, actor) = setup_validator(
612 context.with_label(&format!("validator-{i}")),
613 &mut oracle,
614 coordinator.clone(),
615 secret.clone(),
616 identity,
617 )
618 .await;
619 actors.push(actor);
620 }
621 let mut actor = actors[0].clone();
622
623 let link = Link {
624 latency: Duration::from_millis(10),
625 jitter: Duration::from_millis(1),
626 success_rate: 1.0,
627 };
628 setup_network_links(&mut oracle, &peers, link).await;
629
630 let parent = Sha256::hash(b"");
631 let block1 = B::new::<Sha256>(parent, 1, 1);
632 let block2 = B::new::<Sha256>(block1.digest(), 2, 2);
633 let block3 = B::new::<Sha256>(block2.digest(), 3, 3);
634 let block4 = B::new::<Sha256>(block3.digest(), 4, 4);
635 let block5 = B::new::<Sha256>(block4.digest(), 5, 5);
636
637 let sub1_rx = actor.subscribe(Some(1), block1.digest()).await;
638 let sub2_rx = actor.subscribe(Some(2), block2.digest()).await;
639 let sub3_rx = actor.subscribe(Some(3), block3.digest()).await;
640 let sub4_rx = actor.subscribe(Some(4), block4.digest()).await;
641 let sub5_rx = actor.subscribe(Some(5), block5.digest()).await;
642
643 actor.broadcast(block1.clone()).await;
645 context.sleep(Duration::from_millis(20)).await;
646
647 let received1 = sub1_rx.await.unwrap();
649 assert_eq!(received1.digest(), block1.digest());
650 assert_eq!(received1.height(), 1);
651
652 actor.verified(2, block2.clone()).await;
654
655 let received2 = sub2_rx.await.unwrap();
657 assert_eq!(received2.digest(), block2.digest());
658 assert_eq!(received2.height(), 2);
659
660 let proposal3 = Proposal {
662 view: 3,
663 parent: 2,
664 payload: block3.digest(),
665 };
666 let notarization3 = make_notarization(proposal3.clone(), &shares, QUORUM);
667 actor.report(Activity::Notarization(notarization3)).await;
668 actor.verified(3, block3.clone()).await;
669
670 let received3 = sub3_rx.await.unwrap();
672 assert_eq!(received3.digest(), block3.digest());
673 assert_eq!(received3.height(), 3);
674
675 let finalization4 = make_finalization(
677 Proposal {
678 view: 4,
679 parent: 3,
680 payload: block4.digest(),
681 },
682 &shares,
683 QUORUM,
684 );
685 actor.report(Activity::Finalization(finalization4)).await;
686 actor.verified(4, block4.clone()).await;
687
688 let received4 = sub4_rx.await.unwrap();
690 assert_eq!(received4.digest(), block4.digest());
691 assert_eq!(received4.height(), 4);
692
693 let remote_actor = &mut actors[1].clone();
695 remote_actor.broadcast(block5.clone()).await;
696 context.sleep(Duration::from_millis(20)).await;
697
698 let received5 = sub5_rx.await.unwrap();
700 assert_eq!(received5.digest(), block5.digest());
701 assert_eq!(received5.height(), 5);
702 })
703 }
704}