// commonware_resolver/p2p/mod.rs
//! Resolve data identified by a fixed-length key by using the P2P network.
//!
//! # Overview
//!
//! The `p2p` module enables resolving data by fixed-length keys in a P2P network. Central to the
//! module is the `peer` actor which manages the fetch-request lifecycle. Its mailbox allows
//! initiation and cancellation of fetch requests via the `Resolver` interface.
//!
//! The peer handles an arbitrarily large number of concurrent fetch requests by sending requests
//! to other peers and processing their responses. It selects peers based on performance, retrying
//! with another peer if one fails or provides invalid data. Requests persist until canceled or
//! fulfilled, delivering data to the `Consumer` for verification.
//!
//! The `Consumer` checks data integrity and authenticity (critical in an adversarial environment)
//! and returns `true` if valid, completing the fetch, or `false` to retry.
//!
//! The peer also serves data to other peers, forwarding network requests to the `Producer`. The
//! `Producer` provides data asynchronously (e.g., from storage). If it fails, the peer sends an
//! empty response, prompting the requester to retry elsewhere. Each message between peers contains
//! an ID. Each request is sent with a unique ID, and each response includes the ID of the request
//! it responds to.
//!
//! # Targeting
//!
//! Callers can restrict fetches to specific target peers using [`Resolver::fetch_targeted`](crate::Resolver::fetch_targeted).
//! Only target peers are tried; there is no automatic fallback to other peers. Targets persist through
//! transient failures (timeout, "no data" response, send failure) since the peer might be slow or
//! receive the data later.
//!
//! While a fetch is in progress, callers can modify targeting:
//! - [`Resolver::fetch_targeted`](crate::Resolver::fetch_targeted) adds peers to the existing target set
//!   (only if the fetch already has targets; an "all" fetch remains unrestricted)
//! - [`Resolver::fetch`](crate::Resolver::fetch) clears all targets, allowing fallback to any peer
//!
//! These modifications only apply to in-progress fetches. Once a fetch completes (success, cancel,
//! or blocked peer), the targets for that key are cleared automatically.
//!
//! # Peer Selection
//!
//! Outbound fetches are only sent to peers in `latest.primary` (see [commonware_p2p::Provider]) but inbound
//! requests are handled for all connected peers. Thus, callers that still expect a key to be fetchable after
//! a peer set update must ensure the latest primary set can serve it.
//!
//! [`Resolver::fetch_targeted`](crate::Resolver::fetch_targeted) can narrow the current primary set
//! further, but it does not bypass that latest-primary filter. Explicit targets that are no longer
//! in the latest primary set are ignored until they become primary again.
//!
//! # Performance Considerations
//!
//! The peer supports arbitrarily many concurrent fetch requests, but resource usage generally
//! depends on the rate-limiting configuration of the underlying P2P network.

53use bytes::Bytes;
54use commonware_utils::{channel::oneshot, Span};
55use std::future::Future;
56
57mod config;
58pub use config::Config;
59mod engine;
60pub use engine::Engine;
61mod fetcher;
62mod ingress;
63pub use ingress::Mailbox;
64mod metrics;
65mod wire;
66
67#[cfg(feature = "mocks")]
68pub mod mocks;
69
/// Serves data requested by the network.
///
/// The peer forwards inbound network requests to its `Producer` (see module docs).
/// If production fails, the peer sends the requester an empty response so it can
/// retry elsewhere.
pub trait Producer: Clone + Send + 'static {
    /// Type used to uniquely identify data.
    type Key: Span;

    /// Serve a request received from the network.
    ///
    /// Returns a receiver that resolves with the requested data. Production is
    /// asynchronous (e.g., backed by storage) — the peer awaits the receiver
    /// rather than the data directly.
    fn produce(&mut self, key: Self::Key) -> impl Future<Output = oneshot::Receiver<Bytes>> + Send;
}
78
79#[cfg(test)]
80mod tests {
81    use super::{
82        mocks::{Consumer, Event, Key, Producer},
83        Config, Engine, Mailbox,
84    };
85    use crate::Resolver;
86    use bytes::Bytes;
87    use commonware_cryptography::{
88        ed25519::{PrivateKey, PublicKey},
89        Signer,
90    };
91    use commonware_macros::{select, test_traced};
92    use commonware_p2p::{
93        simulated::{Link, Network, Oracle, Receiver, Sender},
94        Blocker, Manager as _, Provider, TrackedPeers,
95    };
96    use commonware_runtime::{count_running_tasks, deterministic, Clock, Metrics, Quota, Runner};
97    use commonware_utils::{non_empty_vec, ordered::Set, NZUsize, NZU32};
98    use std::{collections::HashMap, num::NonZeroU32, time::Duration};
99
    // Capacity of each engine's mailbox channel (passed as `Config::mailbox_size`).
    const MAILBOX_SIZE: usize = 1024;
    // Default per-connection message quota (used via `Quota::per_second`).
    const RATE_LIMIT: NonZeroU32 = NZU32!(10);
    // Passed as `Config::initial`.
    const INITIAL_DURATION: Duration = Duration::from_millis(100);
    // Passed as `Config::timeout`.
    const TIMEOUT: Duration = Duration::from_millis(400);
    // Passed as `Config::fetch_retry_timeout`.
    const FETCH_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
    // Reliable link: every message is delivered.
    const LINK: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 1.0,
    };
    // Lossy link: half of all messages are dropped, forcing retries.
    const LINK_UNRELIABLE: Link = Link {
        latency: Duration::from_millis(10),
        jitter: Duration::from_millis(1),
        success_rate: 0.5,
    };
115
    /// Creates a simulated network and registers one peer per seed, using the
    /// default [`RATE_LIMIT`] quota for every connection.
    ///
    /// Returns the network oracle, the peers' private keys, their public keys,
    /// and one `(Sender, Receiver)` connection pair per peer (all in seed order).
    async fn setup_network_and_peers(
        context: &deterministic::Context,
        peer_seeds: &[u64],
    ) -> (
        Oracle<PublicKey, deterministic::Context>,
        Vec<PrivateKey>,
        Vec<PublicKey>,
        Vec<(
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        )>,
    ) {
        // Delegate to the configurable variant with the default quota.
        setup_network_and_peers_with_rate_limit(context, peer_seeds, Quota::per_second(RATE_LIMIT))
            .await
    }
131
132    async fn setup_network_and_peers_with_rate_limit(
133        context: &deterministic::Context,
134        peer_seeds: &[u64],
135        rate_limit: Quota,
136    ) -> (
137        Oracle<PublicKey, deterministic::Context>,
138        Vec<PrivateKey>,
139        Vec<PublicKey>,
140        Vec<(
141            Sender<PublicKey, deterministic::Context>,
142            Receiver<PublicKey>,
143        )>,
144    ) {
145        let (network, oracle) = Network::new(
146            context.with_label("network"),
147            commonware_p2p::simulated::Config {
148                max_size: 1024 * 1024,
149                disconnect_on_block: true,
150                tracked_peer_sets: NZUsize!(3),
151            },
152        );
153        network.start();
154
155        let schemes: Vec<PrivateKey> = peer_seeds
156            .iter()
157            .map(|seed| PrivateKey::from_seed(*seed))
158            .collect();
159        let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
160        let mut manager = oracle.manager();
161        manager
162            .track(0, Set::try_from(peers.clone()).unwrap())
163            .await;
164
165        let mut connections = Vec::new();
166        for peer in &peers {
167            let (sender, receiver) = oracle
168                .control(peer.clone())
169                .register(0, rate_limit)
170                .await
171                .unwrap();
172            connections.push((sender, receiver));
173        }
174
175        (oracle, schemes, peers, connections)
176    }
177
178    async fn add_link(
179        oracle: &mut Oracle<PublicKey, deterministic::Context>,
180        link: Link,
181        peers: &[PublicKey],
182        from: usize,
183        to: usize,
184    ) {
185        oracle
186            .add_link(peers[from].clone(), peers[to].clone(), link.clone())
187            .await
188            .unwrap();
189        oracle
190            .add_link(peers[to].clone(), peers[from].clone(), link)
191            .await
192            .unwrap();
193    }
194
    /// Builds an [`Engine`] for one peer, starts it on `connection`, and
    /// returns its mailbox.
    ///
    /// The engine is labeled with the peer's public key for metrics/tracing,
    /// and uses the module-level timing constants ([`INITIAL_DURATION`],
    /// [`TIMEOUT`], [`FETCH_RETRY_TIMEOUT`]) with priorities disabled.
    fn setup_and_spawn_actor(
        context: &deterministic::Context,
        provider: impl Provider<PublicKey = PublicKey>,
        blocker: impl Blocker<PublicKey = PublicKey>,
        signer: impl Signer<PublicKey = PublicKey>,
        connection: (
            Sender<PublicKey, deterministic::Context>,
            Receiver<PublicKey>,
        ),
        consumer: Consumer<Key, Bytes>,
        producer: Producer<Key, Bytes>,
    ) -> Mailbox<Key, PublicKey> {
        let public_key = signer.public_key();
        let (engine, mailbox) = Engine::new(
            context.with_label(&format!("actor_{public_key}")),
            Config {
                peer_provider: provider,
                blocker,
                consumer,
                producer,
                mailbox_size: MAILBOX_SIZE,
                me: Some(public_key),
                initial: INITIAL_DURATION,
                timeout: TIMEOUT,
                fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
                priority_requests: false,
                priority_responses: false,
            },
        );
        engine.start(connection);

        mailbox
    }
