1use serde::{Deserialize, Serialize};
2use std::{collections::HashMap, net::SocketAddr};
3
4pub mod aggregator;
5pub mod application;
6pub mod engine;
7pub mod indexer;
8pub mod seeder;
9pub mod supervisor;
10
/// Node configuration that can be serialized and deserialized with serde.
///
/// NOTE(review): nothing in this file reads these fields, so the groupings and
/// remarks below are inferred from the field names only — confirm against the
/// code that loads this configuration.
#[derive(Deserialize, Serialize)]
pub struct Config {
    // Key material, stored as strings (encoding not visible here — TODO confirm).
    pub private_key: String,
    pub share: String,
    pub polynomial: String,

    // Process-level settings (presumably listen ports, storage directory,
    // runtime thread count and tracing level — verify against the binary).
    pub port: u16,
    pub metrics_port: u16,
    pub directory: String,
    pub worker_threads: usize,
    pub log_level: String,

    // Peer lists, stored as strings (presumably encoded identities and/or
    // addresses — TODO confirm the expected format).
    pub allowed_peers: Vec<String>,
    pub bootstrappers: Vec<String>,

    // Queue sizing knobs; `mailbox_size` and `deque_size` share their names
    // with fields of `engine::Config` used by the tests in this file.
    pub message_backlog: usize,
    pub mailbox_size: usize,
    pub deque_size: usize,

