1use alto_types::Scheme;
2use commonware_consensus::marshal::SchemeProvider;
3use serde::{Deserialize, Serialize};
4use std::{collections::HashMap, net::SocketAddr, sync::Arc};
5
6pub mod application;
7pub mod engine;
8pub mod indexer;
9pub mod utils;
10
11#[derive(Deserialize, Serialize)]
13pub struct Config {
14 pub private_key: String,
15 pub share: String,
16 pub polynomial: String,
17
18 pub port: u16,
19 pub metrics_port: u16,
20 pub directory: String,
21 pub worker_threads: usize,
22 pub log_level: String,
23
24 pub local: bool,
25 pub allowed_peers: Vec<String>,
26 pub bootstrappers: Vec<String>,
27
28 pub message_backlog: usize,
29 pub mailbox_size: usize,
30 pub deque_size: usize,
31
32 pub indexer: Option<String>,
33}
34
35#[derive(Deserialize, Serialize)]
39pub struct Peers {
40 pub addresses: HashMap<String, SocketAddr>,
41}
42
43#[derive(Clone)]
45pub struct StaticSchemeProvider(Arc<Scheme>);
46
47impl SchemeProvider for StaticSchemeProvider {
48 type Scheme = Scheme;
49
50 fn scheme(&self, _epoch: u64) -> Option<Arc<Scheme>> {
51 Some(self.0.clone())
52 }
53}
54
55impl From<Scheme> for StaticSchemeProvider {
56 fn from(scheme: Scheme) -> Self {
57 Self(Arc::new(scheme))
58 }
59}
60
61#[cfg(test)]
62mod tests {
63 use super::*;
64 use commonware_consensus::marshal;
65 use commonware_cryptography::{
66 bls12381::{
67 dkg::ops,
68 primitives::{poly, variant::MinSig},
69 },
70 ed25519::{PrivateKey, PublicKey},
71 PrivateKeyExt, Signer,
72 };
73 use commonware_macros::{select, test_traced};
74 use commonware_p2p::{
75 simulated::{self, Link, Network, Oracle, Receiver, Sender},
76 utils::requester,
77 Manager,
78 };
79 use commonware_runtime::{
80 deterministic::{self, Runner},
81 Clock, Metrics, Runner as _, Spawner,
82 };
83 use commonware_utils::quorum;
84 use engine::{Config, Engine};
85 use governor::Quota;
86 use indexer::{Indexer, Mock};
87 use rand::{rngs::StdRng, Rng, SeedableRng};
88 use std::{
89 collections::{HashMap, HashSet},
90 num::NonZeroU32,
91 time::Duration,
92 };
93 use tracing::info;
94
95 const FREEZER_TABLE_INITIAL_SIZE: u32 = 2u32.pow(14); async fn register_validators(
101 oracle: &mut Oracle<PublicKey>,
102 validators: &[PublicKey],
103 ) -> HashMap<
104 PublicKey,
105 (
106 (Sender<PublicKey>, Receiver<PublicKey>),
107 (Sender<PublicKey>, Receiver<PublicKey>),
108 (Sender<PublicKey>, Receiver<PublicKey>),
109 (Sender<PublicKey>, Receiver<PublicKey>),
110 (Sender<PublicKey>, Receiver<PublicKey>),
111 ),
112 > {
113 oracle.update(0, validators.into()).await;
114 let mut registrations = HashMap::new();
115 for validator in validators.iter() {
116 let mut oracle = oracle.control(validator.clone());
117 let (pending_sender, pending_receiver) = oracle.register(0).await.unwrap();
118 let (recovered_sender, recovered_receiver) = oracle.register(1).await.unwrap();
119 let (resolver_sender, resolver_receiver) = oracle.register(2).await.unwrap();
120 let (broadcast_sender, broadcast_receiver) = oracle.register(3).await.unwrap();
121 let (backfill_sender, backfill_receiver) = oracle.register(4).await.unwrap();
122 registrations.insert(
123 validator.clone(),
124 (
125 (pending_sender, pending_receiver),
126 (recovered_sender, recovered_receiver),
127 (resolver_sender, resolver_receiver),
128 (broadcast_sender, broadcast_receiver),
129 (backfill_sender, backfill_receiver),
130 ),
131 );
132 }
133 registrations
134 }
135
136 async fn link_validators(
142 oracle: &mut Oracle<PublicKey>,
143 validators: &[PublicKey],
144 link: Link,
145 restrict_to: Option<fn(usize, usize, usize) -> bool>,
146 ) {
147 for (i1, v1) in validators.iter().enumerate() {
148 for (i2, v2) in validators.iter().enumerate() {
149 if v2 == v1 {
151 continue;
152 }
153
154 if let Some(f) = restrict_to {
156 if !f(validators.len(), i1, i2) {
157 continue;
158 }
159 }
160
161 oracle
163 .add_link(v1.clone(), v2.clone(), link.clone())
164 .await
165 .unwrap();
166 }
167 }
168 }
169
170 fn all_online(n: u32, seed: u64, link: Link, required: u64) -> String {
171 let threshold = quorum(n);
173 let cfg = deterministic::Config::default().with_seed(seed);
174 let executor = Runner::from(cfg);
175 executor.start(|mut context| async move {
176 let (network, mut oracle) = Network::new(
178 context.with_label("network"),
179 simulated::Config {
180 max_size: 1024 * 1024,
181 disconnect_on_block: true,
182 tracked_peer_sets: Some(1),
183 },
184 );
185
186 network.start();
188
189 let mut signers = Vec::new();
191 let mut validators = Vec::new();
192 for i in 0..n {
193 let signer = PrivateKey::from_seed(i as u64);
194 let pk = signer.public_key();
195 signers.push(signer);
196 validators.push(pk);
197 }
198 validators.sort();
199 signers.sort_by_key(|s| s.public_key());
200 let mut registrations = register_validators(&mut oracle, &validators).await;
201
202 link_validators(&mut oracle, &validators, link, None).await;
204
205 let (polynomial, shares) =
207 ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
208
209 let mut public_keys = HashSet::new();
211 for (idx, signer) in signers.into_iter().enumerate() {
212 let public_key = signer.public_key();
214 public_keys.insert(public_key.clone());
215
216 let uid = format!("validator-{public_key}");
218 let config: Config<_, Mock> = engine::Config {
219 blocker: oracle.control(public_key.clone()),
220 partition_prefix: uid.clone(),
221 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
222 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
223 me: signer.public_key(),
224 polynomial: polynomial.clone(),
225 share: shares[idx].clone(),
226 participants: validators.clone().into(),
227 mailbox_size: 1024,
228 deque_size: 10,
229 leader_timeout: Duration::from_secs(1),
230 notarization_timeout: Duration::from_secs(2),
231 nullify_retry: Duration::from_secs(10),
232 fetch_timeout: Duration::from_secs(1),
233 activity_timeout: 10,
234 skip_timeout: 5,
235 max_fetch_count: 10,
236 max_fetch_size: 1024 * 512,
237 fetch_concurrent: 10,
238 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
239 indexer: None,
240 };
241 let engine = Engine::new(context.with_label(&uid), config).await;
242
243 let (pending, recovered, resolver, broadcast, backfill) =
245 registrations.remove(&public_key).unwrap();
246
247 let marshal_resolver_cfg = marshal::resolver::p2p::Config {
249 public_key: public_key.clone(),
250 manager: oracle.clone(),
251 mailbox_size: 1024,
252 requester_config: requester::Config {
253 me: Some(public_key.clone()),
254 rate_limit: Quota::per_second(NonZeroU32::new(5).unwrap()),
255 initial: Duration::from_secs(1),
256 timeout: Duration::from_secs(2),
257 },
258 fetch_retry_timeout: Duration::from_millis(100),
259 priority_requests: false,
260 priority_responses: false,
261 };
262
263 let marshal_resolver =
264 marshal::resolver::p2p::init(&context, marshal_resolver_cfg, backfill);
265
266 engine.start(pending, recovered, resolver, broadcast, marshal_resolver);
268 }
269
270 loop {
272 let metrics = context.encode();
273
274 let mut success = false;
276 for line in metrics.lines() {
277 if !line.starts_with("validator-") {
279 continue;
280 }
281
282 let mut parts = line.split_whitespace();
284 let metric = parts.next().unwrap();
285 let value = parts.next().unwrap();
286
287 if metric.ends_with("_peers_blocked") {
289 let value = value.parse::<u64>().unwrap();
290 assert_eq!(value, 0);
291 }
292
293 if metric.ends_with("_marshal_processed_height") {
295 let value = value.parse::<u64>().unwrap();
296 if value >= required {
297 success = true;
298 break;
299 }
300 }
301 }
302 if success {
303 break;
304 }
305
306 context.sleep(Duration::from_secs(1)).await;
308 }
309 context.auditor().state()
310 })
311 }
312
313 #[test_traced]
314 fn test_good_links() {
315 let link = Link {
316 latency: Duration::from_millis(10),
317 jitter: Duration::from_millis(1),
318 success_rate: 1.0,
319 };
320 for seed in 0..5 {
321 let state = all_online(5, seed, link.clone(), 25);
322 assert_eq!(state, all_online(5, seed, link.clone(), 25));
323 }
324 }
325
326 #[test_traced]
327 fn test_bad_links() {
328 let link = Link {
329 latency: Duration::from_millis(200),
330 jitter: Duration::from_millis(150),
331 success_rate: 0.75,
332 };
333 for seed in 0..5 {
334 let state = all_online(5, seed, link.clone(), 25);
335 assert_eq!(state, all_online(5, seed, link.clone(), 25));
336 }
337 }
338
339 #[test_traced]
340 fn test_1k() {
341 let link = Link {
342 latency: Duration::from_millis(80),
343 jitter: Duration::from_millis(10),
344 success_rate: 0.98,
345 };
346 all_online(10, 0, link.clone(), 1000);
347 }
348
349 #[test_traced]
350 fn test_backfill() {
351 let n = 5;
353 let threshold = quorum(n);
354 let initial_container_required = 10;
355 let final_container_required = 20;
356 let executor = Runner::timed(Duration::from_secs(30));
357 executor.start(|mut context| async move {
358 let (network, mut oracle) = Network::new(
360 context.with_label("network"),
361 simulated::Config {
362 max_size: 1024 * 1024,
363 disconnect_on_block: true,
364 tracked_peer_sets: Some(1),
365 },
366 );
367
368 network.start();
370
371 let mut signers = Vec::new();
373 let mut validators = Vec::new();
374 for i in 0..n {
375 let signer = PrivateKey::from_seed(i as u64);
376 let pk = signer.public_key();
377 signers.push(signer);
378 validators.push(pk);
379 }
380 validators.sort();
381 signers.sort_by_key(|s| s.public_key());
382 let mut registrations = register_validators(&mut oracle, &validators).await;
383
384 let link = Link {
386 latency: Duration::from_millis(10),
387 jitter: Duration::from_millis(1),
388 success_rate: 1.0,
389 };
390 link_validators(
391 &mut oracle,
392 &validators,
393 link.clone(),
394 Some(|_, i, j| ![i, j].contains(&0usize)),
395 )
396 .await;
397
398 let (polynomial, shares) =
400 ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
401
402 for (idx, signer) in signers.iter().enumerate() {
404 if idx == 0 {
406 continue;
407 }
408
409 let public_key = signer.public_key();
411 let uid = format!("validator-{public_key}");
412 let config: Config<_, Mock> = engine::Config {
413 blocker: oracle.control(public_key.clone()),
414 partition_prefix: uid.clone(),
415 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
416 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
417 me: signer.public_key(),
418 polynomial: polynomial.clone(),
419 share: shares[idx].clone(),
420 participants: validators.clone().into(),
421 mailbox_size: 1024,
422 deque_size: 10,
423 leader_timeout: Duration::from_secs(1),
424 notarization_timeout: Duration::from_secs(2),
425 nullify_retry: Duration::from_secs(10),
426 fetch_timeout: Duration::from_secs(1),
427 activity_timeout: 10,
428 skip_timeout: 5,
429 max_fetch_count: 10,
430 max_fetch_size: 1024 * 512,
431 fetch_concurrent: 10,
432 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
433 indexer: None,
434 };
435 let engine = Engine::new(context.with_label(&uid), config).await;
436
437 let (pending, recovered, resolver, broadcast, backfill) =
439 registrations.remove(&public_key).unwrap();
440
441 let marshal_resolver_cfg = marshal::resolver::p2p::Config {
443 public_key: public_key.clone(),
444 manager: oracle.clone(),
445 mailbox_size: 1024,
446 requester_config: requester::Config {
447 me: Some(public_key.clone()),
448 rate_limit: Quota::per_second(NonZeroU32::new(5).unwrap()),
449 initial: Duration::from_secs(1),
450 timeout: Duration::from_secs(2),
451 },
452 fetch_retry_timeout: Duration::from_millis(100),
453 priority_requests: false,
454 priority_responses: false,
455 };
456
457 let marshal_resolver =
458 marshal::resolver::p2p::init(&context, marshal_resolver_cfg, backfill);
459
460 engine.start(pending, recovered, resolver, broadcast, marshal_resolver);
462 }
463
464 loop {
466 let metrics = context.encode();
467
468 let mut success = false;
470 for line in metrics.lines() {
471 if !line.starts_with("validator-") {
473 continue;
474 }
475
476 let mut parts = line.split_whitespace();
478 let metric = parts.next().unwrap();
479 let value = parts.next().unwrap();
480
481 if metric.ends_with("_peers_blocked") {
483 let value = value.parse::<u64>().unwrap();
484 assert_eq!(value, 0);
485 }
486
487 if metric.ends_with("_marshal_processed_height") {
489 let value = value.parse::<u64>().unwrap();
490 if value >= initial_container_required {
491 success = true;
492 break;
493 }
494 }
495 }
496 if success {
497 break;
498 }
499
500 context.sleep(Duration::from_secs(1)).await;
502 }
503
504 link_validators(
506 &mut oracle,
507 &validators,
508 link,
509 Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
510 )
511 .await;
512
513 let signer = signers[0].clone();
515 let share = shares[0].clone();
516 let public_key = signer.public_key();
517 let uid = format!("validator-{public_key}");
518 let config: Config<_, Mock> = engine::Config {
519 blocker: oracle.control(public_key.clone()),
520 partition_prefix: uid.clone(),
521 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
522 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
523 me: signer.public_key(),
524 polynomial: polynomial.clone(),
525 share,
526 participants: validators.clone().into(),
527 mailbox_size: 1024,
528 deque_size: 10,
529 leader_timeout: Duration::from_secs(1),
530 notarization_timeout: Duration::from_secs(2),
531 nullify_retry: Duration::from_secs(10),
532 fetch_timeout: Duration::from_secs(1),
533 activity_timeout: 10,
534 skip_timeout: 5,
535 max_fetch_count: 10,
536 max_fetch_size: 1024 * 512,
537 fetch_concurrent: 10,
538 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
539 indexer: None,
540 };
541 let engine = Engine::new(context.with_label(&uid), config).await;
542
543 let (pending, recovered, resolver, broadcast, backfill) =
545 registrations.remove(&public_key).unwrap();
546
547 let marshal_resolver_cfg = marshal::resolver::p2p::Config {
549 public_key: public_key.clone(),
550 manager: oracle,
551 mailbox_size: 1024,
552 requester_config: requester::Config {
553 me: Some(public_key.clone()),
554 rate_limit: Quota::per_second(NonZeroU32::new(5).unwrap()),
555 initial: Duration::from_secs(1),
556 timeout: Duration::from_secs(2),
557 },
558 fetch_retry_timeout: Duration::from_millis(100),
559 priority_requests: false,
560 priority_responses: false,
561 };
562
563 let marshal_resolver =
564 marshal::resolver::p2p::init(&context, marshal_resolver_cfg, backfill);
565
566 engine.start(pending, recovered, resolver, broadcast, marshal_resolver);
568
569 loop {
571 let metrics = context.encode();
572
573 let mut success = false;
575 for line in metrics.lines() {
576 if !line.starts_with("validator-") {
578 continue;
579 }
580
581 let mut parts = line.split_whitespace();
583 let metric = parts.next().unwrap();
584 let value = parts.next().unwrap();
585
586 if metric.ends_with("_peers_blocked") {
588 let value = value.parse::<u64>().unwrap();
589 assert_eq!(value, 0);
590 }
591
592 if metric.ends_with("_marshal_processed_height") {
594 let value = value.parse::<u64>().unwrap();
595 if value >= final_container_required {
596 success = true;
597 break;
598 }
599 }
600 }
601 if success {
602 break;
603 }
604
605 context.sleep(Duration::from_secs(1)).await;
607 }
608 });
609 }
610
611 #[test_traced]
612 fn test_unclean_shutdown() {
613 let n = 5;
615 let threshold = quorum(n);
616 let required_container = 100;
617
618 let mut rng = StdRng::seed_from_u64(0);
620 let (polynomial, shares) = ops::generate_shares::<_, MinSig>(&mut rng, None, n, threshold);
621
622 let mut runs = 0;
624 let mut prev_checkpoint = None;
625 loop {
626 let polynomial = polynomial.clone();
628 let shares = shares.clone();
629 let f = |mut context: deterministic::Context| async move {
630 let (network, mut oracle) = Network::new(
632 context.with_label("network"),
633 simulated::Config {
634 max_size: 1024 * 1024,
635 disconnect_on_block: true,
636 tracked_peer_sets: Some(1),
637 },
638 );
639
640 network.start();
642
643 let mut signers = Vec::new();
645 let mut validators = Vec::new();
646 for i in 0..n {
647 let signer = PrivateKey::from_seed(i as u64);
648 let pk = signer.public_key();
649 signers.push(signer);
650 validators.push(pk);
651 }
652 validators.sort();
653 signers.sort_by_key(|s| s.public_key());
654 let mut registrations = register_validators(&mut oracle, &validators).await;
655
656 let link = Link {
658 latency: Duration::from_millis(10),
659 jitter: Duration::from_millis(1),
660 success_rate: 1.0,
661 };
662 link_validators(&mut oracle, &validators, link, None).await;
663
664 let mut public_keys = HashSet::new();
666 for (idx, signer) in signers.into_iter().enumerate() {
667 let public_key = signer.public_key();
669 public_keys.insert(public_key.clone());
670
671 let uid = format!("validator-{public_key}");
673 let config: Config<_, Mock> = engine::Config {
674 blocker: oracle.control(public_key.clone()),
675 partition_prefix: uid.clone(),
676 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
677 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
678 me: signer.public_key(),
679 polynomial: polynomial.clone(),
680 share: shares[idx].clone(),
681 participants: validators.clone().into(),
682 mailbox_size: 1024,
683 deque_size: 10,
684 leader_timeout: Duration::from_secs(1),
685 notarization_timeout: Duration::from_secs(2),
686 nullify_retry: Duration::from_secs(10),
687 fetch_timeout: Duration::from_secs(1),
688 activity_timeout: 10,
689 skip_timeout: 5,
690 max_fetch_count: 10,
691 max_fetch_size: 1024 * 512,
692 fetch_concurrent: 10,
693 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
694 indexer: None,
695 };
696 let engine = Engine::new(context.with_label(&uid), config).await;
697
698 let (pending, recovered, resolver, broadcast, backfill) =
700 registrations.remove(&public_key).unwrap();
701
702 let marshal_resolver_cfg = marshal::resolver::p2p::Config {
704 public_key: public_key.clone(),
705 manager: oracle.clone(),
706 mailbox_size: 1024,
707 requester_config: requester::Config {
708 me: Some(public_key.clone()),
709 rate_limit: Quota::per_second(NonZeroU32::new(5).unwrap()),
710 initial: Duration::from_secs(1),
711 timeout: Duration::from_secs(2),
712 },
713 fetch_retry_timeout: Duration::from_millis(100),
714 priority_requests: false,
715 priority_responses: false,
716 };
717
718 let marshal_resolver =
719 marshal::resolver::p2p::init(&context, marshal_resolver_cfg, backfill);
720
721 engine.start(pending, recovered, resolver, broadcast, marshal_resolver);
723 }
724
725 let poller = context
727 .with_label("metrics")
728 .spawn(move |context| async move {
729 loop {
730 let metrics = context.encode();
731
732 let mut success = false;
734 for line in metrics.lines() {
735 if !line.starts_with("validator-") {
737 continue;
738 }
739
740 let mut parts = line.split_whitespace();
742 let metric = parts.next().unwrap();
743 let value = parts.next().unwrap();
744
745 if metric.ends_with("_peers_blocked") {
747 let value = value.parse::<u64>().unwrap();
748 assert_eq!(value, 0);
749 }
750
751 if metric.ends_with("_marshal_processed_height") {
753 let value = value.parse::<u64>().unwrap();
754 if value >= required_container {
755 success = true;
756 break;
757 }
758 }
759 }
760 if success {
761 break;
762 }
763
764 context.sleep(Duration::from_millis(10)).await;
766 }
767 });
768
769 let wait =
771 context.gen_range(Duration::from_millis(10)..Duration::from_millis(1_000));
772
773 select! {
775 _ = poller => {
776 true
778 },
779 _ = context.sleep(wait) => {
780 false
782 }
783 }
784 };
785
786 let (complete, checkpoint) = if let Some(prev_checkpoint) = prev_checkpoint {
788 Runner::from(prev_checkpoint)
789 } else {
790 Runner::timed(Duration::from_secs(30))
791 }
792 .start_and_recover(f);
793
794 if complete {
796 break;
797 }
798
799 prev_checkpoint = Some(checkpoint);
801 runs += 1;
802 }
803 assert!(runs > 1);
804 info!(runs, "unclean shutdown recovery worked");
805 }
806
807 #[test_traced]
808 fn test_indexer() {
809 let n = 5;
811 let threshold = quorum(n);
812 let required_container = 10;
813 let executor = Runner::timed(Duration::from_secs(30));
814 executor.start(|mut context| async move {
815 let (network, mut oracle) = Network::new(
817 context.with_label("network"),
818 simulated::Config {
819 max_size: 1024 * 1024,
820 disconnect_on_block: true,
821 tracked_peer_sets: Some(1),
822 },
823 );
824
825 network.start();
827
828 let mut signers = Vec::new();
830 let mut validators = Vec::new();
831 for i in 0..n {
832 let signer = PrivateKey::from_seed(i as u64);
833 let pk = signer.public_key();
834 signers.push(signer);
835 validators.push(pk);
836 }
837 validators.sort();
838 signers.sort_by_key(|s| s.public_key());
839 let mut registrations = register_validators(&mut oracle, &validators).await;
840
841 let link = Link {
843 latency: Duration::from_millis(10),
844 jitter: Duration::from_millis(1),
845 success_rate: 1.0,
846 };
847 link_validators(&mut oracle, &validators, link, None).await;
848
849 let (polynomial, shares) =
851 ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
852 let identity = *poly::public::<MinSig>(&polynomial);
853
854 let indexer = Mock::new("", identity);
856
857 let mut public_keys = HashSet::new();
859 for (idx, signer) in signers.into_iter().enumerate() {
860 let public_key = signer.public_key();
862 public_keys.insert(public_key.clone());
863
864 let uid = format!("validator-{public_key}");
866 let config: Config<_, Mock> = engine::Config {
867 blocker: oracle.control(public_key.clone()),
868 partition_prefix: uid.clone(),
869 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
870 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
871 me: signer.public_key(),
872 polynomial: polynomial.clone(),
873 share: shares[idx].clone(),
874 participants: validators.clone().into(),
875 mailbox_size: 1024,
876 deque_size: 10,
877 leader_timeout: Duration::from_secs(1),
878 notarization_timeout: Duration::from_secs(2),
879 nullify_retry: Duration::from_secs(10),
880 fetch_timeout: Duration::from_secs(1),
881 activity_timeout: 10,
882 skip_timeout: 5,
883 max_fetch_count: 10,
884 max_fetch_size: 1024 * 512,
885 fetch_concurrent: 10,
886 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
887 indexer: Some(indexer.clone()),
888 };
889 let engine = Engine::new(context.with_label(&uid), config).await;
890
891 let (pending, recovered, resolver, broadcast, backfill) =
893 registrations.remove(&public_key).unwrap();
894
895 let marshal_resolver_cfg = marshal::resolver::p2p::Config {
897 public_key: public_key.clone(),
898 manager: oracle.clone(),
899 mailbox_size: 1024,
900 requester_config: requester::Config {
901 me: Some(public_key.clone()),
902 rate_limit: Quota::per_second(NonZeroU32::new(5).unwrap()),
903 initial: Duration::from_secs(1),
904 timeout: Duration::from_secs(2),
905 },
906 fetch_retry_timeout: Duration::from_millis(100),
907 priority_requests: false,
908 priority_responses: false,
909 };
910
911 let marshal_resolver =
912 marshal::resolver::p2p::init(&context, marshal_resolver_cfg, backfill);
913
914 engine.start(pending, recovered, resolver, broadcast, marshal_resolver);
916 }
917
918 loop {
920 let metrics = context.encode();
921
922 let mut success = false;
924 for line in metrics.lines() {
925 if !line.starts_with("validator-") {
927 continue;
928 }
929
930 let mut parts = line.split_whitespace();
932 let metric = parts.next().unwrap();
933 let value = parts.next().unwrap();
934
935 if metric.ends_with("_peers_blocked") {
937 let value = value.parse::<u64>().unwrap();
938 assert_eq!(value, 0);
939 }
940
941 if metric.ends_with("_marshal_processed_height") {
943 let value = value.parse::<u64>().unwrap();
944 if value >= required_container {
945 success = true;
946 break;
947 }
948 }
949 }
950 if success {
951 break;
952 }
953
954 context.sleep(Duration::from_secs(1)).await;
956 }
957
958 assert!(indexer.seed_seen.load(std::sync::atomic::Ordering::Relaxed));
960 assert!(indexer
961 .notarization_seen
962 .load(std::sync::atomic::Ordering::Relaxed));
963 assert!(indexer
964 .finalization_seen
965 .load(std::sync::atomic::Ordering::Relaxed));
966 });
967 }
968}