battleware_node/
lib.rs

1use serde::{Deserialize, Serialize};
2use std::{collections::HashMap, net::SocketAddr};
3
4pub mod aggregator;
5pub mod application;
6pub mod engine;
7pub mod indexer;
8pub mod seeder;
9pub mod supervisor;
10
11/// Configuration for the [engine::Engine].
12#[derive(Deserialize, Serialize)]
13pub struct Config {
14    pub private_key: String,
15    pub share: String,
16    pub polynomial: String,
17
18    pub port: u16,
19    pub metrics_port: u16,
20    pub directory: String,
21    pub worker_threads: usize,
22    pub log_level: String,
23
24    pub allowed_peers: Vec<String>,
25    pub bootstrappers: Vec<String>,
26
27    pub message_backlog: usize,
28    pub mailbox_size: usize,
29    pub deque_size: usize,
30
31    pub indexer: String,
32    pub execution_concurrency: usize,
33}
34
35/// A list of peers provided when a validator is run locally.
36///
37/// When run remotely, [commonware_deployer::ec2::Hosts] is used instead.
38#[derive(Deserialize, Serialize)]
39pub struct Peers {
40    pub addresses: HashMap<String, SocketAddr>,
41}
42
43#[cfg(test)]
44mod tests {
45    use super::*;
46    use battleware_types::execution::{Instruction, Transaction};
47    use commonware_cryptography::{
48        bls12381::{
49            dkg::ops,
50            primitives::{poly::public, variant::MinSig},
51        },
52        ed25519::{PrivateKey, PublicKey},
53        PrivateKeyExt, Signer,
54    };
55    use commonware_macros::{select, test_traced};
56    use commonware_p2p::simulated::{self, Link, Network, Oracle, Receiver, Sender};
57    use commonware_runtime::{
58        deterministic::{self, Runner},
59        Clock, Metrics, Runner as _, Spawner,
60    };
61    use commonware_utils::{quorum, NZUsize};
62    use engine::{Config, Engine};
63    use governor::Quota;
64    use indexer::Mock;
65    use rand::{rngs::StdRng, Rng, SeedableRng};
66    use std::{
67        collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
68        num::{NonZeroU32, NonZeroUsize},
69        time::Duration,
70    };
71    use tracing::info;
72
73    /// Limit the freezer table size to 1MB because the deterministic runtime stores
74    /// everything in RAM.
75    const FREEZER_TABLE_INITIAL_SIZE: u32 = 2u32.pow(14); // 1MB
76
77    /// The buffer pool page size.
78    const BUFFER_POOL_PAGE_SIZE: NonZeroUsize = NZUsize!(4_096);
79
80    /// The buffer pool capacity.
81    const BUFFER_POOL_CAPACITY: NonZeroUsize = NZUsize!(1024 * 1024);
82
83    /// Registers all validators using the oracle.
84    async fn register_validators(
85        oracle: &mut Oracle<PublicKey>,
86        validators: &[PublicKey],
87    ) -> HashMap<
88        PublicKey,
89        (
90            (Sender<PublicKey>, Receiver<PublicKey>),
91            (Sender<PublicKey>, Receiver<PublicKey>),
92            (Sender<PublicKey>, Receiver<PublicKey>),
93            (Sender<PublicKey>, Receiver<PublicKey>),
94            (Sender<PublicKey>, Receiver<PublicKey>),
95            (Sender<PublicKey>, Receiver<PublicKey>),
96            (Sender<PublicKey>, Receiver<PublicKey>),
97            (Sender<PublicKey>, Receiver<PublicKey>),
98        ),
99    > {
100        let mut registrations = HashMap::new();
101        for validator in validators.iter() {
102            let (pending_sender, pending_receiver) =
103                oracle.register(validator.clone(), 0).await.unwrap();
104            let (recovered_sender, recovered_receiver) =
105                oracle.register(validator.clone(), 1).await.unwrap();
106            let (resolver_sender, resolver_receiver) =
107                oracle.register(validator.clone(), 2).await.unwrap();
108            let (broadcast_sender, broadcast_receiver) =
109                oracle.register(validator.clone(), 3).await.unwrap();
110            let (backfill_sender, backfill_receiver) =
111                oracle.register(validator.clone(), 4).await.unwrap();
112            let (seeder_sender, seeder_receiver) =
113                oracle.register(validator.clone(), 5).await.unwrap();
114            let (aggregator_sender, aggregator_receiver) =
115                oracle.register(validator.clone(), 6).await.unwrap();
116            let (aggregation_sender, aggregation_receiver) =
117                oracle.register(validator.clone(), 7).await.unwrap();
118            registrations.insert(
119                validator.clone(),
120                (
121                    (pending_sender, pending_receiver),
122                    (recovered_sender, recovered_receiver),
123                    (resolver_sender, resolver_receiver),
124                    (broadcast_sender, broadcast_receiver),
125                    (backfill_sender, backfill_receiver),
126                    (seeder_sender, seeder_receiver),
127                    (aggregator_sender, aggregator_receiver),
128                    (aggregation_sender, aggregation_receiver),
129                ),
130            );
131        }
132        registrations
133    }
134
135    /// Links (or unlinks) validators using the oracle.
136    ///
137    /// The `action` parameter determines the action (e.g. link, unlink) to take.
138    /// The `restrict_to` function can be used to restrict the linking to certain connections,
139    /// otherwise all validators will be linked to all other validators.
140    async fn link_validators(
141        oracle: &mut Oracle<PublicKey>,
142        validators: &[PublicKey],
143        link: Link,
144        restrict_to: Option<fn(usize, usize, usize) -> bool>,
145    ) {
146        for (i1, v1) in validators.iter().enumerate() {
147            for (i2, v2) in validators.iter().enumerate() {
148                // Ignore self
149                if v2 == v1 {
150                    continue;
151                }
152
153                // Restrict to certain connections
154                if let Some(f) = restrict_to {
155                    if !f(validators.len(), i1, i2) {
156                        continue;
157                    }
158                }
159
160                // Add link
161                oracle
162                    .add_link(v1.clone(), v2.clone(), link.clone())
163                    .await
164                    .unwrap();
165            }
166        }
167    }
168
169    fn all_online(n: u32, seed: u64, link: Link, required: u64) -> String {
170        // Create context
171        let threshold = quorum(n);
172        let cfg = deterministic::Config::default().with_seed(seed);
173        let executor = Runner::from(cfg);
174        executor.start(|mut context| async move {
175            // Create simulated network
176            let (network, mut oracle) = Network::new(
177                context.with_label("network"),
178                simulated::Config {
179                    max_size: 1024 * 1024,
180                },
181            );
182
183            // Start network
184            network.start();
185
186            // Register participants
187            let mut signers = Vec::new();
188            let mut validators = Vec::new();
189            for i in 0..n {
190                let signer = PrivateKey::from_seed(i as u64);
191                let pk = signer.public_key();
192                signers.push(signer);
193                validators.push(pk);
194            }
195            validators.sort();
196            signers.sort_by_key(|s| s.public_key());
197            let mut registrations = register_validators(&mut oracle, &validators).await;
198
199            // Link all validators
200            link_validators(&mut oracle, &validators, link, None).await;
201
202            // Derive threshold
203            let (polynomial, shares) =
204                ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
205            let identity = *public::<MinSig>(&polynomial);
206
207            // Define mock indexer
208            let indexer = Mock::new(identity);
209
210            // Create instances
211            let mut public_keys = HashSet::new();
212            for (idx, signer) in signers.into_iter().enumerate() {
213                // Create signer context
214                let public_key = signer.public_key();
215                public_keys.insert(public_key.clone());
216
217                // Configure engine
218                let uid = format!("validator-{public_key}");
219                let config: Config<_, Mock> = engine::Config {
220                    blocker: oracle.control(public_key.clone()),
221                    partition_prefix: uid.clone(),
222                    blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
223                    finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
224                    signer,
225                    polynomial: polynomial.clone(),
226                    share: shares[idx].clone(),
227                    participants: validators.clone(),
228                    mailbox_size: 1024,
229                    deque_size: 10,
230                    backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
231                    leader_timeout: Duration::from_secs(1),
232                    notarization_timeout: Duration::from_secs(2),
233                    nullify_retry: Duration::from_secs(10),
234                    fetch_timeout: Duration::from_secs(1),
235                    activity_timeout: 10,
236                    skip_timeout: 5,
237                    max_fetch_count: 10,
238                    max_fetch_size: 1024 * 512,
239                    fetch_concurrent: 10,
240                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
241                    buffer_pool_page_size: BUFFER_POOL_PAGE_SIZE,
242                    buffer_pool_capacity: BUFFER_POOL_CAPACITY,
243                    indexer: indexer.clone(),
244                    execution_concurrency: 2,
245                    max_uploads_outstanding: 4,
246                };
247                let engine = Engine::new(context.with_label(&uid), config).await;
248
249                // Get networking
250                let (
251                    pending,
252                    recovered,
253                    resolver,
254                    broadcast,
255                    backfill,
256                    seeder,
257                    aggregator,
258                    aggregation,
259                ) = registrations.remove(&public_key).unwrap();
260
261                // Start engine
262                engine.start(
263                    pending,
264                    recovered,
265                    resolver,
266                    broadcast,
267                    backfill,
268                    seeder,
269                    aggregator,
270                    aggregation,
271                );
272            }
273
274            // Poll metrics
275            loop {
276                let metrics = context.encode();
277
278                // Iterate over all lines
279                let mut success = 0;
280                for line in metrics.lines() {
281                    // Ensure it is a metrics line
282                    if !line.starts_with("validator-") {
283                        continue;
284                    }
285
286                    // Split metric and value
287                    let mut parts = line.split_whitespace();
288                    let metric = parts.next().unwrap();
289                    let value = parts.next().unwrap();
290
291                    // If ends with peers_blocked, ensure it is zero
292                    if metric.ends_with("_peers_blocked") {
293                        let value = value.parse::<u64>().unwrap();
294                        assert_eq!(value, 0);
295                    }
296
297                    // If ends with certificates_processed, ensure it is at least required_container
298                    if metric.ends_with("_certificates_processed") {
299                        let value = value.parse::<u64>().unwrap();
300                        if value >= required {
301                            success += 1;
302                        }
303                    }
304                }
305                if success == n {
306                    break;
307                }
308
309                // Still waiting for all validators to complete
310                context.sleep(Duration::from_secs(1)).await;
311            }
312
313            // Wait for mock indexer to contain all seeds and summaries
314            loop {
315                let contains_seeds = {
316                    let mut contains_seeds = true;
317                    let seeds = indexer.seeds.lock().unwrap();
318                    for i in 1..=required {
319                        if !seeds.contains_key(&i) {
320                            contains_seeds = false;
321                            break;
322                        }
323                    }
324                    contains_seeds
325                };
326                let contains_summaries = {
327                    let summaries = indexer.summaries.write().await;
328                    let seen_summaries = summaries.iter().map(|(i, _)| *i).collect::<HashSet<_>>();
329                    let mut contains_summaries = true;
330                    for i in 1..=required {
331                        if !seen_summaries.contains(&i) {
332                            contains_summaries = false;
333                            break;
334                        }
335                    }
336                    contains_summaries
337                };
338
339                // If both contain all required containers, break
340                if contains_seeds && contains_summaries {
341                    break;
342                }
343
344                // Still waiting for all validators to complete
345                context.sleep(Duration::from_millis(10)).await;
346            }
347
348            context.auditor().state()
349        })
350    }
351
352    #[test_traced("INFO")]
353    fn test_good_links() {
354        let link = Link {
355            latency: Duration::from_millis(10),
356            jitter: Duration::from_millis(1),
357            success_rate: 1.0,
358        };
359        for seed in 0..5 {
360            let state = all_online(5, seed, link.clone(), 25);
361            assert_eq!(state, all_online(5, seed, link.clone(), 25));
362        }
363    }
364
365    #[test_traced("INFO")]
366    fn test_bad_links() {
367        let link = Link {
368            latency: Duration::from_millis(200),
369            jitter: Duration::from_millis(150),
370            success_rate: 0.75,
371        };
372        for seed in 0..5 {
373            let state = all_online(5, seed, link.clone(), 25);
374            assert_eq!(state, all_online(5, seed, link.clone(), 25));
375        }
376    }
377
378    #[test_traced("INFO")]
379    fn test_1k() {
380        let link = Link {
381            latency: Duration::from_millis(80),
382            jitter: Duration::from_millis(10),
383            success_rate: 0.98,
384        };
385        all_online(10, 0, link.clone(), 1000);
386    }
387
388    #[test_traced("INFO")]
389    fn test_backfill() {
390        // Create context
391        let n = 5;
392        let threshold = quorum(n);
393        let initial_container_required = 10;
394        let final_container_required = 20;
395        let executor = Runner::timed(Duration::from_secs(30));
396        executor.start(|mut context| async move {
397            // Create simulated network
398            let (network, mut oracle) = Network::new(
399                context.with_label("network"),
400                simulated::Config {
401                    max_size: 1024 * 1024,
402                },
403            );
404
405            // Start network
406            network.start();
407
408            // Register participants
409            let mut signers = Vec::new();
410            let mut validators = Vec::new();
411            for i in 0..n {
412                let signer = PrivateKey::from_seed(i as u64);
413                let pk = signer.public_key();
414                signers.push(signer);
415                validators.push(pk);
416            }
417            validators.sort();
418            signers.sort_by_key(|s| s.public_key());
419            let mut registrations = register_validators(&mut oracle, &validators).await;
420
421            // Link all validators (except 0)
422            let link = Link {
423                latency: Duration::from_millis(10),
424                jitter: Duration::from_millis(1),
425                success_rate: 1.0,
426            };
427            link_validators(
428                &mut oracle,
429                &validators,
430                link.clone(),
431                Some(|_, i, j| ![i, j].contains(&0usize)),
432            )
433            .await;
434
435            // Derive threshold
436            let (polynomial, shares) =
437                ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
438            let identity = *public::<MinSig>(&polynomial);
439
440            // Define mock indexer
441            let indexer = Mock::new(identity);
442
443            // Create instances
444            for (idx, signer) in signers.iter().enumerate() {
445                // Skip first
446                if idx == 0 {
447                    continue;
448                }
449
450                // Configure engine
451                let public_key = signer.public_key();
452                let uid = format!("validator-{public_key}");
453                let config: Config<_, Mock> = engine::Config {
454                    blocker: oracle.control(public_key.clone()),
455                    partition_prefix: uid.clone(),
456                    blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
457                    finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
458                    signer: signer.clone(),
459                    polynomial: polynomial.clone(),
460                    share: shares[idx].clone(),
461                    participants: validators.clone(),
462                    mailbox_size: 1024,
463                    deque_size: 10,
464                    backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
465                    leader_timeout: Duration::from_secs(1),
466                    notarization_timeout: Duration::from_secs(2),
467                    nullify_retry: Duration::from_secs(10),
468                    fetch_timeout: Duration::from_secs(1),
469                    activity_timeout: 10,
470                    skip_timeout: 5,
471                    max_fetch_count: 10,
472                    max_fetch_size: 1024 * 512,
473                    fetch_concurrent: 10,
474                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
475                    buffer_pool_page_size: BUFFER_POOL_PAGE_SIZE,
476                    buffer_pool_capacity: BUFFER_POOL_CAPACITY,
477                    indexer: indexer.clone(),
478                    execution_concurrency: 2,
479                    max_uploads_outstanding: 4,
480                };
481                let engine = Engine::new(context.with_label(&uid), config).await;
482
483                // Get networking
484                let (
485                    pending,
486                    recovered,
487                    resolver,
488                    broadcast,
489                    backfill,
490                    seeder,
491                    aggregator,
492                    aggregation,
493                ) = registrations.remove(&public_key).unwrap();
494
495                // Start engine
496                engine.start(
497                    pending,
498                    recovered,
499                    resolver,
500                    broadcast,
501                    backfill,
502                    seeder,
503                    aggregator,
504                    aggregation,
505                );
506            }
507
508            // Poll metrics
509            loop {
510                let metrics = context.encode();
511
512                // Iterate over all lines
513                let mut success = 0;
514                for line in metrics.lines() {
515                    // Ensure it is a metrics line
516                    if !line.starts_with("validator-") {
517                        continue;
518                    }
519
520                    // Split metric and value
521                    let mut parts = line.split_whitespace();
522                    let metric = parts.next().unwrap();
523                    let value = parts.next().unwrap();
524
525                    // If ends with peers_blocked, ensure it is zero
526                    if metric.ends_with("_peers_blocked") {
527                        let value = value.parse::<u64>().unwrap();
528                        assert_eq!(value, 0);
529                    }
530
531                    // If ends with certificates_processed, ensure it is at least required_container
532                    if metric.ends_with("_certificates_processed") {
533                        let value = value.parse::<u64>().unwrap();
534                        if value >= initial_container_required {
535                            success += 1;
536                        }
537                    }
538                }
539                if success == n - 1 {
540                    break;
541                }
542
543                // Still waiting for all validators to complete
544                context.sleep(Duration::from_secs(1)).await;
545            }
546
547            // Link first peer (and disable link to second peer)
548            link_validators(
549                &mut oracle,
550                &validators,
551                link,
552                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
553            )
554            .await;
555
556            // Configure engine
557            let signer = signers[0].clone();
558            let share = shares[0].clone();
559            let public_key = signer.public_key();
560            let uid = format!("validator-{public_key}");
561            let config: Config<_, Mock> = engine::Config {
562                blocker: oracle.control(public_key.clone()),
563                partition_prefix: uid.clone(),
564                blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
565                finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
566                signer: signer.clone(),
567                polynomial: polynomial.clone(),
568                share,
569                participants: validators.clone(),
570                mailbox_size: 1024,
571                deque_size: 10,
572                backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
573                leader_timeout: Duration::from_secs(1),
574                notarization_timeout: Duration::from_secs(2),
575                nullify_retry: Duration::from_secs(10),
576                fetch_timeout: Duration::from_secs(1),
577                activity_timeout: 10,
578                skip_timeout: 5,
579                max_fetch_count: 10,
580                max_fetch_size: 1024 * 512,
581                fetch_concurrent: 10,
582                fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
583                buffer_pool_page_size: BUFFER_POOL_PAGE_SIZE,
584                buffer_pool_capacity: BUFFER_POOL_CAPACITY,
585                indexer: indexer.clone(),
586                execution_concurrency: 2,
587                max_uploads_outstanding: 4,
588            };
589            let engine = Engine::new(context.with_label(&uid), config).await;
590
591            // Get networking
592            let (
593                pending,
594                recovered,
595                resolver,
596                broadcast,
597                backfill,
598                seeder,
599                aggregator,
600                aggregation,
601            ) = registrations.remove(&public_key).unwrap();
602
603            // Start engine
604            engine.start(
605                pending,
606                recovered,
607                resolver,
608                broadcast,
609                backfill,
610                seeder,
611                aggregator,
612                aggregation,
613            );
614
615            // Poll metrics
616            loop {
617                let metrics = context.encode();
618
619                // Iterate over all lines
620                let mut success = 0;
621                for line in metrics.lines() {
622                    // Ensure it is a metrics line
623                    if !line.starts_with("validator-") {
624                        continue;
625                    }
626
627                    // Split metric and value
628                    let mut parts = line.split_whitespace();
629                    let metric = parts.next().unwrap();
630                    let value = parts.next().unwrap();
631
632                    // If ends with peers_blocked, ensure it is zero
633                    if metric.ends_with("_peers_blocked") {
634                        let value = value.parse::<u64>().unwrap();
635                        assert_eq!(value, 0);
636                    }
637
638                    // If ends with certificates_processed, ensure it is at least required_container
639                    if metric.ends_with("_certificates_processed") {
640                        let value = value.parse::<u64>().unwrap();
641                        if value >= final_container_required {
642                            success += 1;
643                        }
644                    }
645                }
646                if success == n - 1 {
647                    break;
648                }
649
650                // Still waiting for all validators to complete
651                context.sleep(Duration::from_secs(1)).await;
652            }
653        });
654    }
655
656    #[test_traced("INFO")]
657    fn test_unclean_shutdown() {
658        // Create context
659        let n = 5;
660        let threshold = quorum(n);
661        let required_container = 100;
662
663        // Derive threshold
664        let mut rng = StdRng::seed_from_u64(0);
665        let (polynomial, shares) = ops::generate_shares::<_, MinSig>(&mut rng, None, n, threshold);
666        let identity = *public::<MinSig>(&polynomial);
667
668        // Define mock indexer (must live outside of the loop because
669        // it stores seeds beyond the consensus pruning boundary)
670        let indexer = Mock::new(identity);
671
672        // Random restarts every x seconds
673        let mut runs = 0;
674        let mut prev_ctx = None;
675        loop {
676            // Setup run
677            let polynomial = polynomial.clone();
678            let shares = shares.clone();
679            let indexer = indexer.clone();
680            let f = |mut context: deterministic::Context| async move {
681                // Create simulated network
682                let (network, mut oracle) = Network::new(
683                    context.with_label("network"),
684                    simulated::Config {
685                        max_size: 1024 * 1024,
686                    },
687                );
688
689                // Start network
690                network.start();
691
692                // Register participants
693                let mut signers = Vec::new();
694                let mut validators = Vec::new();
695                for i in 0..n {
696                    let signer = PrivateKey::from_seed(i as u64);
697                    let pk = signer.public_key();
698                    signers.push(signer);
699                    validators.push(pk);
700                }
701                validators.sort();
702                signers.sort_by_key(|s| s.public_key());
703                let mut registrations = register_validators(&mut oracle, &validators).await;
704
705                // Link all validators
706                let link = Link {
707                    latency: Duration::from_millis(10),
708                    jitter: Duration::from_millis(1),
709                    success_rate: 1.0,
710                };
711                link_validators(&mut oracle, &validators, link, None).await;
712
713                // Create instances
714                let mut public_keys = HashSet::new();
715                for (idx, signer) in signers.into_iter().enumerate() {
716                    // Create signer context
717                    let public_key = signer.public_key();
718                    public_keys.insert(public_key.clone());
719
720                    // Configure engine
721                    let uid = format!("validator-{public_key}");
722                    let config: Config<_, Mock> = engine::Config {
723                        blocker: oracle.control(public_key.clone()),
724                        partition_prefix: uid.clone(),
725                        blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
726                        finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
727                        signer,
728                        polynomial: polynomial.clone(),
729                        share: shares[idx].clone(),
730                        participants: validators.clone(),
731                        mailbox_size: 1024,
732                        deque_size: 10,
733                        backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
734                        leader_timeout: Duration::from_secs(1),
735                        notarization_timeout: Duration::from_secs(2),
736                        nullify_retry: Duration::from_secs(10),
737                        fetch_timeout: Duration::from_secs(1),
738                        activity_timeout: 10,
739                        skip_timeout: 5,
740                        max_fetch_count: 10,
741                        max_fetch_size: 1024 * 512,
742                        fetch_concurrent: 10,
743                        fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
744                        buffer_pool_page_size: BUFFER_POOL_PAGE_SIZE,
745                        buffer_pool_capacity: BUFFER_POOL_CAPACITY,
746                        indexer: indexer.clone(),
747                        execution_concurrency: 2,
748                        max_uploads_outstanding: 4,
749                    };
750                    let engine = Engine::new(context.with_label(&uid), config).await;
751
752                    // Get networking
753                    let (
754                        pending,
755                        recovered,
756                        resolver,
757                        broadcast,
758                        backfill,
759                        seeder,
760                        aggregator,
761                        aggregation,
762                    ) = registrations.remove(&public_key).unwrap();
763
764                    // Start engine
765                    engine.start(
766                        pending,
767                        recovered,
768                        resolver,
769                        broadcast,
770                        backfill,
771                        seeder,
772                        aggregator,
773                        aggregation,
774                    );
775                }
776
777                // Poll metrics
778                let poller = context
779                    .with_label("metrics")
780                    .spawn(move |context| async move {
781                        // Wait for all validators to reach required container
782                        loop {
783                            let metrics = context.encode();
784
785                            // Iterate over all lines
786                            let mut success = 0;
787                            for line in metrics.lines() {
788                                // Ensure it is a metrics line
789                                if !line.starts_with("validator-") {
790                                    continue;
791                                }
792
793                                // Split metric and value
794                                let mut parts = line.split_whitespace();
795                                let metric = parts.next().unwrap();
796                                let value = parts.next().unwrap();
797
798                                // If ends with peers_blocked, ensure it is zero
799                                if metric.ends_with("_peers_blocked") {
800                                    let value = value.parse::<u64>().unwrap();
801                                    assert_eq!(value, 0);
802                                }
803
804                                // If ends with certificates_processed, ensure it is at least required_container
805                                if metric.ends_with("_certificates_processed") {
806                                    let value = value.parse::<u64>().unwrap();
807                                    if value >= required_container {
808                                        success += 1;
809                                    }
810                                }
811                            }
812                            if success == n {
813                                break;
814                            }
815
816                            // Still waiting for all validators to complete
817                            context.sleep(Duration::from_millis(10)).await;
818                        }
819
820                        // Wait for mock indexer to contain all seeds and summaries
821                        loop {
822                            let contains_seeds = {
823                                let mut contains_seeds = true;
824                                let seeds = indexer.seeds.lock().unwrap();
825                                for i in 1..=required_container {
826                                    if !seeds.contains_key(&i) {
827                                        contains_seeds = false;
828                                        break;
829                                    }
830                                }
831                                contains_seeds
832                            };
833                            let contains_summaries = {
834                                let summaries = indexer.summaries.write().await;
835                                let seen_summaries =
836                                    summaries.iter().map(|(i, _)| *i).collect::<HashSet<_>>();
837                                let mut contains_summaries = true;
838                                for i in 1..=required_container {
839                                    if !seen_summaries.contains(&i) {
840                                        contains_summaries = false;
841                                        break;
842                                    }
843                                }
844                                contains_summaries
845                            };
846
847                            // If both contain all required containers, break
848                            if contains_seeds && contains_summaries {
849                                break;
850                            }
851
852                            // Still waiting for all validators to complete
853                            context.sleep(Duration::from_millis(10)).await;
854                        }
855                    });
856
857                // Exit at random points until finished
858                let wait =
859                    context.gen_range(Duration::from_millis(100)..Duration::from_millis(1_000));
860
861                // Wait for one to finish
862                select! {
863                    _ = poller => {
864                        // Finished
865                        (true, context)
866                    },
867                    _ = context.sleep(wait) => {
868                        // Randomly exit
869                        (false, context)
870                    }
871                }
872            };
873
874            // Handle run
875            let (complete, context) = if let Some(prev_ctx) = prev_ctx {
876                Runner::from(prev_ctx)
877            } else {
878                Runner::timed(Duration::from_secs(300))
879            }
880            .start(f);
881            if complete {
882                break;
883            }
884
885            // Prepare for next run
886            prev_ctx = Some(context.recover());
887            runs += 1;
888        }
889        assert!(runs > 1);
890        info!(runs, "unclean shutdown recovery worked");
891    }
892
893    fn test_execution(seed: u64, link: Link) -> String {
894        // Create context
895        let n = 5;
896        let threshold = quorum(n);
897        let cfg = deterministic::Config::default()
898            .with_seed(seed)
899            .with_timeout(Some(Duration::from_secs(1200)));
900        let executor = Runner::from(cfg);
901        executor.start(|mut context| async move {
902            // Create simulated network
903            let (network, mut oracle) = Network::new(
904                context.with_label("network"),
905                simulated::Config {
906                    max_size: 1024 * 1024,
907                },
908            );
909
910            // Start network
911            network.start();
912
913            // Register participants
914            let mut signers = Vec::new();
915            let mut validators = Vec::new();
916            for i in 0..n {
917                let signer = PrivateKey::from_seed(i as u64);
918                let pk = signer.public_key();
919                signers.push(signer);
920                validators.push(pk);
921            }
922            validators.sort();
923            signers.sort_by_key(|s| s.public_key());
924            let mut registrations = register_validators(&mut oracle, &validators).await;
925
926            // Link all validators
927            link_validators(&mut oracle, &validators, link, None).await;
928
929            // Derive threshold
930            let (polynomial, shares) =
931                ops::generate_shares::<_, MinSig>(&mut context, None, n, threshold);
932            let identity = *public::<MinSig>(&polynomial);
933
934            // Define mock indexer
935            let indexer = Mock::new(identity);
936
937            // Create instances
938            let mut public_keys = HashSet::new();
939            for (idx, signer) in signers.into_iter().enumerate() {
940                // Create signer context
941                let public_key = signer.public_key();
942                public_keys.insert(public_key.clone());
943
944                // Configure engine
945                let uid = format!("validator-{public_key}");
946                let config: Config<_, Mock> = engine::Config {
947                    blocker: oracle.control(public_key.clone()),
948                    partition_prefix: uid.clone(),
949                    blocks_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
950                    finalized_freezer_table_initial_size: FREEZER_TABLE_INITIAL_SIZE,
951                    signer,
952                    polynomial: polynomial.clone(),
953                    share: shares[idx].clone(),
954                    participants: validators.clone(),
955                    mailbox_size: 1024,
956                    deque_size: 10,
957                    backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
958                    leader_timeout: Duration::from_secs(1),
959                    notarization_timeout: Duration::from_secs(2),
960                    nullify_retry: Duration::from_secs(10),
961                    fetch_timeout: Duration::from_secs(1),
962                    activity_timeout: 10,
963                    skip_timeout: 5,
964                    max_fetch_count: 10,
965                    max_fetch_size: 1024 * 1024,
966                    fetch_concurrent: 10,
967                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
968                    buffer_pool_page_size: BUFFER_POOL_PAGE_SIZE,
969                    buffer_pool_capacity: BUFFER_POOL_CAPACITY,
970                    indexer: indexer.clone(),
971                    execution_concurrency: 2,
972                    max_uploads_outstanding: 4,
973                };
974                let engine = Engine::new(context.with_label(&uid), config).await;
975
976                // Get networking
977                let (
978                    pending,
979                    recovered,
980                    resolver,
981                    broadcast,
982                    backfill,
983                    seeder,
984                    aggregator,
985                    aggregation,
986                ) = registrations.remove(&public_key).unwrap();
987
988                // Start engine
989                engine.start(
990                    pending,
991                    recovered,
992                    resolver,
993                    broadcast,
994                    backfill,
995                    seeder,
996                    aggregator,
997                    aggregation,
998                );
999            }
1000
1001            // Submit 1000 transactions
1002            let mut remaining = BTreeMap::new();
1003            for i in 0..1_000 {
1004                // Generate a signer
1005                let signer = PrivateKey::from_seed(i as u64);
1006
1007                // Generate a transaction
1008                let tx = Transaction::sign(&signer, 0, Instruction::Generate);
1009                indexer.submit_tx(tx.clone());
1010                remaining.insert(signer.public_key(), tx);
1011
1012                // Sleep for a bit to spread them out
1013                context.sleep(Duration::from_millis(5)).await;
1014            }
1015
1016            // Wait for all transactions to be processed
1017            let mut seen = HashMap::new();
1018            let mut last_height = None;
1019            let mut all_height = 1;
1020            while last_height.is_none() || all_height < last_height.unwrap() {
1021                // Remove all transactions in some event
1022                let summaries = indexer
1023                    .summaries
1024                    .write()
1025                    .await
1026                    .drain(..)
1027                    .collect::<Vec<_>>();
1028
1029                // If no events, sleep
1030                if summaries.is_empty() {
1031                    // Rebroadcast all remaining transactions
1032                    for (_, tx) in remaining.iter() {
1033                        indexer.submit_tx(tx.clone());
1034                    }
1035
1036                    // Avoid busy loop
1037                    context.sleep(Duration::from_secs(1)).await;
1038                    continue;
1039                }
1040
1041                // Process events
1042                for (height, summary) in summaries.into_iter() {
1043                    // Remove any pending transactions
1044                    for event in summary.events_proof_ops.iter() {
1045                        if let commonware_storage::store::operation::Keyless::Append(
1046                            battleware_types::execution::Output::Event(
1047                                battleware_types::execution::Event::Generated { account, .. },
1048                            ),
1049                        ) = event
1050                        {
1051                            remaining.remove(account);
1052                        }
1053                    }
1054
1055                    // Ensure all validators see the same events at the same height
1056                    match seen.entry(height) {
1057                        Entry::Vacant(entry) => {
1058                            entry.insert((1, summary));
1059                        }
1060                        Entry::Occupied(mut entry) => {
1061                            assert_eq!(entry.get().1, summary);
1062                            entry.get_mut().0 += 1;
1063                        }
1064                    }
1065
1066                    // Update last height
1067                    if last_height.is_none() && remaining.is_empty() {
1068                        last_height = Some(height);
1069                    }
1070                }
1071
1072                // Wait for all validators to see all important heights
1073                loop {
1074                    let Some((seen, _)) = seen.get(&all_height) else {
1075                        break;
1076                    };
1077                    if seen < &n {
1078                        break;
1079                    }
1080                    all_height += 1;
1081                }
1082            }
1083
1084            // Return the state
1085            context.auditor().state()
1086        })
1087    }
1088
1089    #[test_traced]
1090    fn test_execution_basic() {
1091        test_execution(
1092            42,
1093            Link {
1094                latency: Duration::from_millis(10),
1095                jitter: Duration::from_millis(1),
1096                success_rate: 1.0,
1097            },
1098        );
1099    }
1100
1101    #[test_traced("INFO")]
1102    fn test_execution_good_links() {
1103        let link = Link {
1104            latency: Duration::from_millis(10),
1105            jitter: Duration::from_millis(1),
1106            success_rate: 1.0,
1107        };
1108        for seed in 0..5 {
1109            let state1 = test_execution(seed, link.clone());
1110            let state2 = test_execution(seed, link.clone());
1111            assert_eq!(state1, state2);
1112        }
1113    }
1114
1115    #[test_traced("INFO")]
1116    fn test_execution_bad_links() {
1117        let link = Link {
1118            latency: Duration::from_millis(200),
1119            jitter: Duration::from_millis(150),
1120            success_rate: 0.75,
1121        };
1122        for seed in 0..5 {
1123            let state1 = test_execution(seed, link.clone());
1124            let state2 = test_execution(seed, link.clone());
1125            assert_eq!(state1, state2);
1126        }
1127    }
1128}