leptos_fetch/query_client.rs
1#![allow(ungated_async_fn_track_caller)] // Want it to auto-turn on when stable
2
3use std::{
4 borrow::Borrow,
5 collections::HashMap,
6 fmt::Debug,
7 hash::Hash,
8 sync::{Arc, LazyLock, atomic::AtomicBool},
9};
10
11use chrono::{DateTime, Utc};
12use codee::{Decoder, Encoder};
13use futures::pin_mut;
14use leptos::{
15 prelude::{
16 ArcRwSignal, ArcSignal, Effect, Get, GetUntracked, LocalStorage, Owner, Read,
17 ReadUntracked, Set, Signal, Track, provide_context,
18 },
19 server::{
20 ArcLocalResource, ArcResource, FromEncodedStr, IntoEncodedString, LocalResource, Resource,
21 },
22};
23use send_wrapper::SendWrapper;
24
25use crate::{
26 ArcLocalSignal, QueryOptions,
27 cache::{CachedOrFetchCbInputVariant, CachedOrFetchCbOutput},
28 cache_scope::{QueryAbortReason, QueryOrPending},
29 debug_if_devtools_enabled::DebugIfDevtoolsEnabled,
30 maybe_local::MaybeLocal,
31 query::Query,
32 query_maybe_key::QueryMaybeKey,
33 query_scope::{QueryScopeInfo, QueryScopeLocalTrait, QueryScopeTrait, ScopeCacheKey},
34 resource_drop_guard::ResourceDropGuard,
35 utils::{KeyHash, OwnerChain, ResetInvalidated, new_buster_id, new_resource_id},
36};
37
38use super::cache::ScopeLookup;
39
// Codec used by `QueryClient` when no explicit codec type is given.
// Without the `rkyv` feature this is the JSON serde codec.
#[cfg(not(feature = "rkyv"))]
pub(crate) type DefaultCodec = codee::string::JsonSerdeCodec;

// With the `rkyv` feature enabled the default switches to the binary rkyv codec.
#[cfg(feature = "rkyv")]
pub(crate) type DefaultCodec = codee::binary::RkyvCodec;
45
// Task-local flag consulted by the async update paths.
// `QueryClient::untrack_update_query()` stores `false` into it when it is set for the current task.
// NOTE(review): where the marker is installed and reset to `true` is outside this section of the file.
task_local::task_local! {
    pub(crate) static ASYNC_TRACK_UPDATE_MARKER: Arc<AtomicBool>;
}

// Thread-local counterpart for the synchronous update path; starts out as `true`.
// `QueryClient::untrack_update_query()` stores `false` into it.
std::thread_local! {
    pub(crate) static SYNC_TRACK_UPDATE_MARKER: AtomicBool = const { AtomicBool::new(true) };
}
53
/// The [`QueryClient`] stores all query data, and is used to manage queries.
///
/// Should be provided via leptos context at the top of the app.
///
/// The `Codec` type parameter selects the codec used by the serializable resources
/// created from this client; see [`QueryClient::set_codec()`] to change it.
///
/// # Example
///
/// ```
/// use leptos::prelude::*;
/// use leptos_fetch::QueryClient;
///
/// #[component]
/// pub fn App() -> impl IntoView {
///     QueryClient::new().provide();
///     // ...
/// }
///
/// #[component]
/// pub fn MyComponent() -> impl IntoView {
///     let client: QueryClient = expect_context();
///     // ...
/// }
/// ```
pub struct QueryClient<Codec = DefaultCodec> {
    /// Handle used for every cache read/write performed through this client.
    pub(crate) scope_lookup: ScopeLookup,
    /// Base options, combined with the options of each individual query scope.
    options: QueryOptions,
    /// Time this client was constructed (`Utc::now()` in [`QueryClient::new()`]).
    created_at: DateTime<Utc>,
    /// Zero-sized marker carrying the codec type.
    // NOTE(review): the `SendWrapper` is presumably there so the client's auto traits
    // do not depend on `Codec` itself — confirm.
    _ser: std::marker::PhantomData<SendWrapper<Codec>>,
}
82
83impl<Codec> Debug for QueryClient<Codec> {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 f.debug_struct("QueryClient")
86 .field("scope_lookup", &self.scope_lookup)
87 .field("options", &self.options)
88 .field("codec", &std::any::type_name::<Codec>())
89 .finish()
90 }
91}
92
// `Clone` and `Copy` are implemented by hand rather than derived so that they only
// require `Codec: 'static`; a derive would additionally demand `Codec: Clone`/`Copy`,
// even though the codec is only ever held as a `PhantomData` marker.
impl<Codec: 'static> Clone for QueryClient<Codec> {
    fn clone(&self) -> Self {
        // The type is `Copy`, so cloning is a plain bitwise copy.
        *self
    }
}

