leptos_fetch/
lib.rs

1#![allow(clippy::module_inception)]
2#![allow(clippy::type_complexity)]
3#![allow(clippy::too_many_arguments)]
4#![warn(clippy::disallowed_types)]
5#![warn(missing_docs)]
6#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
// When docs are generated on docs.rs (which uses nightly by default), enable `doc_auto_cfg` so required features are shown automatically:
8#![cfg_attr(all(doc, CHANNEL_NIGHTLY), feature(doc_auto_cfg))]
9
10mod arc_local_signal;
11mod cache;
12mod cache_scope;
13mod debug_if_devtools_enabled;
14#[cfg(any(
15    all(debug_assertions, feature = "devtools"),
16    feature = "devtools-always"
17))]
18mod events;
19mod maybe_local;
20mod query;
21mod query_client;
22mod query_maybe_key;
23mod query_options;
24mod query_scope;
25mod resource_drop_guard;
26#[cfg(any(
27    all(debug_assertions, feature = "devtools"),
28    feature = "devtools-always"
29))]
30mod subs_client;
31mod subs_scope;
32mod trie;
33mod utils;
34mod value_with_callbacks;
35
36#[cfg(any(feature = "devtools", feature = "devtools-always"))]
37mod dev_tools;
38#[cfg(any(feature = "devtools", feature = "devtools-always"))]
39pub use dev_tools::QueryDevtools;
40
41pub use arc_local_signal::*;
42pub use query_client::*;
43pub use query_options::*;
44pub use query_scope::{QueryScope, QueryScopeLocal};
45
46#[cfg(test)]
47mod test {
48    use std::{
49        fmt::Debug,
50        marker::PhantomData,
51        ptr::NonNull,
52        sync::{
53            Arc,
54            atomic::{AtomicBool, AtomicUsize, Ordering},
55        },
56    };
57
58    use hydration_context::{
59        PinnedFuture, PinnedStream, SerializedDataId, SharedContext, SsrSharedContext,
60    };
61
62    use any_spawner::Executor;
63    use leptos::{error::ErrorId, prelude::*};
64
65    use rstest::*;
66
67    use crate::utils::OnDrop;
68
69    use super::*;
70
71    pub struct MockHydrateSharedContext {
72        id: AtomicUsize,
73        is_hydrating: AtomicBool,
74        during_hydration: AtomicBool,
75
76        // CUSTOM_TO_MOCK:
77
78        // errors: LazyLock<Vec<(SerializedDataId, ErrorId, Error)>>,
79        // incomplete: LazyLock<Vec<SerializedDataId>>,
80        resolved_resources: Vec<(SerializedDataId, String)>,
81    }
82
83    impl MockHydrateSharedContext {
84        async fn new(ssr_ctx: Option<&SsrSharedContext>) -> Self {
85            Self {
86                id: AtomicUsize::new(0),
87                is_hydrating: AtomicBool::new(true),
88                during_hydration: AtomicBool::new(true),
89                // errors: LazyLock::new(serialized_errors),
90                // incomplete: Lazy::new(incomplete_chunks),
91                resolved_resources: if let Some(ssr_ctx) = ssr_ctx {
92                    ssr_ctx.consume_buffers().await
93                } else {
94                    vec![]
95                },
96            }
97        }
98    }
99
100    impl Debug for MockHydrateSharedContext {
101        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102            f.debug_struct("MockHydrateSharedContext").finish()
103        }
104    }
105
106    impl SharedContext for MockHydrateSharedContext {
107        fn is_browser(&self) -> bool {
108            true
109        }
110
111        fn next_id(&self) -> SerializedDataId {
112            let id = self.id.fetch_add(1, Ordering::Relaxed);
113            SerializedDataId::new(id)
114        }
115
116        fn write_async(&self, _id: SerializedDataId, _fut: PinnedFuture<String>) {}
117
118        fn read_data(&self, id: &SerializedDataId) -> Option<String> {
119            self.resolved_resources
120                .get(id.clone().into_inner())
121                .map(|(_, data)| data.to_string())
122        }
123
124        fn await_data(&self, _id: &SerializedDataId) -> Option<String> {
125            todo!()
126        }
127
128        fn pending_data(&self) -> Option<PinnedStream<String>> {
129            None
130        }
131
132        fn during_hydration(&self) -> bool {
133            self.during_hydration.load(Ordering::Relaxed)
134        }
135
136        fn hydration_complete(&self) {
137            self.during_hydration.store(false, Ordering::Relaxed)
138        }
139
140        fn get_is_hydrating(&self) -> bool {
141            self.is_hydrating.load(Ordering::Relaxed)
142        }
143
144        fn set_is_hydrating(&self, is_hydrating: bool) {
145            self.is_hydrating.store(is_hydrating, Ordering::Relaxed)
146        }
147
148        fn errors(&self, _boundary_id: &SerializedDataId) -> Vec<(ErrorId, leptos::error::Error)> {
149            vec![]
150            // self.errors
151            //     .iter()
152            //     .filter_map(|(boundary, id, error)| {
153            //         if boundary == boundary_id {
154            //             Some((id.clone(), error.clone()))
155            //         } else {
156            //             None
157            //         }
158            //     })
159            //     .collect()
160        }
161
162        #[inline(always)]
163        fn register_error(
164            &self,
165            _error_boundary: SerializedDataId,
166            _error_id: ErrorId,
167            _error: leptos::error::Error,
168        ) {
169        }
170
171        #[inline(always)]
172        fn seal_errors(&self, _boundary_id: &SerializedDataId) {}
173
174        fn take_errors(&self) -> Vec<(SerializedDataId, ErrorId, leptos::error::Error)> {
175            // self.errors.clone()
176            vec![]
177        }
178
179        #[inline(always)]
180        fn defer_stream(&self, _wait_for: PinnedFuture<()>) {}
181
182        #[inline(always)]
183        fn await_deferred(&self) -> Option<PinnedFuture<()>> {
184            None
185        }
186
187        #[inline(always)]
188        fn set_incomplete_chunk(&self, _id: SerializedDataId) {}
189
190        fn get_incomplete_chunk(&self, _id: &SerializedDataId) -> bool {
191            // self.incomplete.iter().any(|entry| entry == id)
192            false
193        }
194    }
195
196    macro_rules! prep_server {
197        () => {{
198            _ = Executor::init_tokio();
199            let ssr_ctx = Arc::new(SsrSharedContext::new());
200            let owner = Owner::new_root(Some(ssr_ctx.clone()));
201            owner.set();
202            let client = QueryClient::new();
203            (client, ssr_ctx, owner)
204        }};
205    }
206
207    macro_rules! prep_client {
208        () => {{
209            _ = Executor::init_tokio();
210            let owner = Owner::new_root(Some(Arc::new(MockHydrateSharedContext::new(None).await)));
211            owner.set();
212            let client = QueryClient::new();
213            (client, owner)
214        }};
215        ($ssr_ctx:expr) => {{
216            _ = Executor::init_tokio();
217            let owner = Owner::new_root(Some(Arc::new(
218                MockHydrateSharedContext::new(Some(&$ssr_ctx)).await,
219            )));
220            owner.set();
221            let client = QueryClient::new();
222            (client, owner)
223        }};
224    }
225
226    macro_rules! prep_vari {
227        ($server:expr) => {
228            if $server {
229                let (client, ssr_ctx, owner) = prep_server!();
230                (client, Some(ssr_ctx), owner)
231            } else {
232                let (client, owner) = prep_client!();
233                (client, None, owner)
234            }
235        };
236    }
237
238    macro_rules! tick {
239        () => {
240            // Executor::poll_local();
241            // futures::executor::block_on(Executor::tick());
242            Executor::tick().await;
243            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
244        };
245    }
246
247    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
248    enum ResourceType {
249        Local,
250        Normal,
251        Blocking,
252    }
253
254    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
255    enum InvalidationType {
256        Query,
257        Scope,
258        Predicate,
259        All,
260    }
261
262    macro_rules! vari_new_resource_with_cb {
263        ($cb:ident, $client:expr, $fetcher:expr, $keyer:expr, $resource_type:expr, $arc:expr) => {
264            match ($resource_type, $arc) {
265                (ResourceType::Local, true) => {
266                    $cb!(|| $client.arc_local_resource($fetcher, $keyer))
267                }
268                (ResourceType::Local, false) => {
269                    $cb!(|| $client.local_resource($fetcher, $keyer))
270                }
271                (ResourceType::Normal, true) => {
272                    $cb!(|| $client.arc_resource($fetcher, $keyer))
273                }
274                (ResourceType::Normal, false) => {
275                    $cb!(|| $client.resource($fetcher, $keyer))
276                }
277                (ResourceType::Blocking, true) => {
278                    $cb!(|| $client.arc_resource_blocking($fetcher, $keyer))
279                }
280                (ResourceType::Blocking, false) => {
281                    $cb!(|| $client.resource_blocking($fetcher, $keyer))
282                }
283            }
284        };
285    }
286
287    const DEFAULT_FETCHER_MS: u64 = 30;
288    fn default_fetcher() -> (QueryScope<u64, u64>, Arc<AtomicUsize>) {
289        let fetch_calls = Arc::new(AtomicUsize::new(0));
290        let fetcher_src = {
291            let fetch_calls = fetch_calls.clone();
292            move |key: u64| {
293                let fetch_calls = fetch_calls.clone();
294                async move {
295                    tokio::time::sleep(tokio::time::Duration::from_millis(DEFAULT_FETCHER_MS))
296                        .await;
297                    fetch_calls.fetch_add(1, Ordering::Relaxed);
298                    key * 2
299                }
300            }
301        };
302        (QueryScope::new(fetcher_src), fetch_calls)
303    }
304
305    fn identify_parking_lot_deadlocks() {
306        static ONCE: std::sync::Once = std::sync::Once::new();
307        ONCE.call_once(|| {
308            std::thread::spawn(move || {
309                loop {
310                    std::thread::sleep(std::time::Duration::from_secs(5));
311                    let deadlocks = parking_lot::deadlock::check_deadlock();
312                    if deadlocks.is_empty() {
313                        continue;
314                    }
315
316                    println!("{} deadlocks detected", deadlocks.len());
317                    for (i, threads) in deadlocks.iter().enumerate() {
318                        println!("Deadlock #{}", i);
319                        for t in threads {
320                            println!("Thread Id {:#?}", t.thread_id());
321                            println!("{:#?}", t.backtrace());
322                        }
323                    }
324                }
325            });
326        });
327    }
328
329    #[rstest]
330    #[tokio::test]
331    async fn test_codecs(
332        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
333        #[values(false, true)] arc: bool,
334        #[values(false, true)] server_ctx: bool,
335    ) {
336        identify_parking_lot_deadlocks();
337        tokio::task::LocalSet::new()
338            .run_until(async move {
339                let (fetcher, _fetch_calls) = default_fetcher();
340
341                let (client_default, _guard, _owner) = prep_vari!(server_ctx);
342                let client_custom =
343                    QueryClient::new().set_codec::<codee::binary::FromToBytesCodec>();
344                use_context::<QueryClient>();
345                use_context::<QueryClient<codee::binary::FromToBytesCodec>>();
346
347                macro_rules! check {
348                    ($get_resource:expr) => {{
349                        let resource = $get_resource();
350                        // On the server cannot actually run local resources:
351                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
352                            assert_eq!(resource.await, 4);
353                        }
354                    }};
355                }
356
357                vari_new_resource_with_cb!(
358                    check,
359                    client_default,
360                    fetcher.clone(),
361                    move || 2,
362                    resource_type,
363                    arc
364                );
365                vari_new_resource_with_cb!(
366                    check,
367                    client_custom,
368                    fetcher.clone(),
369                    move || 2,
370                    resource_type,
371                    arc
372                );
373            })
374            .await;
375    }
376
377    #[rstest]
378    #[tokio::test]
379    async fn test_no_query_args() {
380        identify_parking_lot_deadlocks();
381        tokio::task::LocalSet::new()
382            .run_until(async move {
383                let (client, _guard, _owner) = prep_vari!(false);
384
385                async fn fn_no_arg() -> &'static str {
386                    "no_arg"
387                }
388
389                async fn fn_with_arg(arg: &'static str) -> &'static str {
390                    arg
391                }
392
393                assert_eq!(client.fetch_query(fn_no_arg, ()).await, "no_arg");
394                assert_eq!(
395                    client.fetch_query(fn_with_arg, "with_arg").await,
396                    "with_arg"
397                );
398
399                assert_eq!(
400                    client.fetch_query(QueryScope::new(fn_no_arg), ()).await,
401                    "no_arg"
402                );
403                assert_eq!(
404                    client
405                        .fetch_query(QueryScope::new(fn_with_arg), "with_arg")
406                        .await,
407                    "with_arg"
408                );
409
410                assert_eq!(
411                    client
412                        .fetch_query_local(QueryScopeLocal::new(fn_no_arg), ())
413                        .await,
414                    "no_arg"
415                );
416                assert_eq!(
417                    client
418                        .fetch_query_local(QueryScopeLocal::new(fn_with_arg), "with_arg")
419                        .await,
420                    "with_arg"
421                );
422            })
423            .await;
424    }
425
426    /// Local and non-local values should externally be seen as the same cache.
427    /// On the same thread they should both use the cached value.
428    /// On a different thread, locally cached values shouldn't panic, should just be treated like they don't exist.
429    #[rstest]
430    #[tokio::test]
431    async fn test_shared_cache() {
432        identify_parking_lot_deadlocks();
433        tokio::task::LocalSet::new()
434            .run_until(async move {
435                let (fetcher, _fetch_calls) = default_fetcher();
436                let (client, _guard, _owner) = prep_vari!(false);
437
438                // Locally set value to 1:
439                client.set_query_local(&fetcher, 2, 1);
440                assert_eq!(client.get_cached_query(&fetcher, 2), Some(1));
441
442                // Try and get from a different thread, shouldn't try and touch the local cache, should say uncached:
443                std::thread::spawn({
444                    let fetcher = fetcher.clone();
445                    move || {
446                        tokio::runtime::Builder::new_current_thread()
447                            .build()
448                            .unwrap()
449                            .block_on(async move {
450                                // Should be seen as uncached:
451                                assert_eq!(client.get_cached_query(&fetcher, 2), None);
452
453                                // Set nonlocally to 3, set nonlocally to 2:
454                                client.set_query(&fetcher, 2, 3);
455                                client.set_query_local(&fetcher, 2, 2);
456                            });
457                    }
458                })
459                .join()
460                .unwrap();
461                // Should ignore the local value from the different thread, and get the nonlocal value of 3:
462                assert_eq!(client.get_cached_query(&fetcher, 2), Some(3));
463
464                // A clone of the fetcher should still be seen as the same cache:
465                let fetcher = fetcher.clone();
466                assert_eq!(client.get_cached_query(&fetcher, 2), Some(3));
467
468                // Likewise with the same closure passed into a new scope:
469                let (fetcher_2, _fetcher_2_calls) = default_fetcher();
470                assert_eq!(client.get_cached_query(&fetcher_2, 2), Some(3));
471
472                // But a new closure should be seen as a new cache:
473                let fetcher = QueryScope::new(move |key| {
474                    let fetcher = fetcher.clone();
475                    async move { query_scope::QueryScopeTrait::query(&fetcher, key).await }
476                });
477                assert_eq!(client.get_cached_query(&fetcher, 2), None);
478            })
479            .await;
480    }
481
482    // Good example test, nothing new in here though:
483    #[rstest]
484    #[tokio::test]
485    async fn test_infinite() {
486        identify_parking_lot_deadlocks();
487
488        // ssr won't load local_resources, which is what we're testing with to avoid needing serde impls.
489        #[cfg(not(feature = "ssr"))]
490        tokio::task::LocalSet::new()
491            .run_until(async move {
492                #[derive(Clone, Debug, PartialEq, Eq)]
493                struct InfiniteItem(usize);
494
495                #[derive(Clone, Debug, PartialEq, Eq)]
496                struct InfiniteList {
497                    items: Vec<InfiniteItem>,
498                    offset: usize,
499                    more_available: bool,
500                }
501
502                async fn get_list_items(offset: usize) -> Vec<InfiniteItem> {
503                    (offset..offset + 10).map(InfiniteItem).collect()
504                }
505
506                async fn get_list_query(_key: ()) -> InfiniteList {
507                    let items = get_list_items(0).await;
508                    InfiniteList {
509                        offset: items.len(),
510                        more_available: !items.is_empty(),
511                        items,
512                    }
513                }
514
515                let (client, _guard, _owner) = prep_vari!(false);
516
517                // Initialise the query with the first load.
518                // we're not using a reactive key here for extending the list, but declarative updates instead.
519                let resource = client.local_resource(get_list_query, || ());
520                assert_eq!(
521                    resource.await,
522                    InfiniteList {
523                        items: (0..10).map(InfiniteItem).collect::<Vec<_>>(),
524                        offset: 10,
525                        more_available: true
526                    }
527                );
528
529                // When wanting to load more items, update_query_async can be called declaratively to update the cached item:
530                client
531                    .update_query_async(get_list_query, (), async |last| {
532                        if last.more_available {
533                            let next_items = get_list_items(last.offset).await;
534                            last.offset += next_items.len();
535                            last.more_available = !next_items.is_empty();
536                            last.items.extend(next_items);
537                        }
538                    })
539                    .await;
540
541                // Should've been updated in place:
542                assert_eq!(
543                    client.get_cached_query(get_list_query, ()),
544                    Some(InfiniteList {
545                        items: (0..20).map(InfiniteItem).collect::<Vec<_>>(),
546                        offset: 20,
547                        more_available: true
548                    })
549                );
550            })
551            .await;
552    }
553
554    /// prefetch_query
555    /// prefetch_query_local
556    /// fetch_query
557    /// fetch_query_local
558    /// update_query
559    /// query_exists
560    /// update_query_async
561    #[rstest]
562    #[tokio::test]
563    async fn test_declaratives() {
564        identify_parking_lot_deadlocks();
565        tokio::task::LocalSet::new()
566            .run_until(async move {
567                let (fetcher, _fetch_calls) = default_fetcher();
568                let (client, _guard, _owner) = prep_vari!(false);
569
570                let key = 1;
571
572                // Want to confirm by default everything triggers updates, but when using `.untrack_update_query()` magic fn for those applicable it doesn't.
573                let value_sub_react_count = Arc::new(AtomicUsize::new(0));
574                let value_sub = client.subscribe_value(&fetcher, move || key);
575                Effect::new_isomorphic({
576                    let value_sub_react_count = value_sub_react_count.clone();
577                    move || {
578                        value_sub.get();
579                        value_sub_react_count.fetch_add(1, Ordering::Relaxed);
580                    }
581                });
582
583                macro_rules! maybe_reacts {
584                    ($reacts:expr, $block:expr) => {{
585                        tick!();
586                        let before = value_sub_react_count.load(Ordering::Relaxed);
587                        let result = $block;
588                        tick!();
589                        let after = value_sub_react_count.load(Ordering::Relaxed);
590                        if $reacts {
591                            assert_eq!(
592                                after,
593                                before + 1,
594                                "{} != {}, didn't react like expected",
595                                after,
596                                before
597                            );
598                        } else {
599                            assert_eq!(
600                                after, before,
601                                "{} != {}, reacted when it shouldn't",
602                                after, before
603                            );
604                        }
605                        result
606                    }};
607                }
608
609                assert!(!client.query_exists(&fetcher, key));
610                client.set_query_local(&fetcher, key, 1);
611                assert_eq!(client.get_cached_query(&fetcher, key), Some(1));
612
613                maybe_reacts!(
614                    true,
615                    assert!(client.update_query(&fetcher, key, |value| {
616                        value
617                            .map(|v| {
618                                *v = 2;
619                                true
620                            })
621                            .unwrap_or(false)
622                    }))
623                );
624
625                assert_eq!(client.get_cached_query(&fetcher, key), Some(2));
626
627                maybe_reacts!(true, client.set_query(&fetcher, key, 3));
628
629                assert_eq!(client.get_cached_query(&fetcher, key), Some(3));
630
631                maybe_reacts!(
632                    true,
633                    assert!(client.update_query(&fetcher, key, |value| {
634                        value
635                            .map(|v| {
636                                *v *= 2;
637                                true
638                            })
639                            .unwrap_or(false)
640                    }))
641                );
642
643                // Noop would react, but not if client.untrack_update_query() is used:
644                maybe_reacts!(true, client.update_query(&fetcher, key, |_value| {}));
645                maybe_reacts!(
646                    false,
647                    client.update_query(&fetcher, key, |_value| { client.untrack_update_query() })
648                );
649
650                assert_eq!(client.get_cached_query(&fetcher, key), Some(6));
651                assert!(client.query_exists(&fetcher, key));
652
653                assert!(client.clear_query(&fetcher, key));
654                assert!(!client.query_exists(&fetcher, key));
655
656                maybe_reacts!(true, client.prefetch_query_local(&fetcher, key).await);
657
658                assert_eq!(client.get_cached_query(&fetcher, key), Some(2));
659                client.clear();
660                assert_eq!(client.size(), 0);
661                maybe_reacts!(true, client.prefetch_query(&fetcher, key).await);
662                assert_eq!(client.get_cached_query(&fetcher, key), Some(2));
663
664                assert!(client.clear_query(&fetcher, key));
665                assert!(!client.query_exists(&fetcher, key));
666                maybe_reacts!(
667                    true,
668                    assert_eq!(client.fetch_query_local(&fetcher, key).await, 2)
669                );
670                assert!(client.query_exists(&fetcher, key));
671                client.clear();
672                assert_eq!(client.size(), 0);
673                maybe_reacts!(true, assert_eq!(client.fetch_query(&fetcher, key).await, 2));
674
675                // update_query_async/update_query_async_local:
676                maybe_reacts!(
677                    true,
678                    assert_eq!(
679                        client
680                            .update_query_async(&fetcher, key, async |value| {
681                                *value += 1;
682                                *value
683                            })
684                            .await,
685                        3
686                    )
687                );
688                assert_eq!(client.get_cached_query(&fetcher, key), Some(3));
689                // Noop would react, but not if client.untrack_update_query() is used:
690                maybe_reacts!(
691                    true,
692                    client
693                        .update_query_async(&fetcher, key, async |_value| {})
694                        .await
695                );
696                maybe_reacts!(
697                    false,
698                    client
699                        .update_query_async(&fetcher, key, async |_value| {
700                            client.untrack_update_query()
701                        })
702                        .await
703                );
704
705                maybe_reacts!(
706                    true,
707                    assert_eq!(
708                        client
709                            .update_query_async_local(&fetcher, key, async |value| {
710                                *value += 1;
711                                *value
712                            })
713                            .await,
714                        4
715                    )
716                );
717                assert_eq!(client.get_cached_query(&fetcher, key), Some(4));
718                // Noop would react, but not if client.untrack_update_query() is used:
719                maybe_reacts!(
720                    true,
721                    client
722                        .update_query_async_local(&fetcher, key, async |_value| {})
723                        .await
724                );
725                maybe_reacts!(
726                    false,
727                    client
728                        .update_query_async_local(&fetcher, key, async |_value| {
729                            client.untrack_update_query()
730                        })
731                        .await
732                );
733
734                // is_fetching should be true throughout the whole lifetime of update_query_async, even the external async section:
735                let is_fetching = client.subscribe_is_fetching_arc(fetcher.clone(), move || key);
736                assert!(!is_fetching.get_untracked());
737                tokio::join!(
738                    async {
739                        assert_eq!(
740                            client
741                                .update_query_async(&fetcher, key, async |value| {
742                                    tokio::time::sleep(tokio::time::Duration::from_millis(30))
743                                        .await;
744                                    *value += 1;
745                                    *value
746                                })
747                                .await,
748                            5
749                        );
750                    },
751                    async {
752                        let elapsed = std::time::Instant::now();
753                        tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
754                        tick!();
755                        while elapsed.elapsed().as_millis() < 25 {
756                            assert!(is_fetching.get_untracked());
757                            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
758                        }
759                    }
760                );
761                // To make sure finished
762                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
763                assert!(!is_fetching.get_untracked());
764            })
765            .await;
766    }
767
    /// Make sure refetching works at the expected time, and only does so once there are active resources using it.
    ///
    /// `rstest` runs this for every listed `ResourceType` and for both values of `arc`; both are
    /// handed to `vari_new_resource_with_cb!`, presumably to select which resource constructor is
    /// exercised (macro defined elsewhere in this module).
    ///
    /// The query scope is configured with a refetch interval of `REFETCH_TIME_MS`, and the fetcher
    /// bumps `fetch_calls` on every invocation so the test can observe exactly when fetches happen:
    /// 1. Initial fetch, then a cached read: one call.
    /// 2. Interval elapses with no resource alive: still one call.
    /// 3. A new resource is created: a second call happens.
    /// 4. While that resource stays alive, each elapsed interval adds one more call.
    /// 5. After the resource's owner is unset, no further calls happen.
    #[rstest]
    #[tokio::test]
    async fn test_refetch(
        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
        #[values(false, true)] arc: bool,
    ) {
        identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                const REFETCH_TIME_MS: u64 = 100;
                const FETCH_TIME_MS: u64 = 10;

                // Counts how many times the fetcher closure has been invoked.
                let fetch_calls = Arc::new(AtomicUsize::new(0));
                // Fetcher: records the call synchronously, then resolves to `key * 2` after FETCH_TIME_MS.
                let fetcher = {
                    let fetch_calls = fetch_calls.clone();
                    move |key: u64| {
                        fetch_calls.fetch_add(1, Ordering::Relaxed);
                        async move {
                            tokio::time::sleep(tokio::time::Duration::from_millis(FETCH_TIME_MS)).await;
                            key * 2
                        }
                    }
                };
                let fetcher = QueryScope::new(
                    fetcher
                ).with_options(QueryOptions::new().with_refetch_interval(std::time::Duration::from_millis(REFETCH_TIME_MS)));

                // NOTE(review): other tests pass a `server_ctx` flag here, so `false` presumably means "no server context" — confirm against `prep_vari!`.
                let (client, _guard, owner) = prep_vari!(false);

                // Evaluates `$body` with a fresh child of `owner` set as the current owner, then
                // unsets that child and makes `owner` current again before yielding the body's result.
                // The assertions after each use rely on resources created inside the body no longer being active.
                macro_rules! with_tmp_owner {
                    ($body:block) => {{
                        let tmp_owner = owner.child();
                        tmp_owner.set();
                        let result = $body;
                        tmp_owner.unset();
                        owner.set();
                        result
                    }};
                }

                // The actual test body; `$get_resource` is an expression that, when called, yields a new resource for key `2`.
                macro_rules! check {
                    ($get_resource:expr) => {{

                        // On the server cannot actually run local resources:
                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {

                            // Initial caching:
                            with_tmp_owner! {{
                                assert_eq!($get_resource().await, 4);
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                                assert_eq!(client.size(), 1);

                                // less than refetch_time shouldn't have recalled:
                                assert_eq!($get_resource().await, 4);
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                                assert_eq!(client.size(), 1);
                            }}

                            // hit refetch time with no active resources shouldn't have refetched:
                            tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
                            tick!();
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);

                            // hit refetch_time when active resource should refetch:
                            with_tmp_owner! {{
                                // Because the refetch call would've still invalidated the value, the new resource should trigger the refetch automatically:
                                let _resource = $get_resource();
                                tokio::time::sleep(tokio::time::Duration::from_millis(FETCH_TIME_MS)).await;
                                tick!();
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);

                                // There's an active resource now, so this should trigger again without needing to touch the resources:
                                tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
                                tick!();

                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 3);
                                assert_eq!($get_resource().await, 4);
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 3);

                                // Run again to make sure:
                                tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
                                tick!();

                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 4);
                                assert_eq!($get_resource().await, 4);
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 4);
                            }}

                            // Should stop refetching once all resources are dropped:
                            tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
                            tick!();
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 4);
                        }
                    }};
                }

                // Expands `check!` with a resource getter built from the client, fetcher, keyer (`|| 2`), resource type and arc flag.
                vari_new_resource_with_cb!(
                    check,
                    client,
                    fetcher.clone(),
                    || 2,
                    resource_type,
                    arc
                );
            })
            .await;
    }
