alto_chain/
lib.rs

1use alto_types::{Finalized, Identity, Notarized, Seed};
2use serde::{Deserialize, Serialize};
3use std::{collections::HashMap, future::Future, net::SocketAddr};
4
5pub mod actors;
6pub mod engine;
7
/// Trait for interacting with an indexer.
///
/// An implementation is built from a URI and a public [`Identity`] and exposes one
/// asynchronous upload method per artifact type ([`Seed`], [`Notarized`] and
/// [`Finalized`]). Implementations must be cloneable and shareable across threads
/// (`Clone + Send + Sync + 'static`).
pub trait Indexer: Clone + Send + Sync + 'static {
    /// Error produced when an upload fails.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Create a new indexer with the given URI and public key.
    fn new(uri: &str, public: Identity) -> Self;

    /// Upload a seed to the indexer.
    ///
    /// The returned future is `Send` and resolves to `Ok(())` or to [`Self::Error`].
    fn seed_upload(&self, seed: Seed) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Upload a notarization to the indexer.
    ///
    /// The returned future is `Send` and resolves to `Ok(())` or to [`Self::Error`].
    fn notarized_upload(
        &self,
        notarized: Notarized,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Upload a finalization to the indexer.
    ///
    /// The returned future is `Send` and resolves to `Ok(())` or to [`Self::Error`].
    fn finalized_upload(
        &self,
        finalized: Finalized,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
30
31impl Indexer for alto_client::Client {
32    type Error = alto_client::Error;
33
34    fn new(uri: &str, identity: Identity) -> Self {
35        Self::new(uri, identity)
36    }
37
38    fn seed_upload(&self, seed: Seed) -> impl Future<Output = Result<(), Self::Error>> + Send {
39        self.seed_upload(seed)
40    }
41
42    fn notarized_upload(
43        &self,
44        notarized: Notarized,
45    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
46        self.notarized_upload(notarized)
47    }
48
49    fn finalized_upload(
50        &self,
51        finalized: Finalized,
52    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
53        self.finalized_upload(finalized)
54    }
55}
56
/// Configuration for the engine.
///
/// This is the serializable (serde) form of a validator's settings; every value is
/// stored as a plain string, integer or list so it can be read from a config file.
// NOTE(review): the field notes below are inferred from names only — the code that
// loads and interprets this struct is not visible here; confirm before relying on them.
#[derive(Deserialize, Serialize)]
pub struct Config {
    // Key material, kept as strings (encoding is not visible in this file).
    pub private_key: String,
    pub share: String,
    pub polynomial: String,

    // Process-level settings: ports, storage directory, thread count and log level.
    pub port: u16,
    pub metrics_port: u16,
    pub directory: String,
    pub worker_threads: usize,
    pub log_level: String,

    // Peer lists, kept as strings (presumably identifiers of peers — TODO confirm).
    pub allowed_peers: Vec<String>,
    pub bootstrappers: Vec<String>,

    // Sizing knobs for internal queues.
    pub message_backlog: usize,
    pub mailbox_size: usize,
    pub deque_size: usize,

    // Optional indexer setting; `None` presumably disables indexer uploads — TODO confirm.
    pub indexer: Option<String>,
}
79
/// A list of peers provided when a validator is run locally.
///
/// When run remotely, [commonware_deployer::ec2::Hosts] is used instead.
#[derive(Deserialize, Serialize)]
pub struct Peers {
    /// Socket address of each peer, keyed by a string identifier
    /// (presumably the peer's public key — TODO confirm against the loader).
    pub addresses: HashMap<String, SocketAddr>,
}
87
88#[cfg(test)]
89mod tests {
90    use super::*;
91    use alto_types::{Finalized, Notarized};
92    use commonware_cryptography::{
93        bls12381::{
94            dkg::ops,
95            primitives::{poly, variant::MinSig},
96        },
97        ed25519::{PrivateKey, PublicKey},
98        PrivateKeyExt, Signer,
99    };
100    use commonware_macros::{select, test_traced};
101    use commonware_p2p::simulated::{self, Link, Network, Oracle, Receiver, Sender};
102    use commonware_runtime::{
103        deterministic::{self, Runner},
104        Clock, Metrics, Runner as _, Spawner,
105    };
106    use commonware_utils::quorum;
107    use engine::{Config, Engine};
108    use governor::Quota;
109    use rand::{rngs::StdRng, Rng, SeedableRng};
110    use std::{
111        collections::{HashMap, HashSet},
112        num::NonZeroU32,
113        sync::Arc,
114    };
115    use std::{sync::atomic::AtomicBool, time::Duration};
116    use tracing::info;
117
118    /// Limit the freezer table size to 1MB because the deterministic runtime stores
119    /// everything in RAM.
120    const FREEZER_TABLE_INITIAL_SIZE: u32 = 2u32.pow(14); // 1MB
121
    /// MockIndexer is a simple indexer implementation for testing.
    ///
    /// It performs no I/O: each upload only raises the matching flag. The flags are
    /// behind `Arc`, so clones of a `MockIndexer` share the same flags.
    #[derive(Clone)]
    struct MockIndexer {
        /// Set to `true` once `seed_upload` has been called.
        seed_seen: Arc<AtomicBool>,
        /// Set to `true` once `notarized_upload` has been called.
        notarization_seen: Arc<AtomicBool>,
        /// Set to `true` once `finalized_upload` has been called.
        finalization_seen: Arc<AtomicBool>,
    }
129
130    impl Indexer for MockIndexer {
131        type Error = std::io::Error;
132
133        fn new(_: &str, _: Identity) -> Self {
134            MockIndexer {
135                seed_seen: Arc::new(AtomicBool::new(false)),
136                notarization_seen: Arc::new(AtomicBool::new(false)),
137                finalization_seen: Arc::new(AtomicBool::new(false)),
138            }
139        }
140
141        async fn seed_upload(&self, _: Seed) -> Result<(), Self::Error> {
142            self.seed_seen
143                .store(true, std::sync::atomic::Ordering::Relaxed);
144            Ok(())
145        }
146
147        async fn notarized_upload(&self, _: Notarized) -> Result<(), Self::Error> {
148            self.notarization_seen
149                .store(true, std::sync::atomic::Ordering::Relaxed);
150            Ok(())
151        }
152
153        async fn finalized_upload(&self, _: Finalized) -> Result<(), Self::Error> {
154            self.finalization_seen
155                .store(true, std::sync::atomic::Ordering::Relaxed);
156            Ok(())
157        }
158    }
159
160    /// Registers all validators using the oracle.
161    async fn register_validators(
162        oracle: &mut Oracle<PublicKey>,
163        validators: &[PublicKey],
164    ) -> HashMap<
165        PublicKey,
166        (
167            (Sender<PublicKey>, Receiver<PublicKey>),
168            (Sender<PublicKey>, Receiver<PublicKey>),
169            (Sender<PublicKey>, Receiver<PublicKey>),
170            (Sender<PublicKey>, Receiver<PublicKey>),
171            (Sender<PublicKey>, Receiver<PublicKey>),
172        ),
173    > {
174        let mut registrations = HashMap::new();
175        for validator in validators.iter() {
176            let (pending_sender, pending_receiver) =
177                oracle.register(validator.clone(), 0).await.unwrap();
178            let (recovered_sender, recovered_receiver) =
179                oracle.register(validator.clone(), 1).await.unwrap();
180            let (resolver_sender, resolver_receiver) =
181                oracle.register(validator.clone(), 2).await.unwrap();
182            let (broadcast_sender, broadcast_receiver) =
183                oracle.register(validator.clone(), 3).await.unwrap();
184            let (backfill_sender, backfill_receiver) =
185                oracle.register(validator.clone(), 4).await.unwrap();
186            registrations.insert(
187                validator.clone(),
188                (
189                    (pending_sender, pending_receiver),
190                    (recovered_sender, recovered_receiver),
191                    (resolver_sender, resolver_receiver),
192                    (broadcast_sender, broadcast_receiver),
193                    (backfill_sender, backfill_receiver),
194                ),
195            );
196        }
197        registrations
198    }
199
200    /// Links (or unlinks) validators using the oracle.
201    ///
202    /// The `action` parameter determines the action (e.g. link, unlink) to take.
203    /// The `restrict_to` function can be used to restrict the linking to certain connections,
204    /// otherwise all validators will be linked to all other validators.
205    async fn link_validators(
206        oracle: &mut Oracle<PublicKey>,
207        validators: &[PublicKey],
208        link: Link,
209        restrict_to: Option<fn(usize, usize, usize) -> bool>,
210    ) {
211        for (i1, v1) in validators.iter().enumerate() {
212            for (i2, v2) in validators.iter().enumerate() {
213                // Ignore self
214                if v2 == v1 {
215                    continue;
216                }
217
218                // Restrict to certain connections
219                if let Some(f) = restrict_to {
220                    if !f(validators.len(), i1, i2) {
221                        continue;
222                    }
223                }
224
225                // Add link
226                oracle
227                    .add_link(v1.clone(), v2.clone(), link.clone())
228                    .await
229                    .unwrap();
230            }
231        }
232    }
233
    /// Runs `n` validators, fully connected over `link`, on a deterministic runtime seeded
    /// with `seed`, and polls metrics until a validator reports a contiguous height of at
    /// least `required`.
    ///
    /// Returns the runtime auditor's state string, which the tests in this file compare
    /// across two runs with the same seed.
    ///
    /// Panics if any `*_peers_blocked` metric is non-zero while polling.
    fn all_online(n: u32, seed: u64, link: Link, required: u64) -> String {
        // Create context
        let threshold = quorum(n);
        let cfg = deterministic::Config::default().with_seed(seed);
        let executor = Runner::from(cfg);
        executor.start(|mut context| async move {
            // Create simulated network
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                simulated::Config {
                    max_size: 1024 * 1024,
                },
            );

            // Start network
            network.start();

            // Register participants (keys are derived from the index, then both lists are
            // sorted by public key so `signers[i]` and `validators[i]` stay aligned)
            let mut signers = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let signer = PrivateKey::from_seed(i as u64);
                let pk = signer.public_key();
                signers.push(signer);
                validators.push(pk);
            }
            validators.sort();
            signers.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link all validators
            link_validators(&mut oracle, &validators, link, None).await;

            // Derive threshold
            let (polynomial, shares) =
                ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);

            // Create instances
            // NOTE(review): `public_keys` is filled below but never read in this function.
            let mut public_keys = HashSet::new();
            for (idx, signer) in signers.into_iter().enumerate() {
                // Create signer context
                let public_key = signer.public_key();
                public_keys.insert(public_key.clone());

                // Configure engine (the "validator-" prefix is what the metrics poller
                // below filters on)
                let uid = format!("validator-{public_key}");
                let config: Config<_, MockIndexer> = engine::Config {
                    blocker: oracle.control(public_key.clone()),
                    partition_prefix: uid.clone(),
                    blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
                    finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
                    signer,
                    polynomial: polynomial.clone(),
                    share: shares[idx].clone(),
                    participants: validators.clone(),
                    mailbox_size: 1024,
                    deque_size: 10,
                    backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout: 10,
                    skip_timeout: 5,
                    max_fetch_count: 10,
                    max_fetch_size: 1024 * 512,
                    fetch_concurrent: 10,
                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
                    indexer: None,
                };
                let engine = Engine::new(context.with_label(&uid), config).await;

                // Get networking
                let (pending, recovered, resolver, broadcast, backfill) =
                    registrations.remove(&public_key).unwrap();

                // Start engine
                engine.start(pending, recovered, resolver, broadcast, backfill);
            }

            // Poll metrics
            loop {
                let metrics = context.encode();

                // Iterate over all lines
                let mut success = false;
                for line in metrics.lines() {
                    // Ensure it is a metrics line
                    if !line.starts_with("validator-") {
                        continue;
                    }

                    // Split metric and value
                    let mut parts = line.split_whitespace();
                    let metric = parts.next().unwrap();
                    let value = parts.next().unwrap();

                    // If ends with peers_blocked, ensure it is zero
                    if metric.ends_with("_peers_blocked") {
                        let value = value.parse::<u64>().unwrap();
                        assert_eq!(value, 0);
                    }

                    // If ends with contiguous_height, check whether it has reached
                    // `required`; the first metric line that does ends the scan
                    if metric.ends_with("_syncer_contiguous_height") {
                        let value = value.parse::<u64>().unwrap();
                        if value >= required {
                            success = true;
                            break;
                        }
                    }
                }
                if success {
                    break;
                }

                // No validator has reached the required height yet; poll again later
                context.sleep(Duration::from_secs(1)).await;
            }
            context.auditor().state()
        })
    }
