alto_chain/
lib.rs

1use serde::{Deserialize, Serialize};
2
3pub mod actors;
4pub mod engine;
5
/// Node configuration that can be serialized and deserialized with serde.
///
/// NOTE(review): nothing in this file reads these fields, so the per-field notes below
/// are inferred from names and types only — confirm against the code that loads `Config`.
#[derive(Deserialize, Serialize)]
pub struct Config {
    /// Private key material as a string (encoding not visible here — TODO confirm).
    pub private_key: String,
    /// Presumably this node's threshold share, string-encoded — TODO confirm.
    pub share: String,
    /// Presumably the shared (threshold) public identity, string-encoded — TODO confirm.
    pub identity: String,

    /// Port number (presumably the P2P listen port — verify against the caller).
    pub port: u16,
    /// Directory path (presumably where node data is stored — verify against the caller).
    pub directory: String,
    /// Thread count (presumably for the runtime's worker pool — verify against the caller).
    pub worker_threads: usize,

    /// Peers this node accepts, as strings (format not visible here).
    pub allowed_peers: Vec<String>,
    /// Bootstrap peers, as strings (format not visible here).
    pub bootstrappers: Vec<String>,

    /// Size limit for a message backlog (exact consumer not visible here).
    pub message_backlog: usize,
    /// Size limit for a mailbox (exact consumer not visible here).
    pub mailbox_size: usize,
}
22
23#[cfg(test)]
24mod tests {
25    use super::*;
26    use commonware_cryptography::{bls12381::dkg::ops, ed25519::PublicKey, Ed25519, Scheme};
27    use commonware_macros::test_traced;
28    use commonware_p2p::simulated::{self, Link, Network, Oracle, Receiver, Sender};
29    use commonware_runtime::{
30        deterministic::{self, Executor},
31        Clock, Metrics, Runner, Spawner,
32    };
33    use commonware_utils::quorum;
34    use engine::Engine;
35    use governor::Quota;
36    use rand::{rngs::StdRng, Rng, SeedableRng};
37    use std::time::Duration;
38    use std::{
39        collections::{HashMap, HashSet},
40        num::NonZeroU32,
41        sync::{Arc, Mutex},
42    };
43    use tracing::info;
44
45    /// Registers all validators using the oracle.
46    async fn register_validators(
47        oracle: &mut Oracle<PublicKey>,
48        validators: &[PublicKey],
49    ) -> HashMap<
50        PublicKey,
51        (
52            (Sender<PublicKey>, Receiver<PublicKey>),
53            (Sender<PublicKey>, Receiver<PublicKey>),
54            (Sender<PublicKey>, Receiver<PublicKey>),
55            (Sender<PublicKey>, Receiver<PublicKey>),
56        ),
57    > {
58        let mut registrations = HashMap::new();
59        for validator in validators.iter() {
60            let (voter_sender, voter_receiver) =
61                oracle.register(validator.clone(), 0).await.unwrap();
62            let (resolver_sender, resolver_receiver) =
63                oracle.register(validator.clone(), 1).await.unwrap();
64            let (broadcast_sender, broadcast_receiver) =
65                oracle.register(validator.clone(), 2).await.unwrap();
66            let (backfill_sender, backfill_receiver) =
67                oracle.register(validator.clone(), 3).await.unwrap();
68            registrations.insert(
69                validator.clone(),
70                (
71                    (voter_sender, voter_receiver),
72                    (resolver_sender, resolver_receiver),
73                    (broadcast_sender, broadcast_receiver),
74                    (backfill_sender, backfill_receiver),
75                ),
76            );
77        }
78        registrations
79    }
80
81    /// Links (or unlinks) validators using the oracle.
82    ///
83    /// The `action` parameter determines the action (e.g. link, unlink) to take.
84    /// The `restrict_to` function can be used to restrict the linking to certain connections,
85    /// otherwise all validators will be linked to all other validators.
86    async fn link_validators(
87        oracle: &mut Oracle<PublicKey>,
88        validators: &[PublicKey],
89        link: Link,
90        restrict_to: Option<fn(usize, usize, usize) -> bool>,
91    ) {
92        for (i1, v1) in validators.iter().enumerate() {
93            for (i2, v2) in validators.iter().enumerate() {
94                // Ignore self
95                if v2 == v1 {
96                    continue;
97                }
98
99                // Restrict to certain connections
100                if let Some(f) = restrict_to {
101                    if !f(validators.len(), i1, i2) {
102                        continue;
103                    }
104                }
105
106                // Add link
107                oracle
108                    .add_link(v1.clone(), v2.clone(), link.clone())
109                    .await
110                    .unwrap();
111            }
112        }
113    }
114
115    fn all_online(seed: u64, link: Link) -> String {
116        // Create context
117        let n = 5;
118        let threshold = quorum(n).unwrap();
119        let required_container = 10;
120        let cfg = deterministic::Config {
121            seed,
122            timeout: Some(Duration::from_secs(30)),
123            ..Default::default()
124        };
125        let (executor, mut context, auditor) = Executor::init(cfg);
126        executor.start(async move {
127            // Create simulated network
128            let (network, mut oracle) = Network::new(
129                context.with_label("network"),
130                simulated::Config {
131                    max_size: 1024 * 1024,
132                },
133            );
134
135            // Start network
136            network.start();
137
138            // Register participants
139            let mut schemes = Vec::new();
140            let mut validators = Vec::new();
141            for i in 0..n {
142                let scheme = Ed25519::from_seed(i as u64);
143                let pk = scheme.public_key();
144                schemes.push(scheme);
145                validators.push(pk);
146            }
147            validators.sort();
148            schemes.sort_by_key(|s| s.public_key());
149            let mut registrations = register_validators(&mut oracle, &validators).await;
150
151            // Link all validators
152            link_validators(&mut oracle, &validators, link, None).await;
153
154            // Derive threshold
155            let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
156
157            // Create instances
158            let mut public_keys = HashSet::new();
159            for (idx, scheme) in schemes.into_iter().enumerate() {
160                // Create scheme context
161                let public_key = scheme.public_key();
162                public_keys.insert(public_key.clone());
163
164                // Configure engine
165                let uid = format!("validator-{}", public_key);
166                let config = engine::Config {
167                    partition_prefix: uid.clone(),
168                    signer: scheme,
169                    identity: public.clone(),
170                    share: shares[idx],
171                    participants: validators.clone(),
172                    mailbox_size: 1024,
173                    backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
174                    leader_timeout: Duration::from_secs(1),
175                    notarization_timeout: Duration::from_secs(2),
176                    nullify_retry: Duration::from_secs(10),
177                    fetch_timeout: Duration::from_secs(1),
178                    activity_timeout: 10,
179                    max_fetch_count: 10,
180                    max_fetch_size: 1024 * 512,
181                    fetch_concurrent: 10,
182                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
183                };
184                let engine = Engine::new(context.with_label(&uid), config).await;
185
186                // Get networking
187                let (voter, resolver, broadcast, backfill) =
188                    registrations.remove(&public_key).unwrap();
189
190                // Start engine
191                engine.start(voter, resolver, broadcast, backfill);
192            }
193
194            // Poll metrics
195            loop {
196                let metrics = context.encode();
197
198                // Iterate over all lines
199                let mut failed = false;
200                for line in metrics.lines() {
201                    // Ensure it is a metrics line
202                    if !line.starts_with("validator-") {
203                        continue;
204                    }
205
206                    // Split metric and value
207                    let mut parts = line.split_whitespace();
208                    let metric = parts.next().unwrap();
209                    let value = parts.next().unwrap();
210
211                    // If ends with peers_blocked, ensure it is zero
212                    if metric.ends_with("_peers_blocked") {
213                        let value = value.parse::<u64>().unwrap();
214                        assert_eq!(value, 0);
215                    }
216
217                    // If ends with indexed_height, ensure it is at least required_container
218                    if metric.ends_with("_syncer_indexed_height") {
219                        let value = value.parse::<u64>().unwrap();
220                        if value < required_container {
221                            failed = true;
222                            break;
223                        }
224                    }
225                }
226                if !failed {
227                    break;
228                }
229
230                // Still waiting for all validators to complete
231                context.sleep(Duration::from_secs(1)).await;
232            }
233        });
234        auditor.state()
235    }
236
237    #[test_traced]
238    fn test_good_links() {
239        let link = Link {
240            latency: 10.0,
241            jitter: 1.0,
242            success_rate: 1.0,
243        };
244        for seed in 0..5 {
245            let state = all_online(seed, link.clone());
246            assert_eq!(state, all_online(seed, link.clone()));
247        }
248    }
249
250    #[test_traced]
251    fn test_bad_links() {
252        let link = Link {
253            latency: 200.0,
254            jitter: 150.0,
255            success_rate: 0.75,
256        };
257        for seed in 0..5 {
258            let state = all_online(seed, link.clone());
259            assert_eq!(state, all_online(seed, link.clone()));
260        }
261    }
262
263    #[test_traced]
264    fn test_backfill() {
265        // Create context
266        let n = 5;
267        let threshold = quorum(n).unwrap();
268        let initial_container_required = 10;
269        let final_container_required = 20;
270        let (executor, mut context, _) = Executor::timed(Duration::from_secs(30));
271        executor.start(async move {
272            // Create simulated network
273            let (network, mut oracle) = Network::new(
274                context.with_label("network"),
275                simulated::Config {
276                    max_size: 1024 * 1024,
277                },
278            );
279
280            // Start network
281            network.start();
282
283            // Register participants
284            let mut schemes = Vec::new();
285            let mut validators = Vec::new();
286            for i in 0..n {
287                let scheme = Ed25519::from_seed(i as u64);
288                let pk = scheme.public_key();
289                schemes.push(scheme);
290                validators.push(pk);
291            }
292            validators.sort();
293            schemes.sort_by_key(|s| s.public_key());
294            let mut registrations = register_validators(&mut oracle, &validators).await;
295
296            // Link all validators (except 0)
297            let link = Link {
298                latency: 10.0,
299                jitter: 1.0,
300                success_rate: 1.0,
301            };
302            link_validators(
303                &mut oracle,
304                &validators,
305                link.clone(),
306                Some(|_, i, j| ![i, j].contains(&0usize)),
307            )
308            .await;
309
310            // Derive threshold
311            let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
312
313            // Create instances
314            for (idx, scheme) in schemes.iter().enumerate() {
315                // Skip first
316                if idx == 0 {
317                    continue;
318                }
319
320                // Configure engine
321                let public_key = scheme.public_key();
322                let uid = format!("validator-{}", public_key);
323                let config = engine::Config {
324                    partition_prefix: uid.clone(),
325                    signer: scheme.clone(),
326                    identity: public.clone(),
327                    share: shares[idx],
328                    participants: validators.clone(),
329                    mailbox_size: 1024,
330                    backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
331                    leader_timeout: Duration::from_secs(1),
332                    notarization_timeout: Duration::from_secs(2),
333                    nullify_retry: Duration::from_secs(10),
334                    fetch_timeout: Duration::from_secs(1),
335                    activity_timeout: 10,
336                    max_fetch_count: 10,
337                    max_fetch_size: 1024 * 512,
338                    fetch_concurrent: 10,
339                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
340                };
341                let engine = Engine::new(context.with_label(&uid), config).await;
342
343                // Get networking
344                let (voter, resolver, broadcast, backfill) =
345                    registrations.remove(&public_key).unwrap();
346
347                // Start engine
348                engine.start(voter, resolver, broadcast, backfill);
349            }
350
351            // Poll metrics
352            loop {
353                let metrics = context.encode();
354
355                // Iterate over all lines
356                let mut failed = false;
357                for line in metrics.lines() {
358                    // Ensure it is a metrics line
359                    if !line.starts_with("validator-") {
360                        continue;
361                    }
362
363                    // Split metric and value
364                    let mut parts = line.split_whitespace();
365                    let metric = parts.next().unwrap();
366                    let value = parts.next().unwrap();
367
368                    // If ends with peers_blocked, ensure it is zero
369                    if metric.ends_with("_peers_blocked") {
370                        let value = value.parse::<u64>().unwrap();
371                        assert_eq!(value, 0);
372                    }
373
374                    // If ends with indexed_height, ensure it is at least required_container
375                    if metric.ends_with("_syncer_indexed_height") {
376                        let value = value.parse::<u64>().unwrap();
377                        if value < initial_container_required {
378                            failed = true;
379                            break;
380                        }
381                    }
382                }
383                if !failed {
384                    break;
385                }
386
387                // Still waiting for all validators to complete
388                context.sleep(Duration::from_secs(1)).await;
389            }
390
391            // Link first peer
392            link_validators(
393                &mut oracle,
394                &validators,
395                link,
396                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
397            )
398            .await;
399
400            // Configure engine
401            let scheme = schemes[0].clone();
402            let share = shares[0];
403            let public_key = scheme.public_key();
404            let uid = format!("validator-{}", public_key);
405            let config = engine::Config {
406                partition_prefix: uid.clone(),
407                signer: scheme.clone(),
408                identity: public.clone(),
409                share,
410                participants: validators.clone(),
411                mailbox_size: 1024,
412                backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
413                leader_timeout: Duration::from_secs(1),
414                notarization_timeout: Duration::from_secs(2),
415                nullify_retry: Duration::from_secs(10),
416                fetch_timeout: Duration::from_secs(1),
417                activity_timeout: 10,
418                max_fetch_count: 10,
419                max_fetch_size: 1024 * 512,
420                fetch_concurrent: 10,
421                fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
422            };
423            let engine = Engine::new(context.with_label(&uid), config).await;
424
425            // Get networking
426            let (voter, resolver, broadcast, backfill) = registrations.remove(&public_key).unwrap();
427
428            // Start engine
429            engine.start(voter, resolver, broadcast, backfill);
430
431            // Poll metrics
432            loop {
433                let metrics = context.encode();
434
435                // Iterate over all lines
436                let mut failed = false;
437                for line in metrics.lines() {
438                    // Ensure it is a metrics line
439                    if !line.starts_with("validator-") {
440                        continue;
441                    }
442
443                    // Split metric and value
444                    let mut parts = line.split_whitespace();
445                    let metric = parts.next().unwrap();
446                    let value = parts.next().unwrap();
447
448                    // If ends with peers_blocked, ensure it is zero
449                    if metric.ends_with("_peers_blocked") {
450                        let value = value.parse::<u64>().unwrap();
451                        assert_eq!(value, 0);
452                    }
453
454                    // If ends with indexed_height, ensure it is at least required_container
455                    if metric.ends_with("_syncer_indexed_height") {
456                        let value = value.parse::<u64>().unwrap();
457                        if value < final_container_required {
458                            failed = true;
459                            break;
460                        }
461                    }
462                }
463                if !failed {
464                    break;
465                }
466
467                // Still waiting for all validators to complete
468                context.sleep(Duration::from_secs(1)).await;
469            }
470        });
471    }
472
473    #[test_traced]
474    fn test_unclean_shutdown() {
475        // Create context
476        let n = 5;
477        let threshold = quorum(n).unwrap();
478        let required_container = 100;
479
480        // Derive threshold
481        let mut rng = StdRng::seed_from_u64(0);
482        let (public, shares) = ops::generate_shares(&mut rng, None, n, threshold);
483
484        // Random restarts every x seconds
485        let mut runs = 0;
486        let done = Arc::new(Mutex::new(false));
487        let (mut executor, mut context, _) = Executor::timed(Duration::from_secs(10));
488        while !*done.lock().unwrap() {
489            runs += 1;
490            executor.start({
491                let mut context = context.clone();
492                let public = public.clone();
493                let shares = shares.clone();
494                let done = done.clone();
495                async move {
496                    // Create simulated network
497                    let (network, mut oracle) = Network::new(
498                        context.with_label("network"),
499                        simulated::Config {
500                            max_size: 1024 * 1024,
501                        },
502                    );
503
504                    // Start network
505                    network.start();
506
507                    // Register participants
508                    let mut schemes = Vec::new();
509                    let mut validators = Vec::new();
510                    for i in 0..n {
511                        let scheme = Ed25519::from_seed(i as u64);
512                        let pk = scheme.public_key();
513                        schemes.push(scheme);
514                        validators.push(pk);
515                    }
516                    validators.sort();
517                    schemes.sort_by_key(|s| s.public_key());
518                    let mut registrations = register_validators(&mut oracle, &validators).await;
519
520                    // Link all validators
521                    let link = Link {
522                        latency: 10.0,
523                        jitter: 1.0,
524                        success_rate: 1.0,
525                    };
526                    link_validators(&mut oracle, &validators, link, None).await;
527
528                    // Create instances
529                    let mut public_keys = HashSet::new();
530                    for (idx, scheme) in schemes.into_iter().enumerate() {
531                        // Create scheme context
532                        let public_key = scheme.public_key();
533                        public_keys.insert(public_key.clone());
534
535                        // Configure engine
536                        let uid = format!("validator-{}", public_key);
537                        let config = engine::Config {
538                            partition_prefix: uid.clone(),
539                            signer: scheme,
540                            identity: public.clone(),
541                            share: shares[idx],
542                            participants: validators.clone(),
543                            mailbox_size: 1024,
544                            backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
545                            leader_timeout: Duration::from_secs(1),
546                            notarization_timeout: Duration::from_secs(2),
547                            nullify_retry: Duration::from_secs(10),
548                            fetch_timeout: Duration::from_secs(1),
549                            activity_timeout: 10,
550                            max_fetch_count: 10,
551                            max_fetch_size: 1024 * 512,
552                            fetch_concurrent: 10,
553                            fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
554                        };
555                        let engine = Engine::new(context.with_label(&uid), config).await;
556
557                        // Get networking
558                        let (voter, resolver, broadcast, backfill) =
559                            registrations.remove(&public_key).unwrap();
560
561                        // Start engine
562                        engine.start(voter, resolver, broadcast, backfill);
563                    }
564
565                    // Poll metrics
566                    context
567                        .with_label("metrics")
568                        .spawn(move |context| async move {
569                            loop {
570                                let metrics = context.encode();
571
572                                // Iterate over all lines
573                                let mut failed = false;
574                                for line in metrics.lines() {
575                                    // Ensure it is a metrics line
576                                    if !line.starts_with("validator-") {
577                                        continue;
578                                    }
579
580                                    // Split metric and value
581                                    let mut parts = line.split_whitespace();
582                                    let metric = parts.next().unwrap();
583                                    let value = parts.next().unwrap();
584
585                                    // If ends with peers_blocked, ensure it is zero
586                                    if metric.ends_with("_peers_blocked") {
587                                        let value = value.parse::<u64>().unwrap();
588                                        assert_eq!(value, 0);
589                                    }
590
591                                    // If ends with indexed_height, ensure it is at least required_container
592                                    if metric.ends_with("_syncer_indexed_height") {
593                                        let value = value.parse::<u64>().unwrap();
594                                        if value < required_container {
595                                            failed = true;
596                                            break;
597                                        }
598                                    }
599                                }
600                                if !failed {
601                                    break;
602                                }
603
604                                // Still waiting for all validators to complete
605                                context.sleep(Duration::from_millis(10)).await;
606                            }
607                            *done.lock().unwrap() = true;
608                        });
609
610                    // Exit at random points until finished
611                    let wait =
612                        context.gen_range(Duration::from_millis(10)..Duration::from_millis(1_000));
613                    context.sleep(wait).await;
614                }
615            });
616
617            // Recover context
618            (executor, context, _) = context.recover();
619        }
620        assert!(runs > 1);
621        info!(runs, "unclean shutdown recovery worked");
622    }
623}