1use commonware_cryptography::Array;
37
38mod namespace;
39mod parsed;
40mod serializer;
41
42#[cfg(test)]
43pub mod mocks;
44
// Textually includes `wire.rs` from the directory named by the `OUT_DIR`
// environment variable (resolved at compile time).
// NOTE(review): presumably emitted by a build script — confirm what generates it.
mod wire {
    include!(concat!(env!("OUT_DIR"), "/wire.rs"));
}
48
49pub mod prover;
50pub mod signer;
51
/// Integer type used to represent epoch values.
pub type Epoch = u64;
58
/// Pairs a sequencer identity with a height.
///
/// `P` is the type used to identify the sequencer and must implement [`Array`].
// NOTE(review): presumably this is the metadata attached to a payload at a
// given position in a sequencer's chain — confirm against the signer module.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct Context<P: Array> {
    /// Identity of the sequencer this context refers to.
    pub sequencer: P,

    /// Height associated with `sequencer`.
    // NOTE(review): presumably a per-sequencer sequential index — confirm.
    pub height: u64,
}
68
69#[cfg(test)]
70mod tests {
71 use super::{mocks, signer};
72 use bytes::Bytes;
73 use commonware_cryptography::{
74 bls12381::{
75 dkg::ops,
76 primitives::{group::Share, poly},
77 },
78 ed25519::PublicKey,
79 sha256::{Digest as Sha256Digest, Sha256},
80 Ed25519, Hasher, Scheme,
81 };
82 use commonware_macros::test_traced;
83 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
84 use commonware_runtime::deterministic::{self, Context, Executor};
85 use commonware_runtime::{Clock, Runner, Spawner};
86 use futures::channel::oneshot;
87 use futures::future::join_all;
88 use prometheus_client::registry::Registry;
89 use std::sync::{Arc, Mutex};
90 use std::{
91 collections::{BTreeMap, HashSet},
92 time::Duration,
93 };
94 use tracing::debug;
95
/// Network handles keyed by validator: the `(Sender, Receiver)` pair obtained
/// for the first registered channel, followed by the pair for the second.
type Registrations<P> = BTreeMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))>;
97
98 async fn register_validators(
99 oracle: &mut Oracle<PublicKey>,
100 validators: &[PublicKey],
101 ) -> Registrations<PublicKey> {
102 let mut registrations = BTreeMap::new();
103 for validator in validators.iter() {
104 let (a1, a2) = oracle.register(validator.clone(), 0).await.unwrap();
105 let (b1, b2) = oracle.register(validator.clone(), 1).await.unwrap();
106 registrations.insert(validator.clone(), ((a1, a2), (b1, b2)));
107 }
108 registrations
109 }
110
/// Operation that `link_validators` applies to each ordered pair of distinct
/// validators.
// NOTE(review): every variant is constructed somewhere in this module, so the
// `dead_code` allowance may no longer be needed — confirm before removing.
#[allow(dead_code)]
enum Action {
    /// Add a link with the given parameters.
    Link(Link),
    /// Remove the existing link, then add one with the given parameters.
    Update(Link),
    /// Remove the existing link.
    Unlink,
}
117
118 async fn link_validators(
119 oracle: &mut Oracle<PublicKey>,
120 validators: &[PublicKey],
121 action: Action,
122 restrict_to: Option<fn(usize, usize, usize) -> bool>,
123 ) {
124 for (i1, v1) in validators.iter().enumerate() {
125 for (i2, v2) in validators.iter().enumerate() {
126 if v2 == v1 {
127 continue;
128 }
129 if let Some(f) = restrict_to {
130 if !f(validators.len(), i1, i2) {
131 continue;
132 }
133 }
134 if matches!(action, Action::Update(_) | Action::Unlink) {
135 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
136 }
137 if let Action::Link(ref link) | Action::Update(ref link) = action {
138 oracle
139 .add_link(v1.clone(), v2.clone(), link.clone())
140 .await
141 .unwrap();
142 }
143 }
144 }
145 }
146
147 async fn initialize_simulation(
148 runtime: &Context,
149 num_validators: u32,
150 shares_vec: &mut [Share],
151 ) -> (
152 Oracle<PublicKey>,
153 Vec<(PublicKey, Ed25519, Share)>,
154 Vec<PublicKey>,
155 Registrations<PublicKey>,
156 ) {
157 let (network, mut oracle) = Network::new(
158 runtime.clone(),
159 commonware_p2p::simulated::Config {
160 registry: Arc::new(Mutex::new(Registry::default())),
161 max_size: 1024 * 1024,
162 },
163 );
164 runtime.clone().spawn("network", network.run());
165
166 let mut schemes = (0..num_validators)
167 .map(|i| Ed25519::from_seed(i as u64))
168 .collect::<Vec<_>>();
169 schemes.sort_by_key(|s| s.public_key());
170 let validators: Vec<(PublicKey, Ed25519, Share)> = schemes
171 .iter()
172 .enumerate()
173 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i]))
174 .collect();
175 let pks = validators
176 .iter()
177 .map(|(pk, _, _)| pk.clone())
178 .collect::<Vec<_>>();
179
180 let registrations = register_validators(&mut oracle, &pks).await;
181 let link = Link {
182 latency: 10.0,
183 jitter: 1.0,
184 success_rate: 1.0,
185 };
186 link_validators(&mut oracle, &pks, Action::Link(link), None).await;
187 (oracle, validators, pks, registrations)
188 }
189
190 #[allow(clippy::too_many_arguments)]
191 fn spawn_validator_engines(
192 runtime: &Context,
193 identity: poly::Public,
194 pks: &[PublicKey],
195 validators: &[(PublicKey, Ed25519, Share)],
196 registrations: &mut Registrations<PublicKey>,
197 mailboxes: &mut BTreeMap<PublicKey, mocks::application::Mailbox<Sha256Digest, PublicKey>>,
198 collectors: &mut BTreeMap<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>,
199 refresh_epoch_timeout: Duration,
200 rebroadcast_timeout: Duration,
201 ) {
202 let namespace = b"my testing namespace";
203 for (validator, scheme, share) in validators.iter() {
204 let mut coordinator = mocks::coordinator::Coordinator::<PublicKey>::new(
205 identity.clone(),
206 pks.to_vec(),
207 *share,
208 );
209 coordinator.set_view(111);
210
211 let (mut app, app_mailbox) =
212 mocks::application::Application::<Sha256Digest, PublicKey>::new();
213 mailboxes.insert(validator.clone(), app_mailbox.clone());
214
215 let (collector, collector_mailbox) =
216 mocks::collector::Collector::<Ed25519, Sha256Digest>::new(
217 namespace,
218 poly::public(&identity),
219 );
220 runtime.clone().spawn("collector", collector.run());
221 collectors.insert(validator.clone(), collector_mailbox);
222
223 let (signer, signer_mailbox) = signer::Actor::new(
224 runtime.clone(),
225 signer::Config {
226 crypto: scheme.clone(),
227 application: app_mailbox.clone(),
228 collector: collectors.get(validator).unwrap().clone(),
229 coordinator,
230 mailbox_size: 1024,
231 pending_verify_size: 1024,
232 namespace: namespace.to_vec(),
233 epoch_bounds: (1, 1),
234 height_bound: 2,
235 refresh_epoch_timeout,
236 rebroadcast_timeout,
237 journal_heights_per_section: 10,
238 journal_replay_concurrency: 1,
239 journal_name_prefix: format!("broadcast-linked-seq/{}/", validator),
240 },
241 );
242
243 runtime
244 .clone()
245 .spawn("app", async move { app.run(signer_mailbox).await });
246 let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
247 runtime.clone().spawn(
248 "signer",
249 async move { signer.run((a1, a2), (b1, b2)).await },
250 );
251 }
252 }
253
254 fn spawn_proposer(
255 runtime: &Context,
256 mailboxes: Arc<
257 Mutex<BTreeMap<PublicKey, mocks::application::Mailbox<Sha256Digest, PublicKey>>>,
258 >,
259 invalid_when: fn(u64) -> bool,
260 ) {
261 runtime.clone().spawn("invalid signature proposer", {
262 let runtime = runtime.clone();
263 async move {
264 let mut iter = 0;
265 loop {
266 iter += 1;
267 let mailbox_vec: Vec<mocks::application::Mailbox<Sha256Digest, PublicKey>> = {
268 let guard = mailboxes.lock().unwrap();
269 guard.values().cloned().collect()
270 };
271 for mut mailbox in mailbox_vec {
272 let payload = Bytes::from(format!("hello world, iter {}", iter));
273 let mut hasher = Sha256::default();
274 hasher.update(&payload);
275
276 if invalid_when(iter) {
278 hasher.update(&payload);
279 }
280
281 let digest = hasher.finalize();
282 mailbox.broadcast(digest).await;
283 }
284 runtime.sleep(Duration::from_millis(250)).await;
285 }
286 }
287 });
288 }
289
290 async fn await_collectors(
291 runtime: &Context,
292 collectors: &BTreeMap<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>,
293 threshold: u64,
294 ) {
295 let mut receivers = Vec::new();
296 for (sequencer, mailbox) in collectors.iter() {
297 let (tx, rx) = oneshot::channel();
299 receivers.push(rx);
300
301 runtime.spawn("collector_watcher", {
303 let sequencer = sequencer.clone();
304 let mut mailbox = mailbox.clone();
305 let runtime = runtime.clone();
306 async move {
307 loop {
308 let tip = mailbox.get_tip(sequencer.clone()).await.unwrap_or(0);
309 debug!(tip, ?sequencer, "collector");
310 if tip >= threshold {
311 let _ = tx.send(sequencer.clone());
312 break;
313 }
314 runtime.sleep(Duration::from_millis(100)).await;
315 }
316 }
317 });
318 }
319
320 let results = join_all(receivers).await;
322 assert_eq!(results.len(), collectors.len());
323 }
324
325 #[test_traced]
326 fn test_all_online() {
327 let num_validators: u32 = 4;
328 let quorum: u32 = 3;
329 let (runner, mut context, _) = Executor::timed(Duration::from_secs(30));
330 let (identity, mut shares_vec) =
331 ops::generate_shares(&mut context, None, num_validators, quorum);
332 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
333
334 runner.start({
335 let context = context.clone();
336 async move {
337 let (_oracle, validators, pks, mut registrations) =
338 initialize_simulation(&context, num_validators, &mut shares_vec).await;
339 let mailboxes = Arc::new(Mutex::new(BTreeMap::<
340 PublicKey,
341 mocks::application::Mailbox<Sha256Digest, PublicKey>,
342 >::new()));
343 let mut collectors =
344 BTreeMap::<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>::new();
345 spawn_validator_engines(
346 &context,
347 identity.clone(),
348 &pks,
349 &validators,
350 &mut registrations,
351 &mut mailboxes.lock().unwrap(),
352 &mut collectors,
353 Duration::from_millis(100),
354 Duration::from_secs(5),
355 );
356 spawn_proposer(&context, mailboxes.clone(), |_| false);
357 await_collectors(&context, &collectors, 100).await;
358 }
359 });
360 }
361
362 #[test_traced]
363 fn test_unclean_shutdown() {
364 let num_validators: u32 = 4;
365 let quorum: u32 = 3;
366 let (mut runner, mut context, _) = Executor::timed(Duration::from_secs(45));
367 let (identity, mut shares_vec) =
368 ops::generate_shares(&mut context, None, num_validators, quorum);
369 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
370 let completed = Arc::new(Mutex::new(HashSet::new()));
371 let shutdowns = Arc::new(Mutex::new(0u64));
372
373 while completed.lock().unwrap().len() != num_validators as usize {
374 runner.start({
375 let context = context.clone();
376 let completed = completed.clone();
377 let shares_vec = shares_vec.clone();
378 let shutdowns = shutdowns.clone();
379 let identity = identity.clone();
380 async move {
381 let (network, mut oracle) = Network::new(
382 context.clone(),
383 commonware_p2p::simulated::Config {
384 registry: Arc::new(Mutex::new(Registry::default())),
385 max_size: 1024 * 1024,
386 },
387 );
388 context.clone().spawn("network", network.run());
389
390 let mut schemes = (0..num_validators)
391 .map(|i| Ed25519::from_seed(i as u64))
392 .collect::<Vec<_>>();
393 schemes.sort_by_key(|s| s.public_key());
394 let validators: Vec<(PublicKey, Ed25519, Share)> = schemes
395 .iter()
396 .enumerate()
397 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i]))
398 .collect();
399 let pks = validators
400 .iter()
401 .map(|(pk, _, _)| pk.clone())
402 .collect::<Vec<_>>();
403
404 let mut registrations = register_validators(&mut oracle, &pks).await;
405 let link = commonware_p2p::simulated::Link {
406 latency: 10.0,
407 jitter: 1.0,
408 success_rate: 1.0,
409 };
410 link_validators(&mut oracle, &pks, Action::Link(link), None).await;
411
412 let mailboxes = Arc::new(Mutex::new(BTreeMap::<
413 PublicKey,
414 mocks::application::Mailbox<Sha256Digest, PublicKey>,
415 >::new()));
416 let mut collectors = BTreeMap::<
417 PublicKey,
418 mocks::collector::Mailbox<Ed25519, Sha256Digest>,
419 >::new();
420 spawn_validator_engines(
421 &context,
422 identity.clone(),
423 &pks,
424 &validators,
425 &mut registrations,
426 &mut mailboxes.lock().unwrap(),
427 &mut collectors,
428 Duration::from_millis(100),
429 Duration::from_secs(5),
430 );
431 spawn_proposer(&context, mailboxes.clone(), |_| false);
432
433 let collector_pairs: Vec<(
434 PublicKey,
435 mocks::collector::Mailbox<Ed25519, Sha256Digest>,
436 )> = collectors
437 .iter()
438 .map(|(v, m)| (v.clone(), m.clone()))
439 .collect();
440 for (validator, mut mailbox) in collector_pairs {
441 let context_cloned = context.clone();
442 let completed_clone = completed.clone();
443 context.clone().spawn("collector_unclean", async move {
444 loop {
445 let tip = mailbox.get_tip(validator.clone()).await.unwrap_or(0);
446 if tip >= 100 {
447 completed_clone.lock().unwrap().insert(validator.clone());
448 break;
449 }
450 context_cloned.sleep(Duration::from_millis(100)).await;
451 }
452 });
453 }
454 context.sleep(Duration::from_millis(1000)).await;
455 *shutdowns.lock().unwrap() += 1;
456 }
457 });
458 let recovered = context.recover();
459 runner = recovered.0;
460 context = recovered.1;
461 }
462 }
463
464 #[test_traced]
465 fn test_network_partition() {
466 let num_validators: u32 = 4;
467 let quorum: u32 = 3;
468 let (runner, mut context, _) = Executor::timed(Duration::from_secs(60));
469 let (identity, mut shares_vec) =
470 ops::generate_shares(&mut context, None, num_validators, quorum);
471 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
472
473 runner.start({
474 let context = context.clone();
475 async move {
476 let (mut oracle, validators, pks, mut registrations) =
477 initialize_simulation(&context, num_validators, &mut shares_vec).await;
478 let mailboxes = Arc::new(Mutex::new(BTreeMap::<
479 PublicKey,
480 mocks::application::Mailbox<Sha256Digest, PublicKey>,
481 >::new()));
482 let mut collectors =
483 BTreeMap::<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>::new();
484 spawn_validator_engines(
485 &context,
486 identity.clone(),
487 &pks,
488 &validators,
489 &mut registrations,
490 &mut mailboxes.lock().unwrap(),
491 &mut collectors,
492 Duration::from_millis(100),
493 Duration::from_secs(1),
494 );
495 spawn_proposer(&context, mailboxes.clone(), |_| false);
496 link_validators(&mut oracle, &pks, Action::Unlink, None).await;
498 context.sleep(Duration::from_secs(5)).await;
499 let link = Link {
501 latency: 10.0,
502 jitter: 1.0,
503 success_rate: 1.0,
504 };
505 link_validators(&mut oracle, &pks, Action::Link(link), None).await;
506 await_collectors(&context, &collectors, 100).await;
507 }
508 });
509 }
510
511 fn slow_and_lossy_links(seed: u64) -> String {
512 let num_validators: u32 = 4;
513 let quorum: u32 = 3;
514 let cfg = deterministic::Config {
515 seed,
516 timeout: Some(Duration::from_secs(40)),
517 ..deterministic::Config::default()
518 };
519 let (runner, mut context, auditor) = Executor::init(cfg);
520 let (identity, mut shares_vec) =
521 ops::generate_shares(&mut context, None, num_validators, quorum);
522 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
523
524 runner.start({
525 let context = context.clone();
526 async move {
527 let (oracle, validators, pks, mut registrations) =
528 initialize_simulation(&context, num_validators, &mut shares_vec).await;
529 let delayed_link = Link {
530 latency: 50.0,
531 jitter: 40.0,
532 success_rate: 0.5,
533 };
534 let mut oracle_clone = oracle.clone();
535 link_validators(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await;
536
537 let mailboxes = Arc::new(Mutex::new(BTreeMap::<
538 PublicKey,
539 mocks::application::Mailbox<Sha256Digest, PublicKey>,
540 >::new()));
541 let mut collectors =
542 BTreeMap::<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>::new();
543 spawn_validator_engines(
544 &context,
545 identity.clone(),
546 &pks,
547 &validators,
548 &mut registrations,
549 &mut mailboxes.lock().unwrap(),
550 &mut collectors,
551 Duration::from_millis(100),
552 Duration::from_millis(150),
553 );
554
555 spawn_proposer(&context, mailboxes.clone(), |_| false);
556 await_collectors(&context, &collectors, 40).await;
557 }
558 });
559 auditor.state()
560 }
561
562 #[test_traced]
563 fn test_slow_and_lossy_links() {
564 slow_and_lossy_links(0);
565 }
566
567 #[test_traced]
568 fn test_determinism() {
569 for seed in 1..6 {
572 let state_1 = slow_and_lossy_links(seed);
573 let state_2 = slow_and_lossy_links(seed);
574 assert_eq!(state_1, state_2);
575 }
576 }
577
578 #[test_traced]
579 fn test_invalid_signature_injection() {
580 let num_validators: u32 = 4;
581 let quorum: u32 = 3;
582 let (runner, mut context, _) = Executor::timed(Duration::from_secs(30));
583 let (identity, mut shares_vec) =
584 ops::generate_shares(&mut context, None, num_validators, quorum);
585 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
586
587 runner.start({
588 let context = context.clone();
589 async move {
590 let (_oracle, validators, pks, mut registrations) =
591 initialize_simulation(&context, num_validators, &mut shares_vec).await;
592 let mailboxes = Arc::new(Mutex::new(BTreeMap::<
593 PublicKey,
594 mocks::application::Mailbox<Sha256Digest, PublicKey>,
595 >::new()));
596 let mut collectors =
597 BTreeMap::<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>::new();
598 spawn_validator_engines(
599 &context,
600 identity.clone(),
601 &pks,
602 &validators,
603 &mut registrations,
604 &mut mailboxes.lock().unwrap(),
605 &mut collectors,
606 Duration::from_millis(100),
607 Duration::from_secs(5),
608 );
609
610 spawn_proposer(&context, mailboxes.clone(), |i| i % 10 == 0);
611 await_collectors(&context, &collectors, 100).await;
612 }
613 });
614 }
615}