alto_chain/
lib.rs

1use serde::{Deserialize, Serialize};
2use std::{collections::HashMap, net::SocketAddr};
3
4pub mod application;
5pub mod engine;
6pub mod indexer;
7pub mod supervisor;
8pub mod utils;
9
/// Configuration for the [engine::Engine].
///
/// All fields are public and the struct derives `Deserialize` and `Serialize`, so a
/// value can be loaded from (and written back to) a serde-supported format.
#[derive(Deserialize, Serialize)]
pub struct Config {
    // Key material, carried as strings.
    // NOTE(review): the string encoding is not visible in this file; confirm
    // against the code that parses these fields.
    pub private_key: String,
    pub share: String,
    pub polynomial: String,

    // Process-level settings.
    // NOTE(review): presumably the listen port, metrics port, storage directory,
    // runtime thread count and tracing level; verify against the binary that
    // consumes this struct.
    pub port: u16,
    pub metrics_port: u16,
    pub directory: String,
    pub worker_threads: usize,
    pub log_level: String,

    // Peer lists, as strings.
    // NOTE(review): presumably encoded peer identities; confirm the format.
    pub allowed_peers: Vec<String>,
    pub bootstrappers: Vec<String>,

    // Buffer sizing knobs. `mailbox_size` and `deque_size` share their names with
    // fields of `engine::Config` set by the tests in this file.
    pub message_backlog: usize,
    pub mailbox_size: usize,
    pub deque_size: usize,

