1use alto_types::{Finalized, Identity, Notarized, Seed};
2use serde::{Deserialize, Serialize};
3use std::{collections::HashMap, future::Future, net::SocketAddr};
4
5pub mod actors;
6pub mod engine;
7
8pub trait Indexer: Clone + Send + Sync + 'static {
10 type Error: std::error::Error + Send + Sync + 'static;
11
12 fn new(uri: &str, public: Identity) -> Self;
14
15 fn seed_upload(&self, seed: Seed) -> impl Future<Output = Result<(), Self::Error>> + Send;
17
18 fn notarized_upload(
20 &self,
21 notarized: Notarized,
22 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
23
24 fn finalized_upload(
26 &self,
27 finalized: Finalized,
28 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
29}
30
31impl Indexer for alto_client::Client {
32 type Error = alto_client::Error;
33
34 fn new(uri: &str, identity: Identity) -> Self {
35 Self::new(uri, identity)
36 }
37
38 fn seed_upload(&self, seed: Seed) -> impl Future<Output = Result<(), Self::Error>> + Send {
39 self.seed_upload(seed)
40 }
41
42 fn notarized_upload(
43 &self,
44 notarized: Notarized,
45 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
46 self.notarized_upload(notarized)
47 }
48
49 fn finalized_upload(
50 &self,
51 finalized: Finalized,
52 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
53 self.finalized_upload(finalized)
54 }
55}
56
57#[derive(Deserialize, Serialize)]
59pub struct Config {
60 pub private_key: String,
61 pub share: String,
62 pub polynomial: String,
63
64 pub port: u16,
65 pub metrics_port: u16,
66 pub directory: String,
67 pub worker_threads: usize,
68 pub log_level: String,
69
70 pub allowed_peers: Vec<String>,
71 pub bootstrappers: Vec<String>,
72
73 pub message_backlog: usize,
74 pub mailbox_size: usize,
75 pub deque_size: usize,
76
77 pub indexer: Option<String>,
78}
79
80#[derive(Deserialize, Serialize)]
84pub struct Peers {
85 pub addresses: HashMap<String, SocketAddr>,
86}
87
88#[cfg(test)]
89mod tests {
90 use super::*;
91 use alto_types::{Finalized, Notarized};
92 use commonware_cryptography::{
93 bls12381::{
94 dkg::ops,
95 primitives::{poly, variant::MinSig},
96 },
97 ed25519::{PrivateKey, PublicKey},
98 PrivateKeyExt, Signer,
99 };
100 use commonware_macros::{select, test_traced};
101 use commonware_p2p::simulated::{self, Link, Network, Oracle, Receiver, Sender};
102 use commonware_runtime::{
103 deterministic::{self, Runner},
104 Clock, Metrics, Runner as _, Spawner,
105 };
106 use commonware_utils::quorum;
107 use engine::{Config, Engine};
108 use governor::Quota;
109 use rand::{rngs::StdRng, Rng, SeedableRng};
110 use std::{
111 collections::{HashMap, HashSet},
112 num::NonZeroU32,
113 sync::Arc,
114 };
115 use std::{sync::atomic::AtomicBool, time::Duration};
116 use tracing::info;
117
118 const FREEZER_TABLE_INITIAL_SIZE: u32 = 2u32.pow(14); #[derive(Clone)]
124 struct MockIndexer {
125 seed_seen: Arc<AtomicBool>,
126 notarization_seen: Arc<AtomicBool>,
127 finalization_seen: Arc<AtomicBool>,
128 }
129
130 impl Indexer for MockIndexer {
131 type Error = std::io::Error;
132
133 fn new(_: &str, _: Identity) -> Self {
134 MockIndexer {
135 seed_seen: Arc::new(AtomicBool::new(false)),
136 notarization_seen: Arc::new(AtomicBool::new(false)),
137 finalization_seen: Arc::new(AtomicBool::new(false)),
138 }
139 }
140
141 async fn seed_upload(&self, _: Seed) -> Result<(), Self::Error> {
142 self.seed_seen
143 .store(true, std::sync::atomic::Ordering::Relaxed);
144 Ok(())
145 }
146
147 async fn notarized_upload(&self, _: Notarized) -> Result<(), Self::Error> {
148 self.notarization_seen
149 .store(true, std::sync::atomic::Ordering::Relaxed);
150 Ok(())
151 }
152
153 async fn finalized_upload(&self, _: Finalized) -> Result<(), Self::Error> {
154 self.finalization_seen
155 .store(true, std::sync::atomic::Ordering::Relaxed);
156 Ok(())
157 }
158 }
159
160 async fn register_validators(
162 oracle: &mut Oracle<PublicKey>,
163 validators: &[PublicKey],
164 ) -> HashMap<
165 PublicKey,
166 (
167 (Sender<PublicKey>, Receiver<PublicKey>),
168 (Sender<PublicKey>, Receiver<PublicKey>),
169 (Sender<PublicKey>, Receiver<PublicKey>),
170 (Sender<PublicKey>, Receiver<PublicKey>),
171 (Sender<PublicKey>, Receiver<PublicKey>),
172 ),
173 > {
174 let mut registrations = HashMap::new();
175 for validator in validators.iter() {
176 let (pending_sender, pending_receiver) =
177 oracle.register(validator.clone(), 0).await.unwrap();
178 let (recovered_sender, recovered_receiver) =
179 oracle.register(validator.clone(), 1).await.unwrap();
180 let (resolver_sender, resolver_receiver) =
181 oracle.register(validator.clone(), 2).await.unwrap();
182 let (broadcast_sender, broadcast_receiver) =
183 oracle.register(validator.clone(), 3).await.unwrap();
184 let (backfill_sender, backfill_receiver) =
185 oracle.register(validator.clone(), 4).await.unwrap();
186 registrations.insert(
187 validator.clone(),
188 (
189 (pending_sender, pending_receiver),
190 (recovered_sender, recovered_receiver),
191 (resolver_sender, resolver_receiver),
192 (broadcast_sender, broadcast_receiver),
193 (backfill_sender, backfill_receiver),
194 ),
195 );
196 }
197 registrations
198 }
199
200 async fn link_validators(
206 oracle: &mut Oracle<PublicKey>,
207 validators: &[PublicKey],
208 link: Link,
209 restrict_to: Option<fn(usize, usize, usize) -> bool>,
210 ) {
211 for (i1, v1) in validators.iter().enumerate() {
212 for (i2, v2) in validators.iter().enumerate() {
213 if v2 == v1 {
215 continue;
216 }
217
218 if let Some(f) = restrict_to {
220 if !f(validators.len(), i1, i2) {
221 continue;
222 }
223 }
224
225 oracle
227 .add_link(v1.clone(), v2.clone(), link.clone())
228 .await
229 .unwrap();
230 }
231 }
232 }
233
234 fn all_online(n: u32, seed: u64, link: Link, required: u64) -> String {
235 let threshold = quorum(n);
237 let cfg = deterministic::Config::default().with_seed(seed);
238 let executor = Runner::from(cfg);
239 executor.start(|mut context| async move {
240 let (network, mut oracle) = Network::new(
242 context.with_label("network"),
243 simulated::Config {
244 max_size: 1024 * 1024,
245 },
246 );
247
248 network.start();
250
251 let mut signers = Vec::new();
253 let mut validators = Vec::new();
254 for i in 0..n {
255 let signer = PrivateKey::from_seed(i as u64);
256 let pk = signer.public_key();
257 signers.push(signer);
258 validators.push(pk);
259 }
260 validators.sort();
261 signers.sort_by_key(|s| s.public_key());
262 let mut registrations = register_validators(&mut oracle, &validators).await;
263
264 link_validators(&mut oracle, &validators, link, None).await;
266
267 let (polynomial, shares) =
269 ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
270
271 let mut public_keys = HashSet::new();
273 for (idx, signer) in signers.into_iter().enumerate() {
274 let public_key = signer.public_key();
276 public_keys.insert(public_key.clone());
277
278 let uid = format!("validator-{public_key}");
280 let config: Config<_, MockIndexer> = engine::Config {
281 blocker: oracle.control(public_key.clone()),
282 partition_prefix: uid.clone(),
283 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
284 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
285 signer,
286 polynomial: polynomial.clone(),
287 share: shares[idx].clone(),
288 participants: validators.clone(),
289 mailbox_size: 1024,
290 deque_size: 10,
291 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
292 leader_timeout: Duration::from_secs(1),
293 notarization_timeout: Duration::from_secs(2),
294 nullify_retry: Duration::from_secs(10),
295 fetch_timeout: Duration::from_secs(1),
296 activity_timeout: 10,
297 skip_timeout: 5,
298 max_fetch_count: 10,
299 max_fetch_size: 1024 * 512,
300 fetch_concurrent: 10,
301 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
302 indexer: None,
303 };
304 let engine = Engine::new(context.with_label(&uid), config).await;
305
306 let (pending, recovered, resolver, broadcast, backfill) =
308 registrations.remove(&public_key).unwrap();
309
310 engine.start(pending, recovered, resolver, broadcast, backfill);
312 }
313
314 loop {
316 let metrics = context.encode();
317
318 let mut success = false;
320 for line in metrics.lines() {
321 if !line.starts_with("validator-") {
323 continue;
324 }
325
326 let mut parts = line.split_whitespace();
328 let metric = parts.next().unwrap();
329 let value = parts.next().unwrap();
330
331 if metric.ends_with("_peers_blocked") {
333 let value = value.parse::<u64>().unwrap();
334 assert_eq!(value, 0);
335 }
336
337 if metric.ends_with("_syncer_contiguous_height") {
339 let value = value.parse::<u64>().unwrap();
340 if value >= required {
341 success = true;
342 break;
343 }
344 }
345 }
346 if success {
347 break;
348 }
349
350 context.sleep(Duration::from_secs(1)).await;
352 }
353 context.auditor().state()
354 })
355 }
356
357 #[test_traced]
358 fn test_good_links() {
359 let link = Link {
360 latency: 10.0,
361 jitter: 1.0,
362 success_rate: 1.0,
363 };
364 for seed in 0..5 {
365 let state = all_online(5, seed, link.clone(), 25);
366 assert_eq!(state, all_online(5, seed, link.clone(), 25));
367 }
368 }
369
370 #[test_traced]
371 fn test_bad_links() {
372 let link = Link {
373 latency: 200.0,
374 jitter: 150.0,
375 success_rate: 0.75,
376 };
377 for seed in 0..5 {
378 let state = all_online(5, seed, link.clone(), 25);
379 assert_eq!(state, all_online(5, seed, link.clone(), 25));
380 }
381 }
382
383 #[test_traced]
384 fn test_1k() {
385 let link = Link {
386 latency: 80.0,
387 jitter: 10.0,
388 success_rate: 0.98,
389 };
390 all_online(10, 0, link.clone(), 1000);
391 }
392
393 #[test_traced]
394 fn test_backfill() {
395 let n = 5;
397 let threshold = quorum(n);
398 let initial_container_required = 10;
399 let final_container_required = 20;
400 let executor = Runner::timed(Duration::from_secs(30));
401 executor.start(|mut context| async move {
402 let (network, mut oracle) = Network::new(
404 context.with_label("network"),
405 simulated::Config {
406 max_size: 1024 * 1024,
407 },
408 );
409
410 network.start();
412
413 let mut signers = Vec::new();
415 let mut validators = Vec::new();
416 for i in 0..n {
417 let signer = PrivateKey::from_seed(i as u64);
418 let pk = signer.public_key();
419 signers.push(signer);
420 validators.push(pk);
421 }
422 validators.sort();
423 signers.sort_by_key(|s| s.public_key());
424 let mut registrations = register_validators(&mut oracle, &validators).await;
425
426 let link = Link {
428 latency: 10.0,
429 jitter: 1.0,
430 success_rate: 1.0,
431 };
432 link_validators(
433 &mut oracle,
434 &validators,
435 link.clone(),
436 Some(|_, i, j| ![i, j].contains(&0usize)),
437 )
438 .await;
439
440 let (polynomial, shares) =
442 ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
443
444 for (idx, signer) in signers.iter().enumerate() {
446 if idx == 0 {
448 continue;
449 }
450
451 let public_key = signer.public_key();
453 let uid = format!("validator-{public_key}");
454 let config: Config<_, MockIndexer> = engine::Config {
455 blocker: oracle.control(public_key.clone()),
456 partition_prefix: uid.clone(),
457 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
458 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
459 signer: signer.clone(),
460 polynomial: polynomial.clone(),
461 share: shares[idx].clone(),
462 participants: validators.clone(),
463 mailbox_size: 1024,
464 deque_size: 10,
465 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
466 leader_timeout: Duration::from_secs(1),
467 notarization_timeout: Duration::from_secs(2),
468 nullify_retry: Duration::from_secs(10),
469 fetch_timeout: Duration::from_secs(1),
470 activity_timeout: 10,
471 skip_timeout: 5,
472 max_fetch_count: 10,
473 max_fetch_size: 1024 * 512,
474 fetch_concurrent: 10,
475 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
476 indexer: None,
477 };
478 let engine = Engine::new(context.with_label(&uid), config).await;
479
480 let (pending, recovered, resolver, broadcast, backfill) =
482 registrations.remove(&public_key).unwrap();
483
484 engine.start(pending, recovered, resolver, broadcast, backfill);
486 }
487
488 loop {
490 let metrics = context.encode();
491
492 let mut success = true;
494 for line in metrics.lines() {
495 if !line.starts_with("validator-") {
497 continue;
498 }
499
500 let mut parts = line.split_whitespace();
502 let metric = parts.next().unwrap();
503 let value = parts.next().unwrap();
504
505 if metric.ends_with("_peers_blocked") {
507 let value = value.parse::<u64>().unwrap();
508 assert_eq!(value, 0);
509 }
510
511 if metric.ends_with("_syncer_contiguous_height") {
513 let value = value.parse::<u64>().unwrap();
514 if value >= initial_container_required {
515 success = true;
516 break;
517 }
518 }
519 }
520 if success {
521 break;
522 }
523
524 context.sleep(Duration::from_secs(1)).await;
526 }
527
528 link_validators(
530 &mut oracle,
531 &validators,
532 link,
533 Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
534 )
535 .await;
536
537 let signer = signers[0].clone();
539 let share = shares[0].clone();
540 let public_key = signer.public_key();
541 let uid = format!("validator-{public_key}");
542 let config: Config<_, MockIndexer> = engine::Config {
543 blocker: oracle.control(public_key.clone()),
544 partition_prefix: uid.clone(),
545 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
546 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
547 signer: signer.clone(),
548 polynomial: polynomial.clone(),
549 share,
550 participants: validators.clone(),
551 mailbox_size: 1024,
552 deque_size: 10,
553 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
554 leader_timeout: Duration::from_secs(1),
555 notarization_timeout: Duration::from_secs(2),
556 nullify_retry: Duration::from_secs(10),
557 fetch_timeout: Duration::from_secs(1),
558 activity_timeout: 10,
559 skip_timeout: 5,
560 max_fetch_count: 10,
561 max_fetch_size: 1024 * 512,
562 fetch_concurrent: 10,
563 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
564 indexer: None,
565 };
566 let engine = Engine::new(context.with_label(&uid), config).await;
567
568 let (pending, recovered, resolver, broadcast, backfill) =
570 registrations.remove(&public_key).unwrap();
571
572 engine.start(pending, recovered, resolver, broadcast, backfill);
574
575 loop {
577 let metrics = context.encode();
578
579 let mut success = false;
581 for line in metrics.lines() {
582 if !line.starts_with("validator-") {
584 continue;
585 }
586
587 let mut parts = line.split_whitespace();
589 let metric = parts.next().unwrap();
590 let value = parts.next().unwrap();
591
592 if metric.ends_with("_peers_blocked") {
594 let value = value.parse::<u64>().unwrap();
595 assert_eq!(value, 0);
596 }
597
598 if metric.ends_with("_syncer_contiguous_height") {
600 let value = value.parse::<u64>().unwrap();
601 if value >= final_container_required {
602 success = true;
603 break;
604 }
605 }
606 }
607 if success {
608 break;
609 }
610
611 context.sleep(Duration::from_secs(1)).await;
613 }
614 });
615 }
616
617 #[test_traced]
618 fn test_unclean_shutdown() {
619 let n = 5;
621 let threshold = quorum(n);
622 let required_container = 100;
623
624 let mut rng = StdRng::seed_from_u64(0);
626 let (polynomial, shares) = ops::generate_shares::<_, MinSig>(&mut rng, None, n, threshold);
627
628 let mut runs = 0;
630 let mut prev_ctx = None;
631 loop {
632 let polynomial = polynomial.clone();
634 let shares = shares.clone();
635 let f = |mut context: deterministic::Context| async move {
636 let (network, mut oracle) = Network::new(
638 context.with_label("network"),
639 simulated::Config {
640 max_size: 1024 * 1024,
641 },
642 );
643
644 network.start();
646
647 let mut signers = Vec::new();
649 let mut validators = Vec::new();
650 for i in 0..n {
651 let signer = PrivateKey::from_seed(i as u64);
652 let pk = signer.public_key();
653 signers.push(signer);
654 validators.push(pk);
655 }
656 validators.sort();
657 signers.sort_by_key(|s| s.public_key());
658 let mut registrations = register_validators(&mut oracle, &validators).await;
659
660 let link = Link {
662 latency: 10.0,
663 jitter: 1.0,
664 success_rate: 1.0,
665 };
666 link_validators(&mut oracle, &validators, link, None).await;
667
668 let mut public_keys = HashSet::new();
670 for (idx, signer) in signers.into_iter().enumerate() {
671 let public_key = signer.public_key();
673 public_keys.insert(public_key.clone());
674
675 let uid = format!("validator-{public_key}");
677 let config: Config<_, MockIndexer> = engine::Config {
678 blocker: oracle.control(public_key.clone()),
679 partition_prefix: uid.clone(),
680 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
681 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
682 signer,
683 polynomial: polynomial.clone(),
684 share: shares[idx].clone(),
685 participants: validators.clone(),
686 mailbox_size: 1024,
687 deque_size: 10,
688 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
689 leader_timeout: Duration::from_secs(1),
690 notarization_timeout: Duration::from_secs(2),
691 nullify_retry: Duration::from_secs(10),
692 fetch_timeout: Duration::from_secs(1),
693 activity_timeout: 10,
694 skip_timeout: 5,
695 max_fetch_count: 10,
696 max_fetch_size: 1024 * 512,
697 fetch_concurrent: 10,
698 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
699 indexer: None,
700 };
701 let engine = Engine::new(context.with_label(&uid), config).await;
702
703 let (pending, recovered, resolver, broadcast, backfill) =
705 registrations.remove(&public_key).unwrap();
706
707 engine.start(pending, recovered, resolver, broadcast, backfill);
709 }
710
711 let poller = context
713 .with_label("metrics")
714 .spawn(move |context| async move {
715 loop {
716 let metrics = context.encode();
717
718 let mut success = false;
720 for line in metrics.lines() {
721 if !line.starts_with("validator-") {
723 continue;
724 }
725
726 let mut parts = line.split_whitespace();
728 let metric = parts.next().unwrap();
729 let value = parts.next().unwrap();
730
731 if metric.ends_with("_peers_blocked") {
733 let value = value.parse::<u64>().unwrap();
734 assert_eq!(value, 0);
735 }
736
737 if metric.ends_with("_syncer_contiguous_height") {
739 let value = value.parse::<u64>().unwrap();
740 if value >= required_container {
741 success = true;
742 break;
743 }
744 }
745 }
746 if success {
747 break;
748 }
749
750 context.sleep(Duration::from_millis(10)).await;
752 }
753 });
754
755 let wait =
757 context.gen_range(Duration::from_millis(10)..Duration::from_millis(1_000));
758
759 select! {
761 _ = poller => {
762 (true, context)
764 },
765 _ = context.sleep(wait) => {
766 (false, context)
768 }
769 }
770 };
771
772 let (complete, context) = if let Some(prev_ctx) = prev_ctx {
774 Runner::from(prev_ctx)
775 } else {
776 Runner::timed(Duration::from_secs(30))
777 }
778 .start(f);
779 if complete {
780 break;
781 }
782
783 prev_ctx = Some(context.recover());
785 runs += 1;
786 }
787 assert!(runs > 1);
788 info!(runs, "unclean shutdown recovery worked");
789 }
790
791 #[test_traced]
792 fn test_indexer() {
793 let n = 5;
795 let threshold = quorum(n);
796 let required_container = 10;
797 let executor = Runner::timed(Duration::from_secs(30));
798 executor.start(|mut context| async move {
799 let (network, mut oracle) = Network::new(
801 context.with_label("network"),
802 simulated::Config {
803 max_size: 1024 * 1024,
804 },
805 );
806
807 network.start();
809
810 let mut signers = Vec::new();
812 let mut validators = Vec::new();
813 for i in 0..n {
814 let signer = PrivateKey::from_seed(i as u64);
815 let pk = signer.public_key();
816 signers.push(signer);
817 validators.push(pk);
818 }
819 validators.sort();
820 signers.sort_by_key(|s| s.public_key());
821 let mut registrations = register_validators(&mut oracle, &validators).await;
822
823 let link = Link {
825 latency: 10.0,
826 jitter: 1.0,
827 success_rate: 1.0,
828 };
829 link_validators(&mut oracle, &validators, link, None).await;
830
831 let (polynomial, shares) =
833 ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
834 let identity = *poly::public::<MinSig>(&polynomial);
835
836 let indexer = MockIndexer::new("", identity);
838
839 let mut public_keys = HashSet::new();
841 for (idx, signer) in signers.into_iter().enumerate() {
842 let public_key = signer.public_key();
844 public_keys.insert(public_key.clone());
845
846 let uid = format!("validator-{public_key}");
848 let config: Config<_, MockIndexer> = engine::Config {
849 blocker: oracle.control(public_key.clone()),
850 partition_prefix: uid.clone(),
851 blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
852 finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
853 signer,
854 polynomial: polynomial.clone(),
855 share: shares[idx].clone(),
856 participants: validators.clone(),
857 mailbox_size: 1024,
858 deque_size: 10,
859 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
860 leader_timeout: Duration::from_secs(1),
861 notarization_timeout: Duration::from_secs(2),
862 nullify_retry: Duration::from_secs(10),
863 fetch_timeout: Duration::from_secs(1),
864 activity_timeout: 10,
865 skip_timeout: 5,
866 max_fetch_count: 10,
867 max_fetch_size: 1024 * 512,
868 fetch_concurrent: 10,
869 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
870 indexer: Some(indexer.clone()),
871 };
872 let engine = Engine::new(context.with_label(&uid), config).await;
873
874 let (pending, recovered, resolver, broadcast, backfill) =
876 registrations.remove(&public_key).unwrap();
877
878 engine.start(pending, recovered, resolver, broadcast, backfill);
880 }
881
882 loop {
884 let metrics = context.encode();
885
886 let mut success = false;
888 for line in metrics.lines() {
889 if !line.starts_with("validator-") {
891 continue;
892 }
893
894 let mut parts = line.split_whitespace();
896 let metric = parts.next().unwrap();
897 let value = parts.next().unwrap();
898
899 if metric.ends_with("_peers_blocked") {
901 let value = value.parse::<u64>().unwrap();
902 assert_eq!(value, 0);
903 }
904
905 if metric.ends_with("_syncer_contiguous_height") {
907 let value = value.parse::<u64>().unwrap();
908 if value >= required_container {
909 success = true;
910 break;
911 }
912 }
913 }
914 if success {
915 break;
916 }
917
918 context.sleep(Duration::from_secs(1)).await;
920 }
921
922 assert!(indexer.seed_seen.load(std::sync::atomic::Ordering::Relaxed));
924 assert!(indexer
925 .notarization_seen
926 .load(std::sync::atomic::Ordering::Relaxed));
927 assert!(indexer
928 .finalization_seen
929 .load(std::sync::atomic::Ordering::Relaxed));
930 });
931 }
932}