876
    /// Leptos sometimes breaks drop semantics for local/nonlocal signals,
    /// this was the testcase that catches it and is simpler to identify than inside other tests.
    ///
    /// An `ArcAsyncDerived` whose closure captures an `Arc<OnDrop<_>>` is stored in an `ArenaItem`
    /// while a temporary child owner is current. The test asserts the `OnDrop` callback has not
    /// fired while that owner is still set, and has fired once the owner was unset and `tick!()` ran.
    ///
    /// `local` selects between `ArcAsyncDerived::new_unsync` (`true`) and `ArcAsyncDerived::new` (`false`).
    #[rstest]
    #[tokio::test]
    async fn test_drop_semantics(#[values(false, true)] local: bool) {
        tokio::task::LocalSet::new()
            .run_until(async move {
                let owner = Owner::default();

                // Evaluates `$body` with a fresh child of `owner` set as the current owner, then
                // unsets that child and makes `owner` current before yielding the body's result.
                macro_rules! with_tmp_owner {
                    ($body:block) => {{
                        let tmp_owner = owner.child();
                        tmp_owner.set();
                        let result = $body;
                        tmp_owner.unset();
                        owner.set();
                        result
                    }};
                }

                let dropped = with_tmp_owner! {{
                    // Flag flipped by the `OnDrop` callback; the assertions below expect that to happen when the guard is dropped.
                    let dropped = Arc::new(AtomicBool::new(false));
                    let on_drop = Arc::new(OnDrop::new({
                        let dropped = dropped.clone();
                        move || {
                            dropped.store(true, Ordering::Relaxed);
                    }}));
                    // Both branches move the only outer `Arc` handle into the derived's closure,
                    // so the guard can only drop once the derived itself is dropped.
                    if local {
                        ArenaItem::<_, SyncStorage>::new_with_storage(ArcAsyncDerived::new_unsync(
                            move || {
                                let _on_drop = on_drop.clone();
                                async move {
                                }
                            })
                        );
                    } else {
                        ArenaItem::<_, SyncStorage>::new_with_storage(ArcAsyncDerived::new(
                            move || {
                                let _on_drop = on_drop.clone();
                                async move {
                                }
                            })
                        );
                    }
                    // Still inside the temporary owner: nothing should have been dropped yet.
                    assert!(!dropped.load(Ordering::Relaxed));
                    dropped
                }};
                tick!();
                // The temporary owner has been unset, so the derived (and with it the guard) must be gone now.
                assert!(dropped.load(Ordering::Relaxed));
            })
            .await;
    }
