//! Resolve data identified by a fixed-length key by using the P2P network.
//!
//! # Overview
//!
//! The `p2p` module enables resolving data by fixed-length keys in a P2P network. Central to the
//! module is the `peer` actor which manages the fetch-request lifecycle. Its mailbox allows
//! initiation and cancellation of fetch requests via the `Resolver` interface.
//!
//! The peer handles an arbitrarily large number of concurrent fetch requests by sending requests
//! to other peers and processing their responses. It selects peers based on performance, retrying
//! with another peer if one fails or provides invalid data. Requests persist until canceled or
//! fulfilled, delivering data to the `Consumer` for verification.
//!
//! The `Consumer` checks data integrity and authenticity (critical in an adversarial environment)
//! and returns `true` if valid, completing the fetch, or `false` to retry.
//!
//! The peer also serves data to other peers, forwarding network requests to the `Producer`. The
//! `Producer` provides data asynchronously (e.g., from storage). If it fails, the peer sends an
//! empty response, prompting the requester to retry elsewhere. Each message between peers contains
//! an ID. Each request is sent with a unique ID, and each response includes the ID of the request
//! it responds to.
//!
//! # Targeting
//!
//! Callers can restrict fetches to specific target peers using [`Resolver::fetch_targeted`](crate::Resolver::fetch_targeted).
//! Only target peers are tried; there is no automatic fallback to other peers. Targets persist through
//! transient failures (timeout, "no data" response, send failure) since the peer might be slow or
//! receive the data later.
//!
//! While a fetch is in progress, callers can modify targeting:
//! - [`Resolver::fetch_targeted`](crate::Resolver::fetch_targeted) adds peers to the existing target set
//!   (only if the fetch already has targets; an "all" fetch remains unrestricted)
//! - [`Resolver::fetch`](crate::Resolver::fetch) clears all targets, allowing fallback to any peer
//!
//! These modifications only apply to in-progress fetches. Once a fetch completes (success, cancel,
//! or blocked peer), the targets for that key are cleared automatically.
//!
//! # Performance Considerations
//!
//! The peer supports arbitrarily many concurrent fetch requests, but resource usage generally
//! depends on the rate-limiting configuration of the underlying P2P network.

use bytes::Bytes;
use commonware_utils::{channel::oneshot, Span};
use std::future::Future;

mod config;
pub use config::Config;
mod engine;
pub use engine::Engine;
mod fetcher;
mod ingress;
pub use ingress::Mailbox;
mod metrics;
mod wire;

// Mock implementations, exposed for downstream tests behind the `mocks` feature.
#[cfg(feature = "mocks")]
pub mod mocks;
59
/// Serves data requested by the network.
///
/// Implementations must be cheaply cloneable (`Clone + Send + 'static`) since
/// the engine may hold multiple handles while serving concurrent requests.
pub trait Producer: Clone + Send + 'static {
    /// Type used to uniquely identify data.
    type Key: Span;

    /// Serve a request received from the network.
    ///
    /// Returns a receiver that eventually yields the requested data. If the
    /// producer fails (e.g., the sender is dropped without a value), the peer
    /// replies with an empty response so the requester can retry elsewhere.
    fn produce(&mut self, key: Self::Key) -> impl Future<Output = oneshot::Receiver<Bytes>> + Send;
}
68
69#[cfg(test)]
70mod tests {
71    use super::{
72        mocks::{Consumer, Event, Key, Producer},
73        Config, Engine, Mailbox,
74    };
75    use crate::Resolver;
76    use bytes::Bytes;
77    use commonware_cryptography::{
78        ed25519::{PrivateKey, PublicKey},
79        Signer,
80    };
81    use commonware_macros::{select, test_traced};
82    use commonware_p2p::{
83        simulated::{Link, Network, Oracle, Receiver, Sender},
84        Blocker, Manager, Provider,
85    };
86    use commonware_runtime::{count_running_tasks, deterministic, Clock, Metrics, Quota, Runner};
87    use commonware_utils::{non_empty_vec, NZU32};
88    use std::{collections::HashMap, num::NonZeroU32, time::Duration};
89
    // Capacity of each actor's mailbox channel.
    const MAILBOX_SIZE: usize = 1024;
    // Default per-second message rate limit applied to every registered peer.
    const RATE_LIMIT: NonZeroU32 = NZU32!(10);
    // Initial duration used by the engine (passed as `Config::initial`).
    const INITIAL_DURATION: Duration = Duration::from_millis(100);
    // Per-request timeout (passed as `Config::timeout`).
    const TIMEOUT: Duration = Duration::from_millis(400);
    // Delay before a failed fetch is retried (passed as `Config::fetch_retry_timeout`).
    const FETCH_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
    // Reliable link: low latency/jitter and no message loss.
    const LINK: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 1.0,
    };
    // Lossy link: drops half of all messages to exercise retry behavior.
    const LINK_UNRELIABLE: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 0.5,
    };
105
106    async fn setup_network_and_peers(
107        context: &deterministic::Context,
108        peer_seeds: &[u64],
109    ) -> (
110        Oracle<PublicKey, deterministic::Context>,
111        Vec<PrivateKey>,
112        Vec<PublicKey>,
113        Vec<(
114            Sender<PublicKey, deterministic::Context>,
115            Receiver<PublicKey>,
116        )>,
117    ) {
118        setup_network_and_peers_with_rate_limit(context, peer_seeds, Quota::per_second(RATE_LIMIT))
119            .await
120    }
121
122    async fn setup_network_and_peers_with_rate_limit(
123        context: &deterministic::Context,
124        peer_seeds: &[u64],
125        rate_limit: Quota,
126    ) -> (
127        Oracle<PublicKey, deterministic::Context>,
128        Vec<PrivateKey>,
129        Vec<PublicKey>,
130        Vec<(
131            Sender<PublicKey, deterministic::Context>,
132            Receiver<PublicKey>,
133        )>,
134    ) {
135        let (network, oracle) = Network::new(
136            context.with_label("network"),
137            commonware_p2p::simulated::Config {
138                max_size: 1024 * 1024,
139                disconnect_on_block: true,
140                tracked_peer_sets: Some(3),
141            },
142        );
143        network.start();
144
145        let schemes: Vec<PrivateKey> = peer_seeds
146            .iter()
147            .map(|seed| PrivateKey::from_seed(*seed))
148            .collect();
149        let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
150        let mut manager = oracle.manager();
151        manager.track(0, peers.clone().try_into().unwrap()).await;
152
153        let mut connections = Vec::new();
154        for peer in &peers {
155            let (sender, receiver) = oracle
156                .control(peer.clone())
157                .register(0, rate_limit)
158                .await
159                .unwrap();
160            connections.push((sender, receiver));
161        }
162
163        (oracle, schemes, peers, connections)
164    }
165
166    async fn add_link(
167        oracle: &mut Oracle<PublicKey, deterministic::Context>,
168        link: Link,
169        peers: &[PublicKey],
170        from: usize,
171        to: usize,
172    ) {
173        oracle
174            .add_link(peers[from].clone(), peers[to].clone(), link.clone())
175            .await
176            .unwrap();
177        oracle
178            .add_link(peers[to].clone(), peers[from].clone(), link)
179            .await
180            .unwrap();
181    }
182
    /// Builds a resolver [`Engine`] for `signer`, starts it on `connection`,
    /// and returns its [`Mailbox`] for issuing fetch/cancel requests.
    fn setup_and_spawn_actor(
        context: &deterministic::Context,
        provider: impl Provider<PublicKey = PublicKey>,
        blocker: impl Blocker<PublicKey = PublicKey>,
        signer: impl Signer<PublicKey = PublicKey>,
        connection: (
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        ),
        consumer: Consumer<Key, Bytes>,
        producer: Producer<Key, Bytes>,
    ) -> Mailbox<Key, PublicKey> {
        let public_key = signer.public_key();
        let (engine, mailbox) = Engine::new(
            // Label each actor by its public key for legible metrics/logs.
            context.with_label(&format!("actor_{public_key}")),
            Config {
                peer_provider: provider,
                blocker,
                consumer,
                producer,
                mailbox_size: MAILBOX_SIZE,
                // This actor's own identity (see `Config::me` for semantics).
                me: Some(public_key),
                initial: INITIAL_DURATION,
                timeout: TIMEOUT,
                fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
                priority_requests: false,
                priority_responses: false,
            },
        );
        engine.start(connection);

        mailbox
    }
