1use serde::{Deserialize, Serialize};
2
3pub mod actors;
4pub mod engine;
5
/// Configuration record that can be serialized and deserialized with serde.
///
/// All fields are public and stored as plain strings, integers, or lists of
/// strings; any decoding of the string fields happens elsewhere.
// NOTE(review): nothing in this file reads these fields. The per-group notes
// below are inferred from the field names only and should be confirmed
// against the code that loads this config.
#[derive(Deserialize, Serialize)]
pub struct Config {
    // Presumably this node's key material and the shared identity, in some
    // string encoding — TODO confirm the encoding with the loader.
    pub private_key: String,
    pub share: String,
    pub identity: String,

    // Presumably local runtime settings (listen port, storage directory,
    // number of worker threads) — verify against the caller.
    pub port: u16,
    pub directory: String,
    pub worker_threads: usize,

    // Presumably peer identifiers; the expected string format is not visible
    // here — TODO confirm.
    pub allowed_peers: Vec<String>,
    pub bootstrappers: Vec<String>,

    // Presumably queue/buffer capacities — TODO confirm units and consumers.
    pub message_backlog: usize,
    pub mailbox_size: usize,
}
22
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_cryptography::{bls12381::dkg::ops, ed25519::PublicKey, Ed25519, Scheme};
    use commonware_macros::test_traced;
    use commonware_p2p::simulated::{self, Link, Network, Oracle, Receiver, Sender};
    use commonware_runtime::{
        deterministic::{self, Executor},
        Clock, Metrics, Runner, Spawner,
    };
    use commonware_utils::quorum;
    use engine::Engine;
    use governor::Quota;
    use rand::{rngs::StdRng, Rng, SeedableRng};
    use std::time::Duration;
    use std::{
        collections::HashMap,
        num::NonZeroU32,
        sync::{Arc, Mutex},
    };
    use tracing::info;

    /// A sender/receiver pair returned by registering one channel with the oracle.
    type Channel = (Sender<PublicKey>, Receiver<PublicKey>);

    /// Per-validator channels, in the order `(voter, resolver, broadcast, backfill)`.
    type Registrations = HashMap<PublicKey, (Channel, Channel, Channel, Channel)>;

    /// Registers every validator on channels `0` through `3` and returns the
    /// resulting sender/receiver pairs keyed by validator public key.
    ///
    /// The channels are registered (and returned) in the order voter (`0`),
    /// resolver (`1`), broadcast (`2`), backfill (`3`).
    ///
    /// # Panics
    ///
    /// Panics if any registration with the oracle fails.
    async fn register_validators(
        oracle: &mut Oracle<PublicKey>,
        validators: &[PublicKey],
    ) -> Registrations {
        let mut registrations = HashMap::new();
        for validator in validators {
            let voter = oracle.register(validator.clone(), 0).await.unwrap();
            let resolver = oracle.register(validator.clone(), 1).await.unwrap();
            let broadcast = oracle.register(validator.clone(), 2).await.unwrap();
            let backfill = oracle.register(validator.clone(), 3).await.unwrap();
            registrations.insert(validator.clone(), (voter, resolver, broadcast, backfill));
        }
        registrations
    }

    /// Calls `add_link` on the oracle for every ordered pair of distinct validators.
    ///
    /// When `restrict_to` is provided it is called as
    /// `restrict_to(validators.len(), from_index, to_index)` and the pair is
    /// linked only if it returns `true`. Pairs with equal public keys are
    /// always skipped, before `restrict_to` is consulted.
    ///
    /// # Panics
    ///
    /// Panics if the oracle rejects a link.
    async fn link_validators(
        oracle: &mut Oracle<PublicKey>,
        validators: &[PublicKey],
        link: Link,
        restrict_to: Option<fn(usize, usize, usize) -> bool>,
    ) {
        let total = validators.len();
        for (i1, v1) in validators.iter().enumerate() {
            for (i2, v2) in validators.iter().enumerate() {
                // Never link a validator to itself.
                if v2 == v1 {
                    continue;
                }

                // With no filter every remaining pair is linked.
                let permitted = restrict_to.map_or(true, |allowed| allowed(total, i1, i2));
                if !permitted {
                    continue;
                }

                oracle
                    .add_link(v1.clone(), v2.clone(), link.clone())
                    .await
                    .unwrap();
            }
        }
    }

    /// Checks an encoded metrics dump against a target indexed height.
    ///
    /// Only lines starting with `validator-` are inspected; each is split on
    /// whitespace into a metric name and a value. Returns `false` as soon as a
    /// metric ending in `_syncer_indexed_height` is below `height` (remaining
    /// lines are not inspected), and `true` otherwise — including when no such
    /// metric is present at all.
    ///
    /// # Panics
    ///
    /// Panics if an inspected line has fewer than two fields, if a checked
    /// value does not parse as `u64`, or if any metric ending in
    /// `_peers_blocked` is non-zero.
    fn validators_reached(metrics: &str, height: u64) -> bool {
        for line in metrics.lines() {
            if !line.starts_with("validator-") {
                continue;
            }

            let mut parts = line.split_whitespace();
            let metric = parts.next().unwrap();
            let value = parts.next().unwrap();

            // No validator should ever have blocked a peer in these tests.
            if metric.ends_with("_peers_blocked") {
                let value = value.parse::<u64>().unwrap();
                assert_eq!(value, 0);
            }

            if metric.ends_with("_syncer_indexed_height") {
                let value = value.parse::<u64>().unwrap();
                if value < height {
                    return false;
                }
            }
        }
        true
    }

    /// Runs five fully connected validators on a deterministic executor seeded
    /// with `seed`, using `link` for every connection, until every reported
    /// `_syncer_indexed_height` metric is at least 10.
    ///
    /// Returns the auditor state of the run so callers can compare two runs
    /// with the same seed.
    fn all_online(seed: u64, link: Link) -> String {
        let n = 5;
        let threshold = quorum(n).unwrap();
        let required_container = 10;
        let cfg = deterministic::Config {
            seed,
            timeout: Some(Duration::from_secs(30)),
            ..Default::default()
        };
        let (executor, mut context, auditor) = Executor::init(cfg);
        executor.start(async move {
            // Create and start the simulated network.
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                simulated::Config {
                    max_size: 1024 * 1024,
                },
            );
            network.start();

            // Derive the validator set; both lists are sorted by public key so
            // that index `i` refers to the same validator in each.
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = Ed25519::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link every validator to every other validator.
            link_validators(&mut oracle, &validators, link, None).await;

            // Generate the threshold key material.
            let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);

            // Start one engine per validator.
            for (idx, scheme) in schemes.into_iter().enumerate() {
                let public_key = scheme.public_key();
                let uid = format!("validator-{}", public_key);
                let config = engine::Config {
                    partition_prefix: uid.clone(),
                    signer: scheme,
                    identity: public.clone(),
                    share: shares[idx],
                    participants: validators.clone(),
                    mailbox_size: 1024,
                    backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout: 10,
                    max_fetch_count: 10,
                    max_fetch_size: 1024 * 512,
                    fetch_concurrent: 10,
                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
                };
                let engine = Engine::new(context.with_label(&uid), config).await;

                let (voter, resolver, broadcast, backfill) =
                    registrations.remove(&public_key).unwrap();

                engine.start(voter, resolver, broadcast, backfill);
            }

            // Poll metrics once per second until every validator has caught up.
            while !validators_reached(&context.encode(), required_container) {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
        auditor.state()
    }

    /// With low-latency, lossless links, two runs with the same seed must
    /// produce the same auditor state.
    #[test_traced]
    fn test_good_links() {
        let link = Link {
            latency: 10.0,
            jitter: 1.0,
            success_rate: 1.0,
        };
        for seed in 0..5 {
            let state = all_online(seed, link.clone());
            assert_eq!(state, all_online(seed, link.clone()));
        }
    }

    /// With high-latency, lossy links, two runs with the same seed must still
    /// produce the same auditor state.
    #[test_traced]
    fn test_bad_links() {
        let link = Link {
            latency: 200.0,
            jitter: 150.0,
            success_rate: 0.75,
        };
        for seed in 0..5 {
            let state = all_online(seed, link.clone());
            assert_eq!(state, all_online(seed, link.clone()));
        }
    }

    /// Starts every validator except the one at index 0, waits for indexed
    /// height 10, then links and starts validator 0 and waits until every
    /// reported indexed height (including validator 0's) reaches 20.
    ///
    /// Validator 0 is linked to every validator except the one at index 1.
    #[test_traced]
    fn test_backfill() {
        let n = 5;
        let threshold = quorum(n).unwrap();
        let initial_container_required = 10;
        let final_container_required = 20;
        let (executor, mut context, _) = Executor::timed(Duration::from_secs(30));
        executor.start(async move {
            // Create and start the simulated network.
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                simulated::Config {
                    max_size: 1024 * 1024,
                },
            );
            network.start();

            // Derive the validator set; both lists are sorted by public key so
            // that index `i` refers to the same validator in each.
            let mut schemes = Vec::new();
            let mut validators = Vec::new();
            for i in 0..n {
                let scheme = Ed25519::from_seed(i as u64);
                let pk = scheme.public_key();
                schemes.push(scheme);
                validators.push(pk);
            }
            validators.sort();
            schemes.sort_by_key(|s| s.public_key());
            let mut registrations = register_validators(&mut oracle, &validators).await;

            // Link only pairs that do not involve validator 0.
            let link = Link {
                latency: 10.0,
                jitter: 1.0,
                success_rate: 1.0,
            };
            link_validators(
                &mut oracle,
                &validators,
                link.clone(),
                Some(|_, i, j| ![i, j].contains(&0usize)),
            )
            .await;

            // Generate the threshold key material.
            let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);

            // Start every validator except the first.
            for (idx, scheme) in schemes.iter().enumerate().skip(1) {
                let public_key = scheme.public_key();
                let uid = format!("validator-{}", public_key);
                let config = engine::Config {
                    partition_prefix: uid.clone(),
                    signer: scheme.clone(),
                    identity: public.clone(),
                    share: shares[idx],
                    participants: validators.clone(),
                    mailbox_size: 1024,
                    backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
                    leader_timeout: Duration::from_secs(1),
                    notarization_timeout: Duration::from_secs(2),
                    nullify_retry: Duration::from_secs(10),
                    fetch_timeout: Duration::from_secs(1),
                    activity_timeout: 10,
                    max_fetch_count: 10,
                    max_fetch_size: 1024 * 512,
                    fetch_concurrent: 10,
                    fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
                };
                let engine = Engine::new(context.with_label(&uid), config).await;

                let (voter, resolver, broadcast, backfill) =
                    registrations.remove(&public_key).unwrap();

                engine.start(voter, resolver, broadcast, backfill);
            }

            // Wait for the running validators to reach the initial height.
            while !validators_reached(&context.encode(), initial_container_required) {
                context.sleep(Duration::from_secs(1)).await;
            }

            // Connect validator 0 to everyone except validator 1.
            link_validators(
                &mut oracle,
                &validators,
                link,
                Some(|_, i, j| [i, j].contains(&0usize) && ![i, j].contains(&1usize)),
            )
            .await;

            // Start the late validator.
            let scheme = schemes[0].clone();
            let share = shares[0];
            let public_key = scheme.public_key();
            let uid = format!("validator-{}", public_key);
            let config = engine::Config {
                partition_prefix: uid.clone(),
                signer: scheme.clone(),
                identity: public.clone(),
                share,
                participants: validators.clone(),
                mailbox_size: 1024,
                backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
                leader_timeout: Duration::from_secs(1),
                notarization_timeout: Duration::from_secs(2),
                nullify_retry: Duration::from_secs(10),
                fetch_timeout: Duration::from_secs(1),
                activity_timeout: 10,
                max_fetch_count: 10,
                max_fetch_size: 1024 * 512,
                fetch_concurrent: 10,
                fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
            };
            let engine = Engine::new(context.with_label(&uid), config).await;

            let (voter, resolver, broadcast, backfill) =
                registrations.remove(&public_key).unwrap();

            engine.start(voter, resolver, broadcast, backfill);

            // Wait for every validator, including the late one, to reach the
            // final height.
            while !validators_reached(&context.encode(), final_container_required) {
                context.sleep(Duration::from_secs(1)).await;
            }
        });
    }

    /// Repeatedly runs the full validator set for a random interval between
    /// 10ms and 1s, then recovers the context and starts again, until a
    /// spawned watcher observes every indexed height at 100 or more.
    ///
    /// Asserts that reaching the target took more than one run.
    #[test_traced]
    fn test_unclean_shutdown() {
        let n = 5;
        let threshold = quorum(n).unwrap();
        let required_container = 100;

        // Key material is generated once, outside the executor, so that every
        // run uses the same shares.
        let mut rng = StdRng::seed_from_u64(0);
        let (public, shares) = ops::generate_shares(&mut rng, None, n, threshold);

        let mut runs = 0;
        let done = Arc::new(Mutex::new(false));
        let (mut executor, mut context, _) = Executor::timed(Duration::from_secs(10));
        while !*done.lock().unwrap() {
            runs += 1;
            executor.start({
                let mut context = context.clone();
                let public = public.clone();
                let shares = shares.clone();
                let done = done.clone();
                async move {
                    // Create and start the simulated network.
                    let (network, mut oracle) = Network::new(
                        context.with_label("network"),
                        simulated::Config {
                            max_size: 1024 * 1024,
                        },
                    );
                    network.start();

                    // Derive the validator set; both lists are sorted by
                    // public key so that index `i` refers to the same
                    // validator in each.
                    let mut schemes = Vec::new();
                    let mut validators = Vec::new();
                    for i in 0..n {
                        let scheme = Ed25519::from_seed(i as u64);
                        let pk = scheme.public_key();
                        schemes.push(scheme);
                        validators.push(pk);
                    }
                    validators.sort();
                    schemes.sort_by_key(|s| s.public_key());
                    let mut registrations = register_validators(&mut oracle, &validators).await;

                    // Link every validator to every other validator.
                    let link = Link {
                        latency: 10.0,
                        jitter: 1.0,
                        success_rate: 1.0,
                    };
                    link_validators(&mut oracle, &validators, link, None).await;

                    // Start one engine per validator.
                    for (idx, scheme) in schemes.into_iter().enumerate() {
                        let public_key = scheme.public_key();
                        let uid = format!("validator-{}", public_key);
                        let config = engine::Config {
                            partition_prefix: uid.clone(),
                            signer: scheme,
                            identity: public.clone(),
                            share: shares[idx],
                            participants: validators.clone(),
                            mailbox_size: 1024,
                            backfill_quota: Quota::per_second(NonZeroU32::new(10).unwrap()),
                            leader_timeout: Duration::from_secs(1),
                            notarization_timeout: Duration::from_secs(2),
                            nullify_retry: Duration::from_secs(10),
                            fetch_timeout: Duration::from_secs(1),
                            activity_timeout: 10,
                            max_fetch_count: 10,
                            max_fetch_size: 1024 * 512,
                            fetch_concurrent: 10,
                            fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
                        };
                        let engine = Engine::new(context.with_label(&uid), config).await;

                        let (voter, resolver, broadcast, backfill) =
                            registrations.remove(&public_key).unwrap();

                        engine.start(voter, resolver, broadcast, backfill);
                    }

                    // Watch metrics in the background and flag completion once
                    // every validator has reached the target height.
                    context
                        .with_label("metrics")
                        .spawn(move |context| async move {
                            while !validators_reached(&context.encode(), required_container) {
                                context.sleep(Duration::from_millis(10)).await;
                            }
                            *done.lock().unwrap() = true;
                        });

                    // End this run after a random delay; returning from the
                    // root task stops the run regardless of engine progress.
                    let wait =
                        context.gen_range(Duration::from_millis(10)..Duration::from_millis(1_000));
                    context.sleep(wait).await;
                }
            });

            // Obtain a fresh executor and context from the finished run.
            (executor, context, _) = context.recover();
        }
        assert!(runs > 1);
        info!(runs, "unclean shutdown recovery worked");
    }
}