929
    /// Make sure the cache is cleaned up at the expected time, and only do so once no resources are using it.
    ///
    /// `rstest` runs this for every listed `ResourceType` and for both values of `arc`; both are
    /// handed to `vari_new_resource_with_cb!`, presumably to select which resource constructor is
    /// exercised (macro defined elsewhere in this module).
    ///
    /// The query scope is configured with a gc time of `GC_TIME_MS`. Cache state is observed three
    /// ways: `client.size()`, the `fetch_calls` counter (a new fetch implies the entry was gone),
    /// and a `subscribe_value` signal that is expected to read `None` once the entry is collected.
    #[rstest]
    #[tokio::test]
    async fn test_gc(
        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
        #[values(false, true)] arc: bool,
    ) {
        identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
        .run_until(async move {
                const GC_TIME_MS: u64 = 30;

                // Counts how many times the fetcher closure has been invoked.
                let fetch_calls = Arc::new(AtomicUsize::new(0));
                // Fetcher: records the call synchronously, then resolves to `key * 2` after 10ms.
                let fetcher = {
                    let fetch_calls = fetch_calls.clone();
                    move |key: u64| {
                        fetch_calls.fetch_add(1, Ordering::Relaxed);
                        async move {
                            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                            key * 2
                        }
                    }
                };
                let fetcher = QueryScope::new(
                    fetcher
                ).with_options(QueryOptions::new().with_gc_time(std::time::Duration::from_millis(GC_TIME_MS)));

                // NOTE(review): other tests pass a `server_ctx` flag here, so `false` presumably means "no server context" — confirm against `prep_vari!`.
                let (client, _guard, owner) = prep_vari!(false);

                // Evaluates `$body` with a fresh child of `owner` set as the current owner, then
                // unsets that child and makes `owner` current again before yielding the body's result.
                // The assertions after each use rely on resources created inside the body no longer being active.
                macro_rules! with_tmp_owner {
                    ($body:block) => {{
                        let tmp_owner = owner.child();
                        tmp_owner.set();
                        let result = $body;
                        tmp_owner.unset();
                        owner.set();
                        result
                    }};
                }

                // The actual test body; `$get_resource` is an expression that, when called, yields a new resource for key `2`.
                macro_rules! check {
                    ($get_resource:expr) => {{
                        // Value subscription for the same key; the final steps expect it to read `None` after gc
                        // and to stay `None` afterwards, i.e. it must not keep the cache entry alive itself.
                        let subscribed = client.subscribe_value(fetcher.clone(), move || 2);

                        // On the server cannot actually run local resources:
                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {

                            // Initial caching:
                            with_tmp_owner! {{
                                assert_eq!($get_resource().await, 4);
                                assert_eq!(subscribed.get_untracked(), Some(4));
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                                assert_eq!(client.size(), 1);

                                // < gc_time shouldn't have cleaned up:
                                tick!();
                                assert_eq!($get_resource().await, 4);
                                assert_eq!(subscribed.get_untracked(), Some(4));
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                                assert_eq!(client.size(), 1);
                            }}

                            // all resources dropped when <gc_time shouldn't have cleaned up:
                            with_tmp_owner! {{
                                tick!();
                                assert_eq!($get_resource().await, 4);
                                assert_eq!(subscribed.get_untracked(), Some(4));
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                                assert_eq!(client.size(), 1);
                            }}

                            // >gc_time when active resource shouldn't have cleaned up:
                            with_tmp_owner! {{
                                let _resource = $get_resource();

                                tokio::time::sleep(tokio::time::Duration::from_millis(GC_TIME_MS)).await;
                                tick!();

                                // >gc_time shouldn't cleanup because there's an active resource:
                                assert_eq!($get_resource().await, 4);
                                assert_eq!(subscribed.get_untracked(), Some(4));
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                                assert_eq!(client.size(), 1);
                            }}

                            // >gc_time and no resources should now have been cleaned up, causing a new fetch:
                            with_tmp_owner! {{
                                assert_eq!(client.size(), 1);

                                assert_eq!(subscribed.get_untracked(), Some(4));
                                tokio::time::sleep(tokio::time::Duration::from_millis(GC_TIME_MS)).await;
                                tick!();
                                assert_eq!(subscribed.get_untracked(), None);

                                assert_eq!($get_resource().await, 4);
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
                            }}

                            // Final cleanup:
                            tokio::time::sleep(tokio::time::Duration::from_millis(GC_TIME_MS)).await;
                            tick!();
                            assert_eq!(client.size(), 0);
                            assert_eq!(subscribed.get_untracked(), None);
                        }
                    }};
                }

                // Expands `check!` with a resource getter built from the client, fetcher, keyer (`|| 2`), resource type and arc flag.
                vari_new_resource_with_cb!(
                    check,
                    client,
                    fetcher.clone(),
                    || 2,
                    resource_type,
                    arc
                );
            })
            .await;
    }