    // Optional indexer setting; `None` leaves it unset.
    // NOTE(review): presumably the indexer's address/URL; confirm.
    pub indexer: Option<String>,
}
32
/// A list of peers provided when a validator is run locally.
///
/// When run remotely, [commonware_deployer::ec2::Hosts] is used instead.
#[derive(Deserialize, Serialize)]
pub struct Peers {
    /// Socket address of each peer, keyed by a string.
    // NOTE(review): the key presumably identifies the peer (e.g. an encoded
    // public key); confirm against the code that reads this map.
    pub addresses: HashMap<String, SocketAddr>,
}
40
41#[cfg(test)]
42mod tests {
43    use super::*;
44    use commonware_cryptography::{
45        bls12381::{
46            dkg::ops,
47            primitives::{poly, variant::MinSig},
48        },
49        ed25519::{PrivateKey, PublicKey},
50        PrivateKeyExt, Signer,
51    };
52    use commonware_macros::{select, test_traced};
53    use commonware_p2p::simulated::{self, Link, Network, Oracle, Receiver, Sender};
54    use commonware_runtime::{
55        deterministic::{self, Runner},
56        Clock, Metrics, Runner as _, Spawner,
57    };
58    use commonware_utils::quorum;
59    use engine::{Config, Engine};
60    use governor::Quota;
61    use indexer::{Indexer, Mock};
62    use rand::{rngs::StdRng, Rng, SeedableRng};
63    use std::{
64        collections::{HashMap, HashSet},
65        num::NonZeroU32,
66        time::Duration,
67    };
68    use tracing::info;
69
    /// Initial freezer table size used for both `blocks_freezer_table_initial_size` and
    /// `finalized_freezer_table_initial_size` of every engine built in these tests.
    ///
    /// Limit the freezer table size to 1MB because the deterministic runtime stores
    /// everything in RAM.
    // NOTE(review): the value is 2^14 = 16,384; the "1MB" figure depends on the
    // per-entry size, which is not visible in this file. Confirm.
    const FREEZER_TABLE_INITIAL_SIZE: u32 = 2u32.pow(14); // 1MB
73
74    /// Registers all validators using the oracle.
75    async fn register_validators(
76        oracle: &mut Oracle<PublicKey>,
77        validators: &[PublicKey],
78    ) -> HashMap<
79        PublicKey,
80        (
81            (Sender<PublicKey>, Receiver<PublicKey>),
82            (Sender<PublicKey>, Receiver<PublicKey>),
83            (Sender<PublicKey>, Receiver<PublicKey>),
84            (Sender<PublicKey>, Receiver<PublicKey>),
85            (Sender<PublicKey>, Receiver<PublicKey>),
86        ),
87    > {
88        let mut registrations = HashMap::new();
89        for validator in validators.iter() {
90            let (pending_sender, pending_receiver) =
91                oracle.register(validator.clone(), 0).await.unwrap();
92            let (recovered_sender, recovered_receiver) =
93                oracle.register(validator.clone(), 1).await.unwrap();
94            let (resolver_sender, resolver_receiver) =
95                oracle.register(validator.clone(), 2).await.unwrap();
96            let (broadcast_sender, broadcast_receiver) =
97                oracle.register(validator.clone(), 3).await.unwrap();
98            let (backfill_sender, backfill_receiver) =
99                oracle.register(validator.clone(), 4).await.unwrap();
100            registrations.insert(
101                validator.clone(),
102                (
103                    (pending_sender, pending_receiver),
104                    (recovered_sender, recovered_receiver),
105                    (resolver_sender, resolver_receiver),
106                    (broadcast_sender, broadcast_receiver),
107                    (backfill_sender, backfill_receiver),
108                ),
109            );
110        }
111        registrations
112    }
113
114    /// Links (or unlinks) validators using the oracle.
115    ///
116    /// The `action` parameter determines the action (e.g. link, unlink) to take.
117    /// The `restrict_to` function can be used to restrict the linking to certain connections,
118    /// otherwise all validators will be linked to all other validators.
119    async fn link_validators(
120        oracle: &mut Oracle<PublicKey>,
121        validators: &[PublicKey],
122        link: Link,
123        restrict_to: Option<fn(usize, usize, usize) -> bool>,
124    ) {
125        for (i1, v1) in validators.iter().enumerate() {
126            for (i2, v2) in validators.iter().enumerate() {
127                // Ignore self
128                if v2 == v1 {
129                    continue;
130                }
131
132                // Restrict to certain connections
133                if let Some(f) = restrict_to {
134                    if !f(validators.len(), i1, i2) {
135                        continue;
136                    }
137                }
138
139                // Add link
140                oracle
141                    .add_link(v1.clone(), v2.clone(), link.clone())
142                    .await
143                    .unwrap();
144            }
145        }
146    }
147
    /// Runs `n` validators, all started and fully linked with `link`, on a deterministic
    /// runtime seeded with `seed`, and returns the runtime auditor's state once a
    /// validator reports a `_marshal_processed_height` metric of at least `required`.
    ///
    /// Metrics are polled once per (runtime) second. Panics if an inspected
    /// `_peers_blocked` metric is non-zero.
    ///
    /// Callers compare the returned state across two runs with the same seed, so the
    /// order of operations in this function must stay stable.
    fn all_online(n: u32, seed: u64, link: Link, required: u64) -> String {
        // Create context
        let threshold = quorum(n);
        let cfg = deterministic::Config::default().with_seed(seed);
        let executor = Runner::from(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                simulated::Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants (keys derived from seeds 0..n)
            let mut signers = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let signer = PrivateKey::from_seed(i as u64);
                let pk = signer.public_key();
                signers.push(signer);
                validators.push(pk);
            }
            // Sort both lists by public key so that index `i` refers to the same
            // validator in `signers`, `validators`, and (below) `shares`.
            validators.sort();
            signers.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            link_validators(&mut oracle, &validators, link, None).await;

            // Derive threshold
            let (polynomial, shares) =
                ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);

            // Create instances
            // NOTE(review): `public_keys` is only ever inserted into and never read.
            let mut public_keys = HashSet::new();
            for (idx, signer) in signers.into_iter().enumerate() {
                // Create signer context
                let public_key = signer.public_key();
                public_keys.insert(public_key.clone());

                // Configure engine
                let uid = format!("validator-{public_key}");
                let config: Config<_, Mock> = engine::Config {
                    blocker: oracle.control(public_key.clone()),
                    partition_prefix: uid.clone(),
                    blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
                    finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
                    signer,
                    polynomial: polynomial.clone(),
                    share: shares[idx].clone(),
                    participants: validators.clone(),
                    mailbox_size: 1024,
                    deque_size: 10,
                    backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout: 10,
                    skip_timeout: 5,
                    max_fetch_count: 10,
                    max_fetch_size: 1024 * 512,
                    fetch_concurrent: 10,
                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
                    indexer: None,
                };
                // The label doubles as the metric prefix matched ("validator-") below.
                let engine = Engine::new(context.with_label(&uid), config).await;

                // Get networking
                let (pending, recovered, resolver, broadcast, backfill) =
                    registrations.remove(&public_key).unwrap();

                // Start engine
                engine.start(pending, recovered, resolver, broadcast, backfill);
            }

            // Poll metrics
            loop {
                let metrics = context.encode();

                // Iterate over all lines
                let mut success = false;
                for line in metrics.lines() {
                    // Ensure it is a metrics line
                    if !line.starts_with("validator-") {
                        continue;
                    }

                    // Split metric and value
                    let mut parts = line.split_whitespace();
                    let metric = parts.next().unwrap();
                    let value = parts.next().unwrap();

                    // If ends with peers_blocked, ensure it is zero
                    if metric.ends_with("_peers_blocked") {
                        let value = value.parse::<u64>().unwrap();
                        assert_eq!(value, 0);
                    }

                    // If ends with marshal_processed_height, stop as soon as one
                    // validator has reached `required`
                    if metric.ends_with("_marshal_processed_height") {
                        let value = value.parse::<u64>().unwrap();
                        if value >= required {
                            success = true;
                            break;
                        }
                    }
                }
                if success {
                    break;
                }

                // No validator has reached the required height yet
                context.sleep(Duration::from_secs(1)).await;
            }
            context.auditor().state()
        })
    }
