1#![allow(ungated_async_fn_track_caller)] use std::{
4 borrow::Borrow,
5 collections::HashMap,
6 fmt::Debug,
7 hash::Hash,
8 sync::{Arc, LazyLock, atomic::AtomicBool},
9};
10
11use chrono::{DateTime, Utc};
12use codee::{Decoder, Encoder};
13use futures::{FutureExt, pin_mut};
14use leptos::{
15 prelude::{
16 ArcRwSignal, ArcSignal, Effect, Get, GetUntracked, LocalStorage, Owner, Read,
17 ReadUntracked, Set, Signal, Track, provide_context,
18 },
19 server::{
20 ArcLocalResource, ArcResource, FromEncodedStr, IntoEncodedString, LocalResource, Resource,
21 },
22};
23use send_wrapper::SendWrapper;
24
25use crate::{
26 ArcLocalSignal, QueryOptions,
27 cache::{
28 CachedOrFetchCbInput, CachedOrFetchCbInputVariant, CachedOrFetchCbOutput, OnScopeMissing,
29 },
30 cache_scope::{QueryAbortReason, QueryOrPending},
31 debug_if_devtools_enabled::DebugIfDevtoolsEnabled,
32 maybe_local::MaybeLocal,
33 query::Query,
34 query_maybe_key::QueryMaybeKey,
35 query_scope::{
36 QueryScopeInfo, QueryScopeLocalTrait, QueryScopeQueryInfo, QueryScopeTrait, ScopeCacheKey,
37 },
38 resource_drop_guard::ResourceDropGuard,
39 utils::{
40 KeyHash, OwnerChain, ResetInvalidated, new_buster_id, new_resource_id,
41 run_external_callbacks,
42 },
43};
44
45use super::cache::ScopeLookup;
46
47#[cfg(not(feature = "rkyv"))]
48pub(crate) type DefaultCodec = codee::string::JsonSerdeCodec;
49
50#[cfg(feature = "rkyv")]
51pub(crate) type DefaultCodec = codee::binary::RkyvCodec;
52
53task_local::task_local! {
54 pub(crate) static ASYNC_TRACK_UPDATE_MARKER: Arc<AtomicBool>;
55}
56
57std::thread_local! {
58 pub(crate) static SYNC_TRACK_UPDATE_MARKER: AtomicBool = const { AtomicBool::new(true) };
59}
60
61pub struct QueryClient<Codec = DefaultCodec> {
84 pub(crate) untyped_client: UntypedQueryClient,
85 _ser: std::marker::PhantomData<SendWrapper<Codec>>,
86}
87
88#[derive(Clone, Copy)]
91pub(crate) struct UntypedQueryClient {
92 pub(crate) scope_lookup: ScopeLookup,
93 options: QueryOptions,
94 created_at: DateTime<Utc>,
95}
96
97impl<Codec> Debug for QueryClient<Codec> {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 f.debug_struct("QueryClient")
100 .field("scope_lookup", &self.untyped_client.scope_lookup)
101 .field("options", &self.untyped_client.options)
102 .field("codec", &std::any::type_name::<Codec>())
103 .finish()
104 }
105}
106
107impl<Codec: 'static> Clone for QueryClient<Codec> {
108 fn clone(&self) -> Self {
109 *self
110 }
111}
112
113impl<Codec: 'static> Copy for QueryClient<Codec> {}
114
115impl Default for QueryClient<DefaultCodec> {
116 fn default() -> Self {
117 Self::new()
118 }
119}
120
121impl QueryClient<DefaultCodec> {
122 #[track_caller]
128 pub fn new() -> Self {
129 Self {
130 untyped_client: UntypedQueryClient {
131 scope_lookup: ScopeLookup::new(),
132 options: QueryOptions::default(),
133 created_at: Utc::now(),
134 },
135 _ser: std::marker::PhantomData,
136 }
137 }
138}
139
140impl<Codec: 'static> QueryClient<Codec> {
141 #[track_caller]
151 pub fn provide(self) -> Self {
152 provide_context(self);
153 self
154 }
155
156 #[track_caller]
186 pub fn set_codec<NewCodec>(self) -> QueryClient<NewCodec> {
187 QueryClient {
188 untyped_client: self.untyped_client,
189 _ser: std::marker::PhantomData,
190 }
191 }
192
193 #[track_caller]
197 pub fn with_options(mut self, options: QueryOptions) -> Self {
198 self.untyped_client.options = options;
199 self
200 }
201
202 #[track_caller]
206 pub fn options(&self) -> QueryOptions {
207 self.untyped_client.options
208 }
209
210 #[track_caller]
215 pub fn with_refetch_enabled_toggle(self, refetch_enabled: impl Into<ArcSignal<bool>>) -> Self {
216 self.untyped_client
217 .scope_lookup
218 .scopes_mut()
219 .refetch_enabled = Some(refetch_enabled.into());
220 self
221 }
222
223 #[track_caller]
225 pub fn refetch_enabled(&self) -> Option<ArcSignal<bool>> {
226 self.untyped_client
227 .scope_lookup
228 .scopes()
229 .refetch_enabled
230 .clone()
231 }
232
233 #[track_caller]
237 pub fn local_resource<K, MaybeKey, V, M>(
238 &self,
239 query_scope: impl QueryScopeLocalTrait<K, V, M> + 'static,
240 keyer: impl Fn() -> MaybeKey + 'static,
241 ) -> LocalResource<MaybeKey::MappedValue>
242 where
243 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
244 MaybeKey: QueryMaybeKey<K, V>,
245 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
246 V: DebugIfDevtoolsEnabled + Clone + 'static,
247 {
248 self.arc_local_resource(query_scope, keyer).into()
249 }
250
251 #[track_caller]
255 pub fn arc_local_resource<K, MaybeKey, V, M>(
256 &self,
257 query_scope: impl QueryScopeLocalTrait<K, V, M> + 'static,
258 keyer: impl Fn() -> MaybeKey + 'static,
259 ) -> ArcLocalResource<MaybeKey::MappedValue>
260 where
261 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
262 MaybeKey: QueryMaybeKey<K, V>,
263 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
264 V: DebugIfDevtoolsEnabled + Clone + 'static,
265 {
266 let client = *self;
267 let client_options = self.options();
268 let cache_key = query_scope.cache_key();
269 let query_scope_info = QueryScopeInfo::new_local(&query_scope);
270 let query_scope = Arc::new(query_scope);
271 let query_options = query_scope.options();
272 let resource_id = new_resource_id();
273
274 let drop_guard = ResourceDropGuard::<K, V>::new(
276 self.untyped_client.scope_lookup,
277 resource_id,
278 cache_key,
279 );
280
281 ArcLocalResource::new({
282 move || {
283 let query_scope = query_scope.clone();
284 let query_scope_info = query_scope_info.clone();
285 let maybe_key = keyer().into_maybe_key();
286 let drop_guard = drop_guard.clone();
287 if let Some(key) = maybe_key.as_ref() {
288 drop_guard.set_active_key(KeyHash::new(key));
289 }
290 let owner_chain = OwnerChain::new(
294 client.untyped_client,
295 query_scope_info.cache_key,
296 Owner::current(),
297 );
298 async move {
299 if let Some(key) = maybe_key {
300 let query_scope_query_info =
301 || QueryScopeQueryInfo::new_local(&query_scope, &key);
302 let value = client
303 .untyped_client
304 .cached_or_fetch(
305 client_options,
306 query_options,
307 None,
308 &query_scope_info,
309 query_scope_query_info,
310 &key,
311 {
312 let query_scope = query_scope.clone();
313 move |key| {
314 let query_scope = query_scope.clone();
315
316 async move {
317 MaybeLocal::new_local(query_scope.query(key).await)
318 }
319 }
320 },
321 |info| {
322 info.cached.buster.track();
323 info.cached.mark_resource_active(resource_id);
324 match info.variant {
325 CachedOrFetchCbInputVariant::CachedUntouched => {
326 if cfg!(any(test, not(feature = "ssr")))
328 && info.cached.stale_or_invalidated()
329 {
330 let key = key.clone();
331 let query_scope = query_scope.clone();
332 let owner_chain = owner_chain.clone();
333 leptos::task::spawn(SendWrapper::new(async move {
335 client
336 .prefetch_inner(
337 QueryScopeInfo::new_local(&query_scope),
338 || {
339 QueryScopeQueryInfo::new_local(
340 &query_scope,
341 &key,
342 )
343 },
344 async |key| {
345 MaybeLocal::new_local(
346 query_scope.query(key).await,
347 )
348 },
349 key.borrow(),
350 || {
351 MaybeLocal::new_local(
352 key.borrow().clone(),
353 )
354 },
355 &owner_chain,
356 )
357 .await;
358 }));
359 }
360 }
361 CachedOrFetchCbInputVariant::Fresh => {}
362 CachedOrFetchCbInputVariant::CachedUpdated => {
363 panic!("Didn't direct inner to refetch here. (bug)")
364 }
365 }
366 CachedOrFetchCbOutput::Return(
367 info.cached.value_maybe_stale().value_may_panic().clone(),
369 )
370 },
371 None,
372 || MaybeLocal::new_local(key.clone()),
373 &owner_chain,
374 )
375 .await;
376 MaybeKey::prepare_mapped_value(Some(value))
377 } else {
378 MaybeKey::prepare_mapped_value(None)
379 }
380 }
381 }
382 })
383 }
384
385 #[track_caller]
393 pub fn resource<K, MaybeKey, V, M>(
394 &self,
395 query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
396 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
397 ) -> Resource<MaybeKey::MappedValue, Codec>
398 where
399 K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
400 MaybeKey: QueryMaybeKey<K, V>,
401 MaybeKey::MappedValue: Clone + Send + Sync + 'static,
402 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
403 Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
404 <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
405 <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
406 <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
407 Debug,
408 <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
409 <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
410 {
411 self.arc_resource_with_options(query_scope, keyer, false)
412 .into()
413 }
414
415 #[track_caller]
423 pub fn resource_blocking<K, MaybeKey, V, M>(
424 &self,
425 query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
426 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
427 ) -> Resource<MaybeKey::MappedValue, Codec>
428 where
429 K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
430 MaybeKey: QueryMaybeKey<K, V>,
431 MaybeKey::MappedValue: Clone + Send + Sync + 'static,
432 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
433 Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
434 <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
435 <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
436 <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
437 Debug,
438 <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
439 <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
440 {
441 self.arc_resource_with_options(query_scope, keyer, true)
442 .into()
443 }
444
445 #[track_caller]
453 pub fn arc_resource<K, MaybeKey, V, M>(
454 &self,
455 query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
456 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
457 ) -> ArcResource<MaybeKey::MappedValue, Codec>
458 where
459 K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
460 MaybeKey: QueryMaybeKey<K, V>,
461 MaybeKey::MappedValue: Clone + Send + Sync + 'static,
462 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
463 Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
464 <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
465 <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
466 <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
467 Debug,
468 <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
469 <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
470 {
471 self.arc_resource_with_options(query_scope, keyer, false)
472 }
473
474 #[track_caller]
482 pub fn arc_resource_blocking<K, MaybeKey, V, M>(
483 &self,
484 query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
485 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
486 ) -> ArcResource<MaybeKey::MappedValue, Codec>
487 where
488 K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
489 MaybeKey: QueryMaybeKey<K, V>,
490 MaybeKey::MappedValue: Clone + Send + Sync + 'static,
491 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
492 Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
493 <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
494 <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
495 <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
496 Debug,
497 <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
498 <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
499 {
500 self.arc_resource_with_options(query_scope, keyer, true)
501 }
502
503 #[track_caller]
504 fn arc_resource_with_options<K, MaybeKey, V, M>(
505 &self,
506 query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
507 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
508 blocking: bool,
509 ) -> ArcResource<MaybeKey::MappedValue, Codec>
510 where
511 K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
512 MaybeKey: QueryMaybeKey<K, V>,
513 MaybeKey::MappedValue: Clone + Send + Sync + 'static,
514 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
515 Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
516 <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
517 <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
518 <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
519 Debug,
520 <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
521 <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
522 {
523 let client = *self;
524 let client_options = self.options();
525 let cache_key = query_scope.cache_key();
526 let query_scope_info = QueryScopeInfo::new(&query_scope);
527 let query_scope = Arc::new(query_scope);
528 let scope_lookup = self.untyped_client.scope_lookup;
529 let query_options = query_scope.options();
530
531 let buster_if_uncached = ArcRwSignal::new(new_buster_id());
532 let resource_id = new_resource_id();
533
534 let drop_guard = ResourceDropGuard::<K, V>::new(
536 self.untyped_client.scope_lookup,
537 resource_id,
538 cache_key,
539 );
540
541 let keyer = Arc::new(keyer);
542 let resource =
543 ArcResource::new_with_options(
544 {
545 let buster_if_uncached = buster_if_uncached.clone();
546 let drop_guard = drop_guard.clone();
547 let keyer = keyer.clone();
548 move || {
549 if let Some(key) = keyer().into_maybe_key() {
550 let key_hash = KeyHash::new(&key);
551 drop_guard.set_active_key(key_hash);
552 scope_lookup.with_cached_query::<K, V, _>(
553 &key_hash,
554 &cache_key,
555 |maybe_cached| {
556 if let Some(cached) = maybe_cached {
557 (Some(key.clone()), cached.buster.get())
559 } else {
560 (Some(key.clone()), buster_if_uncached.get())
562 }
563 },
564 )
565 } else {
566 (None, buster_if_uncached.get())
567 }
568 }
569 },
570 {
571 let buster_if_uncached = buster_if_uncached.clone();
572 let query_scope_info = query_scope_info.clone();
573 let query_scope = query_scope.clone();
574 move |(maybe_key, last_used_buster)| {
575 let query_scope = query_scope.clone();
576 let query_scope_info = query_scope_info.clone();
577 let buster_if_uncached = buster_if_uncached.clone();
578 let _drop_guard = drop_guard.clone(); let owner_chain = OwnerChain::new(
583 client.untyped_client,
584 query_scope_info.cache_key,
585 Owner::current(),
586 );
587 async move {
588 if let Some(key) = maybe_key {
589 let query_scope_query_info =
590 || QueryScopeQueryInfo::new(&query_scope, &key);
591 let value =
592 client
593 .untyped_client
594 .cached_or_fetch(
595 client_options,
596 query_options,
597 Some(buster_if_uncached.clone()),
598 &query_scope_info,
599 query_scope_query_info,
600 &key,
601 {
602 let query_scope = query_scope.clone();
603 move |key| {
604 let query_scope = query_scope.clone();
605 async move {
606 MaybeLocal::new(
607 query_scope.query(key).await,
608 )
609 }
610 }
611 },
612 |info| {
613 info.cached.mark_resource_active(resource_id);
614 match info.variant {
615 CachedOrFetchCbInputVariant::CachedUntouched => {
616 if cfg!(any(test, not(feature = "ssr")))
618 && info.cached.stale_or_invalidated()
619 {
620 let key = key.clone();
621 let query_scope = query_scope.clone();
622 let owner_chain = owner_chain.clone();
623
624 leptos::task::spawn(async move {
625 client
626 .prefetch_inner(
627 QueryScopeInfo::new(&query_scope),
628 || {
629 QueryScopeQueryInfo::new(
630 &query_scope,
631 &key,
632 )
633 },
634 {
635 let query_scope =
636 query_scope.clone();
637 move |key| {
638 let query_scope = query_scope.clone();
639
640 async move {
641 MaybeLocal::new(
642 query_scope
643 .query(key)
644 .await,
645 )
646 }}
647 },
648 key.borrow(),
649 || {
650 MaybeLocal::new(
651 key.borrow().clone(),
652 )
653 },
654 &owner_chain,
655 )
656 .await;
657 });
658 }
659
660 if last_used_buster
664 != info.cached.buster.get_untracked()
665 {
666 buster_if_uncached
667 .set(info.cached.buster.get_untracked());
668 }
669 }
670 CachedOrFetchCbInputVariant::Fresh => {}
671 CachedOrFetchCbInputVariant::CachedUpdated => {
672 panic!("Didn't direct inner to refetch here. (bug)")
673 }
674 }
675 CachedOrFetchCbOutput::Return(
676 info.cached
678 .value_maybe_stale()
679 .value_may_panic()
680 .clone(),
681 )
682 },
683 None,
684 || MaybeLocal::new(key.clone()),
685 &owner_chain,
686 )
687 .await;
688 MaybeKey::prepare_mapped_value(Some(value))
689 } else {
690 MaybeKey::prepare_mapped_value(None)
691 }
692 }
693 }
694 },
695 blocking,
696 );
697
698 let effect = {
701 let resource = resource.clone();
702 let buster_if_uncached = buster_if_uncached.clone();
703 let client_created_at = self.untyped_client.created_at;
704 move |complete: Option<Option<()>>| {
706 if let Some(Some(())) = complete {
707 return Some(());
708 }
709 if let Some(val) = resource.read().as_ref() {
710 if MaybeKey::mapped_value_is_some(val) {
711 scope_lookup.with_cached_scope_mut::<K, V, _, _>(
712 &mut scope_lookup.scopes_mut(),
713 query_scope_info.cache_key,
714 OnScopeMissing::Create(&query_scope_info),
715 |_| {},
716 |maybe_scope, _| {
717 let scope = maybe_scope.expect("provided a default");
718 if let Some(key) = keyer().into_maybe_key() {
719 let query_scope_query_info = || QueryScopeQueryInfo::new(&query_scope, &key);
720 let key_hash = KeyHash::new(&key);
721
722 if (cfg!(test) || cfg!(not(feature = "ssr"))) &&
732 (Utc::now() - client_created_at).num_seconds() < 10
734 {
735 use parking_lot::Mutex;
736
737 static ALREADY_SEEN: LazyLock<Mutex<HashMap<(u64, ScopeCacheKey, KeyHash), ()>>> = LazyLock::new(|| Mutex::new(HashMap::new()));
738
739 let key = (scope_lookup.scope_id, query_scope_info.cache_key, key_hash);
740 let mut guard = ALREADY_SEEN.lock();
741 if guard.contains_key(&key) {
742 return;
743 }
744 guard.insert(key, ());
745 }
746
747 let mut was_pending = false;
748 if let Some(QueryOrPending::Pending { query_abort_tx, .. }) = scope.get_mut_include_pending(&key_hash)
751 && let Some(tx) = query_abort_tx.take() {
752 tx.send(QueryAbortReason::SsrStreamedValueOverride).unwrap();
753 was_pending = true;
754 }
755
756 if was_pending || !scope.contains_key(&key_hash) {
757 let query = Query::new(
758 client_options,
759 client.untyped_client,
760 &query_scope_info,
761 query_scope_query_info(),
762 key_hash,
763 MaybeLocal::new(key),
764 MaybeLocal::new(MaybeKey::mapped_to_maybe_value(val.clone()).expect("Just checked MaybeKey::mapped_value_is_some() is true")),
765 buster_if_uncached.clone(),
766 query_options,
767 None,
768 #[cfg(any(
769 all(debug_assertions, feature = "devtools"),
770 feature = "devtools-always"
771 ))]
772 crate::events::Event::new(
773 crate::events::EventVariant::StreamedFromServer,
774 ),
775 );
776 scope.insert(key_hash, query);
777 }
778 scope
779 .get(&key_hash)
780 .expect("Just inserted")
781 .mark_resource_active(resource_id)
782 }
783 },
784 );
785 }
786 Some(())
787 } else {
788 None
789 }
790 }
791 };
792 if cfg!(test) {
794 let effect = SendWrapper::new(effect);
797 #[allow(clippy::redundant_closure)]
798 Effect::new_isomorphic(move |v| effect(v));
799 } else {
800 Effect::new(effect);
801 }
802
803 resource
804 }
805
806 pub fn untrack_update_query(&self) {
820 SYNC_TRACK_UPDATE_MARKER.with(|marker| {
822 marker.store(false, std::sync::atomic::Ordering::Relaxed);
823 });
824 let _ = ASYNC_TRACK_UPDATE_MARKER.try_with(|marker| {
826 marker.store(false, std::sync::atomic::Ordering::Relaxed);
827 });
828 }
829
830 #[track_caller]
838 pub async fn prefetch_query<K, V, M>(
839 &self,
840 query_scope: impl QueryScopeTrait<K, V, M>,
841 key: impl Borrow<K>,
842 ) where
843 K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
844 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
845 {
846 let query_scope_info = QueryScopeInfo::new(&query_scope);
847 let owner_chain = OwnerChain::new(
848 self.untyped_client,
849 query_scope_info.cache_key,
850 Owner::current(),
851 );
852 self.prefetch_inner(
853 query_scope_info,
854 || QueryScopeQueryInfo::new(&query_scope, key.borrow()),
855 async |key| MaybeLocal::new(query_scope.query(key).await),
856 key.borrow(),
857 || MaybeLocal::new(key.borrow().clone()),
858 &owner_chain,
859 )
860 .await
861 }
862
863 #[track_caller]
871 pub async fn prefetch_query_local<K, V, M>(
872 &self,
873 query_scope: impl QueryScopeLocalTrait<K, V, M>,
874 key: impl Borrow<K>,
875 ) where
876 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
877 V: DebugIfDevtoolsEnabled + Clone + 'static,
878 {
879 let query_scope_info = QueryScopeInfo::new_local(&query_scope);
880 let owner_chain = OwnerChain::new(
881 self.untyped_client,
882 query_scope_info.cache_key,
883 Owner::current(),
884 );
885 self.prefetch_inner(
886 query_scope_info,
887 || QueryScopeQueryInfo::new_local(&query_scope, key.borrow()),
888 async |key| MaybeLocal::new_local(query_scope.query(key).await),
889 key.borrow(),
890 || MaybeLocal::new_local(key.borrow().clone()),
891 &owner_chain,
892 )
893 .await
894 }
895
896 #[track_caller]
897 async fn prefetch_inner<K, V, FetcherFut>(
898 &self,
899 query_scope_info: QueryScopeInfo,
900 query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
901 fetcher: impl Fn(K) -> FetcherFut,
902 key: &K,
903 lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
904 owner_chain: &OwnerChain,
905 ) where
906 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
907 V: DebugIfDevtoolsEnabled + Clone + 'static,
908 FetcherFut: Future<Output = MaybeLocal<V>>,
909 {
910 self.untyped_client
911 .cached_or_fetch(
912 self.options(),
913 query_scope_info.options,
914 None,
915 &query_scope_info,
916 query_scope_info_for_new_query,
917 key,
918 fetcher,
919 |info| {
920 match info.variant {
921 CachedOrFetchCbInputVariant::CachedUntouched => {
922 if info.cached.stale_or_invalidated() {
923 return CachedOrFetchCbOutput::Refetch;
924 }
925 }
926 CachedOrFetchCbInputVariant::CachedUpdated => {
927 info.cached.buster.set(new_buster_id());
929 }
930 CachedOrFetchCbInputVariant::Fresh => {}
931 }
932 CachedOrFetchCbOutput::Return(())
933 },
934 None,
935 lazy_maybe_local_key,
936 owner_chain,
937 )
938 .await;
939 }
940
941 #[track_caller]
951 pub async fn fetch_query<K, V, M>(
952 &self,
953 query_scope: impl QueryScopeTrait<K, V, M>,
954 key: impl Borrow<K>,
955 ) -> V
956 where
957 K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
958 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
959 {
960 self.untyped_client.fetch_query(query_scope, key).await
961 }
962
963 #[track_caller]
973 pub async fn fetch_query_local<K, V, M>(
974 &self,
975 query_scope: impl QueryScopeLocalTrait<K, V, M>,
976 key: impl Borrow<K>,
977 ) -> V
978 where
979 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
980 V: DebugIfDevtoolsEnabled + Clone + 'static,
981 {
982 self.untyped_client
983 .fetch_query_local(query_scope, key)
984 .await
985 }
986
987 #[track_caller]
992 pub fn set_query<K, V, M>(
993 &self,
994 query_scope: impl QueryScopeTrait<K, V, M>,
995 key: impl Borrow<K>,
996 new_value: V,
997 ) where
998 K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
999 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1000 {
1001 self.set_inner(
1002 QueryScopeInfo::new(&query_scope),
1003 || QueryScopeQueryInfo::new(&query_scope, key.borrow()),
1004 key.borrow(),
1005 MaybeLocal::new(new_value),
1006 || MaybeLocal::new(key.borrow().clone()),
1007 true,
1008 ResetInvalidated::NoReset,
1011 )
1012 }
1013
1014 #[track_caller]
1019 pub fn set_query_local<K, V, M>(
1020 &self,
1021 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1022 key: impl Borrow<K>,
1023 new_value: V,
1024 ) where
1025 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1026 V: DebugIfDevtoolsEnabled + Clone + 'static,
1027 {
1028 self.set_inner::<K, V>(
1029 QueryScopeInfo::new_local(&query_scope),
1030 || QueryScopeQueryInfo::new_local(&query_scope, key.borrow()),
1031 key.borrow(),
1032 MaybeLocal::new_local(new_value),
1033 || MaybeLocal::new_local(key.borrow().clone()),
1034 true,
1035 ResetInvalidated::NoReset,
1038 )
1039 }
1040
1041 #[track_caller]
1042 fn set_inner<K, V>(
1043 &self,
1044 query_scope_info: QueryScopeInfo,
1045 query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
1046 key: &K,
1047 new_value: MaybeLocal<V>,
1048 lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
1049 track: bool,
1050 reset_invalidated: ResetInvalidated,
1051 ) where
1052 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1053 V: DebugIfDevtoolsEnabled + Clone + 'static,
1054 {
1055 let key_hash = KeyHash::new(key.borrow());
1056 self.untyped_client
1057 .scope_lookup
1058 .with_cached_scope_mut::<K, V, _, _>(
1059 &mut self.untyped_client.scope_lookup.scopes_mut(),
1060 query_scope_info.cache_key,
1061 OnScopeMissing::Create(&query_scope_info),
1062 |_| {},
1063 |maybe_scope, _| {
1064 let scope = maybe_scope.expect("provided a default");
1065
1066 let maybe_cached = if !new_value.is_local() {
1068 if let Some(threadsafe_existing) = scope.get_mut_threadsafe_only(&key_hash)
1069 {
1070 Some(threadsafe_existing)
1071 } else {
1072 scope.get_mut_local_only(&key_hash)
1073 }
1074 } else {
1075 scope.get_mut_local_only(&key_hash)
1076 };
1077
1078 if let Some(cached) = maybe_cached {
1079 cached.set_value(
1080 new_value,
1081 track,
1082 #[cfg(any(
1083 all(debug_assertions, feature = "devtools"),
1084 feature = "devtools-always"
1085 ))]
1086 crate::events::Event::new(crate::events::EventVariant::DeclarativeSet),
1087 reset_invalidated,
1088 );
1089 } else {
1090 let query = Query::new(
1091 self.options(),
1092 self.untyped_client,
1093 &query_scope_info,
1094 query_scope_info_for_new_query(),
1095 key_hash,
1096 lazy_maybe_local_key(),
1097 new_value,
1098 ArcRwSignal::new(new_buster_id()),
1099 query_scope_info.options,
1100 None,
1101 #[cfg(any(
1102 all(debug_assertions, feature = "devtools"),
1103 feature = "devtools-always"
1104 ))]
1105 crate::events::Event::new(crate::events::EventVariant::DeclarativeSet),
1106 );
1107 scope.insert(key_hash, query);
1108 }
1109 },
1110 );
1111 }
1112
1113 #[track_caller]
1126 pub fn update_query<K, V, T, M>(
1127 &self,
1128 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1129 key: impl Borrow<K>,
1130 modifier: impl FnOnce(Option<&mut V>) -> T,
1131 ) -> T
1132 where
1133 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1134 V: DebugIfDevtoolsEnabled + Clone + 'static,
1135 {
1136 self.update_query_inner::<K, V, T>(
1137 &QueryScopeInfo::new_local(&query_scope),
1138 key.borrow(),
1139 ResetInvalidated::NoReset,
1140 modifier,
1141 )
1142 }
1143
1144 #[track_caller]
1145 fn update_query_inner<K, V, T>(
1146 &self,
1147 query_scope_info: &QueryScopeInfo,
1148 key: &K,
1149 reset_invalidated: ResetInvalidated,
1150 modifier: impl FnOnce(Option<&mut V>) -> T,
1151 ) -> T
1152 where
1153 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1154 V: DebugIfDevtoolsEnabled + Clone + 'static,
1155 {
1156 let key_hash = KeyHash::new(key);
1157 let mut modifier_holder = Some(modifier);
1158
1159 let maybe_return_value = self
1160 .untyped_client
1161 .scope_lookup
1162 .with_cached_scope_mut::<K, V, _, _>(
1163 &mut self.untyped_client.scope_lookup.scopes_mut(),
1164 query_scope_info.cache_key,
1165 OnScopeMissing::Skip,
1166 |_| {},
1167 |maybe_scope, _| {
1168 if let Some(scope) = maybe_scope
1169 && let Some(cached) = scope.get_mut(&key_hash)
1170 {
1171 let modifier = modifier_holder
1172 .take()
1173 .expect("Should never be used more than once. (bug)");
1174 let return_value = cached.update_value(
1175 |value| modifier(Some(value.value_mut_may_panic())),
1177 #[cfg(any(
1178 all(debug_assertions, feature = "devtools"),
1179 feature = "devtools-always"
1180 ))]
1181 crate::events::Event::new(
1182 crate::events::EventVariant::DeclarativeUpdate,
1183 ),
1184 reset_invalidated,
1185 );
1186 return Some(return_value);
1187 }
1188 None
1189 },
1190 );
1191 if let Some(return_value) = maybe_return_value {
1192 return_value
1193 } else {
1194 modifier_holder
1196 .take()
1197 .expect("Should never be used more than once. (bug)")(None)
1198 }
1199 }
1200
1201 #[track_caller]
1212 pub async fn update_query_async<'a, K, V, T, M>(
1213 &'a self,
1214 query_scope: impl QueryScopeTrait<K, V, M>,
1215 key: impl Borrow<K>,
1216 mapper: impl AsyncFnOnce(&mut V) -> T,
1217 ) -> T
1218 where
1219 K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
1220 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1221 {
1222 let query_scope_info = QueryScopeInfo::new(&query_scope);
1223 let owner_chain = OwnerChain::new(
1224 self.untyped_client,
1225 query_scope_info.cache_key,
1226 Owner::current(),
1227 );
1228 self.update_query_async_inner(
1229 query_scope_info,
1230 || QueryScopeQueryInfo::new(&query_scope, key.borrow()),
1231 async |key| MaybeLocal::new(query_scope.query(key).await),
1232 key.borrow(),
1233 mapper,
1234 MaybeLocal::new,
1235 || MaybeLocal::new(key.borrow().clone()),
1236 &owner_chain,
1237 )
1238 .await
1239 }
1240
1241 #[track_caller]
1252 pub async fn update_query_async_local<'a, K, V, T, M>(
1253 &'a self,
1254 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1255 key: impl Borrow<K>,
1256 mapper: impl AsyncFnOnce(&mut V) -> T,
1257 ) -> T
1258 where
1259 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1260 V: DebugIfDevtoolsEnabled + Clone + 'static,
1261 {
1262 let query_scope_info = QueryScopeInfo::new_local(&query_scope);
1263 let owner_chain = OwnerChain::new(
1264 self.untyped_client,
1265 query_scope_info.cache_key,
1266 Owner::current(),
1267 );
1268 self.update_query_async_inner(
1269 query_scope_info,
1270 || QueryScopeQueryInfo::new_local(&query_scope, key.borrow()),
1271 async |key| MaybeLocal::new_local(query_scope.query(key).await),
1272 key.borrow(),
1273 mapper,
1274 MaybeLocal::new_local,
1275 || MaybeLocal::new_local(key.borrow().clone()),
1276 &owner_chain,
1277 )
1278 .await
1279 }
1280
1281 #[track_caller]
1282 async fn update_query_async_inner<'a, K, V, T, FetcherFut>(
1283 &'a self,
1284 query_scope_info: QueryScopeInfo,
1285 query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
1286 fetcher: impl Fn(K) -> FetcherFut,
1287 key: &K,
1288 mapper: impl AsyncFnOnce(&mut V) -> T,
1289 into_maybe_local: impl FnOnce(V) -> MaybeLocal<V>,
1290 lazy_maybe_local_key: impl Fn() -> MaybeLocal<K>,
1291 owner_chain: &OwnerChain,
1292 ) -> T
1293 where
1294 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1295 V: DebugIfDevtoolsEnabled + Clone + 'static,
1296 FetcherFut: Future<Output = MaybeLocal<V>>,
1297 {
1298 let key_hash = KeyHash::new(key.borrow());
1299
1300 let fetcher_mutex = self
1302 .untyped_client
1303 .scope_lookup
1304 .fetcher_mutex::<K, V>(key_hash, &query_scope_info);
1305 let fetcher_guard = fetcher_mutex.lock().await;
1306
1307 let mut new_value = self
1308 .untyped_client
1309 .fetch_inner(
1310 query_scope_info.clone(),
1311 &query_scope_info_for_new_query,
1312 fetcher,
1313 key.borrow(),
1314 Some(&fetcher_guard),
1315 &lazy_maybe_local_key,
1316 owner_chain,
1317 )
1318 .await;
1319
1320 self.untyped_client
1322 .scope_lookup
1323 .with_notify_fetching(query_scope_info.cache_key, key_hash, false, async {
1324 let track = Arc::new(AtomicBool::new(true));
1325
1326 let query_abort_rx = self
1328 .untyped_client
1329 .scope_lookup
1330 .prepare_invalidation_channel::<K, V>(
1331 &query_scope_info,
1332 key_hash,
1333 &lazy_maybe_local_key(),
1334 );
1335
1336 let result_fut = ASYNC_TRACK_UPDATE_MARKER
1337 .scope(track.clone(), async { mapper(&mut new_value).await });
1338
1339 let mut cleared_during_user_fn = false;
1346 let result = {
1347 pin_mut!(result_fut);
1348 pin_mut!(query_abort_rx);
1349 match futures::future::select(result_fut, query_abort_rx).await {
1350 futures::future::Either::Left((result, _query_abort_rx)) => result,
1351 futures::future::Either::Right((query_abort_reason, result_fut)) => {
1352 if let Ok(QueryAbortReason::Clear) = query_abort_reason {
1353 cleared_during_user_fn = true;
1355 }
1356 result_fut.await
1357 }
1358 }
1359 };
1360
1361 if !cleared_during_user_fn {
1363 let mut new_value = Some(new_value);
1371 let updated = self.update_query_inner::<K, V, _>(
1372 &query_scope_info,
1373 key,
1374 ResetInvalidated::NoReset,
1376 |value| {
1377 if let Some(value) = value {
1378 if !track.load(std::sync::atomic::Ordering::Relaxed) {
1380 self.untrack_update_query();
1381 }
1382
1383 *value = new_value.take().expect("Should be Some");
1384 true
1385 } else {
1386 false
1387 }
1388 },
1389 );
1390 if !updated {
1391 self.set_inner::<K, V>(
1392 query_scope_info,
1393 query_scope_info_for_new_query,
1394 key.borrow(),
1395 into_maybe_local(
1396 new_value
1397 .take()
1398 .expect("Should be Some, should only be here if not updated"),
1399 ),
1400 lazy_maybe_local_key,
1401 track.load(std::sync::atomic::Ordering::Relaxed),
1402 ResetInvalidated::NoReset,
1404 );
1405 }
1406 }
1407
1408 result
1409 })
1410 .await
1411 }
1412
1413 #[track_caller]
1415 pub fn get_cached_query<K, V, M>(
1416 &self,
1417 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1418 key: impl Borrow<K>,
1419 ) -> Option<V>
1420 where
1421 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1422 V: DebugIfDevtoolsEnabled + Clone + 'static,
1423 {
1424 self.untyped_client
1425 .scope_lookup
1426 .with_cached_query::<K, V, _>(
1427 &KeyHash::new(key.borrow()),
1428 &query_scope.cache_key(),
1429 |maybe_cached| {
1430 maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1432 },
1433 )
1434 }
1435
1436 #[track_caller]
1440 pub fn query_exists<K, V, M>(
1441 &self,
1442 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1443 key: impl Borrow<K>,
1444 ) -> bool
1445 where
1446 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1447 V: DebugIfDevtoolsEnabled + 'static,
1448 {
1449 let key_hash = KeyHash::new(key.borrow());
1450 self.untyped_client
1451 .scope_lookup
1452 .with_cached_query::<K, V, _>(&key_hash, &query_scope.cache_key(), |maybe_cached| {
1453 maybe_cached.is_some()
1454 })
1455 }
1456
1457 #[track_caller]
1468 pub fn subscribe_is_loading<K, MaybeKey, V, M>(
1469 &self,
1470 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1471 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1472 ) -> Signal<bool>
1473 where
1474 K: Hash + Send + Sync + 'static,
1475 MaybeKey: QueryMaybeKey<K, V>,
1476 MaybeKey::MappedValue: 'static,
1477 V: 'static,
1478 {
1479 self.subscribe_is_loading_arc(query_scope, keyer).into()
1480 }
1481
1482 #[track_caller]
1493 pub fn subscribe_is_loading_local<K, MaybeKey, V, M>(
1494 &self,
1495 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1496 keyer: impl Fn() -> MaybeKey + 'static,
1497 ) -> Signal<bool>
1498 where
1499 K: Hash + 'static,
1500 MaybeKey: QueryMaybeKey<K, V>,
1501 MaybeKey::MappedValue: 'static,
1502 V: 'static,
1503 {
1504 self.subscribe_is_loading_arc_local(query_scope, keyer)
1505 .into()
1506 }
1507
1508 #[track_caller]
1519 pub fn subscribe_is_loading_arc_local<K, MaybeKey, V, M>(
1520 &self,
1521 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1522 keyer: impl Fn() -> MaybeKey + 'static,
1523 ) -> ArcSignal<bool>
1524 where
1525 K: Hash + 'static,
1526 MaybeKey: QueryMaybeKey<K, V>,
1527 MaybeKey::MappedValue: 'static,
1528 V: 'static,
1529 {
1530 let keyer = SendWrapper::new(keyer);
1531 self.untyped_client
1532 .scope_lookup
1533 .scope_subscriptions_mut()
1534 .add_is_loading_subscription(
1535 query_scope.cache_key(),
1536 MaybeLocal::new_local(ArcSignal::derive(move || {
1537 keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1538 })),
1539 )
1540 }
1541
1542 #[track_caller]
1553 pub fn subscribe_is_loading_arc<K, MaybeKey, V, M>(
1554 &self,
1555 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1556 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1557 ) -> ArcSignal<bool>
1558 where
1559 K: Hash + Send + Sync + 'static,
1560 MaybeKey: QueryMaybeKey<K, V>,
1561 MaybeKey::MappedValue: 'static,
1562 V: 'static,
1563 {
1564 self.untyped_client
1565 .scope_lookup
1566 .scope_subscriptions_mut()
1567 .add_is_loading_subscription(
1568 query_scope.cache_key(),
1569 MaybeLocal::new(ArcSignal::derive(move || {
1570 keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1571 })),
1572 )
1573 }
1574
1575 #[track_caller]
1586 pub fn subscribe_is_fetching<K, MaybeKey, V, M>(
1587 &self,
1588 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1589 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1590 ) -> Signal<bool>
1591 where
1592 K: Hash + Send + Sync + 'static,
1593 MaybeKey: QueryMaybeKey<K, V>,
1594 MaybeKey::MappedValue: 'static,
1595 V: 'static,
1596 {
1597 self.subscribe_is_fetching_arc(query_scope, keyer).into()
1598 }
1599
1600 #[track_caller]
1611 pub fn subscribe_is_fetching_local<K, MaybeKey, V, M>(
1612 &self,
1613 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1614 keyer: impl Fn() -> MaybeKey + 'static,
1615 ) -> Signal<bool>
1616 where
1617 K: Hash + 'static,
1618 MaybeKey: QueryMaybeKey<K, V>,
1619 MaybeKey::MappedValue: 'static,
1620 V: 'static,
1621 {
1622 self.subscribe_is_fetching_arc_local(query_scope, keyer)
1623 .into()
1624 }
1625
1626 #[track_caller]
1637 pub fn subscribe_is_fetching_arc_local<K, MaybeKey, V, M>(
1638 &self,
1639 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1640 keyer: impl Fn() -> MaybeKey + 'static,
1641 ) -> ArcSignal<bool>
1642 where
1643 K: Hash + 'static,
1644 MaybeKey: QueryMaybeKey<K, V>,
1645 MaybeKey::MappedValue: 'static,
1646 V: 'static,
1647 {
1648 let keyer = SendWrapper::new(keyer);
1649 self.untyped_client
1650 .scope_lookup
1651 .scope_subscriptions_mut()
1652 .add_is_fetching_subscription(
1653 query_scope.cache_key(),
1654 MaybeLocal::new_local(ArcSignal::derive(move || {
1655 keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1656 })),
1657 )
1658 }
1659
1660 #[track_caller]
1671 pub fn subscribe_is_fetching_arc<K, MaybeKey, V, M>(
1672 &self,
1673 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1674 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1675 ) -> ArcSignal<bool>
1676 where
1677 K: Hash + Send + Sync + 'static,
1678 MaybeKey: QueryMaybeKey<K, V>,
1679 MaybeKey::MappedValue: 'static,
1680 V: 'static,
1681 {
1682 self.untyped_client
1683 .scope_lookup
1684 .scope_subscriptions_mut()
1685 .add_is_fetching_subscription(
1686 query_scope.cache_key(),
1687 MaybeLocal::new(ArcSignal::derive(move || {
1688 keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1689 })),
1690 )
1691 }
1692
1693 #[track_caller]
1701 pub fn subscribe_value<K, MaybeKey, V, M>(
1702 &self,
1703 query_scope: impl QueryScopeTrait<K, V, M>,
1704 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1705 ) -> Signal<Option<V>>
1706 where
1707 K: DebugIfDevtoolsEnabled + Hash + Clone + Send + Sync + 'static,
1708 MaybeKey: QueryMaybeKey<K, V>,
1709 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1710 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1711 {
1712 self.subscribe_value_arc(query_scope, keyer).into()
1713 }
1714
1715 #[track_caller]
1723 pub fn subscribe_value_local<K, MaybeKey, V, M>(
1724 &self,
1725 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1726 keyer: impl Fn() -> MaybeKey + 'static,
1727 ) -> Signal<Option<V>, LocalStorage>
1728 where
1729 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1730 MaybeKey: QueryMaybeKey<K, V>,
1731 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
1732 V: DebugIfDevtoolsEnabled + Clone + 'static,
1733 {
1734 self.subscribe_value_arc_local(query_scope, keyer).into()
1735 }
1736
1737 #[track_caller]
1745 pub fn subscribe_value_arc_local<K, MaybeKey, V, M>(
1746 &self,
1747 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1748 keyer: impl Fn() -> MaybeKey + 'static,
1749 ) -> ArcLocalSignal<Option<V>>
1750 where
1751 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1752 MaybeKey: QueryMaybeKey<K, V>,
1753 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
1754 V: DebugIfDevtoolsEnabled + Clone + 'static,
1755 {
1756 let cache_key = query_scope.cache_key();
1757 let keyer = ArcLocalSignal::derive_local(move || {
1758 keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1759 });
1760 let dyn_signal = self
1761 .untyped_client
1762 .scope_lookup
1763 .scope_subscriptions_mut()
1764 .add_value_set_updated_or_removed_subscription(
1765 cache_key,
1766 MaybeLocal::new_local({
1767 let keyer = keyer.clone();
1768 move || keyer.get()
1769 }),
1770 );
1771
1772 let scope_lookup = self.untyped_client.scope_lookup;
1773 ArcLocalSignal::derive_local(move || {
1774 dyn_signal.track();
1775 if let Some(key) = keyer.read_untracked().as_ref() {
1776 scope_lookup.with_cached_query::<K, V, _>(key, &cache_key, |maybe_cached| {
1777 maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1779 })
1780 } else {
1781 None
1782 }
1783 })
1784 }
1785
1786 #[track_caller]
1794 pub fn subscribe_value_arc<K, MaybeKey, V, M>(
1795 &self,
1796 query_scope: impl QueryScopeTrait<K, V, M>,
1797 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1798 ) -> ArcSignal<Option<V>>
1799 where
1800 K: DebugIfDevtoolsEnabled + Hash + Clone + Send + Sync + 'static,
1801 MaybeKey: QueryMaybeKey<K, V>,
1802 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1803 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1804 {
1805 let cache_key = query_scope.cache_key();
1806 let keyer = ArcSignal::derive(move || keyer().into_maybe_key().map(|k| KeyHash::new(&k)));
1807
1808 let dyn_signal = self
1809 .untyped_client
1810 .scope_lookup
1811 .scope_subscriptions_mut()
1812 .add_value_set_updated_or_removed_subscription(
1813 cache_key,
1814 MaybeLocal::new({
1815 let keyer = keyer.clone();
1816 move || keyer.get()
1817 }),
1818 );
1819
1820 let scope_lookup = self.untyped_client.scope_lookup;
1821 ArcSignal::derive(move || {
1822 dyn_signal.track();
1823 if let Some(key) = keyer.read_untracked().as_ref() {
1824 scope_lookup.with_cached_query::<K, V, _>(key, &cache_key, |maybe_cached| {
1825 maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1827 })
1828 } else {
1829 None
1830 }
1831 })
1832 }
1833
1834 #[track_caller]
1838 pub fn invalidate_query<K, V, M>(
1839 &self,
1840 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1841 key: impl Borrow<K>,
1842 ) -> bool
1843 where
1844 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1845 V: DebugIfDevtoolsEnabled + Clone + 'static,
1846 {
1847 self.untyped_client.invalidate_query(query_scope, key)
1848 }
1849
1850 #[track_caller]
1854 pub fn invalidate_queries<K, V, KRef, M>(
1855 &self,
1856 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1857 keys: impl IntoIterator<Item = KRef>,
1858 ) -> Vec<KRef>
1859 where
1860 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1861 V: DebugIfDevtoolsEnabled + Clone + 'static,
1862 KRef: Borrow<K>,
1863 {
1864 self.untyped_client.invalidate_queries(query_scope, keys)
1865 }
1866
1867 #[track_caller]
1874 pub fn invalidate_queries_with_predicate<K, V, M>(
1875 &self,
1876 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1877 should_invalidate: impl Fn(&K) -> bool,
1878 ) where
1879 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1880 V: DebugIfDevtoolsEnabled + Clone + 'static,
1881 {
1882 let mut scopes = self.untyped_client.scope_lookup.scopes_mut();
1883 let mut cbs_scopes = vec![];
1884 let query_scope_info = QueryScopeInfo::new_local(&query_scope);
1885 self.untyped_client
1886 .scope_lookup
1887 .with_cached_scope_mut::<K, V, _, _>(
1888 &mut scopes,
1889 query_scope_info.cache_key,
1890 OnScopeMissing::Skip,
1891 |_| {},
1892 |maybe_scope, _| {
1893 if let Some(scope) = maybe_scope {
1894 for query in scope.all_queries_mut_include_pending() {
1895 if let Some(key) = query.key().value_if_safe()
1896 && should_invalidate(key)
1897 {
1898 let cb_scopes = query.invalidate(QueryAbortReason::Invalidate);
1899 cbs_scopes.push(cb_scopes);
1900 }
1901 }
1902 }
1903 },
1904 );
1905 let mut cbs_external = vec![];
1906 for cb in cbs_scopes {
1907 if let Some(cb_external) = cb(&mut scopes) {
1908 cbs_external.push(cb_external);
1909 }
1910 }
1911 drop(scopes);
1912 run_external_callbacks(
1913 self.untyped_client,
1914 query_scope_info.cache_key,
1915 cbs_external,
1916 );
1917 }
1918
1919 #[track_caller]
1923 pub fn invalidate_query_scope<K, V, M>(&self, query_scope: impl QueryScopeLocalTrait<K, V, M>)
1924 where
1925 K: Hash + 'static,
1926 V: Clone + 'static,
1927 {
1928 self.invalidate_query_scope_inner(query_scope.cache_key())
1929 }
1930
1931 pub(crate) fn invalidate_query_scope_inner(&self, scope_cache_key: ScopeCacheKey) {
1932 let mut scopes = self.untyped_client.scope_lookup.scopes_mut();
1933 let mut cbs_scopes = vec![];
1934 if let Some(scope) = scopes.get_mut(&scope_cache_key) {
1935 let cb_scopes = scope.invalidate_scope(QueryAbortReason::Invalidate);
1936 cbs_scopes.push(cb_scopes);
1937 for buster in scope.busters() {
1938 buster.try_set(new_buster_id());
1939 }
1940 }
1941 let mut cbs_external = vec![];
1942 for cb in cbs_scopes {
1943 if let Some(cb_external) = cb(&mut scopes) {
1944 cbs_external.push(cb_external);
1945 }
1946 }
1947 drop(scopes);
1948 run_external_callbacks(self.untyped_client, scope_cache_key, cbs_external);
1949 }
1950
1951 #[track_caller]
1958 pub fn invalidate_all_queries(&self) {
1959 let mut scopes = self.untyped_client.scope_lookup.scopes_mut();
1960 let mut cbs_scopes = vec![];
1961 for scope in scopes.values_mut() {
1962 let busters = scope.busters();
1963 let cb_scopes = scope.invalidate_scope(QueryAbortReason::Invalidate);
1964 cbs_scopes.push((cb_scopes, scope.cache_key()));
1965 for buster in busters {
1966 buster.try_set(new_buster_id());
1967 }
1968 }
1969 let mut cbs_external = vec![];
1970 for (cb, cache_key) in cbs_scopes {
1971 if let Some(cb_external) = cb(&mut scopes) {
1972 cbs_external.push((cb_external, cache_key));
1973 }
1974 }
1975 drop(scopes);
1976 for (cb_external, cache_key) in cbs_external {
1977 run_external_callbacks(self.untyped_client, cache_key, vec![cb_external]);
1978 }
1979 }
1980
1981 #[track_caller]
1990 pub fn clear(&self) {
1991 let mut scopes = self.untyped_client.scope_lookup.scopes_mut();
1992 let mut cbs_scopes = vec![];
1993 for scope in scopes.values_mut() {
1994 let busters = scope.busters();
1995 let cb_scopes = scope.invalidate_scope(QueryAbortReason::Clear);
1996 cbs_scopes.push((cb_scopes, scope.cache_key()));
1997 scope.clear();
1998 for buster in busters {
1999 buster.try_set(new_buster_id());
2000 }
2001 }
2002 let mut cbs_external = vec![];
2003 for (cb, cache_key) in cbs_scopes {
2004 if let Some(cb_external) = cb(&mut scopes) {
2005 cbs_external.push((cb_external, cache_key));
2006 }
2007 }
2008 drop(scopes);
2009 for (cb_external, cache_key) in cbs_external {
2010 run_external_callbacks(self.untyped_client, cache_key, vec![cb_external]);
2011 }
2012 }
2013
2014 #[cfg(test)]
2015 pub(crate) fn total_cached_queries(&self) -> usize {
2016 self.untyped_client
2017 .scope_lookup
2018 .scopes()
2019 .values()
2020 .map(|scope| scope.total_cached_queries())
2021 .sum()
2022 }
2023
2024 #[cfg(test)]
2025 pub(crate) fn is_key_invalid<K, V, M>(
2026 &self,
2027 query_scope: impl QueryScopeLocalTrait<K, V, M>,
2028 key: impl Borrow<K>,
2029 ) -> bool
2030 where
2031 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2032 V: DebugIfDevtoolsEnabled + Clone + 'static,
2033 {
2034 let query_scope_info = QueryScopeInfo::new_local(&query_scope);
2035 self.untyped_client
2036 .scope_lookup
2037 .with_cached_scope_mut::<K, V, _, _>(
2038 &mut self.untyped_client.scope_lookup.scopes_mut(),
2039 query_scope_info.cache_key,
2040 OnScopeMissing::Skip,
2041 |_| {},
2042 |maybe_scope, _| {
2043 if let Some(scope) = maybe_scope {
2044 scope
2045 .get(&KeyHash::new(key.borrow()))
2046 .map(|query| query.is_invalidated())
2047 .unwrap_or(false)
2048 } else {
2049 false
2050 }
2051 },
2052 )
2053 }
2054
2055 #[cfg(test)]
2056 #[allow(dead_code)]
2057 pub(crate) fn mark_key_valid<K, V, M>(
2058 &self,
2059 query_scope: impl QueryScopeLocalTrait<K, V, M>,
2060 key: impl Borrow<K>,
2061 ) where
2062 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2063 V: DebugIfDevtoolsEnabled + Clone + 'static,
2064 {
2065 self.untyped_client
2066 .scope_lookup
2067 .with_cached_scope_mut::<K, V, _, _>(
2068 &mut self.untyped_client.scope_lookup.scopes_mut(),
2069 QueryScopeInfo::new_local(&query_scope).cache_key,
2070 OnScopeMissing::Skip,
2071 |_| {},
2072 |maybe_scope, _| {
2073 if let Some(scope) = maybe_scope
2074 && let Some(query) = scope.get_mut(&KeyHash::new(key.borrow()))
2075 {
2076 query.mark_valid();
2077 }
2078 },
2079 );
2080 }
2081
2082 #[cfg(test)]
2083 pub(crate) fn subscriber_count(&self) -> usize {
2084 self.untyped_client
2085 .scope_lookup
2086 .scope_subscriptions_mut()
2087 .count()
2088 }
2089}
2090
2091impl UntypedQueryClient {
2092 #[track_caller]
2093 pub(crate) async fn fetch_query<K, V, M>(
2094 &self,
2095 query_scope: impl QueryScopeTrait<K, V, M>,
2096 key: impl Borrow<K>,
2097 ) -> V
2098 where
2099 K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
2100 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
2101 {
2102 let query_scope_info = QueryScopeInfo::new(&query_scope);
2103 let owner_chain = OwnerChain::new(*self, query_scope_info.cache_key, Owner::current());
2104 self.fetch_inner(
2105 query_scope_info,
2106 || QueryScopeQueryInfo::new(&query_scope, key.borrow()),
2107 async |key| MaybeLocal::new(query_scope.query(key).await),
2108 key.borrow(),
2109 None,
2110 || MaybeLocal::new(key.borrow().clone()),
2111 &owner_chain,
2112 )
2113 .await
2114 }
2115
2116 #[track_caller]
2117 pub(crate) async fn fetch_query_local<K, V, M>(
2118 &self,
2119 query_scope: impl QueryScopeLocalTrait<K, V, M>,
2120 key: impl Borrow<K>,
2121 ) -> V
2122 where
2123 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
2124 V: DebugIfDevtoolsEnabled + Clone + 'static,
2125 {
2126 let query_scope_info = QueryScopeInfo::new_local(&query_scope);
2127 let owner_chain = OwnerChain::new(*self, query_scope_info.cache_key, Owner::current());
2128 self.fetch_inner(
2129 query_scope_info,
2130 || QueryScopeQueryInfo::new_local(&query_scope, key.borrow()),
2131 async |key| MaybeLocal::new_local(query_scope.query(key).await),
2132 key.borrow(),
2133 None,
2134 || MaybeLocal::new_local(key.borrow().clone()),
2135 &owner_chain,
2136 )
2137 .await
2138 }
2139
2140 #[track_caller]
2141 async fn fetch_inner<K, V, FetcherFut>(
2142 &self,
2143 query_scope_info: QueryScopeInfo,
2144 query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
2145 fetcher: impl Fn(K) -> FetcherFut,
2146 key: &K,
2147 maybe_preheld_fetcher_mutex_guard: Option<&futures::lock::MutexGuard<'_, ()>>,
2148 lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
2149 owner_chain: &OwnerChain,
2150 ) -> V
2151 where
2152 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
2153 V: DebugIfDevtoolsEnabled + Clone + 'static,
2154 FetcherFut: Future<Output = MaybeLocal<V>>,
2155 {
2156 self.cached_or_fetch(
2157 self.options,
2158 query_scope_info.options,
2159 None,
2160 &query_scope_info,
2161 query_scope_info_for_new_query,
2162 key,
2163 fetcher,
2164 |info| {
2165 match info.variant {
2166 CachedOrFetchCbInputVariant::CachedUntouched => {
2167 if info.cached.stale_or_invalidated() {
2168 return CachedOrFetchCbOutput::Refetch;
2169 }
2170 }
2171 CachedOrFetchCbInputVariant::CachedUpdated => {
2172 info.cached.buster.set(new_buster_id());
2174 }
2175 CachedOrFetchCbInputVariant::Fresh => {}
2176 }
2177 CachedOrFetchCbOutput::Return(
2178 info.cached.value_maybe_stale().value_may_panic().clone(),
2180 )
2181 },
2182 maybe_preheld_fetcher_mutex_guard,
2183 lazy_maybe_local_key,
2184 owner_chain,
2185 )
2186 .await
2187 }
2188
2189 #[track_caller]
2190 pub(crate) fn invalidate_query<K, V, M>(
2191 &self,
2192 query_scope: impl QueryScopeLocalTrait<K, V, M>,
2193 key: impl Borrow<K>,
2194 ) -> bool
2195 where
2196 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2197 V: DebugIfDevtoolsEnabled + Clone + 'static,
2198 {
2199 let cleared = self.invalidate_queries(query_scope, std::iter::once(key));
2200 !cleared.is_empty()
2201 }
2202
2203 #[track_caller]
2204 pub(crate) fn invalidate_queries<K, V, KRef, M>(
2205 &self,
2206 query_scope: impl QueryScopeLocalTrait<K, V, M>,
2207 keys: impl IntoIterator<Item = KRef>,
2208 ) -> Vec<KRef>
2209 where
2210 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2211 V: DebugIfDevtoolsEnabled + Clone + 'static,
2212 KRef: Borrow<K>,
2213 {
2214 self.invalidate_queries_inner::<K, V, _>(&QueryScopeInfo::new_local(&query_scope), keys)
2215 }
2216
2217 #[track_caller]
2218 pub(crate) fn invalidate_queries_inner<K, V, KRef>(
2219 &self,
2220 query_scope_info: &QueryScopeInfo,
2221 keys: impl IntoIterator<Item = KRef>,
2222 ) -> Vec<KRef>
2223 where
2224 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2225 V: DebugIfDevtoolsEnabled + Clone + 'static,
2226 KRef: Borrow<K>,
2227 {
2228 let keys = keys.into_iter().collect::<Vec<_>>();
2229 let key_hashes = keys
2230 .iter()
2231 .map(|key| KeyHash::new(key.borrow()))
2232 .collect::<Vec<_>>();
2233
2234 if key_hashes.is_empty() {
2235 return vec![];
2236 }
2237
2238 let mut scopes = self.scope_lookup.scopes_mut();
2239 let mut cbs_scopes = vec![];
2240 let results = self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2241 &mut scopes,
2242 query_scope_info.cache_key,
2243 OnScopeMissing::Skip,
2244 |_| {},
2245 |maybe_scope, _| {
2246 let mut invalidated = vec![];
2247 if let Some(scope) = maybe_scope {
2248 for (key, key_hash) in keys.into_iter().zip(key_hashes.iter()) {
2249 if let Some(cached) = scope.get_mut_include_pending(key_hash) {
2250 let cb_scopes = cached.invalidate(QueryAbortReason::Invalidate);
2251 cbs_scopes.push(cb_scopes);
2252 invalidated.push(key);
2253 }
2254 }
2255 }
2256 invalidated
2257 },
2258 );
2259 let mut cbs_external = vec![];
2260 for cb in cbs_scopes {
2261 if let Some(cb_external) = cb(&mut scopes) {
2262 cbs_external.push(cb_external);
2263 }
2264 }
2265 drop(scopes);
2266 run_external_callbacks(*self, query_scope_info.cache_key, cbs_external);
2267 results
2268 }
2269
2270 #[cfg(test)]
2271 #[track_caller]
2273 pub(crate) fn clear_query_scope<K, V, M>(&self, query_scope: impl QueryScopeLocalTrait<K, V, M>)
2274 where
2275 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2276 V: DebugIfDevtoolsEnabled + Clone + 'static,
2277 {
2278 let mut scopes = self.scope_lookup.scopes_mut();
2279 let query_scope_info = QueryScopeInfo::new_local(&query_scope);
2280 let mut cbs_scopes = vec![];
2281 self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2282 &mut scopes,
2283 query_scope_info.cache_key,
2284 OnScopeMissing::Skip,
2285 |_| {},
2286 |maybe_scope, _| {
2287 if let Some(scope) = maybe_scope {
2288 for cached in scope.all_queries_mut_include_pending() {
2289 let cb_scopes = cached.invalidate(QueryAbortReason::Clear);
2290 cbs_scopes.push(cb_scopes);
2291 }
2292 }
2293 },
2294 );
2295 let mut cbs_external = vec![];
2296 for cb in cbs_scopes {
2297 if let Some(cb_external) = cb(&mut scopes) {
2298 cbs_external.push(cb_external);
2299 }
2300 }
2301 drop(scopes);
2302 run_external_callbacks(*self, query_scope_info.cache_key, cbs_external);
2303 }
2304
2305 #[track_caller]
2307 pub(crate) fn clear_query<K, V, M>(
2308 &self,
2309 query_scope: impl QueryScopeLocalTrait<K, V, M>,
2310 key: impl Borrow<K>,
2311 ) -> bool
2312 where
2313 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2314 V: DebugIfDevtoolsEnabled + Clone + 'static,
2315 {
2316 let mut scopes = self.scope_lookup.scopes_mut();
2317 let query_scope_info = QueryScopeInfo::new_local(&query_scope);
2318 let mut cbs_scopes = vec![];
2319 let result = self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2320 &mut scopes,
2321 query_scope_info.cache_key,
2322 OnScopeMissing::Skip,
2323 |_| {},
2324 |maybe_scope, _| {
2325 if let Some(scope) = maybe_scope {
2326 let key_hash = KeyHash::new(key.borrow());
2327 if let Some(cached) = scope.get_mut_include_pending(&key_hash) {
2328 let cb_scopes = cached.invalidate(QueryAbortReason::Clear);
2329 cbs_scopes.push(cb_scopes);
2330 }
2331 let removed = scope.remove_entry(&key_hash);
2332 scope.remove_entry(&key_hash);
2334 return removed.is_some();
2335 }
2336 false
2337 },
2338 );
2339 let mut cbs_external = vec![];
2340 for cb in cbs_scopes {
2341 if let Some(cb_external) = cb(&mut scopes) {
2342 cbs_external.push(cb_external);
2343 }
2344 }
2345 drop(scopes);
2346 run_external_callbacks(*self, query_scope_info.cache_key, cbs_external);
2347 result
2348 }
2349
2350 pub(crate) fn query_metadata<K, V>(
2351 &self,
2352 scope_cache_key: ScopeCacheKey,
2353 key: impl Borrow<K>,
2354 ) -> Option<QueryMetadata>
2355 where
2356 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2357 V: DebugIfDevtoolsEnabled + Clone + 'static,
2358 {
2359 self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2360 &mut self.scope_lookup.scopes_mut(),
2361 scope_cache_key,
2362 OnScopeMissing::Skip,
2363 |_| {},
2364 |maybe_scope, _| {
2365 if let Some(scope) = maybe_scope
2366 && let Some(cached) = scope.get(&KeyHash::new(key.borrow()))
2367 {
2368 return Some(QueryMetadata {
2369 updated_at: cached.updated_at,
2370 stale_or_invalidated: cached.stale_or_invalidated(),
2371 });
2372 }
2373 None
2374 },
2375 )
2376 }
2377
2378 pub async fn cached_or_fetch<K, V, T, FetcherFut>(
2379 &self,
2380 client_options: QueryOptions,
2381 scope_options: Option<QueryOptions>,
2382 maybe_buster_if_uncached: Option<ArcRwSignal<u64>>,
2383 query_scope_info: &QueryScopeInfo,
2384 query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
2385 key: &K,
2386 fetcher: impl Fn(K) -> FetcherFut,
2387 return_cb: impl Fn(CachedOrFetchCbInput<K, V>) -> CachedOrFetchCbOutput<T>,
2388 maybe_preheld_fetcher_mutex_guard: Option<&futures::lock::MutexGuard<'_, ()>>,
2389 lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
2390 owner_chain: &OwnerChain,
2391 ) -> T
2392 where
2393 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2394 V: DebugIfDevtoolsEnabled + Clone + 'static,
2395 FetcherFut: Future<Output = MaybeLocal<V>>,
2396 {
2397 let scope_lookup = &self.scope_lookup;
2398 let key_hash = KeyHash::new(key);
2399 let mut cached_buster = None;
2400 let next_directive = scope_lookup.with_cached_query::<K, V, _>(
2401 &key_hash,
2402 &query_scope_info.cache_key,
2403 |maybe_cached| {
2404 if let Some(cached) = maybe_cached {
2405 cached_buster = Some(cached.buster.clone());
2406 return_cb(CachedOrFetchCbInput {
2407 cached,
2408 variant: CachedOrFetchCbInputVariant::CachedUntouched,
2409 })
2410 } else {
2411 CachedOrFetchCbOutput::Refetch
2412 }
2413 },
2414 );
2415
2416 match next_directive {
2417 CachedOrFetchCbOutput::Return(value) => value,
2418 CachedOrFetchCbOutput::Refetch => {
2419 let fetcher_mutex = scope_lookup.fetcher_mutex::<K, V>(key_hash, query_scope_info);
2421 let _maybe_fetcher_mutex_guard_local = if maybe_preheld_fetcher_mutex_guard
2422 .is_none()
2423 {
2424 let _fetcher_guard = match fetcher_mutex.try_lock() {
2425 Some(fetcher_guard) => fetcher_guard,
2426 None => {
2427 let fetcher_guard = fetcher_mutex.lock().await;
2429 let next_directive = scope_lookup.with_cached_query::<K, V, _>(
2430 &key_hash,
2431 &query_scope_info.cache_key,
2432 |maybe_cached| {
2433 if let Some(cached) = maybe_cached {
2434 cached_buster = Some(cached.buster.clone());
2435 return_cb(CachedOrFetchCbInput {
2436 cached,
2437 variant: CachedOrFetchCbInputVariant::CachedUntouched,
2438 })
2439 } else {
2440 CachedOrFetchCbOutput::Refetch
2441 }
2442 },
2443 );
2444 match next_directive {
2445 CachedOrFetchCbOutput::Return(value) => return value,
2446 CachedOrFetchCbOutput::Refetch => fetcher_guard,
2447 }
2448 }
2449 };
2450 Some(_fetcher_guard)
2451 } else {
2452 None
2454 };
2455
2456 #[cfg(any(
2457 all(debug_assertions, feature = "devtools"),
2458 feature = "devtools-always"
2459 ))]
2460 let before_time = chrono::Utc::now();
2461
2462 let loading_first_time = cached_buster.is_none();
2463
2464 #[cfg(any(
2465 all(debug_assertions, feature = "devtools"),
2466 feature = "devtools-always"
2467 ))]
2468 {
2469 if loading_first_time {
2471 scope_lookup
2472 .client_subscriptions_mut()
2473 .notify_query_created(crate::subs_client::QueryCreatedInfo {
2474 cache_key: query_scope_info.cache_key,
2475 scope_title: query_scope_info.title.clone(),
2476 key_hash,
2477 debug_key: crate::utils::DebugValue::new(key),
2478 combined_options: crate::options_combine(
2479 client_options,
2480 scope_options,
2481 ),
2482 });
2483 }
2484 }
2485
2486 let maybe_local_key = lazy_maybe_local_key();
2487
2488 enum MaybeNewValue<V> {
2489 NewValue(V),
2490 SsrStreamedValueOverride,
2491 }
2492
2493 let maybe_new_value = scope_lookup
2494 .with_notify_fetching(
2495 query_scope_info.cache_key,
2496 key_hash,
2497 loading_first_time,
2498 async {
2500 loop {
2501 let query_abort_rx = scope_lookup
2502 .prepare_invalidation_channel::<K, V>(
2503 query_scope_info,
2504 key_hash,
2505 &maybe_local_key,
2506 );
2507
2508 let fut = owner_chain.with(|| fetcher(key.clone()));
2509
2510 futures::select_biased! {
2511 rx_result = query_abort_rx.fuse() => {
2512 if let Ok(reason) = rx_result {
2513 match reason {
2514 QueryAbortReason::Invalidate | QueryAbortReason::Clear => {},
2515 QueryAbortReason::SsrStreamedValueOverride => {
2516 break MaybeNewValue::SsrStreamedValueOverride;
2517 },
2518 }
2519 }
2520 },
2521 new_value = fut.fuse() => {
2522 break MaybeNewValue::NewValue(new_value);
2523 },
2524 }
2525 }
2526 },
2527 )
2528 .await;
2529
2530 #[cfg(any(
2531 all(debug_assertions, feature = "devtools"),
2532 feature = "devtools-always"
2533 ))]
2534 let elapsed_ms = chrono::Utc::now()
2535 .signed_duration_since(before_time)
2536 .num_milliseconds();
2537
2538 let buster_if_uncached = if loading_first_time {
2539 Some(
2540 maybe_buster_if_uncached
2541 .unwrap_or_else(|| ArcRwSignal::new(new_buster_id())),
2542 )
2543 } else {
2544 None
2545 };
2546
2547 let next_directive = scope_lookup.with_cached_scope_mut::<_, _, _, _>(
2548 &mut scope_lookup.scopes_mut(),
2549 query_scope_info.cache_key,
2550 OnScopeMissing::Create(query_scope_info),
2551 |_| {},
2552 |scope, _| {
2553 let scope = scope.expect("provided a default");
2554 match maybe_new_value {
2555 MaybeNewValue::NewValue(new_value) => {
2556 if let Some(cached) = scope.get_mut(&key_hash) {
2557 cached.set_value(
2558 new_value,
2559 true,
2560 #[cfg(any(
2561 all(debug_assertions, feature = "devtools"),
2562 feature = "devtools-always"
2563 ))]
2564 crate::events::Event::new(
2565 crate::events::EventVariant::Fetched { elapsed_ms },
2566 ),
2567 ResetInvalidated::Reset,
2568 );
2569 return_cb(CachedOrFetchCbInput {
2570 cached,
2571 variant: CachedOrFetchCbInputVariant::CachedUpdated,
2572 })
2573 } else {
2574 scope.insert_without_query_created_notif(
2576 key_hash,
2577 Query::new(
2578 client_options,
2579 *self,
2580 query_scope_info,
2581 query_scope_info_for_new_query(),
2582 key_hash,
2583 maybe_local_key,
2584 new_value,
2585 buster_if_uncached.expect(
2586 "loading_first_time means this is Some(). (bug)",
2587 ),
2588 scope_options,
2589 None,
2590 #[cfg(any(
2591 all(debug_assertions, feature = "devtools"),
2592 feature = "devtools-always"
2593 ))]
2594 crate::events::Event::new(
2595 crate::events::EventVariant::Fetched { elapsed_ms },
2596 ),
2597 ),
2598 );
2599 return_cb(CachedOrFetchCbInput {
2600 cached: scope.get(&key_hash).expect("Just set. (bug)"),
2601 variant: CachedOrFetchCbInputVariant::Fresh,
2602 })
2603 }
2604 }
2605 MaybeNewValue::SsrStreamedValueOverride => {
2606 return_cb(CachedOrFetchCbInput {
2607 cached: scope
2608 .get(&key_hash)
2609 .expect("Should contain value streamed from server. (bug)"),
2610 variant: CachedOrFetchCbInputVariant::Fresh,
2611 })
2612 }
2613 }
2614 },
2615 );
2616
2617 match next_directive {
2618 CachedOrFetchCbOutput::Refetch => {
2619 panic!("Unexpected refetch directive after providing fresh value. (bug)")
2620 }
2621 CachedOrFetchCbOutput::Return(return_value) => return_value,
2622 }
2623 }
2624 }
2625 }
2626}
2627
2628pub(crate) struct QueryMetadata {
2629 pub updated_at: DateTime<Utc>,
2630 pub stale_or_invalidated: bool,
2631}