228
    /// Tests that fetching a key from another peer succeeds when data is available.
    /// This test sets up two peers, where Peer 1 requests data that Peer 2 has,
    /// and verifies that the data is correctly delivered to Peer 1's consumer.
    #[test_traced]
    fn test_fetch_success() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2]).await;

            // Connect the requester (index 0) to the producer (index 1).
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            let key = Key(2);
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), Bytes::from("data for key 2"));

            let (cons1, mut cons_out1) = Consumer::new();

            // Peer 1: the requester, with an empty producer.
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            // Peer 2: holds the data; its consumer is never exercised.
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            );

            mailbox1.fetch(key.clone()).await;

            // The fetch must resolve with peer 2's data.
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, Bytes::from("data for key 2"));
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }
        });
    }
281
    /// Tests that canceling a fetch request results in a failure event.
    /// This test initiates a fetch request and immediately cancels it,
    /// verifying that the consumer receives a failure notification instead of data.
    #[test_traced]
    fn test_cancel_fetch() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            // A single peer with no links: the fetch could never be served anyway.
            let (oracle, mut schemes, _peers, mut connections) =
                setup_network_and_peers(&context, &[1]).await;

            let (cons1, mut cons_out1) = Consumer::new();
            let prod1 = Producer::default();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                prod1,
            );

            // Cancel immediately after initiating the fetch.
            let key = Key(3);
            mailbox1.fetch(key.clone()).await;
            mailbox1.cancel(key.clone()).await;

            // The consumer must observe a failure for the canceled key, not data.
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Failed(key_actual) => {
                    assert_eq!(key_actual, key);
                }
                Event::Success(_, _) => panic!("Fetch should have been canceled"),
            }
        });
    }
319
    /// Tests fetching data from a peer when some peers lack the data.
    /// This test sets up three peers, where Peer 1 requests data that only Peer 3 has.
    /// It verifies that the resolver retries with another peer and successfully
    /// delivers the data to Peer 1's consumer.
    #[test_traced]
    fn test_peer_no_data() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3]).await;

            // The requester can reach both other peers; only peer 3 has the data.
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            let prod1 = Producer::default();
            let prod2 = Producer::default();
            let mut prod3 = Producer::default();
            let key = Key(3);
            prod3.insert(key.clone(), Bytes::from("data for key 3"));

            let (cons1, mut cons_out1) = Consumer::new();

            // Peer 1: the requester.
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                prod1,
            );

            // Peer 2: reachable but has no data (forces a retry elsewhere).
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            );

            // Peer 3: the only peer that can serve the key.
            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            );

            mailbox1.fetch(key.clone()).await;

            // Even if peer 2 is tried first, the fetch must eventually succeed via peer 3.
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, Bytes::from("data for key 3"));
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }
        });
    }
387
    /// Tests fetching when no peers are available.
    /// This test sets up a single peer with an empty peer provider (no peers).
    /// It initiates a fetch, waits beyond the retry timeout, cancels the fetch,
    /// and verifies that the consumer receives a failure notification.
    #[test_traced]
    fn test_no_peers_available() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            // Only one peer exists, so there is nobody to fetch from.
            let (oracle, mut schemes, _peers, mut connections) =
                setup_network_and_peers(&context, &[1]).await;

            let (cons1, mut cons_out1) = Consumer::new();
            let prod1 = Producer::default();

            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                prod1,
            );

            let key = Key(4);
            mailbox1.fetch(key.clone()).await;
            // Wait well past the retry timeout: the fetch should keep pending
            // (not fail on its own) until explicitly canceled.
            context.sleep(Duration::from_secs(5)).await;
            mailbox1.cancel(key.clone()).await;

            let event = cons_out1.recv().await.expect("Consumer channel closed");
            match event {
                Event::Failed(key_actual) => {
                    assert_eq!(key_actual, key);
                }
                Event::Success(_, _) => {
                    panic!("Fetch should have failed due to no peers")
                }
            }
        });
    }
429
    /// Tests that fetches issued before the first peer set arrives stay pending and complete once
    /// the initial update is tracked.
    #[test_traced]
    fn test_fetch_before_initial_peer_set_waits_for_update() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            // Build the network manually (instead of via the helper) so that no
            // peer set is tracked until later in the test.
            let (network, mut oracle) = Network::new(
                context.with_label("network"),
                commonware_p2p::simulated::Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(1),
                },
            );
            network.start();

            let mut schemes = [1_u64, 2]
                .into_iter()
                .map(PrivateKey::from_seed)
                .collect::<Vec<_>>();
            // Sort so that scheme/peer indices line up deterministically.
            schemes.sort_by_key(|s| s.public_key());
            let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();

            let mut connections = Vec::new();
            for peer in &peers {
                let (sender, receiver) = oracle
                    .control(peer.clone())
                    .register(0, Quota::per_second(RATE_LIMIT))
                    .await
                    .unwrap();
                connections.push((sender, receiver));
            }

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            let key = Key(2);
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), Bytes::from("data for key 2"));

            let (cons1, mut cons_out1) = Consumer::new();

            // Peer 1: the requester.
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            // Peer 2: holds the data.
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            );

            // Fetch before any peer set has been tracked.
            mailbox1.fetch(key.clone()).await;

            // No event should arrive while the peer set is still unknown.
            select! {
                event = cons_out1.recv() => {
                    panic!("fetch should wait for the initial peer set, got {event:?}");
                },
                _ = context.sleep(Duration::from_millis(200)) => {}
            };

            // Track the initial peer set; the pending fetch should now proceed.
            oracle
                .manager()
                .track(0, Set::try_from(peers.clone()).unwrap())
                .await;

            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, Bytes::from("data for key 2"));
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }
        });
    }