216
217    /// Tests that fetching a key from another peer succeeds when data is available.
218    /// This test sets up two peers, where Peer 1 requests data that Peer 2 has,
219    /// and verifies that the data is correctly delivered to Peer 1's consumer.
220    #[test_traced]
221    fn test_fetch_success() {
222        let executor = deterministic::Runner::timed(Duration::from_secs(10));
223        executor.start(|context| async move {
224            let (mut oracle, mut schemes, peers, mut connections) =
225                setup_network_and_peers(&context, &[1, 2]).await;
226
227            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
228
229            let key = Key(2);
230            let mut prod2 = Producer::default();
231            prod2.insert(key.clone(), Bytes::from("data for key 2"));
232
233            let (cons1, mut cons_out1) = Consumer::new();
234
235            let scheme = schemes.remove(0);
236            let mut mailbox1 = setup_and_spawn_actor(
237                &context,
238                oracle.manager(),
239                oracle.control(scheme.public_key()),
240                scheme,
241                connections.remove(0),
242                cons1,
243                Producer::default(),
244            );
245
246            let scheme = schemes.remove(0);
247            let _mailbox2 = setup_and_spawn_actor(
248                &context,
249                oracle.manager(),
250                oracle.control(scheme.public_key()),
251                scheme,
252                connections.remove(0),
253                Consumer::dummy(),
254                prod2,
255            );
256
257            mailbox1.fetch(key.clone()).await;
258
259            let event = cons_out1.recv().await.unwrap();
260            match event {
261                Event::Success(key_actual, value) => {
262                    assert_eq!(key_actual, key);
263                    assert_eq!(value, Bytes::from("data for key 2"));
264                }
265                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
266            }
267        });
268    }
269
270    /// Tests that canceling a fetch request results in a failure event.
271    /// This test initiates a fetch request and immediately cancels it,
272    /// verifying that the consumer receives a failure notification instead of data.
273    #[test_traced]
274    fn test_cancel_fetch() {
275        let executor = deterministic::Runner::timed(Duration::from_secs(10));
276        executor.start(|context| async move {
277            let (oracle, mut schemes, _peers, mut connections) =
278                setup_network_and_peers(&context, &[1]).await;
279
280            let (cons1, mut cons_out1) = Consumer::new();
281            let prod1 = Producer::default();
282
283            let scheme = schemes.remove(0);
284            let mut mailbox1 = setup_and_spawn_actor(
285                &context,
286                oracle.manager(),
287                oracle.control(scheme.public_key()),
288                scheme,
289                connections.remove(0),
290                cons1,
291                prod1,
292            );
293
294            let key = Key(3);
295            mailbox1.fetch(key.clone()).await;
296            mailbox1.cancel(key.clone()).await;
297
298            let event = cons_out1.recv().await.unwrap();
299            match event {
300                Event::Failed(key_actual) => {
301                    assert_eq!(key_actual, key);
302                }
303                Event::Success(_, _) => panic!("Fetch should have been canceled"),
304            }
305        });
306    }
307
    /// Tests fetching data from a peer when some peers lack the data.
    /// This test sets up three peers, where Peer 1 requests data that only Peer 3 has.
    /// It verifies that the resolver retries with another peer and successfully
    /// delivers the data to Peer 1's consumer.
    #[test_traced]
    fn test_peer_no_data() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3]).await;

            // The requester is linked to both candidate peers.
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            // Only peer 3's producer holds the requested key; peer 2 will
            // answer "no data", forcing a retry with the other peer.
            let prod1 = Producer::default();
            let prod2 = Producer::default();
            let mut prod3 = Producer::default();
            let key = Key(3);
            prod3.insert(key.clone(), Bytes::from("data for key 3"));

            let (cons1, mut cons_out1) = Consumer::new();

            // Requester (peer 1).
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                prod1,
            );

            // Peer 2: reachable but has no data.
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            );

            // Peer 3: the only holder of the key.
            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            );

            mailbox1.fetch(key.clone()).await;

            // Regardless of which peer is tried first, the fetch must end in
            // success with peer 3's data.
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, Bytes::from("data for key 3"));
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }
        });
    }
375
376    /// Tests fetching when no peers are available.
377    /// This test sets up a single peer with an empty peer provider (no peers).
378    /// It initiates a fetch, waits beyond the retry timeout, cancels the fetch,
379    /// and verifies that the consumer receives a failure notification.
380    #[test_traced]
381    fn test_no_peers_available() {
382        let executor = deterministic::Runner::timed(Duration::from_secs(10));
383        executor.start(|context| async move {
384            let (oracle, mut schemes, _peers, mut connections) =
385                setup_network_and_peers(&context, &[1]).await;
386
387            let (cons1, mut cons_out1) = Consumer::new();
388            let prod1 = Producer::default();
389
390            let scheme = schemes.remove(0);
391            let mut mailbox1 = setup_and_spawn_actor(
392                &context,
393                oracle.manager(),
394                oracle.control(scheme.public_key()),
395                scheme,
396                connections.remove(0),
397                cons1,
398                prod1,
399            );
400
401            let key = Key(4);
402            mailbox1.fetch(key.clone()).await;
403            context.sleep(Duration::from_secs(5)).await;
404            mailbox1.cancel(key.clone()).await;
405
406            let event = cons_out1.recv().await.expect("Consumer channel closed");
407            match event {
408                Event::Failed(key_actual) => {
409                    assert_eq!(key_actual, key);
410                }
411                Event::Success(_, _) => {
412                    panic!("Fetch should have failed due to no peers")
413                }
414            }
415        });
416    }
417
418    /// Tests that concurrent fetch requests are handled correctly.
419    /// Also tests that the peer can recover from having no peers available.
420    /// Also tests that the peer can get data from multiple peers that have different sets of data.
421    #[test_traced]
422    fn test_concurrent_fetch_requests() {
423        let executor = deterministic::Runner::timed(Duration::from_secs(60));
424        executor.start(|context| async move {
425            let (mut oracle, mut schemes, peers, mut connections) =
426                setup_network_and_peers(&context, &[1, 2, 3]).await;
427
428            let key2 = Key(2);
429            let key3 = Key(3);
430            let mut prod2 = Producer::default();
431            prod2.insert(key2.clone(), Bytes::from("data for key 2"));
432            let mut prod3 = Producer::default();
433            prod3.insert(key3.clone(), Bytes::from("data for key 3"));
434
435            let (cons1, mut cons_out1) = Consumer::new();
436
437            let scheme = schemes.remove(0);
438            let mut mailbox1 = setup_and_spawn_actor(
439                &context,
440                oracle.manager(),
441                oracle.control(scheme.public_key()),
442                scheme,
443                connections.remove(0),
444                cons1,
445                Producer::default(),
446            );
447
448            let scheme = schemes.remove(0);
449            let _mailbox2 = setup_and_spawn_actor(
450                &context,
451                oracle.manager(),
452                oracle.control(scheme.public_key()),
453                scheme,
454                connections.remove(0),
455                Consumer::dummy(),
456                prod2,
457            );
458
459            let scheme = schemes.remove(0);
460            let _mailbox3 = setup_and_spawn_actor(
461                &context,
462                oracle.manager(),
463                oracle.control(scheme.public_key()),
464                scheme,
465                connections.remove(0),
466                Consumer::dummy(),
467                prod3,
468            );
469
470            // Add choppy links between the requester and the two producers
471            add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 1).await;
472            add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 2).await;
473
474            // Run the fetches multiple times to ensure that the peer tries both of its peers
475            for _ in 0..10 {
476                // Initiate concurrent fetch requests
477                mailbox1.fetch(key2.clone()).await;
478                mailbox1.fetch(key3.clone()).await;
479
480                // Collect both events without assuming order
481                let mut events = Vec::new();
482                events.push(cons_out1.recv().await.expect("Consumer channel closed"));
483                events.push(cons_out1.recv().await.expect("Consumer channel closed"));
484
485                // Check that both keys were successfully fetched
486                let mut found_key2 = false;
487                let mut found_key3 = false;
488                for event in events {
489                    match event {
490                        Event::Success(key_actual, value) => {
491                            if key_actual == key2 {
492                                assert_eq!(value, Bytes::from("data for key 2"));
493                                found_key2 = true;
494                            } else if key_actual == key3 {
495                                assert_eq!(value, Bytes::from("data for key 3"));
496                                found_key3 = true;
497                            } else {
498                                panic!("Unexpected key received");
499                            }
500                        }
501                        Event::Failed(_) => panic!("Fetch failed unexpectedly"),
502                    }
503                }
504                assert!(found_key2 && found_key3,);
505            }
506        });
507    }
508
    /// Tests that canceling an inactive fetch request has no effect.
    /// Cancels a request before, after, and during the fetch process,
    /// verifying that only the in-flight cancellation yields a failure event.
    #[test_traced]
    fn test_cancel() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            // Peer 2 holds the data for the key under test.
            let key = Key(6);
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), Bytes::from("data for key 6"));

            let (cons1, mut cons_out1) = Consumer::new();

            // Requester (peer 1) with an empty producer.
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            // Server (peer 2).
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            );

            // Cancel before sending the fetch request, expecting no effect
            mailbox1.cancel(key.clone()).await;
            select! {
                _ = cons_out1.recv() => {
                    panic!("unexpected event");
                },
                _ = context.sleep(Duration::from_millis(100)) => {},
            };

            // Initiate fetch and wait for data to be delivered
            mailbox1.fetch(key.clone()).await;
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, Bytes::from("data for key 6"));
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }

            // Attempt to cancel after data has been delivered, expecting no effect
            mailbox1.cancel(key.clone()).await;
            select! {
                _ = cons_out1.recv() => {
                    panic!("unexpected event");
                },
                _ = context.sleep(Duration::from_millis(100)) => {},
            };

            // Initiate and cancel another fetch request
            // (a fresh key, so the earlier success cannot satisfy it)
            let key = Key(7);
            mailbox1.fetch(key.clone()).await;
            mailbox1.cancel(key.clone()).await;

            // Make sure we receive a failure event
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Failed(key_actual) => {
                    assert_eq!(key_actual, key);
                }
                Event::Success(_, _) => panic!("Fetch should have been canceled"),
            }
        });
    }
