Skip to main content

leptos_fetch/
query_client.rs

1#![allow(ungated_async_fn_track_caller)] // Want it to auto-turn on when stable
2
3use std::{
4    borrow::Borrow,
5    collections::HashMap,
6    fmt::Debug,
7    hash::Hash,
8    sync::{Arc, LazyLock, atomic::AtomicBool},
9};
10
11use chrono::{DateTime, Utc};
12use codee::{Decoder, Encoder};
13use futures::{FutureExt, pin_mut};
14use leptos::{
15    prelude::{
16        ArcRwSignal, ArcSignal, Effect, Get, GetUntracked, LocalStorage, Owner, Read, ReadUntracked, Set, Signal,
17        Track, provide_context,
18    },
19    server::{ArcLocalResource, ArcResource, FromEncodedStr, IntoEncodedString, LocalResource, Resource},
20};
21use send_wrapper::SendWrapper;
22
23use crate::{
24    ArcLocalSignal, QueryOptions,
25    cache::{CachedOrFetchCbInput, CachedOrFetchCbInputVariant, CachedOrFetchCbOutput, OnScopeMissing},
26    cache_scope::{QueryAbortReason, QueryOrPending},
27    debug_if_devtools_enabled::DebugIfDevtoolsEnabled,
28    maybe_local::MaybeLocal,
29    query::Query,
30    query_maybe_key::QueryMaybeKey,
31    query_scope::{QueryScopeInfo, QueryScopeLocalTrait, QueryScopeQueryInfo, QueryScopeTrait, ScopeCacheKey},
32    resource_drop_guard::ResourceDropGuard,
33    utils::{
34        KeyHash, OwnerChain, ResetInvalidated, client_only_yield_now, new_buster_id, new_resource_id,
35        run_external_callbacks,
36    },
37};
38
39use super::cache::ScopeLookup;
40
41#[cfg(not(feature = "rkyv"))]
42pub(crate) type DefaultCodec = codee::string::JsonSerdeCodec;
43
44#[cfg(feature = "rkyv")]
45pub(crate) type DefaultCodec = codee::binary::RkyvCodec;
46
47task_local::task_local! {
48    pub(crate) static ASYNC_TRACK_UPDATE_MARKER: Arc<AtomicBool>;
49}
50
51std::thread_local! {
52    pub(crate) static SYNC_TRACK_UPDATE_MARKER: AtomicBool = const { AtomicBool::new(true) };
53}
54
/// The [`QueryClient`] stores all query data, and is used to manage queries.
///
/// Should be provided via leptos context at the top of the app.
///
/// The `Codec` type parameter selects how resource values are encoded and
/// decoded; it is part of the client's type, so it must be named when
/// retrieving a client with a non-default codec from context.
///
/// # Example
///
/// ```
/// use leptos::prelude::*;
/// use leptos_fetch::QueryClient;
///
/// #[component]
/// pub fn App() -> impl IntoView {
///     QueryClient::new().provide();
///     // ...
/// }
///
/// #[component]
/// pub fn MyComponent() -> impl IntoView {
///     let client: QueryClient = expect_context();
///     // ...
/// }
/// ```
pub struct QueryClient<Codec = DefaultCodec> {
    // Everything that does not depend on `Codec` lives here.
    pub(crate) untyped_client: UntypedQueryClient,
    // Zero-sized marker tying the client to `Codec` without storing one.
    // NOTE(review): the `SendWrapper` is presumably there so the client is
    // `Send + Sync` regardless of `Codec` - confirm.
    _ser: std::marker::PhantomData<SendWrapper<Codec>>,
}
81
/// The internal untyped part of the [`QueryClient`],
/// containing the methods that don't rely on the Codec generic.
///
/// The type is `Copy`, so it is passed around by value.
#[derive(Clone, Copy)]
pub(crate) struct UntypedQueryClient {
    // Handle used to look up and mutate the per-scope query storage.
    pub(crate) scope_lookup: ScopeLookup,
    // Client-level options; replaced wholesale by `QueryClient::with_options()`.
    options: QueryOptions,
    // Timestamp taken when the client was constructed in `QueryClient::new()`.
    created_at: DateTime<Utc>,
}
90
91impl<Codec> Debug for QueryClient<Codec> {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        f.debug_struct("QueryClient")
94            .field("scope_lookup", &self.untyped_client.scope_lookup)
95            .field("options", &self.untyped_client.options)
96            .field("codec", &std::any::type_name::<Codec>())
97            .finish()
98    }
99}
100
// Implemented by hand rather than derived: a derive would add a
// `Codec: Clone` bound, but `Codec` is only ever used as a marker type.
impl<Codec: 'static> Clone for QueryClient<Codec> {
    fn clone(&self) -> Self {
        // The type is `Copy`, so cloning is a plain bitwise copy.
        *self
    }
}
106
// Implemented by hand rather than derived: a derive would add a
// `Codec: Copy` bound, but `Codec` is only ever used as a marker type.
impl<Codec: 'static> Copy for QueryClient<Codec> {}
108
109impl Default for QueryClient<DefaultCodec> {
110    fn default() -> Self {
111        Self::new()
112    }
113}
114
115impl QueryClient<DefaultCodec> {
116    /// Creates a new [`QueryClient`] with the default codec: [`codee::string::JsonSerdeCodec`].
117    ///
118    /// Call [`QueryClient::set_codec()`] to set a different codec.
119    ///
120    /// Call [`QueryClient::with_options()`] to set non-default options.
121    #[track_caller]
122    pub fn new() -> Self {
123        Self {
124            untyped_client: UntypedQueryClient {
125                scope_lookup: ScopeLookup::new(),
126                options: QueryOptions::default(),
127                created_at: Utc::now(),
128            },
129            _ser: std::marker::PhantomData,
130        }
131    }
132}
133
134impl<Codec: 'static> QueryClient<Codec> {
135    /// Provide the client to leptos context.
136    /// ```no_run
137    /// use leptos_fetch::QueryClient;
138    /// use leptos::prelude::*;
139    ///
140    /// QueryClient::new().provide();
141    ///
142    /// let client: QueryClient = expect_context();
143    /// ```
144    #[track_caller]
145    pub fn provide(self) -> Self {
146        provide_context(self);
147        self
148    }
149
150    /// **Applies to `ssr` only**
151    ///
152    /// It's possible to use non-json codecs for streaming leptos resources from the backend.
153    /// The default is [`codee::string::JsonSerdeCodec`].
154    ///
155    /// The current `codee` major version is `0.3` and will need to be
156    /// imported in your project to customize the codec.
157    ///
158    /// E.g. to use [`codee::binary::MsgpackSerdeCodec`]:
159    /// ```toml
160    /// codee = { version = "0.3", features = ["msgpack_serde"] }
161    /// ```
162    ///
163    /// [`codee::binary::MsgpackSerdeCodec`]: https://docs.rs/codee/latest/codee/binary/struct.MsgpackSerdeCodec.html
164    ///
165    /// This is a generic type on the [`QueryClient`], so when calling
166    /// [`leptos::prelude::expect_context`],
167    /// this type must be specified when not using the default.
168    ///
169    /// A useful pattern is to type alias the client with the custom codec for your whole app:
170    ///
171    /// ```no_run
172    /// use codee::binary::MsgpackSerdeCodec;
173    /// use leptos::prelude::*;
174    /// use leptos_fetch::QueryClient;
175    ///
176    /// type MyQueryClient = QueryClient<MsgpackSerdeCodec>;
177    ///
178    /// // Create and provide to context to make accessible everywhere:
179    /// QueryClient::new().set_codec::<MsgpackSerdeCodec>().provide();
180    ///
181    /// let client: MyQueryClient = expect_context();
182    /// ```
183    #[track_caller]
184    pub fn set_codec<NewCodec>(self) -> QueryClient<NewCodec> {
185        QueryClient {
186            untyped_client: self.untyped_client,
187            _ser: std::marker::PhantomData,
188        }
189    }
190
191    /// Set non-default options to apply to all queries.
192    ///
193    /// These options will be combined with any options for a specific query scope.
194    #[track_caller]
195    pub fn with_options(mut self, options: QueryOptions) -> Self {
196        self.untyped_client.options = options;
197        self
198    }
199
200    /// Read the base [`QueryOptions`] for this [`QueryClient`].
201    ///
202    /// These will be combined with any options for a specific query scope.
203    #[track_caller]
204    pub fn options(&self) -> QueryOptions {
205        self.untyped_client.options
206    }
207
208    /// Provide a signal to globally enable/disable auto refetching of queries
209    /// that have active resources and have set [`QueryOptions::with_refetch_interval`].
210    ///
211    /// Whilst this signal returns `false`, refetches will be skipped.
212    #[track_caller]
213    pub fn with_refetch_enabled_toggle(self, refetch_enabled: impl Into<ArcSignal<bool>>) -> Self {
214        self.untyped_client.scope_lookup.scopes_mut().refetch_enabled = Some(refetch_enabled.into());
215        self
216    }
217
218    /// If set, access the signal that globally enables/disables auto refetching of queries.
219    #[track_caller]
220    pub fn refetch_enabled(&self) -> Option<ArcSignal<bool>> {
221        self.untyped_client.scope_lookup.scopes().refetch_enabled.clone()
222    }
223
224    /// Query with [`LocalResource`]. Local resouces only load data on the client,
225    /// so can be used with non-threadsafe/serializable data.
226    ///
227    /// If a cached value exists but is stale, the cached value will be initially used,
228    /// then refreshed in the background, updating once the new value is ready.
229    #[track_caller]
230    pub fn local_resource<K, MaybeKey, V, M>(
231        &self,
232        query_scope: impl QueryScopeLocalTrait<K, V, M> + 'static,
233        keyer: impl Fn() -> MaybeKey + 'static,
234    ) -> LocalResource<MaybeKey::MappedValue>
235    where
236        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
237        MaybeKey: QueryMaybeKey<K, V>,
238        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
239        V: DebugIfDevtoolsEnabled + Clone + 'static,
240    {
241        self.arc_local_resource(query_scope, keyer).into()
242    }
243
    /// Query with [`ArcLocalResource`]. Local resources only load data on the client,
    /// so can be used with non-threadsafe/serializable data.
    ///
    /// If a cached value exists but is stale, the cached value will be initially used,
    /// then refreshed in the background, updating once the new value is ready.
    ///
    /// `keyer` is evaluated inside the resource's source closure. When it yields no
    /// key, no query is run and the resource resolves to the mapped form of `None`.
    ///
    /// # Panics
    ///
    /// Panics if the cache reports `CachedOrFetchCbInputVariant::CachedUpdated`,
    /// which indicates an internal bug.
    #[track_caller]
    pub fn arc_local_resource<K, MaybeKey, V, M>(
        &self,
        query_scope: impl QueryScopeLocalTrait<K, V, M> + 'static,
        keyer: impl Fn() -> MaybeKey + 'static,
    ) -> ArcLocalResource<MaybeKey::MappedValue>
    where
        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
        MaybeKey: QueryMaybeKey<K, V>,
        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
        V: DebugIfDevtoolsEnabled + Clone + 'static,
    {
        // Copy the client so the `'static` closures below can own it.
        let client = *self;
        let client_options = self.options();
        let cache_key = query_scope.cache_key();
        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
        // Shared so both the fetcher and the background refetch can hold the scope.
        let query_scope = Arc::new(query_scope);
        let query_options = query_scope.options();
        let resource_id = new_resource_id();

        // To call .mark_resource_dropped() when the resource is dropped:
        let drop_guard = ResourceDropGuard::<K, V>::new(self.untyped_client.scope_lookup, resource_id, cache_key);

        ArcLocalResource::new({
            move || {
                let query_scope = query_scope.clone();
                let query_scope_info = query_scope_info.clone();
                let maybe_key = keyer().into_maybe_key();
                let drop_guard = drop_guard.clone();
                // Record the key this resource is currently pointing at.
                if let Some(key) = maybe_key.as_ref() {
                    drop_guard.set_active_key(KeyHash::new(key));
                }
                // Note: cannot hoist outside of resource,
                // it prevents the resource itself dropping when
                // the owner is held by the resource itself, here
                // each instance only lasts as long as the query.
                let owner_chain = OwnerChain::new(client.untyped_client, query_scope_info.cache_key, Owner::current());
                async move {
                    if let Some(key) = maybe_key {
                        let query_scope_query_info = || QueryScopeQueryInfo::new_local(&query_scope, &key);

                        // Fix for https://github.com/zakstucke/leptos-fetch/issues/64
                        //
                        // When data is prefetched, resources should still go through an async suspend-resolve
                        // cycle (yield) rather than resolving synchronously. This is critical for `Transition` to
                        // work correctly: without the yield, the Transition's internal `nth_run` counter doesn't
                        // advance past the threshold, causing the fallback to show on the first key change.
                        //
                        // Set by the callback below when the value was served from the cache.
                        let was_cached = AtomicBool::new(false);

                        let value = client
                            .untyped_client
                            .cached_or_fetch(
                                client_options,
                                query_options,
                                None,
                                &query_scope_info,
                                query_scope_query_info,
                                &key,
                                {
                                    // Fetcher: runs the query and wraps the result as a local value.
                                    let query_scope = query_scope.clone();
                                    move |key| {
                                        let query_scope = query_scope.clone();

                                        async move { MaybeLocal::new_local(query_scope.query(key).await) }
                                    }
                                },
                                |info| {
                                    // Track the buster and register this resource as
                                    // active on the cached query.
                                    // NOTE(review): presumably tracking is what makes the
                                    // resource rerun on invalidation - confirm.
                                    info.cached.buster.track();
                                    info.cached.mark_resource_active(resource_id);
                                    match info.variant {
                                        CachedOrFetchCbInputVariant::CachedUntouched => {
                                            was_cached.store(true, std::sync::atomic::Ordering::Relaxed);
                                            // If stale refetch in the background with the
                                            // prefetch() function, which'll recognise it's
                                            // stale, refetch it and invalidate busters.
                                            // Only done in tests or when the `ssr` feature is off:
                                            if cfg!(any(test, not(feature = "ssr")))
                                                && info.cached.stale_or_invalidated()
                                            {
                                                let key = key.clone();
                                                let query_scope = query_scope.clone();
                                                let owner_chain = owner_chain.clone();
                                                // Just adding the SendWrapper and using
                                                // spawn() rather than spawn_local()
                                                // to fix tests:
                                                leptos::task::spawn(SendWrapper::new(async move {
                                                    client
                                                        .prefetch_inner(
                                                            QueryScopeInfo::new_local(&query_scope),
                                                            || QueryScopeQueryInfo::new_local(&query_scope, &key),
                                                            async |key| {
                                                                MaybeLocal::new_local(query_scope.query(key).await)
                                                            },
                                                            key.borrow(),
                                                            || MaybeLocal::new_local(key.borrow().clone()),
                                                            &owner_chain,
                                                        )
                                                        .await;
                                                }));
                                            }
                                        }
                                        // Freshly fetched: nothing extra to do.
                                        CachedOrFetchCbInputVariant::Fresh => {}
                                        CachedOrFetchCbInputVariant::CachedUpdated => {
                                            panic!("Didn't direct inner to refetch here. (bug)")
                                        }
                                    }
                                    CachedOrFetchCbOutput::Return(
                                        // WONTPANIC: cached_or_fetch will
                                        // only output values that are safe
                                        // on this thread:
                                        info.cached.value_maybe_stale().value_may_panic().clone(),
                                    )
                                },
                                None,
                                || MaybeLocal::new_local(key.clone()),
                                &owner_chain,
                            )
                            .await;

                        // Yield once when the value came from the cache, so the resource
                        // never resolves synchronously. Without this, Transitions do not
                        // register the first load and show their fallback on the next one.
                        // https://github.com/zakstucke/leptos-fetch/issues/64
                        if was_cached.load(std::sync::atomic::Ordering::Relaxed) {
                            client_only_yield_now().await;
                        }

                        MaybeKey::prepare_mapped_value(Some(value))
                    } else {
                        // No key: nothing to query.
                        MaybeKey::prepare_mapped_value(None)
                    }
                }
            }
        })
    }