517
518    /// Tests that concurrent fetch requests are handled correctly.
519    /// Also tests that the peer can recover from having no peers available.
520    /// Also tests that the peer can get data from multiple peers that have different sets of data.
521    #[test_traced]
522    fn test_concurrent_fetch_requests() {
523        let executor = deterministic::Runner::timed(Duration::from_secs(60));
524        executor.start(|context| async move {
525            let (mut oracle, mut schemes, peers, mut connections) =
526                setup_network_and_peers(&context, &[1, 2, 3]).await;
527
528            let key2 = Key(2);
529            let key3 = Key(3);
530            let mut prod2 = Producer::default();
531            prod2.insert(key2.clone(), Bytes::from("data for key 2"));
532            let mut prod3 = Producer::default();
533            prod3.insert(key3.clone(), Bytes::from("data for key 3"));
534
535            let (cons1, mut cons_out1) = Consumer::new();
536
537            let scheme = schemes.remove(0);
538            let mut mailbox1 = setup_and_spawn_actor(
539                &context,
540                oracle.manager(),
541                oracle.control(scheme.public_key()),
542                scheme,
543                connections.remove(0),
544                cons1,
545                Producer::default(),
546            );
547
548            let scheme = schemes.remove(0);
549            let _mailbox2 = setup_and_spawn_actor(
550                &context,
551                oracle.manager(),
552                oracle.control(scheme.public_key()),
553                scheme,
554                connections.remove(0),
555                Consumer::dummy(),
556                prod2,
557            );
558
559            let scheme = schemes.remove(0);
560            let _mailbox3 = setup_and_spawn_actor(
561                &context,
562                oracle.manager(),
563                oracle.control(scheme.public_key()),
564                scheme,
565                connections.remove(0),
566                Consumer::dummy(),
567                prod3,
568            );
569
570            // Add choppy links between the requester and the two producers
571            add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 1).await;
572            add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 2).await;
573
574            // Run the fetches multiple times to ensure that the peer tries both of its peers
575            for _ in 0..10 {
576                // Initiate concurrent fetch requests
577                mailbox1.fetch(key2.clone()).await;
578                mailbox1.fetch(key3.clone()).await;
579
580                // Collect both events without assuming order
581                let mut events = Vec::new();
582                events.push(cons_out1.recv().await.expect("Consumer channel closed"));
583                events.push(cons_out1.recv().await.expect("Consumer channel closed"));
584
585                // Check that both keys were successfully fetched
586                let mut found_key2 = false;
587                let mut found_key3 = false;
588                for event in events {
589                    match event {
590                        Event::Success(key_actual, value) => {
591                            if key_actual == key2 {
592                                assert_eq!(value, Bytes::from("data for key 2"));
593                                found_key2 = true;
594                            } else if key_actual == key3 {
595                                assert_eq!(value, Bytes::from("data for key 3"));
596                                found_key3 = true;
597                            } else {
598                                panic!("Unexpected key received");
599                            }
600                        }
601                        Event::Failed(_) => panic!("Fetch failed unexpectedly"),
602                    }
603                }
604                assert!(found_key2 && found_key3,);
605            }
606        });
607    }
608
    /// Tests that canceling an inactive fetch request has no effect.
    /// Cancels a request before, after, and during the fetch process,
    #[test_traced]
    fn test_cancel() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            let key = Key(6);
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), Bytes::from("data for key 6"));

            let (cons1, mut cons_out1) = Consumer::new();

            // Peer 1: the requester.
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            // Peer 2: holds the data.
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            );

            // Cancel before sending the fetch request, expecting no effect
            mailbox1.cancel(key.clone()).await;
            select! {
                _ = cons_out1.recv() => {
                    panic!("unexpected event");
                },
                _ = context.sleep(Duration::from_millis(100)) => {},
            };

            // Initiate fetch and wait for data to be delivered
            mailbox1.fetch(key.clone()).await;
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, Bytes::from("data for key 6"));
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }

            // Attempt to cancel after data has been delivered, expecting no effect
            mailbox1.cancel(key.clone()).await;
            select! {
                _ = cons_out1.recv() => {
                    panic!("unexpected event");
                },
                _ = context.sleep(Duration::from_millis(100)) => {},
            };

            // Initiate and cancel another fetch request (a key nobody serves,
            // so it is still pending when the cancel arrives)
            let key = Key(7);
            mailbox1.fetch(key.clone()).await;
            mailbox1.cancel(key.clone()).await;

            // Make sure we receive a failure event
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Failed(key_actual) => {
                    assert_eq!(key_actual, key);
                }
                Event::Success(_, _) => panic!("Fetch should have been canceled"),
            }
        });
    }
692
    /// Tests that a peer is blocked after delivering invalid data,
    /// preventing further fetches from that peer.
    #[test_traced]
    fn test_blocking_peer() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3]).await;

            // Fully connect all three peers.
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
            add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;

            let key_a = Key(1);
            let key_b = Key(2);
            let invalid_data_a = Bytes::from("invalid for A");
            let valid_data_a = Bytes::from("valid for A");
            let valid_data_b = Bytes::from("valid for B");

            // Set up producers: peer 2 serves invalid data for A (and is the
            // ONLY holder of B); peer 3 serves valid data for A.
            let mut prod2 = Producer::default();
            prod2.insert(key_a.clone(), invalid_data_a.clone());
            prod2.insert(key_b.clone(), valid_data_b.clone());

            let mut prod3 = Producer::default();
            prod3.insert(key_a.clone(), valid_data_a.clone());

            // Set up consumer for Peer1 with expected values, so peer 2's
            // invalid response for A is rejected (triggering a block).
            let (mut cons1, mut cons_out1) = Consumer::new();
            cons1.add_expected(key_a.clone(), valid_data_a.clone());
            cons1.add_expected(key_b.clone(), valid_data_b.clone());

            // Spawn actors
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            );

            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            );

            // Fetch keyA multiple times to ensure that Peer2 is blocked.
            for _ in 0..20 {
                // Fetch keyA
                mailbox1.fetch(key_a.clone()).await;

                // Wait for success event for keyA (valid data comes from Peer3)
                let event = cons_out1.recv().await.unwrap();
                match event {
                    Event::Success(key_actual, value) => {
                        assert_eq!(key_actual, key_a);
                        assert_eq!(value, valid_data_a);
                    }
                    Event::Failed(_) => panic!("Fetch failed unexpectedly"),
                }
            }

            // Fetch keyB — only the (now-blocked) Peer2 can serve it.
            mailbox1.fetch(key_b.clone()).await;

            // Wait for some time (longer than retry timeout)
            context.sleep(Duration::from_secs(5)).await;

            // Cancel the fetch for keyB
            mailbox1.cancel(key_b.clone()).await;

            // Wait for failure event for keyB
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Failed(key_actual) => {
                    assert_eq!(key_actual, key_b);
                }
                Event::Success(_, _) => panic!("Fetch should have been canceled"),
            }

            // Check oracle: exactly one block, recorded as Peer1 blocking Peer2.
            let blocked = oracle.blocked().await.unwrap();
            assert_eq!(blocked.len(), 1);
            assert_eq!(blocked[0].0, peers[0]);
            assert_eq!(blocked[0].1, peers[1]);
        });
    }
