commonware_resolver/p2p/
mod.rs

1//! Resolve data identified by a fixed-length key by using the P2P network.
2//!
3//! # Overview
4//!
5//! The `p2p` module enables resolving data by fixed-length keys in a P2P network. Central to the
6//! module is the `peer` actor which manages the fetch-request lifecycle. Its mailbox allows
7//! initiation and cancellation of fetch requests via the `Resolver` interface.
8//!
9//! The peer handles an arbitrarily large number of concurrent fetch requests by sending requests
10//! to other peers and processing their responses. It uses [commonware_p2p::utils::requester] to
11//! select peers based on performance, retrying with another peer if one fails or provides invalid
12//! data. Requests persist until canceled or fulfilled, delivering data to the `Consumer` for
13//! verification.
14//!
15//! The `Consumer` checks data integrity and authenticity (critical in an adversarial environment)
16//! and returns `true` if valid, completing the fetch, or `false` to retry.
17//!
18//! The peer also serves data to other peers, forwarding network requests to the `Producer`. The
19//! `Producer` provides data asynchronously (e.g., from storage). If it fails, the peer sends an
20//! empty response, prompting the requester to retry elsewhere. Each message between peers contains
21//! an ID. Each request is sent with a unique ID, and each response includes the ID of the request
22//! it responds to.
23//!
24//! # Performance Considerations
25//!
26//! The peer supports arbitrarily many concurrent fetch requests, but resource usage generally
27//! depends on the rate-limiting configuration of the `Requester` and of the underlying P2P network.
28
29use bytes::Bytes;
30use commonware_utils::Span;
31use futures::channel::oneshot;
32use std::future::Future;
33
34mod config;
35pub use config::Config;
36mod engine;
37pub use engine::Engine;
38mod fetcher;
39mod ingress;
40pub use ingress::Mailbox;
41mod metrics;
42mod wire;
43
44#[cfg(feature = "mocks")]
45pub mod mocks;
46
47/// Serves data requested by the network.
48pub trait Producer: Clone + Send + 'static {
49    /// Type used to uniquely identify data.
50    type Key: Span;
51
52    /// Serve a request received from the network.
53    fn produce(&mut self, key: Self::Key) -> impl Future<Output = oneshot::Receiver<Bytes>> + Send;
54}
55
56#[cfg(test)]
57mod tests {
58    use super::{
59        mocks::{Consumer, Event, Key, Producer},
60        Config, Engine, Mailbox,
61    };
62    use crate::Resolver;
63    use bytes::Bytes;
64    use commonware_cryptography::{
65        ed25519::{PrivateKey, PublicKey},
66        PrivateKeyExt as _, Signer,
67    };
68    use commonware_macros::{select, test_traced};
69    use commonware_p2p::{
70        simulated::{Link, Network, Oracle, Receiver, Sender},
71        Manager,
72    };
73    use commonware_runtime::{deterministic, Clock, Metrics, Runner};
74    use commonware_utils::NZU32;
75    use futures::StreamExt;
76    use std::time::Duration;
77
78    const MAILBOX_SIZE: usize = 1024;
79    const RATE_LIMIT: u32 = 10;
80    const INITIAL_DURATION: Duration = Duration::from_millis(100);
81    const TIMEOUT: Duration = Duration::from_millis(400);
82    const FETCH_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
83    const LINK: Link = Link {
84        latency: Duration::from_millis(10),
85        jitter: Duration::from_millis(1),
86        success_rate: 1.0,
87    };
88    const LINK_UNRELIABLE: Link = Link {
89        latency: Duration::from_millis(10),
90        jitter: Duration::from_millis(1),
91        success_rate: 0.5,
92    };
93
94    async fn setup_network_and_peers(
95        context: &deterministic::Context,
96        peer_seeds: &[u64],
97    ) -> (
98        Oracle<PublicKey>,
99        Vec<PrivateKey>,
100        Vec<PublicKey>,
101        Vec<(Sender<PublicKey>, Receiver<PublicKey>)>,
102    ) {
103        let (network, mut oracle) = Network::new(
104            context.with_label("network"),
105            commonware_p2p::simulated::Config {
106                max_size: 1024 * 1024,
107                disconnect_on_block: true,
108                tracked_peer_sets: Some(3),
109            },
110        );
111        network.start();
112
113        let schemes: Vec<PrivateKey> = peer_seeds
114            .iter()
115            .map(|seed| PrivateKey::from_seed(*seed))
116            .collect();
117        let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
118        oracle.update(0, peers.clone().into()).await;
119
120        let mut connections = Vec::new();
121        for peer in &peers {
122            let (sender, receiver) = oracle.control(peer.clone()).register(0).await.unwrap();
123            connections.push((sender, receiver));
124        }
125
126        (oracle, schemes, peers, connections)
127    }
128
129    async fn add_link(
130        oracle: &mut Oracle<PublicKey>,
131        link: Link,
132        peers: &[PublicKey],
133        from: usize,
134        to: usize,
135    ) {
136        oracle
137            .add_link(peers[from].clone(), peers[to].clone(), link.clone())
138            .await
139            .unwrap();
140        oracle
141            .add_link(peers[to].clone(), peers[from].clone(), link)
142            .await
143            .unwrap();
144    }
145
146    async fn setup_and_spawn_actor(
147        context: &deterministic::Context,
148        oracle: &Oracle<PublicKey>,
149        signer: impl Signer<PublicKey = PublicKey>,
150        connection: (Sender<PublicKey>, Receiver<PublicKey>),
151        consumer: Consumer<Key, Bytes>,
152        producer: Producer<Key, Bytes>,
153    ) -> Mailbox<Key> {
154        let public_key = signer.public_key();
155        let (engine, mailbox) = Engine::new(
156            context.with_label(&format!("actor_{public_key}")),
157            Config {
158                manager: oracle.clone(),
159                consumer,
160                producer,
161                mailbox_size: MAILBOX_SIZE,
162                requester_config: commonware_p2p::utils::requester::Config {
163                    me: Some(public_key),
164                    rate_limit: governor::Quota::per_second(NZU32!(RATE_LIMIT)),
165                    initial: INITIAL_DURATION,
166                    timeout: TIMEOUT,
167                },
168                fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
169                priority_requests: false,
170                priority_responses: false,
171            },
172        );
173        engine.start(connection);
174
175        mailbox
176    }
177
178    /// Tests that fetching a key from another peer succeeds when data is available.
179    /// This test sets up two peers, where Peer 1 requests data that Peer 2 has,
180    /// and verifies that the data is correctly delivered to Peer 1's consumer.
181    #[test_traced]
182    fn test_fetch_success() {
183        let executor = deterministic::Runner::timed(Duration::from_secs(10));
184        executor.start(|context| async move {
185            let (mut oracle, mut schemes, peers, mut connections) =
186                setup_network_and_peers(&context, &[1, 2]).await;
187
188            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
189
190            let key = Key(2);
191            let mut prod2 = Producer::default();
192            prod2.insert(key.clone(), Bytes::from("data for key 2"));
193
194            let (cons1, mut cons_out1) = Consumer::new();
195
196            let mut mailbox1 = setup_and_spawn_actor(
197                &context,
198                &oracle,
199                schemes.remove(0),
200                connections.remove(0),
201                cons1,
202                Producer::default(),
203            )
204            .await;
205
206            let _mailbox2 = setup_and_spawn_actor(
207                &context,
208                &oracle,
209                schemes.remove(0),
210                connections.remove(0),
211                Consumer::dummy(),
212                prod2,
213            )
214            .await;
215
216            mailbox1.fetch(key.clone()).await;
217
218            let event = cons_out1.next().await.unwrap();
219            match event {
220                Event::Success(key_actual, value) => {
221                    assert_eq!(key_actual, key);
222                    assert_eq!(value, Bytes::from("data for key 2"));
223                }
224                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
225            }
226        });
227    }
228
229    /// Tests that canceling a fetch request results in a failure event.
230    /// This test initiates a fetch request and immediately cancels it,
231    /// verifying that the consumer receives a failure notification instead of data.
232    #[test_traced]
233    fn test_cancel_fetch() {
234        let executor = deterministic::Runner::timed(Duration::from_secs(10));
235        executor.start(|context| async move {
236            let (oracle, mut schemes, _peers, mut connections) =
237                setup_network_and_peers(&context, &[1]).await;
238
239            let (cons1, mut cons_out1) = Consumer::new();
240            let prod1 = Producer::default();
241
242            let mut mailbox1 = setup_and_spawn_actor(
243                &context,
244                &oracle,
245                schemes.remove(0),
246                connections.remove(0),
247                cons1,
248                prod1,
249            )
250            .await;
251
252            let key = Key(3);
253            mailbox1.fetch(key.clone()).await;
254            mailbox1.cancel(key.clone()).await;
255
256            let event = cons_out1.next().await.unwrap();
257            match event {
258                Event::Failed(key_actual) => {
259                    assert_eq!(key_actual, key);
260                }
261                Event::Success(_, _) => panic!("Fetch should have been canceled"),
262            }
263        });
264    }
265
266    /// Tests fetching data from a peer when some peers lack the data.
267    /// This test sets up three peers, where Peer 1 requests data that only Peer 3 has.
268    /// It verifies that the resolver retries with another peer and successfully
269    /// delivers the data to Peer 1's consumer.
270    #[test_traced]
271    fn test_peer_no_data() {
272        let executor = deterministic::Runner::timed(Duration::from_secs(10));
273        executor.start(|context| async move {
274            let (mut oracle, mut schemes, peers, mut connections) =
275                setup_network_and_peers(&context, &[1, 2, 3]).await;
276
277            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
278            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
279
280            let prod1 = Producer::default();
281            let prod2 = Producer::default();
282            let mut prod3 = Producer::default();
283            let key = Key(3);
284            prod3.insert(key.clone(), Bytes::from("data for key 3"));
285
286            let (cons1, mut cons_out1) = Consumer::new();
287
288            let mut mailbox1 = setup_and_spawn_actor(
289                &context,
290                &oracle,
291                schemes.remove(0),
292                connections.remove(0),
293                cons1,
294                prod1,
295            )
296            .await;
297
298            let _mailbox2 = setup_and_spawn_actor(
299                &context,
300                &oracle,
301                schemes.remove(0),
302                connections.remove(0),
303                Consumer::dummy(),
304                prod2,
305            )
306            .await;
307
308            let _mailbox3 = setup_and_spawn_actor(
309                &context,
310                &oracle,
311                schemes.remove(0),
312                connections.remove(0),
313                Consumer::dummy(),
314                prod3,
315            )
316            .await;
317
318            mailbox1.fetch(key.clone()).await;
319
320            let event = cons_out1.next().await.unwrap();
321            match event {
322                Event::Success(key_actual, value) => {
323                    assert_eq!(key_actual, key);
324                    assert_eq!(value, Bytes::from("data for key 3"));
325                }
326                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
327            }
328        });
329    }
330
331    /// Tests fetching when no peers are available.
332    /// This test sets up a single peer with an empty peer provider (no peers).
333    /// It initiates a fetch, waits beyond the retry timeout, cancels the fetch,
334    /// and verifies that the consumer receives a failure notification.
335    #[test_traced]
336    fn test_no_peers_available() {
337        let executor = deterministic::Runner::timed(Duration::from_secs(10));
338        executor.start(|context| async move {
339            let (oracle, mut schemes, _peers, mut connections) =
340                setup_network_and_peers(&context, &[1]).await;
341
342            let (cons1, mut cons_out1) = Consumer::new();
343            let prod1 = Producer::default();
344
345            let mut mailbox1 = setup_and_spawn_actor(
346                &context,
347                &oracle,
348                schemes.remove(0),
349                connections.remove(0),
350                cons1,
351                prod1,
352            )
353            .await;
354
355            let key = Key(4);
356            mailbox1.fetch(key.clone()).await;
357            context.sleep(Duration::from_secs(5)).await;
358            mailbox1.cancel(key.clone()).await;
359
360            let event = cons_out1.next().await.expect("Consumer channel closed");
361            match event {
362                Event::Failed(key_actual) => {
363                    assert_eq!(key_actual, key);
364                }
365                Event::Success(_, _) => {
366                    panic!("Fetch should have failed due to no peers")
367                }
368            }
369        });
370    }
371
372    /// Tests that concurrent fetch requests are handled correctly.
373    /// Also tests that the peer can recover from having no peers available.
374    /// Also tests that the peer can get data from multiple peers that have different sets of data.
375    #[test_traced]
376    fn test_concurrent_fetch_requests() {
377        let executor = deterministic::Runner::timed(Duration::from_secs(60));
378        executor.start(|context| async move {
379            let (mut oracle, mut schemes, peers, mut connections) =
380                setup_network_and_peers(&context, &[1, 2, 3]).await;
381
382            let key2 = Key(2);
383            let key3 = Key(3);
384            let mut prod2 = Producer::default();
385            prod2.insert(key2.clone(), Bytes::from("data for key 2"));
386            let mut prod3 = Producer::default();
387            prod3.insert(key3.clone(), Bytes::from("data for key 3"));
388
389            let (cons1, mut cons_out1) = Consumer::new();
390
391            let mut mailbox1 = setup_and_spawn_actor(
392                &context,
393                &oracle,
394                schemes.remove(0),
395                connections.remove(0),
396                cons1,
397                Producer::default(),
398            )
399            .await;
400
401            let _mailbox2 = setup_and_spawn_actor(
402                &context,
403                &oracle,
404                schemes.remove(0),
405                connections.remove(0),
406                Consumer::dummy(),
407                prod2,
408            )
409            .await;
410
411            let _mailbox3 = setup_and_spawn_actor(
412                &context,
413                &oracle,
414                schemes.remove(0),
415                connections.remove(0),
416                Consumer::dummy(),
417                prod3,
418            )
419            .await;
420
421            // Add choppy links between the requester and the two producers
422            add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 1).await;
423            add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 2).await;
424
425            // Run the fetches multiple times to ensure that the peer tries both of its peers
426            for _ in 0..10 {
427                // Initiate concurrent fetch requests
428                mailbox1.fetch(key2.clone()).await;
429                mailbox1.fetch(key3.clone()).await;
430
431                // Collect both events without assuming order
432                let mut events = Vec::new();
433                events.push(cons_out1.next().await.expect("Consumer channel closed"));
434                events.push(cons_out1.next().await.expect("Consumer channel closed"));
435
436                // Check that both keys were successfully fetched
437                let mut found_key2 = false;
438                let mut found_key3 = false;
439                for event in events {
440                    match event {
441                        Event::Success(key_actual, value) => {
442                            if key_actual == key2 {
443                                assert_eq!(value, Bytes::from("data for key 2"));
444                                found_key2 = true;
445                            } else if key_actual == key3 {
446                                assert_eq!(value, Bytes::from("data for key 3"));
447                                found_key3 = true;
448                            } else {
449                                panic!("Unexpected key received");
450                            }
451                        }
452                        Event::Failed(_) => panic!("Fetch failed unexpectedly"),
453                    }
454                }
455                assert!(found_key2 && found_key3,);
456            }
457        });
458    }
459
460    /// Tests that canceling an inactive fetch request has no effect.
461    /// Cancels a request before, after, and during the fetch process,
462    #[test_traced]
463    fn test_cancel() {
464        let executor = deterministic::Runner::timed(Duration::from_secs(10));
465        executor.start(|context| async move {
466            let (mut oracle, mut schemes, peers, mut connections) =
467                setup_network_and_peers(&context, &[1, 2]).await;
468
469            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
470
471            let key = Key(6);
472            let mut prod2 = Producer::default();
473            prod2.insert(key.clone(), Bytes::from("data for key 6"));
474
475            let (cons1, mut cons_out1) = Consumer::new();
476
477            let mut mailbox1 = setup_and_spawn_actor(
478                &context,
479                &oracle,
480                schemes.remove(0),
481                connections.remove(0),
482                cons1,
483                Producer::default(),
484            )
485            .await;
486
487            let _mailbox2 = setup_and_spawn_actor(
488                &context,
489                &oracle,
490                schemes.remove(0),
491                connections.remove(0),
492                Consumer::dummy(),
493                prod2,
494            )
495            .await;
496
497            // Cancel before sending the fetch request, expecting no effect
498            mailbox1.cancel(key.clone()).await;
499            select! {
500                _ = cons_out1.next() => { panic!("unexpected event"); },
501                _ = context.sleep(Duration::from_millis(100)) => {},
502            };
503
504            // Initiate fetch and wait for data to be delivered
505            mailbox1.fetch(key.clone()).await;
506            let event = cons_out1.next().await.unwrap();
507            match event {
508                Event::Success(key_actual, value) => {
509                    assert_eq!(key_actual, key);
510                    assert_eq!(value, Bytes::from("data for key 6"));
511                }
512                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
513            }
514
515            // Attempt to cancel after data has been delivered, expecting no effect
516            mailbox1.cancel(key.clone()).await;
517            select! {
518                _ = cons_out1.next() => { panic!("unexpected event"); },
519                _ = context.sleep(Duration::from_millis(100)) => {},
520            };
521
522            // Initiate and cancel another fetch request
523            let key = Key(7);
524            mailbox1.fetch(key.clone()).await;
525            mailbox1.cancel(key.clone()).await;
526
527            // Make sure we receive a failure event
528            let event = cons_out1.next().await.unwrap();
529            match event {
530                Event::Failed(key_actual) => {
531                    assert_eq!(key_actual, key);
532                }
533                Event::Success(_, _) => panic!("Fetch should have been canceled"),
534            }
535        });
536    }
537
538    /// Tests that a peer is blocked after delivering invalid data,
539    /// preventing further fetches from that peer.
540    #[test_traced]
541    fn test_blocking_peer() {
542        let executor = deterministic::Runner::timed(Duration::from_secs(10));
543        executor.start(|context| async move {
544            let (mut oracle, mut schemes, peers, mut connections) =
545                setup_network_and_peers(&context, &[1, 2, 3]).await;
546
547            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
548            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
549            add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;
550
551            let key_a = Key(1);
552            let key_b = Key(2);
553            let invalid_data_a = Bytes::from("invalid for A");
554            let valid_data_a = Bytes::from("valid for A");
555            let valid_data_b = Bytes::from("valid for B");
556
557            // Set up producers
558            let mut prod2 = Producer::default();
559            prod2.insert(key_a.clone(), invalid_data_a.clone());
560            prod2.insert(key_b.clone(), valid_data_b.clone());
561
562            let mut prod3 = Producer::default();
563            prod3.insert(key_a.clone(), valid_data_a.clone());
564
565            // Set up consumer for Peer1 with expected values
566            let (mut cons1, mut cons_out1) = Consumer::new();
567            cons1.add_expected(key_a.clone(), valid_data_a.clone());
568            cons1.add_expected(key_b.clone(), valid_data_b.clone());
569
570            // Spawn actors
571            let mut mailbox1 = setup_and_spawn_actor(
572                &context,
573                &oracle,
574                schemes.remove(0),
575                connections.remove(0),
576                cons1,
577                Producer::default(),
578            )
579            .await;
580
581            let _mailbox2 = setup_and_spawn_actor(
582                &context,
583                &oracle,
584                schemes.remove(0),
585                connections.remove(0),
586                Consumer::dummy(),
587                prod2,
588            )
589            .await;
590
591            let _mailbox3 = setup_and_spawn_actor(
592                &context,
593                &oracle,
594                schemes.remove(0),
595                connections.remove(0),
596                Consumer::dummy(),
597                prod3,
598            )
599            .await;
600
601            // Fetch keyA multiple times to ensure that Peer2 is blocked.
602            for _ in 0..10 {
603                // Fetch keyA
604                mailbox1.fetch(key_a.clone()).await;
605
606                // Wait for success event for keyA
607                let event = cons_out1.next().await.unwrap();
608                match event {
609                    Event::Success(key_actual, value) => {
610                        assert_eq!(key_actual, key_a);
611                        assert_eq!(value, valid_data_a);
612                    }
613                    Event::Failed(_) => panic!("Fetch failed unexpectedly"),
614                }
615            }
616
617            // Fetch keyB
618            mailbox1.fetch(key_b.clone()).await;
619
620            // Wait for some time (longer than retry timeout)
621            context.sleep(Duration::from_secs(5)).await;
622
623            // Cancel the fetch for keyB
624            mailbox1.cancel(key_b.clone()).await;
625
626            // Wait for failure event for keyB
627            let event = cons_out1.next().await.unwrap();
628            match event {
629                Event::Failed(key_actual) => {
630                    assert_eq!(key_actual, key_b);
631                }
632                Event::Success(_, _) => panic!("Fetch should have been canceled"),
633            }
634        });
635    }
636
637    /// Tests that duplicate fetch requests for the same key are handled properly.
638    /// The test verifies that when the same key is requested multiple times,
639    /// the data is correctly delivered once without errors.
640    #[test_traced]
641    fn test_duplicate_fetch_request() {
642        let executor = deterministic::Runner::timed(Duration::from_secs(10));
643        executor.start(|context| async move {
644            let (mut oracle, mut schemes, peers, mut connections) =
645                setup_network_and_peers(&context, &[1, 2]).await;
646
647            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
648
649            let key = Key(5);
650            let mut prod2 = Producer::default();
651            prod2.insert(key.clone(), Bytes::from("data for key 5"));
652
653            let (cons1, mut cons_out1) = Consumer::new();
654
655            let mut mailbox1 = setup_and_spawn_actor(
656                &context,
657                &oracle,
658                schemes.remove(0),
659                connections.remove(0),
660                cons1,
661                Producer::default(),
662            )
663            .await;
664
665            let _mailbox2 = setup_and_spawn_actor(
666                &context,
667                &oracle,
668                schemes.remove(0),
669                connections.remove(0),
670                Consumer::dummy(),
671                prod2,
672            )
673            .await;
674
675            // Send duplicate fetch requests for the same key
676            mailbox1.fetch(key.clone()).await;
677            mailbox1.fetch(key.clone()).await;
678
679            // Should receive the data only once
680            let event = cons_out1.next().await.unwrap();
681            match event {
682                Event::Success(key_actual, value) => {
683                    assert_eq!(key_actual, key);
684                    assert_eq!(value, Bytes::from("data for key 5"));
685                }
686                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
687            }
688
689            // Make sure we don't receive a second event for the duplicate fetch
690            select! {
691                _ = cons_out1.next() => {
692                    panic!("Unexpected second event received for duplicate fetch");
693                },
694                _ = context.sleep(Duration::from_millis(500)) => {
695                    // This is expected - no additional events should be produced
696                },
697            };
698        });
699    }
700
701    /// Tests that changing peer sets is handled correctly using the update channel.
702    /// This test verifies that when the peer set changes from peer A to peer B,
703    /// the resolver correctly adapts and fetches from the new peer.
704    #[test_traced]
705    fn test_changing_peer_sets() {
706        let executor = deterministic::Runner::timed(Duration::from_secs(10));
707        executor.start(|context| async move {
708            let (mut oracle, mut schemes, peers, mut connections) =
709                setup_network_and_peers(&context, &[1, 2, 3]).await;
710
711            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
712            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
713
714            let key1 = Key(1);
715            let key2 = Key(2);
716
717            let mut prod2 = Producer::default();
718            prod2.insert(key1.clone(), Bytes::from("data from peer 2"));
719
720            let mut prod3 = Producer::default();
721            prod3.insert(key2.clone(), Bytes::from("data from peer 3"));
722
723            let (cons1, mut cons_out1) = Consumer::new();
724
725            let mut mailbox1 = setup_and_spawn_actor(
726                &context,
727                &oracle,
728                schemes.remove(0),
729                connections.remove(0),
730                cons1,
731                Producer::default(),
732            )
733            .await;
734
735            let _mailbox2 = setup_and_spawn_actor(
736                &context,
737                &oracle,
738                schemes.remove(0),
739                connections.remove(0),
740                Consumer::dummy(),
741                prod2,
742            )
743            .await;
744
745            // Fetch key1 from peer 2
746            mailbox1.fetch(key1.clone()).await;
747
748            // Wait for successful fetch
749            let event = cons_out1.next().await.unwrap();
750            match event {
751                Event::Success(key_actual, value) => {
752                    assert_eq!(key_actual, key1);
753                    assert_eq!(value, Bytes::from("data from peer 2"));
754                }
755                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
756            }
757
758            // Change peer set to include peer 3
759            let _mailbox3 = setup_and_spawn_actor(
760                &context,
761                &oracle,
762                schemes.remove(0),
763                connections.remove(0),
764                Consumer::dummy(),
765                prod3,
766            )
767            .await;
768
769            // Need to wait for the peer set change to propagate
770            context.sleep(Duration::from_millis(200)).await;
771
772            // Fetch key2 from peer 3
773            mailbox1.fetch(key2.clone()).await;
774
775            // Wait for successful fetch
776            let event = cons_out1.next().await.unwrap();
777            match event {
778                Event::Success(key_actual, value) => {
779                    assert_eq!(key_actual, key2);
780                    assert_eq!(value, Bytes::from("data from peer 3"));
781                }
782                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
783            }
784        });
785    }
786}