commonware_resolver/p2p/
mod.rs

//! Resolve data identified by a fixed-length key over the P2P network.
//!
//! # Overview
//!
//! The `p2p` module enables resolving data by fixed-length keys in a P2P network. Central to the
//! module is the `peer` actor, which manages the fetch-request lifecycle. Its mailbox allows
//! initiation and cancellation of fetch requests via the `Resolver` interface.
//!
//! The peer handles an arbitrarily large number of concurrent fetch requests by sending requests
//! to other peers and processing their responses. It selects peers based on performance, retrying
//! with another peer if one fails or provides invalid data. Requests persist until canceled or
//! fulfilled, delivering data to the `Consumer` for verification.
//!
//! The `Consumer` checks data integrity and authenticity (critical in an adversarial environment)
//! and returns `true` if valid, completing the fetch, or `false` to retry.
//!
//! The peer also serves data to other peers, forwarding network requests to the `Producer`. The
//! `Producer` provides data asynchronously (e.g., from storage). If it fails, the peer sends an
//! empty response, prompting the requester to retry elsewhere. Each message between peers contains
//! an ID. Each request is sent with a unique ID, and each response includes the ID of the request
//! it responds to.
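//!
//! The following is a minimal wiring sketch, mirroring the test setup in this module; the
//! `context`, `manager`, `blocker`, `consumer`, `producer`, `public_key`, `connection`, and `key`
//! values are assumed to be supplied by the caller.
//!
//! ```ignore
//! use std::time::Duration;
//!
//! let (engine, mut mailbox) = Engine::new(
//!     context,
//!     Config {
//!         manager,
//!         blocker,
//!         consumer,
//!         producer,
//!         mailbox_size: 1024,
//!         me: Some(public_key),
//!         initial: Duration::from_millis(100),
//!         timeout: Duration::from_millis(400),
//!         fetch_retry_timeout: Duration::from_millis(100),
//!         priority_requests: false,
//!         priority_responses: false,
//!     },
//! );
//! engine.start(connection);
//!
//! // Request data by key; the `Consumer` is notified once the fetch resolves.
//! mailbox.fetch(key).await;
//! ```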
//!
//! # Targeting
//!
//! Callers can restrict fetches to specific target peers using [`Resolver::fetch_targeted`](crate::Resolver::fetch_targeted).
//! Only target peers are tried; there is no automatic fallback to other peers. Targets persist through
//! transient failures (timeout, "no data" response, send failure) since the peer might be slow or
//! receive the data later.
//!
//! While a fetch is in progress, callers can modify targeting:
//! - [`Resolver::fetch_targeted`](crate::Resolver::fetch_targeted) adds peers to the existing target set
//!   (only if the fetch already has targets; an "all" fetch remains unrestricted)
//! - [`Resolver::fetch`](crate::Resolver::fetch) clears all targets, allowing fallback to any peer
//!
//! These modifications only apply to in-progress fetches. Once a fetch completes (success, cancel,
//! or blocked peer), the targets for that key are cleared automatically.
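//!
//! For example, a caller might first restrict a fetch to a few peers believed to hold the data and
//! later widen or abandon it. This is a sketch only; `resolver`, `key`, `peer_a`, and `peer_b` are
//! placeholders, and `non_empty_vec!` comes from `commonware_utils` (as in the tests below).
//!
//! ```ignore
//! // Try only `peer_a` and `peer_b` for this key.
//! resolver
//!     .fetch_targeted(key.clone(), non_empty_vec![peer_a.clone(), peer_b.clone()])
//!     .await;
//!
//! // Later, clear the targets so any peer may be tried.
//! resolver.fetch(key.clone()).await;
//!
//! // Or give up on the key entirely.
//! resolver.cancel(key).await;
//! ```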
//!
//! # Performance Considerations
//!
//! The peer supports arbitrarily many concurrent fetch requests, but resource usage generally
//! depends on the rate-limiting configuration of the underlying P2P network.

use bytes::Bytes;
use commonware_utils::Span;
use futures::channel::oneshot;
use std::future::Future;

mod config;
pub use config::Config;
mod engine;
pub use engine::Engine;
mod fetcher;
mod ingress;
pub use ingress::Mailbox;
mod metrics;
mod wire;

#[cfg(feature = "mocks")]
pub mod mocks;

/// Serves data requested by the network.
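///
/// The sketch below shows one way a `Producer` could be implemented over an in-memory map, similar
/// to the mock used in this module's tests. The `Key` type (and its `Span` implementation) is an
/// assumption; a real producer would typically read from storage instead.
///
/// ```ignore
/// #[derive(Clone)]
/// struct MapProducer {
///     data: std::collections::HashMap<Key, Bytes>,
/// }
///
/// impl Producer for MapProducer {
///     type Key = Key;
///
///     fn produce(&mut self, key: Key) -> impl Future<Output = oneshot::Receiver<Bytes>> + Send {
///         let value = self.data.get(&key).cloned();
///         async move {
///             let (tx, rx) = oneshot::channel();
///             if let Some(value) = value {
///                 // Ignore send errors: the requester may have already given up.
///                 let _ = tx.send(value);
///             }
///             // Dropping `tx` without sending signals "no data", so the peer replies with an
///             // empty response and the requester retries elsewhere.
///             rx
///         }
///     }
/// }
/// ```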
pub trait Producer: Clone + Send + 'static {
    /// Type used to uniquely identify data.
    type Key: Span;

    /// Serve a request received from the network.
    fn produce(&mut self, key: Self::Key) -> impl Future<Output = oneshot::Receiver<Bytes>> + Send;
}

#[cfg(test)]
mod tests {
    use super::{
        mocks::{Consumer, Event, Key, Producer},
        Config, Engine, Mailbox,
    };
    use crate::Resolver;
    use bytes::Bytes;
    use commonware_cryptography::{
        ed25519::{PrivateKey, PublicKey},
        Signer,
    };
    use commonware_macros::{select, test_traced};
    use commonware_p2p::{
        simulated::{Link, Network, Oracle, Receiver, Sender},
        Blocker, Manager,
    };
    use commonware_runtime::{count_running_tasks, deterministic, Clock, Metrics, Quota, Runner};
    use commonware_utils::{non_empty_vec, NZU32};
    use futures::StreamExt;
    use std::{collections::HashMap, num::NonZeroU32, time::Duration};

    const MAILBOX_SIZE: usize = 1024;
    const RATE_LIMIT: NonZeroU32 = NZU32!(10);
    const INITIAL_DURATION: Duration = Duration::from_millis(100);
    const TIMEOUT: Duration = Duration::from_millis(400);
    const FETCH_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
    const LINK: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 1.0,
    };
    const LINK_UNRELIABLE: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 0.5,
    };

    async fn setup_network_and_peers(
        context: &deterministic::Context,
        peer_seeds: &[u64],
    ) -> (
        Oracle<PublicKey, deterministic::Context>,
        Vec<PrivateKey>,
        Vec<PublicKey>,
        Vec<(
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        )>,
    ) {
        setup_network_and_peers_with_rate_limit(context, peer_seeds, Quota::per_second(RATE_LIMIT))
            .await
    }

    async fn setup_network_and_peers_with_rate_limit(
        context: &deterministic::Context,
        peer_seeds: &[u64],
        rate_limit: Quota,
    ) -> (
        Oracle<PublicKey, deterministic::Context>,
        Vec<PrivateKey>,
        Vec<PublicKey>,
        Vec<(
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        )>,
    ) {
        let (network, oracle) = Network::new(
            context.with_label("network"),
            commonware_p2p::simulated::Config {
                max_size: 1024 * 1024,
                disconnect_on_block: true,
                tracked_peer_sets: Some(3),
            },
        );
        network.start();

        let schemes: Vec<PrivateKey> = peer_seeds
            .iter()
            .map(|seed| PrivateKey::from_seed(*seed))
            .collect();
        let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
        let mut manager = oracle.manager();
        manager.update(0, peers.clone().try_into().unwrap()).await;

        let mut connections = Vec::new();
        for peer in &peers {
            let (sender, receiver) = oracle
                .control(peer.clone())
                .register(0, rate_limit)
                .await
                .unwrap();
            connections.push((sender, receiver));
        }

        (oracle, schemes, peers, connections)
    }

    async fn add_link(
        oracle: &mut Oracle<PublicKey, deterministic::Context>,
        link: Link,
        peers: &[PublicKey],
        from: usize,
        to: usize,
    ) {
        oracle
            .add_link(peers[from].clone(), peers[to].clone(), link.clone())
            .await
            .unwrap();
        oracle
            .add_link(peers[to].clone(), peers[from].clone(), link)
            .await
            .unwrap();
    }

    async fn setup_and_spawn_actor(
        context: &deterministic::Context,
        manager: impl Manager<PublicKey = PublicKey>,
        blocker: impl Blocker<PublicKey = PublicKey>,
        signer: impl Signer<PublicKey = PublicKey>,
        connection: (
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        ),
        consumer: Consumer<Key, Bytes>,
        producer: Producer<Key, Bytes>,
    ) -> Mailbox<Key, PublicKey> {
        let public_key = signer.public_key();
        let (engine, mailbox) = Engine::new(
            context.with_label(&format!("actor_{public_key}")),
            Config {
                manager,
                blocker,
                consumer,
                producer,
                mailbox_size: MAILBOX_SIZE,
                me: Some(public_key),
                initial: INITIAL_DURATION,
                timeout: TIMEOUT,
                fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
                priority_requests: false,
                priority_responses: false,
            },
        );
        engine.start(connection);

        mailbox
    }

    /// Tests that fetching a key from another peer succeeds when data is available.
    /// This test sets up two peers, where Peer 1 requests data that Peer 2 has,
    /// and verifies that the data is correctly delivered to Peer 1's consumer.
    #[test_traced]
    fn test_fetch_success() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            let key = Key(2);
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), Bytes::from("data for key 2"));

            let (cons1, mut cons_out1) = Consumer::new();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            )
            .await;

            mailbox1.fetch(key.clone()).await;

            let event = cons_out1.next().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, Bytes::from("data for key 2"));
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }
        });
    }

    /// Tests that canceling a fetch request results in a failure event.
    /// This test initiates a fetch request and immediately cancels it,
    /// verifying that the consumer receives a failure notification instead of data.
    #[test_traced]
    fn test_cancel_fetch() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (oracle, mut schemes, _peers, mut connections) =
                setup_network_and_peers(&context, &[1]).await;

            let (cons1, mut cons_out1) = Consumer::new();
            let prod1 = Producer::default();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                prod1,
            )
            .await;

            let key = Key(3);
            mailbox1.fetch(key.clone()).await;
            mailbox1.cancel(key.clone()).await;

            let event = cons_out1.next().await.unwrap();
            match event {
                Event::Failed(key_actual) => {
                    assert_eq!(key_actual, key);
                }
                Event::Success(_, _) => panic!("Fetch should have been canceled"),
            }
        });
    }

    /// Tests fetching data from a peer when some peers lack the data.
    /// This test sets up three peers, where Peer 1 requests data that only Peer 3 has.
    /// It verifies that the resolver retries with another peer and successfully
    /// delivers the data to Peer 1's consumer.
    #[test_traced]
    fn test_peer_no_data() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            let prod1 = Producer::default();
            let prod2 = Producer::default();
            let mut prod3 = Producer::default();
            let key = Key(3);
            prod3.insert(key.clone(), Bytes::from("data for key 3"));

            let (cons1, mut cons_out1) = Consumer::new();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                prod1,
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            )
            .await;

            mailbox1.fetch(key.clone()).await;

            let event = cons_out1.next().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, Bytes::from("data for key 3"));
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }
        });
    }

    /// Tests fetching when no peers are available.
    /// This test sets up a single peer with an empty peer provider (no peers).
    /// It initiates a fetch, waits beyond the retry timeout, cancels the fetch,
    /// and verifies that the consumer receives a failure notification.
    #[test_traced]
    fn test_no_peers_available() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (oracle, mut schemes, _peers, mut connections) =
                setup_network_and_peers(&context, &[1]).await;

            let (cons1, mut cons_out1) = Consumer::new();
            let prod1 = Producer::default();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                prod1,
            )
            .await;

            let key = Key(4);
            mailbox1.fetch(key.clone()).await;
            context.sleep(Duration::from_secs(5)).await;
            mailbox1.cancel(key.clone()).await;

            let event = cons_out1.next().await.expect("Consumer channel closed");
            match event {
                Event::Failed(key_actual) => {
                    assert_eq!(key_actual, key);
                }
                Event::Success(_, _) => {
                    panic!("Fetch should have failed due to no peers")
                }
            }
        });
    }

    /// Tests that concurrent fetch requests are handled correctly.
    /// It also verifies that the peer recovers from having no peers available and that it can
    /// fetch data from multiple peers holding different sets of data.
    #[test_traced]
    fn test_concurrent_fetch_requests() {
        let executor = deterministic::Runner::timed(Duration::from_secs(60));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3]).await;

            let key2 = Key(2);
            let key3 = Key(3);
            let mut prod2 = Producer::default();
            prod2.insert(key2.clone(), Bytes::from("data for key 2"));
            let mut prod3 = Producer::default();
            prod3.insert(key3.clone(), Bytes::from("data for key 3"));

            let (cons1, mut cons_out1) = Consumer::new();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            )
            .await;

            // Add choppy links between the requester and the two producers
            add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 2).await;

            // Run the fetches multiple times to ensure that the peer tries both of its peers
            for _ in 0..10 {
                // Initiate concurrent fetch requests
                mailbox1.fetch(key2.clone()).await;
                mailbox1.fetch(key3.clone()).await;

                // Collect both events without assuming order
                let mut events = Vec::new();
                events.push(cons_out1.next().await.expect("Consumer channel closed"));
                events.push(cons_out1.next().await.expect("Consumer channel closed"));

                // Check that both keys were successfully fetched
                let mut found_key2 = false;
                let mut found_key3 = false;
                for event in events {
                    match event {
                        Event::Success(key_actual, value) => {
                            if key_actual == key2 {
                                assert_eq!(value, Bytes::from("data for key 2"));
                                found_key2 = true;
                            } else if key_actual == key3 {
                                assert_eq!(value, Bytes::from("data for key 3"));
                                found_key3 = true;
                            } else {
                                panic!("Unexpected key received");
                            }
                        }
                        Event::Failed(_) => panic!("Fetch failed unexpectedly"),
                    }
                }
                assert!(found_key2 && found_key3);
            }
        });
    }

    /// Tests that canceling an inactive fetch request has no effect.
    /// Cancels a request before the fetch is initiated, after the data has been delivered,
    /// and while a fetch is in flight.
    #[test_traced]
    fn test_cancel() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            let key = Key(6);
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), Bytes::from("data for key 6"));

            let (cons1, mut cons_out1) = Consumer::new();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            )
            .await;

            // Cancel before sending the fetch request, expecting no effect
            mailbox1.cancel(key.clone()).await;
            select! {
                _ = cons_out1.next() => { panic!("unexpected event"); },
                _ = context.sleep(Duration::from_millis(100)) => {},
            };

            // Initiate fetch and wait for data to be delivered
            mailbox1.fetch(key.clone()).await;
            let event = cons_out1.next().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, Bytes::from("data for key 6"));
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }

            // Attempt to cancel after data has been delivered, expecting no effect
            mailbox1.cancel(key.clone()).await;
            select! {
                _ = cons_out1.next() => { panic!("unexpected event"); },
                _ = context.sleep(Duration::from_millis(100)) => {},
            };

            // Initiate and cancel another fetch request
            let key = Key(7);
            mailbox1.fetch(key.clone()).await;
            mailbox1.cancel(key.clone()).await;

            // Make sure we receive a failure event
            let event = cons_out1.next().await.unwrap();
            match event {
                Event::Failed(key_actual) => {
                    assert_eq!(key_actual, key);
                }
                Event::Success(_, _) => panic!("Fetch should have been canceled"),
            }
        });
    }

    /// Tests that a peer is blocked after delivering invalid data,
    /// preventing further fetches from that peer.
    #[test_traced]
    fn test_blocking_peer() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
            add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;

            let key_a = Key(1);
            let key_b = Key(2);
            let invalid_data_a = Bytes::from("invalid for A");
            let valid_data_a = Bytes::from("valid for A");
            let valid_data_b = Bytes::from("valid for B");

            // Set up producers
            let mut prod2 = Producer::default();
            prod2.insert(key_a.clone(), invalid_data_a.clone());
            prod2.insert(key_b.clone(), valid_data_b.clone());

            let mut prod3 = Producer::default();
            prod3.insert(key_a.clone(), valid_data_a.clone());

            // Set up consumer for Peer1 with expected values
            let (mut cons1, mut cons_out1) = Consumer::new();
            cons1.add_expected(key_a.clone(), valid_data_a.clone());
            cons1.add_expected(key_b.clone(), valid_data_b.clone());

            // Spawn actors
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            )
            .await;

            // Fetch keyA multiple times to ensure that Peer2 is blocked.
            for _ in 0..20 {
                // Fetch keyA
                mailbox1.fetch(key_a.clone()).await;

                // Wait for success event for keyA
                let event = cons_out1.next().await.unwrap();
                match event {
                    Event::Success(key_actual, value) => {
                        assert_eq!(key_actual, key_a);
                        assert_eq!(value, valid_data_a);
                    }
                    Event::Failed(_) => panic!("Fetch failed unexpectedly"),
                }
            }

            // Fetch keyB
            mailbox1.fetch(key_b.clone()).await;

            // Wait for some time (longer than retry timeout)
            context.sleep(Duration::from_secs(5)).await;

            // Cancel the fetch for keyB
            mailbox1.cancel(key_b.clone()).await;

            // Wait for failure event for keyB
            let event = cons_out1.next().await.unwrap();
            match event {
                Event::Failed(key_actual) => {
                    assert_eq!(key_actual, key_b);
                }
                Event::Success(_, _) => panic!("Fetch should have been canceled"),
            }

            // Check oracle
            let blocked = oracle.blocked().await.unwrap();
            assert_eq!(blocked.len(), 1);
            assert_eq!(blocked[0].0, peers[0]);
            assert_eq!(blocked[0].1, peers[1]);
        });
    }

    /// Tests that duplicate fetch requests for the same key are handled properly.
    /// The test verifies that when the same key is requested multiple times,
    /// the data is correctly delivered once without errors.
    #[test_traced]
    fn test_duplicate_fetch_request() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            let key = Key(5);
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), Bytes::from("data for key 5"));

            let (cons1, mut cons_out1) = Consumer::new();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            )
            .await;

            // Send duplicate fetch requests for the same key
            mailbox1.fetch(key.clone()).await;
            mailbox1.fetch(key.clone()).await;

            // Should receive the data only once
            let event = cons_out1.next().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, Bytes::from("data for key 5"));
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }

            // Make sure we don't receive a second event for the duplicate fetch
            select! {
                _ = cons_out1.next() => {
                    panic!("Unexpected second event received for duplicate fetch");
                },
                _ = context.sleep(Duration::from_millis(500)) => {
                    // This is expected - no additional events should be produced
                },
            };
        });
    }

    /// Tests that changing peer sets is handled correctly using the update channel.
    /// This test verifies that when the peer set changes from peer A to peer B,
    /// the resolver correctly adapts and fetches from the new peer.
    #[test_traced]
    fn test_changing_peer_sets() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            let key1 = Key(1);
            let key2 = Key(2);

            let mut prod2 = Producer::default();
            prod2.insert(key1.clone(), Bytes::from("data from peer 2"));

            let mut prod3 = Producer::default();
            prod3.insert(key2.clone(), Bytes::from("data from peer 3"));

            let (cons1, mut cons_out1) = Consumer::new();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            )
            .await;

            // Fetch key1 from peer 2
            mailbox1.fetch(key1.clone()).await;

            // Wait for successful fetch
            let event = cons_out1.next().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key1);
                    assert_eq!(value, Bytes::from("data from peer 2"));
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }

            // Change peer set to include peer 3
            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            )
            .await;

            // Need to wait for the peer set change to propagate
            context.sleep(Duration::from_millis(200)).await;

            // Fetch key2 from peer 3
            mailbox1.fetch(key2.clone()).await;

            // Wait for successful fetch
            let event = cons_out1.next().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key2);
                    assert_eq!(value, Bytes::from("data from peer 3"));
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }
        });
    }

    /// Tests that a targeted fetch retries the remaining targets after one target returns
    /// invalid data, and that the offending peer is blocked.
    #[test_traced]
    fn test_fetch_targeted() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            let key = Key(1);
            let invalid_data = Bytes::from("invalid data");
            let valid_data = Bytes::from("valid data");

            // Peer 2 has invalid data, peer 3 has valid data
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), invalid_data.clone());

            let mut prod3 = Producer::default();
            prod3.insert(key.clone(), valid_data.clone());

            // Consumer expects only valid_data
            let (mut cons1, mut cons_out1) = Consumer::new();
            cons1.add_expected(key.clone(), valid_data.clone());

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            )
            .await;

            // Wait for peer set to be established
            context.sleep(Duration::from_millis(100)).await;

            // Start fetch with targets for both peer 2 (invalid data) and peer 3 (valid data)
            // When peer 2 returns invalid data, only peer 2 should be removed from targets
            // Peer 3 should still be tried as a target and succeed
            mailbox1
                .fetch_targeted(
                    key.clone(),
                    non_empty_vec![peers[1].clone(), peers[2].clone()],
                )
                .await;

            // Should eventually succeed from peer 3
            let event = cons_out1.next().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, valid_data);
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }

            // Verify peer 2 was blocked (sent invalid data)
            let blocked = oracle.blocked().await.unwrap();
            assert_eq!(blocked.len(), 1);
            assert_eq!(blocked[0].0, peers[0]);
            assert_eq!(blocked[0].1, peers[1]);

            // Verify metrics: 1 successful fetch (from peer 3 after peer 2 was blocked)
            let metrics = context.encode();
            assert!(metrics.contains("_fetch_total{status=\"Success\"} 1"));
        });
    }

    /// Tests that a targeted fetch never falls back to peers outside the target set,
    /// even if an untargeted peer holds the data.
    #[test_traced]
    fn test_fetch_targeted_no_fallback() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3, 4]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;

            let key = Key(1);

            // Only peer 4 has the data, peers 2 and 3 don't
            let mut prod4 = Producer::default();
            prod4.insert(key.clone(), Bytes::from("data from peer 4"));

            let (cons1, mut cons_out1) = Consumer::new();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                Producer::default(), // no data
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                Producer::default(), // no data
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox4 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod4,
            )
            .await;

            // Wait for peer set to be established
            context.sleep(Duration::from_millis(100)).await;

            // Start fetch with targets for peers 2 and 3 (both don't have data)
            // Peer 4 has data but is NOT a target - it should NEVER be tried
            mailbox1
                .fetch_targeted(
                    key.clone(),
                    non_empty_vec![peers[1].clone(), peers[2].clone()],
                )
                .await;

            // Wait enough time for targets to fail and retry multiple times
            // The fetch should not succeed because peer 4 (which has data) is not targeted
            select! {
                event = cons_out1.next() => {
                    panic!("Fetch should not succeed, but got: {event:?}");
                },
                _ = context.sleep(Duration::from_secs(3)) => {
                    // Expected: no success event because peer 4 is not targeted
                },
            };
        });
    }

    /// Tests fetching multiple keys at once with per-key targets via fetch_all_targeted,
    /// alongside an untargeted fetch for a third key.
    #[test_traced]
    fn test_fetch_all_targeted() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3, 4]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;

            let key1 = Key(1);
            let key2 = Key(2);
            let key3 = Key(3);

            // Peer 2 has key1
            let mut prod2 = Producer::default();
            prod2.insert(key1.clone(), Bytes::from("data for key 1"));

            // Peer 3 has key3
            let mut prod3 = Producer::default();
            prod3.insert(key3.clone(), Bytes::from("data for key 3"));

            // Peer 4 has key2
            let mut prod4 = Producer::default();
            prod4.insert(key2.clone(), Bytes::from("data for key 2"));

            // Consumer expects all three keys
            let (mut cons1, mut cons_out1) = Consumer::new();
            cons1.add_expected(key1.clone(), Bytes::from("data for key 1"));
            cons1.add_expected(key2.clone(), Bytes::from("data for key 2"));
            cons1.add_expected(key3.clone(), Bytes::from("data for key 3"));

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox4 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod4,
            )
            .await;

            // Wait for peer set to be established
            context.sleep(Duration::from_millis(100)).await;

            // Fetch keys with mixed targeting:
            // - key1 targeted to peer 2 (has data) -> should succeed from target
            // - key2 targeted to peer 4 (has data) -> should succeed from target
            // - key3 no targeting -> fetched from any peer (peer 3 has it)
            mailbox1
                .fetch_all_targeted(vec![
                    (key1.clone(), non_empty_vec![peers[1].clone()]), // peer 2 has key1
                    (key2.clone(), non_empty_vec![peers[3].clone()]), // peer 4 has key2
                ])
                .await;
            mailbox1.fetch(key3.clone()).await; // no targeting for key3

            // Collect all three events
            let mut results = HashMap::new();
            for _ in 0..3 {
                let event = cons_out1.next().await.unwrap();
                match event {
                    Event::Success(key, value) => {
                        results.insert(key, value);
                    }
                    Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
                }
            }

            // Verify all keys received correct data
            assert_eq!(results.len(), 3);
            assert_eq!(results.get(&key1).unwrap(), &Bytes::from("data for key 1"));
            assert_eq!(results.get(&key2).unwrap(), &Bytes::from("data for key 2"));
            assert_eq!(results.get(&key3).unwrap(), &Bytes::from("data for key 3"));

            // Verify metrics: 3 successful fetches
            let metrics = context.encode();
            assert!(metrics.contains("_fetch_total{status=\"Success\"} 3"));
        });
    }

    /// Tests that calling fetch() on an in-progress targeted fetch clears the targets,
    /// allowing the fetch to succeed from any available peer.
    #[test_traced]
    fn test_fetch_clears_targets() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            let key = Key(1);
            let valid_data = Bytes::from("valid data");

            // Peer 2 has no data, peer 3 has the data
            let mut prod3 = Producer::default();
            prod3.insert(key.clone(), valid_data.clone());

            let (cons1, mut cons_out1) = Consumer::new();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                Producer::default(), // no data
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            )
            .await;

            // Wait for peer set to be established
            context.sleep(Duration::from_millis(100)).await;

            // Start fetch with target for peer 2 only (who doesn't have data)
            mailbox1
                .fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()])
                .await;

            // Wait for the targeted fetch to fail a few times
            context.sleep(Duration::from_millis(500)).await;

            // Call fetch() which should clear the targets and allow fallback to any peer
            mailbox1.fetch(key.clone()).await;

            // Should now succeed from peer 3 (who has data but wasn't originally targeted)
            let event = cons_out1.next().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, valid_data);
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }
        });
    }

    /// Tests that calling fetch_targeted() on an in-progress untargeted fetch does not
    /// restrict it: the fetch can still be served by any peer.
    #[test_traced]
    fn test_fetch_targeted_does_not_restrict_all() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            let key = Key(1);
            let valid_data = Bytes::from("valid data");

            // Peer 2 has no data, peer 3 has the data
            let mut prod3 = Producer::default();
            prod3.insert(key.clone(), valid_data.clone());

            let (cons1, mut cons_out1) = Consumer::new();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                Producer::default(), // no data
            )
            .await;

            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            )
            .await;

            // Wait for peer set to be established
            context.sleep(Duration::from_millis(100)).await;

            // Start fetch without targets (can try any peer)
            mailbox1.fetch(key.clone()).await;

            // Wait a bit for the fetch to start
            context.sleep(Duration::from_millis(50)).await;

            // Call fetch_targeted with peer 2 only (who doesn't have data)
            // This should NOT restrict the existing "all" fetch
            mailbox1
                .fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()])
                .await;

            // Should still succeed from peer 3 (who has data but wasn't in the targeted call)
            // because the original fetch was "all" and shouldn't be restricted
            let event = cons_out1.next().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, valid_data);
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }
        });
    }
1347
1348    #[test_traced]
1349    fn test_retain() {
1350        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1351        executor.start(|context| async move {
1352            let (mut oracle, mut schemes, peers, mut connections) =
1353                setup_network_and_peers(&context, &[1, 2]).await;
1354
1355            let key = Key(5);
1356            let mut prod2 = Producer::default();
1357            prod2.insert(key.clone(), Bytes::from("data for key 5"));
1358
1359            let (cons1, mut cons_out1) = Consumer::new();
1360
1361            let scheme = schemes.remove(0);
1362            let mut mailbox1 = setup_and_spawn_actor(
1363                &context,
1364                oracle.manager(),
1365                oracle.control(scheme.public_key()),
1366                scheme,
1367                connections.remove(0),
1368                cons1,
1369                Producer::default(),
1370            )
1371            .await;
1372
1373            let scheme = schemes.remove(0);
1374            let _mailbox2 = setup_and_spawn_actor(
1375                &context,
1376                oracle.manager(),
1377                oracle.control(scheme.public_key()),
1378                scheme,
1379                connections.remove(0),
1380                Consumer::dummy(),
1381                prod2,
1382            )
1383            .await;
1384
1385            // Retain before fetching should have no effect
1386            mailbox1.retain(|_| true).await;
1387            select! {
1388                _ = cons_out1.next() => { panic!("unexpected event"); },
1389                _ = context.sleep(Duration::from_millis(100)) => {},
1390            };
1391
1392            // Start a fetch (no link, so fetch stays in-flight with timer in fetch timers)
1393            mailbox1.fetch(key.clone()).await;
1394
1395            // Retain with predicate that excludes the key
1396            // This must clean up fetch timers entry for the key
1397            let key_clone = key.clone();
1398            mailbox1.retain(move |k| k != &key_clone).await;
1399
1400            // Consumer should receive failed event
1401            let event = cons_out1.next().await.unwrap();
1402            match event {
1403                Event::Failed(key_actual) => {
1404                    assert_eq!(key_actual, key);
1405                }
1406                Event::Success(_, _) => panic!("Fetch should have been retained out"),
1407            }
1408
1409            // Now add link so fetches can complete
1410            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1411
1412            // Fetch the same key again; if the fetch_timers entry wasn't cleaned up, this
1413            // would be treated as a duplicate and silently ignored
1414            mailbox1.fetch(key.clone()).await;
1415
1416            // Should succeed
1417            let event = cons_out1.next().await.unwrap();
1418            match event {
1419                Event::Success(key_actual, value) => {
1420                    assert_eq!(key_actual, key);
1421                    assert_eq!(value, Bytes::from("data for key 5"));
1422                }
1423                Event::Failed(_) => unreachable!(),
1424            }
1425        });
1426    }
1427
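        /// Tests that `clear` cancels all in-flight fetches and cleans up their retry timer
        /// state, so the same key can be fetched again later.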
1428    #[test_traced]
1429    fn test_clear() {
1430        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1431        executor.start(|context| async move {
1432            let (mut oracle, mut schemes, peers, mut connections) =
1433                setup_network_and_peers(&context, &[1, 2]).await;
1434
1435            // No link yet - fetch will stay in-flight
1436            let key = Key(6);
1437            let mut prod2 = Producer::default();
1438            prod2.insert(key.clone(), Bytes::from("data for key 6"));
1439
1440            let (cons1, mut cons_out1) = Consumer::new();
1441
1442            let scheme = schemes.remove(0);
1443            let mut mailbox1 = setup_and_spawn_actor(
1444                &context,
1445                oracle.manager(),
1446                oracle.control(scheme.public_key()),
1447                scheme,
1448                connections.remove(0),
1449                cons1,
1450                Producer::default(),
1451            )
1452            .await;
1453
1454            let scheme = schemes.remove(0);
1455            let _mailbox2 = setup_and_spawn_actor(
1456                &context,
1457                oracle.manager(),
1458                oracle.control(scheme.public_key()),
1459                scheme,
1460                connections.remove(0),
1461                Consumer::dummy(),
1462                prod2,
1463            )
1464            .await;
1465
1466            // Clear before fetching should have no effect
1467            mailbox1.clear().await;
1468            select! {
1469                _ = cons_out1.next() => { panic!("unexpected event"); },
1470                _ = context.sleep(Duration::from_millis(100)) => {},
1471            };
1472
1473            // Start a fetch (no link, so the fetch stays in-flight with an entry in fetch_timers)
1474            mailbox1.fetch(key.clone()).await;
1475
1476            // Clear all fetches
1477            mailbox1.clear().await;
1478
1479            // Consumer should receive failed event
1480            let event = cons_out1.next().await.unwrap();
1481            match event {
1482                Event::Failed(key_actual) => {
1483                    assert_eq!(key_actual, key);
1484                }
1485                Event::Success(_, _) => panic!("Fetch should have been cleared"),
1486            }
1487
1488            // Now add link so fetches can complete
1489            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1490
1491            // Fetch the same key again; if the fetch_timers entry wasn't cleaned up, this
1492            // would be treated as a duplicate and silently ignored
1493            mailbox1.fetch(key.clone()).await;
1494
1495            // Should succeed
1496            let event = cons_out1.next().await.unwrap();
1497            match event {
1498                Event::Success(key_actual, value) => {
1499                    assert_eq!(key_actual, key);
1500                    assert_eq!(value, Bytes::from("data for key 6"));
1501                }
1502                Event::Failed(_) => unreachable!(),
1503            }
1504        });
1505    }
1506
1507    /// Tests that when a peer is rate-limited, the fetcher spills over to another peer.
1508    /// With 2 peers and a rate limit of 1/sec each, 2 requests issued simultaneously should
1509    /// both complete immediately (one to each peer) without waiting for the rate limit to reset.
1510    #[test_traced]
1511    fn test_rate_limit_spillover() {
1512        let executor = deterministic::Runner::timed(Duration::from_secs(30));
1513        executor.start(|context| async move {
1514            // Use a very restrictive rate limit: 1 request per second per peer
1515            let (mut oracle, mut schemes, peers, mut connections) =
1516                setup_network_and_peers_with_rate_limit(
1517                    &context,
1518                    &[1, 2, 3],
1519                    Quota::per_second(NZU32!(1)),
1520                )
1521                .await;
1522
1523            // Add links between peer 1 and both peer 2 and peer 3
1524            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1525            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1526
1527            // Both peer 2 and peer 3 have the same data
1528            let mut prod2 = Producer::default();
1529            let mut prod3 = Producer::default();
1530            prod2.insert(Key(0), Bytes::from("data for key 0"));
1531            prod2.insert(Key(1), Bytes::from("data for key 1"));
1532            prod3.insert(Key(0), Bytes::from("data for key 0"));
1533            prod3.insert(Key(1), Bytes::from("data for key 1"));
1534
1535            let (cons1, mut cons_out1) = Consumer::new();
1536
1537            // Set up peer 1 (the requester)
1538            let scheme = schemes.remove(0);
1539            let mut mailbox1 = setup_and_spawn_actor(
1540                &context,
1541                oracle.manager(),
1542                oracle.control(scheme.public_key()),
1543                scheme,
1544                connections.remove(0),
1545                cons1,
1546                Producer::default(),
1547            )
1548            .await;
1549
1550            // Set up peer 2 (has data)
1551            let scheme = schemes.remove(0);
1552            let _mailbox2 = setup_and_spawn_actor(
1553                &context,
1554                oracle.manager(),
1555                oracle.control(scheme.public_key()),
1556                scheme,
1557                connections.remove(0),
1558                Consumer::dummy(),
1559                prod2,
1560            )
1561            .await;
1562
1563            // Set up peer 3 (also has data)
1564            let scheme = schemes.remove(0);
1565            let _mailbox3 = setup_and_spawn_actor(
1566                &context,
1567                oracle.manager(),
1568                oracle.control(scheme.public_key()),
1569                scheme,
1570                connections.remove(0),
1571                Consumer::dummy(),
1572                prod3,
1573            )
1574            .await;
1575
1576            // Wait for peer set to be established
1577            context.sleep(Duration::from_millis(100)).await;
1578            let start = context.current();
1579
1580            // Issue 2 fetch requests rapidly
1581            // With a rate limit of 1/sec per peer and 2 peers, both should complete
1582            // immediately via spill-over (one request to each peer)
1583            mailbox1.fetch(Key(0)).await;
1584            mailbox1.fetch(Key(1)).await;
1585
1586            // Collect results
1587            let mut results = HashMap::new();
1588            for _ in 0..2 {
1589                let event = cons_out1.next().await.unwrap();
1590                match event {
1591                    Event::Success(key, value) => {
1592                        results.insert(key.clone(), value);
1593                    }
1594                    Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1595                }
1596            }
1597
1598            // Verify both keys were fetched successfully
1599            assert_eq!(results.len(), 2);
1600            assert_eq!(
1601                results.get(&Key(0)).unwrap(),
1602                &Bytes::from("data for key 0")
1603            );
1604            assert_eq!(
1605                results.get(&Key(1)).unwrap(),
1606                &Bytes::from("data for key 1")
1607            );
1608
1609            // Verify it completed quickly (well under 1 second) - proves spill-over worked
1610            // Without spill-over, the second request would wait ~1 second for rate limit reset
1611            let elapsed = context.current().duration_since(start).unwrap();
1612            assert!(
1613                elapsed < Duration::from_millis(500),
1614                "Expected quick completion via spill-over, but took {elapsed:?}"
1615            );
1616        });
1617    }
1618
1619    /// Tests that fetches blocked by rate limiting are retried and eventually succeed after the rate limit resets.
1620    /// This test uses a single peer with a restrictive rate limit and verifies that
1621    /// fetches eventually complete after waiting for the rate limit to reset.
1622    #[test_traced]
1623    fn test_rate_limit_retry_after_reset() {
1624        let executor = deterministic::Runner::timed(Duration::from_secs(30));
1625        executor.start(|context| async move {
1626            // Use a restrictive rate limit: 1 request per second
1627            let (mut oracle, mut schemes, peers, mut connections) =
1628                setup_network_and_peers_with_rate_limit(
1629                    &context,
1630                    &[1, 2],
1631                    Quota::per_second(NZU32!(1)),
1632                )
1633                .await;
1634
1635            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1636
1637            // Peer 2 has data for multiple keys
1638            let mut prod2 = Producer::default();
1639            prod2.insert(Key(1), Bytes::from("data for key 1"));
1640            prod2.insert(Key(2), Bytes::from("data for key 2"));
1641            prod2.insert(Key(3), Bytes::from("data for key 3"));
1642
1643            let (cons1, mut cons_out1) = Consumer::new();
1644
1645            let scheme = schemes.remove(0);
1646            let mut mailbox1 = setup_and_spawn_actor(
1647                &context,
1648                oracle.manager(),
1649                oracle.control(scheme.public_key()),
1650                scheme,
1651                connections.remove(0),
1652                cons1,
1653                Producer::default(),
1654            )
1655            .await;
1656
1657            let scheme = schemes.remove(0);
1658            let _mailbox2 = setup_and_spawn_actor(
1659                &context,
1660                oracle.manager(),
1661                oracle.control(scheme.public_key()),
1662                scheme,
1663                connections.remove(0),
1664                Consumer::dummy(),
1665                prod2,
1666            )
1667            .await;
1668
1669            // Wait for peer set to be established
1670            context.sleep(Duration::from_millis(100)).await;
1671            let start = context.current();
1672
1673            // Issue 3 fetch requests to a single peer with a rate limit of 1/sec
1674            // Only 1 can be sent immediately; the others must wait for the rate limit to reset
1675            mailbox1.fetch(Key(1)).await;
1676            mailbox1.fetch(Key(2)).await;
1677            mailbox1.fetch(Key(3)).await;
1678
1679            // All 3 should eventually succeed (after rate limit resets)
1680            let mut results = HashMap::new();
1681            for _ in 0..3 {
1682                let event = cons_out1.next().await.unwrap();
1683                match event {
1684                    Event::Success(key, value) => {
1685                        results.insert(key.clone(), value);
1686                    }
1687                    Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1688                }
1689            }
1690
1691            assert_eq!(results.len(), 3);
1692            for i in 1..=3 {
1693                assert_eq!(
1694                    results.get(&Key(i)).unwrap(),
1695                    &Bytes::from(format!("data for key {}", i))
1696                );
1697            }
1698
1699            // Verify it took significant time due to rate limiting
1700            // With 3 requests at 1/sec to a single peer, requests 2 and 3 must wait
1701            // for rate limit resets (~1 second each), so total should be > 2 seconds
1702            let elapsed = context.current().duration_since(start).unwrap();
1703            assert!(
1704                elapsed > Duration::from_secs(2),
1705                "Expected rate limiting to cause delay > 2s, but took {elapsed:?}"
1706            );
1707        });
1708    }
1709
1710    /// Tests that the resolver never sends fetch requests to itself (me exclusion).
1711    /// Even when the local peer has the data in its producer, it should fetch from
1712    /// another peer instead.
1713    #[test_traced]
1714    fn test_self_exclusion() {
1715        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1716        executor.start(|context| async move {
1717            let (mut oracle, mut schemes, peers, mut connections) =
1718                setup_network_and_peers(&context, &[1, 2]).await;
1719
1720            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1721
1722            let key = Key(1);
1723            let data = Bytes::from("shared data");
1724
1725            // Both peers have the data - peer 1 (requester) and peer 2
1726            let mut prod1 = Producer::default();
1727            prod1.insert(key.clone(), data.clone());
1728            let mut prod2 = Producer::default();
1729            prod2.insert(key.clone(), data.clone());
1730
1731            let (cons1, mut cons_out1) = Consumer::new();
1732
1733            // Set up peer 1 with `me` set - it has the data but should NOT fetch from itself
1734            let scheme = schemes.remove(0);
1735            let mut mailbox1 = setup_and_spawn_actor(
1736                &context,
1737                oracle.manager(),
1738                oracle.control(scheme.public_key()),
1739                scheme,
1740                connections.remove(0),
1741                cons1,
1742                prod1, // peer 1 has the data
1743            )
1744            .await;
1745
1746            // Set up peer 2 - also has the data
1747            let scheme = schemes.remove(0);
1748            let _mailbox2 = setup_and_spawn_actor(
1749                &context,
1750                oracle.manager(),
1751                oracle.control(scheme.public_key()),
1752                scheme,
1753                connections.remove(0),
1754                Consumer::dummy(),
1755                prod2,
1756            )
1757            .await;
1758
1759            // Wait for peer set to be established
1760            context.sleep(Duration::from_millis(100)).await;
1761
1762            // Fetch the key - should get it from peer 2, not from self
1763            mailbox1.fetch(key.clone()).await;
1764
1765            // Should succeed (from peer 2)
1766            let event = cons_out1.next().await.unwrap();
1767            match event {
1768                Event::Success(key_actual, value) => {
1769                    assert_eq!(key_actual, key);
1770                    assert_eq!(value, data);
1771                }
1772                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1773            }
1774        });
1775    }
1776
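        /// Spawns one engine per (scheme, connection, consumer, producer) tuple and returns the
        /// engines' mailboxes along with their task handles so tests can abort the actors.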
1777    #[allow(clippy::type_complexity)]
1778    fn spawn_actors_with_handles(
1779        context: deterministic::Context,
1780        oracle: &Oracle<PublicKey, deterministic::Context>,
1781        schemes: Vec<PrivateKey>,
1782        connections: Vec<(
1783            Sender<PublicKey, deterministic::Context>,
1784            Receiver<PublicKey>,
1785        )>,
1786        consumers: Vec<Consumer<Key, Bytes>>,
1787        producers: Vec<Producer<Key, Bytes>>,
1788    ) -> (
1789        Vec<Mailbox<Key, PublicKey>>,
1790        Vec<commonware_runtime::Handle<()>>,
1791    ) {
1792        let actor_context = context.with_label("actor");
1793        let mut mailboxes = Vec::new();
1794        let mut handles = Vec::new();
1795
1796        for (idx, ((scheme, conn), (consumer, producer))) in schemes
1797            .into_iter()
1798            .zip(connections)
1799            .zip(consumers.into_iter().zip(producers))
1800            .enumerate()
1801        {
1802            let ctx = actor_context.with_label(&format!("peer_{idx}"));
1803            let public_key = scheme.public_key();
1804            let (engine, mailbox) = Engine::new(
1805                ctx,
1806                Config {
1807                    manager: oracle.manager(),
1808                    blocker: oracle.control(public_key.clone()),
1809                    consumer,
1810                    producer,
1811                    mailbox_size: MAILBOX_SIZE,
1812                    me: Some(public_key), // never send fetch requests to ourselves
1813                    initial: INITIAL_DURATION,
1814                    timeout: TIMEOUT,
1815                    fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
1816                    priority_requests: false,
1817                    priority_responses: false,
1818                },
1819            );
1820            handles.push(engine.start(conn));
1821            mailboxes.push(mailbox);
1822        }
1823
1824        (mailboxes, handles)
1825    }
1826
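        /// Tests that mailbox operations (fetch, cancel, clear, retain, and fetch_targeted) do
        /// not panic after the engine tasks have been aborted.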
1827    #[test_traced]
1828    fn test_operations_after_shutdown_do_not_panic() {
1829        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1830        executor.start(|context| async move {
1831            let (mut oracle, schemes, peers, connections) =
1832                setup_network_and_peers(&context, &[1, 2]).await;
1833
1834            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1835
1836            let key = Key(1);
1837            let mut prod2 = Producer::default();
1838            prod2.insert(key.clone(), Bytes::from("data for key 1"));
1839
1840            let (cons1, mut cons_out1) = Consumer::new();
1841
1842            let (mut mailboxes, handles) = spawn_actors_with_handles(
1843                context.clone(),
1844                &oracle,
1845                schemes,
1846                connections,
1847                vec![cons1, Consumer::dummy()],
1848                vec![Producer::default(), prod2],
1849            );
1850
1851            // Fetch to verify network is functional
1852            mailboxes[0].fetch(key.clone()).await;
1853            let event = cons_out1.next().await.unwrap();
1854            match event {
1855                Event::Success(_, value) => assert_eq!(value, Bytes::from("data for key 1")),
1856                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1857            }
1858
1859            // Abort all actors
1860            for handle in handles {
1861                handle.abort();
1862            }
1863            context.sleep(Duration::from_millis(100)).await;
1864
1865            // None of the following operations should panic after shutdown
1866
1867            // Fetch should not panic
1868            let key2 = Key(2);
1869            mailboxes[0].fetch(key2.clone()).await;
1870
1871            // Cancel should not panic
1872            mailboxes[0].cancel(key2.clone()).await;
1873
1874            // Clear should not panic
1875            mailboxes[0].clear().await;
1876
1877            // Retain should not panic
1878            mailboxes[0].retain(|_| true).await;
1879
1880            // Fetch targeted should not panic
1881            mailboxes[0]
1882                .fetch_targeted(Key(3), non_empty_vec![peers[1].clone()])
1883                .await;
1884        });
1885    }
1886
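        /// Spawns two connected actors, verifies a fetch succeeds, then aborts the actor tasks
        /// and asserts that no tasks remain running under the "actor" label.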
1887    fn clean_shutdown(seed: u64) {
1888        let cfg = deterministic::Config::default()
1889            .with_seed(seed)
1890            .with_timeout(Some(Duration::from_secs(30)));
1891        let executor = deterministic::Runner::new(cfg);
1892        executor.start(|context| async move {
1893            let (mut oracle, schemes, peers, connections) =
1894                setup_network_and_peers(&context, &[1, 2]).await;
1895
1896            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1897
1898            let key = Key(1);
1899            let mut prod2 = Producer::default();
1900            prod2.insert(key.clone(), Bytes::from("data for key 1"));
1901
1902            let (cons1, mut cons_out1) = Consumer::new();
1903
1904            let (mut mailboxes, handles) = spawn_actors_with_handles(
1905                context.clone(),
1906                &oracle,
1907                schemes,
1908                connections,
1909                vec![cons1, Consumer::dummy()],
1910                vec![Producer::default(), prod2],
1911            );
1912
1913            // Allow tasks to start
1914            context.sleep(Duration::from_millis(100)).await;
1915
1916            // Count running tasks under the actor prefix
1917            let running_before = count_running_tasks(&context, "actor");
1918            assert!(
1919                running_before > 0,
1920                "at least one actor task should be running"
1921            );
1922
1923            // Verify network is functional
1924            mailboxes[0].fetch(key.clone()).await;
1925            let event = cons_out1.next().await.unwrap();
1926            match event {
1927                Event::Success(_, value) => assert_eq!(value, Bytes::from("data for key 1")),
1928                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1929            }
1930
1931            // Abort all actors
1932            for handle in handles {
1933                handle.abort();
1934            }
1935            context.sleep(Duration::from_millis(100)).await;
1936
1937            // Verify all actor tasks are stopped
1938            let running_after = count_running_tasks(&context, "actor");
1939            assert_eq!(
1940                running_after, 0,
1941                "all actor tasks should be stopped, but {running_after} still running"
1942            );
1943        });
1944    }
1945
1946    #[test]
1947    fn test_clean_shutdown() {
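            // Run the clean shutdown check across several deterministic seeds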
1948        for seed in 0..25 {
1949            clean_shutdown(seed);
1950        }
1951    }
1952}