800
801    /// Tests that duplicate fetch requests for the same key are handled properly.
802    /// The test verifies that when the same key is requested multiple times,
803    /// the data is correctly delivered once without errors.
804    #[test_traced]
805    fn test_duplicate_fetch_request() {
806        let executor = deterministic::Runner::timed(Duration::from_secs(10));
807        executor.start(|context| async move {
808            let (mut oracle, mut schemes, peers, mut connections) =
809                setup_network_and_peers(&context, &[1, 2]).await;
810
811            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
812
813            let key = Key(5);
814            let mut prod2 = Producer::default();
815            prod2.insert(key.clone(), Bytes::from("data for key 5"));
816
817            let (cons1, mut cons_out1) = Consumer::new();
818
819            let scheme = schemes.remove(0);
820            let mut mailbox1 = setup_and_spawn_actor(
821                &context,
822                oracle.manager(),
823                oracle.control(scheme.public_key()),
824                scheme,
825                connections.remove(0),
826                cons1,
827                Producer::default(),
828            );
829
830            let scheme = schemes.remove(0);
831            let _mailbox2 = setup_and_spawn_actor(
832                &context,
833                oracle.manager(),
834                oracle.control(scheme.public_key()),
835                scheme,
836                connections.remove(0),
837                Consumer::dummy(),
838                prod2,
839            );
840
841            // Send duplicate fetch requests for the same key
842            mailbox1.fetch(key.clone()).await;
843            mailbox1.fetch(key.clone()).await;
844
845            // Should receive the data only once
846            let event = cons_out1.recv().await.unwrap();
847            match event {
848                Event::Success(key_actual, value) => {
849                    assert_eq!(key_actual, key);
850                    assert_eq!(value, Bytes::from("data for key 5"));
851                }
852                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
853            }
854
855            // Make sure we don't receive a second event for the duplicate fetch
856            select! {
857                _ = cons_out1.recv() => {
858                    panic!("Unexpected second event received for duplicate fetch");
859                },
860                _ = context.sleep(Duration::from_millis(500)) => {
861                    // This is expected - no additional events should be produced
862                },
863            };
864        });
865    }
866
867    /// Tests that changing peer sets is handled correctly using the update channel.
868    /// This test verifies that when the peer set changes from peer A to peer B,
869    /// the resolver correctly adapts and fetches from the new peer.
870    #[test_traced]
871    fn test_changing_peer_sets() {
872        let executor = deterministic::Runner::timed(Duration::from_secs(10));
873        executor.start(|context| async move {
874            let (mut oracle, mut schemes, peers, mut connections) =
875                setup_network_and_peers(&context, &[1, 2, 3]).await;
876
877            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
878            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
879
880            let key1 = Key(1);
881            let key2 = Key(2);
882
883            let mut prod2 = Producer::default();
884            prod2.insert(key1.clone(), Bytes::from("data from peer 2"));
885
886            let mut prod3 = Producer::default();
887            prod3.insert(key2.clone(), Bytes::from("data from peer 3"));
888
889            let (cons1, mut cons_out1) = Consumer::new();
890
891            let scheme = schemes.remove(0);
892            let mut mailbox1 = setup_and_spawn_actor(
893                &context,
894                oracle.manager(),
895                oracle.control(scheme.public_key()),
896                scheme,
897                connections.remove(0),
898                cons1,
899                Producer::default(),
900            );
901
902            let scheme = schemes.remove(0);
903            let _mailbox2 = setup_and_spawn_actor(
904                &context,
905                oracle.manager(),
906                oracle.control(scheme.public_key()),
907                scheme,
908                connections.remove(0),
909                Consumer::dummy(),
910                prod2,
911            );
912
913            // Fetch key1 from peer 2
914            mailbox1.fetch(key1.clone()).await;
915
916            // Wait for successful fetch
917            let event = cons_out1.recv().await.unwrap();
918            match event {
919                Event::Success(key_actual, value) => {
920                    assert_eq!(key_actual, key1);
921                    assert_eq!(value, Bytes::from("data from peer 2"));
922                }
923                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
924            }
925
926            // Change peer set to include peer 3
927            let scheme = schemes.remove(0);
928            let _mailbox3 = setup_and_spawn_actor(
929                &context,
930                oracle.manager(),
931                oracle.control(scheme.public_key()),
932                scheme,
933                connections.remove(0),
934                Consumer::dummy(),
935                prod3,
936            );
937
938            // Need to wait for the peer set change to propagate
939            context.sleep(Duration::from_millis(200)).await;
940
941            // Fetch key2 from peer 3
942            mailbox1.fetch(key2.clone()).await;
943
944            // Wait for successful fetch
945            let event = cons_out1.recv().await.unwrap();
946            match event {
947                Event::Success(key_actual, value) => {
948                    assert_eq!(key_actual, key2);
949                    assert_eq!(value, Bytes::from("data from peer 3"));
950                }
951                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
952            }
953        });
954    }
955
    /// Tests that a targeted fetch tries only its target peers: a target that
    /// serves invalid data is blocked and removed from the target set, while
    /// the remaining target is still tried and ultimately satisfies the fetch.
    #[test_traced]
    fn test_fetch_targeted() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            // Three peers: peer 1 requests, peers 2 and 3 are the targets.
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            let key = Key(1);
            let invalid_data = Bytes::from("invalid data");
            let valid_data = Bytes::from("valid data");

            // Peer 2 has invalid data, peer 3 has valid data
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), invalid_data.clone());

            let mut prod3 = Producer::default();
            prod3.insert(key.clone(), valid_data.clone());

            // Consumer expects only valid_data
            let (mut cons1, mut cons_out1) = Consumer::new();
            cons1.add_expected(key.clone(), valid_data.clone());

            // Spawn the requesting peer (peer 1).
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            // Spawn peer 2 (serves invalid data).
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            );

            // Spawn peer 3 (serves valid data).
            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            );

            // Wait for peer set to be established
            context.sleep(Duration::from_millis(100)).await;

            // Start fetch with targets for both peer 2 (invalid data) and peer 3 (valid data)
            // When peer 2 returns invalid data, only peer 2 should be removed from targets
            // Peer 3 should still be tried as a target and succeed
            mailbox1
                .fetch_targeted(
                    key.clone(),
                    non_empty_vec![peers[1].clone(), peers[2].clone()],
                )
                .await;

            // Should eventually succeed from peer 3
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, valid_data);
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }

            // Verify peer 2 was blocked (sent invalid data)
            // Each blocked entry is a (blocker, blocked) pair: peer 1 blocked peer 2.
            let blocked = oracle.blocked().await.unwrap();
            assert_eq!(blocked.len(), 1);
            assert_eq!(blocked[0].0, peers[0]);
            assert_eq!(blocked[0].1, peers[1]);

            // Verify metrics: 1 successful fetch (from peer 3 after peer 2 was blocked)
            let metrics = context.encode();
            assert!(metrics.contains("_fetch_total{status=\"Success\"} 1"));
        });
    }
1048
    /// Tests that a targeted fetch never falls back to peers outside the target
    /// set: the targeted peers (2 and 3) have no data and keep failing, while
    /// peer 4 — which has the data but is untargeted — is never tried, so the
    /// fetch never completes.
    #[test_traced]
    fn test_fetch_targeted_no_fallback() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3, 4]).await;

            // Link peer 1 to all other peers (so peer 4 is reachable but still
            // must not be contacted because it is not a target).
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;

            let key = Key(1);

            // Only peer 4 has the data, peers 2 and 3 don't
            let mut prod4 = Producer::default();
            prod4.insert(key.clone(), Bytes::from("data from peer 4"));

            let (cons1, mut cons_out1) = Consumer::new();

            // Spawn the requesting peer (peer 1).
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                Producer::default(), // no data
            );

            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                Producer::default(), // no data
            );

            let scheme = schemes.remove(0);
            let _mailbox4 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod4,
            );

            // Wait for peer set to be established
            context.sleep(Duration::from_millis(100)).await;

            // Start fetch with targets for peers 2 and 3 (both don't have data)
            // Peer 4 has data but is NOT a target - it should NEVER be tried
            mailbox1
                .fetch_targeted(
                    key.clone(),
                    non_empty_vec![peers[1].clone(), peers[2].clone()],
                )
                .await;

            // Wait enough time for targets to fail and retry multiple times
            // The fetch should not succeed because peer 4 (which has data) is not targeted
            select! {
                event = cons_out1.recv() => {
                    panic!("Fetch should not succeed, but got: {event:?}");
                },
                _ = context.sleep(Duration::from_secs(3)) => {
                    // Expected: no success event because peer 4 is not targeted
                },
            };
        });
    }
1136
    /// Tests fetch_all_targeted with a batch of targeted fetches issued
    /// alongside an untargeted fetch: each targeted key is served by its
    /// designated peer, and the untargeted key by whichever peer holds it.
    #[test_traced]
    fn test_fetch_all_targeted() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3, 4]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;

            let key1 = Key(1);
            let key2 = Key(2);
            let key3 = Key(3);

            // Peer 2 has key1
            let mut prod2 = Producer::default();
            prod2.insert(key1.clone(), Bytes::from("data for key 1"));

            // Peer 3 has key3
            let mut prod3 = Producer::default();
            prod3.insert(key3.clone(), Bytes::from("data for key 3"));

            // Peer 4 has key2
            let mut prod4 = Producer::default();
            prod4.insert(key2.clone(), Bytes::from("data for key 2"));

            // Consumer expects all three keys
            let (mut cons1, mut cons_out1) = Consumer::new();
            cons1.add_expected(key1.clone(), Bytes::from("data for key 1"));
            cons1.add_expected(key2.clone(), Bytes::from("data for key 2"));
            cons1.add_expected(key3.clone(), Bytes::from("data for key 3"));

            // Spawn the requesting peer (peer 1).
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            );

            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            );

            let scheme = schemes.remove(0);
            let _mailbox4 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod4,
            );

            // Wait for peer set to be established
            context.sleep(Duration::from_millis(100)).await;

            // Fetch keys with mixed targeting:
            // - key1 targeted to peer 2 (has data) -> should succeed from target
            // - key2 targeted to peer 4 (has data) -> should succeed from target
            // - key3 no targeting -> fetched from any peer (peer 3 has it)
            mailbox1
                .fetch_all_targeted(vec![
                    (key1.clone(), non_empty_vec![peers[1].clone()]), // peer 2 has key1
                    (key2.clone(), non_empty_vec![peers[3].clone()]), // peer 4 has key2
                ])
                .await;
            mailbox1.fetch(key3.clone()).await; // no targeting for key3

            // Collect all three events
            // Completion order is not guaranteed, so accumulate results in a
            // map keyed by fetch key and verify afterwards.
            let mut results = HashMap::new();
            for _ in 0..3 {
                let event = cons_out1.recv().await.unwrap();
                match event {
                    Event::Success(key, value) => {
                        results.insert(key, value);
                    }
                    Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
                }
            }

            // Verify all keys received correct data
            assert_eq!(results.len(), 3);
            assert_eq!(results.get(&key1).unwrap(), &Bytes::from("data for key 1"));
            assert_eq!(results.get(&key2).unwrap(), &Bytes::from("data for key 2"));
            assert_eq!(results.get(&key3).unwrap(), &Bytes::from("data for key 3"));

            // Verify metrics: 3 successful fetches
            let metrics = context.encode();
            assert!(metrics.contains("_fetch_total{status=\"Success\"} 3"));
        });
    }
