leptos_fetch/
query_client.rs

1#![allow(ungated_async_fn_track_caller)] // Want it to auto-turn on when stable
2
3use std::{
4    borrow::Borrow,
5    collections::HashMap,
6    fmt::Debug,
7    hash::Hash,
8    sync::{Arc, LazyLock, atomic::AtomicBool},
9};
10
11use chrono::{DateTime, Utc};
12use codee::{Decoder, Encoder};
13use futures::pin_mut;
14use leptos::{
15    prelude::{
16        ArcRwSignal, ArcSignal, Effect, Get, GetUntracked, LocalStorage, Owner, Read,
17        ReadUntracked, Set, Signal, Track, provide_context,
18    },
19    server::{
20        ArcLocalResource, ArcResource, FromEncodedStr, IntoEncodedString, LocalResource, Resource,
21    },
22};
23use send_wrapper::SendWrapper;
24
25use crate::{
26    ArcLocalSignal, QueryOptions,
27    cache::{CachedOrFetchCbInputVariant, CachedOrFetchCbOutput},
28    cache_scope::{QueryAbortReason, QueryOrPending},
29    debug_if_devtools_enabled::DebugIfDevtoolsEnabled,
30    maybe_local::MaybeLocal,
31    query::Query,
32    query_maybe_key::QueryMaybeKey,
33    query_scope::{QueryScopeInfo, QueryScopeLocalTrait, QueryScopeTrait, ScopeCacheKey},
34    resource_drop_guard::ResourceDropGuard,
35    utils::{KeyHash, OwnerChain, ResetInvalidated, new_buster_id, new_resource_id},
36};
37
38use super::cache::ScopeLookup;
39
40#[cfg(not(feature = "rkyv"))]
41pub(crate) type DefaultCodec = codee::string::JsonSerdeCodec;
42
43#[cfg(feature = "rkyv")]
44pub(crate) type DefaultCodec = codee::binary::RkyvCodec;
45
46task_local::task_local! {
47    pub(crate) static ASYNC_TRACK_UPDATE_MARKER: Arc<AtomicBool>;
48}
49
50std::thread_local! {
51    pub(crate) static SYNC_TRACK_UPDATE_MARKER: AtomicBool = const { AtomicBool::new(true) };
52}
53
54/// The [`QueryClient`] stores all query data, and is used to manage queries.
55///
56/// Should be provided via leptos context at the top of the app.
57///
58/// # Example
59///
60/// ```
61/// use leptos::prelude::*;
62/// use leptos_fetch::QueryClient;
63///
64/// #[component]
65/// pub fn App() -> impl IntoView {
66///    QueryClient::new().provide();
67///     // ...
68/// }
69///
70/// #[component]
71/// pub fn MyComponent() -> impl IntoView {
72///     let client: QueryClient = expect_context();
73///      // ...
74/// }
75/// ```
76pub struct QueryClient<Codec = DefaultCodec> {
77    pub(crate) scope_lookup: ScopeLookup,
78    options: QueryOptions,
79    created_at: DateTime<Utc>,
80    _ser: std::marker::PhantomData<SendWrapper<Codec>>,
81}
82
83impl<Codec> Debug for QueryClient<Codec> {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        f.debug_struct("QueryClient")
86            .field("scope_lookup", &self.scope_lookup)
87            .field("options", &self.options)
88            .field("codec", &std::any::type_name::<Codec>())
89            .finish()
90    }
91}
92
93impl<Codec: 'static> Clone for QueryClient<Codec> {
94    fn clone(&self) -> Self {
95        *self
96    }
97}
98
99impl<Codec: 'static> Copy for QueryClient<Codec> {}
100
101impl Default for QueryClient<DefaultCodec> {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
107impl QueryClient<DefaultCodec> {
108    /// Creates a new [`QueryClient`] with the default codec: [`codee::string::JsonSerdeCodec`].
109    ///
110    /// Call [`QueryClient::set_codec()`] to set a different codec.
111    ///
112    /// Call [`QueryClient::with_options()`] to set non-default options.
113    #[track_caller]
114    pub fn new() -> Self {
115        Self {
116            scope_lookup: ScopeLookup::new(),
117            options: QueryOptions::default(),
118            created_at: Utc::now(),
119            _ser: std::marker::PhantomData,
120        }
121    }
122}
123
124impl<Codec: 'static> QueryClient<Codec> {
125    /// Provide the client to leptos context.
126    /// ```no_run
127    /// use leptos_fetch::QueryClient;
128    /// use leptos::prelude::*;
129    ///
130    /// QueryClient::new().provide();
131    ///
132    /// let client: QueryClient = expect_context();
133    /// ```
134    #[track_caller]
135    pub fn provide(self) -> Self {
136        provide_context(self);
137        self
138    }
139
140    /// **Applies to `ssr` only**
141    ///
142    /// It's possible to use non-json codecs for streaming leptos resources from the backend.
143    /// The default is [`codee::string::JsonSerdeCodec`].
144    ///
145    /// The current `codee` major version is `0.3` and will need to be imported in your project to customize the codec.
146    ///
147    /// E.g. to use [`codee::binary::MsgpackSerdeCodec`](https://docs.rs/codee/latest/codee/binary/struct.MsgpackSerdeCodec.html):
148    /// ```toml
149    /// codee = { version = "0.3", features = ["msgpack_serde"] }
150    /// ```
151    ///
152    /// This is a generic type on the [`QueryClient`], so when calling [`leptos::prelude::expect_context`],
153    /// this type must be specified when not using the default.
154    ///
155    /// A useful pattern is to type alias the client with the custom codec for your whole app:
156    ///
157    /// ```no_run
158    /// use codee::binary::MsgpackSerdeCodec;
159    /// use leptos::prelude::*;
160    /// use leptos_fetch::QueryClient;
161    ///
162    /// type MyQueryClient = QueryClient<MsgpackSerdeCodec>;
163    ///
164    /// // Create and provide to context to make accessible everywhere:
165    /// QueryClient::new().set_codec::<MsgpackSerdeCodec>().provide();
166    ///
167    /// let client: MyQueryClient = expect_context();
168    /// ```
169    #[track_caller]
170    pub fn set_codec<NewCodec>(self) -> QueryClient<NewCodec> {
171        QueryClient {
172            scope_lookup: self.scope_lookup,
173            options: self.options,
174            created_at: self.created_at,
175            _ser: std::marker::PhantomData,
176        }
177    }
178
179    /// Set non-default options to apply to all queries.
180    ///
181    /// These options will be combined with any options for a specific query scope.
182    #[track_caller]
183    pub fn with_options(mut self, options: QueryOptions) -> Self {
184        self.options = options;
185        self
186    }
187
188    /// Read the base [`QueryOptions`] for this [`QueryClient`].
189    ///
190    /// These will be combined with any options for a specific query scope.
191    #[track_caller]
192    pub fn options(&self) -> QueryOptions {
193        self.options
194    }
195
196    /// Provide a signal to globally enable/disable auto refetching of queries
197    /// that have active resources and have set [`QueryOptions::with_refetch_interval`].
198    ///
199    /// Whilst this signal returns `false`, refetches will be skipped.
200    #[track_caller]
201    pub fn with_refetch_enabled_toggle(self, refetch_enabled: impl Into<ArcSignal<bool>>) -> Self {
202        self.scope_lookup.scopes_mut().refetch_enabled = Some(refetch_enabled.into());
203        self
204    }
205
206    /// If set, access the signal that globally enables/disables auto refetching of queries.
207    #[track_caller]
208    pub fn refetch_enabled(&self) -> Option<ArcSignal<bool>> {
209        self.scope_lookup.scopes().refetch_enabled.clone()
210    }
211
212    /// Query with [`LocalResource`]. Local resouces only load data on the client, so can be used with non-threadsafe/serializable data.
213    ///
214    /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
215    #[track_caller]
216    pub fn local_resource<K, MaybeKey, V, M>(
217        &self,
218        query_scope: impl QueryScopeLocalTrait<K, V, M> + 'static,
219        keyer: impl Fn() -> MaybeKey + 'static,
220    ) -> LocalResource<MaybeKey::MappedValue>
221    where
222        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
223        MaybeKey: QueryMaybeKey<K, V>,
224        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
225        V: DebugIfDevtoolsEnabled + Clone + 'static,
226    {
227        self.arc_local_resource(query_scope, keyer).into()
228    }
229
230    /// Query with [`ArcLocalResource`]. Local resouces only load data on the client, so can be used with non-threadsafe/serializable data.
231    ///
232    /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
233    #[track_caller]
234    pub fn arc_local_resource<K, MaybeKey, V, M>(
235        &self,
236        query_scope: impl QueryScopeLocalTrait<K, V, M> + 'static,
237        keyer: impl Fn() -> MaybeKey + 'static,
238    ) -> ArcLocalResource<MaybeKey::MappedValue>
239    where
240        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
241        MaybeKey: QueryMaybeKey<K, V>,
242        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
243        V: DebugIfDevtoolsEnabled + Clone + 'static,
244    {
245        let client = *self;
246        let client_options = self.options();
247        let scope_lookup = self.scope_lookup;
248        let cache_key = query_scope.cache_key();
249        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
250        let query_scope = Arc::new(query_scope);
251        let query_options = query_scope.options();
252        let resource_id = new_resource_id();
253
254        // To call .mark_resource_dropped() when the resource is dropped:
255        let drop_guard = ResourceDropGuard::<K, V>::new(self.scope_lookup, resource_id, cache_key);
256
257        ArcLocalResource::new({
258            move || {
259                let query_scope = query_scope.clone();
260                let query_scope_info = query_scope_info.clone();
261                let maybe_key = keyer().into_maybe_key();
262                let invalidation_prefix = maybe_key
263                    .as_ref()
264                    .and_then(|key| query_scope.invalidation_prefix(key));
265                let drop_guard = drop_guard.clone();
266                if let Some(key) = maybe_key.as_ref() {
267                    drop_guard.set_active_key(KeyHash::new(key));
268                }
269                // Note: cannot hoist outside of resource,
270                // it prevents the resource itself dropping when the owner is held by the resource itself,
271                // here each instance only lasts as long as the query.
272                let owner_chain = OwnerChain::new(Owner::current());
273                async move {
274                    if let Some(key) = maybe_key {
275                        let value = scope_lookup
276                            .cached_or_fetch(
277                                client_options,
278                                query_options,
279                                None,
280                                &query_scope_info,
281                                invalidation_prefix,
282                                &key,
283                                {
284                                    let query_scope = query_scope.clone();
285                                    async move |key| {
286                                        MaybeLocal::new_local(query_scope.query(key).await)
287                                    }
288                                },
289                                |info| {
290                                    info.cached.buster.track();
291                                    info.cached.mark_resource_active(resource_id);
292                                    match info.variant {
293                                        CachedOrFetchCbInputVariant::CachedUntouched => {
294                                            // If stale refetch in the background with the prefetch() function, which'll recognise it's stale, refetch it and invalidate busters:
295                                            if cfg!(any(test, not(feature = "ssr")))
296                                                && info.cached.stale()
297                                            {
298                                                let key = key.clone();
299                                                let query_scope = query_scope.clone();
300                                                let owner_chain = owner_chain.clone();
301                                                // Just adding the SendWrapper and using spawn() rather than spawn_local() to fix tests:
302                                                leptos::task::spawn(SendWrapper::new(async move {
303                                                    client
304                                                        .prefetch_inner(
305                                                            QueryScopeInfo::new_local(&query_scope),
306                                                            query_scope
307                                                                .invalidation_prefix(key.borrow()),
308                                                            async move |key| {
309                                                                MaybeLocal::new_local(
310                                                                    query_scope.query(key).await,
311                                                                )
312                                                            },
313                                                            key.borrow(),
314                                                            || {
315                                                                MaybeLocal::new_local(
316                                                                    key.borrow().clone(),
317                                                                )
318                                                            },
319                                                            &owner_chain,
320                                                        )
321                                                        .await;
322                                                }));
323                                            }
324                                        }
325                                        CachedOrFetchCbInputVariant::Fresh => {}
326                                        CachedOrFetchCbInputVariant::CachedUpdated => {
327                                            panic!("Didn't direct inner to refetch here. (bug)")
328                                        }
329                                    }
330                                    CachedOrFetchCbOutput::Return(
331                                        // WONTPANIC: cached_or_fetch will only output values that are safe on this thread:
332                                        info.cached.value_maybe_stale().value_may_panic().clone(),
333                                    )
334                                },
335                                None,
336                                || MaybeLocal::new_local(key.clone()),
337                                &owner_chain,
338                            )
339                            .await;
340                        MaybeKey::prepare_mapped_value(Some(value))
341                    } else {
342                        MaybeKey::prepare_mapped_value(None)
343                    }
344                }
345            }
346        })
347    }
348
349    /// Query with [`Resource`].
350    ///
351    /// Resources must be serializable to potentially load in `ssr` and stream to the client.
352    ///
353    /// Resources must be `Send` and `Sync` to be multithreaded in ssr.
354    ///
355    /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
356    #[track_caller]
357    pub fn resource<K, MaybeKey, V, M>(
358        &self,
359        query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
360        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
361    ) -> Resource<MaybeKey::MappedValue, Codec>
362    where
363        K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
364        MaybeKey: QueryMaybeKey<K, V>,
365        MaybeKey::MappedValue: Clone + Send + Sync + 'static,
366        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
367        Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
368        <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
369        <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
370        <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
371            Debug,
372        <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
373        <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
374    {
375        self.arc_resource_with_options(query_scope, keyer, false)
376            .into()
377    }
378
379    /// Query with a blocking [`Resource`].
380    ///
381    /// Resources must be serializable to potentially load in `ssr` and stream to the client.
382    ///
383    /// Resources must be `Send` and `Sync` to be multithreaded in ssr.
384    ///
385    /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
386    #[track_caller]
387    pub fn resource_blocking<K, MaybeKey, V, M>(
388        &self,
389        query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
390        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
391    ) -> Resource<MaybeKey::MappedValue, Codec>
392    where
393        K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
394        MaybeKey: QueryMaybeKey<K, V>,
395        MaybeKey::MappedValue: Clone + Send + Sync + 'static,
396        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
397        Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
398        <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
399        <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
400        <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
401            Debug,
402        <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
403        <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
404    {
405        self.arc_resource_with_options(query_scope, keyer, true)
406            .into()
407    }
408
409    /// Query with [`ArcResource`].
410    ///
411    /// Resources must be serializable to potentially load in `ssr` and stream to the client.
412    ///
413    /// Resources must be `Send` and `Sync` to be multithreaded in ssr.
414    ///
415    /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
416    #[track_caller]
417    pub fn arc_resource<K, MaybeKey, V, M>(
418        &self,
419        query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
420        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
421    ) -> ArcResource<MaybeKey::MappedValue, Codec>
422    where
423        K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
424        MaybeKey: QueryMaybeKey<K, V>,
425        MaybeKey::MappedValue: Clone + Send + Sync + 'static,
426        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
427        Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
428        <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
429        <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
430        <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
431            Debug,
432        <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
433        <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
434    {
435        self.arc_resource_with_options(query_scope, keyer, false)
436    }
437
438    /// Query with a blocking [`ArcResource`].
439    ///
440    /// Resources must be serializable to potentially load in `ssr` and stream to the client.
441    ///
442    /// Resources must be `Send` and `Sync` to be multithreaded in ssr.
443    ///
444    /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
445    #[track_caller]
446    pub fn arc_resource_blocking<K, MaybeKey, V, M>(
447        &self,
448        query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
449        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
450    ) -> ArcResource<MaybeKey::MappedValue, Codec>
451    where
452        K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
453        MaybeKey: QueryMaybeKey<K, V>,
454        MaybeKey::MappedValue: Clone + Send + Sync + 'static,
455        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
456        Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
457        <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
458        <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
459        <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
460            Debug,
461        <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
462        <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
463    {
464        self.arc_resource_with_options(query_scope, keyer, true)
465    }
466
467    #[track_caller]
468    fn arc_resource_with_options<K, MaybeKey, V, M>(
469        &self,
470        query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
471        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
472        blocking: bool,
473    ) -> ArcResource<MaybeKey::MappedValue, Codec>
474    where
475        K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
476        MaybeKey: QueryMaybeKey<K, V>,
477        MaybeKey::MappedValue: Clone + Send + Sync + 'static,
478        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
479        Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
480        <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
481        <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
482        <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
483            Debug,
484        <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
485        <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
486    {
487        let client = *self;
488        let client_options = self.options();
489        let cache_key = query_scope.cache_key();
490        let query_scope_info = QueryScopeInfo::new(&query_scope);
491        let query_scope = Arc::new(query_scope);
492        let scope_lookup = self.scope_lookup;
493        let query_options = query_scope.options();
494
495        let buster_if_uncached = ArcRwSignal::new(new_buster_id());
496        let resource_id = new_resource_id();
497
498        // To call .mark_resource_dropped() when the resource is dropped:
499        let drop_guard = ResourceDropGuard::<K, V>::new(self.scope_lookup, resource_id, cache_key);
500
501        let keyer = Arc::new(keyer);
502        let resource = ArcResource::new_with_options(
503            {
504                let buster_if_uncached = buster_if_uncached.clone();
505                let drop_guard = drop_guard.clone();
506                let keyer = keyer.clone();
507                move || {
508                    if let Some(key) = keyer().into_maybe_key() {
509                        let key_hash = KeyHash::new(&key);
510                        drop_guard.set_active_key(key_hash);
511                        scope_lookup.with_cached_query::<K, V, _>(
512                            &key_hash,
513                            &cache_key,
514                            |maybe_cached| {
515                                if let Some(cached) = maybe_cached {
516                                    // Buster must be returned for it to be tracked.
517                                    (Some(key.clone()), cached.buster.get())
518                                } else {
519                                    // Buster must be returned for it to be tracked.
520                                    (Some(key.clone()), buster_if_uncached.get())
521                                }
522                            },
523                        )
524                    } else {
525                        (None, buster_if_uncached.get())
526                    }
527                }
528            },
529            {
530                let buster_if_uncached = buster_if_uncached.clone();
531                let query_scope_info = query_scope_info.clone();
532                let query_scope = query_scope.clone();
533                move |(maybe_key, last_used_buster)| {
534                    let query_scope = query_scope.clone();
535                    let query_scope_info = query_scope_info.clone();
536                    let invalidation_prefix = maybe_key
537                        .as_ref()
538                        .and_then(|key| query_scope.invalidation_prefix(key));
539                    let buster_if_uncached = buster_if_uncached.clone();
540                    let _drop_guard = drop_guard.clone(); // Want the guard around everywhere until the resource is dropped.
541                    // Note: cannot hoist outside of resource,
542                    // it prevents the resource itself dropping when the owner is held by the resource itself,
543                    // here each instance only lasts as long as the query.
544                    let owner_chain = OwnerChain::new(Owner::current());
545                    async move {
546                        if let Some(key) = maybe_key {
547                            let value = scope_lookup
548                                .cached_or_fetch(
549                                    client_options,
550                                    query_options,
551                                    Some(buster_if_uncached.clone()),
552                                    &query_scope_info,
553                                    invalidation_prefix,
554                                    &key,
555                                    {
556                                        let query_scope = query_scope.clone();
557                                        async move |key| {
558                                            MaybeLocal::new(query_scope.query(key).await)
559                                        }
560                                    },
561                                    |info| {
562                                        info.cached.mark_resource_active(resource_id);
563                                        match info.variant {
564                                            CachedOrFetchCbInputVariant::CachedUntouched => {
565                                                // If stale refetch in the background with the prefetch() function, which'll recognise it's stale, refetch it and invalidate busters:
566                                                if cfg!(any(test, not(feature = "ssr")))
567                                                    && info.cached.stale()
568                                                {
569                                                    let key = key.clone();
570                                                    let query_scope = query_scope.clone();
571                                                    let owner_chain = owner_chain.clone();
572                                                    leptos::task::spawn(async move {
573                                                        client
574                                                            .prefetch_inner(
575                                                                QueryScopeInfo::new(&query_scope),
576                                                                query_scope.invalidation_prefix(
577                                                                    key.borrow(),
578                                                                ),
579                                                                async move |key| {
580                                                                    MaybeLocal::new(
581                                                                        query_scope
582                                                                            .query(key)
583                                                                            .await,
584                                                                    )
585                                                                },
586                                                                key.borrow(),
587                                                                || {
588                                                                    MaybeLocal::new(
589                                                                        key.borrow().clone(),
590                                                                    )
591                                                                },
592                                                                &owner_chain,
593                                                            )
594                                                            .await;
595                                                    });
596                                                }
597
598                                                // Handle edge case where the key function saw it was uncached, so entered here,
599                                                // but when the cached_or_fetch was acquiring the fetch mutex, someone else cached it,
600                                                // meaning our key function won't be reactive correctly unless we invalidate it now:
601                                                if last_used_buster
602                                                    != info.cached.buster.get_untracked()
603                                                {
604                                                    buster_if_uncached
605                                                        .set(info.cached.buster.get_untracked());
606                                                }
607                                            }
608                                            CachedOrFetchCbInputVariant::Fresh => {}
609                                            CachedOrFetchCbInputVariant::CachedUpdated => {
610                                                panic!("Didn't direct inner to refetch here. (bug)")
611                                            }
612                                        }
613                                        CachedOrFetchCbOutput::Return(
614                                            // WONTPANIC: cached_or_fetch will only output values that are safe on this thread:
615                                            info.cached
616                                                .value_maybe_stale()
617                                                .value_may_panic()
618                                                .clone(),
619                                        )
620                                    },
621                                    None,
622                                    || MaybeLocal::new(key.clone()),
623                                    &owner_chain,
624                                )
625                                .await;
626                            MaybeKey::prepare_mapped_value(Some(value))
627                        } else {
628                            MaybeKey::prepare_mapped_value(None)
629                        }
630                    }
631                }
632            },
633            blocking,
634        );
635
636        // On the client, want to repopulate the frontend cache, so should write resources to the cache here if they don't exist.
637        // It would be better if in here we could check if the resource was started on the backend/streamed, saves doing most of this if already a frontend resource.
638        let effect = {
639            let resource = resource.clone();
640            let buster_if_uncached = buster_if_uncached.clone();
641            let client_created_at = self.created_at;
642            // Converting to Arc because the tests like the client get dropped even though this persists:
643            move |complete: Option<Option<()>>| {
644                if let Some(Some(())) = complete {
645                    return Some(());
646                }
647                if let Some(val) = resource.read().as_ref() {
648                    if MaybeKey::mapped_value_is_some(val) {
649                        scope_lookup.with_cached_scope_mut::<K, V, _, _>(
650                            &query_scope_info,
651                            true,
652                            |_| {},
653                            |maybe_scope, _| {
654                                let scope = maybe_scope.expect("provided a default");
655                                if let Some(key) = keyer().into_maybe_key() {
656                                    let key_hash = KeyHash::new(&key);
657
658                                    // Had a bug in tests where:
659                                    // calling client.resource() multiple times
660                                    // previous impl would only run the effect once per client.resource(), 
661                                    // but would still run again on subsequent client.resource() with the same key value
662                                    // - client.resource()
663                                    // - invalidate, starts loading in background
664                                    // - client.resource()
665                                    // - effect runs again, sees resource has value, goes to set it, fires QueryAbortReason::SsrStreamedValueOverride which cancels the actual new query which is loading to replace the invalidated one
666                                    // So have to have a client wide protector, to make sure the effect really only runs once on the client, even if resource() called multiple times.
667                                    if (cfg!(test) || cfg!(not(feature = "ssr"))) && 
668                                        // Preventing memory leak, issue is only for first render:
669                                        (Utc::now() - client_created_at).num_seconds() < 10
670                                    {
671                                        use parking_lot::Mutex;
672
673                                        static ALREADY_SEEN: LazyLock<Mutex<HashMap<(u64, ScopeCacheKey, KeyHash), ()>>> = LazyLock::new(|| Mutex::new(HashMap::new()));
674
675                                        let key = (scope_lookup.scope_id, query_scope_info.cache_key, key_hash);
676                                        let mut guard = ALREADY_SEEN.lock();
677                                        if guard.contains_key(&key) {
678                                            return;
679                                        }
680                                        guard.insert(key, ());
681                                    }
682
683                                    let invalidation_prefix = query_scope.invalidation_prefix(&key);
684
685                                    let mut was_pending = false;
686                                    // Protect against race condition: cancel any frontend query that started already if there was a client side 
687                                    // local resource or prefetch/fetch_query etc that started on the initial client-side ticks:
688                                    if let Some(QueryOrPending::Pending { query_abort_tx, .. }) = scope.get_mut_include_pending(&key_hash)
689                                        && let Some(tx) = query_abort_tx.take() {
690                                            tx.send(QueryAbortReason::SsrStreamedValueOverride).unwrap();
691                                            was_pending = true;
692                                        }
693
694                                    if was_pending || !scope.contains_key(&key_hash) {
695                                        let query = Query::new(
696                                            client_options,
697                                            scope_lookup,
698                                            &query_scope_info,
699                                            invalidation_prefix,
700                                            key_hash,
701                                            MaybeLocal::new(key),
702                                            MaybeLocal::new(MaybeKey::mapped_to_maybe_value(val.clone()).expect("Just checked MaybeKey::mapped_value_is_some() is true")),
703                                            buster_if_uncached.clone(),
704                                            query_options,
705                                            None,
706                                            #[cfg(any(
707                                                all(debug_assertions, feature = "devtools"),
708                                                feature = "devtools-always"
709                                            ))]
710                                            crate::events::Event::new(
711                                                crate::events::EventVariant::StreamedFromServer,
712                                            ),
713                                        );
714                                        scope.insert(key_hash, query);
715                                    }
716                                    scope
717                                        .get(&key_hash)
718                                        .expect("Just inserted")
719                                        .mark_resource_active(resource_id)
720                                }
721                            },
722                        );
723                    }
724                    Some(())
725                } else {
726                    None
727                }
728            }
729        };
730        // Won't run in tests if not isomorphic, but in prod Effect is wanted to not run on server:
731        if cfg!(test) {
732            // Wants Send + Sync on the Codec despite using SendWrapper in the PhantomData.
733            // Can put a SendWrapper around it as this is test only and they're single threaded:
734            let effect = SendWrapper::new(effect);
735            #[allow(clippy::redundant_closure)]
736            Effect::new_isomorphic(move |v| effect(v));
737        } else {
738            Effect::new(effect);
739        }
740
741        resource
742    }
743
744    /// Prevent reactive updates being triggered from the current updater callback fn.
745    ///
746    /// Call [`QueryClient::untrack_update_query`] in the callback of [`QueryClient::update_query`]
747    /// to prevent resources being updated after the callback completes.
748    ///
749    /// This is really useful when e.g:
750    /// - you know nothing changes after reading the existing value in the callback
751    /// - you don't want resources/subs to react and rerender, given nothings changed.
752    ///
753    /// This function is a noop outside the callbacks of:
754    /// - [`QueryClient::update_query`]
755    /// - [`QueryClient::update_query_async`]
756    /// - [`QueryClient::update_query_async_local`]
757    pub fn untrack_update_query(&self) {
758        // Set markers to false, these will all be reset to true automatically where needed.
759        SYNC_TRACK_UPDATE_MARKER.with(|marker| {
760            marker.store(false, std::sync::atomic::Ordering::Relaxed);
761        });
762        // Ignore returned AccessError if didn't exist (not in async ctx):
763        let _ = ASYNC_TRACK_UPDATE_MARKER.try_with(|marker| {
764            marker.store(false, std::sync::atomic::Ordering::Relaxed);
765        });
766    }
767
768    /// Prefetch a query and store it in the cache.
769    ///
770    /// - Entry doesn't exist: fetched and stored in the cache.
771    /// - Entry exists but stale: fetched and updated in the cache.
772    /// - Entry exists but **not** stale: not refreshed, existing cache item remains.
773    ///
774    /// If the cached query changes, active resources using the query will be updated.
775    #[track_caller]
776    pub async fn prefetch_query<K, V, M>(
777        &self,
778        query_scope: impl QueryScopeTrait<K, V, M>,
779        key: impl Borrow<K>,
780    ) where
781        K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
782        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
783    {
784        self.prefetch_inner(
785            QueryScopeInfo::new(&query_scope),
786            query_scope.invalidation_prefix(key.borrow()),
787            async move |key| MaybeLocal::new(query_scope.query(key).await),
788            key.borrow(),
789            || MaybeLocal::new(key.borrow().clone()),
790            &OwnerChain::new(Owner::current()),
791        )
792        .await
793    }
794
795    /// Prefetch a non-threadsafe query and store it in the cache for this thread only.
796    ///
797    /// - Entry doesn't exist: fetched and stored in the cache.
798    /// - Entry exists but stale: fetched and updated in the cache.
799    /// - Entry exists but **not** stale: not refreshed, existing cache item remains.
800    ///
801    /// If the cached query changes, active resources using the query will be updated.
802    #[track_caller]
803    pub async fn prefetch_query_local<K, V, M>(
804        &self,
805        query_scope: impl QueryScopeLocalTrait<K, V, M>,
806        key: impl Borrow<K>,
807    ) where
808        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
809        V: DebugIfDevtoolsEnabled + Clone + 'static,
810    {
811        self.prefetch_inner(
812            QueryScopeInfo::new_local(&query_scope),
813            query_scope.invalidation_prefix(key.borrow()),
814            async move |key| MaybeLocal::new_local(query_scope.query(key).await),
815            key.borrow(),
816            || MaybeLocal::new_local(key.borrow().clone()),
817            &OwnerChain::new(Owner::current()),
818        )
819        .await
820    }
821
822    #[track_caller]
823    async fn prefetch_inner<K, V>(
824        &self,
825        query_scope_info: QueryScopeInfo,
826        invalidation_prefix: Option<Vec<String>>,
827        fetcher: impl AsyncFn(K) -> MaybeLocal<V>,
828        key: &K,
829        lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
830        owner_chain: &OwnerChain,
831    ) where
832        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
833        V: DebugIfDevtoolsEnabled + Clone + 'static,
834    {
835        self.scope_lookup
836            .cached_or_fetch(
837                self.options(),
838                query_scope_info.options,
839                None,
840                &query_scope_info,
841                invalidation_prefix,
842                key,
843                fetcher,
844                |info| {
845                    match info.variant {
846                        CachedOrFetchCbInputVariant::CachedUntouched => {
847                            if info.cached.stale() {
848                                return CachedOrFetchCbOutput::Refetch;
849                            }
850                        }
851                        CachedOrFetchCbInputVariant::CachedUpdated => {
852                            // Update anything using it:
853                            info.cached.buster.set(new_buster_id());
854                        }
855                        CachedOrFetchCbInputVariant::Fresh => {}
856                    }
857                    CachedOrFetchCbOutput::Return(())
858                },
859                None,
860                lazy_maybe_local_key,
861                owner_chain,
862            )
863            .await;
864    }
865
866    /// Fetch a query, store it in the cache and return it.
867    ///
868    /// - Entry doesn't exist: fetched and stored in the cache.
869    /// - Entry exists but stale: fetched and updated in the cache.
870    /// - Entry exists but **not** stale: not refreshed, existing cache item remains.
871    ///
872    /// If the cached query changes, active resources using the query will be updated.
873    ///
874    /// Returns the up-to-date cached query.
875    #[track_caller]
876    pub async fn fetch_query<K, V, M>(
877        &self,
878        query_scope: impl QueryScopeTrait<K, V, M>,
879        key: impl Borrow<K>,
880    ) -> V
881    where
882        K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
883        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
884    {
885        self.fetch_inner(
886            QueryScopeInfo::new(&query_scope),
887            query_scope.invalidation_prefix(key.borrow()),
888            async move |key| MaybeLocal::new(query_scope.query(key).await),
889            key.borrow(),
890            None,
891            || MaybeLocal::new(key.borrow().clone()),
892            &OwnerChain::new(Owner::current()),
893        )
894        .await
895    }
896
897    /// Fetch a non-threadsafe query, store it in the cache for this thread only and return it.
898    ///
899    /// - Entry doesn't exist: fetched and stored in the cache.
900    /// - Entry exists but stale: fetched and updated in the cache.
901    /// - Entry exists but **not** stale: not refreshed, existing cache item remains.
902    ///
903    /// If the cached query changes, active resources using the query will be updated.
904    ///
905    /// Returns the up-to-date cached query.
906    #[track_caller]
907    pub async fn fetch_query_local<K, V, M>(
908        &self,
909        query_scope: impl QueryScopeLocalTrait<K, V, M>,
910        key: impl Borrow<K>,
911    ) -> V
912    where
913        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
914        V: DebugIfDevtoolsEnabled + Clone + 'static,
915    {
916        self.fetch_inner(
917            QueryScopeInfo::new_local(&query_scope),
918            query_scope.invalidation_prefix(key.borrow()),
919            async move |key| MaybeLocal::new_local(query_scope.query(key).await),
920            key.borrow(),
921            None,
922            || MaybeLocal::new_local(key.borrow().clone()),
923            &OwnerChain::new(Owner::current()),
924        )
925        .await
926    }
927
928    #[track_caller]
929    async fn fetch_inner<K, V>(
930        &self,
931        query_scope_info: QueryScopeInfo,
932        invalidation_prefix: Option<Vec<String>>,
933        fetcher: impl AsyncFn(K) -> MaybeLocal<V>,
934        key: &K,
935        maybe_preheld_fetcher_mutex_guard: Option<&futures::lock::MutexGuard<'_, ()>>,
936        lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
937        owner_chain: &OwnerChain,
938    ) -> V
939    where
940        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
941        V: DebugIfDevtoolsEnabled + Clone + 'static,
942    {
943        self.scope_lookup
944            .cached_or_fetch(
945                self.options(),
946                query_scope_info.options,
947                None,
948                &query_scope_info,
949                invalidation_prefix,
950                key,
951                fetcher,
952                |info| {
953                    match info.variant {
954                        CachedOrFetchCbInputVariant::CachedUntouched => {
955                            if info.cached.stale() {
956                                return CachedOrFetchCbOutput::Refetch;
957                            }
958                        }
959                        CachedOrFetchCbInputVariant::CachedUpdated => {
960                            // Update anything using it:
961                            info.cached.buster.set(new_buster_id());
962                        }
963                        CachedOrFetchCbInputVariant::Fresh => {}
964                    }
965                    CachedOrFetchCbOutput::Return(
966                        // WONTPANIC: cached_or_fetch will only output values that are safe on this thread:
967                        info.cached.value_maybe_stale().value_may_panic().clone(),
968                    )
969                },
970                maybe_preheld_fetcher_mutex_guard,
971                lazy_maybe_local_key,
972                owner_chain,
973            )
974            .await
975    }
976
977    /// Set the value of a query in the cache.
978    /// This cached value will be available from all threads and take priority over any locally cached value for this query.
979    ///
980    /// Active resources using the query will be updated.
981    #[track_caller]
982    pub fn set_query<K, V, M>(
983        &self,
984        query_scope: impl QueryScopeTrait<K, V, M>,
985        key: impl Borrow<K>,
986        new_value: V,
987    ) where
988        K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
989        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
990    {
991        self.set_inner(
992            QueryScopeInfo::new(&query_scope),
993            query_scope.invalidation_prefix(key.borrow()),
994            key.borrow(),
995            MaybeLocal::new(new_value),
996            || MaybeLocal::new(key.borrow().clone()),
997            true,
998            // like update methods, this does not come from the query function itself,
999            // so should not reset the invalidated status:
1000            ResetInvalidated::NoReset,
1001        )
1002    }
1003
1004    /// Set the value of a non-threadsafe query in the cache for this thread only.
1005    /// This cached value will only be available from this thread, the cache will return empty, unless a nonlocal value is set, from any other thread.
1006    ///
1007    /// Active resources using the query will be updated.
1008    #[track_caller]
1009    pub fn set_query_local<K, V, M>(
1010        &self,
1011        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1012        key: impl Borrow<K>,
1013        new_value: V,
1014    ) where
1015        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1016        V: DebugIfDevtoolsEnabled + Clone + 'static,
1017    {
1018        self.set_inner::<K, V>(
1019            QueryScopeInfo::new_local(&query_scope),
1020            query_scope.invalidation_prefix(key.borrow()),
1021            key.borrow(),
1022            MaybeLocal::new_local(new_value),
1023            || MaybeLocal::new_local(key.borrow().clone()),
1024            true,
1025            // like update methods, this does not come from the query function itself,
1026            // so should not reset the invalidated status:
1027            ResetInvalidated::NoReset,
1028        )
1029    }
1030
1031    #[track_caller]
1032    fn set_inner<K, V>(
1033        &self,
1034        query_scope_info: QueryScopeInfo,
1035        invalidation_prefix: Option<Vec<String>>,
1036        key: &K,
1037        new_value: MaybeLocal<V>,
1038        lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
1039        track: bool,
1040        reset_invalidated: ResetInvalidated,
1041    ) where
1042        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1043        V: DebugIfDevtoolsEnabled + Clone + 'static,
1044    {
1045        let key_hash = KeyHash::new(key.borrow());
1046        self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
1047            &query_scope_info,
1048            true,
1049            |_| {},
1050            |maybe_scope, _| {
1051                let scope = maybe_scope.expect("provided a default");
1052
1053                // Make sure to look both caches if threadsafe, and prefer threadsafe:
1054                let maybe_cached = if !new_value.is_local() {
1055                    if let Some(threadsafe_existing) = scope.get_mut_threadsafe_only(&key_hash) {
1056                        Some(threadsafe_existing)
1057                    } else {
1058                        scope.get_mut_local_only(&key_hash)
1059                    }
1060                } else {
1061                    scope.get_mut_local_only(&key_hash)
1062                };
1063
1064                if let Some(cached) = maybe_cached {
1065                    cached.set_value(
1066                        new_value,
1067                        track,
1068                        #[cfg(any(
1069                            all(debug_assertions, feature = "devtools"),
1070                            feature = "devtools-always"
1071                        ))]
1072                        crate::events::Event::new(crate::events::EventVariant::DeclarativeSet),
1073                        reset_invalidated,
1074                    );
1075                } else {
1076                    let query = Query::new(
1077                        self.options(),
1078                        self.scope_lookup,
1079                        &query_scope_info,
1080                        invalidation_prefix,
1081                        key_hash,
1082                        lazy_maybe_local_key(),
1083                        new_value,
1084                        ArcRwSignal::new(new_buster_id()),
1085                        query_scope_info.options,
1086                        None,
1087                        #[cfg(any(
1088                            all(debug_assertions, feature = "devtools"),
1089                            feature = "devtools-always"
1090                        ))]
1091                        crate::events::Event::new(crate::events::EventVariant::DeclarativeSet),
1092                    );
1093                    scope.insert(key_hash, query);
1094                }
1095            },
1096        );
1097    }
1098
1099    /// Synchronously update the value of a query in the cache with a callback.
1100    ///
1101    /// Active resources using the query will be updated.
1102    ///
1103    /// The callback takes `Option<&mut V>`, will be None if the value is not available in the cache.
1104    ///
1105    /// If you want async and/or always get the value, use [`QueryClient::update_query_async`]/[`QueryClient::update_query_async_local`].
1106    ///
1107    /// Returns the output of the callback.
1108    ///
1109    /// If you decide you don't want to trigger resources and subscribers, e.g. if you know nothing changed,
1110    /// call [`QueryClient::untrack_update_query`] in the callback to prevent reactive updates.
1111    #[track_caller]
1112    pub fn update_query<K, V, T, M>(
1113        &self,
1114        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1115        key: impl Borrow<K>,
1116        modifier: impl FnOnce(Option<&mut V>) -> T,
1117    ) -> T
1118    where
1119        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1120        V: DebugIfDevtoolsEnabled + Clone + 'static,
1121    {
1122        self.update_query_inner::<K, V, T>(
1123            &QueryScopeInfo::new_local(&query_scope),
1124            key.borrow(),
1125            ResetInvalidated::NoReset,
1126            modifier,
1127        )
1128    }
1129
1130    #[track_caller]
1131    fn update_query_inner<K, V, T>(
1132        &self,
1133        query_scope_info: &QueryScopeInfo,
1134        key: &K,
1135        reset_invalidated: ResetInvalidated,
1136        modifier: impl FnOnce(Option<&mut V>) -> T,
1137    ) -> T
1138    where
1139        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1140        V: DebugIfDevtoolsEnabled + Clone + 'static,
1141    {
1142        let key_hash = KeyHash::new(key);
1143        let mut modifier_holder = Some(modifier);
1144
1145        let maybe_return_value = self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
1146            query_scope_info,
1147            false,
1148            |_| {},
1149            |maybe_scope, _| {
1150                if let Some(scope) = maybe_scope
1151                    && let Some(cached) = scope.get_mut(&key_hash)
1152                {
1153                    let modifier = modifier_holder
1154                        .take()
1155                        .expect("Should never be used more than once. (bug)");
1156                    let return_value = cached.update_value(
1157                        // WONTPANIC: the internals will only supply the value if available from this thread:
1158                        |value| modifier(Some(value.value_mut_may_panic())),
1159                        #[cfg(any(
1160                            all(debug_assertions, feature = "devtools"),
1161                            feature = "devtools-always"
1162                        ))]
1163                        crate::events::Event::new(crate::events::EventVariant::DeclarativeUpdate),
1164                        reset_invalidated,
1165                    );
1166                    return Some(return_value);
1167                }
1168                None
1169            },
1170        );
1171        if let Some(return_value) = maybe_return_value {
1172            return_value
1173        } else {
1174            // Didn't exist:
1175            modifier_holder
1176                .take()
1177                .expect("Should never be used more than once. (bug)")(None)
1178        }
1179    }
1180
1181    /// Asynchronously map a threadsafe query in the cache from one value to another.
1182    ///
1183    /// Unlike [`QueryClient::update_query`], this will fetch the query first, if it doesn't exist.
1184    ///
1185    /// Active resources using the query will be updated.
1186    ///
1187    /// Returns the output of the callback.
1188    ///
1189    /// If you decide you don't want to trigger resources and subscribers, e.g. if you know nothing changed,
1190    /// call [`QueryClient::untrack_update_query`] in the callback to prevent reactive updates.
1191    #[track_caller]
1192    pub async fn update_query_async<'a, K, V, T, M>(
1193        &'a self,
1194        query_scope: impl QueryScopeTrait<K, V, M>,
1195        key: impl Borrow<K>,
1196        mapper: impl AsyncFnOnce(&mut V) -> T,
1197    ) -> T
1198    where
1199        K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
1200        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1201    {
1202        self.update_query_async_inner(
1203            QueryScopeInfo::new(&query_scope),
1204            query_scope.invalidation_prefix(key.borrow()),
1205            async move |key| MaybeLocal::new(query_scope.query(key).await),
1206            key.borrow(),
1207            mapper,
1208            MaybeLocal::new,
1209            || MaybeLocal::new(key.borrow().clone()),
1210            &OwnerChain::new(Owner::current()),
1211        )
1212        .await
1213    }
1214
1215    /// Asynchronously map a non-threadsafe query in the cache from one value to another.
1216    ///
1217    /// Unlike [`QueryClient::update_query`], this will fetch the query first, if it doesn't exist.
1218    ///
1219    /// Active resources using the query will be updated.
1220    ///
1221    /// Returns the output of the callback.
1222    ///
1223    /// If you decide you don't want to trigger resources and subscribers, e.g. if you know nothing changed,
1224    /// call [`QueryClient::untrack_update_query`] in the callback to prevent reactive updates.
1225    #[track_caller]
1226    pub async fn update_query_async_local<'a, K, V, T, M>(
1227        &'a self,
1228        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1229        key: impl Borrow<K>,
1230        mapper: impl AsyncFnOnce(&mut V) -> T,
1231    ) -> T
1232    where
1233        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1234        V: DebugIfDevtoolsEnabled + Clone + 'static,
1235    {
1236        self.update_query_async_inner(
1237            QueryScopeInfo::new_local(&query_scope),
1238            query_scope.invalidation_prefix(key.borrow()),
1239            async move |key| MaybeLocal::new_local(query_scope.query(key).await),
1240            key.borrow(),
1241            mapper,
1242            MaybeLocal::new_local,
1243            || MaybeLocal::new_local(key.borrow().clone()),
1244            &OwnerChain::new(Owner::current()),
1245        )
1246        .await
1247    }
1248
1249    #[track_caller]
1250    async fn update_query_async_inner<'a, K, V, T>(
1251        &'a self,
1252        query_scope_info: QueryScopeInfo,
1253        invalidation_prefix: Option<Vec<String>>,
1254        fetcher: impl AsyncFn(K) -> MaybeLocal<V>,
1255        key: &K,
1256        mapper: impl AsyncFnOnce(&mut V) -> T,
1257        into_maybe_local: impl FnOnce(V) -> MaybeLocal<V>,
1258        lazy_maybe_local_key: impl Fn() -> MaybeLocal<K>,
1259        owner_chain: &OwnerChain,
1260    ) -> T
1261    where
1262        K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1263        V: DebugIfDevtoolsEnabled + Clone + 'static,
1264    {
1265        let key_hash = KeyHash::new(key.borrow());
1266
1267        // By holding the fetcher mutex from start to finish, prevent the chance of the value being fetched between the fetch async call and the external user mapper async call and the final set().
1268        let fetcher_mutex = self
1269            .scope_lookup
1270            .fetcher_mutex::<K, V>(key_hash, &query_scope_info);
1271        let fetcher_guard = fetcher_mutex.lock().await;
1272
1273        let mut new_value = self
1274            .fetch_inner(
1275                query_scope_info.clone(),
1276                invalidation_prefix.clone(),
1277                fetcher,
1278                key.borrow(),
1279                Some(&fetcher_guard),
1280                &lazy_maybe_local_key,
1281                owner_chain,
1282            )
1283            .await;
1284
1285        // The fetch will "turn on" is_fetching during it's lifetime, but we also want it during the mapper function:
1286        self.scope_lookup
1287            .with_notify_fetching(query_scope_info.cache_key, key_hash, false, async {
1288                let track = Arc::new(AtomicBool::new(true));
1289
1290                // Will monitor for invalidations during the user async fn, which could take a long time:
1291                let query_abort_rx = self.scope_lookup.prepare_invalidation_channel::<K, V>(
1292                    &query_scope_info,
1293                    key_hash,
1294                    &lazy_maybe_local_key(),
1295                );
1296
1297                let result_fut = ASYNC_TRACK_UPDATE_MARKER
1298                    .scope(track.clone(), async { mapper(&mut new_value).await });
1299
1300                // Call the user async function, but also monitor for invalidations:
1301                // Specifically, we care about if .clear() is called.
1302                // If so, we shouldn't set the value, as it will have been generated with the cleared cached query value.
1303                // If invalidated should still set, because the value will be deemed "less stale", but won't de-invalidate it.
1304                // Don't care about QueryAbortReason::SsrStreamedValueOverride
1305                // as it shouldn't be possible to get to this point before all ssr streaming is done.
1306                let mut cleared_during_user_fn = false;
1307                let result = {
1308                    pin_mut!(result_fut);
1309                    pin_mut!(query_abort_rx);
1310                    match futures::future::select(result_fut, query_abort_rx).await {
1311                        futures::future::Either::Left((result, _query_abort_rx)) => result,
1312                        futures::future::Either::Right((query_abort_reason, result_fut)) => {
1313                            if let Ok(QueryAbortReason::Clear) = query_abort_reason {
1314                                // If the query was cleared, don't set the new value.
1315                                cleared_during_user_fn = true;
1316                            }
1317                            result_fut.await
1318                        }
1319                    }
1320                };
1321
1322                // If the query was cleared, don't set the new value.
1323                if !cleared_during_user_fn {
1324                    // Cover a small edge case:
1325                    // - value in threadsafe
1326                    // - .update_query_async_local called
1327                    // - replacement value .set() called, stored in local cache because no type safety on threadsafe cache
1328                    // - now have 2 versions in both caches
1329                    // by trying to update first, means it'll stick with existing cache if it exists,
1330                    // otherwise doesn't matter and can just be set normally:
1331                    let mut new_value = Some(new_value);
1332                    let updated = self.update_query_inner::<K, V, _>(
1333                        &query_scope_info,
1334                        key,
1335                        // fetch_inner would've reset it if needed, then the update itself shouldn't change the value:
1336                        ResetInvalidated::NoReset,
1337                        |value| {
1338                            if let Some(value) = value {
1339                                // This sync update call itself needs untracking if !track:
1340                                if !track.load(std::sync::atomic::Ordering::Relaxed) {
1341                                    self.untrack_update_query();
1342                                }
1343
1344                                *value = new_value.take().expect("Should be Some");
1345                                true
1346                            } else {
1347                                false
1348                            }
1349                        },
1350                    );
1351                    if !updated {
1352                        self.set_inner::<K, V>(
1353                            query_scope_info,
1354                            invalidation_prefix,
1355                            key.borrow(),
1356                            into_maybe_local(
1357                                new_value
1358                                    .take()
1359                                    .expect("Should be Some, should only be here if not updated"),
1360                            ),
1361                            lazy_maybe_local_key,
1362                            track.load(std::sync::atomic::Ordering::Relaxed),
1363                            // fetch_inner would've reset it if needed, then the update itself shouldn't change the value:
1364                            ResetInvalidated::NoReset,
1365                        );
1366                    }
1367                }
1368
1369                result
1370            })
1371            .await
1372    }
1373
1374    /// Synchronously get a query from the cache, if it exists.
1375    #[track_caller]
1376    pub fn get_cached_query<K, V, M>(
1377        &self,
1378        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1379        key: impl Borrow<K>,
1380    ) -> Option<V>
1381    where
1382        K: DebugIfDevtoolsEnabled + Hash + 'static,
1383        V: DebugIfDevtoolsEnabled + Clone + 'static,
1384    {
1385        self.scope_lookup.with_cached_query::<K, V, _>(
1386            &KeyHash::new(key.borrow()),
1387            &query_scope.cache_key(),
1388            |maybe_cached| {
1389                // WONTPANIC: with_cached_query will only output values that are safe on this thread:
1390                maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1391            },
1392        )
1393    }
1394
1395    /// Synchronously check if a query exists in the cache.
1396    ///
1397    /// Returns `true` if the query exists.
1398    #[track_caller]
1399    pub fn query_exists<K, V, M>(
1400        &self,
1401        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1402        key: impl Borrow<K>,
1403    ) -> bool
1404    where
1405        K: DebugIfDevtoolsEnabled + Hash + 'static,
1406        V: DebugIfDevtoolsEnabled + 'static,
1407    {
1408        let key_hash = KeyHash::new(key.borrow());
1409        self.scope_lookup.with_cached_query::<K, V, _>(
1410            &key_hash,
1411            &query_scope.cache_key(),
1412            |maybe_cached| maybe_cached.is_some(),
1413        )
1414    }
1415
1416    /// Subscribe to the `is_loading` status of a query.
1417    /// The keyer function is reactive to changes in `K`.
1418    ///
1419    /// This is `true` when the query is in the process of fetching data for the first time.
1420    /// This is in contrast to `is_fetching`, that is `true` whenever the query is fetching data, including when it's refetching.
1421    ///
1422    /// From a resource perspective:
1423    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1424    /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1425    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1426    #[track_caller]
1427    pub fn subscribe_is_loading<K, MaybeKey, V, M>(
1428        &self,
1429        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1430        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1431    ) -> Signal<bool>
1432    where
1433        K: Hash + Send + Sync + 'static,
1434        MaybeKey: QueryMaybeKey<K, V>,
1435        MaybeKey::MappedValue: 'static,
1436        V: 'static,
1437    {
1438        self.subscribe_is_loading_arc(query_scope, keyer).into()
1439    }
1440
1441    /// Subscribe to the `is_loading` status of a query with a non-threadsafe key.
1442    /// The keyer function is reactive to changes in `K`.
1443    ///
1444    /// This is `true` when the query is in the process of fetching data for the first time.
1445    /// This is in contrast to `is_fetching`, that is `true` whenever the query is fetching data, including when it's refetching.
1446    ///
1447    /// From a resource perspective:
1448    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1449    /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1450    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1451    #[track_caller]
1452    pub fn subscribe_is_loading_local<K, MaybeKey, V, M>(
1453        &self,
1454        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1455        keyer: impl Fn() -> MaybeKey + 'static,
1456    ) -> Signal<bool>
1457    where
1458        K: Hash + 'static,
1459        MaybeKey: QueryMaybeKey<K, V>,
1460        MaybeKey::MappedValue: 'static,
1461        V: 'static,
1462    {
1463        self.subscribe_is_loading_arc_local(query_scope, keyer)
1464            .into()
1465    }
1466
1467    /// Subscribe to the `is_loading` status of a query with a non-threadsafe key.
1468    /// The keyer function is reactive to changes in `K`.
1469    ///
1470    /// This is `true` when the query is in the process of fetching data for the first time.
1471    /// This is in contrast to `is_fetching`, that is `true` whenever the query is fetching data, including when it's refetching.
1472    ///
1473    /// From a resource perspective:
1474    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1475    /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1476    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1477    #[track_caller]
1478    pub fn subscribe_is_loading_arc_local<K, MaybeKey, V, M>(
1479        &self,
1480        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1481        keyer: impl Fn() -> MaybeKey + 'static,
1482    ) -> ArcSignal<bool>
1483    where
1484        K: Hash + 'static,
1485        MaybeKey: QueryMaybeKey<K, V>,
1486        MaybeKey::MappedValue: 'static,
1487        V: 'static,
1488    {
1489        let keyer = SendWrapper::new(keyer);
1490        self.scope_lookup
1491            .scope_subscriptions_mut()
1492            .add_is_loading_subscription(
1493                query_scope.cache_key(),
1494                MaybeLocal::new_local(ArcSignal::derive(move || {
1495                    keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1496                })),
1497            )
1498    }
1499
1500    /// Subscribe to the `is_loading` status of a query.
1501    /// The keyer function is reactive to changes in `K`.
1502    ///
1503    /// This is `true` when the query is in the process of fetching data for the first time.
1504    /// This is in contrast to `is_fetching`, that is `true` whenever the query is fetching data, including when it's refetching.
1505    ///
1506    /// From a resource perspective:
1507    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1508    /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1509    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1510    #[track_caller]
1511    pub fn subscribe_is_loading_arc<K, MaybeKey, V, M>(
1512        &self,
1513        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1514        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1515    ) -> ArcSignal<bool>
1516    where
1517        K: Hash + Send + Sync + 'static,
1518        MaybeKey: QueryMaybeKey<K, V>,
1519        MaybeKey::MappedValue: 'static,
1520        V: 'static,
1521    {
1522        self.scope_lookup
1523            .scope_subscriptions_mut()
1524            .add_is_loading_subscription(
1525                query_scope.cache_key(),
1526                MaybeLocal::new(ArcSignal::derive(move || {
1527                    keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1528                })),
1529            )
1530    }
1531
1532    /// Subscribe to the `is_fetching` status of a query.
1533    /// The keyer function is reactive to changes in `K`.
1534    ///
1535    /// This is `true` is `true` whenever the query is fetching data, including when it's refetching.
1536    /// This is in contrast to `is_loading`, that is `true` when the query is in the process of fetching data for the first time only.
1537    ///
1538    /// From a resource perspective:
1539    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1540    /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1541    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1542    #[track_caller]
1543    pub fn subscribe_is_fetching<K, MaybeKey, V, M>(
1544        &self,
1545        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1546        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1547    ) -> Signal<bool>
1548    where
1549        K: Hash + Send + Sync + 'static,
1550        MaybeKey: QueryMaybeKey<K, V>,
1551        MaybeKey::MappedValue: 'static,
1552        V: 'static,
1553    {
1554        self.subscribe_is_fetching_arc(query_scope, keyer).into()
1555    }
1556
1557    /// Subscribe to the `is_fetching` status of a query with a non-threadsafe key.
1558    /// The keyer function is reactive to changes in `K`.
1559    ///
1560    /// This is `true` is `true` whenever the query is fetching data, including when it's refetching.
1561    /// This is in contrast to `is_loading`, that is `true` when the query is in the process of fetching data for the first time only.
1562    ///
1563    /// From a resource perspective:
1564    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1565    /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1566    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1567    #[track_caller]
1568    pub fn subscribe_is_fetching_local<K, MaybeKey, V, M>(
1569        &self,
1570        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1571        keyer: impl Fn() -> MaybeKey + 'static,
1572    ) -> Signal<bool>
1573    where
1574        K: Hash + 'static,
1575        MaybeKey: QueryMaybeKey<K, V>,
1576        MaybeKey::MappedValue: 'static,
1577        V: 'static,
1578    {
1579        self.subscribe_is_fetching_arc_local(query_scope, keyer)
1580            .into()
1581    }
1582
1583    /// Subscribe to the `is_fetching` status of a query with a non-threadsafe key.
1584    /// The keyer function is reactive to changes in `K`.
1585    ///
1586    /// This is `true` is `true` whenever the query is fetching data, including when it's refetching.
1587    /// This is in contrast to `is_loading`, that is `true` when the query is in the process of fetching data for the first time only.
1588    ///
1589    /// From a resource perspective:
1590    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1591    /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1592    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1593    #[track_caller]
1594    pub fn subscribe_is_fetching_arc_local<K, MaybeKey, V, M>(
1595        &self,
1596        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1597        keyer: impl Fn() -> MaybeKey + 'static,
1598    ) -> ArcSignal<bool>
1599    where
1600        K: Hash + 'static,
1601        MaybeKey: QueryMaybeKey<K, V>,
1602        MaybeKey::MappedValue: 'static,
1603        V: 'static,
1604    {
1605        let keyer = SendWrapper::new(keyer);
1606        self.scope_lookup
1607            .scope_subscriptions_mut()
1608            .add_is_fetching_subscription(
1609                query_scope.cache_key(),
1610                MaybeLocal::new_local(ArcSignal::derive(move || {
1611                    keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1612                })),
1613            )
1614    }
1615
1616    /// Subscribe to the `is_fetching` status of a query.
1617    /// The keyer function is reactive to changes in `K`.
1618    ///
1619    /// This is `true` is `true` whenever the query is fetching data, including when it's refetching.
1620    /// This is in contrast to `is_loading`, that is `true` when the query is in the process of fetching data for the first time only.
1621    ///
1622    /// From a resource perspective:
1623    /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1624    /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1625    /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1626    #[track_caller]
1627    pub fn subscribe_is_fetching_arc<K, MaybeKey, V, M>(
1628        &self,
1629        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1630        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1631    ) -> ArcSignal<bool>
1632    where
1633        K: Hash + Send + Sync + 'static,
1634        MaybeKey: QueryMaybeKey<K, V>,
1635        MaybeKey::MappedValue: 'static,
1636        V: 'static,
1637    {
1638        self.scope_lookup
1639            .scope_subscriptions_mut()
1640            .add_is_fetching_subscription(
1641                query_scope.cache_key(),
1642                MaybeLocal::new(ArcSignal::derive(move || {
1643                    keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1644                })),
1645            )
1646    }
1647
1648    /// Subscribe to the value of a query.
1649    /// The keyer function is reactive to changes in `K`.
1650    ///
1651    /// This will update whenever the query is created, removed, updated, refetched or set.
1652    ///
1653    /// Compared to a resource:
1654    /// - This will not trigger a fetch of a query, if it's not in the cache, this will be `None`.       
1655    #[track_caller]
1656    pub fn subscribe_value<K, MaybeKey, V, M>(
1657        &self,
1658        query_scope: impl QueryScopeTrait<K, V, M>,
1659        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1660    ) -> Signal<Option<V>>
1661    where
1662        K: DebugIfDevtoolsEnabled + Hash + Send + Sync + 'static,
1663        MaybeKey: QueryMaybeKey<K, V>,
1664        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1665        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1666    {
1667        self.subscribe_value_arc(query_scope, keyer).into()
1668    }
1669
1670    /// Subscribe to the value of a non-threadsafe query.
1671    /// The keyer function is reactive to changes in `K`.
1672    ///
1673    /// This will update whenever the query is created, removed, updated, refetched or set.
1674    ///
1675    /// Compared to a resource:
1676    /// - This will not trigger a fetch of a query, if it's not in the cache, this will be `None`.
1677    #[track_caller]
1678    pub fn subscribe_value_local<K, MaybeKey, V, M>(
1679        &self,
1680        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1681        keyer: impl Fn() -> MaybeKey + 'static,
1682    ) -> Signal<Option<V>, LocalStorage>
1683    where
1684        K: DebugIfDevtoolsEnabled + Hash + 'static,
1685        MaybeKey: QueryMaybeKey<K, V>,
1686        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
1687        V: DebugIfDevtoolsEnabled + Clone + 'static,
1688    {
1689        self.subscribe_value_arc_local(query_scope, keyer).into()
1690    }
1691
1692    /// Subscribe to the value of a non-threadsafe query.
1693    /// The keyer function is reactive to changes in `K`.
1694    ///
1695    /// This will update whenever the query is created, removed, updated, refetched or set.
1696    ///
1697    /// Compared to a resource:
1698    /// - This will not trigger a fetch of a query, if it's not in the cache, this will be `None`.
1699    #[track_caller]
1700    pub fn subscribe_value_arc_local<K, MaybeKey, V, M>(
1701        &self,
1702        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1703        keyer: impl Fn() -> MaybeKey + 'static,
1704    ) -> ArcLocalSignal<Option<V>>
1705    where
1706        K: DebugIfDevtoolsEnabled + Hash + 'static,
1707        MaybeKey: QueryMaybeKey<K, V>,
1708        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
1709        V: DebugIfDevtoolsEnabled + Clone + 'static,
1710    {
1711        let cache_key = query_scope.cache_key();
1712        let keyer = ArcLocalSignal::derive_local(move || {
1713            keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1714        });
1715        let dyn_signal = self
1716            .scope_lookup
1717            .scope_subscriptions_mut()
1718            .add_value_set_updated_or_removed_subscription(
1719                cache_key,
1720                MaybeLocal::new_local({
1721                    let keyer = keyer.clone();
1722                    move || keyer.get()
1723                }),
1724            );
1725
1726        let scope_lookup = self.scope_lookup;
1727        ArcLocalSignal::derive_local(move || {
1728            dyn_signal.track();
1729            if let Some(key) = keyer.read_untracked().as_ref() {
1730                scope_lookup.with_cached_query::<K, V, _>(key, &cache_key, |maybe_cached| {
1731                    // WONTPANIC: with_cached_query will only output values that are safe on this thread:
1732                    maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1733                })
1734            } else {
1735                None
1736            }
1737        })
1738    }
1739
1740    /// Subscribe to the value of a query.
1741    /// The keyer function is reactive to changes in `K`.
1742    ///
1743    /// This will update whenever the query is created, removed, updated, refetched or set.
1744    ///
1745    /// Compared to a resource:
1746    /// - This will not trigger a fetch of a query, if it's not in the cache, this will be `None`.
1747    #[track_caller]
1748    pub fn subscribe_value_arc<K, MaybeKey, V, M>(
1749        &self,
1750        query_scope: impl QueryScopeTrait<K, V, M>,
1751        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1752    ) -> ArcSignal<Option<V>>
1753    where
1754        K: DebugIfDevtoolsEnabled + Hash + Send + Sync + 'static,
1755        MaybeKey: QueryMaybeKey<K, V>,
1756        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1757        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1758    {
1759        let cache_key = query_scope.cache_key();
1760        let keyer = ArcSignal::derive(move || keyer().into_maybe_key().map(|k| KeyHash::new(&k)));
1761
1762        let dyn_signal = self
1763            .scope_lookup
1764            .scope_subscriptions_mut()
1765            .add_value_set_updated_or_removed_subscription(
1766                cache_key,
1767                MaybeLocal::new({
1768                    let keyer = keyer.clone();
1769                    move || keyer.get()
1770                }),
1771            );
1772
1773        let scope_lookup = self.scope_lookup;
1774        ArcSignal::derive(move || {
1775            dyn_signal.track();
1776            if let Some(key) = keyer.read_untracked().as_ref() {
1777                scope_lookup.with_cached_query::<K, V, _>(key, &cache_key, |maybe_cached| {
1778                    // WONTPANIC: with_cached_query will only output values that are safe on this thread:
1779                    maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1780                })
1781            } else {
1782                None
1783            }
1784        })
1785    }
1786
1787    /// Mark a query as stale.
1788    ///
1789    /// Any active resources will refetch in the background, replacing them when ready.
1790    #[track_caller]
1791    pub fn invalidate_query<K, V, M>(
1792        &self,
1793        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1794        key: impl Borrow<K>,
1795    ) -> bool
1796    where
1797        K: DebugIfDevtoolsEnabled + Hash + 'static,
1798        V: DebugIfDevtoolsEnabled + Clone + 'static,
1799    {
1800        let cleared = self.invalidate_queries(query_scope, std::iter::once(key));
1801        !cleared.is_empty()
1802    }
1803
1804    /// Mark multiple queries of a specific type as stale.
1805    ///
1806    /// Any active resources will refetch in the background, replacing them when ready.
1807    #[track_caller]
1808    pub fn invalidate_queries<K, V, KRef, M>(
1809        &self,
1810        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1811        keys: impl IntoIterator<Item = KRef>,
1812    ) -> Vec<KRef>
1813    where
1814        K: DebugIfDevtoolsEnabled + Hash + 'static,
1815        V: DebugIfDevtoolsEnabled + Clone + 'static,
1816        KRef: Borrow<K>,
1817    {
1818        self.invalidate_queries_inner::<K, V, _>(&QueryScopeInfo::new_local(&query_scope), keys)
1819    }
1820
1821    /// Mark one or more queries of a specific type as stale with a callback.
1822    ///
1823    /// When the callback returns `true`, the specific query will be invalidated.
1824    ///
1825    /// Any active resources subscribing to that query will refetch in the background,
1826    /// replacing them when ready.
1827    #[track_caller]
1828    pub fn invalidate_queries_with_predicate<K, V, M>(
1829        &self,
1830        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1831        should_invalidate: impl Fn(&K) -> bool,
1832    ) where
1833        K: DebugIfDevtoolsEnabled + Hash + 'static,
1834        V: DebugIfDevtoolsEnabled + Clone + 'static,
1835    {
1836        self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
1837            &QueryScopeInfo::new_local(&query_scope),
1838            false,
1839            |_| {},
1840            |maybe_scope, _| {
1841                if let Some(scope) = maybe_scope {
1842                    for query in scope.all_queries_mut_include_pending() {
1843                        if let Some(key) = query.key().value_if_safe()
1844                            && should_invalidate(key)
1845                        {
1846                            query.invalidate(QueryAbortReason::Invalidate);
1847                        }
1848                    }
1849                }
1850            },
1851        )
1852    }
1853
1854    #[track_caller]
1855    pub(crate) fn invalidate_queries_inner<K, V, KRef>(
1856        &self,
1857        query_scope_info: &QueryScopeInfo,
1858        keys: impl IntoIterator<Item = KRef>,
1859    ) -> Vec<KRef>
1860    where
1861        K: DebugIfDevtoolsEnabled + Hash + 'static,
1862        V: DebugIfDevtoolsEnabled + Clone + 'static,
1863        KRef: Borrow<K>,
1864    {
1865        let keys = keys.into_iter().collect::<Vec<_>>();
1866        let key_hashes = keys
1867            .iter()
1868            .map(|key| KeyHash::new(key.borrow()))
1869            .collect::<Vec<_>>();
1870
1871        if key_hashes.is_empty() {
1872            return vec![];
1873        }
1874
1875        self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
1876            query_scope_info,
1877            false,
1878            |_| {},
1879            |maybe_scope, _| {
1880                let mut invalidated = vec![];
1881                if let Some(scope) = maybe_scope {
1882                    for (key, key_hash) in keys.into_iter().zip(key_hashes.iter()) {
1883                        if let Some(cached) = scope.get_mut_include_pending(key_hash) {
1884                            cached.invalidate(QueryAbortReason::Invalidate);
1885                            invalidated.push(key);
1886                        }
1887                    }
1888                }
1889                invalidated
1890            },
1891        )
1892    }
1893
1894    /// Mark all queries of a specific type as stale.
1895    ///
1896    /// Any active resources will refetch in the background, replacing them when ready.
1897    #[track_caller]
1898    pub fn invalidate_query_scope<K, V, M>(&self, query_scope: impl QueryScopeLocalTrait<K, V, M>)
1899    where
1900        K: Hash + 'static,
1901        V: Clone + 'static,
1902    {
1903        self.invalidate_query_scope_inner(&query_scope.cache_key())
1904    }
1905
1906    pub(crate) fn invalidate_query_scope_inner(&self, scope_cache_key: &ScopeCacheKey) {
1907        let mut guard = self.scope_lookup.scopes_mut();
1908        if let Some(scope) = guard.get_mut(scope_cache_key) {
1909            scope.invalidate_scope(QueryAbortReason::Invalidate);
1910            for buster in scope.busters() {
1911                buster.try_set(new_buster_id());
1912            }
1913        }
1914    }
1915
1916    /// Mark all queries as stale.
1917    ///
1918    /// Any active resources will refetch in the background, replacing them when ready.
1919    ///
1920    /// To have the cache instantly cleared and all listeners reset to pending, e.g. for user logout,
1921    /// see [`QueryClient::clear`].
1922    #[track_caller]
1923    pub fn invalidate_all_queries(&self) {
1924        for scope in self.scope_lookup.scopes_mut().values_mut() {
1925            let busters = scope.busters();
1926            scope.invalidate_scope(QueryAbortReason::Invalidate);
1927            for buster in busters {
1928                buster.try_set(new_buster_id());
1929            }
1930        }
1931    }
1932
1933    /// Empty the cache, like [`QueryClient::invalidate_all_queries`] except:
1934    /// - the cache is instantly cleared of all queries
1935    /// - All active resources etc are reset to their pending state instantly
1936    ///   until the new query finishes refetching.
1937    ///
1938    /// Useful for e.g. user logout.
1939    ///
1940    /// [`QueryClient::invalidate_all_queries`] on the other hand, will only refetch active queries in the background, replacing them when ready.
1941    #[track_caller]
1942    pub fn clear(&self) {
1943        for scope in self.scope_lookup.scopes_mut().values_mut() {
1944            let busters = scope.busters();
1945            scope.invalidate_scope(QueryAbortReason::Clear);
1946            scope.clear();
1947            for buster in busters {
1948                buster.try_set(new_buster_id());
1949            }
1950        }
1951    }
1952
1953    #[cfg(test)]
1954    /// Clear a specific query key.
1955    #[track_caller]
1956    pub fn clear_query<K, V, M>(
1957        &self,
1958        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1959        key: impl Borrow<K>,
1960    ) -> bool
1961    where
1962        K: DebugIfDevtoolsEnabled + Hash + 'static,
1963        V: DebugIfDevtoolsEnabled + Clone + 'static,
1964    {
1965        self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
1966            &QueryScopeInfo::new_local(&query_scope),
1967            false,
1968            |_| {},
1969            |maybe_scope, _| {
1970                if let Some(scope) = maybe_scope {
1971                    let key_hash = KeyHash::new(key.borrow());
1972                    if let Some(cached) = scope.get_mut_include_pending(&key_hash) {
1973                        cached.invalidate(QueryAbortReason::Clear);
1974                    }
1975                    let removed = scope.remove_entry(&key_hash);
1976                    // Calling it again just in case because in tests might be in sync cache and non sync cache:
1977                    scope.remove_entry(&KeyHash::new(key.borrow()));
1978                    return removed.is_some();
1979                }
1980                false
1981            },
1982        )
1983    }
1984
1985    #[cfg(test)]
1986    pub(crate) fn size(&self) -> usize {
1987        self.scope_lookup
1988            .scopes()
1989            .values()
1990            .map(|scope| scope.size())
1991            .sum()
1992    }
1993
1994    #[cfg(test)]
1995    pub(crate) fn is_key_invalid<K, V, M>(
1996        &self,
1997        query_scope: impl QueryScopeLocalTrait<K, V, M>,
1998        key: impl Borrow<K>,
1999    ) -> bool
2000    where
2001        K: DebugIfDevtoolsEnabled + Hash + 'static,
2002        V: DebugIfDevtoolsEnabled + Clone + 'static,
2003    {
2004        self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2005            &QueryScopeInfo::new_local(&query_scope),
2006            false,
2007            |_| {},
2008            |maybe_scope, _| {
2009                if let Some(scope) = maybe_scope {
2010                    scope
2011                        .get(&KeyHash::new(key.borrow()))
2012                        .map(|query| query.is_invalidated())
2013                        .unwrap_or(false)
2014                } else {
2015                    false
2016                }
2017            },
2018        )
2019    }
2020
2021    #[cfg(test)]
2022    #[allow(dead_code)]
2023    pub(crate) fn mark_key_valid<K, V, M>(
2024        &self,
2025        query_scope: impl QueryScopeLocalTrait<K, V, M>,
2026        key: impl Borrow<K>,
2027    ) where
2028        K: DebugIfDevtoolsEnabled + Hash + 'static,
2029        V: DebugIfDevtoolsEnabled + Clone + 'static,
2030    {
2031        self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
2032            &QueryScopeInfo::new_local(&query_scope),
2033            false,
2034            |_| {},
2035            |maybe_scope, _| {
2036                if let Some(scope) = maybe_scope
2037                    && let Some(query) = scope.get_mut(&KeyHash::new(key.borrow()))
2038                {
2039                    query.mark_valid();
2040                }
2041            },
2042        );
2043    }
2044
2045    #[cfg(test)]
2046    pub(crate) fn subscriber_count(&self) -> usize {
2047        self.scope_lookup.scope_subscriptions_mut().count()
2048    }
2049}