Skip to main content

commonware_resolver/p2p/
mod.rs

1//! Resolve data identified by a fixed-length key by using the P2P network.
2//!
3//! # Overview
4//!
5//! The `p2p` module enables resolving data by fixed-length keys in a P2P network. Central to the
6//! module is the `peer` actor which manages the fetch lifecycle. Its mailbox allows
7//! initiation and pruning of fetches via the `Resolver` interface.
8//!
9//! The peer handles an arbitrarily large number of concurrent fetches by sending requests
10//! to other peers and processing their responses. It selects peers based on performance, retrying
11//! with another peer if one fails or provides invalid data. Fetches persist until pruned or
12//! fulfilled, delivering data to the `Consumer` for verification.
13//!
14//! The `Consumer` checks data integrity and authenticity (critical in an adversarial environment)
15//! and returns `true` if valid, completing the fetch, or `false` to retry. Pruning a fetch with
16//! in-progress response validation aborts that validation. If the aborted validation would have
17//! returned `false`, the peer is not blocked for that response.
18//!
19//! The peer also serves data to other peers, forwarding network requests to the `Producer`. The
20//! `Producer` provides data asynchronously (e.g., from storage). If it fails, the peer sends an
21//! empty response, prompting the requester to retry elsewhere. Each message between peers contains
22//! an ID. Each request is sent with a unique ID, and each response includes the ID of the request
23//! it responds to.
24//!
25//! # Targeting
26//!
27//! Callers can restrict fetches to specific target peers using
28//! [`TargetedResolver::fetch_targeted`](crate::TargetedResolver::fetch_targeted).
29//! Only target peers are tried, there is no automatic fallback to other peers. Targets persist through
30//! transient failures (timeout, "no data" response, send failure) since the peer might be slow or
31//! receive the data later.
32//!
33//! While a fetch is in progress, callers can modify targeting:
34//! - [`TargetedResolver::fetch_targeted`](crate::TargetedResolver::fetch_targeted) adds peers to the existing target set
35//!   (only if the fetch already has targets, an "all" fetch remains unrestricted)
36//! - [`Resolver::fetch`](crate::Resolver::fetch) clears all targets, allowing fallback to any peer
37//!
38//! These modifications only apply to in-progress fetches. Once a fetch completes (success, pruning,
39//! or blocked peer), the targets for that key are cleared automatically.
40//!
41//! # Subscribers
42//!
43//! [`Resolver::fetch`](crate::Resolver::fetch) accepts a peer-visible key and a
44//! subscriber. This is useful when several subscribers can share the same peer-visible
45//! fetch. A fetch remains active while at least one attached subscriber satisfies the latest
46//! [`Resolver::retain`](crate::Resolver::retain) predicate. When the fetch resolves, the
47//! key and currently retained subscribers are supplied to
48//! [`Consumer::deliver`](crate::Consumer::deliver). Subscribers added while response validation
49//! is in progress are delivered the same accepted response locally.
50//!
51//! # Peer Selection
52//!
53//! Outbound fetches are only sent to peers in `latest.primary` (see [commonware_p2p::Provider]) but inbound
54//! requests are handled for all connected peers. Thus, callers that still expect a key to be fetchable after
55//! a peer set update must ensure the latest primary set can serve it.
56//!
57//! [`TargetedResolver::fetch_targeted`](crate::TargetedResolver::fetch_targeted) can narrow the current primary set
58//! further, but it does not bypass that latest-primary filter. Explicit targets that are no longer
59//! in the latest primary set are ignored until they become primary again.
60//!
61//! # Performance Considerations
62//!
63//! The peer supports arbitrarily many concurrent fetches, but resource usage generally
64//! depends on the rate-limiting configuration of the underlying P2P network.
65
66use bytes::Bytes;
67use commonware_utils::{channel::oneshot, Span};
68
69mod config;
70pub use config::Config;
71mod engine;
72pub use engine::Engine;
73mod fetcher;
74mod inflight;
75mod ingress;
76pub use ingress::Mailbox;
77mod metrics;
78mod wire;
79
80#[cfg(feature = "mocks")]
81pub mod mocks;
82
83/// Serves data requested by the network.
84pub trait Producer: Clone + Send + 'static {
85    /// Type used to key data requested from peers.
86    type Key: Span;
87
88    /// Serve a request received from the network.
89    fn produce(&mut self, key: Self::Key) -> oneshot::Receiver<Bytes>;
90}
91
92#[cfg(test)]
93mod tests {
94    use super::{
95        mocks::{Consumer, Key, Producer},
96        Config, Engine, Mailbox,
97    };
98    use crate::{Delivery, Fetch, Resolver, TargetedResolver};
99    use bytes::Bytes;
100    use commonware_cryptography::{
101        ed25519::{PrivateKey, PublicKey},
102        Signer,
103    };
104    use commonware_macros::{select, test_traced};
105    use commonware_p2p::{
106        simulated::{Link, Network, Oracle, Receiver, Sender},
107        Blocker, Manager as _, Provider, TrackedPeers,
108    };
109    use commonware_runtime::{
110        deterministic, telemetry::metrics::count_running_tasks, Clock, Metrics as _, Quota, Runner,
111        Spawner as _, Supervisor as _,
112    };
113    use commonware_utils::{
114        channel::{fallible::FallibleExt, mpsc, oneshot},
115        non_empty_vec,
116        ordered::Set,
117        sync::Mutex,
118        NZUsize, NZU32,
119    };
120    use std::{
121        collections::{HashMap, VecDeque},
122        num::{NonZeroU32, NonZeroUsize},
123        sync::Arc,
124        time::Duration,
125    };
126
127    const MAILBOX_SIZE: NonZeroUsize = NZUsize!(1024);
128    const RATE_LIMIT: NonZeroU32 = NZU32!(10);
129    const INITIAL_DURATION: Duration = Duration::from_millis(100);
130    const TIMEOUT: Duration = Duration::from_millis(400);
131    const FETCH_RETRY_TIMEOUT: Duration = Duration::from_millis(100);
132    const LINK: Link = Link {
133        latency: Duration::from_millis(10),
134        jitter: Duration::from_millis(1),
135        success_rate: 1.0,
136    };
137    const LINK_UNRELIABLE: Link = Link {
138        latency: Duration::from_millis(10),
139        jitter: Duration::from_millis(1),
140        success_rate: 0.5,
141    };
142
143    fn status_metric_total(metrics: &str, name: &str, status: &str) -> u64 {
144        let prefix = format!("{name}{{");
145        let status_label = format!("status=\"{status}\"");
146        metrics
147            .lines()
148            .filter(|line| line.starts_with(&prefix) && line.contains(&status_label))
149            .map(|line| {
150                line.split_whitespace()
151                    .next_back()
152                    .expect("metric line must have a value")
153                    .parse::<u64>()
154                    .expect("status metric value must be an integer")
155            })
156            .sum()
157    }
158
159    async fn setup_network_and_peers(
160        context: &deterministic::Context,
161        peer_seeds: &[u64],
162    ) -> (
163        Oracle<PublicKey, deterministic::Context>,
164        Vec<PrivateKey>,
165        Vec<PublicKey>,
166        Vec<(
167            Sender<PublicKey, deterministic::Context>,
168            Receiver<PublicKey>,
169        )>,
170    ) {
171        setup_network_and_peers_with_rate_limit(context, peer_seeds, Quota::per_second(RATE_LIMIT))
172            .await
173    }
174
175    async fn setup_network_and_peers_with_rate_limit(
176        context: &deterministic::Context,
177        peer_seeds: &[u64],
178        rate_limit: Quota,
179    ) -> (
180        Oracle<PublicKey, deterministic::Context>,
181        Vec<PrivateKey>,
182        Vec<PublicKey>,
183        Vec<(
184            Sender<PublicKey, deterministic::Context>,
185            Receiver<PublicKey>,
186        )>,
187    ) {
188        let (network, oracle) = Network::new(
189            context.child("network"),
190            commonware_p2p::simulated::Config {
191                max_size: 1024 * 1024,
192                disconnect_on_block: true,
193                tracked_peer_sets: NZUsize!(3),
194            },
195        );
196        network.start();
197
198        let schemes: Vec<PrivateKey> = peer_seeds
199            .iter()
200            .map(|seed| PrivateKey::from_seed(*seed))
201            .collect();
202        let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
203        let mut manager = oracle.manager();
204        manager.track(0, Set::try_from(peers.clone()).unwrap());
205
206        let mut connections = Vec::new();
207        for peer in &peers {
208            let (sender, receiver) = oracle
209                .control(peer.clone())
210                .register(0, rate_limit)
211                .await
212                .unwrap();
213            connections.push((sender, receiver));
214        }
215
216        (oracle, schemes, peers, connections)
217    }
218
219    async fn add_link(
220        oracle: &mut Oracle<PublicKey, deterministic::Context>,
221        link: Link,
222        peers: &[PublicKey],
223        from: usize,
224        to: usize,
225    ) {
226        oracle
227            .add_link(peers[from].clone(), peers[to].clone(), link.clone())
228            .await
229            .unwrap();
230        oracle
231            .add_link(peers[to].clone(), peers[from].clone(), link)
232            .await
233            .unwrap();
234    }
235
236    #[derive(Clone, Default)]
237    struct SequencedProducer {
238        data: Arc<Mutex<HashMap<Key, VecDeque<Bytes>>>>,
239    }
240
241    impl SequencedProducer {
242        fn insert(&mut self, key: Key, values: impl IntoIterator<Item = Bytes>) {
243            self.data.lock().insert(key, values.into_iter().collect());
244        }
245
246        fn remaining(&self, key: &Key) -> Vec<Bytes> {
247            self.data
248                .lock()
249                .get(key)
250                .map(|values| values.iter().cloned().collect())
251                .unwrap_or_default()
252        }
253    }
254
255    impl crate::p2p::Producer for SequencedProducer {
256        type Key = Key;
257
258        fn produce(&mut self, key: Self::Key) -> oneshot::Receiver<Bytes> {
259            let (sender, receiver) = oneshot::channel();
260            if let Some(value) = self.data.lock().get_mut(&key).and_then(VecDeque::pop_front) {
261                let _ = sender.send(value);
262            }
263            receiver
264        }
265    }
266
267    fn setup_and_spawn_actor<C, R>(
268        context: &deterministic::Context,
269        provider: impl Provider<PublicKey = PublicKey>,
270        blocker: impl Blocker<PublicKey = PublicKey>,
271        signer: impl Signer<PublicKey = PublicKey>,
272        connection: (
273            Sender<PublicKey, deterministic::Context>,
274            Receiver<PublicKey>,
275        ),
276        consumer: C,
277        producer: Producer<Key, Bytes>,
278    ) -> Mailbox<Key, PublicKey, R>
279    where
280        C: crate::Consumer<Key = Key, Subscriber = R, Value = Bytes>,
281        R: Clone + Ord + Send + 'static,
282    {
283        setup_and_spawn_actor_with_producer(
284            context, provider, blocker, signer, connection, consumer, producer,
285        )
286    }
287
288    fn setup_and_spawn_actor_with_producer<C, R, Pro>(
289        context: &deterministic::Context,
290        provider: impl Provider<PublicKey = PublicKey>,
291        blocker: impl Blocker<PublicKey = PublicKey>,
292        signer: impl Signer<PublicKey = PublicKey>,
293        connection: (
294            Sender<PublicKey, deterministic::Context>,
295            Receiver<PublicKey>,
296        ),
297        consumer: C,
298        producer: Pro,
299    ) -> Mailbox<Key, PublicKey, R>
300    where
301        C: crate::Consumer<Key = Key, Subscriber = R, Value = Bytes>,
302        Pro: crate::p2p::Producer<Key = Key>,
303        R: Clone + Ord + Send + 'static,
304    {
305        let public_key = signer.public_key();
306        let (engine, mailbox) = Engine::new(
307            context.child("actor").with_attribute("peer", &public_key),
308            Config {
309                peer_provider: provider,
310                blocker,
311                consumer,
312                producer,
313                mailbox_size: MAILBOX_SIZE,
314                me: Some(public_key),
315                initial: INITIAL_DURATION,
316                timeout: TIMEOUT,
317                fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
318                priority_requests: false,
319                priority_responses: false,
320            },
321        );
322        engine.start(connection);
323
324        mailbox
325    }
326
327    type DeliveryGate = (oneshot::Receiver<()>, bool);
328    type DeliveryGates = Arc<Mutex<VecDeque<DeliveryGate>>>;
329
330    #[derive(Clone)]
331    struct BlockingConsumer {
332        context: Arc<deterministic::Context>,
333        sender: mpsc::UnboundedSender<(Key, Bytes)>,
334        started: mpsc::UnboundedSender<Key>,
335        gates: DeliveryGates,
336    }
337
338    impl BlockingConsumer {
339        fn new(
340            context: deterministic::Context,
341            gates: Vec<DeliveryGate>,
342        ) -> (
343            Self,
344            mpsc::UnboundedReceiver<(Key, Bytes)>,
345            mpsc::UnboundedReceiver<Key>,
346        ) {
347            let (sender, receiver) = mpsc::unbounded_channel();
348            let (started, started_receiver) = mpsc::unbounded_channel();
349            (
350                Self {
351                    context: Arc::new(context),
352                    sender,
353                    started,
354                    gates: Arc::new(Mutex::new(gates.into())),
355                },
356                receiver,
357                started_receiver,
358            )
359        }
360    }
361
362    impl crate::Consumer for BlockingConsumer {
363        type Key = Key;
364        type Value = Bytes;
365        type Subscriber = ();
366
367        fn deliver(
368            &mut self,
369            delivery: Delivery<Self::Key, Self::Subscriber>,
370            value: Self::Value,
371        ) -> oneshot::Receiver<bool> {
372            let key = delivery.key;
373            self.started.send_lossy(key.clone());
374            let (gate, valid) = self
375                .gates
376                .lock()
377                .pop_front()
378                .map_or((None, true), |(gate, valid)| (Some(gate), valid));
379            let (mut response, receiver) = oneshot::channel();
380            let sender = self.sender.clone();
381            self.context.child("delivery").spawn(move |_| async move {
382                if let Some(gate) = gate {
383                    select! {
384                        _ = response.closed() => return,
385                        result = gate => {
386                            if result.is_err() {
387                                let _ = response.send(false);
388                                return;
389                            }
390                        },
391                    }
392                }
393                if valid {
394                    sender.send_lossy((key, value));
395                }
396                let _ = response.send(valid);
397            });
398            receiver
399        }
400    }
401
402    #[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
403    struct SubscriberTag(u16);
404
405    type RecordedDelivery = (Delivery<Key, SubscriberTag>, Bytes);
406
407    #[derive(Clone)]
408    struct BlockingSubscriberRecordingConsumer {
409        context: Arc<deterministic::Context>,
410        sender: mpsc::UnboundedSender<RecordedDelivery>,
411        started: mpsc::UnboundedSender<Delivery<Key, SubscriberTag>>,
412        gates: DeliveryGates,
413    }
414
415    impl BlockingSubscriberRecordingConsumer {
416        fn new(
417            context: deterministic::Context,
418            gates: Vec<DeliveryGate>,
419        ) -> (
420            Self,
421            mpsc::UnboundedReceiver<RecordedDelivery>,
422            mpsc::UnboundedReceiver<Delivery<Key, SubscriberTag>>,
423        ) {
424            let (sender, receiver) = mpsc::unbounded_channel();
425            let (started, started_receiver) = mpsc::unbounded_channel();
426            (
427                Self {
428                    context: Arc::new(context),
429                    sender,
430                    started,
431                    gates: Arc::new(Mutex::new(gates.into())),
432                },
433                receiver,
434                started_receiver,
435            )
436        }
437    }
438
439    impl crate::Consumer for BlockingSubscriberRecordingConsumer {
440        type Key = Key;
441        type Value = Bytes;
442        type Subscriber = SubscriberTag;
443
444        fn deliver(
445            &mut self,
446            delivery: Delivery<Self::Key, Self::Subscriber>,
447            value: Self::Value,
448        ) -> oneshot::Receiver<bool> {
449            self.started.send_lossy(delivery.clone());
450            let (gate, valid) = self
451                .gates
452                .lock()
453                .pop_front()
454                .map_or((None, true), |(gate, valid)| (Some(gate), valid));
455            let (mut response, receiver) = oneshot::channel();
456            let sender = self.sender.clone();
457            self.context.child("delivery").spawn(move |_| async move {
458                if let Some(gate) = gate {
459                    select! {
460                        _ = response.closed() => return,
461                        result = gate => {
462                            if result.is_err() {
463                                let _ = response.send(false);
464                                return;
465                            }
466                        },
467                    }
468                }
469                if valid {
470                    sender.send_lossy((delivery, value));
471                }
472                let _ = response.send(valid);
473            });
474            receiver
475        }
476    }
477
478    #[derive(Clone)]
479    struct SubscriberRecordingConsumer {
480        sender: mpsc::UnboundedSender<RecordedDelivery>,
481    }
482
483    impl SubscriberRecordingConsumer {
484        fn new() -> (Self, mpsc::UnboundedReceiver<RecordedDelivery>) {
485            let (sender, receiver) = mpsc::unbounded_channel();
486            (Self { sender }, receiver)
487        }
488    }
489
490    impl crate::Consumer for SubscriberRecordingConsumer {
491        type Key = Key;
492        type Value = Bytes;
493        type Subscriber = SubscriberTag;
494
495        fn deliver(
496            &mut self,
497            delivery: Delivery<Self::Key, Self::Subscriber>,
498            value: Self::Value,
499        ) -> oneshot::Receiver<bool> {
500            let (sender, receiver) = oneshot::channel();
501            self.sender.send_lossy((delivery, value));
502            let _ = sender.send(true);
503            receiver
504        }
505    }
506
507    fn dummy_consumer() -> Consumer<Key, Bytes> {
508        Consumer::dummy()
509    }
510
511    fn consumer() -> (Consumer<Key, Bytes>, mpsc::UnboundedReceiver<(Key, Bytes)>) {
512        Consumer::new()
513    }
514
515    async fn wait_for_blocked(
516        context: &deterministic::Context,
517        oracle: &Oracle<PublicKey, deterministic::Context>,
518        blocker: &PublicKey,
519        blocked: &PublicKey,
520    ) {
521        loop {
522            let blocked_peers = oracle.blocked().await.unwrap();
523            if blocked_peers
524                .iter()
525                .any(|(a, b)| a == blocker && b == blocked)
526            {
527                return;
528            }
529            context.sleep(Duration::from_millis(10)).await;
530        }
531    }
532
533    /// Tests that fetching a key from another peer succeeds when data is available.
534    /// This test sets up two peers, where Peer 1 requests data that Peer 2 has,
535    /// and verifies that the data is correctly delivered to Peer 1's consumer.
536    #[test_traced]
537    fn test_fetch_success() {
538        let executor = deterministic::Runner::timed(Duration::from_secs(10));
539        executor.start(|context| async move {
540            let (mut oracle, mut schemes, peers, mut connections) =
541                setup_network_and_peers(&context, &[1, 2]).await;
542
543            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
544
545            let key = Key(2);
546            let mut prod2 = Producer::default();
547            prod2.insert(key.clone(), Bytes::from("data for key 2"));
548
549            let (cons1, mut cons_out1) = consumer();
550
551            let scheme = schemes.remove(0);
552            let mut mailbox1 = setup_and_spawn_actor(
553                &context,
554                oracle.manager(),
555                oracle.control(scheme.public_key()),
556                scheme,
557                connections.remove(0),
558                cons1,
559                Producer::default(),
560            );
561
562            let scheme = schemes.remove(0);
563            let _mailbox2 = setup_and_spawn_actor(
564                &context,
565                oracle.manager(),
566                oracle.control(scheme.public_key()),
567                scheme,
568                connections.remove(0),
569                dummy_consumer(),
570                prod2,
571            );
572
573            mailbox1.fetch(key.clone());
574
575            let (key_actual, value) = cons_out1.recv().await.unwrap();
576            assert_eq!(key_actual, key);
577            assert_eq!(value, Bytes::from("data for key 2"));
578        });
579    }
580
581    #[test_traced]
582    fn test_pending_delivery_does_not_block_engine() {
583        let executor = deterministic::Runner::timed(Duration::from_secs(10));
584        executor.start(|context| async move {
585            let (mut oracle, mut schemes, peers, mut connections) =
586                setup_network_and_peers(&context, &[1, 2, 3]).await;
587
588            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
589            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
590
591            let key1 = Key(1);
592            let key2 = Key(2);
593            let data1 = Bytes::from("data for key 1");
594            let data2 = Bytes::from("data for key 2");
595
596            let mut prod2 = Producer::default();
597            prod2.insert(key1.clone(), data1.clone());
598
599            let mut prod3 = Producer::default();
600            prod3.insert(key2.clone(), data2.clone());
601
602            let (gate_sender1, gate_receiver1) = oneshot::channel();
603            let (gate_sender2, gate_receiver2) = oneshot::channel();
604            let (cons1, mut cons_out1, mut started) = BlockingConsumer::new(
605                context.child("consumer"),
606                vec![(gate_receiver1, true), (gate_receiver2, true)],
607            );
608
609            let scheme = schemes.remove(0);
610            let mut mailbox1 = setup_and_spawn_actor(
611                &context,
612                oracle.manager(),
613                oracle.control(scheme.public_key()),
614                scheme,
615                connections.remove(0),
616                cons1,
617                Producer::default(),
618            );
619
620            let scheme = schemes.remove(0);
621            let _mailbox2 = setup_and_spawn_actor(
622                &context,
623                oracle.manager(),
624                oracle.control(scheme.public_key()),
625                scheme,
626                connections.remove(0),
627                dummy_consumer(),
628                prod2,
629            );
630
631            let scheme = schemes.remove(0);
632            let _mailbox3 = setup_and_spawn_actor(
633                &context,
634                oracle.manager(),
635                oracle.control(scheme.public_key()),
636                scheme,
637                connections.remove(0),
638                dummy_consumer(),
639                prod3,
640            );
641
642            mailbox1.fetch(key1.clone());
643            let started_key = started.recv().await.expect("delivery did not start");
644            assert_eq!(started_key, key1);
645
646            mailbox1.fetch(key2.clone());
647            select! {
648                started_key = started.recv() => {
649                    assert_eq!(started_key.expect("delivery did not start"), key2);
650                },
651                _ = context.sleep(Duration::from_secs(2)) => {
652                    panic!("resolver engine blocked on pending delivery");
653                },
654            };
655
656            gate_sender2.send(()).unwrap();
657            let (key_actual, value) = cons_out1.recv().await.expect("consumer channel closed");
658            assert_eq!(key_actual, key2);
659            assert_eq!(value, data2);
660
661            gate_sender1.send(()).unwrap();
662            let (key_actual, value) = cons_out1.recv().await.expect("consumer channel closed");
663            assert_eq!(key_actual, key1);
664            assert_eq!(value, data1);
665        });
666    }
667
668    #[test_traced]
669    fn test_retain_drops_pending_delivery() {
670        let executor = deterministic::Runner::timed(Duration::from_secs(10));
671        executor.start(|context| async move {
672            let (mut oracle, mut schemes, peers, mut connections) =
673                setup_network_and_peers(&context, &[1, 2]).await;
674
675            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
676
677            let key = Key(1);
678            let data = Bytes::from("data for key 1");
679            let mut prod2 = Producer::default();
680            prod2.insert(key.clone(), data.clone());
681
682            let (mut first_gate_sender, first_gate_receiver) = oneshot::channel();
683            let (second_gate_sender, second_gate_receiver) = oneshot::channel();
684            let (cons1, mut cons_out1, mut started) = BlockingConsumer::new(
685                context.child("consumer"),
686                vec![(first_gate_receiver, true), (second_gate_receiver, true)],
687            );
688
689            let scheme = schemes.remove(0);
690            let mut mailbox1 = setup_and_spawn_actor(
691                &context,
692                oracle.manager(),
693                oracle.control(scheme.public_key()),
694                scheme,
695                connections.remove(0),
696                cons1,
697                Producer::default(),
698            );
699
700            let scheme = schemes.remove(0);
701            let _mailbox2 = setup_and_spawn_actor(
702                &context,
703                oracle.manager(),
704                oracle.control(scheme.public_key()),
705                scheme,
706                connections.remove(0),
707                dummy_consumer(),
708                prod2,
709            );
710
711            mailbox1.fetch(key.clone());
712            let started_key = started.recv().await.expect("delivery did not start");
713            assert_eq!(started_key, key);
714
715            let canceled = key.clone();
716            mailbox1.retain(move |key, _| key != &canceled);
717            mailbox1.fetch(key.clone());
718
719            first_gate_sender.closed().await;
720            let started_key = started.recv().await.expect("second delivery did not start");
721            assert_eq!(started_key, key);
722
723            second_gate_sender.send(()).unwrap();
724            let (key_actual, value) = cons_out1.recv().await.expect("consumer channel closed");
725            assert_eq!(key_actual, key);
726            assert_eq!(value, data);
727
728            select! {
729                _ = cons_out1.recv() => panic!("unexpected extra event"),
730                _ = context.sleep(Duration::from_millis(100)) => {},
731            };
732        });
733    }
734
735    #[test_traced]
736    fn test_invalid_delivery_retries_and_rearms_slot() {
737        let executor = deterministic::Runner::timed(Duration::from_secs(10));
738        executor.start(|context| async move {
739            let (mut oracle, mut schemes, peers, mut connections) =
740                setup_network_and_peers(&context, &[1, 2, 3]).await;
741
742            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
743
744            let key = Key(1);
745            let data = Bytes::from("data for key 1");
746
747            let mut prod2 = Producer::default();
748            prod2.insert(key.clone(), data.clone());
749
750            let mut prod3 = Producer::default();
751            prod3.insert(key.clone(), data.clone());
752
753            let (first_gate_sender, first_gate_receiver) = oneshot::channel();
754            let (second_gate_sender, second_gate_receiver) = oneshot::channel();
755            let (cons1, mut cons_out1, mut started) = BlockingConsumer::new(
756                context.child("consumer"),
757                vec![(first_gate_receiver, false), (second_gate_receiver, true)],
758            );
759
760            let scheme = schemes.remove(0);
761            let mut mailbox1 = setup_and_spawn_actor(
762                &context,
763                oracle.manager(),
764                oracle.control(scheme.public_key()),
765                scheme,
766                connections.remove(0),
767                cons1,
768                Producer::default(),
769            );
770
771            let scheme = schemes.remove(0);
772            let _mailbox2 = setup_and_spawn_actor(
773                &context,
774                oracle.manager(),
775                oracle.control(scheme.public_key()),
776                scheme,
777                connections.remove(0),
778                dummy_consumer(),
779                prod2,
780            );
781
782            let scheme = schemes.remove(0);
783            let _mailbox3 = setup_and_spawn_actor(
784                &context,
785                oracle.manager(),
786                oracle.control(scheme.public_key()),
787                scheme,
788                connections.remove(0),
789                dummy_consumer(),
790                prod3,
791            );
792
793            mailbox1.fetch_targeted(
794                key.clone(),
795                non_empty_vec![peers[1].clone(), peers[2].clone()],
796            );
797            let started_key = started.recv().await.expect("delivery did not start");
798            assert_eq!(started_key, key);
799
800            first_gate_sender.send(()).unwrap();
801            wait_for_blocked(&context, &oracle, &peers[0], &peers[1]).await;
802
803            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
804            oracle.manager().track(
805                1,
806                Set::try_from([peers[0].clone(), peers[2].clone()]).unwrap(),
807            );
808
809            let started_key = started.recv().await.expect("retry delivery did not start");
810            assert_eq!(started_key, key);
811
812            second_gate_sender.send(()).unwrap();
813            let (key_actual, value) = cons_out1.recv().await.expect("consumer channel closed");
814            assert_eq!(key_actual, key);
815            assert_eq!(value, data);
816        });
817    }
818
819    async fn run_pending_invalid_delivery_race(
820        context: &deterministic::Context,
821        validation_first: bool,
822    ) {
823        let (mut oracle, mut schemes, peers, mut connections) =
824            setup_network_and_peers(context, &[1, 2]).await;
825
826        add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
827
828        let key = Key(1);
829        let mut prod2 = Producer::default();
830        prod2.insert(key.clone(), Bytes::from("data for key 1"));
831
832        let (mut gate_sender, gate_receiver) = oneshot::channel();
833        let (cons1, mut cons_out1, mut started) =
834            BlockingConsumer::new(context.child("consumer"), vec![(gate_receiver, false)]);
835
836        let scheme = schemes.remove(0);
837        let mut mailbox1 = setup_and_spawn_actor(
838            context,
839            oracle.manager(),
840            oracle.control(scheme.public_key()),
841            scheme,
842            connections.remove(0),
843            cons1,
844            Producer::default(),
845        );
846
847        let scheme = schemes.remove(0);
848        let _mailbox2 = setup_and_spawn_actor(
849            context,
850            oracle.manager(),
851            oracle.control(scheme.public_key()),
852            scheme,
853            connections.remove(0),
854            dummy_consumer(),
855            prod2,
856        );
857
858        mailbox1.fetch(key.clone());
859        let started_key = started.recv().await.expect("delivery did not start");
860        assert_eq!(started_key, key);
861
862        if validation_first {
863            gate_sender.send(()).unwrap();
864            wait_for_blocked(context, &oracle, &peers[0], &peers[1]).await;
865            mailbox1.retain(|_, _| false);
866            let blocked = oracle.blocked().await.unwrap();
867            assert_eq!(blocked.len(), 1);
868            assert_eq!(blocked[0].0, peers[0]);
869            assert_eq!(blocked[0].1, peers[1]);
870        } else {
871            mailbox1.retain(|_, _| false);
872            gate_sender.closed().await;
873            assert!(oracle.blocked().await.unwrap().is_empty());
874        }
875
876        select! {
877            _ = cons_out1.recv() => panic!("unexpected event"),
878            _ = context.sleep(Duration::from_millis(100)) => {},
879        };
880    }
881
882    #[test_traced]
883    fn test_retain_pending_invalid_delivery_race() {
884        let executor = deterministic::Runner::timed(Duration::from_secs(10));
885        executor.start(|context| async move {
886            run_pending_invalid_delivery_race(&context, false).await;
887            run_pending_invalid_delivery_race(&context, true).await;
888        });
889    }
890
891    /// Tests that pruning a fetch leaves the consumer untouched.
892    /// This test initiates a fetch and immediately prunes it, verifying
893    /// that the consumer does not receive any event.
894    #[test_traced]
895    fn test_retain_drops_fetch() {
896        let executor = deterministic::Runner::timed(Duration::from_secs(10));
897        executor.start(|context| async move {
898            let (oracle, mut schemes, _peers, mut connections) =
899                setup_network_and_peers(&context, &[1]).await;
900
901            let (cons1, mut cons_out1) = consumer();
902            let prod1 = Producer::default();
903
904            let scheme = schemes.remove(0);
905            let mut mailbox1 = setup_and_spawn_actor(
906                &context,
907                oracle.manager(),
908                oracle.control(scheme.public_key()),
909                scheme,
910                connections.remove(0),
911                cons1,
912                prod1,
913            );
914
915            let key = Key(3);
916            mailbox1.fetch(key.clone());
917            let canceled = key.clone();
918            mailbox1.retain(move |key, _| key != &canceled);
919
920            select! {
921                _ = cons_out1.recv() => panic!("unexpected event"),
922                _ = context.sleep(Duration::from_millis(100)) => {},
923            };
924        });
925    }
926
927    /// Tests fetching data from a peer when some peers lack the data.
928    /// This test sets up three peers, where Peer 1 requests data that only Peer 3 has.
929    /// It verifies that the resolver retries with another peer and successfully
930    /// delivers the data to Peer 1's consumer.
931    #[test_traced]
932    fn test_peer_no_data() {
933        let executor = deterministic::Runner::timed(Duration::from_secs(10));
934        executor.start(|context| async move {
935            let (mut oracle, mut schemes, peers, mut connections) =
936                setup_network_and_peers(&context, &[1, 2, 3]).await;
937
938            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
939            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
940
941            let prod1 = Producer::default();
942            let prod2 = Producer::default();
943            let mut prod3 = Producer::default();
944            let key = Key(3);
945            prod3.insert(key.clone(), Bytes::from("data for key 3"));
946
947            let (cons1, mut cons_out1) = consumer();
948
949            let scheme = schemes.remove(0);
950            let mut mailbox1 = setup_and_spawn_actor(
951                &context,
952                oracle.manager(),
953                oracle.control(scheme.public_key()),
954                scheme,
955                connections.remove(0),
956                cons1,
957                prod1,
958            );
959
960            let scheme = schemes.remove(0);
961            let _mailbox2 = setup_and_spawn_actor(
962                &context,
963                oracle.manager(),
964                oracle.control(scheme.public_key()),
965                scheme,
966                connections.remove(0),
967                dummy_consumer(),
968                prod2,
969            );
970
971            let scheme = schemes.remove(0);
972            let _mailbox3 = setup_and_spawn_actor(
973                &context,
974                oracle.manager(),
975                oracle.control(scheme.public_key()),
976                scheme,
977                connections.remove(0),
978                dummy_consumer(),
979                prod3,
980            );
981
982            mailbox1.fetch(key.clone());
983
984            let (key_actual, value) = cons_out1.recv().await.unwrap();
985            assert_eq!(key_actual, key);
986            assert_eq!(value, Bytes::from("data for key 3"));
987        });
988    }
989
990    /// Tests fetching when no peers are available.
991    /// This test sets up a single peer with an empty peer provider (no peers).
992    /// It initiates a fetch, waits beyond the retry timeout, prunes the fetch,
993    /// and verifies that the consumer receives a failure notification.
994    #[test_traced]
995    fn test_no_peers_available() {
996        let executor = deterministic::Runner::timed(Duration::from_secs(10));
997        executor.start(|context| async move {
998            let (oracle, mut schemes, _peers, mut connections) =
999                setup_network_and_peers(&context, &[1]).await;
1000
1001            let (cons1, mut cons_out1) = consumer();
1002            let prod1 = Producer::default();
1003
1004            let scheme = schemes.remove(0);
1005            let mut mailbox1 = setup_and_spawn_actor(
1006                &context,
1007                oracle.manager(),
1008                oracle.control(scheme.public_key()),
1009                scheme,
1010                connections.remove(0),
1011                cons1,
1012                prod1,
1013            );
1014
1015            mailbox1.fetch(Key(4));
1016            context.sleep(Duration::from_secs(5)).await;
1017
1018            // With no peers, no event should arrive
1019            select! {
1020                _ = cons_out1.recv() => panic!("Fetch should have failed due to no peers"),
1021                _ = context.sleep(Duration::from_millis(100)) => {},
1022            };
1023        });
1024    }
1025
1026    /// Tests that fetches issued before the first peer set arrives stay pending and complete once
1027    /// the initial update is tracked.
1028    #[test_traced]
1029    fn test_fetch_before_initial_peer_set_waits_for_update() {
1030        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1031        executor.start(|context| async move {
1032            let (network, mut oracle) = Network::new(
1033                context.child("network"),
1034                commonware_p2p::simulated::Config {
1035                    max_size: 1024 * 1024,
1036                    disconnect_on_block: true,
1037                    tracked_peer_sets: NZUsize!(1),
1038                },
1039            );
1040            network.start();
1041
1042            let mut schemes = [1_u64, 2]
1043                .into_iter()
1044                .map(PrivateKey::from_seed)
1045                .collect::<Vec<_>>();
1046            schemes.sort_by_key(|s| s.public_key());
1047            let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
1048
1049            let mut connections = Vec::new();
1050            for peer in &peers {
1051                let (sender, receiver) = oracle
1052                    .control(peer.clone())
1053                    .register(0, Quota::per_second(RATE_LIMIT))
1054                    .await
1055                    .unwrap();
1056                connections.push((sender, receiver));
1057            }
1058
1059            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1060
1061            let key = Key(2);
1062            let mut prod2 = Producer::default();
1063            prod2.insert(key.clone(), Bytes::from("data for key 2"));
1064
1065            let (cons1, mut cons_out1) = consumer();
1066
1067            let scheme = schemes.remove(0);
1068            let mut mailbox1 = setup_and_spawn_actor(
1069                &context,
1070                oracle.manager(),
1071                oracle.control(scheme.public_key()),
1072                scheme,
1073                connections.remove(0),
1074                cons1,
1075                Producer::default(),
1076            );
1077
1078            let scheme = schemes.remove(0);
1079            let _mailbox2 = setup_and_spawn_actor(
1080                &context,
1081                oracle.manager(),
1082                oracle.control(scheme.public_key()),
1083                scheme,
1084                connections.remove(0),
1085                dummy_consumer(),
1086                prod2,
1087            );
1088
1089            mailbox1.fetch(key.clone());
1090
1091            select! {
1092                event = cons_out1.recv() => {
1093                    panic!("fetch should wait for the initial peer set, got {event:?}");
1094                },
1095                _ = context.sleep(Duration::from_millis(200)) => {},
1096            };
1097
1098            oracle
1099                .manager()
1100                .track(0, Set::try_from(peers.clone()).unwrap());
1101
1102            let (key_actual, value) = cons_out1.recv().await.unwrap();
1103            assert_eq!(key_actual, key);
1104            assert_eq!(value, Bytes::from("data for key 2"));
1105        });
1106    }
1107
1108    /// Tests that concurrent fetches are handled correctly.
1109    /// Also tests that the peer can recover from having no peers available.
1110    /// Also tests that the peer can get data from multiple peers that have different sets of data.
1111    #[test_traced]
1112    fn test_concurrent_fetch_requests() {
1113        let executor = deterministic::Runner::default();
1114        executor.start(|context| async move {
1115            let (mut oracle, mut schemes, peers, mut connections) =
1116                setup_network_and_peers(&context, &[1, 2, 3]).await;
1117
1118            let key2 = Key(2);
1119            let key3 = Key(3);
1120            let mut prod2 = Producer::default();
1121            prod2.insert(key2.clone(), Bytes::from("data for key 2"));
1122            let mut prod3 = Producer::default();
1123            prod3.insert(key3.clone(), Bytes::from("data for key 3"));
1124
1125            let (cons1, mut cons_out1) = consumer();
1126
1127            let scheme = schemes.remove(0);
1128            let mut mailbox1 = setup_and_spawn_actor(
1129                &context,
1130                oracle.manager(),
1131                oracle.control(scheme.public_key()),
1132                scheme,
1133                connections.remove(0),
1134                cons1,
1135                Producer::default(),
1136            );
1137
1138            let scheme = schemes.remove(0);
1139            let _mailbox2 = setup_and_spawn_actor(
1140                &context,
1141                oracle.manager(),
1142                oracle.control(scheme.public_key()),
1143                scheme,
1144                connections.remove(0),
1145                dummy_consumer(),
1146                prod2,
1147            );
1148
1149            let scheme = schemes.remove(0);
1150            let _mailbox3 = setup_and_spawn_actor(
1151                &context,
1152                oracle.manager(),
1153                oracle.control(scheme.public_key()),
1154                scheme,
1155                connections.remove(0),
1156                dummy_consumer(),
1157                prod3,
1158            );
1159
1160            // Add choppy links between the requester and the two producers
1161            add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 1).await;
1162            add_link(&mut oracle, LINK_UNRELIABLE.clone(), &peers, 0, 2).await;
1163
1164            // Run the fetches multiple times to ensure that the peer tries both of its peers
1165            for _ in 0..10 {
1166                // Initiate concurrent fetches.
1167                mailbox1.fetch(key2.clone());
1168                mailbox1.fetch(key3.clone());
1169
1170                // Collect both events without assuming order
1171                let mut events = Vec::new();
1172                events.push(cons_out1.recv().await.expect("Consumer channel closed"));
1173                events.push(cons_out1.recv().await.expect("Consumer channel closed"));
1174
1175                // Check that both keys were successfully fetched
1176                let mut found_key2 = false;
1177                let mut found_key3 = false;
1178                for (key_actual, value) in events {
1179                    if key_actual == key2 {
1180                        assert_eq!(value, Bytes::from("data for key 2"));
1181                        found_key2 = true;
1182                    } else if key_actual == key3 {
1183                        assert_eq!(value, Bytes::from("data for key 3"));
1184                        found_key3 = true;
1185                    } else {
1186                        panic!("Unexpected key received");
1187                    }
1188                }
1189                assert!(found_key2 && found_key3,);
1190            }
1191        });
1192    }
1193
1194    /// Tests that pruning an inactive fetch has no effect.
1195    /// Prunes a key before, after, and during the fetch process.
1196    #[test_traced]
1197    fn test_retain_drops_key() {
1198        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1199        executor.start(|context| async move {
1200            let (mut oracle, mut schemes, peers, mut connections) =
1201                setup_network_and_peers(&context, &[1, 2]).await;
1202
1203            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1204
1205            let key = Key(6);
1206            let mut prod2 = Producer::default();
1207            prod2.insert(key.clone(), Bytes::from("data for key 6"));
1208
1209            let (cons1, mut cons_out1) = consumer();
1210
1211            let scheme = schemes.remove(0);
1212            let mut mailbox1 = setup_and_spawn_actor(
1213                &context,
1214                oracle.manager(),
1215                oracle.control(scheme.public_key()),
1216                scheme,
1217                connections.remove(0),
1218                cons1,
1219                Producer::default(),
1220            );
1221
1222            let scheme = schemes.remove(0);
1223            let _mailbox2 = setup_and_spawn_actor(
1224                &context,
1225                oracle.manager(),
1226                oracle.control(scheme.public_key()),
1227                scheme,
1228                connections.remove(0),
1229                dummy_consumer(),
1230                prod2,
1231            );
1232
1233            // Prune before sending the fetch, expecting no effect.
1234            let canceled = key.clone();
1235            mailbox1.retain(move |key, _| key != &canceled);
1236            select! {
1237                _ = cons_out1.recv() => {
1238                    panic!("unexpected event");
1239                },
1240                _ = context.sleep(Duration::from_millis(100)) => {},
1241            };
1242
1243            // Initiate fetch and wait for data to be delivered
1244            mailbox1.fetch(key.clone());
1245            let (key_actual, value) = cons_out1.recv().await.unwrap();
1246            assert_eq!(key_actual, key);
1247            assert_eq!(value, Bytes::from("data for key 6"));
1248
1249            // Attempt to prune after data has been delivered, expecting no effect
1250            let canceled = key.clone();
1251            mailbox1.retain(move |key, _| key != &canceled);
1252            select! {
1253                _ = cons_out1.recv() => {
1254                    panic!("unexpected event");
1255                },
1256                _ = context.sleep(Duration::from_millis(100)) => {},
1257            };
1258
1259            // Initiate and prune another fetch.
1260            let key = Key(7);
1261            mailbox1.fetch(key.clone());
1262            let canceled = key.clone();
1263            mailbox1.retain(move |key, _| key != &canceled);
1264
1265            // No event should arrive after pruning.
1266            select! {
1267                _ = cons_out1.recv() => panic!("unexpected event"),
1268                _ = context.sleep(Duration::from_millis(100)) => {},
1269            };
1270        });
1271    }
1272
1273    /// Tests that a peer is blocked after delivering invalid data,
1274    /// preventing further fetches from that peer.
1275    #[test_traced]
1276    fn test_blocking_peer() {
1277        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1278        executor.start(|context| async move {
1279            let (mut oracle, mut schemes, peers, mut connections) =
1280                setup_network_and_peers(&context, &[1, 2, 3]).await;
1281
1282            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1283            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1284            add_link(&mut oracle, LINK.clone(), &peers, 1, 2).await;
1285
1286            let key_a = Key(1);
1287            let key_b = Key(2);
1288            let invalid_data_a = Bytes::from("invalid for A");
1289            let valid_data_a = Bytes::from("valid for A");
1290            let valid_data_b = Bytes::from("valid for B");
1291
1292            // Set up producers
1293            let mut prod2 = Producer::default();
1294            prod2.insert(key_a.clone(), invalid_data_a.clone());
1295            prod2.insert(key_b.clone(), valid_data_b.clone());
1296
1297            let mut prod3 = Producer::default();
1298            prod3.insert(key_a.clone(), valid_data_a.clone());
1299
1300            // Set up consumer for Peer1 with expected values
1301            let (mut cons1, mut cons_out1) = consumer();
1302            cons1.add_expected(key_a.clone(), valid_data_a.clone());
1303            cons1.add_expected(key_b.clone(), valid_data_b.clone());
1304
1305            // Spawn actors
1306            let scheme = schemes.remove(0);
1307            let mut mailbox1 = setup_and_spawn_actor(
1308                &context,
1309                oracle.manager(),
1310                oracle.control(scheme.public_key()),
1311                scheme,
1312                connections.remove(0),
1313                cons1,
1314                Producer::default(),
1315            );
1316
1317            let scheme = schemes.remove(0);
1318            let _mailbox2 = setup_and_spawn_actor(
1319                &context,
1320                oracle.manager(),
1321                oracle.control(scheme.public_key()),
1322                scheme,
1323                connections.remove(0),
1324                dummy_consumer(),
1325                prod2,
1326            );
1327
1328            let scheme = schemes.remove(0);
1329            let _mailbox3 = setup_and_spawn_actor(
1330                &context,
1331                oracle.manager(),
1332                oracle.control(scheme.public_key()),
1333                scheme,
1334                connections.remove(0),
1335                dummy_consumer(),
1336                prod3,
1337            );
1338
1339            // Fetch keyA multiple times to ensure that Peer2 is blocked.
1340            for _ in 0..20 {
1341                // Fetch keyA
1342                mailbox1.fetch(key_a.clone());
1343
1344                // Wait for success event for keyA
1345                let (key_actual, value) = cons_out1.recv().await.unwrap();
1346                assert_eq!(key_actual, key_a);
1347                assert_eq!(value, valid_data_a);
1348            }
1349
1350            // Fetch keyB
1351            mailbox1.fetch(key_b.clone());
1352
1353            // Wait for some time (longer than retry timeout)
1354            context.sleep(Duration::from_secs(5)).await;
1355
1356            // No success event should be received for keyB since the only peer with valid data is blocked
1357            select! {
1358                _ = cons_out1.recv() => panic!("unexpected event"),
1359                _ = context.sleep(Duration::from_millis(100)) => {},
1360            };
1361
1362            // Prune the fetch for keyB.
1363            let canceled = key_b.clone();
1364            mailbox1.retain(move |key, _| key != &canceled);
1365
1366            // Check oracle
1367            let blocked = oracle.blocked().await.unwrap();
1368            assert_eq!(blocked.len(), 1);
1369            assert_eq!(blocked[0].0, peers[0]);
1370            assert_eq!(blocked[0].1, peers[1]);
1371        });
1372    }
1373
1374    /// Tests that duplicate fetches for the same key are handled properly.
1375    /// The test verifies that when the same key is fetched multiple times,
1376    /// the data is correctly delivered once without errors.
1377    #[test_traced]
1378    fn test_duplicate_fetch_key() {
1379        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1380        executor.start(|context| async move {
1381            let (mut oracle, mut schemes, peers, mut connections) =
1382                setup_network_and_peers(&context, &[1, 2]).await;
1383
1384            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1385
1386            let key = Key(5);
1387            let mut prod2 = Producer::default();
1388            prod2.insert(key.clone(), Bytes::from("data for key 5"));
1389
1390            let (cons1, mut cons_out1) = consumer();
1391
1392            let scheme = schemes.remove(0);
1393            let mut mailbox1 = setup_and_spawn_actor(
1394                &context,
1395                oracle.manager(),
1396                oracle.control(scheme.public_key()),
1397                scheme,
1398                connections.remove(0),
1399                cons1,
1400                Producer::default(),
1401            );
1402
1403            let scheme = schemes.remove(0);
1404            let _mailbox2 = setup_and_spawn_actor(
1405                &context,
1406                oracle.manager(),
1407                oracle.control(scheme.public_key()),
1408                scheme,
1409                connections.remove(0),
1410                dummy_consumer(),
1411                prod2,
1412            );
1413
1414            // Send duplicate fetches for the same key.
1415            mailbox1.fetch(key.clone());
1416            mailbox1.fetch(key.clone());
1417
1418            // Should receive the data only once
1419            let (key_actual, value) = cons_out1.recv().await.unwrap();
1420            assert_eq!(key_actual, key);
1421            assert_eq!(value, Bytes::from("data for key 5"));
1422
1423            // Make sure we don't receive a second event for the duplicate fetch
1424            select! {
1425                _ = cons_out1.recv() => {
1426                    panic!("Unexpected second event received for duplicate fetch");
1427                },
1428                _ = context.sleep(Duration::from_millis(500)) => {
1429                    // This is expected - no additional events should be produced
1430                },
1431            };
1432        });
1433    }
1434
1435    /// Tests that changing peer sets is handled correctly using the update channel.
1436    /// This test verifies that when the peer set changes from peer A to peer B,
1437    /// the resolver correctly adapts and fetches from the new peer.
1438    #[test_traced]
1439    fn test_changing_peer_sets() {
1440        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1441        executor.start(|context| async move {
1442            let (mut oracle, mut schemes, peers, mut connections) =
1443                setup_network_and_peers(&context, &[1, 2, 3]).await;
1444
1445            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1446            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1447
1448            let key1 = Key(1);
1449            let key2 = Key(2);
1450
1451            let mut prod2 = Producer::default();
1452            prod2.insert(key1.clone(), Bytes::from("data from peer 2"));
1453
1454            let mut prod3 = Producer::default();
1455            prod3.insert(key2.clone(), Bytes::from("data from peer 3"));
1456
1457            let (cons1, mut cons_out1) = consumer();
1458
1459            let scheme = schemes.remove(0);
1460            let mut mailbox1 = setup_and_spawn_actor(
1461                &context,
1462                oracle.manager(),
1463                oracle.control(scheme.public_key()),
1464                scheme,
1465                connections.remove(0),
1466                cons1,
1467                Producer::default(),
1468            );
1469
1470            let scheme = schemes.remove(0);
1471            let _mailbox2 = setup_and_spawn_actor(
1472                &context,
1473                oracle.manager(),
1474                oracle.control(scheme.public_key()),
1475                scheme,
1476                connections.remove(0),
1477                dummy_consumer(),
1478                prod2,
1479            );
1480
1481            // Fetch key1 from peer 2
1482            mailbox1.fetch(key1.clone());
1483
1484            // Wait for successful fetch
1485            let (key_actual, value) = cons_out1.recv().await.unwrap();
1486            assert_eq!(key_actual, key1);
1487            assert_eq!(value, Bytes::from("data from peer 2"));
1488
1489            // Change peer set to include peer 3
1490            let scheme = schemes.remove(0);
1491            let _mailbox3 = setup_and_spawn_actor(
1492                &context,
1493                oracle.manager(),
1494                oracle.control(scheme.public_key()),
1495                scheme,
1496                connections.remove(0),
1497                dummy_consumer(),
1498                prod3,
1499            );
1500
1501            // Need to wait for the peer set change to propagate
1502            context.sleep(Duration::from_millis(200)).await;
1503
1504            // Fetch key2 from peer 3
1505            mailbox1.fetch(key2.clone());
1506
1507            // Wait for successful fetch
1508            let (key_actual, value) = cons_out1.recv().await.unwrap();
1509            assert_eq!(key_actual, key2);
1510            assert_eq!(value, Bytes::from("data from peer 3"));
1511        });
1512    }
1513
1514    #[test_traced]
1515    fn test_fetch_targeted() {
1516        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1517        executor.start(|context| async move {
1518            let (mut oracle, mut schemes, peers, mut connections) =
1519                setup_network_and_peers(&context, &[1, 2, 3]).await;
1520
1521            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1522            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1523
1524            let key = Key(1);
1525            let invalid_data = Bytes::from("invalid data");
1526            let valid_data = Bytes::from("valid data");
1527
1528            // Peer 2 has invalid data, peer 3 has valid data
1529            let mut prod2 = Producer::default();
1530            prod2.insert(key.clone(), invalid_data.clone());
1531
1532            let mut prod3 = Producer::default();
1533            prod3.insert(key.clone(), valid_data.clone());
1534
1535            // Consumer expects only valid_data
1536            let (mut cons1, mut cons_out1) = consumer();
1537            cons1.add_expected(key.clone(), valid_data.clone());
1538
1539            let scheme = schemes.remove(0);
1540            let mut mailbox1 = setup_and_spawn_actor(
1541                &context,
1542                oracle.manager(),
1543                oracle.control(scheme.public_key()),
1544                scheme,
1545                connections.remove(0),
1546                cons1,
1547                Producer::default(),
1548            );
1549
1550            let scheme = schemes.remove(0);
1551            let _mailbox2 = setup_and_spawn_actor(
1552                &context,
1553                oracle.manager(),
1554                oracle.control(scheme.public_key()),
1555                scheme,
1556                connections.remove(0),
1557                dummy_consumer(),
1558                prod2,
1559            );
1560
1561            let scheme = schemes.remove(0);
1562            let _mailbox3 = setup_and_spawn_actor(
1563                &context,
1564                oracle.manager(),
1565                oracle.control(scheme.public_key()),
1566                scheme,
1567                connections.remove(0),
1568                dummy_consumer(),
1569                prod3,
1570            );
1571
1572            // Wait for peer set to be established
1573            context.sleep(Duration::from_millis(100)).await;
1574
1575            // Start fetch with targets for both peer 2 (invalid data) and peer 3 (valid data)
1576            // When peer 2 returns invalid data, only peer 2 should be removed from targets
1577            // Peer 3 should still be tried as a target and succeed
1578            mailbox1.fetch_targeted(
1579                key.clone(),
1580                non_empty_vec![peers[1].clone(), peers[2].clone()],
1581            );
1582
1583            // Should eventually succeed from peer 3
1584            let (key_actual, value) = cons_out1.recv().await.unwrap();
1585            assert_eq!(key_actual, key);
1586            assert_eq!(value, valid_data);
1587
1588            // Verify peer 2 was blocked (sent invalid data)
1589            let blocked = oracle.blocked().await.unwrap();
1590            assert_eq!(blocked.len(), 1);
1591            assert_eq!(blocked[0].0, peers[0]);
1592            assert_eq!(blocked[0].1, peers[1]);
1593
1594            // Verify metrics: 1 successful fetch (from peer 3 after peer 2 was blocked)
1595            let metrics = context.encode();
1596            assert_eq!(
1597                status_metric_total(&metrics, "actor_fetch_total", "Success"),
1598                1
1599            );
1600        });
1601    }
1602
1603    #[test_traced]
1604    fn test_fetch_targeted_no_fallback() {
1605        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1606        executor.start(|context| async move {
1607            let (mut oracle, mut schemes, peers, mut connections) =
1608                setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
1609
1610            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1611            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1612            add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
1613
1614            let key = Key(1);
1615
1616            // Only peer 4 has the data, peers 2 and 3 don't
1617            let mut prod4 = Producer::default();
1618            prod4.insert(key.clone(), Bytes::from("data from peer 4"));
1619
1620            let (cons1, mut cons_out1) = consumer();
1621
1622            let scheme = schemes.remove(0);
1623            let mut mailbox1 = setup_and_spawn_actor(
1624                &context,
1625                oracle.manager(),
1626                oracle.control(scheme.public_key()),
1627                scheme,
1628                connections.remove(0),
1629                cons1,
1630                Producer::default(),
1631            );
1632
1633            let scheme = schemes.remove(0);
1634            let _mailbox2 = setup_and_spawn_actor(
1635                &context,
1636                oracle.manager(),
1637                oracle.control(scheme.public_key()),
1638                scheme,
1639                connections.remove(0),
1640                dummy_consumer(),
1641                Producer::default(), // no data
1642            );
1643
1644            let scheme = schemes.remove(0);
1645            let _mailbox3 = setup_and_spawn_actor(
1646                &context,
1647                oracle.manager(),
1648                oracle.control(scheme.public_key()),
1649                scheme,
1650                connections.remove(0),
1651                dummy_consumer(),
1652                Producer::default(), // no data
1653            );
1654
1655            let scheme = schemes.remove(0);
1656            let _mailbox4 = setup_and_spawn_actor(
1657                &context,
1658                oracle.manager(),
1659                oracle.control(scheme.public_key()),
1660                scheme,
1661                connections.remove(0),
1662                dummy_consumer(),
1663                prod4,
1664            );
1665
1666            // Wait for peer set to be established
1667            context.sleep(Duration::from_millis(100)).await;
1668
1669            // Start fetch with targets for peers 2 and 3 (both don't have data)
1670            // Peer 4 has data but is NOT a target - it should NEVER be tried
1671            mailbox1.fetch_targeted(
1672                key.clone(),
1673                non_empty_vec![peers[1].clone(), peers[2].clone()],
1674            );
1675
1676            // Wait enough time for targets to fail and retry multiple times
1677            // The fetch should not succeed because peer 4 (which has data) is not targeted
1678            select! {
1679                event = cons_out1.recv() => {
1680                    panic!("Fetch should not succeed, but got: {event:?}");
1681                },
1682                _ = context.sleep(Duration::from_secs(3)) => {
1683                    // Expected: no success event because peer 4 is not targeted
1684                },
1685            };
1686        });
1687    }
1688
1689    #[test_traced]
1690    fn test_fetch_all_targeted() {
1691        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1692        executor.start(|context| async move {
1693            let (mut oracle, mut schemes, peers, mut connections) =
1694                setup_network_and_peers(&context, &[1, 2, 3, 4]).await;
1695
1696            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1697            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1698            add_link(&mut oracle, LINK.clone(), &peers, 0, 3).await;
1699
1700            let key1 = Key(1);
1701            let key2 = Key(2);
1702            let key3 = Key(3);
1703
1704            // Peer 2 has key1
1705            let mut prod2 = Producer::default();
1706            prod2.insert(key1.clone(), Bytes::from("data for key 1"));
1707
1708            // Peer 3 has key3
1709            let mut prod3 = Producer::default();
1710            prod3.insert(key3.clone(), Bytes::from("data for key 3"));
1711
1712            // Peer 4 has key2
1713            let mut prod4 = Producer::default();
1714            prod4.insert(key2.clone(), Bytes::from("data for key 2"));
1715
1716            // Consumer expects all three keys
1717            let (mut cons1, mut cons_out1) = consumer();
1718            cons1.add_expected(key1.clone(), Bytes::from("data for key 1"));
1719            cons1.add_expected(key2.clone(), Bytes::from("data for key 2"));
1720            cons1.add_expected(key3.clone(), Bytes::from("data for key 3"));
1721
1722            let scheme = schemes.remove(0);
1723            let mut mailbox1 = setup_and_spawn_actor(
1724                &context,
1725                oracle.manager(),
1726                oracle.control(scheme.public_key()),
1727                scheme,
1728                connections.remove(0),
1729                cons1,
1730                Producer::default(),
1731            );
1732
1733            let scheme = schemes.remove(0);
1734            let _mailbox2 = setup_and_spawn_actor(
1735                &context,
1736                oracle.manager(),
1737                oracle.control(scheme.public_key()),
1738                scheme,
1739                connections.remove(0),
1740                dummy_consumer(),
1741                prod2,
1742            );
1743
1744            let scheme = schemes.remove(0);
1745            let _mailbox3 = setup_and_spawn_actor(
1746                &context,
1747                oracle.manager(),
1748                oracle.control(scheme.public_key()),
1749                scheme,
1750                connections.remove(0),
1751                dummy_consumer(),
1752                prod3,
1753            );
1754
1755            let scheme = schemes.remove(0);
1756            let _mailbox4 = setup_and_spawn_actor(
1757                &context,
1758                oracle.manager(),
1759                oracle.control(scheme.public_key()),
1760                scheme,
1761                connections.remove(0),
1762                dummy_consumer(),
1763                prod4,
1764            );
1765
1766            // Wait for peer set to be established
1767            context.sleep(Duration::from_millis(100)).await;
1768
1769            // Fetch keys with mixed targeting:
1770            // - key1 targeted to peer 2 (has data) -> should succeed from target
1771            // - key2 targeted to peer 4 (has data) -> should succeed from target
1772            // - key3 no targeting -> fetched from any peer (peer 3 has it)
1773            mailbox1.fetch_all_targeted(vec![
1774                (key1.clone(), non_empty_vec![peers[1].clone()]), // peer 2 has key1
1775                (key2.clone(), non_empty_vec![peers[3].clone()]), // peer 4 has key2
1776            ]);
1777            mailbox1.fetch(key3.clone()); // no targeting for key3
1778
1779            // Collect all three events
1780            let mut results = HashMap::new();
1781            for _ in 0..3 {
1782                let (key, value) = cons_out1.recv().await.unwrap();
1783                results.insert(key, value);
1784            }
1785
1786            // Verify all keys received correct data
1787            assert_eq!(results.len(), 3);
1788            assert_eq!(results.get(&key1).unwrap(), &Bytes::from("data for key 1"));
1789            assert_eq!(results.get(&key2).unwrap(), &Bytes::from("data for key 2"));
1790            assert_eq!(results.get(&key3).unwrap(), &Bytes::from("data for key 3"));
1791
1792            // Verify metrics: 3 successful fetches
1793            let metrics = context.encode();
1794            assert_eq!(
1795                status_metric_total(&metrics, "actor_fetch_total", "Success"),
1796                3
1797            );
1798        });
1799    }
1800
1801    /// Tests that calling fetch() on an in-progress targeted fetch clears the targets,
1802    /// allowing the fetch to succeed from any available peer.
1803    #[test_traced]
1804    fn test_fetch_clears_targets() {
1805        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1806        executor.start(|context| async move {
1807            let (mut oracle, mut schemes, peers, mut connections) =
1808                setup_network_and_peers(&context, &[1, 2, 3]).await;
1809
1810            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1811            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1812
1813            let key = Key(1);
1814            let valid_data = Bytes::from("valid data");
1815
1816            // Peer 2 has no data, peer 3 has the data
1817            let mut prod3 = Producer::default();
1818            prod3.insert(key.clone(), valid_data.clone());
1819
1820            let (cons1, mut cons_out1) = consumer();
1821
1822            let scheme = schemes.remove(0);
1823            let mut mailbox1 = setup_and_spawn_actor(
1824                &context,
1825                oracle.manager(),
1826                oracle.control(scheme.public_key()),
1827                scheme,
1828                connections.remove(0),
1829                cons1,
1830                Producer::default(),
1831            );
1832
1833            let scheme = schemes.remove(0);
1834            let _mailbox2 = setup_and_spawn_actor(
1835                &context,
1836                oracle.manager(),
1837                oracle.control(scheme.public_key()),
1838                scheme,
1839                connections.remove(0),
1840                dummy_consumer(),
1841                Producer::default(), // no data
1842            );
1843
1844            let scheme = schemes.remove(0);
1845            let _mailbox3 = setup_and_spawn_actor(
1846                &context,
1847                oracle.manager(),
1848                oracle.control(scheme.public_key()),
1849                scheme,
1850                connections.remove(0),
1851                dummy_consumer(),
1852                prod3,
1853            );
1854
1855            // Wait for peer set to be established
1856            context.sleep(Duration::from_millis(100)).await;
1857
1858            // Start fetch with target for peer 2 only (who doesn't have data)
1859            mailbox1.fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()]);
1860
1861            // Wait for the targeted fetch to fail a few times
1862            context.sleep(Duration::from_millis(500)).await;
1863
1864            // Call fetch() which should clear the targets and allow fallback to any peer
1865            mailbox1.fetch(key.clone());
1866
1867            // Should now succeed from peer 3 (who has data but wasn't originally targeted)
1868            let (key_actual, value) = cons_out1.recv().await.unwrap();
1869            assert_eq!(key_actual, key);
1870            assert_eq!(value, valid_data);
1871        });
1872    }
1873
1874    #[test_traced]
1875    fn test_fetch_targeted_does_not_restrict_all() {
1876        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1877        executor.start(|context| async move {
1878            let (mut oracle, mut schemes, peers, mut connections) =
1879                setup_network_and_peers(&context, &[1, 2, 3]).await;
1880
1881            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
1882            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
1883
1884            let key = Key(1);
1885            let valid_data = Bytes::from("valid data");
1886
1887            // Peer 2 has no data, peer 3 has the data
1888            let mut prod3 = Producer::default();
1889            prod3.insert(key.clone(), valid_data.clone());
1890
1891            let (cons1, mut cons_out1) = consumer();
1892
1893            let scheme = schemes.remove(0);
1894            let mut mailbox1 = setup_and_spawn_actor(
1895                &context,
1896                oracle.manager(),
1897                oracle.control(scheme.public_key()),
1898                scheme,
1899                connections.remove(0),
1900                cons1,
1901                Producer::default(),
1902            );
1903
1904            let scheme = schemes.remove(0);
1905            let _mailbox2 = setup_and_spawn_actor(
1906                &context,
1907                oracle.manager(),
1908                oracle.control(scheme.public_key()),
1909                scheme,
1910                connections.remove(0),
1911                dummy_consumer(),
1912                Producer::default(), // no data
1913            );
1914
1915            let scheme = schemes.remove(0);
1916            let _mailbox3 = setup_and_spawn_actor(
1917                &context,
1918                oracle.manager(),
1919                oracle.control(scheme.public_key()),
1920                scheme,
1921                connections.remove(0),
1922                dummy_consumer(),
1923                prod3,
1924            );
1925
1926            // Wait for peer set to be established
1927            context.sleep(Duration::from_millis(100)).await;
1928
1929            // Start fetch without targets (can try any peer)
1930            mailbox1.fetch(key.clone());
1931
1932            // Wait a bit for the fetch to start
1933            context.sleep(Duration::from_millis(50)).await;
1934
1935            // Call fetch_targeted with peer 2 only (who doesn't have data)
1936            // This should NOT restrict the existing "all" fetch
1937            mailbox1.fetch_targeted(key.clone(), non_empty_vec![peers[1].clone()]);
1938
1939            // Should still succeed from peer 3 (who has data but wasn't in the targeted call)
1940            // because the original fetch was "all" and shouldn't be restricted
1941            let (key_actual, value) = cons_out1.recv().await.unwrap();
1942            assert_eq!(key_actual, key);
1943            assert_eq!(value, valid_data);
1944        });
1945    }
1946
1947    #[test_traced]
1948    fn test_retain() {
1949        let executor = deterministic::Runner::timed(Duration::from_secs(10));
1950        executor.start(|context| async move {
1951            let (mut oracle, mut schemes, peers, mut connections) =
1952                setup_network_and_peers(&context, &[1, 2]).await;
1953
1954            let key = Key(5);
1955            let mut prod2 = Producer::default();
1956            prod2.insert(key.clone(), Bytes::from("data for key 5"));
1957
1958            let (cons1, mut cons_out1) = consumer();
1959
1960            let scheme = schemes.remove(0);
1961            let mut mailbox1 = setup_and_spawn_actor(
1962                &context,
1963                oracle.manager(),
1964                oracle.control(scheme.public_key()),
1965                scheme,
1966                connections.remove(0),
1967                cons1,
1968                Producer::default(),
1969            );
1970
1971            let scheme = schemes.remove(0);
1972            let _mailbox2 = setup_and_spawn_actor(
1973                &context,
1974                oracle.manager(),
1975                oracle.control(scheme.public_key()),
1976                scheme,
1977                connections.remove(0),
1978                dummy_consumer(),
1979                prod2,
1980            );
1981
1982            // Retain before fetching should have no effect
1983            mailbox1.retain(|_, _| true);
1984            select! {
1985                _ = cons_out1.recv() => {
1986                    panic!("unexpected event");
1987                },
1988                _ = context.sleep(Duration::from_millis(100)) => {},
1989            };
1990
1991            // Start a fetch (no link, so fetch stays in-flight)
1992            mailbox1.fetch(key.clone());
1993
1994            // Retain with predicate that excludes the key. This must clean up
1995            // the in-flight entry for the key.
1996            let key_clone = key.clone();
1997            mailbox1.retain(move |key, _| key != &key_clone);
1998
1999            // Now add link so fetches can complete
2000            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2001
2002            // Fetch same key again, if the in-flight entry wasn't cleaned up, this would
2003            // be treated as a duplicate and silently ignored
2004            mailbox1.fetch(key.clone());
2005
2006            // Should succeed
2007            let (key_actual, value) = cons_out1.recv().await.unwrap();
2008            assert_eq!(key_actual, key);
2009            assert_eq!(value, Bytes::from("data for key 5"));
2010        });
2011    }
2012
2013    #[test_traced]
2014    fn test_retain_uses_subscribers() {
2015        let executor = deterministic::Runner::timed(Duration::from_secs(10));
2016        executor.start(|context| async move {
2017            let (mut oracle, mut schemes, peers, mut connections) =
2018                setup_network_and_peers(&context, &[1, 2]).await;
2019
2020            let key = Key(5);
2021            let mut prod2 = Producer::default();
2022            prod2.insert(key.clone(), Bytes::from("data for key 5"));
2023
2024            let (cons1, mut cons_out1): (Consumer<Key, Bytes, SubscriberTag>, _) = Consumer::new();
2025
2026            let scheme = schemes.remove(0);
2027            let mut mailbox1 = setup_and_spawn_actor(
2028                &context,
2029                oracle.manager(),
2030                oracle.control(scheme.public_key()),
2031                scheme,
2032                connections.remove(0),
2033                cons1,
2034                Producer::default(),
2035            );
2036
2037            let scheme = schemes.remove(0);
2038            let _mailbox2 = setup_and_spawn_actor(
2039                &context,
2040                oracle.manager(),
2041                oracle.control(scheme.public_key()),
2042                scheme,
2043                connections.remove(0),
2044                dummy_consumer(),
2045                prod2,
2046            );
2047
2048            let dropped_subscriber = SubscriberTag(50);
2049            let kept_subscriber = SubscriberTag(51);
2050            mailbox1.fetch(Fetch {
2051                key: key.clone(),
2052                subscriber: dropped_subscriber,
2053            });
2054            mailbox1.fetch(Fetch {
2055                key: key.clone(),
2056                subscriber: kept_subscriber.clone(),
2057            });
2058
2059            context.sleep(Duration::from_millis(100)).await;
2060            mailbox1.retain(move |_, subscriber| subscriber == &kept_subscriber);
2061            context.sleep(Duration::from_millis(100)).await;
2062
2063            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2064
2065            let (key_actual, value) = cons_out1.recv().await.unwrap();
2066            assert_eq!(key_actual, key);
2067            assert_eq!(value, Bytes::from("data for key 5"));
2068        });
2069    }
2070
2071    #[test_traced]
2072    fn test_deliver_receives_subscribers() {
2073        let executor = deterministic::Runner::timed(Duration::from_secs(10));
2074        executor.start(|context| async move {
2075            let (mut oracle, mut schemes, peers, mut connections) =
2076                setup_network_and_peers(&context, &[1, 2]).await;
2077
2078            let key = Key(5);
2079            let mut prod2 = Producer::default();
2080            prod2.insert(key.clone(), Bytes::from("data for key 5"));
2081
2082            let (cons1, mut cons_out1) = SubscriberRecordingConsumer::new();
2083
2084            let scheme = schemes.remove(0);
2085            let mut mailbox1 = setup_and_spawn_actor(
2086                &context,
2087                oracle.manager(),
2088                oracle.control(scheme.public_key()),
2089                scheme,
2090                connections.remove(0),
2091                cons1,
2092                Producer::default(),
2093            );
2094
2095            let scheme = schemes.remove(0);
2096            let _mailbox2 = setup_and_spawn_actor(
2097                &context,
2098                oracle.manager(),
2099                oracle.control(scheme.public_key()),
2100                scheme,
2101                connections.remove(0),
2102                dummy_consumer(),
2103                prod2,
2104            );
2105
2106            let first_subscriber = SubscriberTag(50);
2107            let second_subscriber = SubscriberTag(51);
2108            mailbox1.fetch(Fetch {
2109                key: key.clone(),
2110                subscriber: second_subscriber.clone(),
2111            });
2112            mailbox1.fetch(Fetch {
2113                key: key.clone(),
2114                subscriber: first_subscriber.clone(),
2115            });
2116
2117            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2118
2119            let (delivery, value) = cons_out1.recv().await.unwrap();
2120            assert_eq!(
2121                delivery,
2122                Delivery {
2123                    key,
2124                    subscribers: non_empty_vec![first_subscriber, second_subscriber],
2125                }
2126            );
2127            assert_eq!(value, Bytes::from("data for key 5"));
2128        });
2129    }
2130
2131    #[test_traced]
2132    fn test_deliver_receives_multiple_subscribers() {
2133        let executor = deterministic::Runner::timed(Duration::from_secs(10));
2134        executor.start(|context| async move {
2135            let (mut oracle, mut schemes, peers, mut connections) =
2136                setup_network_and_peers(&context, &[1, 2]).await;
2137
2138            let key = Key(5);
2139            let mut prod2 = Producer::default();
2140            prod2.insert(key.clone(), Bytes::from("data for key 5"));
2141
2142            let (cons1, mut cons_out1) = SubscriberRecordingConsumer::new();
2143
2144            let scheme = schemes.remove(0);
2145            let mut mailbox1 = setup_and_spawn_actor(
2146                &context,
2147                oracle.manager(),
2148                oracle.control(scheme.public_key()),
2149                scheme,
2150                connections.remove(0),
2151                cons1,
2152                Producer::default(),
2153            );
2154
2155            let scheme = schemes.remove(0);
2156            let _mailbox2 = setup_and_spawn_actor(
2157                &context,
2158                oracle.manager(),
2159                oracle.control(scheme.public_key()),
2160                scheme,
2161                connections.remove(0),
2162                dummy_consumer(),
2163                prod2,
2164            );
2165
2166            let first_subscriber = SubscriberTag(49);
2167            let second_subscriber = SubscriberTag(50);
2168            mailbox1.fetch(Fetch {
2169                key: key.clone(),
2170                subscriber: first_subscriber.clone(),
2171            });
2172            mailbox1.fetch(Fetch {
2173                key: key.clone(),
2174                subscriber: second_subscriber.clone(),
2175            });
2176
2177            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2178
2179            let (delivery, value) = cons_out1.recv().await.unwrap();
2180            assert_eq!(
2181                delivery,
2182                Delivery {
2183                    key: key.clone(),
2184                    subscribers: non_empty_vec![first_subscriber, second_subscriber],
2185                }
2186            );
2187            assert_eq!(value, Bytes::from("data for key 5"));
2188        });
2189    }
2190
2191    #[test_traced]
2192    fn test_fetch_during_validation_reuses_response_after_success() {
2193        let executor = deterministic::Runner::timed(Duration::from_secs(10));
2194        executor.start(|context| async move {
2195            let (mut oracle, mut schemes, peers, mut connections) =
2196                setup_network_and_peers(&context, &[1, 2]).await;
2197
2198            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2199
2200            let key = Key(5);
2201            let first_response = Bytes::from("data for key 5");
2202            let second_response = Bytes::from("refetched data for key 5");
2203            let mut prod2 = SequencedProducer::default();
2204            prod2.insert(
2205                key.clone(),
2206                [first_response.clone(), second_response.clone()],
2207            );
2208            let prod2_observer = prod2.clone();
2209
2210            let (first_gate_sender, first_gate_receiver) = oneshot::channel();
2211            let (second_gate_sender, second_gate_receiver) = oneshot::channel();
2212            let (cons1, mut deliveries, mut started) = BlockingSubscriberRecordingConsumer::new(
2213                context.child("consumer"),
2214                vec![(first_gate_receiver, true), (second_gate_receiver, true)],
2215            );
2216
2217            let scheme = schemes.remove(0);
2218            let mut mailbox1 = setup_and_spawn_actor(
2219                &context,
2220                oracle.manager(),
2221                oracle.control(scheme.public_key()),
2222                scheme,
2223                connections.remove(0),
2224                cons1,
2225                Producer::default(),
2226            );
2227
2228            let scheme = schemes.remove(0);
2229            let _mailbox2 = setup_and_spawn_actor_with_producer(
2230                &context,
2231                oracle.manager(),
2232                oracle.control(scheme.public_key()),
2233                scheme,
2234                connections.remove(0),
2235                dummy_consumer(),
2236                prod2,
2237            );
2238
2239            let first_subscriber = SubscriberTag(49);
2240            let second_subscriber = SubscriberTag(50);
2241            mailbox1.fetch(Fetch {
2242                key: key.clone(),
2243                subscriber: first_subscriber.clone(),
2244            });
2245
2246            let delivery = started.recv().await.expect("delivery did not start");
2247            assert_eq!(
2248                delivery,
2249                Delivery {
2250                    key: key.clone(),
2251                    subscribers: non_empty_vec![first_subscriber.clone()],
2252                }
2253            );
2254
2255            mailbox1.fetch(Fetch {
2256                key: key.clone(),
2257                subscriber: second_subscriber.clone(),
2258            });
2259            context.sleep(Duration::from_millis(100)).await;
2260            assert_eq!(
2261                prod2_observer.remaining(&key),
2262                vec![second_response.clone()]
2263            );
2264
2265            first_gate_sender.send(()).unwrap();
2266            let (delivery, value) = deliveries.recv().await.expect("consumer channel closed");
2267            assert_eq!(
2268                delivery,
2269                Delivery {
2270                    key: key.clone(),
2271                    subscribers: non_empty_vec![first_subscriber],
2272                }
2273            );
2274            assert_eq!(value, first_response);
2275
2276            let delivery = select! {
2277                delivery = started.recv() => delivery.expect("second delivery did not start"),
2278                _ = context.sleep(Duration::from_secs(2)) => {
2279                    panic!("late subscriber was not delivered");
2280                },
2281            };
2282            assert_eq!(
2283                delivery,
2284                Delivery {
2285                    key: key.clone(),
2286                    subscribers: non_empty_vec![second_subscriber.clone()],
2287                }
2288            );
2289
2290            second_gate_sender.send(()).unwrap();
2291            let (delivery, value) = deliveries.recv().await.expect("consumer channel closed");
2292            assert_eq!(
2293                delivery,
2294                Delivery {
2295                    key: key.clone(),
2296                    subscribers: non_empty_vec![second_subscriber],
2297                }
2298            );
2299            assert_eq!(value, first_response);
2300            assert_eq!(prod2_observer.remaining(&key), vec![second_response]);
2301        });
2302    }
2303
2304    #[test_traced]
2305    fn test_late_subscriber_delivery_ignores_unrelated_waiter() {
2306        let executor = deterministic::Runner::timed(Duration::from_secs(10));
2307        executor.start(|context| async move {
2308            let (mut oracle, mut schemes, peers, mut connections) =
2309                setup_network_and_peers(&context, &[1, 2, 3]).await;
2310
2311            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2312            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
2313
2314            let blocked_key = Key(4);
2315            let waiting_key = Key(5);
2316            let main_key = Key(6);
2317            let data = Bytes::from("data for key 6");
2318
2319            let mut prod2 = Producer::default();
2320            prod2.insert(blocked_key.clone(), Bytes::from("bad data"));
2321
2322            let mut prod3 = Producer::default();
2323            prod3.insert(main_key.clone(), data.clone());
2324
2325            let (first_gate_sender, first_gate_receiver) = oneshot::channel();
2326            let (second_gate_sender, second_gate_receiver) = oneshot::channel();
2327            let (cons1, mut deliveries, mut started) = BlockingSubscriberRecordingConsumer::new(
2328                context.child("consumer"),
2329                vec![(first_gate_receiver, false), (second_gate_receiver, true)],
2330            );
2331
2332            let scheme = schemes.remove(0);
2333            let mut mailbox1 = setup_and_spawn_actor(
2334                &context,
2335                oracle.manager(),
2336                oracle.control(scheme.public_key()),
2337                scheme,
2338                connections.remove(0),
2339                cons1,
2340                Producer::default(),
2341            );
2342
2343            let scheme = schemes.remove(0);
2344            let _mailbox2 = setup_and_spawn_actor(
2345                &context,
2346                oracle.manager(),
2347                oracle.control(scheme.public_key()),
2348                scheme,
2349                connections.remove(0),
2350                dummy_consumer(),
2351                prod2,
2352            );
2353
2354            let scheme = schemes.remove(0);
2355            let _mailbox3 = setup_and_spawn_actor(
2356                &context,
2357                oracle.manager(),
2358                oracle.control(scheme.public_key()),
2359                scheme,
2360                connections.remove(0),
2361                dummy_consumer(),
2362                prod3,
2363            );
2364
2365            mailbox1.fetch(Fetch {
2366                key: blocked_key.clone(),
2367                subscriber: SubscriberTag(1),
2368            });
2369            started
2370                .recv()
2371                .await
2372                .expect("blocking delivery did not start");
2373            first_gate_sender.send(()).unwrap();
2374            wait_for_blocked(&context, &oracle, &peers[0], &peers[1]).await;
2375
2376            mailbox1.fetch_targeted(
2377                Fetch {
2378                    key: waiting_key,
2379                    subscriber: SubscriberTag(2),
2380                },
2381                non_empty_vec![peers[1].clone()],
2382            );
2383            context.sleep(Duration::from_millis(100)).await;
2384
2385            let first_subscriber = SubscriberTag(3);
2386            let second_subscriber = SubscriberTag(4);
2387            mailbox1.fetch(Fetch {
2388                key: main_key.clone(),
2389                subscriber: first_subscriber.clone(),
2390            });
2391
2392            let delivery = started.recv().await.expect("delivery did not start");
2393            assert_eq!(
2394                delivery,
2395                Delivery {
2396                    key: main_key.clone(),
2397                    subscribers: non_empty_vec![first_subscriber.clone()],
2398                }
2399            );
2400
2401            mailbox1.fetch(Fetch {
2402                key: main_key.clone(),
2403                subscriber: second_subscriber.clone(),
2404            });
2405            context.sleep(Duration::from_millis(100)).await;
2406
2407            second_gate_sender.send(()).unwrap();
2408            let (delivery, value) = deliveries.recv().await.expect("consumer channel closed");
2409            assert_eq!(
2410                delivery,
2411                Delivery {
2412                    key: main_key.clone(),
2413                    subscribers: non_empty_vec![first_subscriber],
2414                }
2415            );
2416            assert_eq!(value, data);
2417
2418            let delivery = select! {
2419                delivery = started.recv() => delivery.expect("second delivery did not start"),
2420                _ = context.sleep(Duration::from_secs(2)) => {
2421                    panic!("late subscriber was not delivered while an unrelated waiter was armed");
2422                },
2423            };
2424            assert_eq!(
2425                delivery,
2426                Delivery {
2427                    key: main_key,
2428                    subscribers: non_empty_vec![second_subscriber],
2429                }
2430            );
2431        });
2432    }
2433
2434    #[test_traced]
2435    fn test_deliver_receives_distinct_subscriber_type() {
2436        let executor = deterministic::Runner::timed(Duration::from_secs(10));
2437        executor.start(|context| async move {
2438            let (mut oracle, mut schemes, peers, mut connections) =
2439                setup_network_and_peers(&context, &[1, 2]).await;
2440
2441            let key = Key(5);
2442            let mut prod2 = Producer::default();
2443            prod2.insert(key.clone(), Bytes::from("data for key 5"));
2444
2445            let (cons1, mut cons_out1) = SubscriberRecordingConsumer::new();
2446
2447            let scheme = schemes.remove(0);
2448            let mut mailbox1 = setup_and_spawn_actor(
2449                &context,
2450                oracle.manager(),
2451                oracle.control(scheme.public_key()),
2452                scheme,
2453                connections.remove(0),
2454                cons1,
2455                Producer::default(),
2456            );
2457
2458            let scheme = schemes.remove(0);
2459            let _mailbox2 = setup_and_spawn_actor(
2460                &context,
2461                oracle.manager(),
2462                oracle.control(scheme.public_key()),
2463                scheme,
2464                connections.remove(0),
2465                dummy_consumer(),
2466                prod2,
2467            );
2468
2469            let subscriber = SubscriberTag(50);
2470            let retained = subscriber.clone();
2471            mailbox1.fetch(Fetch {
2472                key: key.clone(),
2473                subscriber: subscriber.clone(),
2474            });
2475
2476            context.sleep(Duration::from_millis(100)).await;
2477            mailbox1.retain(move |_, subscriber| subscriber == &retained);
2478            context.sleep(Duration::from_millis(100)).await;
2479
2480            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2481
2482            let (delivery, value) = cons_out1.recv().await.unwrap();
2483            assert_eq!(
2484                delivery,
2485                Delivery {
2486                    key,
2487                    subscribers: non_empty_vec![subscriber],
2488                }
2489            );
2490            assert_eq!(value, Bytes::from("data for key 5"));
2491        });
2492    }
2493
2494    #[test_traced]
2495    fn test_deliver_receives_single_subscriber() {
2496        let executor = deterministic::Runner::timed(Duration::from_secs(10));
2497        executor.start(|context| async move {
2498            let (mut oracle, mut schemes, peers, mut connections) =
2499                setup_network_and_peers(&context, &[1, 2]).await;
2500
2501            let key = Key(5);
2502            let mut prod2 = Producer::default();
2503            prod2.insert(key.clone(), Bytes::from("data for key 5"));
2504
2505            let (cons1, mut cons_out1) = SubscriberRecordingConsumer::new();
2506
2507            let scheme = schemes.remove(0);
2508            let mut mailbox1 = setup_and_spawn_actor(
2509                &context,
2510                oracle.manager(),
2511                oracle.control(scheme.public_key()),
2512                scheme,
2513                connections.remove(0),
2514                cons1,
2515                Producer::default(),
2516            );
2517
2518            let scheme = schemes.remove(0);
2519            let _mailbox2 = setup_and_spawn_actor(
2520                &context,
2521                oracle.manager(),
2522                oracle.control(scheme.public_key()),
2523                scheme,
2524                connections.remove(0),
2525                dummy_consumer(),
2526                prod2,
2527            );
2528
2529            let subscriber = SubscriberTag(50);
2530            mailbox1.fetch(Fetch {
2531                key: key.clone(),
2532                subscriber: subscriber.clone(),
2533            });
2534            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2535
2536            let (delivery, value) = cons_out1.recv().await.unwrap();
2537            assert_eq!(
2538                delivery,
2539                Delivery {
2540                    key,
2541                    subscribers: non_empty_vec![subscriber],
2542                }
2543            );
2544            assert_eq!(value, Bytes::from("data for key 5"));
2545        });
2546    }
2547
2548    #[test_traced]
2549    fn test_retain_drops_all() {
2550        let executor = deterministic::Runner::timed(Duration::from_secs(10));
2551        executor.start(|context| async move {
2552            let (mut oracle, mut schemes, peers, mut connections) =
2553                setup_network_and_peers(&context, &[1, 2]).await;
2554
2555            // No link yet - fetch will stay in-flight
2556            let key = Key(6);
2557            let mut prod2 = Producer::default();
2558            prod2.insert(key.clone(), Bytes::from("data for key 6"));
2559
2560            let (cons1, mut cons_out1) = consumer();
2561
2562            let scheme = schemes.remove(0);
2563            let mut mailbox1 = setup_and_spawn_actor(
2564                &context,
2565                oracle.manager(),
2566                oracle.control(scheme.public_key()),
2567                scheme,
2568                connections.remove(0),
2569                cons1,
2570                Producer::default(),
2571            );
2572
2573            let scheme = schemes.remove(0);
2574            let _mailbox2 = setup_and_spawn_actor(
2575                &context,
2576                oracle.manager(),
2577                oracle.control(scheme.public_key()),
2578                scheme,
2579                connections.remove(0),
2580                dummy_consumer(),
2581                prod2,
2582            );
2583
2584            // Pruning before fetching should have no effect.
2585            mailbox1.retain(|_, _| false);
2586            select! {
2587                _ = cons_out1.recv() => {
2588                    panic!("unexpected event");
2589                },
2590                _ = context.sleep(Duration::from_millis(100)) => {},
2591            };
2592
2593            // Start a fetch (no link, so fetch stays in-flight)
2594            mailbox1.fetch(key.clone());
2595
2596            // Prune all fetches.
2597            mailbox1.retain(|_, _| false);
2598
2599            // Now add link so fetches can complete
2600            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2601
2602            // Fetch same key again, if the in-flight entry wasn't cleaned up, this would
2603            // be treated as a duplicate and silently ignored
2604            mailbox1.fetch(key.clone());
2605
2606            // Should succeed
2607            let (key_actual, value) = cons_out1.recv().await.unwrap();
2608            assert_eq!(key_actual, key);
2609            assert_eq!(value, Bytes::from("data for key 6"));
2610        });
2611    }
2612
2613    /// Tests that when a peer is rate-limited, the fetcher spills over to another peer.
2614    /// With 2 peers and rate limit of 1/sec each, 2 requests issued simultaneously should
2615    /// both complete immediately (one to each peer) without waiting for rate limit reset.
2616    #[test_traced]
2617    fn test_rate_limit_spillover() {
2618        let executor = deterministic::Runner::timed(Duration::from_secs(30));
2619        executor.start(|context| async move {
2620            // Use a very restrictive rate limit: 1 request per second per peer
2621            let (mut oracle, mut schemes, peers, mut connections) =
2622                setup_network_and_peers_with_rate_limit(
2623                    &context,
2624                    &[1, 2, 3],
2625                    Quota::per_second(NZU32!(1)),
2626                )
2627                .await;
2628
2629            // Add links between peer 1 and both peer 2 and peer 3
2630            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2631            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
2632
2633            // Both peer 2 and peer 3 have the same data
2634            let mut prod2 = Producer::default();
2635            let mut prod3 = Producer::default();
2636            prod2.insert(Key(0), Bytes::from("data for key 0"));
2637            prod2.insert(Key(1), Bytes::from("data for key 1"));
2638            prod3.insert(Key(0), Bytes::from("data for key 0"));
2639            prod3.insert(Key(1), Bytes::from("data for key 1"));
2640
2641            let (cons1, mut cons_out1) = consumer();
2642
2643            // Set up peer 1 (the requester)
2644            let scheme = schemes.remove(0);
2645            let mut mailbox1 = setup_and_spawn_actor(
2646                &context,
2647                oracle.manager(),
2648                oracle.control(scheme.public_key()),
2649                scheme,
2650                connections.remove(0),
2651                cons1,
2652                Producer::default(),
2653            );
2654
2655            // Set up peer 2 (has data)
2656            let scheme = schemes.remove(0);
2657            let _mailbox2 = setup_and_spawn_actor(
2658                &context,
2659                oracle.manager(),
2660                oracle.control(scheme.public_key()),
2661                scheme,
2662                connections.remove(0),
2663                dummy_consumer(),
2664                prod2,
2665            );
2666
2667            // Set up peer 3 (also has data)
2668            let scheme = schemes.remove(0);
2669            let _mailbox3 = setup_and_spawn_actor(
2670                &context,
2671                oracle.manager(),
2672                oracle.control(scheme.public_key()),
2673                scheme,
2674                connections.remove(0),
2675                dummy_consumer(),
2676                prod3,
2677            );
2678
2679            // Wait for peer set to be established
2680            context.sleep(Duration::from_millis(100)).await;
2681            let start = context.current();
2682
2683            // Issue 2 fetches rapidly.
2684            // With rate limit of 1/sec per peer and 2 peers, both should complete
2685            // immediately via spill-over (one request to each peer)
2686            mailbox1.fetch(Key(0));
2687            mailbox1.fetch(Key(1));
2688
2689            // Collect results
2690            let mut results = HashMap::new();
2691            for _ in 0..2 {
2692                let (key, value) = cons_out1.recv().await.unwrap();
2693                results.insert(key.clone(), value);
2694            }
2695
2696            // Verify both keys were fetched successfully
2697            assert_eq!(results.len(), 2);
2698            assert_eq!(
2699                results.get(&Key(0)).unwrap(),
2700                &Bytes::from("data for key 0")
2701            );
2702            assert_eq!(
2703                results.get(&Key(1)).unwrap(),
2704                &Bytes::from("data for key 1")
2705            );
2706
2707            // Verify it completed quickly (well under 1 second) - proves spill-over worked
2708            // Without spill-over, the second request would wait ~1 second for rate limit reset
2709            let elapsed = context.current().duration_since(start).unwrap();
2710            assert!(
2711                elapsed < Duration::from_millis(500),
2712                "Expected quick completion via spill-over, but took {elapsed:?}"
2713            );
2714        });
2715    }
2716
2717    /// Tests that rate limiting causes retries to eventually succeed after the rate limit resets.
2718    /// This test uses a single peer with a restrictive rate limit and verifies that
2719    /// fetches eventually complete after waiting for the rate limit to reset.
2720    #[test_traced]
2721    fn test_rate_limit_retry_after_reset() {
2722        let executor = deterministic::Runner::timed(Duration::from_secs(30));
2723        executor.start(|context| async move {
2724            // Use a restrictive rate limit: 1 request per second
2725            let (mut oracle, mut schemes, peers, mut connections) =
2726                setup_network_and_peers_with_rate_limit(
2727                    &context,
2728                    &[1, 2],
2729                    Quota::per_second(NZU32!(1)),
2730                )
2731                .await;
2732
2733            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2734
2735            // Peer 2 has data for multiple keys
2736            let mut prod2 = Producer::default();
2737            prod2.insert(Key(1), Bytes::from("data for key 1"));
2738            prod2.insert(Key(2), Bytes::from("data for key 2"));
2739            prod2.insert(Key(3), Bytes::from("data for key 3"));
2740
2741            let (cons1, mut cons_out1) = consumer();
2742
2743            let scheme = schemes.remove(0);
2744            let mut mailbox1 = setup_and_spawn_actor(
2745                &context,
2746                oracle.manager(),
2747                oracle.control(scheme.public_key()),
2748                scheme,
2749                connections.remove(0),
2750                cons1,
2751                Producer::default(),
2752            );
2753
2754            let scheme = schemes.remove(0);
2755            let _mailbox2 = setup_and_spawn_actor(
2756                &context,
2757                oracle.manager(),
2758                oracle.control(scheme.public_key()),
2759                scheme,
2760                connections.remove(0),
2761                dummy_consumer(),
2762                prod2,
2763            );
2764
2765            // Wait for peer set to be established
2766            context.sleep(Duration::from_millis(100)).await;
2767            let start = context.current();
2768
2769            // Issue 3 fetches to a single peer with rate limit of 1/sec.
2770            // Only 1 can be sent immediately, the others must wait for rate limit reset
2771            mailbox1.fetch(Key(1));
2772            mailbox1.fetch(Key(2));
2773            mailbox1.fetch(Key(3));
2774
2775            // All 3 should eventually succeed (after rate limit resets)
2776            let mut results = HashMap::new();
2777            for _ in 0..3 {
2778                let (key, value) = cons_out1.recv().await.unwrap();
2779                results.insert(key.clone(), value);
2780            }
2781
2782            assert_eq!(results.len(), 3);
2783            for i in 1..=3 {
2784                assert_eq!(
2785                    results.get(&Key(i)).unwrap(),
2786                    &Bytes::from(format!("data for key {}", i))
2787                );
2788            }
2789
2790            // Verify it took significant time due to rate limiting
2791            // With 3 requests at 1/sec to a single peer, requests 2 and 3 must wait
2792            // for rate limit resets (~1 second each), so total should be > 2 seconds
2793            let elapsed = context.current().duration_since(start).unwrap();
2794            assert!(
2795                elapsed > Duration::from_secs(2),
2796                "Expected rate limiting to cause delay > 2s, but took {elapsed:?}"
2797            );
2798        });
2799    }
2800
2801    /// Tests that the resolver never sends fetches to itself (me exclusion).
2802    /// Even when the local peer has the data in its producer, it should fetch from
2803    /// another peer instead.
2804    #[test_traced]
2805    fn test_self_exclusion() {
2806        let executor = deterministic::Runner::timed(Duration::from_secs(10));
2807        executor.start(|context| async move {
2808            let (mut oracle, mut schemes, peers, mut connections) =
2809                setup_network_and_peers(&context, &[1, 2]).await;
2810
2811            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2812
2813            let key = Key(1);
2814            let data = Bytes::from("shared data");
2815
2816            // Both peers have the data - peer 1 (requester) and peer 2
2817            let mut prod1 = Producer::default();
2818            prod1.insert(key.clone(), data.clone());
2819            let mut prod2 = Producer::default();
2820            prod2.insert(key.clone(), data.clone());
2821
2822            let (cons1, mut cons_out1) = consumer();
2823
2824            // Set up peer 1 with `me` set - it has the data but should NOT fetch from itself
2825            let scheme = schemes.remove(0);
2826            let mut mailbox1 = setup_and_spawn_actor(
2827                &context,
2828                oracle.manager(),
2829                oracle.control(scheme.public_key()),
2830                scheme,
2831                connections.remove(0),
2832                cons1,
2833                prod1, // peer 1 has the data
2834            );
2835
2836            // Set up peer 2 - also has the data
2837            let scheme = schemes.remove(0);
2838            let _mailbox2 = setup_and_spawn_actor(
2839                &context,
2840                oracle.manager(),
2841                oracle.control(scheme.public_key()),
2842                scheme,
2843                connections.remove(0),
2844                dummy_consumer(),
2845                prod2,
2846            );
2847
2848            // Wait for peer set to be established
2849            context.sleep(Duration::from_millis(100)).await;
2850
2851            // Fetch the key - should get it from peer 2, not from self
2852            mailbox1.fetch(key.clone());
2853
2854            // Should succeed (from peer 2)
2855            let (key_actual, value) = cons_out1.recv().await.unwrap();
2856            assert_eq!(key_actual, key);
2857            assert_eq!(value, data);
2858        });
2859    }
2860
2861    #[test_traced]
2862    fn test_fetch_uses_primary_peers_only() {
2863        let executor = deterministic::Runner::timed(Duration::from_secs(10));
2864        executor.start(|context| async move {
2865            let (network, oracle) = Network::new(
2866                context.child("network"),
2867                commonware_p2p::simulated::Config {
2868                    max_size: 1024 * 1024,
2869                    disconnect_on_block: true,
2870                    tracked_peer_sets: NZUsize!(1),
2871                },
2872            );
2873            network.start();
2874
2875            let schemes: Vec<PrivateKey> = [1u64, 2, 3]
2876                .into_iter()
2877                .map(PrivateKey::from_seed)
2878                .collect();
2879            let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
2880            let mut schemes = schemes;
2881
2882            let mut connections = Vec::new();
2883            for peer in &peers {
2884                let (sender, receiver) = oracle
2885                    .control(peer.clone())
2886                    .register(0, Quota::per_second(RATE_LIMIT))
2887                    .await
2888                    .unwrap();
2889                connections.push((sender, receiver));
2890            }
2891
2892            // Topology: peer 1 (requester) linked to peers 2 and 3.
2893            // Peer 2 is primary (no data), peer 3 is secondary (has data).
2894            // Fetch should only query primary peers, so the request must time out.
2895            let mut oracle = oracle;
2896            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2897            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
2898
2899            oracle.manager().track(
2900                1,
2901                TrackedPeers::new(
2902                    Set::try_from([peers[1].clone()]).unwrap(),
2903                    Set::try_from([peers[2].clone()]).unwrap(),
2904                ),
2905            );
2906            context.sleep(Duration::from_millis(100)).await;
2907
2908            let key = Key(1);
2909            let data = Bytes::from("secondary only data");
2910
2911            let (cons1, mut cons_out1) = consumer();
2912
2913            // Peer 1: the requester, has no data.
2914            let scheme = schemes.remove(0);
2915            let mut mailbox1 = setup_and_spawn_actor(
2916                &context,
2917                oracle.manager(),
2918                oracle.control(scheme.public_key()),
2919                scheme,
2920                connections.remove(0),
2921                cons1,
2922                Producer::default(),
2923            );
2924
2925            // Peer 2: primary, has no data.
2926            let scheme = schemes.remove(0);
2927            let _mailbox2 = setup_and_spawn_actor(
2928                &context,
2929                oracle.manager(),
2930                oracle.control(scheme.public_key()),
2931                scheme,
2932                connections.remove(0),
2933                dummy_consumer(),
2934                Producer::default(),
2935            );
2936
2937            // Peer 3: secondary, has the data. Should not be queried.
2938            let mut prod3 = Producer::default();
2939            prod3.insert(key.clone(), data);
2940            let scheme = schemes.remove(0);
2941            let _mailbox3 = setup_and_spawn_actor(
2942                &context,
2943                oracle.manager(),
2944                oracle.control(scheme.public_key()),
2945                scheme,
2946                connections.remove(0),
2947                dummy_consumer(),
2948                prod3,
2949            );
2950
2951            // Fetch should time out because the only peer with data (peer 3)
2952            // is secondary and won't be queried.
2953            mailbox1.fetch(key.clone());
2954
2955            select! {
2956                event = cons_out1.recv() => {
2957                    panic!("fetch should not succeed from a secondary peer, got: {event:?}");
2958                },
2959                _ = context.sleep(Duration::from_secs(2)) => {},
2960            }
2961        });
2962    }
2963
2964    #[test_traced]
2965    fn test_fetch_uses_latest_primary_set_only() {
2966        let executor = deterministic::Runner::timed(Duration::from_secs(10));
2967        executor.start(|context| async move {
2968            let (network, oracle) = Network::new(
2969                context.child("network"),
2970                commonware_p2p::simulated::Config {
2971                    max_size: 1024 * 1024,
2972                    disconnect_on_block: true,
2973                    tracked_peer_sets: NZUsize!(2),
2974                },
2975            );
2976            network.start();
2977
2978            let schemes: Vec<PrivateKey> = [1u64, 2, 3]
2979                .into_iter()
2980                .map(PrivateKey::from_seed)
2981                .collect();
2982            let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
2983            let mut schemes = schemes;
2984
2985            let mut connections = Vec::new();
2986            for peer in &peers {
2987                let (sender, receiver) = oracle
2988                    .control(peer.clone())
2989                    .register(0, Quota::per_second(RATE_LIMIT))
2990                    .await
2991                    .unwrap();
2992                connections.push((sender, receiver));
2993            }
2994
2995            let mut oracle = oracle;
2996            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
2997            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
2998
2999            // Keep the requester tracked across the cutover so the fetch path itself remains
3000            // active, while peer 2 is retained only through the overlap window after peer 3
3001            // becomes the newest primary set.
3002            oracle
3003                .manager()
3004                .track(
3005                    0,
3006                    Set::try_from([peers[0].clone(), peers[1].clone()]).unwrap(),
3007                );
3008            context.sleep(Duration::from_millis(100)).await;
3009
3010            let key = Key(7);
3011            let targeted_key = Key(8);
3012            let data = Bytes::from("old primary data");
3013
3014            let (cons1, mut cons_out1) = consumer();
3015
3016            // Peer 1: requester.
3017            let scheme = schemes.remove(0);
3018            let mut mailbox1 = setup_and_spawn_actor(
3019                &context,
3020                oracle.manager(),
3021                oracle.control(scheme.public_key()),
3022                scheme,
3023                connections.remove(0),
3024                cons1,
3025                Producer::default(),
3026            );
3027
3028            // Peer 2: old primary, still retained in `all.primary`, has the data.
3029            let mut prod2 = Producer::default();
3030            prod2.insert(key.clone(), data.clone());
3031            prod2.insert(targeted_key.clone(), data);
3032            let scheme = schemes.remove(0);
3033            let _mailbox2 = setup_and_spawn_actor(
3034                &context,
3035                oracle.manager(),
3036                oracle.control(scheme.public_key()),
3037                scheme,
3038                connections.remove(0),
3039                dummy_consumer(),
3040                prod2,
3041            );
3042
3043            // Peer 3: latest primary, has no data.
3044            let scheme = schemes.remove(0);
3045            let _mailbox3 = setup_and_spawn_actor(
3046                &context,
3047                oracle.manager(),
3048                oracle.control(scheme.public_key()),
3049                scheme,
3050                connections.remove(0),
3051                dummy_consumer(),
3052                Producer::default(),
3053            );
3054
3055            context.sleep(Duration::from_millis(100)).await;
3056
3057            // Track peer 3 as the latest primary while keeping the requester in the peer set.
3058            // Peer 2 remains in the provider's overlap window (`all.primary`), but new resolver traffic
3059            // should use only `latest.primary`.
3060            oracle
3061                .manager()
3062                .track(
3063                    1,
3064                    Set::try_from([peers[0].clone(), peers[2].clone()]).unwrap(),
3065                );
3066            context.sleep(Duration::from_millis(100)).await;
3067
3068            mailbox1.fetch(key);
3069
3070            select! {
3071                event = cons_out1.recv() => {
3072                    panic!(
3073                        "fetch should not succeed from an old primary retained only in the overlap window, got: {event:?}"
3074                    );
3075                },
3076                _ = context.sleep(Duration::from_secs(1)) => {},
3077            }
3078
3079            // Explicit targets still respect the latest-primary filter.
3080            mailbox1
3081                .fetch_targeted(targeted_key, non_empty_vec![peers[1].clone()]);
3082
3083            select! {
3084                event = cons_out1.recv() => {
3085                    panic!(
3086                        "targeted fetch should not bypass the latest-primary filter, got: {event:?}"
3087                    );
3088                },
3089                _ = context.sleep(Duration::from_secs(1)) => {},
3090            }
3091        });
3092    }
3093
3094    #[test_traced]
3095    fn test_fetch_after_cutover_relies_on_latest_primary_history() {
3096        let executor = deterministic::Runner::timed(Duration::from_secs(10));
3097        executor.start(|context| async move {
3098            let (network, oracle) = Network::new(
3099                context.child("network"),
3100                commonware_p2p::simulated::Config {
3101                    max_size: 1024 * 1024,
3102                    disconnect_on_block: true,
3103                    tracked_peer_sets: NZUsize!(2),
3104                },
3105            );
3106            network.start();
3107
3108            let schemes: Vec<PrivateKey> = [1u64, 2, 3]
3109                .into_iter()
3110                .map(PrivateKey::from_seed)
3111                .collect();
3112            let peers: Vec<PublicKey> = schemes.iter().map(|s| s.public_key()).collect();
3113            let mut schemes = schemes;
3114
3115            let mut connections = Vec::new();
3116            for peer in &peers {
3117                let (sender, receiver) = oracle
3118                    .control(peer.clone())
3119                    .register(0, Quota::per_second(RATE_LIMIT))
3120                    .await
3121                    .unwrap();
3122                connections.push((sender, receiver));
3123            }
3124
3125            let mut oracle = oracle;
3126            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
3127            add_link(&mut oracle, LINK.clone(), &peers, 0, 2).await;
3128
3129            // Keep the requester in the peer set across the cutover while peer 2 remains connected
3130            // only through the overlap window after the latest primary advances to peer 3.
3131            oracle.manager().track(
3132                0,
3133                Set::try_from([peers[0].clone(), peers[1].clone()]).unwrap(),
3134            );
3135            context.sleep(Duration::from_millis(100)).await;
3136
3137            let key = Key(9);
3138            let invalid_history = Bytes::from("stale overlap history");
3139            let valid_history = Bytes::from("latest primary history");
3140
3141            let (mut cons1, mut cons_out1) = consumer();
3142            cons1.add_expected(key.clone(), valid_history.clone());
3143
3144            // Peer 1: requester.
3145            let scheme = schemes.remove(0);
3146            let mut mailbox1 = setup_and_spawn_actor(
3147                &context,
3148                oracle.manager(),
3149                oracle.control(scheme.public_key()),
3150                scheme,
3151                connections.remove(0),
3152                cons1,
3153                Producer::default(),
3154            );
3155
3156            // Peer 2: old primary retained only via overlap. If queried, it would be blocked for
3157            // serving invalid history.
3158            let mut prod2 = Producer::default();
3159            prod2.insert(key.clone(), invalid_history);
3160            let scheme = schemes.remove(0);
3161            let _mailbox2 = setup_and_spawn_actor(
3162                &context,
3163                oracle.manager(),
3164                oracle.control(scheme.public_key()),
3165                scheme,
3166                connections.remove(0),
3167                dummy_consumer(),
3168                prod2,
3169            );
3170
3171            // Peer 3: latest primary and the only peer that should satisfy the fetch.
3172            let mut prod3 = Producer::default();
3173            prod3.insert(key.clone(), valid_history.clone());
3174            let scheme = schemes.remove(0);
3175            let _mailbox3 = setup_and_spawn_actor(
3176                &context,
3177                oracle.manager(),
3178                oracle.control(scheme.public_key()),
3179                scheme,
3180                connections.remove(0),
3181                dummy_consumer(),
3182                prod3,
3183            );
3184
3185            context.sleep(Duration::from_millis(100)).await;
3186
3187            oracle.manager().track(
3188                1,
3189                Set::try_from([peers[0].clone(), peers[2].clone()]).unwrap(),
3190            );
3191            context.sleep(Duration::from_millis(100)).await;
3192
3193            mailbox1.fetch(key.clone());
3194
3195            let (key_actual, value) = cons_out1.recv().await.unwrap();
3196            assert_eq!(key_actual, key);
3197            assert_eq!(value, valid_history);
3198
3199            assert!(
3200                oracle.blocked().await.unwrap().is_empty(),
3201                "overlap-only peers should not be queried for post-cutover history"
3202            );
3203        });
3204    }
3205
3206    #[test_traced]
3207    fn test_secondary_peer_requests_are_served() {
3208        let executor = deterministic::Runner::timed(Duration::from_secs(10));
3209        executor.start(|context| async move {
3210            let (mut oracle, mut schemes, peers, mut connections) =
3211                setup_network_and_peers(&context, &[1, 2]).await;
3212
3213            // Topology: peer 1 is primary (has data), peer 2 is secondary (requester).
3214            // Verifies that a primary peer serves requests from secondary peers
3215            // (i.e. secondary peers can't fetch via broadcast, but their direct
3216            // requests are still answered).
3217            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
3218
3219            oracle.manager().track(
3220                1,
3221                TrackedPeers::new(
3222                    Set::try_from([peers[0].clone()]).unwrap(),
3223                    Set::try_from([peers[1].clone()]).unwrap(),
3224                ),
3225            );
3226            context.sleep(Duration::from_millis(100)).await;
3227
3228            let key = Key(9);
3229            let data = Bytes::from("served to secondary");
3230
3231            // Peer 1: primary, has the data.
3232            let mut prod1 = Producer::default();
3233            prod1.insert(key.clone(), data.clone());
3234
3235            let scheme = schemes.remove(0);
3236            let _mailbox1 = setup_and_spawn_actor(
3237                &context,
3238                oracle.manager(),
3239                oracle.control(scheme.public_key()),
3240                scheme,
3241                connections.remove(0),
3242                dummy_consumer(),
3243                prod1,
3244            );
3245
3246            // Peer 2: secondary, uses fetch_targeted to explicitly request from peer 1.
3247            let (mut cons2, mut cons_out2) = consumer();
3248            cons2.add_expected(key.clone(), data.clone());
3249            let scheme = schemes.remove(0);
3250            let mut mailbox2 = setup_and_spawn_actor(
3251                &context,
3252                oracle.manager(),
3253                oracle.control(scheme.public_key()),
3254                scheme,
3255                connections.remove(0),
3256                cons2,
3257                Producer::default(),
3258            );
3259
3260            mailbox2.fetch_targeted(key.clone(), non_empty_vec![peers[0].clone()]);
3261
3262            let (key_actual, value) = cons_out2.recv().await.unwrap();
3263            assert_eq!(key_actual, key);
3264            assert_eq!(value, data);
3265        });
3266    }
3267
3268    #[test_traced]
3269    fn test_shutdown_aborts_pending_delivery_without_leaked_tasks() {
3270        let executor = deterministic::Runner::timed(Duration::from_secs(10));
3271        executor.start(|context| async move {
3272            let (mut oracle, mut schemes, peers, mut connections) =
3273                setup_network_and_peers(&context, &[1, 2]).await;
3274
3275            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
3276
3277            let key = Key(1);
3278            let data = Bytes::from("data for key 1");
3279            let mut prod2 = Producer::default();
3280            prod2.insert(key.clone(), data);
3281
3282            let (mut gate_sender, gate_receiver) = oneshot::channel();
3283            let (cons1, mut cons_out1, mut started) =
3284                BlockingConsumer::new(context.child("consumer"), vec![(gate_receiver, true)]);
3285
3286            let actor_context = context.child("actor");
3287
3288            let scheme = schemes.remove(0);
3289            let public_key = scheme.public_key();
3290            let (engine, mut mailbox1): (_, Mailbox<Key, PublicKey>) = Engine::new(
3291                actor_context.child("peer").with_attribute("index", 0),
3292                Config {
3293                    peer_provider: oracle.manager(),
3294                    blocker: oracle.control(public_key.clone()),
3295                    consumer: cons1,
3296                    producer: Producer::<Key, Bytes>::default(),
3297                    mailbox_size: MAILBOX_SIZE,
3298                    me: Some(public_key),
3299                    initial: INITIAL_DURATION,
3300                    timeout: TIMEOUT,
3301                    fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
3302                    priority_requests: false,
3303                    priority_responses: false,
3304                },
3305            );
3306            let handle1 = engine.start(connections.remove(0));
3307
3308            let scheme = schemes.remove(0);
3309            let public_key = scheme.public_key();
3310            let (engine, _mailbox2): (_, Mailbox<Key, PublicKey>) = Engine::new(
3311                actor_context.child("peer").with_attribute("index", 1),
3312                Config {
3313                    peer_provider: oracle.manager(),
3314                    blocker: oracle.control(public_key.clone()),
3315                    consumer: dummy_consumer(),
3316                    producer: prod2,
3317                    mailbox_size: MAILBOX_SIZE,
3318                    me: Some(public_key),
3319                    initial: INITIAL_DURATION,
3320                    timeout: TIMEOUT,
3321                    fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
3322                    priority_requests: false,
3323                    priority_responses: false,
3324                },
3325            );
3326            let handle2 = engine.start(connections.remove(0));
3327
3328            mailbox1.fetch(key.clone());
3329            let started_key = started.recv().await.expect("delivery did not start");
3330            assert_eq!(started_key, key);
3331
3332            assert!(count_running_tasks(&context, "actor") > 0);
3333
3334            handle1.abort();
3335            handle2.abort();
3336
3337            context.sleep(Duration::from_millis(100)).await;
3338
3339            select! {
3340                _ = gate_sender.closed() => {},
3341                _ = context.sleep(Duration::from_secs(2)) => {
3342                    panic!("pending delivery was not aborted");
3343                },
3344            };
3345
3346            select! {
3347                event = cons_out1.recv() => assert!(event.is_none(), "unexpected event"),
3348                _ = context.sleep(Duration::from_millis(100)) => {},
3349            };
3350
3351            let running_after = count_running_tasks(&context, "actor");
3352            assert_eq!(
3353                running_after, 0,
3354                "all actor tasks should be stopped, but {running_after} still running"
3355            );
3356        });
3357    }
3358
3359    #[allow(clippy::type_complexity)]
3360    fn spawn_actors_with_handles(
3361        context: &deterministic::Context,
3362        oracle: &Oracle<PublicKey, deterministic::Context>,
3363        schemes: Vec<PrivateKey>,
3364        connections: Vec<(
3365            Sender<PublicKey, deterministic::Context>,
3366            Receiver<PublicKey>,
3367        )>,
3368        consumers: Vec<Consumer<Key, Bytes>>,
3369        producers: Vec<Producer<Key, Bytes>>,
3370    ) -> (
3371        Vec<Mailbox<Key, PublicKey>>,
3372        Vec<commonware_runtime::Handle<()>>,
3373    ) {
3374        let actor_context = context.child("actor");
3375        let mut mailboxes = Vec::new();
3376        let mut handles = Vec::new();
3377
3378        for (idx, ((scheme, conn), (consumer, producer))) in schemes
3379            .into_iter()
3380            .zip(connections)
3381            .zip(consumers.into_iter().zip(producers))
3382            .enumerate()
3383        {
3384            let ctx = actor_context.child("peer").with_attribute("index", idx);
3385            let public_key = scheme.public_key();
3386            let (engine, mailbox) = Engine::new(
3387                ctx,
3388                Config {
3389                    peer_provider: oracle.manager(),
3390                    blocker: oracle.control(public_key.clone()),
3391                    consumer,
3392                    producer,
3393                    mailbox_size: MAILBOX_SIZE,
3394                    me: Some(public_key),
3395                    initial: INITIAL_DURATION,
3396                    timeout: TIMEOUT,
3397                    fetch_retry_timeout: FETCH_RETRY_TIMEOUT,
3398                    priority_requests: false,
3399                    priority_responses: false,
3400                },
3401            );
3402            handles.push(engine.start(conn));
3403            mailboxes.push(mailbox);
3404        }
3405
3406        (mailboxes, handles)
3407    }
3408
3409    #[test_traced]
3410    fn test_operations_after_shutdown_do_not_panic() {
3411        let executor = deterministic::Runner::timed(Duration::from_secs(10));
3412        executor.start(|context| async move {
3413            let (mut oracle, schemes, peers, connections) =
3414                setup_network_and_peers(&context, &[1, 2]).await;
3415
3416            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
3417
3418            let key = Key(1);
3419            let mut prod2 = Producer::default();
3420            prod2.insert(key.clone(), Bytes::from("data for key 1"));
3421
3422            let (cons1, mut cons_out1) = consumer();
3423
3424            let (mut mailboxes, handles) = spawn_actors_with_handles(
3425                &context,
3426                &oracle,
3427                schemes,
3428                connections,
3429                vec![cons1, dummy_consumer()],
3430                vec![Producer::default(), prod2],
3431            );
3432
3433            // Fetch to verify network is functional
3434            mailboxes[0].fetch(key.clone());
3435            let (_, value) = cons_out1.recv().await.unwrap();
3436            assert_eq!(value, Bytes::from("data for key 1"));
3437
3438            // Abort all actors
3439            for handle in handles {
3440                handle.abort();
3441            }
3442            context.sleep(Duration::from_millis(100)).await;
3443
3444            // All operations should not panic after shutdown
3445
3446            // Fetch should not panic
3447            let key2 = Key(2);
3448            mailboxes[0].fetch(key2.clone());
3449
3450            // Retain can prune a single key after shutdown without panicking.
3451            let canceled = key2;
3452            mailboxes[0].retain(move |key, _| key != &canceled);
3453
3454            // Retain should not panic
3455            mailboxes[0].retain(|_, _| true);
3456
3457            // Fetch targeted should not panic
3458            mailboxes[0].fetch_targeted(Key(3), non_empty_vec![peers[1].clone()]);
3459        });
3460    }
3461
3462    fn clean_shutdown(seed: u64) {
3463        let cfg = deterministic::Config::default()
3464            .with_seed(seed)
3465            .with_timeout(Some(Duration::from_secs(30)));
3466        let executor = deterministic::Runner::new(cfg);
3467        executor.start(|context| async move {
3468            let (mut oracle, schemes, peers, connections) =
3469                setup_network_and_peers(&context, &[1, 2]).await;
3470
3471            add_link(&mut oracle, LINK.clone(), &peers, 0, 1).await;
3472
3473            let key = Key(1);
3474            let mut prod2 = Producer::default();
3475            prod2.insert(key.clone(), Bytes::from("data for key 1"));
3476
3477            let (cons1, mut cons_out1) = consumer();
3478
3479            let (mut mailboxes, handles) = spawn_actors_with_handles(
3480                &context,
3481                &oracle,
3482                schemes,
3483                connections,
3484                vec![cons1, dummy_consumer()],
3485                vec![Producer::default(), prod2],
3486            );
3487
3488            // Allow tasks to start
3489            context.sleep(Duration::from_millis(100)).await;
3490
3491            // Count running tasks under the actor prefix
3492            let running_before = count_running_tasks(&context, "actor");
3493            assert!(
3494                running_before > 0,
3495                "at least one actor task should be running"
3496            );
3497
3498            // Verify network is functional
3499            mailboxes[0].fetch(key.clone());
3500            let (_, value) = cons_out1.recv().await.unwrap();
3501            assert_eq!(value, Bytes::from("data for key 1"));
3502
3503            // Abort all actors
3504            for handle in handles {
3505                handle.abort();
3506            }
3507            context.sleep(Duration::from_millis(100)).await;
3508
3509            // Verify all actor tasks are stopped
3510            let running_after = count_running_tasks(&context, "actor");
3511            assert_eq!(
3512                running_after, 0,
3513                "all actor tasks should be stopped, but {running_after} still running"
3514            );
3515        });
3516    }
3517
3518    #[test]
3519    fn test_clean_shutdown() {
3520        for seed in 0..25 {
3521            clean_shutdown(seed);
3522        }
3523    }
3524}