1252
1253    /// Tests that calling fetch() on an in-progress targeted fetch clears the targets,
1254    /// allowing the fetch to succeed from any available peer.
1255    #[test_traced]
1256    fn test_fetch_clears_targets() {
1257        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1258        executor.start(|context| async move {
1259            let (mut oracle, mut schemes, peers, mut connections) =
1260                setup_network_and_peers(&context, &[1, 2, 3]).await;
1261
1262            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1263            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1264
1265            let key = Key(1);
1266            let valid_data = Bytes::from("valid data");
1267
1268            // Peer 2 has no data, peer 3 has the data
1269            let mut prod3 = Producer::default();
1270            prod3.insert(key.clone(), valid_data.clone());
1271
1272            let (cons1, mut cons_out1) = Consumer::new();
1273
1274            let scheme = schemes.remove(0);
1275            let mut mailbox1 = setup_and_spawn_actor(
1276                &context,
1277                oracle.manager(),
1278                oracle.control(scheme.public_key()),
1279                scheme,
1280                connections.remove(0),
1281                cons1,
1282                Producer::default(),
1283            );
1284
1285            let scheme = schemes.remove(0);
1286            let _mailbox2 = setup_and_spawn_actor(
1287                &context,
1288                oracle.manager(),
1289                oracle.control(scheme.public_key()),
1290                scheme,
1291                connections.remove(0),
1292                Consumer::dummy(),
1293                Producer::default(), // no data
1294            );
1295
1296            let scheme = schemes.remove(0);
1297            let _mailbox3 = setup_and_spawn_actor(
1298                &context,
1299                oracle.manager(),
1300                oracle.control(scheme.public_key()),
1301                scheme,
1302                connections.remove(0),
1303                Consumer::dummy(),
1304                prod3,
1305            );
1306
1307            // Wait for peer set to be established
1308            context.sleep(Duration::from_millis(100)).await;
1309
1310            // Start fetch with target for peer 2 only (who doesn't have data)
1311            mailbox1
1312                .fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()])
1313                .await;
1314
1315            // Wait for the targeted fetch to fail a few times
1316            context.sleep(Duration::from_millis(500)).await;
1317
1318            // Call fetch() which should clear the targets and allow fallback to any peer
1319            mailbox1.fetch(key.clone()).await;
1320
1321            // Should now succeed from peer 3 (who has data but wasn't originally targeted)
1322            let event = cons_out1.recv().await.unwrap();
1323            match event {
1324                Event::Success(key_actual, value) => {
1325                    assert_eq!(key_actual, key);
1326                    assert_eq!(value, valid_data);
1327                }
1328                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
1329            }
1330        });
1331    }
1332
    /// Tests that calling fetch_targeted for a key whose fetch is already in
    /// progress without targets does NOT restrict it: the fetch remains an
    /// "all peers" fetch and succeeds from a peer outside the targeted set.
    #[test_traced]
    fn test_fetch_targeted_does_not_restrict_all() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2, 3]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            let key = Key(1);
            let valid_data = Bytes::from("valid data");

            // Peer 2 has no data, peer 3 has the data
            let mut prod3 = Producer::default();
            prod3.insert(key.clone(), valid_data.clone());

            let (cons1, mut cons_out1) = Consumer::new();

            // Spawn the requesting peer (peer 1).
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                Producer::default(), // no data
            );

            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            );

            // Wait for peer set to be established
            context.sleep(Duration::from_millis(100)).await;

            // Start fetch without targets (can try any peer)
            mailbox1.fetch(key.clone()).await;

            // Wait a bit for the fetch to start
            context.sleep(Duration::from_millis(50)).await;

            // Call fetch_targeted with peer 2 only (who doesn't have data)
            // This should NOT restrict the existing "all" fetch
            mailbox1
                .fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()])
                .await;

            // Should still succeed from peer 3 (who has data but wasn't in the targeted call)
            // because the original fetch was "all" and shouldn't be restricted
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, valid_data);
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }
        });
    }
1412
    /// Tests that retain() fails every in-flight fetch whose key does not
    /// satisfy the predicate (reporting failure to the consumer) and cleans up
    /// the fetch-timer state so the same key can be fetched again later.
    #[test_traced]
    fn test_retain() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            // Note: no link is added yet, so the first fetch will stay in-flight.
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2]).await;

            let key = Key(5);
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), Bytes::from("data for key 5"));

            let (cons1, mut cons_out1) = Consumer::new();

            // Spawn the requesting peer (peer 1).
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            // Spawn the serving peer (peer 2).
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            );

            // Retain before fetching should have no effect
            mailbox1.retain(|_| true).await;
            select! {
                _ = cons_out1.recv() => {
                    panic!("unexpected event");
                },
                _ = context.sleep(Duration::from_millis(100)) => {},
            };

            // Start a fetch (no link, so fetch stays in-flight with timer in fetch timers)
            mailbox1.fetch(key.clone()).await;

            // Retain with predicate that excludes the key
            // This must clean up fetch timers entry for the key
            let key_clone = key.clone();
            mailbox1.retain(move |k| k != &key_clone).await;

            // Consumer should receive failed event
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Failed(key_actual) => {
                    assert_eq!(key_actual, key);
                }
                Event::Success(_, _) => panic!("Fetch should have been retained out"),
            }

            // Now add link so fetches can complete
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            // Fetch same key again, if fetch timers wasn't cleaned up, this would
            // be treated as a duplicate and silently ignored
            mailbox1.fetch(key.clone()).await;

            // Should succeed
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, Bytes::from("data for key 5"));
                }
                Event::Failed(_) => unreachable!(),
            }
        });
    }
