1use serde::{Deserialize, Serialize};
2use std::{collections::HashMap, net::SocketAddr};
3
4pub mod application;
5pub mod engine;
6pub mod indexer;
7pub mod supervisor;
8pub mod utils;
9
10#[derive(Deserialize, Serialize)]
12pub struct Config {
13 pub private_key: String,
14 pub share: String,
15 pub polynomial: String,
16
17 pub port: u16,
18 pub metrics_port: u16,
19 pub directory: String,
20 pub worker_threads: usize,
21 pub log_level: String,
22
23 pub allowed_peers: Vec<String>,
24 pub bootstrappers: Vec<String>,
25
26 pub message_backlog: usize,
27 pub mailbox_size: usize,
28 pub deque_size: usize,
29
30 pub indexer: Option<String>,
31}
32
33#[derive(Deserialize, Serialize)]
37pub struct Peers {
38 pub addresses: HashMap<String, SocketAddr>,
39}
40
41#[cfg(test)]
42mod tests {
43 use super::*;
44 use commonware_cryptography::{
45 bls12381::{
46 dkg::ops,
47 primitives::{poly, variant::MinSig},
48 },
49 ed25519::{PrivateKey, PublicKey},
50 PrivateKeyExt, Signer,
51 };
52 use commonware_macros::{select, test_traced};
53 use commonware_p2p::simulated::{self, Link, Network, Oracle, Receiver, Sender};
54 use commonware_runtime::{
55 deterministic::{self, Runner},
56 Clock, Metrics, Runner as _, Spawner,
57 };
58 use commonware_utils::quorum;
59 use engine::{Config, Engine};
60 use governor::Quota;
61 use indexer::{Indexer, Mock};
62 use rand::{rngs::StdRng, Rng, SeedableRng};
63 use std::{
64 collections::{HashMap, HashSet},
65 num::NonZeroU32,
66 time::Duration,
67 };
68 use tracing::info;
69
70 const FREEZER_TABLE_INITIAL_SIZE: u32 = 2u32.pow(14); async fn register_validators(
76 oracle: &mut Oracle<PublicKey>,
77 validators: &[PublicKey],
78 ) -> HashMap<
79 PublicKey,
80 (
81 (Sender<PublicKey>, Receiver<PublicKey>),
82 (Sender<PublicKey>, Receiver<PublicKey>),
83 (Sender<PublicKey>, Receiver<PublicKey>),
84 (Sender<PublicKey>, Receiver<PublicKey>),
85 (Sender<PublicKey>, Receiver<PublicKey>),
86 ),
87 > {
88 let mut registrations = HashMap::new();
89 for validator in validators.iter() {
90 let (pending_sender, pending_receiver) =
91 oracle.register(validator.clone(), 0).await.unwrap();
92 let (recovered_sender, recovered_receiver) =
93 oracle.register(validator.clone(), 1).await.unwrap();
94 let (resolver_sender, resolver_receiver) =
95 oracle.register(validator.clone(), 2).await.unwrap();
96 let (broadcast_sender, broadcast_receiver) =
97 oracle.register(validator.clone(), 3).await.unwrap();
98 let (backfill_sender, backfill_receiver) =
99 oracle.register(validator.clone(), 4).await.unwrap();
100 registrations.insert(
101 validator.clone(),
102 (
103 (pending_sender, pending_receiver),
104 (recovered_sender, recovered_receiver),
105 (resolver_sender, resolver_receiver),
106 (broadcast_sender, broadcast_receiver),
107 (backfill_sender, backfill_receiver),
108 ),
109 );
110 }
111 registrations
112 }
113
114 async fn link_validators(
120 oracle: &mut Oracle<PublicKey>,
121 validators: &[PublicKey],
122 link: Link,
123 restrict_to: Option<fn(usize, usize, usize) -> bool>,
124 ) {
125 for (i1, v1) in validators.iter().enumerate() {
126 for (i2, v2) in validators.iter().enumerate() {
127 if v2 == v1 {
129 continue;
130 }
131
132 if let Some(f) = restrict_to {
134 if !f(validators.len(), i1, i2) {
135 continue;
136 }
137 }
138
139 oracle
141 .add_link(v1.clone(), v2.clone(), link.clone())
142 .await
143 .unwrap();
144 }
145 }
146 }
147
148 fn all_online(n: u32, seed: u64, link: Link, required: u64) -> String {
149 let threshold = quorum(n);
151 let cfg = deterministic::Config::default().with_seed(seed);
152 let executor = Runner::from(cfg);
153 executor.start(|mut context| async move {
154 let (network, mut oracle) = Network::new(
156 context.with_label("network"),
157 simulated::Config {
158 max_size: 1024 * 1024,
159 },
160 );
161
162 network.start();
164
165 let mut signers = Vec::new();
167 let mut validators = Vec::new();
168 for i in 0..n {
169 let signer = PrivateKey::from_seed(i as u64);
170 let pk = signer.public_key();
171 signers.push(signer);
172 validators.push(pk);
173 }
174 validators.sort();
175 signers.sort_by_key(|s| s.public_key());
176 let mut registrations = register_validators(&mut oracle, &validators).await;
177
178 link_validators(&mut oracle, &validators, link, None).await;
180
181 let (polynomial, shares) =
183 ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
184
185 let mut public_keys = HashSet::new();
187 for (idx, signer) in signers.into_iter().enumerate() {
188 let public_key = signer.public_key();
190 public_keys.insert(public_key.clone());
191
192 let uid = format!("validator-{public_key}");
194 let config: Config<_, Mock> = engine::Config {
195 blocker: oracle.control(public_key.clone()),
196 partition_prefix: uid.clone(),
197 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
198 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
199 signer,
200 polynomial: polynomial.clone(),
201 share: shares[idx].clone(),
202 participants: validators.clone(),
203 mailbox_size: 1024,
204 deque_size: 10,
205 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
206 leader_timeout: Duration::from_secs(1),
207 notarization_timeout: Duration::from_secs(2),
208 nullify_retry: Duration::from_secs(10),
209 fetch_timeout: Duration::from_secs(1),
210 activity_timeout: 10,
211 skip_timeout: 5,
212 max_fetch_count: 10,
213 max_fetch_size: 1024 * 512,
214 fetch_concurrent: 10,
215 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
216 indexer: None,
217 };
218 let engine = Engine::new(context.with_label(&uid), config).await;
219
220 let (pending, recovered, resolver, broadcast, backfill) =
222 registrations.remove(&public_key).unwrap();
223
224 engine.start(pending, recovered, resolver, broadcast, backfill);
226 }
227
228 loop {
230 let metrics = context.encode();
231
232 let mut success = false;
234 for line in metrics.lines() {
235 if !line.starts_with("validator-") {
237 continue;
238 }
239
240 let mut parts = line.split_whitespace();
242 let metric = parts.next().unwrap();
243 let value = parts.next().unwrap();
244
245 if metric.ends_with("_peers_blocked") {
247 let value = value.parse::<u64>().unwrap();
248 assert_eq!(value, 0);
249 }
250
251 if metric.ends_with("_marshal_processed_height") {
253 let value = value.parse::<u64>().unwrap();
254 if value >= required {
255 success = true;
256 break;
257 }
258 }
259 }
260 if success {
261 break;
262 }
263
264 context.sleep(Duration::from_secs(1)).await;
266 }
267 context.auditor().state()
268 })
269 }
270
271 #[test_traced]
272 fn test_good_links() {
273 let link = Link {
274 latency: 10.0,
275 jitter: 1.0,
276 success_rate: 1.0,
277 };
278 for seed in 0..5 {
279 let state = all_online(5, seed, link.clone(), 25);
280 assert_eq!(state, all_online(5, seed, link.clone(), 25));
281 }
282 }
283
284 #[test_traced]
285 fn test_bad_links() {
286 let link = Link {
287 latency: 200.0,
288 jitter: 150.0,
289 success_rate: 0.75,
290 };
291 for seed in 0..5 {
292 let state = all_online(5, seed, link.clone(), 25);
293 assert_eq!(state, all_online(5, seed, link.clone(), 25));
294 }
295 }
296
297 #[test_traced]
298 fn test_1k() {
299 let link = Link {
300 latency: 80.0,
301 jitter: 10.0,
302 success_rate: 0.98,
303 };
304 all_online(10, 0, link.clone(), 1000);
305 }
306
307 #[test_traced]
308 fn test_backfill() {
309 let n = 5;
311 let threshold = quorum(n);
312 let initial_container_required = 10;
313 let final_container_required = 20;
314 let executor = Runner::timed(Duration::from_secs(30));
315 executor.start(|mut context| async move {
316 let (network, mut oracle) = Network::new(
318 context.with_label("network"),
319 simulated::Config {
320 max_size: 1024 * 1024,
321 },
322 );
323
324 network.start();
326
327 let mut signers = Vec::new();
329 let mut validators = Vec::new();
330 for i in 0..n {
331 let signer = PrivateKey::from_seed(i as u64);
332 let pk = signer.public_key();
333 signers.push(signer);
334 validators.push(pk);
335 }
336 validators.sort();
337 signers.sort_by_key(|s| s.public_key());
338 let mut registrations = register_validators(&mut oracle, &validators).await;
339
340 let link = Link {
342 latency: 10.0,
343 jitter: 1.0,
344 success_rate: 1.0,
345 };
346 link_validators(
347 &mut oracle,
348 &validators,
349 link.clone(),
350 Some(|_, i, j| ![i, j].contains(&0usize)),
351 )
352 .await;
353
354 let (polynomial, shares) =
356 ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
357
358 for (idx, signer) in signers.iter().enumerate() {
360 if idx == 0 {
362 continue;
363 }
364
365 let public_key = signer.public_key();
367 let uid = format!("validator-{public_key}");
368 let config: Config<_, Mock> = engine::Config {
369 blocker: oracle.control(public_key.clone()),
370 partition_prefix: uid.clone(),
371 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
372 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
373 signer: signer.clone(),
374 polynomial: polynomial.clone(),
375 share: shares[idx].clone(),
376 participants: validators.clone(),
377 mailbox_size: 1024,
378 deque_size: 10,
379 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
380 leader_timeout: Duration::from_secs(1),
381 notarization_timeout: Duration::from_secs(2),
382 nullify_retry: Duration::from_secs(10),
383 fetch_timeout: Duration::from_secs(1),
384 activity_timeout: 10,
385 skip_timeout: 5,
386 max_fetch_count: 10,
387 max_fetch_size: 1024 * 512,
388 fetch_concurrent: 10,
389 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
390 indexer: None,
391 };
392 let engine = Engine::new(context.with_label(&uid), config).await;
393
394 let (pending, recovered, resolver, broadcast, backfill) =
396 registrations.remove(&public_key).unwrap();
397
398 engine.start(pending, recovered, resolver, broadcast, backfill);
400 }
401
402 loop {
404 let metrics = context.encode();
405
406 let mut success = true;
408 for line in metrics.lines() {
409 if !line.starts_with("validator-") {
411 continue;
412 }
413
414 let mut parts = line.split_whitespace();
416 let metric = parts.next().unwrap();
417 let value = parts.next().unwrap();
418
419 if metric.ends_with("_peers_blocked") {
421 let value = value.parse::<u64>().unwrap();
422 assert_eq!(value, 0);
423 }
424
425 if metric.ends_with("_marshal_processed_height") {
427 let value = value.parse::<u64>().unwrap();
428 if value >= initial_container_required {
429 success = true;
430 break;
431 }
432 }
433 }
434 if success {
435 break;
436 }
437
438 context.sleep(Duration::from_secs(1)).await;
440 }
441
442 link_validators(
444 &mut oracle,
445 &validators,
446 link,
447 Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
448 )
449 .await;
450
451 let signer = signers[0].clone();
453 let share = shares[0].clone();
454 let public_key = signer.public_key();
455 let uid = format!("validator-{public_key}");
456 let config: Config<_, Mock> = engine::Config {
457 blocker: oracle.control(public_key.clone()),
458 partition_prefix: uid.clone(),
459 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
460 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
461 signer: signer.clone(),
462 polynomial: polynomial.clone(),
463 share,
464 participants: validators.clone(),
465 mailbox_size: 1024,
466 deque_size: 10,
467 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
468 leader_timeout: Duration::from_secs(1),
469 notarization_timeout: Duration::from_secs(2),
470 nullify_retry: Duration::from_secs(10),
471 fetch_timeout: Duration::from_secs(1),
472 activity_timeout: 10,
473 skip_timeout: 5,
474 max_fetch_count: 10,
475 max_fetch_size: 1024 * 512,
476 fetch_concurrent: 10,
477 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
478 indexer: None,
479 };
480 let engine = Engine::new(context.with_label(&uid), config).await;
481
482 let (pending, recovered, resolver, broadcast, backfill) =
484 registrations.remove(&public_key).unwrap();
485
486 engine.start(pending, recovered, resolver, broadcast, backfill);
488
489 loop {
491 let metrics = context.encode();
492
493 let mut success = false;
495 for line in metrics.lines() {
496 if !line.starts_with("validator-") {
498 continue;
499 }
500
501 let mut parts = line.split_whitespace();
503 let metric = parts.next().unwrap();
504 let value = parts.next().unwrap();
505
506 if metric.ends_with("_peers_blocked") {
508 let value = value.parse::<u64>().unwrap();
509 assert_eq!(value, 0);
510 }
511
512 if metric.ends_with("_marshal_processed_height") {
514 let value = value.parse::<u64>().unwrap();
515 if value >= final_container_required {
516 success = true;
517 break;
518 }
519 }
520 }
521 if success {
522 break;
523 }
524
525 context.sleep(Duration::from_secs(1)).await;
527 }
528 });
529 }
530
531 #[test_traced]
532 fn test_unclean_shutdown() {
533 let n = 5;
535 let threshold = quorum(n);
536 let required_container = 100;
537
538 let mut rng = StdRng::seed_from_u64(0);
540 let (polynomial, shares) = ops::generate_shares::<_, MinSig>(&mut rng, None, n, threshold);
541
542 let mut runs = 0;
544 let mut prev_ctx = None;
545 loop {
546 let polynomial = polynomial.clone();
548 let shares = shares.clone();
549 let f = |mut context: deterministic::Context| async move {
550 let (network, mut oracle) = Network::new(
552 context.with_label("network"),
553 simulated::Config {
554 max_size: 1024 * 1024,
555 },
556 );
557
558 network.start();
560
561 let mut signers = Vec::new();
563 let mut validators = Vec::new();
564 for i in 0..n {
565 let signer = PrivateKey::from_seed(i as u64);
566 let pk = signer.public_key();
567 signers.push(signer);
568 validators.push(pk);
569 }
570 validators.sort();
571 signers.sort_by_key(|s| s.public_key());
572 let mut registrations = register_validators(&mut oracle, &validators).await;
573
574 let link = Link {
576 latency: 10.0,
577 jitter: 1.0,
578 success_rate: 1.0,
579 };
580 link_validators(&mut oracle, &validators, link, None).await;
581
582 let mut public_keys = HashSet::new();
584 for (idx, signer) in signers.into_iter().enumerate() {
585 let public_key = signer.public_key();
587 public_keys.insert(public_key.clone());
588
589 let uid = format!("validator-{public_key}");
591 let config: Config<_, Mock> = engine::Config {
592 blocker: oracle.control(public_key.clone()),
593 partition_prefix: uid.clone(),
594 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
595 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
596 signer,
597 polynomial: polynomial.clone(),
598 share: shares[idx].clone(),
599 participants: validators.clone(),
600 mailbox_size: 1024,
601 deque_size: 10,
602 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
603 leader_timeout: Duration::from_secs(1),
604 notarization_timeout: Duration::from_secs(2),
605 nullify_retry: Duration::from_secs(10),
606 fetch_timeout: Duration::from_secs(1),
607 activity_timeout: 10,
608 skip_timeout: 5,
609 max_fetch_count: 10,
610 max_fetch_size: 1024 * 512,
611 fetch_concurrent: 10,
612 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
613 indexer: None,
614 };
615 let engine = Engine::new(context.with_label(&uid), config).await;
616
617 let (pending, recovered, resolver, broadcast, backfill) =
619 registrations.remove(&public_key).unwrap();
620
621 engine.start(pending, recovered, resolver, broadcast, backfill);
623 }
624
625 let poller = context
627 .with_label("metrics")
628 .spawn(move |context| async move {
629 loop {
630 let metrics = context.encode();
631
632 let mut success = false;
634 for line in metrics.lines() {
635 if !line.starts_with("validator-") {
637 continue;
638 }
639
640 let mut parts = line.split_whitespace();
642 let metric = parts.next().unwrap();
643 let value = parts.next().unwrap();
644
645 if metric.ends_with("_peers_blocked") {
647 let value = value.parse::<u64>().unwrap();
648 assert_eq!(value, 0);
649 }
650
651 if metric.ends_with("_marshal_processed_height") {
653 let value = value.parse::<u64>().unwrap();
654 if value >= required_container {
655 success = true;
656 break;
657 }
658 }
659 }
660 if success {
661 break;
662 }
663
664 context.sleep(Duration::from_millis(10)).await;
666 }
667 });
668
669 let wait =
671 context.gen_range(Duration::from_millis(10)..Duration::from_millis(1_000));
672
673 select! {
675 _ = poller => {
676 (true, context)
678 },
679 _ = context.sleep(wait) => {
680 (false, context)
682 }
683 }
684 };
685
686 let (complete, context) = if let Some(prev_ctx) = prev_ctx {
688 Runner::from(prev_ctx)
689 } else {
690 Runner::timed(Duration::from_secs(30))
691 }
692 .start(f);
693 if complete {
694 break;
695 }
696
697 prev_ctx = Some(context.recover());
699 runs += 1;
700 }
701 assert!(runs > 1);
702 info!(runs, "unclean shutdown recovery worked");
703 }
704
705 #[test_traced]
706 fn test_indexer() {
707 let n = 5;
709 let threshold = quorum(n);
710 let required_container = 10;
711 let executor = Runner::timed(Duration::from_secs(30));
712 executor.start(|mut context| async move {
713 let (network, mut oracle) = Network::new(
715 context.with_label("network"),
716 simulated::Config {
717 max_size: 1024 * 1024,
718 },
719 );
720
721 network.start();
723
724 let mut signers = Vec::new();
726 let mut validators = Vec::new();
727 for i in 0..n {
728 let signer = PrivateKey::from_seed(i as u64);
729 let pk = signer.public_key();
730 signers.push(signer);
731 validators.push(pk);
732 }
733 validators.sort();
734 signers.sort_by_key(|s| s.public_key());
735 let mut registrations = register_validators(&mut oracle, &validators).await;
736
737 let link = Link {
739 latency: 10.0,
740 jitter: 1.0,
741 success_rate: 1.0,
742 };
743 link_validators(&mut oracle, &validators, link, None).await;
744
745 let (polynomial, shares) =
747 ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
748 let identity = *poly::public::<MinSig>(&polynomial);
749
750 let indexer = Mock::new("", identity);
752
753 let mut public_keys = HashSet::new();
755 for (idx, signer) in signers.into_iter().enumerate() {
756 let public_key = signer.public_key();
758 public_keys.insert(public_key.clone());
759
760 let uid = format!("validator-{public_key}");
762 let config: Config<_, Mock> = engine::Config {
763 blocker: oracle.control(public_key.clone()),
764 partition_prefix: uid.clone(),
765 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
766 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
767 signer,
768 polynomial: polynomial.clone(),
769 share: shares[idx].clone(),
770 participants: validators.clone(),
771 mailbox_size: 1024,
772 deque_size: 10,
773 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
774 leader_timeout: Duration::from_secs(1),
775 notarization_timeout: Duration::from_secs(2),
776 nullify_retry: Duration::from_secs(10),
777 fetch_timeout: Duration::from_secs(1),
778 activity_timeout: 10,
779 skip_timeout: 5,
780 max_fetch_count: 10,
781 max_fetch_size: 1024 * 512,
782 fetch_concurrent: 10,
783 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
784 indexer: Some(indexer.clone()),
785 };
786 let engine = Engine::new(context.with_label(&uid), config).await;
787
788 let (pending, recovered, resolver, broadcast, backfill) =
790 registrations.remove(&public_key).unwrap();
791
792 engine.start(pending, recovered, resolver, broadcast, backfill);
794 }
795
796 loop {
798 let metrics = context.encode();
799
800 let mut success = false;
802 for line in metrics.lines() {
803 if !line.starts_with("validator-") {
805 continue;
806 }
807
808 let mut parts = line.split_whitespace();
810 let metric = parts.next().unwrap();
811 let value = parts.next().unwrap();
812
813 if metric.ends_with("_peers_blocked") {
815 let value = value.parse::<u64>().unwrap();
816 assert_eq!(value, 0);
817 }
818
819 if metric.ends_with("_marshal_processed_height") {
821 let value = value.parse::<u64>().unwrap();
822 if value >= required_container {
823 success = true;
824 break;
825 }
826 }
827 }
828 if success {
829 break;
830 }
831
832 context.sleep(Duration::from_secs(1)).await;
834 }
835
836 assert!(indexer.seed_seen.load(std::sync::atomic::Ordering::Relaxed));
838 assert!(indexer
839 .notarization_seen
840 .load(std::sync::atomic::Ordering::Relaxed));
841 assert!(indexer
842 .finalization_seen
843 .load(std::sync::atomic::Ordering::Relaxed));
844 });
845 }
846}