270
271    #[test_traced]
272    fn test_good_links() {
273        let link = Link {
274            latency: 10.0,
275            jitter: 1.0,
276            success_rate: 1.0,
277        };
278        for seed in 0..5 {
279            let state = all_online(5, seed, link.clone(), 25);
280            assert_eq!(state, all_online(5, seed, link.clone(), 25));
281        }
282    }
283
284    #[test_traced]
285    fn test_bad_links() {
286        let link = Link {
287            latency: 200.0,
288            jitter: 150.0,
289            success_rate: 0.75,
290        };
291        for seed in 0..5 {
292            let state = all_online(5, seed, link.clone(), 25);
293            assert_eq!(state, all_online(5, seed, link.clone(), 25));
294        }
295    }
296
297    #[test_traced]
298    fn test_1k() {
299        let link = Link {
300            latency: 80.0,
301            jitter: 10.0,
302            success_rate: 0.98,
303        };
304        all_online(10, 0, link.clone(), 1000);
305    }
306
307    #[test_traced]
308    fn test_backfill() {
309        // Create context
310        let n = 5;
311        let threshold = quorum(n);
312        let initial_container_required = 10;
313        let final_container_required = 20;
314        let executor = Runner::timed(Duration::from_secs(30));
315        executor.start(|mut context| async move {
316            // Create simulated network
317            let (network, mut oracle) = Network::new(
318                context.with_label("network"),
319                simulated::Config {
320                    max_size: 1024 * 1024,
321                },
322            );
323
324            // Start network
325            network.start();
326
327            // Register participants
328            let mut signers = Vec::new();
329            let mut validators = Vec::new();
330            for i in 0..n {
331                let signer = PrivateKey::from_seed(i as u64);
332                let pk = signer.public_key();
333                signers.push(signer);
334                validators.push(pk);
335            }
336            validators.sort();
337            signers.sort_by_key(|s| s.public_key());
338            let mut registrations = register_validators(&mut oracle, &validators).await;
339
340            // Link all validators (except 0)
341            let link = Link {
342                latency: 10.0,
343                jitter: 1.0,
344                success_rate: 1.0,
345            };
346            link_validators(
347                &mut oracle,
348                &validators,
349                link.clone(),
350                Some(|_, i, j| ![i, j].contains(&0usize)),
351            )
352            .await;
353
354            // Derive threshold
355            let (polynomial, shares) =
356                ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
357
358            // Create instances
359            for (idx, signer) in signers.iter().enumerate() {
360                // Skip first
361                if idx == 0 {
362                    continue;
363                }
364
365                // Configure engine
366                let public_key = signer.public_key();
367                let uid = format!("validator-{public_key}");
368                let config: Config<_, Mock> = engine::Config {
369                    blocker: oracle.control(public_key.clone()),
370                    partition_prefix: uid.clone(),
371                    blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
372                    finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
373                    signer: signer.clone(),
374                    polynomial: polynomial.clone(),
375                    share: shares[idx].clone(),
376                    participants: validators.clone(),
377                    mailbox_size: 1024,
378                    deque_size: 10,
379                    backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
380                    leader_timeout: Duration::from_secs(1),
381                    notarization_timeout: Duration::from_secs(2),
382                    nullify_retry: Duration::from_secs(10),
383                    fetch_timeout: Duration::from_secs(1),
384                    activity_timeout: 10,
385                    skip_timeout: 5,
386                    max_fetch_count: 10,
387                    max_fetch_size: 1024 * 512,
388                    fetch_concurrent: 10,
389                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
390                    indexer: None,
391                };
392                let engine = Engine::new(context.with_label(&uid), config).await;
393
394                // Get networking
395                let (pending, recovered, resolver, broadcast, backfill) =
396                    registrations.remove(&public_key).unwrap();
397
398                // Start engine
399                engine.start(pending, recovered, resolver, broadcast, backfill);
400            }
401
402            // Poll metrics
403            loop {
404                let metrics = context.encode();
405
406                // Iterate over all lines
407                let mut success = true;
408                for line in metrics.lines() {
409                    // Ensure it is a metrics line
410                    if !line.starts_with("validator-") {
411                        continue;
412                    }
413
414                    // Split metric and value
415                    let mut parts = line.split_whitespace();
416                    let metric = parts.next().unwrap();
417                    let value = parts.next().unwrap();
418
419                    // If ends with peers_blocked, ensure it is zero
420                    if metric.ends_with("_peers_blocked") {
421                        let value = value.parse::<u64>().unwrap();
422                        assert_eq!(value, 0);
423                    }
424
425                    // If ends with contiguous_height, ensure it is at least required_container
426                    if metric.ends_with("_marshal_processed_height") {
427                        let value = value.parse::<u64>().unwrap();
428                        if value >= initial_container_required {
429                            success = true;
430                            break;
431                        }
432                    }
433                }
434                if success {
435                    break;
436                }
437
438                // Still waiting for all validators to complete
439                context.sleep(Duration::from_secs(1)).await;
440            }
441
442            // Link first peer
443            link_validators(
444                &mut oracle,
445                &validators,
446                link,
447                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
448            )
449            .await;
450
451            // Configure engine
452            let signer = signers[0].clone();
453            let share = shares[0].clone();
454            let public_key = signer.public_key();
455            let uid = format!("validator-{public_key}");
456            let config: Config<_, Mock> = engine::Config {
457                blocker: oracle.control(public_key.clone()),
458                partition_prefix: uid.clone(),
459                blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
460                finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
461                signer: signer.clone(),
462                polynomial: polynomial.clone(),
463                share,
464                participants: validators.clone(),
465                mailbox_size: 1024,
466                deque_size: 10,
467                backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
468                leader_timeout: Duration::from_secs(1),
469                notarization_timeout: Duration::from_secs(2),
470                nullify_retry: Duration::from_secs(10),
471                fetch_timeout: Duration::from_secs(1),
472                activity_timeout: 10,
473                skip_timeout: 5,
474                max_fetch_count: 10,
475                max_fetch_size: 1024 * 512,
476                fetch_concurrent: 10,
477                fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
478                indexer: None,
479            };
480            let engine = Engine::new(context.with_label(&uid), config).await;
481
482            // Get networking
483            let (pending, recovered, resolver, broadcast, backfill) =
484                registrations.remove(&public_key).unwrap();
485
486            // Start engine
487            engine.start(pending, recovered, resolver, broadcast, backfill);
488
489            // Poll metrics
490            loop {
491                let metrics = context.encode();
492
493                // Iterate over all lines
494                let mut success = false;
495                for line in metrics.lines() {
496                    // Ensure it is a metrics line
497                    if !line.starts_with("validator-") {
498                        continue;
499                    }
500
501                    // Split metric and value
502                    let mut parts = line.split_whitespace();
503                    let metric = parts.next().unwrap();
504                    let value = parts.next().unwrap();
505
506                    // If ends with peers_blocked, ensure it is zero
507                    if metric.ends_with("_peers_blocked") {
508                        let value = value.parse::<u64>().unwrap();
509                        assert_eq!(value, 0);
510                    }
511
512                    // If ends with contiguous_height, ensure it is at least required_container
513                    if metric.ends_with("_marshal_processed_height") {
514                        let value = value.parse::<u64>().unwrap();
515                        if value >= final_container_required {
516                            success = true;
517                            break;
518                        }
519                    }
520                }
521                if success {
522                    break;
523                }
524
525                // Still waiting for all validators to complete
526                context.sleep(Duration::from_secs(1)).await;
527            }
528        });
529    }
530
    /// Repeatedly starts all validators, interrupts the run after a random delay, and
    /// restarts from the recovered runtime context until a validator reports a
    /// `_marshal_processed_height` of at least `required_container`.
    ///
    /// Shares are generated once (outside the runtime, from a fixed-seed `StdRng`) and
    /// reused by every run; each run rebuilds the network and engines with the same
    /// `validator-{public_key}` partition prefixes. The test asserts that more than one
    /// interrupted run happened before completion.
    #[test_traced]
    fn test_unclean_shutdown() {
        // Create context
        let n = 5;
        let threshold = quorum(n);
        let required_container = 100;

        // Derive threshold
        let mut rng = StdRng::seed_from_u64(0);
        let (polynomial, shares) = ops::generate_shares::<_, MinSig>(&mut rng, None, n, threshold);

        // Random restarts: each run lasts a random 10ms..1000ms unless it completes first
        let mut runs = 0;
        let mut prev_ctx = None;
        loop {
            // Setup run (clone the shared key material so the closure can own it)
            let polynomial = polynomial.clone();
            let shares = shares.clone();
            let f = |mut context: deterministic::Context| async move {
                // Create simulated network
                let (network, mut oracle) = Network::new(
                    context.with_label("network"),
                    simulated::Config {
                        max_size: 1024 * 1024,
                    },
                );

                // Start network
                network.start();

                // Register participants (keys derived from seeds 0..n)
                let mut signers = Vec::new();
                let mut validators = Vec::new();
                for i in 0..n {
                    let signer = PrivateKey::from_seed(i as u64);
                    let pk = signer.public_key();
                    signers.push(signer);
                    validators.push(pk);
                }
                // Sort both lists by public key so that index `i` refers to the same
                // validator in `signers`, `validators`, and `shares`.
                validators.sort();
                signers.sort_by_key(|s| s.public_key());
                let mut registrations = register_validators(&mut oracle, &validators).await;

                // Link all validators
                let link = Link {
                    latency: 10.0,
                    jitter: 1.0,
                    success_rate: 1.0,
                };
                link_validators(&mut oracle, &validators, link, None).await;

                // Create instances
                // NOTE(review): `public_keys` is only ever inserted into and never read.
                let mut public_keys = HashSet::new();
                for (idx, signer) in signers.into_iter().enumerate() {
                    // Create signer context
                    let public_key = signer.public_key();
                    public_keys.insert(public_key.clone());

                    // Configure engine
                    let uid = format!("validator-{public_key}");
                    let config: Config<_, Mock> = engine::Config {
                        blocker: oracle.control(public_key.clone()),
                        partition_prefix: uid.clone(),
                        blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
                        finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
                        signer,
                        polynomial: polynomial.clone(),
                        share: shares[idx].clone(),
                        participants: validators.clone(),
                        mailbox_size: 1024,
                        deque_size: 10,
                        backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout: 10,
                        skip_timeout: 5,
                        max_fetch_count: 10,
                        max_fetch_size: 1024 * 512,
                        fetch_concurrent: 10,
                        fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
                        indexer: None,
                    };
                    let engine = Engine::new(context.with_label(&uid), config).await;

                    // Get networking
                    let (pending, recovered, resolver, broadcast, backfill) =
                        registrations.remove(&public_key).unwrap();

                    // Start engine
                    engine.start(pending, recovered, resolver, broadcast, backfill);
                }

                // Poll metrics in a spawned task so it can race against the random
                // exit timer below
                let poller = context
                    .with_label("metrics")
                    .spawn(move |context| async move {
                        loop {
                            let metrics = context.encode();

                            // Iterate over all lines
                            let mut success = false;
                            for line in metrics.lines() {
                                // Ensure it is a metrics line
                                if !line.starts_with("validator-") {
                                    continue;
                                }

                                // Split metric and value
                                let mut parts = line.split_whitespace();
                                let metric = parts.next().unwrap();
                                let value = parts.next().unwrap();

                                // If ends with peers_blocked, ensure it is zero
                                if metric.ends_with("_peers_blocked") {
                                    let value = value.parse::<u64>().unwrap();
                                    assert_eq!(value, 0);
                                }

                                // If ends with marshal_processed_height, stop once a
                                // validator has reached required_container
                                if metric.ends_with("_marshal_processed_height") {
                                    let value = value.parse::<u64>().unwrap();
                                    if value >= required_container {
                                        success = true;
                                        break;
                                    }
                                }
                            }
                            if success {
                                break;
                            }

                            // No validator has reached the required height yet
                            context.sleep(Duration::from_millis(10)).await;
                        }
                    });

                // Exit at random points until finished
                let wait =
                    context.gen_range(Duration::from_millis(10)..Duration::from_millis(1_000));

                // Wait for one to finish; the context is returned either way so the
                // caller can recover it for the next run
                select! {
                    _ = poller => {
                        // Finished
                        (true, context)
                    },
                    _ = context.sleep(wait) => {
                        // Randomly exit
                        (false, context)
                    }
                }
            };

            // Handle run: the first run uses a fresh timed runner, later runs resume
            // from the context recovered after the previous run
            let (complete, context) = if let Some(prev_ctx) = prev_ctx {
                Runner::from(prev_ctx)
            } else {
                Runner::timed(Duration::from_secs(30))
            }
            .start(f);
            if complete {
                break;
            }

            // Prepare for next run
            prev_ctx = Some(context.recover());
            runs += 1;
        }
        // `runs` counts interrupted runs only; require more than one interruption
        assert!(runs > 1);
        info!(runs, "unclean shutdown recovery worked");
    }