1492
1493    #[test_traced]
1494    fn test_clear() {
1495        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1496        executor.start(|context| async move {
1497            let (mut oracle, mut schemes, peers, mut connections) =
1498                setup_network_and_peers(&context, &[1, 2]).await;
1499
1500            // No link yet - fetch will stay in-flight
1501            let key = Key(6);
1502            let mut prod2 = Producer::default();
1503            prod2.insert(key.clone(), Bytes::from("data for key 6"));
1504
1505            let (cons1, mut cons_out1) = Consumer::new();
1506
1507            let scheme = schemes.remove(0);
1508            let mut mailbox1 = setup_and_spawn_actor(
1509                &context,
1510                oracle.manager(),
1511                oracle.control(scheme.public_key()),
1512                scheme,
1513                connections.remove(0),
1514                cons1,
1515                Producer::default(),
1516            );
1517
1518            let scheme = schemes.remove(0);
1519            let _mailbox2 = setup_and_spawn_actor(
1520                &context,
1521                oracle.manager(),
1522                oracle.control(scheme.public_key()),
1523                scheme,
1524                connections.remove(0),
1525                Consumer::dummy(),
1526                prod2,
1527            );
1528
1529            // Clear before fetching should have no effect
1530            mailbox1.clear().await;
1531            select! {
1532                _ = cons_out1.recv() => {
1533                    panic!("unexpected event");
1534                },
1535                _ = context.sleep(Duration::from_millis(100)) => {},
1536            };
1537
1538            // Start a fetch (no link, so fetch stays in-flight with timer in fetch timers)
1539            mailbox1.fetch(key.clone()).await;
1540
1541            // Clear all fetches
1542            mailbox1.clear().await;
1543
1544            // Consumer should receive failed event
1545            let event = cons_out1.recv().await.unwrap();
1546            match event {
1547                Event::Failed(key_actual) => {
1548                    assert_eq!(key_actual, key);
1549                }
1550                Event::Success(_, _) => panic!("Fetch should have been cleared"),
1551            }
1552
1553            // Now add link so fetches can complete
1554            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1555
1556            // Fetch same key again, if fetch_timers wasn't cleaned up, this would
1557            // be treated as a duplicate and silently ignored
1558            mailbox1.fetch(key.clone()).await;
1559
1560            // Should succeed
1561            let event = cons_out1.recv().await.unwrap();
1562            match event {
1563                Event::Success(key_actual, value) => {
1564                    assert_eq!(key_actual, key);
1565                    assert_eq!(value, Bytes::from("data for key 6"));
1566                }
1567                Event::Failed(_) => unreachable!(),
1568            }
1569        });
1570    }
1571
    /// Tests that when a peer is rate-limited, the fetcher spills over to another peer.
    /// With 2 peers and rate limit of 1/sec each, 2 requests issued simultaneously should
    /// both complete immediately (one to each peer) without waiting for rate limit reset.
    #[test_traced]
    fn test_rate_limit_spillover() {
        // Deterministic runtime with a generous cap; the happy path should finish in
        // well under one simulated second.
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|context| async move {
            // Use a very restrictive rate limit: 1 request per second per peer
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers_with_rate_limit(
                    &context,
                    &[1, 2, 3],
                    Quota::per_second(NZU32!(1)),
                )
                .await;

            // Add links between peer 1 and both peer 2 and peer 3
            // (indices 0, 1, 2 correspond to peers 1, 2, 3).
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            // Both peer 2 and peer 3 have the same data, so either can serve a request.
            let mut prod2 = Producer::default();
            let mut prod3 = Producer::default();
            prod2.insert(Key(0), Bytes::from("data for key 0"));
            prod2.insert(Key(1), Bytes::from("data for key 1"));
            prod3.insert(Key(0), Bytes::from("data for key 0"));
            prod3.insert(Key(1), Bytes::from("data for key 1"));

            let (cons1, mut cons_out1) = Consumer::new();

            // Set up peer 1 (the requester); it has no data of its own.
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            // Set up peer 2 (has data)
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            );

            // Set up peer 3 (also has data)
            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            );

            // Wait for peer set to be established
            context.sleep(Duration::from_millis(100)).await;
            // Snapshot the deterministic clock so elapsed time can be bounded below.
            let start = context.current();

            // Issue 2 fetch requests rapidly
            // With rate limit of 1/sec per peer and 2 peers, both should complete
            // immediately via spill-over (one request to each peer)
            mailbox1.fetch(Key(0)).await;
            mailbox1.fetch(Key(1)).await;

            // Collect results; order of completion is not asserted, only the set of keys.
            let mut results = HashMap::new();
            for _ in 0..2 {
                let event = cons_out1.recv().await.unwrap();
                match event {
                    Event::Success(key, value) => {
                        results.insert(key.clone(), value);
                    }
                    Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
                }
            }

            // Verify both keys were fetched successfully
            assert_eq!(results.len(), 2);
            assert_eq!(
                results.get(&Key(0)).unwrap(),
                &Bytes::from("data for key 0")
            );
            assert_eq!(
                results.get(&Key(1)).unwrap(),
                &Bytes::from("data for key 1")
            );

            // Verify it completed quickly (well under 1 second) - proves spill-over worked
            // Without spill-over, the second request would wait ~1 second for rate limit reset
            let elapsed = context.current().duration_since(start).unwrap();
            assert!(
                elapsed < Duration::from_millis(500),
                "Expected quick completion via spill-over, but took {elapsed:?}"
            );
        });
    }
1680
    /// Tests that rate limiting causes retries to eventually succeed after the rate limit resets.
    /// This test uses a single peer with a restrictive rate limit and verifies that
    /// fetches eventually complete after waiting for the rate limit to reset.
    #[test_traced]
    fn test_rate_limit_retry_after_reset() {
        // Deterministic runtime; the test intentionally spans multiple simulated
        // seconds to cross rate-limit windows.
        let executor = deterministic::Runner::timed(Duration::from_secs(30));
        executor.start(|context| async move {
            // Use a restrictive rate limit: 1 request per second
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers_with_rate_limit(
                    &context,
                    &[1, 2],
                    Quota::per_second(NZU32!(1)),
                )
                .await;

            // Link the requester (index 0) to the only data-holding peer (index 1).
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            // Peer 2 has data for multiple keys
            let mut prod2 = Producer::default();
            prod2.insert(Key(1), Bytes::from("data for key 1"));
            prod2.insert(Key(2), Bytes::from("data for key 2"));
            prod2.insert(Key(3), Bytes::from("data for key 3"));

            let (cons1, mut cons_out1) = Consumer::new();

            // Peer 1: the requester, with no local data.
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            // Peer 2: serves the data.
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            );

            // Wait for peer set to be established
            context.sleep(Duration::from_millis(100)).await;
            // Snapshot the deterministic clock to measure the rate-limit-induced delay.
            let start = context.current();

            // Issue 3 fetch requests to a single peer with rate limit of 1/sec
            // Only 1 can be sent immediately, the others must wait for rate limit reset
            mailbox1.fetch(Key(1)).await;
            mailbox1.fetch(Key(2)).await;
            mailbox1.fetch(Key(3)).await;

            // All 3 should eventually succeed (after rate limit resets)
            let mut results = HashMap::new();
            for _ in 0..3 {
                let event = cons_out1.recv().await.unwrap();
                match event {
                    Event::Success(key, value) => {
                        results.insert(key.clone(), value);
                    }
                    Event::Failed(key) => panic!("Fetch failed for key {key:?}"),
                }
            }

            assert_eq!(results.len(), 3);
            for i in 1..=3 {
                assert_eq!(
                    results.get(&Key(i)).unwrap(),
                    &Bytes::from(format!("data for key {}", i))
                );
            }

            // Verify it took significant time due to rate limiting
            // With 3 requests at 1/sec to a single peer, requests 2 and 3 must wait
            // for rate limit resets (~1 second each), so total should be > 2 seconds
            let elapsed = context.current().duration_since(start).unwrap();
            assert!(
                elapsed > Duration::from_secs(2),
                "Expected rate limiting to cause delay > 2s, but took {elapsed:?}"
            );
        });
    }
1769
    /// Tests that the resolver never sends fetch requests to itself (me exclusion).
    /// Even when the local peer has the data in its producer, it should fetch from
    /// another peer instead.
    #[test_traced]
    fn test_self_exclusion() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2]).await;

            // Link the requester (index 0) to peer 2 (index 1).
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            let key = Key(1);
            let data = Bytes::from("shared data");

            // Both peers have the data - peer 1 (requester) and peer 2
            let mut prod1 = Producer::default();
            prod1.insert(key.clone(), data.clone());
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), data.clone());

            let (cons1, mut cons_out1) = Consumer::new();

            // Set up peer 1 with `me` set - it has the data but should NOT fetch from itself
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                prod1, // peer 1 has the data
            );

            // Set up peer 2 - also has the data
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            );

            // Wait for peer set to be established
            context.sleep(Duration::from_millis(100)).await;

            // Fetch the key - should get it from peer 2, not from self
            // (NOTE: success alone shows the fetch worked; that the response came from
            // peer 2 rather than self follows because a self-request would not traverse
            // the simulated network link at all).
            mailbox1.fetch(key.clone()).await;

            // Should succeed (from peer 2)
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, data);
                }
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }
        });
    }
