Skip to main content

commonware_resolver/p2p/
mod.rs

1//! Resolve data identified by a fixed-length key by using the P2P network.
2//!
3//! # Overview
4//!
5//! The `p2p` module enables resolving data by fixed-length keys in a P2P network. Central to the
6//! module is the `peer` actor which manages the fetch-request lifecycle. Its mailbox allows
7//! initiation and cancellation of fetch requests via the `Resolver` interface.
8//!
9//! The peer handles an arbitrarily large number of concurrent fetch requests by sending requests
10//! to other peers and processing their responses. It selects peers based on performance, retrying
11//! with another peer if one fails or provides invalid data. Requests persist until canceled or
12//! fulfilled, delivering data to the `Consumer` for verification.
13//!
14//! The `Consumer` checks data integrity and authenticity (critical in an adversarial environment)
15//! and returns `true` if valid, completing the fetch, or `false` to retry.
16//!
17//! The peer also serves data to other peers, forwarding network requests to the `Producer`. The
18//! `Producer` provides data asynchronously (e.g., from storage). If it fails, the peer sends an
19//! empty response, prompting the requester to retry elsewhere. Each message between peers contains
20//! an ID. Each request is sent with a unique ID, and each response includes the ID of the request
21//! it responds to.
22//!
23//! # Targeting
24//!
25//! Callers can restrict fetches to specific target peers using [`Resolver::fetch_targeted`](crate::Resolver::fetch_targeted).
26//! Only target peers are tried; there is no automatic fallback to other peers. Targets persist through
27//! transient failures (timeout, "no data" response, send failure) since the peer might be slow or
28//! receive the data later.
29//!
30//! While a fetch is in progress, callers can modify targeting:
31//! - [`Resolver::fetch_targeted`](crate::Resolver::fetch_targeted) adds peers to the existing target set
32//!   (only if the fetch already has targets; an "all" fetch remains unrestricted)
33//! - [`Resolver::fetch`](crate::Resolver::fetch) clears all targets, allowing fallback to any peer
34//!
35//! These modifications only apply to in-progress fetches. Once a fetch completes (success, cancel,
36//! or blocked peer), the targets for that key are cleared automatically.
37//!
38//! # Performance Considerations
39//!
40//! The peer supports arbitrarily many concurrent fetch requests, but resource usage generally
41//! depends on the rate-limiting configuration of the underlying P2P network.
42
43use bytes::Bytes;
44use commonware_utils::{channel::oneshot, Span};
45use std::future::Future;
46
47mod config;
48pub use config::Config;
49mod engine;
50pub use engine::Engine;
51mod fetcher;
52mod ingress;
53pub use ingress::Mailbox;
54mod metrics;
55mod wire;
56
57#[cfg(feature = "mocks")]
58pub mod mocks;
59
/// Serves data requested by the network.
///
/// As described in the module documentation, the peer forwards requests received from other
/// peers to the `Producer`, which may provide the data asynchronously (e.g., from storage).
pub trait Producer: Clone + Send + 'static {
    /// Type used to uniquely identify data.
    type Key: Span;

    /// Serve a request received from the network.
    ///
    /// The returned future resolves to a [`oneshot::Receiver`] on which the bytes for `key` are
    /// expected to arrive. Per the module documentation, if the producer fails, the peer sends
    /// an empty response, prompting the requester to retry elsewhere.
    fn produce(&mut self, key: Self::Key) -> impl Future<Output = oneshot::Receiver<Bytes>> + Send;
}
68
69#[cfg(test)]
70mod tests {
71    use super::{
72        mocks::{Consumer, Event, Key, Producer},
73        Config, Engine, Mailbox,
74    };
75    use crate::Resolver;
76    use bytes::Bytes;
77    use commonware_cryptography::{
78        ed25519::{PrivateKey, PublicKey},
79        Signer,
80    };
81    use commonware_macros::{select, test_traced};
82    use commonware_p2p::{
83        simulated::{Link, Network, Oracle, Receiver, Sender},
84        Blocker, Manager, Provider,
85    };
86    use commonware_runtime::{count_running_tasks, deterministic, Clock, Metrics, Quota, Runner};
87    use commonware_utils::{non_empty_vec, NZU32};
88    use std::{collections::HashMap, num::NonZeroU32, time::Duration};
89
90    const MAILBOX_SIZE: usize = 1024;
91    const RATE_LIMIT: NonZeroU32 = NZU32!(10);
92    const INITIAL_DURATION: Duration = Duration::from_millis(100);
93    const TIMEOUT: Duration = Duration::from_millis(400);
94    const FETCH_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
95    const LINK: Link = Link {
96        latency: Duration::from_millis(10),
97        jitter: Duration::from_millis(1),
98        success_rate: 1.0,
99    };
100    const LINK_UNRELIABLE: Link = Link {
101        latency: Duration::from_millis(10),
102        jitter: Duration::from_millis(1),
103        success_rate: 0.5,
104    };
105
106    async fn setup_network_and_peers(
107        context: &deterministic::Context,
108        peer_seeds: &[u64],
109    ) -> (
110        Oracle<PublicKey, deterministic::Context>,
111        Vec<PrivateKey>,
112        Vec<PublicKey>,
113        Vec<(
114            Sender<PublicKey, deterministic::Context>,
115            Receiver<PublicKey>,
116        )>,
117    ) {
118        setup_network_and_peers_with_rate_limit(context, peer_seeds, Quota::per_second(RATE_LIMIT))
119            .await
120    }
121
122    async fn setup_network_and_peers_with_rate_limit(
123        context: &deterministic::Context,
124        peer_seeds: &[u64],
125        rate_limit: Quota,
126    ) -> (
127        Oracle<PublicKey, deterministic::Context>,
128        Vec<PrivateKey>,
129        Vec<PublicKey>,
130        Vec<(
131            Sender<PublicKey, deterministic::Context>,
132            Receiver<PublicKey>,
133        )>,
134    ) {
135        let (network, oracle) = Network::new(
136            context.with_label("network"),
137            commonware_p2p::simulated::Config {
138                max_size: 1024 * 1024,
139                disconnect_on_block: true,
140                tracked_peer_sets: Some(3),
141            },
142        );
143        network.start();
144
145        let schemes: Vec<PrivateKey> = peer_seeds
146            .iter()
147            .map(|seed| PrivateKey::from_seed(*seed))
148            .collect();
149        let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
150        let mut manager = oracle.manager();
151        manager.track(0, peers.clone().try_into().unwrap()).await;
152
153        let mut connections = Vec::new();
154        for peer in &peers {
155            let (sender, receiver) = oracle
156                .control(peer.clone())
157                .register(0, rate_limit)
158                .await
159                .unwrap();
160            connections.push((sender, receiver));
161        }
162
163        (oracle, schemes, peers, connections)
164    }
165
166    async fn add_link(
167        oracle: &mut Oracle<PublicKey, deterministic::Context>,
168        link: Link,
169        peers: &[PublicKey],
170        from: usize,
171        to: usize,
172    ) {
173        oracle
174            .add_link(peers[from].clone(), peers[to].clone(), link.clone())
175            .await
176            .unwrap();
177        oracle
178            .add_link(peers[to].clone(), peers[from].clone(), link)
179            .await
180            .unwrap();
181    }
182
183    async fn setup_and_spawn_actor(
184        context: &deterministic::Context,
185        provider: impl Provider<PublicKey = PublicKey>,
186        blocker: impl Blocker<PublicKey = PublicKey>,
187        signer: impl Signer<PublicKey = PublicKey>,
188        connection: (
189            Sender<PublicKey, deterministic::Context>,
190            Receiver<PublicKey>,
191        ),
192        consumer: Consumer<Key, Bytes>,
193        producer: Producer<Key, Bytes>,
194    ) -> Mailbox<Key, PublicKey> {
195        let public_key = signer.public_key();
196        let (engine, mailbox) = Engine::new(
197            context.with_label(&format!("actor_{public_key}")),
198            Config {
199                provider,
200                blocker,
201                consumer,
202                producer,
203                mailbox_size: MAILBOX_SIZE,
204                me: Some(public_key),
205                initial: INITIAL_DURATION,
206                timeout: TIMEOUT,
207                fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
208                priority_requests: false,
209                priority_responses: false,
210            },
211        );
212        engine.start(connection);
213
214        mailbox
215    }
216
217    /// Tests that fetching a key from another peer succeeds when data is available.
218    /// This test sets up two peers, where Peer 1 requests data that Peer 2 has,
219    /// and verifies that the data is correctly delivered to Peer 1's consumer.
220    #[test_traced]
221    fn test_fetch_success() {
222        let executor = deterministic::Runner::timed(Duration::from_secs(10));
223        executor.start(|context| async move {
224            let (mut oracle, mut schemes, peers, mut connections) =
225                setup_network_and_peers(&context, &[1, 2]).await;
226
227            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
228
229            let key = Key(2);
230            let mut prod2 = Producer::default();
231            prod2.insert(key.clone(), Bytes::from("data for key 2"));
232
233            let (cons1, mut cons_out1) = Consumer::new();
234
235            let scheme = schemes.remove(0);
236            let mut mailbox1 = setup_and_spawn_actor(
237                &context,
238                oracle.manager(),
239                oracle.control(scheme.public_key()),
240                scheme,
241                connections.remove(0),
242                cons1,
243                Producer::default(),
244            )
245            .await;
246
247            let scheme = schemes.remove(0);
248            let _mailbox2 = setup_and_spawn_actor(
249                &context,
250                oracle.manager(),
251                oracle.control(scheme.public_key()),
252                scheme,
253                connections.remove(0),
254                Consumer::dummy(),
255                prod2,
256            )
257            .await;
258
259            mailbox1.fetch(key.clone()).await;
260
261            let event = cons_out1.recv().await.unwrap();
262            match event {
263                Event::Success(key_actual, value) => {
264                    assert_eq!(key_actual, key);
265                    assert_eq!(value, Bytes::from("data for key 2"));
266                }
267                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
268            }
269        });
270    }
271
272    /// Tests that canceling a fetch request results in a failure event.
273    /// This test initiates a fetch request and immediately cancels it,
274    /// verifying that the consumer receives a failure notification instead of data.
275    #[test_traced]
276    fn test_cancel_fetch() {
277        let executor = deterministic::Runner::timed(Duration::from_secs(10));
278        executor.start(|context| async move {
279            let (oracle, mut schemes, _peers, mut connections) =
280                setup_network_and_peers(&context, &[1]).await;
281
282            let (cons1, mut cons_out1) = Consumer::new();
283            let prod1 = Producer::default();
284
285            let scheme = schemes.remove(0);
286            let mut mailbox1 = setup_and_spawn_actor(
287                &context,
288                oracle.manager(),
289                oracle.control(scheme.public_key()),
290                scheme,
291                connections.remove(0),
292                cons1,
293                prod1,
294            )
295            .await;
296
297            let key = Key(3);
298            mailbox1.fetch(key.clone()).await;
299            mailbox1.cancel(key.clone()).await;
300
301            let event = cons_out1.recv().await.unwrap();
302            match event {
303                Event::Failed(key_actual) => {
304                    assert_eq!(key_actual, key);
305                }
306                Event::Success(_, _) => panic!("Fetch should have been canceled"),
307            }
308        });
309    }
310
311    /// Tests fetching data from a peer when some peers lack the data.
312    /// This test sets up three peers, where Peer 1 requests data that only Peer 3 has.
313    /// It verifies that the resolver retries with another peer and successfully
314    /// delivers the data to Peer 1's consumer.
315    #[test_traced]
316    fn test_peer_no_data() {
317        let executor = deterministic::Runner::timed(Duration::from_secs(10));
318        executor.start(|context| async move {
319            let (mut oracle, mut schemes, peers, mut connections) =
320                setup_network_and_peers(&context, &[1, 2, 3]).await;
321
322            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
323            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
324
325            let prod1 = Producer::default();
326            let prod2 = Producer::default();
327            let mut prod3 = Producer::default();
328            let key = Key(3);
329            prod3.insert(key.clone(), Bytes::from("data for key 3"));
330
331            let (cons1, mut cons_out1) = Consumer::new();
332
333            let scheme = schemes.remove(0);
334            let mut mailbox1 = setup_and_spawn_actor(
335                &context,
336                oracle.manager(),
337                oracle.control(scheme.public_key()),
338                scheme,
339                connections.remove(0),
340                cons1,
341                prod1,
342            )
343            .await;
344
345            let scheme = schemes.remove(0);
346            let _mailbox2 = setup_and_spawn_actor(
347                &context,
348                oracle.manager(),
349                oracle.control(scheme.public_key()),
350                scheme,
351                connections.remove(0),
352                Consumer::dummy(),
353                prod2,
354            )
355            .await;
356
357            let scheme = schemes.remove(0);
358            let _mailbox3 = setup_and_spawn_actor(
359                &context,
360                oracle.manager(),
361                oracle.control(scheme.public_key()),
362                scheme,
363                connections.remove(0),
364                Consumer::dummy(),
365                prod3,
366            )
367            .await;
368
369            mailbox1.fetch(key.clone()).await;
370
371            let event = cons_out1.recv().await.unwrap();
372            match event {
373                Event::Success(key_actual, value) => {
374                    assert_eq!(key_actual, key);
375                    assert_eq!(value, Bytes::from("data for key 3"));
376                }
377                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
378            }
379        });
380    }
381
382    /// Tests fetching when no peers are available.
383    /// This test sets up a single peer with an empty peer provider (no peers).
384    /// It initiates a fetch, waits beyond the retry timeout, cancels the fetch,
385    /// and verifies that the consumer receives a failure notification.
386    #[test_traced]
387    fn test_no_peers_available() {
388        let executor = deterministic::Runner::timed(Duration::from_secs(10));
389        executor.start(|context| async move {
390            let (oracle, mut schemes, _peers, mut connections) =
391                setup_network_and_peers(&context, &[1]).await;
392
393            let (cons1, mut cons_out1) = Consumer::new();
394            let prod1 = Producer::default();
395
396            let scheme = schemes.remove(0);
397            let mut mailbox1 = setup_and_spawn_actor(
398                &context,
399                oracle.manager(),
400                oracle.control(scheme.public_key()),
401                scheme,
402                connections.remove(0),
403                cons1,
404                prod1,
405            )
406            .await;
407
408            let key = Key(4);
409            mailbox1.fetch(key.clone()).await;
410            context.sleep(Duration::from_secs(5)).await;
411            mailbox1.cancel(key.clone()).await;
412
413            let event = cons_out1.recv().await.expect("Consumer channel closed");
414            match event {
415                Event::Failed(key_actual) => {
416                    assert_eq!(key_actual, key);
417                }
418                Event::Success(_, _) => {
419                    panic!("Fetch should have failed due to no peers")
420                }
421            }
422        });
423    }
424
425    /// Tests that concurrent fetch requests are handled correctly.
426    /// Also tests that the peer can recover from having no peers available.
427    /// Also tests that the peer can get data from multiple peers that have different sets of data.
428    #[test_traced]
429    fn test_concurrent_fetch_requests() {
430        let executor = deterministic::Runner::timed(Duration::from_secs(60));
431        executor.start(|context| async move {
432            let (mut oracle, mut schemes, peers, mut connections) =
433                setup_network_and_peers(&context, &[1, 2, 3]).await;
434
435            let key2 = Key(2);
436            let key3 = Key(3);
437            let mut prod2 = Producer::default();
438            prod2.insert(key2.clone(), Bytes::from("data for key 2"));
439            let mut prod3 = Producer::default();
440            prod3.insert(key3.clone(), Bytes::from("data for key 3"));
441
442            let (cons1, mut cons_out1) = Consumer::new();
443
444            let scheme = schemes.remove(0);
445            let mut mailbox1 = setup_and_spawn_actor(
446                &context,
447                oracle.manager(),
448                oracle.control(scheme.public_key()),
449                scheme,
450                connections.remove(0),
451                cons1,
452                Producer::default(),
453            )
454            .await;
455
456            let scheme = schemes.remove(0);
457            let _mailbox2 = setup_and_spawn_actor(
458                &context,
459                oracle.manager(),
460                oracle.control(scheme.public_key()),
461                scheme,
462                connections.remove(0),
463                Consumer::dummy(),
464                prod2,
465            )
466            .await;
467
468            let scheme = schemes.remove(0);
469            let _mailbox3 = setup_and_spawn_actor(
470                &context,
471                oracle.manager(),
472                oracle.control(scheme.public_key()),
473                scheme,
474                connections.remove(0),
475                Consumer::dummy(),
476                prod3,
477            )
478            .await;
479
480            // Add choppy links between the requester and the two producers
481            add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 1).await;
482            add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 2).await;
483
484            // Run the fetches multiple times to ensure that the peer tries both of its peers
485            for _ in 0..10 {
486                // Initiate concurrent fetch requests
487                mailbox1.fetch(key2.clone()).await;
488                mailbox1.fetch(key3.clone()).await;
489
490                // Collect both events without assuming order
491                let mut events = Vec::new();
492                events.push(cons_out1.recv().await.expect("Consumer channel closed"));
493                events.push(cons_out1.recv().await.expect("Consumer channel closed"));
494
495                // Check that both keys were successfully fetched
496                let mut found_key2 = false;
497                let mut found_key3 = false;
498                for event in events {
499                    match event {
500                        Event::Success(key_actual, value) => {
501                            if key_actual == key2 {
502                                assert_eq!(value, Bytes::from("data for key 2"));
503                                found_key2 = true;
504                            } else if key_actual == key3 {
505                                assert_eq!(value, Bytes::from("data for key 3"));
506                                found_key3 = true;
507                            } else {
508                                panic!("Unexpected key received");
509                            }
510                        }
511                        Event::Failed(_) => panic!("Fetch failed unexpectedly"),
512                    }
513                }
514                assert!(found_key2 && found_key3,);
515            }
516        });
517    }
518
519    /// Tests that canceling an inactive fetch request has no effect.
520    /// Cancels a request before, after, and during the fetch process,
521    #[test_traced]
522    fn test_cancel() {
523        let executor = deterministic::Runner::timed(Duration::from_secs(10));
524        executor.start(|context| async move {
525            let (mut oracle, mut schemes, peers, mut connections) =
526                setup_network_and_peers(&context, &[1, 2]).await;
527
528            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
529
530            let key = Key(6);
531            let mut prod2 = Producer::default();
532            prod2.insert(key.clone(), Bytes::from("data for key 6"));
533
534            let (cons1, mut cons_out1) = Consumer::new();
535
536            let scheme = schemes.remove(0);
537            let mut mailbox1 = setup_and_spawn_actor(
538                &context,
539                oracle.manager(),
540                oracle.control(scheme.public_key()),
541                scheme,
542                connections.remove(0),
543                cons1,
544                Producer::default(),
545            )
546            .await;
547
548            let scheme = schemes.remove(0);
549            let _mailbox2 = setup_and_spawn_actor(
550                &context,
551                oracle.manager(),
552                oracle.control(scheme.public_key()),
553                scheme,
554                connections.remove(0),
555                Consumer::dummy(),
556                prod2,
557            )
558            .await;
559
560            // Cancel before sending the fetch request, expecting no effect
561            mailbox1.cancel(key.clone()).await;
562            select! {
563                _ = cons_out1.recv() => {
564                    panic!("unexpected event");
565                },
566                _ = context.sleep(Duration::from_millis(100)) => {},
567            };
568
569            // Initiate fetch and wait for data to be delivered
570            mailbox1.fetch(key.clone()).await;
571            let event = cons_out1.recv().await.unwrap();
572            match event {
573                Event::Success(key_actual, value) => {
574                    assert_eq!(key_actual, key);
575                    assert_eq!(value, Bytes::from("data for key 6"));
576                }
577                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
578            }
579
580            // Attempt to cancel after data has been delivered, expecting no effect
581            mailbox1.cancel(key.clone()).await;
582            select! {
583                _ = cons_out1.recv() => {
584                    panic!("unexpected event");
585                },
586                _ = context.sleep(Duration::from_millis(100)) => {},
587            };
588
589            // Initiate and cancel another fetch request
590            let key = Key(7);
591            mailbox1.fetch(key.clone()).await;
592            mailbox1.cancel(key.clone()).await;
593
594            // Make sure we receive a failure event
595            let event = cons_out1.recv().await.unwrap();
596            match event {
597                Event::Failed(key_actual) => {
598                    assert_eq!(key_actual, key);
599                }
600                Event::Success(_, _) => panic!("Fetch should have been canceled"),
601            }
602        });
603    }
604
605    /// Tests that a peer is blocked after delivering invalid data,
606    /// preventing further fetches from that peer.
607    #[test_traced]
608    fn test_blocking_peer() {
609        let executor = deterministic::Runner::timed(Duration::from_secs(10));
610        executor.start(|context| async move {
611            let (mut oracle, mut schemes, peers, mut connections) =
612                setup_network_and_peers(&context, &[1, 2, 3]).await;
613
614            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
615            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
616            add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;
617
618            let key_a = Key(1);
619            let key_b = Key(2);
620            let invalid_data_a = Bytes::from("invalid for A");
621            let valid_data_a = Bytes::from("valid for A");
622            let valid_data_b = Bytes::from("valid for B");
623
624            // Set up producers
625            let mut prod2 = Producer::default();
626            prod2.insert(key_a.clone(), invalid_data_a.clone());
627            prod2.insert(key_b.clone(), valid_data_b.clone());
628
629            let mut prod3 = Producer::default();
630            prod3.insert(key_a.clone(), valid_data_a.clone());
631
632            // Set up consumer for Peer1 with expected values
633            let (mut cons1, mut cons_out1) = Consumer::new();
634            cons1.add_expected(key_a.clone(), valid_data_a.clone());
635            cons1.add_expected(key_b.clone(), valid_data_b.clone());
636
637            // Spawn actors
638            let scheme = schemes.remove(0);
639            let mut mailbox1 = setup_and_spawn_actor(
640                &context,
641                oracle.manager(),
642                oracle.control(scheme.public_key()),
643                scheme,
644                connections.remove(0),
645                cons1,
646                Producer::default(),
647            )
648            .await;
649
650            let scheme = schemes.remove(0);
651            let _mailbox2 = setup_and_spawn_actor(
652                &context,
653                oracle.manager(),
654                oracle.control(scheme.public_key()),
655                scheme,
656                connections.remove(0),
657                Consumer::dummy(),
658                prod2,
659            )
660            .await;
661
662            let scheme = schemes.remove(0);
663            let _mailbox3 = setup_and_spawn_actor(
664                &context,
665                oracle.manager(),
666                oracle.control(scheme.public_key()),
667                scheme,
668                connections.remove(0),
669                Consumer::dummy(),
670                prod3,
671            )
672            .await;
673
674            // Fetch keyA multiple times to ensure that Peer2 is blocked.
675            for _ in 0..20 {
676                // Fetch keyA
677                mailbox1.fetch(key_a.clone()).await;
678
679                // Wait for success event for keyA
680                let event = cons_out1.recv().await.unwrap();
681                match event {
682                    Event::Success(key_actual, value) => {
683                        assert_eq!(key_actual, key_a);
684                        assert_eq!(value, valid_data_a);
685                    }
686                    Event::Failed(_) => panic!("Fetch failed unexpectedly"),
687                }
688            }
689
690            // Fetch keyB
691            mailbox1.fetch(key_b.clone()).await;
692
693            // Wait for some time (longer than retry timeout)
694            context.sleep(Duration::from_secs(5)).await;
695
696            // Cancel the fetch for keyB
697            mailbox1.cancel(key_b.clone()).await;
698
699            // Wait for failure event for keyB
700            let event = cons_out1.recv().await.unwrap();
701            match event {
702                Event::Failed(key_actual) => {
703                    assert_eq!(key_actual, key_b);
704                }
705                Event::Success(_, _) => panic!("Fetch should have been canceled"),
706            }
707
708            // Check oracle
709            let blocked = oracle.blocked().await.unwrap();
710            assert_eq!(blocked.len(), 1);
711            assert_eq!(blocked[0].0, peers[0]);
712            assert_eq!(blocked[0].1, peers[1]);
713        });
714    }
715
716    /// Tests that duplicate fetch requests for the same key are handled properly.
717    /// The test verifies that when the same key is requested multiple times,
718    /// the data is correctly delivered once without errors.
719    #[test_traced]
720    fn test_duplicate_fetch_request() {
721        let executor = deterministic::Runner::timed(Duration::from_secs(10));
722        executor.start(|context| async move {
723            let (mut oracle, mut schemes, peers, mut connections) =
724                setup_network_and_peers(&context, &[1, 2]).await;
725
726            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
727
728            let key = Key(5);
729            let mut prod2 = Producer::default();
730            prod2.insert(key.clone(), Bytes::from("data for key 5"));
731
732            let (cons1, mut cons_out1) = Consumer::new();
733
734            let scheme = schemes.remove(0);
735            let mut mailbox1 = setup_and_spawn_actor(
736                &context,
737                oracle.manager(),
738                oracle.control(scheme.public_key()),
739                scheme,
740                connections.remove(0),
741                cons1,
742                Producer::default(),
743            )
744            .await;
745
746            let scheme = schemes.remove(0);
747            let _mailbox2 = setup_and_spawn_actor(
748                &context,
749                oracle.manager(),
750                oracle.control(scheme.public_key()),
751                scheme,
752                connections.remove(0),
753                Consumer::dummy(),
754                prod2,
755            )
756            .await;
757
758            // Send duplicate fetch requests for the same key
759            mailbox1.fetch(key.clone()).await;
760            mailbox1.fetch(key.clone()).await;
761
762            // Should receive the data only once
763            let event = cons_out1.recv().await.unwrap();
764            match event {
765                Event::Success(key_actual, value) => {
766                    assert_eq!(key_actual, key);
767                    assert_eq!(value, Bytes::from("data for key 5"));
768                }
769                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
770            }
771
772            // Make sure we don't receive a second event for the duplicate fetch
773            select! {
774                _ = cons_out1.recv() => {
775                    panic!("Unexpected second event received for duplicate fetch");
776                },
777                _ = context.sleep(Duration::from_millis(500)) => {
778                    // This is expected - no additional events should be produced
779                },
780            };
781        });
782    }
783
784    /// Tests that changing peer sets is handled correctly using the update channel.
785    /// This test verifies that when the peer set changes from peer A to peer B,
786    /// the resolver correctly adapts and fetches from the new peer.
787    #[test_traced]
788    fn test_changing_peer_sets() {
789        let executor = deterministic::Runner::timed(Duration::from_secs(10));
790        executor.start(|context| async move {
791            let (mut oracle, mut schemes, peers, mut connections) =
792                setup_network_and_peers(&context, &[1, 2, 3]).await;
793
794            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
795            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
796
797            let key1 = Key(1);
798            let key2 = Key(2);
799
800            let mut prod2 = Producer::default();
801            prod2.insert(key1.clone(), Bytes::from("data from peer 2"));
802
803            let mut prod3 = Producer::default();
804            prod3.insert(key2.clone(), Bytes::from("data from peer 3"));
805
806            let (cons1, mut cons_out1) = Consumer::new();
807
808            let scheme = schemes.remove(0);
809            let mut mailbox1 = setup_and_spawn_actor(
810                &context,
811                oracle.manager(),
812                oracle.control(scheme.public_key()),
813                scheme,
814                connections.remove(0),
815                cons1,
816                Producer::default(),
817            )
818            .await;
819
820            let scheme = schemes.remove(0);
821            let _mailbox2 = setup_and_spawn_actor(
822                &context,
823                oracle.manager(),
824                oracle.control(scheme.public_key()),
825                scheme,
826                connections.remove(0),
827                Consumer::dummy(),
828                prod2,
829            )
830            .await;
831
832            // Fetch key1 from peer 2
833            mailbox1.fetch(key1.clone()).await;
834
835            // Wait for successful fetch
836            let event = cons_out1.recv().await.unwrap();
837            match event {
838                Event::Success(key_actual, value) => {
839                    assert_eq!(key_actual, key1);
840                    assert_eq!(value, Bytes::from("data from peer 2"));
841                }
842                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
843            }
844
845            // Change peer set to include peer 3
846            let scheme = schemes.remove(0);
847            let _mailbox3 = setup_and_spawn_actor(
848                &context,
849                oracle.manager(),
850                oracle.control(scheme.public_key()),
851                scheme,
852                connections.remove(0),
853                Consumer::dummy(),
854                prod3,
855            )
856            .await;
857
858            // Need to wait for the peer set change to propagate
859            context.sleep(Duration::from_millis(200)).await;
860
861            // Fetch key2 from peer 3
862            mailbox1.fetch(key2.clone()).await;
863
864            // Wait for successful fetch
865            let event = cons_out1.recv().await.unwrap();
866            match event {
867                Event::Success(key_actual, value) => {
868                    assert_eq!(key_actual, key2);
869                    assert_eq!(value, Bytes::from("data from peer 3"));
870                }
871                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
872            }
873        });
874    }
875
876    #[test_traced]
877    fn test_fetch_targeted() {
878        let executor = deterministic::Runner::timed(Duration::from_secs(10));
879        executor.start(|context| async move {
880            let (mut oracle, mut schemes, peers, mut connections) =
881                setup_network_and_peers(&context, &[1, 2, 3]).await;
882
883            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
884            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
885
886            let key = Key(1);
887            let invalid_data = Bytes::from("invalid data");
888            let valid_data = Bytes::from("valid data");
889
890            // Peer 2 has invalid data, peer 3 has valid data
891            let mut prod2 = Producer::default();
892            prod2.insert(key.clone(), invalid_data.clone());
893
894            let mut prod3 = Producer::default();
895            prod3.insert(key.clone(), valid_data.clone());
896
897            // Consumer expects only valid_data
898            let (mut cons1, mut cons_out1) = Consumer::new();
899            cons1.add_expected(key.clone(), valid_data.clone());
900
901            let scheme = schemes.remove(0);
902            let mut mailbox1 = setup_and_spawn_actor(
903                &context,
904                oracle.manager(),
905                oracle.control(scheme.public_key()),
906                scheme,
907                connections.remove(0),
908                cons1,
909                Producer::default(),
910            )
911            .await;
912
913            let scheme = schemes.remove(0);
914            let _mailbox2 = setup_and_spawn_actor(
915                &context,
916                oracle.manager(),
917                oracle.control(scheme.public_key()),
918                scheme,
919                connections.remove(0),
920                Consumer::dummy(),
921                prod2,
922            )
923            .await;
924
925            let scheme = schemes.remove(0);
926            let _mailbox3 = setup_and_spawn_actor(
927                &context,
928                oracle.manager(),
929                oracle.control(scheme.public_key()),
930                scheme,
931                connections.remove(0),
932                Consumer::dummy(),
933                prod3,
934            )
935            .await;
936
937            // Wait for peer set to be established
938            context.sleep(Duration::from_millis(100)).await;
939
940            // Start fetch with targets for both peer 2 (invalid data) and peer 3 (valid data)
941            // When peer 2 returns invalid data, only peer 2 should be removed from targets
942            // Peer 3 should still be tried as a target and succeed
943            mailbox1
944                .fetch_targeted(
945                    key.clone(),
946                    non_empty_vec![peers[1].clone(), peers[2].clone()],
947                )
948                .await;
949
950            // Should eventually succeed from peer 3
951            let event = cons_out1.recv().await.unwrap();
952            match event {
953                Event::Success(key_actual, value) => {
954                    assert_eq!(key_actual, key);
955                    assert_eq!(value, valid_data);
956                }
957                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
958            }
959
960            // Verify peer 2 was blocked (sent invalid data)
961            let blocked = oracle.blocked().await.unwrap();
962            assert_eq!(blocked.len(), 1);
963            assert_eq!(blocked[0].0, peers[0]);
964            assert_eq!(blocked[0].1, peers[1]);
965
966            // Verify metrics: 1 successful fetch (from peer 3 after peer 2 was blocked)
967            let metrics = context.encode();
968            assert!(metrics.contains("_fetch_total{status=\"Success\"} 1"));
969        });
970    }
971
972    #[test_traced]
973    fn test_fetch_targeted_no_fallback() {
974        let executor = deterministic::Runner::timed(Duration::from_secs(10));
975        executor.start(|context| async move {
976            let (mut oracle, mut schemes, peers, mut connections) =
977                setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
978
979            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
980            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
981            add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
982
983            let key = Key(1);
984
985            // Only peer 4 has the data, peers 2 and 3 don't
986            let mut prod4 = Producer::default();
987            prod4.insert(key.clone(), Bytes::from("data from peer 4"));
988
989            let (cons1, mut cons_out1) = Consumer::new();
990
991            let scheme = schemes.remove(0);
992            let mut mailbox1 = setup_and_spawn_actor(
993                &context,
994                oracle.manager(),
995                oracle.control(scheme.public_key()),
996                scheme,
997                connections.remove(0),
998                cons1,
999                Producer::default(),
1000            )
1001            .await;
1002
1003            let scheme = schemes.remove(0);
1004            let _mailbox2 = setup_and_spawn_actor(
1005                &context,
1006                oracle.manager(),
1007                oracle.control(scheme.public_key()),
1008                scheme,
1009                connections.remove(0),
1010                Consumer::dummy(),
1011                Producer::default(), // no data
1012            )
1013            .await;
1014
1015            let scheme = schemes.remove(0);
1016            let _mailbox3 = setup_and_spawn_actor(
1017                &context,
1018                oracle.manager(),
1019                oracle.control(scheme.public_key()),
1020                scheme,
1021                connections.remove(0),
1022                Consumer::dummy(),
1023                Producer::default(), // no data
1024            )
1025            .await;
1026
1027            let scheme = schemes.remove(0);
1028            let _mailbox4 = setup_and_spawn_actor(
1029                &context,
1030                oracle.manager(),
1031                oracle.control(scheme.public_key()),
1032                scheme,
1033                connections.remove(0),
1034                Consumer::dummy(),
1035                prod4,
1036            )
1037            .await;
1038
1039            // Wait for peer set to be established
1040            context.sleep(Duration::from_millis(100)).await;
1041
1042            // Start fetch with targets for peers 2 and 3 (both don't have data)
1043            // Peer 4 has data but is NOT a target - it should NEVER be tried
1044            mailbox1
1045                .fetch_targeted(
1046                    key.clone(),
1047                    non_empty_vec![peers[1].clone(), peers[2].clone()],
1048                )
1049                .await;
1050
1051            // Wait enough time for targets to fail and retry multiple times
1052            // The fetch should not succeed because peer 4 (which has data) is not targeted
1053            select! {
1054                event = cons_out1.recv() => {
1055                    panic!("Fetch should not succeed, but got: {event:?}");
1056                },
1057                _ = context.sleep(Duration::from_secs(3)) => {
1058                    // Expected: no success event because peer 4 is not targeted
1059                },
1060            };
1061        });
1062    }
1063
1064    #[test_traced]
1065    fn test_fetch_all_targeted() {
1066        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1067        executor.start(|context| async move {
1068            let (mut oracle, mut schemes, peers, mut connections) =
1069                setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
1070
1071            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1072            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1073            add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
1074
1075            let key1 = Key(1);
1076            let key2 = Key(2);
1077            let key3 = Key(3);
1078
1079            // Peer 2 has key1
1080            let mut prod2 = Producer::default();
1081            prod2.insert(key1.clone(), Bytes::from("data for key 1"));
1082
1083            // Peer 3 has key3
1084            let mut prod3 = Producer::default();
1085            prod3.insert(key3.clone(), Bytes::from("data for key 3"));
1086
1087            // Peer 4 has key2
1088            let mut prod4 = Producer::default();
1089            prod4.insert(key2.clone(), Bytes::from("data for key 2"));
1090
1091            // Consumer expects all three keys
1092            let (mut cons1, mut cons_out1) = Consumer::new();
1093            cons1.add_expected(key1.clone(), Bytes::from("data for key 1"));
1094            cons1.add_expected(key2.clone(), Bytes::from("data for key 2"));
1095            cons1.add_expected(key3.clone(), Bytes::from("data for key 3"));
1096
1097            let scheme = schemes.remove(0);
1098            let mut mailbox1 = setup_and_spawn_actor(
1099                &context,
1100                oracle.manager(),
1101                oracle.control(scheme.public_key()),
1102                scheme,
1103                connections.remove(0),
1104                cons1,
1105                Producer::default(),
1106            )
1107            .await;
1108
1109            let scheme = schemes.remove(0);
1110            let _mailbox2 = setup_and_spawn_actor(
1111                &context,
1112                oracle.manager(),
1113                oracle.control(scheme.public_key()),
1114                scheme,
1115                connections.remove(0),
1116                Consumer::dummy(),
1117                prod2,
1118            )
1119            .await;
1120
1121            let scheme = schemes.remove(0);
1122            let _mailbox3 = setup_and_spawn_actor(
1123                &context,
1124                oracle.manager(),
1125                oracle.control(scheme.public_key()),
1126                scheme,
1127                connections.remove(0),
1128                Consumer::dummy(),
1129                prod3,
1130            )
1131            .await;
1132
1133            let scheme = schemes.remove(0);
1134            let _mailbox4 = setup_and_spawn_actor(
1135                &context,
1136                oracle.manager(),
1137                oracle.control(scheme.public_key()),
1138                scheme,
1139                connections.remove(0),
1140                Consumer::dummy(),
1141                prod4,
1142            )
1143            .await;
1144
1145            // Wait for peer set to be established
1146            context.sleep(Duration::from_millis(100)).await;
1147
1148            // Fetch keys with mixed targeting:
1149            // - key1 targeted to peer 2 (has data) -> should succeed from target
1150            // - key2 targeted to peer 4 (has data) -> should succeed from target
1151            // - key3 no targeting -> fetched from any peer (peer 3 has it)
1152            mailbox1
1153                .fetch_all_targeted(vec![
1154                    (key1.clone(), non_empty_vec![peers[1].clone()]), // peer 2 has key1
1155                    (key2.clone(), non_empty_vec![peers[3].clone()]), // peer 4 has key2
1156                ])
1157                .await;
1158            mailbox1.fetch(key3.clone()).await; // no targeting for key3
1159
1160            // Collect all three events
1161            let mut results = HashMap::new();
1162            for _ in 0..3 {
1163                let event = cons_out1.recv().await.unwrap();
1164                match event {
1165                    Event::Success(key, value) => {
1166                        results.insert(key, value);
1167                    }
1168                    Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1169                }
1170            }
1171
1172            // Verify all keys received correct data
1173            assert_eq!(results.len(), 3);
1174            assert_eq!(results.get(&key1).unwrap(), &Bytes::from("data for key 1"));
1175            assert_eq!(results.get(&key2).unwrap(), &Bytes::from("data for key 2"));
1176            assert_eq!(results.get(&key3).unwrap(), &Bytes::from("data for key 3"));
1177
1178            // Verify metrics: 3 successful fetches
1179            let metrics = context.encode();
1180            assert!(metrics.contains("_fetch_total{status=\"Success\"} 3"));
1181        });
1182    }
1183
1184    /// Tests that calling fetch() on an in-progress targeted fetch clears the targets,
1185    /// allowing the fetch to succeed from any available peer.
1186    #[test_traced]
1187    fn test_fetch_clears_targets() {
1188        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1189        executor.start(|context| async move {
1190            let (mut oracle, mut schemes, peers, mut connections) =
1191                setup_network_and_peers(&context, &[1, 2, 3]).await;
1192
1193            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1194            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1195
1196            let key = Key(1);
1197            let valid_data = Bytes::from("valid data");
1198
1199            // Peer 2 has no data, peer 3 has the data
1200            let mut prod3 = Producer::default();
1201            prod3.insert(key.clone(), valid_data.clone());
1202
1203            let (cons1, mut cons_out1) = Consumer::new();
1204
1205            let scheme = schemes.remove(0);
1206            let mut mailbox1 = setup_and_spawn_actor(
1207                &context,
1208                oracle.manager(),
1209                oracle.control(scheme.public_key()),
1210                scheme,
1211                connections.remove(0),
1212                cons1,
1213                Producer::default(),
1214            )
1215            .await;
1216
1217            let scheme = schemes.remove(0);
1218            let _mailbox2 = setup_and_spawn_actor(
1219                &context,
1220                oracle.manager(),
1221                oracle.control(scheme.public_key()),
1222                scheme,
1223                connections.remove(0),
1224                Consumer::dummy(),
1225                Producer::default(), // no data
1226            )
1227            .await;
1228
1229            let scheme = schemes.remove(0);
1230            let _mailbox3 = setup_and_spawn_actor(
1231                &context,
1232                oracle.manager(),
1233                oracle.control(scheme.public_key()),
1234                scheme,
1235                connections.remove(0),
1236                Consumer::dummy(),
1237                prod3,
1238            )
1239            .await;
1240
1241            // Wait for peer set to be established
1242            context.sleep(Duration::from_millis(100)).await;
1243
1244            // Start fetch with target for peer 2 only (who doesn't have data)
1245            mailbox1
1246                .fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()])
1247                .await;
1248
1249            // Wait for the targeted fetch to fail a few times
1250            context.sleep(Duration::from_millis(500)).await;
1251
1252            // Call fetch() which should clear the targets and allow fallback to any peer
1253            mailbox1.fetch(key.clone()).await;
1254
1255            // Should now succeed from peer 3 (who has data but wasn't originally targeted)
1256            let event = cons_out1.recv().await.unwrap();
1257            match event {
1258                Event::Success(key_actual, value) => {
1259                    assert_eq!(key_actual, key);
1260                    assert_eq!(value, valid_data);
1261                }
1262                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1263            }
1264        });
1265    }
1266
1267    #[test_traced]
1268    fn test_fetch_targeted_does_not_restrict_all() {
1269        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1270        executor.start(|context| async move {
1271            let (mut oracle, mut schemes, peers, mut connections) =
1272                setup_network_and_peers(&context, &[1, 2, 3]).await;
1273
1274            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1275            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1276
1277            let key = Key(1);
1278            let valid_data = Bytes::from("valid data");
1279
1280            // Peer 2 has no data, peer 3 has the data
1281            let mut prod3 = Producer::default();
1282            prod3.insert(key.clone(), valid_data.clone());
1283
1284            let (cons1, mut cons_out1) = Consumer::new();
1285
1286            let scheme = schemes.remove(0);
1287            let mut mailbox1 = setup_and_spawn_actor(
1288                &context,
1289                oracle.manager(),
1290                oracle.control(scheme.public_key()),
1291                scheme,
1292                connections.remove(0),
1293                cons1,
1294                Producer::default(),
1295            )
1296            .await;
1297
1298            let scheme = schemes.remove(0);
1299            let _mailbox2 = setup_and_spawn_actor(
1300                &context,
1301                oracle.manager(),
1302                oracle.control(scheme.public_key()),
1303                scheme,
1304                connections.remove(0),
1305                Consumer::dummy(),
1306                Producer::default(), // no data
1307            )
1308            .await;
1309
1310            let scheme = schemes.remove(0);
1311            let _mailbox3 = setup_and_spawn_actor(
1312                &context,
1313                oracle.manager(),
1314                oracle.control(scheme.public_key()),
1315                scheme,
1316                connections.remove(0),
1317                Consumer::dummy(),
1318                prod3,
1319            )
1320            .await;
1321
1322            // Wait for peer set to be established
1323            context.sleep(Duration::from_millis(100)).await;
1324
1325            // Start fetch without targets (can try any peer)
1326            mailbox1.fetch(key.clone()).await;
1327
1328            // Wait a bit for the fetch to start
1329            context.sleep(Duration::from_millis(50)).await;
1330
1331            // Call fetch_targeted with peer 2 only (who doesn't have data)
1332            // This should NOT restrict the existing "all" fetch
1333            mailbox1
1334                .fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()])
1335                .await;
1336
1337            // Should still succeed from peer 3 (who has data but wasn't in the targeted call)
1338            // because the original fetch was "all" and shouldn't be restricted
1339            let event = cons_out1.recv().await.unwrap();
1340            match event {
1341                Event::Success(key_actual, value) => {
1342                    assert_eq!(key_actual, key);
1343                    assert_eq!(value, valid_data);
1344                }
1345                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1346            }
1347        });
1348    }
1349
1350    #[test_traced]
1351    fn test_retain() {
1352        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1353        executor.start(|context| async move {
1354            let (mut oracle, mut schemes, peers, mut connections) =
1355                setup_network_and_peers(&context, &[1, 2]).await;
1356
1357            let key = Key(5);
1358            let mut prod2 = Producer::default();
1359            prod2.insert(key.clone(), Bytes::from("data for key 5"));
1360
1361            let (cons1, mut cons_out1) = Consumer::new();
1362
1363            let scheme = schemes.remove(0);
1364            let mut mailbox1 = setup_and_spawn_actor(
1365                &context,
1366                oracle.manager(),
1367                oracle.control(scheme.public_key()),
1368                scheme,
1369                connections.remove(0),
1370                cons1,
1371                Producer::default(),
1372            )
1373            .await;
1374
1375            let scheme = schemes.remove(0);
1376            let _mailbox2 = setup_and_spawn_actor(
1377                &context,
1378                oracle.manager(),
1379                oracle.control(scheme.public_key()),
1380                scheme,
1381                connections.remove(0),
1382                Consumer::dummy(),
1383                prod2,
1384            )
1385            .await;
1386
1387            // Retain before fetching should have no effect
1388            mailbox1.retain(|_| true).await;
1389            select! {
1390                _ = cons_out1.recv() => {
1391                    panic!("unexpected event");
1392                },
1393                _ = context.sleep(Duration::from_millis(100)) => {},
1394            };
1395
1396            // Start a fetch (no link, so fetch stays in-flight with timer in fetch timers)
1397            mailbox1.fetch(key.clone()).await;
1398
1399            // Retain with predicate that excludes the key
1400            // This must clean up fetch timers entry for the key
1401            let key_clone = key.clone();
1402            mailbox1.retain(move |k| k != &key_clone).await;
1403
1404            // Consumer should receive failed event
1405            let event = cons_out1.recv().await.unwrap();
1406            match event {
1407                Event::Failed(key_actual) => {
1408                    assert_eq!(key_actual, key);
1409                }
1410                Event::Success(_, _) => panic!("Fetch should have been retained out"),
1411            }
1412
1413            // Now add link so fetches can complete
1414            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1415
1416            // Fetch same key again, if fetch timers wasn't cleaned up, this would
1417            // be treated as a duplicate and silently ignored
1418            mailbox1.fetch(key.clone()).await;
1419
1420            // Should succeed
1421            let event = cons_out1.recv().await.unwrap();
1422            match event {
1423                Event::Success(key_actual, value) => {
1424                    assert_eq!(key_actual, key);
1425                    assert_eq!(value, Bytes::from("data for key 5"));
1426                }
1427                Event::Failed(_) => unreachable!(),
1428            }
1429        });
1430    }
1431
1432    #[test_traced]
1433    fn test_clear() {
1434        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1435        executor.start(|context| async move {
1436            let (mut oracle, mut schemes, peers, mut connections) =
1437                setup_network_and_peers(&context, &[1, 2]).await;
1438
1439            // No link yet - fetch will stay in-flight
1440            let key = Key(6);
1441            let mut prod2 = Producer::default();
1442            prod2.insert(key.clone(), Bytes::from("data for key 6"));
1443
1444            let (cons1, mut cons_out1) = Consumer::new();
1445
1446            let scheme = schemes.remove(0);
1447            let mut mailbox1 = setup_and_spawn_actor(
1448                &context,
1449                oracle.manager(),
1450                oracle.control(scheme.public_key()),
1451                scheme,
1452                connections.remove(0),
1453                cons1,
1454                Producer::default(),
1455            )
1456            .await;
1457
1458            let scheme = schemes.remove(0);
1459            let _mailbox2 = setup_and_spawn_actor(
1460                &context,
1461                oracle.manager(),
1462                oracle.control(scheme.public_key()),
1463                scheme,
1464                connections.remove(0),
1465                Consumer::dummy(),
1466                prod2,
1467            )
1468            .await;
1469
1470            // Clear before fetching should have no effect
1471            mailbox1.clear().await;
1472            select! {
1473                _ = cons_out1.recv() => {
1474                    panic!("unexpected event");
1475                },
1476                _ = context.sleep(Duration::from_millis(100)) => {},
1477            };
1478
1479            // Start a fetch (no link, so fetch stays in-flight with timer in fetch timers)
1480            mailbox1.fetch(key.clone()).await;
1481
1482            // Clear all fetches
1483            mailbox1.clear().await;
1484
1485            // Consumer should receive failed event
1486            let event = cons_out1.recv().await.unwrap();
1487            match event {
1488                Event::Failed(key_actual) => {
1489                    assert_eq!(key_actual, key);
1490                }
1491                Event::Success(_, _) => panic!("Fetch should have been cleared"),
1492            }
1493
1494            // Now add link so fetches can complete
1495            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1496
1497            // Fetch same key again, if fetch_timers wasn't cleaned up, this would
1498            // be treated as a duplicate and silently ignored
1499            mailbox1.fetch(key.clone()).await;
1500
1501            // Should succeed
1502            let event = cons_out1.recv().await.unwrap();
1503            match event {
1504                Event::Success(key_actual, value) => {
1505                    assert_eq!(key_actual, key);
1506                    assert_eq!(value, Bytes::from("data for key 6"));
1507                }
1508                Event::Failed(_) => unreachable!(),
1509            }
1510        });
1511    }
1512
1513    /// Tests that when a peer is rate-limited, the fetcher spills over to another peer.
1514    /// With 2 peers and rate limit of 1/sec each, 2 requests issued simultaneously should
1515    /// both complete immediately (one to each peer) without waiting for rate limit reset.
1516    #[test_traced]
1517    fn test_rate_limit_spillover() {
1518        let executor = deterministic::Runner::timed(Duration::from_secs(30));
1519        executor.start(|context| async move {
1520            // Use a very restrictive rate limit: 1 request per second per peer
1521            let (mut oracle, mut schemes, peers, mut connections) =
1522                setup_network_and_peers_with_rate_limit(
1523                    &context,
1524                    &[1, 2, 3],
1525                    Quota::per_second(NZU32!(1)),
1526                )
1527                .await;
1528
1529            // Add links between peer 1 and both peer 2 and peer 3
1530            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1531            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1532
1533            // Both peer 2 and peer 3 have the same data
1534            let mut prod2 = Producer::default();
1535            let mut prod3 = Producer::default();
1536            prod2.insert(Key(0), Bytes::from("data for key 0"));
1537            prod2.insert(Key(1), Bytes::from("data for key 1"));
1538            prod3.insert(Key(0), Bytes::from("data for key 0"));
1539            prod3.insert(Key(1), Bytes::from("data for key 1"));
1540
1541            let (cons1, mut cons_out1) = Consumer::new();
1542
1543            // Set up peer 1 (the requester)
1544            let scheme = schemes.remove(0);
1545            let mut mailbox1 = setup_and_spawn_actor(
1546                &context,
1547                oracle.manager(),
1548                oracle.control(scheme.public_key()),
1549                scheme,
1550                connections.remove(0),
1551                cons1,
1552                Producer::default(),
1553            )
1554            .await;
1555
1556            // Set up peer 2 (has data)
1557            let scheme = schemes.remove(0);
1558            let _mailbox2 = setup_and_spawn_actor(
1559                &context,
1560                oracle.manager(),
1561                oracle.control(scheme.public_key()),
1562                scheme,
1563                connections.remove(0),
1564                Consumer::dummy(),
1565                prod2,
1566            )
1567            .await;
1568
1569            // Set up peer 3 (also has data)
1570            let scheme = schemes.remove(0);
1571            let _mailbox3 = setup_and_spawn_actor(
1572                &context,
1573                oracle.manager(),
1574                oracle.control(scheme.public_key()),
1575                scheme,
1576                connections.remove(0),
1577                Consumer::dummy(),
1578                prod3,
1579            )
1580            .await;
1581
1582            // Wait for peer set to be established
1583            context.sleep(Duration::from_millis(100)).await;
1584            let start = context.current();
1585
1586            // Issue 2 fetch requests rapidly
1587            // With rate limit of 1/sec per peer and 2 peers, both should complete
1588            // immediately via spill-over (one request to each peer)
1589            mailbox1.fetch(Key(0)).await;
1590            mailbox1.fetch(Key(1)).await;
1591
1592            // Collect results
1593            let mut results = HashMap::new();
1594            for _ in 0..2 {
1595                let event = cons_out1.recv().await.unwrap();
1596                match event {
1597                    Event::Success(key, value) => {
1598                        results.insert(key.clone(), value);
1599                    }
1600                    Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1601                }
1602            }
1603
1604            // Verify both keys were fetched successfully
1605            assert_eq!(results.len(), 2);
1606            assert_eq!(
1607                results.get(&Key(0)).unwrap(),
1608                &Bytes::from("data for key 0")
1609            );
1610            assert_eq!(
1611                results.get(&Key(1)).unwrap(),
1612                &Bytes::from("data for key 1")
1613            );
1614
1615            // Verify it completed quickly (well under 1 second) - proves spill-over worked
1616            // Without spill-over, the second request would wait ~1 second for rate limit reset
1617            let elapsed = context.current().duration_since(start).unwrap();
1618            assert!(
1619                elapsed < Duration::from_millis(500),
1620                "Expected quick completion via spill-over, but took {elapsed:?}"
1621            );
1622        });
1623    }
1624
1625    /// Tests that rate limiting causes retries to eventually succeed after the rate limit resets.
1626    /// This test uses a single peer with a restrictive rate limit and verifies that
1627    /// fetches eventually complete after waiting for the rate limit to reset.
1628    #[test_traced]
1629    fn test_rate_limit_retry_after_reset() {
1630        let executor = deterministic::Runner::timed(Duration::from_secs(30));
1631        executor.start(|context| async move {
1632            // Use a restrictive rate limit: 1 request per second
1633            let (mut oracle, mut schemes, peers, mut connections) =
1634                setup_network_and_peers_with_rate_limit(
1635                    &context,
1636                    &[1, 2],
1637                    Quota::per_second(NZU32!(1)),
1638                )
1639                .await;
1640
1641            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1642
1643            // Peer 2 has data for multiple keys
1644            let mut prod2 = Producer::default();
1645            prod2.insert(Key(1), Bytes::from("data for key 1"));
1646            prod2.insert(Key(2), Bytes::from("data for key 2"));
1647            prod2.insert(Key(3), Bytes::from("data for key 3"));
1648
1649            let (cons1, mut cons_out1) = Consumer::new();
1650
1651            let scheme = schemes.remove(0);
1652            let mut mailbox1 = setup_and_spawn_actor(
1653                &context,
1654                oracle.manager(),
1655                oracle.control(scheme.public_key()),
1656                scheme,
1657                connections.remove(0),
1658                cons1,
1659                Producer::default(),
1660            )
1661            .await;
1662
1663            let scheme = schemes.remove(0);
1664            let _mailbox2 = setup_and_spawn_actor(
1665                &context,
1666                oracle.manager(),
1667                oracle.control(scheme.public_key()),
1668                scheme,
1669                connections.remove(0),
1670                Consumer::dummy(),
1671                prod2,
1672            )
1673            .await;
1674
1675            // Wait for peer set to be established
1676            context.sleep(Duration::from_millis(100)).await;
1677            let start = context.current();
1678
1679            // Issue 3 fetch requests to a single peer with rate limit of 1/sec
1680            // Only 1 can be sent immediately, the others must wait for rate limit reset
1681            mailbox1.fetch(Key(1)).await;
1682            mailbox1.fetch(Key(2)).await;
1683            mailbox1.fetch(Key(3)).await;
1684
1685            // All 3 should eventually succeed (after rate limit resets)
1686            let mut results = HashMap::new();
1687            for _ in 0..3 {
1688                let event = cons_out1.recv().await.unwrap();
1689                match event {
1690                    Event::Success(key, value) => {
1691                        results.insert(key.clone(), value);
1692                    }
1693                    Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
1694                }
1695            }
1696
1697            assert_eq!(results.len(), 3);
1698            for i in 1..=3 {
1699                assert_eq!(
1700                    results.get(&Key(i)).unwrap(),
1701                    &Bytes::from(format!("data for key {}", i))
1702                );
1703            }
1704
1705            // Verify it took significant time due to rate limiting
1706            // With 3 requests at 1/sec to a single peer, requests 2 and 3 must wait
1707            // for rate limit resets (~1 second each), so total should be > 2 seconds
1708            let elapsed = context.current().duration_since(start).unwrap();
1709            assert!(
1710                elapsed > Duration::from_secs(2),
1711                "Expected rate limiting to cause delay > 2s, but took {elapsed:?}"
1712            );
1713        });
1714    }
1715
1716    /// Tests that the resolver never sends fetch requests to itself (me exclusion).
1717    /// Even when the local peer has the data in its producer, it should fetch from
1718    /// another peer instead.
1719    #[test_traced]
1720    fn test_self_exclusion() {
1721        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1722        executor.start(|context| async move {
1723            let (mut oracle, mut schemes, peers, mut connections) =
1724                setup_network_and_peers(&context, &[1, 2]).await;
1725
1726            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1727
1728            let key = Key(1);
1729            let data = Bytes::from("shared data");
1730
1731            // Both peers have the data - peer 1 (requester) and peer 2
1732            let mut prod1 = Producer::default();
1733            prod1.insert(key.clone(), data.clone());
1734            let mut prod2 = Producer::default();
1735            prod2.insert(key.clone(), data.clone());
1736
1737            let (cons1, mut cons_out1) = Consumer::new();
1738
1739            // Set up peer 1 with `me` set - it has the data but should NOT fetch from itself
1740            let scheme = schemes.remove(0);
1741            let mut mailbox1 = setup_and_spawn_actor(
1742                &context,
1743                oracle.manager(),
1744                oracle.control(scheme.public_key()),
1745                scheme,
1746                connections.remove(0),
1747                cons1,
1748                prod1, // peer 1 has the data
1749            )
1750            .await;
1751
1752            // Set up peer 2 - also has the data
1753            let scheme = schemes.remove(0);
1754            let _mailbox2 = setup_and_spawn_actor(
1755                &context,
1756                oracle.manager(),
1757                oracle.control(scheme.public_key()),
1758                scheme,
1759                connections.remove(0),
1760                Consumer::dummy(),
1761                prod2,
1762            )
1763            .await;
1764
1765            // Wait for peer set to be established
1766            context.sleep(Duration::from_millis(100)).await;
1767
1768            // Fetch the key - should get it from peer 2, not from self
1769            mailbox1.fetch(key.clone()).await;
1770
1771            // Should succeed (from peer 2)
1772            let event = cons_out1.recv().await.unwrap();
1773            match event {
1774                Event::Success(key_actual, value) => {
1775                    assert_eq!(key_actual, key);
1776                    assert_eq!(value, data);
1777                }
1778                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1779            }
1780        });
1781    }
1782
1783    #[allow(clippy::type_complexity)]
1784    fn spawn_actors_with_handles(
1785        context: deterministic::Context,
1786        oracle: &Oracle<PublicKey, deterministic::Context>,
1787        schemes: Vec<PrivateKey>,
1788        connections: Vec<(
1789            Sender<PublicKey, deterministic::Context>,
1790            Receiver<PublicKey>,
1791        )>,
1792        consumers: Vec<Consumer<Key, Bytes>>,
1793        producers: Vec<Producer<Key, Bytes>>,
1794    ) -> (
1795        Vec<Mailbox<Key, PublicKey>>,
1796        Vec<commonware_runtime::Handle<()>>,
1797    ) {
1798        let actor_context = context.with_label("actor");
1799        let mut mailboxes = Vec::new();
1800        let mut handles = Vec::new();
1801
1802        for (idx, ((scheme, conn), (consumer, producer))) in schemes
1803            .into_iter()
1804            .zip(connections)
1805            .zip(consumers.into_iter().zip(producers))
1806            .enumerate()
1807        {
1808            let ctx = actor_context.with_label(&format!("peer_{idx}"));
1809            let public_key = scheme.public_key();
1810            let (engine, mailbox) = Engine::new(
1811                ctx,
1812                Config {
1813                    provider: oracle.manager(),
1814                    blocker: oracle.control(public_key.clone()),
1815                    consumer,
1816                    producer,
1817                    mailbox_size: MAILBOX_SIZE,
1818                    me: Some(public_key),
1819                    initial: INITIAL_DURATION,
1820                    timeout: TIMEOUT,
1821                    fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
1822                    priority_requests: false,
1823                    priority_responses: false,
1824                },
1825            );
1826            handles.push(engine.start(conn));
1827            mailboxes.push(mailbox);
1828        }
1829
1830        (mailboxes, handles)
1831    }
1832
1833    #[test_traced]
1834    fn test_operations_after_shutdown_do_not_panic() {
1835        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1836        executor.start(|context| async move {
1837            let (mut oracle, schemes, peers, connections) =
1838                setup_network_and_peers(&context, &[1, 2]).await;
1839
1840            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1841
1842            let key = Key(1);
1843            let mut prod2 = Producer::default();
1844            prod2.insert(key.clone(), Bytes::from("data for key 1"));
1845
1846            let (cons1, mut cons_out1) = Consumer::new();
1847
1848            let (mut mailboxes, handles) = spawn_actors_with_handles(
1849                context.clone(),
1850                &oracle,
1851                schemes,
1852                connections,
1853                vec![cons1, Consumer::dummy()],
1854                vec![Producer::default(), prod2],
1855            );
1856
1857            // Fetch to verify network is functional
1858            mailboxes[0].fetch(key.clone()).await;
1859            let event = cons_out1.recv().await.unwrap();
1860            match event {
1861                Event::Success(_, value) => assert_eq!(value, Bytes::from("data for key 1")),
1862                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1863            }
1864
1865            // Abort all actors
1866            for handle in handles {
1867                handle.abort();
1868            }
1869            context.sleep(Duration::from_millis(100)).await;
1870
1871            // All operations should not panic after shutdown
1872
1873            // Fetch should not panic
1874            let key2 = Key(2);
1875            mailboxes[0].fetch(key2.clone()).await;
1876
1877            // Cancel should not panic
1878            mailboxes[0].cancel(key2.clone()).await;
1879
1880            // Clear should not panic
1881            mailboxes[0].clear().await;
1882
1883            // Retain should not panic
1884            mailboxes[0].retain(|_| true).await;
1885
1886            // Fetch targeted should not panic
1887            mailboxes[0]
1888                .fetch_targeted(Key(3), non_empty_vec![peers[1].clone()])
1889                .await;
1890        });
1891    }
1892
1893    fn clean_shutdown(seed: u64) {
1894        let cfg = deterministic::Config::default()
1895            .with_seed(seed)
1896            .with_timeout(Some(Duration::from_secs(30)));
1897        let executor = deterministic::Runner::new(cfg);
1898        executor.start(|context| async move {
1899            let (mut oracle, schemes, peers, connections) =
1900                setup_network_and_peers(&context, &[1, 2]).await;
1901
1902            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1903
1904            let key = Key(1);
1905            let mut prod2 = Producer::default();
1906            prod2.insert(key.clone(), Bytes::from("data for key 1"));
1907
1908            let (cons1, mut cons_out1) = Consumer::new();
1909
1910            let (mut mailboxes, handles) = spawn_actors_with_handles(
1911                context.clone(),
1912                &oracle,
1913                schemes,
1914                connections,
1915                vec![cons1, Consumer::dummy()],
1916                vec![Producer::default(), prod2],
1917            );
1918
1919            // Allow tasks to start
1920            context.sleep(Duration::from_millis(100)).await;
1921
1922            // Count running tasks under the actor prefix
1923            let running_before = count_running_tasks(&context, "actor");
1924            assert!(
1925                running_before > 0,
1926                "at least one actor task should be running"
1927            );
1928
1929            // Verify network is functional
1930            mailboxes[0].fetch(key.clone()).await;
1931            let event = cons_out1.recv().await.unwrap();
1932            match event {
1933                Event::Success(_, value) => assert_eq!(value, Bytes::from("data for key 1")),
1934                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1935            }
1936
1937            // Abort all actors
1938            for handle in handles {
1939                handle.abort();
1940            }
1941            context.sleep(Duration::from_millis(100)).await;
1942
1943            // Verify all actor tasks are stopped
1944            let running_after = count_running_tasks(&context, "actor");
1945            assert_eq!(
1946                running_after, 0,
1947                "all actor tasks should be stopped, but {running_after} still running"
1948            );
1949        });
1950    }
1951
1952    #[test]
1953    fn test_clean_shutdown() {
1954        for seed in 0..25 {
1955            clean_shutdown(seed);
1956        }
1957    }
1958}