1048
    /// Make sure !Send and !Sync values work with local resources.
    ///
    /// The fetcher returns `UnsyncValue`, which is neither `Send` nor `Sync` because it carries a
    /// `PhantomData<NonNull<()>>`. The query is wrapped in a `QueryScopeLocal` and read through
    /// `local_resource` / `arc_local_resource` (selected by `arc`) and `subscribe_value_local`.
    #[rstest]
    #[tokio::test]
    async fn test_unsync(#[values(false, true)] arc: bool) {
        identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                // `NonNull<()>` is !Send + !Sync, and `PhantomData` inherits that, so this type is too.
                #[derive(Debug)]
                struct UnsyncValue(u64, PhantomData<NonNull<()>>);
                // Equality only considers the numeric payload.
                impl PartialEq for UnsyncValue {
                    fn eq(&self, other: &Self) -> bool {
                        self.0 == other.0
                    }
                }
                impl Eq for UnsyncValue {}
                impl Clone for UnsyncValue {
                    fn clone(&self) -> Self {
                        Self(self.0, PhantomData)
                    }
                }
                impl UnsyncValue {
                    /// Wraps `value` in the !Send/!Sync marker type.
                    fn new(value: u64) -> Self {
                        Self(value, PhantomData)
                    }
                }

                // Counts how many times the fetcher closure has been invoked.
                let fetch_calls = Arc::new(AtomicUsize::new(0));
                // Fetcher: records the call synchronously, then resolves to `UnsyncValue(key * 2)` after 10ms.
                let fetcher = {
                    let fetch_calls = fetch_calls.clone();
                    move |key: u64| {
                        fetch_calls.fetch_add(1, Ordering::Relaxed);
                        async move {
                            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                            UnsyncValue::new(key * 2)
                        }
                    }
                };
                let fetcher = QueryScopeLocal::new(fetcher);

                // NOTE(review): other tests pass a `server_ctx` flag here, so `false` presumably means "no server context" — confirm against `prep_vari!`.
                let (client, _guard, _owner) = prep_vari!(false);

                // The actual test body; `$get_resource` is an expression that, when called, yields a new local resource for key `2`.
                macro_rules! check {
                    ($get_resource:expr) => {{
                        let resource = $get_resource();
                        let subscribed = client.subscribe_value_local(fetcher.clone(), move || 2);

                        // Should be None initially with the sync methods:
                        assert!(resource.get_untracked().is_none());
                        assert!(resource.try_get_untracked().unwrap().is_none());
                        assert!(resource.get().is_none());
                        assert!(resource.try_get().unwrap().is_none());
                        assert!(resource.read().is_none());
                        assert!(resource.try_read().as_deref().unwrap().is_none());
                        assert!(subscribed.get_untracked().is_none());

                        // On the server cannot actually run local resources:
                        if cfg!(not(feature = "ssr")) {
                            // First await performs the one and only fetch:
                            assert_eq!(resource.await, UnsyncValue::new(4));
                            assert_eq!(subscribed.get_untracked(), Some(UnsyncValue::new(4)));
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);

                            tick!();

                            // A second resource for the same key is served from cache, no extra fetch:
                            assert_eq!($get_resource().await, UnsyncValue::new(4));
                            assert_eq!(subscribed.get_untracked(), Some(UnsyncValue::new(4)));
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                        }
                    }};
                }

                match arc {
                    true => {
                        check!(|| client.arc_local_resource(fetcher.clone(), || 2))
                    }
                    false => {
                        check!(|| client.local_resource(fetcher.clone(), || 2))
                    }
                }
            })
            .await;
    }
