1mod config;
23pub use config::Config;
24mod engine;
25pub use engine::Engine;
26mod ingress;
27pub use ingress::Mailbox;
28pub(crate) use ingress::Message;
29mod metrics;
30
31#[cfg(test)]
32pub mod mocks;
33
34#[cfg(test)]
35mod tests {
36 use super::{mocks::TestMessage, *};
37 use crate::Broadcaster;
38 use commonware_codec::RangeCfg;
39 use commonware_cryptography::{
40 ed25519::PublicKey, sha256::Digest as Sha256Digest, Committable, Digestible, Ed25519,
41 Signer,
42 };
43 use commonware_macros::{select, test_traced};
44 use commonware_p2p::{
45 simulated::{Link, Network, Oracle, Receiver, Sender},
46 Recipients,
47 };
48 use commonware_runtime::{deterministic, Clock, Metrics, Runner};
49 use std::{collections::BTreeMap, time::Duration};
50
/// Value handed to every engine as `deque_size` by `spawn_peer_engines`; the eviction
/// tests broadcast this many (or one more) messages to push older ones out.
const CACHE_SIZE: usize = 10;

/// Short timeout (10 ms) used to tell "already available" apart from "still pending".
const A_JIFFY: Duration = Duration::from_millis(10);

/// Latency (100 ms) configured on every simulated link by `initialize_simulation`.
const NETWORK_SPEED: Duration = Duration::from_millis(100);

/// Sleep (200 ms) used after a broadcast so that the link latency has comfortably elapsed.
const NETWORK_SPEED_WITH_BUFFER: Duration = Duration::from_millis(200);
63
64 type Registrations = BTreeMap<PublicKey, (Sender<PublicKey>, Receiver<PublicKey>)>;
65
66 async fn initialize_simulation(
67 context: deterministic::Context,
68 num_peers: u32,
69 success_rate: f64,
70 ) -> (Vec<PublicKey>, Registrations, Oracle<PublicKey>) {
71 let (network, mut oracle) = Network::<deterministic::Context, PublicKey>::new(
72 context.with_label("network"),
73 commonware_p2p::simulated::Config {
74 max_size: 1024 * 1024,
75 },
76 );
77 network.start();
78
79 let mut schemes = (0..num_peers)
80 .map(|i| Ed25519::from_seed(i as u64))
81 .collect::<Vec<_>>();
82 schemes.sort_by_key(|s| s.public_key());
83 let peers: Vec<PublicKey> = schemes.iter().map(|c| (c.public_key())).collect();
84
85 let mut registrations: Registrations = BTreeMap::new();
86 for peer in peers.iter() {
87 let (sender, receiver) = oracle.register(peer.clone(), 0).await.unwrap();
88 registrations.insert(peer.clone(), (sender, receiver));
89 }
90
91 let link = Link {
93 latency: NETWORK_SPEED.as_millis() as f64,
94 jitter: 0.0,
95 success_rate,
96 };
97 for p1 in peers.iter() {
98 for p2 in peers.iter() {
99 if p2 == p1 {
100 continue;
101 }
102 oracle
103 .add_link(p1.clone(), p2.clone(), link.clone())
104 .await
105 .unwrap();
106 }
107 }
108
109 (peers, registrations, oracle)
110 }
111
112 fn spawn_peer_engines(
113 context: deterministic::Context,
114 registrations: &mut Registrations,
115 ) -> BTreeMap<PublicKey, Mailbox<PublicKey, Sha256Digest, Sha256Digest, TestMessage>> {
116 let mut mailboxes = BTreeMap::new();
117 while let Some((peer, network)) = registrations.pop_first() {
118 let context = context.with_label(&peer.to_string());
119 let config = Config {
120 public_key: peer.clone(),
121 mailbox_size: 1024,
122 deque_size: CACHE_SIZE,
123 priority: false,
124 codec_config: RangeCfg::from(..),
125 };
126 let (engine, engine_mailbox) =
127 Engine::<_, PublicKey, Sha256Digest, Sha256Digest, TestMessage>::new(
128 context.clone(),
129 config,
130 );
131 mailboxes.insert(peer.clone(), engine_mailbox);
132 engine.start(network);
133 }
134 mailboxes
135 }
136
137 #[test_traced]
138 fn test_broadcast() {
139 let runner = deterministic::Runner::timed(Duration::from_secs(5));
140 runner.start(|context| async move {
141 let (peers, mut registrations, _oracle) =
142 initialize_simulation(context.clone(), 4, 1.0).await;
143 let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
144
145 let message = TestMessage::shared(b"hello world test message");
147 let mut first_mailbox = mailboxes.get(peers.first().unwrap()).unwrap().clone();
148 let result = first_mailbox
149 .broadcast(Recipients::All, message.clone())
150 .await;
151
152 context.sleep(Duration::from_secs(1)).await;
154
155 for peer in peers.iter() {
157 let mut mailbox = mailboxes.get(peer).unwrap().clone();
158 let commitment = message.commitment();
159 let receiver = mailbox.subscribe(None, commitment, None).await;
160 let received_message = receiver.await.ok();
161 assert_eq!(received_message.unwrap(), message.clone());
162 }
163 assert_eq!(result.await.unwrap().len(), peers.len() - 1);
164
165 let message = TestMessage::shared(b"hello world again");
167 let result = first_mailbox
168 .broadcast(Recipients::All, message.clone())
169 .await;
170 drop(result);
171
172 context.sleep(Duration::from_secs(1)).await;
174
175 let mut found = 0;
177 for peer in peers.iter() {
178 let mut mailbox = mailboxes.get(peer).unwrap().clone();
179 let commitment = message.commitment();
180 let receiver = mailbox.get(None, commitment, None).await;
181 if !receiver.is_empty() {
182 assert_eq!(receiver, vec![message.clone()]);
183 found += 1;
184 }
185 }
186 assert!(found > 0, "No peers received the message");
187 });
188 }
189
190 #[test_traced]
191 fn test_self_retrieval() {
192 let runner = deterministic::Runner::timed(Duration::from_secs(5));
193 runner.start(|context| async move {
194 let (peers, mut registrations, _oracle) =
196 initialize_simulation(context.clone(), 1, 1.0).await;
197 let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
198
199 let mut mailbox_a = mailboxes.get(&peers[0]).unwrap().clone();
201
202 let m1 = TestMessage::shared(b"hello world");
204 let commitment_m1 = m1.commitment();
205
206 let receiver_before = mailbox_a.get(None, commitment_m1, None).await;
208 assert!(receiver_before.is_empty());
209
210 let receiver_before = mailbox_a.subscribe(None, commitment_m1, None).await;
212
213 let result = mailbox_a.broadcast(Recipients::All, m1.clone()).await;
215 assert_eq!(result.await.unwrap().len(), peers.len() - 1);
216
217 let msg_before = receiver_before
219 .await
220 .expect("Pre-broadcast retrieval failed");
221 assert_eq!(msg_before, m1);
222
223 let receiver_after = mailbox_a.get(None, commitment_m1, None).await;
225 assert_eq!(receiver_after, vec![m1.clone()]);
226
227 let receiver_after = mailbox_a.subscribe(None, commitment_m1, None).await;
229
230 let start = context.current();
232 let msg_after = receiver_after
233 .await
234 .expect("Post-broadcast retrieval failed");
235 let duration = context.current().duration_since(start).unwrap();
236
237 assert_eq!(msg_after, m1);
239
240 assert!(duration < A_JIFFY, "get not instant");
242 });
243 }
244
245 #[test_traced]
246 fn test_packet_loss() {
247 let runner = deterministic::Runner::timed(Duration::from_secs(30));
248 runner.start(|context| async move {
249 let (peers, mut registrations, _oracle) =
250 initialize_simulation(context.clone(), 10, 0.1).await;
251 let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
252
253 let message = TestMessage::shared(b"hello world test message");
255 let mut first_mailbox = mailboxes.get(peers.first().unwrap()).unwrap().clone();
256
257 let commitment = message.commitment();
259 for i in 0..100 {
260 let result = first_mailbox
262 .broadcast(Recipients::All, message.clone())
263 .await;
264 context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
265
266 let mut all_received = true;
268 for peer in peers.iter() {
269 let mut mailbox = mailboxes.get(peer).unwrap().clone();
270 let receiver = mailbox.subscribe(None, commitment, None).await;
271 let has = select! {
272 _ = context.sleep(A_JIFFY) => {false},
273 r = receiver => { r.is_ok() },
274 };
275 all_received &= has;
276 }
277 assert_eq!(result.await.unwrap().len(), peers.len() - 1);
278
279 if all_received {
281 assert!(i > 0, "Message received on first try");
282 return;
283 }
284 }
285 panic!("Not all peers received the message after retries");
286 });
287 }
288
289 #[test_traced]
290 fn test_get_cached() {
291 let runner = deterministic::Runner::timed(Duration::from_secs(5));
292 runner.start(|context| async move {
293 let (peers, mut registrations, _oracle) =
294 initialize_simulation(context.clone(), 2, 1.0).await;
295 let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
296
297 let message = TestMessage::shared(b"cached message");
299 let mut first_mailbox = mailboxes.get(peers.first().unwrap()).unwrap().clone();
300 let result = first_mailbox
301 .broadcast(Recipients::All, message.clone())
302 .await;
303 assert_eq!(result.await.unwrap().len(), peers.len() - 1);
304
305 context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
307
308 let commitment = message.commitment();
310 let mut mailbox = mailboxes.get(peers.last().unwrap()).unwrap().clone();
311 let receiver = mailbox.subscribe(None, commitment, None).await;
312 let start = context.current();
313 let received = receiver.await.expect("failed to get cached message");
314 let duration = context.current().duration_since(start).unwrap();
315 assert_eq!(received, message);
316 assert!(duration < A_JIFFY, "get not instant",);
317 });
318 }
319
320 #[test_traced]
321 fn test_get_nonexistent() {
322 let runner = deterministic::Runner::timed(Duration::from_secs(5));
323 runner.start(|context| async move {
324 let (peers, mut registrations, _oracle) =
325 initialize_simulation(context.clone(), 2, 1.0).await;
326 let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
327
328 let message = TestMessage::shared(b"future message");
330 let commitment = message.commitment();
331 let mut mailbox1 = mailboxes.get(&peers[0]).unwrap().clone();
332 let mut mailbox2 = mailboxes.get(&peers[1]).unwrap().clone();
333 let receiver = mailbox1.subscribe(None, commitment, None).await;
334
335 let dummy1 = mailbox1.subscribe(None, commitment, None).await;
337 let dummy2 = mailbox2.subscribe(None, commitment, None).await;
338 drop(dummy1);
339 drop(dummy2);
340
341 let result = mailbox1.broadcast(Recipients::All, message.clone()).await;
343 assert_eq!(result.await.unwrap().len(), peers.len() - 1);
344
345 context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
347
348 let received = receiver.await.expect("receiver1 should get message");
350 assert_eq!(received, message);
351 });
352 }
353
354 #[test_traced]
355 fn test_cache_eviction_single_peer() {
356 let runner = deterministic::Runner::timed(Duration::from_secs(5));
357 runner.start(|context| async move {
358 let (peers, mut registrations, _oracle) =
359 initialize_simulation(context.clone(), 2, 1.0).await;
360 let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
361
362 let mut mailbox = mailboxes.get(&peers[0]).unwrap().clone();
364 let mut messages = vec![];
365 for i in 0..CACHE_SIZE + 1 {
366 messages.push(TestMessage::shared(format!("message {}", i).as_bytes()));
367 }
368 for message in messages.iter() {
369 let result = mailbox.broadcast(Recipients::All, message.clone()).await;
370 assert_eq!(result.await.unwrap().len(), peers.len() - 1);
371 }
372
373 context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
375
376 let mut peer_mailbox = mailboxes.get(&peers[1]).unwrap().clone();
378 for msg in messages.iter().skip(1) {
379 let result = peer_mailbox
380 .subscribe(None, msg.commitment(), None)
381 .await
382 .await
383 .unwrap();
384 assert_eq!(result, msg.clone());
385 }
386
387 let receiver = peer_mailbox
389 .subscribe(None, messages[0].commitment(), None)
390 .await;
391 select! {
392 _ = context.sleep(A_JIFFY) => {},
393 _ = receiver => { panic!("receiver should have failed")},
394 }
395 });
396 }
397
398 #[test_traced]
399 fn test_cache_eviction_multi_peer() {
400 let runner = deterministic::Runner::timed(Duration::from_secs(10));
401 runner.start(|context| async move {
402 let (peers, mut registrations, _oracle) =
404 initialize_simulation(context.clone(), 3, 1.0).await;
405 let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
406
407 let mut mailbox_a = mailboxes.get(&peers[0]).unwrap().clone();
409 let mut mailbox_b = mailboxes.get(&peers[1]).unwrap().clone();
410 let mut mailbox_c = mailboxes.get(&peers[2]).unwrap().clone();
411
412 let m1 = TestMessage::shared(b"message M1");
414 let commitment_m1 = m1.commitment();
415 let result = mailbox_a.broadcast(Recipients::All, m1.clone()).await;
416 assert_eq!(result.await.unwrap().len(), peers.len() - 1);
417 context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
418
419 let result = mailbox_c.broadcast(Recipients::All, m1.clone()).await;
421 assert_eq!(result.await.unwrap().len(), peers.len() - 1);
422 context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
423
424 let mut new_messages_a = Vec::with_capacity(CACHE_SIZE);
428 for i in 0..CACHE_SIZE {
429 new_messages_a.push(TestMessage::shared(format!("A{}", i).as_bytes()));
430 }
431 for msg in &new_messages_a {
432 let result = mailbox_a.broadcast(Recipients::All, msg.clone()).await;
433 assert_eq!(result.await.unwrap().len(), peers.len() - 1);
434 }
435 context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
436
437 let receiver = mailbox_b.subscribe(None, commitment_m1, None).await;
439 let received = receiver.await.expect("M1 should be retrievable");
440 assert_eq!(received, m1);
441
442 let mut new_messages_c = Vec::with_capacity(CACHE_SIZE);
444 for i in 0..CACHE_SIZE {
445 new_messages_c.push(TestMessage::shared(format!("C{}", i).as_bytes()));
446 }
447 for msg in &new_messages_c {
448 let result = mailbox_c.broadcast(Recipients::All, msg.clone()).await;
449 assert_eq!(result.await.unwrap().len(), peers.len() - 1);
450 }
451 context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
452
453 let receiver = mailbox_b.subscribe(None, commitment_m1, None).await;
455 select! {
456 _ = context.sleep(A_JIFFY) => {},
457 _ = receiver => { panic!("M1 should not be retrievable"); },
458 }
459 });
460 }
461
462 #[test_traced]
463 fn test_selective_recipients() {
464 let runner = deterministic::Runner::timed(Duration::from_secs(5));
465 runner.start(|context| async move {
466 let (peers, mut registrations, _oracle) =
467 initialize_simulation(context.clone(), 4, 1.0).await;
468
469 let sender_pk = peers[0].clone();
470 let target_peer = peers[1].clone();
471 let non_target_peer = peers[2].clone();
472
473 let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
474 let mut sender_mb = mailboxes.get(&sender_pk).unwrap().clone();
475
476 let msg = TestMessage::shared(b"selective-broadcast");
477 let result = sender_mb
478 .broadcast(Recipients::One(target_peer.clone()), msg.clone())
479 .await;
480 assert_eq!(result.await.unwrap(), vec![target_peer.clone()]);
481
482 context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
483
484 let got_target = mailboxes
486 .get(&target_peer)
487 .unwrap()
488 .clone()
489 .get(None, msg.commitment(), None)
490 .await;
491 assert_eq!(got_target, vec![msg.clone()]);
492
493 let got_other = mailboxes
495 .get(&non_target_peer)
496 .unwrap()
497 .clone()
498 .get(None, msg.commitment(), None)
499 .await;
500 assert!(got_other.is_empty());
501 });
502 }
503
504 #[test_traced]
505 fn test_sender_filter_subscribe() {
506 let runner = deterministic::Runner::timed(Duration::from_secs(10));
507 runner.start(|context| async move {
508 let (peers, mut registrations, _oracle) =
509 initialize_simulation(context.clone(), 4, 1.0).await;
510 let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
511
512 let sender1 = peers[0].clone();
513 let sender2 = peers[1].clone();
514 let sender3 = peers[2].clone();
515
516 let mut mb1 = mailboxes.get(&sender1).unwrap().clone();
517 let mut mb2 = mailboxes.get(&sender2).unwrap().clone();
518 let mut mb3 = mailboxes.get(&sender3).unwrap().clone();
519
520 let msg = TestMessage::shared(b"from-one");
521 let id = msg.commitment();
522
523 let mut recv = mb2.subscribe(Some(sender1.clone()), id, None).await;
525
526 mb3.broadcast(Recipients::All, msg.clone())
528 .await
529 .await
530 .unwrap();
531
532 context.sleep(A_JIFFY).await;
534
535 assert!(recv.try_recv().unwrap().is_none());
537
538 mb1.broadcast(Recipients::All, msg.clone())
540 .await
541 .await
542 .unwrap();
543 assert_eq!(recv.await.unwrap(), msg);
544 });
545 }
546
547 #[test_traced]
548 fn test_get_all_for_commitment() {
549 let runner = deterministic::Runner::timed(Duration::from_secs(5));
550 runner.start(|context| async move {
551 let (peers, mut registrations, _oracle) =
552 initialize_simulation(context.clone(), 4, 1.0).await;
553 let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
554
555 let sender1 = peers[0].clone();
556 let sender2 = peers[1].clone();
557
558 let mut mb1 = mailboxes.get(&sender1).unwrap().clone();
559 let mut mb2 = mailboxes.get(&sender2).unwrap().clone();
560
561 let m1 = TestMessage::new(b"id", b"content-1");
563 let m2 = TestMessage::new(b"id", b"content-2");
564 let m3 = TestMessage::new(b"other-id", b"content-3");
565 mb1.broadcast(Recipients::All, m1.clone())
566 .await
567 .await
568 .unwrap();
569 mb1.broadcast(Recipients::All, m2.clone())
570 .await
571 .await
572 .unwrap();
573 mb1.broadcast(Recipients::All, m3.clone())
574 .await
575 .await
576 .unwrap();
577
578 context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
580
581 let mut got = mb2.get(None, m1.commitment(), None).await;
583 got.sort_by_key(|m| m.content.clone());
584 assert_eq!(got, vec![m1.clone(), m2.clone()]);
585
586 let got = mb2.get(None, m1.commitment(), Some(m1.digest())).await;
588 assert_eq!(got, vec![m1.clone()]);
589
590 let got = mb2.get(None, m3.commitment(), None).await;
592 assert_eq!(got, vec![m3.clone()]);
593
594 let got = mb2.get(None, m3.commitment(), Some(m2.digest())).await;
596 assert!(got.is_empty());
597
598 let mut got = mb2.get(Some(sender1.clone()), m1.commitment(), None).await;
600 got.sort_by_key(|m| m.content.clone());
601 assert_eq!(got, vec![m1.clone(), m2.clone()]);
602 let got = mb2.get(Some(sender1.clone()), m3.commitment(), None).await;
603 assert_eq!(got, vec![m3.clone()]);
604 });
605 }
606
607 #[test_traced]
608 fn test_ref_count_across_peers() {
609 let runner = deterministic::Runner::timed(Duration::from_secs(10));
610 runner.start(|context| async move {
611 let (peers, mut registrations, _oracle) =
613 initialize_simulation(context.clone(), 3, 1.0).await;
614 let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
615
616 let p0 = peers[0].clone();
617 let p1 = peers[1].clone();
618 let observer = peers[2].clone();
619
620 let mut mb0 = mailboxes.get(&p0).unwrap().clone();
621 let mut mb1 = mailboxes.get(&p1).unwrap().clone();
622 let mut obs = mailboxes.get(&observer).unwrap().clone();
623
624 let dup = TestMessage::shared(b"dup");
626 let id = dup.commitment();
627
628 mb0.broadcast(Recipients::All, dup.clone())
630 .await
631 .await
632 .unwrap();
633 mb1.broadcast(Recipients::All, dup.clone())
634 .await
635 .await
636 .unwrap();
637 context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
638
639 assert_eq!(obs.get(None, id, None).await, vec![dup.clone()]);
641
642 for i in 0..CACHE_SIZE {
644 let spam = TestMessage::shared(format!("p0-{i}").into_bytes());
645 mb0.broadcast(Recipients::All, spam).await.await.unwrap();
646 }
647 context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
648 assert_eq!(obs.get(None, id, None).await, vec![dup.clone()]);
649
650 for i in 0..CACHE_SIZE {
652 let spam = TestMessage::shared(format!("p1-{i}").into_bytes());
653 mb1.broadcast(Recipients::All, spam).await.await.unwrap();
654 }
655 context.sleep(NETWORK_SPEED_WITH_BUFFER).await;
656 assert!(obs.get(None, id, None).await.is_empty());
657 });
658 }
659
660 #[test_traced]
661 fn test_digest_filtered_waiter() {
662 let runner = deterministic::Runner::timed(Duration::from_secs(5));
663 runner.start(|context| async move {
664 let (peers, mut registrations, _oracle) =
665 initialize_simulation(context.clone(), 3, 1.0).await;
666 let mailboxes = spawn_peer_engines(context.clone(), &mut registrations);
667
668 let owner = peers[0].clone();
669 let spoiler = peers[1].clone();
670 let waiter = peers[2].clone();
671
672 let mut mb_owner = mailboxes.get(&owner).unwrap().clone();
673 let mut mb_spoiler = mailboxes.get(&spoiler).unwrap().clone();
674 let mut mb_waiter = mailboxes.get(&waiter).unwrap().clone();
675
676 let wanted = TestMessage::new(b"same-id", b"wanted");
678 let not_want = TestMessage::new(b"same-id", b"noise");
679
680 let mut recv = mb_waiter
682 .subscribe(
683 Some(owner.clone()),
684 wanted.commitment(),
685 Some(wanted.digest()),
686 )
687 .await;
688
689 mb_spoiler
691 .broadcast(Recipients::All, wanted.clone())
692 .await
693 .await
694 .unwrap();
695 context.sleep(A_JIFFY).await;
696 assert!(recv.try_recv().unwrap().is_none());
697
698 mb_owner
700 .broadcast(Recipients::All, not_want.clone())
701 .await
702 .await
703 .unwrap();
704 context.sleep(A_JIFFY).await;
705 assert!(recv.try_recv().unwrap().is_none());
706
707 mb_owner
709 .broadcast(Recipients::All, wanted.clone())
710 .await
711 .await
712 .unwrap();
713 assert_eq!(recv.await.unwrap(), wanted);
714 });
715 }
716}