592
    /// Tests that a peer is blocked after delivering invalid data,
    /// preventing further fetches from that peer.
    #[test_traced]
    fn test_blocking_peer() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3]).await;

            // Fully connect the three peers.
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
            add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;

            let key_a = Key(1);
            let key_b = Key(2);
            let invalid_data_a = Bytes::from("invalid for A");
            let valid_data_a = Bytes::from("valid for A");
            let valid_data_b = Bytes::from("valid for B");

            // Set up producers
            // Peer 2 serves invalid data for keyA (which should get it blocked)
            // and is the ONLY holder of keyB.
            let mut prod2 = Producer::default();
            prod2.insert(key_a.clone(), invalid_data_a.clone());
            prod2.insert(key_b.clone(), valid_data_b.clone());

            // Peer 3 serves valid data for keyA.
            let mut prod3 = Producer::default();
            prod3.insert(key_a.clone(), valid_data_a.clone());

            // Set up consumer for Peer1 with expected values
            // (anything else delivered for these keys is treated as invalid).
            let (mut cons1, mut cons_out1) = Consumer::new();
            cons1.add_expected(key_a.clone(), valid_data_a.clone());
            cons1.add_expected(key_b.clone(), valid_data_b.clone());

            // Spawn actors
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            );

            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            );

            // Fetch keyA multiple times to ensure that Peer2 is blocked.
            // Every iteration must end in success with the valid data (from Peer3).
            for _ in 0..20 {
                // Fetch keyA
                mailbox1.fetch(key_a.clone()).await;

                // Wait for success event for keyA
                let event = cons_out1.recv().await.unwrap();
                match event {
                    Event::Success(key_actual, value) => {
                        assert_eq!(key_actual, key_a);
                        assert_eq!(value, valid_data_a);
                    }
                    Event::Failed(_) => panic!("Fetch failed unexpectedly"),
                }
            }

            // Fetch keyB
            // Its only holder (Peer2) should now be blocked, so this cannot succeed.
            mailbox1.fetch(key_b.clone()).await;

            // Wait for some time (longer than retry timeout)
            context.sleep(Duration::from_secs(5)).await;

            // Cancel the fetch for keyB
            mailbox1.cancel(key_b.clone()).await;

            // Wait for failure event for keyB
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Failed(key_actual) => {
                    assert_eq!(key_actual, key_b);
                }
                Event::Success(_, _) => panic!("Fetch should have been canceled"),
            }

            // Check oracle
            // Exactly one block entry: peer 0 (the requester) blocked peer 1
            // (the peer that served the invalid data).
            let blocked = oracle.blocked().await.unwrap();
            assert_eq!(blocked.len(), 1);
            assert_eq!(blocked[0].0, peers[0]);
            assert_eq!(blocked[0].1, peers[1]);
        });
    }