356
357    #[test_traced]
358    fn test_good_links() {
359        let link = Link {
360            latency: 10.0,
361            jitter: 1.0,
362            success_rate: 1.0,
363        };
364        for seed in 0..5 {
365            let state = all_online(5, seed, link.clone(), 25);
366            assert_eq!(state, all_online(5, seed, link.clone(), 25));
367        }
368    }
369
370    #[test_traced]
371    fn test_bad_links() {
372        let link = Link {
373            latency: 200.0,
374            jitter: 150.0,
375            success_rate: 0.75,
376        };
377        for seed in 0..5 {
378            let state = all_online(5, seed, link.clone(), 25);
379            assert_eq!(state, all_online(5, seed, link.clone(), 25));
380        }
381    }
382
383    #[test_traced]
384    fn test_1k() {
385        let link = Link {
386            latency: 80.0,
387            jitter: 10.0,
388            success_rate: 0.98,
389        };
390        all_online(10, 0, link.clone(), 1000);
391    }
392
393    #[test_traced]
394    fn test_backfill() {
395        // Create context
396        let n = 5;
397        let threshold = quorum(n);
398        let initial_container_required = 10;
399        let final_container_required = 20;
400        let executor = Runner::timed(Duration::from_secs(30));
401        executor.start(|mut context| async move {
402            // Create simulated network
403            let (network, mut oracle) = Network::new(
404                context.with_label("network"),
405                simulated::Config {
406                    max_size: 1024 * 1024,
407                },
408            );
409
410            // Start network
411            network.start();
412
413            // Register participants
414            let mut signers = Vec::new();
415            let mut validators = Vec::new();
416            for i in 0..n {
417                let signer = PrivateKey::from_seed(i as u64);
418                let pk = signer.public_key();
419                signers.push(signer);
420                validators.push(pk);
421            }
422            validators.sort();
423            signers.sort_by_key(|s| s.public_key());
424            let mut registrations = register_validators(&mut oracle, &validators).await;
425
426            // Link all validators (except 0)
427            let link = Link {
428                latency: 10.0,
429                jitter: 1.0,
430                success_rate: 1.0,
431            };
432            link_validators(
433                &mut oracle,
434                &validators,
435                link.clone(),
436                Some(|_, i, j| ![i, j].contains(&0usize)),
437            )
438            .await;
439
440            // Derive threshold
441            let (polynomial, shares) =
442                ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
443
444            // Create instances
445            for (idx, signer) in signers.iter().enumerate() {
446                // Skip first
447                if idx == 0 {
448                    continue;
449                }
450
451                // Configure engine
452                let public_key = signer.public_key();
453                let uid = format!("validator-{public_key}");
454                let config: Config<_, MockIndexer> = engine::Config {
455                    blocker: oracle.control(public_key.clone()),
456                    partition_prefix: uid.clone(),
457                    blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
458                    finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
459                    signer: signer.clone(),
460                    polynomial: polynomial.clone(),
461                    share: shares[idx].clone(),
462                    participants: validators.clone(),
463                    mailbox_size: 1024,
464                    deque_size: 10,
465                    backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
466                    leader_timeout: Duration::from_secs(1),
467                    notarization_timeout: Duration::from_secs(2),
468                    nullify_retry: Duration::from_secs(10),
469                    fetch_timeout: Duration::from_secs(1),
470                    activity_timeout: 10,
471                    skip_timeout: 5,
472                    max_fetch_count: 10,
473                    max_fetch_size: 1024 * 512,
474                    fetch_concurrent: 10,
475                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
476                    indexer: None,
477                };
478                let engine = Engine::new(context.with_label(&uid), config).await;
479
480                // Get networking
481                let (pending, recovered, resolver, broadcast, backfill) =
482                    registrations.remove(&public_key).unwrap();
483
484                // Start engine
485                engine.start(pending, recovered, resolver, broadcast, backfill);
486            }
487
488            // Poll metrics
489            loop {
490                let metrics = context.encode();
491
492                // Iterate over all lines
493                let mut success = true;
494                for line in metrics.lines() {
495                    // Ensure it is a metrics line
496                    if !line.starts_with("validator-") {
497                        continue;
498                    }
499
500                    // Split metric and value
501                    let mut parts = line.split_whitespace();
502                    let metric = parts.next().unwrap();
503                    let value = parts.next().unwrap();
504
505                    // If ends with peers_blocked, ensure it is zero
506                    if metric.ends_with("_peers_blocked") {
507                        let value = value.parse::<u64>().unwrap();
508                        assert_eq!(value, 0);
509                    }
510
511                    // If ends with contiguous_height, ensure it is at least required_container
512                    if metric.ends_with("_syncer_contiguous_height") {
513                        let value = value.parse::<u64>().unwrap();
514                        if value >= initial_container_required {
515                            success = true;
516                            break;
517                        }
518                    }
519                }
520                if success {
521                    break;
522                }
523
524                // Still waiting for all validators to complete
525                context.sleep(Duration::from_secs(1)).await;
526            }
527
528            // Link first peer
529            link_validators(
530                &mut oracle,
531                &validators,
532                link,
533                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
534            )
535            .await;
536
537            // Configure engine
538            let signer = signers[0].clone();
539            let share = shares[0].clone();
540            let public_key = signer.public_key();
541            let uid = format!("validator-{public_key}");
542            let config: Config<_, MockIndexer> = engine::Config {
543                blocker: oracle.control(public_key.clone()),
544                partition_prefix: uid.clone(),
545                blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
546                finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
547                signer: signer.clone(),
548                polynomial: polynomial.clone(),
549                share,
550                participants: validators.clone(),
551                mailbox_size: 1024,
552                deque_size: 10,
553                backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
554                leader_timeout: Duration::from_secs(1),
555                notarization_timeout: Duration::from_secs(2),
556                nullify_retry: Duration::from_secs(10),
557                fetch_timeout: Duration::from_secs(1),
558                activity_timeout: 10,
559                skip_timeout: 5,
560                max_fetch_count: 10,
561                max_fetch_size: 1024 * 512,
562                fetch_concurrent: 10,
563                fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
564                indexer: None,
565            };
566            let engine = Engine::new(context.with_label(&uid), config).await;
567
568            // Get networking
569            let (pending, recovered, resolver, broadcast, backfill) =
570                registrations.remove(&public_key).unwrap();
571
572            // Start engine
573            engine.start(pending, recovered, resolver, broadcast, backfill);
574
575            // Poll metrics
576            loop {
577                let metrics = context.encode();
578
579                // Iterate over all lines
580                let mut success = false;
581                for line in metrics.lines() {
582                    // Ensure it is a metrics line
583                    if !line.starts_with("validator-") {
584                        continue;
585                    }
586
587                    // Split metric and value
588                    let mut parts = line.split_whitespace();
589                    let metric = parts.next().unwrap();
590                    let value = parts.next().unwrap();
591
592                    // If ends with peers_blocked, ensure it is zero
593                    if metric.ends_with("_peers_blocked") {
594                        let value = value.parse::<u64>().unwrap();
595                        assert_eq!(value, 0);
596                    }
597
598                    // If ends with contiguous_height, ensure it is at least required_container
599                    if metric.ends_with("_syncer_contiguous_height") {
600                        let value = value.parse::<u64>().unwrap();
601                        if value >= final_container_required {
602                            success = true;
603                            break;
604                        }
605                    }
606                }
607                if success {
608                    break;
609                }
610
611                // Still waiting for all validators to complete
612                context.sleep(Duration::from_secs(1)).await;
613            }
614        });
615    }
616
    /// Checks that validators make progress across repeated abrupt restarts.
    ///
    /// Each run starts all five validators and races a metrics poller against a random
    /// timer of 10ms to 1s. If the timer wins, the run ends early and the next run is
    /// started from `context.recover()`. The loop ends once a run observes a contiguous
    /// height of `required_container`, and the test asserts that more than one restart
    /// was needed.
    ///
    /// Panics if any `*_peers_blocked` metric is ever non-zero.
    #[test_traced]
    fn test_unclean_shutdown() {
        // Create context
        let n = 5;
        let threshold = quorum(n);
        let required_container = 100;

        // Derive threshold (outside the runtime, from a fixed seed, so every run uses the
        // same polynomial and shares)
        let mut rng = StdRng::seed_from_u64(0);
        let (polynomial, shares) = ops::generate_shares::<_, MinSig>(&mut rng, None, n, threshold);

        // Random restarts every x seconds
        let mut runs = 0;
        let mut prev_ctx = None;
        loop {
            // Setup run
            let polynomial = polynomial.clone();
            let shares = shares.clone();
            let f = |mut context: deterministic::Context| async move {
                // Create simulated network
                let (network, mut oracle) = Network::new(
                    context.with_label("network"),
                    simulated::Config {
                        max_size: 1024 * 1024,
                    },
                );

                // Start network
                network.start();

                // Register participants
                let mut signers = Vec::new();
                let mut validators = Vec::new();
                for i in 0..n {
                    let signer = PrivateKey::from_seed(i as u64);
                    let pk = signer.public_key();
                    signers.push(signer);
                    validators.push(pk);
                }
                validators.sort();
                signers.sort_by_key(|s| s.public_key());
                let mut registrations = register_validators(&mut oracle, &validators).await;

                // Link all validators
                let link = Link {
                    latency: 10.0,
                    jitter: 1.0,
                    success_rate: 1.0,
                };
                link_validators(&mut oracle, &validators, link, None).await;

                // Create instances
                // NOTE(review): `public_keys` is filled below but never read in this closure.
                let mut public_keys = HashSet::new();
                for (idx, signer) in signers.into_iter().enumerate() {
                    // Create signer context
                    let public_key = signer.public_key();
                    public_keys.insert(public_key.clone());

                    // Configure engine (the same `uid` is used as the partition prefix on
                    // every run)
                    let uid = format!("validator-{public_key}");
                    let config: Config<_, MockIndexer> = engine::Config {
                        blocker: oracle.control(public_key.clone()),
                        partition_prefix: uid.clone(),
                        blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
                        finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
                        signer,
                        polynomial: polynomial.clone(),
                        share: shares[idx].clone(),
                        participants: validators.clone(),
                        mailbox_size: 1024,
                        deque_size: 10,
                        backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
                        leader_timeout: Duration::from_secs(1),
                        notarization_timeout: Duration::from_secs(2),
                        nullify_retry: Duration::from_secs(10),
                        fetch_timeout: Duration::from_secs(1),
                        activity_timeout: 10,
                        skip_timeout: 5,
                        max_fetch_count: 10,
                        max_fetch_size: 1024 * 512,
                        fetch_concurrent: 10,
                        fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
                        indexer: None,
                    };
                    let engine = Engine::new(context.with_label(&uid), config).await;

                    // Get networking
                    let (pending, recovered, resolver, broadcast, backfill) =
                        registrations.remove(&public_key).unwrap();

                    // Start engine
                    engine.start(pending, recovered, resolver, broadcast, backfill);
                }

                // Poll metrics (in a spawned task so it can be raced against the timer below)
                let poller = context
                    .with_label("metrics")
                    .spawn(move |context| async move {
                        loop {
                            let metrics = context.encode();

                            // Iterate over all lines
                            let mut success = false;
                            for line in metrics.lines() {
                                // Ensure it is a metrics line
                                if !line.starts_with("validator-") {
                                    continue;
                                }

                                // Split metric and value
                                let mut parts = line.split_whitespace();
                                let metric = parts.next().unwrap();
                                let value = parts.next().unwrap();

                                // If ends with peers_blocked, ensure it is zero
                                if metric.ends_with("_peers_blocked") {
                                    let value = value.parse::<u64>().unwrap();
                                    assert_eq!(value, 0);
                                }

                                // If ends with contiguous_height, check whether it has
                                // reached required_container; the first metric line that
                                // does ends the scan
                                if metric.ends_with("_syncer_contiguous_height") {
                                    let value = value.parse::<u64>().unwrap();
                                    if value >= required_container {
                                        success = true;
                                        break;
                                    }
                                }
                            }
                            if success {
                                break;
                            }

                            // Required height not reached yet; poll again shortly
                            context.sleep(Duration::from_millis(10)).await;
                        }
                    });

                // Exit at random points until finished
                let wait =
                    context.gen_range(Duration::from_millis(10)..Duration::from_millis(1_000));

                // Wait for one to finish; the context is returned either way so the
                // caller can recover it for the next run
                select! {
                    _ = poller => {
                        // Finished
                        (true, context)
                    },
                    _ = context.sleep(wait) => {
                        // Randomly exit
                        (false, context)
                    }
                }
            };

            // Handle run (the first run gets a fresh timed runner; later runs resume from
            // the context recovered from the previous run)
            let (complete, context) = if let Some(prev_ctx) = prev_ctx {
                Runner::from(prev_ctx)
            } else {
                Runner::timed(Duration::from_secs(30))
            }
            .start(f);
            if complete {
                break;
            }

            // Prepare for next run
            prev_ctx = Some(context.recover());
            runs += 1;
        }

        // `runs` counts only interrupted runs, so this requires at least two restarts
        assert!(runs > 1);
        info!(runs, "unclean shutdown recovery worked");
    }