    // Indexer reference as a string (presumably a URL — TODO confirm) and the
    // execution concurrency setting.
    pub indexer: String,
    pub execution_concurrency: usize,
}
34
/// Table of peer socket addresses that can be serialized and deserialized
/// with serde.
///
/// NOTE(review): the string keys are presumably encoded peer identities —
/// confirm against the code that loads this file.
#[derive(Deserialize, Serialize)]
pub struct Peers {
    /// Socket address of each peer, indexed by its string key.
    pub addresses: HashMap<String, SocketAddr>,
}
42
43#[cfg(test)]
44mod tests {
45 use super::*;
46 use battleware_types::execution::{Instruction, Transaction};
47 use commonware_cryptography::{
48 bls12381::{
49 dkg::ops,
50 primitives::{poly::public, variant::MinSig},
51 },
52 ed25519::{PrivateKey, PublicKey},
53 PrivateKeyExt, Signer,
54 };
55 use commonware_macros::{select, test_traced};
56 use commonware_p2p::simulated::{self, Link, Network, Oracle, Receiver, Sender};
57 use commonware_runtime::{
58 deterministic::{self, Runner},
59 Clock, Metrics, Runner as _, Spawner,
60 };
61 use commonware_utils::{quorum, NZUsize};
62 use engine::{Config, Engine};
63 use governor::Quota;
64 use indexer::Mock;
65 use rand::{rngs::StdRng, Rng, SeedableRng};
66 use std::{
67 collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
68 num::{NonZeroU32, NonZeroUsize},
69 time::Duration,
70 };
71 use tracing::info;
72
73 const FREEZER_TABLE_INITIAL_SIZE: u32 = 2u32.pow(14); const BUFFER_POOL_PAGE_SIZE: NonZeroUsize = NZUsize!(4_096);
79
80 const BUFFER_POOL_CAPACITY: NonZeroUsize = NZUsize!(1024 * 1024);
82
83 async fn register_validators(
85 oracle: &mut Oracle<PublicKey>,
86 validators: &[PublicKey],
87 ) -> HashMap<
88 PublicKey,
89 (
90 (Sender<PublicKey>, Receiver<PublicKey>),
91 (Sender<PublicKey>, Receiver<PublicKey>),
92 (Sender<PublicKey>, Receiver<PublicKey>),
93 (Sender<PublicKey>, Receiver<PublicKey>),
94 (Sender<PublicKey>, Receiver<PublicKey>),
95 (Sender<PublicKey>, Receiver<PublicKey>),
96 (Sender<PublicKey>, Receiver<PublicKey>),
97 (Sender<PublicKey>, Receiver<PublicKey>),
98 ),
99 > {
100 let mut registrations = HashMap::new();
101 for validator in validators.iter() {
102 let (pending_sender, pending_receiver) =
103 oracle.register(validator.clone(), 0).await.unwrap();
104 let (recovered_sender, recovered_receiver) =
105 oracle.register(validator.clone(), 1).await.unwrap();
106 let (resolver_sender, resolver_receiver) =
107 oracle.register(validator.clone(), 2).await.unwrap();
108 let (broadcast_sender, broadcast_receiver) =
109 oracle.register(validator.clone(), 3).await.unwrap();
110 let (backfill_sender, backfill_receiver) =
111 oracle.register(validator.clone(), 4).await.unwrap();
112 let (seeder_sender, seeder_receiver) =
113 oracle.register(validator.clone(), 5).await.unwrap();
114 let (aggregator_sender, aggregator_receiver) =
115 oracle.register(validator.clone(), 6).await.unwrap();
116 let (aggregation_sender, aggregation_receiver) =
117 oracle.register(validator.clone(), 7).await.unwrap();
118 registrations.insert(
119 validator.clone(),
120 (
121 (pending_sender, pending_receiver),
122 (recovered_sender, recovered_receiver),
123 (resolver_sender, resolver_receiver),
124 (broadcast_sender, broadcast_receiver),
125 (backfill_sender, backfill_receiver),
126 (seeder_sender, seeder_receiver),
127 (aggregator_sender, aggregator_receiver),
128 (aggregation_sender, aggregation_receiver),
129 ),
130 );
131 }
132 registrations
133 }
134
135 async fn link_validators(
141 oracle: &mut Oracle<PublicKey>,
142 validators: &[PublicKey],
143 link: Link,
144 restrict_to: Option<fn(usize, usize, usize) -> bool>,
145 ) {
146 for (i1, v1) in validators.iter().enumerate() {
147 for (i2, v2) in validators.iter().enumerate() {
148 if v2 == v1 {
150 continue;
151 }
152
153 if let Some(f) = restrict_to {
155 if !f(validators.len(), i1, i2) {
156 continue;
157 }
158 }
159
160 oracle
162 .add_link(v1.clone(), v2.clone(), link.clone())
163 .await
164 .unwrap();
165 }
166 }
167 }
168
169 fn all_online(n: u32, seed: u64, link: Link, required: u64) -> String {
170 let threshold = quorum(n);
172 let cfg = deterministic::Config::default().with_seed(seed);
173 let executor = Runner::from(cfg);
174 executor.start(|mut context| async move {
175 let (network, mut oracle) = Network::new(
177 context.with_label("network"),
178 simulated::Config {
179 max_size: 1024 * 1024,
180 },
181 );
182
183 network.start();
185
186 let mut signers = Vec::new();
188 let mut validators = Vec::new();
189 for i in 0..n {
190 let signer = PrivateKey::from_seed(i as u64);
191 let pk = signer.public_key();
192 signers.push(signer);
193 validators.push(pk);
194 }
195 validators.sort();
196 signers.sort_by_key(|s| s.public_key());
197 let mut registrations = register_validators(&mut oracle, &validators).await;
198
199 link_validators(&mut oracle, &validators, link, None).await;
201
202 let (polynomial, shares) =
204 ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
205 let identity = *public::<MinSig>(&polynomial);
206
207 let indexer = Mock::new(identity);
209
210 let mut public_keys = HashSet::new();
212 for (idx, signer) in signers.into_iter().enumerate() {
213 let public_key = signer.public_key();
215 public_keys.insert(public_key.clone());
216
217 let uid = format!("validator-{public_key}");
219 let config: Config<_, Mock> = engine::Config {
220 blocker: oracle.control(public_key.clone()),
221 partition_prefix: uid.clone(),
222 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
223 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
224 signer,
225 polynomial: polynomial.clone(),
226 share: shares[idx].clone(),
227 participants: validators.clone(),
228 mailbox_size: 1024,
229 deque_size: 10,
230 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
231 leader_timeout: Duration::from_secs(1),
232 notarization_timeout: Duration::from_secs(2),
233 nullify_retry: Duration::from_secs(10),
234 fetch_timeout: Duration::from_secs(1),
235 activity_timeout: 10,
236 skip_timeout: 5,
237 max_fetch_count: 10,
238 max_fetch_size: 1024 * 512,
239 fetch_concurrent: 10,
240 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
241 buffer_pool_page_size: BUFFER_POOL_PAGE_SIZE,
242 buffer_pool_capacity: BUFFER_POOL_CAPACITY,
243 indexer: indexer.clone(),
244 execution_concurrency: 2,
245 max_uploads_outstanding: 4,
246 };
247 let engine = Engine::new(context.with_label(&uid), config).await;
248
249 let (
251 pending,
252 recovered,
253 resolver,
254 broadcast,
255 backfill,
256 seeder,
257 aggregator,
258 aggregation,
259 ) = registrations.remove(&public_key).unwrap();
260
261 engine.start(
263 pending,
264 recovered,
265 resolver,
266 broadcast,
267 backfill,
268 seeder,
269 aggregator,
270 aggregation,
271 );
272 }
273
274 loop {
276 let metrics = context.encode();
277
278 let mut success = 0;
280 for line in metrics.lines() {
281 if !line.starts_with("validator-") {
283 continue;
284 }
285
286 let mut parts = line.split_whitespace();
288 let metric = parts.next().unwrap();
289 let value = parts.next().unwrap();
290
291 if metric.ends_with("_peers_blocked") {
293 let value = value.parse::<u64>().unwrap();
294 assert_eq!(value, 0);
295 }
296
297 if metric.ends_with("_certificates_processed") {
299 let value = value.parse::<u64>().unwrap();
300 if value >= required {
301 success += 1;
302 }
303 }
304 }
305 if success == n {
306 break;
307 }
308
309 context.sleep(Duration::from_secs(1)).await;
311 }
312
313 loop {
315 let contains_seeds = {
316 let mut contains_seeds = true;
317 let seeds = indexer.seeds.lock().unwrap();
318 for i in 1..=required {
319 if !seeds.contains_key(&i) {
320 contains_seeds = false;
321 break;
322 }
323 }
324 contains_seeds
325 };
326 let contains_summaries = {
327 let summaries = indexer.summaries.write().await;
328 let seen_summaries = summaries.iter().map(|(i, _)| *i).collect::<HashSet<_>>();
329 let mut contains_summaries = true;
330 for i in 1..=required {
331 if !seen_summaries.contains(&i) {
332 contains_summaries = false;
333 break;
334 }
335 }
336 contains_summaries
337 };
338
339 if contains_seeds && contains_summaries {
341 break;
342 }
343
344 context.sleep(Duration::from_millis(10)).await;
346 }
347
348 context.auditor().state()
349 })
350 }
351
352 #[test_traced("INFO")]
353 fn test_good_links() {
354 let link = Link {
355 latency: Duration::from_millis(10),
356 jitter: Duration::from_millis(1),
357 success_rate: 1.0,
358 };
359 for seed in 0..5 {
360 let state = all_online(5, seed, link.clone(), 25);
361 assert_eq!(state, all_online(5, seed, link.clone(), 25));
362 }
363 }
364
365 #[test_traced("INFO")]
366 fn test_bad_links() {
367 let link = Link {
368 latency: Duration::from_millis(200),
369 jitter: Duration::from_millis(150),
370 success_rate: 0.75,
371 };
372 for seed in 0..5 {
373 let state = all_online(5, seed, link.clone(), 25);
374 assert_eq!(state, all_online(5, seed, link.clone(), 25));
375 }
376 }
377
378 #[test_traced("INFO")]
379 fn test_1k() {
380 let link = Link {
381 latency: Duration::from_millis(80),
382 jitter: Duration::from_millis(10),
383 success_rate: 0.98,
384 };
385 all_online(10, 0, link.clone(), 1000);
386 }
387
388 #[test_traced("INFO")]
389 fn test_backfill() {
390 let n = 5;
392 let threshold = quorum(n);
393 let initial_container_required = 10;
394 let final_container_required = 20;
395 let executor = Runner::timed(Duration::from_secs(30));
396 executor.start(|mut context| async move {
397 let (network, mut oracle) = Network::new(
399 context.with_label("network"),
400 simulated::Config {
401 max_size: 1024 * 1024,
402 },
403 );
404
405 network.start();
407
408 let mut signers = Vec::new();
410 let mut validators = Vec::new();
411 for i in 0..n {
412 let signer = PrivateKey::from_seed(i as u64);
413 let pk = signer.public_key();
414 signers.push(signer);
415 validators.push(pk);
416 }
417 validators.sort();
418 signers.sort_by_key(|s| s.public_key());
419 let mut registrations = register_validators(&mut oracle, &validators).await;
420
421 let link = Link {
423 latency: Duration::from_millis(10),
424 jitter: Duration::from_millis(1),
425 success_rate: 1.0,
426 };
427 link_validators(
428 &mut oracle,
429 &validators,
430 link.clone(),
431 Some(|_, i, j| ![i, j].contains(&0usize)),
432 )
433 .await;
434
435 let (polynomial, shares) =
437 ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
438 let identity = *public::<MinSig>(&polynomial);
439
440 let indexer = Mock::new(identity);
442
443 for (idx, signer) in signers.iter().enumerate() {
445 if idx == 0 {
447 continue;
448 }
449
450 let public_key = signer.public_key();
452 let uid = format!("validator-{public_key}");
453 let config: Config<_, Mock> = engine::Config {
454 blocker: oracle.control(public_key.clone()),
455 partition_prefix: uid.clone(),
456 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
457 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
458 signer: signer.clone(),
459 polynomial: polynomial.clone(),
460 share: shares[idx].clone(),
461 participants: validators.clone(),
462 mailbox_size: 1024,
463 deque_size: 10,
464 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
465 leader_timeout: Duration::from_secs(1),
466 notarization_timeout: Duration::from_secs(2),
467 nullify_retry: Duration::from_secs(10),
468 fetch_timeout: Duration::from_secs(1),
469 activity_timeout: 10,
470 skip_timeout: 5,
471 max_fetch_count: 10,
472 max_fetch_size: 1024 * 512,
473 fetch_concurrent: 10,
474 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
475 buffer_pool_page_size: BUFFER_POOL_PAGE_SIZE,
476 buffer_pool_capacity: BUFFER_POOL_CAPACITY,
477 indexer: indexer.clone(),
478 execution_concurrency: 2,
479 max_uploads_outstanding: 4,
480 };
481 let engine = Engine::new(context.with_label(&uid), config).await;
482
483 let (
485 pending,
486 recovered,
487 resolver,
488 broadcast,
489 backfill,
490 seeder,
491 aggregator,
492 aggregation,
493 ) = registrations.remove(&public_key).unwrap();
494
495 engine.start(
497 pending,
498 recovered,
499 resolver,
500 broadcast,
501 backfill,
502 seeder,
503 aggregator,
504 aggregation,
505 );
506 }
507
508 loop {
510 let metrics = context.encode();
511
512 let mut success = 0;
514 for line in metrics.lines() {
515 if !line.starts_with("validator-") {
517 continue;
518 }
519
520 let mut parts = line.split_whitespace();
522 let metric = parts.next().unwrap();
523 let value = parts.next().unwrap();
524
525 if metric.ends_with("_peers_blocked") {
527 let value = value.parse::<u64>().unwrap();
528 assert_eq!(value, 0);
529 }
530
531 if metric.ends_with("_certificates_processed") {
533 let value = value.parse::<u64>().unwrap();
534 if value >= initial_container_required {
535 success += 1;
536 }
537 }
538 }
539 if success == n - 1 {
540 break;
541 }
542
543 context.sleep(Duration::from_secs(1)).await;
545 }
546
547 link_validators(
549 &mut oracle,
550 &validators,
551 link,
552 Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
553 )
554 .await;
555
556 let signer = signers[0].clone();
558 let share = shares[0].clone();
559 let public_key = signer.public_key();
560 let uid = format!("validator-{public_key}");
561 let config: Config<_, Mock> = engine::Config {
562 blocker: oracle.control(public_key.clone()),
563 partition_prefix: uid.clone(),
564 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
565 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
566 signer: signer.clone(),
567 polynomial: polynomial.clone(),
568 share,
569 participants: validators.clone(),
570 mailbox_size: 1024,
571 deque_size: 10,
572 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
573 leader_timeout: Duration::from_secs(1),
574 notarization_timeout: Duration::from_secs(2),
575 nullify_retry: Duration::from_secs(10),
576 fetch_timeout: Duration::from_secs(1),
577 activity_timeout: 10,
578 skip_timeout: 5,
579 max_fetch_count: 10,
580 max_fetch_size: 1024 * 512,
581 fetch_concurrent: 10,
582 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
583 buffer_pool_page_size: BUFFER_POOL_PAGE_SIZE,
584 buffer_pool_capacity: BUFFER_POOL_CAPACITY,
585 indexer: indexer.clone(),
586 execution_concurrency: 2,
587 max_uploads_outstanding: 4,
588 };
589 let engine = Engine::new(context.with_label(&uid), config).await;
590
591 let (
593 pending,
594 recovered,
595 resolver,
596 broadcast,
597 backfill,
598 seeder,
599 aggregator,
600 aggregation,
601 ) = registrations.remove(&public_key).unwrap();
602
603 engine.start(
605 pending,
606 recovered,
607 resolver,
608 broadcast,
609 backfill,
610 seeder,
611 aggregator,
612 aggregation,
613 );
614
615 loop {
617 let metrics = context.encode();
618
619 let mut success = 0;
621 for line in metrics.lines() {
622 if !line.starts_with("validator-") {
624 continue;
625 }
626
627 let mut parts = line.split_whitespace();
629 let metric = parts.next().unwrap();
630 let value = parts.next().unwrap();
631
632 if metric.ends_with("_peers_blocked") {
634 let value = value.parse::<u64>().unwrap();
635 assert_eq!(value, 0);
636 }
637
638 if metric.ends_with("_certificates_processed") {
640 let value = value.parse::<u64>().unwrap();
641 if value >= final_container_required {
642 success += 1;
643 }
644 }
645 }
646 if success == n - 1 {
647 break;
648 }
649
650 context.sleep(Duration::from_secs(1)).await;
652 }
653 });
654 }
655
656 #[test_traced("INFO")]
657 fn test_unclean_shutdown() {
658 let n = 5;
660 let threshold = quorum(n);
661 let required_container = 100;
662
663 let mut rng = StdRng::seed_from_u64(0);
665 let (polynomial, shares) = ops::generate_shares::<_, MinSig>(&mut rng, None, n, threshold);
666 let identity = *public::<MinSig>(&polynomial);
667
668 let indexer = Mock::new(identity);
671
672 let mut runs = 0;
674 let mut prev_ctx = None;
675 loop {
676 let polynomial = polynomial.clone();
678 let shares = shares.clone();
679 let indexer = indexer.clone();
680 let f = |mut context: deterministic::Context| async move {
681 let (network, mut oracle) = Network::new(
683 context.with_label("network"),
684 simulated::Config {
685 max_size: 1024 * 1024,
686 },
687 );
688
689 network.start();
691
692 let mut signers = Vec::new();
694 let mut validators = Vec::new();
695 for i in 0..n {
696 let signer = PrivateKey::from_seed(i as u64);
697 let pk = signer.public_key();
698 signers.push(signer);
699 validators.push(pk);
700 }
701 validators.sort();
702 signers.sort_by_key(|s| s.public_key());
703 let mut registrations = register_validators(&mut oracle, &validators).await;
704
705 let link = Link {
707 latency: Duration::from_millis(10),
708 jitter: Duration::from_millis(1),
709 success_rate: 1.0,
710 };
711 link_validators(&mut oracle, &validators, link, None).await;
712
713 let mut public_keys = HashSet::new();
715 for (idx, signer) in signers.into_iter().enumerate() {
716 let public_key = signer.public_key();
718 public_keys.insert(public_key.clone());
719
720 let uid = format!("validator-{public_key}");
722 let config: Config<_, Mock> = engine::Config {
723 blocker: oracle.control(public_key.clone()),
724 partition_prefix: uid.clone(),
725 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
726 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
727 signer,
728 polynomial: polynomial.clone(),
729 share: shares[idx].clone(),
730 participants: validators.clone(),
731 mailbox_size: 1024,
732 deque_size: 10,
733 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
734 leader_timeout: Duration::from_secs(1),
735 notarization_timeout: Duration::from_secs(2),
736 nullify_retry: Duration::from_secs(10),
737 fetch_timeout: Duration::from_secs(1),
738 activity_timeout: 10,
739 skip_timeout: 5,
740 max_fetch_count: 10,
741 max_fetch_size: 1024 * 512,
742 fetch_concurrent: 10,
743 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
744 buffer_pool_page_size: BUFFER_POOL_PAGE_SIZE,
745 buffer_pool_capacity: BUFFER_POOL_CAPACITY,
746 indexer: indexer.clone(),
747 execution_concurrency: 2,
748 max_uploads_outstanding: 4,
749 };
750 let engine = Engine::new(context.with_label(&uid), config).await;
751
752 let (
754 pending,
755 recovered,
756 resolver,
757 broadcast,
758 backfill,
759 seeder,
760 aggregator,
761 aggregation,
762 ) = registrations.remove(&public_key).unwrap();
763
764 engine.start(
766 pending,
767 recovered,
768 resolver,
769 broadcast,
770 backfill,
771 seeder,
772 aggregator,
773 aggregation,
774 );
775 }
776
777 let poller = context
779 .with_label("metrics")
780 .spawn(move |context| async move {
781 loop {
783 let metrics = context.encode();
784
785 let mut success = 0;
787 for line in metrics.lines() {
788 if !line.starts_with("validator-") {
790 continue;
791 }
792
793 let mut parts = line.split_whitespace();
795 let metric = parts.next().unwrap();
796 let value = parts.next().unwrap();
797
798 if metric.ends_with("_peers_blocked") {
800 let value = value.parse::<u64>().unwrap();
801 assert_eq!(value, 0);
802 }
803
804 if metric.ends_with("_certificates_processed") {
806 let value = value.parse::<u64>().unwrap();
807 if value >= required_container {
808 success += 1;
809 }
810 }
811 }
812 if success == n {
813 break;
814 }
815
816 context.sleep(Duration::from_millis(10)).await;
818 }
819
820 loop {
822 let contains_seeds = {
823 let mut contains_seeds = true;
824 let seeds = indexer.seeds.lock().unwrap();
825 for i in 1..=required_container {
826 if !seeds.contains_key(&i) {
827 contains_seeds = false;
828 break;
829 }
830 }
831 contains_seeds
832 };
833 let contains_summaries = {
834 let summaries = indexer.summaries.write().await;
835 let seen_summaries =
836 summaries.iter().map(|(i, _)| *i).collect::<HashSet<_>>();
837 let mut contains_summaries = true;
838 for i in 1..=required_container {
839 if !seen_summaries.contains(&i) {
840 contains_summaries = false;
841 break;
842 }
843 }
844 contains_summaries
845 };
846
847 if contains_seeds && contains_summaries {
849 break;
850 }
851
852 context.sleep(Duration::from_millis(10)).await;
854 }
855 });
856
857 let wait =
859 context.gen_range(Duration::from_millis(100)..Duration::from_millis(1_000));
860
861 select! {
863 _ = poller => {
864 (true, context)
866 },
867 _ = context.sleep(wait) => {
868 (false, context)
870 }
871 }
872 };
873
874 let (complete, context) = if let Some(prev_ctx) = prev_ctx {
876 Runner::from(prev_ctx)
877 } else {
878 Runner::timed(Duration::from_secs(300))
879 }
880 .start(f);
881 if complete {
882 break;
883 }
884
885 prev_ctx = Some(context.recover());
887 runs += 1;
888 }
889 assert!(runs > 1);
890 info!(runs, "unclean shutdown recovery worked");
891 }
892
893 fn test_execution(seed: u64, link: Link) -> String {
894 let n = 5;
896 let threshold = quorum(n);
897 let cfg = deterministic::Config::default()
898 .with_seed(seed)
899 .with_timeout(Some(Duration::from_secs(1200)));
900 let executor = Runner::from(cfg);
901 executor.start(|mut context| async move {
902 let (network, mut oracle) = Network::new(
904 context.with_label("network"),
905 simulated::Config {
906 max_size: 1024 * 1024,
907 },
908 );
909
910 network.start();
912
913 let mut signers = Vec::new();
915 let mut validators = Vec::new();
916 for i in 0..n {
917 let signer = PrivateKey::from_seed(i as u64);
918 let pk = signer.public_key();
919 signers.push(signer);
920 validators.push(pk);
921 }
922 validators.sort();
923 signers.sort_by_key(|s| s.public_key());
924 let mut registrations = register_validators(&mut oracle, &validators).await;
925
926 link_validators(&mut oracle, &validators, link, None).await;
928
929 let (polynomial, shares) =
931 ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
932 let identity = *public::<MinSig>(&polynomial);
933
934 let indexer = Mock::new(identity);
936
937 let mut public_keys = HashSet::new();
939 for (idx, signer) in signers.into_iter().enumerate() {
940 let public_key = signer.public_key();
942 public_keys.insert(public_key.clone());
943
944 let uid = format!("validator-{public_key}");
946 let config: Config<_, Mock> = engine::Config {
947 blocker: oracle.control(public_key.clone()),
948 partition_prefix: uid.clone(),
949 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
950 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
951 signer,
952 polynomial: polynomial.clone(),
953 share: shares[idx].clone(),
954 participants: validators.clone(),
955 mailbox_size: 1024,
956 deque_size: 10,
957 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
958 leader_timeout: Duration::from_secs(1),
959 notarization_timeout: Duration::from_secs(2),
960 nullify_retry: Duration::from_secs(10),
961 fetch_timeout: Duration::from_secs(1),
962 activity_timeout: 10,
963 skip_timeout: 5,
964 max_fetch_count: 10,
965 max_fetch_size: 1024 * 1024,
966 fetch_concurrent: 10,
967 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
968 buffer_pool_page_size: BUFFER_POOL_PAGE_SIZE,
969 buffer_pool_capacity: BUFFER_POOL_CAPACITY,
970 indexer: indexer.clone(),
971 execution_concurrency: 2,
972 max_uploads_outstanding: 4,
973 };
974 let engine = Engine::new(context.with_label(&uid), config).await;
975
976 let (
978 pending,
979 recovered,
980 resolver,
981 broadcast,
982 backfill,
983 seeder,
984 aggregator,
985 aggregation,
986 ) = registrations.remove(&public_key).unwrap();
987
988 engine.start(
990 pending,
991 recovered,
992 resolver,
993 broadcast,
994 backfill,
995 seeder,
996 aggregator,
997 aggregation,
998 );
999 }
1000
1001 let mut remaining = BTreeMap::new();
1003 for i in 0..1_000 {
1004 let signer = PrivateKey::from_seed(i as u64);
1006
1007 let tx = Transaction::sign(&signer, 0, Instruction::Generate);
1009 indexer.submit_tx(tx.clone());
1010 remaining.insert(signer.public_key(), tx);
1011
1012 context.sleep(Duration::from_millis(5)).await;
1014 }
1015
1016 let mut seen = HashMap::new();
1018 let mut last_height = None;
1019 let mut all_height = 1;
1020 while last_height.is_none() || all_height < last_height.unwrap() {
1021 let summaries = indexer
1023 .summaries
1024 .write()
1025 .await
1026 .drain(..)
1027 .collect::<Vec<_>>();
1028
1029 if summaries.is_empty() {
1031 for (_, tx) in remaining.iter() {
1033 indexer.submit_tx(tx.clone());
1034 }
1035
1036 context.sleep(Duration::from_secs(1)).await;
1038 continue;
1039 }
1040
1041 for (height, summary) in summaries.into_iter() {
1043 for event in summary.events_proof_ops.iter() {
1045 if let commonware_storage::store::operation::Keyless::Append(
1046 battleware_types::execution::Output::Event(
1047 battleware_types::execution::Event::Generated { account, .. },
1048 ),
1049 ) = event
1050 {
1051 remaining.remove(account);
1052 }
1053 }
1054
1055 match seen.entry(height) {
1057 Entry::Vacant(entry) => {
1058 entry.insert((1, summary));
1059 }
1060 Entry::Occupied(mut entry) => {
1061 assert_eq!(entry.get().1, summary);
1062 entry.get_mut().0 += 1;
1063 }
1064 }
1065
1066 if last_height.is_none() && remaining.is_empty() {
1068 last_height = Some(height);
1069 }
1070 }
1071
1072 loop {
1074 let Some((seen, _)) = seen.get(&all_height) else {
1075 break;
1076 };
1077 if seen < &n {
1078 break;
1079 }
1080 all_height += 1;
1081 }
1082 }
1083
1084 context.auditor().state()
1086 })
1087 }
1088
1089 #[test_traced]
1090 fn test_execution_basic() {
1091 test_execution(
1092 42,
1093 Link {
1094 latency: Duration::from_millis(10),
1095 jitter: Duration::from_millis(1),
1096 success_rate: 1.0,
1097 },
1098 );
1099 }
1100
1101 #[test_traced("INFO")]
1102 fn test_execution_good_links() {
1103 let link = Link {
1104 latency: Duration::from_millis(10),
1105 jitter: Duration::from_millis(1),
1106 success_rate: 1.0,
1107 };
1108 for seed in 0..5 {
1109 let state1 = test_execution(seed, link.clone());
1110 let state2 = test_execution(seed, link.clone());
1111 assert_eq!(state1, state2);
1112 }
1113 }
1114
1115 #[test_traced("INFO")]
1116 fn test_execution_bad_links() {
1117 let link = Link {
1118 latency: Duration::from_millis(200),
1119 jitter: Duration::from_millis(150),
1120 success_rate: 0.75,
1121 };
1122 for seed in 0..5 {
1123 let state1 = test_execution(seed, link.clone());
1124 let state2 = test_execution(seed, link.clone());
1125 assert_eq!(state1, state2);
1126 }
1127 }
1128}