382
383    /// Query with [`Resource`].
384    ///
385    /// Resources must be serializable to potentially load in `ssr` and stream to the client.
386    ///
387    /// Resources must be `Send` and `Sync` to be multithreaded in ssr.
388    ///
389    /// If a cached value exists but is stale, the cached value will be initially used,
390    /// then refreshed in the background, updating once the new value is ready.
391    #[track_caller]
392    pub fn resource<K, MaybeKey, V, M>(
393        &self,
394        query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
395        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
396    ) -> Resource<MaybeKey::MappedValue, Codec>
397    where
398        K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
399        MaybeKey: QueryMaybeKey<K, V>,
400        MaybeKey::MappedValue: Clone + Send + Sync + 'static,
401        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
402        Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
403        <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
404        <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
405        <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError: Debug,
406        <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
407        <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
408    {
409        self.arc_resource_with_options(query_scope, keyer, false).into()
410    }
411
412    /// Query with a blocking [`Resource`].
413    ///
414    /// Resources must be serializable to potentially load in `ssr` and stream to the client.
415    ///
416    /// Resources must be `Send` and `Sync` to be multithreaded in ssr.
417    ///
418    /// If a cached value exists but is stale, the cached value will be initially used,
419    /// then refreshed in the background, updating once the new value is ready.
420    #[track_caller]
421    pub fn resource_blocking<K, MaybeKey, V, M>(
422        &self,
423        query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
424        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
425    ) -> Resource<MaybeKey::MappedValue, Codec>
426    where
427        K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
428        MaybeKey: QueryMaybeKey<K, V>,
429        MaybeKey::MappedValue: Clone + Send + Sync + 'static,
430        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
431        Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
432        <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
433        <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
434        <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError: Debug,
435        <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
436        <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
437    {
438        self.arc_resource_with_options(query_scope, keyer, true).into()
439    }
440
441    /// Query with [`ArcResource`].
442    ///
443    /// Resources must be serializable to potentially load in `ssr` and stream to the client.
444    ///
445    /// Resources must be `Send` and `Sync` to be multithreaded in ssr.
446    ///
447    /// If a cached value exists but is stale, the cached value will be initially used,
448    /// then refreshed in the background, updating once the new value is ready.
449    #[track_caller]
450    pub fn arc_resource<K, MaybeKey, V, M>(
451        &self,
452        query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
453        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
454    ) -> ArcResource<MaybeKey::MappedValue, Codec>
455    where
456        K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
457        MaybeKey: QueryMaybeKey<K, V>,
458        MaybeKey::MappedValue: Clone + Send + Sync + 'static,
459        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
460        Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
461        <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
462        <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
463        <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError: Debug,
464        <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
465        <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
466    {
467        self.arc_resource_with_options(query_scope, keyer, false)
468    }
469
470    /// Query with a blocking [`ArcResource`].
471    ///
472    /// Resources must be serializable to potentially load in `ssr` and stream to the client.
473    ///
474    /// Resources must be `Send` and `Sync` to be multithreaded in ssr.
475    ///
476    /// If a cached value exists but is stale, the cached value will be initially used,
477    /// then refreshed in the background, updating once the new value is ready.
478    #[track_caller]
479    pub fn arc_resource_blocking<K, MaybeKey, V, M>(
480        &self,
481        query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
482        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
483    ) -> ArcResource<MaybeKey::MappedValue, Codec>
484    where
485        K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
486        MaybeKey: QueryMaybeKey<K, V>,
487        MaybeKey::MappedValue: Clone + Send + Sync + 'static,
488        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
489        Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
490        <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
491        <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
492        <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError: Debug,
493        <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
494        <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
495    {
496        self.arc_resource_with_options(query_scope, keyer, true)
497    }
498
499    #[track_caller]
500    fn arc_resource_with_options<K, MaybeKey, V, M>(
501        &self,
502        query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
503        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
504        blocking: bool,
505    ) -> ArcResource<MaybeKey::MappedValue, Codec>
506    where
507        K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
508        MaybeKey: QueryMaybeKey<K, V>,
509        MaybeKey::MappedValue: Clone + Send + Sync + 'static,
510        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
511        Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
512        <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
513        <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
514        <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError: Debug,
515        <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
516        <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
517    {
518        let client = *self;
519        let client_options = self.options();
520        let cache_key = query_scope.cache_key();
521        let query_scope_info = QueryScopeInfo::new(&query_scope);
522        let query_scope = Arc::new(query_scope);
523        let scope_lookup = self.untyped_client.scope_lookup;
524        let query_options = query_scope.options();
525
526        let buster_if_uncached = ArcRwSignal::new(new_buster_id());
527        let resource_id = new_resource_id();
528
529        // To call .mark_resource_dropped() when the resource is dropped:
530        let drop_guard = ResourceDropGuard::<K, V>::new(self.untyped_client.scope_lookup, resource_id, cache_key);
531
532        let keyer = Arc::new(keyer);
533        let resource = ArcResource::new_with_options(
534            {
535                let buster_if_uncached = buster_if_uncached.clone();
536                let drop_guard = drop_guard.clone();
537                let keyer = keyer.clone();
538                move || {
539                    if let Some(key) = keyer().into_maybe_key() {
540                        let key_hash = KeyHash::new(&key);
541                        drop_guard.set_active_key(key_hash);
542                        scope_lookup.with_cached_query::<K, V, _>(&key_hash, &cache_key, |maybe_cached| {
543                            if let Some(cached) = maybe_cached {
544                                // Buster must be returned for it to be tracked.
545                                (Some(key.clone()), cached.buster.get())
546                            } else {
547                                // Buster must be returned for it to be tracked.
548                                (Some(key.clone()), buster_if_uncached.get())
549                            }
550                        })
551                    } else {
552                        (None, buster_if_uncached.get())
553                    }
554                }
555            },
556            {
557                let buster_if_uncached = buster_if_uncached.clone();
558                let query_scope_info = query_scope_info.clone();
559                let query_scope = query_scope.clone();
560                move |(maybe_key, last_used_buster)| {
561                    let query_scope = query_scope.clone();
562                    let query_scope_info = query_scope_info.clone();
563                    let buster_if_uncached = buster_if_uncached.clone();
564                    // Want the guard around everywhere
565                    // until the resource is dropped.
566                    let _drop_guard = drop_guard.clone();
567                    // Note: cannot hoist outside of resource,
568                    // it prevents the resource itself dropping
569                    // when the owner is held by the resource
570                    // itself, here each instance only lasts
571                    // as long as the query.
572                    let owner_chain =
573                        OwnerChain::new(client.untyped_client, query_scope_info.cache_key, Owner::current());
574                    async move {
575                        if let Some(key) = maybe_key {
576                            let query_scope_query_info = || QueryScopeQueryInfo::new(&query_scope, &key);
577
578                            // Regression test for https://github.com/zakstucke/leptos-fetch/issues/64
579                            //
580                            // When data is prefetched, resources should still go through an async suspend-resolve
581                            // cycle (yield) rather than resolving synchronously. This is critical for `Transition` to
582                            // work correctly: without the yield, the Transition's internal `nth_run` counter doesn't
583                            // advance past the threshold, causing the fallback to show on the first key change.
584                            let was_cached = AtomicBool::new(false);
585
586                            let value = client
587                                .untyped_client
588                                .cached_or_fetch(
589                                    client_options,
590                                    query_options,
591                                    Some(buster_if_uncached.clone()),
592                                    &query_scope_info,
593                                    query_scope_query_info,
594                                    &key,
595                                    {
596                                        let query_scope = query_scope.clone();
597                                        move |key| {
598                                            let query_scope = query_scope.clone();
599                                            async move { MaybeLocal::new(query_scope.query(key).await) }
600                                        }
601                                    },
602                                    |info| {
603                                        info.cached.mark_resource_active(resource_id);
604                                        match info.variant {
605                                            CachedOrFetchCbInputVariant::CachedUntouched => {
606                                                was_cached.store(true, std::sync::atomic::Ordering::Relaxed);
607
608                                                // If stale, refetch in the background
609                                                // with prefetch(), which'll recognise
610                                                // it's stale and invalidate busters:
611                                                if cfg!(any(test, not(feature = "ssr")))
612                                                    && info.cached.stale_or_invalidated()
613                                                {
614                                                    let key = key.clone();
615                                                    let query_scope = query_scope.clone();
616                                                    let owner_chain = owner_chain.clone();
617
618                                                    leptos::task::spawn(async move {
619                                                        client
620                                                            .prefetch_inner(
621                                                                QueryScopeInfo::new(&query_scope),
622                                                                || QueryScopeQueryInfo::new(&query_scope, &key),
623                                                                {
624                                                                    let query_scope = query_scope.clone();
625                                                                    move |key| {
626                                                                        let query_scope = query_scope.clone();
627
628                                                                        async move {
629                                                                            MaybeLocal::new(
630                                                                                query_scope.query(key).await,
631                                                                            )
632                                                                        }
633                                                                    }
634                                                                },
635                                                                key.borrow(),
636                                                                || MaybeLocal::new(key.borrow().clone()),
637                                                                &owner_chain,
638                                                            )
639                                                            .await;
640                                                    });
641                                                }
642
643                                                // Handle edge case where the key
644                                                // function saw it was uncached, so
645                                                // entered here, but when
646                                                // cached_or_fetch was acquiring the
647                                                // fetch mutex, someone else cached
648                                                // it, meaning our key function won't
649                                                // be reactive correctly unless we
650                                                // invalidate it now:
651                                                if last_used_buster != info.cached.buster.get_untracked() {
652                                                    buster_if_uncached.set(info.cached.buster.get_untracked());
653                                                }
654                                            }
655                                            CachedOrFetchCbInputVariant::Fresh => {}
656                                            CachedOrFetchCbInputVariant::CachedUpdated => {
657                                                panic!("Didn't direct inner to refetch here. (bug)")
658                                            }
659                                        }
660                                        CachedOrFetchCbOutput::Return(
661                                            // WONTPANIC: cached_or_fetch will
662                                            // only output values that are safe
663                                            // on this thread:
664                                            info.cached.value_maybe_stale().value_may_panic().clone(),
665                                        )
666                                    },
667                                    None,
668                                    || MaybeLocal::new(key.clone()),
669                                    &owner_chain,
670                                )
671                                .await;
672
673                            // Used to add a yield sleep when cached,
674                            // to prevent Transitions registering the first load, leading to fallback being shown on second load.
675                            // https://github.com/zakstucke/leptos-fetch/issues/64
676                            if was_cached.load(std::sync::atomic::Ordering::Relaxed) {
677                                client_only_yield_now().await;
678                            }
679
680                            MaybeKey::prepare_mapped_value(Some(value))
681                        } else {
682                            MaybeKey::prepare_mapped_value(None)
683                        }
684                    }
685                }
686            },
687            blocking,
688        );
689
690        // On the client, want to repopulate the frontend cache,
691        // so should write resources to the cache here if they
692        // don't exist.
693        // It would be better if in here we could check if the
694        // resource was started on the backend/streamed, saves
695        // doing most of this if already a frontend resource.
696        let effect = {
697            let resource = resource.clone();
698            let buster_if_uncached = buster_if_uncached.clone();
699            let client_created_at = self.untyped_client.created_at;
700            // Converting to Arc because the tests like the client
701            // get dropped even though this persists:
702            move |complete: Option<Option<()>>| {
703                if let Some(Some(())) = complete {
704                    return Some(());
705                }
706                if let Some(val) = resource.read().as_ref() {
707                    if MaybeKey::mapped_value_is_some(val) {
708                        scope_lookup.with_cached_scope_mut::<K, V, _, _>(
709                            &mut scope_lookup.scopes_mut(),
710                            query_scope_info.cache_key,
711                            OnScopeMissing::Create(&query_scope_info),
712                            |_| {},
713                            |maybe_scope, _| {
714                                let scope = maybe_scope.expect("provided a default");
715                                if let Some(key) = keyer().into_maybe_key() {
716                                    let query_scope_query_info = || QueryScopeQueryInfo::new(&query_scope, &key);
717                                    let key_hash = KeyHash::new(&key);
718
719                                    // Had a bug in tests where:
720                                    // calling client.resource() multiple
721                                    // times, previous impl would only run
722                                    // the effect once per
723                                    // client.resource(), but would still
724                                    // run again on subsequent
725                                    // client.resource() with the same key
726                                    // value
727                                    // - client.resource()
728                                    // - invalidate, starts loading in bg
729                                    // - client.resource()
730                                    // - effect runs again, sees resource
731                                    //   has value, goes to set it, fires
732                                    //   SsrStreamedValueOverride which
733                                    //   cancels the actual new query
734                                    //   loading to replace invalidated one
735                                    // So have to have a client wide
736                                    // protector, to make sure the effect
737                                    // really only runs once on the client,
738                                    // even if resource() called multiple
739                                    // times.
740                                    if (cfg!(test) || cfg!(not(feature = "ssr"))) && 
741                                        // Preventing memory leak, issue is only for first render:
742                                        (Utc::now() - client_created_at).num_seconds() < 10
743                                    {
744                                        use parking_lot::Mutex;
745
746                                        type AlreadySeenMap = HashMap<(u64, ScopeCacheKey, KeyHash), ()>;
747                                        static ALREADY_SEEN: LazyLock<Mutex<AlreadySeenMap>> =
748                                            LazyLock::new(|| Mutex::new(HashMap::new()));
749
750                                        let key = (scope_lookup.scope_id, query_scope_info.cache_key, key_hash);
751                                        let mut guard = ALREADY_SEEN.lock();
752                                        if guard.contains_key(&key) {
753                                            return;
754                                        }
755                                        guard.insert(key, ());
756                                    }
757
758                                    let mut was_pending = false;
759                                    // Protect against race condition:
760                                    // cancel any frontend query that
761                                    // started already if there was a
762                                    // client side local resource or
763                                    // prefetch/fetch_query etc that
764                                    // started on initial client-side ticks:
765                                    if let Some(QueryOrPending::Pending { query_abort_tx, .. }) =
766                                        scope.get_mut_include_pending(&key_hash)
767                                        && let Some(tx) = query_abort_tx.take()
768                                    {
769                                        tx.send(QueryAbortReason::SsrStreamedValueOverride).unwrap();
770                                        was_pending = true;
771                                    }
772
773                                    if was_pending || !scope.contains_key(&key_hash) {
774                                        let query = Query::new(
775                                            client_options,
776                                            client.untyped_client,
777                                            &query_scope_info,
778                                            query_scope_query_info(),
779                                            key_hash,
780                                            MaybeLocal::new(key),
781                                            MaybeLocal::new(MaybeKey::mapped_to_maybe_value(val.clone()).expect(
782                                                "Just checked \
783                                                    MaybeKey::mapped_value_is_some() \
784                                                    is true",
785                                            )),
786                                            buster_if_uncached.clone(),
787                                            query_options,
788                                            None,
789                                            #[cfg(any(
790                                                all(debug_assertions, feature = "devtools"),
791                                                feature = "devtools-always"
792                                            ))]
793                                            crate::events::Event::new(crate::events::EventVariant::StreamedFromServer),
794                                        );
795                                        scope.insert(key_hash, query);
796                                    }
797                                    scope
798                                        .get(&key_hash)
799                                        .expect("Just inserted")
800                                        .mark_resource_active(resource_id)
801                                }
802                            },
803                        );
804                    }
805                    Some(())
806                } else {
807                    None
808                }
809            }
810        };
811        // Won't run in tests if not isomorphic, but in prod Effect is wanted to not run on server:
812        if cfg!(test) {
813            // Wants Send + Sync on the Codec despite using SendWrapper in the PhantomData.
814            // Can put a SendWrapper around it as this is test only and they're single threaded:
815            let effect = SendWrapper::new(effect);
816            #[allow(clippy::redundant_closure)]
817            Effect::new_isomorphic(move |v| effect(v));
818        } else {
819            Effect::new(effect);
820        }
821
822        resource
823    }
824
825    /// Prevent reactive updates being triggered from the current updater callback fn.
826    ///
827    /// Call [`QueryClient::untrack_update_query`] in the callback of [`QueryClient::update_query`]
828    /// to prevent resources being updated after the callback completes.
829    ///
830    /// This is really useful when e.g:
831    /// - you know nothing changes after reading the existing value in the callback
832    /// - you don't want resources/subs to react and rerender, given nothings changed.
833    ///
834    /// This function is a noop outside the callbacks of:
835    /// - [`QueryClient::update_query`]
836    /// - [`QueryClient::update_query_async`]
837    /// - [`QueryClient::update_query_async_local`]
838    pub fn untrack_update_query(&self) {
839        // Set markers to false, these will all be reset to true automatically where needed.
840        SYNC_TRACK_UPDATE_MARKER.with(|marker| {
841            marker.store(false, std::sync::atomic::Ordering::Relaxed);
842        });
843        // Ignore returned AccessError if didn't exist (not in async ctx):
844        let _ = ASYNC_TRACK_UPDATE_MARKER.try_with(|marker| {
845            marker.store(false, std::sync::atomic::Ordering::Relaxed);
846        });
847    }
848
849    /// Prefetch a query and store it in the cache.
850    ///
851    /// - Entry doesn't exist: fetched and stored in the cache.
852    /// - Entry exists but stale: fetched and updated in the cache.
853    /// - Entry exists but **not** stale: not refreshed, existing cache item remains.
854    ///
855    /// If the cached query changes, active resources using the query will be updated.
856    #[track_caller]
857    pub async fn prefetch_query<K, V, M>(&self, query_scope: impl QueryScopeTrait<K, V, M>, key: impl Borrow<K>)
858    where
859        K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
860        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
861    {
862        let query_scope_info = QueryScopeInfo::new(&query_scope);
863        let owner_chain = OwnerChain::new(self.untyped_client, query_scope_info.cache_key, Owner::current());
864        self.prefetch_inner(
865            query_scope_info,
866            || QueryScopeQueryInfo::new(&query_scope, key.borrow()),
867            async |key| MaybeLocal::new(query_scope.query(key).await),
868            key.borrow(),
869            || MaybeLocal::new(key.borrow().clone()),
870            &owner_chain,
871        )
872        .await
873    }
874
875    /// Prefetch a non-threadsafe query and store it in the cache for this thread only.
876    ///
877    /// - Entry doesn't exist: fetched and stored in the cache.
878    /// - Entry exists but stale: fetched and updated in the cache.
879    /// - Entry exists but **not** stale: not refreshed, existing cache item remains.
880    ///
881    /// If the cached query changes, active resources using the query will be updated.
882    #[track_caller]
883    pub async fn prefetch_query_local<K, V, M>(
884        &self,
885        query_scope: impl QueryScopeLocalTrait<K, V, M>,
886        key: impl Borrow<K>,
887    ) where
888        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
889        V: DebugIfDevtoolsEnabled + Clone + 'static,
890    {
891        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
892        let owner_chain = OwnerChain::new(self.untyped_client, query_scope_info.cache_key, Owner::current());
893        self.prefetch_inner(
894            query_scope_info,
895            || QueryScopeQueryInfo::new_local(&query_scope, key.borrow()),
896            async |key| MaybeLocal::new_local(query_scope.query(key).await),
897            key.borrow(),
898            || MaybeLocal::new_local(key.borrow().clone()),
899            &owner_chain,
900        )
901        .await
902    }
903
904    #[track_caller]
905    async fn prefetch_inner<K, V, FetcherFut>(
906        &self,
907        query_scope_info: QueryScopeInfo,
908        query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
909        fetcher: impl Fn(K) -> FetcherFut,
910        key: &K,
911        lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
912        owner_chain: &OwnerChain,
913    ) where
914        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
915        V: DebugIfDevtoolsEnabled + Clone + 'static,
916        FetcherFut: Future<Output = MaybeLocal<V>>,
917    {
918        self.untyped_client
919            .cached_or_fetch(
920                self.options(),
921                query_scope_info.options,
922                None,
923                &query_scope_info,
924                query_scope_info_for_new_query,
925                key,
926                fetcher,
927                |info| {
928                    match info.variant {
929                        CachedOrFetchCbInputVariant::CachedUntouched => {
930                            if info.cached.stale_or_invalidated() {
931                                return CachedOrFetchCbOutput::Refetch;
932                            }
933                        }
934                        CachedOrFetchCbInputVariant::CachedUpdated => {
935                            // Update anything using it:
936                            info.cached.buster.set(new_buster_id());
937                        }
938                        CachedOrFetchCbInputVariant::Fresh => {}
939                    }
940                    CachedOrFetchCbOutput::Return(())
941                },
942                None,
943                lazy_maybe_local_key,
944                owner_chain,
945            )
946            .await;
947    }
948
949    /// Fetch a query, store it in the cache and return it.
950    ///
951    /// - Entry doesn't exist: fetched and stored in the cache.
952    /// - Entry exists but stale: fetched and updated in the cache.
953    /// - Entry exists but **not** stale: not refreshed, existing cache item remains.
954    ///
955    /// If the cached query changes, active resources using the query will be updated.
956    ///
957    /// Returns the up-to-date cached query.
958    #[track_caller]
959    pub async fn fetch_query<K, V, M>(&self, query_scope: impl QueryScopeTrait<K, V, M>, key: impl Borrow<K>) -> V
960    where
961        K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
962        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
963    {
964        self.untyped_client.fetch_query(query_scope, key).await
965    }
966
967    /// Fetch a non-threadsafe query, store it in the cache for this thread only and return it.
968    ///
969    /// - Entry doesn't exist: fetched and stored in the cache.
970    /// - Entry exists but stale: fetched and updated in the cache.
971    /// - Entry exists but **not** stale: not refreshed, existing cache item remains.
972    ///
973    /// If the cached query changes, active resources using the query will be updated.
974    ///
975    /// Returns the up-to-date cached query.
976    #[track_caller]
977    pub async fn fetch_query_local<K, V, M>(
978        &self,
979        query_scope: impl QueryScopeLocalTrait<K, V, M>,
980        key: impl Borrow<K>,
981    ) -> V
982    where
983        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
984        V: DebugIfDevtoolsEnabled + Clone + 'static,
985    {
986        self.untyped_client.fetch_query_local(query_scope, key).await
987    }
988
989    /// Set the value of a query in the cache.
990    /// This cached value will be available from all threads and take priority
991    /// over any locally cached value for this query.
992    ///
993    /// Active resources using the query will be updated.
994    #[track_caller]
995    pub fn set_query<K, V, M>(&self, query_scope: impl QueryScopeTrait<K, V, M>, key: impl Borrow<K>, new_value: V)
996    where
997        K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
998        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
999    {
1000        self.set_inner(
1001            QueryScopeInfo::new(&query_scope),
1002            || QueryScopeQueryInfo::new(&query_scope, key.borrow()),
1003            key.borrow(),
1004            MaybeLocal::new(new_value),
1005            || MaybeLocal::new(key.borrow().clone()),
1006            true,
1007            // like update methods, this does not come from the query function itself,
1008            // so should not reset the invalidated status:
1009            ResetInvalidated::NoReset,
1010        )
1011    }
1012
1013    /// Set the value of a non-threadsafe query in the cache for this thread only.
1014    /// This cached value will only be available from this thread, the cache will
1015    /// return empty, unless a nonlocal value is set, from any other thread.
1016    ///
1017    /// Active resources using the query will be updated.
1018    #[track_caller]
1019    pub fn set_query_local<K, V, M>(
1020        &self,
1021        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1022        key: impl Borrow<K>,
1023        new_value: V,
1024    ) where
1025        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1026        V: DebugIfDevtoolsEnabled + Clone + 'static,
1027    {
1028        self.set_inner::<K, V>(
1029            QueryScopeInfo::new_local(&query_scope),
1030            || QueryScopeQueryInfo::new_local(&query_scope, key.borrow()),
1031            key.borrow(),
1032            MaybeLocal::new_local(new_value),
1033            || MaybeLocal::new_local(key.borrow().clone()),
1034            true,
1035            // like update methods, this does not come from the query function itself,
1036            // so should not reset the invalidated status:
1037            ResetInvalidated::NoReset,
1038        )
1039    }
1040
1041    #[track_caller]
1042    fn set_inner<K, V>(
1043        &self,
1044        query_scope_info: QueryScopeInfo,
1045        query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
1046        key: &K,
1047        new_value: MaybeLocal<V>,
1048        lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
1049        track: bool,
1050        reset_invalidated: ResetInvalidated,
1051    ) where
1052        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1053        V: DebugIfDevtoolsEnabled + Clone + 'static,
1054    {
1055        let key_hash = KeyHash::new(key.borrow());
1056        self.untyped_client.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
1057            &mut self.untyped_client.scope_lookup.scopes_mut(),
1058            query_scope_info.cache_key,
1059            OnScopeMissing::Create(&query_scope_info),
1060            |_| {},
1061            |maybe_scope, _| {
1062                let scope = maybe_scope.expect("provided a default");
1063
1064                // Make sure to look both caches if threadsafe, and prefer threadsafe:
1065                let maybe_cached = if !new_value.is_local() {
1066                    if let Some(threadsafe_existing) = scope.get_mut_threadsafe_only(&key_hash) {
1067                        Some(threadsafe_existing)
1068                    } else {
1069                        scope.get_mut_local_only(&key_hash)
1070                    }
1071                } else {
1072                    scope.get_mut_local_only(&key_hash)
1073                };
1074
1075                if let Some(cached) = maybe_cached {
1076                    cached.set_value(
1077                        new_value,
1078                        track,
1079                        #[cfg(any(all(debug_assertions, feature = "devtools"), feature = "devtools-always"))]
1080                        crate::events::Event::new(crate::events::EventVariant::DeclarativeSet),
1081                        reset_invalidated,
1082                    );
1083                } else {
1084                    let query = Query::new(
1085                        self.options(),
1086                        self.untyped_client,
1087                        &query_scope_info,
1088                        query_scope_info_for_new_query(),
1089                        key_hash,
1090                        lazy_maybe_local_key(),
1091                        new_value,
1092                        ArcRwSignal::new(new_buster_id()),
1093                        query_scope_info.options,
1094                        None,
1095                        #[cfg(any(all(debug_assertions, feature = "devtools"), feature = "devtools-always"))]
1096                        crate::events::Event::new(crate::events::EventVariant::DeclarativeSet),
1097                    );
1098                    scope.insert(key_hash, query);
1099                }
1100            },
1101        );
1102    }
1103
1104    /// Synchronously update the value of a query in the cache with a callback.
1105    ///
1106    /// Active resources using the query will be updated.
1107    ///
1108    /// The callback takes `Option<&mut V>`, will be None if the value is not available in the cache.
1109    ///
1110    /// If you want async and/or always get the value, use
1111    /// [`QueryClient::update_query_async`]/[`QueryClient::update_query_async_local`].
1112    ///
1113    /// Returns the output of the callback.
1114    ///
1115    /// If you decide you don't want to trigger resources and subscribers, e.g. if you know nothing changed,
1116    /// call [`QueryClient::untrack_update_query`] in the callback to prevent reactive updates.
1117    #[track_caller]
1118    pub fn update_query<K, V, T, M>(
1119        &self,
1120        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1121        key: impl Borrow<K>,
1122        modifier: impl FnOnce(Option<&mut V>) -> T,
1123    ) -> T
1124    where
1125        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1126        V: DebugIfDevtoolsEnabled + Clone + 'static,
1127    {
1128        self.update_query_inner::<K, V, T>(
1129            &QueryScopeInfo::new_local(&query_scope),
1130            key.borrow(),
1131            ResetInvalidated::NoReset,
1132            modifier,
1133        )
1134    }
1135
1136    #[track_caller]
1137    fn update_query_inner<K, V, T>(
1138        &self,
1139        query_scope_info: &QueryScopeInfo,
1140        key: &K,
1141        reset_invalidated: ResetInvalidated,
1142        modifier: impl FnOnce(Option<&mut V>) -> T,
1143    ) -> T
1144    where
1145        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1146        V: DebugIfDevtoolsEnabled + Clone + 'static,
1147    {
1148        let key_hash = KeyHash::new(key);
1149        let mut modifier_holder = Some(modifier);
1150
1151        let maybe_return_value = self.untyped_client.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
1152            &mut self.untyped_client.scope_lookup.scopes_mut(),
1153            query_scope_info.cache_key,
1154            OnScopeMissing::Skip,
1155            |_| {},
1156            |maybe_scope, _| {
1157                if let Some(scope) = maybe_scope
1158                    && let Some(cached) = scope.get_mut(&key_hash)
1159                {
1160                    let modifier = modifier_holder
1161                        .take()
1162                        .expect("Should never be used more than once. (bug)");
1163                    let return_value = cached.update_value(
1164                        // WONTPANIC: the internals will only supply the value if available from this thread:
1165                        |value| modifier(Some(value.value_mut_may_panic())),
1166                        #[cfg(any(all(debug_assertions, feature = "devtools"), feature = "devtools-always"))]
1167                        crate::events::Event::new(crate::events::EventVariant::DeclarativeUpdate),
1168                        reset_invalidated,
1169                    );
1170                    return Some(return_value);
1171                }
1172                None
1173            },
1174        );
1175        if let Some(return_value) = maybe_return_value {
1176            return_value
1177        } else {
1178            // Didn't exist:
1179            modifier_holder
1180                .take()
1181                .expect("Should never be used more than once. (bug)")(None)
1182        }
1183    }
1184
1185    /// Asynchronously map a threadsafe query in the cache from one value to another.
1186    ///
1187    /// Unlike [`QueryClient::update_query`], this will fetch the query first, if it doesn't exist.
1188    ///
1189    /// Active resources using the query will be updated.
1190    ///
1191    /// Returns the output of the callback.
1192    ///
1193    /// If you decide you don't want to trigger resources and subscribers, e.g. if you know nothing changed,
1194    /// call [`QueryClient::untrack_update_query`] in the callback to prevent reactive updates.
1195    #[track_caller]
1196    pub async fn update_query_async<'a, K, V, T, M>(
1197        &'a self,
1198        query_scope: impl QueryScopeTrait<K, V, M>,
1199        key: impl Borrow<K>,
1200        mapper: impl AsyncFnOnce(&mut V) -> T,
1201    ) -> T
1202    where
1203        K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
1204        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1205    {
1206        let query_scope_info = QueryScopeInfo::new(&query_scope);
1207        let owner_chain = OwnerChain::new(self.untyped_client, query_scope_info.cache_key, Owner::current());
1208        self.update_query_async_inner(
1209            query_scope_info,
1210            || QueryScopeQueryInfo::new(&query_scope, key.borrow()),
1211            async |key| MaybeLocal::new(query_scope.query(key).await),
1212            key.borrow(),
1213            mapper,
1214            MaybeLocal::new,
1215            || MaybeLocal::new(key.borrow().clone()),
1216            &owner_chain,
1217        )
1218        .await
1219    }
1220
1221    /// Asynchronously map a non-threadsafe query in the cache from one value to another.
1222    ///
1223    /// Unlike [`QueryClient::update_query`], this will fetch the query first, if it doesn't exist.
1224    ///
1225    /// Active resources using the query will be updated.
1226    ///
1227    /// Returns the output of the callback.
1228    ///
1229    /// If you decide you don't want to trigger resources and subscribers, e.g. if you know nothing changed,
1230    /// call [`QueryClient::untrack_update_query`] in the callback to prevent reactive updates.
1231    #[track_caller]
1232    pub async fn update_query_async_local<'a, K, V, T, M>(
1233        &'a self,
1234        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1235        key: impl Borrow<K>,
1236        mapper: impl AsyncFnOnce(&mut V) -> T,
1237    ) -> T
1238    where
1239        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1240        V: DebugIfDevtoolsEnabled + Clone + 'static,
1241    {
1242        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
1243        let owner_chain = OwnerChain::new(self.untyped_client, query_scope_info.cache_key, Owner::current());
1244        self.update_query_async_inner(
1245            query_scope_info,
1246            || QueryScopeQueryInfo::new_local(&query_scope, key.borrow()),
1247            async |key| MaybeLocal::new_local(query_scope.query(key).await),
1248            key.borrow(),
1249            mapper,
1250            MaybeLocal::new_local,
1251            || MaybeLocal::new_local(key.borrow().clone()),
1252            &owner_chain,
1253        )
1254        .await
1255    }
1256
    /// Shared implementation behind the async update entry points
    /// (e.g. `update_query_async_local`).
    ///
    /// Steps, all performed while holding the per-key fetcher mutex:
    /// 1. Obtain a value for `key` through `fetch_inner`.
    /// 2. Run the user `mapper` on a mutable reference to that value, while also
    ///    listening on the query's invalidation channel.
    /// 3. Unless a `QueryAbortReason::Clear` arrived while the mapper was running,
    ///    write the mutated value back: first by overwriting an existing cached value,
    ///    otherwise by setting it as a new one.
    ///
    /// Arguments:
    /// - `query_scope_info`: identifies the scope; its `cache_key` is used for notifications.
    /// - `query_scope_info_for_new_query`: builds the per-query info on demand; forwarded to
    ///   `fetch_inner` and `set_inner`.
    /// - `fetcher`: forwarded to `fetch_inner` to produce the value.
    /// - `key`: the query key; hashed once up front.
    /// - `mapper`: user async callback that mutates the value; its output is returned.
    /// - `into_maybe_local`: wraps the final value for `set_inner`.
    /// - `lazy_maybe_local_key`: builds the wrapped key on demand.
    /// - `owner_chain`: forwarded to `fetch_inner`.
    ///
    /// Returns whatever `mapper` returned, regardless of whether the value was written back.
    #[track_caller]
    async fn update_query_async_inner<'a, K, V, T, FetcherFut>(
        &'a self,
        query_scope_info: QueryScopeInfo,
        query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
        fetcher: impl Fn(K) -> FetcherFut,
        key: &K,
        mapper: impl AsyncFnOnce(&mut V) -> T,
        into_maybe_local: impl FnOnce(V) -> MaybeLocal<V>,
        lazy_maybe_local_key: impl Fn() -> MaybeLocal<K>,
        owner_chain: &OwnerChain,
    ) -> T
    where
        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
        V: DebugIfDevtoolsEnabled + Clone + 'static,
        FetcherFut: Future<Output = MaybeLocal<V>>,
    {
        let key_hash = KeyHash::new(key.borrow());

        // By holding the fetcher mutex from start to finish, prevent the chance of the value
        // being fetched between the fetch async call and the external user mapper async call
        // and the final set().
        let fetcher_mutex = self
            .untyped_client
            .scope_lookup
            .fetcher_mutex::<K, V>(key_hash, &query_scope_info);
        let fetcher_guard = fetcher_mutex.lock().await;

        // The guard is handed to fetch_inner by reference, so this function keeps ownership
        // of it until the end of its body.
        let mut new_value = self
            .untyped_client
            .fetch_inner(
                query_scope_info.clone(),
                &query_scope_info_for_new_query,
                fetcher,
                key.borrow(),
                Some(&fetcher_guard),
                &lazy_maybe_local_key,
                owner_chain,
            )
            .await;

        // The fetch will "turn on" is_fetching during its lifetime, but we also want it during the mapper function:
        // NOTE(review): the meaning of the `false` argument is defined by `with_notify_fetching`
        // and is not visible from here.
        self.untyped_client
            .scope_lookup
            .with_notify_fetching(query_scope_info.cache_key, key_hash, false, async {
                // Shared flag exposed to the mapper through ASYNC_TRACK_UPDATE_MARKER; it is
                // read back below to decide whether the write-back should be tracked.
                let track = Arc::new(AtomicBool::new(true));

                // Will monitor for invalidations during the user async fn, which could take a long time:
                let query_abort_rx = self.untyped_client.scope_lookup.prepare_invalidation_channel::<K, V>(
                    &query_scope_info,
                    key_hash,
                    &lazy_maybe_local_key(),
                );

                let result_fut = ASYNC_TRACK_UPDATE_MARKER.scope(track.clone(), async { mapper(&mut new_value).await });

                // Call the user async function, but also monitor for invalidations:
                // Specifically, we care about if .clear() is called.
                // If so, we shouldn't set the value, as it will have been generated with
                // the cleared cached query value.
                // If invalidated should still set, because the value will be deemed
                // "less stale", but won't de-invalidate it.
                // Don't care about QueryAbortReason::SsrStreamedValueOverride
                // as it shouldn't be possible to get to this point before all ssr streaming is done.
                let mut cleared_during_user_fn = false;
                let result = {
                    pin_mut!(result_fut);
                    pin_mut!(query_abort_rx);
                    match futures::future::select(result_fut, query_abort_rx).await {
                        // Mapper finished first: nothing arrived on the abort channel before it completed.
                        futures::future::Either::Left((result, _query_abort_rx)) => result,
                        // Abort channel resolved first: record a clear, then still drive the
                        // mapper to completion so its output can be returned.
                        futures::future::Either::Right((query_abort_reason, result_fut)) => {
                            if let Ok(QueryAbortReason::Clear) = query_abort_reason {
                                // If the query was cleared, don't set the new value.
                                cleared_during_user_fn = true;
                            }
                            result_fut.await
                        }
                    }
                };

                // If the query was cleared, don't set the new value.
                if !cleared_during_user_fn {
                    // Cover a small edge case:
                    // - value in threadsafe
                    // - .update_query_async_local called
                    // - replacement value .set() called, stored in local cache because
                    //   no type safety on threadsafe cache
                    // - now have 2 versions in both caches
                    // by trying to update first, means it'll stick with existing cache if it exists,
                    // otherwise doesn't matter and can just be set normally:
                    // Wrapped in an Option so the value can be moved out by exactly one of
                    // the two write paths below.
                    let mut new_value = Some(new_value);
                    let updated = self.update_query_inner::<K, V, _>(
                        &query_scope_info,
                        key,
                        // fetch_inner would've reset it if needed, then the update itself shouldn't change the value:
                        ResetInvalidated::NoReset,
                        |value| {
                            if let Some(value) = value {
                                // This sync update call itself needs untracking if !track:
                                if !track.load(std::sync::atomic::Ordering::Relaxed) {
                                    self.untrack_update_query();
                                }

                                *value = new_value.take().expect("Should be Some");
                                true
                            } else {
                                false
                            }
                        },
                    );
                    // No existing cached value was overwritten, so store the value via set_inner instead.
                    if !updated {
                        self.set_inner::<K, V>(
                            query_scope_info,
                            query_scope_info_for_new_query,
                            key.borrow(),
                            into_maybe_local(
                                new_value
                                    .take()
                                    .expect("Should be Some, should only be here if not updated"),
                            ),
                            lazy_maybe_local_key,
                            track.load(std::sync::atomic::Ordering::Relaxed),
                            // fetch_inner would've reset it if needed, then the update
                            // itself shouldn't change the value:
                            ResetInvalidated::NoReset,
                        );
                    }
                }

                result
            })
            .await
    }