1130
    /// Make sure the `subscribe_is_fetching*` / `subscribe_is_loading*` signals track query state correctly.
    ///
    /// Going by the assertions below, this covers:
    /// - `subscriber_count()` bookkeeping as subscriptions are created, reused for an identical
    ///   subscription, and dropped.
    /// - An initial fetch reporting both "fetching" and "loading", and only for the matching key.
    /// - A refetch after `invalidate_query` reporting "fetching" but not "loading".
    /// - Subscriptions created while a fetch is already in flight starting in the right state.
    /// - Reactive keyers: the subscription follows its own key signal, independent of the
    ///   resource's key, and notifies effects when the state changes.
    ///
    /// `rstest` runs this for every listed `ResourceType` and for both values of `arc` and
    /// `server_ctx`; the first two are handed to `vari_new_resource_with_cb!`, the last to `prep_vari!`.
    #[rstest]
    #[tokio::test]
    async fn test_subscribe_is_fetching_and_loading(
        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
        #[values(false, true)] arc: bool,
        #[values(false, true)] server_ctx: bool,
    ) {
        identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                // NOTE(review): the assertions below expect `fetcher(k)` to yield `k * 2` after roughly DEFAULT_FETCHER_MS — confirm against `default_fetcher`.
                let (fetcher, fetch_calls) = default_fetcher();
                let (client, _guard, _owner) = prep_vari!(server_ctx);

                // The actual test body; `$get_resource` is an expression that, when called, yields a new resource for key `2`.
                macro_rules! check {
                    ($get_resource:expr) => {{
                        // On the server cannot actually run local resources:
                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
                            // Subscriber bookkeeping: each distinct (kind, key) subscription bumps the count by one.
                            assert_eq!(client.subscriber_count(), 0);
                            let is_fetching = client.subscribe_is_fetching_arc(fetcher.clone(), || 2);
                            assert_eq!(is_fetching.get_untracked(), false);
                            assert_eq!(client.subscriber_count(), 1);
                            let is_fetching_clone = client.subscribe_is_fetching_arc(fetcher.clone(), || 2);
                            assert_eq!(is_fetching_clone.get_untracked(), false);
                            // Should still be 1, clones reuse:
                            assert_eq!(client.subscriber_count(), 1);
                            let is_fetching_other = client.subscribe_is_fetching_arc(fetcher.clone(), || 3);
                            assert_eq!(is_fetching_other.get_untracked(), false);
                            assert_eq!(client.subscriber_count(), 2);
                            let is_loading = client.subscribe_is_loading_arc(fetcher.clone(), || 2);
                            assert_eq!(is_loading.get_untracked(), false);
                            assert_eq!(client.subscriber_count(), 3);
                            let is_loading_other = client.subscribe_is_loading_arc(fetcher.clone(), || 3);
                            assert_eq!(is_loading_other.get_untracked(), false);
                            assert_eq!(client.subscriber_count(), 4);


                            // Asserts that all four subscriptions currently read `$expected`.
                            macro_rules! check_all {
                                ($expected:expr) => {{
                                    assert_eq!(is_fetching.get_untracked(), $expected);
                                    assert_eq!(is_fetching_other.get_untracked(), $expected);
                                    assert_eq!(is_loading.get_untracked(), $expected);
                                    assert_eq!(is_loading_other.get_untracked(), $expected);
                                }};
                            }

                            check_all!(false);

                            // First fetch for key 2: while it is in flight only the key-2 subscriptions should be true.
                            tokio::join!(
                                async {
                                    assert_eq!($get_resource().await, 4);
                                },
                                async {
                                    // `elapsed` holds the start instant; the loop polls until DEFAULT_FETCHER_MS has passed.
                                    let elapsed = std::time::Instant::now();
                                    tick!();
                                    while elapsed.elapsed().as_millis() < DEFAULT_FETCHER_MS.into() {
                                        assert_eq!(is_fetching.get_untracked(), true);
                                        assert_eq!(is_fetching_other.get_untracked(), false);
                                        assert_eq!(is_loading.get_untracked(), true);
                                        assert_eq!(is_loading_other.get_untracked(), false);
                                        tick!();
                                    }
                                }
                            );
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);

                            check_all!(false);

                            assert_eq!($get_resource().await, 4);
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);

                            check_all!(false);

                            // Already in cache now, all should be false:
                            tokio::join!(
                                async {
                                    assert_eq!($get_resource().await, 4);
                                },
                                async {
                                    let elapsed = std::time::Instant::now();
                                    tick!();
                                    while elapsed.elapsed().as_millis() < DEFAULT_FETCHER_MS.into() {
                                        check_all!(false);
                                        tick!();
                                    }
                                }
                            );
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);

                            // Mark the cached key-2 entry stale so the next read triggers a background refetch.
                            client.invalidate_query(fetcher.clone(), &2);

                            tokio::join!(
                                async {
                                    assert_eq!($get_resource().await, 4);
                                    // This should have returned the old value straight away, but the refetch will have been initiated in the background:
                                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                                    tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                                },
                                async {
                                    let elapsed = std::time::Instant::now();
                                    tick!();
                                    while elapsed.elapsed().as_millis() < DEFAULT_FETCHER_MS.into() {
                                        assert_eq!(is_fetching.get_untracked(), true);
                                        assert_eq!(is_fetching_other.get_untracked(), false);
                                        // Loading should all be false as this is just a refetch now,
                                        // the get_resource().await will actually return straight away, but it'll trigger the refetch.
                                        assert_eq!(is_loading.get_untracked(), false);
                                        assert_eq!(is_loading_other.get_untracked(), false);
                                        tick!();
                                    }
                                }
                            );
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);

                            // Dropping subscriptions one by one should unwind the subscriber count.
                            drop(is_fetching);
                            // Will only drop once the clone is dropped too:
                            assert_eq!(client.subscriber_count(), 4);
                            drop(is_fetching_clone);
                            assert_eq!(client.subscriber_count(), 3);
                            drop(is_loading);
                            assert_eq!(client.subscriber_count(), 2);
                            drop(is_fetching_other);
                            assert_eq!(client.subscriber_count(), 1);
                            drop(is_loading_other);
                            assert_eq!(client.subscriber_count(), 0);

                            client.clear();
                            assert_eq!(client.size(), 0);

                            // Make sure subscriptions start in true state if in the middle of loading:
                            tokio::join!(
                                async {
                                    assert_eq!($get_resource().await, 4);
                                },
                                async {
                                    tick!();
                                    let is_fetching = client.subscribe_is_fetching_arc(fetcher.clone(), || 2);
                                    let is_loading = client.subscribe_is_loading_arc(fetcher.clone(), || 2);
                                    assert_eq!(is_fetching.get_untracked(), true);
                                    assert_eq!(is_loading.get_untracked(), true);
                                    assert_eq!(client.subscriber_count(), 2);
                                    tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                                    assert_eq!(is_fetching.get_untracked(), false);
                                    assert_eq!(is_loading.get_untracked(), false);
                                }
                            );
                            // The subscriptions above were scoped to the joined future, so they are gone again here.
                            assert_eq!(client.subscriber_count(), 0);

                            // Make sure refetch only too:
                            client.invalidate_query(fetcher.clone(), &2);
                            tokio::join!(
                                async {
                                    assert_eq!($get_resource().await, 4);
                                    tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                                },
                                async {
                                    tick!();
                                    let is_fetching = client.subscribe_is_fetching_arc(fetcher.clone(), || 2);
                                    let is_loading = client.subscribe_is_loading_arc(fetcher.clone(), || 2);
                                    assert_eq!(is_fetching.get_untracked(), true);
                                    assert_eq!(is_loading.get_untracked(), false);
                                    assert_eq!(client.subscriber_count(), 2);
                                    tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                                    assert_eq!(is_fetching.get_untracked(), false);
                                    assert_eq!(is_loading.get_untracked(), false);
                                }
                            );
                            assert_eq!(client.subscriber_count(), 0);
                            client.clear();

                            // Now confirm the subscriber's keyer changes are respected, and subscriptions don't accidentally say true for the wrong key:
                            let sub_key_signal = RwSignal::new(2);
                            let resource_key_signal = RwSignal::new(2);
                            let is_fetching = client.subscribe_is_fetching(fetcher.clone(), move || sub_key_signal.get());
                            let is_loading = client.subscribe_is_loading(fetcher.clone(), move || sub_key_signal.get());
                            assert_eq!(is_fetching.get_untracked(), false);
                            assert_eq!(is_loading.get_untracked(), false);

                            let _resource = client.resource(fetcher.clone(), move || resource_key_signal.get());

                            // The creation of the resource should've triggered the initial fetch:
                            assert_eq!(is_fetching.get_untracked(), true);
                            assert_eq!(is_loading.get_untracked(), true);
                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                            assert_eq!(is_fetching.get_untracked(), false);
                            assert_eq!(is_loading.get_untracked(), false);

                            // Sanity check confirming it won't refetch if the key hasn't actually changed:
                            sub_key_signal.set(2);
                            resource_key_signal.set(2);
                            tick!();
                            assert_eq!(is_fetching.get_untracked(), false);
                            assert_eq!(is_loading.get_untracked(), false);

                            // New value should cause a fresh query, subscriber should match:
                            resource_key_signal.set(3);
                            sub_key_signal.set(3);
                            tick!();
                            assert_eq!(is_fetching.get_untracked(), true);
                            assert_eq!(is_loading.get_untracked(), true);
                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                            assert_eq!(is_fetching.get_untracked(), false);
                            assert_eq!(is_loading.get_untracked(), false);
                            assert_eq!(client.get_cached_query(fetcher.clone(), &3), Some(6));

                            // Stale should still mean only is_fetching is true:
                            client.invalidate_query(fetcher.clone(), &3);
                            tick!();
                            assert_eq!(is_fetching.get_untracked(), true);
                            assert_eq!(is_loading.get_untracked(), false);
                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                            assert_eq!(is_fetching.get_untracked(), false);
                            assert_eq!(is_loading.get_untracked(), false);

                            // If the resource diverges, subscriber shouldn't notice:
                            resource_key_signal.set(4);
                            tick!();
                            assert_eq!(is_fetching.get_untracked(), false);
                            assert_eq!(is_loading.get_untracked(), false);
                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                            assert_eq!(client.get_cached_query(fetcher.clone(), &4), Some(8));

                            // Now confirm the keyer is actually reactive, and is able to trigger updates itself.
                            // Do this by subscribing to is_fetching in an effect, and update the key in the keyer.
                            let last_is_fetching_value = Arc::new(parking_lot::Mutex::new(None));
                            Effect::new_isomorphic({
                                let last_is_fetching_value = last_is_fetching_value.clone();
                                move || {
                                    *last_is_fetching_value.lock() = Some(is_fetching.get());
                            }});
                            // The effect has not run yet at this point; it only records a value after the next tick.
                            assert_eq!(*last_is_fetching_value.lock(), None);
                            tick!();
                            assert_eq!(*last_is_fetching_value.lock(), Some(false));
                            resource_key_signal.set(6);
                            sub_key_signal.set(6);
                            // Key changes are not observed by the effect until the next tick either.
                            assert_eq!(*last_is_fetching_value.lock(), Some(false));
                            tick!();
                            assert_eq!(*last_is_fetching_value.lock(), Some(true));
                            assert_eq!(is_fetching.get_untracked(), true);
                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                            assert_eq!(*last_is_fetching_value.lock(), Some(false));
                            assert_eq!(is_fetching.get_untracked(), false);
                            assert_eq!(client.get_cached_query(fetcher.clone(), &6), Some(12));
                        }
                    }};
                }

                // Expands `check!` with a resource getter built from the client, fetcher, keyer (`|| 2`), resource type and arc flag.
                vari_new_resource_with_cb!(
                    check,
                    client,
                    fetcher.clone(),
                    || 2,
                    resource_type,
                    arc
                );
            })
            .await;
    }
