1use commonware_utils::Array;
37
38mod namespace;
39mod parsed;
40mod serializer;
41
42#[cfg(test)]
43pub mod mocks;
44
45mod wire {
46 include!(concat!(env!("OUT_DIR"), "/wire.rs"));
47}
48
49pub mod prover;
50pub mod signer;
51
52pub type Epoch = u64;
58
59#[derive(Debug, Clone, Hash, PartialEq, Eq)]
61pub struct Context<P: Array> {
62 pub sequencer: P,
64
65 pub height: u64,
67}
68
69#[cfg(test)]
70mod tests {
71 use super::{mocks, signer};
72 use bytes::Bytes;
73 use commonware_cryptography::{
74 bls12381::{
75 dkg::ops,
76 primitives::{group::Share, poly},
77 },
78 ed25519::PublicKey,
79 sha256::{Digest as Sha256Digest, Sha256},
80 Ed25519, Hasher, Scheme,
81 };
82 use commonware_macros::test_traced;
83 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
84 use commonware_runtime::{
85 deterministic::{self, Context, Executor},
86 Metrics,
87 };
88 use commonware_runtime::{Clock, Runner, Spawner};
89 use futures::channel::oneshot;
90 use futures::future::join_all;
91 use std::sync::{Arc, Mutex};
92 use std::{
93 collections::{BTreeMap, HashSet},
94 time::Duration,
95 };
96 use tracing::debug;
97
98 type Registrations<P> = BTreeMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))>;
99
100 async fn register_validators(
101 oracle: &mut Oracle<PublicKey>,
102 validators: &[PublicKey],
103 ) -> Registrations<PublicKey> {
104 let mut registrations = BTreeMap::new();
105 for validator in validators.iter() {
106 let (a1, a2) = oracle.register(validator.clone(), 0).await.unwrap();
107 let (b1, b2) = oracle.register(validator.clone(), 1).await.unwrap();
108 registrations.insert(validator.clone(), ((a1, a2), (b1, b2)));
109 }
110 registrations
111 }
112
113 #[allow(dead_code)]
114 enum Action {
115 Link(Link),
116 Update(Link),
117 Unlink,
118 }
119
120 async fn link_validators(
121 oracle: &mut Oracle<PublicKey>,
122 validators: &[PublicKey],
123 action: Action,
124 restrict_to: Option<fn(usize, usize, usize) -> bool>,
125 ) {
126 for (i1, v1) in validators.iter().enumerate() {
127 for (i2, v2) in validators.iter().enumerate() {
128 if v2 == v1 {
129 continue;
130 }
131 if let Some(f) = restrict_to {
132 if !f(validators.len(), i1, i2) {
133 continue;
134 }
135 }
136 if matches!(action, Action::Update(_) | Action::Unlink) {
137 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
138 }
139 if let Action::Link(ref link) | Action::Update(ref link) = action {
140 oracle
141 .add_link(v1.clone(), v2.clone(), link.clone())
142 .await
143 .unwrap();
144 }
145 }
146 }
147 }
148
149 async fn initialize_simulation(
150 context: Context,
151 num_validators: u32,
152 shares_vec: &mut [Share],
153 ) -> (
154 Oracle<PublicKey>,
155 Vec<(PublicKey, Ed25519, Share)>,
156 Vec<PublicKey>,
157 Registrations<PublicKey>,
158 ) {
159 let (network, mut oracle) = Network::new(
160 context.with_label("network"),
161 commonware_p2p::simulated::Config {
162 max_size: 1024 * 1024,
163 },
164 );
165 network.start();
166
167 let mut schemes = (0..num_validators)
168 .map(|i| Ed25519::from_seed(i as u64))
169 .collect::<Vec<_>>();
170 schemes.sort_by_key(|s| s.public_key());
171 let validators: Vec<(PublicKey, Ed25519, Share)> = schemes
172 .iter()
173 .enumerate()
174 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i]))
175 .collect();
176 let pks = validators
177 .iter()
178 .map(|(pk, _, _)| pk.clone())
179 .collect::<Vec<_>>();
180
181 let registrations = register_validators(&mut oracle, &pks).await;
182 let link = Link {
183 latency: 10.0,
184 jitter: 1.0,
185 success_rate: 1.0,
186 };
187 link_validators(&mut oracle, &pks, Action::Link(link), None).await;
188 (oracle, validators, pks, registrations)
189 }
190
191 #[allow(clippy::too_many_arguments)]
192 fn spawn_validator_engines(
193 context: Context,
194 identity: poly::Public,
195 pks: &[PublicKey],
196 validators: &[(PublicKey, Ed25519, Share)],
197 registrations: &mut Registrations<PublicKey>,
198 mailboxes: &mut BTreeMap<PublicKey, mocks::application::Mailbox<Sha256Digest, PublicKey>>,
199 collectors: &mut BTreeMap<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>,
200 refresh_epoch_timeout: Duration,
201 rebroadcast_timeout: Duration,
202 ) {
203 let namespace = b"my testing namespace";
204 for (validator, scheme, share) in validators.iter() {
205 let context = context.with_label(&validator.to_string());
206 let mut coordinator = mocks::coordinator::Coordinator::<PublicKey>::new(
207 identity.clone(),
208 pks.to_vec(),
209 *share,
210 );
211 coordinator.set_view(111);
212
213 let (app, app_mailbox) =
214 mocks::application::Application::<Sha256Digest, PublicKey>::new();
215 mailboxes.insert(validator.clone(), app_mailbox.clone());
216
217 let (collector, collector_mailbox) =
218 mocks::collector::Collector::<Ed25519, Sha256Digest>::new(
219 namespace,
220 poly::public(&identity),
221 );
222 context.with_label("collector").spawn(|_| collector.run());
223 collectors.insert(validator.clone(), collector_mailbox);
224
225 let (signer, signer_mailbox) = signer::Actor::new(
226 context.with_label("signer"),
227 signer::Config {
228 crypto: scheme.clone(),
229 application: app_mailbox.clone(),
230 collector: collectors.get(validator).unwrap().clone(),
231 coordinator,
232 mailbox_size: 1024,
233 pending_verify_size: 1024,
234 namespace: namespace.to_vec(),
235 epoch_bounds: (1, 1),
236 height_bound: 2,
237 refresh_epoch_timeout,
238 rebroadcast_timeout,
239 journal_heights_per_section: 10,
240 journal_replay_concurrency: 1,
241 journal_name_prefix: format!("broadcast-linked-seq/{}/", validator),
242 },
243 );
244
245 context.with_label("app").spawn(|_| app.run(signer_mailbox));
246 let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
247 signer.start((a1, a2), (b1, b2));
248 }
249 }
250
251 fn spawn_proposer(
252 context: Context,
253 mailboxes: Arc<
254 Mutex<BTreeMap<PublicKey, mocks::application::Mailbox<Sha256Digest, PublicKey>>>,
255 >,
256 invalid_when: fn(u64) -> bool,
257 ) {
258 context
259 .clone()
260 .with_label("invalid signature proposer")
261 .spawn(move |context| async move {
262 let mut iter = 0;
263 loop {
264 iter += 1;
265 let mailbox_vec: Vec<mocks::application::Mailbox<Sha256Digest, PublicKey>> = {
266 let guard = mailboxes.lock().unwrap();
267 guard.values().cloned().collect()
268 };
269 for mut mailbox in mailbox_vec {
270 let payload = Bytes::from(format!("hello world, iter {}", iter));
271 let mut hasher = Sha256::default();
272 hasher.update(&payload);
273
274 if invalid_when(iter) {
276 hasher.update(&payload);
277 }
278
279 let digest = hasher.finalize();
280 mailbox.broadcast(digest).await;
281 }
282 context.sleep(Duration::from_millis(250)).await;
283 }
284 });
285 }
286
287 async fn await_collectors(
288 context: Context,
289 collectors: &BTreeMap<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>,
290 threshold: u64,
291 ) {
292 let mut receivers = Vec::new();
293 for (sequencer, mailbox) in collectors.iter() {
294 let (tx, rx) = oneshot::channel();
296 receivers.push(rx);
297
298 context.with_label("collector_watcher").spawn({
300 let sequencer = sequencer.clone();
301 let mut mailbox = mailbox.clone();
302 move |context| async move {
303 loop {
304 let tip = mailbox.get_tip(sequencer.clone()).await.unwrap_or(0);
305 debug!(tip, ?sequencer, "collector");
306 if tip >= threshold {
307 let _ = tx.send(sequencer.clone());
308 break;
309 }
310 context.sleep(Duration::from_millis(100)).await;
311 }
312 }
313 });
314 }
315
316 let results = join_all(receivers).await;
318 assert_eq!(results.len(), collectors.len());
319 }
320
321 #[test_traced]
322 fn test_all_online() {
323 let num_validators: u32 = 4;
324 let quorum: u32 = 3;
325 let (runner, mut context, _) = Executor::timed(Duration::from_secs(30));
326 let (identity, mut shares_vec) =
327 ops::generate_shares(&mut context, None, num_validators, quorum);
328 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
329
330 runner.start(async move {
331 let (_oracle, validators, pks, mut registrations) = initialize_simulation(
332 context.with_label("simulation"),
333 num_validators,
334 &mut shares_vec,
335 )
336 .await;
337 let mailboxes = Arc::new(Mutex::new(BTreeMap::<
338 PublicKey,
339 mocks::application::Mailbox<Sha256Digest, PublicKey>,
340 >::new()));
341 let mut collectors =
342 BTreeMap::<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>::new();
343 spawn_validator_engines(
344 context.with_label("validator"),
345 identity.clone(),
346 &pks,
347 &validators,
348 &mut registrations,
349 &mut mailboxes.lock().unwrap(),
350 &mut collectors,
351 Duration::from_millis(100),
352 Duration::from_secs(5),
353 );
354 spawn_proposer(context.with_label("proposer"), mailboxes.clone(), |_| false);
355 await_collectors(context.with_label("collector"), &collectors, 100).await;
356 });
357 }
358
359 #[test_traced]
360 fn test_unclean_shutdown() {
361 let num_validators: u32 = 4;
362 let quorum: u32 = 3;
363 let (mut runner, mut context, _) = Executor::timed(Duration::from_secs(45));
364 let (identity, mut shares_vec) =
365 ops::generate_shares(&mut context, None, num_validators, quorum);
366 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
367 let completed = Arc::new(Mutex::new(HashSet::new()));
368 let shutdowns = Arc::new(Mutex::new(0u64));
369
370 while completed.lock().unwrap().len() != num_validators as usize {
371 runner.start({
372 let context = context.clone();
373 let completed = completed.clone();
374 let shares_vec = shares_vec.clone();
375 let shutdowns = shutdowns.clone();
376 let identity = identity.clone();
377 async move {
378 let (network, mut oracle) = Network::new(
379 context.with_label("network"),
380 commonware_p2p::simulated::Config {
381 max_size: 1024 * 1024,
382 },
383 );
384 network.start();
385
386 let mut schemes = (0..num_validators)
387 .map(|i| Ed25519::from_seed(i as u64))
388 .collect::<Vec<_>>();
389 schemes.sort_by_key(|s| s.public_key());
390 let validators: Vec<(PublicKey, Ed25519, Share)> = schemes
391 .iter()
392 .enumerate()
393 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i]))
394 .collect();
395 let pks = validators
396 .iter()
397 .map(|(pk, _, _)| pk.clone())
398 .collect::<Vec<_>>();
399
400 let mut registrations = register_validators(&mut oracle, &pks).await;
401 let link = commonware_p2p::simulated::Link {
402 latency: 10.0,
403 jitter: 1.0,
404 success_rate: 1.0,
405 };
406 link_validators(&mut oracle, &pks, Action::Link(link), None).await;
407
408 let mailboxes = Arc::new(Mutex::new(BTreeMap::<
409 PublicKey,
410 mocks::application::Mailbox<Sha256Digest, PublicKey>,
411 >::new()));
412 let mut collectors = BTreeMap::<
413 PublicKey,
414 mocks::collector::Mailbox<Ed25519, Sha256Digest>,
415 >::new();
416 spawn_validator_engines(
417 context.with_label("validator"),
418 identity.clone(),
419 &pks,
420 &validators,
421 &mut registrations,
422 &mut mailboxes.lock().unwrap(),
423 &mut collectors,
424 Duration::from_millis(100),
425 Duration::from_secs(5),
426 );
427 spawn_proposer(context.with_label("proposer"), mailboxes.clone(), |_| false);
428
429 let collector_pairs: Vec<(
430 PublicKey,
431 mocks::collector::Mailbox<Ed25519, Sha256Digest>,
432 )> = collectors
433 .iter()
434 .map(|(v, m)| (v.clone(), m.clone()))
435 .collect();
436 for (validator, mut mailbox) in collector_pairs {
437 let completed_clone = completed.clone();
438 context
439 .with_label("collector_unclean")
440 .spawn(|context| async move {
441 loop {
442 let tip = mailbox.get_tip(validator.clone()).await.unwrap_or(0);
443 if tip >= 100 {
444 completed_clone.lock().unwrap().insert(validator.clone());
445 break;
446 }
447 context.sleep(Duration::from_millis(100)).await;
448 }
449 });
450 }
451 context.sleep(Duration::from_millis(1000)).await;
452 *shutdowns.lock().unwrap() += 1;
453 }
454 });
455 let recovered = context.recover();
456 runner = recovered.0;
457 context = recovered.1;
458 }
459 }
460
461 #[test_traced]
462 fn test_network_partition() {
463 let num_validators: u32 = 4;
464 let quorum: u32 = 3;
465 let (runner, mut context, _) = Executor::timed(Duration::from_secs(60));
466 let (identity, mut shares_vec) =
467 ops::generate_shares(&mut context, None, num_validators, quorum);
468 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
469
470 runner.start(async move {
471 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
472 context.with_label("simulation"),
473 num_validators,
474 &mut shares_vec,
475 )
476 .await;
477 let mailboxes = Arc::new(Mutex::new(BTreeMap::<
478 PublicKey,
479 mocks::application::Mailbox<Sha256Digest, PublicKey>,
480 >::new()));
481 let mut collectors =
482 BTreeMap::<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>::new();
483 spawn_validator_engines(
484 context.with_label("validator"),
485 identity.clone(),
486 &pks,
487 &validators,
488 &mut registrations,
489 &mut mailboxes.lock().unwrap(),
490 &mut collectors,
491 Duration::from_millis(100),
492 Duration::from_secs(1),
493 );
494 spawn_proposer(context.with_label("proposer"), mailboxes.clone(), |_| false);
495 link_validators(&mut oracle, &pks, Action::Unlink, None).await;
497 context.sleep(Duration::from_secs(5)).await;
498 let link = Link {
500 latency: 10.0,
501 jitter: 1.0,
502 success_rate: 1.0,
503 };
504 link_validators(&mut oracle, &pks, Action::Link(link), None).await;
505 await_collectors(context.with_label("collector"), &collectors, 100).await;
506 });
507 }
508
509 fn slow_and_lossy_links(seed: u64) -> String {
510 let num_validators: u32 = 4;
511 let quorum: u32 = 3;
512 let cfg = deterministic::Config {
513 seed,
514 timeout: Some(Duration::from_secs(40)),
515 ..deterministic::Config::default()
516 };
517 let (runner, mut context, auditor) = Executor::init(cfg);
518 let (identity, mut shares_vec) =
519 ops::generate_shares(&mut context, None, num_validators, quorum);
520 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
521
522 runner.start(async move {
523 let (oracle, validators, pks, mut registrations) = initialize_simulation(
524 context.with_label("simulation"),
525 num_validators,
526 &mut shares_vec,
527 )
528 .await;
529 let delayed_link = Link {
530 latency: 50.0,
531 jitter: 40.0,
532 success_rate: 0.5,
533 };
534 let mut oracle_clone = oracle.clone();
535 link_validators(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await;
536
537 let mailboxes = Arc::new(Mutex::new(BTreeMap::<
538 PublicKey,
539 mocks::application::Mailbox<Sha256Digest, PublicKey>,
540 >::new()));
541 let mut collectors =
542 BTreeMap::<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>::new();
543 spawn_validator_engines(
544 context.with_label("validator"),
545 identity.clone(),
546 &pks,
547 &validators,
548 &mut registrations,
549 &mut mailboxes.lock().unwrap(),
550 &mut collectors,
551 Duration::from_millis(100),
552 Duration::from_millis(150),
553 );
554
555 spawn_proposer(context.with_label("proposer"), mailboxes.clone(), |_| false);
556 await_collectors(context.with_label("collector"), &collectors, 40).await;
557 });
558 auditor.state()
559 }
560
561 #[test_traced]
562 fn test_slow_and_lossy_links() {
563 slow_and_lossy_links(0);
564 }
565
566 #[test_traced]
567 fn test_determinism() {
568 for seed in 1..6 {
571 let state_1 = slow_and_lossy_links(seed);
572 let state_2 = slow_and_lossy_links(seed);
573 assert_eq!(state_1, state_2);
574 }
575 }
576
577 #[test_traced]
578 fn test_invalid_signature_injection() {
579 let num_validators: u32 = 4;
580 let quorum: u32 = 3;
581 let (runner, mut context, _) = Executor::timed(Duration::from_secs(30));
582 let (identity, mut shares_vec) =
583 ops::generate_shares(&mut context, None, num_validators, quorum);
584 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
585
586 runner.start(async move {
587 let (_oracle, validators, pks, mut registrations) = initialize_simulation(
588 context.with_label("simulation"),
589 num_validators,
590 &mut shares_vec,
591 )
592 .await;
593 let mailboxes = Arc::new(Mutex::new(BTreeMap::<
594 PublicKey,
595 mocks::application::Mailbox<Sha256Digest, PublicKey>,
596 >::new()));
597 let mut collectors =
598 BTreeMap::<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>::new();
599 spawn_validator_engines(
600 context.with_label("validator"),
601 identity.clone(),
602 &pks,
603 &validators,
604 &mut registrations,
605 &mut mailboxes.lock().unwrap(),
606 &mut collectors,
607 Duration::from_millis(100),
608 Duration::from_secs(5),
609 );
610
611 spawn_proposer(context.with_label("proposer"), mailboxes.clone(), |i| {
612 i % 10 == 0
613 });
614 await_collectors(context.with_label("collector"), &collectors, 100).await;
615 });
616 }
617}