impl<Codec: 'static> Copy for QueryClient<Codec> {}
100
101impl Default for QueryClient<DefaultCodec> {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
107impl QueryClient<DefaultCodec> {
108 /// Creates a new [`QueryClient`] with the default codec: [`codee::string::JsonSerdeCodec`].
109 ///
110 /// Call [`QueryClient::set_codec()`] to set a different codec.
111 ///
112 /// Call [`QueryClient::with_options()`] to set non-default options.
113 #[track_caller]
114 pub fn new() -> Self {
115 Self {
116 scope_lookup: ScopeLookup::new(),
117 options: QueryOptions::default(),
118 created_at: Utc::now(),
119 _ser: std::marker::PhantomData,
120 }
121 }
122}
123
124impl<Codec: 'static> QueryClient<Codec> {
125 /// Provide the client to leptos context.
126 /// ```no_run
127 /// use leptos_fetch::QueryClient;
128 /// use leptos::prelude::*;
129 ///
130 /// QueryClient::new().provide();
131 ///
132 /// let client: QueryClient = expect_context();
133 /// ```
134 #[track_caller]
135 pub fn provide(self) -> Self {
136 provide_context(self);
137 self
138 }
139
140 /// **Applies to `ssr` only**
141 ///
142 /// It's possible to use non-json codecs for streaming leptos resources from the backend.
143 /// The default is [`codee::string::JsonSerdeCodec`].
144 ///
145 /// The current `codee` major version is `0.3` and will need to be imported in your project to customize the codec.
146 ///
147 /// E.g. to use [`codee::binary::MsgpackSerdeCodec`](https://docs.rs/codee/latest/codee/binary/struct.MsgpackSerdeCodec.html):
148 /// ```toml
149 /// codee = { version = "0.3", features = ["msgpack_serde"] }
150 /// ```
151 ///
152 /// This is a generic type on the [`QueryClient`], so when calling [`leptos::prelude::expect_context`],
153 /// this type must be specified when not using the default.
154 ///
155 /// A useful pattern is to type alias the client with the custom codec for your whole app:
156 ///
157 /// ```no_run
158 /// use codee::binary::MsgpackSerdeCodec;
159 /// use leptos::prelude::*;
160 /// use leptos_fetch::QueryClient;
161 ///
162 /// type MyQueryClient = QueryClient<MsgpackSerdeCodec>;
163 ///
164 /// // Create and provide to context to make accessible everywhere:
165 /// QueryClient::new().set_codec::<MsgpackSerdeCodec>().provide();
166 ///
167 /// let client: MyQueryClient = expect_context();
168 /// ```
169 #[track_caller]
170 pub fn set_codec<NewCodec>(self) -> QueryClient<NewCodec> {
171 QueryClient {
172 scope_lookup: self.scope_lookup,
173 options: self.options,
174 created_at: self.created_at,
175 _ser: std::marker::PhantomData,
176 }
177 }
178
179 /// Set non-default options to apply to all queries.
180 ///
181 /// These options will be combined with any options for a specific query scope.
182 #[track_caller]
183 pub fn with_options(mut self, options: QueryOptions) -> Self {
184 self.options = options;
185 self
186 }
187
188 /// Read the base [`QueryOptions`] for this [`QueryClient`].
189 ///
190 /// These will be combined with any options for a specific query scope.
191 #[track_caller]
192 pub fn options(&self) -> QueryOptions {
193 self.options
194 }
195
196 /// Provide a signal to globally enable/disable auto refetching of queries
197 /// that have active resources and have set [`QueryOptions::with_refetch_interval`].
198 ///
199 /// Whilst this signal returns `false`, refetches will be skipped.
200 #[track_caller]
201 pub fn with_refetch_enabled_toggle(self, refetch_enabled: impl Into<ArcSignal<bool>>) -> Self {
202 self.scope_lookup.scopes_mut().refetch_enabled = Some(refetch_enabled.into());
203 self
204 }
205
206 /// If set, access the signal that globally enables/disables auto refetching of queries.
207 #[track_caller]
208 pub fn refetch_enabled(&self) -> Option<ArcSignal<bool>> {
209 self.scope_lookup.scopes().refetch_enabled.clone()
210 }
211
212 /// Query with [`LocalResource`]. Local resouces only load data on the client, so can be used with non-threadsafe/serializable data.
213 ///
214 /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
215 #[track_caller]
216 pub fn local_resource<K, MaybeKey, V, M>(
217 &self,
218 query_scope: impl QueryScopeLocalTrait<K, V, M> + 'static,
219 keyer: impl Fn() -> MaybeKey + 'static,
220 ) -> LocalResource<MaybeKey::MappedValue>
221 where
222 K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
223 MaybeKey: QueryMaybeKey<K, V>,
224 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
225 V: DebugIfDevtoolsEnabled + Clone + 'static,
226 {
227 self.arc_local_resource(query_scope, keyer).into()
228 }
229
    /// Query with [`ArcLocalResource`]. Local resources only load data on the client, so can be used with non-threadsafe/serializable data.
    ///
    /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
    ///
    /// `keyer` is evaluated each time the resource's fetcher runs. When it yields no key,
    /// nothing is fetched and the resource resolves to `MaybeKey::prepare_mapped_value(None)`.
    #[track_caller]
    pub fn arc_local_resource<K, MaybeKey, V, M>(
        &self,
        query_scope: impl QueryScopeLocalTrait<K, V, M> + 'static,
        keyer: impl Fn() -> MaybeKey + 'static,
    ) -> ArcLocalResource<MaybeKey::MappedValue>
    where
        K: DebugIfDevtoolsEnabled + Hash + Clone + 'static,
        MaybeKey: QueryMaybeKey<K, V>,
        MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
        V: DebugIfDevtoolsEnabled + Clone + 'static,
    {
        // Copy everything the fetcher closure needs out of `self` and the scope up front.
        let client = *self;
        let client_options = self.options();
        let scope_lookup = self.scope_lookup;
        let cache_key = query_scope.cache_key();
        let query_scope_info = QueryScopeInfo::new_local(&query_scope);
        let query_scope = Arc::new(query_scope);
        let query_options = query_scope.options();
        let resource_id = new_resource_id();

        // To call .mark_resource_dropped() when the resource is dropped:
        let drop_guard = ResourceDropGuard::<K, V>::new(self.scope_lookup, resource_id, cache_key);

        ArcLocalResource::new({
            move || {
                let query_scope = query_scope.clone();
                let query_scope_info = query_scope_info.clone();
                let maybe_key = keyer().into_maybe_key();
                let invalidation_prefix = maybe_key
                    .as_ref()
                    .and_then(|key| query_scope.invalidation_prefix(key));
                let drop_guard = drop_guard.clone();
                // Record which key this resource is currently pointing at.
                if let Some(key) = maybe_key.as_ref() {
                    drop_guard.set_active_key(KeyHash::new(key));
                }
                // Note: cannot hoist outside of resource,
                // it prevents the resource itself dropping when the owner is held by the resource itself,
                // here each instance only lasts as long as the query.
                let owner_chain = OwnerChain::new(Owner::current());
                async move {
                    if let Some(key) = maybe_key {
                        let value = scope_lookup
                            .cached_or_fetch(
                                client_options,
                                query_options,
                                None,
                                &query_scope_info,
                                invalidation_prefix,
                                &key,
                                {
                                    // Fetcher: runs the scope's query and wraps the result as a local value.
                                    let query_scope = query_scope.clone();
                                    async move |key| {
                                        MaybeLocal::new_local(query_scope.query(key).await)
                                    }
                                },
                                |info| {
                                    // Track the query's buster and register this resource as a user of the query.
                                    info.cached.buster.track();
                                    info.cached.mark_resource_active(resource_id);
                                    match info.variant {
                                        CachedOrFetchCbInputVariant::CachedUntouched => {
                                            // If stale refetch in the background with the prefetch() function, which'll recognise it's stale, refetch it and invalidate busters:
                                            // (only compiled in for tests or non-`ssr` builds)
                                            if cfg!(any(test, not(feature = "ssr")))
                                                && info.cached.stale()
                                            {
                                                let key = key.clone();
                                                let query_scope = query_scope.clone();
                                                let owner_chain = owner_chain.clone();
                                                // Just adding the SendWrapper and using spawn() rather than spawn_local() to fix tests:
                                                leptos::task::spawn(SendWrapper::new(async move {
                                                    client
                                                        .prefetch_inner(
                                                            QueryScopeInfo::new_local(&query_scope),
                                                            query_scope
                                                                .invalidation_prefix(key.borrow()),
                                                            async move |key| {
                                                                MaybeLocal::new_local(
                                                                    query_scope.query(key).await,
                                                                )
                                                            },
                                                            key.borrow(),
                                                            || {
                                                                MaybeLocal::new_local(
                                                                    key.borrow().clone(),
                                                                )
                                                            },
                                                            &owner_chain,
                                                        )
                                                        .await;
                                                }));
                                            }
                                        }
                                        CachedOrFetchCbInputVariant::Fresh => {}
                                        CachedOrFetchCbInputVariant::CachedUpdated => {
                                            // This callback never returns `Refetch`, so an updated entry is not expected here.
                                            panic!("Didn't direct inner to refetch here. (bug)")
                                        }
                                    }
                                    CachedOrFetchCbOutput::Return(
                                        // WONTPANIC: cached_or_fetch will only output values that are safe on this thread:
                                        info.cached.value_maybe_stale().value_may_panic().clone(),
                                    )
                                },
                                None,
                                || MaybeLocal::new_local(key.clone()),
                                &owner_chain,
                            )
                            .await;
                        MaybeKey::prepare_mapped_value(Some(value))
                    } else {
                        // No key: nothing to fetch.
                        MaybeKey::prepare_mapped_value(None)
                    }
                }
            }
        })
    }
348
349 /// Query with [`Resource`].
350 ///
351 /// Resources must be serializable to potentially load in `ssr` and stream to the client.
352 ///
353 /// Resources must be `Send` and `Sync` to be multithreaded in ssr.
354 ///
355 /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
356 #[track_caller]
357 pub fn resource<K, MaybeKey, V, M>(
358 &self,
359 query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
360 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
361 ) -> Resource<MaybeKey::MappedValue, Codec>
362 where
363 K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
364 MaybeKey: QueryMaybeKey<K, V>,
365 MaybeKey::MappedValue: Clone + Send + Sync + 'static,
366 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
367 Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
368 <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
369 <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
370 <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
371 Debug,
372 <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
373 <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
374 {
375 self.arc_resource_with_options(query_scope, keyer, false)
376 .into()
377 }
378
379 /// Query with a blocking [`Resource`].
380 ///
381 /// Resources must be serializable to potentially load in `ssr` and stream to the client.
382 ///
383 /// Resources must be `Send` and `Sync` to be multithreaded in ssr.
384 ///
385 /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
386 #[track_caller]
387 pub fn resource_blocking<K, MaybeKey, V, M>(
388 &self,
389 query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
390 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
391 ) -> Resource<MaybeKey::MappedValue, Codec>
392 where
393 K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
394 MaybeKey: QueryMaybeKey<K, V>,
395 MaybeKey::MappedValue: Clone + Send + Sync + 'static,
396 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
397 Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
398 <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
399 <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
400 <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
401 Debug,
402 <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
403 <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
404 {
405 self.arc_resource_with_options(query_scope, keyer, true)
406 .into()
407 }
408
409 /// Query with [`ArcResource`].
410 ///
411 /// Resources must be serializable to potentially load in `ssr` and stream to the client.
412 ///
413 /// Resources must be `Send` and `Sync` to be multithreaded in ssr.
414 ///
415 /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
416 #[track_caller]
417 pub fn arc_resource<K, MaybeKey, V, M>(
418 &self,
419 query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
420 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
421 ) -> ArcResource<MaybeKey::MappedValue, Codec>
422 where
423 K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
424 MaybeKey: QueryMaybeKey<K, V>,
425 MaybeKey::MappedValue: Clone + Send + Sync + 'static,
426 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
427 Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
428 <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
429 <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
430 <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
431 Debug,
432 <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
433 <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
434 {
435 self.arc_resource_with_options(query_scope, keyer, false)
436 }
437
438 /// Query with a blocking [`ArcResource`].
439 ///
440 /// Resources must be serializable to potentially load in `ssr` and stream to the client.
441 ///
442 /// Resources must be `Send` and `Sync` to be multithreaded in ssr.
443 ///
444 /// If a cached value exists but is stale, the cached value will be initially used, then refreshed in the background, updating once the new value is ready.
445 #[track_caller]
446 pub fn arc_resource_blocking<K, MaybeKey, V, M>(
447 &self,
448 query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
449 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
450 ) -> ArcResource<MaybeKey::MappedValue, Codec>
451 where
452 K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
453 MaybeKey: QueryMaybeKey<K, V>,
454 MaybeKey::MappedValue: Clone + Send + Sync + 'static,
455 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
456 Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
457 <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
458 <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
459 <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
460 Debug,
461 <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
462 <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
463 {
464 self.arc_resource_with_options(query_scope, keyer, true)
465 }
466
    /// Shared implementation behind `resource`, `resource_blocking`, `arc_resource`
    /// and `arc_resource_blocking`.
    ///
    /// Builds an [`ArcResource`] whose source closure yields `(key, buster)` and whose
    /// fetcher goes through `cached_or_fetch`, then registers an effect that writes the
    /// resource's value into the cache if the key is not cached yet (see the comments
    /// at the effect). `blocking` is forwarded unchanged to `ArcResource::new_with_options`.
    #[track_caller]
    fn arc_resource_with_options<K, MaybeKey, V, M>(
        &self,
        query_scope: impl QueryScopeTrait<K, V, M> + Send + Sync + 'static,
        keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
        blocking: bool,
    ) -> ArcResource<MaybeKey::MappedValue, Codec>
    where
        K: DebugIfDevtoolsEnabled + PartialEq + Hash + Clone + Send + Sync + 'static,
        MaybeKey: QueryMaybeKey<K, V>,
        MaybeKey::MappedValue: Clone + Send + Sync + 'static,
        V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
        Codec: Encoder<MaybeKey::MappedValue> + Decoder<MaybeKey::MappedValue>,
        <Codec as Encoder<MaybeKey::MappedValue>>::Error: Debug,
        <Codec as Decoder<MaybeKey::MappedValue>>::Error: Debug,
        <<Codec as Decoder<MaybeKey::MappedValue>>::Encoded as FromEncodedStr>::DecodingError:
            Debug,
        <Codec as Encoder<MaybeKey::MappedValue>>::Encoded: IntoEncodedString,
        <Codec as Decoder<MaybeKey::MappedValue>>::Encoded: FromEncodedStr,
    {
        let client = *self;
        let client_options = self.options();
        let cache_key = query_scope.cache_key();
        let query_scope_info = QueryScopeInfo::new(&query_scope);
        let query_scope = Arc::new(query_scope);
        let scope_lookup = self.scope_lookup;
        let query_options = query_scope.options();

        // Signal the source closure tracks while the key has no cache entry (and when there is no key).
        let buster_if_uncached = ArcRwSignal::new(new_buster_id());
        let resource_id = new_resource_id();

        // To call .mark_resource_dropped() when the resource is dropped:
        let drop_guard = ResourceDropGuard::<K, V>::new(self.scope_lookup, resource_id, cache_key);

        // Shared between the resource's source closure and the effect below.
        let keyer = Arc::new(keyer);
        let resource = ArcResource::new_with_options(
            {
                // Source closure: produces `(Option<K>, buster value)`.
                let buster_if_uncached = buster_if_uncached.clone();
                let drop_guard = drop_guard.clone();
                let keyer = keyer.clone();
                move || {
                    if let Some(key) = keyer().into_maybe_key() {
                        let key_hash = KeyHash::new(&key);
                        drop_guard.set_active_key(key_hash);
                        scope_lookup.with_cached_query::<K, V, _>(
                            &key_hash,
                            &cache_key,
                            |maybe_cached| {
                                if let Some(cached) = maybe_cached {
                                    // Buster must be returned for it to be tracked.
                                    (Some(key.clone()), cached.buster.get())
                                } else {
                                    // Buster must be returned for it to be tracked.
                                    (Some(key.clone()), buster_if_uncached.get())
                                }
                            },
                        )
                    } else {
                        (None, buster_if_uncached.get())
                    }
                }
            },
            {
                // Fetcher: receives the tuple produced by the source closure above.
                let buster_if_uncached = buster_if_uncached.clone();
                let query_scope_info = query_scope_info.clone();
                let query_scope = query_scope.clone();
                move |(maybe_key, last_used_buster)| {
                    let query_scope = query_scope.clone();
                    let query_scope_info = query_scope_info.clone();
                    let invalidation_prefix = maybe_key
                        .as_ref()
                        .and_then(|key| query_scope.invalidation_prefix(key));
                    let buster_if_uncached = buster_if_uncached.clone();
                    let _drop_guard = drop_guard.clone(); // Want the guard around everywhere until the resource is dropped.
                    // Note: cannot hoist outside of resource,
                    // it prevents the resource itself dropping when the owner is held by the resource itself,
                    // here each instance only lasts as long as the query.
                    let owner_chain = OwnerChain::new(Owner::current());
                    async move {
                        if let Some(key) = maybe_key {
                            let value = scope_lookup
                                .cached_or_fetch(
                                    client_options,
                                    query_options,
                                    Some(buster_if_uncached.clone()),
                                    &query_scope_info,
                                    invalidation_prefix,
                                    &key,
                                    {
                                        let query_scope = query_scope.clone();
                                        async move |key| {
                                            MaybeLocal::new(query_scope.query(key).await)
                                        }
                                    },
                                    |info| {
                                        // Register this resource as a user of the query.
                                        info.cached.mark_resource_active(resource_id);
                                        match info.variant {
                                            CachedOrFetchCbInputVariant::CachedUntouched => {
                                                // If stale refetch in the background with the prefetch() function, which'll recognise it's stale, refetch it and invalidate busters:
                                                // (only compiled in for tests or non-`ssr` builds)
                                                if cfg!(any(test, not(feature = "ssr")))
                                                    && info.cached.stale()
                                                {
                                                    let key = key.clone();
                                                    let query_scope = query_scope.clone();
                                                    let owner_chain = owner_chain.clone();
                                                    leptos::task::spawn(async move {
                                                        client
                                                            .prefetch_inner(
                                                                QueryScopeInfo::new(&query_scope),
                                                                query_scope.invalidation_prefix(
                                                                    key.borrow(),
                                                                ),
                                                                async move |key| {
                                                                    MaybeLocal::new(
                                                                        query_scope
                                                                            .query(key)
                                                                            .await,
                                                                    )
                                                                },
                                                                key.borrow(),
                                                                || {
                                                                    MaybeLocal::new(
                                                                        key.borrow().clone(),
                                                                    )
                                                                },
                                                                &owner_chain,
                                                            )
                                                            .await;
                                                    });
                                                }

                                                // Handle edge case where the key function saw it was uncached, so entered here,
                                                // but when the cached_or_fetch was acquiring the fetch mutex, someone else cached it,
                                                // meaning our key function won't be reactive correctly unless we invalidate it now:
                                                if last_used_buster
                                                    != info.cached.buster.get_untracked()
                                                {
                                                    buster_if_uncached
                                                        .set(info.cached.buster.get_untracked());
                                                }
                                            }
                                            CachedOrFetchCbInputVariant::Fresh => {}
                                            CachedOrFetchCbInputVariant::CachedUpdated => {
                                                // This callback never returns `Refetch`, so an updated entry is not expected here.
                                                panic!("Didn't direct inner to refetch here. (bug)")
                                            }
                                        }
                                        CachedOrFetchCbOutput::Return(
                                            // WONTPANIC: cached_or_fetch will only output values that are safe on this thread:
                                            info.cached
                                                .value_maybe_stale()
                                                .value_may_panic()
                                                .clone(),
                                        )
                                    },
                                    None,
                                    || MaybeLocal::new(key.clone()),
                                    &owner_chain,
                                )
                                .await;
                            MaybeKey::prepare_mapped_value(Some(value))
                        } else {
                            // No key: nothing to fetch.
                            MaybeKey::prepare_mapped_value(None)
                        }
                    }
                }
            },
            blocking,
        );

        // On the client, want to repopulate the frontend cache, so should write resources to the cache here if they don't exist.
        // It would be better if in here we could check if the resource was started on the backend/streamed, saves doing most of this if already a frontend resource.
        let effect = {
            let resource = resource.clone();
            let buster_if_uncached = buster_if_uncached.clone();
            let client_created_at = self.created_at;
            // Converting to Arc because the tests like the client get dropped even though this persists:
            // The effect's previous return value is passed back in as `complete`:
            // `Some(Some(()))` means the resource value has already been handled once.
            move |complete: Option<Option<()>>| {
                if let Some(Some(())) = complete {
                    return Some(());
                }
                if let Some(val) = resource.read().as_ref() {
                    if MaybeKey::mapped_value_is_some(val) {
                        // NOTE(review): the `true` argument presumably requests the scope be created
                        // when missing (the `expect` below relies on it) — confirm against `ScopeLookup`.
                        scope_lookup.with_cached_scope_mut::<K, V, _, _>(
                            &query_scope_info,
                            true,
                            |_| {},
                            |maybe_scope, _| {
                                let scope = maybe_scope.expect("provided a default");
                                if let Some(key) = keyer().into_maybe_key() {
                                    let key_hash = KeyHash::new(&key);

                                    // Had a bug in tests where:
                                    // calling client.resource() multiple times
                                    // previous impl would only run the effect once per client.resource(),
                                    // but would still run again on subsequent client.resource() with the same key value
                                    // - client.resource()
                                    // - invalidate, starts loading in background
                                    // - client.resource()
                                    // - effect runs again, sees resource has value, goes to set it, fires QueryAbortReason::SsrStreamedValueOverride which cancels the actual new query which is loading to replace the invalidated one
                                    // So have to have a client wide protector, to make sure the effect really only runs once on the client, even if resource() called multiple times.
                                    if (cfg!(test) || cfg!(not(feature = "ssr"))) &&
                                        // Preventing memory leak, issue is only for first render:
                                        (Utc::now() - client_created_at).num_seconds() < 10
                                    {
                                        use parking_lot::Mutex;

                                        // Process-wide record of (scope id, cache key, key hash) triples already handled.
                                        // NOTE(review): only used as a set (values are `()`), and entries are never
                                        // removed; growth is bounded only by the 10 second window checked above.
                                        static ALREADY_SEEN: LazyLock<Mutex<HashMap<(u64, ScopeCacheKey, KeyHash), ()>>> = LazyLock::new(|| Mutex::new(HashMap::new()));

                                        // Shadows the query key inside this block only.
                                        let key = (scope_lookup.scope_id, query_scope_info.cache_key, key_hash);
                                        let mut guard = ALREADY_SEEN.lock();
                                        if guard.contains_key(&key) {
                                            return;
                                        }
                                        guard.insert(key, ());
                                    }

                                    let invalidation_prefix = query_scope.invalidation_prefix(&key);

                                    let mut was_pending = false;
                                    // Protect against race condition: cancel any frontend query that started already if there was a client side
                                    // local resource or prefetch/fetch_query etc that started on the initial client-side ticks:
                                    // NOTE(review): the `.unwrap()` panics if the abort send fails (e.g. receiver already
                                    // gone) — confirm that cannot happen while the entry is still `Pending`.
                                    if let Some(QueryOrPending::Pending { query_abort_tx, .. }) = scope.get_mut_include_pending(&key_hash)
                                        && let Some(tx) = query_abort_tx.take() {
                                        tx.send(QueryAbortReason::SsrStreamedValueOverride).unwrap();
                                        was_pending = true;
                                    }

                                    // Insert the resource's value as a cached query when nothing usable is cached yet.
                                    if was_pending || !scope.contains_key(&key_hash) {
                                        let query = Query::new(
                                            client_options,
                                            scope_lookup,
                                            &query_scope_info,
                                            invalidation_prefix,
                                            key_hash,
                                            MaybeLocal::new(key),
                                            MaybeLocal::new(MaybeKey::mapped_to_maybe_value(val.clone()).expect("Just checked MaybeKey::mapped_value_is_some() is true")),
                                            buster_if_uncached.clone(),
                                            query_options,
                                            None,
                                            #[cfg(any(
                                                all(debug_assertions, feature = "devtools"),
                                                feature = "devtools-always"
                                            ))]
                                            crate::events::Event::new(
                                                crate::events::EventVariant::StreamedFromServer,
                                            ),
                                        );
                                        scope.insert(key_hash, query);
                                    }
                                    scope
                                        .get(&key_hash)
                                        .expect("Just inserted")
                                        .mark_resource_active(resource_id)
                                }
                            },
                        );
                    }
                    Some(())
                } else {
                    // Resource has no value yet; the effect will try again on its next run.
                    None
                }
            }
        };
        // Won't run in tests if not isomorphic, but in prod Effect is wanted to not run on server:
        if cfg!(test) {
            // Wants Send + Sync on the Codec despite using SendWrapper in the PhantomData.
            // Can put a SendWrapper around it as this is test only and they're single threaded:
            let effect = SendWrapper::new(effect);
            #[allow(clippy::redundant_closure)]
            Effect::new_isomorphic(move |v| effect(v));
        } else {
            Effect::new(effect);
        }

        resource
    }
