Skip to main content

commonware_resolver/
opaque.rs

1//! Resolve keys from an opaque asynchronous fetcher.
2//!
3//! This module owns the generic resolver actor used when fetching data only
4//! requires asking an application-provided source for raw bytes or objects.
5//! Implementations provide [`Fetcher::fetch`]; this module handles request
6//! coalescing, retain pruning, retry scheduling, consumer delivery, and
7//! accepted-response redelivery.
8//!
9//! Target hints supplied through [`crate::TargetedResolver::fetch_targeted`] and
10//! [`crate::TargetedResolver::fetch_all_targeted`] are ignored because opaque
11//! fetchers do not have peer-specific routing.
12
13use crate::{
14    delivery::{Completion as DeliveryCompletion, Tracker as DeliveryTracker},
15    ingress::{self, FetchKey, Message},
16    subscribers, Consumer, Delivery, Fetch, TargetedResolver,
17};
18use commonware_actor::{mailbox, Feedback};
19use commonware_cryptography::PublicKey;
20use commonware_macros::select_loop;
21use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner};
22use commonware_utils::{
23    futures::{AbortablePool, Aborter},
24    vec::NonEmptyVec,
25    Span,
26};
27use futures::future::{self, Either};
28use std::{
29    collections::{BTreeMap, BTreeSet},
30    future::Future,
31    marker::PhantomData,
32    num::NonZeroUsize,
33    time::{Duration, SystemTime},
34};
35use tracing::{debug, trace, warn};
36
37/// Fetches raw values for resolver keys.
38pub trait Fetcher {
39    /// Key requested by the resolver.
40    type Key: Span;
41
42    /// Raw value delivered to the consumer for validation.
43    type Value;
44
45    /// Fetch the value for `key`.
46    ///
47    /// Return `None` for transient failures, missing data, or unexpected source
48    /// responses. The resolver will retry while the key still has retained
49    /// subscribers.
50    fn fetch(&self, key: Self::Key) -> impl Future<Output = Option<Self::Value>> + Send;
51}
52
53/// Handle to an opaque-fetcher resolver actor.
54pub struct Resolver<K, S, P>
55where
56    K: Span,
57    S: Clone + Eq + Send + 'static,
58    P: PublicKey,
59{
60    mailbox: mailbox::Sender<Message<K, S>>,
61    _peer: PhantomData<P>,
62}
63
64impl<K, S, P> Clone for Resolver<K, S, P>
65where
66    K: Span,
67    S: Clone + Eq + Send + 'static,
68    P: PublicKey,
69{
70    fn clone(&self) -> Self {
71        Self {
72            mailbox: self.mailbox.clone(),
73            _peer: PhantomData,
74        }
75    }
76}
77
78impl<K, S, P> crate::Resolver for Resolver<K, S, P>
79where
80    K: Span,
81    S: Clone + Eq + Send + 'static,
82    P: PublicKey,
83{
84    type Key = K;
85    type Subscriber = S;
86
87    fn fetch<F>(&mut self, fetch: F) -> Feedback
88    where
89        F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
90    {
91        self.send(Message::Fetch(vec![FetchKey::from(fetch.into())]))
92    }
93
94    fn fetch_all<F>(&mut self, fetches: Vec<F>) -> Feedback
95    where
96        F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
97    {
98        self.send(Message::Fetch(
99            fetches
100                .into_iter()
101                .map(|fetch| FetchKey::from(fetch.into()))
102                .collect(),
103        ))
104    }
105
106    fn retain(
107        &mut self,
108        predicate: impl Fn(&Self::Key, &Self::Subscriber) -> bool + Send + 'static,
109    ) -> Feedback {
110        self.send(Message::Retain {
111            predicate: Box::new(predicate),
112        })
113    }
114}
115
116impl<K, S, P> TargetedResolver for Resolver<K, S, P>
117where
118    K: Span,
119    S: Clone + Eq + Send + 'static,
120    P: PublicKey,
121{
122    type PublicKey = P;
123
124    fn fetch_targeted(
125        &mut self,
126        fetch: impl Into<Fetch<Self::Key, Self::Subscriber>> + Send,
127        _targets: NonEmptyVec<Self::PublicKey>,
128    ) -> Feedback {
129        <Self as crate::Resolver>::fetch(self, fetch)
130    }
131
132    fn fetch_all_targeted<F>(&mut self, fetches: Vec<(F, NonEmptyVec<Self::PublicKey>)>) -> Feedback
133    where
134        F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
135    {
136        <Self as crate::Resolver>::fetch_all(
137            self,
138            fetches.into_iter().map(|(fetch, _)| fetch).collect(),
139        )
140    }
141}
142
143impl<K, S, P> Resolver<K, S, P>
144where
145    K: Span,
146    S: Clone + Eq + Send + 'static,
147    P: PublicKey,
148{
149    const fn new(mailbox: mailbox::Sender<Message<K, S>>) -> Self {
150        Self {
151            mailbox,
152            _peer: PhantomData,
153        }
154    }
155
156    fn send(&self, message: Message<K, S>) -> Feedback {
157        self.mailbox.enqueue(message)
158    }
159}
160
161/// Spawn an opaque-fetcher resolver actor.
162pub fn init<E, F, Con, P>(
163    context: E,
164    fetcher: F,
165    consumer: Con,
166    mailbox_size: NonZeroUsize,
167    fetch_retry_timeout: Duration,
168) -> Resolver<F::Key, Con::Subscriber, P>
169where
170    E: Clock + Spawner + Metrics,
171    F: Fetcher + Clone + Send + 'static,
172    F::Value: Clone + Send + 'static,
173    Con: Consumer<Key = F::Key, Value = F::Value>,
174    Con::Subscriber: Ord,
175    P: PublicKey,
176{
177    let (mailbox_tx, mailbox_rx) = mailbox::new(context.child("mailbox"), mailbox_size);
178    Actor::new(
179        context.child("actor"),
180        fetcher,
181        mailbox_rx,
182        consumer,
183        fetch_retry_timeout,
184    )
185    .start();
186    Resolver::new(mailbox_tx)
187}
188
189/// Actor that coalesces opaque fetches, retries failures, and delivers accepted values.
190struct Actor<E, F, Con>
191where
192    E: Clock + Spawner,
193    F: Fetcher,
194    F::Value: Clone + Send + 'static,
195    Con: Consumer<Key = F::Key, Value = F::Value>,
196    Con::Subscriber: Ord,
197{
198    context: ContextCell<E>,
199    fetcher: F,
200    mailbox: mailbox::Receiver<Message<F::Key, Con::Subscriber>>,
201    fetches: AbortablePool<FetchCompletion<F::Key, F::Value>>,
202    deliveries: DeliveryTracker<Con, u64>,
203    requests: BTreeMap<F::Key, Attempt>,
204    subscribers: subscribers::Tracker<F::Key, Con::Subscriber>,
205    retry_schedule: BTreeSet<(SystemTime, F::Key)>,
206    fetch_retry_timeout: Duration,
207    next_id: u64,
208}
209
210enum Attempt {
211    /// Fetch future is active for this key.
212    Fetching { id: u64, _aborter: Aborter },
213
214    /// Consumer validation is active for this key.
215    Delivering { id: u64 },
216
217    /// Fetch is sleeping until the retry deadline.
218    Scheduled(SystemTime),
219}
220
221struct FetchCompletion<K, V> {
222    key: K,
223    id: u64,
224    value: Option<V>,
225}
226
227impl<E, F, Con> Actor<E, F, Con>
228where
229    E: Clock + Spawner,
230    F: Fetcher + Clone + Send + 'static,
231    F::Value: Clone + Send + 'static,
232    Con: Consumer<Key = F::Key, Value = F::Value>,
233    Con::Subscriber: Ord,
234{
235    fn new(
236        context: E,
237        fetcher: F,
238        mailbox: mailbox::Receiver<Message<F::Key, Con::Subscriber>>,
239        consumer: Con,
240        fetch_retry_timeout: Duration,
241    ) -> Self {
242        Self {
243            context: ContextCell::new(context),
244            fetcher,
245            mailbox,
246            fetches: AbortablePool::default(),
247            deliveries: DeliveryTracker::new(consumer),
248            requests: BTreeMap::new(),
249            subscribers: subscribers::Tracker::new(),
250            retry_schedule: BTreeSet::new(),
251            fetch_retry_timeout,
252            next_id: 0,
253        }
254    }
255
256    fn start(mut self) -> Handle<()> {
257        spawn_cell!(self.context, self.run())
258    }
259
260    async fn run(mut self) {
261        select_loop! {
262            self.context,
263            on_stopped => {},
264            Ok(result) = self.fetches.next_completed() else continue => {
265                self.handle_fetch_completed(result);
266            },
267            delivery = self.deliveries.next_completion() => {
268                let delivery = match delivery {
269                    Ok(delivery) => delivery,
270                    Err(_) => continue,
271                };
272                self.handle_delivery_completed(delivery);
273            },
274            _ = match self.retry_schedule.first() {
275                Some((deadline, _)) => Either::Left(self.context.sleep_until(*deadline)),
276                None => Either::Right(future::pending()),
277            } => {
278                self.process_retries();
279            },
280            Some(message) = self.mailbox.recv() else break => {
281                self.handle_message(message);
282            },
283        }
284    }
285
286    /// Apply a mailbox message to active resolver state.
287    fn handle_message(&mut self, message: Message<F::Key, Con::Subscriber>) {
288        match message {
289            Message::Fetch(fetches) => {
290                for fetch in fetches {
291                    self.add_fetch(fetch);
292                }
293            }
294            Message::Retain { predicate } => self.retain(predicate),
295        }
296    }
297
298    /// Add subscribers for a key and start the first fetch if needed.
299    fn add_fetch(&mut self, fetch: FetchKey<F::Key, Con::Subscriber>) {
300        let FetchKey {
301            key, subscribers, ..
302        } = fetch;
303        let is_new = self.subscribers.insert(key.clone(), subscribers);
304
305        if is_new {
306            assert!(self.deliveries.insert(key.clone()), "delivery entry");
307            self.requests
308                .insert(key.clone(), Attempt::Scheduled(self.context.current()));
309            self.start_fetch(key);
310        }
311    }
312
313    /// Prune subscribers, deliveries, active fetches, and scheduled retries.
314    fn retain(&mut self, predicate: ingress::Predicate<F::Key, Con::Subscriber>) {
315        for key in self
316            .subscribers
317            .retain(|key, subscriber| predicate(key, subscriber))
318        {
319            self.deliveries.remove(&key);
320            if let Some(attempt) = self.requests.remove(&key) {
321                match attempt {
322                    Attempt::Fetching { .. } | Attempt::Delivering { .. } => {}
323                    Attempt::Scheduled(deadline) => {
324                        self.retry_schedule.remove(&(deadline, key));
325                    }
326                }
327            }
328        }
329    }
330
331    /// Spawn one fetch attempt for `key`.
332    fn start_fetch(&mut self, key: F::Key) {
333        let id = self.next_id;
334        self.next_id = self.next_id.wrapping_add(1);
335        let future = Self::fetch(key.clone(), id, self.fetcher.clone());
336        let aborter = self.fetches.push(future);
337        self.requests.insert(
338            key,
339            Attempt::Fetching {
340                id,
341                _aborter: aborter,
342            },
343        );
344    }
345
346    /// Deliver a fetched value to currently retained subscribers.
347    fn start_delivery(
348        &mut self,
349        key: F::Key,
350        value: F::Value,
351        delivered: NonEmptyVec<Con::Subscriber>,
352    ) {
353        let id = self.next_id;
354        self.next_id = self.next_id.wrapping_add(1);
355        self.deliveries.deliver(
356            Delivery {
357                key: key.clone(),
358                subscribers: delivered,
359            },
360            id,
361            value,
362        );
363        self.requests.insert(key, Attempt::Delivering { id });
364    }
365
366    /// Deliver an already accepted response to subscribers that arrived later.
367    fn redeliver(&mut self, key: F::Key, delivered: NonEmptyVec<Con::Subscriber>) {
368        self.deliveries.redeliver(Delivery {
369            key,
370            subscribers: delivered,
371        });
372    }
373
374    /// Handle a completed fetch future if it is still the active attempt.
375    fn handle_fetch_completed(&mut self, completion: FetchCompletion<F::Key, F::Value>) {
376        let FetchCompletion { key, id, value } = completion;
377        if !self.current_fetch(&key, id) {
378            return;
379        }
380        self.handle_fetched(key, value);
381    }
382
383    /// Handle a completed consumer delivery if it is still the active attempt.
384    fn handle_delivery_completed(
385        &mut self,
386        completion: DeliveryCompletion<F::Key, Con::Subscriber, u64>,
387    ) {
388        let DeliveryCompletion {
389            context: id,
390            delivery,
391            valid,
392        } = completion;
393        let Delivery {
394            key,
395            subscribers: delivered,
396        } = delivery;
397        if !self.current_delivery(&key, id) {
398            return;
399        }
400        self.handle_delivered(key, delivered, valid);
401    }
402
403    /// Return whether a fetch completion matches the current attempt id.
404    fn current_fetch(&self, key: &F::Key, id: u64) -> bool {
405        let Some(attempt) = self.requests.get(key) else {
406            trace!(?key, id, "ignoring stale fetch completion");
407            return false;
408        };
409        match attempt {
410            Attempt::Fetching { id: active_id, .. } if *active_id == id => true,
411            Attempt::Fetching { id: active_id, .. } => {
412                trace!(
413                    ?key,
414                    completed_id = id,
415                    active_id,
416                    "ignoring replaced fetch completion",
417                );
418                false
419            }
420            Attempt::Delivering { id: active_id } => {
421                trace!(
422                    ?key,
423                    completed_id = id,
424                    active_id,
425                    "ignoring fetch completion for delivery attempt",
426                );
427                false
428            }
429            Attempt::Scheduled(deadline) => {
430                trace!(?key, id, ?deadline, "ignoring scheduled fetch completion");
431                false
432            }
433        }
434    }
435
436    /// Return whether a delivery completion matches the current attempt id.
437    fn current_delivery(&self, key: &F::Key, id: u64) -> bool {
438        let Some(attempt) = self.requests.get(key) else {
439            trace!(?key, id, "ignoring stale delivery completion");
440            return false;
441        };
442        match attempt {
443            Attempt::Delivering { id: active_id } if *active_id == id => true,
444            Attempt::Delivering { id: active_id } => {
445                trace!(
446                    ?key,
447                    completed_id = id,
448                    active_id,
449                    "ignoring replaced delivery completion",
450                );
451                false
452            }
453            Attempt::Fetching { id: active_id, .. } => {
454                trace!(
455                    ?key,
456                    completed_id = id,
457                    active_id,
458                    "ignoring delivery completion for fetch attempt",
459                );
460                false
461            }
462            Attempt::Scheduled(deadline) => {
463                trace!(
464                    ?key,
465                    id,
466                    ?deadline,
467                    "ignoring scheduled delivery completion"
468                );
469                false
470            }
471        }
472    }
473
474    /// Deliver fetched values or schedule a retry when the source had no data.
475    fn handle_fetched(&mut self, key: F::Key, value: Option<F::Value>) {
476        match value {
477            None => self.schedule_retry(key),
478            Some(value) => {
479                if let Some(subscribers) = self.subscribers.pending(&key) {
480                    self.start_delivery(key, value, subscribers);
481                } else {
482                    self.requests.remove(&key);
483                    self.subscribers.remove(&key);
484                    self.deliveries.remove(&key);
485                }
486            }
487        }
488    }
489
490    /// Complete, redeliver, or retry a key after consumer validation.
491    fn handle_delivered(
492        &mut self,
493        key: F::Key,
494        delivered: NonEmptyVec<Con::Subscriber>,
495        valid: bool,
496    ) {
497        let accepted = self.deliveries.response_accepted(&key);
498
499        if valid {
500            let remaining = self.subscribers.remove_delivered(&key, delivered);
501
502            // The first accepted response is reused for subscribers that joined
503            // while validation was pending, avoiding a duplicate source fetch
504            // for the same key.
505            if let Some(subscribers) = remaining {
506                if !accepted {
507                    self.deliveries.accept_response(&key);
508                }
509                self.redeliver(key, subscribers);
510            } else {
511                self.requests.remove(&key);
512                self.subscribers.remove(&key);
513                self.deliveries.remove(&key);
514            }
515            return;
516        }
517
518        // A cached response already satisfied at least one subscriber. Treat a
519        // later rejection during redelivery as stale application feedback rather
520        // than re-fetching data that was accepted once.
521        if accepted {
522            warn!(
523                ?key,
524                "previously accepted resolver response rejected during opaque redelivery"
525            );
526            self.requests.remove(&key);
527            self.subscribers.remove(&key);
528            self.deliveries.remove(&key);
529            return;
530        }
531
532        warn!(?key, "consumer rejected opaque resolver delivery");
533        self.deliveries.discard_response(&key);
534        self.schedule_retry(key);
535    }
536
537    /// Schedule the next fetch attempt for `key`.
538    fn schedule_retry(&mut self, key: F::Key) {
539        let deadline = self.context.current() + self.fetch_retry_timeout;
540        let Some(attempt) = self.requests.get_mut(&key) else {
541            return;
542        };
543        *attempt = Attempt::Scheduled(deadline);
544        debug!(?key, ?deadline, "scheduled opaque resolver retry");
545        self.retry_schedule.insert((deadline, key));
546    }
547
548    /// Start all retry attempts whose deadlines have passed.
549    fn process_retries(&mut self) {
550        let now = self.context.current();
551        while let Some((deadline, key)) = self.retry_schedule.pop_first() {
552            if deadline > now {
553                self.retry_schedule.insert((deadline, key));
554                break;
555            }
556
557            let Some(state) = self.requests.get(&key) else {
558                continue;
559            };
560            match state {
561                Attempt::Scheduled(state_deadline) if *state_deadline == deadline => {
562                    debug!(?key, "retrying opaque resolver fetch");
563                    self.start_fetch(key);
564                }
565                Attempt::Scheduled(_) | Attempt::Fetching { .. } | Attempt::Delivering { .. } => {}
566            }
567        }
568    }
569
570    /// Run the user-supplied fetcher and preserve the attempt id.
571    async fn fetch(key: F::Key, id: u64, fetcher: F) -> FetchCompletion<F::Key, F::Value> {
572        let value = fetcher.fetch(key.clone()).await;
573        FetchCompletion { key, id, value }
574    }
575}
576
577#[cfg(test)]
578mod tests {
579    use super::*;
580    use crate::Resolver as _;
581    use bytes::Bytes;
582    use commonware_cryptography::{
583        ed25519::{PrivateKey, PublicKey},
584        Signer,
585    };
586    use commonware_runtime::{deterministic, deterministic::Runner, Runner as _, Supervisor as _};
587    use commonware_utils::{channel::oneshot, non_empty_vec, sync::Mutex};
588    use std::{
589        collections::{HashMap, VecDeque},
590        sync::{
591            atomic::{AtomicU32, Ordering},
592            Arc,
593        },
594    };
595
596    const RETRY_TIMEOUT: Duration = Duration::from_millis(100);
597
598    #[derive(Clone, Default)]
599    struct MockFetcher {
600        responses: Arc<Mutex<HashMap<u8, VecDeque<Option<Bytes>>>>>,
601        calls: Arc<AtomicU32>,
602    }
603
604    impl MockFetcher {
605        fn push(&self, key: u8, response: Option<Bytes>) {
606            self.responses
607                .lock()
608                .entry(key)
609                .or_default()
610                .push_back(response);
611        }
612
613        fn calls(&self) -> u32 {
614            self.calls.load(Ordering::Relaxed)
615        }
616    }
617
618    impl Fetcher for MockFetcher {
619        type Key = u8;
620        type Value = Bytes;
621
622        fn fetch(&self, key: Self::Key) -> impl Future<Output = Option<Self::Value>> + Send {
623            let responses = self.responses.clone();
624            let calls = self.calls.clone();
625            async move {
626                calls.fetch_add(1, Ordering::Relaxed);
627                responses
628                    .lock()
629                    .get_mut(&key)
630                    .and_then(VecDeque::pop_front)
631                    .flatten()
632            }
633        }
634    }
635
636    #[derive(Clone)]
637    struct BlockingFetcher {
638        started: Arc<Mutex<Option<oneshot::Sender<()>>>>,
639        response: Arc<Mutex<Option<oneshot::Receiver<Option<Bytes>>>>>,
640    }
641
642    impl BlockingFetcher {
643        fn new() -> (Self, oneshot::Receiver<()>, oneshot::Sender<Option<Bytes>>) {
644            let (started_tx, started_rx) = oneshot::channel();
645            let (response_tx, response_rx) = oneshot::channel();
646            (
647                Self {
648                    started: Arc::new(Mutex::new(Some(started_tx))),
649                    response: Arc::new(Mutex::new(Some(response_rx))),
650                },
651                started_rx,
652                response_tx,
653            )
654        }
655    }
656
657    impl Fetcher for BlockingFetcher {
658        type Key = u8;
659        type Value = Bytes;
660
661        fn fetch(&self, _key: Self::Key) -> impl Future<Output = Option<Self::Value>> + Send {
662            let started = self.started.clone();
663            let response = self.response.clone();
664            async move {
665                if let Some(started) = started.lock().take() {
666                    let _ = started.send(());
667                }
668                let response = response.lock().take().expect("missing response");
669                response.await.unwrap_or(None)
670            }
671        }
672    }
673
674    struct CapturedDelivery {
675        delivery: Delivery<u8, u16>,
676        value: Bytes,
677        response: oneshot::Sender<bool>,
678    }
679
680    #[derive(Clone, Default)]
681    struct MockConsumer {
682        deliveries: Arc<Mutex<VecDeque<CapturedDelivery>>>,
683    }
684
685    impl MockConsumer {
686        fn pop(&self) -> Option<CapturedDelivery> {
687            self.deliveries.lock().pop_front()
688        }
689
690        fn len(&self) -> usize {
691            self.deliveries.lock().len()
692        }
693    }
694
695    impl Consumer for MockConsumer {
696        type Key = u8;
697        type Value = Bytes;
698        type Subscriber = u16;
699
700        fn deliver(
701            &mut self,
702            delivery: Delivery<Self::Key, Self::Subscriber>,
703            value: Self::Value,
704        ) -> oneshot::Receiver<bool> {
705            let (response, receiver) = oneshot::channel();
706            self.deliveries.lock().push_back(CapturedDelivery {
707                delivery,
708                value,
709                response,
710            });
711            receiver
712        }
713    }
714
715    fn start_resolver<F>(
716        context: deterministic::Context,
717        fetcher: F,
718        consumer: MockConsumer,
719    ) -> Resolver<u8, u16, PublicKey>
720    where
721        F: Fetcher<Key = u8, Value = Bytes> + Clone + Send + 'static,
722    {
723        init(
724            context,
725            fetcher,
726            consumer,
727            NonZeroUsize::new(16).unwrap(),
728            RETRY_TIMEOUT,
729        )
730    }
731
732    async fn wait_for_delivery(
733        context: &deterministic::Context,
734        consumer: &MockConsumer,
735    ) -> CapturedDelivery {
736        for _ in 0..50 {
737            if let Some(delivery) = consumer.pop() {
738                return delivery;
739            }
740            context.sleep(Duration::from_millis(10)).await;
741        }
742        panic!("timed out waiting for delivery");
743    }
744
745    #[test]
746    fn fetch_during_validation_reuses_response_after_success() {
747        Runner::default().start(|context| async move {
748            let fetcher = MockFetcher::default();
749            fetcher.push(1, Some(Bytes::from_static(b"value")));
750            let consumer = MockConsumer::default();
751            let mut resolver =
752                start_resolver(context.child("resolver"), fetcher.clone(), consumer.clone());
753
754            assert!(resolver
755                .fetch(Fetch {
756                    key: 1,
757                    subscriber: 10
758                })
759                .accepted());
760            let first = wait_for_delivery(&context, &consumer).await;
761            assert_eq!(first.value, Bytes::from_static(b"value"));
762
763            assert!(resolver
764                .fetch(Fetch {
765                    key: 1,
766                    subscriber: 11
767                })
768                .accepted());
769            context.sleep(Duration::from_millis(10)).await;
770            first.response.send(true).expect("response dropped");
771
772            let second = wait_for_delivery(&context, &consumer).await;
773            assert_eq!(second.value, Bytes::from_static(b"value"));
774            assert_eq!(second.delivery.subscribers, non_empty_vec![11]);
775            second.response.send(true).expect("response dropped");
776
777            context.sleep(Duration::from_millis(10)).await;
778            assert_eq!(fetcher.calls(), 1);
779        });
780    }
781
782    #[test]
783    fn missing_fetch_retries_until_value_is_available() {
784        Runner::default().start(|context| async move {
785            let fetcher = MockFetcher::default();
786            fetcher.push(1, None);
787            fetcher.push(1, Some(Bytes::from_static(b"value")));
788            let consumer = MockConsumer::default();
789            let mut resolver =
790                start_resolver(context.child("resolver"), fetcher.clone(), consumer.clone());
791
792            assert!(resolver
793                .fetch(Fetch {
794                    key: 1,
795                    subscriber: 10
796                })
797                .accepted());
798            context
799                .sleep(RETRY_TIMEOUT + Duration::from_millis(10))
800                .await;
801
802            let delivery = wait_for_delivery(&context, &consumer).await;
803            assert_eq!(delivery.value, Bytes::from_static(b"value"));
804            delivery.response.send(true).expect("response dropped");
805            assert_eq!(fetcher.calls(), 2);
806        });
807    }
808
809    #[test]
810    fn accepted_redelivery_rejection_does_not_refetch() {
811        Runner::default().start(|context| async move {
812            let fetcher = MockFetcher::default();
813            fetcher.push(1, Some(Bytes::from_static(b"value")));
814            let consumer = MockConsumer::default();
815            let mut resolver =
816                start_resolver(context.child("resolver"), fetcher.clone(), consumer.clone());
817
818            assert!(resolver
819                .fetch(Fetch {
820                    key: 1,
821                    subscriber: 10
822                })
823                .accepted());
824            let first = wait_for_delivery(&context, &consumer).await;
825
826            assert!(resolver
827                .fetch(Fetch {
828                    key: 1,
829                    subscriber: 11
830                })
831                .accepted());
832            context.sleep(Duration::from_millis(10)).await;
833            first.response.send(true).expect("response dropped");
834
835            let second = wait_for_delivery(&context, &consumer).await;
836            second.response.send(false).expect("response dropped");
837
838            context
839                .sleep(RETRY_TIMEOUT + Duration::from_millis(10))
840                .await;
841            assert_eq!(fetcher.calls(), 1);
842            assert_eq!(consumer.len(), 0);
843        });
844    }
845
846    #[test]
847    fn retain_prunes_active_fetch_subscribers() {
848        Runner::default().start(|context| async move {
849            let (fetcher, started, response) = BlockingFetcher::new();
850            let consumer = MockConsumer::default();
851            let mut resolver = start_resolver(context.child("resolver"), fetcher, consumer.clone());
852
853            assert!(resolver
854                .fetch(Fetch {
855                    key: 1,
856                    subscriber: 10
857                })
858                .accepted());
859            assert!(resolver
860                .fetch(Fetch {
861                    key: 1,
862                    subscriber: 11
863                })
864                .accepted());
865            started.await.expect("fetch did not start");
866            assert!(resolver
867                .retain(|_, subscriber| *subscriber == 11)
868                .accepted());
869            context.sleep(Duration::from_millis(10)).await;
870            response
871                .send(Some(Bytes::from_static(b"value")))
872                .expect("fetcher dropped");
873
874            let delivery = wait_for_delivery(&context, &consumer).await;
875            assert_eq!(delivery.delivery.subscribers, non_empty_vec![11]);
876            delivery.response.send(true).expect("response dropped");
877        });
878    }
879
880    #[test]
881    fn retain_drops_last_subscriber_aborts_active_fetch() {
882        Runner::default().start(|context| async move {
883            let (fetcher, started, response) = BlockingFetcher::new();
884            let consumer = MockConsumer::default();
885            let mut resolver = start_resolver(context.child("resolver"), fetcher, consumer.clone());
886
887            assert!(resolver
888                .fetch(Fetch {
889                    key: 1,
890                    subscriber: 10
891                })
892                .accepted());
893            started.await.expect("fetch did not start");
894            assert!(resolver.retain(|_, _| false).accepted());
895            context.sleep(Duration::from_millis(10)).await;
896
897            assert!(
898                response.send(Some(Bytes::from_static(b"value"))).is_err(),
899                "fetch future should be aborted after its last subscriber is pruned"
900            );
901            context
902                .sleep(RETRY_TIMEOUT + Duration::from_millis(10))
903                .await;
904            assert_eq!(consumer.len(), 0);
905        });
906    }
907
908    #[test]
909    fn retain_drops_last_subscriber_aborts_active_delivery() {
910        Runner::default().start(|context| async move {
911            let fetcher = MockFetcher::default();
912            fetcher.push(1, Some(Bytes::from_static(b"value")));
913            let consumer = MockConsumer::default();
914            let mut resolver =
915                start_resolver(context.child("resolver"), fetcher.clone(), consumer.clone());
916
917            assert!(resolver
918                .fetch(Fetch {
919                    key: 1,
920                    subscriber: 10
921                })
922                .accepted());
923            let delivery = wait_for_delivery(&context, &consumer).await;
924            assert!(resolver.retain(|_, _| false).accepted());
925            context.sleep(Duration::from_millis(10)).await;
926
927            assert!(
928                delivery.response.send(false).is_err(),
929                "delivery future should be aborted after its last subscriber is pruned"
930            );
931            context
932                .sleep(RETRY_TIMEOUT + Duration::from_millis(10))
933                .await;
934            assert_eq!(fetcher.calls(), 1);
935            assert_eq!(consumer.len(), 0);
936        });
937    }
938
939    #[test]
940    fn targeted_fetch_uses_same_opaque_fetch_path() {
941        Runner::default().start(|context| async move {
942            let fetcher = MockFetcher::default();
943            fetcher.push(1, Some(Bytes::from_static(b"value")));
944            let consumer = MockConsumer::default();
945            let mut resolver =
946                start_resolver(context.child("resolver"), fetcher.clone(), consumer.clone());
947            let target = PrivateKey::from_seed(0).public_key();
948
949            assert!(resolver
950                .fetch_targeted(
951                    Fetch {
952                        key: 1,
953                        subscriber: 10
954                    },
955                    non_empty_vec![target]
956                )
957                .accepted());
958            let delivery = wait_for_delivery(&context, &consumer).await;
959            assert_eq!(delivery.value, Bytes::from_static(b"value"));
960            delivery.response.send(true).expect("response dropped");
961            assert_eq!(fetcher.calls(), 1);
962        });
963    }
964}