1use bytes::Bytes;
32use commonware_cryptography::PublicKey;
33use commonware_utils::Array;
34use futures::channel::oneshot;
35use std::future::Future;
36
37mod config;
38pub use config::Config;
39mod engine;
40pub use engine::Engine;
41mod fetcher;
42mod ingress;
43pub use ingress::Mailbox;
44mod metrics;
45mod wire;
46
47#[cfg(test)]
48pub mod mocks;
49
50pub trait Producer: Clone + Send + 'static {
52 type Key: Array;
54
55 fn produce(&mut self, key: Self::Key) -> impl Future<Output = oneshot::Receiver<Bytes>> + Send;
57}
58
59pub trait Coordinator: Clone + Send + Sync + 'static {
61 type PublicKey: PublicKey;
63
64 fn peers(&self) -> &Vec<Self::PublicKey>;
68
69 fn peer_set_id(&self) -> u64;
75}
76
77#[cfg(test)]
78mod tests {
79 use super::{
80 mocks::{Consumer, Coordinator, CoordinatorMsg, Event, Key, Producer},
81 Config, Engine, Mailbox,
82 };
83 use crate::Resolver;
84 use bytes::Bytes;
85 use commonware_cryptography::{
86 ed25519::{PrivateKey, PublicKey},
87 PrivateKeyExt as _, Signer,
88 };
89 use commonware_macros::{select, test_traced};
90 use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
91 use commonware_runtime::{deterministic, Clock, Metrics, Runner};
92 use commonware_utils::NZU32;
93 use futures::{SinkExt, StreamExt};
94 use std::time::Duration;
95
96 const MAILBOX_SIZE: usize = 1024;
97 const RATE_LIMIT: u32 = 10;
98 const INITIAL_DURATION: Duration = Duration::from_millis(100);
99 const TIMEOUT: Duration = Duration::from_millis(400);
100 const FETCH_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
101 const LINK: Link = Link {
102 latency: 10.0,
103 jitter: 1.0,
104 success_rate: 1.0,
105 };
106 const LINK_UNRELIABLE: Link = Link {
107 latency: 10.0,
108 jitter: 1.0,
109 success_rate: 0.5,
110 };
111
112 async fn setup_network_and_peers(
113 context: &deterministic::Context,
114 peer_seeds: &[u64],
115 ) -> (
116 Oracle<PublicKey>,
117 Vec<PrivateKey>,
118 Vec<PublicKey>,
119 Vec<(Sender<PublicKey>, Receiver<PublicKey>)>,
120 ) {
121 let (network, mut oracle) = Network::new(
122 context.with_label("network"),
123 commonware_p2p::simulated::Config {
124 max_size: 1024 * 1024,
125 },
126 );
127 network.start();
128
129 let schemes: Vec<PrivateKey> = peer_seeds
130 .iter()
131 .map(|seed| PrivateKey::from_seed(*seed))
132 .collect();
133 let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
134
135 let mut connections = Vec::new();
136 for peer in &peers {
137 let (sender, receiver) = oracle.register(peer.clone(), 0).await.unwrap();
138 connections.push((sender, receiver));
139 }
140
141 (oracle, schemes, peers, connections)
142 }
143
144 async fn add_link(
145 oracle: &mut Oracle<PublicKey>,
146 link: Link,
147 peers: &[PublicKey],
148 from: usize,
149 to: usize,
150 ) {
151 oracle
152 .add_link(peers[from].clone(), peers[to].clone(), link.clone())
153 .await
154 .unwrap();
155 oracle
156 .add_link(peers[to].clone(), peers[from].clone(), link)
157 .await
158 .unwrap();
159 }
160
161 async fn setup_and_spawn_actor(
162 context: &deterministic::Context,
163 coordinator: &Coordinator<PublicKey>,
164 signer: impl Signer<PublicKey = PublicKey>,
165 connection: (Sender<PublicKey>, Receiver<PublicKey>),
166 consumer: Consumer<Key, Bytes>,
167 producer: Producer<Key, Bytes>,
168 ) -> Mailbox<Key> {
169 let public_key = signer.public_key();
170 let (engine, mailbox) = Engine::new(
171 context.with_label(&format!("actor_{public_key}")),
172 Config {
173 coordinator: coordinator.clone(),
174 consumer,
175 producer,
176 mailbox_size: MAILBOX_SIZE,
177 requester_config: commonware_p2p::utils::requester::Config {
178 public_key,
179 rate_limit: governor::Quota::per_second(NZU32!(RATE_LIMIT)),
180 initial: INITIAL_DURATION,
181 timeout: TIMEOUT,
182 },
183 fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
184 priority_requests: false,
185 priority_responses: false,
186 },
187 );
188 engine.start(connection);
189
190 mailbox
191 }
192
193 #[test_traced]
197 fn test_fetch_success() {
198 let executor = deterministic::Runner::timed(Duration::from_secs(10));
199 executor.start(|context| async move {
200 let (mut oracle, mut schemes, peers, mut connections) =
201 setup_network_and_peers(&context, &[1, 2]).await;
202
203 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
204
205 let key = Key(2);
206 let mut prod2 = Producer::default();
207 prod2.insert(key.clone(), Bytes::from("data for key 2"));
208
209 let coordinator = Coordinator::new(peers);
210 let (cons1, mut cons_out1) = Consumer::new();
211
212 let mut mailbox1 = setup_and_spawn_actor(
213 &context,
214 &coordinator,
215 schemes.remove(0),
216 connections.remove(0),
217 cons1,
218 Producer::default(),
219 )
220 .await;
221
222 let _mailbox2 = setup_and_spawn_actor(
223 &context,
224 &coordinator,
225 schemes.remove(0),
226 connections.remove(0),
227 Consumer::dummy(),
228 prod2,
229 )
230 .await;
231
232 mailbox1.fetch(key.clone()).await;
233
234 let event = cons_out1.next().await.unwrap();
235 match event {
236 Event::Success(key_actual, value) => {
237 assert_eq!(key_actual, key);
238 assert_eq!(value, Bytes::from("data for key 2"));
239 }
240 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
241 }
242 });
243 }
244
245 #[test_traced]
249 fn test_cancel_fetch() {
250 let executor = deterministic::Runner::timed(Duration::from_secs(10));
251 executor.start(|context| async move {
252 let (_oracle, mut schemes, peers, mut connections) =
253 setup_network_and_peers(&context, &[1]).await;
254
255 let coordinator = Coordinator::new(peers);
256 let (cons1, mut cons_out1) = Consumer::new();
257 let prod1 = Producer::default();
258
259 let mut mailbox1 = setup_and_spawn_actor(
260 &context,
261 &coordinator,
262 schemes.remove(0),
263 connections.remove(0),
264 cons1,
265 prod1,
266 )
267 .await;
268
269 let key = Key(3);
270 mailbox1.fetch(key.clone()).await;
271 mailbox1.cancel(key.clone()).await;
272
273 let event = cons_out1.next().await.unwrap();
274 match event {
275 Event::Failed(key_actual) => {
276 assert_eq!(key_actual, key);
277 }
278 Event::Success(_, _) => panic!("Fetch should have been canceled"),
279 }
280 });
281 }
282
283 #[test_traced]
288 fn test_peer_no_data() {
289 let executor = deterministic::Runner::timed(Duration::from_secs(10));
290 executor.start(|context| async move {
291 let (mut oracle, mut schemes, peers, mut connections) =
292 setup_network_and_peers(&context, &[1, 2, 3]).await;
293
294 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
295 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
296
297 let prod1 = Producer::default();
298 let prod2 = Producer::default();
299 let mut prod3 = Producer::default();
300 let key = Key(3);
301 prod3.insert(key.clone(), Bytes::from("data for key 3"));
302
303 let coordinator = Coordinator::new(peers);
304 let (cons1, mut cons_out1) = Consumer::new();
305
306 let mut mailbox1 = setup_and_spawn_actor(
307 &context,
308 &coordinator,
309 schemes.remove(0),
310 connections.remove(0),
311 cons1,
312 prod1,
313 )
314 .await;
315
316 let _mailbox2 = setup_and_spawn_actor(
317 &context,
318 &coordinator,
319 schemes.remove(0),
320 connections.remove(0),
321 Consumer::dummy(),
322 prod2,
323 )
324 .await;
325
326 let _mailbox3 = setup_and_spawn_actor(
327 &context,
328 &coordinator,
329 schemes.remove(0),
330 connections.remove(0),
331 Consumer::dummy(),
332 prod3,
333 )
334 .await;
335
336 mailbox1.fetch(key.clone()).await;
337
338 let event = cons_out1.next().await.unwrap();
339 match event {
340 Event::Success(key_actual, value) => {
341 assert_eq!(key_actual, key);
342 assert_eq!(value, Bytes::from("data for key 3"));
343 }
344 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
345 }
346 });
347 }
348
349 #[test_traced]
354 fn test_no_peers_available() {
355 let executor = deterministic::Runner::timed(Duration::from_secs(10));
356 executor.start(|context| async move {
357 let (_oracle, mut schemes, _peers, mut connections) =
358 setup_network_and_peers(&context, &[1]).await;
359
360 let coordinator = Coordinator::new(vec![]);
361 let (cons1, mut cons_out1) = Consumer::new();
362 let prod1 = Producer::default();
363
364 let mut mailbox1 = setup_and_spawn_actor(
365 &context,
366 &coordinator,
367 schemes.remove(0),
368 connections.remove(0),
369 cons1,
370 prod1,
371 )
372 .await;
373
374 let key = Key(4);
375 mailbox1.fetch(key.clone()).await;
376 context.sleep(Duration::from_secs(5)).await;
377 mailbox1.cancel(key.clone()).await;
378
379 let event = cons_out1.next().await.expect("Consumer channel closed");
380 match event {
381 Event::Failed(key_actual) => {
382 assert_eq!(key_actual, key);
383 }
384 Event::Success(_, _) => {
385 panic!("Fetch should have failed due to no peers")
386 }
387 }
388 });
389 }
390
391 #[test_traced]
395 fn test_concurrent_fetch_requests() {
396 let executor = deterministic::Runner::timed(Duration::from_secs(60));
397 executor.start(|context| async move {
398 let (mut oracle, mut schemes, peers, mut connections) =
399 setup_network_and_peers(&context, &[1, 2, 3]).await;
400
401 let key2 = Key(2);
402 let key3 = Key(3);
403 let mut prod2 = Producer::default();
404 prod2.insert(key2.clone(), Bytes::from("data for key 2"));
405 let mut prod3 = Producer::default();
406 prod3.insert(key3.clone(), Bytes::from("data for key 3"));
407
408 let coordinator = Coordinator::new(peers.clone());
409 let (cons1, mut cons_out1) = Consumer::new();
410
411 let mut mailbox1 = setup_and_spawn_actor(
412 &context,
413 &coordinator,
414 schemes.remove(0),
415 connections.remove(0),
416 cons1,
417 Producer::default(),
418 )
419 .await;
420
421 let _mailbox2 = setup_and_spawn_actor(
422 &context,
423 &coordinator,
424 schemes.remove(0),
425 connections.remove(0),
426 Consumer::dummy(),
427 prod2,
428 )
429 .await;
430
431 let _mailbox3 = setup_and_spawn_actor(
432 &context,
433 &coordinator,
434 schemes.remove(0),
435 connections.remove(0),
436 Consumer::dummy(),
437 prod3,
438 )
439 .await;
440
441 add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 1).await;
443 add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 2).await;
444
445 for _ in 0..10 {
447 mailbox1.fetch(key2.clone()).await;
449 mailbox1.fetch(key3.clone()).await;
450
451 let mut events = Vec::new();
453 events.push(cons_out1.next().await.expect("Consumer channel closed"));
454 events.push(cons_out1.next().await.expect("Consumer channel closed"));
455
456 let mut found_key2 = false;
458 let mut found_key3 = false;
459 for event in events {
460 match event {
461 Event::Success(key_actual, value) => {
462 if key_actual == key2 {
463 assert_eq!(value, Bytes::from("data for key 2"));
464 found_key2 = true;
465 } else if key_actual == key3 {
466 assert_eq!(value, Bytes::from("data for key 3"));
467 found_key3 = true;
468 } else {
469 panic!("Unexpected key received");
470 }
471 }
472 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
473 }
474 }
475 assert!(found_key2 && found_key3,);
476 }
477 });
478 }
479
480 #[test_traced]
483 fn test_cancel() {
484 let executor = deterministic::Runner::timed(Duration::from_secs(10));
485 executor.start(|context| async move {
486 let (mut oracle, mut schemes, peers, mut connections) =
487 setup_network_and_peers(&context, &[1, 2]).await;
488
489 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
490
491 let key = Key(6);
492 let mut prod2 = Producer::default();
493 prod2.insert(key.clone(), Bytes::from("data for key 6"));
494
495 let coordinator = Coordinator::new(peers);
496 let (cons1, mut cons_out1) = Consumer::new();
497
498 let mut mailbox1 = setup_and_spawn_actor(
499 &context,
500 &coordinator,
501 schemes.remove(0),
502 connections.remove(0),
503 cons1,
504 Producer::default(),
505 )
506 .await;
507
508 let _mailbox2 = setup_and_spawn_actor(
509 &context,
510 &coordinator,
511 schemes.remove(0),
512 connections.remove(0),
513 Consumer::dummy(),
514 prod2,
515 )
516 .await;
517
518 mailbox1.cancel(key.clone()).await;
520 select! {
521 _ = cons_out1.next() => { panic!("unexpected event"); },
522 _ = context.sleep(Duration::from_millis(100)) => {},
523 };
524
525 mailbox1.fetch(key.clone()).await;
527 let event = cons_out1.next().await.unwrap();
528 match event {
529 Event::Success(key_actual, value) => {
530 assert_eq!(key_actual, key);
531 assert_eq!(value, Bytes::from("data for key 6"));
532 }
533 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
534 }
535
536 mailbox1.cancel(key.clone()).await;
538 select! {
539 _ = cons_out1.next() => { panic!("unexpected event"); },
540 _ = context.sleep(Duration::from_millis(100)) => {},
541 };
542
543 let key = Key(7);
545 mailbox1.fetch(key.clone()).await;
546 mailbox1.cancel(key.clone()).await;
547
548 let event = cons_out1.next().await.unwrap();
550 match event {
551 Event::Failed(key_actual) => {
552 assert_eq!(key_actual, key);
553 }
554 Event::Success(_, _) => panic!("Fetch should have been canceled"),
555 }
556 });
557 }
558
559 #[test_traced]
562 fn test_blocking_peer() {
563 let executor = deterministic::Runner::timed(Duration::from_secs(10));
564 executor.start(|context| async move {
565 let (mut oracle, mut schemes, peers, mut connections) =
566 setup_network_and_peers(&context, &[1, 2, 3]).await;
567
568 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
569 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
570 add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;
571
572 let key_a = Key(1);
573 let key_b = Key(2);
574 let invalid_data_a = Bytes::from("invalid for A");
575 let valid_data_a = Bytes::from("valid for A");
576 let valid_data_b = Bytes::from("valid for B");
577
578 let mut prod2 = Producer::default();
580 prod2.insert(key_a.clone(), invalid_data_a.clone());
581 prod2.insert(key_b.clone(), valid_data_b.clone());
582
583 let mut prod3 = Producer::default();
584 prod3.insert(key_a.clone(), valid_data_a.clone());
585
586 let coordinator1 = Coordinator::new(vec![peers[1].clone(), peers[2].clone()]);
588 let coordinator2 = Coordinator::new(vec![peers[0].clone()]);
589 let coordinator3 = Coordinator::new(vec![peers[0].clone()]);
590
591 let (mut cons1, mut cons_out1) = Consumer::new();
593 cons1.add_expected(key_a.clone(), valid_data_a.clone());
594 cons1.add_expected(key_b.clone(), valid_data_b.clone());
595
596 let mut mailbox1 = setup_and_spawn_actor(
598 &context,
599 &coordinator1,
600 schemes.remove(0),
601 connections.remove(0),
602 cons1,
603 Producer::default(),
604 )
605 .await;
606
607 let _mailbox2 = setup_and_spawn_actor(
608 &context,
609 &coordinator2,
610 schemes.remove(0),
611 connections.remove(0),
612 Consumer::dummy(),
613 prod2,
614 )
615 .await;
616
617 let _mailbox3 = setup_and_spawn_actor(
618 &context,
619 &coordinator3,
620 schemes.remove(0),
621 connections.remove(0),
622 Consumer::dummy(),
623 prod3,
624 )
625 .await;
626
627 for _ in 0..10 {
629 mailbox1.fetch(key_a.clone()).await;
631
632 let event = cons_out1.next().await.unwrap();
634 match event {
635 Event::Success(key_actual, value) => {
636 assert_eq!(key_actual, key_a);
637 assert_eq!(value, valid_data_a);
638 }
639 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
640 }
641 }
642
643 mailbox1.fetch(key_b.clone()).await;
645
646 context.sleep(Duration::from_secs(5)).await;
648
649 mailbox1.cancel(key_b.clone()).await;
651
652 let event = cons_out1.next().await.unwrap();
654 match event {
655 Event::Failed(key_actual) => {
656 assert_eq!(key_actual, key_b);
657 }
658 Event::Success(_, _) => panic!("Fetch should have been canceled"),
659 }
660 });
661 }
662
663 #[test_traced]
667 fn test_duplicate_fetch_request() {
668 let executor = deterministic::Runner::timed(Duration::from_secs(10));
669 executor.start(|context| async move {
670 let (mut oracle, mut schemes, peers, mut connections) =
671 setup_network_and_peers(&context, &[1, 2]).await;
672
673 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
674
675 let key = Key(5);
676 let mut prod2 = Producer::default();
677 prod2.insert(key.clone(), Bytes::from("data for key 5"));
678
679 let coordinator = Coordinator::new(peers);
680 let (cons1, mut cons_out1) = Consumer::new();
681
682 let mut mailbox1 = setup_and_spawn_actor(
683 &context,
684 &coordinator,
685 schemes.remove(0),
686 connections.remove(0),
687 cons1,
688 Producer::default(),
689 )
690 .await;
691
692 let _mailbox2 = setup_and_spawn_actor(
693 &context,
694 &coordinator,
695 schemes.remove(0),
696 connections.remove(0),
697 Consumer::dummy(),
698 prod2,
699 )
700 .await;
701
702 mailbox1.fetch(key.clone()).await;
704 mailbox1.fetch(key.clone()).await;
705
706 let event = cons_out1.next().await.unwrap();
708 match event {
709 Event::Success(key_actual, value) => {
710 assert_eq!(key_actual, key);
711 assert_eq!(value, Bytes::from("data for key 5"));
712 }
713 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
714 }
715
716 select! {
718 _ = cons_out1.next() => {
719 panic!("Unexpected second event received for duplicate fetch");
720 },
721 _ = context.sleep(Duration::from_millis(500)) => {
722 },
724 };
725 });
726 }
727
728 #[test_traced]
732 fn test_changing_peer_sets() {
733 let executor = deterministic::Runner::timed(Duration::from_secs(10));
734 executor.start(|context| async move {
735 let (mut oracle, mut schemes, peers, mut connections) =
736 setup_network_and_peers(&context, &[1, 2, 3]).await;
737
738 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
739 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
740
741 let key1 = Key(1);
742 let key2 = Key(2);
743
744 let mut prod2 = Producer::default();
745 prod2.insert(key1.clone(), Bytes::from("data from peer 2"));
746
747 let mut prod3 = Producer::default();
748 prod3.insert(key2.clone(), Bytes::from("data from peer 3"));
749
750 let coordinator = Coordinator::new(vec![peers[1].clone()]);
752
753 let mut update_sender = coordinator.create_update_channel(context.clone());
755
756 let (cons1, mut cons_out1) = Consumer::new();
757
758 let mut mailbox1 = setup_and_spawn_actor(
759 &context,
760 &coordinator,
761 schemes.remove(0),
762 connections.remove(0),
763 cons1,
764 Producer::default(),
765 )
766 .await;
767
768 let _mailbox2 = setup_and_spawn_actor(
769 &context,
770 &Coordinator::new(vec![peers[0].clone()]),
771 schemes.remove(0),
772 connections.remove(0),
773 Consumer::dummy(),
774 prod2,
775 )
776 .await;
777
778 let _mailbox3 = setup_and_spawn_actor(
779 &context,
780 &Coordinator::new(vec![peers[0].clone()]),
781 schemes.remove(0),
782 connections.remove(0),
783 Consumer::dummy(),
784 prod3,
785 )
786 .await;
787
788 mailbox1.fetch(key1.clone()).await;
790
791 let event = cons_out1.next().await.unwrap();
793 match event {
794 Event::Success(key_actual, value) => {
795 assert_eq!(key_actual, key1);
796 assert_eq!(value, Bytes::from("data from peer 2"));
797 }
798 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
799 }
800
801 update_sender
803 .send(CoordinatorMsg::UpdatePeers(vec![peers[2].clone()]))
804 .await
805 .expect("Failed to send update");
806
807 context.sleep(Duration::from_millis(200)).await;
809
810 mailbox1.fetch(key2.clone()).await;
812
813 let event = cons_out1.next().await.unwrap();
815 match event {
816 Event::Success(key_actual, value) => {
817 assert_eq!(key_actual, key2);
818 assert_eq!(value, Bytes::from("data from peer 3"));
819 }
820 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
821 }
822 });
823 }
824}