1390
1391    /// Synchronously get a query from the cache, if it exists.
1392    #[track_caller]
1393    pub fn get_cached_query<K, V, M>(
1394        &self,
1395        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1396        key: impl Borrow<K>,
1397    ) -> Option<V>
1398    where
1399        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1400        V: DebugIfDevtoolsEnabled + Clone + 'static,
1401    {
1402        self.untyped_client.scope_lookup.with_cached_query::<K, V, _>(
1403            &KeyHash::new(key.borrow()),
1404            &query_scope.cache_key(),
1405            |maybe_cached| {
1406                // WONTPANIC: with_cached_query will only output values that are safe on this thread:
1407                maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1408            },
1409        )
1410    }
1411
1412    /// Synchronously check if a query exists in the cache.
1413    ///
1414    /// Returns `true` if the query exists.
1415    #[track_caller]
1416    pub fn query_exists<K, V, M>(&self, query_scope: impl QueryScopeLocalTrait<K, V, M>, key: impl Borrow<K>) -> bool
1417    where
1418        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1419        V: DebugIfDevtoolsEnabled + 'static,
1420    {
1421        let key_hash = KeyHash::new(key.borrow());
1422        self.untyped_client.scope_lookup.with_cached_query::<K, V, _>(
1423            &key_hash,
1424            &query_scope.cache_key(),
1425            |maybe_cached| maybe_cached.is_some(),
1426        )
1427    }
1428
1429    /// Subscribe to the `is_loading` status of a query.
1430    /// The keyer function is reactive to changes in `K`.
1431    ///
1432    /// This is `true` when the query is in the process of fetching data for the first time.
1433    /// This is in contrast to `is_fetching`, that is `true` whenever the query
1434    /// is fetching data, including when it's refetching.
1435    ///
1436    /// From a resource perspective:
1437    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1438    /// - `is_fetching=true` + `is_loading=false` means the resource is showing
1439    ///   previous data, and will update once new data finishes refetching
1440    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1441    #[track_caller]
1442    pub fn subscribe_is_loading<K, MaybeKey, V, M>(
1443        &self,
1444        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1445        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1446    ) -> Signal<bool>
1447    where
1448        K: Hash + Send + Sync + 'static,
1449        MaybeKey: QueryMaybeKey<K, V>,
1450        MaybeKey::MappedValue: 'static,
1451        V: 'static,
1452    {
1453        self.subscribe_is_loading_arc(query_scope, keyer).into()
1454    }
1455
1456    /// Subscribe to the `is_loading` status of a query with a non-threadsafe key.
1457    /// The keyer function is reactive to changes in `K`.
1458    ///
1459    /// This is `true` when the query is in the process of fetching data for the first time.
1460    /// This is in contrast to `is_fetching`, that is `true` whenever the query
1461    /// is fetching data, including when it's refetching.
1462    ///
1463    /// From a resource perspective:
1464    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1465    /// - `is_fetching=true` + `is_loading=false` means the resource is showing
1466    ///   previous data, and will update once new data finishes refetching
1467    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1468    #[track_caller]
1469    pub fn subscribe_is_loading_local<K, MaybeKey, V, M>(
1470        &self,
1471        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1472        keyer: impl Fn() -> MaybeKey + 'static,
1473    ) -> Signal<bool>
1474    where
1475        K: Hash + 'static,
1476        MaybeKey: QueryMaybeKey<K, V>,
1477        MaybeKey::MappedValue: 'static,
1478        V: 'static,
1479    {
1480        self.subscribe_is_loading_arc_local(query_scope, keyer).into()
1481    }
1482
1483    /// Subscribe to the `is_loading` status of a query with a non-threadsafe key.
1484    /// The keyer function is reactive to changes in `K`.
1485    ///
1486    /// This is `true` when the query is in the process of fetching data for the first time.
1487    /// This is in contrast to `is_fetching`, that is `true` whenever the query
1488    /// is fetching data, including when it's refetching.
1489    ///
1490    /// From a resource perspective:
1491    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1492    /// - `is_fetching=true` + `is_loading=false` means the resource is showing
1493    ///   previous data, and will update once new data finishes refetching
1494    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1495    #[track_caller]
1496    pub fn subscribe_is_loading_arc_local<K, MaybeKey, V, M>(
1497        &self,
1498        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1499        keyer: impl Fn() -> MaybeKey + 'static,
1500    ) -> ArcSignal<bool>
1501    where
1502        K: Hash + 'static,
1503        MaybeKey: QueryMaybeKey<K, V>,
1504        MaybeKey::MappedValue: 'static,
1505        V: 'static,
1506    {
1507        let keyer = SendWrapper::new(keyer);
1508        self.untyped_client
1509            .scope_lookup
1510            .scope_subscriptions_mut()
1511            .add_is_loading_subscription(
1512                query_scope.cache_key(),
1513                MaybeLocal::new_local(ArcSignal::derive(move || {
1514                    keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1515                })),
1516            )
1517    }
1518
1519    /// Subscribe to the `is_loading` status of a query.
1520    /// The keyer function is reactive to changes in `K`.
1521    ///
1522    /// This is `true` when the query is in the process of fetching data for the first time.
1523    /// This is in contrast to `is_fetching`, that is `true` whenever the query
1524    /// is fetching data, including when it's refetching.
1525    ///
1526    /// From a resource perspective:
1527    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1528    /// - `is_fetching=true` + `is_loading=false` means the resource is showing
1529    ///   previous data, and will update once new data finishes refetching
1530    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1531    #[track_caller]
1532    pub fn subscribe_is_loading_arc<K, MaybeKey, V, M>(
1533        &self,
1534        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1535        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1536    ) -> ArcSignal<bool>
1537    where
1538        K: Hash + Send + Sync + 'static,
1539        MaybeKey: QueryMaybeKey<K, V>,
1540        MaybeKey::MappedValue: 'static,
1541        V: 'static,
1542    {
1543        self.untyped_client
1544            .scope_lookup
1545            .scope_subscriptions_mut()
1546            .add_is_loading_subscription(
1547                query_scope.cache_key(),
1548                MaybeLocal::new(ArcSignal::derive(move || {
1549                    keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1550                })),
1551            )
1552    }
1553
1554    /// Subscribe to the `is_fetching` status of a query.
1555    /// The keyer function is reactive to changes in `K`.
1556    ///
1557    /// This is `true` is `true` whenever the query is fetching data, including when it's refetching.
1558    /// This is in contrast to `is_loading`, that is `true` when the query is in
1559    /// the process of fetching data for the first time only.
1560    ///
1561    /// From a resource perspective:
1562    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1563    /// - `is_fetching=true` + `is_loading=false` means the resource is showing
1564    ///   previous data, and will update once new data finishes refetching
1565    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1566    #[track_caller]
1567    pub fn subscribe_is_fetching<K, MaybeKey, V, M>(
1568        &self,
1569        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1570        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1571    ) -> Signal<bool>
1572    where
1573        K: Hash + Send + Sync + 'static,
1574        MaybeKey: QueryMaybeKey<K, V>,
1575        MaybeKey::MappedValue: 'static,
1576        V: 'static,
1577    {
1578        self.subscribe_is_fetching_arc(query_scope, keyer).into()
1579    }
1580
1581    /// Subscribe to the `is_fetching` status of a query with a non-threadsafe key.
1582    /// The keyer function is reactive to changes in `K`.
1583    ///
1584    /// This is `true` is `true` whenever the query is fetching data, including when it's refetching.
1585    /// This is in contrast to `is_loading`, that is `true` when the query is in
1586    /// the process of fetching data for the first time only.
1587    ///
1588    /// From a resource perspective:
1589    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1590    /// - `is_fetching=true` + `is_loading=false` means the resource is showing
1591    ///   previous data, and will update once new data finishes refetching
1592    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1593    #[track_caller]
1594    pub fn subscribe_is_fetching_local<K, MaybeKey, V, M>(
1595        &self,
1596        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1597        keyer: impl Fn() -> MaybeKey + 'static,
1598    ) -> Signal<bool>
1599    where
1600        K: Hash + 'static,
1601        MaybeKey: QueryMaybeKey<K, V>,
1602        MaybeKey::MappedValue: 'static,
1603        V: 'static,
1604    {
1605        self.subscribe_is_fetching_arc_local(query_scope, keyer).into()
1606    }
1607
1608    /// Subscribe to the `is_fetching` status of a query with a non-threadsafe key.
1609    /// The keyer function is reactive to changes in `K`.
1610    ///
1611    /// This is `true` is `true` whenever the query is fetching data, including when it's refetching.
1612    /// This is in contrast to `is_loading`, that is `true` when the query is in
1613    /// the process of fetching data for the first time only.
1614    ///
1615    /// From a resource perspective:
1616    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1617    /// - `is_fetching=true` + `is_loading=false` means the resource is showing
1618    ///   previous data, and will update once new data finishes refetching
1619    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1620    #[track_caller]
1621    pub fn subscribe_is_fetching_arc_local<K, MaybeKey, V, M>(
1622        &self,
1623        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1624        keyer: impl Fn() -> MaybeKey + 'static,
1625    ) -> ArcSignal<bool>
1626    where
1627        K: Hash + 'static,
1628        MaybeKey: QueryMaybeKey<K, V>,
1629        MaybeKey::MappedValue: 'static,
1630        V: 'static,
1631    {
1632        let keyer = SendWrapper::new(keyer);
1633        self.untyped_client
1634            .scope_lookup
1635            .scope_subscriptions_mut()
1636            .add_is_fetching_subscription(
1637                query_scope.cache_key(),
1638                MaybeLocal::new_local(ArcSignal::derive(move || {
1639                    keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1640                })),
1641            )
1642    }
1643
1644    /// Subscribe to the `is_fetching` status of a query.
1645    /// The keyer function is reactive to changes in `K`.
1646    ///
1647    /// This is `true` is `true` whenever the query is fetching data, including when it's refetching.
1648    /// This is in contrast to `is_loading`, that is `true` when the query is in
1649    /// the process of fetching data for the first time only.
1650    ///
1651    /// From a resource perspective:
1652    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1653    /// - `is_fetching=true` + `is_loading=false` means the resource is showing
1654    ///   previous data, and will update once new data finishes refetching
1655    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1656    #[track_caller]
1657    pub fn subscribe_is_fetching_arc<K, MaybeKey, V, M>(
1658        &self,
1659        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1660        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1661    ) -> ArcSignal<bool>
1662    where
1663        K: Hash + Send + Sync + 'static,
1664        MaybeKey: QueryMaybeKey<K, V>,
1665        MaybeKey::MappedValue: 'static,
1666        V: 'static,
1667    {
1668        self.untyped_client
1669            .scope_lookup
1670            .scope_subscriptions_mut()
1671            .add_is_fetching_subscription(
1672                query_scope.cache_key(),
1673                MaybeLocal::new(ArcSignal::derive(move || {
1674                    keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1675                })),
1676            )
1677    }
1678
1679    /// Subscribe to the value of a query.
1680    /// The keyer function is reactive to changes in `K`.
1681    ///
1682    /// This will update whenever the query is created, removed, updated, refetched or set.
1683    ///
1684    /// Compared to a resource:
1685    /// - This will not trigger a fetch of a query, if it's not in the cache, this will be `None`.       
1686    #[track_caller]
1687    pub fn subscribe_value<K, MaybeKey, V, M>(
1688        &self,
1689        query_scope: impl QueryScopeTrait<K, V, M>,
1690        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1691    ) -> Signal<Option<V>>
1692    where
1693        K: DebugIfDevtoolsEnabled + Hash + Clone + Send + Sync + 'static,
1694        MaybeKey: QueryMaybeKey<K, V>,
1695        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1696        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1697    {
1698        self.subscribe_value_arc(query_scope, keyer).into()
1699    }
1700
1701    /// Subscribe to the value of a non-threadsafe query.
1702    /// The keyer function is reactive to changes in `K`.
1703    ///
1704    /// This will update whenever the query is created, removed, updated, refetched or set.
1705    ///
1706    /// Compared to a resource:
1707    /// - This will not trigger a fetch of a query, if it's not in the cache, this will be `None`.
1708    #[track_caller]
1709    pub fn subscribe_value_local<K, MaybeKey, V, M>(
1710        &self,
1711        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1712        keyer: impl Fn() -> MaybeKey + 'static,
1713    ) -> Signal<Option<V>, LocalStorage>
1714    where
1715        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1716        MaybeKey: QueryMaybeKey<K, V>,
1717        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
1718        V: DebugIfDevtoolsEnabled + Clone + 'static,
1719    {
1720        self.subscribe_value_arc_local(query_scope, keyer).into()
1721    }
1722
1723    /// Subscribe to the value of a non-threadsafe query.
1724    /// The keyer function is reactive to changes in `K`.
1725    ///
1726    /// This will update whenever the query is created, removed, updated, refetched or set.
1727    ///
1728    /// Compared to a resource:
1729    /// - This will not trigger a fetch of a query, if it's not in the cache, this will be `None`.
1730    #[track_caller]
1731    pub fn subscribe_value_arc_local<K, MaybeKey, V, M>(
1732        &self,
1733        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1734        keyer: impl Fn() -> MaybeKey + 'static,
1735    ) -> ArcLocalSignal<Option<V>>
1736    where
1737        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1738        MaybeKey: QueryMaybeKey<K, V>,
1739        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
1740        V: DebugIfDevtoolsEnabled + Clone + 'static,
1741    {
1742        let cache_key = query_scope.cache_key();
1743        let keyer = ArcLocalSignal::derive_local(move || keyer().into_maybe_key().map(|k| KeyHash::new(&k)));
1744        let dyn_signal = self
1745            .untyped_client
1746            .scope_lookup
1747            .scope_subscriptions_mut()
1748            .add_value_set_updated_or_removed_subscription(
1749                cache_key,
1750                MaybeLocal::new_local({
1751                    let keyer = keyer.clone();
1752                    move || keyer.get()
1753                }),
1754            );
1755
1756        let scope_lookup = self.untyped_client.scope_lookup;
1757        ArcLocalSignal::derive_local(move || {
1758            dyn_signal.track();
1759            if let Some(key) = keyer.read_untracked().as_ref() {
1760                scope_lookup.with_cached_query::<K, V, _>(key, &cache_key, |maybe_cached| {
1761                    // WONTPANIC: with_cached_query will only output values that are safe on this thread:
1762                    maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1763                })
1764            } else {
1765                None
1766            }
1767        })
1768    }
1769
1770    /// Subscribe to the value of a query.
1771    /// The keyer function is reactive to changes in `K`.
1772    ///
1773    /// This will update whenever the query is created, removed, updated, refetched or set.
1774    ///
1775    /// Compared to a resource:
1776    /// - This will not trigger a fetch of a query, if it's not in the cache, this will be `None`.
1777    #[track_caller]
1778    pub fn subscribe_value_arc<K, MaybeKey, V, M>(
1779        &self,
1780        query_scope: impl QueryScopeTrait<K, V, M>,
1781        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1782    ) -> ArcSignal<Option<V>>
1783    where
1784        K: DebugIfDevtoolsEnabled + Hash + Clone + Send + Sync + 'static,
1785        MaybeKey: QueryMaybeKey<K, V>,
1786        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1787        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1788    {
1789        let cache_key = query_scope.cache_key();
1790        let keyer = ArcSignal::derive(move || keyer().into_maybe_key().map(|k| KeyHash::new(&k)));
1791
1792        let dyn_signal = self
1793            .untyped_client
1794            .scope_lookup
1795            .scope_subscriptions_mut()
1796            .add_value_set_updated_or_removed_subscription(
1797                cache_key,
1798                MaybeLocal::new({
1799                    let keyer = keyer.clone();
1800                    move || keyer.get()
1801                }),
1802            );
1803
1804        let scope_lookup = self.untyped_client.scope_lookup;
1805        ArcSignal::derive(move || {
1806            dyn_signal.track();
1807            if let Some(key) = keyer.read_untracked().as_ref() {
1808                scope_lookup.with_cached_query::<K, V, _>(key, &cache_key, |maybe_cached| {
1809                    // WONTPANIC: with_cached_query will only output values that are safe on this thread:
1810                    maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1811                })
1812            } else {
1813                None
1814            }
1815        })
1816    }
1817
1818    /// Mark a query as stale.
1819    ///
1820    /// Any active resources will refetch in the background, replacing them when ready.
1821    #[track_caller]
1822    pub fn invalidate_query<K, V, M>(
1823        &self,
1824        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1825        key: impl Borrow<K>,
1826    ) -> bool
1827    where
1828        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1829        V: DebugIfDevtoolsEnabled + Clone + 'static,
1830    {
1831        self.untyped_client.invalidate_query(query_scope, key)
1832    }
1833
1834    /// Mark multiple queries of a specific type as stale.
1835    ///
1836    /// Any active resources will refetch in the background, replacing them when ready.
1837    #[track_caller]
1838    pub fn invalidate_queries<K, V, KRef, M>(
1839        &self,
1840        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1841        keys: impl IntoIterator<Item = KRef>,
1842    ) -> Vec<KRef>
1843    where
1844        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1845        V: DebugIfDevtoolsEnabled + Clone + 'static,
1846        KRef: Borrow<K>,
1847    {
1848        self.untyped_client.invalidate_queries(query_scope, keys)
1849    }
1850
1851    /// Mark one or more queries of a specific type as stale with a callback.
1852    ///
1853    /// When the callback returns `true`, the specific query will be invalidated.
1854    ///
1855    /// Any active resources subscribing to that query will refetch in the background,
1856    /// replacing them when ready.
1857    #[track_caller]
1858    pub fn invalidate_queries_with_predicate<K, V, M>(
1859        &self,
1860        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1861        should_invalidate: impl Fn(&K) -> bool,
1862    ) where
1863        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
1864        V: DebugIfDevtoolsEnabled + Clone + 'static,
1865    {
1866        let mut scopes = self.untyped_client.scope_lookup.scopes_mut();
1867        let mut cbs_scopes = vec![];
1868        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
1869        self.untyped_client.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
1870            &mut scopes,
1871            query_scope_info.cache_key,
1872            OnScopeMissing::Skip,
1873            |_| {},
1874            |maybe_scope, _| {
1875                if let Some(scope) = maybe_scope {
1876                    for query in scope.all_queries_mut_include_pending() {
1877                        if let Some(key) = query.key().value_if_safe()
1878                            && should_invalidate(key)
1879                        {
1880                            let cb_scopes = query.invalidate(QueryAbortReason::Invalidate);
1881                            cbs_scopes.push(cb_scopes);
1882                        }
1883                    }
1884                }
1885            },
1886        );
1887        let mut cbs_external = vec![];
1888        for cb in cbs_scopes {
1889            if let Some(cb_external) = cb(&mut scopes) {
1890                cbs_external.push(cb_external);
1891            }
1892        }
1893        drop(scopes);
1894        run_external_callbacks(self.untyped_client, query_scope_info.cache_key, cbs_external);
1895    }
1896
1897    /// Mark all queries of a specific type as stale.
1898    ///
1899    /// Any active resources will refetch in the background, replacing them when ready.
1900    #[track_caller]
1901    pub fn invalidate_query_scope<K, V, M>(&self, query_scope: impl QueryScopeLocalTrait<K, V, M>)
1902    where
1903        K: Hash + 'static,
1904        V: Clone + 'static,
1905    {
1906        self.invalidate_query_scope_inner(query_scope.cache_key())
1907    }
1908
1909    pub(crate) fn invalidate_query_scope_inner(&self, scope_cache_key: ScopeCacheKey) {
1910        let mut scopes = self.untyped_client.scope_lookup.scopes_mut();
1911        let mut cbs_scopes = vec![];
1912        if let Some(scope) = scopes.get_mut(&scope_cache_key) {
1913            let cb_scopes = scope.invalidate_scope(QueryAbortReason::Invalidate);
1914            cbs_scopes.push(cb_scopes);
1915            for buster in scope.busters() {
1916                buster.try_set(new_buster_id());
1917            }
1918        }
1919        let mut cbs_external = vec![];
1920        for cb in cbs_scopes {
1921            if let Some(cb_external) = cb(&mut scopes) {
1922                cbs_external.push(cb_external);
1923            }
1924        }
1925        drop(scopes);
1926        run_external_callbacks(self.untyped_client, scope_cache_key, cbs_external);
1927    }
1928
1929    /// Mark all queries as stale.
1930    ///
1931    /// Any active resources will refetch in the background, replacing them when ready.
1932    ///
1933    /// To have the cache instantly cleared and all listeners reset to pending, e.g. for user logout,
1934    /// see [`QueryClient::clear`].
1935    #[track_caller]
1936    pub fn invalidate_all_queries(&self) {
1937        let mut scopes = self.untyped_client.scope_lookup.scopes_mut();
1938        let mut cbs_scopes = vec![];
1939        for scope in scopes.values_mut() {
1940            let busters = scope.busters();
1941            let cb_scopes = scope.invalidate_scope(QueryAbortReason::Invalidate);
1942            cbs_scopes.push((cb_scopes, scope.cache_key()));
1943            for buster in busters {
1944                buster.try_set(new_buster_id());
1945            }
1946        }
1947        let mut cbs_external = vec![];
1948        for (cb, cache_key) in cbs_scopes {
1949            if let Some(cb_external) = cb(&mut scopes) {
1950                cbs_external.push((cb_external, cache_key));
1951            }
1952        }
1953        drop(scopes);
1954        for (cb_external, cache_key) in cbs_external {
1955            run_external_callbacks(self.untyped_client, cache_key, vec![cb_external]);
1956        }
1957    }
1958
1959    /// Empty the cache, like [`QueryClient::invalidate_all_queries`] except:
1960    /// - the cache is instantly cleared of all queries
1961    /// - All active resources etc are reset to their pending state instantly
1962    ///   until the new query finishes refetching.
1963    ///
1964    /// Useful for e.g. user logout.
1965    ///
1966    /// [`QueryClient::invalidate_all_queries`] on the other hand, will only refetch
1967    /// active queries in the background, replacing them when ready.
1968    #[track_caller]
1969    pub fn clear(&self) {
1970        let mut scopes = self.untyped_client.scope_lookup.scopes_mut();
1971        let mut cbs_scopes = vec![];
1972        for scope in scopes.values_mut() {
1973            let busters = scope.busters();
1974            let cb_scopes = scope.invalidate_scope(QueryAbortReason::Clear);
1975            cbs_scopes.push((cb_scopes, scope.cache_key()));
1976            scope.clear();
1977            for buster in busters {
1978                buster.try_set(new_buster_id());
1979            }
1980        }
1981        let mut cbs_external = vec![];
1982        for (cb, cache_key) in cbs_scopes {
1983            if let Some(cb_external) = cb(&mut scopes) {
1984                cbs_external.push((cb_external, cache_key));
1985            }
1986        }
1987        drop(scopes);
1988        for (cb_external, cache_key) in cbs_external {
1989            run_external_callbacks(self.untyped_client, cache_key, vec![cb_external]);
1990        }
1991    }
1992
1993    #[cfg(test)]
1994    pub(crate) fn total_cached_queries(&self) -> usize {
1995        self.untyped_client
1996            .scope_lookup
1997            .scopes()
1998            .values()
1999            .map(|scope| scope.total_cached_queries())
2000            .sum()
2001    }
2002
2003    #[cfg(test)]
2004    pub(crate) fn is_key_invalid<K, V, M>(
2005        &self,
2006        query_scope: impl QueryScopeLocalTrait<K, V, M>,
2007        key: impl Borrow<K>,
2008    ) -> bool
2009    where
2010        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2011        V: DebugIfDevtoolsEnabled + Clone + 'static,
2012    {
2013        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
2014        self.untyped_client.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2015            &mut self.untyped_client.scope_lookup.scopes_mut(),
2016            query_scope_info.cache_key,
2017            OnScopeMissing::Skip,
2018            |_| {},
2019            |maybe_scope, _| {
2020                if let Some(scope) = maybe_scope {
2021                    scope
2022                        .get(&KeyHash::new(key.borrow()))
2023                        .map(|query| query.is_invalidated())
2024                        .unwrap_or(false)
2025                } else {
2026                    false
2027                }
2028            },
2029        )
2030    }
2031
2032    #[cfg(test)]
2033    #[allow(dead_code)]
2034    pub(crate) fn mark_key_valid<K, V, M>(&self, query_scope: impl QueryScopeLocalTrait<K, V, M>, key: impl Borrow<K>)
2035    where
2036        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2037        V: DebugIfDevtoolsEnabled + Clone + 'static,
2038    {
2039        self.untyped_client.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2040            &mut self.untyped_client.scope_lookup.scopes_mut(),
2041            QueryScopeInfo::new_local(&query_scope).cache_key,
2042            OnScopeMissing::Skip,
2043            |_| {},
2044            |maybe_scope, _| {
2045                if let Some(scope) = maybe_scope
2046                    && let Some(query) = scope.get_mut(&KeyHash::new(key.borrow()))
2047                {
2048                    query.mark_valid();
2049                }
2050            },
2051        );
2052    }
2053
2054    #[cfg(test)]
2055    pub(crate) fn subscriber_count(&self) -> usize {
2056        self.untyped_client.scope_lookup.scope_subscriptions_mut().count()
2057    }
2058}
2059
2060impl UntypedQueryClient {
2061    #[track_caller]
2062    pub(crate) async fn fetch_query<K, V, M>(
2063        &self,
2064        query_scope: impl QueryScopeTrait<K, V, M>,
2065        key: impl Borrow<K>,
2066    ) -> V
2067    where
2068        K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
2069        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
2070    {
2071        let query_scope_info = QueryScopeInfo::new(&query_scope);
2072        let owner_chain = OwnerChain::new(*self, query_scope_info.cache_key, Owner::current());
2073        self.fetch_inner(
2074            query_scope_info,
2075            || QueryScopeQueryInfo::new(&query_scope, key.borrow()),
2076            async |key| MaybeLocal::new(query_scope.query(key).await),
2077            key.borrow(),
2078            None,
2079            || MaybeLocal::new(key.borrow().clone()),
2080            &owner_chain,
2081        )
2082        .await
2083    }
2084
2085    #[track_caller]
2086    pub(crate) async fn fetch_query_local<K, V, M>(
2087        &self,
2088        query_scope: impl QueryScopeLocalTrait<K, V, M>,
2089        key: impl Borrow<K>,
2090    ) -> V
2091    where
2092        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
2093        V: DebugIfDevtoolsEnabled + Clone + 'static,
2094    {
2095        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
2096        let owner_chain = OwnerChain::new(*self, query_scope_info.cache_key, Owner::current());
2097        self.fetch_inner(
2098            query_scope_info,
2099            || QueryScopeQueryInfo::new_local(&query_scope, key.borrow()),
2100            async |key| MaybeLocal::new_local(query_scope.query(key).await),
2101            key.borrow(),
2102            None,
2103            || MaybeLocal::new_local(key.borrow().clone()),
2104            &owner_chain,
2105        )
2106        .await
2107    }
2108
2109    #[track_caller]
2110    async fn fetch_inner<K, V, FetcherFut>(
2111        &self,
2112        query_scope_info: QueryScopeInfo,
2113        query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
2114        fetcher: impl Fn(K) -> FetcherFut,
2115        key: &K,
2116        maybe_preheld_fetcher_mutex_guard: Option<&futures::lock::MutexGuard<'_, ()>>,
2117        lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
2118        owner_chain: &OwnerChain,
2119    ) -> V
2120    where
2121        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
2122        V: DebugIfDevtoolsEnabled + Clone + 'static,
2123        FetcherFut: Future<Output = MaybeLocal<V>>,
2124    {
2125        self.cached_or_fetch(
2126            self.options,
2127            query_scope_info.options,
2128            None,
2129            &query_scope_info,
2130            query_scope_info_for_new_query,
2131            key,
2132            fetcher,
2133            |info| {
2134                match info.variant {
2135                    CachedOrFetchCbInputVariant::CachedUntouched => {
2136                        if info.cached.stale_or_invalidated() {
2137                            return CachedOrFetchCbOutput::Refetch;
2138                        }
2139                    }
2140                    CachedOrFetchCbInputVariant::CachedUpdated => {
2141                        // Update anything using it:
2142                        info.cached.buster.set(new_buster_id());
2143                    }
2144                    CachedOrFetchCbInputVariant::Fresh => {}
2145                }
2146                CachedOrFetchCbOutput::Return(
2147                    // WONTPANIC: cached_or_fetch will only output values that are safe on this thread:
2148                    info.cached.value_maybe_stale().value_may_panic().clone(),
2149                )
2150            },
2151            maybe_preheld_fetcher_mutex_guard,
2152            lazy_maybe_local_key,
2153            owner_chain,
2154        )
2155        .await
2156    }
2157
2158    #[track_caller]
2159    pub(crate) fn invalidate_query<K, V, M>(
2160        &self,
2161        query_scope: impl QueryScopeLocalTrait<K, V, M>,
2162        key: impl Borrow<K>,
2163    ) -> bool
2164    where
2165        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2166        V: DebugIfDevtoolsEnabled + Clone + 'static,
2167    {
2168        let cleared = self.invalidate_queries(query_scope, std::iter::once(key));
2169        !cleared.is_empty()
2170    }
2171
2172    #[track_caller]
2173    pub(crate) fn invalidate_queries<K, V, KRef, M>(
2174        &self,
2175        query_scope: impl QueryScopeLocalTrait<K, V, M>,
2176        keys: impl IntoIterator<Item = KRef>,
2177    ) -> Vec<KRef>
2178    where
2179        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2180        V: DebugIfDevtoolsEnabled + Clone + 'static,
2181        KRef: Borrow<K>,
2182    {
2183        self.invalidate_queries_inner::<K, V, _>(&QueryScopeInfo::new_local(&query_scope), keys)
2184    }
2185
2186    #[track_caller]
2187    pub(crate) fn invalidate_queries_inner<K, V, KRef>(
2188        &self,
2189        query_scope_info: &QueryScopeInfo,
2190        keys: impl IntoIterator<Item = KRef>,
2191    ) -> Vec<KRef>
2192    where
2193        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2194        V: DebugIfDevtoolsEnabled + Clone + 'static,
2195        KRef: Borrow<K>,
2196    {
2197        let keys = keys.into_iter().collect::<Vec<_>>();
2198        let key_hashes = keys.iter().map(|key| KeyHash::new(key.borrow())).collect::<Vec<_>>();
2199
2200        if key_hashes.is_empty() {
2201            return vec![];
2202        }
2203
2204        let mut scopes = self.scope_lookup.scopes_mut();
2205        let mut cbs_scopes = vec![];
2206        let results = self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2207            &mut scopes,
2208            query_scope_info.cache_key,
2209            OnScopeMissing::Skip,
2210            |_| {},
2211            |maybe_scope, _| {
2212                let mut invalidated = vec![];
2213                if let Some(scope) = maybe_scope {
2214                    for (key, key_hash) in keys.into_iter().zip(key_hashes.iter()) {
2215                        if let Some(cached) = scope.get_mut_include_pending(key_hash) {
2216                            let cb_scopes = cached.invalidate(QueryAbortReason::Invalidate);
2217                            cbs_scopes.push(cb_scopes);
2218                            invalidated.push(key);
2219                        }
2220                    }
2221                }
2222                invalidated
2223            },
2224        );
2225        let mut cbs_external = vec![];
2226        for cb in cbs_scopes {
2227            if let Some(cb_external) = cb(&mut scopes) {
2228                cbs_external.push(cb_external);
2229            }
2230        }
2231        drop(scopes);
2232        run_external_callbacks(*self, query_scope_info.cache_key, cbs_external);
2233        results
2234    }
2235
2236    #[cfg(test)]
2237    /// Clear a specific query scope of all its queries.
2238    #[track_caller]
2239    pub(crate) fn clear_query_scope<K, V, M>(&self, query_scope: impl QueryScopeLocalTrait<K, V, M>)
2240    where
2241        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2242        V: DebugIfDevtoolsEnabled + Clone + 'static,
2243    {
2244        let mut scopes = self.scope_lookup.scopes_mut();
2245        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
2246        let mut cbs_scopes = vec![];
2247        self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2248            &mut scopes,
2249            query_scope_info.cache_key,
2250            OnScopeMissing::Skip,
2251            |_| {},
2252            |maybe_scope, _| {
2253                if let Some(scope) = maybe_scope {
2254                    for cached in scope.all_queries_mut_include_pending() {
2255                        let cb_scopes = cached.invalidate(QueryAbortReason::Clear);
2256                        cbs_scopes.push(cb_scopes);
2257                    }
2258                }
2259            },
2260        );
2261        let mut cbs_external = vec![];
2262        for cb in cbs_scopes {
2263            if let Some(cb_external) = cb(&mut scopes) {
2264                cbs_external.push(cb_external);
2265            }
2266        }
2267        drop(scopes);
2268        run_external_callbacks(*self, query_scope_info.cache_key, cbs_external);
2269    }
2270
2271    /// Clear a specific query key.
2272    #[track_caller]
2273    pub(crate) fn clear_query<K, V, M>(
2274        &self,
2275        query_scope: impl QueryScopeLocalTrait<K, V, M>,
2276        key: impl Borrow<K>,
2277    ) -> bool
2278    where
2279        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2280        V: DebugIfDevtoolsEnabled + Clone + 'static,
2281    {
2282        let mut scopes = self.scope_lookup.scopes_mut();
2283        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
2284        let mut cbs_scopes = vec![];
2285        let result = self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2286            &mut scopes,
2287            query_scope_info.cache_key,
2288            OnScopeMissing::Skip,
2289            |_| {},
2290            |maybe_scope, _| {
2291                if let Some(scope) = maybe_scope {
2292                    let key_hash = KeyHash::new(key.borrow());
2293                    if let Some(cached) = scope.get_mut_include_pending(&key_hash) {
2294                        let cb_scopes = cached.invalidate(QueryAbortReason::Clear);
2295                        cbs_scopes.push(cb_scopes);
2296                    }
2297                    let removed = scope.remove_entry(&key_hash);
2298                    // Calling it again just in case because in tests might be in sync cache and non sync cache:
2299                    scope.remove_entry(&key_hash);
2300                    return removed.is_some();
2301                }
2302                false
2303            },
2304        );
2305        let mut cbs_external = vec![];
2306        for cb in cbs_scopes {
2307            if let Some(cb_external) = cb(&mut scopes) {
2308                cbs_external.push(cb_external);
2309            }
2310        }
2311        drop(scopes);
2312        run_external_callbacks(*self, query_scope_info.cache_key, cbs_external);
2313        result
2314    }
2315
2316    pub(crate) fn query_metadata<K, V>(
2317        &self,
2318        scope_cache_key: ScopeCacheKey,
2319        key: impl Borrow<K>,
2320    ) -> Option<QueryMetadata>
2321    where
2322        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
2323        V: DebugIfDevtoolsEnabled + Clone + 'static,
2324    {
2325        self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2326            &mut self.scope_lookup.scopes_mut(),
2327            scope_cache_key,
2328            OnScopeMissing::Skip,
2329            |_| {},
2330            |maybe_scope, _| {
2331                if let Some(scope) = maybe_scope
2332                    && let Some(cached) = scope.get(&KeyHash::new(key.borrow()))
2333                {
2334                    return Some(QueryMetadata {
2335                        updated_at: cached.updated_at,
2336                        stale_or_invalidated: cached.stale_or_invalidated(),
2337                    });
2338                }
2339                None
2340            },
2341        )
2342    }
2343
    /// Produces a `T` from the cached query for `key`, running `fetcher` first when needed.
    ///
    /// `return_cb` drives the decision:
    /// - If an entry is already cached it is offered to `return_cb` as `CachedUntouched`.
    ///   `Return(value)` ends the call immediately; `Refetch` (or no cached entry) leads to a fetch.
    /// - After the fetch, `return_cb` is called again with the entry marked `CachedUpdated` (an
    ///   existing entry received the new value) or `Fresh` (a new entry was inserted, or the fetch
    ///   was cut short by `QueryAbortReason::SsrStreamedValueOverride`). It must answer `Return`.
    ///
    /// Other parameters:
    /// - `client_options` / `scope_options`: handed to `Query::new` when a new entry is inserted
    ///   (and combined for the devtools "query created" notification when devtools are compiled in).
    /// - `maybe_buster_if_uncached`: buster to use for a newly inserted entry; a new signal is
    ///   created when it is `None`. Only consulted when no cached entry was seen beforehand.
    /// - `query_scope_info_for_new_query`: called only when a new entry is inserted.
    /// - `maybe_preheld_fetcher_mutex_guard`: when `Some`, the fetcher mutex is not locked here.
    /// - `lazy_maybe_local_key`: called once, just before fetching.
    /// - `owner_chain`: `fetcher` is invoked inside `owner_chain.with`.
    ///
    /// # Panics
    ///
    /// Panics if `return_cb` answers `Refetch` after the fetch, or if one of the internal
    /// invariants marked `(bug)` in the `expect` messages below does not hold.
    pub async fn cached_or_fetch<K, V, T, FetcherFut>(
        &self,
        client_options: QueryOptions,
        scope_options: Option<QueryOptions>,
        maybe_buster_if_uncached: Option<ArcRwSignal<u64>>,
        query_scope_info: &QueryScopeInfo,
        query_scope_info_for_new_query: impl Fn() -> QueryScopeQueryInfo<K>,
        key: &K,
        fetcher: impl Fn(K) -> FetcherFut,
        return_cb: impl Fn(CachedOrFetchCbInput<K, V>) -> CachedOrFetchCbOutput<T>,
        maybe_preheld_fetcher_mutex_guard: Option<&futures::lock::MutexGuard<'_, ()>>,
        lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
        owner_chain: &OwnerChain,
    ) -> T
    where
        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
        V: DebugIfDevtoolsEnabled + Clone + 'static,
        FetcherFut: Future<Output = MaybeLocal<V>>,
    {
        let scope_lookup = &self.scope_lookup;
        let key_hash = KeyHash::new(key);
        // Set whenever a cached entry is observed; staying `None` means "never cached so far".
        let mut cached_buster = None;
        // First look: let the callback decide based on whatever is already cached.
        let next_directive =
            scope_lookup.with_cached_query::<K, V, _>(&key_hash, &query_scope_info.cache_key, |maybe_cached| {
                if let Some(cached) = maybe_cached {
                    cached_buster = Some(cached.buster.clone());
                    return_cb(CachedOrFetchCbInput {
                        cached,
                        variant: CachedOrFetchCbInputVariant::CachedUntouched,
                    })
                } else {
                    CachedOrFetchCbOutput::Refetch
                }
            });

        match next_directive {
            CachedOrFetchCbOutput::Return(value) => value,
            CachedOrFetchCbOutput::Refetch => {
                // Will probably need to fetch, unless someone fetches whilst trying to get hold of the fetch mutex:
                let fetcher_mutex = scope_lookup.fetcher_mutex::<K, V>(key_hash, query_scope_info);
                // Holds the guard (if locked here) for the remainder of this branch.
                let _maybe_fetcher_mutex_guard_local = if maybe_preheld_fetcher_mutex_guard.is_none() {
                    let _fetcher_guard = match fetcher_mutex.try_lock() {
                        // Uncontended: nobody else could have fetched since the lookup above.
                        Some(fetcher_guard) => fetcher_guard,
                        None => {
                            // If have to wait, should check cache again in case it was fetched while waiting.
                            let fetcher_guard = fetcher_mutex.lock().await;
                            let next_directive = scope_lookup.with_cached_query::<K, V, _>(
                                &key_hash,
                                &query_scope_info.cache_key,
                                |maybe_cached| {
                                    if let Some(cached) = maybe_cached {
                                        cached_buster = Some(cached.buster.clone());
                                        return_cb(CachedOrFetchCbInput {
                                            cached,
                                            variant: CachedOrFetchCbInputVariant::CachedUntouched,
                                        })
                                    } else {
                                        CachedOrFetchCbOutput::Refetch
                                    }
                                },
                            );
                            match next_directive {
                                CachedOrFetchCbOutput::Return(value) => return value,
                                CachedOrFetchCbOutput::Refetch => fetcher_guard,
                            }
                        }
                    };
                    Some(_fetcher_guard)
                } else {
                    // Owned externally so not an issue.
                    None
                };

                #[cfg(any(all(debug_assertions, feature = "devtools"), feature = "devtools-always"))]
                let before_time = chrono::Utc::now();

                // True when neither cache lookup above found an entry.
                let loading_first_time = cached_buster.is_none();

                #[cfg(any(all(debug_assertions, feature = "devtools"), feature = "devtools-always"))]
                {
                    // Running this before fetching so it'll show up in devtools straight away:
                    if loading_first_time {
                        scope_lookup.client_subscriptions_mut().notify_query_created(
                            crate::subs_client::QueryCreatedInfo {
                                cache_key: query_scope_info.cache_key,
                                scope_title: query_scope_info.title.clone(),
                                key_hash,
                                debug_key: crate::utils::DebugValue::new(key),
                                combined_options: crate::options_combine(client_options, scope_options),
                            },
                        );
                    }
                }

                let maybe_local_key = lazy_maybe_local_key();

                // Outcome of the fetch loop below.
                enum MaybeNewValue<V> {
                    // The fetcher completed and produced this value.
                    NewValue(V),
                    // The fetch was aborted with `QueryAbortReason::SsrStreamedValueOverride`.
                    SsrStreamedValueOverride,
                }

                let maybe_new_value = scope_lookup
                    .with_notify_fetching(
                        query_scope_info.cache_key,
                        key_hash,
                        loading_first_time,
                        // Call the fetcher, but reset and repeat if an invalidation occurs whilst in-flight:
                        async {
                            loop {
                                // A new abort receiver is prepared for every attempt.
                                let query_abort_rx = scope_lookup.prepare_invalidation_channel::<K, V>(
                                    query_scope_info,
                                    key_hash,
                                    &maybe_local_key,
                                );

                                let fut = owner_chain.with(|| fetcher(key.clone()));

                                // The abort receiver is listed first, ahead of the fetch future.
                                futures::select_biased! {
                                    rx_result = query_abort_rx.fuse() => {
                                        // An `Err` from the receiver falls through and the loop starts a new attempt.
                                        if let Ok(reason) = rx_result {
                                            match reason {
                                                // Nothing to do here: the loop goes round and calls the fetcher again.
                                                QueryAbortReason::Invalidate | QueryAbortReason::Clear => {},
                                                QueryAbortReason::SsrStreamedValueOverride => {
                                                    break MaybeNewValue::SsrStreamedValueOverride;
                                                },
                                            }
                                        }
                                    },
                                    new_value = fut.fuse() => {
                                        break MaybeNewValue::NewValue(new_value);
                                    },
                                }
                            }
                        },
                    )
                    .await;

                #[cfg(any(all(debug_assertions, feature = "devtools"), feature = "devtools-always"))]
                let elapsed_ms = chrono::Utc::now().signed_duration_since(before_time).num_milliseconds();

                // Only a first-time load needs a buster for the entry that may be inserted below.
                let buster_if_uncached = if loading_first_time {
                    Some(maybe_buster_if_uncached.unwrap_or_else(|| ArcRwSignal::new(new_buster_id())))
                } else {
                    None
                };

                // Store the outcome (creating the scope if necessary) and ask the callback for the result.
                let next_directive = scope_lookup.with_cached_scope_mut::<_, _, _, _>(
                    &mut scope_lookup.scopes_mut(),
                    query_scope_info.cache_key,
                    OnScopeMissing::Create(query_scope_info),
                    |_| {},
                    |scope, _| {
                        let scope = scope.expect("provided a default");
                        match maybe_new_value {
                            MaybeNewValue::NewValue(new_value) => {
                                if let Some(cached) = scope.get_mut(&key_hash) {
                                    // An entry exists: replace its value in place.
                                    cached.set_value(
                                        new_value,
                                        true,
                                        #[cfg(any(
                                            all(debug_assertions, feature = "devtools"),
                                            feature = "devtools-always"
                                        ))]
                                        crate::events::Event::new(crate::events::EventVariant::Fetched { elapsed_ms }),
                                        ResetInvalidated::Reset,
                                    );
                                    return_cb(CachedOrFetchCbInput {
                                        cached,
                                        variant: CachedOrFetchCbInputVariant::CachedUpdated,
                                    })
                                } else {
                                    // We already notified before the async fetch, so it would show up sooner.
                                    scope.insert_without_query_created_notif(
                                        key_hash,
                                        Query::new(
                                            client_options,
                                            *self,
                                            query_scope_info,
                                            query_scope_info_for_new_query(),
                                            key_hash,
                                            maybe_local_key,
                                            new_value,
                                            buster_if_uncached.expect("loading_first_time means this is Some(). (bug)"),
                                            scope_options,
                                            None,
                                            #[cfg(any(
                                                all(debug_assertions, feature = "devtools"),
                                                feature = "devtools-always"
                                            ))]
                                            crate::events::Event::new(crate::events::EventVariant::Fetched {
                                                elapsed_ms,
                                            }),
                                        ),
                                    );
                                    return_cb(CachedOrFetchCbInput {
                                        cached: scope.get(&key_hash).expect("Just set. (bug)"),
                                        variant: CachedOrFetchCbInputVariant::Fresh,
                                    })
                                }
                            }
                            // Nothing was fetched here: the entry is expected to be in the cache already.
                            MaybeNewValue::SsrStreamedValueOverride => return_cb(CachedOrFetchCbInput {
                                cached: scope
                                    .get(&key_hash)
                                    .expect("Should contain value streamed from server. (bug)"),
                                variant: CachedOrFetchCbInputVariant::Fresh,
                            }),
                        }
                    },
                );

                match next_directive {
                    CachedOrFetchCbOutput::Refetch => {
                        panic!("Unexpected refetch directive after providing fresh value. (bug)")
                    }
                    CachedOrFetchCbOutput::Return(return_value) => return_value,
                }
            }
        }
    }
2563}
2564
2565pub(crate) struct QueryMetadata {
2566    pub updated_at: DateTime<Utc>,
2567    pub stale_or_invalidated: bool,
2568}