1use bytes::Bytes;
30use commonware_utils::Span;
31use futures::channel::oneshot;
32use std::future::Future;
33
34mod config;
35pub use config::Config;
36mod engine;
37pub use engine::Engine;
38mod fetcher;
39mod ingress;
40pub use ingress::Mailbox;
41mod metrics;
42mod wire;
43
44#[cfg(feature = "mocks")]
45pub mod mocks;
46
47pub trait Producer: Clone + Send + 'static {
49 type Key: Span;
51
52 fn produce(&mut self, key: Self::Key) -> impl Future<Output = oneshot::Receiver<Bytes>> + Send;
54}
55
56#[cfg(test)]
57mod tests {
58 use super::{
59 mocks::{Consumer, Event, Key, Producer},
60 Config, Engine, Mailbox,
61 };
62 use crate::Resolver;
63 use bytes::Bytes;
64 use commonware_cryptography::{
65 ed25519::{PrivateKey, PublicKey},
66 PrivateKeyExt as _, Signer,
67 };
68 use commonware_macros::{select, test_traced};
69 use commonware_p2p::{
70 simulated::{Link, Network, Oracle, Receiver, Sender},
71 Manager,
72 };
73 use commonware_runtime::{deterministic, Clock, Metrics, Runner};
74 use commonware_utils::NZU32;
75 use futures::StreamExt;
76 use std::time::Duration;
77
78 const MAILBOX_SIZE: usize = 1024;
79 const RATE_LIMIT: u32 = 10;
80 const INITIAL_DURATION: Duration = Duration::from_millis(100);
81 const TIMEOUT: Duration = Duration::from_millis(400);
82 const FETCH_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
83 const LINK: Link = Link {
84 latency: Duration::from_millis(10),
85 jitter: Duration::from_millis(1),
86 success_rate: 1.0,
87 };
88 const LINK_UNRELIABLE: Link = Link {
89 latency: Duration::from_millis(10),
90 jitter: Duration::from_millis(1),
91 success_rate: 0.5,
92 };
93
94 async fn setup_network_and_peers(
95 context: &deterministic::Context,
96 peer_seeds: &[u64],
97 ) -> (
98 Oracle<PublicKey>,
99 Vec<PrivateKey>,
100 Vec<PublicKey>,
101 Vec<(Sender<PublicKey>, Receiver<PublicKey>)>,
102 ) {
103 let (network, mut oracle) = Network::new(
104 context.with_label("network"),
105 commonware_p2p::simulated::Config {
106 max_size: 1024 * 1024,
107 disconnect_on_block: true,
108 tracked_peer_sets: Some(3),
109 },
110 );
111 network.start();
112
113 let schemes: Vec<PrivateKey> = peer_seeds
114 .iter()
115 .map(|seed| PrivateKey::from_seed(*seed))
116 .collect();
117 let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
118 oracle.update(0, peers.clone().into()).await;
119
120 let mut connections = Vec::new();
121 for peer in &peers {
122 let (sender, receiver) = oracle.control(peer.clone()).register(0).await.unwrap();
123 connections.push((sender, receiver));
124 }
125
126 (oracle, schemes, peers, connections)
127 }
128
129 async fn add_link(
130 oracle: &mut Oracle<PublicKey>,
131 link: Link,
132 peers: &[PublicKey],
133 from: usize,
134 to: usize,
135 ) {
136 oracle
137 .add_link(peers[from].clone(), peers[to].clone(), link.clone())
138 .await
139 .unwrap();
140 oracle
141 .add_link(peers[to].clone(), peers[from].clone(), link)
142 .await
143 .unwrap();
144 }
145
146 async fn setup_and_spawn_actor(
147 context: &deterministic::Context,
148 oracle: &Oracle<PublicKey>,
149 signer: impl Signer<PublicKey = PublicKey>,
150 connection: (Sender<PublicKey>, Receiver<PublicKey>),
151 consumer: Consumer<Key, Bytes>,
152 producer: Producer<Key, Bytes>,
153 ) -> Mailbox<Key> {
154 let public_key = signer.public_key();
155 let (engine, mailbox) = Engine::new(
156 context.with_label(&format!("actor_{public_key}")),
157 Config {
158 manager: oracle.clone(),
159 consumer,
160 producer,
161 mailbox_size: MAILBOX_SIZE,
162 requester_config: commonware_p2p::utils::requester::Config {
163 me: Some(public_key),
164 rate_limit: governor::Quota::per_second(NZU32!(RATE_LIMIT)),
165 initial: INITIAL_DURATION,
166 timeout: TIMEOUT,
167 },
168 fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
169 priority_requests: false,
170 priority_responses: false,
171 },
172 );
173 engine.start(connection);
174
175 mailbox
176 }
177
178 #[test_traced]
182 fn test_fetch_success() {
183 let executor = deterministic::Runner::timed(Duration::from_secs(10));
184 executor.start(|context| async move {
185 let (mut oracle, mut schemes, peers, mut connections) =
186 setup_network_and_peers(&context, &[1, 2]).await;
187
188 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
189
190 let key = Key(2);
191 let mut prod2 = Producer::default();
192 prod2.insert(key.clone(), Bytes::from("data for key 2"));
193
194 let (cons1, mut cons_out1) = Consumer::new();
195
196 let mut mailbox1 = setup_and_spawn_actor(
197 &context,
198 &oracle,
199 schemes.remove(0),
200 connections.remove(0),
201 cons1,
202 Producer::default(),
203 )
204 .await;
205
206 let _mailbox2 = setup_and_spawn_actor(
207 &context,
208 &oracle,
209 schemes.remove(0),
210 connections.remove(0),
211 Consumer::dummy(),
212 prod2,
213 )
214 .await;
215
216 mailbox1.fetch(key.clone()).await;
217
218 let event = cons_out1.next().await.unwrap();
219 match event {
220 Event::Success(key_actual, value) => {
221 assert_eq!(key_actual, key);
222 assert_eq!(value, Bytes::from("data for key 2"));
223 }
224 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
225 }
226 });
227 }
228
229 #[test_traced]
233 fn test_cancel_fetch() {
234 let executor = deterministic::Runner::timed(Duration::from_secs(10));
235 executor.start(|context| async move {
236 let (oracle, mut schemes, _peers, mut connections) =
237 setup_network_and_peers(&context, &[1]).await;
238
239 let (cons1, mut cons_out1) = Consumer::new();
240 let prod1 = Producer::default();
241
242 let mut mailbox1 = setup_and_spawn_actor(
243 &context,
244 &oracle,
245 schemes.remove(0),
246 connections.remove(0),
247 cons1,
248 prod1,
249 )
250 .await;
251
252 let key = Key(3);
253 mailbox1.fetch(key.clone()).await;
254 mailbox1.cancel(key.clone()).await;
255
256 let event = cons_out1.next().await.unwrap();
257 match event {
258 Event::Failed(key_actual) => {
259 assert_eq!(key_actual, key);
260 }
261 Event::Success(_, _) => panic!("Fetch should have been canceled"),
262 }
263 });
264 }
265
266 #[test_traced]
271 fn test_peer_no_data() {
272 let executor = deterministic::Runner::timed(Duration::from_secs(10));
273 executor.start(|context| async move {
274 let (mut oracle, mut schemes, peers, mut connections) =
275 setup_network_and_peers(&context, &[1, 2, 3]).await;
276
277 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
278 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
279
280 let prod1 = Producer::default();
281 let prod2 = Producer::default();
282 let mut prod3 = Producer::default();
283 let key = Key(3);
284 prod3.insert(key.clone(), Bytes::from("data for key 3"));
285
286 let (cons1, mut cons_out1) = Consumer::new();
287
288 let mut mailbox1 = setup_and_spawn_actor(
289 &context,
290 &oracle,
291 schemes.remove(0),
292 connections.remove(0),
293 cons1,
294 prod1,
295 )
296 .await;
297
298 let _mailbox2 = setup_and_spawn_actor(
299 &context,
300 &oracle,
301 schemes.remove(0),
302 connections.remove(0),
303 Consumer::dummy(),
304 prod2,
305 )
306 .await;
307
308 let _mailbox3 = setup_and_spawn_actor(
309 &context,
310 &oracle,
311 schemes.remove(0),
312 connections.remove(0),
313 Consumer::dummy(),
314 prod3,
315 )
316 .await;
317
318 mailbox1.fetch(key.clone()).await;
319
320 let event = cons_out1.next().await.unwrap();
321 match event {
322 Event::Success(key_actual, value) => {
323 assert_eq!(key_actual, key);
324 assert_eq!(value, Bytes::from("data for key 3"));
325 }
326 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
327 }
328 });
329 }
330
331 #[test_traced]
336 fn test_no_peers_available() {
337 let executor = deterministic::Runner::timed(Duration::from_secs(10));
338 executor.start(|context| async move {
339 let (oracle, mut schemes, _peers, mut connections) =
340 setup_network_and_peers(&context, &[1]).await;
341
342 let (cons1, mut cons_out1) = Consumer::new();
343 let prod1 = Producer::default();
344
345 let mut mailbox1 = setup_and_spawn_actor(
346 &context,
347 &oracle,
348 schemes.remove(0),
349 connections.remove(0),
350 cons1,
351 prod1,
352 )
353 .await;
354
355 let key = Key(4);
356 mailbox1.fetch(key.clone()).await;
357 context.sleep(Duration::from_secs(5)).await;
358 mailbox1.cancel(key.clone()).await;
359
360 let event = cons_out1.next().await.expect("Consumer channel closed");
361 match event {
362 Event::Failed(key_actual) => {
363 assert_eq!(key_actual, key);
364 }
365 Event::Success(_, _) => {
366 panic!("Fetch should have failed due to no peers")
367 }
368 }
369 });
370 }
371
372 #[test_traced]
376 fn test_concurrent_fetch_requests() {
377 let executor = deterministic::Runner::timed(Duration::from_secs(60));
378 executor.start(|context| async move {
379 let (mut oracle, mut schemes, peers, mut connections) =
380 setup_network_and_peers(&context, &[1, 2, 3]).await;
381
382 let key2 = Key(2);
383 let key3 = Key(3);
384 let mut prod2 = Producer::default();
385 prod2.insert(key2.clone(), Bytes::from("data for key 2"));
386 let mut prod3 = Producer::default();
387 prod3.insert(key3.clone(), Bytes::from("data for key 3"));
388
389 let (cons1, mut cons_out1) = Consumer::new();
390
391 let mut mailbox1 = setup_and_spawn_actor(
392 &context,
393 &oracle,
394 schemes.remove(0),
395 connections.remove(0),
396 cons1,
397 Producer::default(),
398 )
399 .await;
400
401 let _mailbox2 = setup_and_spawn_actor(
402 &context,
403 &oracle,
404 schemes.remove(0),
405 connections.remove(0),
406 Consumer::dummy(),
407 prod2,
408 )
409 .await;
410
411 let _mailbox3 = setup_and_spawn_actor(
412 &context,
413 &oracle,
414 schemes.remove(0),
415 connections.remove(0),
416 Consumer::dummy(),
417 prod3,
418 )
419 .await;
420
421 add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 1).await;
423 add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 2).await;
424
425 for _ in 0..10 {
427 mailbox1.fetch(key2.clone()).await;
429 mailbox1.fetch(key3.clone()).await;
430
431 let mut events = Vec::new();
433 events.push(cons_out1.next().await.expect("Consumer channel closed"));
434 events.push(cons_out1.next().await.expect("Consumer channel closed"));
435
436 let mut found_key2 = false;
438 let mut found_key3 = false;
439 for event in events {
440 match event {
441 Event::Success(key_actual, value) => {
442 if key_actual == key2 {
443 assert_eq!(value, Bytes::from("data for key 2"));
444 found_key2 = true;
445 } else if key_actual == key3 {
446 assert_eq!(value, Bytes::from("data for key 3"));
447 found_key3 = true;
448 } else {
449 panic!("Unexpected key received");
450 }
451 }
452 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
453 }
454 }
455 assert!(found_key2 && found_key3,);
456 }
457 });
458 }
459
460 #[test_traced]
463 fn test_cancel() {
464 let executor = deterministic::Runner::timed(Duration::from_secs(10));
465 executor.start(|context| async move {
466 let (mut oracle, mut schemes, peers, mut connections) =
467 setup_network_and_peers(&context, &[1, 2]).await;
468
469 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
470
471 let key = Key(6);
472 let mut prod2 = Producer::default();
473 prod2.insert(key.clone(), Bytes::from("data for key 6"));
474
475 let (cons1, mut cons_out1) = Consumer::new();
476
477 let mut mailbox1 = setup_and_spawn_actor(
478 &context,
479 &oracle,
480 schemes.remove(0),
481 connections.remove(0),
482 cons1,
483 Producer::default(),
484 )
485 .await;
486
487 let _mailbox2 = setup_and_spawn_actor(
488 &context,
489 &oracle,
490 schemes.remove(0),
491 connections.remove(0),
492 Consumer::dummy(),
493 prod2,
494 )
495 .await;
496
497 mailbox1.cancel(key.clone()).await;
499 select! {
500 _ = cons_out1.next() => { panic!("unexpected event"); },
501 _ = context.sleep(Duration::from_millis(100)) => {},
502 };
503
504 mailbox1.fetch(key.clone()).await;
506 let event = cons_out1.next().await.unwrap();
507 match event {
508 Event::Success(key_actual, value) => {
509 assert_eq!(key_actual, key);
510 assert_eq!(value, Bytes::from("data for key 6"));
511 }
512 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
513 }
514
515 mailbox1.cancel(key.clone()).await;
517 select! {
518 _ = cons_out1.next() => { panic!("unexpected event"); },
519 _ = context.sleep(Duration::from_millis(100)) => {},
520 };
521
522 let key = Key(7);
524 mailbox1.fetch(key.clone()).await;
525 mailbox1.cancel(key.clone()).await;
526
527 let event = cons_out1.next().await.unwrap();
529 match event {
530 Event::Failed(key_actual) => {
531 assert_eq!(key_actual, key);
532 }
533 Event::Success(_, _) => panic!("Fetch should have been canceled"),
534 }
535 });
536 }
537
538 #[test_traced]
541 fn test_blocking_peer() {
542 let executor = deterministic::Runner::timed(Duration::from_secs(10));
543 executor.start(|context| async move {
544 let (mut oracle, mut schemes, peers, mut connections) =
545 setup_network_and_peers(&context, &[1, 2, 3]).await;
546
547 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
548 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
549 add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;
550
551 let key_a = Key(1);
552 let key_b = Key(2);
553 let invalid_data_a = Bytes::from("invalid for A");
554 let valid_data_a = Bytes::from("valid for A");
555 let valid_data_b = Bytes::from("valid for B");
556
557 let mut prod2 = Producer::default();
559 prod2.insert(key_a.clone(), invalid_data_a.clone());
560 prod2.insert(key_b.clone(), valid_data_b.clone());
561
562 let mut prod3 = Producer::default();
563 prod3.insert(key_a.clone(), valid_data_a.clone());
564
565 let (mut cons1, mut cons_out1) = Consumer::new();
567 cons1.add_expected(key_a.clone(), valid_data_a.clone());
568 cons1.add_expected(key_b.clone(), valid_data_b.clone());
569
570 let mut mailbox1 = setup_and_spawn_actor(
572 &context,
573 &oracle,
574 schemes.remove(0),
575 connections.remove(0),
576 cons1,
577 Producer::default(),
578 )
579 .await;
580
581 let _mailbox2 = setup_and_spawn_actor(
582 &context,
583 &oracle,
584 schemes.remove(0),
585 connections.remove(0),
586 Consumer::dummy(),
587 prod2,
588 )
589 .await;
590
591 let _mailbox3 = setup_and_spawn_actor(
592 &context,
593 &oracle,
594 schemes.remove(0),
595 connections.remove(0),
596 Consumer::dummy(),
597 prod3,
598 )
599 .await;
600
601 for _ in 0..10 {
603 mailbox1.fetch(key_a.clone()).await;
605
606 let event = cons_out1.next().await.unwrap();
608 match event {
609 Event::Success(key_actual, value) => {
610 assert_eq!(key_actual, key_a);
611 assert_eq!(value, valid_data_a);
612 }
613 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
614 }
615 }
616
617 mailbox1.fetch(key_b.clone()).await;
619
620 context.sleep(Duration::from_secs(5)).await;
622
623 mailbox1.cancel(key_b.clone()).await;
625
626 let event = cons_out1.next().await.unwrap();
628 match event {
629 Event::Failed(key_actual) => {
630 assert_eq!(key_actual, key_b);
631 }
632 Event::Success(_, _) => panic!("Fetch should have been canceled"),
633 }
634 });
635 }
636
637 #[test_traced]
641 fn test_duplicate_fetch_request() {
642 let executor = deterministic::Runner::timed(Duration::from_secs(10));
643 executor.start(|context| async move {
644 let (mut oracle, mut schemes, peers, mut connections) =
645 setup_network_and_peers(&context, &[1, 2]).await;
646
647 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
648
649 let key = Key(5);
650 let mut prod2 = Producer::default();
651 prod2.insert(key.clone(), Bytes::from("data for key 5"));
652
653 let (cons1, mut cons_out1) = Consumer::new();
654
655 let mut mailbox1 = setup_and_spawn_actor(
656 &context,
657 &oracle,
658 schemes.remove(0),
659 connections.remove(0),
660 cons1,
661 Producer::default(),
662 )
663 .await;
664
665 let _mailbox2 = setup_and_spawn_actor(
666 &context,
667 &oracle,
668 schemes.remove(0),
669 connections.remove(0),
670 Consumer::dummy(),
671 prod2,
672 )
673 .await;
674
675 mailbox1.fetch(key.clone()).await;
677 mailbox1.fetch(key.clone()).await;
678
679 let event = cons_out1.next().await.unwrap();
681 match event {
682 Event::Success(key_actual, value) => {
683 assert_eq!(key_actual, key);
684 assert_eq!(value, Bytes::from("data for key 5"));
685 }
686 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
687 }
688
689 select! {
691 _ = cons_out1.next() => {
692 panic!("Unexpected second event received for duplicate fetch");
693 },
694 _ = context.sleep(Duration::from_millis(500)) => {
695 },
697 };
698 });
699 }
700
701 #[test_traced]
705 fn test_changing_peer_sets() {
706 let executor = deterministic::Runner::timed(Duration::from_secs(10));
707 executor.start(|context| async move {
708 let (mut oracle, mut schemes, peers, mut connections) =
709 setup_network_and_peers(&context, &[1, 2, 3]).await;
710
711 add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
712 add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
713
714 let key1 = Key(1);
715 let key2 = Key(2);
716
717 let mut prod2 = Producer::default();
718 prod2.insert(key1.clone(), Bytes::from("data from peer 2"));
719
720 let mut prod3 = Producer::default();
721 prod3.insert(key2.clone(), Bytes::from("data from peer 3"));
722
723 let (cons1, mut cons_out1) = Consumer::new();
724
725 let mut mailbox1 = setup_and_spawn_actor(
726 &context,
727 &oracle,
728 schemes.remove(0),
729 connections.remove(0),
730 cons1,
731 Producer::default(),
732 )
733 .await;
734
735 let _mailbox2 = setup_and_spawn_actor(
736 &context,
737 &oracle,
738 schemes.remove(0),
739 connections.remove(0),
740 Consumer::dummy(),
741 prod2,
742 )
743 .await;
744
745 mailbox1.fetch(key1.clone()).await;
747
748 let event = cons_out1.next().await.unwrap();
750 match event {
751 Event::Success(key_actual, value) => {
752 assert_eq!(key_actual, key1);
753 assert_eq!(value, Bytes::from("data from peer 2"));
754 }
755 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
756 }
757
758 let _mailbox3 = setup_and_spawn_actor(
760 &context,
761 &oracle,
762 schemes.remove(0),
763 connections.remove(0),
764 Consumer::dummy(),
765 prod3,
766 )
767 .await;
768
769 context.sleep(Duration::from_millis(200)).await;
771
772 mailbox1.fetch(key2.clone()).await;
774
775 let event = cons_out1.next().await.unwrap();
777 match event {
778 Event::Success(key_actual, value) => {
779 assert_eq!(key_actual, key2);
780 assert_eq!(value, Bytes::from("data from peer 3"));
781 }
782 Event::Failed(_) => panic!("Fetch failed unexpectedly"),
783 }
784 });
785 }
786}