1#![allow(ungated_async_fn_track_caller)] use std::{
4 borrow::Borrow,
5 collections::HashMap,
6 fmt::Debug,
7 hash::Hash,
8 sync::{Arc, LazyLock, atomic::AtomicBool},
9};
10
11use chrono::{DateTime, Utc};
12use codee::{Decoder, Encoder};
13use futures::{FutureExt, pin_mut};
14use leptos::{
15 prelude::{
16 ArcRwSignal, ArcSignal, Effect, Get, GetUntracked, LocalStorage, Owner, Read, ReadUntracked, Set, Signal,
17 Track, provide_context,
18 },
19 server::{ArcLocalResource, ArcResource, FromEncodedStr, IntoEncodedString, LocalResource, Resource},
20};
21use send_wrapper::SendWrapper;
22
23use crate::{
24 ArcLocalSignal, QueryOptions,
25 cache::{CachedOrFetchCbInput, CachedOrFetchCbInputVariant, CachedOrFetchCbOutput, OnScopeMissing},
26 cache_scope::{QueryAbortReason, QueryOrPending},
27 debug_if_devtools_enabled::DebugIfDevtoolsEnabled,
28 maybe_local::MaybeLocal,
29 query::Query,
30 query_maybe_key::QueryMaybeKey,
31 query_scope::{QueryScopeInfo, QueryScopeLocalTrait, QueryScopeQueryInfo, QueryScopeTrait, ScopeCacheKey},
32 resource_drop_guard::ResourceDropGuard,
33 utils::{
34 KeyHash, OwnerChain, ResetInvalidated, client_only_yield_now, new_buster_id, new_resource_id,
35 run_external_callbacks,
36 },
37};
38
39use super::cache::ScopeLookup;
40
41#[cfg(not(feature = "rkyv"))]
42pub(crate) type DefaultCodec = codee::string::JsonSerdeCodec;
43
44#[cfg(feature = "rkyv")]
45pub(crate) type DefaultCodec = codee::binary::RkyvCodec;
46
47task_local::task_local! {
48 pub(crate) static ASYNC_TRACK_UPDATE_MARKER: Arc<AtomicBool>;
49}
50
51std::thread_local! {
52 pub(crate) static SYNC_TRACK_UPDATE_MARKER: AtomicBool = const { AtomicBool::new(true) };
53}
54
55pub struct QueryClient<Codec = DefaultCodec> {
78 pub(crate) untyped_client: UntypedQueryClient,
79 _ser: std::marker::PhantomData<SendWrapper<Codec>>,
80}
81
82#[derive(Clone, Copy)]
85pub(crate) struct UntypedQueryClient {
86 pub(crate) scope_lookup: ScopeLookup,
87 options: QueryOptions,
88 created_at: DateTime<Utc>,
89}
90
91impl<Codec> Debug for QueryClient<Codec> {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 f.debug_struct("QueryClient")
94 .field("scope_lookup", &self.untyped_client.scope_lookup)
95 .field("options", &self.untyped_client.options)
96 .field("codec", &std::any::type_name::<Codec>())
97 .finish()
98 }
99}
100
101impl<Codec: 'static> Clone for QueryClient<Codec> {
102 fn clone(&self) -> Self {
103 *self
104 }
105}
106
107impl<Codec: 'static> Copy for QueryClient<Codec> {}
108
109impl Default for QueryClient<DefaultCodec> {
110 fn default() -> Self {
111 Self::new()
112 }
113}
114
115impl QueryClient<DefaultCodec> {
116 #[track_caller]
122 pub fn new() -> Self {
123 Self {
124 untyped_client: UntypedQueryClient {
125 scope_lookup: ScopeLookup::new(),
126 options: QueryOptions::default(),
127 created_at: Utc::now(),
128 },
129 _ser: std::marker::PhantomData,
130 }
131 }
132}
133
134impl<Codec: 'static> QueryClient<Codec> {
135 #[track_caller]
145 pub fn provide(self) -> Self {
146 provide_context(self);
147 self
148 }
149
150 #[track_caller]
184 pub fn set_codec<NewCodec>(self) -> QueryClient<NewCodec> {
185 QueryClient {
186 untyped_client: self.untyped_client,
187 _ser: std::marker::PhantomData,
188 }
189 }
190
191 #[track_caller]
195 pub fn with_options(mut self, options: QueryOptions) -> Self {
196 self.untyped_client.options = options;
197 self
198 }
199
200 #[track_caller]
204 pub fn options(&self) -> QueryOptions {
205 self.untyped_client.options
206 }
207
208 #[track_caller]
213 pub fn with_refetch_enabled_toggle(self, refetch_enabled: impl Into<ArcSignal<bool>>) -> Self {
214 self.untyped_client.scope_lookup.scopes_mut().refetch_enabled = Some(refetch_enabled.into());
215 self
216 }
217
218 #[track_caller]
220 pub fn refetch_enabled(&self) -> Option<ArcSignal<bool>> {
221 self.untyped_client.scope_lookup.scopes().refetch_enabled.clone()
222 }
223
224 #[track_caller]
230 pub fn local_resource<K, MaybeKey, V, M>(
231 &self,
232 query_scope: impl QueryScopeLocalTrait<K, V, M> + 'static,
233 keyer: impl Fn() -> MaybeKey + 'static,
234 ) -> LocalResource<MaybeKey::MappedValue>
235 where
236 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
237 MaybeKey: QueryMaybeKey<K, V>,
238 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
239 V: DebugIfDevtoolsEnabled + Clone + 'static,
240 {
241 self.arc_local_resource(query_scope, keyer).into()
242 }
243
244 #[track_caller]
250 pub fn arc_local_resource<K, MaybeKey, V, M>(
251 &self,
252 query_scope: impl QueryScopeLocalTrait<K, V, M> + 'static,
253 keyer: impl Fn() -> MaybeKey + 'static,
254 ) -> ArcLocalResource<MaybeKey::MappedValue>
255 where
256 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
257 MaybeKey: QueryMaybeKey<K, V>,
258 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
259 V: DebugIfDevtoolsEnabled + Clone + 'static,
260 {
261 let client = *self;
262 let client_options = self.options();
263 let cache_key = query_scope.cache_key();
264 let query_scope_info = QueryScopeInfo::new_local(&query_scope);
265 let query_scope = Arc::new(query_scope);
266 let query_options = query_scope.options();
267 let resource_id = new_resource_id();
268
269 let drop_guard = ResourceDropGuard::<K, V>::new(self.untyped_client.scope_lookup, resource_id, cache_key);
271
272 ArcLocalResource::new({
273 move || {
274 let query_scope = query_scope.clone();
275 let query_scope_info = query_scope_info.clone();
276 let maybe_key = keyer().into_maybe_key();
277 let drop_guard = drop_guard.clone();
278 if let Some(key) = maybe_key.as_ref() {
279 drop_guard.set_active_key(KeyHash::new(key));
280 }
281 let owner_chain = OwnerChain::new(client.untyped_client, query_scope_info.cache_key, Owner::current());
286 async move {
287 if let Some(key) = maybe_key {
288 let query_scope_query_info = || QueryScopeQueryInfo::new_local(&query_scope, &key);
289
290 let was_cached = AtomicBool::new(false);
297
298 let value = client
299 .untyped_client
300 .cached_or_fetch(
301 client_options,
302 query_options,
303 None,
304 &query_scope_info,
305 query_scope_query_info,
306 &key,
307 {
308 let query_scope = query_scope.clone();
309 move |key| {
310 let query_scope = query_scope.clone();
311
312 async move { MaybeLocal::new_local(query_scope.query(key).await) }
313 }
314 },
315 |info| {
316 info.cached.buster.track();
317 info.cached.mark_resource_active(resource_id);
318 match info.variant {
319 CachedOrFetchCbInputVariant::CachedUntouched => {
320 was_cached.store(true, std::sync::atomic::Ordering::Relaxed);
321 if cfg!(any(test, not(feature = "ssr")))
325 && info.cached.stale_or_invalidated()
326 {
327 let key = key.clone();
328 let query_scope = query_scope.clone();
329 let owner_chain = owner_chain.clone();
330 leptos::task::spawn(SendWrapper::new(async move {
334 client
335 .prefetch_inner(
336 QueryScopeInfo::new_local(&query_scope),
337 || QueryScopeQueryInfo::new_local(&query_scope, &key),
338 async |key| {
339 MaybeLocal::new_local(query_scope.query(key).await)
340 },
341 key.borrow(),
342 || MaybeLocal::new_local(key.borrow().clone()),
343 &owner_chain,
344 )
345 .await;
346 }));
347 }
348 }
349 CachedOrFetchCbInputVariant::Fresh => {}
350 CachedOrFetchCbInputVariant::CachedUpdated => {
351 panic!("Didn't direct inner to refetch here. (bug)")
352 }
353 }
354 CachedOrFetchCbOutput::Return(
355 info.cached.value_maybe_stale().value_may_panic().clone(),
359 )
360 },
361 None,
362 || MaybeLocal::new_local(key.clone()),
363 &owner_chain,
364 )
365 .await;
366
367 if was_cached.load(std::sync::atomic::Ordering::Relaxed) {
371 client_only_yield_now().await;
372 }
373
374 MaybeKey::prepare_mapped_value(Some(value))
375 } else {
376 MaybeKey::prepare_mapped_value(None)
377 }
378 }
379 }
380 })
381 }
382
383 #[track_caller]
392 pub fn resource<K, MaybeKey, V, M>(
393 &self,
394 query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
395 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
396 ) -> Resource<MaybeKey::MappedValue, Codec>
397 where
398 K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
399 MaybeKey: QueryMaybeKey<K, V>,
400 MaybeKey::MappedValue: Clone + Send + Sync + 'static,
401 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
402 Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
403 <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
404 <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
405 <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError: Debug,
406 <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
407 <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
408 {
409 self.arc_resource_with_options(query_scope, keyer, false).into()
410 }
411
412 #[track_caller]
421 pub fn resource_blocking<K, MaybeKey, V, M>(
422 &self,
423 query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
424 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
425 ) -> Resource<MaybeKey::MappedValue, Codec>
426 where
427 K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
428 MaybeKey: QueryMaybeKey<K, V>,
429 MaybeKey::MappedValue: Clone + Send + Sync + 'static,
430 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
431 Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
432 <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
433 <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
434 <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError: Debug,
435 <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
436 <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
437 {
438 self.arc_resource_with_options(query_scope, keyer, true).into()
439 }
440
441 #[track_caller]
450 pub fn arc_resource<K, MaybeKey, V, M>(
451 &self,
452 query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
453 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
454 ) -> ArcResource<MaybeKey::MappedValue, Codec>
455 where
456 K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
457 MaybeKey: QueryMaybeKey<K, V>,
458 MaybeKey::MappedValue: Clone + Send + Sync + 'static,
459 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
460 Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
461 <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
462 <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
463 <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError: Debug,
464 <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
465 <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
466 {
467 self.arc_resource_with_options(query_scope, keyer, false)
468 }
469
470 #[track_caller]
479 pub fn arc_resource_blocking<K, MaybeKey, V, M>(
480 &self,
481 query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
482 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
483 ) -> ArcResource<MaybeKey::MappedValue, Codec>
484 where
485 K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
486 MaybeKey: QueryMaybeKey<K, V>,
487 MaybeKey::MappedValue: Clone + Send + Sync + 'static,
488 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
489 Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
490 <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
491 <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
492 <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError: Debug,
493 <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
494 <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
495 {
496 self.arc_resource_with_options(query_scope, keyer, true)
497 }
498
499 #[track_caller]
500 fn arc_resource_with_options<K, MaybeKey, V, M>(
501 &self,
502 query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
503 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
504 blocking: bool,
505 ) -> ArcResource<MaybeKey::MappedValue, Codec>
506 where
507 K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
508 MaybeKey: QueryMaybeKey<K, V>,
509 MaybeKey::MappedValue: Clone + Send + Sync + 'static,
510 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
511 Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
512 <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
513 <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
514 <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError: Debug,
515 <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
516 <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
517 {
518 let client = *self;
519 let client_options = self.options();
520 let cache_key = query_scope.cache_key();
521 let query_scope_info = QueryScopeInfo::new(&query_scope);
522 let query_scope = Arc::new(query_scope);
523 let scope_lookup = self.untyped_client.scope_lookup;
524 let query_options = query_scope.options();
525
526 let buster_if_uncached = ArcRwSignal::new(new_buster_id());
527 let resource_id = new_resource_id();
528
529 let drop_guard = ResourceDropGuard::<K, V>::new(self.untyped_client.scope_lookup, resource_id, cache_key);
531
532 let keyer = Arc::new(keyer);
533 let resource = ArcResource::new_with_options(
534 {
535 let buster_if_uncached = buster_if_uncached.clone();
536 let drop_guard = drop_guard.clone();
537 let keyer = keyer.clone();
538 move || {
539 if let Some(key) = keyer().into_maybe_key() {
540 let key_hash = KeyHash::new(&key);
541 drop_guard.set_active_key(key_hash);
542 scope_lookup.with_cached_query::<K, V, _>(&key_hash, &cache_key, |maybe_cached| {
543 if let Some(cached) = maybe_cached {
544 (Some(key.clone()), cached.buster.get())
546 } else {
547 (Some(key.clone()), buster_if_uncached.get())
549 }
550 })
551 } else {
552 (None, buster_if_uncached.get())
553 }
554 }
555 },
556 {
557 let buster_if_uncached = buster_if_uncached.clone();
558 let query_scope_info = query_scope_info.clone();
559 let query_scope = query_scope.clone();
560 move |(maybe_key, last_used_buster)| {
561 let query_scope = query_scope.clone();
562 let query_scope_info = query_scope_info.clone();
563 let buster_if_uncached = buster_if_uncached.clone();
564 let _drop_guard = drop_guard.clone();
567 let owner_chain =
573 OwnerChain::new(client.untyped_client, query_scope_info.cache_key, Owner::current());
574 async move {
575 if let Some(key) = maybe_key {
576 let query_scope_query_info = || QueryScopeQueryInfo::new(&query_scope, &key);
577
578 let was_cached = AtomicBool::new(false);
585
586 let value = client
587 .untyped_client
588 .cached_or_fetch(
589 client_options,
590 query_options,
591 Some(buster_if_uncached.clone()),
592 &query_scope_info,
593 query_scope_query_info,
594 &key,
595 {
596 let query_scope = query_scope.clone();
597 move |key| {
598 let query_scope = query_scope.clone();
599 async move { MaybeLocal::new(query_scope.query(key).await) }
600 }
601 },
602 |info| {
603 info.cached.mark_resource_active(resource_id);
604 match info.variant {
605 CachedOrFetchCbInputVariant::CachedUntouched => {
606 was_cached.store(true, std::sync::atomic::Ordering::Relaxed);
607
608 if cfg!(any(test, not(feature = "ssr")))
612 && info.cached.stale_or_invalidated()
613 {
614 let key = key.clone();
615 let query_scope = query_scope.clone();
616 let owner_chain = owner_chain.clone();
617
618 leptos::task::spawn(async move {
619 client
620 .prefetch_inner(
621 QueryScopeInfo::new(&query_scope),
622 || QueryScopeQueryInfo::new(&query_scope, &key),
623 {
624 let query_scope = query_scope.clone();
625 move |key| {
626 let query_scope = query_scope.clone();
627
628 async move {
629 MaybeLocal::new(
630 query_scope.query(key).await,
631 )
632 }
633 }
634 },
635 key.borrow(),
636 || MaybeLocal::new(key.borrow().clone()),
637 &owner_chain,
638 )
639 .await;
640 });
641 }
642
643 if last_used_buster != info.cached.buster.get_untracked() {
652 buster_if_uncached.set(info.cached.buster.get_untracked());
653 }
654 }
655 CachedOrFetchCbInputVariant::Fresh => {}
656 CachedOrFetchCbInputVariant::CachedUpdated => {
657 panic!("Didn't direct inner to refetch here. (bug)")
658 }
659 }
660 CachedOrFetchCbOutput::Return(
661 info.cached.value_maybe_stale().value_may_panic().clone(),
665 )
666 },
667 None,
668 || MaybeLocal::new(key.clone()),
669 &owner_chain,
670 )
671 .await;
672
673 if was_cached.load(std::sync::atomic::Ordering::Relaxed) {
677 client_only_yield_now().await;
678 }
679
680 MaybeKey::prepare_mapped_value(Some(value))
681 } else {
682 MaybeKey::prepare_mapped_value(None)
683 }
684 }
685 }
686 },
687 blocking,
688 );
689
690 let effect = {
697 let resource = resource.clone();
698 let buster_if_uncached = buster_if_uncached.clone();
699 let client_created_at = self.untyped_client.created_at;
700 move |complete: Option<Option<()>>| {
703 if let Some(Some(())) = complete {
704 return Some(());
705 }
706 if let Some(val) = resource.read().as_ref() {
707 if MaybeKey::mapped_value_is_some(val) {
708 scope_lookup.with_cached_scope_mut::<K, V, _, _>(
709 &mut scope_lookup.scopes_mut(),
710 query_scope_info.cache_key,
711 OnScopeMissing::Create(&query_scope_info),
712 |_| {},
713 |maybe_scope, _| {
714 let scope = maybe_scope.expect("provided a default");
715 if let Some(key) = keyer().into_maybe_key() {
716 let query_scope_query_info = || QueryScopeQueryInfo::new(&query_scope, &key);
717 let key_hash = KeyHash::new(&key);
718
719 if (cfg!(test) || cfg!(not(feature = "ssr"))) &&
741 (Utc::now() - client_created_at).num_seconds() < 10
743 {
744 use parking_lot::Mutex;
745
746 type AlreadySeenMap = HashMap<(u64, ScopeCacheKey, KeyHash), ()>;
747 static ALREADY_SEEN: LazyLock<Mutex<AlreadySeenMap>> =
748 LazyLock::new(|| Mutex::new(HashMap::new()));
749
750 let key = (scope_lookup.scope_id, query_scope_info.cache_key, key_hash);
751 let mut guard = ALREADY_SEEN.lock();
752 if guard.contains_key(&key) {
753 return;
754 }
755 guard.insert(key, ());
756 }
757
758 let mut was_pending = false;
759 if let Some(QueryOrPending::Pending { query_abort_tx, .. }) =
766 scope.get_mut_include_pending(&key_hash)
767 && let Some(tx) = query_abort_tx.take()
768 {
769 tx.send(QueryAbortReason::SsrStreamedValueOverride).unwrap();
770 was_pending = true;
771 }
772
773 if was_pending || !scope.contains_key(&key_hash) {
774 let query = Query::new(
775 client_options,
776 client.untyped_client,
777 &query_scope_info,
778 query_scope_query_info(),
779 key_hash,
780 MaybeLocal::new(key),
781 MaybeLocal::new(MaybeKey::mapped_to_maybe_value(val.clone()).expect(
782 "Just checked \
783 MaybeKey::mapped_value_is_some() \
784 is true",
785 )),
786 buster_if_uncached.clone(),
787 query_options,
788 None,
789 #[cfg(any(
790 all(debug_assertions, feature = "devtools"),
791 feature = "devtools-always"
792 ))]
793 crate::events::Event::new(crate::events::EventVariant::StreamedFromServer),
794 );
795 scope.insert(key_hash, query);
796 }
797 scope
798 .get(&key_hash)
799 .expect("Just inserted")
800 .mark_resource_active(resource_id)
801 }
802 },
803 );
804 }
805 Some(())
806 } else {
807 None
808 }
809 }
810 };
811 if cfg!(test) {
813 let effect = SendWrapper::new(effect);
816 #[allow(clippy::redundant_closure)]
817 Effect::new_isomorphic(move |v| effect(v));
818 } else {
819 Effect::new(effect);
820 }
821
822 resource
823 }
824
825 pub fn untrack_update_query(&self) {
839 SYNC_TRACK_UPDATE_MARKER.with(|marker| {
841 marker.store(false, std::sync::atomic::Ordering::Relaxed);
842 });
843 let _ = ASYNC_TRACK_UPDATE_MARKER.try_with(|marker| {
845 marker.store(false, std::sync::atomic::Ordering::Relaxed);
846 });
847 }
848
849 #[track_caller]
857 pub async fn prefetch_query<K, V, M>(&self, query_scope: impl QueryScopeTrait<K, V, M>, key: impl Borrow<K>)
858 where
859 K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
860 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
861 {
862 let query_scope_info = QueryScopeInfo::new(&query_scope);
863 let owner_chain = OwnerChain::new(self.untyped_client, query_scope_info.cache_key, Owner::current());
864 self.prefetch_inner(
865 query_scope_info,
866 || QueryScopeQueryInfo::new(&query_scope, key.borrow()),
867 async |key| MaybeLocal::new(query_scope.query(key).await),
868 key.borrow(),
869 || MaybeLocal::new(key.borrow().clone()),
870 &owner_chain,
871 )
872 .await
873 }
874
875 #[track_caller]
883 pub async fn prefetch_query_local<K, V, M>(
884 &self,
885 query_scope: impl QueryScopeLocalTrait<K, V, M>,
886 key: impl Borrow<K>,
887 ) where
888 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
889 V: DebugIfDevtoolsEnabled + Clone + 'static,
890 {
891 let query_scope_info = QueryScopeInfo::new_local(&query_scope);
892 let owner_chain = OwnerChain::new(self.untyped_client, query_scope_info.cache_key, Owner::current());
893 self.prefetch_inner(
894 query_scope_info,
895 || QueryScopeQueryInfo::new_local(&query_scope, key.borrow()),
896 async |key| MaybeLocal::new_local(query_scope.query(key).await),
897 key.borrow(),
898 || MaybeLocal::new_local(key.borrow().clone()),
899 &owner_chain,
900 )
901 .await
902 }
903
904 #[track_caller]
905 async fn prefetch_inner<K, V, FetcherFut>(
906 &self,
907 query_scope_info: QueryScopeInfo,
908 query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
909 fetcher: impl Fn(K) -> FetcherFut,
910 key: &K,
911 lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
912 owner_chain: &OwnerChain,
913 ) where
914 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
915 V: DebugIfDevtoolsEnabled + Clone + 'static,
916 FetcherFut: Future<Output = MaybeLocal<V>>,
917 {
918 self.untyped_client
919 .cached_or_fetch(
920 self.options(),
921 query_scope_info.options,
922 None,
923 &query_scope_info,
924 query_scope_info_for_new_query,
925 key,
926 fetcher,
927 |info| {
928 match info.variant {
929 CachedOrFetchCbInputVariant::CachedUntouched => {
930 if info.cached.stale_or_invalidated() {
931 return CachedOrFetchCbOutput::Refetch;
932 }
933 }
934 CachedOrFetchCbInputVariant::CachedUpdated => {
935 info.cached.buster.set(new_buster_id());
937 }
938 CachedOrFetchCbInputVariant::Fresh => {}
939 }
940 CachedOrFetchCbOutput::Return(())
941 },
942 None,
943 lazy_maybe_local_key,
944 owner_chain,
945 )
946 .await;
947 }
948
949 #[track_caller]
959 pub async fn fetch_query<K, V, M>(&self, query_scope: impl QueryScopeTrait<K, V, M>, key: impl Borrow<K>) -> V
960 where
961 K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
962 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
963 {
964 self.untyped_client.fetch_query(query_scope, key).await
965 }
966
967 #[track_caller]
977 pub async fn fetch_query_local<K, V, M>(
978 &self,
979 query_scope: impl QueryScopeLocalTrait<K, V, M>,
980 key: impl Borrow<K>,
981 ) -> V
982 where
983 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
984 V: DebugIfDevtoolsEnabled + Clone + 'static,
985 {
986 self.untyped_client.fetch_query_local(query_scope, key).await
987 }
988
989 #[track_caller]
995 pub fn set_query<K, V, M>(&self, query_scope: impl QueryScopeTrait<K, V, M>, key: impl Borrow<K>, new_value: V)
996 where
997 K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
998 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
999 {
1000 self.set_inner(
1001 QueryScopeInfo::new(&query_scope),
1002 || QueryScopeQueryInfo::new(&query_scope, key.borrow()),
1003 key.borrow(),
1004 MaybeLocal::new(new_value),
1005 || MaybeLocal::new(key.borrow().clone()),
1006 true,
1007 ResetInvalidated::NoReset,
1010 )
1011 }
1012
1013 #[track_caller]
1019 pub fn set_query_local<K, V, M>(
1020 &self,
1021 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1022 key: impl Borrow<K>,
1023 new_value: V,
1024 ) where
1025 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1026 V: DebugIfDevtoolsEnabled + Clone + 'static,
1027 {
1028 self.set_inner::<K, V>(
1029 QueryScopeInfo::new_local(&query_scope),
1030 || QueryScopeQueryInfo::new_local(&query_scope, key.borrow()),
1031 key.borrow(),
1032 MaybeLocal::new_local(new_value),
1033 || MaybeLocal::new_local(key.borrow().clone()),
1034 true,
1035 ResetInvalidated::NoReset,
1038 )
1039 }
1040
1041 #[track_caller]
1042 fn set_inner<K, V>(
1043 &self,
1044 query_scope_info: QueryScopeInfo,
1045 query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
1046 key: &K,
1047 new_value: MaybeLocal<V>,
1048 lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
1049 track: bool,
1050 reset_invalidated: ResetInvalidated,
1051 ) where
1052 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1053 V: DebugIfDevtoolsEnabled + Clone + 'static,
1054 {
1055 let key_hash = KeyHash::new(key.borrow());
1056 self.untyped_client.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
1057 &mut self.untyped_client.scope_lookup.scopes_mut(),
1058 query_scope_info.cache_key,
1059 OnScopeMissing::Create(&query_scope_info),
1060 |_| {},
1061 |maybe_scope, _| {
1062 let scope = maybe_scope.expect("provided a default");
1063
1064 let maybe_cached = if !new_value.is_local() {
1066 if let Some(threadsafe_existing) = scope.get_mut_threadsafe_only(&key_hash) {
1067 Some(threadsafe_existing)
1068 } else {
1069 scope.get_mut_local_only(&key_hash)
1070 }
1071 } else {
1072 scope.get_mut_local_only(&key_hash)
1073 };
1074
1075 if let Some(cached) = maybe_cached {
1076 cached.set_value(
1077 new_value,
1078 track,
1079 #[cfg(any(all(debug_assertions, feature = "devtools"), feature = "devtools-always"))]
1080 crate::events::Event::new(crate::events::EventVariant::DeclarativeSet),
1081 reset_invalidated,
1082 );
1083 } else {
1084 let query = Query::new(
1085 self.options(),
1086 self.untyped_client,
1087 &query_scope_info,
1088 query_scope_info_for_new_query(),
1089 key_hash,
1090 lazy_maybe_local_key(),
1091 new_value,
1092 ArcRwSignal::new(new_buster_id()),
1093 query_scope_info.options,
1094 None,
1095 #[cfg(any(all(debug_assertions, feature = "devtools"), feature = "devtools-always"))]
1096 crate::events::Event::new(crate::events::EventVariant::DeclarativeSet),
1097 );
1098 scope.insert(key_hash, query);
1099 }
1100 },
1101 );
1102 }
1103
1104 #[track_caller]
1118 pub fn update_query<K, V, T, M>(
1119 &self,
1120 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1121 key: impl Borrow<K>,
1122 modifier: impl FnOnce(Option<&mut V>) -> T,
1123 ) -> T
1124 where
1125 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1126 V: DebugIfDevtoolsEnabled + Clone + 'static,
1127 {
1128 self.update_query_inner::<K, V, T>(
1129 &QueryScopeInfo::new_local(&query_scope),
1130 key.borrow(),
1131 ResetInvalidated::NoReset,
1132 modifier,
1133 )
1134 }
1135
1136 #[track_caller]
1137 fn update_query_inner<K, V, T>(
1138 &self,
1139 query_scope_info: &QueryScopeInfo,
1140 key: &K,
1141 reset_invalidated: ResetInvalidated,
1142 modifier: impl FnOnce(Option<&mut V>) -> T,
1143 ) -> T
1144 where
1145 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1146 V: DebugIfDevtoolsEnabled + Clone + 'static,
1147 {
1148 let key_hash = KeyHash::new(key);
1149 let mut modifier_holder = Some(modifier);
1150
1151 let maybe_return_value = self.untyped_client.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
1152 &mut self.untyped_client.scope_lookup.scopes_mut(),
1153 query_scope_info.cache_key,
1154 OnScopeMissing::Skip,
1155 |_| {},
1156 |maybe_scope, _| {
1157 if let Some(scope) = maybe_scope
1158 && let Some(cached) = scope.get_mut(&key_hash)
1159 {
1160 let modifier = modifier_holder
1161 .take()
1162 .expect("Should never be used more than once. (bug)");
1163 let return_value = cached.update_value(
1164 |value| modifier(Some(value.value_mut_may_panic())),
1166 #[cfg(any(all(debug_assertions, feature = "devtools"), feature = "devtools-always"))]
1167 crate::events::Event::new(crate::events::EventVariant::DeclarativeUpdate),
1168 reset_invalidated,
1169 );
1170 return Some(return_value);
1171 }
1172 None
1173 },
1174 );
1175 if let Some(return_value) = maybe_return_value {
1176 return_value
1177 } else {
1178 modifier_holder
1180 .take()
1181 .expect("Should never be used more than once. (bug)")(None)
1182 }
1183 }
1184
1185 #[track_caller]
1196 pub async fn update_query_async<'a, K, V, T, M>(
1197 &'a self,
1198 query_scope: impl QueryScopeTrait<K, V, M>,
1199 key: impl Borrow<K>,
1200 mapper: impl AsyncFnOnce(&mut V) -> T,
1201 ) -> T
1202 where
1203 K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
1204 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1205 {
1206 let query_scope_info = QueryScopeInfo::new(&query_scope);
1207 let owner_chain = OwnerChain::new(self.untyped_client, query_scope_info.cache_key, Owner::current());
1208 self.update_query_async_inner(
1209 query_scope_info,
1210 || QueryScopeQueryInfo::new(&query_scope, key.borrow()),
1211 async |key| MaybeLocal::new(query_scope.query(key).await),
1212 key.borrow(),
1213 mapper,
1214 MaybeLocal::new,
1215 || MaybeLocal::new(key.borrow().clone()),
1216 &owner_chain,
1217 )
1218 .await
1219 }
1220
1221 #[track_caller]
1232 pub async fn update_query_async_local<'a, K, V, T, M>(
1233 &'a self,
1234 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1235 key: impl Borrow<K>,
1236 mapper: impl AsyncFnOnce(&mut V) -> T,
1237 ) -> T
1238 where
1239 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1240 V: DebugIfDevtoolsEnabled + Clone + 'static,
1241 {
1242 let query_scope_info = QueryScopeInfo::new_local(&query_scope);
1243 let owner_chain = OwnerChain::new(self.untyped_client, query_scope_info.cache_key, Owner::current());
1244 self.update_query_async_inner(
1245 query_scope_info,
1246 || QueryScopeQueryInfo::new_local(&query_scope, key.borrow()),
1247 async |key| MaybeLocal::new_local(query_scope.query(key).await),
1248 key.borrow(),
1249 mapper,
1250 MaybeLocal::new_local,
1251 || MaybeLocal::new_local(key.borrow().clone()),
1252 &owner_chain,
1253 )
1254 .await
1255 }
1256
1257 #[track_caller]
1258 async fn update_query_async_inner<'a, K, V, T, FetcherFut>(
1259 &'a self,
1260 query_scope_info: QueryScopeInfo,
1261 query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
1262 fetcher: impl Fn(K) -> FetcherFut,
1263 key: &K,
1264 mapper: impl AsyncFnOnce(&mut V) -> T,
1265 into_maybe_local: impl FnOnce(V) -> MaybeLocal<V>,
1266 lazy_maybe_local_key: impl Fn() -> MaybeLocal<K>,
1267 owner_chain: &OwnerChain,
1268 ) -> T
1269 where
1270 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1271 V: DebugIfDevtoolsEnabled + Clone + 'static,
1272 FetcherFut: Future<Output = MaybeLocal<V>>,
1273 {
1274 let key_hash = KeyHash::new(key.borrow());
1275
1276 let fetcher_mutex = self
1280 .untyped_client
1281 .scope_lookup
1282 .fetcher_mutex::<K, V>(key_hash, &query_scope_info);
1283 let fetcher_guard = fetcher_mutex.lock().await;
1284
1285 let mut new_value = self
1286 .untyped_client
1287 .fetch_inner(
1288 query_scope_info.clone(),
1289 &query_scope_info_for_new_query,
1290 fetcher,
1291 key.borrow(),
1292 Some(&fetcher_guard),
1293 &lazy_maybe_local_key,
1294 owner_chain,
1295 )
1296 .await;
1297
1298 self.untyped_client
1300 .scope_lookup
1301 .with_notify_fetching(query_scope_info.cache_key, key_hash, false, async {
1302 let track = Arc::new(AtomicBool::new(true));
1303
1304 let query_abort_rx = self.untyped_client.scope_lookup.prepare_invalidation_channel::<K, V>(
1306 &query_scope_info,
1307 key_hash,
1308 &lazy_maybe_local_key(),
1309 );
1310
1311 let result_fut = ASYNC_TRACK_UPDATE_MARKER.scope(track.clone(), async { mapper(&mut new_value).await });
1312
1313 let mut cleared_during_user_fn = false;
1322 let result = {
1323 pin_mut!(result_fut);
1324 pin_mut!(query_abort_rx);
1325 match futures::future::select(result_fut, query_abort_rx).await {
1326 futures::future::Either::Left((result, _query_abort_rx)) => result,
1327 futures::future::Either::Right((query_abort_reason, result_fut)) => {
1328 if let Ok(QueryAbortReason::Clear) = query_abort_reason {
1329 cleared_during_user_fn = true;
1331 }
1332 result_fut.await
1333 }
1334 }
1335 };
1336
1337 if !cleared_during_user_fn {
1339 let mut new_value = Some(new_value);
1348 let updated = self.update_query_inner::<K, V, _>(
1349 &query_scope_info,
1350 key,
1351 ResetInvalidated::NoReset,
1353 |value| {
1354 if let Some(value) = value {
1355 if !track.load(std::sync::atomic::Ordering::Relaxed) {
1357 self.untrack_update_query();
1358 }
1359
1360 *value = new_value.take().expect("Should be Some");
1361 true
1362 } else {
1363 false
1364 }
1365 },
1366 );
1367 if !updated {
1368 self.set_inner::<K, V>(
1369 query_scope_info,
1370 query_scope_info_for_new_query,
1371 key.borrow(),
1372 into_maybe_local(
1373 new_value
1374 .take()
1375 .expect("Should be Some, should only be here if not updated"),
1376 ),
1377 lazy_maybe_local_key,
1378 track.load(std::sync::atomic::Ordering::Relaxed),
1379 ResetInvalidated::NoReset,
1382 );
1383 }
1384 }
1385
1386 result
1387 })
1388 .await
1389 }
1390
1391 #[track_caller]
1393 pub fn get_cached_query<K, V, M>(
1394 &self,
1395 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1396 key: impl Borrow<K>,
1397 ) -> Option<V>
1398 where
1399 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1400 V: DebugIfDevtoolsEnabled + Clone + 'static,
1401 {
1402 self.untyped_client.scope_lookup.with_cached_query::<K, V, _>(
1403 &KeyHash::new(key.borrow()),
1404 &query_scope.cache_key(),
1405 |maybe_cached| {
1406 maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1408 },
1409 )
1410 }
1411
1412 #[track_caller]
1416 pub fn query_exists<K, V, M>(&self, query_scope: impl QueryScopeLocalTrait<K, V, M>, key: impl Borrow<K>) -> bool
1417 where
1418 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1419 V: DebugIfDevtoolsEnabled + 'static,
1420 {
1421 let key_hash = KeyHash::new(key.borrow());
1422 self.untyped_client.scope_lookup.with_cached_query::<K, V, _>(
1423 &key_hash,
1424 &query_scope.cache_key(),
1425 |maybe_cached| maybe_cached.is_some(),
1426 )
1427 }
1428
1429 #[track_caller]
1442 pub fn subscribe_is_loading<K, MaybeKey, V, M>(
1443 &self,
1444 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1445 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1446 ) -> Signal<bool>
1447 where
1448 K: Hash + Send + Sync + 'static,
1449 MaybeKey: QueryMaybeKey<K, V>,
1450 MaybeKey::MappedValue: 'static,
1451 V: 'static,
1452 {
1453 self.subscribe_is_loading_arc(query_scope, keyer).into()
1454 }
1455
1456 #[track_caller]
1469 pub fn subscribe_is_loading_local<K, MaybeKey, V, M>(
1470 &self,
1471 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1472 keyer: impl Fn() -> MaybeKey + 'static,
1473 ) -> Signal<bool>
1474 where
1475 K: Hash + 'static,
1476 MaybeKey: QueryMaybeKey<K, V>,
1477 MaybeKey::MappedValue: 'static,
1478 V: 'static,
1479 {
1480 self.subscribe_is_loading_arc_local(query_scope, keyer).into()
1481 }
1482
1483 #[track_caller]
1496 pub fn subscribe_is_loading_arc_local<K, MaybeKey, V, M>(
1497 &self,
1498 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1499 keyer: impl Fn() -> MaybeKey + 'static,
1500 ) -> ArcSignal<bool>
1501 where
1502 K: Hash + 'static,
1503 MaybeKey: QueryMaybeKey<K, V>,
1504 MaybeKey::MappedValue: 'static,
1505 V: 'static,
1506 {
1507 let keyer = SendWrapper::new(keyer);
1508 self.untyped_client
1509 .scope_lookup
1510 .scope_subscriptions_mut()
1511 .add_is_loading_subscription(
1512 query_scope.cache_key(),
1513 MaybeLocal::new_local(ArcSignal::derive(move || {
1514 keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1515 })),
1516 )
1517 }
1518
1519 #[track_caller]
1532 pub fn subscribe_is_loading_arc<K, MaybeKey, V, M>(
1533 &self,
1534 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1535 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1536 ) -> ArcSignal<bool>
1537 where
1538 K: Hash + Send + Sync + 'static,
1539 MaybeKey: QueryMaybeKey<K, V>,
1540 MaybeKey::MappedValue: 'static,
1541 V: 'static,
1542 {
1543 self.untyped_client
1544 .scope_lookup
1545 .scope_subscriptions_mut()
1546 .add_is_loading_subscription(
1547 query_scope.cache_key(),
1548 MaybeLocal::new(ArcSignal::derive(move || {
1549 keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1550 })),
1551 )
1552 }
1553
1554 #[track_caller]
1567 pub fn subscribe_is_fetching<K, MaybeKey, V, M>(
1568 &self,
1569 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1570 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1571 ) -> Signal<bool>
1572 where
1573 K: Hash + Send + Sync + 'static,
1574 MaybeKey: QueryMaybeKey<K, V>,
1575 MaybeKey::MappedValue: 'static,
1576 V: 'static,
1577 {
1578 self.subscribe_is_fetching_arc(query_scope, keyer).into()
1579 }
1580
1581 #[track_caller]
1594 pub fn subscribe_is_fetching_local<K, MaybeKey, V, M>(
1595 &self,
1596 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1597 keyer: impl Fn() -> MaybeKey + 'static,
1598 ) -> Signal<bool>
1599 where
1600 K: Hash + 'static,
1601 MaybeKey: QueryMaybeKey<K, V>,
1602 MaybeKey::MappedValue: 'static,
1603 V: 'static,
1604 {
1605 self.subscribe_is_fetching_arc_local(query_scope, keyer).into()
1606 }
1607
1608 #[track_caller]
1621 pub fn subscribe_is_fetching_arc_local<K, MaybeKey, V, M>(
1622 &self,
1623 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1624 keyer: impl Fn() -> MaybeKey + 'static,
1625 ) -> ArcSignal<bool>
1626 where
1627 K: Hash + 'static,
1628 MaybeKey: QueryMaybeKey<K, V>,
1629 MaybeKey::MappedValue: 'static,
1630 V: 'static,
1631 {
1632 let keyer = SendWrapper::new(keyer);
1633 self.untyped_client
1634 .scope_lookup
1635 .scope_subscriptions_mut()
1636 .add_is_fetching_subscription(
1637 query_scope.cache_key(),
1638 MaybeLocal::new_local(ArcSignal::derive(move || {
1639 keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1640 })),
1641 )
1642 }
1643
1644 #[track_caller]
1657 pub fn subscribe_is_fetching_arc<K, MaybeKey, V, M>(
1658 &self,
1659 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1660 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1661 ) -> ArcSignal<bool>
1662 where
1663 K: Hash + Send + Sync + 'static,
1664 MaybeKey: QueryMaybeKey<K, V>,
1665 MaybeKey::MappedValue: 'static,
1666 V: 'static,
1667 {
1668 self.untyped_client
1669 .scope_lookup
1670 .scope_subscriptions_mut()
1671 .add_is_fetching_subscription(
1672 query_scope.cache_key(),
1673 MaybeLocal::new(ArcSignal::derive(move || {
1674 keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1675 })),
1676 )
1677 }
1678
1679 #[track_caller]
1687 pub fn subscribe_value<K, MaybeKey, V, M>(
1688 &self,
1689 query_scope: impl QueryScopeTrait<K, V, M>,
1690 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1691 ) -> Signal<Option<V>>
1692 where
1693 K: DebugIfDevtoolsEnabled + Hash + Clone + Send + Sync + 'static,
1694 MaybeKey: QueryMaybeKey<K, V>,
1695 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1696 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1697 {
1698 self.subscribe_value_arc(query_scope, keyer).into()
1699 }
1700
1701 #[track_caller]
1709 pub fn subscribe_value_local<K, MaybeKey, V, M>(
1710 &self,
1711 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1712 keyer: impl Fn() -> MaybeKey + 'static,
1713 ) -> Signal<Option<V>, LocalStorage>
1714 where
1715 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1716 MaybeKey: QueryMaybeKey<K, V>,
1717 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
1718 V: DebugIfDevtoolsEnabled + Clone + 'static,
1719 {
1720 self.subscribe_value_arc_local(query_scope, keyer).into()
1721 }
1722
1723 #[track_caller]
1731 pub fn subscribe_value_arc_local<K, MaybeKey, V, M>(
1732 &self,
1733 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1734 keyer: impl Fn() -> MaybeKey + 'static,
1735 ) -> ArcLocalSignal<Option<V>>
1736 where
1737 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1738 MaybeKey: QueryMaybeKey<K, V>,
1739 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
1740 V: DebugIfDevtoolsEnabled + Clone + 'static,
1741 {
1742 let cache_key = query_scope.cache_key();
1743 let keyer = ArcLocalSignal::derive_local(move || keyer().into_maybe_key().map(|k| KeyHash::new(&k)));
1744 let dyn_signal = self
1745 .untyped_client
1746 .scope_lookup
1747 .scope_subscriptions_mut()
1748 .add_value_set_updated_or_removed_subscription(
1749 cache_key,
1750 MaybeLocal::new_local({
1751 let keyer = keyer.clone();
1752 move || keyer.get()
1753 }),
1754 );
1755
1756 let scope_lookup = self.untyped_client.scope_lookup;
1757 ArcLocalSignal::derive_local(move || {
1758 dyn_signal.track();
1759 if let Some(key) = keyer.read_untracked().as_ref() {
1760 scope_lookup.with_cached_query::<K, V, _>(key, &cache_key, |maybe_cached| {
1761 maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1763 })
1764 } else {
1765 None
1766 }
1767 })
1768 }
1769
1770 #[track_caller]
1778 pub fn subscribe_value_arc<K, MaybeKey, V, M>(
1779 &self,
1780 query_scope: impl QueryScopeTrait<K, V, M>,
1781 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1782 ) -> ArcSignal<Option<V>>
1783 where
1784 K: DebugIfDevtoolsEnabled + Hash + Clone + Send + Sync + 'static,
1785 MaybeKey: QueryMaybeKey<K, V>,
1786 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1787 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1788 {
1789 let cache_key = query_scope.cache_key();
1790 let keyer = ArcSignal::derive(move || keyer().into_maybe_key().map(|k| KeyHash::new(&k)));
1791
1792 let dyn_signal = self
1793 .untyped_client
1794 .scope_lookup
1795 .scope_subscriptions_mut()
1796 .add_value_set_updated_or_removed_subscription(
1797 cache_key,
1798 MaybeLocal::new({
1799 let keyer = keyer.clone();
1800 move || keyer.get()
1801 }),
1802 );
1803
1804 let scope_lookup = self.untyped_client.scope_lookup;
1805 ArcSignal::derive(move || {
1806 dyn_signal.track();
1807 if let Some(key) = keyer.read_untracked().as_ref() {
1808 scope_lookup.with_cached_query::<K, V, _>(key, &cache_key, |maybe_cached| {
1809 maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1811 })
1812 } else {
1813 None
1814 }
1815 })
1816 }
1817
1818 #[track_caller]
1822 pub fn invalidate_query<K, V, M>(
1823 &self,
1824 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1825 key: impl Borrow<K>,
1826 ) -> bool
1827 where
1828 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1829 V: DebugIfDevtoolsEnabled + Clone + 'static,
1830 {
1831 self.untyped_client.invalidate_query(query_scope, key)
1832 }
1833
1834 #[track_caller]
1838 pub fn invalidate_queries<K, V, KRef, M>(
1839 &self,
1840 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1841 keys: impl IntoIterator<Item = KRef>,
1842 ) -> Vec<KRef>
1843 where
1844 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1845 V: DebugIfDevtoolsEnabled + Clone + 'static,
1846 KRef: Borrow<K>,
1847 {
1848 self.untyped_client.invalidate_queries(query_scope, keys)
1849 }
1850
1851 #[track_caller]
1858 pub fn invalidate_queries_with_predicate<K, V, M>(
1859 &self,
1860 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1861 should_invalidate: impl Fn(&K) -> bool,
1862 ) where
1863 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1864 V: DebugIfDevtoolsEnabled + Clone + 'static,
1865 {
1866 let mut scopes = self.untyped_client.scope_lookup.scopes_mut();
1867 let mut cbs_scopes = vec![];
1868 let query_scope_info = QueryScopeInfo::new_local(&query_scope);
1869 self.untyped_client.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
1870 &mut scopes,
1871 query_scope_info.cache_key,
1872 OnScopeMissing::Skip,
1873 |_| {},
1874 |maybe_scope, _| {
1875 if let Some(scope) = maybe_scope {
1876 for query in scope.all_queries_mut_include_pending() {
1877 if let Some(key) = query.key().value_if_safe()
1878 && should_invalidate(key)
1879 {
1880 let cb_scopes = query.invalidate(QueryAbortReason::Invalidate);
1881 cbs_scopes.push(cb_scopes);
1882 }
1883 }
1884 }
1885 },
1886 );
1887 let mut cbs_external = vec![];
1888 for cb in cbs_scopes {
1889 if let Some(cb_external) = cb(&mut scopes) {
1890 cbs_external.push(cb_external);
1891 }
1892 }
1893 drop(scopes);
1894 run_external_callbacks(self.untyped_client, query_scope_info.cache_key, cbs_external);
1895 }
1896
1897 #[track_caller]
1901 pub fn invalidate_query_scope<K, V, M>(&self, query_scope: impl QueryScopeLocalTrait<K, V, M>)
1902 where
1903 K: Hash + 'static,
1904 V: Clone + 'static,
1905 {
1906 self.invalidate_query_scope_inner(query_scope.cache_key())
1907 }
1908
1909 pub(crate) fn invalidate_query_scope_inner(&self, scope_cache_key: ScopeCacheKey) {
1910 let mut scopes = self.untyped_client.scope_lookup.scopes_mut();
1911 let mut cbs_scopes = vec![];
1912 if let Some(scope) = scopes.get_mut(&scope_cache_key) {
1913 let cb_scopes = scope.invalidate_scope(QueryAbortReason::Invalidate);
1914 cbs_scopes.push(cb_scopes);
1915 for buster in scope.busters() {
1916 buster.try_set(new_buster_id());
1917 }
1918 }
1919 let mut cbs_external = vec![];
1920 for cb in cbs_scopes {
1921 if let Some(cb_external) = cb(&mut scopes) {
1922 cbs_external.push(cb_external);
1923 }
1924 }
1925 drop(scopes);
1926 run_external_callbacks(self.untyped_client, scope_cache_key, cbs_external);
1927 }
1928
1929 #[track_caller]
1936 pub fn invalidate_all_queries(&self) {
1937 let mut scopes = self.untyped_client.scope_lookup.scopes_mut();
1938 let mut cbs_scopes = vec![];
1939 for scope in scopes.values_mut() {
1940 let busters = scope.busters();
1941 let cb_scopes = scope.invalidate_scope(QueryAbortReason::Invalidate);
1942 cbs_scopes.push((cb_scopes, scope.cache_key()));
1943 for buster in busters {
1944 buster.try_set(new_buster_id());
1945 }
1946 }
1947 let mut cbs_external = vec![];
1948 for (cb, cache_key) in cbs_scopes {
1949 if let Some(cb_external) = cb(&mut scopes) {
1950 cbs_external.push((cb_external, cache_key));
1951 }
1952 }
1953 drop(scopes);
1954 for (cb_external, cache_key) in cbs_external {
1955 run_external_callbacks(self.untyped_client, cache_key, vec![cb_external]);
1956 }
1957 }
1958
1959 #[track_caller]
1969 pub fn clear(&self) {
1970 let mut scopes = self.untyped_client.scope_lookup.scopes_mut();
1971 let mut cbs_scopes = vec![];
1972 for scope in scopes.values_mut() {
1973 let busters = scope.busters();
1974 let cb_scopes = scope.invalidate_scope(QueryAbortReason::Clear);
1975 cbs_scopes.push((cb_scopes, scope.cache_key()));
1976 scope.clear();
1977 for buster in busters {
1978 buster.try_set(new_buster_id());
1979 }
1980 }
1981 let mut cbs_external = vec![];
1982 for (cb, cache_key) in cbs_scopes {
1983 if let Some(cb_external) = cb(&mut scopes) {
1984 cbs_external.push((cb_external, cache_key));
1985 }
1986 }
1987 drop(scopes);
1988 for (cb_external, cache_key) in cbs_external {
1989 run_external_callbacks(self.untyped_client, cache_key, vec![cb_external]);
1990 }
1991 }
1992
1993 #[cfg(test)]
1994 pub(crate) fn total_cached_queries(&self) -> usize {
1995 self.untyped_client
1996 .scope_lookup
1997 .scopes()
1998 .values()
1999 .map(|scope| scope.total_cached_queries())
2000 .sum()
2001 }
2002
2003 #[cfg(test)]
2004 pub(crate) fn is_key_invalid<K, V, M>(
2005 &self,
2006 query_scope: impl QueryScopeLocalTrait<K, V, M>,
2007 key: impl Borrow<K>,
2008 ) -> bool
2009 where
2010 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2011 V: DebugIfDevtoolsEnabled + Clone + 'static,
2012 {
2013 let query_scope_info = QueryScopeInfo::new_local(&query_scope);
2014 self.untyped_client.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2015 &mut self.untyped_client.scope_lookup.scopes_mut(),
2016 query_scope_info.cache_key,
2017 OnScopeMissing::Skip,
2018 |_| {},
2019 |maybe_scope, _| {
2020 if let Some(scope) = maybe_scope {
2021 scope
2022 .get(&KeyHash::new(key.borrow()))
2023 .map(|query| query.is_invalidated())
2024 .unwrap_or(false)
2025 } else {
2026 false
2027 }
2028 },
2029 )
2030 }
2031
2032 #[cfg(test)]
2033 #[allow(dead_code)]
2034 pub(crate) fn mark_key_valid<K, V, M>(&self, query_scope: impl QueryScopeLocalTrait<K, V, M>, key: impl Borrow<K>)
2035 where
2036 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2037 V: DebugIfDevtoolsEnabled + Clone + 'static,
2038 {
2039 self.untyped_client.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2040 &mut self.untyped_client.scope_lookup.scopes_mut(),
2041 QueryScopeInfo::new_local(&query_scope).cache_key,
2042 OnScopeMissing::Skip,
2043 |_| {},
2044 |maybe_scope, _| {
2045 if let Some(scope) = maybe_scope
2046 && let Some(query) = scope.get_mut(&KeyHash::new(key.borrow()))
2047 {
2048 query.mark_valid();
2049 }
2050 },
2051 );
2052 }
2053
2054 #[cfg(test)]
2055 pub(crate) fn subscriber_count(&self) -> usize {
2056 self.untyped_client.scope_lookup.scope_subscriptions_mut().count()
2057 }
2058}
2059
2060impl UntypedQueryClient {
2061 #[track_caller]
2062 pub(crate) async fn fetch_query<K, V, M>(
2063 &self,
2064 query_scope: impl QueryScopeTrait<K, V, M>,
2065 key: impl Borrow<K>,
2066 ) -> V
2067 where
2068 K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
2069 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
2070 {
2071 let query_scope_info = QueryScopeInfo::new(&query_scope);
2072 let owner_chain = OwnerChain::new(*self, query_scope_info.cache_key, Owner::current());
2073 self.fetch_inner(
2074 query_scope_info,
2075 || QueryScopeQueryInfo::new(&query_scope, key.borrow()),
2076 async |key| MaybeLocal::new(query_scope.query(key).await),
2077 key.borrow(),
2078 None,
2079 || MaybeLocal::new(key.borrow().clone()),
2080 &owner_chain,
2081 )
2082 .await
2083 }
2084
2085 #[track_caller]
2086 pub(crate) async fn fetch_query_local<K, V, M>(
2087 &self,
2088 query_scope: impl QueryScopeLocalTrait<K, V, M>,
2089 key: impl Borrow<K>,
2090 ) -> V
2091 where
2092 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
2093 V: DebugIfDevtoolsEnabled + Clone + 'static,
2094 {
2095 let query_scope_info = QueryScopeInfo::new_local(&query_scope);
2096 let owner_chain = OwnerChain::new(*self, query_scope_info.cache_key, Owner::current());
2097 self.fetch_inner(
2098 query_scope_info,
2099 || QueryScopeQueryInfo::new_local(&query_scope, key.borrow()),
2100 async |key| MaybeLocal::new_local(query_scope.query(key).await),
2101 key.borrow(),
2102 None,
2103 || MaybeLocal::new_local(key.borrow().clone()),
2104 &owner_chain,
2105 )
2106 .await
2107 }
2108
2109 #[track_caller]
2110 async fn fetch_inner<K, V, FetcherFut>(
2111 &self,
2112 query_scope_info: QueryScopeInfo,
2113 query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
2114 fetcher: impl Fn(K) -> FetcherFut,
2115 key: &K,
2116 maybe_preheld_fetcher_mutex_guard: Option<&futures::lock::MutexGuard<'_, ()>>,
2117 lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
2118 owner_chain: &OwnerChain,
2119 ) -> V
2120 where
2121 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
2122 V: DebugIfDevtoolsEnabled + Clone + 'static,
2123 FetcherFut: Future<Output = MaybeLocal<V>>,
2124 {
2125 self.cached_or_fetch(
2126 self.options,
2127 query_scope_info.options,
2128 None,
2129 &query_scope_info,
2130 query_scope_info_for_new_query,
2131 key,
2132 fetcher,
2133 |info| {
2134 match info.variant {
2135 CachedOrFetchCbInputVariant::CachedUntouched => {
2136 if info.cached.stale_or_invalidated() {
2137 return CachedOrFetchCbOutput::Refetch;
2138 }
2139 }
2140 CachedOrFetchCbInputVariant::CachedUpdated => {
2141 info.cached.buster.set(new_buster_id());
2143 }
2144 CachedOrFetchCbInputVariant::Fresh => {}
2145 }
2146 CachedOrFetchCbOutput::Return(
2147 info.cached.value_maybe_stale().value_may_panic().clone(),
2149 )
2150 },
2151 maybe_preheld_fetcher_mutex_guard,
2152 lazy_maybe_local_key,
2153 owner_chain,
2154 )
2155 .await
2156 }
2157
2158 #[track_caller]
2159 pub(crate) fn invalidate_query<K, V, M>(
2160 &self,
2161 query_scope: impl QueryScopeLocalTrait<K, V, M>,
2162 key: impl Borrow<K>,
2163 ) -> bool
2164 where
2165 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2166 V: DebugIfDevtoolsEnabled + Clone + 'static,
2167 {
2168 let cleared = self.invalidate_queries(query_scope, std::iter::once(key));
2169 !cleared.is_empty()
2170 }
2171
2172 #[track_caller]
2173 pub(crate) fn invalidate_queries<K, V, KRef, M>(
2174 &self,
2175 query_scope: impl QueryScopeLocalTrait<K, V, M>,
2176 keys: impl IntoIterator<Item = KRef>,
2177 ) -> Vec<KRef>
2178 where
2179 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2180 V: DebugIfDevtoolsEnabled + Clone + 'static,
2181 KRef: Borrow<K>,
2182 {
2183 self.invalidate_queries_inner::<K, V, _>(&QueryScopeInfo::new_local(&query_scope), keys)
2184 }
2185
2186 #[track_caller]
2187 pub(crate) fn invalidate_queries_inner<K, V, KRef>(
2188 &self,
2189 query_scope_info: &QueryScopeInfo,
2190 keys: impl IntoIterator<Item = KRef>,
2191 ) -> Vec<KRef>
2192 where
2193 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2194 V: DebugIfDevtoolsEnabled + Clone + 'static,
2195 KRef: Borrow<K>,
2196 {
2197 let keys = keys.into_iter().collect::<Vec<_>>();
2198 let key_hashes = keys.iter().map(|key| KeyHash::new(key.borrow())).collect::<Vec<_>>();
2199
2200 if key_hashes.is_empty() {
2201 return vec![];
2202 }
2203
2204 let mut scopes = self.scope_lookup.scopes_mut();
2205 let mut cbs_scopes = vec![];
2206 let results = self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2207 &mut scopes,
2208 query_scope_info.cache_key,
2209 OnScopeMissing::Skip,
2210 |_| {},
2211 |maybe_scope, _| {
2212 let mut invalidated = vec![];
2213 if let Some(scope) = maybe_scope {
2214 for (key, key_hash) in keys.into_iter().zip(key_hashes.iter()) {
2215 if let Some(cached) = scope.get_mut_include_pending(key_hash) {
2216 let cb_scopes = cached.invalidate(QueryAbortReason::Invalidate);
2217 cbs_scopes.push(cb_scopes);
2218 invalidated.push(key);
2219 }
2220 }
2221 }
2222 invalidated
2223 },
2224 );
2225 let mut cbs_external = vec![];
2226 for cb in cbs_scopes {
2227 if let Some(cb_external) = cb(&mut scopes) {
2228 cbs_external.push(cb_external);
2229 }
2230 }
2231 drop(scopes);
2232 run_external_callbacks(*self, query_scope_info.cache_key, cbs_external);
2233 results
2234 }
2235
2236 #[cfg(test)]
2237 #[track_caller]
2239 pub(crate) fn clear_query_scope<K, V, M>(&self, query_scope: impl QueryScopeLocalTrait<K, V, M>)
2240 where
2241 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2242 V: DebugIfDevtoolsEnabled + Clone + 'static,
2243 {
2244 let mut scopes = self.scope_lookup.scopes_mut();
2245 let query_scope_info = QueryScopeInfo::new_local(&query_scope);
2246 let mut cbs_scopes = vec![];
2247 self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2248 &mut scopes,
2249 query_scope_info.cache_key,
2250 OnScopeMissing::Skip,
2251 |_| {},
2252 |maybe_scope, _| {
2253 if let Some(scope) = maybe_scope {
2254 for cached in scope.all_queries_mut_include_pending() {
2255 let cb_scopes = cached.invalidate(QueryAbortReason::Clear);
2256 cbs_scopes.push(cb_scopes);
2257 }
2258 }
2259 },
2260 );
2261 let mut cbs_external = vec![];
2262 for cb in cbs_scopes {
2263 if let Some(cb_external) = cb(&mut scopes) {
2264 cbs_external.push(cb_external);
2265 }
2266 }
2267 drop(scopes);
2268 run_external_callbacks(*self, query_scope_info.cache_key, cbs_external);
2269 }
2270
2271 #[track_caller]
2273 pub(crate) fn clear_query<K, V, M>(
2274 &self,
2275 query_scope: impl QueryScopeLocalTrait<K, V, M>,
2276 key: impl Borrow<K>,
2277 ) -> bool
2278 where
2279 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2280 V: DebugIfDevtoolsEnabled + Clone + 'static,
2281 {
2282 let mut scopes = self.scope_lookup.scopes_mut();
2283 let query_scope_info = QueryScopeInfo::new_local(&query_scope);
2284 let mut cbs_scopes = vec![];
2285 let result = self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2286 &mut scopes,
2287 query_scope_info.cache_key,
2288 OnScopeMissing::Skip,
2289 |_| {},
2290 |maybe_scope, _| {
2291 if let Some(scope) = maybe_scope {
2292 let key_hash = KeyHash::new(key.borrow());
2293 if let Some(cached) = scope.get_mut_include_pending(&key_hash) {
2294 let cb_scopes = cached.invalidate(QueryAbortReason::Clear);
2295 cbs_scopes.push(cb_scopes);
2296 }
2297 let removed = scope.remove_entry(&key_hash);
2298 scope.remove_entry(&key_hash);
2300 return removed.is_some();
2301 }
2302 false
2303 },
2304 );
2305 let mut cbs_external = vec![];
2306 for cb in cbs_scopes {
2307 if let Some(cb_external) = cb(&mut scopes) {
2308 cbs_external.push(cb_external);
2309 }
2310 }
2311 drop(scopes);
2312 run_external_callbacks(*self, query_scope_info.cache_key, cbs_external);
2313 result
2314 }
2315
2316 pub(crate) fn query_metadata<K, V>(
2317 &self,
2318 scope_cache_key: ScopeCacheKey,
2319 key: impl Borrow<K>,
2320 ) -> Option<QueryMetadata>
2321 where
2322 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2323 V: DebugIfDevtoolsEnabled + Clone + 'static,
2324 {
2325 self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2326 &mut self.scope_lookup.scopes_mut(),
2327 scope_cache_key,
2328 OnScopeMissing::Skip,
2329 |_| {},
2330 |maybe_scope, _| {
2331 if let Some(scope) = maybe_scope
2332 && let Some(cached) = scope.get(&KeyHash::new(key.borrow()))
2333 {
2334 return Some(QueryMetadata {
2335 updated_at: cached.updated_at,
2336 stale_or_invalidated: cached.stale_or_invalidated(),
2337 });
2338 }
2339 None
2340 },
2341 )
2342 }
2343
2344 pub async fn cached_or_fetch<K, V, T, FetcherFut>(
2345 &self,
2346 client_options: QueryOptions,
2347 scope_options: Option<QueryOptions>,
2348 maybe_buster_if_uncached: Option<ArcRwSignal<u64>>,
2349 query_scope_info: &QueryScopeInfo,
2350 query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
2351 key: &K,
2352 fetcher: impl Fn(K) -> FetcherFut,
2353 return_cb: impl Fn(CachedOrFetchCbInput<K, V>) -> CachedOrFetchCbOutput<T>,
2354 maybe_preheld_fetcher_mutex_guard: Option<&futures::lock::MutexGuard<'_, ()>>,
2355 lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
2356 owner_chain: &OwnerChain,
2357 ) -> T
2358 where
2359 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2360 V: DebugIfDevtoolsEnabled + Clone + 'static,
2361 FetcherFut: Future<Output = MaybeLocal<V>>,
2362 {
2363 let scope_lookup = &self.scope_lookup;
2364 let key_hash = KeyHash::new(key);
2365 let mut cached_buster = None;
2366 let next_directive =
2367 scope_lookup.with_cached_query::<K, V, _>(&key_hash, &query_scope_info.cache_key, |maybe_cached| {
2368 if let Some(cached) = maybe_cached {
2369 cached_buster = Some(cached.buster.clone());
2370 return_cb(CachedOrFetchCbInput {
2371 cached,
2372 variant: CachedOrFetchCbInputVariant::CachedUntouched,
2373 })
2374 } else {
2375 CachedOrFetchCbOutput::Refetch
2376 }
2377 });
2378
2379 match next_directive {
2380 CachedOrFetchCbOutput::Return(value) => value,
2381 CachedOrFetchCbOutput::Refetch => {
2382 let fetcher_mutex = scope_lookup.fetcher_mutex::<K, V>(key_hash, query_scope_info);
2384 let _maybe_fetcher_mutex_guard_local = if maybe_preheld_fetcher_mutex_guard.is_none() {
2385 let _fetcher_guard = match fetcher_mutex.try_lock() {
2386 Some(fetcher_guard) => fetcher_guard,
2387 None => {
2388 let fetcher_guard = fetcher_mutex.lock().await;
2390 let next_directive = scope_lookup.with_cached_query::<K, V, _>(
2391 &key_hash,
2392 &query_scope_info.cache_key,
2393 |maybe_cached| {
2394 if let Some(cached) = maybe_cached {
2395 cached_buster = Some(cached.buster.clone());
2396 return_cb(CachedOrFetchCbInput {
2397 cached,
2398 variant: CachedOrFetchCbInputVariant::CachedUntouched,
2399 })
2400 } else {
2401 CachedOrFetchCbOutput::Refetch
2402 }
2403 },
2404 );
2405 match next_directive {
2406 CachedOrFetchCbOutput::Return(value) => return value,
2407 CachedOrFetchCbOutput::Refetch => fetcher_guard,
2408 }
2409 }
2410 };
2411 Some(_fetcher_guard)
2412 } else {
2413 None
2415 };
2416
2417 #[cfg(any(all(debug_assertions, feature = "devtools"), feature = "devtools-always"))]
2418 let before_time = chrono::Utc::now();
2419
2420 let loading_first_time = cached_buster.is_none();
2421
2422 #[cfg(any(all(debug_assertions, feature = "devtools"), feature = "devtools-always"))]
2423 {
2424 if loading_first_time {
2426 scope_lookup.client_subscriptions_mut().notify_query_created(
2427 crate::subs_client::QueryCreatedInfo {
2428 cache_key: query_scope_info.cache_key,
2429 scope_title: query_scope_info.title.clone(),
2430 key_hash,
2431 debug_key: crate::utils::DebugValue::new(key),
2432 combined_options: crate::options_combine(client_options, scope_options),
2433 },
2434 );
2435 }
2436 }
2437
2438 let maybe_local_key = lazy_maybe_local_key();
2439
2440 enum MaybeNewValue<V> {
2441 NewValue(V),
2442 SsrStreamedValueOverride,
2443 }
2444
2445 let maybe_new_value = scope_lookup
2446 .with_notify_fetching(
2447 query_scope_info.cache_key,
2448 key_hash,
2449 loading_first_time,
2450 async {
2452 loop {
2453 let query_abort_rx = scope_lookup.prepare_invalidation_channel::<K, V>(
2454 query_scope_info,
2455 key_hash,
2456 &maybe_local_key,
2457 );
2458
2459 let fut = owner_chain.with(|| fetcher(key.clone()));
2460
2461 futures::select_biased! {
2462 rx_result = query_abort_rx.fuse() => {
2463 if let Ok(reason) = rx_result {
2464 match reason {
2465 QueryAbortReason::Invalidate | QueryAbortReason::Clear => {},
2466 QueryAbortReason::SsrStreamedValueOverride => {
2467 break MaybeNewValue::SsrStreamedValueOverride;
2468 },
2469 }
2470 }
2471 },
2472 new_value = fut.fuse() => {
2473 break MaybeNewValue::NewValue(new_value);
2474 },
2475 }
2476 }
2477 },
2478 )
2479 .await;
2480
2481 #[cfg(any(all(debug_assertions, feature = "devtools"), feature = "devtools-always"))]
2482 let elapsed_ms = chrono::Utc::now().signed_duration_since(before_time).num_milliseconds();
2483
2484 let buster_if_uncached = if loading_first_time {
2485 Some(maybe_buster_if_uncached.unwrap_or_else(|| ArcRwSignal::new(new_buster_id())))
2486 } else {
2487 None
2488 };
2489
2490 let next_directive = scope_lookup.with_cached_scope_mut::<_, _, _, _>(
2491 &mut scope_lookup.scopes_mut(),
2492 query_scope_info.cache_key,
2493 OnScopeMissing::Create(query_scope_info),
2494 |_| {},
2495 |scope, _| {
2496 let scope = scope.expect("provided a default");
2497 match maybe_new_value {
2498 MaybeNewValue::NewValue(new_value) => {
2499 if let Some(cached) = scope.get_mut(&key_hash) {
2500 cached.set_value(
2501 new_value,
2502 true,
2503 #[cfg(any(
2504 all(debug_assertions, feature = "devtools"),
2505 feature = "devtools-always"
2506 ))]
2507 crate::events::Event::new(crate::events::EventVariant::Fetched { elapsed_ms }),
2508 ResetInvalidated::Reset,
2509 );
2510 return_cb(CachedOrFetchCbInput {
2511 cached,
2512 variant: CachedOrFetchCbInputVariant::CachedUpdated,
2513 })
2514 } else {
2515 scope.insert_without_query_created_notif(
2517 key_hash,
2518 Query::new(
2519 client_options,
2520 *self,
2521 query_scope_info,
2522 query_scope_info_for_new_query(),
2523 key_hash,
2524 maybe_local_key,
2525 new_value,
2526 buster_if_uncached.expect("loading_first_time means this is Some(). (bug)"),
2527 scope_options,
2528 None,
2529 #[cfg(any(
2530 all(debug_assertions, feature = "devtools"),
2531 feature = "devtools-always"
2532 ))]
2533 crate::events::Event::new(crate::events::EventVariant::Fetched {
2534 elapsed_ms,
2535 }),
2536 ),
2537 );
2538 return_cb(CachedOrFetchCbInput {
2539 cached: scope.get(&key_hash).expect("Just set. (bug)"),
2540 variant: CachedOrFetchCbInputVariant::Fresh,
2541 })
2542 }
2543 }
2544 MaybeNewValue::SsrStreamedValueOverride => return_cb(CachedOrFetchCbInput {
2545 cached: scope
2546 .get(&key_hash)
2547 .expect("Should contain value streamed from server. (bug)"),
2548 variant: CachedOrFetchCbInputVariant::Fresh,
2549 }),
2550 }
2551 },
2552 );
2553
2554 match next_directive {
2555 CachedOrFetchCbOutput::Refetch => {
2556 panic!("Unexpected refetch directive after providing fresh value. (bug)")
2557 }
2558 CachedOrFetchCbOutput::Return(return_value) => return_value,
2559 }
2560 }
2561 }
2562 }
2563}
2564
2565pub(crate) struct QueryMetadata {
2566 pub updated_at: DateTime<Utc>,
2567 pub stale_or_invalidated: bool,
2568}