700
701    /// Tests that duplicate fetch requests for the same key are handled properly.
702    /// The test verifies that when the same key is requested multiple times,
703    /// the data is correctly delivered once without errors.
704    #[test_traced]
705    fn test_duplicate_fetch_request() {
706        let executor = deterministic::Runner::timed(Duration::from_secs(10));
707        executor.start(|context| async move {
708            let (mut oracle, mut schemes, peers, mut connections) =
709                setup_network_and_peers(&context, &[1, 2]).await;
710
711            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
712
713            let key = Key(5);
714            let mut prod2 = Producer::default();
715            prod2.insert(key.clone(), Bytes::from("data for key 5"));
716
717            let (cons1, mut cons_out1) = Consumer::new();
718
719            let scheme = schemes.remove(0);
720            let mut mailbox1 = setup_and_spawn_actor(
721                &context,
722                oracle.manager(),
723                oracle.control(scheme.public_key()),
724                scheme,
725                connections.remove(0),
726                cons1,
727                Producer::default(),
728            );
729
730            let scheme = schemes.remove(0);
731            let _mailbox2 = setup_and_spawn_actor(
732                &context,
733                oracle.manager(),
734                oracle.control(scheme.public_key()),
735                scheme,
736                connections.remove(0),
737                Consumer::dummy(),
738                prod2,
739            );
740
741            // Send duplicate fetch requests for the same key
742            mailbox1.fetch(key.clone()).await;
743            mailbox1.fetch(key.clone()).await;
744
745            // Should receive the data only once
746            let event = cons_out1.recv().await.unwrap();
747            match event {
748                Event::Success(key_actual, value) => {
749                    assert_eq!(key_actual, key);
750                    assert_eq!(value, Bytes::from("data for key 5"));
751                }
752                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
753            }
754
755            // Make sure we don't receive a second event for the duplicate fetch
756            select! {
757                _ = cons_out1.recv() => {
758                    panic!("Unexpected second event received for duplicate fetch");
759                },
760                _ = context.sleep(Duration::from_millis(500)) => {
761                    // This is expected - no additional events should be produced
762                },
763            };
764        });
765    }
766
767    /// Tests that changing peer sets is handled correctly using the update channel.
768    /// This test verifies that when the peer set changes from peer A to peer B,
769    /// the resolver correctly adapts and fetches from the new peer.
770    #[test_traced]
771    fn test_changing_peer_sets() {
772        let executor = deterministic::Runner::timed(Duration::from_secs(10));
773        executor.start(|context| async move {
774            let (mut oracle, mut schemes, peers, mut connections) =
775                setup_network_and_peers(&context, &[1, 2, 3]).await;
776
777            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
778            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
779
780            let key1 = Key(1);
781            let key2 = Key(2);
782
783            let mut prod2 = Producer::default();
784            prod2.insert(key1.clone(), Bytes::from("data from peer 2"));
785
786            let mut prod3 = Producer::default();
787            prod3.insert(key2.clone(), Bytes::from("data from peer 3"));
788
789            let (cons1, mut cons_out1) = Consumer::new();
790
791            let scheme = schemes.remove(0);
792            let mut mailbox1 = setup_and_spawn_actor(
793                &context,
794                oracle.manager(),
795                oracle.control(scheme.public_key()),
796                scheme,
797                connections.remove(0),
798                cons1,
799                Producer::default(),
800            );
801
802            let scheme = schemes.remove(0);
803            let _mailbox2 = setup_and_spawn_actor(
804                &context,
805                oracle.manager(),
806                oracle.control(scheme.public_key()),
807                scheme,
808                connections.remove(0),
809                Consumer::dummy(),
810                prod2,
811            );
812
813            // Fetch key1 from peer 2
814            mailbox1.fetch(key1.clone()).await;
815
816            // Wait for successful fetch
817            let event = cons_out1.recv().await.unwrap();
818            match event {
819                Event::Success(key_actual, value) => {
820                    assert_eq!(key_actual, key1);
821                    assert_eq!(value, Bytes::from("data from peer 2"));
822                }
823                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
824            }
825
826            // Change peer set to include peer 3
827            let scheme = schemes.remove(0);
828            let _mailbox3 = setup_and_spawn_actor(
829                &context,
830                oracle.manager(),
831                oracle.control(scheme.public_key()),
832                scheme,
833                connections.remove(0),
834                Consumer::dummy(),
835                prod3,
836            );
837
838            // Need to wait for the peer set change to propagate
839            context.sleep(Duration::from_millis(200)).await;
840
841            // Fetch key2 from peer 3
842            mailbox1.fetch(key2.clone()).await;
843
844            // Wait for successful fetch
845            let event = cons_out1.recv().await.unwrap();
846            match event {
847                Event::Success(key_actual, value) => {
848                    assert_eq!(key_actual, key2);
849                    assert_eq!(value, Bytes::from("data from peer 3"));
850                }
851                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
852            }
853        });
854    }
855
856    #[test_traced]
857    fn test_fetch_targeted() {
858        let executor = deterministic::Runner::timed(Duration::from_secs(10));
859        executor.start(|context| async move {
860            let (mut oracle, mut schemes, peers, mut connections) =
861                setup_network_and_peers(&context, &[1, 2, 3]).await;
862
863            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
864            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
865
866            let key = Key(1);
867            let invalid_data = Bytes::from("invalid data");
868            let valid_data = Bytes::from("valid data");
869
870            // Peer 2 has invalid data, peer 3 has valid data
871            let mut prod2 = Producer::default();
872            prod2.insert(key.clone(), invalid_data.clone());
873
874            let mut prod3 = Producer::default();
875            prod3.insert(key.clone(), valid_data.clone());
876
877            // Consumer expects only valid_data
878            let (mut cons1, mut cons_out1) = Consumer::new();
879            cons1.add_expected(key.clone(), valid_data.clone());
880
881            let scheme = schemes.remove(0);
882            let mut mailbox1 = setup_and_spawn_actor(
883                &context,
884                oracle.manager(),
885                oracle.control(scheme.public_key()),
886                scheme,
887                connections.remove(0),
888                cons1,
889                Producer::default(),
890            );
891
892            let scheme = schemes.remove(0);
893            let _mailbox2 = setup_and_spawn_actor(
894                &context,
895                oracle.manager(),
896                oracle.control(scheme.public_key()),
897                scheme,
898                connections.remove(0),
899                Consumer::dummy(),
900                prod2,
901            );
902
903            let scheme = schemes.remove(0);
904            let _mailbox3 = setup_and_spawn_actor(
905                &context,
906                oracle.manager(),
907                oracle.control(scheme.public_key()),
908                scheme,
909                connections.remove(0),
910                Consumer::dummy(),
911                prod3,
912            );
913
914            // Wait for peer set to be established
915            context.sleep(Duration::from_millis(100)).await;
916
917            // Start fetch with targets for both peer 2 (invalid data) and peer 3 (valid data)
918            // When peer 2 returns invalid data, only peer 2 should be removed from targets
919            // Peer 3 should still be tried as a target and succeed
920            mailbox1
921                .fetch_targeted(
922                    key.clone(),
923                    non_empty_vec![peers[1].clone(), peers[2].clone()],
924                )
925                .await;
926
927            // Should eventually succeed from peer 3
928            let event = cons_out1.recv().await.unwrap();
929            match event {
930                Event::Success(key_actual, value) => {
931                    assert_eq!(key_actual, key);
932                    assert_eq!(value, valid_data);
933                }
934                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
935            }
936
937            // Verify peer 2 was blocked (sent invalid data)
938            let blocked = oracle.blocked().await.unwrap();
939            assert_eq!(blocked.len(), 1);
940            assert_eq!(blocked[0].0, peers[0]);
941            assert_eq!(blocked[0].1, peers[1]);
942
943            // Verify metrics: 1 successful fetch (from peer 3 after peer 2 was blocked)
944            let metrics = context.encode();
945            assert!(metrics.contains("_fetch_total{status=\"Success\"} 1"));
946        });
947    }
948
949    #[test_traced]
950    fn test_fetch_targeted_no_fallback() {
951        let executor = deterministic::Runner::timed(Duration::from_secs(10));
952        executor.start(|context| async move {
953            let (mut oracle, mut schemes, peers, mut connections) =
954                setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
955
956            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
957            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
958            add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
959
960            let key = Key(1);
961
962            // Only peer 4 has the data, peers 2 and 3 don't
963            let mut prod4 = Producer::default();
964            prod4.insert(key.clone(), Bytes::from("data from peer 4"));
965
966            let (cons1, mut cons_out1) = Consumer::new();
967
968            let scheme = schemes.remove(0);
969            let mut mailbox1 = setup_and_spawn_actor(
970                &context,
971                oracle.manager(),
972                oracle.control(scheme.public_key()),
973                scheme,
974                connections.remove(0),
975                cons1,
976                Producer::default(),
977            );
978
979            let scheme = schemes.remove(0);
980            let _mailbox2 = setup_and_spawn_actor(
981                &context,
982                oracle.manager(),
983                oracle.control(scheme.public_key()),
984                scheme,
985                connections.remove(0),
986                Consumer::dummy(),
987                Producer::default(), // no data
988            );
989
990            let scheme = schemes.remove(0);
991            let _mailbox3 = setup_and_spawn_actor(
992                &context,
993                oracle.manager(),
994                oracle.control(scheme.public_key()),
995                scheme,
996                connections.remove(0),
997                Consumer::dummy(),
998                Producer::default(), // no data
999            );
1000
1001            let scheme = schemes.remove(0);
1002            let _mailbox4 = setup_and_spawn_actor(
1003                &context,
1004                oracle.manager(),
1005                oracle.control(scheme.public_key()),
1006                scheme,
1007                connections.remove(0),
1008                Consumer::dummy(),
1009                prod4,
1010            );
1011
1012            // Wait for peer set to be established
1013            context.sleep(Duration::from_millis(100)).await;
1014
1015            // Start fetch with targets for peers 2 and 3 (both don't have data)
1016            // Peer 4 has data but is NOT a target - it should NEVER be tried
1017            mailbox1
1018                .fetch_targeted(
1019                    key.clone(),
1020                    non_empty_vec![peers[1].clone(), peers[2].clone()],
1021                )
1022                .await;
1023
1024            // Wait enough time for targets to fail and retry multiple times
1025            // The fetch should not succeed because peer 4 (which has data) is not targeted
1026            select! {
1027                event = cons_out1.recv() => {
1028                    panic!("Fetch should not succeed, but got: {event:?}");
1029                },
1030                _ = context.sleep(Duration::from_secs(3)) => {
1031                    // Expected: no success event because peer 4 is not targeted
1032                },
1033            };
1034        });
1035    }
1036
1037    #[test_traced]
1038    fn test_fetch_all_targeted() {
1039        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1040        executor.start(|context| async move {
1041            let (mut oracle, mut schemes, peers, mut connections) =
1042                setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
1043
1044            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1045            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1046            add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
1047
1048            let key1 = Key(1);
1049            let key2 = Key(2);
1050            let key3 = Key(3);
1051
1052            // Peer 2 has key1
1053            let mut prod2 = Producer::default();
1054            prod2.insert(key1.clone(), Bytes::from("data for key 1"));
1055
1056            // Peer 3 has key3
1057            let mut prod3 = Producer::default();
1058            prod3.insert(key3.clone(), Bytes::from("data for key 3"));
1059
1060            // Peer 4 has key2
1061            let mut prod4 = Producer::default();
1062            prod4.insert(key2.clone(), Bytes::from("data for key 2"));
1063
1064            // Consumer expects all three keys
1065            let (mut cons1, mut cons_out1) = Consumer::new();
1066            cons1.add_expected(key1.clone(), Bytes::from("data for key 1"));
1067            cons1.add_expected(key2.clone(), Bytes::from("data for key 2"));
1068            cons1.add_expected(key3.clone(), Bytes::from("data for key 3"));
1069
1070            let scheme = schemes.remove(0);
1071            let mut mailbox1 = setup_and_spawn_actor(
1072                &context,
1073                oracle.manager(),
1074                oracle.control(scheme.public_key()),
1075                scheme,
1076                connections.remove(0),
1077                cons1,
1078                Producer::default(),
1079            );
1080
1081            let scheme = schemes.remove(0);
1082            let _mailbox2 = setup_and_spawn_actor(
1083                &context,
1084                oracle.manager(),
1085                oracle.control(scheme.public_key()),
1086                scheme,
1087                connections.remove(0),
1088                Consumer::dummy(),
1089                prod2,
1090            );
1091
1092            let scheme = schemes.remove(0);
1093            let _mailbox3 = setup_and_spawn_actor(
1094                &context,
1095                oracle.manager(),
1096                oracle.control(scheme.public_key()),
1097                scheme,
1098                connections.remove(0),
1099                Consumer::dummy(),
1100                prod3,
1101            );
1102
1103            let scheme = schemes.remove(0);
1104            let _mailbox4 = setup_and_spawn_actor(
1105                &context,
1106                oracle.manager(),
1107                oracle.control(scheme.public_key()),
1108                scheme,
1109                connections.remove(0),
1110                Consumer::dummy(),
1111                prod4,
1112            );
1113
1114            // Wait for peer set to be established
1115            context.sleep(Duration::from_millis(100)).await;
1116
1117            // Fetch keys with mixed targeting:
1118            // - key1 targeted to peer 2 (has data) -> should succeed from target
1119            // - key2 targeted to peer 4 (has data) -> should succeed from target
1120            // - key3 no targeting -> fetched from any peer (peer 3 has it)
1121            mailbox1
1122                .fetch_all_targeted(vec![
1123                    (key1.clone(), non_empty_vec![peers[1].clone()]), // peer 2 has key1
1124                    (key2.clone(), non_empty_vec![peers[3].clone()]), // peer 4 has key2
1125                ])
1126                .await;
1127            mailbox1.fetch(key3.clone()).await; // no targeting for key3
1128
1129            // Collect all three events
1130            let mut results = HashMap::new();
1131            for _ in 0..3 {
1132                let event = cons_out1.recv().await.unwrap();
1133                match event {
1134                    Event::Success(key, value) => {
1135                        results.insert(key, value);
1136                    }
1137                    Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1138                }
1139            }
1140
1141            // Verify all keys received correct data
1142            assert_eq!(results.len(), 3);
1143            assert_eq!(results.get(&key1).unwrap(), &Bytes::from("data for key 1"));
1144            assert_eq!(results.get(&key2).unwrap(), &Bytes::from("data for key 2"));
1145            assert_eq!(results.get(&key3).unwrap(), &Bytes::from("data for key 3"));
1146
1147            // Verify metrics: 3 successful fetches
1148            let metrics = context.encode();
1149            assert!(metrics.contains("_fetch_total{status=\"Success\"} 3"));
1150        });
1151    }
1152
1153    /// Tests that calling fetch() on an in-progress targeted fetch clears the targets,
1154    /// allowing the fetch to succeed from any available peer.
1155    #[test_traced]
1156    fn test_fetch_clears_targets() {
1157        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1158        executor.start(|context| async move {
1159            let (mut oracle, mut schemes, peers, mut connections) =
1160                setup_network_and_peers(&context, &[1, 2, 3]).await;
1161
1162            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1163            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1164
1165            let key = Key(1);
1166            let valid_data = Bytes::from("valid data");
1167
1168            // Peer 2 has no data, peer 3 has the data
1169            let mut prod3 = Producer::default();
1170            prod3.insert(key.clone(), valid_data.clone());
1171
1172            let (cons1, mut cons_out1) = Consumer::new();
1173
1174            let scheme = schemes.remove(0);
1175            let mut mailbox1 = setup_and_spawn_actor(
1176                &context,
1177                oracle.manager(),
1178                oracle.control(scheme.public_key()),
1179                scheme,
1180                connections.remove(0),
1181                cons1,
1182                Producer::default(),
1183            );
1184
1185            let scheme = schemes.remove(0);
1186            let _mailbox2 = setup_and_spawn_actor(
1187                &context,
1188                oracle.manager(),
1189                oracle.control(scheme.public_key()),
1190                scheme,
1191                connections.remove(0),
1192                Consumer::dummy(),
1193                Producer::default(), // no data
1194            );
1195
1196            let scheme = schemes.remove(0);
1197            let _mailbox3 = setup_and_spawn_actor(
1198                &context,
1199                oracle.manager(),
1200                oracle.control(scheme.public_key()),
1201                scheme,
1202                connections.remove(0),
1203                Consumer::dummy(),
1204                prod3,
1205            );
1206
1207            // Wait for peer set to be established
1208            context.sleep(Duration::from_millis(100)).await;
1209
1210            // Start fetch with target for peer 2 only (who doesn't have data)
1211            mailbox1
1212                .fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()])
1213                .await;
1214
1215            // Wait for the targeted fetch to fail a few times
1216            context.sleep(Duration::from_millis(500)).await;
1217
1218            // Call fetch() which should clear the targets and allow fallback to any peer
1219            mailbox1.fetch(key.clone()).await;
1220
1221            // Should now succeed from peer 3 (who has data but wasn't originally targeted)
1222            let event = cons_out1.recv().await.unwrap();
1223            match event {
1224                Event::Success(key_actual, value) => {
1225                    assert_eq!(key_actual, key);
1226                    assert_eq!(value, valid_data);
1227                }
1228                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1229            }
1230        });
1231    }
1232
1233    #[test_traced]
1234    fn test_fetch_targeted_does_not_restrict_all() {
1235        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1236        executor.start(|context| async move {
1237            let (mut oracle, mut schemes, peers, mut connections) =
1238                setup_network_and_peers(&context, &[1, 2, 3]).await;
1239
1240            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1241            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1242
1243            let key = Key(1);
1244            let valid_data = Bytes::from("valid data");
1245
1246            // Peer 2 has no data, peer 3 has the data
1247            let mut prod3 = Producer::default();
1248            prod3.insert(key.clone(), valid_data.clone());
1249
1250            let (cons1, mut cons_out1) = Consumer::new();
1251
1252            let scheme = schemes.remove(0);
1253            let mut mailbox1 = setup_and_spawn_actor(
1254                &context,
1255                oracle.manager(),
1256                oracle.control(scheme.public_key()),
1257                scheme,
1258                connections.remove(0),
1259                cons1,
1260                Producer::default(),
1261            );
1262
1263            let scheme = schemes.remove(0);
1264            let _mailbox2 = setup_and_spawn_actor(
1265                &context,
1266                oracle.manager(),
1267                oracle.control(scheme.public_key()),
1268                scheme,
1269                connections.remove(0),
1270                Consumer::dummy(),
1271                Producer::default(), // no data
1272            );
1273
1274            let scheme = schemes.remove(0);
1275            let _mailbox3 = setup_and_spawn_actor(
1276                &context,
1277                oracle.manager(),
1278                oracle.control(scheme.public_key()),
1279                scheme,
1280                connections.remove(0),
1281                Consumer::dummy(),
1282                prod3,
1283            );
1284
1285            // Wait for peer set to be established
1286            context.sleep(Duration::from_millis(100)).await;
1287
1288            // Start fetch without targets (can try any peer)
1289            mailbox1.fetch(key.clone()).await;
1290
1291            // Wait a bit for the fetch to start
1292            context.sleep(Duration::from_millis(50)).await;
1293
1294            // Call fetch_targeted with peer 2 only (who doesn't have data)
1295            // This should NOT restrict the existing "all" fetch
1296            mailbox1
1297                .fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()])
1298                .await;
1299
1300            // Should still succeed from peer 3 (who has data but wasn't in the targeted call)
1301            // because the original fetch was "all" and shouldn't be restricted
1302            let event = cons_out1.recv().await.unwrap();
1303            match event {
1304                Event::Success(key_actual, value) => {
1305                    assert_eq!(key_actual, key);
1306                    assert_eq!(value, valid_data);
1307                }
1308                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1309            }
1310        });
1311    }
1312
1313    #[test_traced]
1314    fn test_retain() {
1315        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1316        executor.start(|context| async move {
1317            let (mut oracle, mut schemes, peers, mut connections) =
1318                setup_network_and_peers(&context, &[1, 2]).await;
1319
1320            let key = Key(5);
1321            let mut prod2 = Producer::default();
1322            prod2.insert(key.clone(), Bytes::from("data for key 5"));
1323
1324            let (cons1, mut cons_out1) = Consumer::new();
1325
1326            let scheme = schemes.remove(0);
1327            let mut mailbox1 = setup_and_spawn_actor(
1328                &context,
1329                oracle.manager(),
1330                oracle.control(scheme.public_key()),
1331                scheme,
1332                connections.remove(0),
1333                cons1,
1334                Producer::default(),
1335            );
1336
1337            let scheme = schemes.remove(0);
1338            let _mailbox2 = setup_and_spawn_actor(
1339                &context,
1340                oracle.manager(),
1341                oracle.control(scheme.public_key()),
1342                scheme,
1343                connections.remove(0),
1344                Consumer::dummy(),
1345                prod2,
1346            );
1347
1348            // Retain before fetching should have no effect
1349            mailbox1.retain(|_| true).await;
1350            select! {
1351                _ = cons_out1.recv() => {
1352                    panic!("unexpected event");
1353                },
1354                _ = context.sleep(Duration::from_millis(100)) => {},
1355            };
1356
1357            // Start a fetch (no link, so fetch stays in-flight with timer in fetch timers)
1358            mailbox1.fetch(key.clone()).await;
1359
1360            // Retain with predicate that excludes the key
1361            // This must clean up fetch timers entry for the key
1362            let key_clone = key.clone();
1363            mailbox1.retain(move |k| k != &key_clone).await;
1364
1365            // Consumer should receive failed event
1366            let event = cons_out1.recv().await.unwrap();
1367            match event {
1368                Event::Failed(key_actual) => {
1369                    assert_eq!(key_actual, key);
1370                }
1371                Event::Success(_, _) => panic!("Fetch should have been retained out"),
1372            }
1373
1374            // Now add link so fetches can complete
1375            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1376
1377            // Fetch same key again, if fetch timers wasn't cleaned up, this would
1378            // be treated as a duplicate and silently ignored
1379            mailbox1.fetch(key.clone()).await;
1380
1381            // Should succeed
1382            let event = cons_out1.recv().await.unwrap();
1383            match event {
1384                Event::Success(key_actual, value) => {
1385                    assert_eq!(key_actual, key);
1386                    assert_eq!(value, Bytes::from("data for key 5"));
1387                }
1388                Event::Failed(_) => unreachable!(),
1389            }
1390        });
1391    }
1392
1393    #[test_traced]
1394    fn test_clear() {
1395        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1396        executor.start(|context| async move {
1397            let (mut oracle, mut schemes, peers, mut connections) =
1398                setup_network_and_peers(&context, &[1, 2]).await;
1399
1400            // No link yet - fetch will stay in-flight
1401            let key = Key(6);
1402            let mut prod2 = Producer::default();
1403            prod2.insert(key.clone(), Bytes::from("data for key 6"));
1404
1405            let (cons1, mut cons_out1) = Consumer::new();
1406
1407            let scheme = schemes.remove(0);
1408            let mut mailbox1 = setup_and_spawn_actor(
1409                &context,
1410                oracle.manager(),
1411                oracle.control(scheme.public_key()),
1412                scheme,
1413                connections.remove(0),
1414                cons1,
1415                Producer::default(),
1416            );
1417
1418            let scheme = schemes.remove(0);
1419            let _mailbox2 = setup_and_spawn_actor(
1420                &context,
1421                oracle.manager(),
1422                oracle.control(scheme.public_key()),
1423                scheme,
1424                connections.remove(0),
1425                Consumer::dummy(),
1426                prod2,
1427            );
1428
1429            // Clear before fetching should have no effect
1430            mailbox1.clear().await;
1431            select! {
1432                _ = cons_out1.recv() => {
1433                    panic!("unexpected event");
1434                },
1435                _ = context.sleep(Duration::from_millis(100)) => {},
1436            };
1437
1438            // Start a fetch (no link, so fetch stays in-flight with timer in fetch timers)
1439            mailbox1.fetch(key.clone()).await;
1440
1441            // Clear all fetches
1442            mailbox1.clear().await;
1443
1444            // Consumer should receive failed event
1445            let event = cons_out1.recv().await.unwrap();
1446            match event {
1447                Event::Failed(key_actual) => {
1448                    assert_eq!(key_actual, key);
1449                }
1450                Event::Success(_, _) => panic!("Fetch should have been cleared"),
1451            }
1452
1453            // Now add link so fetches can complete
1454            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1455
1456            // Fetch same key again, if fetch_timers wasn't cleaned up, this would
1457            // be treated as a duplicate and silently ignored
1458            mailbox1.fetch(key.clone()).await;
1459
1460            // Should succeed
1461            let event = cons_out1.recv().await.unwrap();
1462            match event {
1463                Event::Success(key_actual, value) => {
1464                    assert_eq!(key_actual, key);
1465                    assert_eq!(value, Bytes::from("data for key 6"));
1466                }
1467                Event::Failed(_) => unreachable!(),
1468            }
1469        });
1470    }
1471
1472    /// Tests that when a peer is rate-limited, the fetcher spills over to another peer.
1473    /// With 2 peers and rate limit of 1/sec each, 2 requests issued simultaneously should
1474    /// both complete immediately (one to each peer) without waiting for rate limit reset.
1475    #[test_traced]
1476    fn test_rate_limit_spillover() {
1477        let executor = deterministic::Runner::timed(Duration::from_secs(30));
1478        executor.start(|context| async move {
1479            // Use a very restrictive rate limit: 1 request per second per peer
1480            let (mut oracle, mut schemes, peers, mut connections) =
1481                setup_network_and_peers_with_rate_limit(
1482                    &context,
1483                    &[1, 2, 3],
1484                    Quota::per_second(NZU32!(1)),
1485                )
1486                .await;
1487
1488            // Add links between peer 1 and both peer 2 and peer 3
1489            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1490            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1491
1492            // Both peer 2 and peer 3 have the same data
1493            let mut prod2 = Producer::default();
1494            let mut prod3 = Producer::default();
1495            prod2.insert(Key(0), Bytes::from("data for key 0"));
1496            prod2.insert(Key(1), Bytes::from("data for key 1"));
1497            prod3.insert(Key(0), Bytes::from("data for key 0"));
1498            prod3.insert(Key(1), Bytes::from("data for key 1"));
1499
1500            let (cons1, mut cons_out1) = Consumer::new();
1501
1502            // Set up peer 1 (the requester)
1503            let scheme = schemes.remove(0);
1504            let mut mailbox1 = setup_and_spawn_actor(
1505                &context,
1506                oracle.manager(),
1507                oracle.control(scheme.public_key()),
1508                scheme,
1509                connections.remove(0),
1510                cons1,
1511                Producer::default(),
1512            );
1513
1514            // Set up peer 2 (has data)
1515            let scheme = schemes.remove(0);
1516            let _mailbox2 = setup_and_spawn_actor(
1517                &context,
1518                oracle.manager(),
1519                oracle.control(scheme.public_key()),
1520                scheme,
1521                connections.remove(0),
1522                Consumer::dummy(),
1523                prod2,
1524            );
1525
1526            // Set up peer 3 (also has data)
1527            let scheme = schemes.remove(0);
1528            let _mailbox3 = setup_and_spawn_actor(
1529                &context,
1530                oracle.manager(),
1531                oracle.control(scheme.public_key()),
1532                scheme,
1533                connections.remove(0),
1534                Consumer::dummy(),
1535                prod3,
1536            );
1537
1538            // Wait for peer set to be established
1539            context.sleep(Duration::from_millis(100)).await;
1540            let start = context.current();
1541
1542            // Issue 2 fetch requests rapidly
1543            // With rate limit of 1/sec per peer and 2 peers, both should complete
1544            // immediately via spill-over (one request to each peer)
1545            mailbox1.fetch(Key(0)).await;
1546            mailbox1.fetch(Key(1)).await;
1547
1548            // Collect results
1549            let mut results = HashMap::new();
1550            for _ in 0..2 {
1551                let event = cons_out1.recv().await.unwrap();
1552                match event {
1553                    Event::Success(key, value) => {
1554                        results.insert(key.clone(), value);
1555                    }
1556                    Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1557                }
1558            }
1559
1560            // Verify both keys were fetched successfully
1561            assert_eq!(results.len(), 2);
1562            assert_eq!(
1563                results.get(&Key(0)).unwrap(),
1564                &Bytes::from("data for key 0")
1565            );
1566            assert_eq!(
1567                results.get(&Key(1)).unwrap(),
1568                &Bytes::from("data for key 1")
1569            );
1570
1571            // Verify it completed quickly (well under 1 second) - proves spill-over worked
1572            // Without spill-over, the second request would wait ~1 second for rate limit reset
1573            let elapsed = context.current().duration_since(start).unwrap();
1574            assert!(
1575                elapsed < Duration::from_millis(500),
1576                "Expected quick completion via spill-over, but took {elapsed:?}"
1577            );
1578        });
1579    }
1580
1581    /// Tests that rate limiting causes retries to eventually succeed after the rate limit resets.
1582    /// This test uses a single peer with a restrictive rate limit and verifies that
1583    /// fetches eventually complete after waiting for the rate limit to reset.
1584    #[test_traced]
1585    fn test_rate_limit_retry_after_reset() {
1586        let executor = deterministic::Runner::timed(Duration::from_secs(30));
1587        executor.start(|context| async move {
1588            // Use a restrictive rate limit: 1 request per second
1589            let (mut oracle, mut schemes, peers, mut connections) =
1590                setup_network_and_peers_with_rate_limit(
1591                    &context,
1592                    &[1, 2],
1593                    Quota::per_second(NZU32!(1)),
1594                )
1595                .await;
1596
1597            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1598
1599            // Peer 2 has data for multiple keys
1600            let mut prod2 = Producer::default();
1601            prod2.insert(Key(1), Bytes::from("data for key 1"));
1602            prod2.insert(Key(2), Bytes::from("data for key 2"));
1603            prod2.insert(Key(3), Bytes::from("data for key 3"));
1604
1605            let (cons1, mut cons_out1) = Consumer::new();
1606
1607            let scheme = schemes.remove(0);
1608            let mut mailbox1 = setup_and_spawn_actor(
1609                &context,
1610                oracle.manager(),
1611                oracle.control(scheme.public_key()),
1612                scheme,
1613                connections.remove(0),
1614                cons1,
1615                Producer::default(),
1616            );
1617
1618            let scheme = schemes.remove(0);
1619            let _mailbox2 = setup_and_spawn_actor(
1620                &context,
1621                oracle.manager(),
1622                oracle.control(scheme.public_key()),
1623                scheme,
1624                connections.remove(0),
1625                Consumer::dummy(),
1626                prod2,
1627            );
1628
1629            // Wait for peer set to be established
1630            context.sleep(Duration::from_millis(100)).await;
1631            let start = context.current();
1632
1633            // Issue 3 fetch requests to a single peer with rate limit of 1/sec
1634            // Only 1 can be sent immediately, the others must wait for rate limit reset
1635            mailbox1.fetch(Key(1)).await;
1636            mailbox1.fetch(Key(2)).await;
1637            mailbox1.fetch(Key(3)).await;
1638
1639            // All 3 should eventually succeed (after rate limit resets)
1640            let mut results = HashMap::new();
1641            for _ in 0..3 {
1642                let event = cons_out1.recv().await.unwrap();
1643                match event {
1644                    Event::Success(key, value) => {
1645                        results.insert(key.clone(), value);
1646                    }
1647                    Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1648                }
1649            }
1650
1651            assert_eq!(results.len(), 3);
1652            for i in 1..=3 {
1653                assert_eq!(
1654                    results.get(&Key(i)).unwrap(),
1655                    &Bytes::from(format!("data for key {}", i))
1656                );
1657            }
1658
1659            // Verify it took significant time due to rate limiting
1660            // With 3 requests at 1/sec to a single peer, requests 2 and 3 must wait
1661            // for rate limit resets (~1 second each), so total should be > 2 seconds
1662            let elapsed = context.current().duration_since(start).unwrap();
1663            assert!(
1664                elapsed > Duration::from_secs(2),
1665                "Expected rate limiting to cause delay > 2s, but took {elapsed:?}"
1666            );
1667        });
1668    }
1669
1670    /// Tests that the resolver never sends fetch requests to itself (me exclusion).
1671    /// Even when the local peer has the data in its producer, it should fetch from
1672    /// another peer instead.
1673    #[test_traced]
1674    fn test_self_exclusion() {
1675        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1676        executor.start(|context| async move {
1677            let (mut oracle, mut schemes, peers, mut connections) =
1678                setup_network_and_peers(&context, &[1, 2]).await;
1679
1680            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1681
1682            let key = Key(1);
1683            let data = Bytes::from("shared data");
1684
1685            // Both peers have the data - peer 1 (requester) and peer 2
1686            let mut prod1 = Producer::default();
1687            prod1.insert(key.clone(), data.clone());
1688            let mut prod2 = Producer::default();
1689            prod2.insert(key.clone(), data.clone());
1690
1691            let (cons1, mut cons_out1) = Consumer::new();
1692
1693            // Set up peer 1 with `me` set - it has the data but should NOT fetch from itself
1694            let scheme = schemes.remove(0);
1695            let mut mailbox1 = setup_and_spawn_actor(
1696                &context,
1697                oracle.manager(),
1698                oracle.control(scheme.public_key()),
1699                scheme,
1700                connections.remove(0),
1701                cons1,
1702                prod1, // peer 1 has the data
1703            );
1704
1705            // Set up peer 2 - also has the data
1706            let scheme = schemes.remove(0);
1707            let _mailbox2 = setup_and_spawn_actor(
1708                &context,
1709                oracle.manager(),
1710                oracle.control(scheme.public_key()),
1711                scheme,
1712                connections.remove(0),
1713                Consumer::dummy(),
1714                prod2,
1715            );
1716
1717            // Wait for peer set to be established
1718            context.sleep(Duration::from_millis(100)).await;
1719
1720            // Fetch the key - should get it from peer 2, not from self
1721            mailbox1.fetch(key.clone()).await;
1722
1723            // Should succeed (from peer 2)
1724            let event = cons_out1.recv().await.unwrap();
1725            match event {
1726                Event::Success(key_actual, value) => {
1727                    assert_eq!(key_actual, key);
1728                    assert_eq!(value, data);
1729                }
1730                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1731            }
1732        });
1733    }
1734
1735    #[allow(clippy::type_complexity)]
1736    fn spawn_actors_with_handles(
1737        context: deterministic::Context,
1738        oracle: &Oracle<PublicKey, deterministic::Context>,
1739        schemes: Vec<PrivateKey>,
1740        connections: Vec<(
1741            Sender<PublicKey, deterministic::Context>,
1742            Receiver<PublicKey>,
1743        )>,
1744        consumers: Vec<Consumer<Key, Bytes>>,
1745        producers: Vec<Producer<Key, Bytes>>,
1746    ) -> (
1747        Vec<Mailbox<Key, PublicKey>>,
1748        Vec<commonware_runtime::Handle<()>>,
1749    ) {
1750        let actor_context = context.with_label("actor");
1751        let mut mailboxes = Vec::new();
1752        let mut handles = Vec::new();
1753
1754        for (idx, ((scheme, conn), (consumer, producer))) in schemes
1755            .into_iter()
1756            .zip(connections)
1757            .zip(consumers.into_iter().zip(producers))
1758            .enumerate()
1759        {
1760            let ctx = actor_context.with_label(&format!("peer_{idx}"));
1761            let public_key = scheme.public_key();
1762            let (engine, mailbox) = Engine::new(
1763                ctx,
1764                Config {
1765                    peer_provider: oracle.manager(),
1766                    blocker: oracle.control(public_key.clone()),
1767                    consumer,
1768                    producer,
1769                    mailbox_size: MAILBOX_SIZE,
1770                    me: Some(public_key),
1771                    initial: INITIAL_DURATION,
1772                    timeout: TIMEOUT,
1773                    fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
1774                    priority_requests: false,
1775                    priority_responses: false,
1776                },
1777            );
1778            handles.push(engine.start(conn));
1779            mailboxes.push(mailbox);
1780        }
1781
1782        (mailboxes, handles)
1783    }
1784
1785    #[test_traced]
1786    fn test_operations_after_shutdown_do_not_panic() {
1787        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1788        executor.start(|context| async move {
1789            let (mut oracle, schemes, peers, connections) =
1790                setup_network_and_peers(&context, &[1, 2]).await;
1791
1792            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1793
1794            let key = Key(1);
1795            let mut prod2 = Producer::default();
1796            prod2.insert(key.clone(), Bytes::from("data for key 1"));
1797
1798            let (cons1, mut cons_out1) = Consumer::new();
1799
1800            let (mut mailboxes, handles) = spawn_actors_with_handles(
1801                context.clone(),
1802                &oracle,
1803                schemes,
1804                connections,
1805                vec![cons1, Consumer::dummy()],
1806                vec![Producer::default(), prod2],
1807            );
1808
1809            // Fetch to verify network is functional
1810            mailboxes[0].fetch(key.clone()).await;
1811            let event = cons_out1.recv().await.unwrap();
1812            match event {
1813                Event::Success(_, value) => assert_eq!(value, Bytes::from("data for key 1")),
1814                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1815            }
1816
1817            // Abort all actors
1818            for handle in handles {
1819                handle.abort();
1820            }
1821            context.sleep(Duration::from_millis(100)).await;
1822
1823            // All operations should not panic after shutdown
1824
1825            // Fetch should not panic
1826            let key2 = Key(2);
1827            mailboxes[0].fetch(key2.clone()).await;
1828
1829            // Cancel should not panic
1830            mailboxes[0].cancel(key2.clone()).await;
1831
1832            // Clear should not panic
1833            mailboxes[0].clear().await;
1834
1835            // Retain should not panic
1836            mailboxes[0].retain(|_| true).await;
1837
1838            // Fetch targeted should not panic
1839            mailboxes[0]
1840                .fetch_targeted(Key(3), non_empty_vec![peers[1].clone()])
1841                .await;
1842        });
1843    }
1844
1845    fn clean_shutdown(seed: u64) {
1846        let cfg = deterministic::Config::default()
1847            .with_seed(seed)
1848            .with_timeout(Some(Duration::from_secs(30)));
1849        let executor = deterministic::Runner::new(cfg);
1850        executor.start(|context| async move {
1851            let (mut oracle, schemes, peers, connections) =
1852                setup_network_and_peers(&context, &[1, 2]).await;
1853
1854            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1855
1856            let key = Key(1);
1857            let mut prod2 = Producer::default();
1858            prod2.insert(key.clone(), Bytes::from("data for key 1"));
1859
1860            let (cons1, mut cons_out1) = Consumer::new();
1861
1862            let (mut mailboxes, handles) = spawn_actors_with_handles(
1863                context.clone(),
1864                &oracle,
1865                schemes,
1866                connections,
1867                vec![cons1, Consumer::dummy()],
1868                vec![Producer::default(), prod2],
1869            );
1870
1871            // Allow tasks to start
1872            context.sleep(Duration::from_millis(100)).await;
1873
1874            // Count running tasks under the actor prefix
1875            let running_before = count_running_tasks(&context, "actor");
1876            assert!(
1877                running_before > 0,
1878                "at least one actor task should be running"
1879            );
1880
1881            // Verify network is functional
1882            mailboxes[0].fetch(key.clone()).await;
1883            let event = cons_out1.recv().await.unwrap();
1884            match event {
1885                Event::Success(_, value) => assert_eq!(value, Bytes::from("data for key 1")),
1886                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1887            }
1888
1889            // Abort all actors
1890            for handle in handles {
1891                handle.abort();
1892            }
1893            context.sleep(Duration::from_millis(100)).await;
1894
1895            // Verify all actor tasks are stopped
1896            let running_after = count_running_tasks(&context, "actor");
1897            assert_eq!(
1898                running_after, 0,
1899                "all actor tasks should be stopped, but {running_after} still running"
1900            );
1901        });
1902    }
1903
1904    #[test]
1905    fn test_clean_shutdown() {
1906        for seed in 0..25 {
1907            clean_shutdown(seed);
1908        }
1909    }
1910}