1388
1389    /// Make sure resources reload when queries invalidated correctly.
1390    #[rstest]
1391    #[tokio::test]
1392    async fn test_optional_key(
1393        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
1394        #[values(false, true)] arc: bool,
1395        #[values(false, true)] server_ctx: bool,
1396    ) {
1397        identify_parking_lot_deadlocks();
1398        tokio::task::LocalSet::new()
1399            .run_until(async move {
1400                let (fetcher, fetch_calls) = default_fetcher();
1401                let (client, _guard, _owner) = prep_vari!(server_ctx);
1402
1403                let key_value = RwSignal::new(None);
1404                let keyer = move || key_value.get();
1405
1406                macro_rules! check {
1407                    ($get_resource:expr) => {{
1408                        let resource = $get_resource();
1409                        assert_eq!(resource.get_untracked().flatten(), None);
1410                        assert_eq!(resource.try_get_untracked().flatten().flatten(), None);
1411
1412                        // On the server cannot actually run local resources:
1413                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
1414                            assert_eq!($get_resource().await, None);
1415
1416                            let sub_is_loading =
1417                                client.subscribe_is_loading(fetcher.clone(), keyer);
1418                            let sub_is_fetching =
1419                                client.subscribe_is_fetching(fetcher.clone(), keyer);
1420                            let sub_value = client.subscribe_value(fetcher.clone(), keyer);
1421
1422                            assert_eq!(sub_is_loading.get_untracked(), false);
1423                            assert_eq!(sub_is_fetching.get_untracked(), false);
1424                            assert_eq!(sub_value.get_untracked(), None);
1425
1426                            key_value.set(Some(2));
1427                            tick!();
1428
1429                            assert_eq!(sub_is_loading.get_untracked(), true);
1430                            assert_eq!(sub_is_fetching.get_untracked(), true);
1431
1432                            assert_eq!($get_resource().await, Some(4));
1433                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1434                            assert_eq!(sub_value.get_untracked(), Some(4));
1435                            assert_eq!(sub_is_loading.get_untracked(), false);
1436                            assert_eq!(sub_is_fetching.get_untracked(), false);
1437                        }
1438                    }};
1439                }
1440
1441                vari_new_resource_with_cb!(
1442                    check,
1443                    client,
1444                    fetcher.clone(),
1445                    keyer,
1446                    resource_type,
1447                    arc
1448                );
1449            })
1450            .await;
1451    }
1452
1453    /// Make sure resources reload when queries invalidated correctly.
1454    #[rstest]
1455    #[tokio::test]
1456    async fn test_invalidation(
1457        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
1458        #[values(false, true)] arc: bool,
1459        #[values(false, true)] server_ctx: bool,
1460        #[values(
1461            InvalidationType::Query,
1462            InvalidationType::Scope,
1463            InvalidationType::Predicate,
1464            InvalidationType::All
1465        )]
1466        invalidation_type: InvalidationType,
1467    ) {
1468        identify_parking_lot_deadlocks();
1469        tokio::task::LocalSet::new()
1470            .run_until(async move {
1471                let (fetcher, fetch_calls) = default_fetcher();
1472                let (client, _guard, _owner) = prep_vari!(server_ctx);
1473
1474                macro_rules! check {
1475                    ($get_resource:expr) => {{
1476                        let resource = $get_resource();
1477
1478                        // Should be None initially with the sync methods:
1479                        assert!(resource.get_untracked().is_none());
1480                        assert!(resource.try_get_untracked().unwrap().is_none());
1481                        assert!(resource.get().is_none());
1482                        assert!(resource.try_get().unwrap().is_none());
1483                        assert!(resource.read().is_none());
1484                        assert!(resource.try_read().as_deref().unwrap().is_none());
1485
1486                        // On the server cannot actually run local resources:
1487                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
1488                            assert_eq!(resource.await, 4);
1489                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1490
1491                            tick!();
1492
1493                            // Shouldn't change despite ticking:
1494                            assert_eq!($get_resource().await, 4);
1495                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1496
1497                            match invalidation_type {
1498                                InvalidationType::Query => {
1499                                    client.invalidate_query(fetcher.clone(), &2);
1500                                }
1501                                InvalidationType::Scope => {
1502                                    client.invalidate_query_scope(fetcher.clone());
1503                                }
1504                                InvalidationType::Predicate => {
1505                                    client.invalidate_queries_with_predicate(fetcher.clone(), |key| key == &2);
1506                                }
1507                                InvalidationType::All => {
1508                                    client.invalidate_all_queries();
1509                                }
1510                            }
1511
1512                            // Because it should now be stale, not gc'd,
1513                            // sync fns on a new resource instance should still return the value, it just means a background refresh has been triggered:
1514                            let resource2 = $get_resource();
1515                            tick!();
1516                            assert_eq!(resource2.get_untracked(), Some(4));
1517                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1518
1519                            // Because the resource should've been auto invalidated, a tick should cause it to auto refetch:
1520                            tick!();
1521                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1522                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
1523                            assert_eq!($get_resource().await, 4);
1524                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
1525                        }
1526                    }};
1527                }
1528
1529                vari_new_resource_with_cb!(
1530                    check,
1531                    client,
1532                    fetcher.clone(),
1533                    || 2,
1534                    resource_type,
1535                    arc
1536                );
1537            })
1538            .await;
1539    }
1540
1541    #[rstest]
1542    #[tokio::test]
1543    async fn test_invalidation_hierarchy(#[values(false, true)] server_ctx: bool) {
1544        identify_parking_lot_deadlocks();
1545        tokio::task::LocalSet::new()
1546            .run_until(async move {
1547                let (fetcher, _fetch_calls) = default_fetcher();
1548                let (client, _guard, _owner) = prep_vari!(server_ctx);
1549
1550                // If this is invalidated, it should also invalidate the child resource.
1551                let fetcher = fetcher.with_invalidation_link(|_key| vec!["base", "users"]);
1552                client.fetch_query(&fetcher, &2).await;
1553                assert!(!client.is_key_invalid(&fetcher, &2));
1554
1555                // If this is invalidated, it'll invalidate both the main fetcher and the child resource.
1556                let hierarchy_parent_scope =
1557                    QueryScope::new(async || ()).with_invalidation_link(|_k| ["base"]);
1558                client
1559                    .fetch_query(hierarchy_parent_scope.clone(), &())
1560                    .await;
1561                assert!(!client.is_key_invalid(&hierarchy_parent_scope, &()));
1562
1563                // If this is invalidated, it shouldn't affect either of the others, as it's the lowest in the invalidation hierarchy.
1564                let hierarchy_child_scope = QueryScope::new(async |user_id| user_id)
1565                    .with_invalidation_link(|user_id: &usize| {
1566                        ["base".to_string(), "users".to_string(), user_id.to_string()]
1567                    });
1568                client
1569                    .fetch_query(hierarchy_child_scope.clone(), &100)
1570                    .await;
1571                assert!(!client.is_key_invalid(&hierarchy_child_scope, &100));
1572
1573                client.invalidate_query(&hierarchy_parent_scope, ());
1574                tick!();
1575                assert!(client.is_key_invalid(&hierarchy_parent_scope, &()));
1576                assert!(client.is_key_invalid(&fetcher, &2));
1577                assert!(client.is_key_invalid(&hierarchy_child_scope, &100));
1578
1579                // Reset:
1580                client.fetch_query(&hierarchy_parent_scope, &()).await;
1581                client.fetch_query(&fetcher, &2).await;
1582                client
1583                    .fetch_query(hierarchy_child_scope.clone(), &100)
1584                    .await;
1585
1586                client.invalidate_query(&fetcher, 2);
1587                tick!();
1588                assert!(!client.is_key_invalid(&hierarchy_parent_scope, &()));
1589                assert!(client.is_key_invalid(&fetcher, &2));
1590                assert!(client.is_key_invalid(&hierarchy_child_scope, &100));
1591
1592                // Reset:
1593                client.fetch_query(&hierarchy_parent_scope, &()).await;
1594                client.fetch_query(&fetcher, &2).await;
1595                client
1596                    .fetch_query(hierarchy_child_scope.clone(), &100)
1597                    .await;
1598
1599                client.invalidate_query(&hierarchy_child_scope, 100);
1600                tick!();
1601                assert!(!client.is_key_invalid(&hierarchy_parent_scope, &()));
1602                assert!(!client.is_key_invalid(&fetcher, &2));
1603                assert!(client.is_key_invalid(&hierarchy_child_scope, &100));
1604            })
1605            .await;
1606    }
1607
1608    #[rstest]
1609    #[tokio::test]
1610    async fn test_key_tracked_autoreload(
1611        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
1612        #[values(false, true)] arc: bool,
1613        #[values(false, true)] server_ctx: bool,
1614    ) {
1615        identify_parking_lot_deadlocks();
1616        tokio::task::LocalSet::new()
1617            .run_until(async move {
1618                let (fetcher, fetch_calls) = default_fetcher();
1619
1620                let (client, _guard, _owner) = prep_vari!(server_ctx);
1621
1622                let add_size = RwSignal::new(1);
1623
1624                macro_rules! check {
1625                    ($get_resource:expr) => {{
1626                        let resource = $get_resource();
1627                        let subscribed =
1628                            client.subscribe_value(fetcher.clone(), move || add_size.get());
1629
1630                        // Should be None initially with the sync methods:
1631                        assert!(resource.get_untracked().is_none());
1632                        assert!(resource.try_get_untracked().unwrap().is_none());
1633                        assert!(resource.get().is_none());
1634                        assert!(resource.try_get().unwrap().is_none());
1635                        assert!(resource.read().is_none());
1636                        assert!(resource.try_read().as_deref().unwrap().is_none());
1637                        assert_eq!(subscribed.get_untracked(), None);
1638
1639                        // On the server cannot actually run local resources:
1640                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
1641                            let resource = $get_resource();
1642                            assert_eq!($get_resource().await, 2);
1643                            assert_eq!(subscribed.get_untracked(), Some(2));
1644                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1645
1646                            // Update the resource key:
1647                            add_size.set(2);
1648
1649                            // Until the new fetch has completed, the old should still be returned:
1650                            tick!();
1651                            assert_eq!(resource.get(), Some(2));
1652                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1653
1654                            // Wait for the new to complete:
1655                            tokio::time::sleep(std::time::Duration::from_millis(
1656                                DEFAULT_FETCHER_MS + 10,
1657                            ))
1658                            .await;
1659                            tick!();
1660
1661                            // Should have updated to the new value:
1662                            assert_eq!(resource.get(), Some(4));
1663                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
1664                            assert_eq!($get_resource().await, 4);
1665                            assert_eq!(subscribed.get_untracked(), Some(4));
1666                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
1667                        }
1668                    }};
1669                }
1670
1671                vari_new_resource_with_cb!(
1672                    check,
1673                    client,
1674                    fetcher.clone(),
1675                    move || add_size.get(),
1676                    resource_type,
1677                    arc
1678                );
1679            })
1680            .await;
1681    }
1682
1683    /// Make sure values on first receival and cached all stick to their specific key.
1684    #[rstest]
1685    #[tokio::test]
1686    async fn test_key_integrity(
1687        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
1688        #[values(false, true)] arc: bool,
1689        #[values(false, true)] server_ctx: bool,
1690    ) {
1691        identify_parking_lot_deadlocks();
1692        tokio::task::LocalSet::new()
1693            .run_until(async move {
1694                // On the server cannot actually run local resources:
1695                if cfg!(feature = "ssr") && resource_type == ResourceType::Local {
1696                    return;
1697                }
1698
1699                let (fetcher, fetch_calls) = default_fetcher();
1700                let (client, _guard, _owner) = prep_vari!(server_ctx);
1701
1702                let keys = [1, 2, 3, 4, 5];
1703                let results = futures::future::join_all(keys.iter().cloned().map(|key| {
1704                    let fetcher = fetcher.clone();
1705                    async move {
1706                        macro_rules! cb {
1707                            ($get_resource:expr) => {{
1708                                let resource = $get_resource();
1709                                resource.await
1710                            }};
1711                        }
1712                        vari_new_resource_with_cb!(
1713                            cb,
1714                            client,
1715                            fetcher,
1716                            move || key,
1717                            resource_type,
1718                            arc
1719                        )
1720                    }
1721                }))
1722                .await;
1723                assert_eq!(results, vec![2, 4, 6, 8, 10]);
1724                assert_eq!(fetch_calls.load(Ordering::Relaxed), 5);
1725
1726                // Call again, each should still be accurate, but each should be cached so fetch call doesn't increase:
1727                let results = futures::future::join_all(keys.iter().cloned().map(|key| {
1728                    let fetcher = fetcher.clone();
1729                    async move {
1730                        macro_rules! cb {
1731                            ($get_resource:expr) => {{
1732                                let resource = $get_resource();
1733                                resource.await
1734                            }};
1735                        }
1736                        vari_new_resource_with_cb!(
1737                            cb,
1738                            client,
1739                            fetcher,
1740                            move || key,
1741                            resource_type,
1742                            arc
1743                        )
1744                    }
1745                }))
1746                .await;
1747                assert_eq!(results, vec![2, 4, 6, 8, 10]);
1748                assert_eq!(fetch_calls.load(Ordering::Relaxed), 5);
1749            })
1750            .await;
1751    }
1752
1753    /// Make sure resources that are loaded together only run once but share the value.
1754    #[rstest]
1755    #[tokio::test]
1756    async fn test_resource_race(
1757        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
1758        #[values(false, true)] arc: bool,
1759        #[values(false, true)] server_ctx: bool,
1760    ) {
1761        identify_parking_lot_deadlocks();
1762        tokio::task::LocalSet::new()
1763            .run_until(async move {
1764                // On the server cannot actually run local resources:
1765                if cfg!(feature = "ssr") && resource_type == ResourceType::Local {
1766                    return;
1767                }
1768
1769                let (fetcher, fetch_calls) = default_fetcher();
1770                let (client, _guard, _owner) = prep_vari!(server_ctx);
1771
1772                let keyer = || 1;
1773                let results = futures::future::join_all((0..10).map(|_| {
1774                    let fetcher = fetcher.clone();
1775                    async move {
1776                        macro_rules! cb {
1777                            ($get_resource:expr) => {{
1778                                let resource = $get_resource();
1779                                resource.await
1780                            }};
1781                        }
1782                        vari_new_resource_with_cb!(cb, client, fetcher, keyer, resource_type, arc)
1783                    }
1784                }))
1785                .await
1786                .into_iter()
1787                .collect::<Vec<_>>();
1788                assert_eq!(results, vec![2; 10]);
1789                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1790            })
1791            .await;
1792    }
1793
1794    #[cfg(feature = "ssr")]
1795    #[tokio::test]
1796    async fn test_resource_cross_stream_caching() {
1797        identify_parking_lot_deadlocks();
1798        tokio::task::LocalSet::new()
1799            .run_until(async move {
1800                for maybe_sleep_ms in &[None, Some(10), Some(30)] {
1801                    let (client, ssr_ctx, _owner) = prep_server!();
1802
1803                    let fetch_calls = Arc::new(AtomicUsize::new(0));
1804                    let fetcher = {
1805                        let fetch_calls = fetch_calls.clone();
1806                        move |key: u64| {
1807                            fetch_calls.fetch_add(1, Ordering::Relaxed);
1808                            async move {
1809                                if let Some(sleep_ms) = maybe_sleep_ms {
1810                                    tokio::time::sleep(tokio::time::Duration::from_millis(
1811                                        *sleep_ms as u64,
1812                                    ))
1813                                    .await;
1814                                }
1815                                key * 2
1816                            }
1817                        }
1818                    };
1819                    let fetcher = QueryScope::new(fetcher);
1820
1821                    let keyer = || 1;
1822
1823                    // First call should require a fetch.
1824                    assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
1825                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1826
1827                    // Second should be cached by the query client because same key:
1828                    assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
1829                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1830
1831                    // Should make it over to the frontend too:
1832                    let (client, _owner) = prep_client!(ssr_ctx);
1833
1834                    // This will stream from the first ssr resource:
1835                    assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
1836                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1837
1838                    // This will stream from the second ssr resource:
1839                    assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
1840                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1841
1842                    // This drives the effect that will put the resource into the frontend cache:
1843                    tick!();
1844
1845                    // This didn't happen in ssr so nothing to stream,
1846                    // but the other 2 resources shoud've still put themselves into the frontend cache,
1847                    // so this should get picked up by that.
1848                    assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
1849                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1850
1851                    // Reset and confirm works for non blocking too:
1852                    let (ssr_client, ssr_ctx, _owner) = prep_server!();
1853                    fetch_calls.store(0, Ordering::Relaxed);
1854
1855                    // Don't await:
1856                    let ssr_resource_1 = ssr_client.arc_resource(fetcher.clone(), keyer);
1857                    let ssr_resource_2 = ssr_client.arc_resource(fetcher.clone(), keyer);
1858
1859                    let (hydrate_client, _owner) = prep_client!(ssr_ctx);
1860
1861                    // Matching 2 resources on hydrate, these should stream:
1862                    let hydrate_resource_1 = hydrate_client.arc_resource(fetcher.clone(), keyer);
1863                    let hydrate_resource_2 = hydrate_client.arc_resource(fetcher.clone(), keyer);
1864
1865                    // Wait for all 4 together, should still only have had 1 fetch.
1866                    let results = futures::future::join_all(
1867                        vec![
1868                            hydrate_resource_2,
1869                            ssr_resource_1,
1870                            ssr_resource_2,
1871                            hydrate_resource_1,
1872                        ]
1873                        .into_iter()
1874                        .map(|resource| async move { resource.await }),
1875                    )
1876                    .await
1877                    .into_iter()
1878                    .collect::<Vec<_>>();
1879
1880                    assert_eq!(results, vec![2, 2, 2, 2]);
1881                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1882
1883                    tick!();
1884
1885                    // This didn't have a matching backend one so should be using the populated cache and still not fetch:
1886                    assert_eq!(hydrate_client.arc_resource(fetcher.clone(), keyer).await, 2);
1887                    assert_eq!(
1888                        fetch_calls.load(Ordering::Relaxed),
1889                        1,
1890                        "{:?}ms",
1891                        maybe_sleep_ms
1892                    );
1893                }
1894            })
1895            .await;
1896    }
1897}