alto_chain/
lib.rs

1use std::future::Future;
2
3use bytes::Bytes;
4use commonware_cryptography::bls12381;
5use serde::{Deserialize, Serialize};
6
7pub mod actors;
8pub mod engine;
9
10/// Trait for interacting with an indexer.
11pub trait Indexer: Clone + Send + Sync + 'static {
12    type Error: std::error::Error + Send + Sync + 'static;
13
14    /// Create a new indexer with the given URI and public key.
15    fn new(uri: &str, public: bls12381::PublicKey) -> Self;
16
17    /// Upload a seed to the indexer.
18    fn seed_upload(&self, seed: Bytes) -> impl Future<Output = Result<(), Self::Error>> + Send;
19
20    /// Upload a notarization to the indexer.
21    fn notarization_upload(
22        &self,
23        notarized: Bytes,
24    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
25
26    /// Upload a finalization to the indexer.
27    fn finalization_upload(
28        &self,
29        finalized: Bytes,
30    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
31}
32
33impl Indexer for alto_client::Client {
34    type Error = alto_client::Error;
35    fn new(uri: &str, public: bls12381::PublicKey) -> Self {
36        Self::new(uri, public)
37    }
38
39    fn seed_upload(
40        &self,
41        seed: bytes::Bytes,
42    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
43        self.seed_upload(seed)
44    }
45
46    fn notarization_upload(
47        &self,
48        notarization: bytes::Bytes,
49    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
50        self.notarization_upload(notarization)
51    }
52
53    fn finalization_upload(
54        &self,
55        finalization: bytes::Bytes,
56    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
57        self.finalization_upload(finalization)
58    }
59}
60
61#[derive(Deserialize, Serialize)]
62pub struct Config {
63    pub private_key: String,
64    pub share: String,
65    pub identity: String,
66
67    pub port: u16,
68    pub directory: String,
69    pub worker_threads: usize,
70
71    pub allowed_peers: Vec<String>,
72    pub bootstrappers: Vec<String>,
73
74    pub message_backlog: usize,
75    pub mailbox_size: usize,
76
77    pub indexer: Option<String>,
78}
79
80#[cfg(test)]
81mod tests {
82    use super::*;
83    use alto_types::{Finalized, Notarized, Seed};
84    use bls12381::primitives::poly;
85    use commonware_cryptography::{bls12381::dkg::ops, ed25519::PublicKey, Ed25519, Scheme};
86    use commonware_macros::test_traced;
87    use commonware_p2p::simulated::{self, Link, Network, Oracle, Receiver, Sender};
88    use commonware_runtime::{
89        deterministic::{self, Executor},
90        Clock, Metrics, Runner, Spawner,
91    };
92    use commonware_utils::quorum;
93    use engine::{Config, Engine};
94    use governor::Quota;
95    use rand::{rngs::StdRng, Rng, SeedableRng};
96    use std::{
97        collections::{HashMap, HashSet},
98        num::NonZeroU32,
99        sync::{Arc, Mutex},
100    };
101    use std::{sync::atomic::AtomicBool, time::Duration};
102    use tracing::info;
103
104    /// MockIndexer is a simple indexer implementation for testing.
105    #[derive(Clone)]
106    struct MockIndexer {
107        public: bls12381::PublicKey,
108
109        seed_seen: Arc<AtomicBool>,
110        notarization_seen: Arc<AtomicBool>,
111        finalization_seen: Arc<AtomicBool>,
112    }
113
114    impl Indexer for MockIndexer {
115        type Error = std::io::Error;
116
117        fn new(_: &str, public: bls12381::PublicKey) -> Self {
118            MockIndexer {
119                public,
120                seed_seen: Arc::new(AtomicBool::new(false)),
121                notarization_seen: Arc::new(AtomicBool::new(false)),
122                finalization_seen: Arc::new(AtomicBool::new(false)),
123            }
124        }
125
126        async fn seed_upload(&self, seed: Bytes) -> Result<(), Self::Error> {
127            Seed::deserialize(Some(&self.public), &seed).unwrap();
128            self.seed_seen
129                .store(true, std::sync::atomic::Ordering::Relaxed);
130            Ok(())
131        }
132
133        async fn notarization_upload(&self, notarized: Bytes) -> Result<(), Self::Error> {
134            Notarized::deserialize(Some(&self.public), &notarized).unwrap();
135            self.notarization_seen
136                .store(true, std::sync::atomic::Ordering::Relaxed);
137            Ok(())
138        }
139
140        async fn finalization_upload(&self, finalized: Bytes) -> Result<(), Self::Error> {
141            Finalized::deserialize(Some(&self.public), &finalized).unwrap();
142            self.finalization_seen
143                .store(true, std::sync::atomic::Ordering::Relaxed);
144            Ok(())
145        }
146    }
147
148    /// Registers all validators using the oracle.
149    async fn register_validators(
150        oracle: &mut Oracle<PublicKey>,
151        validators: &[PublicKey],
152    ) -> HashMap<
153        PublicKey,
154        (
155            (Sender<PublicKey>, Receiver<PublicKey>),
156            (Sender<PublicKey>, Receiver<PublicKey>),
157            (Sender<PublicKey>, Receiver<PublicKey>),
158            (Sender<PublicKey>, Receiver<PublicKey>),
159        ),
160    > {
161        let mut registrations = HashMap::new();
162        for validator in validators.iter() {
163            let (voter_sender, voter_receiver) =
164                oracle.register(validator.clone(), 0).await.unwrap();
165            let (resolver_sender, resolver_receiver) =
166                oracle.register(validator.clone(), 1).await.unwrap();
167            let (broadcast_sender, broadcast_receiver) =
168                oracle.register(validator.clone(), 2).await.unwrap();
169            let (backfill_sender, backfill_receiver) =
170                oracle.register(validator.clone(), 3).await.unwrap();
171            registrations.insert(
172                validator.clone(),
173                (
174                    (voter_sender, voter_receiver),
175                    (resolver_sender, resolver_receiver),
176                    (broadcast_sender, broadcast_receiver),
177                    (backfill_sender, backfill_receiver),
178                ),
179            );
180        }
181        registrations
182    }
183
184    /// Links (or unlinks) validators using the oracle.
185    ///
186    /// The `action` parameter determines the action (e.g. link, unlink) to take.
187    /// The `restrict_to` function can be used to restrict the linking to certain connections,
188    /// otherwise all validators will be linked to all other validators.
189    async fn link_validators(
190        oracle: &mut Oracle<PublicKey>,
191        validators: &[PublicKey],
192        link: Link,
193        restrict_to: Option<fn(usize, usize, usize) -> bool>,
194    ) {
195        for (i1, v1) in validators.iter().enumerate() {
196            for (i2, v2) in validators.iter().enumerate() {
197                // Ignore self
198                if v2 == v1 {
199                    continue;
200                }
201
202                // Restrict to certain connections
203                if let Some(f) = restrict_to {
204                    if !f(validators.len(), i1, i2) {
205                        continue;
206                    }
207                }
208
209                // Add link
210                oracle
211                    .add_link(v1.clone(), v2.clone(), link.clone())
212                    .await
213                    .unwrap();
214            }
215        }
216    }
217
218    fn all_online(seed: u64, link: Link) -> String {
219        // Create context
220        let n = 5;
221        let threshold = quorum(n).unwrap();
222        let required_container = 10;
223        let cfg = deterministic::Config {
224            seed,
225            timeout: Some(Duration::from_secs(30)),
226            ..Default::default()
227        };
228        let (executor, mut context, auditor) = Executor::init(cfg);
229        executor.start(async move {
230            // Create simulated network
231            let (network, mut oracle) = Network::new(
232                context.with_label("network"),
233                simulated::Config {
234                    max_size: 1024 * 1024,
235                },
236            );
237
238            // Start network
239            network.start();
240
241            // Register participants
242            let mut schemes = Vec::new();
243            let mut validators = Vec::new();
244            for i in 0..n {
245                let scheme = Ed25519::from_seed(i as u64);
246                let pk = scheme.public_key();
247                schemes.push(scheme);
248                validators.push(pk);
249            }
250            validators.sort();
251            schemes.sort_by_key(|s| s.public_key());
252            let mut registrations = register_validators(&mut oracle, &validators).await;
253
254            // Link all validators
255            link_validators(&mut oracle, &validators, link, None).await;
256
257            // Derive threshold
258            let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
259
260            // Create instances
261            let mut public_keys = HashSet::new();
262            for (idx, scheme) in schemes.into_iter().enumerate() {
263                // Create scheme context
264                let public_key = scheme.public_key();
265                public_keys.insert(public_key.clone());
266
267                // Configure engine
268                let uid = format!("validator-{}", public_key);
269                let config: Config<MockIndexer> = engine::Config {
270                    partition_prefix: uid.clone(),
271                    signer: scheme,
272                    identity: public.clone(),
273                    share: shares[idx],
274                    participants: validators.clone(),
275                    mailbox_size: 1024,
276                    backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
277                    leader_timeout: Duration::from_secs(1),
278                    notarization_timeout: Duration::from_secs(2),
279                    nullify_retry: Duration::from_secs(10),
280                    fetch_timeout: Duration::from_secs(1),
281                    activity_timeout: 10,
282                    max_fetch_count: 10,
283                    max_fetch_size: 1024 * 512,
284                    fetch_concurrent: 10,
285                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
286                    indexer: None,
287                };
288                let engine = Engine::new(context.with_label(&uid), config).await;
289
290                // Get networking
291                let (voter, resolver, broadcast, backfill) =
292                    registrations.remove(&public_key).unwrap();
293
294                // Start engine
295                engine.start(voter, resolver, broadcast, backfill);
296            }
297
298            // Poll metrics
299            loop {
300                let metrics = context.encode();
301
302                // Iterate over all lines
303                let mut success = false;
304                for line in metrics.lines() {
305                    // Ensure it is a metrics line
306                    if !line.starts_with("validator-") {
307                        continue;
308                    }
309
310                    // Split metric and value
311                    let mut parts = line.split_whitespace();
312                    let metric = parts.next().unwrap();
313                    let value = parts.next().unwrap();
314
315                    // If ends with peers_blocked, ensure it is zero
316                    if metric.ends_with("_peers_blocked") {
317                        let value = value.parse::<u64>().unwrap();
318                        assert_eq!(value, 0);
319                    }
320
321                    // If ends with contiguous_height, ensure it is at least required_container
322                    if metric.ends_with("_syncer_contiguous_height") {
323                        let value = value.parse::<u64>().unwrap();
324                        if value >= required_container {
325                            success = true;
326                            break;
327                        }
328                    }
329                }
330                if !success {
331                    break;
332                }
333
334                // Still waiting for all validators to complete
335                context.sleep(Duration::from_secs(1)).await;
336            }
337        });
338        auditor.state()
339    }
340
341    #[test_traced]
342    fn test_good_links() {
343        let link = Link {
344            latency: 10.0,
345            jitter: 1.0,
346            success_rate: 1.0,
347        };
348        for seed in 0..5 {
349            let state = all_online(seed, link.clone());
350            assert_eq!(state, all_online(seed, link.clone()));
351        }
352    }
353
354    #[test_traced]
355    fn test_bad_links() {
356        let link = Link {
357            latency: 200.0,
358            jitter: 150.0,
359            success_rate: 0.75,
360        };
361        for seed in 0..5 {
362            let state = all_online(seed, link.clone());
363            assert_eq!(state, all_online(seed, link.clone()));
364        }
365    }
366
367    #[test_traced]
368    fn test_backfill() {
369        // Create context
370        let n = 5;
371        let threshold = quorum(n).unwrap();
372        let initial_container_required = 10;
373        let final_container_required = 20;
374        let (executor, mut context, _) = Executor::timed(Duration::from_secs(30));
375        executor.start(async move {
376            // Create simulated network
377            let (network, mut oracle) = Network::new(
378                context.with_label("network"),
379                simulated::Config {
380                    max_size: 1024 * 1024,
381                },
382            );
383
384            // Start network
385            network.start();
386
387            // Register participants
388            let mut schemes = Vec::new();
389            let mut validators = Vec::new();
390            for i in 0..n {
391                let scheme = Ed25519::from_seed(i as u64);
392                let pk = scheme.public_key();
393                schemes.push(scheme);
394                validators.push(pk);
395            }
396            validators.sort();
397            schemes.sort_by_key(|s| s.public_key());
398            let mut registrations = register_validators(&mut oracle, &validators).await;
399
400            // Link all validators (except 0)
401            let link = Link {
402                latency: 10.0,
403                jitter: 1.0,
404                success_rate: 1.0,
405            };
406            link_validators(
407                &mut oracle,
408                &validators,
409                link.clone(),
410                Some(|_, i, j| ![i, j].contains(&0usize)),
411            )
412            .await;
413
414            // Derive threshold
415            let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
416
417            // Create instances
418            for (idx, scheme) in schemes.iter().enumerate() {
419                // Skip first
420                if idx == 0 {
421                    continue;
422                }
423
424                // Configure engine
425                let public_key = scheme.public_key();
426                let uid = format!("validator-{}", public_key);
427                let config: Config<MockIndexer> = engine::Config {
428                    partition_prefix: uid.clone(),
429                    signer: scheme.clone(),
430                    identity: public.clone(),
431                    share: shares[idx],
432                    participants: validators.clone(),
433                    mailbox_size: 1024,
434                    backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
435                    leader_timeout: Duration::from_secs(1),
436                    notarization_timeout: Duration::from_secs(2),
437                    nullify_retry: Duration::from_secs(10),
438                    fetch_timeout: Duration::from_secs(1),
439                    activity_timeout: 10,
440                    max_fetch_count: 10,
441                    max_fetch_size: 1024 * 512,
442                    fetch_concurrent: 10,
443                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
444                    indexer: None,
445                };
446                let engine = Engine::new(context.with_label(&uid), config).await;
447
448                // Get networking
449                let (voter, resolver, broadcast, backfill) =
450                    registrations.remove(&public_key).unwrap();
451
452                // Start engine
453                engine.start(voter, resolver, broadcast, backfill);
454            }
455
456            // Poll metrics
457            loop {
458                let metrics = context.encode();
459
460                // Iterate over all lines
461                let mut success = true;
462                for line in metrics.lines() {
463                    // Ensure it is a metrics line
464                    if !line.starts_with("validator-") {
465                        continue;
466                    }
467
468                    // Split metric and value
469                    let mut parts = line.split_whitespace();
470                    let metric = parts.next().unwrap();
471                    let value = parts.next().unwrap();
472
473                    // If ends with peers_blocked, ensure it is zero
474                    if metric.ends_with("_peers_blocked") {
475                        let value = value.parse::<u64>().unwrap();
476                        assert_eq!(value, 0);
477                    }
478
479                    // If ends with contiguous_height, ensure it is at least required_container
480                    if metric.ends_with("_syncer_contiguous_height") {
481                        let value = value.parse::<u64>().unwrap();
482                        if value >= initial_container_required {
483                            success = true;
484                            break;
485                        }
486                    }
487                }
488                if success {
489                    break;
490                }
491
492                // Still waiting for all validators to complete
493                context.sleep(Duration::from_secs(1)).await;
494            }
495
496            // Link first peer
497            link_validators(
498                &mut oracle,
499                &validators,
500                link,
501                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
502            )
503            .await;
504
505            // Configure engine
506            let scheme = schemes[0].clone();
507            let share = shares[0];
508            let public_key = scheme.public_key();
509            let uid = format!("validator-{}", public_key);
510            let config: Config<MockIndexer> = engine::Config {
511                partition_prefix: uid.clone(),
512                signer: scheme.clone(),
513                identity: public.clone(),
514                share,
515                participants: validators.clone(),
516                mailbox_size: 1024,
517                backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
518                leader_timeout: Duration::from_secs(1),
519                notarization_timeout: Duration::from_secs(2),
520                nullify_retry: Duration::from_secs(10),
521                fetch_timeout: Duration::from_secs(1),
522                activity_timeout: 10,
523                max_fetch_count: 10,
524                max_fetch_size: 1024 * 512,
525                fetch_concurrent: 10,
526                fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
527                indexer: None,
528            };
529            let engine = Engine::new(context.with_label(&uid), config).await;
530
531            // Get networking
532            let (voter, resolver, broadcast, backfill) = registrations.remove(&public_key).unwrap();
533
534            // Start engine
535            engine.start(voter, resolver, broadcast, backfill);
536
537            // Poll metrics
538            loop {
539                let metrics = context.encode();
540
541                // Iterate over all lines
542                let mut success = false;
543                for line in metrics.lines() {
544                    // Ensure it is a metrics line
545                    if !line.starts_with("validator-") {
546                        continue;
547                    }
548
549                    // Split metric and value
550                    let mut parts = line.split_whitespace();
551                    let metric = parts.next().unwrap();
552                    let value = parts.next().unwrap();
553
554                    // If ends with peers_blocked, ensure it is zero
555                    if metric.ends_with("_peers_blocked") {
556                        let value = value.parse::<u64>().unwrap();
557                        assert_eq!(value, 0);
558                    }
559
560                    // If ends with contiguous_height, ensure it is at least required_container
561                    if metric.ends_with("_syncer_contiguous_height") {
562                        let value = value.parse::<u64>().unwrap();
563                        if value >= final_container_required {
564                            success = true;
565                            break;
566                        }
567                    }
568                }
569                if success {
570                    break;
571                }
572
573                // Still waiting for all validators to complete
574                context.sleep(Duration::from_secs(1)).await;
575            }
576        });
577    }
578
579    #[test_traced]
580    fn test_unclean_shutdown() {
581        // Create context
582        let n = 5;
583        let threshold = quorum(n).unwrap();
584        let required_container = 100;
585
586        // Derive threshold
587        let mut rng = StdRng::seed_from_u64(0);
588        let (public, shares) = ops::generate_shares(&mut rng, None, n, threshold);
589
590        // Random restarts every x seconds
591        let mut runs = 0;
592        let done = Arc::new(Mutex::new(false));
593        let (mut executor, mut context, _) = Executor::timed(Duration::from_secs(10));
594        while !*done.lock().unwrap() {
595            runs += 1;
596            executor.start({
597                let mut context = context.clone();
598                let public = public.clone();
599                let shares = shares.clone();
600                let done = done.clone();
601                async move {
602                    // Create simulated network
603                    let (network, mut oracle) = Network::new(
604                        context.with_label("network"),
605                        simulated::Config {
606                            max_size: 1024 * 1024,
607                        },
608                    );
609
610                    // Start network
611                    network.start();
612
613                    // Register participants
614                    let mut schemes = Vec::new();
615                    let mut validators = Vec::new();
616                    for i in 0..n {
617                        let scheme = Ed25519::from_seed(i as u64);
618                        let pk = scheme.public_key();
619                        schemes.push(scheme);
620                        validators.push(pk);
621                    }
622                    validators.sort();
623                    schemes.sort_by_key(|s| s.public_key());
624                    let mut registrations = register_validators(&mut oracle, &validators).await;
625
626                    // Link all validators
627                    let link = Link {
628                        latency: 10.0,
629                        jitter: 1.0,
630                        success_rate: 1.0,
631                    };
632                    link_validators(&mut oracle, &validators, link, None).await;
633
634                    // Create instances
635                    let mut public_keys = HashSet::new();
636                    for (idx, scheme) in schemes.into_iter().enumerate() {
637                        // Create scheme context
638                        let public_key = scheme.public_key();
639                        public_keys.insert(public_key.clone());
640
641                        // Configure engine
642                        let uid = format!("validator-{}", public_key);
643                        let config: Config<MockIndexer> = engine::Config {
644                            partition_prefix: uid.clone(),
645                            signer: scheme,
646                            identity: public.clone(),
647                            share: shares[idx],
648                            participants: validators.clone(),
649                            mailbox_size: 1024,
650                            backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
651                            leader_timeout: Duration::from_secs(1),
652                            notarization_timeout: Duration::from_secs(2),
653                            nullify_retry: Duration::from_secs(10),
654                            fetch_timeout: Duration::from_secs(1),
655                            activity_timeout: 10,
656                            max_fetch_count: 10,
657                            max_fetch_size: 1024 * 512,
658                            fetch_concurrent: 10,
659                            fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
660                            indexer: None,
661                        };
662                        let engine = Engine::new(context.with_label(&uid), config).await;
663
664                        // Get networking
665                        let (voter, resolver, broadcast, backfill) =
666                            registrations.remove(&public_key).unwrap();
667
668                        // Start engine
669                        engine.start(voter, resolver, broadcast, backfill);
670                    }
671
672                    // Poll metrics
673                    context
674                        .with_label("metrics")
675                        .spawn(move |context| async move {
676                            loop {
677                                let metrics = context.encode();
678
679                                // Iterate over all lines
680                                let mut success = false;
681                                for line in metrics.lines() {
682                                    // Ensure it is a metrics line
683                                    if !line.starts_with("validator-") {
684                                        continue;
685                                    }
686
687                                    // Split metric and value
688                                    let mut parts = line.split_whitespace();
689                                    let metric = parts.next().unwrap();
690                                    let value = parts.next().unwrap();
691
692                                    // If ends with peers_blocked, ensure it is zero
693                                    if metric.ends_with("_peers_blocked") {
694                                        let value = value.parse::<u64>().unwrap();
695                                        assert_eq!(value, 0);
696                                    }
697
698                                    // If ends with contiguous_height, ensure it is at least required_container
699                                    if metric.ends_with("_syncer_contiguous_height") {
700                                        let value = value.parse::<u64>().unwrap();
701                                        if value >= required_container {
702                                            success = true;
703                                            break;
704                                        }
705                                    }
706                                }
707                                if success {
708                                    break;
709                                }
710
711                                // Still waiting for all validators to complete
712                                context.sleep(Duration::from_millis(10)).await;
713                            }
714                            *done.lock().unwrap() = true;
715                        });
716
717                    // Exit at random points until finished
718                    let wait =
719                        context.gen_range(Duration::from_millis(10)..Duration::from_millis(1_000));
720                    context.sleep(wait).await;
721                }
722            });
723
724            // Recover context
725            (executor, context, _) = context.recover();
726        }
727        assert!(runs > 1);
728        info!(runs, "unclean shutdown recovery worked");
729    }
730
731    #[test_traced]
732    fn test_indexer() {
733        // Create context
734        let n = 5;
735        let threshold = quorum(n).unwrap();
736        let required_container = 10;
737        let (executor, mut context, _) = Executor::timed(Duration::from_secs(30));
738        executor.start(async move {
739            // Create simulated network
740            let (network, mut oracle) = Network::new(
741                context.with_label("network"),
742                simulated::Config {
743                    max_size: 1024 * 1024,
744                },
745            );
746
747            // Start network
748            network.start();
749
750            // Register participants
751            let mut schemes = Vec::new();
752            let mut validators = Vec::new();
753            for i in 0..n {
754                let scheme = Ed25519::from_seed(i as u64);
755                let pk = scheme.public_key();
756                schemes.push(scheme);
757                validators.push(pk);
758            }
759            validators.sort();
760            schemes.sort_by_key(|s| s.public_key());
761            let mut registrations = register_validators(&mut oracle, &validators).await;
762
763            // Link all validators
764            let link = Link {
765                latency: 10.0,
766                jitter: 1.0,
767                success_rate: 1.0,
768            };
769            link_validators(&mut oracle, &validators, link, None).await;
770
771            // Derive threshold
772            let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
773
774            // Define mock indexer
775            let indexer = MockIndexer::new("", poly::public(&public).into());
776
777            // Create instances
778            let mut public_keys = HashSet::new();
779            for (idx, scheme) in schemes.into_iter().enumerate() {
780                // Create scheme context
781                let public_key = scheme.public_key();
782                public_keys.insert(public_key.clone());
783
784                // Configure engine
785                let uid = format!("validator-{}", public_key);
786                let config: Config<MockIndexer> = engine::Config {
787                    partition_prefix: uid.clone(),
788                    signer: scheme,
789                    identity: public.clone(),
790                    share: shares[idx],
791                    participants: validators.clone(),
792                    mailbox_size: 1024,
793                    backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
794                    leader_timeout: Duration::from_secs(1),
795                    notarization_timeout: Duration::from_secs(2),
796                    nullify_retry: Duration::from_secs(10),
797                    fetch_timeout: Duration::from_secs(1),
798                    activity_timeout: 10,
799                    max_fetch_count: 10,
800                    max_fetch_size: 1024 * 512,
801                    fetch_concurrent: 10,
802                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
803                    indexer: Some(indexer.clone()),
804                };
805                let engine = Engine::new(context.with_label(&uid), config).await;
806
807                // Get networking
808                let (voter, resolver, broadcast, backfill) =
809                    registrations.remove(&public_key).unwrap();
810
811                // Start engine
812                engine.start(voter, resolver, broadcast, backfill);
813            }
814
815            // Poll metrics
816            loop {
817                let metrics = context.encode();
818
819                // Iterate over all lines
820                let mut success = false;
821                for line in metrics.lines() {
822                    // Ensure it is a metrics line
823                    if !line.starts_with("validator-") {
824                        continue;
825                    }
826
827                    // Split metric and value
828                    let mut parts = line.split_whitespace();
829                    let metric = parts.next().unwrap();
830                    let value = parts.next().unwrap();
831
832                    // If ends with peers_blocked, ensure it is zero
833                    if metric.ends_with("_peers_blocked") {
834                        let value = value.parse::<u64>().unwrap();
835                        assert_eq!(value, 0);
836                    }
837
838                    // If ends with contiguous_height, ensure it is at least required_container
839                    if metric.ends_with("_syncer_contiguous_height") {
840                        let value = value.parse::<u64>().unwrap();
841                        if value >= required_container {
842                            success = true;
843                            break;
844                        }
845                    }
846                }
847                if success {
848                    break;
849                }
850
851                // Still waiting for all validators to complete
852                context.sleep(Duration::from_secs(1)).await;
853            }
854
855            // Check indexer uploads
856            assert!(indexer.seed_seen.load(std::sync::atomic::Ordering::Relaxed));
857            assert!(indexer
858                .notarization_seen
859                .load(std::sync::atomic::Ordering::Relaxed));
860            assert!(indexer
861                .finalization_seen
862                .load(std::sync::atomic::Ordering::Relaxed));
863        });
864    }
865}