790
791    #[test_traced]
792    fn test_indexer() {
793        // Create context
794        let n = 5;
795        let threshold = quorum(n);
796        let required_container = 10;
797        let executor = Runner::timed(Duration::from_secs(30));
798        executor.start(|mut context| async move {
799            // Create simulated network
800            let (network, mut oracle) = Network::new(
801                context.with_label("network"),
802                simulated::Config {
803                    max_size: 1024 * 1024,
804                },
805            );
806
807            // Start network
808            network.start();
809
810            // Register participants
811            let mut signers = Vec::new();
812            let mut validators = Vec::new();
813            for i in 0..n {
814                let signer = PrivateKey::from_seed(i as u64);
815                let pk = signer.public_key();
816                signers.push(signer);
817                validators.push(pk);
818            }
819            validators.sort();
820            signers.sort_by_key(|s| s.public_key());
821            let mut registrations = register_validators(&mut oracle, &validators).await;
822
823            // Link all validators
824            let link = Link {
825                latency: 10.0,
826                jitter: 1.0,
827                success_rate: 1.0,
828            };
829            link_validators(&mut oracle, &validators, link, None).await;
830
831            // Derive threshold
832            let (polynomial, shares) =
833                ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
834            let identity = *poly::public::<MinSig>(&polynomial);
835
836            // Define mock indexer
837            let indexer = MockIndexer::new("", identity);
838
839            // Create instances
840            let mut public_keys = HashSet::new();
841            for (idx, signer) in signers.into_iter().enumerate() {
842                // Create signer context
843                let public_key = signer.public_key();
844                public_keys.insert(public_key.clone());
845
846                // Configure engine
847                let uid = format!("validator-{public_key}");
848                let config: Config<_, MockIndexer> = engine::Config {
849                    blocker: oracle.control(public_key.clone()),
850                    partition_prefix: uid.clone(),
851                    blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
852                    finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
853                    signer,
854                    polynomial: polynomial.clone(),
855                    share: shares[idx].clone(),
856                    participants: validators.clone(),
857                    mailbox_size: 1024,
858                    deque_size: 10,
859                    backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
860                    leader_timeout: Duration::from_secs(1),
861                    notarization_timeout: Duration::from_secs(2),
862                    nullify_retry: Duration::from_secs(10),
863                    fetch_timeout: Duration::from_secs(1),
864                    activity_timeout: 10,
865                    skip_timeout: 5,
866                    max_fetch_count: 10,
867                    max_fetch_size: 1024 * 512,
868                    fetch_concurrent: 10,
869                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
870                    indexer: Some(indexer.clone()),
871                };
872                let engine = Engine::new(context.with_label(&uid), config).await;
873
874                // Get networking
875                let (pending, recovered, resolver, broadcast, backfill) =
876                    registrations.remove(&public_key).unwrap();
877
878                // Start engine
879                engine.start(pending, recovered, resolver, broadcast, backfill);
880            }
881
882            // Poll metrics
883            loop {
884                let metrics = context.encode();
885
886                // Iterate over all lines
887                let mut success = false;
888                for line in metrics.lines() {
889                    // Ensure it is a metrics line
890                    if !line.starts_with("validator-") {
891                        continue;
892                    }
893
894                    // Split metric and value
895                    let mut parts = line.split_whitespace();
896                    let metric = parts.next().unwrap();
897                    let value = parts.next().unwrap();
898
899                    // If ends with peers_blocked, ensure it is zero
900                    if metric.ends_with("_peers_blocked") {
901                        let value = value.parse::<u64>().unwrap();
902                        assert_eq!(value, 0);
903                    }
904
905                    // If ends with contiguous_height, ensure it is at least required_container
906                    if metric.ends_with("_syncer_contiguous_height") {
907                        let value = value.parse::<u64>().unwrap();
908                        if value >= required_container {
909                            success = true;
910                            break;
911                        }
912                    }
913                }
914                if success {
915                    break;
916                }
917
918                // Still waiting for all validators to complete
919                context.sleep(Duration::from_secs(1)).await;
920            }
921
922            // Check indexer uploads
923            assert!(indexer.seed_seen.load(std::sync::atomic::Ordering::Relaxed));
924            assert!(indexer
925                .notarization_seen
926                .load(std::sync::atomic::Ordering::Relaxed));
927            assert!(indexer
928                .finalization_seen
929                .load(std::sync::atomic::Ordering::Relaxed));
930        });
931    }
932}