1use crate::{
14 delivery::{Completion as DeliveryCompletion, Tracker as DeliveryTracker},
15 ingress::{self, FetchKey, Message},
16 subscribers, Consumer, Delivery, Fetch, TargetedResolver,
17};
18use commonware_actor::{mailbox, Feedback};
19use commonware_cryptography::PublicKey;
20use commonware_macros::select_loop;
21use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner};
22use commonware_utils::{
23 futures::{AbortablePool, Aborter},
24 vec::NonEmptyVec,
25 Span,
26};
27use futures::future::{self, Either};
28use std::{
29 collections::{BTreeMap, BTreeSet},
30 future::Future,
31 marker::PhantomData,
32 num::NonZeroUsize,
33 time::{Duration, SystemTime},
34};
35use tracing::{debug, trace, warn};
36
37pub trait Fetcher {
39 type Key: Span;
41
42 type Value;
44
45 fn fetch(&self, key: Self::Key) -> impl Future<Output = Option<Self::Value>> + Send;
51}
52
53pub struct Resolver<K, S, P>
55where
56 K: Span,
57 S: Clone + Eq + Send + 'static,
58 P: PublicKey,
59{
60 mailbox: mailbox::Sender<Message<K, S>>,
61 _peer: PhantomData<P>,
62}
63
64impl<K, S, P> Clone for Resolver<K, S, P>
65where
66 K: Span,
67 S: Clone + Eq + Send + 'static,
68 P: PublicKey,
69{
70 fn clone(&self) -> Self {
71 Self {
72 mailbox: self.mailbox.clone(),
73 _peer: PhantomData,
74 }
75 }
76}
77
78impl<K, S, P> crate::Resolver for Resolver<K, S, P>
79where
80 K: Span,
81 S: Clone + Eq + Send + 'static,
82 P: PublicKey,
83{
84 type Key = K;
85 type Subscriber = S;
86
87 fn fetch<F>(&mut self, fetch: F) -> Feedback
88 where
89 F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
90 {
91 self.send(Message::Fetch(vec![FetchKey::from(fetch.into())]))
92 }
93
94 fn fetch_all<F>(&mut self, fetches: Vec<F>) -> Feedback
95 where
96 F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
97 {
98 self.send(Message::Fetch(
99 fetches
100 .into_iter()
101 .map(|fetch| FetchKey::from(fetch.into()))
102 .collect(),
103 ))
104 }
105
106 fn retain(
107 &mut self,
108 predicate: impl Fn(&Self::Key, &Self::Subscriber) -> bool + Send + 'static,
109 ) -> Feedback {
110 self.send(Message::Retain {
111 predicate: Box::new(predicate),
112 })
113 }
114}
115
116impl<K, S, P> TargetedResolver for Resolver<K, S, P>
117where
118 K: Span,
119 S: Clone + Eq + Send + 'static,
120 P: PublicKey,
121{
122 type PublicKey = P;
123
124 fn fetch_targeted(
125 &mut self,
126 fetch: impl Into<Fetch<Self::Key, Self::Subscriber>> + Send,
127 _targets: NonEmptyVec<Self::PublicKey>,
128 ) -> Feedback {
129 <Self as crate::Resolver>::fetch(self, fetch)
130 }
131
132 fn fetch_all_targeted<F>(&mut self, fetches: Vec<(F, NonEmptyVec<Self::PublicKey>)>) -> Feedback
133 where
134 F: Into<Fetch<Self::Key, Self::Subscriber>> + Send,
135 {
136 <Self as crate::Resolver>::fetch_all(
137 self,
138 fetches.into_iter().map(|(fetch, _)| fetch).collect(),
139 )
140 }
141}
142
143impl<K, S, P> Resolver<K, S, P>
144where
145 K: Span,
146 S: Clone + Eq + Send + 'static,
147 P: PublicKey,
148{
149 const fn new(mailbox: mailbox::Sender<Message<K, S>>) -> Self {
150 Self {
151 mailbox,
152 _peer: PhantomData,
153 }
154 }
155
156 fn send(&self, message: Message<K, S>) -> Feedback {
157 self.mailbox.enqueue(message)
158 }
159}
160
161pub fn init<E, F, Con, P>(
163 context: E,
164 fetcher: F,
165 consumer: Con,
166 mailbox_size: NonZeroUsize,
167 fetch_retry_timeout: Duration,
168) -> Resolver<F::Key, Con::Subscriber, P>
169where
170 E: Clock + Spawner + Metrics,
171 F: Fetcher + Clone + Send + 'static,
172 F::Value: Clone + Send + 'static,
173 Con: Consumer<Key = F::Key, Value = F::Value>,
174 Con::Subscriber: Ord,
175 P: PublicKey,
176{
177 let (mailbox_tx, mailbox_rx) = mailbox::new(context.child("mailbox"), mailbox_size);
178 Actor::new(
179 context.child("actor"),
180 fetcher,
181 mailbox_rx,
182 consumer,
183 fetch_retry_timeout,
184 )
185 .start();
186 Resolver::new(mailbox_tx)
187}
188
189struct Actor<E, F, Con>
191where
192 E: Clock + Spawner,
193 F: Fetcher,
194 F::Value: Clone + Send + 'static,
195 Con: Consumer<Key = F::Key, Value = F::Value>,
196 Con::Subscriber: Ord,
197{
198 context: ContextCell<E>,
199 fetcher: F,
200 mailbox: mailbox::Receiver<Message<F::Key, Con::Subscriber>>,
201 fetches: AbortablePool<FetchCompletion<F::Key, F::Value>>,
202 deliveries: DeliveryTracker<Con, u64>,
203 requests: BTreeMap<F::Key, Attempt>,
204 subscribers: subscribers::Tracker<F::Key, Con::Subscriber>,
205 retry_schedule: BTreeSet<(SystemTime, F::Key)>,
206 fetch_retry_timeout: Duration,
207 next_id: u64,
208}
209
210enum Attempt {
211 Fetching { id: u64, _aborter: Aborter },
213
214 Delivering { id: u64 },
216
217 Scheduled(SystemTime),
219}
220
/// Result of one fetch future, as reported back to the actor loop.
struct FetchCompletion<K, V> {
    /// Key that was fetched.
    key: K,
    /// Attempt id the future was started with; used to discard stale results.
    id: u64,
    /// Fetched value, or `None` when the fetcher had nothing to return.
    value: Option<V>,
}
226
227impl<E, F, Con> Actor<E, F, Con>
228where
229 E: Clock + Spawner,
230 F: Fetcher + Clone + Send + 'static,
231 F::Value: Clone + Send + 'static,
232 Con: Consumer<Key = F::Key, Value = F::Value>,
233 Con::Subscriber: Ord,
234{
235 fn new(
236 context: E,
237 fetcher: F,
238 mailbox: mailbox::Receiver<Message<F::Key, Con::Subscriber>>,
239 consumer: Con,
240 fetch_retry_timeout: Duration,
241 ) -> Self {
242 Self {
243 context: ContextCell::new(context),
244 fetcher,
245 mailbox,
246 fetches: AbortablePool::default(),
247 deliveries: DeliveryTracker::new(consumer),
248 requests: BTreeMap::new(),
249 subscribers: subscribers::Tracker::new(),
250 retry_schedule: BTreeSet::new(),
251 fetch_retry_timeout,
252 next_id: 0,
253 }
254 }
255
256 fn start(mut self) -> Handle<()> {
257 spawn_cell!(self.context, self.run())
258 }
259
260 async fn run(mut self) {
261 select_loop! {
262 self.context,
263 on_stopped => {},
264 Ok(result) = self.fetches.next_completed() else continue => {
265 self.handle_fetch_completed(result);
266 },
267 delivery = self.deliveries.next_completion() => {
268 let delivery = match delivery {
269 Ok(delivery) => delivery,
270 Err(_) => continue,
271 };
272 self.handle_delivery_completed(delivery);
273 },
274 _ = match self.retry_schedule.first() {
275 Some((deadline, _)) => Either::Left(self.context.sleep_until(*deadline)),
276 None => Either::Right(future::pending()),
277 } => {
278 self.process_retries();
279 },
280 Some(message) = self.mailbox.recv() else break => {
281 self.handle_message(message);
282 },
283 }
284 }
285
286 fn handle_message(&mut self, message: Message<F::Key, Con::Subscriber>) {
288 match message {
289 Message::Fetch(fetches) => {
290 for fetch in fetches {
291 self.add_fetch(fetch);
292 }
293 }
294 Message::Retain { predicate } => self.retain(predicate),
295 }
296 }
297
298 fn add_fetch(&mut self, fetch: FetchKey<F::Key, Con::Subscriber>) {
300 let FetchKey {
301 key, subscribers, ..
302 } = fetch;
303 let is_new = self.subscribers.insert(key.clone(), subscribers);
304
305 if is_new {
306 assert!(self.deliveries.insert(key.clone()), "delivery entry");
307 self.requests
308 .insert(key.clone(), Attempt::Scheduled(self.context.current()));
309 self.start_fetch(key);
310 }
311 }
312
313 fn retain(&mut self, predicate: ingress::Predicate<F::Key, Con::Subscriber>) {
315 for key in self
316 .subscribers
317 .retain(|key, subscriber| predicate(key, subscriber))
318 {
319 self.deliveries.remove(&key);
320 if let Some(attempt) = self.requests.remove(&key) {
321 match attempt {
322 Attempt::Fetching { .. } | Attempt::Delivering { .. } => {}
323 Attempt::Scheduled(deadline) => {
324 self.retry_schedule.remove(&(deadline, key));
325 }
326 }
327 }
328 }
329 }
330
331 fn start_fetch(&mut self, key: F::Key) {
333 let id = self.next_id;
334 self.next_id = self.next_id.wrapping_add(1);
335 let future = Self::fetch(key.clone(), id, self.fetcher.clone());
336 let aborter = self.fetches.push(future);
337 self.requests.insert(
338 key,
339 Attempt::Fetching {
340 id,
341 _aborter: aborter,
342 },
343 );
344 }
345
346 fn start_delivery(
348 &mut self,
349 key: F::Key,
350 value: F::Value,
351 delivered: NonEmptyVec<Con::Subscriber>,
352 ) {
353 let id = self.next_id;
354 self.next_id = self.next_id.wrapping_add(1);
355 self.deliveries.deliver(
356 Delivery {
357 key: key.clone(),
358 subscribers: delivered,
359 },
360 id,
361 value,
362 );
363 self.requests.insert(key, Attempt::Delivering { id });
364 }
365
366 fn redeliver(&mut self, key: F::Key, delivered: NonEmptyVec<Con::Subscriber>) {
368 self.deliveries.redeliver(Delivery {
369 key,
370 subscribers: delivered,
371 });
372 }
373
374 fn handle_fetch_completed(&mut self, completion: FetchCompletion<F::Key, F::Value>) {
376 let FetchCompletion { key, id, value } = completion;
377 if !self.current_fetch(&key, id) {
378 return;
379 }
380 self.handle_fetched(key, value);
381 }
382
383 fn handle_delivery_completed(
385 &mut self,
386 completion: DeliveryCompletion<F::Key, Con::Subscriber, u64>,
387 ) {
388 let DeliveryCompletion {
389 context: id,
390 delivery,
391 valid,
392 } = completion;
393 let Delivery {
394 key,
395 subscribers: delivered,
396 } = delivery;
397 if !self.current_delivery(&key, id) {
398 return;
399 }
400 self.handle_delivered(key, delivered, valid);
401 }
402
403 fn current_fetch(&self, key: &F::Key, id: u64) -> bool {
405 let Some(attempt) = self.requests.get(key) else {
406 trace!(?key, id, "ignoring stale fetch completion");
407 return false;
408 };
409 match attempt {
410 Attempt::Fetching { id: active_id, .. } if *active_id == id => true,
411 Attempt::Fetching { id: active_id, .. } => {
412 trace!(
413 ?key,
414 completed_id = id,
415 active_id,
416 "ignoring replaced fetch completion",
417 );
418 false
419 }
420 Attempt::Delivering { id: active_id } => {
421 trace!(
422 ?key,
423 completed_id = id,
424 active_id,
425 "ignoring fetch completion for delivery attempt",
426 );
427 false
428 }
429 Attempt::Scheduled(deadline) => {
430 trace!(?key, id, ?deadline, "ignoring scheduled fetch completion");
431 false
432 }
433 }
434 }
435
436 fn current_delivery(&self, key: &F::Key, id: u64) -> bool {
438 let Some(attempt) = self.requests.get(key) else {
439 trace!(?key, id, "ignoring stale delivery completion");
440 return false;
441 };
442 match attempt {
443 Attempt::Delivering { id: active_id } if *active_id == id => true,
444 Attempt::Delivering { id: active_id } => {
445 trace!(
446 ?key,
447 completed_id = id,
448 active_id,
449 "ignoring replaced delivery completion",
450 );
451 false
452 }
453 Attempt::Fetching { id: active_id, .. } => {
454 trace!(
455 ?key,
456 completed_id = id,
457 active_id,
458 "ignoring delivery completion for fetch attempt",
459 );
460 false
461 }
462 Attempt::Scheduled(deadline) => {
463 trace!(
464 ?key,
465 id,
466 ?deadline,
467 "ignoring scheduled delivery completion"
468 );
469 false
470 }
471 }
472 }
473
474 fn handle_fetched(&mut self, key: F::Key, value: Option<F::Value>) {
476 match value {
477 None => self.schedule_retry(key),
478 Some(value) => {
479 if let Some(subscribers) = self.subscribers.pending(&key) {
480 self.start_delivery(key, value, subscribers);
481 } else {
482 self.requests.remove(&key);
483 self.subscribers.remove(&key);
484 self.deliveries.remove(&key);
485 }
486 }
487 }
488 }
489
490 fn handle_delivered(
492 &mut self,
493 key: F::Key,
494 delivered: NonEmptyVec<Con::Subscriber>,
495 valid: bool,
496 ) {
497 let accepted = self.deliveries.response_accepted(&key);
498
499 if valid {
500 let remaining = self.subscribers.remove_delivered(&key, delivered);
501
502 if let Some(subscribers) = remaining {
506 if !accepted {
507 self.deliveries.accept_response(&key);
508 }
509 self.redeliver(key, subscribers);
510 } else {
511 self.requests.remove(&key);
512 self.subscribers.remove(&key);
513 self.deliveries.remove(&key);
514 }
515 return;
516 }
517
518 if accepted {
522 warn!(
523 ?key,
524 "previously accepted resolver response rejected during opaque redelivery"
525 );
526 self.requests.remove(&key);
527 self.subscribers.remove(&key);
528 self.deliveries.remove(&key);
529 return;
530 }
531
532 warn!(?key, "consumer rejected opaque resolver delivery");
533 self.deliveries.discard_response(&key);
534 self.schedule_retry(key);
535 }
536
537 fn schedule_retry(&mut self, key: F::Key) {
539 let deadline = self.context.current() + self.fetch_retry_timeout;
540 let Some(attempt) = self.requests.get_mut(&key) else {
541 return;
542 };
543 *attempt = Attempt::Scheduled(deadline);
544 debug!(?key, ?deadline, "scheduled opaque resolver retry");
545 self.retry_schedule.insert((deadline, key));
546 }
547
548 fn process_retries(&mut self) {
550 let now = self.context.current();
551 while let Some((deadline, key)) = self.retry_schedule.pop_first() {
552 if deadline > now {
553 self.retry_schedule.insert((deadline, key));
554 break;
555 }
556
557 let Some(state) = self.requests.get(&key) else {
558 continue;
559 };
560 match state {
561 Attempt::Scheduled(state_deadline) if *state_deadline == deadline => {
562 debug!(?key, "retrying opaque resolver fetch");
563 self.start_fetch(key);
564 }
565 Attempt::Scheduled(_) | Attempt::Fetching { .. } | Attempt::Delivering { .. } => {}
566 }
567 }
568 }
569
570 async fn fetch(key: F::Key, id: u64, fetcher: F) -> FetchCompletion<F::Key, F::Value> {
572 let value = fetcher.fetch(key.clone()).await;
573 FetchCompletion { key, id, value }
574 }
575}
576
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Resolver as _;
    use bytes::Bytes;
    use commonware_cryptography::{
        ed25519::{PrivateKey, PublicKey},
        Signer,
    };
    use commonware_runtime::{deterministic, deterministic::Runner, Runner as _, Supervisor as _};
    use commonware_utils::{channel::oneshot, non_empty_vec, sync::Mutex};
    use std::{
        collections::{HashMap, VecDeque},
        sync::{
            atomic::{AtomicU32, Ordering},
            Arc,
        },
    };

    /// Retry delay handed to every resolver under test.
    const RETRY_TIMEOUT: Duration = Duration::from_millis(100);

    /// Pause used to let the actor process what was just sent to it.
    const SETTLE: Duration = Duration::from_millis(10);

    /// Number of polls `wait_for_delivery` makes before giving up.
    const POLL_ATTEMPTS: usize = 50;

    /// Payload returned by the fetchers in every test.
    fn payload() -> Bytes {
        Bytes::from_static(b"value")
    }

    /// Subscribes `subscriber` to `key` and asserts the request was accepted.
    fn subscribe(resolver: &mut Resolver<u8, u16, PublicKey>, key: u8, subscriber: u16) {
        assert!(resolver.fetch(Fetch { key, subscriber }).accepted());
    }

    /// Fetcher that replays scripted responses per key and counts its calls.
    #[derive(Clone, Default)]
    struct MockFetcher {
        responses: Arc<Mutex<HashMap<u8, VecDeque<Option<Bytes>>>>>,
        calls: Arc<AtomicU32>,
    }

    impl MockFetcher {
        /// Queues `response` as the next answer for `key`.
        fn push(&self, key: u8, response: Option<Bytes>) {
            let mut scripted = self.responses.lock();
            scripted.entry(key).or_default().push_back(response);
        }

        /// Number of fetches started so far.
        fn calls(&self) -> u32 {
            self.calls.load(Ordering::Relaxed)
        }
    }

    impl Fetcher for MockFetcher {
        type Key = u8;
        type Value = Bytes;

        fn fetch(&self, key: Self::Key) -> impl Future<Output = Option<Self::Value>> + Send {
            let calls = self.calls.clone();
            let responses = self.responses.clone();
            async move {
                calls.fetch_add(1, Ordering::Relaxed);
                // An exhausted or missing script behaves like a `None` response.
                let next = responses
                    .lock()
                    .get_mut(&key)
                    .and_then(VecDeque::pop_front)
                    .flatten();
                next
            }
        }
    }

    /// Fetcher whose single fetch signals that it started and then waits for
    /// the test to provide the response.
    #[derive(Clone)]
    struct BlockingFetcher {
        started: Arc<Mutex<Option<oneshot::Sender<()>>>>,
        response: Arc<Mutex<Option<oneshot::Receiver<Option<Bytes>>>>>,
    }

    impl BlockingFetcher {
        /// Returns the fetcher, a receiver that fires once the fetch starts,
        /// and the sender used to release the fetch.
        fn new() -> (Self, oneshot::Receiver<()>, oneshot::Sender<Option<Bytes>>) {
            let (started_tx, started_rx) = oneshot::channel();
            let (response_tx, response_rx) = oneshot::channel();
            let fetcher = Self {
                started: Arc::new(Mutex::new(Some(started_tx))),
                response: Arc::new(Mutex::new(Some(response_rx))),
            };
            (fetcher, started_rx, response_tx)
        }
    }

    impl Fetcher for BlockingFetcher {
        type Key = u8;
        type Value = Bytes;

        fn fetch(&self, _key: Self::Key) -> impl Future<Output = Option<Self::Value>> + Send {
            let started = self.started.clone();
            let response = self.response.clone();
            async move {
                let signal = started.lock().take();
                if let Some(signal) = signal {
                    let _ = signal.send(());
                }
                let pending = response.lock().take().expect("missing response");
                // A dropped sender is treated like a `None` response.
                pending.await.ok().flatten()
            }
        }
    }

    /// One delivery recorded by [`MockConsumer`], with the channel used to
    /// answer it.
    struct CapturedDelivery {
        delivery: Delivery<u8, u16>,
        value: Bytes,
        response: oneshot::Sender<bool>,
    }

    /// Consumer that records deliveries and lets the test decide the verdict.
    #[derive(Clone, Default)]
    struct MockConsumer {
        deliveries: Arc<Mutex<VecDeque<CapturedDelivery>>>,
    }

    impl MockConsumer {
        /// Removes and returns the oldest recorded delivery, if any.
        fn pop(&self) -> Option<CapturedDelivery> {
            let mut recorded = self.deliveries.lock();
            recorded.pop_front()
        }

        /// Number of deliveries recorded and not yet popped.
        fn len(&self) -> usize {
            let recorded = self.deliveries.lock();
            recorded.len()
        }
    }

    impl Consumer for MockConsumer {
        type Key = u8;
        type Value = Bytes;
        type Subscriber = u16;

        fn deliver(
            &mut self,
            delivery: Delivery<Self::Key, Self::Subscriber>,
            value: Self::Value,
        ) -> oneshot::Receiver<bool> {
            let (response, verdict) = oneshot::channel();
            let captured = CapturedDelivery {
                delivery,
                value,
                response,
            };
            self.deliveries.lock().push_back(captured);
            verdict
        }
    }

    /// Starts a resolver with a mailbox of 16 entries and [`RETRY_TIMEOUT`].
    fn start_resolver<F>(
        context: deterministic::Context,
        fetcher: F,
        consumer: MockConsumer,
    ) -> Resolver<u8, u16, PublicKey>
    where
        F: Fetcher<Key = u8, Value = Bytes> + Clone + Send + 'static,
    {
        let mailbox_size = NonZeroUsize::new(16).unwrap();
        init(context, fetcher, consumer, mailbox_size, RETRY_TIMEOUT)
    }

    /// Polls the consumer until a delivery shows up, panicking after
    /// [`POLL_ATTEMPTS`] polls spaced [`SETTLE`] apart.
    async fn wait_for_delivery(
        context: &deterministic::Context,
        consumer: &MockConsumer,
    ) -> CapturedDelivery {
        for _ in 0..POLL_ATTEMPTS {
            match consumer.pop() {
                Some(delivery) => return delivery,
                None => context.sleep(SETTLE).await,
            }
        }
        panic!("timed out waiting for delivery");
    }

    #[test]
    fn fetch_during_validation_reuses_response_after_success() {
        Runner::default().start(|context| async move {
            let fetcher = MockFetcher::default();
            fetcher.push(1, Some(payload()));
            let consumer = MockConsumer::default();
            let mut resolver =
                start_resolver(context.child("resolver"), fetcher.clone(), consumer.clone());

            subscribe(&mut resolver, 1, 10);
            let first = wait_for_delivery(&context, &consumer).await;
            assert_eq!(first.value, payload());

            // A second subscriber joins while the first delivery is pending.
            subscribe(&mut resolver, 1, 11);
            context.sleep(SETTLE).await;
            first.response.send(true).expect("response dropped");

            let second = wait_for_delivery(&context, &consumer).await;
            assert_eq!(second.value, payload());
            assert_eq!(second.delivery.subscribers, non_empty_vec![11]);
            second.response.send(true).expect("response dropped");

            context.sleep(SETTLE).await;
            assert_eq!(fetcher.calls(), 1);
        });
    }

    #[test]
    fn missing_fetch_retries_until_value_is_available() {
        Runner::default().start(|context| async move {
            let fetcher = MockFetcher::default();
            fetcher.push(1, None);
            fetcher.push(1, Some(payload()));
            let consumer = MockConsumer::default();
            let mut resolver =
                start_resolver(context.child("resolver"), fetcher.clone(), consumer.clone());

            subscribe(&mut resolver, 1, 10);
            context.sleep(RETRY_TIMEOUT + SETTLE).await;

            let delivery = wait_for_delivery(&context, &consumer).await;
            assert_eq!(delivery.value, payload());
            delivery.response.send(true).expect("response dropped");
            assert_eq!(fetcher.calls(), 2);
        });
    }

    #[test]
    fn accepted_redelivery_rejection_does_not_refetch() {
        Runner::default().start(|context| async move {
            let fetcher = MockFetcher::default();
            fetcher.push(1, Some(payload()));
            let consumer = MockConsumer::default();
            let mut resolver =
                start_resolver(context.child("resolver"), fetcher.clone(), consumer.clone());

            subscribe(&mut resolver, 1, 10);
            let first = wait_for_delivery(&context, &consumer).await;

            subscribe(&mut resolver, 1, 11);
            context.sleep(SETTLE).await;
            first.response.send(true).expect("response dropped");

            // The redelivery to the late subscriber is rejected.
            let second = wait_for_delivery(&context, &consumer).await;
            second.response.send(false).expect("response dropped");

            context.sleep(RETRY_TIMEOUT + SETTLE).await;
            assert_eq!(fetcher.calls(), 1);
            assert_eq!(consumer.len(), 0);
        });
    }

    #[test]
    fn retain_prunes_active_fetch_subscribers() {
        Runner::default().start(|context| async move {
            let (fetcher, started, response) = BlockingFetcher::new();
            let consumer = MockConsumer::default();
            let mut resolver = start_resolver(context.child("resolver"), fetcher, consumer.clone());

            subscribe(&mut resolver, 1, 10);
            subscribe(&mut resolver, 1, 11);
            started.await.expect("fetch did not start");

            let keep_eleven = resolver.retain(|_, subscriber| *subscriber == 11);
            assert!(keep_eleven.accepted());
            context.sleep(SETTLE).await;
            response.send(Some(payload())).expect("fetcher dropped");

            let delivery = wait_for_delivery(&context, &consumer).await;
            assert_eq!(delivery.delivery.subscribers, non_empty_vec![11]);
            delivery.response.send(true).expect("response dropped");
        });
    }

    #[test]
    fn retain_drops_last_subscriber_aborts_active_fetch() {
        Runner::default().start(|context| async move {
            let (fetcher, started, response) = BlockingFetcher::new();
            let consumer = MockConsumer::default();
            let mut resolver = start_resolver(context.child("resolver"), fetcher, consumer.clone());

            subscribe(&mut resolver, 1, 10);
            started.await.expect("fetch did not start");

            let drop_all = resolver.retain(|_, _| false);
            assert!(drop_all.accepted());
            context.sleep(SETTLE).await;

            assert!(
                response.send(Some(payload())).is_err(),
                "fetch future should be aborted after its last subscriber is pruned"
            );
            context.sleep(RETRY_TIMEOUT + SETTLE).await;
            assert_eq!(consumer.len(), 0);
        });
    }

    #[test]
    fn retain_drops_last_subscriber_aborts_active_delivery() {
        Runner::default().start(|context| async move {
            let fetcher = MockFetcher::default();
            fetcher.push(1, Some(payload()));
            let consumer = MockConsumer::default();
            let mut resolver =
                start_resolver(context.child("resolver"), fetcher.clone(), consumer.clone());

            subscribe(&mut resolver, 1, 10);
            let delivery = wait_for_delivery(&context, &consumer).await;

            let drop_all = resolver.retain(|_, _| false);
            assert!(drop_all.accepted());
            context.sleep(SETTLE).await;

            assert!(
                delivery.response.send(false).is_err(),
                "delivery future should be aborted after its last subscriber is pruned"
            );
            context.sleep(RETRY_TIMEOUT + SETTLE).await;
            assert_eq!(fetcher.calls(), 1);
            assert_eq!(consumer.len(), 0);
        });
    }

    #[test]
    fn targeted_fetch_uses_same_opaque_fetch_path() {
        Runner::default().start(|context| async move {
            let fetcher = MockFetcher::default();
            fetcher.push(1, Some(payload()));
            let consumer = MockConsumer::default();
            let mut resolver =
                start_resolver(context.child("resolver"), fetcher.clone(), consumer.clone());
            let target = PrivateKey::from_seed(0).public_key();

            let feedback = resolver.fetch_targeted(
                Fetch {
                    key: 1,
                    subscriber: 10,
                },
                non_empty_vec![target],
            );
            assert!(feedback.accepted());

            let delivery = wait_for_delivery(&context, &consumer).await;
            assert_eq!(delivery.value, payload());
            delivery.response.send(true).expect("response dropped");
            assert_eq!(fetcher.calls(), 1);
        });
    }
}