commonware_resolver/p2p/
mod.rs

1//! Resolve data identified by a fixed-length key by using the P2P network.
2//!
3//! # Overview
4//!
5//! The `p2p` module enables resolving data by fixed-length keys in a P2P network. Central to the
6//! module is the `peer` actor which manages the fetch-request lifecycle. Its mailbox allows
7//! initiation and cancellation of fetch requests via the `Resolver` interface.
8//!
9//! The peer handles an arbitrarily large number of concurrent fetch requests by sending requests
10//! to other peers and processing their responses. It uses [commonware_p2p::utils::requester] to
11//! select peers based on performance, retrying with another peer if one fails or provides invalid
12//! data. Requests persist until canceled or fulfilled, delivering data to the `Consumer` for
13//! verification.
14//!
15//! The `Consumer` checks data integrity and authenticity (critical in an adversarial environment)
16//! and returns `true` if valid, completing the fetch, or `false` to retry.
17//!
18//! The peer also serves data to other peers, forwarding network requests to the `Producer`. The
19//! `Producer` provides data asynchronously (e.g., from storage). If it fails, the peer sends an
20//! empty response, prompting the requester to retry elsewhere. Each message between peers contains
21//! an ID. Each request is sent with a unique ID, and each response includes the ID of the request
22//! it responds to.
23//!
24//! Lastly, the `Coordinator` manages the set of peers that can be used to fetch data.
25//!
26//! # Performance Considerations
27//!
28//! The peer supports arbitrarily many concurrent fetch requests, but resource usage generally
29//! depends on the rate-limiting configuration of the `Requester` and of the underlying P2P network.
30
31use bytes::Bytes;
32use commonware_cryptography::PublicKey;
33use commonware_utils::Array;
34use futures::channel::oneshot;
35use std::future::Future;
36
37mod config;
38pub use config::Config;
39mod engine;
40pub use engine::Engine;
41mod fetcher;
42mod ingress;
43pub use ingress::Mailbox;
44mod metrics;
45mod wire;
46
47#[cfg(test)]
48pub mod mocks;
49
/// Serves data requested by the network.
///
/// As described in the module overview, network requests are forwarded to the producer, which
/// provides the data asynchronously (e.g., from storage). If the producer fails, an empty
/// response is sent so that the requester retries elsewhere.
pub trait Producer: Clone + Send + 'static {
    /// Type used to uniquely identify data.
    type Key: Array;

    /// Serve a request received from the network.
    ///
    /// Returns a future resolving to a [`oneshot::Receiver`] on which the bytes for `key` are
    /// delivered.
    ///
    /// NOTE(review): failure is presumably signaled by dropping the matching sender without
    /// sending a value; confirm against the engine implementation.
    fn produce(&mut self, key: Self::Key) -> impl Future<Output = oneshot::Receiver<Bytes>> + Send;
}
58
/// Manages the set of peers that can be used to fetch data.
///
/// The peer set may change over time; changes are signaled through
/// [`peer_set_id`](Coordinator::peer_set_id).
pub trait Coordinator: Clone + Send + Sync + 'static {
    /// Type used to uniquely identify peers.
    type PublicKey: PublicKey;

    /// Returns the current list of peers that can be used to fetch data.
    ///
    /// This is also used to filter requests from peers.
    fn peers(&self) -> &Vec<Self::PublicKey>;

    /// Returns an identifier for the peer set.
    ///
    /// This is a low-overhead way to check whether the list of peers has changed. The value must
    /// change to a novel value whenever the list of peers changes. For example, it could be an
    /// incrementing counter, or an epoch.
    fn peer_set_id(&self) -> u64;
}
76
77#[cfg(test)]
78mod tests {
79    use super::{
80        mocks::{Consumer, Coordinator, CoordinatorMsg, Event, Key, Producer},
81        Config, Engine, Mailbox,
82    };
83    use crate::Resolver;
84    use bytes::Bytes;
85    use commonware_cryptography::{
86        ed25519::{PrivateKey, PublicKey},
87        PrivateKeyExt as _, Signer,
88    };
89    use commonware_macros::{select, test_traced};
90    use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
91    use commonware_runtime::{deterministic, Clock, Metrics, Runner};
92    use commonware_utils::NZU32;
93    use futures::{SinkExt, StreamExt};
94    use std::time::Duration;
95
    /// Value passed as `mailbox_size` to every engine created by the tests.
    const MAILBOX_SIZE: usize = 1024;
    /// Per-second quota handed to the requester's rate limiter.
    const RATE_LIMIT: u32 = 10;
    /// Value passed as the requester's `initial` duration.
    /// NOTE(review): presumably the initial response-time estimate for a peer; confirm.
    const INITIAL_DURATION: Duration = Duration::from_millis(100);
    /// Value passed as the requester's `timeout`.
    const TIMEOUT: Duration = Duration::from_millis(400);
    /// Value passed as the engine's `fetch_retry_timeout`.
    const FETCH_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
    /// Link with a `success_rate` of `1.0`.
    /// NOTE(review): `latency` and `jitter` are presumably in milliseconds; confirm.
    const LINK: Link = Link {
        latency: 10.0,
        jitter: 1.0,
        success_rate: 1.0,
    };
    /// Link with a `success_rate` of `0.5`, used to exercise retries.
    /// NOTE(review): presumably about half of all messages are dropped; confirm.
    const LINK_UNRELIABLE: Link = Link {
        latency: 10.0,
        jitter: 1.0,
        success_rate: 0.5,
    };
111
112    async fn setup_network_and_peers(
113        context: &deterministic::Context,
114        peer_seeds: &[u64],
115    ) -> (
116        Oracle<PublicKey>,
117        Vec<PrivateKey>,
118        Vec<PublicKey>,
119        Vec<(Sender<PublicKey>, Receiver<PublicKey>)>,
120    ) {
121        let (network, mut oracle) = Network::new(
122            context.with_label("network"),
123            commonware_p2p::simulated::Config {
124                max_size: 1024 * 1024,
125            },
126        );
127        network.start();
128
129        let schemes: Vec<PrivateKey> = peer_seeds
130            .iter()
131            .map(|seed| PrivateKey::from_seed(*seed))
132            .collect();
133        let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
134
135        let mut connections = Vec::new();
136        for peer in &peers {
137            let (sender, receiver) = oracle.register(peer.clone(), 0).await.unwrap();
138            connections.push((sender, receiver));
139        }
140
141        (oracle, schemes, peers, connections)
142    }
143
144    async fn add_link(
145        oracle: &mut Oracle<PublicKey>,
146        link: Link,
147        peers: &[PublicKey],
148        from: usize,
149        to: usize,
150    ) {
151        oracle
152            .add_link(peers[from].clone(), peers[to].clone(), link.clone())
153            .await
154            .unwrap();
155        oracle
156            .add_link(peers[to].clone(), peers[from].clone(), link)
157            .await
158            .unwrap();
159    }
160
161    async fn setup_and_spawn_actor(
162        context: &deterministic::Context,
163        coordinator: &Coordinator<PublicKey>,
164        signer: impl Signer<PublicKey = PublicKey>,
165        connection: (Sender<PublicKey>, Receiver<PublicKey>),
166        consumer: Consumer<Key, Bytes>,
167        producer: Producer<Key, Bytes>,
168    ) -> Mailbox<Key> {
169        let public_key = signer.public_key();
170        let (engine, mailbox) = Engine::new(
171            context.with_label(&format!("actor_{public_key}")),
172            Config {
173                coordinator: coordinator.clone(),
174                consumer,
175                producer,
176                mailbox_size: MAILBOX_SIZE,
177                requester_config: commonware_p2p::utils::requester::Config {
178                    public_key,
179                    rate_limit: governor::Quota::per_second(NZU32!(RATE_LIMIT)),
180                    initial: INITIAL_DURATION,
181                    timeout: TIMEOUT,
182                },
183                fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
184                priority_requests: false,
185                priority_responses: false,
186            },
187        );
188        engine.start(connection);
189
190        mailbox
191    }
192
193    /// Tests that fetching a key from another peer succeeds when data is available.
194    /// This test sets up two peers, where Peer 1 requests data that Peer 2 has,
195    /// and verifies that the data is correctly delivered to Peer 1's consumer.
196    #[test_traced]
197    fn test_fetch_success() {
198        let executor = deterministic::Runner::timed(Duration::from_secs(10));
199        executor.start(|context| async move {
200            let (mut oracle, mut schemes, peers, mut connections) =
201                setup_network_and_peers(&context, &[1, 2]).await;
202
203            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
204
205            let key = Key(2);
206            let mut prod2 = Producer::default();
207            prod2.insert(key.clone(), Bytes::from("data for key 2"));
208
209            let coordinator = Coordinator::new(peers);
210            let (cons1, mut cons_out1) = Consumer::new();
211
212            let mut mailbox1 = setup_and_spawn_actor(
213                &context,
214                &coordinator,
215                schemes.remove(0),
216                connections.remove(0),
217                cons1,
218                Producer::default(),
219            )
220            .await;
221
222            let _mailbox2 = setup_and_spawn_actor(
223                &context,
224                &coordinator,
225                schemes.remove(0),
226                connections.remove(0),
227                Consumer::dummy(),
228                prod2,
229            )
230            .await;
231
232            mailbox1.fetch(key.clone()).await;
233
234            let event = cons_out1.next().await.unwrap();
235            match event {
236                Event::Success(key_actual, value) => {
237                    assert_eq!(key_actual, key);
238                    assert_eq!(value, Bytes::from("data for key 2"));
239                }
240                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
241            }
242        });
243    }
244
245    /// Tests that canceling a fetch request results in a failure event.
246    /// This test initiates a fetch request and immediately cancels it,
247    /// verifying that the consumer receives a failure notification instead of data.
248    #[test_traced]
249    fn test_cancel_fetch() {
250        let executor = deterministic::Runner::timed(Duration::from_secs(10));
251        executor.start(|context| async move {
252            let (_oracle, mut schemes, peers, mut connections) =
253                setup_network_and_peers(&context, &[1]).await;
254
255            let coordinator = Coordinator::new(peers);
256            let (cons1, mut cons_out1) = Consumer::new();
257            let prod1 = Producer::default();
258
259            let mut mailbox1 = setup_and_spawn_actor(
260                &context,
261                &coordinator,
262                schemes.remove(0),
263                connections.remove(0),
264                cons1,
265                prod1,
266            )
267            .await;
268
269            let key = Key(3);
270            mailbox1.fetch(key.clone()).await;
271            mailbox1.cancel(key.clone()).await;
272
273            let event = cons_out1.next().await.unwrap();
274            match event {
275                Event::Failed(key_actual) => {
276                    assert_eq!(key_actual, key);
277                }
278                Event::Success(_, _) => panic!("Fetch should have been canceled"),
279            }
280        });
281    }
282
283    /// Tests fetching data from a peer when some peers lack the data.
284    /// This test sets up three peers, where Peer 1 requests data that only Peer 3 has.
285    /// It verifies that the resolver retries with another peer and successfully
286    /// delivers the data to Peer 1's consumer.
287    #[test_traced]
288    fn test_peer_no_data() {
289        let executor = deterministic::Runner::timed(Duration::from_secs(10));
290        executor.start(|context| async move {
291            let (mut oracle, mut schemes, peers, mut connections) =
292                setup_network_and_peers(&context, &[1, 2, 3]).await;
293
294            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
295            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
296
297            let prod1 = Producer::default();
298            let prod2 = Producer::default();
299            let mut prod3 = Producer::default();
300            let key = Key(3);
301            prod3.insert(key.clone(), Bytes::from("data for key 3"));
302
303            let coordinator = Coordinator::new(peers);
304            let (cons1, mut cons_out1) = Consumer::new();
305
306            let mut mailbox1 = setup_and_spawn_actor(
307                &context,
308                &coordinator,
309                schemes.remove(0),
310                connections.remove(0),
311                cons1,
312                prod1,
313            )
314            .await;
315
316            let _mailbox2 = setup_and_spawn_actor(
317                &context,
318                &coordinator,
319                schemes.remove(0),
320                connections.remove(0),
321                Consumer::dummy(),
322                prod2,
323            )
324            .await;
325
326            let _mailbox3 = setup_and_spawn_actor(
327                &context,
328                &coordinator,
329                schemes.remove(0),
330                connections.remove(0),
331                Consumer::dummy(),
332                prod3,
333            )
334            .await;
335
336            mailbox1.fetch(key.clone()).await;
337
338            let event = cons_out1.next().await.unwrap();
339            match event {
340                Event::Success(key_actual, value) => {
341                    assert_eq!(key_actual, key);
342                    assert_eq!(value, Bytes::from("data for key 3"));
343                }
344                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
345            }
346        });
347    }
348
349    /// Tests fetching when no peers are available.
350    /// This test sets up a single peer with an empty coordinator (no peers).
351    /// It initiates a fetch, waits beyond the retry timeout, cancels the fetch,
352    /// and verifies that the consumer receives a failure notification.
353    #[test_traced]
354    fn test_no_peers_available() {
355        let executor = deterministic::Runner::timed(Duration::from_secs(10));
356        executor.start(|context| async move {
357            let (_oracle, mut schemes, _peers, mut connections) =
358                setup_network_and_peers(&context, &[1]).await;
359
360            let coordinator = Coordinator::new(vec![]);
361            let (cons1, mut cons_out1) = Consumer::new();
362            let prod1 = Producer::default();
363
364            let mut mailbox1 = setup_and_spawn_actor(
365                &context,
366                &coordinator,
367                schemes.remove(0),
368                connections.remove(0),
369                cons1,
370                prod1,
371            )
372            .await;
373
374            let key = Key(4);
375            mailbox1.fetch(key.clone()).await;
376            context.sleep(Duration::from_secs(5)).await;
377            mailbox1.cancel(key.clone()).await;
378
379            let event = cons_out1.next().await.expect("Consumer channel closed");
380            match event {
381                Event::Failed(key_actual) => {
382                    assert_eq!(key_actual, key);
383                }
384                Event::Success(_, _) => {
385                    panic!("Fetch should have failed due to no peers")
386                }
387            }
388        });
389    }
390
391    /// Tests that concurrent fetch requests are handled correctly.
392    /// Also tests that the peer can recover from having no peers available.
393    /// Also tests that the peer can get data from multiple peers that have different sets of data.
394    #[test_traced]
395    fn test_concurrent_fetch_requests() {
396        let executor = deterministic::Runner::timed(Duration::from_secs(60));
397        executor.start(|context| async move {
398            let (mut oracle, mut schemes, peers, mut connections) =
399                setup_network_and_peers(&context, &[1, 2, 3]).await;
400
401            let key2 = Key(2);
402            let key3 = Key(3);
403            let mut prod2 = Producer::default();
404            prod2.insert(key2.clone(), Bytes::from("data for key 2"));
405            let mut prod3 = Producer::default();
406            prod3.insert(key3.clone(), Bytes::from("data for key 3"));
407
408            let coordinator = Coordinator::new(peers.clone());
409            let (cons1, mut cons_out1) = Consumer::new();
410
411            let mut mailbox1 = setup_and_spawn_actor(
412                &context,
413                &coordinator,
414                schemes.remove(0),
415                connections.remove(0),
416                cons1,
417                Producer::default(),
418            )
419            .await;
420
421            let _mailbox2 = setup_and_spawn_actor(
422                &context,
423                &coordinator,
424                schemes.remove(0),
425                connections.remove(0),
426                Consumer::dummy(),
427                prod2,
428            )
429            .await;
430
431            let _mailbox3 = setup_and_spawn_actor(
432                &context,
433                &coordinator,
434                schemes.remove(0),
435                connections.remove(0),
436                Consumer::dummy(),
437                prod3,
438            )
439            .await;
440
441            // Add choppy links between the requester and the two producers
442            add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 1).await;
443            add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 2).await;
444
445            // Run the fetches multiple times to ensure that the peer tries both of its peers
446            for _ in 0..10 {
447                // Initiate concurrent fetch requests
448                mailbox1.fetch(key2.clone()).await;
449                mailbox1.fetch(key3.clone()).await;
450
451                // Collect both events without assuming order
452                let mut events = Vec::new();
453                events.push(cons_out1.next().await.expect("Consumer channel closed"));
454                events.push(cons_out1.next().await.expect("Consumer channel closed"));
455
456                // Check that both keys were successfully fetched
457                let mut found_key2 = false;
458                let mut found_key3 = false;
459                for event in events {
460                    match event {
461                        Event::Success(key_actual, value) => {
462                            if key_actual == key2 {
463                                assert_eq!(value, Bytes::from("data for key 2"));
464                                found_key2 = true;
465                            } else if key_actual == key3 {
466                                assert_eq!(value, Bytes::from("data for key 3"));
467                                found_key3 = true;
468                            } else {
469                                panic!("Unexpected key received");
470                            }
471                        }
472                        Event::Failed(_) => panic!("Fetch failed unexpectedly"),
473                    }
474                }
475                assert!(found_key2 && found_key3,);
476            }
477        });
478    }
479
480    /// Tests that canceling an inactive fetch request has no effect.
481    /// Cancels a request before, after, and during the fetch process,
482    #[test_traced]
483    fn test_cancel() {
484        let executor = deterministic::Runner::timed(Duration::from_secs(10));
485        executor.start(|context| async move {
486            let (mut oracle, mut schemes, peers, mut connections) =
487                setup_network_and_peers(&context, &[1, 2]).await;
488
489            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
490
491            let key = Key(6);
492            let mut prod2 = Producer::default();
493            prod2.insert(key.clone(), Bytes::from("data for key 6"));
494
495            let coordinator = Coordinator::new(peers);
496            let (cons1, mut cons_out1) = Consumer::new();
497
498            let mut mailbox1 = setup_and_spawn_actor(
499                &context,
500                &coordinator,
501                schemes.remove(0),
502                connections.remove(0),
503                cons1,
504                Producer::default(),
505            )
506            .await;
507
508            let _mailbox2 = setup_and_spawn_actor(
509                &context,
510                &coordinator,
511                schemes.remove(0),
512                connections.remove(0),
513                Consumer::dummy(),
514                prod2,
515            )
516            .await;
517
518            // Cancel before sending the fetch request, expecting no effect
519            mailbox1.cancel(key.clone()).await;
520            select! {
521                _ = cons_out1.next() => { panic!("unexpected event"); },
522                _ = context.sleep(Duration::from_millis(100)) => {},
523            };
524
525            // Initiate fetch and wait for data to be delivered
526            mailbox1.fetch(key.clone()).await;
527            let event = cons_out1.next().await.unwrap();
528            match event {
529                Event::Success(key_actual, value) => {
530                    assert_eq!(key_actual, key);
531                    assert_eq!(value, Bytes::from("data for key 6"));
532                }
533                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
534            }
535
536            // Attempt to cancel after data has been delivered, expecting no effect
537            mailbox1.cancel(key.clone()).await;
538            select! {
539                _ = cons_out1.next() => { panic!("unexpected event"); },
540                _ = context.sleep(Duration::from_millis(100)) => {},
541            };
542
543            // Initiate and cancel another fetch request
544            let key = Key(7);
545            mailbox1.fetch(key.clone()).await;
546            mailbox1.cancel(key.clone()).await;
547
548            // Make sure we receive a failure event
549            let event = cons_out1.next().await.unwrap();
550            match event {
551                Event::Failed(key_actual) => {
552                    assert_eq!(key_actual, key);
553                }
554                Event::Success(_, _) => panic!("Fetch should have been canceled"),
555            }
556        });
557    }
558
559    /// Tests that a peer is blocked after delivering invalid data,
560    /// preventing further fetches from that peer.
561    #[test_traced]
562    fn test_blocking_peer() {
563        let executor = deterministic::Runner::timed(Duration::from_secs(10));
564        executor.start(|context| async move {
565            let (mut oracle, mut schemes, peers, mut connections) =
566                setup_network_and_peers(&context, &[1, 2, 3]).await;
567
568            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
569            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
570            add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;
571
572            let key_a = Key(1);
573            let key_b = Key(2);
574            let invalid_data_a = Bytes::from("invalid for A");
575            let valid_data_a = Bytes::from("valid for A");
576            let valid_data_b = Bytes::from("valid for B");
577
578            // Set up producers
579            let mut prod2 = Producer::default();
580            prod2.insert(key_a.clone(), invalid_data_a.clone());
581            prod2.insert(key_b.clone(), valid_data_b.clone());
582
583            let mut prod3 = Producer::default();
584            prod3.insert(key_a.clone(), valid_data_a.clone());
585
586            // Set up coordinators
587            let coordinator1 = Coordinator::new(vec![peers[1].clone(), peers[2].clone()]);
588            let coordinator2 = Coordinator::new(vec![peers[0].clone()]);
589            let coordinator3 = Coordinator::new(vec![peers[0].clone()]);
590
591            // Set up consumer for Peer1 with expected values
592            let (mut cons1, mut cons_out1) = Consumer::new();
593            cons1.add_expected(key_a.clone(), valid_data_a.clone());
594            cons1.add_expected(key_b.clone(), valid_data_b.clone());
595
596            // Spawn actors
597            let mut mailbox1 = setup_and_spawn_actor(
598                &context,
599                &coordinator1,
600                schemes.remove(0),
601                connections.remove(0),
602                cons1,
603                Producer::default(),
604            )
605            .await;
606
607            let _mailbox2 = setup_and_spawn_actor(
608                &context,
609                &coordinator2,
610                schemes.remove(0),
611                connections.remove(0),
612                Consumer::dummy(),
613                prod2,
614            )
615            .await;
616
617            let _mailbox3 = setup_and_spawn_actor(
618                &context,
619                &coordinator3,
620                schemes.remove(0),
621                connections.remove(0),
622                Consumer::dummy(),
623                prod3,
624            )
625            .await;
626
627            // Fetch keyA multiple times to ensure that Peer2 is blocked.
628            for _ in 0..10 {
629                // Fetch keyA
630                mailbox1.fetch(key_a.clone()).await;
631
632                // Wait for success event for keyA
633                let event = cons_out1.next().await.unwrap();
634                match event {
635                    Event::Success(key_actual, value) => {
636                        assert_eq!(key_actual, key_a);
637                        assert_eq!(value, valid_data_a);
638                    }
639                    Event::Failed(_) => panic!("Fetch failed unexpectedly"),
640                }
641            }
642
643            // Fetch keyB
644            mailbox1.fetch(key_b.clone()).await;
645
646            // Wait for some time (longer than retry timeout)
647            context.sleep(Duration::from_secs(5)).await;
648
649            // Cancel the fetch for keyB
650            mailbox1.cancel(key_b.clone()).await;
651
652            // Wait for failure event for keyB
653            let event = cons_out1.next().await.unwrap();
654            match event {
655                Event::Failed(key_actual) => {
656                    assert_eq!(key_actual, key_b);
657                }
658                Event::Success(_, _) => panic!("Fetch should have been canceled"),
659            }
660        });
661    }
662
663    /// Tests that duplicate fetch requests for the same key are handled properly.
664    /// The test verifies that when the same key is requested multiple times,
665    /// the data is correctly delivered once without errors.
666    #[test_traced]
667    fn test_duplicate_fetch_request() {
668        let executor = deterministic::Runner::timed(Duration::from_secs(10));
669        executor.start(|context| async move {
670            let (mut oracle, mut schemes, peers, mut connections) =
671                setup_network_and_peers(&context, &[1, 2]).await;
672
673            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
674
675            let key = Key(5);
676            let mut prod2 = Producer::default();
677            prod2.insert(key.clone(), Bytes::from("data for key 5"));
678
679            let coordinator = Coordinator::new(peers);
680            let (cons1, mut cons_out1) = Consumer::new();
681
682            let mut mailbox1 = setup_and_spawn_actor(
683                &context,
684                &coordinator,
685                schemes.remove(0),
686                connections.remove(0),
687                cons1,
688                Producer::default(),
689            )
690            .await;
691
692            let _mailbox2 = setup_and_spawn_actor(
693                &context,
694                &coordinator,
695                schemes.remove(0),
696                connections.remove(0),
697                Consumer::dummy(),
698                prod2,
699            )
700            .await;
701
702            // Send duplicate fetch requests for the same key
703            mailbox1.fetch(key.clone()).await;
704            mailbox1.fetch(key.clone()).await;
705
706            // Should receive the data only once
707            let event = cons_out1.next().await.unwrap();
708            match event {
709                Event::Success(key_actual, value) => {
710                    assert_eq!(key_actual, key);
711                    assert_eq!(value, Bytes::from("data for key 5"));
712                }
713                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
714            }
715
716            // Make sure we don't receive a second event for the duplicate fetch
717            select! {
718                _ = cons_out1.next() => {
719                    panic!("Unexpected second event received for duplicate fetch");
720                },
721                _ = context.sleep(Duration::from_millis(500)) => {
722                    // This is expected - no additional events should be produced
723                },
724            };
725        });
726    }
727
728    /// Tests that changing peer sets is handled correctly using the update channel.
729    /// This test verifies that when the peer set changes from peer A to peer B,
730    /// the resolver correctly adapts and fetches from the new peer.
731    #[test_traced]
732    fn test_changing_peer_sets() {
733        let executor = deterministic::Runner::timed(Duration::from_secs(10));
734        executor.start(|context| async move {
735            let (mut oracle, mut schemes, peers, mut connections) =
736                setup_network_and_peers(&context, &[1, 2, 3]).await;
737
738            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
739            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
740
741            let key1 = Key(1);
742            let key2 = Key(2);
743
744            let mut prod2 = Producer::default();
745            prod2.insert(key1.clone(), Bytes::from("data from peer 2"));
746
747            let mut prod3 = Producer::default();
748            prod3.insert(key2.clone(), Bytes::from("data from peer 3"));
749
750            // Create a coordinator with initial peer 2
751            let coordinator = Coordinator::new(vec![peers[1].clone()]);
752
753            // Create an update channel for the coordinator
754            let mut update_sender = coordinator.create_update_channel(context.clone());
755
756            let (cons1, mut cons_out1) = Consumer::new();
757
758            let mut mailbox1 = setup_and_spawn_actor(
759                &context,
760                &coordinator,
761                schemes.remove(0),
762                connections.remove(0),
763                cons1,
764                Producer::default(),
765            )
766            .await;
767
768            let _mailbox2 = setup_and_spawn_actor(
769                &context,
770                &Coordinator::new(vec![peers[0].clone()]),
771                schemes.remove(0),
772                connections.remove(0),
773                Consumer::dummy(),
774                prod2,
775            )
776            .await;
777
778            let _mailbox3 = setup_and_spawn_actor(
779                &context,
780                &Coordinator::new(vec![peers[0].clone()]),
781                schemes.remove(0),
782                connections.remove(0),
783                Consumer::dummy(),
784                prod3,
785            )
786            .await;
787
788            // Fetch key1 from peer 2
789            mailbox1.fetch(key1.clone()).await;
790
791            // Wait for successful fetch
792            let event = cons_out1.next().await.unwrap();
793            match event {
794                Event::Success(key_actual, value) => {
795                    assert_eq!(key_actual, key1);
796                    assert_eq!(value, Bytes::from("data from peer 2"));
797                }
798                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
799            }
800
801            // Change peer set to only include peer 3 via the update channel
802            update_sender
803                .send(CoordinatorMsg::UpdatePeers(vec![peers[2].clone()]))
804                .await
805                .expect("Failed to send update");
806
807            // Need to wait for the peer set change to propagate
808            context.sleep(Duration::from_millis(200)).await;
809
810            // Fetch key2 from peer 3
811            mailbox1.fetch(key2.clone()).await;
812
813            // Wait for successful fetch
814            let event = cons_out1.next().await.unwrap();
815            match event {
816                Event::Success(key_actual, value) => {
817                    assert_eq!(key_actual, key2);
818                    assert_eq!(value, Bytes::from("data from peer 3"));
819                }
820                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
821            }
822        });
823    }
824}