1834
    /// Tests that fetches only query peers in the primary set: the sole peer holding
    /// the data is tracked as secondary, so the fetch must never complete.
    #[test_traced]
    fn test_fetch_uses_primary_peers_only() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            // Build the network manually (rather than via the setup helper) so the
            // tracked-peer-set configuration can be controlled explicitly.
            let (network, oracle) = Network::new(
                context.with_label("network"),
                commonware_p2p::simulated::Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(1),
                },
            );
            network.start();

            // Derive three deterministic identities from fixed seeds.
            let schemes: Vec<PrivateKey> = [1u64, 2, 3]
                .into_iter()
                .map(PrivateKey::from_seed)
                .collect();
            let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
            // Rebind mutably: the schemes are consumed one-by-one via `remove(0)` below.
            let mut schemes = schemes;

            // Register each peer on channel 0 to obtain its sender/receiver pair.
            let mut connections = Vec::new();
            for peer in &peers {
                let (sender, receiver) = oracle
                    .control(peer.clone())
                    .register(0, Quota::per_second(RATE_LIMIT))
                    .await
                    .unwrap();
                connections.push((sender, receiver));
            }

            // Topology: peer 1 (requester) linked to peers 2 and 3.
            // Peer 2 is primary (no data), peer 3 is secondary (has data).
            // Fetch should only query primary peers, so the request must time out.
            let mut oracle = oracle;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            // Track peer 2 as primary and peer 3 as secondary.
            oracle
                .manager()
                .track(
                    1,
                    TrackedPeers::new(
                        Set::try_from([peers[1].clone()]).unwrap(),
                        Set::try_from([peers[2].clone()]).unwrap(),
                    ),
                )
                .await;
            context.sleep(Duration::from_millis(100)).await;

            let key = Key(1);
            let data = Bytes::from("secondary only data");

            let (cons1, mut cons_out1) = Consumer::new();

            // Peer 1: the requester, has no data.
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            // Peer 2: primary, has no data.
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                Producer::default(),
            );

            // Peer 3: secondary, has the data. Should not be queried.
            let mut prod3 = Producer::default();
            prod3.insert(key.clone(), data);
            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            );

            // Fetch should time out because the only peer with data (peer 3)
            // is secondary and won't be queried.
            mailbox1.fetch(key.clone()).await;

            // Any consumer event within 2s would mean a secondary peer was queried.
            select! {
                event = cons_out1.recv() => {
                    panic!("fetch should not succeed from a secondary peer, got: {event:?}");
                },
                _ = context.sleep(Duration::from_secs(2)) => {},
            }
        });
    }
1940
    /// Tests that fetches (including targeted ones) only use the LATEST primary set:
    /// a peer retained solely through the tracked-set overlap window must not be
    /// queried even though it still holds the data.
    #[test_traced]
    fn test_fetch_uses_latest_primary_set_only() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            // Two tracked peer sets so an older set remains in the overlap window
            // after the cutover below.
            let (network, oracle) = Network::new(
                context.with_label("network"),
                commonware_p2p::simulated::Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(2),
                },
            );
            network.start();

            // Derive three deterministic identities from fixed seeds.
            let schemes: Vec<PrivateKey> = [1u64, 2, 3]
                .into_iter()
                .map(PrivateKey::from_seed)
                .collect();
            let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
            // Rebind mutably: the schemes are consumed one-by-one via `remove(0)` below.
            let mut schemes = schemes;

            // Register each peer on channel 0 to obtain its sender/receiver pair.
            let mut connections = Vec::new();
            for peer in &peers {
                let (sender, receiver) = oracle
                    .control(peer.clone())
                    .register(0, Quota::per_second(RATE_LIMIT))
                    .await
                    .unwrap();
                connections.push((sender, receiver));
            }

            let mut oracle = oracle;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            // Keep the requester tracked across the cutover so the fetch path itself remains
            // active, while peer 2 is retained only through the overlap window after peer 3
            // becomes the newest primary set.
            oracle
                .manager()
                .track(
                    0,
                    Set::try_from([peers[0].clone(), peers[1].clone()]).unwrap(),
                )
                .await;
            context.sleep(Duration::from_millis(100)).await;

            let key = Key(7);
            let targeted_key = Key(8);
            let data = Bytes::from("old primary data");

            let (cons1, mut cons_out1) = Consumer::new();

            // Peer 1: requester.
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            // Peer 2: old primary, still retained in `all.primary`, has the data.
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), data.clone());
            prod2.insert(targeted_key.clone(), data);
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            );

            // Peer 3: latest primary, has no data.
            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                Producer::default(),
            );

            context.sleep(Duration::from_millis(100)).await;

            // Track peer 3 as the latest primary while keeping the requester in the peer set.
            // Peer 2 remains in the provider's overlap window (`all.primary`), but new resolver traffic
            // should use only `latest.primary`.
            oracle
                .manager()
                .track(
                    1,
                    Set::try_from([peers[0].clone(), peers[2].clone()]).unwrap(),
                )
                .await;
            context.sleep(Duration::from_millis(100)).await;

            // An untargeted fetch must not reach peer 2 (overlap-only) and peer 3 has
            // no data, so no consumer event should arrive.
            mailbox1.fetch(key).await;

            select! {
                event = cons_out1.recv() => {
                    panic!(
                        "fetch should not succeed from an old primary retained only in the overlap window, got: {event:?}"
                    );
                },
                _ = context.sleep(Duration::from_secs(1)) => {},
            }

            // Explicit targets still respect the latest-primary filter.
            mailbox1
                .fetch_targeted(targeted_key, non_empty_vec![peers[1].clone()])
                .await;

            select! {
                event = cons_out1.recv() => {
                    panic!(
                        "targeted fetch should not bypass the latest-primary filter, got: {event:?}"
                    );
                },
                _ = context.sleep(Duration::from_secs(1)) => {},
            }
        });
    }
2073
    /// Tests that after a primary-set cutover, fetches are satisfied by the latest
    /// primary peer's (valid) data and overlap-only peers are never queried — verified
    /// both by the consumer receiving the valid payload and by the blocked-peer list
    /// remaining empty.
    #[test_traced]
    fn test_fetch_after_cutover_relies_on_latest_primary_history() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            // Two tracked peer sets so the old primary survives in the overlap window.
            let (network, oracle) = Network::new(
                context.with_label("network"),
                commonware_p2p::simulated::Config {
                    max_size: 1024 * 1024,
                    disconnect_on_block: true,
                    tracked_peer_sets: NZUsize!(2),
                },
            );
            network.start();

            // Derive three deterministic identities from fixed seeds.
            let schemes: Vec<PrivateKey> = [1u64, 2, 3]
                .into_iter()
                .map(PrivateKey::from_seed)
                .collect();
            let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
            // Rebind mutably: the schemes are consumed one-by-one via `remove(0)` below.
            let mut schemes = schemes;

            // Register each peer on channel 0 to obtain its sender/receiver pair.
            let mut connections = Vec::new();
            for peer in &peers {
                let (sender, receiver) = oracle
                    .control(peer.clone())
                    .register(0, Quota::per_second(RATE_LIMIT))
                    .await
                    .unwrap();
                connections.push((sender, receiver));
            }

            let mut oracle = oracle;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;

            // Keep the requester in the peer set across the cutover while peer 2 remains connected
            // only through the overlap window after the latest primary advances to peer 3.
            oracle
                .manager()
                .track(
                    0,
                    Set::try_from([peers[0].clone(), peers[1].clone()]).unwrap(),
                )
                .await;
            context.sleep(Duration::from_millis(100)).await;

            let key = Key(9);
            let invalid_history = Bytes::from("stale overlap history");
            let valid_history = Bytes::from("latest primary history");

            // The consumer only accepts `valid_history` for `key`; anything else would
            // be rejected (and the serving peer blocked).
            let (mut cons1, mut cons_out1) = Consumer::new();
            cons1.add_expected(key.clone(), valid_history.clone());

            // Peer 1: requester.
            let scheme = schemes.remove(0);
            let mut mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons1,
                Producer::default(),
            );

            // Peer 2: old primary retained only via overlap. If queried, it would be blocked for
            // serving invalid history.
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), invalid_history);
            let scheme = schemes.remove(0);
            let _mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod2,
            );

            // Peer 3: latest primary and the only peer that should satisfy the fetch.
            let mut prod3 = Producer::default();
            prod3.insert(key.clone(), valid_history.clone());
            let scheme = schemes.remove(0);
            let _mailbox3 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod3,
            );

            context.sleep(Duration::from_millis(100)).await;

            // Cut over: peer 3 becomes the latest primary alongside the requester.
            oracle
                .manager()
                .track(
                    1,
                    Set::try_from([peers[0].clone(), peers[2].clone()]).unwrap(),
                )
                .await;
            context.sleep(Duration::from_millis(100)).await;

            mailbox1.fetch(key.clone()).await;

            // The fetch must resolve with the latest primary's payload.
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, valid_history);
                }
                Event::Failed(_) => panic!("fetch failed unexpectedly"),
            }

            // If peer 2 had been queried, its invalid history would have gotten it
            // blocked — so an empty blocked list proves it was never contacted.
            assert!(
                oracle.blocked().await.unwrap().is_empty(),
                "overlap-only peers should not be queried for post-cutover history"
            );
        });
    }