704
705    #[test_traced]
706    fn test_indexer() {
707        // Create context
708        let n = 5;
709        let threshold = quorum(n);
710        let required_container = 10;
711        let executor = Runner::timed(Duration::from_secs(30));
712        executor.start(|mut context| async move {
713            // Create simulated network
714            let (network, mut oracle) = Network::new(
715                context.with_label("network"),
716                simulated::Config {
717                    max_size: 1024 * 1024,
718                },
719            );
720
721            // Start network
722            network.start();
723
724            // Register participants
725            let mut signers = Vec::new();
726            let mut validators = Vec::new();
727            for i in 0..n {
728                let signer = PrivateKey::from_seed(i as u64);
729                let pk = signer.public_key();
730                signers.push(signer);
731                validators.push(pk);
732            }
733            validators.sort();
734            signers.sort_by_key(|s| s.public_key());
735            let mut registrations = register_validators(&mut oracle, &validators).await;
736
737            // Link all validators
738            let link = Link {
739                latency: 10.0,
740                jitter: 1.0,
741                success_rate: 1.0,
742            };
743            link_validators(&mut oracle, &validators, link, None).await;
744
745            // Derive threshold
746            let (polynomial, shares) =
747                ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
748            let identity = *poly::public::<MinSig>(&polynomial);
749
750            // Define mock indexer
751            let indexer = Mock::new("", identity);
752
753            // Create instances
754            let mut public_keys = HashSet::new();
755            for (idx, signer) in signers.into_iter().enumerate() {
756                // Create signer context
757                let public_key = signer.public_key();
758                public_keys.insert(public_key.clone());
759
760                // Configure engine
761                let uid = format!("validator-{public_key}");
762                let config: Config<_, Mock> = engine::Config {
763                    blocker: oracle.control(public_key.clone()),
764                    partition_prefix: uid.clone(),
765                    blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
766                    finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
767                    signer,
768                    polynomial: polynomial.clone(),
769                    share: shares[idx].clone(),
770                    participants: validators.clone(),
771                    mailbox_size: 1024,
772                    deque_size: 10,
773                    backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
774                    leader_timeout: Duration::from_secs(1),
775                    notarization_timeout: Duration::from_secs(2),
776                    nullify_retry: Duration::from_secs(10),
777                    fetch_timeout: Duration::from_secs(1),
778                    activity_timeout: 10,
779                    skip_timeout: 5,
780                    max_fetch_count: 10,
781                    max_fetch_size: 1024 * 512,
782                    fetch_concurrent: 10,
783                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
784                    indexer: Some(indexer.clone()),
785                };
786                let engine = Engine::new(context.with_label(&uid), config).await;
787
788                // Get networking
789                let (pending, recovered, resolver, broadcast, backfill) =
790                    registrations.remove(&public_key).unwrap();
791
792                // Start engine
793                engine.start(pending, recovered, resolver, broadcast, backfill);
794            }
795
796            // Poll metrics
797            loop {
798                let metrics = context.encode();
799
800                // Iterate over all lines
801                let mut success = false;
802                for line in metrics.lines() {
803                    // Ensure it is a metrics line
804                    if !line.starts_with("validator-") {
805                        continue;
806                    }
807
808                    // Split metric and value
809                    let mut parts = line.split_whitespace();
810                    let metric = parts.next().unwrap();
811                    let value = parts.next().unwrap();
812
813                    // If ends with peers_blocked, ensure it is zero
814                    if metric.ends_with("_peers_blocked") {
815                        let value = value.parse::<u64>().unwrap();
816                        assert_eq!(value, 0);
817                    }
818
819                    // If ends with contiguous_height, ensure it is at least required_container
820                    if metric.ends_with("_marshal_processed_height") {
821                        let value = value.parse::<u64>().unwrap();
822                        if value >= required_container {
823                            success = true;
824                            break;
825                        }
826                    }
827                }
828                if success {
829                    break;
830                }
831
832                // Still waiting for all validators to complete
833                context.sleep(Duration::from_secs(1)).await;
834            }
835
836            // Check indexer uploads
837            assert!(indexer.seed_seen.load(std::sync::atomic::Ordering::Relaxed));
838            assert!(indexer
839                .notarization_seen
840                .load(std::sync::atomic::Ordering::Relaxed));
841            assert!(indexer
842                .finalization_seen
843                .load(std::sync::atomic::Ordering::Relaxed));
844        });
845    }
846}