leptos_fetch/
lib.rs

1#![allow(clippy::module_inception)]
2#![allow(clippy::type_complexity)]
3#![allow(clippy::too_many_arguments)]
4#![warn(clippy::disallowed_types)]
5#![warn(missing_docs)]
6#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
7// When docs auto created for docs.rs, will include features, given docs.rs uses nightly by default:
8#![cfg_attr(all(doc, CHANNEL_NIGHTLY), feature(doc_cfg))]
9
10mod arc_local_signal;
11mod cache;
12mod cache_scope;
13mod debug_if_devtools_enabled;
14#[cfg(any(
15    all(debug_assertions, feature = "devtools"),
16    feature = "devtools-always"
17))]
18mod events;
19mod global;
20mod maybe_local;
21mod no_reactive_diagnostics_future;
22mod pagination;
23mod query;
24mod query_client;
25mod query_maybe_key;
26mod query_options;
27mod query_scope;
28mod resource_drop_guard;
29#[cfg(any(
30    all(debug_assertions, feature = "devtools"),
31    feature = "devtools-always"
32))]
33mod subs_client;
34mod subs_scope;
35mod trie;
36mod utils;
37mod value_with_callbacks;
38
39#[cfg(any(feature = "devtools", feature = "devtools-always"))]
40mod dev_tools;
41#[cfg(any(feature = "devtools", feature = "devtools-always"))]
42pub use dev_tools::QueryDevtools;
43
44pub use arc_local_signal::*;
45pub use pagination::*;
46pub use query_client::*;
47pub use query_options::*;
48pub use query_scope::{QueryScope, QueryScopeLocal};
49
50#[cfg(test)]
51mod test {
52    use std::{
53        collections::HashMap,
54        fmt::Debug,
55        hash::Hash,
56        marker::PhantomData,
57        ptr::NonNull,
58        sync::{
59            Arc,
60            atomic::{AtomicBool, AtomicUsize, Ordering},
61        },
62    };
63
64    use futures::future::Either;
65    use hydration_context::{
66        PinnedFuture, PinnedStream, SerializedDataId, SharedContext, SsrSharedContext,
67    };
68
69    use any_spawner::Executor;
70    use leptos::{error::ErrorId, prelude::*};
71
72    use rstest::*;
73
74    use crate::{global::does_scope_id_exist, query_scope::QueryScopeLocalTrait, utils::OnDrop};
75
76    use super::*;
77
78    pub struct MockHydrateSharedContext {
79        id: AtomicUsize,
80        is_hydrating: AtomicBool,
81        during_hydration: AtomicBool,
82
83        // CUSTOM_TO_MOCK:
84
85        // errors: LazyLock<Vec<(SerializedDataId, ErrorId, Error)>>,
86        // incomplete: LazyLock<Vec<SerializedDataId>>,
87        resolved_resources: Vec<(SerializedDataId, String)>,
88    }
89
90    impl MockHydrateSharedContext {
91        pub async fn new(ssr_ctx: Option<&SsrSharedContext>) -> Self {
92            Self {
93                id: AtomicUsize::new(0),
94                is_hydrating: AtomicBool::new(true),
95                during_hydration: AtomicBool::new(true),
96                // errors: LazyLock::new(serialized_errors),
97                // incomplete: Lazy::new(incomplete_chunks),
98                resolved_resources: if let Some(ssr_ctx) = ssr_ctx {
99                    ssr_ctx.consume_buffers().await
100                } else {
101                    vec![]
102                },
103            }
104        }
105    }
106
107    impl Debug for MockHydrateSharedContext {
108        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109            f.debug_struct("MockHydrateSharedContext").finish()
110        }
111    }
112
113    impl SharedContext for MockHydrateSharedContext {
114        fn is_browser(&self) -> bool {
115            true
116        }
117
118        fn next_id(&self) -> SerializedDataId {
119            let id = self.id.fetch_add(1, Ordering::Relaxed);
120            SerializedDataId::new(id)
121        }
122
123        fn write_async(&self, _id: SerializedDataId, _fut: PinnedFuture<String>) {}
124
125        fn read_data(&self, id: &SerializedDataId) -> Option<String> {
126            self.resolved_resources
127                .get(id.clone().into_inner())
128                .map(|(_, data)| data.to_string())
129        }
130
131        fn await_data(&self, _id: &SerializedDataId) -> Option<String> {
132            todo!()
133        }
134
135        fn pending_data(&self) -> Option<PinnedStream<String>> {
136            None
137        }
138
139        fn during_hydration(&self) -> bool {
140            self.during_hydration.load(Ordering::Relaxed)
141        }
142
143        fn hydration_complete(&self) {
144            self.during_hydration.store(false, Ordering::Relaxed)
145        }
146
147        fn get_is_hydrating(&self) -> bool {
148            self.is_hydrating.load(Ordering::Relaxed)
149        }
150
151        fn set_is_hydrating(&self, is_hydrating: bool) {
152            self.is_hydrating.store(is_hydrating, Ordering::Relaxed)
153        }
154
155        fn errors(&self, _boundary_id: &SerializedDataId) -> Vec<(ErrorId, leptos::error::Error)> {
156            vec![]
157            // self.errors
158            //     .iter()
159            //     .filter_map(|(boundary, id, error)| {
160            //         if boundary == boundary_id {
161            //             Some((id.clone(), error.clone()))
162            //         } else {
163            //             None
164            //         }
165            //     })
166            //     .collect()
167        }
168
169        #[inline(always)]
170        fn register_error(
171            &self,
172            _error_boundary: SerializedDataId,
173            _error_id: ErrorId,
174            _error: leptos::error::Error,
175        ) {
176        }
177
178        #[inline(always)]
179        fn seal_errors(&self, _boundary_id: &SerializedDataId) {}
180
181        fn take_errors(&self) -> Vec<(SerializedDataId, ErrorId, leptos::error::Error)> {
182            // self.errors.clone()
183            vec![]
184        }
185
186        #[inline(always)]
187        fn defer_stream(&self, _wait_for: PinnedFuture<()>) {}
188
189        #[inline(always)]
190        fn await_deferred(&self) -> Option<PinnedFuture<()>> {
191            None
192        }
193
194        #[inline(always)]
195        fn set_incomplete_chunk(&self, _id: SerializedDataId) {}
196
197        fn get_incomplete_chunk(&self, _id: &SerializedDataId) -> bool {
198            // self.incomplete.iter().any(|entry| entry == id)
199            false
200        }
201    }
202
203    macro_rules! prep_server {
204        () => {{
205            _ = Executor::init_tokio();
206            let ssr_ctx = Arc::new(SsrSharedContext::new());
207            let owner = Owner::new_root(Some(ssr_ctx.clone()));
208            owner.set();
209            provide_context(crate::test::ExampleCtx);
210            let client = QueryClient::new();
211            (client, ssr_ctx, owner)
212        }};
213    }
214    pub(crate) use prep_server;
215
216    macro_rules! prep_client {
217        () => {{
218            _ = Executor::init_tokio();
219            let owner = Owner::new_root(Some(Arc::new(
220                crate::test::MockHydrateSharedContext::new(None).await,
221            )));
222            owner.set();
223            provide_context(crate::test::ExampleCtx);
224            let client = QueryClient::new();
225            (client, owner)
226        }};
227        ($ssr_ctx:expr) => {{
228            _ = Executor::init_tokio();
229            let owner = Owner::new_root(Some(Arc::new(
230                crate::test::MockHydrateSharedContext::new(Some(&$ssr_ctx)).await,
231            )));
232            owner.set();
233            provide_context(crate::test::ExampleCtx);
234            let client = QueryClient::new();
235            (client, owner)
236        }};
237    }
238    pub(crate) use prep_client;
239
240    macro_rules! prep_vari {
241        ($server:expr) => {
242            if $server {
243                let (client, ssr_ctx, owner) = crate::test::prep_server!();
244                (client, Some(ssr_ctx), owner)
245            } else {
246                let (client, owner) = crate::test::prep_client!();
247                (client, None, owner)
248            }
249        };
250    }
251    pub(crate) use prep_vari;
252
253    macro_rules! tick {
254        () => {
255            // Executor::poll_local();
256            // futures::executor::block_on(Executor::tick());
257            Executor::tick().await;
258            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
259        };
260    }
261    pub(crate) use tick;
262
263    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
264    enum ResourceType {
265        Local,
266        Normal,
267        Blocking,
268    }
269
270    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
271    enum InvalidationType {
272        Query,
273        Scope,
274        Predicate,
275        All,
276        Clear,
277    }
278
279    impl InvalidationType {
280        fn invalidate<K, V, M>(
281            &self,
282            client: &QueryClient,
283            query_scope: impl QueryScopeLocalTrait<K, V, M>,
284            key: &K,
285        ) where
286            K: Debug + Hash + PartialEq + Eq + Clone + 'static,
287            V: Debug + Clone + 'static,
288        {
289            match self {
290                InvalidationType::Query => {
291                    client.invalidate_query(query_scope, key);
292                }
293                InvalidationType::Scope => {
294                    client.invalidate_query_scope(query_scope);
295                }
296                InvalidationType::Predicate => {
297                    client
298                        .invalidate_queries_with_predicate(query_scope, |test_key| test_key == key);
299                }
300                InvalidationType::All => {
301                    client.invalidate_all_queries();
302                }
303                InvalidationType::Clear => {
304                    client.clear();
305                }
306            }
307        }
308    }
309
310    macro_rules! vari_new_resource_with_cb {
311        ($cb:ident, $client:expr, $fetcher:expr, $keyer:expr, $resource_type:expr, $arc:expr) => {
312            match ($resource_type, $arc) {
313                (ResourceType::Local, true) => {
314                    $cb!(|| $client.arc_local_resource($fetcher, $keyer))
315                }
316                (ResourceType::Local, false) => {
317                    $cb!(|| $client.local_resource($fetcher, $keyer))
318                }
319                (ResourceType::Normal, true) => {
320                    $cb!(|| $client.arc_resource($fetcher, $keyer))
321                }
322                (ResourceType::Normal, false) => {
323                    $cb!(|| $client.resource($fetcher, $keyer))
324                }
325                (ResourceType::Blocking, true) => {
326                    $cb!(|| $client.arc_resource_blocking($fetcher, $keyer))
327                }
328                (ResourceType::Blocking, false) => {
329                    $cb!(|| $client.resource_blocking($fetcher, $keyer))
330                }
331            }
332        };
333    }
334
335    #[derive(Clone, Copy, Debug)]
336    pub struct ExampleCtx;
337
338    const DEFAULT_FETCHER_MS: u64 = 30;
339    fn default_fetcher() -> (QueryScope<u64, u64>, Arc<AtomicUsize>) {
340        let fetch_calls = Arc::new(AtomicUsize::new(0));
341        let fetcher_src = {
342            let fetch_calls = fetch_calls.clone();
343            move |key: u64| {
344                let fetch_calls = fetch_calls.clone();
345                expect_context::<ExampleCtx>();
346                async move {
347                    expect_context::<ExampleCtx>();
348                    tokio::time::sleep(tokio::time::Duration::from_millis(DEFAULT_FETCHER_MS))
349                        .await;
350                    expect_context::<ExampleCtx>();
351                    fetch_calls.fetch_add(1, Ordering::Relaxed);
352                    key * 2
353                }
354            }
355        };
356        (QueryScope::new(fetcher_src), fetch_calls)
357    }
358
359    pub fn identify_parking_lot_deadlocks() {
360        static ONCE: std::sync::Once = std::sync::Once::new();
361        ONCE.call_once(|| {
362            std::thread::spawn(move || {
363                loop {
364                    std::thread::sleep(std::time::Duration::from_secs(5));
365                    let deadlocks = parking_lot::deadlock::check_deadlock();
366                    if deadlocks.is_empty() {
367                        continue;
368                    }
369
370                    println!("{} deadlocks detected", deadlocks.len());
371                    for (i, threads) in deadlocks.iter().enumerate() {
372                        println!("Deadlock #{i}");
373                        for t in threads {
374                            println!("Thread Id {:#?}", t.thread_id());
375                            println!("{:#?}", t.backtrace());
376                        }
377                    }
378                }
379            });
380        });
381    }
382
383    #[rstest]
384    #[tokio::test]
385    async fn test_codecs(
386        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
387        #[values(false, true)] arc: bool,
388        #[values(false, true)] server_ctx: bool,
389    ) {
390        identify_parking_lot_deadlocks();
391        tokio::task::LocalSet::new()
392            .run_until(async move {
393                let (fetcher, _fetch_calls) = default_fetcher();
394
395                let (client_default, _guard, _owner) = prep_vari!(server_ctx);
396                let client_custom =
397                    QueryClient::new().set_codec::<codee::binary::FromToBytesCodec>();
398                use_context::<QueryClient>();
399                use_context::<QueryClient<codee::binary::FromToBytesCodec>>();
400
401                macro_rules! check {
402                    ($get_resource:expr) => {{
403                        let resource = $get_resource();
404                        // On the server cannot actually run local resources:
405                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
406                            assert_eq!(resource.await, 4);
407                        }
408                    }};
409                }
410
411                vari_new_resource_with_cb!(
412                    check,
413                    client_default,
414                    fetcher.clone(),
415                    move || 2,
416                    resource_type,
417                    arc
418                );
419                vari_new_resource_with_cb!(
420                    check,
421                    client_custom,
422                    fetcher.clone(),
423                    move || 2,
424                    resource_type,
425                    arc
426                );
427            })
428            .await;
429    }
430
431    #[rstest]
432    #[tokio::test]
433    async fn test_no_query_args() {
434        identify_parking_lot_deadlocks();
435        tokio::task::LocalSet::new()
436            .run_until(async move {
437                let (client, _guard, _owner) = prep_vari!(false);
438
439                async fn fn_no_arg() -> &'static str {
440                    "no_arg"
441                }
442
443                async fn fn_with_arg(arg: &'static str) -> &'static str {
444                    arg
445                }
446
447                assert_eq!(client.fetch_query(fn_no_arg, ()).await, "no_arg");
448                assert_eq!(
449                    client.fetch_query(fn_with_arg, "with_arg").await,
450                    "with_arg"
451                );
452
453                assert_eq!(
454                    client.fetch_query(QueryScope::new(fn_no_arg), ()).await,
455                    "no_arg"
456                );
457                assert_eq!(
458                    client
459                        .fetch_query(QueryScope::new(fn_with_arg), "with_arg")
460                        .await,
461                    "with_arg"
462                );
463
464                assert_eq!(
465                    client
466                        .fetch_query_local(QueryScopeLocal::new(fn_no_arg), ())
467                        .await,
468                    "no_arg"
469                );
470                assert_eq!(
471                    client
472                        .fetch_query_local(QueryScopeLocal::new(fn_with_arg), "with_arg")
473                        .await,
474                    "with_arg"
475                );
476            })
477            .await;
478    }
479
480    /// Local and non-local values should externally be seen as the same cache.
481    /// On the same thread they should both use the cached value.
482    /// On a different thread, locally cached values shouldn't panic, should just be treated like they don't exist.
483    #[rstest]
484    #[tokio::test]
485    async fn test_shared_cache() {
486        identify_parking_lot_deadlocks();
487        tokio::task::LocalSet::new()
488            .run_until(async move {
489                let (fetcher, _fetch_calls) = default_fetcher();
490                let (client, _guard, _owner) = prep_vari!(false);
491
492                // Locally set value to 1:
493                client.set_query_local(&fetcher, 2, 1);
494                assert_eq!(client.get_cached_query(&fetcher, 2), Some(1));
495
496                // Try and get from a different thread, shouldn't try and touch the local cache, should say uncached:
497                std::thread::spawn({
498                    let fetcher = fetcher.clone();
499                    move || {
500                        tokio::runtime::Builder::new_current_thread()
501                            .build()
502                            .unwrap()
503                            .block_on(async move {
504                                // Should be seen as uncached:
505                                assert_eq!(client.get_cached_query(&fetcher, 2), None);
506
507                                // Set nonlocally to 3, set nonlocally to 2:
508                                client.set_query(&fetcher, 2, 3);
509                                client.set_query_local(&fetcher, 2, 2);
510                            });
511                    }
512                })
513                .join()
514                .unwrap();
515                // Should ignore the local value from the different thread, and get the nonlocal value of 3:
516                assert_eq!(client.get_cached_query(&fetcher, 2), Some(3));
517
518                // A clone of the fetcher should still be seen as the same cache:
519                let fetcher = fetcher.clone();
520                assert_eq!(client.get_cached_query(&fetcher, 2), Some(3));
521
522                // Likewise with the same closure passed into a new scope:
523                let (fetcher_2, _fetcher_2_calls) = default_fetcher();
524                assert_eq!(client.get_cached_query(&fetcher_2, 2), Some(3));
525
526                // But a new closure should be seen as a new cache:
527                let fetcher = QueryScope::new(move |key| {
528                    let fetcher = fetcher.clone();
529                    async move { query_scope::QueryScopeTrait::query(&fetcher, key).await }
530                });
531                assert_eq!(client.get_cached_query(&fetcher, 2), None);
532            })
533            .await;
534    }
535
536    /// Confirm scopes cleanup when the outermost owner is cleaned up:
537    #[rstest]
538    #[tokio::test]
539    async fn test_scope_cleanup() {
540        identify_parking_lot_deadlocks();
541        tokio::task::LocalSet::new()
542            .run_until(async move {
543                let (client, _guard, _owner) = prep_vari!(false);
544                assert!(does_scope_id_exist(
545                    client.untyped_client.scope_lookup.scope_id
546                ));
547                drop(_owner);
548                assert!(!does_scope_id_exist(
549                    client.untyped_client.scope_lookup.scope_id
550                ));
551            })
552            .await;
553    }
554
555    // Good example test, nothing new in here though:
556    #[rstest]
557    #[tokio::test]
558    async fn test_infinite() {
559        identify_parking_lot_deadlocks();
560
561        // ssr won't load local_resources, which is what we're testing with to avoid needing serde impls.
562        #[cfg(not(feature = "ssr"))]
563        tokio::task::LocalSet::new()
564            .run_until(async move {
565                #[derive(Clone, Debug, PartialEq, Eq)]
566                struct InfiniteItem(usize);
567
568                #[derive(Clone, Debug, PartialEq, Eq)]
569                struct InfiniteList {
570                    items: Vec<InfiniteItem>,
571                    offset: usize,
572                    more_available: bool,
573                }
574
575                async fn get_list_items(offset: usize) -> Vec<InfiniteItem> {
576                    (offset..offset + 10).map(InfiniteItem).collect()
577                }
578
579                async fn get_list_query(_key: ()) -> InfiniteList {
580                    let items = get_list_items(0).await;
581                    InfiniteList {
582                        offset: items.len(),
583                        more_available: !items.is_empty(),
584                        items,
585                    }
586                }
587
588                let (client, _guard, _owner) = prep_vari!(false);
589
590                // Initialise the query with the first load.
591                // we're not using a reactive key here for extending the list, but declarative updates instead.
592                let resource = client.local_resource(get_list_query, || ());
593                assert_eq!(
594                    resource.await,
595                    InfiniteList {
596                        items: (0..10).map(InfiniteItem).collect::<Vec<_>>(),
597                        offset: 10,
598                        more_available: true
599                    }
600                );
601
602                // When wanting to load more items, update_query_async can be called declaratively to update the cached item:
603                client
604                    .update_query_async(get_list_query, (), async |last| {
605                        if last.more_available {
606                            let next_items = get_list_items(last.offset).await;
607                            last.offset += next_items.len();
608                            last.more_available = !next_items.is_empty();
609                            last.items.extend(next_items);
610                        }
611                    })
612                    .await;
613
614                // Should've been updated in place:
615                assert_eq!(
616                    client.get_cached_query(get_list_query, ()),
617                    Some(InfiniteList {
618                        items: (0..20).map(InfiniteItem).collect::<Vec<_>>(),
619                        offset: 20,
620                        more_available: true
621                    })
622                );
623            })
624            .await;
625    }
626
627    /// prefetch_query
628    /// prefetch_query_local
629    /// fetch_query
630    /// fetch_query_local
631    /// update_query
632    /// query_exists
633    /// update_query_async
634    #[rstest]
635    #[tokio::test]
636    async fn test_declaratives() {
637        identify_parking_lot_deadlocks();
638        tokio::task::LocalSet::new()
639            .run_until(async move {
640                let (fetcher, _fetch_calls) = default_fetcher();
641                let (client, _guard, _owner) = prep_vari!(false);
642
643                let key = 1;
644
645                // Want to confirm by default everything triggers updates, but when using `.untrack_update_query()` magic fn for those applicable it doesn't.
646                let value_sub_react_count = Arc::new(AtomicUsize::new(0));
647                let value_sub = client.subscribe_value(&fetcher, move || key);
648                Effect::new_isomorphic({
649                    let value_sub_react_count = value_sub_react_count.clone();
650                    move || {
651                        value_sub.get();
652                        value_sub_react_count.fetch_add(1, Ordering::Relaxed);
653                    }
654                });
655
656                macro_rules! maybe_reacts {
657                    ($reacts:expr, $block:expr) => {{
658                        tick!();
659                        let before = value_sub_react_count.load(Ordering::Relaxed);
660                        let result = $block;
661                        tick!();
662                        let after = value_sub_react_count.load(Ordering::Relaxed);
663                        if $reacts {
664                            assert_eq!(
665                                after,
666                                before + 1,
667                                "{} != {}, didn't react like expected",
668                                after,
669                                before
670                            );
671                        } else {
672                            assert_eq!(
673                                after, before,
674                                "{} != {}, reacted when it shouldn't",
675                                after, before
676                            );
677                        }
678                        result
679                    }};
680                }
681
682                assert!(!client.query_exists(&fetcher, key));
683                client.set_query_local(&fetcher, key, 1);
684                assert_eq!(client.get_cached_query(&fetcher, key), Some(1));
685
686                client.invalidate_query(&fetcher, key);
687                maybe_reacts!(
688                    true,
689                    assert!(client.update_query(&fetcher, key, |value| {
690                        value
691                            .map(|v| {
692                                *v = 2;
693                                true
694                            })
695                            .unwrap_or(false)
696                    }))
697                );
698                // update_query shouldn't have reset the invalidated status:
699                assert!(client.is_key_invalid(&fetcher, key));
700
701                assert_eq!(client.get_cached_query(&fetcher, key), Some(2));
702
703                // set_update_query shouldn't have reset the invalidated status:
704                assert!(client.is_key_invalid(&fetcher, key));
705                // maybe_reacts!(true, client.set_query(&fetcher, key, 3));
706                client.set_query(&fetcher, key, 3);
707                assert!(client.is_key_invalid(&fetcher, key));
708
709                assert_eq!(client.get_cached_query(&fetcher, key), Some(3));
710
711                maybe_reacts!(
712                    true,
713                    assert!(client.update_query(&fetcher, key, |value| {
714                        value
715                            .map(|v| {
716                                *v *= 2;
717                                true
718                            })
719                            .unwrap_or(false)
720                    }))
721                );
722
723                // Noop would react, but not if client.untrack_update_query() is used:
724                maybe_reacts!(true, client.update_query(&fetcher, key, |_value| {}));
725                maybe_reacts!(
726                    false,
727                    client.update_query(&fetcher, key, |_value| { client.untrack_update_query() })
728                );
729
730                assert_eq!(client.get_cached_query(&fetcher, key), Some(6));
731                assert!(client.query_exists(&fetcher, key));
732
733                assert!(client.untyped_client.clear_query(&fetcher, key));
734                assert!(!client.query_exists(&fetcher, key));
735
736                maybe_reacts!(true, client.prefetch_query_local(&fetcher, key).await);
737
738                assert_eq!(client.get_cached_query(&fetcher, key), Some(2));
739                client.clear();
740                assert_eq!(client.total_cached_queries(), 0);
741                maybe_reacts!(true, client.prefetch_query(&fetcher, key).await);
742                assert_eq!(client.get_cached_query(&fetcher, key), Some(2));
743
744                assert!(client.untyped_client.clear_query(&fetcher, key));
745                assert!(!client.query_exists(&fetcher, key));
746                maybe_reacts!(
747                    true,
748                    assert_eq!(client.fetch_query_local(&fetcher, key).await, 2)
749                );
750                assert!(client.query_exists(&fetcher, key));
751                client.clear();
752                assert_eq!(client.total_cached_queries(), 0);
753                maybe_reacts!(true, assert_eq!(client.fetch_query(&fetcher, key).await, 2));
754
755                // update_query_async/update_query_async_local:
756                maybe_reacts!(
757                    true,
758                    assert_eq!(
759                        client
760                            .update_query_async(&fetcher, key, async |value| {
761                                *value += 1;
762                                *value
763                            })
764                            .await,
765                        3
766                    )
767                );
768                assert_eq!(client.get_cached_query(&fetcher, key), Some(3));
769                // Noop would react, but not if client.untrack_update_query() is used:
770                maybe_reacts!(
771                    true,
772                    client
773                        .update_query_async(&fetcher, key, async |_value| {})
774                        .await
775                );
776                maybe_reacts!(
777                    false,
778                    client
779                        .update_query_async(&fetcher, key, async |_value| {
780                            client.untrack_update_query()
781                        })
782                        .await
783                );
784
785                maybe_reacts!(
786                    true,
787                    assert_eq!(
788                        client
789                            .update_query_async_local(&fetcher, key, async |value| {
790                                *value += 1;
791                                *value
792                            })
793                            .await,
794                        4
795                    )
796                );
797                assert_eq!(client.get_cached_query(&fetcher, key), Some(4));
798                // Noop would react, but not if client.untrack_update_query() is used:
799                maybe_reacts!(
800                    true,
801                    client
802                        .update_query_async_local(&fetcher, key, async |_value| {})
803                        .await
804                );
805                maybe_reacts!(
806                    false,
807                    client
808                        .update_query_async_local(&fetcher, key, async |_value| {
809                            client.untrack_update_query()
810                        })
811                        .await
812                );
813
814                // is_fetching should be true throughout the whole lifetime of update_query_async, even the external async section:
815                let is_fetching = client.subscribe_is_fetching_arc(fetcher.clone(), move || key);
816                assert!(!is_fetching.get_untracked());
817                tokio::join!(
818                    async {
819                        assert_eq!(
820                            client
821                                .update_query_async(&fetcher, key, async |value| {
822                                    tokio::time::sleep(tokio::time::Duration::from_millis(30))
823                                        .await;
824                                    *value += 1;
825                                    *value
826                                })
827                                .await,
828                            5
829                        );
830                    },
831                    async {
832                        let elapsed = std::time::Instant::now();
833                        tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
834                        tick!();
835                        while elapsed.elapsed().as_millis() < 25 {
836                            assert!(is_fetching.get_untracked());
837                            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
838                        }
839                    }
840                );
841                // To make sure finished
842                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
843                assert!(!is_fetching.get_untracked());
844            })
845            .await;
846    }
847
848    /// Make sure refetching works at the expected time,
849    /// only does so once there are active resources using it,
850    /// and skips when refetching is disabled.
851    #[rstest]
852    #[tokio::test]
853    async fn test_refetch(
854        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
855        #[values(false, true)] arc: bool,
856        #[values(false, true)] set_refetch_enabled: bool,
857    ) {
858        identify_parking_lot_deadlocks();
859        tokio::task::LocalSet::new()
860            .run_until(async move {
861                const REFETCH_TIME_MS: u64 = 100;
862                const FETCH_TIME_MS: u64 = 10;
863
864                let fetch_calls = Arc::new(AtomicUsize::new(0));
865                let fetcher = {
866                    let fetch_calls = fetch_calls.clone();
867                    move |key: u64| {
868                        fetch_calls.fetch_add(1, Ordering::Relaxed);
869                        async move {
870                            tokio::time::sleep(tokio::time::Duration::from_millis(FETCH_TIME_MS)).await;
871                            key * 2
872                        }
873                    }
874                };
875                let fetcher = QueryScope::new(
876                    fetcher
877                ).with_options(QueryOptions::new().with_refetch_interval(std::time::Duration::from_millis(REFETCH_TIME_MS)));
878
879                let (mut client, _guard, owner) = prep_vari!(false);
880                let refetch_enabled = ArcRwSignal::new(true);
881                if set_refetch_enabled {
882                    client = client.with_refetch_enabled_toggle(refetch_enabled.clone());
883                }
884
885                macro_rules! with_tmp_owner {
886                    ($body:block) => {{
887                        let tmp_owner = owner.child();
888                        tmp_owner.set();
889                        let result = $body;
890                        tmp_owner.unset();
891                        owner.set();
892                        result
893                    }};
894                }
895
896                macro_rules! check {
897                    ($get_resource:expr) => {{
898
899                        // On the server cannot actually run local resources:
900                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
901
902                            // Initial caching:
903                            with_tmp_owner! {{
904                                assert_eq!($get_resource().await, 4);
905                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
906                                assert_eq!(client.total_cached_queries(), 1);
907
908                                // less than refetch_time shouldn't have recalled:
909                                assert_eq!($get_resource().await, 4);
910                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
911                                assert_eq!(client.total_cached_queries(), 1);
912                            }}
913
914                            // hit refetch time with no active resources shouldn't have refetched:
915                            tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
916                            tick!();
917                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
918
919                            // hit refetch_time when active resource should refetch:
920                            with_tmp_owner! {{
921                                // Because the refetch call would've still invalidated the value, the new resource should trigger the refetch automatically:
922                                let _resource = $get_resource();
923                                tokio::time::sleep(tokio::time::Duration::from_millis(FETCH_TIME_MS)).await;
924                                tick!();
925                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
926
927                                // There's an active resource now, so this should trigger again without needing to touch the resources:
928                                tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
929                                tick!();
930
931                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 3);
932                                assert_eq!($get_resource().await, 4);
933                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 3);
934
935                                if set_refetch_enabled {
936                                    // Disable refetching, should skip this:
937                                    refetch_enabled.set(false);
938                                    tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
939                                    tick!();
940
941                                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 3);
942                                    assert_eq!($get_resource().await, 4);
943                                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 3);
944
945                                    // Should start refetching again once re-enabled:
946                                    refetch_enabled.set(true);
947                                }
948
949                                tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
950                                tick!();
951
952                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 4);
953                                assert_eq!($get_resource().await, 4);
954                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 4);
955                            }}
956
957                            // Should stop refetching once all resources are dropped:
958                            tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
959                            tick!();
960                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 4);
961                        }
962                    }};
963                }
964
965                vari_new_resource_with_cb!(
966                    check,
967                    client,
968                    fetcher.clone(),
969                    || 2,
970                    resource_type,
971                    arc
972                );
973            })
974            .await;
975    }
976
977    /// Leptos sometimes breaks drop semantics for local/nonlocal signals,
978    /// this was the testcase that catches it and is simpler to identify than inside other tests.
979    #[rstest]
980    #[tokio::test]
981    async fn test_drop_semantics(#[values(false, true)] local: bool) {
982        tokio::task::LocalSet::new()
983            .run_until(async move {
984                let owner = Owner::default();
985
986                macro_rules! with_tmp_owner {
987                    ($body:block) => {{
988                        let tmp_owner = owner.child();
989                        tmp_owner.set();
990                        let result = $body;
991                        tmp_owner.unset();
992                        owner.set();
993                        result
994                    }};
995                }
996
997                let dropped = with_tmp_owner! {{
998                    let dropped = Arc::new(AtomicBool::new(false));
999                    let on_drop = Arc::new(OnDrop::new({
1000                        let dropped = dropped.clone();
1001                        move || {
1002                            dropped.store(true, Ordering::Relaxed);
1003                    }}));
1004                    if local {
1005                        ArenaItem::<_, SyncStorage>::new_with_storage(ArcAsyncDerived::new_unsync(
1006                            move || {
1007                                let _on_drop = on_drop.clone();
1008                                async move {
1009                                }
1010                            })
1011                        );
1012                    } else {
1013                        ArenaItem::<_, SyncStorage>::new_with_storage(ArcAsyncDerived::new(
1014                            move || {
1015                                let _on_drop = on_drop.clone();
1016                                async move {
1017                                }
1018                            })
1019                        );
1020                    }
1021                    assert!(!dropped.load(Ordering::Relaxed));
1022                    dropped
1023                }};
1024                tick!();
1025                assert!(dropped.load(Ordering::Relaxed));
1026            })
1027            .await;
1028    }
1029
1030    /// Make sure the cache is cleaned up at the expected time, and only do so once no resources are using it.
1031    #[rstest]
1032    #[tokio::test]
1033    async fn test_gc(
1034        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
1035        #[values(false, true)] arc: bool,
1036    ) {
1037        identify_parking_lot_deadlocks();
1038        tokio::task::LocalSet::new()
1039        .run_until(async move {
1040                const GC_TIME_MS: u64 = 30;
1041
1042                let fetch_calls = Arc::new(AtomicUsize::new(0));
1043                let fetcher = {
1044                    let fetch_calls = fetch_calls.clone();
1045                    move |key: u64| {
1046                        fetch_calls.fetch_add(1, Ordering::Relaxed);
1047                        async move {
1048                            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1049                            key * 2
1050                        }
1051                    }
1052                };
1053
1054
1055
1056                let fetcher = QueryScope::new(
1057                    fetcher
1058                ).with_options(QueryOptions::new().with_gc_time(std::time::Duration::from_millis(GC_TIME_MS)));
1059
1060                let gc_counts = Arc::new(parking_lot::Mutex::new(HashMap::new()));
1061                let fetcher = fetcher.on_gc({
1062                    let gc_counts = gc_counts.clone();
1063                    move |key| {
1064                        let mut counts = gc_counts.lock();
1065                        *counts.entry(*key).or_insert(0) += 1;
1066                    }
1067                });
1068
1069                let (client, _guard, owner) = prep_vari!(false);
1070
1071                macro_rules! with_tmp_owner {
1072                    ($body:block) => {{
1073                        let tmp_owner = owner.child();
1074                        tmp_owner.set();
1075                        let result = $body;
1076                        tmp_owner.unset();
1077                        owner.set();
1078                        result
1079                    }};
1080                }
1081
1082                macro_rules! check {
1083                    ($get_resource:expr) => {{
1084                        let subscribed = client.subscribe_value(fetcher.clone(), move || 2);
1085
1086                        // On the server cannot actually run local resources:
1087                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
1088
1089                            // Initial caching:
1090                            with_tmp_owner! {{
1091                                assert_eq!($get_resource().await, 4);
1092                                assert_eq!(subscribed.get_untracked(), Some(4));
1093                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1094                                assert_eq!(client.total_cached_queries(), 1);
1095
1096                                // < gc_time shouldn't have cleaned up:
1097                                tick!();
1098                                assert_eq!($get_resource().await, 4);
1099                                assert_eq!(subscribed.get_untracked(), Some(4));
1100                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1101                                assert_eq!(client.total_cached_queries(), 1);
1102                                assert_eq!(*gc_counts.lock().get(&2).unwrap_or(&0), 0);
1103                            }}
1104
1105                            // all resources dropped when <gc_time shouldn't have cleaned up:
1106                            with_tmp_owner! {{
1107                                tick!();
1108                                assert_eq!($get_resource().await, 4);
1109                                assert_eq!(subscribed.get_untracked(), Some(4));
1110                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1111                                assert_eq!(client.total_cached_queries(), 1);
1112                                assert_eq!(*gc_counts.lock().get(&2).unwrap_or(&0), 0);
1113                            }}
1114
1115                            // >gc_time when active resource shouldn't have cleaned up:
1116                            with_tmp_owner! {{
1117                                let _resource = $get_resource();
1118
1119                                tokio::time::sleep(tokio::time::Duration::from_millis(GC_TIME_MS)).await;
1120                                tick!();
1121
1122                                // >gc_time shouldn't cleanup because there's an active resource:
1123                                assert_eq!($get_resource().await, 4);
1124                                assert_eq!(subscribed.get_untracked(), Some(4));
1125                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1126                                assert_eq!(client.total_cached_queries(), 1);
1127                                assert_eq!(*gc_counts.lock().get(&2).unwrap_or(&0), 0);
1128                            }}
1129
1130                            // >gc_time and no resources should now have been cleaned up, causing a new fetch:
1131                            with_tmp_owner! {{
1132                                assert_eq!(client.total_cached_queries(), 1);
1133
1134                                assert_eq!(subscribed.get_untracked(), Some(4));
1135                                tokio::time::sleep(tokio::time::Duration::from_millis(GC_TIME_MS)).await;
1136                                tick!();
1137                                assert_eq!(subscribed.get_untracked(), None);
1138                                assert_eq!(*gc_counts.lock().get(&2).unwrap_or(&0), 1);
1139
1140                                assert_eq!($get_resource().await, 4);
1141                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
1142                            }}
1143
1144                            // Final cleanup:
1145                            tokio::time::sleep(tokio::time::Duration::from_millis(GC_TIME_MS)).await;
1146                            tick!();
1147                            assert_eq!(client.total_cached_queries(), 0);
1148                            assert_eq!(subscribed.get_untracked(), None);
1149                            assert_eq!(*gc_counts.lock().get(&2).unwrap_or(&0), 2);
1150                        }
1151                    }};
1152                }
1153
1154                vari_new_resource_with_cb!(
1155                    check,
1156                    client,
1157                    fetcher.clone(),
1158                    || 2,
1159                    resource_type,
1160                    arc
1161                );
1162            })
1163            .await;
1164    }
1165
1166    /// Make sure !Send and !Sync values work with local resources.
1167    #[rstest]
1168    #[tokio::test]
1169    async fn test_unsync(#[values(false, true)] arc: bool) {
1170        identify_parking_lot_deadlocks();
1171        tokio::task::LocalSet::new()
1172            .run_until(async move {
1173                #[derive(Debug)]
1174                struct UnsyncValue(u64, PhantomData<NonNull<()>>);
1175                impl PartialEq for UnsyncValue {
1176                    fn eq(&self, other: &Self) -> bool {
1177                        self.0 == other.0
1178                    }
1179                }
1180                impl Eq for UnsyncValue {}
1181                impl Clone for UnsyncValue {
1182                    fn clone(&self) -> Self {
1183                        Self(self.0, PhantomData)
1184                    }
1185                }
1186                impl UnsyncValue {
1187                    fn new(value: u64) -> Self {
1188                        Self(value, PhantomData)
1189                    }
1190                }
1191
1192                let fetch_calls = Arc::new(AtomicUsize::new(0));
1193                let fetcher = {
1194                    let fetch_calls = fetch_calls.clone();
1195                    move |key: u64| {
1196                        fetch_calls.fetch_add(1, Ordering::Relaxed);
1197                        async move {
1198                            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1199                            UnsyncValue::new(key * 2)
1200                        }
1201                    }
1202                };
1203                let fetcher = QueryScopeLocal::new(fetcher);
1204
1205                let (client, _guard, _owner) = prep_vari!(false);
1206
1207                macro_rules! check {
1208                    ($get_resource:expr) => {{
1209                        let resource = $get_resource();
1210                        let subscribed = client.subscribe_value_local(fetcher.clone(), move || 2);
1211
1212                        // Should be None initially with the sync methods:
1213                        assert!(resource.get_untracked().is_none());
1214                        assert!(resource.try_get_untracked().unwrap().is_none());
1215                        assert!(resource.get().is_none());
1216                        assert!(resource.try_get().unwrap().is_none());
1217                        assert!(resource.read().is_none());
1218                        assert!(resource.try_read().as_deref().unwrap().is_none());
1219                        assert!(subscribed.get_untracked().is_none());
1220
1221                        // On the server cannot actually run local resources:
1222                        if cfg!(not(feature = "ssr")) {
1223                            assert_eq!(resource.await, UnsyncValue::new(4));
1224                            assert_eq!(subscribed.get_untracked(), Some(UnsyncValue::new(4)));
1225                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1226
1227                            tick!();
1228
1229                            assert_eq!($get_resource().await, UnsyncValue::new(4));
1230                            assert_eq!(subscribed.get_untracked(), Some(UnsyncValue::new(4)));
1231                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1232                        }
1233                    }};
1234                }
1235
1236                match arc {
1237                    true => {
1238                        check!(|| client.arc_local_resource(fetcher.clone(), || 2))
1239                    }
1240                    false => {
1241                        check!(|| client.local_resource(fetcher.clone(), || 2))
1242                    }
1243                }
1244            })
1245            .await;
1246    }
1247
1248    #[rstest]
1249    #[tokio::test]
1250    async fn test_subscribe_is_fetching_and_loading(
1251        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
1252        #[values(false, true)] arc: bool,
1253        #[values(false, true)] server_ctx: bool,
1254    ) {
1255        identify_parking_lot_deadlocks();
1256        tokio::task::LocalSet::new()
1257            .run_until(async move {
1258                let (fetcher, fetch_calls) = default_fetcher();
1259                let (client, _guard, _owner) = prep_vari!(server_ctx);
1260
1261                macro_rules! check {
1262                    ($get_resource:expr) => {{
1263                        // On the server cannot actually run local resources:
1264                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
1265                            assert_eq!(client.subscriber_count(), 0);
1266                            let is_fetching = client.subscribe_is_fetching_arc(fetcher.clone(), || 2);
1267                            assert_eq!(is_fetching.get_untracked(), false);
1268                            assert_eq!(client.subscriber_count(), 1);
1269                            let is_fetching_clone = client.subscribe_is_fetching_arc(fetcher.clone(), || 2);
1270                            assert_eq!(is_fetching_clone.get_untracked(), false);
1271                            // Should still be 1, clones reuse:
1272                            assert_eq!(client.subscriber_count(), 1);
1273                            let is_fetching_other = client.subscribe_is_fetching_arc(fetcher.clone(), || 3);
1274                            assert_eq!(is_fetching_other.get_untracked(), false);
1275                            assert_eq!(client.subscriber_count(), 2);
1276                            let is_loading = client.subscribe_is_loading_arc(fetcher.clone(), || 2);
1277                            assert_eq!(is_loading.get_untracked(), false);
1278                            assert_eq!(client.subscriber_count(), 3);
1279                            let is_loading_other = client.subscribe_is_loading_arc(fetcher.clone(), || 3);
1280                            assert_eq!(is_loading_other.get_untracked(), false);
1281                            assert_eq!(client.subscriber_count(), 4);
1282
1283
1284                            macro_rules! check_all {
1285                                ($expected:expr) => {{
1286                                    assert_eq!(is_fetching.get_untracked(), $expected);
1287                                    assert_eq!(is_fetching_other.get_untracked(), $expected);
1288                                    assert_eq!(is_loading.get_untracked(), $expected);
1289                                    assert_eq!(is_loading_other.get_untracked(), $expected);
1290                                }};
1291                            }
1292
1293                            check_all!(false);
1294
1295                            tokio::join!(
1296                                async {
1297                                    assert_eq!($get_resource().await, 4);
1298                                },
1299                                async {
1300                                    let elapsed = std::time::Instant::now();
1301                                    tick!();
1302                                    while elapsed.elapsed().as_millis() < DEFAULT_FETCHER_MS.into() {
1303                                        assert_eq!(is_fetching.get_untracked(), true);
1304                                        assert_eq!(is_fetching_other.get_untracked(), false);
1305                                        assert_eq!(is_loading.get_untracked(), true);
1306                                        assert_eq!(is_loading_other.get_untracked(), false);
1307                                        tick!();
1308                                    }
1309                                }
1310                            );
1311                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1312
1313                            check_all!(false);
1314
1315                            assert_eq!($get_resource().await, 4);
1316                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1317
1318                            check_all!(false);
1319
1320                            // Already in cache now, all should be false:
1321                            tokio::join!(
1322                                async {
1323                                    assert_eq!($get_resource().await, 4);
1324                                },
1325                                async {
1326                                    let elapsed = std::time::Instant::now();
1327                                    tick!();
1328                                    while elapsed.elapsed().as_millis() < DEFAULT_FETCHER_MS.into() {
1329                                        check_all!(false);
1330                                        tick!();
1331                                    }
1332                                }
1333                            );
1334                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1335
1336                            client.invalidate_query(fetcher.clone(), &2);
1337
1338                            tokio::join!(
1339                                async {
1340                                    assert_eq!($get_resource().await, 4);
1341                                    // This should have returned the old value straight away, but the refetch will have been initiated in the background:
1342                                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1343                                    tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1344                                },
1345                                async {
1346                                    let elapsed = std::time::Instant::now();
1347                                    tick!();
1348                                    while elapsed.elapsed().as_millis() < DEFAULT_FETCHER_MS.into() {
1349                                        assert_eq!(is_fetching.get_untracked(), true);
1350                                        assert_eq!(is_fetching_other.get_untracked(), false);
1351                                        // Loading should all be false as this is just a refetch now, 
1352                                        // the get_resource().await will actually return straight away, but it'll trigger the refetch.
1353                                        assert_eq!(is_loading.get_untracked(), false);
1354                                        assert_eq!(is_loading_other.get_untracked(), false);
1355                                        tick!();
1356                                    }
1357                                }
1358                            );
1359                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
1360
1361                            drop(is_fetching);
1362                            // Will only drop once the clone is dropped too:
1363                            assert_eq!(client.subscriber_count(), 4);
1364                            drop(is_fetching_clone);
1365                            assert_eq!(client.subscriber_count(), 3);
1366                            drop(is_loading);
1367                            assert_eq!(client.subscriber_count(), 2);
1368                            drop(is_fetching_other);
1369                            assert_eq!(client.subscriber_count(), 1);
1370                            drop(is_loading_other);
1371                            assert_eq!(client.subscriber_count(), 0);
1372
1373                            client.clear();
1374                            assert_eq!(client.total_cached_queries(), 0);
1375
1376                            // Make sure subscriptions start in true state if in the middle of loading:
1377                            tokio::join!(
1378                                async {
1379                                    assert_eq!($get_resource().await, 4);
1380                                },
1381                                async {
1382                                    tick!();
1383                                    let is_fetching = client.subscribe_is_fetching_arc(fetcher.clone(), || 2);
1384                                    let is_loading = client.subscribe_is_loading_arc(fetcher.clone(), || 2);
1385                                    assert_eq!(is_fetching.get_untracked(), true);
1386                                    assert_eq!(is_loading.get_untracked(), true);
1387                                    assert_eq!(client.subscriber_count(), 2);
1388                                    tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1389                                    assert_eq!(is_fetching.get_untracked(), false);
1390                                    assert_eq!(is_loading.get_untracked(), false);
1391                                }
1392                            );
1393                            assert_eq!(client.subscriber_count(), 0);
1394
1395                            // Make sure refetch only too:
1396                            client.invalidate_query(fetcher.clone(), &2);
1397                            tokio::join!(
1398                                async {
1399                                    assert_eq!($get_resource().await, 4);
1400                                    tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1401                                },
1402                                async {
1403                                    tick!();
1404                                    let is_fetching = client.subscribe_is_fetching_arc(fetcher.clone(), || 2);
1405                                    let is_loading = client.subscribe_is_loading_arc(fetcher.clone(), || 2);
1406                                    assert_eq!(is_fetching.get_untracked(), true);
1407                                    assert_eq!(is_loading.get_untracked(), false);
1408                                    assert_eq!(client.subscriber_count(), 2);
1409                                    tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1410                                    assert_eq!(is_fetching.get_untracked(), false);
1411                                    assert_eq!(is_loading.get_untracked(), false);
1412                                }
1413                            );
1414                            assert_eq!(client.subscriber_count(), 0);
1415                            client.clear();
1416
1417                            // Now confirm the subscriber's keyer changes are respected, and subscriptions don't accidentally say true for the wrong key:
1418                            let sub_key_signal = RwSignal::new(2);
1419                            let resource_key_signal = RwSignal::new(2);
1420                            let is_fetching = client.subscribe_is_fetching(fetcher.clone(), move || sub_key_signal.get());
1421                            let is_loading = client.subscribe_is_loading(fetcher.clone(), move || sub_key_signal.get());
1422                            assert_eq!(is_fetching.get_untracked(), false);
1423                            assert_eq!(is_loading.get_untracked(), false);
1424
1425                            let _resource = client.resource(fetcher.clone(), move || resource_key_signal.get());
1426
1427                            // The creation of the resource should've triggered the initial fetch:
1428                            assert_eq!(is_fetching.get_untracked(), true);
1429                            assert_eq!(is_loading.get_untracked(), true);
1430                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1431                            assert_eq!(is_fetching.get_untracked(), false);
1432                            assert_eq!(is_loading.get_untracked(), false);
1433
1434                            // Sanity check confirming it won't refetch if the key hasn't actually changed:
1435                            sub_key_signal.set(2);
1436                            resource_key_signal.set(2);
1437                            tick!();
1438                            assert_eq!(is_fetching.get_untracked(), false);
1439                            assert_eq!(is_loading.get_untracked(), false);
1440
1441                            // New value should cause a fresh query, subscriber should match:
1442                            resource_key_signal.set(3);
1443                            sub_key_signal.set(3);
1444                            tick!();
1445                            assert_eq!(is_fetching.get_untracked(), true);
1446                            assert_eq!(is_loading.get_untracked(), true);
1447                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1448                            assert_eq!(is_fetching.get_untracked(), false);
1449                            assert_eq!(is_loading.get_untracked(), false);
1450                            assert_eq!(client.get_cached_query(fetcher.clone(), &3), Some(6));
1451
1452                            // Stale should still mean only is_fetching is true:
1453                            client.invalidate_query(fetcher.clone(), &3);
1454                            tick!();
1455                            assert_eq!(is_fetching.get_untracked(), true);
1456                            assert_eq!(is_loading.get_untracked(), false);
1457                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1458                            assert_eq!(is_fetching.get_untracked(), false);
1459                            assert_eq!(is_loading.get_untracked(), false);
1460
1461                            // If the resource diverges, subscriber shouldn't notice:
1462                            resource_key_signal.set(4);
1463                            tick!();
1464                            assert_eq!(is_fetching.get_untracked(), false);
1465                            assert_eq!(is_loading.get_untracked(), false);
1466                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1467                            assert_eq!(client.get_cached_query(fetcher.clone(), &4), Some(8));
1468
1469                            // Now confirm the keyer is actually reactive, and is able to trigger updates itself. 
1470                            // Do this by subscribing to is_fetching in an effect, and update the key in the keyer.
1471                            let last_is_fetching_value = Arc::new(parking_lot::Mutex::new(None));
1472                            Effect::new_isomorphic({
1473                                let last_is_fetching_value = last_is_fetching_value.clone();
1474                                move || {
1475                                    *last_is_fetching_value.lock() = Some(is_fetching.get());
1476                            }});
1477                            assert_eq!(*last_is_fetching_value.lock(), None);
1478                            tick!();
1479                            assert_eq!(*last_is_fetching_value.lock(), Some(false));
1480                            resource_key_signal.set(6);
1481                            sub_key_signal.set(6);
1482                            assert_eq!(*last_is_fetching_value.lock(), Some(false));
1483                            tick!();
1484                            assert_eq!(*last_is_fetching_value.lock(), Some(true));
1485                            assert_eq!(is_fetching.get_untracked(), true);
1486                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1487                            assert_eq!(*last_is_fetching_value.lock(), Some(false));
1488                            assert_eq!(is_fetching.get_untracked(), false);
1489                            assert_eq!(client.get_cached_query(fetcher.clone(), &6), Some(12));
1490                        }
1491                    }};
1492                }
1493
1494                vari_new_resource_with_cb!(
1495                    check,
1496                    client,
1497                    fetcher.clone(),
1498                    || 2,
1499                    resource_type,
1500                    arc
1501                );
1502            })
1503            .await;
1504    }
1505
1506    /// Make sure resources reload when queries invalidated correctly.
1507    #[rstest]
1508    #[tokio::test]
1509    async fn test_optional_key(
1510        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
1511        #[values(false, true)] arc: bool,
1512        #[values(false, true)] server_ctx: bool,
1513    ) {
1514        identify_parking_lot_deadlocks();
1515        tokio::task::LocalSet::new()
1516            .run_until(async move {
1517                let (fetcher, fetch_calls) = default_fetcher();
1518                let (client, _guard, _owner) = prep_vari!(server_ctx);
1519
1520                let key_value = RwSignal::new(None);
1521                let keyer = move || key_value.get();
1522
1523                macro_rules! check {
1524                    ($get_resource:expr) => {{
1525                        let resource = $get_resource();
1526                        assert_eq!(resource.get_untracked().flatten(), None);
1527                        assert_eq!(resource.try_get_untracked().flatten().flatten(), None);
1528
1529                        // On the server cannot actually run local resources:
1530                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
1531                            assert_eq!($get_resource().await, None);
1532
1533                            let sub_is_loading =
1534                                client.subscribe_is_loading(fetcher.clone(), keyer);
1535                            let sub_is_fetching =
1536                                client.subscribe_is_fetching(fetcher.clone(), keyer);
1537                            let sub_value = client.subscribe_value(fetcher.clone(), keyer);
1538
1539                            assert_eq!(sub_is_loading.get_untracked(), false);
1540                            assert_eq!(sub_is_fetching.get_untracked(), false);
1541                            assert_eq!(sub_value.get_untracked(), None);
1542
1543                            key_value.set(Some(2));
1544                            tick!();
1545
1546                            assert_eq!(sub_is_loading.get_untracked(), true);
1547                            assert_eq!(sub_is_fetching.get_untracked(), true);
1548
1549                            assert_eq!($get_resource().await, Some(4));
1550                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1551                            assert_eq!(sub_value.get_untracked(), Some(4));
1552                            assert_eq!(sub_is_loading.get_untracked(), false);
1553                            assert_eq!(sub_is_fetching.get_untracked(), false);
1554                        }
1555                    }};
1556                }
1557
1558                vari_new_resource_with_cb!(
1559                    check,
1560                    client,
1561                    fetcher.clone(),
1562                    keyer,
1563                    resource_type,
1564                    arc
1565                );
1566            })
1567            .await;
1568    }
1569
1570    /// Make sure resources reload when queries invalidated correctly.
1571    #[rstest]
1572    #[tokio::test]
1573    async fn test_invalidation(
1574        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
1575        #[values(false, true)] arc: bool,
1576        #[values(false, true)] server_ctx: bool,
1577        #[values(
1578            InvalidationType::Query,
1579            InvalidationType::Scope,
1580            InvalidationType::Predicate,
1581            InvalidationType::All,
1582            InvalidationType::Clear
1583        )]
1584        invalidation_type: InvalidationType,
1585    ) {
1586        identify_parking_lot_deadlocks();
1587        tokio::task::LocalSet::new()
1588            .run_until(async move {
1589                let (fetcher, fetch_calls) = default_fetcher();
1590                let (client, _guard, _owner) = prep_vari!(server_ctx);
1591
1592                let invalidation_counts = Arc::new(parking_lot::Mutex::new(HashMap::new()));
1593
1594                let fetcher = fetcher.on_invalidation({
1595                    let invalidation_counts = invalidation_counts.clone();
1596                    move |key| {
1597                        let mut counts = invalidation_counts.lock();
1598                        *counts.entry(*key).or_insert(0) += 1;
1599                    }
1600                });
1601
1602                macro_rules! check {
1603                    ($get_resource:expr) => {{
1604                        let resource = $get_resource();
1605
1606                        // Should be None initially with the sync methods:
1607                        assert!(resource.get_untracked().is_none());
1608                        assert!(resource.try_get_untracked().unwrap().is_none());
1609                        assert!(resource.get().is_none());
1610                        assert!(resource.try_get().unwrap().is_none());
1611                        assert!(resource.read().is_none());
1612                        assert!(resource.try_read().as_deref().unwrap().is_none());
1613
1614                        // On the server cannot actually run local resources:
1615                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
1616                            assert_eq!(resource.await, 4);
1617                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1618
1619                            tick!();
1620
1621                            // Shouldn't change despite ticking:
1622                            assert_eq!($get_resource().await, 4);
1623                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1624
1625                            assert_eq!(
1626                                *invalidation_counts.lock().get(&2).unwrap_or(&0),
1627                                0
1628                            );
1629                            invalidation_type.invalidate(&client, fetcher.clone(), &2);
1630                            assert_eq!(
1631                                *invalidation_counts.lock().get(&2).unwrap_or(&0),
1632                                1
1633                            );
1634
1635                            // A second invalidation on something that's already invalid,
1636                            // should not trigger the on_invalidation callback again:
1637                            invalidation_type.invalidate(&client, fetcher.clone(), &2);
1638                            assert_eq!(
1639                                *invalidation_counts.lock().get(&2).unwrap_or(&0),
1640                                1
1641                            );
1642
1643                            // Other than clear, because it should now be stale, not gc'd,
1644                            // sync fns on a new resource instance should still return the value, it just means a background refresh has been triggered:
1645                            let resource2 = $get_resource();
1646                            tick!();
1647                            if matches!(invalidation_type, InvalidationType::Clear) {
1648                                assert_eq!(resource2.get_untracked(), None);
1649                            } else {
1650                                assert_eq!(resource2.get_untracked(), Some(4));
1651                            }
1652                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1653
1654                            // Because the resource should've been auto invalidated, a tick should cause it to auto refetch:
1655                            tick!();
1656                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1657                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
1658                            assert_eq!($get_resource().await, 4);
1659                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
1660
1661                            // Invalidation callback should repeatedly work:
1662                            assert_eq!(
1663                                *invalidation_counts.lock().get(&2).unwrap_or(&0),
1664                                1
1665                            );
1666                            invalidation_type.invalidate(&client, fetcher.clone(), &2);
1667                            assert_eq!(
1668                                *invalidation_counts.lock().get(&2).unwrap_or(&0),
1669                                2
1670                            );
1671                        }
1672                    }};
1673                }
1674
1675                vari_new_resource_with_cb!(
1676                    check,
1677                    client,
1678                    fetcher.clone(),
1679                    || 2,
1680                    resource_type,
1681                    arc
1682                );
1683            })
1684            .await;
1685    }
1686
1687    enum FetchQueryType {
1688        Fetch,
1689        Prefetch,
1690        UpdateAsync,
1691        UpdateAsyncLocal,
1692    }
1693
1694    #[rstest]
1695    #[tokio::test]
1696    async fn test_invalidation_during_inflight_queries(
1697        #[values(
1698            FetchQueryType::Fetch,
1699            FetchQueryType::Prefetch,
1700            FetchQueryType::UpdateAsync,
1701            FetchQueryType::UpdateAsyncLocal
1702        )]
1703        fetch_query_type: FetchQueryType,
1704        #[values(
1705            InvalidationType::Query,
1706            InvalidationType::Scope,
1707            InvalidationType::Predicate,
1708            InvalidationType::All,
1709            InvalidationType::Clear
1710        )]
1711        invalidation_type: InvalidationType,
1712    ) {
1713        identify_parking_lot_deadlocks();
1714        tokio::task::LocalSet::new()
1715            .run_until(async move {
1716                let (client, _guard, _owner) = prep_vari!(false);
1717
1718                const FETCH_SLEEP_MS: u64 = 200;
1719                const UPDATE_SLEEP_MS: u64 = 200;
1720
1721                let num_calls = Arc::new(AtomicUsize::new(0));
1722                let num_completed_calls = Arc::new(AtomicUsize::new(0));
1723                let query_scope = {
1724                    let num_calls = num_calls.clone();
1725                    let num_completed_calls = num_completed_calls.clone();
1726                    move || {
1727                        let num_calls = num_calls.clone();
1728                        let num_completed_calls = num_completed_calls.clone();
1729                        async move {
1730                            num_calls.fetch_add(1, Ordering::Relaxed);
1731                            tokio::time::sleep(tokio::time::Duration::from_millis(FETCH_SLEEP_MS))
1732                                .await;
1733                            num_completed_calls.fetch_add(1, Ordering::Relaxed);
1734                            "initial_value"
1735                        }
1736                    }
1737                };
1738
1739                let get_fetch_fut = || match fetch_query_type {
1740                    FetchQueryType::Fetch => Either::Left(Either::Left(async {
1741                        client.fetch_query(query_scope.clone(), ()).await;
1742                    })),
1743                    FetchQueryType::Prefetch => Either::Left(Either::Right(async {
1744                        client.prefetch_query(query_scope.clone(), ()).await;
1745                    })),
1746                    FetchQueryType::UpdateAsync => Either::Right(Either::Left(async {
1747                        client
1748                            .update_query_async(query_scope.clone(), (), async |value| {
1749                                tokio::time::sleep(tokio::time::Duration::from_millis(
1750                                    UPDATE_SLEEP_MS,
1751                                ))
1752                                .await;
1753                                *value = "modified_value";
1754                            })
1755                            .await;
1756                    })),
1757                    FetchQueryType::UpdateAsyncLocal => Either::Right(Either::Right(async {
1758                        client
1759                            .update_query_async_local(query_scope.clone(), (), async |value| {
1760                                tokio::time::sleep(tokio::time::Duration::from_millis(
1761                                    UPDATE_SLEEP_MS,
1762                                ))
1763                                .await;
1764                                *value = "modified_value";
1765                            })
1766                            .await;
1767                    })),
1768                };
1769
1770                let (_, _) = tokio::join!(
1771                    async {
1772                        // Wait for the fetch to start before invalidating:
1773                        tokio::time::sleep(tokio::time::Duration::from_millis(FETCH_SLEEP_MS / 2))
1774                            .await;
1775                        invalidation_type.invalidate(&client, query_scope.clone(), &());
1776                    },
1777                    async {
1778                        get_fetch_fut().await;
1779                    }
1780                );
1781
1782                // The invalidation should have forced the in-flight query to be cancelled, and a replacement fired.
1783                assert_eq!(num_calls.load(Ordering::Relaxed), 2);
1784                assert_eq!(num_completed_calls.load(Ordering::Relaxed), 1);
1785                // Should not be invalid because the underlying query was refetched:
1786                assert!(!client.is_key_invalid(query_scope.clone(), ()));
1787
1788                // In general we treat update functions as resetting any invalid = true,
1789                // however if an async update user fn test, need to check that:
1790                // - when cleared during the user update async fn (after the query pull), the value isn't set, as would be stale
1791                // - opposite for invalidate, these should still be set, as should make the invalidated query "less stale", shouldn't override the new invalidated status though.
1792                if matches!(fetch_query_type, FetchQueryType::UpdateAsync)
1793                    || matches!(fetch_query_type, FetchQueryType::UpdateAsyncLocal)
1794                {
1795                    assert_eq!(
1796                        client.get_cached_query(query_scope.clone(), ()),
1797                        Some("modified_value")
1798                    );
1799                    // Clear so the fetch sleep should be accounted for below:
1800                    client.clear();
1801                    let (_, _) = tokio::join!(
1802                        async {
1803                            // Wait for the user update async fn to start before invalidating:
1804                            tokio::time::sleep(tokio::time::Duration::from_millis(
1805                                FETCH_SLEEP_MS + (UPDATE_SLEEP_MS / 2),
1806                            ))
1807                            .await;
1808                            invalidation_type.invalidate(&client, query_scope.clone(), &());
1809                        },
1810                        async {
1811                            get_fetch_fut().await;
1812                        }
1813                    );
1814                    if matches!(invalidation_type, InvalidationType::Clear) {
1815                        assert_eq!(client.get_cached_query(query_scope.clone(), ()), None);
1816                    } else {
1817                        assert_eq!(
1818                            client.get_cached_query(query_scope.clone(), ()),
1819                            Some("modified_value")
1820                        );
1821                        // Should still be invalid:
1822                        assert!(client.is_key_invalid(query_scope.clone(), ()));
1823                    }
1824                }
1825            })
1826            .await;
1827    }
1828
1829    #[rstest]
1830    #[tokio::test]
1831    async fn test_invalidation_hierarchy(#[values(false, true)] server_ctx: bool) {
1832        identify_parking_lot_deadlocks();
1833        tokio::task::LocalSet::new()
1834            .run_until(async move {
1835                let (fetcher, _fetch_calls) = default_fetcher();
1836                let (client, _guard, _owner) = prep_vari!(server_ctx);
1837
1838                // If this is invalidated, it should also invalidate the child resource.
1839                let fetcher = fetcher.with_invalidation_link(|_key| vec!["base", "users"]);
1840                client.fetch_query(&fetcher, &2).await;
1841                assert!(!client.is_key_invalid(&fetcher, 2));
1842
1843                // If this is invalidated, it'll invalidate both the main fetcher and the child resource.
1844                let hierarchy_parent_scope =
1845                    QueryScope::new(async || ()).with_invalidation_link(|_k| ["base"]);
1846                client
1847                    .fetch_query(hierarchy_parent_scope.clone(), &())
1848                    .await;
1849                assert!(!client.is_key_invalid(&hierarchy_parent_scope, ()));
1850
1851                // If this is invalidated, it shouldn't affect either of the others, as it's the lowest in the invalidation hierarchy.
1852                let hierarchy_child_scope = QueryScope::new(async |user_id| user_id)
1853                    .with_invalidation_link(|user_id: &usize| {
1854                        ["base".to_string(), "users".to_string(), user_id.to_string()]
1855                    });
1856                client
1857                    .fetch_query(hierarchy_child_scope.clone(), &100)
1858                    .await;
1859                assert!(!client.is_key_invalid(&hierarchy_child_scope, 100));
1860
1861                // Should be invalidated when the hierarchy_parent_scope and vice versa, siblings should invalidate each other
1862                let hierarchy_sibling_scope =
1863                    QueryScope::new(async || ()).with_invalidation_link(|_k| ["base"]);
1864                client
1865                    .fetch_query(hierarchy_sibling_scope.clone(), &())
1866                    .await;
1867                assert!(!client.is_key_invalid(&hierarchy_sibling_scope, ()));
1868
1869                client.invalidate_query(&hierarchy_parent_scope, ());
1870                tick!();
1871                assert!(client.is_key_invalid(&hierarchy_parent_scope, ()));
1872                assert!(client.is_key_invalid(&fetcher, 2));
1873                assert!(client.is_key_invalid(&hierarchy_child_scope, 100));
1874
1875                // Reset:
1876                client.fetch_query(&hierarchy_parent_scope, &()).await;
1877                client.fetch_query(&fetcher, &2).await;
1878                client
1879                    .fetch_query(hierarchy_child_scope.clone(), &100)
1880                    .await;
1881
1882                client.invalidate_query(&fetcher, 2);
1883                tick!();
1884                assert!(!client.is_key_invalid(&hierarchy_parent_scope, ()));
1885                assert!(client.is_key_invalid(&fetcher, 2));
1886                assert!(client.is_key_invalid(&hierarchy_child_scope, 100));
1887
1888                // Reset:
1889                client.fetch_query(&hierarchy_parent_scope, &()).await;
1890                client.fetch_query(&fetcher, &2).await;
1891                client
1892                    .fetch_query(hierarchy_child_scope.clone(), &100)
1893                    .await;
1894
1895                client.invalidate_query(&hierarchy_child_scope, 100);
1896                tick!();
1897                assert!(!client.is_key_invalid(&hierarchy_parent_scope, ()));
1898                assert!(!client.is_key_invalid(&fetcher, 2));
1899                assert!(client.is_key_invalid(&hierarchy_child_scope, 100));
1900
1901                // Sibling should also invalidate the others:
1902                // Reset:
1903                client.fetch_query(&hierarchy_parent_scope, &()).await;
1904                client.fetch_query(&fetcher, &2).await;
1905                client
1906                    .fetch_query(hierarchy_sibling_scope.clone(), &())
1907                    .await;
1908                client.invalidate_query(&hierarchy_sibling_scope, ());
1909                tick!();
1910                assert!(client.is_key_invalid(&hierarchy_parent_scope, ()));
1911                assert!(client.is_key_invalid(&hierarchy_sibling_scope, ()));
1912
1913                // Repeat the other direction to be sure:
1914                // Reset:
1915                client.fetch_query(&hierarchy_parent_scope, &()).await;
1916                client.fetch_query(&fetcher, &2).await;
1917                client
1918                    .fetch_query(hierarchy_sibling_scope.clone(), &())
1919                    .await;
1920                client.invalidate_query(&hierarchy_parent_scope, ());
1921                tick!();
1922                assert!(client.is_key_invalid(&hierarchy_parent_scope, ()));
1923                assert!(client.is_key_invalid(&hierarchy_sibling_scope, ()));
1924            })
1925            .await;
1926    }
1927
1928    #[rstest]
1929    #[tokio::test]
1930    async fn test_same_key_invalidation(#[values(false, true)] server_ctx: bool) {
1931        identify_parking_lot_deadlocks();
1932        tokio::task::LocalSet::new()
1933            .run_until(async move {
1934                let (client, _guard, _owner) = prep_vari!(server_ctx);
1935
1936                // Create two different query scopes that both use ["foo"] as their invalidation key
1937                let scope_a = QueryScope::new(async |id: usize| format!("result_a_{}", id))
1938                    .with_invalidation_link(|_id: &usize| ["foo"]);
1939
1940                let scope_b = QueryScope::new(async |id: usize| format!("result_b_{}", id))
1941                    .with_invalidation_link(|_id: &usize| ["foo"]);
1942
1943                // Fetch both queries with different IDs
1944                client.fetch_query(&scope_a, &1).await;
1945                client.fetch_query(&scope_b, &2).await;
1946
1947                // Verify both are valid
1948                assert!(!client.is_key_invalid(&scope_a, 1));
1949                assert!(!client.is_key_invalid(&scope_b, 2));
1950
1951                // Invalidate scope_a with its key - this should invalidate both
1952                // since they share the same invalidation key ["foo"]
1953                client.invalidate_query(&scope_a, 1);
1954                tick!();
1955
1956                // Both should now be invalid because they share ["foo"]
1957                assert!(client.is_key_invalid(&scope_a, 1));
1958                assert!(client.is_key_invalid(&scope_b, 2));
1959
1960                // Reset and test the other direction
1961                client.fetch_query(&scope_a, &1).await;
1962                client.fetch_query(&scope_b, &2).await;
1963
1964                assert!(!client.is_key_invalid(&scope_a, 1));
1965                assert!(!client.is_key_invalid(&scope_b, 2));
1966
1967                // Invalidate scope_b - should also invalidate scope_a
1968                client.invalidate_query(&scope_b, 2);
1969                tick!();
1970
1971                assert!(client.is_key_invalid(&scope_a, 1));
1972                assert!(client.is_key_invalid(&scope_b, 2));
1973
1974                // Also test with a third scope that has a different key
1975                let scope_c = QueryScope::new(async |id: usize| format!("result_c_{}", id))
1976                    .with_invalidation_link(|_id: &usize| ["bar"]);
1977
1978                client.fetch_query(&scope_a, &1).await;
1979                client.fetch_query(&scope_b, &2).await;
1980                client.fetch_query(&scope_c, &3).await;
1981
1982                // Invalidating scope_c should NOT affect scope_a or scope_b
1983                client.invalidate_query(&scope_c, 3);
1984                tick!();
1985
1986                assert!(!client.is_key_invalid(&scope_a, 1));
1987                assert!(!client.is_key_invalid(&scope_b, 2));
1988                assert!(client.is_key_invalid(&scope_c, 3));
1989
1990                // But invalidating scope_a should still invalidate scope_b (but not scope_c)
1991                client.invalidate_query(&scope_a, 1);
1992                tick!();
1993
1994                assert!(client.is_key_invalid(&scope_a, 1));
1995                assert!(client.is_key_invalid(&scope_b, 2));
1996                assert!(client.is_key_invalid(&scope_c, 3)); // Still invalid from before
1997            })
1998            .await;
1999    }
2000
2001    #[rstest]
2002    #[tokio::test]
2003    async fn test_key_tracked_autoreload(
2004        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
2005        #[values(false, true)] arc: bool,
2006        #[values(false, true)] server_ctx: bool,
2007    ) {
2008        identify_parking_lot_deadlocks();
2009        tokio::task::LocalSet::new()
2010            .run_until(async move {
2011                let (fetcher, fetch_calls) = default_fetcher();
2012
2013                let (client, _guard, _owner) = prep_vari!(server_ctx);
2014
2015                let add_size = RwSignal::new(1);
2016
2017                macro_rules! check {
2018                    ($get_resource:expr) => {{
2019                        let resource = $get_resource();
2020                        let subscribed =
2021                            client.subscribe_value(fetcher.clone(), move || add_size.get());
2022
2023                        // Should be None initially with the sync methods:
2024                        assert!(resource.get_untracked().is_none());
2025                        assert!(resource.try_get_untracked().unwrap().is_none());
2026                        assert!(resource.get().is_none());
2027                        assert!(resource.try_get().unwrap().is_none());
2028                        assert!(resource.read().is_none());
2029                        assert!(resource.try_read().as_deref().unwrap().is_none());
2030                        assert_eq!(subscribed.get_untracked(), None);
2031
2032                        // On the server cannot actually run local resources:
2033                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
2034                            let resource = $get_resource();
2035                            assert_eq!($get_resource().await, 2);
2036                            assert_eq!(subscribed.get_untracked(), Some(2));
2037                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
2038
2039                            // Update the resource key:
2040                            add_size.set(2);
2041
2042                            // Until the new fetch has completed, the old should still be returned:
2043                            tick!();
2044                            assert_eq!(resource.get(), Some(2));
2045                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
2046
2047                            // Wait for the new to complete:
2048                            tokio::time::sleep(std::time::Duration::from_millis(
2049                                DEFAULT_FETCHER_MS + 10,
2050                            ))
2051                            .await;
2052                            tick!();
2053
2054                            // Should have updated to the new value:
2055                            assert_eq!(resource.get(), Some(4));
2056                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
2057                            assert_eq!($get_resource().await, 4);
2058                            assert_eq!(subscribed.get_untracked(), Some(4));
2059                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
2060                        }
2061                    }};
2062                }
2063
2064                vari_new_resource_with_cb!(
2065                    check,
2066                    client,
2067                    fetcher.clone(),
2068                    move || add_size.get(),
2069                    resource_type,
2070                    arc
2071                );
2072            })
2073            .await;
2074    }
2075
2076    /// Make sure values on first receival and cached all stick to their specific key.
2077    #[rstest]
2078    #[tokio::test]
2079    async fn test_key_integrity(
2080        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
2081        #[values(false, true)] arc: bool,
2082        #[values(false, true)] server_ctx: bool,
2083    ) {
2084        identify_parking_lot_deadlocks();
2085        tokio::task::LocalSet::new()
2086            .run_until(async move {
2087                // On the server cannot actually run local resources:
2088                if cfg!(feature = "ssr") && resource_type == ResourceType::Local {
2089                    return;
2090                }
2091
2092                let (fetcher, fetch_calls) = default_fetcher();
2093                let (client, _guard, _owner) = prep_vari!(server_ctx);
2094
2095                let keys = [1, 2, 3, 4, 5];
2096                let results = futures::future::join_all(keys.iter().cloned().map(|key| {
2097                    let fetcher = fetcher.clone();
2098                    async move {
2099                        macro_rules! cb {
2100                            ($get_resource:expr) => {{
2101                                let resource = $get_resource();
2102                                resource.await
2103                            }};
2104                        }
2105                        vari_new_resource_with_cb!(
2106                            cb,
2107                            client,
2108                            fetcher,
2109                            move || key,
2110                            resource_type,
2111                            arc
2112                        )
2113                    }
2114                }))
2115                .await;
2116                assert_eq!(results, vec![2, 4, 6, 8, 10]);
2117                assert_eq!(fetch_calls.load(Ordering::Relaxed), 5);
2118
2119                // Call again, each should still be accurate, but each should be cached so fetch call doesn't increase:
2120                let results = futures::future::join_all(keys.iter().cloned().map(|key| {
2121                    let fetcher = fetcher.clone();
2122                    async move {
2123                        macro_rules! cb {
2124                            ($get_resource:expr) => {{
2125                                let resource = $get_resource();
2126                                resource.await
2127                            }};
2128                        }
2129                        vari_new_resource_with_cb!(
2130                            cb,
2131                            client,
2132                            fetcher,
2133                            move || key,
2134                            resource_type,
2135                            arc
2136                        )
2137                    }
2138                }))
2139                .await;
2140                assert_eq!(results, vec![2, 4, 6, 8, 10]);
2141                assert_eq!(fetch_calls.load(Ordering::Relaxed), 5);
2142            })
2143            .await;
2144    }
2145
2146    /// Make sure resources that are loaded together only run once but share the value.
2147    #[rstest]
2148    #[tokio::test]
2149    async fn test_resource_race(
2150        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
2151        #[values(false, true)] arc: bool,
2152        #[values(false, true)] server_ctx: bool,
2153    ) {
2154        identify_parking_lot_deadlocks();
2155        tokio::task::LocalSet::new()
2156            .run_until(async move {
2157                // On the server cannot actually run local resources:
2158                if cfg!(feature = "ssr") && resource_type == ResourceType::Local {
2159                    return;
2160                }
2161
2162                let (fetcher, fetch_calls) = default_fetcher();
2163                let (client, _guard, _owner) = prep_vari!(server_ctx);
2164
2165                let keyer = || 1;
2166                let results = futures::future::join_all((0..10).map(|_| {
2167                    let fetcher = fetcher.clone();
2168                    async move {
2169                        macro_rules! cb {
2170                            ($get_resource:expr) => {{
2171                                let resource = $get_resource();
2172                                resource.await
2173                            }};
2174                        }
2175                        vari_new_resource_with_cb!(cb, client, fetcher, keyer, resource_type, arc)
2176                    }
2177                }))
2178                .await
2179                .into_iter()
2180                .collect::<Vec<_>>();
2181                assert_eq!(results, vec![2; 10]);
2182                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
2183            })
2184            .await;
2185    }
2186
2187    #[cfg(feature = "ssr")]
2188    #[tokio::test]
2189    async fn test_resource_cross_stream_caching() {
2190        identify_parking_lot_deadlocks();
2191        tokio::task::LocalSet::new()
2192            .run_until(async move {
2193                for maybe_sleep_ms in &[None, Some(10), Some(30)] {
2194                    let (client, ssr_ctx, _owner) = prep_server!();
2195
2196                    let fetch_calls = Arc::new(AtomicUsize::new(0));
2197                    let fetcher = {
2198                        let fetch_calls = fetch_calls.clone();
2199                        move |key: u64| {
2200                            fetch_calls.fetch_add(1, Ordering::Relaxed);
2201                            async move {
2202                                if let Some(sleep_ms) = maybe_sleep_ms {
2203                                    tokio::time::sleep(tokio::time::Duration::from_millis(
2204                                        *sleep_ms as u64,
2205                                    ))
2206                                    .await;
2207                                }
2208                                key * 2
2209                            }
2210                        }
2211                    };
2212                    let fetcher = QueryScope::new(fetcher);
2213
2214                    let keyer = || 1;
2215
2216                    // First call should require a fetch.
2217                    assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
2218                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
2219
2220                    // Second should be cached by the query client because same key:
2221                    assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
2222                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
2223
2224                    // Should make it over to the frontend too:
2225                    let (client, _owner) = prep_client!(ssr_ctx);
2226
2227                    // This will stream from the first ssr resource:
2228                    assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
2229                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
2230
2231                    // This will stream from the second ssr resource:
2232                    assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
2233                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
2234
2235                    // This drives the effect that will put the resource into the frontend cache:
2236                    tick!();
2237
2238                    // This didn't happen in ssr so nothing to stream,
2239                    // but the other 2 resources shoud've still put themselves into the frontend cache,
2240                    // so this should get picked up by that.
2241                    assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
2242                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
2243
2244                    // Reset and confirm works for non blocking too:
2245                    let (ssr_client, ssr_ctx, _owner) = prep_server!();
2246                    fetch_calls.store(0, Ordering::Relaxed);
2247
2248                    // Don't await:
2249                    let ssr_resource_1 = ssr_client.arc_resource(fetcher.clone(), keyer);
2250                    let ssr_resource_2 = ssr_client.arc_resource(fetcher.clone(), keyer);
2251
2252                    let (hydrate_client, _owner) = prep_client!(ssr_ctx);
2253
2254                    // Matching 2 resources on hydrate, these should stream:
2255                    let hydrate_resource_1 = hydrate_client.arc_resource(fetcher.clone(), keyer);
2256                    let hydrate_resource_2 = hydrate_client.arc_resource(fetcher.clone(), keyer);
2257
2258                    // Wait for all 4 together, should still only have had 1 fetch.
2259                    let results = futures::future::join_all(
2260                        vec![
2261                            hydrate_resource_2,
2262                            ssr_resource_1,
2263                            ssr_resource_2,
2264                            hydrate_resource_1,
2265                        ]
2266                        .into_iter()
2267                        .map(|resource| async move { resource.await }),
2268                    )
2269                    .await
2270                    .into_iter()
2271                    .collect::<Vec<_>>();
2272
2273                    assert_eq!(results, vec![2, 2, 2, 2]);
2274                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
2275
2276                    tick!();
2277
2278                    // This didn't have a matching backend one so should be using the populated cache and still not fetch:
2279                    assert_eq!(hydrate_client.arc_resource(fetcher.clone(), keyer).await, 2);
2280                    assert_eq!(
2281                        fetch_calls.load(Ordering::Relaxed),
2282                        1,
2283                        "{maybe_sleep_ms:?}ms"
2284                    );
2285                }
2286            })
2287            .await;
2288    }
2289}