743
744 /// Prevent reactive updates being triggered from the current updater callback fn.
745 ///
746 /// Call [`QueryClient::untrack_update_query`] in the callback of [`QueryClient::update_query`]
747 /// to prevent resources being updated after the callback completes.
748 ///
749 /// This is really useful when e.g:
750 /// - you know nothing changes after reading the existing value in the callback
751 /// - you don't want resources/subs to react and rerender, given nothings changed.
752 ///
753 /// This function is a noop outside the callbacks of:
754 /// - [`QueryClient::update_query`]
755 /// - [`QueryClient::update_query_async`]
756 /// - [`QueryClient::update_query_async_local`]
757 pub fn untrack_update_query(&self) {
758 // Set markers to false, these will all be reset to true automatically where needed.
759 SYNC_TRACK_UPDATE_MARKER.with(|marker| {
760 marker.store(false, std::sync::atomic::Ordering::Relaxed);
761 });
762 // Ignore returned AccessError if didn't exist (not in async ctx):
763 let _ = ASYNC_TRACK_UPDATE_MARKER.try_with(|marker| {
764 marker.store(false, std::sync::atomic::Ordering::Relaxed);
765 });
766 }
767
768 /// Prefetch a query and store it in the cache.
769 ///
770 /// - Entry doesn't exist: fetched and stored in the cache.
771 /// - Entry exists but stale: fetched and updated in the cache.
772 /// - Entry exists but **not** stale: not refreshed, existing cache item remains.
773 ///
774 /// If the cached query changes, active resources using the query will be updated.
775 #[track_caller]
776 pub async fn prefetch_query<K, V, M>(
777 &self,
778 query_scope: impl QueryScopeTrait<K, V, M>,
779 key: impl Borrow<K>,
780 ) where
781 K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
782 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
783 {
784 self.prefetch_inner(
785 QueryScopeInfo::new(&query_scope),
786 query_scope.invalidation_prefix(key.borrow()),
787 async move |key| MaybeLocal::new(query_scope.query(key).await),
788 key.borrow(),
789 || MaybeLocal::new(key.borrow().clone()),
790 &OwnerChain::new(Owner::current()),
791 )
792 .await
793 }
794
795 /// Prefetch a non-threadsafe query and store it in the cache for this thread only.
796 ///
797 /// - Entry doesn't exist: fetched and stored in the cache.
798 /// - Entry exists but stale: fetched and updated in the cache.
799 /// - Entry exists but **not** stale: not refreshed, existing cache item remains.
800 ///
801 /// If the cached query changes, active resources using the query will be updated.
802 #[track_caller]
803 pub async fn prefetch_query_local<K, V, M>(
804 &self,
805 query_scope: impl QueryScopeLocalTrait<K, V, M>,
806 key: impl Borrow<K>,
807 ) where
808 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
809 V: DebugIfDevtoolsEnabled + Clone + 'static,
810 {
811 self.prefetch_inner(
812 QueryScopeInfo::new_local(&query_scope),
813 query_scope.invalidation_prefix(key.borrow()),
814 async move |key| MaybeLocal::new_local(query_scope.query(key).await),
815 key.borrow(),
816 || MaybeLocal::new_local(key.borrow().clone()),
817 &OwnerChain::new(Owner::current()),
818 )
819 .await
820 }
821
822 #[track_caller]
823 async fn prefetch_inner<K, V>(
824 &self,
825 query_scope_info: QueryScopeInfo,
826 invalidation_prefix: Option<Vec<String>>,
827 fetcher: impl AsyncFn(K) -> MaybeLocal<V>,
828 key: &K,
829 lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
830 owner_chain: &OwnerChain,
831 ) where
832 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
833 V: DebugIfDevtoolsEnabled + Clone + 'static,
834 {
835 self.scope_lookup
836 .cached_or_fetch(
837 self.options(),
838 query_scope_info.options,
839 None,
840 &query_scope_info,
841 invalidation_prefix,
842 key,
843 fetcher,
844 |info| {
845 match info.variant {
846 CachedOrFetchCbInputVariant::CachedUntouched => {
847 if info.cached.stale() {
848 return CachedOrFetchCbOutput::Refetch;
849 }
850 }
851 CachedOrFetchCbInputVariant::CachedUpdated => {
852 // Update anything using it:
853 info.cached.buster.set(new_buster_id());
854 }
855 CachedOrFetchCbInputVariant::Fresh => {}
856 }
857 CachedOrFetchCbOutput::Return(())
858 },
859 None,
860 lazy_maybe_local_key,
861 owner_chain,
862 )
863 .await;
864 }
865
866 /// Fetch a query, store it in the cache and return it.
867 ///
868 /// - Entry doesn't exist: fetched and stored in the cache.
869 /// - Entry exists but stale: fetched and updated in the cache.
870 /// - Entry exists but **not** stale: not refreshed, existing cache item remains.
871 ///
872 /// If the cached query changes, active resources using the query will be updated.
873 ///
874 /// Returns the up-to-date cached query.
875 #[track_caller]
876 pub async fn fetch_query<K, V, M>(
877 &self,
878 query_scope: impl QueryScopeTrait<K, V, M>,
879 key: impl Borrow<K>,
880 ) -> V
881 where
882 K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
883 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
884 {
885 self.fetch_inner(
886 QueryScopeInfo::new(&query_scope),
887 query_scope.invalidation_prefix(key.borrow()),
888 async move |key| MaybeLocal::new(query_scope.query(key).await),
889 key.borrow(),
890 None,
891 || MaybeLocal::new(key.borrow().clone()),
892 &OwnerChain::new(Owner::current()),
893 )
894 .await
895 }
896
897 /// Fetch a non-threadsafe query, store it in the cache for this thread only and return it.
898 ///
899 /// - Entry doesn't exist: fetched and stored in the cache.
900 /// - Entry exists but stale: fetched and updated in the cache.
901 /// - Entry exists but **not** stale: not refreshed, existing cache item remains.
902 ///
903 /// If the cached query changes, active resources using the query will be updated.
904 ///
905 /// Returns the up-to-date cached query.
906 #[track_caller]
907 pub async fn fetch_query_local<K, V, M>(
908 &self,
909 query_scope: impl QueryScopeLocalTrait<K, V, M>,
910 key: impl Borrow<K>,
911 ) -> V
912 where
913 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
914 V: DebugIfDevtoolsEnabled + Clone + 'static,
915 {
916 self.fetch_inner(
917 QueryScopeInfo::new_local(&query_scope),
918 query_scope.invalidation_prefix(key.borrow()),
919 async move |key| MaybeLocal::new_local(query_scope.query(key).await),
920 key.borrow(),
921 None,
922 || MaybeLocal::new_local(key.borrow().clone()),
923 &OwnerChain::new(Owner::current()),
924 )
925 .await
926 }
927
928 #[track_caller]
929 async fn fetch_inner<K, V>(
930 &self,
931 query_scope_info: QueryScopeInfo,
932 invalidation_prefix: Option<Vec<String>>,
933 fetcher: impl AsyncFn(K) -> MaybeLocal<V>,
934 key: &K,
935 maybe_preheld_fetcher_mutex_guard: Option<&futures::lock::MutexGuard<'_, ()>>,
936 lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
937 owner_chain: &OwnerChain,
938 ) -> V
939 where
940 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
941 V: DebugIfDevtoolsEnabled + Clone + 'static,
942 {
943 self.scope_lookup
944 .cached_or_fetch(
945 self.options(),
946 query_scope_info.options,
947 None,
948 &query_scope_info,
949 invalidation_prefix,
950 key,
951 fetcher,
952 |info| {
953 match info.variant {
954 CachedOrFetchCbInputVariant::CachedUntouched => {
955 if info.cached.stale() {
956 return CachedOrFetchCbOutput::Refetch;
957 }
958 }
959 CachedOrFetchCbInputVariant::CachedUpdated => {
960 // Update anything using it:
961 info.cached.buster.set(new_buster_id());
962 }
963 CachedOrFetchCbInputVariant::Fresh => {}
964 }
965 CachedOrFetchCbOutput::Return(
966 // WONTPANIC: cached_or_fetch will only output values that are safe on this thread:
967 info.cached.value_maybe_stale().value_may_panic().clone(),
968 )
969 },
970 maybe_preheld_fetcher_mutex_guard,
971 lazy_maybe_local_key,
972 owner_chain,
973 )
974 .await
975 }
976
977 /// Set the value of a query in the cache.
978 /// This cached value will be available from all threads and take priority over any locally cached value for this query.
979 ///
980 /// Active resources using the query will be updated.
981 #[track_caller]
982 pub fn set_query<K, V, M>(
983 &self,
984 query_scope: impl QueryScopeTrait<K, V, M>,
985 key: impl Borrow<K>,
986 new_value: V,
987 ) where
988 K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
989 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
990 {
991 self.set_inner(
992 QueryScopeInfo::new(&query_scope),
993 query_scope.invalidation_prefix(key.borrow()),
994 key.borrow(),
995 MaybeLocal::new(new_value),
996 || MaybeLocal::new(key.borrow().clone()),
997 true,
998 // like update methods, this does not come from the query function itself,
999 // so should not reset the invalidated status:
1000 ResetInvalidated::NoReset,
1001 )
1002 }
1003
1004 /// Set the value of a non-threadsafe query in the cache for this thread only.
1005 /// This cached value will only be available from this thread, the cache will return empty, unless a nonlocal value is set, from any other thread.
1006 ///
1007 /// Active resources using the query will be updated.
1008 #[track_caller]
1009 pub fn set_query_local<K, V, M>(
1010 &self,
1011 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1012 key: impl Borrow<K>,
1013 new_value: V,
1014 ) where
1015 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1016 V: DebugIfDevtoolsEnabled + Clone + 'static,
1017 {
1018 self.set_inner::<K, V>(
1019 QueryScopeInfo::new_local(&query_scope),
1020 query_scope.invalidation_prefix(key.borrow()),
1021 key.borrow(),
1022 MaybeLocal::new_local(new_value),
1023 || MaybeLocal::new_local(key.borrow().clone()),
1024 true,
1025 // like update methods, this does not come from the query function itself,
1026 // so should not reset the invalidated status:
1027 ResetInvalidated::NoReset,
1028 )
1029 }
1030
1031 #[track_caller]
1032 fn set_inner<K, V>(
1033 &self,
1034 query_scope_info: QueryScopeInfo,
1035 invalidation_prefix: Option<Vec<String>>,
1036 key: &K,
1037 new_value: MaybeLocal<V>,
1038 lazy_maybe_local_key: impl FnOnce() -> MaybeLocal<K>,
1039 track: bool,
1040 reset_invalidated: ResetInvalidated,
1041 ) where
1042 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1043 V: DebugIfDevtoolsEnabled + Clone + 'static,
1044 {
1045 let key_hash = KeyHash::new(key.borrow());
1046 self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
1047 &query_scope_info,
1048 true,
1049 |_| {},
1050 |maybe_scope, _| {
1051 let scope = maybe_scope.expect("provided a default");
1052
1053 // Make sure to look both caches if threadsafe, and prefer threadsafe:
1054 let maybe_cached = if !new_value.is_local() {
1055 if let Some(threadsafe_existing) = scope.get_mut_threadsafe_only(&key_hash) {
1056 Some(threadsafe_existing)
1057 } else {
1058 scope.get_mut_local_only(&key_hash)
1059 }
1060 } else {
1061 scope.get_mut_local_only(&key_hash)
1062 };
1063
1064 if let Some(cached) = maybe_cached {
1065 cached.set_value(
1066 new_value,
1067 track,
1068 #[cfg(any(
1069 all(debug_assertions, feature = "devtools"),
1070 feature = "devtools-always"
1071 ))]
1072 crate::events::Event::new(crate::events::EventVariant::DeclarativeSet),
1073 reset_invalidated,
1074 );
1075 } else {
1076 let query = Query::new(
1077 self.options(),
1078 self.scope_lookup,
1079 &query_scope_info,
1080 invalidation_prefix,
1081 key_hash,
1082 lazy_maybe_local_key(),
1083 new_value,
1084 ArcRwSignal::new(new_buster_id()),
1085 query_scope_info.options,
1086 None,
1087 #[cfg(any(
1088 all(debug_assertions, feature = "devtools"),
1089 feature = "devtools-always"
1090 ))]
1091 crate::events::Event::new(crate::events::EventVariant::DeclarativeSet),
1092 );
1093 scope.insert(key_hash, query);
1094 }
1095 },
1096 );
1097 }
1098
1099 /// Synchronously update the value of a query in the cache with a callback.
1100 ///
1101 /// Active resources using the query will be updated.
1102 ///
1103 /// The callback takes `Option<&mut V>`, will be None if the value is not available in the cache.
1104 ///
1105 /// If you want async and/or always get the value, use [`QueryClient::update_query_async`]/[`QueryClient::update_query_async_local`].
1106 ///
1107 /// Returns the output of the callback.
1108 ///
1109 /// If you decide you don't want to trigger resources and subscribers, e.g. if you know nothing changed,
1110 /// call [`QueryClient::untrack_update_query`] in the callback to prevent reactive updates.
1111 #[track_caller]
1112 pub fn update_query<K, V, T, M>(
1113 &self,
1114 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1115 key: impl Borrow<K>,
1116 modifier: impl FnOnce(Option<&mut V>) -> T,
1117 ) -> T
1118 where
1119 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1120 V: DebugIfDevtoolsEnabled + Clone + 'static,
1121 {
1122 self.update_query_inner::<K, V, T>(
1123 &QueryScopeInfo::new_local(&query_scope),
1124 key.borrow(),
1125 ResetInvalidated::NoReset,
1126 modifier,
1127 )
1128 }
1129
1130 #[track_caller]
1131 fn update_query_inner<K, V, T>(
1132 &self,
1133 query_scope_info: &QueryScopeInfo,
1134 key: &K,
1135 reset_invalidated: ResetInvalidated,
1136 modifier: impl FnOnce(Option<&mut V>) -> T,
1137 ) -> T
1138 where
1139 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1140 V: DebugIfDevtoolsEnabled + Clone + 'static,
1141 {
1142 let key_hash = KeyHash::new(key);
1143 let mut modifier_holder = Some(modifier);
1144
1145 let maybe_return_value = self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
1146 query_scope_info,
1147 false,
1148 |_| {},
1149 |maybe_scope, _| {
1150 if let Some(scope) = maybe_scope
1151 && let Some(cached) = scope.get_mut(&key_hash)
1152 {
1153 let modifier = modifier_holder
1154 .take()
1155 .expect("Should never be used more than once. (bug)");
1156 let return_value = cached.update_value(
1157 // WONTPANIC: the internals will only supply the value if available from this thread:
1158 |value| modifier(Some(value.value_mut_may_panic())),
1159 #[cfg(any(
1160 all(debug_assertions, feature = "devtools"),
1161 feature = "devtools-always"
1162 ))]
1163 crate::events::Event::new(crate::events::EventVariant::DeclarativeUpdate),
1164 reset_invalidated,
1165 );
1166 return Some(return_value);
1167 }
1168 None
1169 },
1170 );
1171 if let Some(return_value) = maybe_return_value {
1172 return_value
1173 } else {
1174 // Didn't exist:
1175 modifier_holder
1176 .take()
1177 .expect("Should never be used more than once. (bug)")(None)
1178 }
1179 }
1180
1181 /// Asynchronously map a threadsafe query in the cache from one value to another.
1182 ///
1183 /// Unlike [`QueryClient::update_query`], this will fetch the query first, if it doesn't exist.
1184 ///
1185 /// Active resources using the query will be updated.
1186 ///
1187 /// Returns the output of the callback.
1188 ///
1189 /// If you decide you don't want to trigger resources and subscribers, e.g. if you know nothing changed,
1190 /// call [`QueryClient::untrack_update_query`] in the callback to prevent reactive updates.
1191 #[track_caller]
1192 pub async fn update_query_async<'a, K, V, T, M>(
1193 &'a self,
1194 query_scope: impl QueryScopeTrait<K, V, M>,
1195 key: impl Borrow<K>,
1196 mapper: impl AsyncFnOnce(&mut V) -> T,
1197 ) -> T
1198 where
1199 K: DebugIfDevtoolsEnabled + Clone + Hash + Send + Sync + 'static,
1200 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1201 {
1202 self.update_query_async_inner(
1203 QueryScopeInfo::new(&query_scope),
1204 query_scope.invalidation_prefix(key.borrow()),
1205 async move |key| MaybeLocal::new(query_scope.query(key).await),
1206 key.borrow(),
1207 mapper,
1208 MaybeLocal::new,
1209 || MaybeLocal::new(key.borrow().clone()),
1210 &OwnerChain::new(Owner::current()),
1211 )
1212 .await
1213 }
1214
1215 /// Asynchronously map a non-threadsafe query in the cache from one value to another.
1216 ///
1217 /// Unlike [`QueryClient::update_query`], this will fetch the query first, if it doesn't exist.
1218 ///
1219 /// Active resources using the query will be updated.
1220 ///
1221 /// Returns the output of the callback.
1222 ///
1223 /// If you decide you don't want to trigger resources and subscribers, e.g. if you know nothing changed,
1224 /// call [`QueryClient::untrack_update_query`] in the callback to prevent reactive updates.
1225 #[track_caller]
1226 pub async fn update_query_async_local<'a, K, V, T, M>(
1227 &'a self,
1228 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1229 key: impl Borrow<K>,
1230 mapper: impl AsyncFnOnce(&mut V) -> T,
1231 ) -> T
1232 where
1233 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1234 V: DebugIfDevtoolsEnabled + Clone + 'static,
1235 {
1236 self.update_query_async_inner(
1237 QueryScopeInfo::new_local(&query_scope),
1238 query_scope.invalidation_prefix(key.borrow()),
1239 async move |key| MaybeLocal::new_local(query_scope.query(key).await),
1240 key.borrow(),
1241 mapper,
1242 MaybeLocal::new_local,
1243 || MaybeLocal::new_local(key.borrow().clone()),
1244 &OwnerChain::new(Owner::current()),
1245 )
1246 .await
1247 }
1248
1249 #[track_caller]
1250 async fn update_query_async_inner<'a, K, V, T>(
1251 &'a self,
1252 query_scope_info: QueryScopeInfo,
1253 invalidation_prefix: Option<Vec<String>>,
1254 fetcher: impl AsyncFn(K) -> MaybeLocal<V>,
1255 key: &K,
1256 mapper: impl AsyncFnOnce(&mut V) -> T,
1257 into_maybe_local: impl FnOnce(V) -> MaybeLocal<V>,
1258 lazy_maybe_local_key: impl Fn() -> MaybeLocal<K>,
1259 owner_chain: &OwnerChain,
1260 ) -> T
1261 where
1262 K: DebugIfDevtoolsEnabled + Clone + Hash + 'static,
1263 V: DebugIfDevtoolsEnabled + Clone + 'static,
1264 {
1265 let key_hash = KeyHash::new(key.borrow());
1266
1267 // By holding the fetcher mutex from start to finish, prevent the chance of the value being fetched between the fetch async call and the external user mapper async call and the final set().
1268 let fetcher_mutex = self
1269 .scope_lookup
1270 .fetcher_mutex::<K, V>(key_hash, &query_scope_info);
1271 let fetcher_guard = fetcher_mutex.lock().await;
1272
1273 let mut new_value = self
1274 .fetch_inner(
1275 query_scope_info.clone(),
1276 invalidation_prefix.clone(),
1277 fetcher,
1278 key.borrow(),
1279 Some(&fetcher_guard),
1280 &lazy_maybe_local_key,
1281 owner_chain,
1282 )
1283 .await;
1284
1285 // The fetch will "turn on" is_fetching during it's lifetime, but we also want it during the mapper function:
1286 self.scope_lookup
1287 .with_notify_fetching(query_scope_info.cache_key, key_hash, false, async {
1288 let track = Arc::new(AtomicBool::new(true));
1289
1290 // Will monitor for invalidations during the user async fn, which could take a long time:
1291 let query_abort_rx = self.scope_lookup.prepare_invalidation_channel::<K, V>(
1292 &query_scope_info,
1293 key_hash,
1294 &lazy_maybe_local_key(),
1295 );
1296
1297 let result_fut = ASYNC_TRACK_UPDATE_MARKER
1298 .scope(track.clone(), async { mapper(&mut new_value).await });
1299
1300 // Call the user async function, but also monitor for invalidations:
1301 // Specifically, we care about if .clear() is called.
1302 // If so, we shouldn't set the value, as it will have been generated with the cleared cached query value.
1303 // If invalidated should still set, because the value will be deemed "less stale", but won't de-invalidate it.
1304 // Don't care about QueryAbortReason::SsrStreamedValueOverride
1305 // as it shouldn't be possible to get to this point before all ssr streaming is done.
1306 let mut cleared_during_user_fn = false;
1307 let result = {
1308 pin_mut!(result_fut);
1309 pin_mut!(query_abort_rx);
1310 match futures::future::select(result_fut, query_abort_rx).await {
1311 futures::future::Either::Left((result, _query_abort_rx)) => result,
1312 futures::future::Either::Right((query_abort_reason, result_fut)) => {
1313 if let Ok(QueryAbortReason::Clear) = query_abort_reason {
1314 // If the query was cleared, don't set the new value.
1315 cleared_during_user_fn = true;
1316 }
1317 result_fut.await
1318 }
1319 }
1320 };
1321
1322 // If the query was cleared, don't set the new value.
1323 if !cleared_during_user_fn {
1324 // Cover a small edge case:
1325 // - value in threadsafe
1326 // - .update_query_async_local called
1327 // - replacement value .set() called, stored in local cache because no type safety on threadsafe cache
1328 // - now have 2 versions in both caches
1329 // by trying to update first, means it'll stick with existing cache if it exists,
1330 // otherwise doesn't matter and can just be set normally:
1331 let mut new_value = Some(new_value);
1332 let updated = self.update_query_inner::<K, V, _>(
1333 &query_scope_info,
1334 key,
1335 // fetch_inner would've reset it if needed, then the update itself shouldn't change the value:
1336 ResetInvalidated::NoReset,
1337 |value| {
1338 if let Some(value) = value {
1339 // This sync update call itself needs untracking if !track:
1340 if !track.load(std::sync::atomic::Ordering::Relaxed) {
1341 self.untrack_update_query();
1342 }
1343
1344 *value = new_value.take().expect("Should be Some");
1345 true
1346 } else {
1347 false
1348 }
1349 },
1350 );
1351 if !updated {
1352 self.set_inner::<K, V>(
1353 query_scope_info,
1354 invalidation_prefix,
1355 key.borrow(),
1356 into_maybe_local(
1357 new_value
1358 .take()
1359 .expect("Should be Some, should only be here if not updated"),
1360 ),
1361 lazy_maybe_local_key,
1362 track.load(std::sync::atomic::Ordering::Relaxed),
1363 // fetch_inner would've reset it if needed, then the update itself shouldn't change the value:
1364 ResetInvalidated::NoReset,
1365 );
1366 }
1367 }
1368
1369 result
1370 })
1371 .await
1372 }
1373
1374 /// Synchronously get a query from the cache, if it exists.
1375 #[track_caller]
1376 pub fn get_cached_query<K, V, M>(
1377 &self,
1378 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1379 key: impl Borrow<K>,
1380 ) -> Option<V>
1381 where
1382 K: DebugIfDevtoolsEnabled + Hash + 'static,
1383 V: DebugIfDevtoolsEnabled + Clone + 'static,
1384 {
1385 self.scope_lookup.with_cached_query::<K, V, _>(
1386 &KeyHash::new(key.borrow()),
1387 &query_scope.cache_key(),
1388 |maybe_cached| {
1389 // WONTPANIC: with_cached_query will only output values that are safe on this thread:
1390 maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1391 },
1392 )
1393 }
1394
1395 /// Synchronously check if a query exists in the cache.
1396 ///
1397 /// Returns `true` if the query exists.
1398 #[track_caller]
1399 pub fn query_exists<K, V, M>(
1400 &self,
1401 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1402 key: impl Borrow<K>,
1403 ) -> bool
1404 where
1405 K: DebugIfDevtoolsEnabled + Hash + 'static,
1406 V: DebugIfDevtoolsEnabled + 'static,
1407 {
1408 let key_hash = KeyHash::new(key.borrow());
1409 self.scope_lookup.with_cached_query::<K, V, _>(
1410 &key_hash,
1411 &query_scope.cache_key(),
1412 |maybe_cached| maybe_cached.is_some(),
1413 )
1414 }
1415
1416 /// Subscribe to the `is_loading` status of a query.
1417 /// The keyer function is reactive to changes in `K`.
1418 ///
1419 /// This is `true` when the query is in the process of fetching data for the first time.
1420 /// This is in contrast to `is_fetching`, that is `true` whenever the query is fetching data, including when it's refetching.
1421 ///
1422 /// From a resource perspective:
1423 /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1424 /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1425 /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1426 #[track_caller]
1427 pub fn subscribe_is_loading<K, MaybeKey, V, M>(
1428 &self,
1429 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1430 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1431 ) -> Signal<bool>
1432 where
1433 K: Hash + Send + Sync + 'static,
1434 MaybeKey: QueryMaybeKey<K, V>,
1435 MaybeKey::MappedValue: 'static,
1436 V: 'static,
1437 {
1438 self.subscribe_is_loading_arc(query_scope, keyer).into()
1439 }
1440
1441 /// Subscribe to the `is_loading` status of a query with a non-threadsafe key.
1442 /// The keyer function is reactive to changes in `K`.
1443 ///
1444 /// This is `true` when the query is in the process of fetching data for the first time.
1445 /// This is in contrast to `is_fetching`, that is `true` whenever the query is fetching data, including when it's refetching.
1446 ///
1447 /// From a resource perspective:
1448 /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1449 /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1450 /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1451 #[track_caller]
1452 pub fn subscribe_is_loading_local<K, MaybeKey, V, M>(
1453 &self,
1454 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1455 keyer: impl Fn() -> MaybeKey + 'static,
1456 ) -> Signal<bool>
1457 where
1458 K: Hash + 'static,
1459 MaybeKey: QueryMaybeKey<K, V>,
1460 MaybeKey::MappedValue: 'static,
1461 V: 'static,
1462 {
1463 self.subscribe_is_loading_arc_local(query_scope, keyer)
1464 .into()
1465 }
1466
1467 /// Subscribe to the `is_loading` status of a query with a non-threadsafe key.
1468 /// The keyer function is reactive to changes in `K`.
1469 ///
1470 /// This is `true` when the query is in the process of fetching data for the first time.
1471 /// This is in contrast to `is_fetching`, that is `true` whenever the query is fetching data, including when it's refetching.
1472 ///
1473 /// From a resource perspective:
1474 /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1475 /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1476 /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1477 #[track_caller]
1478 pub fn subscribe_is_loading_arc_local<K, MaybeKey, V, M>(
1479 &self,
1480 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1481 keyer: impl Fn() -> MaybeKey + 'static,
1482 ) -> ArcSignal<bool>
1483 where
1484 K: Hash + 'static,
1485 MaybeKey: QueryMaybeKey<K, V>,
1486 MaybeKey::MappedValue: 'static,
1487 V: 'static,
1488 {
1489 let keyer = SendWrapper::new(keyer);
1490 self.scope_lookup
1491 .scope_subscriptions_mut()
1492 .add_is_loading_subscription(
1493 query_scope.cache_key(),
1494 MaybeLocal::new_local(ArcSignal::derive(move || {
1495 keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1496 })),
1497 )
1498 }
1499
1500 /// Subscribe to the `is_loading` status of a query.
1501 /// The keyer function is reactive to changes in `K`.
1502 ///
1503 /// This is `true` when the query is in the process of fetching data for the first time.
1504 /// This is in contrast to `is_fetching`, that is `true` whenever the query is fetching data, including when it's refetching.
1505 ///
1506 /// From a resource perspective:
1507 /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1508 /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1509 /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1510 #[track_caller]
1511 pub fn subscribe_is_loading_arc<K, MaybeKey, V, M>(
1512 &self,
1513 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1514 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1515 ) -> ArcSignal<bool>
1516 where
1517 K: Hash + Send + Sync + 'static,
1518 MaybeKey: QueryMaybeKey<K, V>,
1519 MaybeKey::MappedValue: 'static,
1520 V: 'static,
1521 {
1522 self.scope_lookup
1523 .scope_subscriptions_mut()
1524 .add_is_loading_subscription(
1525 query_scope.cache_key(),
1526 MaybeLocal::new(ArcSignal::derive(move || {
1527 keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1528 })),
1529 )
1530 }
1531
1532 /// Subscribe to the `is_fetching` status of a query.
1533 /// The keyer function is reactive to changes in `K`.
1534 ///
1535 /// This is `true` is `true` whenever the query is fetching data, including when it's refetching.
1536 /// This is in contrast to `is_loading`, that is `true` when the query is in the process of fetching data for the first time only.
1537 ///
1538 /// From a resource perspective:
1539 /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1540 /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1541 /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1542 #[track_caller]
1543 pub fn subscribe_is_fetching<K, MaybeKey, V, M>(
1544 &self,
1545 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1546 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1547 ) -> Signal<bool>
1548 where
1549 K: Hash + Send + Sync + 'static,
1550 MaybeKey: QueryMaybeKey<K, V>,
1551 MaybeKey::MappedValue: 'static,
1552 V: 'static,
1553 {
1554 self.subscribe_is_fetching_arc(query_scope, keyer).into()
1555 }
1556
1557 /// Subscribe to the `is_fetching` status of a query with a non-threadsafe key.
1558 /// The keyer function is reactive to changes in `K`.
1559 ///
1560 /// This is `true` is `true` whenever the query is fetching data, including when it's refetching.
1561 /// This is in contrast to `is_loading`, that is `true` when the query is in the process of fetching data for the first time only.
1562 ///
1563 /// From a resource perspective:
1564 /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1565 /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1566 /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1567 #[track_caller]
1568 pub fn subscribe_is_fetching_local<K, MaybeKey, V, M>(
1569 &self,
1570 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1571 keyer: impl Fn() -> MaybeKey + 'static,
1572 ) -> Signal<bool>
1573 where
1574 K: Hash + 'static,
1575 MaybeKey: QueryMaybeKey<K, V>,
1576 MaybeKey::MappedValue: 'static,
1577 V: 'static,
1578 {
1579 self.subscribe_is_fetching_arc_local(query_scope, keyer)
1580 .into()
1581 }
1582
1583 /// Subscribe to the `is_fetching` status of a query with a non-threadsafe key.
1584 /// The keyer function is reactive to changes in `K`.
1585 ///
1586 /// This is `true` is `true` whenever the query is fetching data, including when it's refetching.
1587 /// This is in contrast to `is_loading`, that is `true` when the query is in the process of fetching data for the first time only.
1588 ///
1589 /// From a resource perspective:
1590 /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1591 /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1592 /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1593 #[track_caller]
1594 pub fn subscribe_is_fetching_arc_local<K, MaybeKey, V, M>(
1595 &self,
1596 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1597 keyer: impl Fn() -> MaybeKey + 'static,
1598 ) -> ArcSignal<bool>
1599 where
1600 K: Hash + 'static,
1601 MaybeKey: QueryMaybeKey<K, V>,
1602 MaybeKey::MappedValue: 'static,
1603 V: 'static,
1604 {
1605 let keyer = SendWrapper::new(keyer);
1606 self.scope_lookup
1607 .scope_subscriptions_mut()
1608 .add_is_fetching_subscription(
1609 query_scope.cache_key(),
1610 MaybeLocal::new_local(ArcSignal::derive(move || {
1611 keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1612 })),
1613 )
1614 }
1615
1616 /// Subscribe to the `is_fetching` status of a query.
1617 /// The keyer function is reactive to changes in `K`.
1618 ///
1619 /// This is `true` is `true` whenever the query is fetching data, including when it's refetching.
1620 /// This is in contrast to `is_loading`, that is `true` when the query is in the process of fetching data for the first time only.
1621 ///
1622 /// From a resource perspective:
1623 /// - `is_loading=true`, the resource will be in a pending state until ready and implies `is_fetching=true`
1624 /// - `is_fetching=true` + `is_loading=false` means the resource is showing previous data, and will update once new data finishes refetching
1625 /// - `is_fetching=false` means the resource is showing the latest data and implies `is_loading=false`
1626 #[track_caller]
1627 pub fn subscribe_is_fetching_arc<K, MaybeKey, V, M>(
1628 &self,
1629 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1630 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1631 ) -> ArcSignal<bool>
1632 where
1633 K: Hash + Send + Sync + 'static,
1634 MaybeKey: QueryMaybeKey<K, V>,
1635 MaybeKey::MappedValue: 'static,
1636 V: 'static,
1637 {
1638 self.scope_lookup
1639 .scope_subscriptions_mut()
1640 .add_is_fetching_subscription(
1641 query_scope.cache_key(),
1642 MaybeLocal::new(ArcSignal::derive(move || {
1643 keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1644 })),
1645 )
1646 }
1647
1648 /// Subscribe to the value of a query.
1649 /// The keyer function is reactive to changes in `K`.
1650 ///
1651 /// This will update whenever the query is created, removed, updated, refetched or set.
1652 ///
1653 /// Compared to a resource:
1654 /// - This will not trigger a fetch of a query, if it's not in the cache, this will be `None`.
1655 #[track_caller]
1656 pub fn subscribe_value<K, MaybeKey, V, M>(
1657 &self,
1658 query_scope: impl QueryScopeTrait<K, V, M>,
1659 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1660 ) -> Signal<Option<V>>
1661 where
1662 K: DebugIfDevtoolsEnabled + Hash + Send + Sync + 'static,
1663 MaybeKey: QueryMaybeKey<K, V>,
1664 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1665 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1666 {
1667 self.subscribe_value_arc(query_scope, keyer).into()
1668 }
1669
1670 /// Subscribe to the value of a non-threadsafe query.
1671 /// The keyer function is reactive to changes in `K`.
1672 ///
1673 /// This will update whenever the query is created, removed, updated, refetched or set.
1674 ///
1675 /// Compared to a resource:
1676 /// - This will not trigger a fetch of a query, if it's not in the cache, this will be `None`.
1677 #[track_caller]
1678 pub fn subscribe_value_local<K, MaybeKey, V, M>(
1679 &self,
1680 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1681 keyer: impl Fn() -> MaybeKey + 'static,
1682 ) -> Signal<Option<V>, LocalStorage>
1683 where
1684 K: DebugIfDevtoolsEnabled + Hash + 'static,
1685 MaybeKey: QueryMaybeKey<K, V>,
1686 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
1687 V: DebugIfDevtoolsEnabled + Clone + 'static,
1688 {
1689 self.subscribe_value_arc_local(query_scope, keyer).into()
1690 }
1691
1692 /// Subscribe to the value of a non-threadsafe query.
1693 /// The keyer function is reactive to changes in `K`.
1694 ///
1695 /// This will update whenever the query is created, removed, updated, refetched or set.
1696 ///
1697 /// Compared to a resource:
1698 /// - This will not trigger a fetch of a query, if it's not in the cache, this will be `None`.
1699 #[track_caller]
1700 pub fn subscribe_value_arc_local<K, MaybeKey, V, M>(
1701 &self,
1702 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1703 keyer: impl Fn() -> MaybeKey + 'static,
1704 ) -> ArcLocalSignal<Option<V>>
1705 where
1706 K: DebugIfDevtoolsEnabled + Hash + 'static,
1707 MaybeKey: QueryMaybeKey<K, V>,
1708 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + 'static,
1709 V: DebugIfDevtoolsEnabled + Clone + 'static,
1710 {
1711 let cache_key = query_scope.cache_key();
1712 let keyer = ArcLocalSignal::derive_local(move || {
1713 keyer().into_maybe_key().map(|k| KeyHash::new(&k))
1714 });
1715 let dyn_signal = self
1716 .scope_lookup
1717 .scope_subscriptions_mut()
1718 .add_value_set_updated_or_removed_subscription(
1719 cache_key,
1720 MaybeLocal::new_local({
1721 let keyer = keyer.clone();
1722 move || keyer.get()
1723 }),
1724 );
1725
1726 let scope_lookup = self.scope_lookup;
1727 ArcLocalSignal::derive_local(move || {
1728 dyn_signal.track();
1729 if let Some(key) = keyer.read_untracked().as_ref() {
1730 scope_lookup.with_cached_query::<K, V, _>(key, &cache_key, |maybe_cached| {
1731 // WONTPANIC: with_cached_query will only output values that are safe on this thread:
1732 maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1733 })
1734 } else {
1735 None
1736 }
1737 })
1738 }
1739
1740 /// Subscribe to the value of a query.
1741 /// The keyer function is reactive to changes in `K`.
1742 ///
1743 /// This will update whenever the query is created, removed, updated, refetched or set.
1744 ///
1745 /// Compared to a resource:
1746 /// - This will not trigger a fetch of a query, if it's not in the cache, this will be `None`.
1747 #[track_caller]
1748 pub fn subscribe_value_arc<K, MaybeKey, V, M>(
1749 &self,
1750 query_scope: impl QueryScopeTrait<K, V, M>,
1751 keyer: impl Fn() -> MaybeKey + Send + Sync + 'static,
1752 ) -> ArcSignal<Option<V>>
1753 where
1754 K: DebugIfDevtoolsEnabled + Hash + Send + Sync + 'static,
1755 MaybeKey: QueryMaybeKey<K, V>,
1756 MaybeKey::MappedValue: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1757 V: DebugIfDevtoolsEnabled + Clone + Send + Sync + 'static,
1758 {
1759 let cache_key = query_scope.cache_key();
1760 let keyer = ArcSignal::derive(move || keyer().into_maybe_key().map(|k| KeyHash::new(&k)));
1761
1762 let dyn_signal = self
1763 .scope_lookup
1764 .scope_subscriptions_mut()
1765 .add_value_set_updated_or_removed_subscription(
1766 cache_key,
1767 MaybeLocal::new({
1768 let keyer = keyer.clone();
1769 move || keyer.get()
1770 }),
1771 );
1772
1773 let scope_lookup = self.scope_lookup;
1774 ArcSignal::derive(move || {
1775 dyn_signal.track();
1776 if let Some(key) = keyer.read_untracked().as_ref() {
1777 scope_lookup.with_cached_query::<K, V, _>(key, &cache_key, |maybe_cached| {
1778 // WONTPANIC: with_cached_query will only output values that are safe on this thread:
1779 maybe_cached.map(|cached| cached.value_maybe_stale().value_may_panic().clone())
1780 })
1781 } else {
1782 None
1783 }
1784 })
1785 }
1786
1787 /// Mark a query as stale.
1788 ///
1789 /// Any active resources will refetch in the background, replacing them when ready.
1790 #[track_caller]
1791 pub fn invalidate_query<K, V, M>(
1792 &self,
1793 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1794 key: impl Borrow<K>,
1795 ) -> bool
1796 where
1797 K: DebugIfDevtoolsEnabled + Hash + 'static,
1798 V: DebugIfDevtoolsEnabled + Clone + 'static,
1799 {
1800 let cleared = self.invalidate_queries(query_scope, std::iter::once(key));
1801 !cleared.is_empty()
1802 }
1803
1804 /// Mark multiple queries of a specific type as stale.
1805 ///
1806 /// Any active resources will refetch in the background, replacing them when ready.
1807 #[track_caller]
1808 pub fn invalidate_queries<K, V, KRef, M>(
1809 &self,
1810 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1811 keys: impl IntoIterator<Item = KRef>,
1812 ) -> Vec<KRef>
1813 where
1814 K: DebugIfDevtoolsEnabled + Hash + 'static,
1815 V: DebugIfDevtoolsEnabled + Clone + 'static,
1816 KRef: Borrow<K>,
1817 {
1818 self.invalidate_queries_inner::<K, V, _>(&QueryScopeInfo::new_local(&query_scope), keys)
1819 }
1820
1821 /// Mark one or more queries of a specific type as stale with a callback.
1822 ///
1823 /// When the callback returns `true`, the specific query will be invalidated.
1824 ///
1825 /// Any active resources subscribing to that query will refetch in the background,
1826 /// replacing them when ready.
1827 #[track_caller]
1828 pub fn invalidate_queries_with_predicate<K, V, M>(
1829 &self,
1830 query_scope: impl QueryScopeLocalTrait<K, V, M>,
1831 should_invalidate: impl Fn(&K) -> bool,
1832 ) where
1833 K: DebugIfDevtoolsEnabled + Hash + 'static,
1834 V: DebugIfDevtoolsEnabled + Clone + 'static,
1835 {
1836 self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
1837 &QueryScopeInfo::new_local(&query_scope),
1838 false,
1839 |_| {},
1840 |maybe_scope, _| {
1841 if let Some(scope) = maybe_scope {
1842 for query in scope.all_queries_mut_include_pending() {
1843 if let Some(key) = query.key().value_if_safe()
1844 && should_invalidate(key)
1845 {
1846 query.invalidate(QueryAbortReason::Invalidate);
1847 }
1848 }
1849 }
1850 },
1851 )
1852 }
1853
1854 #[track_caller]
1855 pub(crate) fn invalidate_queries_inner<K, V, KRef>(
1856 &self,
1857 query_scope_info: &QueryScopeInfo,
1858 keys: impl IntoIterator<Item = KRef>,
1859 ) -> Vec<KRef>
1860 where
1861 K: DebugIfDevtoolsEnabled + Hash + 'static,
1862 V: DebugIfDevtoolsEnabled + Clone + 'static,
1863 KRef: Borrow<K>,
1864 {
1865 let keys = keys.into_iter().collect::<Vec<_>>();
1866 let key_hashes = keys
1867 .iter()
1868 .map(|key| KeyHash::new(key.borrow()))
1869 .collect::<Vec<_>>();
1870
1871 if key_hashes.is_empty() {
1872 return vec![];
1873 }
1874
1875 self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
1876 query_scope_info,
1877 false,
1878 |_| {},
1879 |maybe_scope, _| {
1880 let mut invalidated = vec![];
1881 if let Some(scope) = maybe_scope {
1882 for (key, key_hash) in keys.into_iter().zip(key_hashes.iter()) {
1883 if let Some(cached) = scope.get_mut_include_pending(key_hash) {
1884 cached.invalidate(QueryAbortReason::Invalidate);
1885 invalidated.push(key);
1886 }
1887 }
1888 }
1889 invalidated
1890 },
1891 )
1892 }
1893
1894 /// Mark all queries of a specific type as stale.
1895 ///
1896 /// Any active resources will refetch in the background, replacing them when ready.
1897 #[track_caller]
1898 pub fn invalidate_query_scope<K, V, M>(&self, query_scope: impl QueryScopeLocalTrait<K, V, M>)
1899 where
1900 K: Hash + 'static,
1901 V: Clone + 'static,
1902 {
1903 self.invalidate_query_scope_inner(&query_scope.cache_key())
1904 }
1905
1906 pub(crate) fn invalidate_query_scope_inner(&self, scope_cache_key: &ScopeCacheKey) {
1907 let mut guard = self.scope_lookup.scopes_mut();
1908 if let Some(scope) = guard.get_mut(scope_cache_key) {
1909 scope.invalidate_scope(QueryAbortReason::Invalidate);
1910 for buster in scope.busters() {
1911 buster.try_set(new_buster_id());
1912 }
1913 }
1914 }
1915
1916 /// Mark all queries as stale.
1917 ///
1918 /// Any active resources will refetch in the background, replacing them when ready.
1919 ///
1920 /// To have the cache instantly cleared and all listeners reset to pending, e.g. for user logout,
1921 /// see [`QueryClient::clear`].
1922 #[track_caller]
1923 pub fn invalidate_all_queries(&self) {
1924 for scope in self.scope_lookup.scopes_mut().values_mut() {
1925 let busters = scope.busters();
1926 scope.invalidate_scope(QueryAbortReason::Invalidate);
1927 for buster in busters {
1928 buster.try_set(new_buster_id());
1929 }
1930 }
1931 }
1932
1933 /// Empty the cache, like [`QueryClient::invalidate_all_queries`] except:
1934 /// - the cache is instantly cleared of all queries
1935 /// - All active resources etc are reset to their pending state instantly
1936 /// until the new query finishes refetching.
1937 ///
1938 /// Useful for e.g. user logout.
1939 ///
1940 /// [`QueryClient::invalidate_all_queries`] on the other hand, will only refetch active queries in the background, replacing them when ready.
1941 #[track_caller]
1942 pub fn clear(&self) {
1943 for scope in self.scope_lookup.scopes_mut().values_mut() {
1944 let busters = scope.busters();
1945 scope.invalidate_scope(QueryAbortReason::Clear);
1946 scope.clear();
1947 for buster in busters {
1948 buster.try_set(new_buster_id());
1949 }
1950 }
1951 }
1952
#[cfg(test)]
/// Clear a single query key from the given scope (test-only).
///
/// The query, if found (pending included), is invalidated with a `Clear` reason before its
/// entry is removed. Returns `true` when the first removal actually removed an entry.
#[track_caller]
pub fn clear_query<K, V, M>(
    &self,
    query_scope: impl QueryScopeLocalTrait<K, V, M>,
    key: impl Borrow<K>,
) -> bool
where
    K: DebugIfDevtoolsEnabled + Hash + 'static,
    V: DebugIfDevtoolsEnabled + Clone + 'static,
{
    let scope_info = QueryScopeInfo::new_local(&query_scope);
    self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
        &scope_info,
        false,
        |_| {},
        |maybe_scope, _| {
            let Some(scope) = maybe_scope else {
                return false;
            };
            let key_hash = KeyHash::new(key.borrow());
            if let Some(cached) = scope.get_mut_include_pending(&key_hash) {
                cached.invalidate(QueryAbortReason::Clear);
            }
            let first_removal = scope.remove_entry(&key_hash);
            // Calling it again just in case because in tests might be in sync cache and non sync cache:
            scope.remove_entry(&key_hash);
            first_removal.is_some()
        },
    )
}
1984
#[cfg(test)]
/// Total of `size()` summed over every scope in the lookup (test-only).
pub(crate) fn size(&self) -> usize {
    let scopes = self.scope_lookup.scopes();
    let mut total = 0;
    for scope in scopes.values() {
        total += scope.size();
    }
    total
}
1993
#[cfg(test)]
/// Report whether the query stored under `key` is currently invalidated (test-only).
///
/// Returns `false` when the scope has no cache or holds no query for the key.
pub(crate) fn is_key_invalid<K, V, M>(
    &self,
    query_scope: impl QueryScopeLocalTrait<K, V, M>,
    key: impl Borrow<K>,
) -> bool
where
    K: DebugIfDevtoolsEnabled + Hash + 'static,
    V: DebugIfDevtoolsEnabled + Clone + 'static,
{
    let scope_info = QueryScopeInfo::new_local(&query_scope);
    self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
        &scope_info,
        false,
        |_| {},
        |maybe_scope, _| {
            let Some(scope) = maybe_scope else {
                return false;
            };
            let key_hash = KeyHash::new(key.borrow());
            scope
                .get(&key_hash)
                .is_some_and(|query| query.is_invalidated())
        },
    )
}
2020
#[cfg(test)]
#[allow(dead_code)]
/// Call `mark_valid()` on the query stored under `key`, if the scope and query exist (test-only).
pub(crate) fn mark_key_valid<K, V, M>(
    &self,
    query_scope: impl QueryScopeLocalTrait<K, V, M>,
    key: impl Borrow<K>,
) where
    K: DebugIfDevtoolsEnabled + Hash + 'static,
    V: DebugIfDevtoolsEnabled + Clone + 'static,
{
    let scope_info = QueryScopeInfo::new_local(&query_scope);
    self.scope_lookup.with_cached_scope_mut::<K, V, _, _>(
        &scope_info,
        false,
        |_| {},
        |maybe_scope, _| {
            let Some(scope) = maybe_scope else {
                return;
            };
            if let Some(query) = scope.get_mut(&KeyHash::new(key.borrow())) {
                query.mark_valid();
            }
        },
    );
}
2044
#[cfg(test)]
/// The `count()` reported by the scope subscriptions (test-only).
pub(crate) fn subscriber_count(&self) -> usize {
    let subscriber_total = self.scope_lookup.scope_subscriptions_mut().count();
    subscriber_total
}
2049}