1use commonware_utils::Array;
37
38mod ack_manager;
39use ack_manager::AckManager;
40mod config;
41pub use config::Config;
42mod engine;
43pub use engine::Engine;
44mod ingress;
45use ingress::{Mailbox, Message};
46mod metrics;
47mod namespace;
48mod parsed;
49mod prover;
50pub use prover::Prover;
51mod serializer;
52mod tip_manager;
53use tip_manager::TipManager;
54mod wire {
55 include!(concat!(env!("OUT_DIR"), "/wire.rs"));
56}
57#[cfg(test)]
58pub mod mocks;
59
60pub type Epoch = u64;
66
67#[derive(Debug, Clone, Hash, PartialEq, Eq)]
69pub struct Context<P: Array> {
70 pub sequencer: P,
72
73 pub height: u64,
75}
76
77#[cfg(test)]
78mod tests {
79 use super::{mocks, Config, Engine};
80 use bytes::Bytes;
81 use commonware_cryptography::{
82 bls12381::{
83 dkg::ops,
84 primitives::{group::Share, poly},
85 },
86 ed25519::PublicKey,
87 sha256::{Digest as Sha256Digest, Sha256},
88 Ed25519, Hasher, Scheme,
89 };
90 use commonware_macros::test_traced;
91 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
92 use commonware_runtime::{
93 deterministic::{self, Context, Executor},
94 Metrics,
95 };
96 use commonware_runtime::{Clock, Runner, Spawner};
97 use futures::channel::oneshot;
98 use futures::future::join_all;
99 use std::sync::{Arc, Mutex};
100 use std::{
101 collections::{BTreeMap, HashSet},
102 time::Duration,
103 };
104 use tracing::debug;
105
106 type Registrations<P> = BTreeMap<P, ((Sender<P>, Receiver<P>), (Sender<P>, Receiver<P>))>;
107
108 async fn register_validators(
109 oracle: &mut Oracle<PublicKey>,
110 validators: &[PublicKey],
111 ) -> Registrations<PublicKey> {
112 let mut registrations = BTreeMap::new();
113 for validator in validators.iter() {
114 let (a1, a2) = oracle.register(validator.clone(), 0).await.unwrap();
115 let (b1, b2) = oracle.register(validator.clone(), 1).await.unwrap();
116 registrations.insert(validator.clone(), ((a1, a2), (b1, b2)));
117 }
118 registrations
119 }
120
121 #[allow(dead_code)]
122 enum Action {
123 Link(Link),
124 Update(Link),
125 Unlink,
126 }
127
128 async fn link_validators(
129 oracle: &mut Oracle<PublicKey>,
130 validators: &[PublicKey],
131 action: Action,
132 restrict_to: Option<fn(usize, usize, usize) -> bool>,
133 ) {
134 for (i1, v1) in validators.iter().enumerate() {
135 for (i2, v2) in validators.iter().enumerate() {
136 if v2 == v1 {
137 continue;
138 }
139 if let Some(f) = restrict_to {
140 if !f(validators.len(), i1, i2) {
141 continue;
142 }
143 }
144 if matches!(action, Action::Update(_) | Action::Unlink) {
145 oracle.remove_link(v1.clone(), v2.clone()).await.unwrap();
146 }
147 if let Action::Link(ref link) | Action::Update(ref link) = action {
148 oracle
149 .add_link(v1.clone(), v2.clone(), link.clone())
150 .await
151 .unwrap();
152 }
153 }
154 }
155 }
156
157 async fn initialize_simulation(
158 context: Context,
159 num_validators: u32,
160 shares_vec: &mut [Share],
161 ) -> (
162 Oracle<PublicKey>,
163 Vec<(PublicKey, Ed25519, Share)>,
164 Vec<PublicKey>,
165 Registrations<PublicKey>,
166 ) {
167 let (network, mut oracle) = Network::new(
168 context.with_label("network"),
169 commonware_p2p::simulated::Config {
170 max_size: 1024 * 1024,
171 },
172 );
173 network.start();
174
175 let mut schemes = (0..num_validators)
176 .map(|i| Ed25519::from_seed(i as u64))
177 .collect::<Vec<_>>();
178 schemes.sort_by_key(|s| s.public_key());
179 let validators: Vec<(PublicKey, Ed25519, Share)> = schemes
180 .iter()
181 .enumerate()
182 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i]))
183 .collect();
184 let pks = validators
185 .iter()
186 .map(|(pk, _, _)| pk.clone())
187 .collect::<Vec<_>>();
188
189 let registrations = register_validators(&mut oracle, &pks).await;
190 let link = Link {
191 latency: 10.0,
192 jitter: 1.0,
193 success_rate: 1.0,
194 };
195 link_validators(&mut oracle, &pks, Action::Link(link), None).await;
196 (oracle, validators, pks, registrations)
197 }
198
199 #[allow(clippy::too_many_arguments)]
200 fn spawn_validator_engines(
201 context: Context,
202 identity: poly::Public,
203 pks: &[PublicKey],
204 validators: &[(PublicKey, Ed25519, Share)],
205 registrations: &mut Registrations<PublicKey>,
206 mailboxes: &mut BTreeMap<PublicKey, mocks::application::Mailbox<Sha256Digest, PublicKey>>,
207 collectors: &mut BTreeMap<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>,
208 refresh_epoch_timeout: Duration,
209 rebroadcast_timeout: Duration,
210 ) {
211 let namespace = b"my testing namespace";
212 for (validator, scheme, share) in validators.iter() {
213 let context = context.with_label(&validator.to_string());
214 let mut coordinator = mocks::coordinator::Coordinator::<PublicKey>::new(
215 identity.clone(),
216 pks.to_vec(),
217 *share,
218 );
219 coordinator.set_view(111);
220
221 let (app, app_mailbox) =
222 mocks::application::Application::<Sha256Digest, PublicKey>::new();
223 mailboxes.insert(validator.clone(), app_mailbox.clone());
224
225 let (collector, collector_mailbox) =
226 mocks::collector::Collector::<Ed25519, Sha256Digest>::new(
227 namespace,
228 poly::public(&identity),
229 );
230 context.with_label("collector").spawn(|_| collector.run());
231 collectors.insert(validator.clone(), collector_mailbox);
232
233 let (engine, mailbox) = Engine::new(
234 context.with_label("engine"),
235 Config {
236 crypto: scheme.clone(),
237 application: app_mailbox.clone(),
238 collector: collectors.get(validator).unwrap().clone(),
239 coordinator,
240 mailbox_size: 1024,
241 verify_concurrent: 1024,
242 namespace: namespace.to_vec(),
243 epoch_bounds: (1, 1),
244 height_bound: 2,
245 refresh_epoch_timeout,
246 rebroadcast_timeout,
247 journal_heights_per_section: 10,
248 journal_replay_concurrency: 1,
249 journal_name_prefix: format!("broadcast-linked-seq/{}/", validator),
250 },
251 );
252
253 context.with_label("app").spawn(|_| app.run(mailbox));
254 let ((a1, a2), (b1, b2)) = registrations.remove(validator).unwrap();
255 engine.start((a1, a2), (b1, b2));
256 }
257 }
258
259 fn spawn_proposer(
260 context: Context,
261 mailboxes: Arc<
262 Mutex<BTreeMap<PublicKey, mocks::application::Mailbox<Sha256Digest, PublicKey>>>,
263 >,
264 invalid_when: fn(u64) -> bool,
265 ) {
266 context
267 .clone()
268 .with_label("invalid signature proposer")
269 .spawn(move |context| async move {
270 let mut iter = 0;
271 loop {
272 iter += 1;
273 let mailbox_vec: Vec<mocks::application::Mailbox<Sha256Digest, PublicKey>> = {
274 let guard = mailboxes.lock().unwrap();
275 guard.values().cloned().collect()
276 };
277 for mut mailbox in mailbox_vec {
278 let payload = Bytes::from(format!("hello world, iter {}", iter));
279 let mut hasher = Sha256::default();
280 hasher.update(&payload);
281
282 if invalid_when(iter) {
284 hasher.update(&payload);
285 }
286
287 let digest = hasher.finalize();
288 mailbox.broadcast(digest).await;
289 }
290 context.sleep(Duration::from_millis(250)).await;
291 }
292 });
293 }
294
295 async fn await_collectors(
296 context: Context,
297 collectors: &BTreeMap<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>,
298 threshold: u64,
299 ) {
300 let mut receivers = Vec::new();
301 for (sequencer, mailbox) in collectors.iter() {
302 let (tx, rx) = oneshot::channel();
304 receivers.push(rx);
305
306 context.with_label("collector_watcher").spawn({
308 let sequencer = sequencer.clone();
309 let mut mailbox = mailbox.clone();
310 move |context| async move {
311 loop {
312 let tip = mailbox.get_tip(sequencer.clone()).await.unwrap_or(0);
313 debug!(tip, ?sequencer, "collector");
314 if tip >= threshold {
315 let _ = tx.send(sequencer.clone());
316 break;
317 }
318 context.sleep(Duration::from_millis(100)).await;
319 }
320 }
321 });
322 }
323
324 let results = join_all(receivers).await;
326 assert_eq!(results.len(), collectors.len());
327 }
328
329 #[test_traced]
330 fn test_all_online() {
331 let num_validators: u32 = 4;
332 let quorum: u32 = 3;
333 let (runner, mut context, _) = Executor::timed(Duration::from_secs(30));
334 let (identity, mut shares_vec) =
335 ops::generate_shares(&mut context, None, num_validators, quorum);
336 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
337
338 runner.start(async move {
339 let (_oracle, validators, pks, mut registrations) = initialize_simulation(
340 context.with_label("simulation"),
341 num_validators,
342 &mut shares_vec,
343 )
344 .await;
345 let mailboxes = Arc::new(Mutex::new(BTreeMap::<
346 PublicKey,
347 mocks::application::Mailbox<Sha256Digest, PublicKey>,
348 >::new()));
349 let mut collectors =
350 BTreeMap::<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>::new();
351 spawn_validator_engines(
352 context.with_label("validator"),
353 identity.clone(),
354 &pks,
355 &validators,
356 &mut registrations,
357 &mut mailboxes.lock().unwrap(),
358 &mut collectors,
359 Duration::from_millis(100),
360 Duration::from_secs(5),
361 );
362 spawn_proposer(context.with_label("proposer"), mailboxes.clone(), |_| false);
363 await_collectors(context.with_label("collector"), &collectors, 100).await;
364 });
365 }
366
367 #[test_traced]
368 fn test_unclean_shutdown() {
369 let num_validators: u32 = 4;
370 let quorum: u32 = 3;
371 let (mut runner, mut context, _) = Executor::timed(Duration::from_secs(45));
372 let (identity, mut shares_vec) =
373 ops::generate_shares(&mut context, None, num_validators, quorum);
374 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
375 let completed = Arc::new(Mutex::new(HashSet::new()));
376 let shutdowns = Arc::new(Mutex::new(0u64));
377
378 while completed.lock().unwrap().len() != num_validators as usize {
379 runner.start({
380 let context = context.clone();
381 let completed = completed.clone();
382 let shares_vec = shares_vec.clone();
383 let shutdowns = shutdowns.clone();
384 let identity = identity.clone();
385 async move {
386 let (network, mut oracle) = Network::new(
387 context.with_label("network"),
388 commonware_p2p::simulated::Config {
389 max_size: 1024 * 1024,
390 },
391 );
392 network.start();
393
394 let mut schemes = (0..num_validators)
395 .map(|i| Ed25519::from_seed(i as u64))
396 .collect::<Vec<_>>();
397 schemes.sort_by_key(|s| s.public_key());
398 let validators: Vec<(PublicKey, Ed25519, Share)> = schemes
399 .iter()
400 .enumerate()
401 .map(|(i, scheme)| (scheme.public_key(), scheme.clone(), shares_vec[i]))
402 .collect();
403 let pks = validators
404 .iter()
405 .map(|(pk, _, _)| pk.clone())
406 .collect::<Vec<_>>();
407
408 let mut registrations = register_validators(&mut oracle, &pks).await;
409 let link = commonware_p2p::simulated::Link {
410 latency: 10.0,
411 jitter: 1.0,
412 success_rate: 1.0,
413 };
414 link_validators(&mut oracle, &pks, Action::Link(link), None).await;
415
416 let mailboxes = Arc::new(Mutex::new(BTreeMap::<
417 PublicKey,
418 mocks::application::Mailbox<Sha256Digest, PublicKey>,
419 >::new()));
420 let mut collectors = BTreeMap::<
421 PublicKey,
422 mocks::collector::Mailbox<Ed25519, Sha256Digest>,
423 >::new();
424 spawn_validator_engines(
425 context.with_label("validator"),
426 identity.clone(),
427 &pks,
428 &validators,
429 &mut registrations,
430 &mut mailboxes.lock().unwrap(),
431 &mut collectors,
432 Duration::from_millis(100),
433 Duration::from_secs(5),
434 );
435 spawn_proposer(context.with_label("proposer"), mailboxes.clone(), |_| false);
436
437 let collector_pairs: Vec<(
438 PublicKey,
439 mocks::collector::Mailbox<Ed25519, Sha256Digest>,
440 )> = collectors
441 .iter()
442 .map(|(v, m)| (v.clone(), m.clone()))
443 .collect();
444 for (validator, mut mailbox) in collector_pairs {
445 let completed_clone = completed.clone();
446 context
447 .with_label("collector_unclean")
448 .spawn(|context| async move {
449 loop {
450 let tip = mailbox.get_tip(validator.clone()).await.unwrap_or(0);
451 if tip >= 100 {
452 completed_clone.lock().unwrap().insert(validator.clone());
453 break;
454 }
455 context.sleep(Duration::from_millis(100)).await;
456 }
457 });
458 }
459 context.sleep(Duration::from_millis(1000)).await;
460 *shutdowns.lock().unwrap() += 1;
461 }
462 });
463 let recovered = context.recover();
464 runner = recovered.0;
465 context = recovered.1;
466 }
467 }
468
469 #[test_traced]
470 fn test_network_partition() {
471 let num_validators: u32 = 4;
472 let quorum: u32 = 3;
473 let (runner, mut context, _) = Executor::timed(Duration::from_secs(60));
474 let (identity, mut shares_vec) =
475 ops::generate_shares(&mut context, None, num_validators, quorum);
476 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
477
478 runner.start(async move {
479 let (mut oracle, validators, pks, mut registrations) = initialize_simulation(
480 context.with_label("simulation"),
481 num_validators,
482 &mut shares_vec,
483 )
484 .await;
485 let mailboxes = Arc::new(Mutex::new(BTreeMap::<
486 PublicKey,
487 mocks::application::Mailbox<Sha256Digest, PublicKey>,
488 >::new()));
489 let mut collectors =
490 BTreeMap::<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>::new();
491 spawn_validator_engines(
492 context.with_label("validator"),
493 identity.clone(),
494 &pks,
495 &validators,
496 &mut registrations,
497 &mut mailboxes.lock().unwrap(),
498 &mut collectors,
499 Duration::from_millis(100),
500 Duration::from_secs(1),
501 );
502 spawn_proposer(context.with_label("proposer"), mailboxes.clone(), |_| false);
503 link_validators(&mut oracle, &pks, Action::Unlink, None).await;
505 context.sleep(Duration::from_secs(5)).await;
506 let link = Link {
508 latency: 10.0,
509 jitter: 1.0,
510 success_rate: 1.0,
511 };
512 link_validators(&mut oracle, &pks, Action::Link(link), None).await;
513 await_collectors(context.with_label("collector"), &collectors, 100).await;
514 });
515 }
516
517 fn slow_and_lossy_links(seed: u64) -> String {
518 let num_validators: u32 = 4;
519 let quorum: u32 = 3;
520 let cfg = deterministic::Config {
521 seed,
522 timeout: Some(Duration::from_secs(40)),
523 ..deterministic::Config::default()
524 };
525 let (runner, mut context, auditor) = Executor::init(cfg);
526 let (identity, mut shares_vec) =
527 ops::generate_shares(&mut context, None, num_validators, quorum);
528 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
529
530 runner.start(async move {
531 let (oracle, validators, pks, mut registrations) = initialize_simulation(
532 context.with_label("simulation"),
533 num_validators,
534 &mut shares_vec,
535 )
536 .await;
537 let delayed_link = Link {
538 latency: 50.0,
539 jitter: 40.0,
540 success_rate: 0.5,
541 };
542 let mut oracle_clone = oracle.clone();
543 link_validators(&mut oracle_clone, &pks, Action::Update(delayed_link), None).await;
544
545 let mailboxes = Arc::new(Mutex::new(BTreeMap::<
546 PublicKey,
547 mocks::application::Mailbox<Sha256Digest, PublicKey>,
548 >::new()));
549 let mut collectors =
550 BTreeMap::<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>::new();
551 spawn_validator_engines(
552 context.with_label("validator"),
553 identity.clone(),
554 &pks,
555 &validators,
556 &mut registrations,
557 &mut mailboxes.lock().unwrap(),
558 &mut collectors,
559 Duration::from_millis(100),
560 Duration::from_millis(150),
561 );
562
563 spawn_proposer(context.with_label("proposer"), mailboxes.clone(), |_| false);
564 await_collectors(context.with_label("collector"), &collectors, 40).await;
565 });
566 auditor.state()
567 }
568
569 #[test_traced]
570 fn test_slow_and_lossy_links() {
571 slow_and_lossy_links(0);
572 }
573
574 #[test_traced]
575 fn test_determinism() {
576 for seed in 1..6 {
579 let state_1 = slow_and_lossy_links(seed);
580 let state_2 = slow_and_lossy_links(seed);
581 assert_eq!(state_1, state_2);
582 }
583 }
584
585 #[test_traced]
586 fn test_invalid_signature_injection() {
587 let num_validators: u32 = 4;
588 let quorum: u32 = 3;
589 let (runner, mut context, _) = Executor::timed(Duration::from_secs(30));
590 let (identity, mut shares_vec) =
591 ops::generate_shares(&mut context, None, num_validators, quorum);
592 shares_vec.sort_by(|a, b| a.index.cmp(&b.index));
593
594 runner.start(async move {
595 let (_oracle, validators, pks, mut registrations) = initialize_simulation(
596 context.with_label("simulation"),
597 num_validators,
598 &mut shares_vec,
599 )
600 .await;
601 let mailboxes = Arc::new(Mutex::new(BTreeMap::<
602 PublicKey,
603 mocks::application::Mailbox<Sha256Digest, PublicKey>,
604 >::new()));
605 let mut collectors =
606 BTreeMap::<PublicKey, mocks::collector::Mailbox<Ed25519, Sha256Digest>>::new();
607 spawn_validator_engines(
608 context.with_label("validator"),
609 identity.clone(),
610 &pks,
611 &validators,
612 &mut registrations,
613 &mut mailboxes.lock().unwrap(),
614 &mut collectors,
615 Duration::from_millis(100),
616 Duration::from_secs(5),
617 );
618
619 spawn_proposer(context.with_label("proposer"), mailboxes.clone(), |i| {
620 i % 10 == 0
621 });
622 await_collectors(context.with_label("collector"), &collectors, 100).await;
623 });
624 }
625}