1use std::future::Future;
2
3use bytes::Bytes;
4use commonware_cryptography::bls12381;
5use serde::{Deserialize, Serialize};
6
7pub mod actors;
8pub mod engine;
9
10pub trait Indexer: Clone + Send + Sync + 'static {
12 type Error: std::error::Error + Send + Sync + 'static;
13
14 fn new(uri: &str, public: bls12381::PublicKey) -> Self;
16
17 fn seed_upload(&self, seed: Bytes) -> impl Future<Output = Result<(), Self::Error>> + Send;
19
20 fn notarization_upload(
22 &self,
23 notarized: Bytes,
24 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
25
26 fn finalization_upload(
28 &self,
29 finalized: Bytes,
30 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
31}
32
33impl Indexer for alto_client::Client {
34 type Error = alto_client::Error;
35 fn new(uri: &str, public: bls12381::PublicKey) -> Self {
36 Self::new(uri, public)
37 }
38
39 fn seed_upload(
40 &self,
41 seed: bytes::Bytes,
42 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
43 self.seed_upload(seed)
44 }
45
46 fn notarization_upload(
47 &self,
48 notarization: bytes::Bytes,
49 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
50 self.notarization_upload(notarization)
51 }
52
53 fn finalization_upload(
54 &self,
55 finalization: bytes::Bytes,
56 ) -> impl Future<Output = Result<(), Self::Error>> + Send {
57 self.finalization_upload(finalization)
58 }
59}
60
61#[derive(Deserialize, Serialize)]
62pub struct Config {
63 pub private_key: String,
64 pub share: String,
65 pub identity: String,
66
67 pub port: u16,
68 pub directory: String,
69 pub worker_threads: usize,
70
71 pub allowed_peers: Vec<String>,
72 pub bootstrappers: Vec<String>,
73
74 pub message_backlog: usize,
75 pub mailbox_size: usize,
76
77 pub indexer: Option<String>,
78}
79
80#[cfg(test)]
81mod tests {
82 use super::*;
83 use alto_types::{Finalized, Notarized, Seed};
84 use bls12381::primitives::poly;
85 use commonware_cryptography::{bls12381::dkg::ops, ed25519::PublicKey, Ed25519, Scheme};
86 use commonware_macros::test_traced;
87 use commonware_p2p::simulated::{self, Link, Network, Oracle, Receiver, Sender};
88 use commonware_runtime::{
89 deterministic::{self, Executor},
90 Clock, Metrics, Runner, Spawner,
91 };
92 use commonware_utils::quorum;
93 use engine::{Config, Engine};
94 use governor::Quota;
95 use rand::{rngs::StdRng, Rng, SeedableRng};
96 use std::{
97 collections::{HashMap, HashSet},
98 num::NonZeroU32,
99 sync::{Arc, Mutex},
100 };
101 use std::{sync::atomic::AtomicBool, time::Duration};
102 use tracing::info;
103
104 #[derive(Clone)]
106 struct MockIndexer {
107 public: bls12381::PublicKey,
108
109 seed_seen: Arc<AtomicBool>,
110 notarization_seen: Arc<AtomicBool>,
111 finalization_seen: Arc<AtomicBool>,
112 }
113
114 impl Indexer for MockIndexer {
115 type Error = std::io::Error;
116
117 fn new(_: &str, public: bls12381::PublicKey) -> Self {
118 MockIndexer {
119 public,
120 seed_seen: Arc::new(AtomicBool::new(false)),
121 notarization_seen: Arc::new(AtomicBool::new(false)),
122 finalization_seen: Arc::new(AtomicBool::new(false)),
123 }
124 }
125
126 async fn seed_upload(&self, seed: Bytes) -> Result<(), Self::Error> {
127 Seed::deserialize(Some(&self.public), &seed).unwrap();
128 self.seed_seen
129 .store(true, std::sync::atomic::Ordering::Relaxed);
130 Ok(())
131 }
132
133 async fn notarization_upload(&self, notarized: Bytes) -> Result<(), Self::Error> {
134 Notarized::deserialize(Some(&self.public), ¬arized).unwrap();
135 self.notarization_seen
136 .store(true, std::sync::atomic::Ordering::Relaxed);
137 Ok(())
138 }
139
140 async fn finalization_upload(&self, finalized: Bytes) -> Result<(), Self::Error> {
141 Finalized::deserialize(Some(&self.public), &finalized).unwrap();
142 self.finalization_seen
143 .store(true, std::sync::atomic::Ordering::Relaxed);
144 Ok(())
145 }
146 }
147
148 async fn register_validators(
150 oracle: &mut Oracle<PublicKey>,
151 validators: &[PublicKey],
152 ) -> HashMap<
153 PublicKey,
154 (
155 (Sender<PublicKey>, Receiver<PublicKey>),
156 (Sender<PublicKey>, Receiver<PublicKey>),
157 (Sender<PublicKey>, Receiver<PublicKey>),
158 (Sender<PublicKey>, Receiver<PublicKey>),
159 ),
160 > {
161 let mut registrations = HashMap::new();
162 for validator in validators.iter() {
163 let (voter_sender, voter_receiver) =
164 oracle.register(validator.clone(), 0).await.unwrap();
165 let (resolver_sender, resolver_receiver) =
166 oracle.register(validator.clone(), 1).await.unwrap();
167 let (broadcast_sender, broadcast_receiver) =
168 oracle.register(validator.clone(), 2).await.unwrap();
169 let (backfill_sender, backfill_receiver) =
170 oracle.register(validator.clone(), 3).await.unwrap();
171 registrations.insert(
172 validator.clone(),
173 (
174 (voter_sender, voter_receiver),
175 (resolver_sender, resolver_receiver),
176 (broadcast_sender, broadcast_receiver),
177 (backfill_sender, backfill_receiver),
178 ),
179 );
180 }
181 registrations
182 }
183
184 async fn link_validators(
190 oracle: &mut Oracle<PublicKey>,
191 validators: &[PublicKey],
192 link: Link,
193 restrict_to: Option<fn(usize, usize, usize) -> bool>,
194 ) {
195 for (i1, v1) in validators.iter().enumerate() {
196 for (i2, v2) in validators.iter().enumerate() {
197 if v2 == v1 {
199 continue;
200 }
201
202 if let Some(f) = restrict_to {
204 if !f(validators.len(), i1, i2) {
205 continue;
206 }
207 }
208
209 oracle
211 .add_link(v1.clone(), v2.clone(), link.clone())
212 .await
213 .unwrap();
214 }
215 }
216 }
217
218 fn all_online(seed: u64, link: Link) -> String {
219 let n = 5;
221 let threshold = quorum(n).unwrap();
222 let required_container = 10;
223 let cfg = deterministic::Config {
224 seed,
225 timeout: Some(Duration::from_secs(30)),
226 ..Default::default()
227 };
228 let (executor, mut context, auditor) = Executor::init(cfg);
229 executor.start(async move {
230 let (network, mut oracle) = Network::new(
232 context.with_label("network"),
233 simulated::Config {
234 max_size: 1024 * 1024,
235 },
236 );
237
238 network.start();
240
241 let mut schemes = Vec::new();
243 let mut validators = Vec::new();
244 for i in 0..n {
245 let scheme = Ed25519::from_seed(i as u64);
246 let pk = scheme.public_key();
247 schemes.push(scheme);
248 validators.push(pk);
249 }
250 validators.sort();
251 schemes.sort_by_key(|s| s.public_key());
252 let mut registrations = register_validators(&mut oracle, &validators).await;
253
254 link_validators(&mut oracle, &validators, link, None).await;
256
257 let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
259
260 let mut public_keys = HashSet::new();
262 for (idx, scheme) in schemes.into_iter().enumerate() {
263 let public_key = scheme.public_key();
265 public_keys.insert(public_key.clone());
266
267 let uid = format!("validator-{}", public_key);
269 let config: Config<MockIndexer> = engine::Config {
270 partition_prefix: uid.clone(),
271 signer: scheme,
272 identity: public.clone(),
273 share: shares[idx],
274 participants: validators.clone(),
275 mailbox_size: 1024,
276 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
277 leader_timeout: Duration::from_secs(1),
278 notarization_timeout: Duration::from_secs(2),
279 nullify_retry: Duration::from_secs(10),
280 fetch_timeout: Duration::from_secs(1),
281 activity_timeout: 10,
282 max_fetch_count: 10,
283 max_fetch_size: 1024 * 512,
284 fetch_concurrent: 10,
285 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
286 indexer: None,
287 };
288 let engine = Engine::new(context.with_label(&uid), config).await;
289
290 let (voter, resolver, broadcast, backfill) =
292 registrations.remove(&public_key).unwrap();
293
294 engine.start(voter, resolver, broadcast, backfill);
296 }
297
298 loop {
300 let metrics = context.encode();
301
302 let mut success = false;
304 for line in metrics.lines() {
305 if !line.starts_with("validator-") {
307 continue;
308 }
309
310 let mut parts = line.split_whitespace();
312 let metric = parts.next().unwrap();
313 let value = parts.next().unwrap();
314
315 if metric.ends_with("_peers_blocked") {
317 let value = value.parse::<u64>().unwrap();
318 assert_eq!(value, 0);
319 }
320
321 if metric.ends_with("_syncer_contiguous_height") {
323 let value = value.parse::<u64>().unwrap();
324 if value >= required_container {
325 success = true;
326 break;
327 }
328 }
329 }
330 if !success {
331 break;
332 }
333
334 context.sleep(Duration::from_secs(1)).await;
336 }
337 });
338 auditor.state()
339 }
340
341 #[test_traced]
342 fn test_good_links() {
343 let link = Link {
344 latency: 10.0,
345 jitter: 1.0,
346 success_rate: 1.0,
347 };
348 for seed in 0..5 {
349 let state = all_online(seed, link.clone());
350 assert_eq!(state, all_online(seed, link.clone()));
351 }
352 }
353
354 #[test_traced]
355 fn test_bad_links() {
356 let link = Link {
357 latency: 200.0,
358 jitter: 150.0,
359 success_rate: 0.75,
360 };
361 for seed in 0..5 {
362 let state = all_online(seed, link.clone());
363 assert_eq!(state, all_online(seed, link.clone()));
364 }
365 }
366
367 #[test_traced]
368 fn test_backfill() {
369 let n = 5;
371 let threshold = quorum(n).unwrap();
372 let initial_container_required = 10;
373 let final_container_required = 20;
374 let (executor, mut context, _) = Executor::timed(Duration::from_secs(30));
375 executor.start(async move {
376 let (network, mut oracle) = Network::new(
378 context.with_label("network"),
379 simulated::Config {
380 max_size: 1024 * 1024,
381 },
382 );
383
384 network.start();
386
387 let mut schemes = Vec::new();
389 let mut validators = Vec::new();
390 for i in 0..n {
391 let scheme = Ed25519::from_seed(i as u64);
392 let pk = scheme.public_key();
393 schemes.push(scheme);
394 validators.push(pk);
395 }
396 validators.sort();
397 schemes.sort_by_key(|s| s.public_key());
398 let mut registrations = register_validators(&mut oracle, &validators).await;
399
400 let link = Link {
402 latency: 10.0,
403 jitter: 1.0,
404 success_rate: 1.0,
405 };
406 link_validators(
407 &mut oracle,
408 &validators,
409 link.clone(),
410 Some(|_, i, j| ![i, j].contains(&0usize)),
411 )
412 .await;
413
414 let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
416
417 for (idx, scheme) in schemes.iter().enumerate() {
419 if idx == 0 {
421 continue;
422 }
423
424 let public_key = scheme.public_key();
426 let uid = format!("validator-{}", public_key);
427 let config: Config<MockIndexer> = engine::Config {
428 partition_prefix: uid.clone(),
429 signer: scheme.clone(),
430 identity: public.clone(),
431 share: shares[idx],
432 participants: validators.clone(),
433 mailbox_size: 1024,
434 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
435 leader_timeout: Duration::from_secs(1),
436 notarization_timeout: Duration::from_secs(2),
437 nullify_retry: Duration::from_secs(10),
438 fetch_timeout: Duration::from_secs(1),
439 activity_timeout: 10,
440 max_fetch_count: 10,
441 max_fetch_size: 1024 * 512,
442 fetch_concurrent: 10,
443 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
444 indexer: None,
445 };
446 let engine = Engine::new(context.with_label(&uid), config).await;
447
448 let (voter, resolver, broadcast, backfill) =
450 registrations.remove(&public_key).unwrap();
451
452 engine.start(voter, resolver, broadcast, backfill);
454 }
455
456 loop {
458 let metrics = context.encode();
459
460 let mut success = true;
462 for line in metrics.lines() {
463 if !line.starts_with("validator-") {
465 continue;
466 }
467
468 let mut parts = line.split_whitespace();
470 let metric = parts.next().unwrap();
471 let value = parts.next().unwrap();
472
473 if metric.ends_with("_peers_blocked") {
475 let value = value.parse::<u64>().unwrap();
476 assert_eq!(value, 0);
477 }
478
479 if metric.ends_with("_syncer_contiguous_height") {
481 let value = value.parse::<u64>().unwrap();
482 if value >= initial_container_required {
483 success = true;
484 break;
485 }
486 }
487 }
488 if success {
489 break;
490 }
491
492 context.sleep(Duration::from_secs(1)).await;
494 }
495
496 link_validators(
498 &mut oracle,
499 &validators,
500 link,
501 Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
502 )
503 .await;
504
505 let scheme = schemes[0].clone();
507 let share = shares[0];
508 let public_key = scheme.public_key();
509 let uid = format!("validator-{}", public_key);
510 let config: Config<MockIndexer> = engine::Config {
511 partition_prefix: uid.clone(),
512 signer: scheme.clone(),
513 identity: public.clone(),
514 share,
515 participants: validators.clone(),
516 mailbox_size: 1024,
517 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
518 leader_timeout: Duration::from_secs(1),
519 notarization_timeout: Duration::from_secs(2),
520 nullify_retry: Duration::from_secs(10),
521 fetch_timeout: Duration::from_secs(1),
522 activity_timeout: 10,
523 max_fetch_count: 10,
524 max_fetch_size: 1024 * 512,
525 fetch_concurrent: 10,
526 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
527 indexer: None,
528 };
529 let engine = Engine::new(context.with_label(&uid), config).await;
530
531 let (voter, resolver, broadcast, backfill) = registrations.remove(&public_key).unwrap();
533
534 engine.start(voter, resolver, broadcast, backfill);
536
537 loop {
539 let metrics = context.encode();
540
541 let mut success = false;
543 for line in metrics.lines() {
544 if !line.starts_with("validator-") {
546 continue;
547 }
548
549 let mut parts = line.split_whitespace();
551 let metric = parts.next().unwrap();
552 let value = parts.next().unwrap();
553
554 if metric.ends_with("_peers_blocked") {
556 let value = value.parse::<u64>().unwrap();
557 assert_eq!(value, 0);
558 }
559
560 if metric.ends_with("_syncer_contiguous_height") {
562 let value = value.parse::<u64>().unwrap();
563 if value >= final_container_required {
564 success = true;
565 break;
566 }
567 }
568 }
569 if success {
570 break;
571 }
572
573 context.sleep(Duration::from_secs(1)).await;
575 }
576 });
577 }
578
579 #[test_traced]
580 fn test_unclean_shutdown() {
581 let n = 5;
583 let threshold = quorum(n).unwrap();
584 let required_container = 100;
585
586 let mut rng = StdRng::seed_from_u64(0);
588 let (public, shares) = ops::generate_shares(&mut rng, None, n, threshold);
589
590 let mut runs = 0;
592 let done = Arc::new(Mutex::new(false));
593 let (mut executor, mut context, _) = Executor::timed(Duration::from_secs(10));
594 while !*done.lock().unwrap() {
595 runs += 1;
596 executor.start({
597 let mut context = context.clone();
598 let public = public.clone();
599 let shares = shares.clone();
600 let done = done.clone();
601 async move {
602 let (network, mut oracle) = Network::new(
604 context.with_label("network"),
605 simulated::Config {
606 max_size: 1024 * 1024,
607 },
608 );
609
610 network.start();
612
613 let mut schemes = Vec::new();
615 let mut validators = Vec::new();
616 for i in 0..n {
617 let scheme = Ed25519::from_seed(i as u64);
618 let pk = scheme.public_key();
619 schemes.push(scheme);
620 validators.push(pk);
621 }
622 validators.sort();
623 schemes.sort_by_key(|s| s.public_key());
624 let mut registrations = register_validators(&mut oracle, &validators).await;
625
626 let link = Link {
628 latency: 10.0,
629 jitter: 1.0,
630 success_rate: 1.0,
631 };
632 link_validators(&mut oracle, &validators, link, None).await;
633
634 let mut public_keys = HashSet::new();
636 for (idx, scheme) in schemes.into_iter().enumerate() {
637 let public_key = scheme.public_key();
639 public_keys.insert(public_key.clone());
640
641 let uid = format!("validator-{}", public_key);
643 let config: Config<MockIndexer> = engine::Config {
644 partition_prefix: uid.clone(),
645 signer: scheme,
646 identity: public.clone(),
647 share: shares[idx],
648 participants: validators.clone(),
649 mailbox_size: 1024,
650 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
651 leader_timeout: Duration::from_secs(1),
652 notarization_timeout: Duration::from_secs(2),
653 nullify_retry: Duration::from_secs(10),
654 fetch_timeout: Duration::from_secs(1),
655 activity_timeout: 10,
656 max_fetch_count: 10,
657 max_fetch_size: 1024 * 512,
658 fetch_concurrent: 10,
659 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
660 indexer: None,
661 };
662 let engine = Engine::new(context.with_label(&uid), config).await;
663
664 let (voter, resolver, broadcast, backfill) =
666 registrations.remove(&public_key).unwrap();
667
668 engine.start(voter, resolver, broadcast, backfill);
670 }
671
672 context
674 .with_label("metrics")
675 .spawn(move |context| async move {
676 loop {
677 let metrics = context.encode();
678
679 let mut success = false;
681 for line in metrics.lines() {
682 if !line.starts_with("validator-") {
684 continue;
685 }
686
687 let mut parts = line.split_whitespace();
689 let metric = parts.next().unwrap();
690 let value = parts.next().unwrap();
691
692 if metric.ends_with("_peers_blocked") {
694 let value = value.parse::<u64>().unwrap();
695 assert_eq!(value, 0);
696 }
697
698 if metric.ends_with("_syncer_contiguous_height") {
700 let value = value.parse::<u64>().unwrap();
701 if value >= required_container {
702 success = true;
703 break;
704 }
705 }
706 }
707 if success {
708 break;
709 }
710
711 context.sleep(Duration::from_millis(10)).await;
713 }
714 *done.lock().unwrap() = true;
715 });
716
717 let wait =
719 context.gen_range(Duration::from_millis(10)..Duration::from_millis(1_000));
720 context.sleep(wait).await;
721 }
722 });
723
724 (executor, context, _) = context.recover();
726 }
727 assert!(runs > 1);
728 info!(runs, "unclean shutdown recovery worked");
729 }
730
731 #[test_traced]
732 fn test_indexer() {
733 let n = 5;
735 let threshold = quorum(n).unwrap();
736 let required_container = 10;
737 let (executor, mut context, _) = Executor::timed(Duration::from_secs(30));
738 executor.start(async move {
739 let (network, mut oracle) = Network::new(
741 context.with_label("network"),
742 simulated::Config {
743 max_size: 1024 * 1024,
744 },
745 );
746
747 network.start();
749
750 let mut schemes = Vec::new();
752 let mut validators = Vec::new();
753 for i in 0..n {
754 let scheme = Ed25519::from_seed(i as u64);
755 let pk = scheme.public_key();
756 schemes.push(scheme);
757 validators.push(pk);
758 }
759 validators.sort();
760 schemes.sort_by_key(|s| s.public_key());
761 let mut registrations = register_validators(&mut oracle, &validators).await;
762
763 let link = Link {
765 latency: 10.0,
766 jitter: 1.0,
767 success_rate: 1.0,
768 };
769 link_validators(&mut oracle, &validators, link, None).await;
770
771 let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
773
774 let indexer = MockIndexer::new("", poly::public(&public).into());
776
777 let mut public_keys = HashSet::new();
779 for (idx, scheme) in schemes.into_iter().enumerate() {
780 let public_key = scheme.public_key();
782 public_keys.insert(public_key.clone());
783
784 let uid = format!("validator-{}", public_key);
786 let config: Config<MockIndexer> = engine::Config {
787 partition_prefix: uid.clone(),
788 signer: scheme,
789 identity: public.clone(),
790 share: shares[idx],
791 participants: validators.clone(),
792 mailbox_size: 1024,
793 backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
794 leader_timeout: Duration::from_secs(1),
795 notarization_timeout: Duration::from_secs(2),
796 nullify_retry: Duration::from_secs(10),
797 fetch_timeout: Duration::from_secs(1),
798 activity_timeout: 10,
799 max_fetch_count: 10,
800 max_fetch_size: 1024 * 512,
801 fetch_concurrent: 10,
802 fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
803 indexer: Some(indexer.clone()),
804 };
805 let engine = Engine::new(context.with_label(&uid), config).await;
806
807 let (voter, resolver, broadcast, backfill) =
809 registrations.remove(&public_key).unwrap();
810
811 engine.start(voter, resolver, broadcast, backfill);
813 }
814
815 loop {
817 let metrics = context.encode();
818
819 let mut success = false;
821 for line in metrics.lines() {
822 if !line.starts_with("validator-") {
824 continue;
825 }
826
827 let mut parts = line.split_whitespace();
829 let metric = parts.next().unwrap();
830 let value = parts.next().unwrap();
831
832 if metric.ends_with("_peers_blocked") {
834 let value = value.parse::<u64>().unwrap();
835 assert_eq!(value, 0);
836 }
837
838 if metric.ends_with("_syncer_contiguous_height") {
840 let value = value.parse::<u64>().unwrap();
841 if value >= required_container {
842 success = true;
843 break;
844 }
845 }
846 }
847 if success {
848 break;
849 }
850
851 context.sleep(Duration::from_secs(1)).await;
853 }
854
855 assert!(indexer.seed_seen.load(std::sync::atomic::Ordering::Relaxed));
857 assert!(indexer
858 .notarization_seen
859 .load(std::sync::atomic::Ordering::Relaxed));
860 assert!(indexer
861 .finalization_seen
862 .load(std::sync::atomic::Ordering::Relaxed));
863 });
864 }
865}