2196
    /// Tests that a primary peer serves fetch requests originating from a secondary
    /// peer: secondary peers can still obtain data via targeted requests.
    #[test_traced]
    fn test_secondary_peer_requests_are_served() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, mut schemes, peers, mut connections) =
                setup_network_and_peers(&context, &[1, 2]).await;

            // Topology: peer 1 is primary (has data), peer 2 is secondary (requester).
            // Verifies that a primary peer serves requests from secondary peers
            // (i.e. secondary peers can't fetch via broadcast, but their direct
            // requests are still answered).
            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            // Track peer 1 as primary and peer 2 as secondary.
            oracle
                .manager()
                .track(
                    1,
                    TrackedPeers::new(
                        Set::try_from([peers[0].clone()]).unwrap(),
                        Set::try_from([peers[1].clone()]).unwrap(),
                    ),
                )
                .await;
            context.sleep(Duration::from_millis(100)).await;

            let key = Key(9);
            let data = Bytes::from("served to secondary");

            // Peer 1: primary, has the data.
            let mut prod1 = Producer::default();
            prod1.insert(key.clone(), data.clone());

            let scheme = schemes.remove(0);
            let _mailbox1 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                Consumer::dummy(),
                prod1,
            );

            // Peer 2: secondary, uses fetch_targeted to explicitly request from peer 1.
            let (mut cons2, mut cons_out2) = Consumer::new();
            cons2.add_expected(key.clone(), data.clone());
            let scheme = schemes.remove(0);
            let mut mailbox2 = setup_and_spawn_actor(
                &context,
                oracle.manager(),
                oracle.control(scheme.public_key()),
                scheme,
                connections.remove(0),
                cons2,
                Producer::default(),
            );

            // Direct the request explicitly at the primary peer.
            mailbox2
                .fetch_targeted(key.clone(), non_empty_vec![peers[0].clone()])
                .await;

            // The primary must answer, delivering the expected payload.
            let event = cons_out2.recv().await.unwrap();
            match event {
                Event::Success(key_actual, value) => {
                    assert_eq!(key_actual, key);
                    assert_eq!(value, data);
                }
                Event::Failed(_) => panic!("secondary peer request should have been served"),
            }
        });
    }
2268
2269    #[allow(clippy::type_complexity)]
2270    fn spawn_actors_with_handles(
2271        context: deterministic::Context,
2272        oracle: &Oracle<PublicKey, deterministic::Context>,
2273        schemes: Vec<PrivateKey>,
2274        connections: Vec<(
2275            Sender<PublicKey, deterministic::Context>,
2276            Receiver<PublicKey>,
2277        )>,
2278        consumers: Vec<Consumer<Key, Bytes>>,
2279        producers: Vec<Producer<Key, Bytes>>,
2280    ) -> (
2281        Vec<Mailbox<Key, PublicKey>>,
2282        Vec<commonware_runtime::Handle<()>>,
2283    ) {
2284        let actor_context = context.with_label("actor");
2285        let mut mailboxes = Vec::new();
2286        let mut handles = Vec::new();
2287
2288        for (idx, ((scheme, conn), (consumer, producer))) in schemes
2289            .into_iter()
2290            .zip(connections)
2291            .zip(consumers.into_iter().zip(producers))
2292            .enumerate()
2293        {
2294            let ctx = actor_context.with_label(&format!("peer_{idx}"));
2295            let public_key = scheme.public_key();
2296            let (engine, mailbox) = Engine::new(
2297                ctx,
2298                Config {
2299                    peer_provider: oracle.manager(),
2300                    blocker: oracle.control(public_key.clone()),
2301                    consumer,
2302                    producer,
2303                    mailbox_size: MAILBOX_SIZE,
2304                    me: Some(public_key),
2305                    initial: INITIAL_DURATION,
2306                    timeout: TIMEOUT,
2307                    fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
2308                    priority_requests: false,
2309                    priority_responses: false,
2310                },
2311            );
2312            handles.push(engine.start(conn));
2313            mailboxes.push(mailbox);
2314        }
2315
2316        (mailboxes, handles)
2317    }
2318
    /// Tests that every mailbox operation (fetch, cancel, clear, retain,
    /// fetch_targeted) is a harmless no-op — not a panic — after the actor backing
    /// the mailbox has been aborted.
    #[test_traced]
    fn test_operations_after_shutdown_do_not_panic() {
        let executor = deterministic::Runner::timed(Duration::from_secs(10));
        executor.start(|context| async move {
            let (mut oracle, schemes, peers, connections) =
                setup_network_and_peers(&context, &[1, 2]).await;

            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;

            let key = Key(1);
            let mut prod2 = Producer::default();
            prod2.insert(key.clone(), Bytes::from("data for key 1"));

            let (cons1, mut cons_out1) = Consumer::new();

            // Spawn via the handle-returning helper so the actors can be aborted below.
            let (mut mailboxes, handles) = spawn_actors_with_handles(
                context.clone(),
                &oracle,
                schemes,
                connections,
                vec![cons1, Consumer::dummy()],
                vec![Producer::default(), prod2],
            );

            // Fetch to verify network is functional
            mailboxes[0].fetch(key.clone()).await;
            let event = cons_out1.recv().await.unwrap();
            match event {
                Event::Success(_, value) => assert_eq!(value, Bytes::from("data for key 1")),
                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
            }

            // Abort all actors
            for handle in handles {
                handle.abort();
            }
            // Give the runtime a moment to tear the actors down before poking mailboxes.
            context.sleep(Duration::from_millis(100)).await;

            // All operations should not panic after shutdown

            // Fetch should not panic
            let key2 = Key(2);
            mailboxes[0].fetch(key2.clone()).await;

            // Cancel should not panic
            mailboxes[0].cancel(key2.clone()).await;

            // Clear should not panic
            mailboxes[0].clear().await;

            // Retain should not panic
            mailboxes[0].retain(|_| true).await;

            // Fetch targeted should not panic
            mailboxes[0]
                .fetch_targeted(Key(3), non_empty_vec![peers[1].clone()])
                .await;
        });
    }
2378
2379    fn clean_shutdown(seed: u64) {
2380        let cfg = deterministic::Config::default()
2381            .with_seed(seed)
2382            .with_timeout(Some(Duration::from_secs(30)));
2383        let executor = deterministic::Runner::new(cfg);
2384        executor.start(|context| async move {
2385            let (mut oracle, schemes, peers, connections) =
2386                setup_network_and_peers(&context, &[1, 2]).await;
2387
2388            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2389
2390            let key = Key(1);
2391            let mut prod2 = Producer::default();
2392            prod2.insert(key.clone(), Bytes::from("data for key 1"));
2393
2394            let (cons1, mut cons_out1) = Consumer::new();
2395
2396            let (mut mailboxes, handles) = spawn_actors_with_handles(
2397                context.clone(),
2398                &oracle,
2399                schemes,
2400                connections,
2401                vec![cons1, Consumer::dummy()],
2402                vec![Producer::default(), prod2],
2403            );
2404
2405            // Allow tasks to start
2406            context.sleep(Duration::from_millis(100)).await;
2407
2408            // Count running tasks under the actor prefix
2409            let running_before = count_running_tasks(&context, "actor");
2410            assert!(
2411                running_before > 0,
2412                "at least one actor task should be running"
2413            );
2414
2415            // Verify network is functional
2416            mailboxes[0].fetch(key.clone()).await;
2417            let event = cons_out1.recv().await.unwrap();
2418            match event {
2419                Event::Success(_, value) => assert_eq!(value, Bytes::from("data for key 1")),
2420                Event::Failed(_) => panic!("Fetch failed unexpectedly"),
2421            }
2422
2423            // Abort all actors
2424            for handle in handles {
2425                handle.abort();
2426            }
2427            context.sleep(Duration::from_millis(100)).await;
2428
2429            // Verify all actor tasks are stopped
2430            let running_after = count_running_tasks(&context, "actor");
2431            assert_eq!(
2432                running_after, 0,
2433                "all actor tasks should be stopped, but {running_after} still running"
2434            );
2435        });
2436    }
2437
2438    #[test]
2439    fn test_clean_shutdown() {
2440        for seed in 0..25 {
2441            clean_shutdown(seed);
2442        }
2443    }
2444}