alto_chain/
lib.rs

1use alto_types::Scheme;
2use commonware_consensus::marshal::SchemeProvider;
3use serde::{Deserialize, Serialize};
4use std::{collections::HashMap, net::SocketAddr, sync::Arc};
5
6pub mod application;
7pub mod engine;
8pub mod indexer;
9pub mod utils;
10
11/// Configuration for the [engine::Engine].
12#[derive(Deserialize, Serialize)]
13pub struct Config {
14    pub private_key: String,
15    pub share: String,
16    pub polynomial: String,
17
18    pub port: u16,
19    pub metrics_port: u16,
20    pub directory: String,
21    pub worker_threads: usize,
22    pub log_level: String,
23
24    pub local: bool,
25    pub allowed_peers: Vec<String>,
26    pub bootstrappers: Vec<String>,
27
28    pub message_backlog: usize,
29    pub mailbox_size: usize,
30    pub deque_size: usize,
31
32    pub indexer: Option<String>,
33}
34
35/// A list of peers provided when a validator is run locally.
36///
37/// When run remotely, [commonware_deployer::ec2::Hosts] is used instead.
38#[derive(Deserialize, Serialize)]
39pub struct Peers {
40    pub addresses: HashMap<String, SocketAddr>,
41}
42
43/// A static provider that always returns the same signing scheme.
44#[derive(Clone)]
45pub struct StaticSchemeProvider(Arc<Scheme>);
46
47impl SchemeProvider for StaticSchemeProvider {
48    type Scheme = Scheme;
49
50    fn scheme(&self, _epoch: u64) -> Option<Arc<Scheme>> {
51        Some(self.0.clone())
52    }
53}
54
55impl From<Scheme> for StaticSchemeProvider {
56    fn from(scheme: Scheme) -> Self {
57        Self(Arc::new(scheme))
58    }
59}
60
61#[cfg(test)]
62mod tests {
63    use super::*;
64    use commonware_consensus::marshal;
65    use commonware_cryptography::{
66        bls12381::{
67            dkg::ops,
68            primitives::{poly, variant::MinSig},
69        },
70        ed25519::{PrivateKey, PublicKey},
71        PrivateKeyExt, Signer,
72    };
73    use commonware_macros::{select, test_traced};
74    use commonware_p2p::{
75        simulated::{self, Link, Network, Oracle, Receiver, Sender},
76        utils::requester,
77        Manager,
78    };
79    use commonware_runtime::{
80        deterministic::{self, Runner},
81        Clock, Metrics, Runner as _, Spawner,
82    };
83    use commonware_utils::quorum;
84    use engine::{Config, Engine};
85    use governor::Quota;
86    use indexer::{Indexer, Mock};
87    use rand::{rngs::StdRng, Rng, SeedableRng};
88    use std::{
89        collections::{HashMap, HashSet},
90        num::NonZeroU32,
91        time::Duration,
92    };
93    use tracing::info;
94
95    /// Limit the freezer table size to 1MB because the deterministic runtime stores
96    /// everything in RAM.
97    const FREEZER_TABLE_INITIAL_SIZE: u32 = 2u32.pow(14); // 1MB
98
99    /// Registers all validators using the oracle.
100    async fn register_validators(
101        oracle: &mut Oracle<PublicKey>,
102        validators: &[PublicKey],
103    ) -> HashMap<
104        PublicKey,
105        (
106            (Sender<PublicKey>, Receiver<PublicKey>),
107            (Sender<PublicKey>, Receiver<PublicKey>),
108            (Sender<PublicKey>, Receiver<PublicKey>),
109            (Sender<PublicKey>, Receiver<PublicKey>),
110            (Sender<PublicKey>, Receiver<PublicKey>),
111        ),
112    > {
113        oracle.update(0, validators.into()).await;
114        let mut registrations = HashMap::new();
115        for validator in validators.iter() {
116            let mut oracle = oracle.control(validator.clone());
117            let (pending_sender, pending_receiver) = oracle.register(0).await.unwrap();
118            let (recovered_sender, recovered_receiver) = oracle.register(1).await.unwrap();
119            let (resolver_sender, resolver_receiver) = oracle.register(2).await.unwrap();
120            let (broadcast_sender, broadcast_receiver) = oracle.register(3).await.unwrap();
121            let (backfill_sender, backfill_receiver) = oracle.register(4).await.unwrap();
122            registrations.insert(
123                validator.clone(),
124                (
125                    (pending_sender, pending_receiver),
126                    (recovered_sender, recovered_receiver),
127                    (resolver_sender, resolver_receiver),
128                    (broadcast_sender, broadcast_receiver),
129                    (backfill_sender, backfill_receiver),
130                ),
131            );
132        }
133        registrations
134    }
135
136    /// Links (or unlinks) validators using the oracle.
137    ///
138    /// The `action` parameter determines the action (e.g. link, unlink) to take.
139    /// The `restrict_to` function can be used to restrict the linking to certain connections,
140    /// otherwise all validators will be linked to all other validators.
141    async fn link_validators(
142        oracle: &mut Oracle<PublicKey>,
143        validators: &[PublicKey],
144        link: Link,
145        restrict_to: Option<fn(usize, usize, usize) -> bool>,
146    ) {
147        for (i1, v1) in validators.iter().enumerate() {
148            for (i2, v2) in validators.iter().enumerate() {
149                // Ignore self
150                if v2 == v1 {
151                    continue;
152                }
153
154                // Restrict to certain connections
155                if let Some(f) = restrict_to {
156                    if !f(validators.len(), i1, i2) {
157                        continue;
158                    }
159                }
160
161                // Add link
162                oracle
163                    .add_link(v1.clone(), v2.clone(), link.clone())
164                    .await
165                    .unwrap();
166            }
167        }
168    }
169
170    fn all_online(n: u32, seed: u64, link: Link, required: u64) -> String {
171        // Create context
172        let threshold = quorum(n);
173        let cfg = deterministic::Config::default().with_seed(seed);
174        let executor = Runner::from(cfg);
175        executor.start(|mut context| async move {
176            // Create simulated network
177            let (network, mut oracle) = Network::new(
178                context.with_label("network"),
179                simulated::Config {
180                    max_size: 1024 * 1024,
181                    disconnect_on_block: true,
182                    tracked_peer_sets: Some(1),
183                },
184            );
185
186            // Start network
187            network.start();
188
189            // Register participants
190            let mut signers = Vec::new();
191            let mut validators = Vec::new();
192            for i in 0..n {
193                let signer = PrivateKey::from_seed(i as u64);
194                let pk = signer.public_key();
195                signers.push(signer);
196                validators.push(pk);
197            }
198            validators.sort();
199            signers.sort_by_key(|s| s.public_key());
200            let mut registrations = register_validators(&mut oracle, &validators).await;
201
202            // Link all validators
203            link_validators(&mut oracle, &validators, link, None).await;
204
205            // Derive threshold
206            let (polynomial, shares) =
207                ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
208
209            // Create instances
210            let mut public_keys = HashSet::new();
211            for (idx, signer) in signers.into_iter().enumerate() {
212                // Create signer context
213                let public_key = signer.public_key();
214                public_keys.insert(public_key.clone());
215
216                // Configure engine
217                let uid = format!("validator-{public_key}");
218                let config: Config<_, Mock> = engine::Config {
219                    blocker: oracle.control(public_key.clone()),
220                    partition_prefix: uid.clone(),
221                    blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
222                    finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
223                    me: signer.public_key(),
224                    polynomial: polynomial.clone(),
225                    share: shares[idx].clone(),
226                    participants: validators.clone().into(),
227                    mailbox_size: 1024,
228                    deque_size: 10,
229                    leader_timeout: Duration::from_secs(1),
230                    notarization_timeout: Duration::from_secs(2),
231                    nullify_retry: Duration::from_secs(10),
232                    fetch_timeout: Duration::from_secs(1),
233                    activity_timeout: 10,
234                    skip_timeout: 5,
235                    max_fetch_count: 10,
236                    max_fetch_size: 1024 * 512,
237                    fetch_concurrent: 10,
238                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
239                    indexer: None,
240                };
241                let engine = Engine::new(context.with_label(&uid), config).await;
242
243                // Get networking
244                let (pending, recovered, resolver, broadcast, backfill) =
245                    registrations.remove(&public_key).unwrap();
246
247                // Configure marshal resolver
248                let marshal_resolver_cfg = marshal::resolver::p2p::Config {
249                    public_key: public_key.clone(),
250                    manager: oracle.clone(),
251                    mailbox_size: 1024,
252                    requester_config: requester::Config {
253                        me: Some(public_key.clone()),
254                        rate_limit: Quota::per_second(NonZeroU32::new(5).unwrap()),
255                        initial: Duration::from_secs(1),
256                        timeout: Duration::from_secs(2),
257                    },
258                    fetch_retry_timeout: Duration::from_millis(100),
259                    priority_requests: false,
260                    priority_responses: false,
261                };
262
263                let marshal_resolver =
264                    marshal::resolver::p2p::init(&context, marshal_resolver_cfg, backfill);
265
266                // Start engine
267                engine.start(pending, recovered, resolver, broadcast, marshal_resolver);
268            }
269
270            // Poll metrics
271            loop {
272                let metrics = context.encode();
273
274                // Iterate over all lines
275                let mut success = false;
276                for line in metrics.lines() {
277                    // Ensure it is a metrics line
278                    if !line.starts_with("validator-") {
279                        continue;
280                    }
281
282                    // Split metric and value
283                    let mut parts = line.split_whitespace();
284                    let metric = parts.next().unwrap();
285                    let value = parts.next().unwrap();
286
287                    // If ends with peers_blocked, ensure it is zero
288                    if metric.ends_with("_peers_blocked") {
289                        let value = value.parse::<u64>().unwrap();
290                        assert_eq!(value, 0);
291                    }
292
293                    // If ends with contiguous_height, ensure it is at least required_container
294                    if metric.ends_with("_marshal_processed_height") {
295                        let value = value.parse::<u64>().unwrap();
296                        if value >= required {
297                            success = true;
298                            break;
299                        }
300                    }
301                }
302                if success {
303                    break;
304                }
305
306                // Still waiting for all validators to complete
307                context.sleep(Duration::from_secs(1)).await;
308            }
309            context.auditor().state()
310        })
311    }
312
313    #[test_traced]
314    fn test_good_links() {
315        let link = Link {
316            latency: Duration::from_millis(10),
317            jitter: Duration::from_millis(1),
318            success_rate: 1.0,
319        };
320        for seed in 0..5 {
321            let state = all_online(5, seed, link.clone(), 25);
322            assert_eq!(state, all_online(5, seed, link.clone(), 25));
323        }
324    }
325
326    #[test_traced]
327    fn test_bad_links() {
328        let link = Link {
329            latency: Duration::from_millis(200),
330            jitter: Duration::from_millis(150),
331            success_rate: 0.75,
332        };
333        for seed in 0..5 {
334            let state = all_online(5, seed, link.clone(), 25);
335            assert_eq!(state, all_online(5, seed, link.clone(), 25));
336        }
337    }
338
339    #[test_traced]
340    fn test_1k() {
341        let link = Link {
342            latency: Duration::from_millis(80),
343            jitter: Duration::from_millis(10),
344            success_rate: 0.98,
345        };
346        all_online(10, 0, link.clone(), 1000);
347    }
348
349    #[test_traced]
350    fn test_backfill() {
351        // Create context
352        let n = 5;
353        let threshold = quorum(n);
354        let initial_container_required = 10;
355        let final_container_required = 20;
356        let executor = Runner::timed(Duration::from_secs(30));
357        executor.start(|mut context| async move {
358            // Create simulated network
359            let (network, mut oracle) = Network::new(
360                context.with_label("network"),
361                simulated::Config {
362                    max_size: 1024 * 1024,
363                    disconnect_on_block: true,
364                    tracked_peer_sets: Some(1),
365                },
366            );
367
368            // Start network
369            network.start();
370
371            // Register participants
372            let mut signers = Vec::new();
373            let mut validators = Vec::new();
374            for i in 0..n {
375                let signer = PrivateKey::from_seed(i as u64);
376                let pk = signer.public_key();
377                signers.push(signer);
378                validators.push(pk);
379            }
380            validators.sort();
381            signers.sort_by_key(|s| s.public_key());
382            let mut registrations = register_validators(&mut oracle, &validators).await;
383
384            // Link all validators (except 0)
385            let link = Link {
386                latency: Duration::from_millis(10),
387                jitter: Duration::from_millis(1),
388                success_rate: 1.0,
389            };
390            link_validators(
391                &mut oracle,
392                &validators,
393                link.clone(),
394                Some(|_, i, j| ![i, j].contains(&0usize)),
395            )
396            .await;
397
398            // Derive threshold
399            let (polynomial, shares) =
400                ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
401
402            // Create instances
403            for (idx, signer) in signers.iter().enumerate() {
404                // Skip first
405                if idx == 0 {
406                    continue;
407                }
408
409                // Configure engine
410                let public_key = signer.public_key();
411                let uid = format!("validator-{public_key}");
412                let config: Config<_, Mock> = engine::Config {
413                    blocker: oracle.control(public_key.clone()),
414                    partition_prefix: uid.clone(),
415                    blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
416                    finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
417                    me: signer.public_key(),
418                    polynomial: polynomial.clone(),
419                    share: shares[idx].clone(),
420                    participants: validators.clone().into(),
421                    mailbox_size: 1024,
422                    deque_size: 10,
423                    leader_timeout: Duration::from_secs(1),
424                    notarization_timeout: Duration::from_secs(2),
425                    nullify_retry: Duration::from_secs(10),
426                    fetch_timeout: Duration::from_secs(1),
427                    activity_timeout: 10,
428                    skip_timeout: 5,
429                    max_fetch_count: 10,
430                    max_fetch_size: 1024 * 512,
431                    fetch_concurrent: 10,
432                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
433                    indexer: None,
434                };
435                let engine = Engine::new(context.with_label(&uid), config).await;
436
437                // Get networking
438                let (pending, recovered, resolver, broadcast, backfill) =
439                    registrations.remove(&public_key).unwrap();
440
441                // Configure marshal resolver
442                let marshal_resolver_cfg = marshal::resolver::p2p::Config {
443                    public_key: public_key.clone(),
444                    manager: oracle.clone(),
445                    mailbox_size: 1024,
446                    requester_config: requester::Config {
447                        me: Some(public_key.clone()),
448                        rate_limit: Quota::per_second(NonZeroU32::new(5).unwrap()),
449                        initial: Duration::from_secs(1),
450                        timeout: Duration::from_secs(2),
451                    },
452                    fetch_retry_timeout: Duration::from_millis(100),
453                    priority_requests: false,
454                    priority_responses: false,
455                };
456
457                let marshal_resolver =
458                    marshal::resolver::p2p::init(&context, marshal_resolver_cfg, backfill);
459
460                // Start engine
461                engine.start(pending, recovered, resolver, broadcast, marshal_resolver);
462            }
463
464            // Poll metrics
465            loop {
466                let metrics = context.encode();
467
468                // Iterate over all lines
469                let mut success = false;
470                for line in metrics.lines() {
471                    // Ensure it is a metrics line
472                    if !line.starts_with("validator-") {
473                        continue;
474                    }
475
476                    // Split metric and value
477                    let mut parts = line.split_whitespace();
478                    let metric = parts.next().unwrap();
479                    let value = parts.next().unwrap();
480
481                    // If ends with peers_blocked, ensure it is zero
482                    if metric.ends_with("_peers_blocked") {
483                        let value = value.parse::<u64>().unwrap();
484                        assert_eq!(value, 0);
485                    }
486
487                    // If ends with contiguous_height, ensure it is at least required_container
488                    if metric.ends_with("_marshal_processed_height") {
489                        let value = value.parse::<u64>().unwrap();
490                        if value >= initial_container_required {
491                            success = true;
492                            break;
493                        }
494                    }
495                }
496                if success {
497                    break;
498                }
499
500                // Still waiting for all validators to complete
501                context.sleep(Duration::from_secs(1)).await;
502            }
503
504            // Link first peer
505            link_validators(
506                &mut oracle,
507                &validators,
508                link,
509                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
510            )
511            .await;
512
513            // Configure engine
514            let signer = signers[0].clone();
515            let share = shares[0].clone();
516            let public_key = signer.public_key();
517            let uid = format!("validator-{public_key}");
518            let config: Config<_, Mock> = engine::Config {
519                blocker: oracle.control(public_key.clone()),
520                partition_prefix: uid.clone(),
521                blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
522                finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
523                me: signer.public_key(),
524                polynomial: polynomial.clone(),
525                share,
526                participants: validators.clone().into(),
527                mailbox_size: 1024,
528                deque_size: 10,
529                leader_timeout: Duration::from_secs(1),
530                notarization_timeout: Duration::from_secs(2),
531                nullify_retry: Duration::from_secs(10),
532                fetch_timeout: Duration::from_secs(1),
533                activity_timeout: 10,
534                skip_timeout: 5,
535                max_fetch_count: 10,
536                max_fetch_size: 1024 * 512,
537                fetch_concurrent: 10,
538                fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
539                indexer: None,
540            };
541            let engine = Engine::new(context.with_label(&uid), config).await;
542
543            // Get networking
544            let (pending, recovered, resolver, broadcast, backfill) =
545                registrations.remove(&public_key).unwrap();
546
547            // Configure marshal resolver
548            let marshal_resolver_cfg = marshal::resolver::p2p::Config {
549                public_key: public_key.clone(),
550                manager: oracle,
551                mailbox_size: 1024,
552                requester_config: requester::Config {
553                    me: Some(public_key.clone()),
554                    rate_limit: Quota::per_second(NonZeroU32::new(5).unwrap()),
555                    initial: Duration::from_secs(1),
556                    timeout: Duration::from_secs(2),
557                },
558                fetch_retry_timeout: Duration::from_millis(100),
559                priority_requests: false,
560                priority_responses: false,
561            };
562
563            let marshal_resolver =
564                marshal::resolver::p2p::init(&context, marshal_resolver_cfg, backfill);
565
566            // Start engine
567            engine.start(pending, recovered, resolver, broadcast, marshal_resolver);
568
569            // Poll metrics
570            loop {
571                let metrics = context.encode();
572
573                // Iterate over all lines
574                let mut success = false;
575                for line in metrics.lines() {
576                    // Ensure it is a metrics line
577                    if !line.starts_with("validator-") {
578                        continue;
579                    }
580
581                    // Split metric and value
582                    let mut parts = line.split_whitespace();
583                    let metric = parts.next().unwrap();
584                    let value = parts.next().unwrap();
585
586                    // If ends with peers_blocked, ensure it is zero
587                    if metric.ends_with("_peers_blocked") {
588                        let value = value.parse::<u64>().unwrap();
589                        assert_eq!(value, 0);
590                    }
591
592                    // If ends with contiguous_height, ensure it is at least required_container
593                    if metric.ends_with("_marshal_processed_height") {
594                        let value = value.parse::<u64>().unwrap();
595                        if value >= final_container_required {
596                            success = true;
597                            break;
598                        }
599                    }
600                }
601                if success {
602                    break;
603                }
604
605                // Still waiting for all validators to complete
606                context.sleep(Duration::from_secs(1)).await;
607            }
608        });
609    }
610
611    #[test_traced]
612    fn test_unclean_shutdown() {
613        // Create context
614        let n = 5;
615        let threshold = quorum(n);
616        let required_container = 100;
617
618        // Derive threshold
619        let mut rng = StdRng::seed_from_u64(0);
620        let (polynomial, shares) = ops::generate_shares::<_, MinSig>(&mut rng, None, n, threshold);
621
622        // Random restarts every x seconds
623        let mut runs = 0;
624        let mut prev_checkpoint = None;
625        loop {
626            // Setup run
627            let polynomial = polynomial.clone();
628            let shares = shares.clone();
629            let f = |mut context: deterministic::Context| async move {
630                // Create simulated network
631                let (network, mut oracle) = Network::new(
632                    context.with_label("network"),
633                    simulated::Config {
634                        max_size: 1024 * 1024,
635                        disconnect_on_block: true,
636                        tracked_peer_sets: Some(1),
637                    },
638                );
639
640                // Start network
641                network.start();
642
643                // Register participants
644                let mut signers = Vec::new();
645                let mut validators = Vec::new();
646                for i in 0..n {
647                    let signer = PrivateKey::from_seed(i as u64);
648                    let pk = signer.public_key();
649                    signers.push(signer);
650                    validators.push(pk);
651                }
652                validators.sort();
653                signers.sort_by_key(|s| s.public_key());
654                let mut registrations = register_validators(&mut oracle, &validators).await;
655
656                // Link all validators
657                let link = Link {
658                    latency: Duration::from_millis(10),
659                    jitter: Duration::from_millis(1),
660                    success_rate: 1.0,
661                };
662                link_validators(&mut oracle, &validators, link, None).await;
663
664                // Create instances
665                let mut public_keys = HashSet::new();
666                for (idx, signer) in signers.into_iter().enumerate() {
667                    // Create signer context
668                    let public_key = signer.public_key();
669                    public_keys.insert(public_key.clone());
670
671                    // Configure engine
672                    let uid = format!("validator-{public_key}");
673                    let config: Config<_, Mock> = engine::Config {
674                        blocker: oracle.control(public_key.clone()),
675                        partition_prefix: uid.clone(),
676                        blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
677                        finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
678                        me: signer.public_key(),
679                        polynomial: polynomial.clone(),
680                        share: shares[idx].clone(),
681                        participants: validators.clone().into(),
682                        mailbox_size: 1024,
683                        deque_size: 10,
684                        leader_timeout: Duration::from_secs(1),
685                        notarization_timeout: Duration::from_secs(2),
686                        nullify_retry: Duration::from_secs(10),
687                        fetch_timeout: Duration::from_secs(1),
688                        activity_timeout: 10,
689                        skip_timeout: 5,
690                        max_fetch_count: 10,
691                        max_fetch_size: 1024 * 512,
692                        fetch_concurrent: 10,
693                        fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
694                        indexer: None,
695                    };
696                    let engine = Engine::new(context.with_label(&uid), config).await;
697
698                    // Get networking
699                    let (pending, recovered, resolver, broadcast, backfill) =
700                        registrations.remove(&public_key).unwrap();
701
702                    // Configure marshal resolver
703                    let marshal_resolver_cfg = marshal::resolver::p2p::Config {
704                        public_key: public_key.clone(),
705                        manager: oracle.clone(),
706                        mailbox_size: 1024,
707                        requester_config: requester::Config {
708                            me: Some(public_key.clone()),
709                            rate_limit: Quota::per_second(NonZeroU32::new(5).unwrap()),
710                            initial: Duration::from_secs(1),
711                            timeout: Duration::from_secs(2),
712                        },
713                        fetch_retry_timeout: Duration::from_millis(100),
714                        priority_requests: false,
715                        priority_responses: false,
716                    };
717
718                    let marshal_resolver =
719                        marshal::resolver::p2p::init(&context, marshal_resolver_cfg, backfill);
720
721                    // Start engine
722                    engine.start(pending, recovered, resolver, broadcast, marshal_resolver);
723                }
724
725                // Poll metrics
726                let poller = context
727                    .with_label("metrics")
728                    .spawn(move |context| async move {
729                        loop {
730                            let metrics = context.encode();
731
732                            // Iterate over all lines
733                            let mut success = false;
734                            for line in metrics.lines() {
735                                // Ensure it is a metrics line
736                                if !line.starts_with("validator-") {
737                                    continue;
738                                }
739
740                                // Split metric and value
741                                let mut parts = line.split_whitespace();
742                                let metric = parts.next().unwrap();
743                                let value = parts.next().unwrap();
744
745                                // If ends with peers_blocked, ensure it is zero
746                                if metric.ends_with("_peers_blocked") {
747                                    let value = value.parse::<u64>().unwrap();
748                                    assert_eq!(value, 0);
749                                }
750
751                                // If ends with contiguous_height, ensure it is at least required_container
752                                if metric.ends_with("_marshal_processed_height") {
753                                    let value = value.parse::<u64>().unwrap();
754                                    if value >= required_container {
755                                        success = true;
756                                        break;
757                                    }
758                                }
759                            }
760                            if success {
761                                break;
762                            }
763
764                            // Still waiting for all validators to complete
765                            context.sleep(Duration::from_millis(10)).await;
766                        }
767                    });
768
769                // Exit at random points until finished
770                let wait =
771                    context.gen_range(Duration::from_millis(10)..Duration::from_millis(1_000));
772
773                // Wait for one to finish
774                select! {
775                    _ = poller => {
776                        // Finished
777                        true
778                    },
779                    _ = context.sleep(wait) => {
780                        // Randomly exit
781                        false
782                    }
783                }
784            };
785
786            // Handle run
787            let (complete, checkpoint) = if let Some(prev_checkpoint) = prev_checkpoint {
788                Runner::from(prev_checkpoint)
789            } else {
790                Runner::timed(Duration::from_secs(30))
791            }
792            .start_and_recover(f);
793
794            // Check if we should exit
795            if complete {
796                break;
797            }
798
799            // Prepare for next run
800            prev_checkpoint = Some(checkpoint);
801            runs += 1;
802        }
803        assert!(runs > 1);
804        info!(runs, "unclean shutdown recovery worked");
805    }
806
807    #[test_traced]
808    fn test_indexer() {
809        // Create context
810        let n = 5;
811        let threshold = quorum(n);
812        let required_container = 10;
813        let executor = Runner::timed(Duration::from_secs(30));
814        executor.start(|mut context| async move {
815            // Create simulated network
816            let (network, mut oracle) = Network::new(
817                context.with_label("network"),
818                simulated::Config {
819                    max_size: 1024 * 1024,
820                    disconnect_on_block: true,
821                    tracked_peer_sets: Some(1),
822                },
823            );
824
825            // Start network
826            network.start();
827
828            // Register participants
829            let mut signers = Vec::new();
830            let mut validators = Vec::new();
831            for i in 0..n {
832                let signer = PrivateKey::from_seed(i as u64);
833                let pk = signer.public_key();
834                signers.push(signer);
835                validators.push(pk);
836            }
837            validators.sort();
838            signers.sort_by_key(|s| s.public_key());
839            let mut registrations = register_validators(&mut oracle, &validators).await;
840
841            // Link all validators
842            let link = Link {
843                latency: Duration::from_millis(10),
844                jitter: Duration::from_millis(1),
845                success_rate: 1.0,
846            };
847            link_validators(&mut oracle, &validators, link, None).await;
848
849            // Derive threshold
850            let (polynomial, shares) =
851                ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
852            let identity = *poly::public::<MinSig>(&polynomial);
853
854            // Define mock indexer
855            let indexer = Mock::new("", identity);
856
857            // Create instances
858            let mut public_keys = HashSet::new();
859            for (idx, signer) in signers.into_iter().enumerate() {
860                // Create signer context
861                let public_key = signer.public_key();
862                public_keys.insert(public_key.clone());
863
864                // Configure engine
865                let uid = format!("validator-{public_key}");
866                let config: Config<_, Mock> = engine::Config {
867                    blocker: oracle.control(public_key.clone()),
868                    partition_prefix: uid.clone(),
869                    blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
870                    finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
871                    me: signer.public_key(),
872                    polynomial: polynomial.clone(),
873                    share: shares[idx].clone(),
874                    participants: validators.clone().into(),
875                    mailbox_size: 1024,
876                    deque_size: 10,
877                    leader_timeout: Duration::from_secs(1),
878                    notarization_timeout: Duration::from_secs(2),
879                    nullify_retry: Duration::from_secs(10),
880                    fetch_timeout: Duration::from_secs(1),
881                    activity_timeout: 10,
882                    skip_timeout: 5,
883                    max_fetch_count: 10,
884                    max_fetch_size: 1024 * 512,
885                    fetch_concurrent: 10,
886                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
887                    indexer: Some(indexer.clone()),
888                };
889                let engine = Engine::new(context.with_label(&uid), config).await;
890
891                // Get networking
892                let (pending, recovered, resolver, broadcast, backfill) =
893                    registrations.remove(&public_key).unwrap();
894
895                // Configure marshal resolver
896                let marshal_resolver_cfg = marshal::resolver::p2p::Config {
897                    public_key: public_key.clone(),
898                    manager: oracle.clone(),
899                    mailbox_size: 1024,
900                    requester_config: requester::Config {
901                        me: Some(public_key.clone()),
902                        rate_limit: Quota::per_second(NonZeroU32::new(5).unwrap()),
903                        initial: Duration::from_secs(1),
904                        timeout: Duration::from_secs(2),
905                    },
906                    fetch_retry_timeout: Duration::from_millis(100),
907                    priority_requests: false,
908                    priority_responses: false,
909                };
910
911                let marshal_resolver =
912                    marshal::resolver::p2p::init(&context, marshal_resolver_cfg, backfill);
913
914                // Start engine
915                engine.start(pending, recovered, resolver, broadcast, marshal_resolver);
916            }
917
918            // Poll metrics
919            loop {
920                let metrics = context.encode();
921
922                // Iterate over all lines
923                let mut success = false;
924                for line in metrics.lines() {
925                    // Ensure it is a metrics line
926                    if !line.starts_with("validator-") {
927                        continue;
928                    }
929
930                    // Split metric and value
931                    let mut parts = line.split_whitespace();
932                    let metric = parts.next().unwrap();
933                    let value = parts.next().unwrap();
934
935                    // If ends with peers_blocked, ensure it is zero
936                    if metric.ends_with("_peers_blocked") {
937                        let value = value.parse::<u64>().unwrap();
938                        assert_eq!(value, 0);
939                    }
940
941                    // If ends with contiguous_height, ensure it is at least required_container
942                    if metric.ends_with("_marshal_processed_height") {
943                        let value = value.parse::<u64>().unwrap();
944                        if value >= required_container {
945                            success = true;
946                            break;
947                        }
948                    }
949                }
950                if success {
951                    break;
952                }
953
954                // Still waiting for all validators to complete
955                context.sleep(Duration::from_secs(1)).await;
956            }
957
958            // Check indexer uploads
959            assert!(indexer.seed_seen.load(std::sync::atomic::Ordering::Relaxed));
960            assert!(indexer
961                .notarization_seen
962                .load(std::sync::atomic::Ordering::Relaxed));
963            assert!(indexer
964                .finalization_seen
965                .load(std::sync::atomic::Ordering::Relaxed));
966        });
967    }
968}