leptos_fetch/
query_client.rs

1#![allow(ungated_async_fn_track_caller)] // Want it to auto-turn on when stable
2
3use std::{
4    borrow::Borrow,
5    collections::HashMap,
6    fmt::Debug,
7    hash::Hash,
8    sync::{Arc, LazyLock, atomic::AtomicBool},
9};
10
11use chrono::{DateTime, Utc};
12use codee::{Decoder, Encoder};
13use futures::{FutureExt, pin_mut};
14use leptos::{
15    prelude::{
16        ArcRwSignal, ArcSignal, Effect, Get, GetUntracked, LocalStorage, Owner, Read,
17        ReadUntracked, Set, Signal, Track, provide_context,
18    },
19    server::{
20        ArcLocalResource, ArcResource, FromEncodedStr, IntoEncodedString, LocalResource, Resource,
21    },
22};
23use send_wrapper::SendWrapper;
24
25use crate::{
26    ArcLocalSignal, QueryOptions,
27    cache::{
28        CachedOrFetchCbInput, CachedOrFetchCbInputVariant, CachedOrFetchCbOutput, OnScopeMissing,
29    },
30    cache_scope::{QueryAbortReason, QueryOrPending},
31    debug_if_devtools_enabled::DebugIfDevtoolsEnabled,
32    maybe_local::MaybeLocal,
33    query::Query,
34    query_maybe_key::QueryMaybeKey,
35    query_scope::{
36        QueryScopeInfo, QueryScopeLocalTrait, QueryScopeQueryInfo, QueryScopeTrait, ScopeCacheKey,
37    },
38    resource_drop_guard::ResourceDropGuard,
39    utils::{
40        KeyHash, OwnerChain, ResetInvalidated, new_buster_id, new_resource_id,
41        run_external_callbacks,
42    },
43};
44
45use super::cache::ScopeLookup;
46
47#[cfg(not(feature = "rkyv"))]
48pub(crate) type DefaultCodec = codee::string::JsonSerdeCodec;
49
50#[cfg(feature = "rkyv")]
51pub(crate) type DefaultCodec = codee::binary::RkyvCodec;
52
53task_local::task_local! {
54    pub(crate) static ASYNC_TRACK_UPDATE_MARKER: Arc<AtomicBool>;
55}
56
57std::thread_local! {
58    pub(crate) static SYNC_TRACK_UPDATE_MARKER: AtomicBool = const { AtomicBool::new(true) };
59}
60
61/// The [`QueryClient`] stores all query data, and is used to manage queries.
62///
63/// Should be provided via leptos context at the top of the app.
64///
65/// # Example
66///
67/// ```
68/// use leptos::prelude::*;
69/// use leptos_fetch::QueryClient;
70///
71/// #[component]
72/// pub fn App() -> impl IntoView {
73///    QueryClient::new().provide();
74///     // ...
75/// }
76///
77/// #[component]
78/// pub fn MyComponent() -> impl IntoView {
79///     let client: QueryClient = expect_context();
80///      // ...
81/// }
82/// ```
83pub struct QueryClient<Codec = DefaultCodec> {
84    pub(crate) untyped_client: UntypedQueryClient,
85    _ser: std::marker::PhantomData<SendWrapper<Codec>>,
86}
87
88/// The internal untyped part of the [`QueryClient`],
89/// containing the methods that don't rely on the Codec generic.
90#[derive(Clone, Copy)]
91pub(crate) struct UntypedQueryClient {
92    pub(crate) scope_lookup: ScopeLookup,
93    options: QueryOptions,
94    created_at: DateTime<Utc>,
95}
96
97impl<Codec> Debug for QueryClient<Codec> {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        f.debug_struct("QueryClient")
100            .field("scope_lookup", &self.untyped_client.scope_lookup)
101            .field("options", &self.untyped_client.options)
102            .field("codec", &std::any::type_name::<Codec>())
103            .finish()
104    }
105}
106
107impl<Codec: 'static> Clone for QueryClient<Codec> {
108    fn clone(&self) -> Self {
109        *self
110    }
111}
112
113impl<Codec: 'static> Copy for QueryClient<Codec> {}
114
115impl Default for QueryClient<DefaultCodec> {
116    fn default() -> Self {
117        Self::new()
118    }
119}
120
121impl QueryClient<DefaultCodec> {
122    /// Creates a new [`QueryClient`] with the default codec: [`codee::string::JsonSerdeCodec`].
123    ///
124    /// Call [`QueryClient::set_codec()`] to set a different codec.
125    ///
126    /// Call [`QueryClient::with_options()`] to set non-default options.
127    #[track_caller]
128    pub fn new() -> Self {
129        Self {
130            untyped_client: UntypedQueryClient {
131                scope_lookup: ScopeLookup::new(),
132                options: QueryOptions::default(),
133                created_at: Utc::now(),
134            },
135            _ser: std::marker::PhantomData,
136        }
137    }
138}
139
140impl<Codec: 'static> QueryClient<Codec> {
141    /// Provide the client to leptos context.
142    /// ```no_run
143    /// use leptos_fetch::QueryClient;
144    /// use leptos::prelude::*;
145    ///
146    /// QueryClient::new().provide();
147    ///
148    /// let client: QueryClient = expect_context();
149    /// ```
150    #[track_caller]
151    pub fn provide(self) -> Self {
152        provide_context(self);
153        self
154    }
155
156    /// **Applies to `ssr` only**
157    ///
158    /// It's possible to use non-json codecs for streaming leptos resources from the backend.
159    /// The default is [`codee::string::JsonSerdeCodec`].
160    ///
161    /// The current `codee` major version is `0.3` and will need to be imported in your project to customize the codec.
162    ///
163    /// E.g. to use [`codee::binary::MsgpackSerdeCodec`](https://docs.rs/codee/latest/codee/binary/struct.MsgpackSerdeCodec.html):
164    /// ```toml
165    /// codee = { version = "0.3", features = ["msgpack_serde"] }
166    /// ```
167    ///
168    /// This is a generic type on the [`QueryClient`], so when calling [`leptos::prelude::expect_context`],
169    /// this type must be specified when not using the default.
170    ///
171    /// A useful pattern is to type alias the client with the custom codec for your whole app:
172    ///
173    /// ```no_run
174    /// use codee::binary::MsgpackSerdeCodec;
175    /// use leptos::prelude::*;
176    /// use leptos_fetch::QueryClient;
177    ///
178    /// type MyQueryClient = QueryClient<MsgpackSerdeCodec>;
179    ///
180    /// // Create and provide to context to make accessible everywhere:
181    /// QueryClient::new().set_codec::<MsgpackSerdeCodec>().provide();
182    ///
183    /// let client: MyQueryClient = expect_context();
184    /// ```
185    #[track_caller]
186    pub fn set_codec<NewCodec>(self) -> QueryClient<NewCodec> {
187        QueryClient {
188            untyped_client: self.untyped_client,
189            _ser: std::marker::PhantomData,
190        }
191    }
192
193    /// Set non-default options to apply to all queries.
194    ///
195    /// These options will be combined with any options for a specific query scope.
196    #[track_caller]
197    pub fn with_options(mut self, options: QueryOptions) -> Self {
198        self.untyped_client.options = options;
199        self
200    }
201
202    /// Read the base [`QueryOptions`] for this [`QueryClient`].
203    ///
204    /// These will be combined with any options for a specific query scope.
205    #[track_caller]
206    pub fn options(&self) -> QueryOptions {
207        self.untyped_client.options
208    }
209
210    /// Provide a signal to globally enable/disable auto refetching of queries
211    /// that have active resources and have set [`QueryOptions::with_refetch_interval`].
212    ///
213    /// Whilst this signal returns `false`, refetches will be skipped.
214    #[track_caller]
215    pub fn with_refetch_enabled_toggle(self, refetch_enabled: impl Into<ArcSignal<bool>>) -> Self {
216        self.untyped_client
217            .scope_lookup
218            .scopes_mut()
219            .refetch_enabled = Some(refetch_enabled.into());
220        self
221    }
222
223    /// If set, access the signal that globally enables/disables auto refetching of queries.
224    #[track_caller]
225    pub fn refetch_enabled(&self) -> Option<ArcSignal<bool>> {
226        self.untyped_client
227            .scope_lookup
228            .scopes()
229            .refetch_enabled
230            .clone()
231    }
232
233    /// Query with [`LocalResource`]. Local resouces only load data on the client, so can be used with non-threadsafe/serializable data.
234    ///
235    /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
236    #[track_caller]
237    pub fn local_resource<K, MaybeKey, V, M>(
238        &self,
239        query_scope: impl QueryScopeLocalTrait<K, V, M> + 'static,
240        keyer: impl Fn() -> MaybeKey + 'static,
241    ) -> LocalResource<MaybeKey::MappedValue>
242    where
243        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
244        MaybeKey: QueryMaybeKey<K, V>,
245        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
246        V: DebugIfDevtoolsEnabled + Clone + 'static,
247    {
248        self.arc_local_resource(query_scope, keyer).into()
249    }
250
251    /// Query with [`ArcLocalResource`]. Local resouces only load data on the client, so can be used with non-threadsafe/serializable data.
252    ///
253    /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
254    #[track_caller]
255    pub fn arc_local_resource<K, MaybeKey, V, M>(
256        &self,
257        query_scope: impl QueryScopeLocalTrait<K, V, M> + 'static,
258        keyer: impl Fn() -> MaybeKey + 'static,
259    ) -> ArcLocalResource<MaybeKey::MappedValue>
260    where
261        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
262        MaybeKey: QueryMaybeKey<K, V>,
263        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
264        V: DebugIfDevtoolsEnabled + Clone + 'static,
265    {
266        let client = *self;
267        let client_options = self.options();
268        let cache_key = query_scope.cache_key();
269        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
270        let query_scope = Arc::new(query_scope);
271        let query_options = query_scope.options();
272        let resource_id = new_resource_id();
273
274        // To call .mark_resource_dropped() when the resource is dropped:
275        let drop_guard = ResourceDropGuard::<K, V>::new(
276            self.untyped_client.scope_lookup,
277            resource_id,
278            cache_key,
279        );
280
281        ArcLocalResource::new({
282            move || {
283                let query_scope = query_scope.clone();
284                let query_scope_info = query_scope_info.clone();
285                let maybe_key = keyer().into_maybe_key();
286                let drop_guard = drop_guard.clone();
287                if let Some(key) = maybe_key.as_ref() {
288                    drop_guard.set_active_key(KeyHash::new(key));
289                }
290                // Note: cannot hoist outside of resource,
291                // it prevents the resource itself dropping when the owner is held by the resource itself,
292                // here each instance only lasts as long as the query.
293                let owner_chain = OwnerChain::new(
294                    client.untyped_client,
295                    query_scope_info.cache_key,
296                    Owner::current(),
297                );
298                async move {
299                    if let Some(key) = maybe_key {
300                        let query_scope_query_info =
301                            || QueryScopeQueryInfo::new_local(&query_scope, &key);
302                        let value = client
303                            .untyped_client
304                            .cached_or_fetch(
305                                client_options,
306                                query_options,
307                                None,
308                                &query_scope_info,
309                                query_scope_query_info,
310                                &key,
311                                {
312                                    let query_scope = query_scope.clone();
313                                    move |key| {
314                                        let query_scope = query_scope.clone();
315
316                                        async move {
317                                            MaybeLocal::new_local(query_scope.query(key).await)
318                                        }
319                                    }
320                                },
321                                |info| {
322                                    info.cached.buster.track();
323                                    info.cached.mark_resource_active(resource_id);
324                                    match info.variant {
325                                        CachedOrFetchCbInputVariant::CachedUntouched => {
326                                            // If stale refetch in the background with the prefetch() function, which'll recognise it's stale, refetch it and invalidate busters:
327                                            if cfg!(any(test, not(feature = "ssr")))
328                                                && info.cached.stale_or_invalidated()
329                                            {
330                                                let key = key.clone();
331                                                let query_scope = query_scope.clone();
332                                                let owner_chain = owner_chain.clone();
333                                                // Just adding the SendWrapper and using spawn() rather than spawn_local() to fix tests:
334                                                leptos::task::spawn(SendWrapper::new(async move {
335                                                    client
336                                                        .prefetch_inner(
337                                                            QueryScopeInfo::new_local(&query_scope),
338                                                            || {
339                                                                QueryScopeQueryInfo::new_local(
340                                                                    &query_scope,
341                                                                    &key,
342                                                                )
343                                                            },
344                                                            async |key| {
345                                                                MaybeLocal::new_local(
346                                                                    query_scope.query(key).await,
347                                                                )
348                                                            },
349                                                            key.borrow(),
350                                                            || {
351                                                                MaybeLocal::new_local(
352                                                                    key.borrow().clone(),
353                                                                )
354                                                            },
355                                                            &owner_chain,
356                                                        )
357                                                        .await;
358                                                }));
359                                            }
360                                        }
361                                        CachedOrFetchCbInputVariant::Fresh => {}
362                                        CachedOrFetchCbInputVariant::CachedUpdated => {
363                                            panic!("Didn't direct inner to refetch here. (bug)")
364                                        }
365                                    }
366                                    CachedOrFetchCbOutput::Return(
367                                        // WONTPANIC: cached_or_fetch will only output values that are safe on this thread:
368                                        info.cached.value_maybe_stale().value_may_panic().clone(),
369                                    )
370                                },
371                                None,
372                                || MaybeLocal::new_local(key.clone()),
373                                &owner_chain,
374                            )
375                            .await;
376                        MaybeKey::prepare_mapped_value(Some(value))
377                    } else {
378                        MaybeKey::prepare_mapped_value(None)
379                    }
380                }
381            }
382        })
383    }
384
385    /// Query with [`Resource`].
386    ///
387    /// Resources must be serializable to potentially load in `ssr` and stream to the client.
388    ///
389    /// Resources must be `Send` and `Sync` to be multithreaded in ssr.
390    ///
391    /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
392    #[track_caller]
393    pub fn resource<K, MaybeKey, V, M>(
394        &self,
395        query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
396        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
397    ) -> Resource<MaybeKey::MappedValue, Codec>
398    where
399        K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
400        MaybeKey: QueryMaybeKey<K, V>,
401        MaybeKey::MappedValue: Clone + Send + Sync + 'static,
402        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
403        Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
404        <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
405        <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
406        <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
407            Debug,
408        <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
409        <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
410    {
411        self.arc_resource_with_options(query_scope, keyer, false)
412            .into()
413    }
414
415    /// Query with a blocking [`Resource`].
416    ///
417    /// Resources must be serializable to potentially load in `ssr` and stream to the client.
418    ///
419    /// Resources must be `Send` and `Sync` to be multithreaded in ssr.
420    ///
421    /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
422    #[track_caller]
423    pub fn resource_blocking<K, MaybeKey, V, M>(
424        &self,
425        query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
426        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
427    ) -> Resource<MaybeKey::MappedValue, Codec>
428    where
429        K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
430        MaybeKey: QueryMaybeKey<K, V>,
431        MaybeKey::MappedValue: Clone + Send + Sync + 'static,
432        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
433        Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
434        <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
435        <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
436        <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
437            Debug,
438        <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
439        <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
440    {
441        self.arc_resource_with_options(query_scope, keyer, true)
442            .into()
443    }
444
445    /// Query with [`ArcResource`].
446    ///
447    /// Resources must be serializable to potentially load in `ssr` and stream to the client.
448    ///
449    /// Resources must be `Send` and `Sync` to be multithreaded in ssr.
450    ///
451    /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
452    #[track_caller]
453    pub fn arc_resource<K, MaybeKey, V, M>(
454        &self,
455        query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
456        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
457    ) -> ArcResource<MaybeKey::MappedValue, Codec>
458    where
459        K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
460        MaybeKey: QueryMaybeKey<K, V>,
461        MaybeKey::MappedValue: Clone + Send + Sync + 'static,
462        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
463        Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
464        <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
465        <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
466        <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
467            Debug,
468        <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
469        <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
470    {
471        self.arc_resource_with_options(query_scope, keyer, false)
472    }
473
474    /// Query with a blocking [`ArcResource`].
475    ///
476    /// Resources must be serializable to potentially load in `ssr` and stream to the client.
477    ///
478    /// Resources must be `Send` and `Sync` to be multithreaded in ssr.
479    ///
480    /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
481    #[track_caller]
482    pub fn arc_resource_blocking<K, MaybeKey, V, M>(
483        &self,
484        query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
485        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
486    ) -> ArcResource<MaybeKey::MappedValue, Codec>
487    where
488        K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
489        MaybeKey: QueryMaybeKey<K, V>,
490        MaybeKey::MappedValue: Clone + Send + Sync + 'static,
491        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
492        Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
493        <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
494        <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
495        <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
496            Debug,
497        <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
498        <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
499    {
500        self.arc_resource_with_options(query_scope, keyer, true)
501    }
502
503    #[track_caller]
504    fn arc_resource_with_options<K, MaybeKey, V, M>(
505        &self,
506        query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
507        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
508        blocking: bool,
509    ) -> ArcResource<MaybeKey::MappedValue, Codec>
510    where
511        K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
512        MaybeKey: QueryMaybeKey<K, V>,
513        MaybeKey::MappedValue: Clone + Send + Sync + 'static,
514        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
515        Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
516        <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
517        <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
518        <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
519            Debug,
520        <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
521        <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
522    {
523        let client = *self;
524        let client_options = self.options();
525        let cache_key = query_scope.cache_key();
526        let query_scope_info = QueryScopeInfo::new(&query_scope);
527        let query_scope = Arc::new(query_scope);
528        let scope_lookup = self.untyped_client.scope_lookup;
529        let query_options = query_scope.options();
530
531        let buster_if_uncached = ArcRwSignal::new(new_buster_id());
532        let resource_id = new_resource_id();
533
534        // To call .mark_resource_dropped() when the resource is dropped:
535        let drop_guard = ResourceDropGuard::<K, V>::new(
536            self.untyped_client.scope_lookup,
537            resource_id,
538            cache_key,
539        );
540
541        let keyer = Arc::new(keyer);
542        let resource =
543            ArcResource::new_with_options(
544                {
545                    let buster_if_uncached = buster_if_uncached.clone();
546                    let drop_guard = drop_guard.clone();
547                    let keyer = keyer.clone();
548                    move || {
549                        if let Some(key) = keyer().into_maybe_key() {
550                            let key_hash = KeyHash::new(&key);
551                            drop_guard.set_active_key(key_hash);
552                            scope_lookup.with_cached_query::<K, V, _>(
553                                &key_hash,
554                                &cache_key,
555                                |maybe_cached| {
556                                    if let Some(cached) = maybe_cached {
557                                        // Buster must be returned for it to be tracked.
558                                        (Some(key.clone()), cached.buster.get())
559                                    } else {
560                                        // Buster must be returned for it to be tracked.
561                                        (Some(key.clone()), buster_if_uncached.get())
562                                    }
563                                },
564                            )
565                        } else {
566                            (None, buster_if_uncached.get())
567                        }
568                    }
569                },
570                {
571                    let buster_if_uncached = buster_if_uncached.clone();
572                    let query_scope_info = query_scope_info.clone();
573                    let query_scope = query_scope.clone();
574                    move |(maybe_key, last_used_buster)| {
575                        let query_scope = query_scope.clone();
576                        let query_scope_info = query_scope_info.clone();
577                        let buster_if_uncached = buster_if_uncached.clone();
578                        let _drop_guard = drop_guard.clone(); // Want the guard around everywhere until the resource is dropped.
579                        // Note: cannot hoist outside of resource,
580                        // it prevents the resource itself dropping when the owner is held by the resource itself,
581                        // here each instance only lasts as long as the query.
582                        let owner_chain = OwnerChain::new(
583                            client.untyped_client,
584                            query_scope_info.cache_key,
585                            Owner::current(),
586                        );
587                        async move {
588                            if let Some(key) = maybe_key {
589                                let query_scope_query_info =
590                                    || QueryScopeQueryInfo::new(&query_scope, &key);
591                                let value =
592                                    client
593                                        .untyped_client
594                                        .cached_or_fetch(
595                                            client_options,
596                                            query_options,
597                                            Some(buster_if_uncached.clone()),
598                                            &query_scope_info,
599                                            query_scope_query_info,
600                                            &key,
601                                            {
602                                                let query_scope = query_scope.clone();
603                                                move |key| {
604                                                    let query_scope = query_scope.clone();
605                                                    async move {
606                                                        MaybeLocal::new(
607                                                            query_scope.query(key).await,
608                                                        )
609                                                    }
610                                                }
611                                            },
612                                            |info| {
613                                                info.cached.mark_resource_active(resource_id);
614                                                match info.variant {
615                                            CachedOrFetchCbInputVariant::CachedUntouched => {
616                                                // If stale refetch in the background with the prefetch() function, which'll recognise it's stale, refetch it and invalidate busters:
617                                                if cfg!(any(test, not(feature = "ssr")))
618                                                    && info.cached.stale_or_invalidated()
619                                                {
620                                                    let key = key.clone();
621                                                    let query_scope = query_scope.clone();
622                                                    let owner_chain = owner_chain.clone();
623
624                                                    leptos::task::spawn(async move {
625                                                        client
626                                                            .prefetch_inner(
627                                                                QueryScopeInfo::new(&query_scope),
628                                                                || {
629                                                                    QueryScopeQueryInfo::new(
630                                                                        &query_scope,
631                                                                        &key,
632                                                                    )
633                                                                },
634                                                                {
635                                                                    let query_scope =
636                                                                        query_scope.clone();
637                                                                    move |key| {
638                                        let query_scope = query_scope.clone();
639
640                                                                        async move {
641                                                                        MaybeLocal::new(
642                                                                            query_scope
643                                                                                .query(key)
644                                                                                .await,
645                                                                        )
646                                                                    }}
647                                                                },
648                                                                key.borrow(),
649                                                                || {
650                                                                    MaybeLocal::new(
651                                                                        key.borrow().clone(),
652                                                                    )
653                                                                },
654                                                                &owner_chain,
655                                                            )
656                                                            .await;
657                                                    });
658                                                }
659
660                                                // Handle edge case where the key function saw it was uncached, so entered here,
661                                                // but when the cached_or_fetch was acquiring the fetch mutex, someone else cached it,
662                                                // meaning our key function won't be reactive correctly unless we invalidate it now:
663                                                if last_used_buster
664                                                    != info.cached.buster.get_untracked()
665                                                {
666                                                    buster_if_uncached
667                                                        .set(info.cached.buster.get_untracked());
668                                                }
669                                            }
670                                            CachedOrFetchCbInputVariant::Fresh => {}
671                                            CachedOrFetchCbInputVariant::CachedUpdated => {
672                                                panic!("Didn't direct inner to refetch here. (bug)")
673                                            }
674                                        }
675                                                CachedOrFetchCbOutput::Return(
676                                                    // WONTPANIC: cached_or_fetch will only output values that are safe on this thread:
677                                                    info.cached
678                                                        .value_maybe_stale()
679                                                        .value_may_panic()
680                                                        .clone(),
681                                                )
682                                            },
683                                            None,
684                                            || MaybeLocal::new(key.clone()),
685                                            &owner_chain,
686                                        )
687                                        .await;
688                                MaybeKey::prepare_mapped_value(Some(value))
689                            } else {
690                                MaybeKey::prepare_mapped_value(None)
691                            }
692                        }
693                    }
694                },
695                blocking,
696            );
697
698        // On the client, want to repopulate the frontend cache, so should write resources to the cache here if they don't exist.
699        // It would be better if in here we could check if the resource was started on the backend/streamed, saves doing most of this if already a frontend resource.
700        let effect = {
701            let resource = resource.clone();
702            let buster_if_uncached = buster_if_uncached.clone();
703            let client_created_at = self.untyped_client.created_at;
704            // Converting to Arc because the tests like the client get dropped even though this persists:
705            move |complete: Option<Option<()>>| {
706                if let Some(Some(())) = complete {
707                    return Some(());
708                }
709                if let Some(val) = resource.read().as_ref() {
710                    if MaybeKey::mapped_value_is_some(val) {
711                        scope_lookup.with_cached_scope_mut::<K, V, _, _>(
712                            &mut scope_lookup.scopes_mut(),
713                            query_scope_info.cache_key,
714                            OnScopeMissing::Create(&query_scope_info),
715                            |_| {},
716                            |maybe_scope, _| {
717                                let scope = maybe_scope.expect("provided a default");
718                                if let Some(key) = keyer().into_maybe_key() {
719                                    let query_scope_query_info = || QueryScopeQueryInfo::new(&query_scope, &key);
720                                    let key_hash = KeyHash::new(&key);
721
722                                    // Had a bug in tests where:
723                                    // calling client.resource() multiple times
724                                    // previous impl would only run the effect once per client.resource(), 
725                                    // but would still run again on subsequent client.resource() with the same key value
726                                    // - client.resource()
727                                    // - invalidate, starts loading in background
728                                    // - client.resource()
729                                    // - effect runs again, sees resource has value, goes to set it, fires QueryAbortReason::SsrStreamedValueOverride which cancels the actual new query which is loading to replace the invalidated one
730                                    // So have to have a client wide protector, to make sure the effect really only runs once on the client, even if resource() called multiple times.
731                                    if (cfg!(test) || cfg!(not(feature = "ssr"))) && 
732                                        // Preventing memory leak, issue is only for first render:
733                                        (Utc::now() - client_created_at).num_seconds() < 10
734                                    {
735                                        use parking_lot::Mutex;
736
737                                        static ALREADY_SEEN: LazyLock<Mutex<HashMap<(u64, ScopeCacheKey, KeyHash), ()>>> = LazyLock::new(|| Mutex::new(HashMap::new()));
738
739                                        let key = (scope_lookup.scope_id, query_scope_info.cache_key, key_hash);
740                                        let mut guard = ALREADY_SEEN.lock();
741                                        if guard.contains_key(&key) {
742                                            return;
743                                        }
744                                        guard.insert(key, ());
745                                    }
746
747                                    let mut was_pending = false;
748                                    // Protect against race condition: cancel any frontend query that started already if there was a client side 
749                                    // local resource or prefetch/fetch_query etc that started on the initial client-side ticks:
750                                    if let Some(QueryOrPending::Pending { query_abort_tx, .. }) = scope.get_mut_include_pending(&key_hash)
751                                        && let Some(tx) = query_abort_tx.take() {
752                                            tx.send(QueryAbortReason::SsrStreamedValueOverride).unwrap();
753                                            was_pending = true;
754                                        }
755
756                                    if was_pending || !scope.contains_key(&key_hash) {
757                                        let query = Query::new(
758                                            client_options,
759                                            client.untyped_client,
760                                            &query_scope_info,
761                                            query_scope_query_info(),
762                                            key_hash,
763                                            MaybeLocal::new(key),
764                                            MaybeLocal::new(MaybeKey::mapped_to_maybe_value(val.clone()).expect("Just checked MaybeKey::mapped_value_is_some() is true")),
765                                            buster_if_uncached.clone(),
766                                            query_options,
767                                            None,
768                                            #[cfg(any(
769                                                all(debug_assertions, feature = "devtools"),
770                                                feature = "devtools-always"
771                                            ))]
772                                            crate::events::Event::new(
773                                                crate::events::EventVariant::StreamedFromServer,
774                                            ),
775                                        );
776                                        scope.insert(key_hash, query);
777                                    }
778                                    scope
779                                        .get(&key_hash)
780                                        .expect("Just inserted")
781                                        .mark_resource_active(resource_id)
782                                }
783                            },
784                        );
785                    }
786                    Some(())
787                } else {
788                    None
789                }
790            }
791        };
792        // Won't run in tests if not isomorphic, but in prod Effect is wanted to not run on server:
793        if cfg!(test) {
794            // Wants Send + Sync on the Codec despite using SendWrapper in the PhantomData.
795            // Can put a SendWrapper around it as this is test only and they're single threaded:
796            let effect = SendWrapper::new(effect);
797            #[allow(clippy::redundant_closure)]
798            Effect::new_isomorphic(move |v| effect(v));
799        } else {
800            Effect::new(effect);
801        }
802
803        resource
804    }
805
806    /// Prevent reactive updates being triggered from the current updater callback fn.
807    ///
808    /// Call [`QueryClient::untrack_update_query`] in the callback of [`QueryClient::update_query`]
809    /// to prevent resources being updated after the callback completes.
810    ///
811    /// This is really useful when e.g:
812    /// - you know nothing changes after reading the existing value in the callback
813    /// - you don't want resources/subs to react and rerender, given nothings changed.
814    ///
815    /// This function is a noop outside the callbacks of:
816    /// - [`QueryClient::update_query`]
817    /// - [`QueryClient::update_query_async`]
818    /// - [`QueryClient::update_query_async_local`]
819    pub fn untrack_update_query(&self) {
820        // Set markers to false, these will all be reset to true automatically where needed.
821        SYNC_TRACK_UPDATE_MARKER.with(|marker| {
822            marker.store(false, std::sync::atomic::Ordering::Relaxed);
823        });
824        // Ignore returned AccessError if didn't exist (not in async ctx):
825        let _ = ASYNC_TRACK_UPDATE_MARKER.try_with(|marker| {
826            marker.store(false, std::sync::atomic::Ordering::Relaxed);
827        });
828    }
829
830    /// Prefetch a query and store it in the cache.
831    ///
832    /// - Entry doesn't exist: fetched and stored in the cache.
833    /// - Entry exists but stale: fetched and updated in the cache.
834    /// - Entry exists but **not** stale: not refreshed, existing cache item remains.
835    ///
836    /// If the cached query changes, active resources using the query will be updated.
837    #[track_caller]
838    pub async fn prefetch_query<K, V, M>(
839        &self,
840        query_scope: impl QueryScopeTrait<K, V, M>,
841        key: impl Borrow<K>,
842    ) where
843        K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
844        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
845    {
846        let query_scope_info = QueryScopeInfo::new(&query_scope);
847        let owner_chain = OwnerChain::new(
848            self.untyped_client,
849            query_scope_info.cache_key,
850            Owner::current(),
851        );
852        self.prefetch_inner(
853            query_scope_info,
854            || QueryScopeQueryInfo::new(&query_scope, key.borrow()),
855            async |key| MaybeLocal::new(query_scope.query(key).await),
856            key.borrow(),
857            || MaybeLocal::new(key.borrow().clone()),
858            &owner_chain,
859        )
860        .await
861    }
862
863    /// Prefetch a non-threadsafe query and store it in the cache for this thread only.
864    ///
865    /// - Entry doesn't exist: fetched and stored in the cache.
866    /// - Entry exists but stale: fetched and updated in the cache.
867    /// - Entry exists but **not** stale: not refreshed, existing cache item remains.
868    ///
869    /// If the cached query changes, active resources using the query will be updated.
870    #[track_caller]
871    pub async fn prefetch_query_local<K, V, M>(
872        &self,
873        query_scope: impl QueryScopeLocalTrait<K, V, M>,
874        key: impl Borrow<K>,
875    ) where
876        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
877        V: DebugIfDevtoolsEnabled + Clone + 'static,
878    {
879        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
880        let owner_chain = OwnerChain::new(
881            self.untyped_client,
882            query_scope_info.cache_key,
883            Owner::current(),
884        );
885        self.prefetch_inner(
886            query_scope_info,
887            || QueryScopeQueryInfo::new_local(&query_scope, key.borrow()),
888            async |key| MaybeLocal::new_local(query_scope.query(key).await),
889            key.borrow(),
890            || MaybeLocal::new_local(key.borrow().clone()),
891            &owner_chain,
892        )
893        .await
894    }
895
896    #[track_caller]
897    async fn prefetch_inner<K, V, FetcherFut>(
898        &self,
899        query_scope_info: QueryScopeInfo,
900        query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
901        fetcher: impl Fn(K) -> FetcherFut,
902        key: &K,
903        lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
904        owner_chain: &OwnerChain,
905    ) where
906        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
907        V: DebugIfDevtoolsEnabled + Clone + 'static,
908        FetcherFut: Future<Output = MaybeLocal<V>>,
909    {
910        self.untyped_client
911            .cached_or_fetch(
912                self.options(),
913                query_scope_info.options,
914                None,
915                &query_scope_info,
916                query_scope_info_for_new_query,
917                key,
918                fetcher,
919                |info| {
920                    match info.variant {
921                        CachedOrFetchCbInputVariant::CachedUntouched => {
922                            if info.cached.stale_or_invalidated() {
923                                return CachedOrFetchCbOutput::Refetch;
924                            }
925                        }
926                        CachedOrFetchCbInputVariant::CachedUpdated => {
927                            // Update anything using it:
928                            info.cached.buster.set(new_buster_id());
929                        }
930                        CachedOrFetchCbInputVariant::Fresh => {}
931                    }
932                    CachedOrFetchCbOutput::Return(())
933                },
934                None,
935                lazy_maybe_local_key,
936                owner_chain,
937            )
938            .await;
939    }
940
941    /// Fetch a query, store it in the cache and return it.
942    ///
943    /// - Entry doesn't exist: fetched and stored in the cache.
944    /// - Entry exists but stale: fetched and updated in the cache.
945    /// - Entry exists but **not** stale: not refreshed, existing cache item remains.
946    ///
947    /// If the cached query changes, active resources using the query will be updated.
948    ///
949    /// Returns the up-to-date cached query.
950    #[track_caller]
951    pub async fn fetch_query<K, V, M>(
952        &self,
953        query_scope: impl QueryScopeTrait<K, V, M>,
954        key: impl Borrow<K>,
955    ) -> V
956    where
957        K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
958        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
959    {
960        self.untyped_client.fetch_query(query_scope, key).await
961    }
962
963    /// Fetch a non-threadsafe query, store it in the cache for this thread only and return it.
964    ///
965    /// - Entry doesn't exist: fetched and stored in the cache.
966    /// - Entry exists but stale: fetched and updated in the cache.
967    /// - Entry exists but **not** stale: not refreshed, existing cache item remains.
968    ///
969    /// If the cached query changes, active resources using the query will be updated.
970    ///
971    /// Returns the up-to-date cached query.
972    #[track_caller]
973    pub async fn fetch_query_local<K, V, M>(
974        &self,
975        query_scope: impl QueryScopeLocalTrait<K, V, M>,
976        key: impl Borrow<K>,
977    ) -> V
978    where
979        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
980        V: DebugIfDevtoolsEnabled + Clone + 'static,
981    {
982        self.untyped_client
983            .fetch_query_local(query_scope, key)
984            .await
985    }
986
987    /// Set the value of a query in the cache.
988    /// This cached value will be available from all threads and take priority over any locally cached value for this query.
989    ///
990    /// Active resources using the query will be updated.
991    #[track_caller]
992    pub fn set_query<K, V, M>(
993        &self,
994        query_scope: impl QueryScopeTrait<K, V, M>,
995        key: impl Borrow<K>,
996        new_value: V,
997    ) where
998        K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
999        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1000    {
1001        self.set_inner(
1002            QueryScopeInfo::new(&query_scope),
1003            || QueryScopeQueryInfo::new(&query_scope, key.borrow()),
1004            key.borrow(),
1005            MaybeLocal::new(new_value),
1006            || MaybeLocal::new(key.borrow().clone()),
1007            true,
1008            // like update methods, this does not come from the query function itself,
1009            // so should not reset the invalidated status:
1010            ResetInvalidated::NoReset,
1011        )
1012    }
1013
1014    /// Set the value of a non-threadsafe query in the cache for this thread only.
1015    /// This cached value will only be available from this thread, the cache will return empty, unless a nonlocal value is set, from any other thread.
1016    ///
1017    /// Active resources using the query will be updated.
1018    #[track_caller]
1019    pub fn set_query_local<K, V, M>(
1020        &self,
1021        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1022        key: impl Borrow<K>,
1023        new_value: V,
1024    ) where
1025        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1026        V: DebugIfDevtoolsEnabled + Clone + 'static,
1027    {
1028        self.set_inner::<K, V>(
1029            QueryScopeInfo::new_local(&query_scope),
1030            || QueryScopeQueryInfo::new_local(&query_scope, key.borrow()),
1031            key.borrow(),
1032            MaybeLocal::new_local(new_value),
1033            || MaybeLocal::new_local(key.borrow().clone()),
1034            true,
1035            // like update methods, this does not come from the query function itself,
1036            // so should not reset the invalidated status:
1037            ResetInvalidated::NoReset,
1038        )
1039    }
1040
1041    #[track_caller]
1042    fn set_inner<K, V>(
1043        &self,
1044        query_scope_info: QueryScopeInfo,
1045        query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
1046        key: &K,
1047        new_value: MaybeLocal<V>,
1048        lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
1049        track: bool,
1050        reset_invalidated: ResetInvalidated,
1051    ) where
1052        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1053        V: DebugIfDevtoolsEnabled + Clone + 'static,
1054    {
1055        let key_hash = KeyHash::new(key.borrow());
1056        self.untyped_client
1057            .scope_lookup
1058            .with_cached_scope_mut::<K, V, _, _>(
1059                &mut self.untyped_client.scope_lookup.scopes_mut(),
1060                query_scope_info.cache_key,
1061                OnScopeMissing::Create(&query_scope_info),
1062                |_| {},
1063                |maybe_scope, _| {
1064                    let scope = maybe_scope.expect("provided a default");
1065
1066                    // Make sure to look both caches if threadsafe, and prefer threadsafe:
1067                    let maybe_cached = if !new_value.is_local() {
1068                        if let Some(threadsafe_existing) = scope.get_mut_threadsafe_only(&key_hash)
1069                        {
1070                            Some(threadsafe_existing)
1071                        } else {
1072                            scope.get_mut_local_only(&key_hash)
1073                        }
1074                    } else {
1075                        scope.get_mut_local_only(&key_hash)
1076                    };
1077
1078                    if let Some(cached) = maybe_cached {
1079                        cached.set_value(
1080                            new_value,
1081                            track,
1082                            #[cfg(any(
1083                                all(debug_assertions, feature = "devtools"),
1084                                feature = "devtools-always"
1085                            ))]
1086                            crate::events::Event::new(crate::events::EventVariant::DeclarativeSet),
1087                            reset_invalidated,
1088                        );
1089                    } else {
1090                        let query = Query::new(
1091                            self.options(),
1092                            self.untyped_client,
1093                            &query_scope_info,
1094                            query_scope_info_for_new_query(),
1095                            key_hash,
1096                            lazy_maybe_local_key(),
1097                            new_value,
1098                            ArcRwSignal::new(new_buster_id()),
1099                            query_scope_info.options,
1100                            None,
1101                            #[cfg(any(
1102                                all(debug_assertions, feature = "devtools"),
1103                                feature = "devtools-always"
1104                            ))]
1105                            crate::events::Event::new(crate::events::EventVariant::DeclarativeSet),
1106                        );
1107                        scope.insert(key_hash, query);
1108                    }
1109                },
1110            );
1111    }
1112
1113    /// Synchronously update the value of a query in the cache with a callback.
1114    ///
1115    /// Active resources using the query will be updated.
1116    ///
1117    /// The callback takes `Option<&mut V>`, will be None if the value is not available in the cache.
1118    ///
1119    /// If you want async and/or always get the value, use [`QueryClient::update_query_async`]/[`QueryClient::update_query_async_local`].
1120    ///
1121    /// Returns the output of the callback.
1122    ///
1123    /// If you decide you don't want to trigger resources and subscribers, e.g. if you know nothing changed,
1124    /// call [`QueryClient::untrack_update_query`] in the callback to prevent reactive updates.
1125    #[track_caller]
1126    pub fn update_query<K, V, T, M>(
1127        &self,
1128        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1129        key: impl Borrow<K>,
1130        modifier: impl FnOnce(Option<&mut V>) -> T,
1131    ) -> T
1132    where
1133        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1134        V: DebugIfDevtoolsEnabled + Clone + 'static,
1135    {
1136        self.update_query_inner::<K, V, T>(
1137            &QueryScopeInfo::new_local(&query_scope),
1138            key.borrow(),
1139            ResetInvalidated::NoReset,
1140            modifier,
1141        )
1142    }
1143
1144    #[track_caller]
1145    fn update_query_inner<K, V, T>(
1146        &self,
1147        query_scope_info: &QueryScopeInfo,
1148        key: &K,
1149        reset_invalidated: ResetInvalidated,
1150        modifier: impl FnOnce(Option<&mut V>) -> T,
1151    ) -> T
1152    where
1153        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1154        V: DebugIfDevtoolsEnabled + Clone + 'static,
1155    {
1156        let key_hash = KeyHash::new(key);
1157        let mut modifier_holder = Some(modifier);
1158
1159        let maybe_return_value = self
1160            .untyped_client
1161            .scope_lookup
1162            .with_cached_scope_mut::<K, V, _, _>(
1163                &mut self.untyped_client.scope_lookup.scopes_mut(),
1164                query_scope_info.cache_key,
1165                OnScopeMissing::Skip,
1166                |_| {},
1167                |maybe_scope, _| {
1168                    if let Some(scope) = maybe_scope
1169                        && let Some(cached) = scope.get_mut(&key_hash)
1170                    {
1171                        let modifier = modifier_holder
1172                            .take()
1173                            .expect("Should never be used more than once. (bug)");
1174                        let return_value = cached.update_value(
1175                            // WONTPANIC: the internals will only supply the value if available from this thread:
1176                            |value| modifier(Some(value.value_mut_may_panic())),
1177                            #[cfg(any(
1178                                all(debug_assertions, feature = "devtools"),
1179                                feature = "devtools-always"
1180                            ))]
1181                            crate::events::Event::new(
1182                                crate::events::EventVariant::DeclarativeUpdate,
1183                            ),
1184                            reset_invalidated,
1185                        );
1186                        return Some(return_value);
1187                    }
1188                    None
1189                },
1190            );
1191        if let Some(return_value) = maybe_return_value {
1192            return_value
1193        } else {
1194            // Didn't exist:
1195            modifier_holder
1196                .take()
1197                .expect("Should never be used more than once. (bug)")(None)
1198        }
1199    }
1200
1201    /// Asynchronously map a threadsafe query in the cache from one value to another.
1202    ///
1203    /// Unlike [`QueryClient::update_query`], this will fetch the query first, if it doesn't exist.
1204    ///
1205    /// Active resources using the query will be updated.
1206    ///
1207    /// Returns the output of the callback.
1208    ///
1209    /// If you decide you don't want to trigger resources and subscribers, e.g. if you know nothing changed,
1210    /// call [`QueryClient::untrack_update_query`] in the callback to prevent reactive updates.
1211    #[track_caller]
1212    pub async fn update_query_async<'a, K, V, T, M>(
1213        &'a self,
1214        query_scope: impl QueryScopeTrait<K, V, M>,
1215        key: impl Borrow<K>,
1216        mapper: impl AsyncFnOnce(&mut V) -> T,
1217    ) -> T
1218    where
1219        K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
1220        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1221    {
1222        let query_scope_info = QueryScopeInfo::new(&query_scope);
1223        let owner_chain = OwnerChain::new(
1224            self.untyped_client,
1225            query_scope_info.cache_key,
1226            Owner::current(),
1227        );
1228        self.update_query_async_inner(
1229            query_scope_info,
1230            || QueryScopeQueryInfo::new(&query_scope, key.borrow()),
1231            async |key| MaybeLocal::new(query_scope.query(key).await),
1232            key.borrow(),
1233            mapper,
1234            MaybeLocal::new,
1235            || MaybeLocal::new(key.borrow().clone()),
1236            &owner_chain,
1237        )
1238        .await
1239    }
1240
1241    /// Asynchronously map a non-threadsafe query in the cache from one value to another.
1242    ///
1243    /// Unlike [`QueryClient::update_query`], this will fetch the query first, if it doesn't exist.
1244    ///
1245    /// Active resources using the query will be updated.
1246    ///
1247    /// Returns the output of the callback.
1248    ///
1249    /// If you decide you don't want to trigger resources and subscribers, e.g. if you know nothing changed,
1250    /// call [`QueryClient::untrack_update_query`] in the callback to prevent reactive updates.
1251    #[track_caller]
1252    pub async fn update_query_async_local<'a, K, V, T, M>(
1253        &'a self,
1254        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1255        key: impl Borrow<K>,
1256        mapper: impl AsyncFnOnce(&mut V) -> T,
1257    ) -> T
1258    where
1259        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1260        V: DebugIfDevtoolsEnabled + Clone + 'static,
1261    {
1262        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
1263        let owner_chain = OwnerChain::new(
1264            self.untyped_client,
1265            query_scope_info.cache_key,
1266            Owner::current(),
1267        );
1268        self.update_query_async_inner(
1269            query_scope_info,
1270            || QueryScopeQueryInfo::new_local(&query_scope, key.borrow()),
1271            async |key| MaybeLocal::new_local(query_scope.query(key).await),
1272            key.borrow(),
1273            mapper,
1274            MaybeLocal::new_local,
1275            || MaybeLocal::new_local(key.borrow().clone()),
1276            &owner_chain,
1277        )
1278        .await
1279    }
1280
1281    #[track_caller]
1282    async fn update_query_async_inner<'a, K, V, T, FetcherFut>(
1283        &'a self,
1284        query_scope_info: QueryScopeInfo,
1285        query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
1286        fetcher: impl Fn(K) -> FetcherFut,
1287        key: &K,
1288        mapper: impl AsyncFnOnce(&mut V) -> T,
1289        into_maybe_local: impl FnOnce(V) -> MaybeLocal<V>,
1290        lazy_maybe_local_key: impl Fn() -> MaybeLocal<K>,
1291        owner_chain: &OwnerChain,
1292    ) -> T
1293    where
1294        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1295        V: DebugIfDevtoolsEnabled + Clone + 'static,
1296        FetcherFut: Future<Output = MaybeLocal<V>>,
1297    {
1298        let key_hash = KeyHash::new(key.borrow());
1299
1300        // By holding the fetcher mutex from start to finish, prevent the chance of the value being fetched between the fetch async call and the external user mapper async call and the final set().
1301        let fetcher_mutex = self
1302            .untyped_client
1303            .scope_lookup
1304            .fetcher_mutex::<K, V>(key_hash, &query_scope_info);
1305        let fetcher_guard = fetcher_mutex.lock().await;
1306
1307        let mut new_value = self
1308            .untyped_client
1309            .fetch_inner(
1310                query_scope_info.clone(),
1311                &query_scope_info_for_new_query,
1312                fetcher,
1313                key.borrow(),
1314                Some(&fetcher_guard),
1315                &lazy_maybe_local_key,
1316                owner_chain,
1317            )
1318            .await;
1319
1320        // The fetch will "turn on" is_fetching during it's lifetime, but we also want it during the mapper function:
1321        self.untyped_client
1322            .scope_lookup
1323            .with_notify_fetching(query_scope_info.cache_key, key_hash, false, async {
1324                let track = Arc::new(AtomicBool::new(true));
1325
1326                // Will monitor for invalidations during the user async fn, which could take a long time:
1327                let query_abort_rx = self
1328                    .untyped_client
1329                    .scope_lookup
1330                    .prepare_invalidation_channel::<K, V>(
1331                        &query_scope_info,
1332                        key_hash,
1333                        &lazy_maybe_local_key(),
1334                    );
1335
1336                let result_fut = ASYNC_TRACK_UPDATE_MARKER
1337                    .scope(track.clone(), async { mapper(&mut new_value).await });
1338
1339                // Call the user async function, but also monitor for invalidations:
1340                // Specifically, we care about if .clear() is called.
1341                // If so, we shouldn't set the value, as it will have been generated with the cleared cached query value.
1342                // If invalidated should still set, because the value will be deemed "less stale", but won't de-invalidate it.
1343                // Don't care about QueryAbortReason::SsrStreamedValueOverride
1344                // as it shouldn't be possible to get to this point before all ssr streaming is done.
1345                let mut cleared_during_user_fn = false;
1346                let result = {
1347                    pin_mut!(result_fut);
1348                    pin_mut!(query_abort_rx);
1349                    match futures::future::select(result_fut, query_abort_rx).await {
1350                        futures::future::Either::Left((result, _query_abort_rx)) => result,
1351                        futures::future::Either::Right((query_abort_reason, result_fut)) => {
1352                            if let Ok(QueryAbortReason::Clear) = query_abort_reason {
1353                                // If the query was cleared, don't set the new value.
1354                                cleared_during_user_fn = true;
1355                            }
1356                            result_fut.await
1357                        }
1358                    }
1359                };
1360
1361                // If the query was cleared, don't set the new value.
1362                if !cleared_during_user_fn {
1363                    // Cover a small edge case:
1364                    // - value in threadsafe
1365                    // - .update_query_async_local called
1366                    // - replacement value .set() called, stored in local cache because no type safety on threadsafe cache
1367                    // - now have 2 versions in both caches
1368                    // by trying to update first, means it'll stick with existing cache if it exists,
1369                    // otherwise doesn't matter and can just be set normally:
1370                    let mut new_value = Some(new_value);
1371                    let updated = self.update_query_inner::<K, V, _>(
1372                        &query_scope_info,
1373                        key,
1374                        // fetch_inner would've reset it if needed, then the update itself shouldn't change the value:
1375                        ResetInvalidated::NoReset,
1376                        |value| {
1377                            if let Some(value) = value {
1378                                // This sync update call itself needs untracking if !track:
1379                                if !track.load(std::sync::atomic::Ordering::Relaxed) {
1380                                    self.untrack_update_query();
1381                                }
1382
1383                                *value = new_value.take().expect("Should be Some");
1384                                true
1385                            } else {
1386                                false
1387                            }
1388                        },
1389                    );
1390                    if !updated {
1391                        self.set_inner::<K, V>(
1392                            query_scope_info,
1393                            query_scope_info_for_new_query,
1394                            key.borrow(),
1395                            into_maybe_local(
1396                                new_value
1397                                    .take()
1398                                    .expect("Should be Some, should only be here if not updated"),
1399                            ),
1400                            lazy_maybe_local_key,
1401                            track.load(std::sync::atomic::Ordering::Relaxed),
1402                            // fetch_inner would've reset it if needed, then the update itself shouldn't change the value:
1403                            ResetInvalidated::NoReset,
1404                        );
1405                    }
1406                }
1407
1408                result
1409            })
1410            .await
1411    }
1412
1413    /// Synchronously get a query from the cache, if it exists.
1414    #[track_caller]
1415    pub fn get_cached_query<K, V, M>(
1416        &self,
1417        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1418        key: impl Borrow<K>,
1419    ) -> Option<V>
1420    where
1421        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1422        V: DebugIfDevtoolsEnabled + Clone + 'static,
1423    {
1424        self.untyped_client
1425            .scope_lookup
1426            .with_cached_query::<K, V, _>(
1427                &KeyHash::new(key.borrow()),
1428                &query_scope.cache_key(),
1429                |maybe_cached| {
1430                    // WONTPANIC: with_cached_query will only output values that are safe on this thread:
1431                    maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1432                },
1433            )
1434    }
1435
1436    /// Synchronously check if a query exists in the cache.
1437    ///
1438    /// Returns `true` if the query exists.
1439    #[track_caller]
1440    pub fn query_exists<K, V, M>(
1441        &self,
1442        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1443        key: impl Borrow<K>,
1444    ) -> bool
1445    where
1446        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1447        V: DebugIfDevtoolsEnabled + 'static,
1448    {
1449        let key_hash = KeyHash::new(key.borrow());
1450        self.untyped_client
1451            .scope_lookup
1452            .with_cached_query::<K, V, _>(&key_hash, &query_scope.cache_key(), |maybe_cached| {
1453                maybe_cached.is_some()
1454            })
1455    }
1456
1457    /// Subscribe to the `is_loading` status of a query.
1458    /// The keyer function is reactive to changes in `K`.
1459    ///
1460    /// This is `true` when the query is in the process of fetching data for the first time.
1461    /// This is in contrast to `is_fetching`, that is `true` whenever the query is fetching data, including when it's refetching.
1462    ///
1463    /// From a resource perspective:
1464    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1465    /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1466    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1467    #[track_caller]
1468    pub fn subscribe_is_loading<K, MaybeKey, V, M>(
1469        &self,
1470        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1471        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1472    ) -> Signal<bool>
1473    where
1474        K: Hash + Send + Sync + 'static,
1475        MaybeKey: QueryMaybeKey<K, V>,
1476        MaybeKey::MappedValue: 'static,
1477        V: 'static,
1478    {
1479        self.subscribe_is_loading_arc(query_scope, keyer).into()
1480    }
1481
1482    /// Subscribe to the `is_loading` status of a query with a non-threadsafe key.
1483    /// The keyer function is reactive to changes in `K`.
1484    ///
1485    /// This is `true` when the query is in the process of fetching data for the first time.
1486    /// This is in contrast to `is_fetching`, that is `true` whenever the query is fetching data, including when it's refetching.
1487    ///
1488    /// From a resource perspective:
1489    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1490    /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1491    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1492    #[track_caller]
1493    pub fn subscribe_is_loading_local<K, MaybeKey, V, M>(
1494        &self,
1495        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1496        keyer: impl Fn() -> MaybeKey + 'static,
1497    ) -> Signal<bool>
1498    where
1499        K: Hash + 'static,
1500        MaybeKey: QueryMaybeKey<K, V>,
1501        MaybeKey::MappedValue: 'static,
1502        V: 'static,
1503    {
1504        self.subscribe_is_loading_arc_local(query_scope, keyer)
1505            .into()
1506    }
1507
1508    /// Subscribe to the `is_loading` status of a query with a non-threadsafe key.
1509    /// The keyer function is reactive to changes in `K`.
1510    ///
1511    /// This is `true` when the query is in the process of fetching data for the first time.
1512    /// This is in contrast to `is_fetching`, that is `true` whenever the query is fetching data, including when it's refetching.
1513    ///
1514    /// From a resource perspective:
1515    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1516    /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1517    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1518    #[track_caller]
1519    pub fn subscribe_is_loading_arc_local<K, MaybeKey, V, M>(
1520        &self,
1521        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1522        keyer: impl Fn() -> MaybeKey + 'static,
1523    ) -> ArcSignal<bool>
1524    where
1525        K: Hash + 'static,
1526        MaybeKey: QueryMaybeKey<K, V>,
1527        MaybeKey::MappedValue: 'static,
1528        V: 'static,
1529    {
1530        let keyer = SendWrapper::new(keyer);
1531        self.untyped_client
1532            .scope_lookup
1533            .scope_subscriptions_mut()
1534            .add_is_loading_subscription(
1535                query_scope.cache_key(),
1536                MaybeLocal::new_local(ArcSignal::derive(move || {
1537                    keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1538                })),
1539            )
1540    }
1541
1542    /// Subscribe to the `is_loading` status of a query.
1543    /// The keyer function is reactive to changes in `K`.
1544    ///
1545    /// This is `true` when the query is in the process of fetching data for the first time.
1546    /// This is in contrast to `is_fetching`, that is `true` whenever the query is fetching data, including when it's refetching.
1547    ///
1548    /// From a resource perspective:
1549    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1550    /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1551    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1552    #[track_caller]
1553    pub fn subscribe_is_loading_arc<K, MaybeKey, V, M>(
1554        &self,
1555        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1556        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1557    ) -> ArcSignal<bool>
1558    where
1559        K: Hash + Send + Sync + 'static,
1560        MaybeKey: QueryMaybeKey<K, V>,
1561        MaybeKey::MappedValue: 'static,
1562        V: 'static,
1563    {
1564        self.untyped_client
1565            .scope_lookup
1566            .scope_subscriptions_mut()
1567            .add_is_loading_subscription(
1568                query_scope.cache_key(),
1569                MaybeLocal::new(ArcSignal::derive(move || {
1570                    keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1571                })),
1572            )
1573    }
1574
1575    /// Subscribe to the `is_fetching` status of a query.
1576    /// The keyer function is reactive to changes in `K`.
1577    ///
1578    /// This is `true` is `true` whenever the query is fetching data, including when it's refetching.
1579    /// This is in contrast to `is_loading`, that is `true` when the query is in the process of fetching data for the first time only.
1580    ///
1581    /// From a resource perspective:
1582    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1583    /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1584    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1585    #[track_caller]
1586    pub fn subscribe_is_fetching<K, MaybeKey, V, M>(
1587        &self,
1588        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1589        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1590    ) -> Signal<bool>
1591    where
1592        K: Hash + Send + Sync + 'static,
1593        MaybeKey: QueryMaybeKey<K, V>,
1594        MaybeKey::MappedValue: 'static,
1595        V: 'static,
1596    {
1597        self.subscribe_is_fetching_arc(query_scope, keyer).into()
1598    }
1599
1600    /// Subscribe to the `is_fetching` status of a query with a non-threadsafe key.
1601    /// The keyer function is reactive to changes in `K`.
1602    ///
1603    /// This is `true` is `true` whenever the query is fetching data, including when it's refetching.
1604    /// This is in contrast to `is_loading`, that is `true` when the query is in the process of fetching data for the first time only.
1605    ///
1606    /// From a resource perspective:
1607    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1608    /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1609    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1610    #[track_caller]
1611    pub fn subscribe_is_fetching_local<K, MaybeKey, V, M>(
1612        &self,
1613        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1614        keyer: impl Fn() -> MaybeKey + 'static,
1615    ) -> Signal<bool>
1616    where
1617        K: Hash + 'static,
1618        MaybeKey: QueryMaybeKey<K, V>,
1619        MaybeKey::MappedValue: 'static,
1620        V: 'static,
1621    {
1622        self.subscribe_is_fetching_arc_local(query_scope, keyer)
1623            .into()
1624    }
1625
1626    /// Subscribe to the `is_fetching` status of a query with a non-threadsafe key.
1627    /// The keyer function is reactive to changes in `K`.
1628    ///
1629    /// This is `true` is `true` whenever the query is fetching data, including when it's refetching.
1630    /// This is in contrast to `is_loading`, that is `true` when the query is in the process of fetching data for the first time only.
1631    ///
1632    /// From a resource perspective:
1633    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1634    /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1635    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1636    #[track_caller]
1637    pub fn subscribe_is_fetching_arc_local<K, MaybeKey, V, M>(
1638        &self,
1639        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1640        keyer: impl Fn() -> MaybeKey + 'static,
1641    ) -> ArcSignal<bool>
1642    where
1643        K: Hash + 'static,
1644        MaybeKey: QueryMaybeKey<K, V>,
1645        MaybeKey::MappedValue: 'static,
1646        V: 'static,
1647    {
1648        let keyer = SendWrapper::new(keyer);
1649        self.untyped_client
1650            .scope_lookup
1651            .scope_subscriptions_mut()
1652            .add_is_fetching_subscription(
1653                query_scope.cache_key(),
1654                MaybeLocal::new_local(ArcSignal::derive(move || {
1655                    keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1656                })),
1657            )
1658    }
1659
1660    /// Subscribe to the `is_fetching` status of a query.
1661    /// The keyer function is reactive to changes in `K`.
1662    ///
1663    /// This is `true` is `true` whenever the query is fetching data, including when it's refetching.
1664    /// This is in contrast to `is_loading`, that is `true` when the query is in the process of fetching data for the first time only.
1665    ///
1666    /// From a resource perspective:
1667    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1668    /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1669    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1670    #[track_caller]
1671    pub fn subscribe_is_fetching_arc<K, MaybeKey, V, M>(
1672        &self,
1673        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1674        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1675    ) -> ArcSignal<bool>
1676    where
1677        K: Hash + Send + Sync + 'static,
1678        MaybeKey: QueryMaybeKey<K, V>,
1679        MaybeKey::MappedValue: 'static,
1680        V: 'static,
1681    {
1682        self.untyped_client
1683            .scope_lookup
1684            .scope_subscriptions_mut()
1685            .add_is_fetching_subscription(
1686                query_scope.cache_key(),
1687                MaybeLocal::new(ArcSignal::derive(move || {
1688                    keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1689                })),
1690            )
1691    }
1692
1693    /// Subscribe to the value of a query.
1694    /// The keyer function is reactive to changes in `K`.
1695    ///
1696    /// This will update whenever the query is created, removed, updated, refetched or set.
1697    ///
1698    /// Compared to a resource:
1699    /// - This will not trigger a fetch of a query, if it's not in the cache, this will be `None`.       
1700    #[track_caller]
1701    pub fn subscribe_value<K, MaybeKey, V, M>(
1702        &self,
1703        query_scope: impl QueryScopeTrait<K, V, M>,
1704        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1705    ) -> Signal<Option<V>>
1706    where
1707        K: DebugIfDevtoolsEnabled + Hash + Clone + Send + Sync + 'static,
1708        MaybeKey: QueryMaybeKey<K, V>,
1709        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1710        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1711    {
1712        self.subscribe_value_arc(query_scope, keyer).into()
1713    }
1714
1715    /// Subscribe to the value of a non-threadsafe query.
1716    /// The keyer function is reactive to changes in `K`.
1717    ///
1718    /// This will update whenever the query is created, removed, updated, refetched or set.
1719    ///
1720    /// Compared to a resource:
1721    /// - This will not trigger a fetch of a query, if it's not in the cache, this will be `None`.
1722    #[track_caller]
1723    pub fn subscribe_value_local<K, MaybeKey, V, M>(
1724        &self,
1725        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1726        keyer: impl Fn() -> MaybeKey + 'static,
1727    ) -> Signal<Option<V>, LocalStorage>
1728    where
1729        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1730        MaybeKey: QueryMaybeKey<K, V>,
1731        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
1732        V: DebugIfDevtoolsEnabled + Clone + 'static,
1733    {
1734        self.subscribe_value_arc_local(query_scope, keyer).into()
1735    }
1736
1737    /// Subscribe to the value of a non-threadsafe query.
1738    /// The keyer function is reactive to changes in `K`.
1739    ///
1740    /// This will update whenever the query is created, removed, updated, refetched or set.
1741    ///
1742    /// Compared to a resource:
1743    /// - This will not trigger a fetch of a query, if it's not in the cache, this will be `None`.
1744    #[track_caller]
1745    pub fn subscribe_value_arc_local<K, MaybeKey, V, M>(
1746        &self,
1747        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1748        keyer: impl Fn() -> MaybeKey + 'static,
1749    ) -> ArcLocalSignal<Option<V>>
1750    where
1751        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1752        MaybeKey: QueryMaybeKey<K, V>,
1753        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
1754        V: DebugIfDevtoolsEnabled + Clone + 'static,
1755    {
1756        let cache_key = query_scope.cache_key();
1757        let keyer = ArcLocalSignal::derive_local(move || {
1758            keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1759        });
1760        let dyn_signal = self
1761            .untyped_client
1762            .scope_lookup
1763            .scope_subscriptions_mut()
1764            .add_value_set_updated_or_removed_subscription(
1765                cache_key,
1766                MaybeLocal::new_local({
1767                    let keyer = keyer.clone();
1768                    move || keyer.get()
1769                }),
1770            );
1771
1772        let scope_lookup = self.untyped_client.scope_lookup;
1773        ArcLocalSignal::derive_local(move || {
1774            dyn_signal.track();
1775            if let Some(key) = keyer.read_untracked().as_ref() {
1776                scope_lookup.with_cached_query::<K, V, _>(key, &cache_key, |maybe_cached| {
1777                    // WONTPANIC: with_cached_query will only output values that are safe on this thread:
1778                    maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1779                })
1780            } else {
1781                None
1782            }
1783        })
1784    }
1785
1786    /// Subscribe to the value of a query.
1787    /// The keyer function is reactive to changes in `K`.
1788    ///
1789    /// This will update whenever the query is created, removed, updated, refetched or set.
1790    ///
1791    /// Compared to a resource:
1792    /// - This will not trigger a fetch of a query, if it's not in the cache, this will be `None`.
1793    #[track_caller]
1794    pub fn subscribe_value_arc<K, MaybeKey, V, M>(
1795        &self,
1796        query_scope: impl QueryScopeTrait<K, V, M>,
1797        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1798    ) -> ArcSignal<Option<V>>
1799    where
1800        K: DebugIfDevtoolsEnabled + Hash + Clone + Send + Sync + 'static,
1801        MaybeKey: QueryMaybeKey<K, V>,
1802        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1803        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1804    {
1805        let cache_key = query_scope.cache_key();
1806        let keyer = ArcSignal::derive(move || keyer().into_maybe_key().map(|k| KeyHash::new(&k)));
1807
1808        let dyn_signal = self
1809            .untyped_client
1810            .scope_lookup
1811            .scope_subscriptions_mut()
1812            .add_value_set_updated_or_removed_subscription(
1813                cache_key,
1814                MaybeLocal::new({
1815                    let keyer = keyer.clone();
1816                    move || keyer.get()
1817                }),
1818            );
1819
1820        let scope_lookup = self.untyped_client.scope_lookup;
1821        ArcSignal::derive(move || {
1822            dyn_signal.track();
1823            if let Some(key) = keyer.read_untracked().as_ref() {
1824                scope_lookup.with_cached_query::<K, V, _>(key, &cache_key, |maybe_cached| {
1825                    // WONTPANIC: with_cached_query will only output values that are safe on this thread:
1826                    maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1827                })
1828            } else {
1829                None
1830            }
1831        })
1832    }
1833
1834    /// Mark a query as stale.
1835    ///
1836    /// Any active resources will refetch in the background, replacing them when ready.
1837    #[track_caller]
1838    pub fn invalidate_query<K, V, M>(
1839        &self,
1840        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1841        key: impl Borrow<K>,
1842    ) -> bool
1843    where
1844        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1845        V: DebugIfDevtoolsEnabled + Clone + 'static,
1846    {
1847        self.untyped_client.invalidate_query(query_scope, key)
1848    }
1849
1850    /// Mark multiple queries of a specific type as stale.
1851    ///
1852    /// Any active resources will refetch in the background, replacing them when ready.
1853    #[track_caller]
1854    pub fn invalidate_queries<K, V, KRef, M>(
1855        &self,
1856        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1857        keys: impl IntoIterator<Item = KRef>,
1858    ) -> Vec<KRef>
1859    where
1860        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1861        V: DebugIfDevtoolsEnabled + Clone + 'static,
1862        KRef: Borrow<K>,
1863    {
1864        self.untyped_client.invalidate_queries(query_scope, keys)
1865    }
1866
1867    /// Mark one or more queries of a specific type as stale with a callback.
1868    ///
1869    /// When the callback returns `true`, the specific query will be invalidated.
1870    ///
1871    /// Any active resources subscribing to that query will refetch in the background,
1872    /// replacing them when ready.
1873    #[track_caller]
1874    pub fn invalidate_queries_with_predicate<K, V, M>(
1875        &self,
1876        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1877        should_invalidate: impl Fn(&K) -> bool,
1878    ) where
1879        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1880        V: DebugIfDevtoolsEnabled + Clone + 'static,
1881    {
1882        let mut scopes = self.untyped_client.scope_lookup.scopes_mut();
1883        let mut cbs_scopes = vec![];
1884        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
1885        self.untyped_client
1886            .scope_lookup
1887            .with_cached_scope_mut::<K, V, _, _>(
1888                &mut scopes,
1889                query_scope_info.cache_key,
1890                OnScopeMissing::Skip,
1891                |_| {},
1892                |maybe_scope, _| {
1893                    if let Some(scope) = maybe_scope {
1894                        for query in scope.all_queries_mut_include_pending() {
1895                            if let Some(key) = query.key().value_if_safe()
1896                                && should_invalidate(key)
1897                            {
1898                                let cb_scopes = query.invalidate(QueryAbortReason::Invalidate);
1899                                cbs_scopes.push(cb_scopes);
1900                            }
1901                        }
1902                    }
1903                },
1904            );
1905        let mut cbs_external = vec![];
1906        for cb in cbs_scopes {
1907            if let Some(cb_external) = cb(&mut scopes) {
1908                cbs_external.push(cb_external);
1909            }
1910        }
1911        drop(scopes);
1912        run_external_callbacks(
1913            self.untyped_client,
1914            query_scope_info.cache_key,
1915            cbs_external,
1916        );
1917    }
1918
1919    /// Mark all queries of a specific type as stale.
1920    ///
1921    /// Any active resources will refetch in the background, replacing them when ready.
1922    #[track_caller]
1923    pub fn invalidate_query_scope<K, V, M>(&self, query_scope: impl QueryScopeLocalTrait<K, V, M>)
1924    where
1925        K: Hash + 'static,
1926        V: Clone + 'static,
1927    {
1928        self.invalidate_query_scope_inner(query_scope.cache_key())
1929    }
1930
1931    pub(crate) fn invalidate_query_scope_inner(&self, scope_cache_key: ScopeCacheKey) {
1932        let mut scopes = self.untyped_client.scope_lookup.scopes_mut();
1933        let mut cbs_scopes = vec![];
1934        if let Some(scope) = scopes.get_mut(&scope_cache_key) {
1935            let cb_scopes = scope.invalidate_scope(QueryAbortReason::Invalidate);
1936            cbs_scopes.push(cb_scopes);
1937            for buster in scope.busters() {
1938                buster.try_set(new_buster_id());
1939            }
1940        }
1941        let mut cbs_external = vec![];
1942        for cb in cbs_scopes {
1943            if let Some(cb_external) = cb(&mut scopes) {
1944                cbs_external.push(cb_external);
1945            }
1946        }
1947        drop(scopes);
1948        run_external_callbacks(self.untyped_client, scope_cache_key, cbs_external);
1949    }
1950
1951    /// Mark all queries as stale.
1952    ///
1953    /// Any active resources will refetch in the background, replacing them when ready.
1954    ///
1955    /// To have the cache instantly cleared and all listeners reset to pending, e.g. for user logout,
1956    /// see [`QueryClient::clear`].
1957    #[track_caller]
1958    pub fn invalidate_all_queries(&self) {
1959        let mut scopes = self.untyped_client.scope_lookup.scopes_mut();
1960        let mut cbs_scopes = vec![];
1961        for scope in scopes.values_mut() {
1962            let busters = scope.busters();
1963            let cb_scopes = scope.invalidate_scope(QueryAbortReason::Invalidate);
1964            cbs_scopes.push((cb_scopes, scope.cache_key()));
1965            for buster in busters {
1966                buster.try_set(new_buster_id());
1967            }
1968        }
1969        let mut cbs_external = vec![];
1970        for (cb, cache_key) in cbs_scopes {
1971            if let Some(cb_external) = cb(&mut scopes) {
1972                cbs_external.push((cb_external, cache_key));
1973            }
1974        }
1975        drop(scopes);
1976        for (cb_external, cache_key) in cbs_external {
1977            run_external_callbacks(self.untyped_client, cache_key, vec![cb_external]);
1978        }
1979    }
1980
1981    /// Empty the cache, like [`QueryClient::invalidate_all_queries`] except:
1982    /// - the cache is instantly cleared of all queries
1983    /// - All active resources etc are reset to their pending state instantly
1984    ///   until the new query finishes refetching.
1985    ///
1986    /// Useful for e.g. user logout.
1987    ///
1988    /// [`QueryClient::invalidate_all_queries`] on the other hand, will only refetch active queries in the background, replacing them when ready.
1989    #[track_caller]
1990    pub fn clear(&self) {
1991        let mut scopes = self.untyped_client.scope_lookup.scopes_mut();
1992        let mut cbs_scopes = vec![];
1993        for scope in scopes.values_mut() {
1994            let busters = scope.busters();
1995            let cb_scopes = scope.invalidate_scope(QueryAbortReason::Clear);
1996            cbs_scopes.push((cb_scopes, scope.cache_key()));
1997            scope.clear();
1998            for buster in busters {
1999                buster.try_set(new_buster_id());
2000            }
2001        }
2002        let mut cbs_external = vec![];
2003        for (cb, cache_key) in cbs_scopes {
2004            if let Some(cb_external) = cb(&mut scopes) {
2005                cbs_external.push((cb_external, cache_key));
2006            }
2007        }
2008        drop(scopes);
2009        for (cb_external, cache_key) in cbs_external {
2010            run_external_callbacks(self.untyped_client, cache_key, vec![cb_external]);
2011        }
2012    }
2013
2014    #[cfg(test)]
2015    pub(crate) fn total_cached_queries(&self) -> usize {
2016        self.untyped_client
2017            .scope_lookup
2018            .scopes()
2019            .values()
2020            .map(|scope| scope.total_cached_queries())
2021            .sum()
2022    }
2023
2024    #[cfg(test)]
2025    pub(crate) fn is_key_invalid<K, V, M>(
2026        &self,
2027        query_scope: impl QueryScopeLocalTrait<K, V, M>,
2028        key: impl Borrow<K>,
2029    ) -> bool
2030    where
2031        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2032        V: DebugIfDevtoolsEnabled + Clone + 'static,
2033    {
2034        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
2035        self.untyped_client
2036            .scope_lookup
2037            .with_cached_scope_mut::<K, V, _, _>(
2038                &mut self.untyped_client.scope_lookup.scopes_mut(),
2039                query_scope_info.cache_key,
2040                OnScopeMissing::Skip,
2041                |_| {},
2042                |maybe_scope, _| {
2043                    if let Some(scope) = maybe_scope {
2044                        scope
2045                            .get(&KeyHash::new(key.borrow()))
2046                            .map(|query| query.is_invalidated())
2047                            .unwrap_or(false)
2048                    } else {
2049                        false
2050                    }
2051                },
2052            )
2053    }
2054
2055    #[cfg(test)]
2056    #[allow(dead_code)]
2057    pub(crate) fn mark_key_valid<K, V, M>(
2058        &self,
2059        query_scope: impl QueryScopeLocalTrait<K, V, M>,
2060        key: impl Borrow<K>,
2061    ) where
2062        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2063        V: DebugIfDevtoolsEnabled + Clone + 'static,
2064    {
2065        self.untyped_client
2066            .scope_lookup
2067            .with_cached_scope_mut::<K, V, _, _>(
2068                &mut self.untyped_client.scope_lookup.scopes_mut(),
2069                QueryScopeInfo::new_local(&query_scope).cache_key,
2070                OnScopeMissing::Skip,
2071                |_| {},
2072                |maybe_scope, _| {
2073                    if let Some(scope) = maybe_scope
2074                        && let Some(query) = scope.get_mut(&KeyHash::new(key.borrow()))
2075                    {
2076                        query.mark_valid();
2077                    }
2078                },
2079            );
2080    }
2081
2082    #[cfg(test)]
2083    pub(crate) fn subscriber_count(&self) -> usize {
2084        self.untyped_client
2085            .scope_lookup
2086            .scope_subscriptions_mut()
2087            .count()
2088    }
2089}
2090
2091impl UntypedQueryClient {
2092    #[track_caller]
2093    pub(crate) async fn fetch_query<K, V, M>(
2094        &self,
2095        query_scope: impl QueryScopeTrait<K, V, M>,
2096        key: impl Borrow<K>,
2097    ) -> V
2098    where
2099        K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
2100        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
2101    {
2102        let query_scope_info = QueryScopeInfo::new(&query_scope);
2103        let owner_chain = OwnerChain::new(*self, query_scope_info.cache_key, Owner::current());
2104        self.fetch_inner(
2105            query_scope_info,
2106            || QueryScopeQueryInfo::new(&query_scope, key.borrow()),
2107            async |key| MaybeLocal::new(query_scope.query(key).await),
2108            key.borrow(),
2109            None,
2110            || MaybeLocal::new(key.borrow().clone()),
2111            &owner_chain,
2112        )
2113        .await
2114    }
2115
2116    #[track_caller]
2117    pub(crate) async fn fetch_query_local<K, V, M>(
2118        &self,
2119        query_scope: impl QueryScopeLocalTrait<K, V, M>,
2120        key: impl Borrow<K>,
2121    ) -> V
2122    where
2123        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
2124        V: DebugIfDevtoolsEnabled + Clone + 'static,
2125    {
2126        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
2127        let owner_chain = OwnerChain::new(*self, query_scope_info.cache_key, Owner::current());
2128        self.fetch_inner(
2129            query_scope_info,
2130            || QueryScopeQueryInfo::new_local(&query_scope, key.borrow()),
2131            async |key| MaybeLocal::new_local(query_scope.query(key).await),
2132            key.borrow(),
2133            None,
2134            || MaybeLocal::new_local(key.borrow().clone()),
2135            &owner_chain,
2136        )
2137        .await
2138    }
2139
2140    #[track_caller]
2141    async fn fetch_inner<K, V, FetcherFut>(
2142        &self,
2143        query_scope_info: QueryScopeInfo,
2144        query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
2145        fetcher: impl Fn(K) -> FetcherFut,
2146        key: &K,
2147        maybe_preheld_fetcher_mutex_guard: Option<&futures::lock::MutexGuard<'_, ()>>,
2148        lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
2149        owner_chain: &OwnerChain,
2150    ) -> V
2151    where
2152        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
2153        V: DebugIfDevtoolsEnabled + Clone + 'static,
2154        FetcherFut: Future<Output = MaybeLocal<V>>,
2155    {
2156        self.cached_or_fetch(
2157            self.options,
2158            query_scope_info.options,
2159            None,
2160            &query_scope_info,
2161            query_scope_info_for_new_query,
2162            key,
2163            fetcher,
2164            |info| {
2165                match info.variant {
2166                    CachedOrFetchCbInputVariant::CachedUntouched => {
2167                        if info.cached.stale_or_invalidated() {
2168                            return CachedOrFetchCbOutput::Refetch;
2169                        }
2170                    }
2171                    CachedOrFetchCbInputVariant::CachedUpdated => {
2172                        // Update anything using it:
2173                        info.cached.buster.set(new_buster_id());
2174                    }
2175                    CachedOrFetchCbInputVariant::Fresh => {}
2176                }
2177                CachedOrFetchCbOutput::Return(
2178                    // WONTPANIC: cached_or_fetch will only output values that are safe on this thread:
2179                    info.cached.value_maybe_stale().value_may_panic().clone(),
2180                )
2181            },
2182            maybe_preheld_fetcher_mutex_guard,
2183            lazy_maybe_local_key,
2184            owner_chain,
2185        )
2186        .await
2187    }
2188
2189    #[track_caller]
2190    pub(crate) fn invalidate_query<K, V, M>(
2191        &self,
2192        query_scope: impl QueryScopeLocalTrait<K, V, M>,
2193        key: impl Borrow<K>,
2194    ) -> bool
2195    where
2196        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2197        V: DebugIfDevtoolsEnabled + Clone + 'static,
2198    {
2199        let cleared = self.invalidate_queries(query_scope, std::iter::once(key));
2200        !cleared.is_empty()
2201    }
2202
2203    #[track_caller]
2204    pub(crate) fn invalidate_queries<K, V, KRef, M>(
2205        &self,
2206        query_scope: impl QueryScopeLocalTrait<K, V, M>,
2207        keys: impl IntoIterator<Item = KRef>,
2208    ) -> Vec<KRef>
2209    where
2210        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2211        V: DebugIfDevtoolsEnabled + Clone + 'static,
2212        KRef: Borrow<K>,
2213    {
2214        self.invalidate_queries_inner::<K, V, _>(&QueryScopeInfo::new_local(&query_scope), keys)
2215    }
2216
2217    #[track_caller]
2218    pub(crate) fn invalidate_queries_inner<K, V, KRef>(
2219        &self,
2220        query_scope_info: &QueryScopeInfo,
2221        keys: impl IntoIterator<Item = KRef>,
2222    ) -> Vec<KRef>
2223    where
2224        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2225        V: DebugIfDevtoolsEnabled + Clone + 'static,
2226        KRef: Borrow<K>,
2227    {
2228        let keys = keys.into_iter().collect::<Vec<_>>();
2229        let key_hashes = keys
2230            .iter()
2231            .map(|key| KeyHash::new(key.borrow()))
2232            .collect::<Vec<_>>();
2233
2234        if key_hashes.is_empty() {
2235            return vec![];
2236        }
2237
2238        let mut scopes = self.scope_lookup.scopes_mut();
2239        let mut cbs_scopes = vec![];
2240        let results = self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2241            &mut scopes,
2242            query_scope_info.cache_key,
2243            OnScopeMissing::Skip,
2244            |_| {},
2245            |maybe_scope, _| {
2246                let mut invalidated = vec![];
2247                if let Some(scope) = maybe_scope {
2248                    for (key, key_hash) in keys.into_iter().zip(key_hashes.iter()) {
2249                        if let Some(cached) = scope.get_mut_include_pending(key_hash) {
2250                            let cb_scopes = cached.invalidate(QueryAbortReason::Invalidate);
2251                            cbs_scopes.push(cb_scopes);
2252                            invalidated.push(key);
2253                        }
2254                    }
2255                }
2256                invalidated
2257            },
2258        );
2259        let mut cbs_external = vec![];
2260        for cb in cbs_scopes {
2261            if let Some(cb_external) = cb(&mut scopes) {
2262                cbs_external.push(cb_external);
2263            }
2264        }
2265        drop(scopes);
2266        run_external_callbacks(*self, query_scope_info.cache_key, cbs_external);
2267        results
2268    }
2269
2270    #[cfg(test)]
2271    /// Clear a specific query scope of all its queries.
2272    #[track_caller]
2273    pub(crate) fn clear_query_scope<K, V, M>(&self, query_scope: impl QueryScopeLocalTrait<K, V, M>)
2274    where
2275        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2276        V: DebugIfDevtoolsEnabled + Clone + 'static,
2277    {
2278        let mut scopes = self.scope_lookup.scopes_mut();
2279        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
2280        let mut cbs_scopes = vec![];
2281        self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2282            &mut scopes,
2283            query_scope_info.cache_key,
2284            OnScopeMissing::Skip,
2285            |_| {},
2286            |maybe_scope, _| {
2287                if let Some(scope) = maybe_scope {
2288                    for cached in scope.all_queries_mut_include_pending() {
2289                        let cb_scopes = cached.invalidate(QueryAbortReason::Clear);
2290                        cbs_scopes.push(cb_scopes);
2291                    }
2292                }
2293            },
2294        );
2295        let mut cbs_external = vec![];
2296        for cb in cbs_scopes {
2297            if let Some(cb_external) = cb(&mut scopes) {
2298                cbs_external.push(cb_external);
2299            }
2300        }
2301        drop(scopes);
2302        run_external_callbacks(*self, query_scope_info.cache_key, cbs_external);
2303    }
2304
2305    /// Clear a specific query key.
2306    #[track_caller]
2307    pub(crate) fn clear_query<K, V, M>(
2308        &self,
2309        query_scope: impl QueryScopeLocalTrait<K, V, M>,
2310        key: impl Borrow<K>,
2311    ) -> bool
2312    where
2313        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2314        V: DebugIfDevtoolsEnabled + Clone + 'static,
2315    {
2316        let mut scopes = self.scope_lookup.scopes_mut();
2317        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
2318        let mut cbs_scopes = vec![];
2319        let result = self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2320            &mut scopes,
2321            query_scope_info.cache_key,
2322            OnScopeMissing::Skip,
2323            |_| {},
2324            |maybe_scope, _| {
2325                if let Some(scope) = maybe_scope {
2326                    let key_hash = KeyHash::new(key.borrow());
2327                    if let Some(cached) = scope.get_mut_include_pending(&key_hash) {
2328                        let cb_scopes = cached.invalidate(QueryAbortReason::Clear);
2329                        cbs_scopes.push(cb_scopes);
2330                    }
2331                    let removed = scope.remove_entry(&key_hash);
2332                    // Calling it again just in case because in tests might be in sync cache and non sync cache:
2333                    scope.remove_entry(&key_hash);
2334                    return removed.is_some();
2335                }
2336                false
2337            },
2338        );
2339        let mut cbs_external = vec![];
2340        for cb in cbs_scopes {
2341            if let Some(cb_external) = cb(&mut scopes) {
2342                cbs_external.push(cb_external);
2343            }
2344        }
2345        drop(scopes);
2346        run_external_callbacks(*self, query_scope_info.cache_key, cbs_external);
2347        result
2348    }
2349
2350    pub(crate) fn query_metadata<K, V>(
2351        &self,
2352        scope_cache_key: ScopeCacheKey,
2353        key: impl Borrow<K>,
2354    ) -> Option<QueryMetadata>
2355    where
2356        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2357        V: DebugIfDevtoolsEnabled + Clone + 'static,
2358    {
2359        self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2360            &mut self.scope_lookup.scopes_mut(),
2361            scope_cache_key,
2362            OnScopeMissing::Skip,
2363            |_| {},
2364            |maybe_scope, _| {
2365                if let Some(scope) = maybe_scope
2366                    && let Some(cached) = scope.get(&KeyHash::new(key.borrow()))
2367                {
2368                    return Some(QueryMetadata {
2369                        updated_at: cached.updated_at,
2370                        stale_or_invalidated: cached.stale_or_invalidated(),
2371                    });
2372                }
2373                None
2374            },
2375        )
2376    }
2377
2378    pub async fn cached_or_fetch<K, V, T, FetcherFut>(
2379        &self,
2380        client_options: QueryOptions,
2381        scope_options: Option<QueryOptions>,
2382        maybe_buster_if_uncached: Option<ArcRwSignal<u64>>,
2383        query_scope_info: &QueryScopeInfo,
2384        query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
2385        key: &K,
2386        fetcher: impl Fn(K) -> FetcherFut,
2387        return_cb: impl Fn(CachedOrFetchCbInput<K, V>) -> CachedOrFetchCbOutput<T>,
2388        maybe_preheld_fetcher_mutex_guard: Option<&futures::lock::MutexGuard<'_, ()>>,
2389        lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
2390        owner_chain: &OwnerChain,
2391    ) -> T
2392    where
2393        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2394        V: DebugIfDevtoolsEnabled + Clone + 'static,
2395        FetcherFut: Future<Output = MaybeLocal<V>>,
2396    {
2397        let scope_lookup = &self.scope_lookup;
2398        let key_hash = KeyHash::new(key);
2399        let mut cached_buster = None;
2400        let next_directive = scope_lookup.with_cached_query::<K, V, _>(
2401            &key_hash,
2402            &query_scope_info.cache_key,
2403            |maybe_cached| {
2404                if let Some(cached) = maybe_cached {
2405                    cached_buster = Some(cached.buster.clone());
2406                    return_cb(CachedOrFetchCbInput {
2407                        cached,
2408                        variant: CachedOrFetchCbInputVariant::CachedUntouched,
2409                    })
2410                } else {
2411                    CachedOrFetchCbOutput::Refetch
2412                }
2413            },
2414        );
2415
2416        match next_directive {
2417            CachedOrFetchCbOutput::Return(value) => value,
2418            CachedOrFetchCbOutput::Refetch => {
2419                // Will probably need to fetch, unless someone fetches whilst trying to get hold of the fetch mutex:
2420                let fetcher_mutex = scope_lookup.fetcher_mutex::<K, V>(key_hash, query_scope_info);
2421                let _maybe_fetcher_mutex_guard_local = if maybe_preheld_fetcher_mutex_guard
2422                    .is_none()
2423                {
2424                    let _fetcher_guard = match fetcher_mutex.try_lock() {
2425                        Some(fetcher_guard) => fetcher_guard,
2426                        None => {
2427                            // If have to wait, should check cache again in case it was fetched while waiting.
2428                            let fetcher_guard = fetcher_mutex.lock().await;
2429                            let next_directive = scope_lookup.with_cached_query::<K, V, _>(
2430                                &key_hash,
2431                                &query_scope_info.cache_key,
2432                                |maybe_cached| {
2433                                    if let Some(cached) = maybe_cached {
2434                                        cached_buster = Some(cached.buster.clone());
2435                                        return_cb(CachedOrFetchCbInput {
2436                                            cached,
2437                                            variant: CachedOrFetchCbInputVariant::CachedUntouched,
2438                                        })
2439                                    } else {
2440                                        CachedOrFetchCbOutput::Refetch
2441                                    }
2442                                },
2443                            );
2444                            match next_directive {
2445                                CachedOrFetchCbOutput::Return(value) => return value,
2446                                CachedOrFetchCbOutput::Refetch => fetcher_guard,
2447                            }
2448                        }
2449                    };
2450                    Some(_fetcher_guard)
2451                } else {
2452                    // Owned externally so not an issue.
2453                    None
2454                };
2455
2456                #[cfg(any(
2457                    all(debug_assertions, feature = "devtools"),
2458                    feature = "devtools-always"
2459                ))]
2460                let before_time = chrono::Utc::now();
2461
2462                let loading_first_time = cached_buster.is_none();
2463
2464                #[cfg(any(
2465                    all(debug_assertions, feature = "devtools"),
2466                    feature = "devtools-always"
2467                ))]
2468                {
2469                    // Running this before fetching so it'll show up in devtools straight away:
2470                    if loading_first_time {
2471                        scope_lookup
2472                            .client_subscriptions_mut()
2473                            .notify_query_created(crate::subs_client::QueryCreatedInfo {
2474                                cache_key: query_scope_info.cache_key,
2475                                scope_title: query_scope_info.title.clone(),
2476                                key_hash,
2477                                debug_key: crate::utils::DebugValue::new(key),
2478                                combined_options: crate::options_combine(
2479                                    client_options,
2480                                    scope_options,
2481                                ),
2482                            });
2483                    }
2484                }
2485
2486                let maybe_local_key = lazy_maybe_local_key();
2487
2488                enum MaybeNewValue<V> {
2489                    NewValue(V),
2490                    SsrStreamedValueOverride,
2491                }
2492
2493                let maybe_new_value = scope_lookup
2494                    .with_notify_fetching(
2495                        query_scope_info.cache_key,
2496                        key_hash,
2497                        loading_first_time,
2498                        // Call the fetcher, but reset and repeat if an invalidation occurs whilst in-flight:
2499                        async {
2500                            loop {
2501                                let query_abort_rx = scope_lookup
2502                                    .prepare_invalidation_channel::<K, V>(
2503                                        query_scope_info,
2504                                        key_hash,
2505                                        &maybe_local_key,
2506                                    );
2507
2508                                let fut = owner_chain.with(|| fetcher(key.clone()));
2509
2510                                futures::select_biased! {
2511                                    rx_result = query_abort_rx.fuse() => {
2512                                        if let Ok(reason) = rx_result {
2513                                            match reason {
2514                                                QueryAbortReason::Invalidate | QueryAbortReason::Clear => {},
2515                                                QueryAbortReason::SsrStreamedValueOverride => {
2516                                                    break MaybeNewValue::SsrStreamedValueOverride;
2517                                                },
2518                                            }
2519                                        }
2520                                    },
2521                                    new_value = fut.fuse() => {
2522                                        break MaybeNewValue::NewValue(new_value);
2523                                    },
2524                                }
2525                            }
2526                        },
2527                    )
2528                    .await;
2529
2530                #[cfg(any(
2531                    all(debug_assertions, feature = "devtools"),
2532                    feature = "devtools-always"
2533                ))]
2534                let elapsed_ms = chrono::Utc::now()
2535                    .signed_duration_since(before_time)
2536                    .num_milliseconds();
2537
2538                let buster_if_uncached = if loading_first_time {
2539                    Some(
2540                        maybe_buster_if_uncached
2541                            .unwrap_or_else(|| ArcRwSignal::new(new_buster_id())),
2542                    )
2543                } else {
2544                    None
2545                };
2546
2547                let next_directive = scope_lookup.with_cached_scope_mut::<_, _, _, _>(
2548                    &mut scope_lookup.scopes_mut(),
2549                    query_scope_info.cache_key,
2550                    OnScopeMissing::Create(query_scope_info),
2551                    |_| {},
2552                    |scope, _| {
2553                        let scope = scope.expect("provided a default");
2554                        match maybe_new_value {
2555                            MaybeNewValue::NewValue(new_value) => {
2556                                if let Some(cached) = scope.get_mut(&key_hash) {
2557                                    cached.set_value(
2558                                        new_value,
2559                                        true,
2560                                        #[cfg(any(
2561                                            all(debug_assertions, feature = "devtools"),
2562                                            feature = "devtools-always"
2563                                        ))]
2564                                        crate::events::Event::new(
2565                                            crate::events::EventVariant::Fetched { elapsed_ms },
2566                                        ),
2567                                        ResetInvalidated::Reset,
2568                                    );
2569                                    return_cb(CachedOrFetchCbInput {
2570                                        cached,
2571                                        variant: CachedOrFetchCbInputVariant::CachedUpdated,
2572                                    })
2573                                } else {
2574                                    // We already notified before the async fetch, so it would show up sooner.
2575                                    scope.insert_without_query_created_notif(
2576                                        key_hash,
2577                                        Query::new(
2578                                            client_options,
2579                                            *self,
2580                                            query_scope_info,
2581                                            query_scope_info_for_new_query(),
2582                                            key_hash,
2583                                            maybe_local_key,
2584                                            new_value,
2585                                            buster_if_uncached.expect(
2586                                                "loading_first_time means this is Some(). (bug)",
2587                                            ),
2588                                            scope_options,
2589                                            None,
2590                                            #[cfg(any(
2591                                                all(debug_assertions, feature = "devtools"),
2592                                                feature = "devtools-always"
2593                                            ))]
2594                                            crate::events::Event::new(
2595                                                crate::events::EventVariant::Fetched { elapsed_ms },
2596                                            ),
2597                                        ),
2598                                    );
2599                                    return_cb(CachedOrFetchCbInput {
2600                                        cached: scope.get(&key_hash).expect("Just set. (bug)"),
2601                                        variant: CachedOrFetchCbInputVariant::Fresh,
2602                                    })
2603                                }
2604                            }
2605                            MaybeNewValue::SsrStreamedValueOverride => {
2606                                return_cb(CachedOrFetchCbInput {
2607                                    cached: scope
2608                                        .get(&key_hash)
2609                                        .expect("Should contain value streamed from server. (bug)"),
2610                                    variant: CachedOrFetchCbInputVariant::Fresh,
2611                                })
2612                            }
2613                        }
2614                    },
2615                );
2616
2617                match next_directive {
2618                    CachedOrFetchCbOutput::Refetch => {
2619                        panic!("Unexpected refetch directive after providing fresh value. (bug)")
2620                    }
2621                    CachedOrFetchCbOutput::Return(return_value) => return_value,
2622                }
2623            }
2624        }
2625    }
2626}
2627
2628pub(crate) struct QueryMetadata {
2629    pub updated_at: DateTime<Utc>,
2630    pub stale_or_invalidated: bool,
2631}