1#![allow(clippy::module_inception)]
2#![allow(clippy::type_complexity)]
3#![allow(clippy::too_many_arguments)]
4#![warn(clippy::disallowed_types)]
5#![warn(missing_docs)]
6#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
7#![cfg_attr(all(doc, CHANNEL_NIGHTLY), feature(doc_cfg))]
9
10mod arc_local_signal;
11mod cache;
12mod cache_scope;
13mod debug_if_devtools_enabled;
14#[cfg(any(
15 all(debug_assertions, feature = "devtools"),
16 feature = "devtools-always"
17))]
18mod events;
19mod global;
20mod maybe_local;
21mod no_reactive_diagnostics_future;
22mod pagination;
23mod query;
24mod query_client;
25mod query_maybe_key;
26mod query_options;
27mod query_scope;
28mod resource_drop_guard;
29#[cfg(any(
30 all(debug_assertions, feature = "devtools"),
31 feature = "devtools-always"
32))]
33mod subs_client;
34mod subs_scope;
35mod trie;
36mod utils;
37mod value_with_callbacks;
38
39#[cfg(any(feature = "devtools", feature = "devtools-always"))]
40mod dev_tools;
41#[cfg(any(feature = "devtools", feature = "devtools-always"))]
42pub use dev_tools::QueryDevtools;
43
44pub use arc_local_signal::*;
45pub use pagination::*;
46pub use query_client::*;
47pub use query_options::*;
48pub use query_scope::{QueryScope, QueryScopeLocal};
49
50#[cfg(test)]
51mod test {
52 use std::{
53 collections::HashMap,
54 fmt::Debug,
55 hash::Hash,
56 marker::PhantomData,
57 ptr::NonNull,
58 sync::{
59 Arc,
60 atomic::{AtomicBool, AtomicUsize, Ordering},
61 },
62 };
63
64 use futures::future::Either;
65 use hydration_context::{
66 PinnedFuture, PinnedStream, SerializedDataId, SharedContext, SsrSharedContext,
67 };
68
69 use any_spawner::Executor;
70 use leptos::{error::ErrorId, prelude::*};
71
72 use rstest::*;
73
74 use crate::{global::does_scope_id_exist, query_scope::QueryScopeLocalTrait, utils::OnDrop};
75
76 use super::*;
77
    /// Test double implementing [`SharedContext`] with purely in-memory state.
    ///
    /// It reports itself as a browser context (see `is_browser`) and serves previously
    /// captured serialized data through `read_data`.
    pub struct MockHydrateSharedContext {
        /// Counter backing `next_id`; starts at 0 and is incremented for every id handed out.
        id: AtomicUsize,
        /// Flag exposed through `get_is_hydrating` / `set_is_hydrating`; initialised to `true`.
        is_hydrating: AtomicBool,
        /// Flag exposed through `during_hydration` and cleared by `hydration_complete`;
        /// initialised to `true`.
        during_hydration: AtomicBool,

        /// `(id, serialized data)` pairs taken from an `SsrSharedContext` in `new`
        /// (empty when no server context was supplied).
        resolved_resources: Vec<(SerializedDataId, String)>,
    }
89
90 impl MockHydrateSharedContext {
91 pub async fn new(ssr_ctx: Option<&SsrSharedContext>) -> Self {
92 Self {
93 id: AtomicUsize::new(0),
94 is_hydrating: AtomicBool::new(true),
95 during_hydration: AtomicBool::new(true),
96 resolved_resources: if let Some(ssr_ctx) = ssr_ctx {
99 ssr_ctx.consume_buffers().await
100 } else {
101 vec![]
102 },
103 }
104 }
105 }
106
107 impl Debug for MockHydrateSharedContext {
108 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
109 f.debug_struct("MockHydrateSharedContext").finish()
110 }
111 }
112
113 impl SharedContext for MockHydrateSharedContext {
114 fn is_browser(&self) -> bool {
115 true
116 }
117
118 fn next_id(&self) -> SerializedDataId {
119 let id = self.id.fetch_add(1, Ordering::Relaxed);
120 SerializedDataId::new(id)
121 }
122
123 fn write_async(&self, _id: SerializedDataId, _fut: PinnedFuture<String>) {}
124
125 fn read_data(&self, id: &SerializedDataId) -> Option<String> {
126 self.resolved_resources
127 .get(id.clone().into_inner())
128 .map(|(_, data)| data.to_string())
129 }
130
131 fn await_data(&self, _id: &SerializedDataId) -> Option<String> {
132 todo!()
133 }
134
135 fn pending_data(&self) -> Option<PinnedStream<String>> {
136 None
137 }
138
139 fn during_hydration(&self) -> bool {
140 self.during_hydration.load(Ordering::Relaxed)
141 }
142
143 fn hydration_complete(&self) {
144 self.during_hydration.store(false, Ordering::Relaxed)
145 }
146
147 fn get_is_hydrating(&self) -> bool {
148 self.is_hydrating.load(Ordering::Relaxed)
149 }
150
151 fn set_is_hydrating(&self, is_hydrating: bool) {
152 self.is_hydrating.store(is_hydrating, Ordering::Relaxed)
153 }
154
155 fn errors(&self, _boundary_id: &SerializedDataId) -> Vec<(ErrorId, leptos::error::Error)> {
156 vec![]
157 }
168
169 #[inline(always)]
170 fn register_error(
171 &self,
172 _error_boundary: SerializedDataId,
173 _error_id: ErrorId,
174 _error: leptos::error::Error,
175 ) {
176 }
177
178 #[inline(always)]
179 fn seal_errors(&self, _boundary_id: &SerializedDataId) {}
180
181 fn take_errors(&self) -> Vec<(SerializedDataId, ErrorId, leptos::error::Error)> {
182 vec![]
184 }
185
186 #[inline(always)]
187 fn defer_stream(&self, _wait_for: PinnedFuture<()>) {}
188
189 #[inline(always)]
190 fn await_deferred(&self) -> Option<PinnedFuture<()>> {
191 None
192 }
193
194 #[inline(always)]
195 fn set_incomplete_chunk(&self, _id: SerializedDataId) {}
196
197 fn get_incomplete_chunk(&self, _id: &SerializedDataId) -> bool {
198 false
200 }
201 }
202
203 macro_rules! prep_server {
204 () => {{
205 _ = Executor::init_tokio();
206 let ssr_ctx = Arc::new(SsrSharedContext::new());
207 let owner = Owner::new_root(Some(ssr_ctx.clone()));
208 owner.set();
209 provide_context(crate::test::ExampleCtx);
210 let client = QueryClient::new();
211 (client, ssr_ctx, owner)
212 }};
213 }
214 pub(crate) use prep_server;
215
216 macro_rules! prep_client {
217 () => {{
218 _ = Executor::init_tokio();
219 let owner = Owner::new_root(Some(Arc::new(
220 crate::test::MockHydrateSharedContext::new(None).await,
221 )));
222 owner.set();
223 provide_context(crate::test::ExampleCtx);
224 let client = QueryClient::new();
225 (client, owner)
226 }};
227 ($ssr_ctx:expr) => {{
228 _ = Executor::init_tokio();
229 let owner = Owner::new_root(Some(Arc::new(
230 crate::test::MockHydrateSharedContext::new(Some(&$ssr_ctx)).await,
231 )));
232 owner.set();
233 provide_context(crate::test::ExampleCtx);
234 let client = QueryClient::new();
235 (client, owner)
236 }};
237 }
238 pub(crate) use prep_client;
239
240 macro_rules! prep_vari {
241 ($server:expr) => {
242 if $server {
243 let (client, ssr_ctx, owner) = crate::test::prep_server!();
244 (client, Some(ssr_ctx), owner)
245 } else {
246 let (client, owner) = crate::test::prep_client!();
247 (client, None, owner)
248 }
249 };
250 }
251 pub(crate) use prep_vari;
252
    /// Lets pending work make progress: awaits `Executor::tick()` and then sleeps for 1 ms on
    /// the tokio timer.
    ///
    /// Expands to two statements (not a block), so it must be used in statement position inside
    /// an async context.
    macro_rules! tick {
        () => {
            Executor::tick().await;
            // The short sleep additionally yields to the tokio runtime itself.
            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
        };
    }
    pub(crate) use tick;
262
    /// Which family of resource constructor a parameterised test should exercise
    /// (see `vari_new_resource_with_cb!` for the exact mapping).
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum ResourceType {
        /// `local_resource` / `arc_local_resource`.
        Local,
        /// `resource` / `arc_resource`.
        Normal,
        /// `resource_blocking` / `arc_resource_blocking`.
        Blocking,
    }
269
    /// The different client entry points a test can use to invalidate or drop cached queries
    /// (dispatched in `InvalidationType::invalidate`).
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum InvalidationType {
        /// `QueryClient::invalidate_query` for a single key.
        Query,
        /// `QueryClient::invalidate_query_scope` for the whole scope.
        Scope,
        /// `QueryClient::invalidate_queries_with_predicate`, matching the given key.
        Predicate,
        /// `QueryClient::invalidate_all_queries`.
        All,
        /// `QueryClient::clear`.
        Clear,
    }
278
279 impl InvalidationType {
280 fn invalidate<K, V, M>(
281 &self,
282 client: &QueryClient,
283 query_scope: impl QueryScopeLocalTrait<K, V, M>,
284 key: &K,
285 ) where
286 K: Debug + Hash + PartialEq + Eq + Clone + 'static,
287 V: Debug + Clone + 'static,
288 {
289 match self {
290 InvalidationType::Query => {
291 client.invalidate_query(query_scope, key);
292 }
293 InvalidationType::Scope => {
294 client.invalidate_query_scope(query_scope);
295 }
296 InvalidationType::Predicate => {
297 client
298 .invalidate_queries_with_predicate(query_scope, |test_key| test_key == key);
299 }
300 InvalidationType::All => {
301 client.invalidate_all_queries();
302 }
303 InvalidationType::Clear => {
304 client.clear();
305 }
306 }
307 }
308 }
309
310 macro_rules! vari_new_resource_with_cb {
311 ($cb:ident, $client:expr, $fetcher:expr, $keyer:expr, $resource_type:expr, $arc:expr) => {
312 match ($resource_type, $arc) {
313 (ResourceType::Local, true) => {
314 $cb!(|| $client.arc_local_resource($fetcher, $keyer))
315 }
316 (ResourceType::Local, false) => {
317 $cb!(|| $client.local_resource($fetcher, $keyer))
318 }
319 (ResourceType::Normal, true) => {
320 $cb!(|| $client.arc_resource($fetcher, $keyer))
321 }
322 (ResourceType::Normal, false) => {
323 $cb!(|| $client.resource($fetcher, $keyer))
324 }
325 (ResourceType::Blocking, true) => {
326 $cb!(|| $client.arc_resource_blocking($fetcher, $keyer))
327 }
328 (ResourceType::Blocking, false) => {
329 $cb!(|| $client.resource_blocking($fetcher, $keyer))
330 }
331 }
332 };
333 }
334
    /// Marker context value: the `prep_*` macros provide it on the root owner and
    /// `default_fetcher` calls `expect_context::<ExampleCtx>()` to check it is reachable.
    #[derive(Clone, Copy, Debug)]
    pub struct ExampleCtx;
337
338 const DEFAULT_FETCHER_MS: u64 = 30;
339 fn default_fetcher() -> (QueryScope<u64, u64>, Arc<AtomicUsize>) {
340 let fetch_calls = Arc::new(AtomicUsize::new(0));
341 let fetcher_src = {
342 let fetch_calls = fetch_calls.clone();
343 move |key: u64| {
344 let fetch_calls = fetch_calls.clone();
345 expect_context::<ExampleCtx>();
346 async move {
347 expect_context::<ExampleCtx>();
348 tokio::time::sleep(tokio::time::Duration::from_millis(DEFAULT_FETCHER_MS))
349 .await;
350 expect_context::<ExampleCtx>();
351 fetch_calls.fetch_add(1, Ordering::Relaxed);
352 key * 2
353 }
354 }
355 };
356 (QueryScope::new(fetcher_src), fetch_calls)
357 }
358
359 pub fn identify_parking_lot_deadlocks() {
360 static ONCE: std::sync::Once = std::sync::Once::new();
361 ONCE.call_once(|| {
362 std::thread::spawn(move || {
363 loop {
364 std::thread::sleep(std::time::Duration::from_secs(5));
365 let deadlocks = parking_lot::deadlock::check_deadlock();
366 if deadlocks.is_empty() {
367 continue;
368 }
369
370 println!("{} deadlocks detected", deadlocks.len());
371 for (i, threads) in deadlocks.iter().enumerate() {
372 println!("Deadlock #{i}");
373 for t in threads {
374 println!("Thread Id {:#?}", t.thread_id());
375 println!("{:#?}", t.backtrace());
376 }
377 }
378 }
379 });
380 });
381 }
382
    /// Checks that a resource resolves to the fetched value both with the default client and
    /// with a client switched to `FromToBytesCodec`.
    ///
    /// Parameterised over the resource kind, the arc/non-arc constructor and whether the
    /// environment is prepared as server or client.
    #[rstest]
    #[tokio::test]
    async fn test_codecs(
        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
        #[values(false, true)] arc: bool,
        #[values(false, true)] server_ctx: bool,
    ) {
        identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (fetcher, _fetch_calls) = default_fetcher();

                let (client_default, _guard, _owner) = prep_vari!(server_ctx);
                let client_custom =
                    QueryClient::new().set_codec::<codee::binary::FromToBytesCodec>();
                // The results are discarded.
                // NOTE(review): presumably these only check that the context lookup type-checks
                // for both client types — confirm.
                use_context::<QueryClient>();
                use_context::<QueryClient<codee::binary::FromToBytesCodec>>();

                macro_rules! check {
                    ($get_resource:expr) => {{
                        let resource = $get_resource();
                        // Local resources are only awaited when the "ssr" feature is disabled.
                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
                            // `default_fetcher` doubles the key, and the key used below is 2.
                            assert_eq!(resource.await, 4);
                        }
                    }};
                }

                vari_new_resource_with_cb!(
                    check,
                    client_default,
                    fetcher.clone(),
                    move || 2,
                    resource_type,
                    arc
                );
                vari_new_resource_with_cb!(
                    check,
                    client_custom,
                    fetcher.clone(),
                    move || 2,
                    resource_type,
                    arc
                );
            })
            .await;
    }
430
431 #[rstest]
432 #[tokio::test]
433 async fn test_no_query_args() {
434 identify_parking_lot_deadlocks();
435 tokio::task::LocalSet::new()
436 .run_until(async move {
437 let (client, _guard, _owner) = prep_vari!(false);
438
439 async fn fn_no_arg() -> &'static str {
440 "no_arg"
441 }
442
443 async fn fn_with_arg(arg: &'static str) -> &'static str {
444 arg
445 }
446
447 assert_eq!(client.fetch_query(fn_no_arg, ()).await, "no_arg");
448 assert_eq!(
449 client.fetch_query(fn_with_arg, "with_arg").await,
450 "with_arg"
451 );
452
453 assert_eq!(
454 client.fetch_query(QueryScope::new(fn_no_arg), ()).await,
455 "no_arg"
456 );
457 assert_eq!(
458 client
459 .fetch_query(QueryScope::new(fn_with_arg), "with_arg")
460 .await,
461 "with_arg"
462 );
463
464 assert_eq!(
465 client
466 .fetch_query_local(QueryScopeLocal::new(fn_no_arg), ())
467 .await,
468 "no_arg"
469 );
470 assert_eq!(
471 client
472 .fetch_query_local(QueryScopeLocal::new(fn_with_arg), "with_arg")
473 .await,
474 "with_arg"
475 );
476 })
477 .await;
478 }
479
    /// Exercises how cached values set with `set_query_local` / `set_query` are seen from
    /// another thread and through different scope instances.
    #[rstest]
    #[tokio::test]
    async fn test_shared_cache() {
        identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (fetcher, _fetch_calls) = default_fetcher();
                let (client, _guard, _owner) = prep_vari!(false);

                // A locally set value is readable on the thread that set it.
                client.set_query_local(&fetcher, 2, 1);
                assert_eq!(client.get_cached_query(&fetcher, 2), Some(1));

                // Run on a separate OS thread with its own single-threaded runtime; `join`
                // makes the main test wait for it and propagates any assertion panic.
                std::thread::spawn({
                    let fetcher = fetcher.clone();
                    move || {
                        tokio::runtime::Builder::new_current_thread()
                            .build()
                            .unwrap()
                            .block_on(async move {
                                // The value set locally on the test thread is expected to be
                                // invisible here.
                                assert_eq!(client.get_cached_query(&fetcher, 2), None);

                                client.set_query(&fetcher, 2, 3);
                                client.set_query_local(&fetcher, 2, 2);
                            });
                    }
                })
                .join()
                .unwrap();
                // Back on the test thread the assertion expects the value written with
                // `set_query` (3), not the other thread's `set_query_local` value (2).
                assert_eq!(client.get_cached_query(&fetcher, 2), Some(3));

                // A clone of the scope is expected to resolve to the same cache entry.
                let fetcher = fetcher.clone();
                assert_eq!(client.get_cached_query(&fetcher, 2), Some(3));

                // So is a scope built independently by a second `default_fetcher()` call.
                let (fetcher_2, _fetcher_2_calls) = default_fetcher();
                assert_eq!(client.get_cached_query(&fetcher_2, 2), Some(3));

                // A new scope that merely wraps the old one is expected to have its own,
                // still empty, cache entry.
                let fetcher = QueryScope::new(move |key| {
                    let fetcher = fetcher.clone();
                    async move { query_scope::QueryScopeTrait::query(&fetcher, key).await }
                });
                assert_eq!(client.get_cached_query(&fetcher, 2), None);
            })
            .await;
    }
535
536 #[rstest]
538 #[tokio::test]
539 async fn test_scope_cleanup() {
540 identify_parking_lot_deadlocks();
541 tokio::task::LocalSet::new()
542 .run_until(async move {
543 let (client, _guard, _owner) = prep_vari!(false);
544 assert!(does_scope_id_exist(
545 client.untyped_client.scope_lookup.scope_id
546 ));
547 drop(_owner);
548 assert!(!does_scope_id_exist(
549 client.untyped_client.scope_lookup.scope_id
550 ));
551 })
552 .await;
553 }
554
    /// Models an "infinite list" query: the first page is loaded through a local resource and a
    /// second page is appended in place with `update_query_async`.
    ///
    /// The whole body is compiled out when the "ssr" feature is enabled.
    #[rstest]
    #[tokio::test]
    async fn test_infinite() {
        identify_parking_lot_deadlocks();

        #[cfg(not(feature = "ssr"))]
        tokio::task::LocalSet::new()
            .run_until(async move {
                #[derive(Clone, Debug, PartialEq, Eq)]
                struct InfiniteItem(usize);

                // Cached value: the items loaded so far plus the paging cursor.
                #[derive(Clone, Debug, PartialEq, Eq)]
                struct InfiniteList {
                    items: Vec<InfiniteItem>,
                    offset: usize,
                    more_available: bool,
                }

                // Stand-in for a paged backend: always yields the 10 items starting at `offset`.
                async fn get_list_items(offset: usize) -> Vec<InfiniteItem> {
                    (offset..offset + 10).map(InfiniteItem).collect()
                }

                // The query itself only ever loads the first page.
                async fn get_list_query(_key: ()) -> InfiniteList {
                    let items = get_list_items(0).await;
                    InfiniteList {
                        offset: items.len(),
                        more_available: !items.is_empty(),
                        items,
                    }
                }

                let (client, _guard, _owner) = prep_vari!(false);

                let resource = client.local_resource(get_list_query, || ());
                assert_eq!(
                    resource.await,
                    InfiniteList {
                        items: (0..10).map(InfiniteItem).collect::<Vec<_>>(),
                        offset: 10,
                        more_available: true
                    }
                );

                // Append the next page to the cached value.
                client
                    .update_query_async(get_list_query, (), async |last| {
                        if last.more_available {
                            let next_items = get_list_items(last.offset).await;
                            last.offset += next_items.len();
                            last.more_available = !next_items.is_empty();
                            last.items.extend(next_items);
                        }
                    })
                    .await;

                // The cache is expected to hold both pages now.
                assert_eq!(
                    client.get_cached_query(get_list_query, ()),
                    Some(InfiniteList {
                        items: (0..20).map(InfiniteItem).collect::<Vec<_>>(),
                        offset: 20,
                        more_available: true
                    })
                );
            })
            .await;
    }
626
    /// Walks through the imperative client API (set / update / invalidate / clear / prefetch /
    /// fetch / async update) for a single key and checks, for each step, both the cached value
    /// and whether a value subscriber reacted.
    #[rstest]
    #[tokio::test]
    async fn test_declaratives() {
        identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (fetcher, _fetch_calls) = default_fetcher();
                let (client, _guard, _owner) = prep_vari!(false);

                let key = 1;

                // Count how often an effect reading the subscribed value runs.
                let value_sub_react_count = Arc::new(AtomicUsize::new(0));
                let value_sub = client.subscribe_value(&fetcher, move || key);
                Effect::new_isomorphic({
                    let value_sub_react_count = value_sub_react_count.clone();
                    move || {
                        value_sub.get();
                        value_sub_react_count.fetch_add(1, Ordering::Relaxed);
                    }
                });

                // Evaluates `$block` between two ticks and asserts that the effect counter went
                // up by exactly one (`$reacts == true`) or not at all (`$reacts == false`).
                // Evaluates to the value of `$block`.
                macro_rules! maybe_reacts {
                    ($reacts:expr, $block:expr) => {{
                        tick!();
                        let before = value_sub_react_count.load(Ordering::Relaxed);
                        let result = $block;
                        tick!();
                        let after = value_sub_react_count.load(Ordering::Relaxed);
                        if $reacts {
                            assert_eq!(
                                after,
                                before + 1,
                                "{} != {}, didn't react like expected",
                                after,
                                before
                            );
                        } else {
                            assert_eq!(
                                after, before,
                                "{} != {}, reacted when it shouldn't",
                                after, before
                            );
                        }
                        result
                    }};
                }

                assert!(!client.query_exists(&fetcher, key));
                client.set_query_local(&fetcher, key, 1);
                assert_eq!(client.get_cached_query(&fetcher, key), Some(1));

                // The assertions below expect the key to stay invalid through `update_query`,
                // `get_cached_query` and `set_query`.
                client.invalidate_query(&fetcher, key);
                maybe_reacts!(
                    true,
                    assert!(client.update_query(&fetcher, key, |value| {
                        value
                            .map(|v| {
                                *v = 2;
                                true
                            })
                            .unwrap_or(false)
                    }))
                );
                assert!(client.is_key_invalid(&fetcher, key));

                assert_eq!(client.get_cached_query(&fetcher, key), Some(2));

                assert!(client.is_key_invalid(&fetcher, key));
                client.set_query(&fetcher, key, 3);
                assert!(client.is_key_invalid(&fetcher, key));

                assert_eq!(client.get_cached_query(&fetcher, key), Some(3));

                maybe_reacts!(
                    true,
                    assert!(client.update_query(&fetcher, key, |value| {
                        value
                            .map(|v| {
                                *v *= 2;
                                true
                            })
                            .unwrap_or(false)
                    }))
                );

                // An update that changes nothing is still expected to notify subscribers...
                maybe_reacts!(true, client.update_query(&fetcher, key, |_value| {}));
                // ...unless the callback calls `untrack_update_query`.
                maybe_reacts!(
                    false,
                    client.update_query(&fetcher, key, |_value| { client.untrack_update_query() })
                );

                assert_eq!(client.get_cached_query(&fetcher, key), Some(6));
                assert!(client.query_exists(&fetcher, key));

                assert!(client.untyped_client.clear_query(&fetcher, key));
                assert!(!client.query_exists(&fetcher, key));

                // After clearing, the prefetch/fetch variants repopulate the cache with the
                // fetcher's result (key * 2 == 2).
                maybe_reacts!(true, client.prefetch_query_local(&fetcher, key).await);

                assert_eq!(client.get_cached_query(&fetcher, key), Some(2));
                client.clear();
                assert_eq!(client.total_cached_queries(), 0);
                maybe_reacts!(true, client.prefetch_query(&fetcher, key).await);
                assert_eq!(client.get_cached_query(&fetcher, key), Some(2));

                assert!(client.untyped_client.clear_query(&fetcher, key));
                assert!(!client.query_exists(&fetcher, key));
                maybe_reacts!(
                    true,
                    assert_eq!(client.fetch_query_local(&fetcher, key).await, 2)
                );
                assert!(client.query_exists(&fetcher, key));
                client.clear();
                assert_eq!(client.total_cached_queries(), 0);
                maybe_reacts!(true, assert_eq!(client.fetch_query(&fetcher, key).await, 2));

                // Async in-place update: the callback's return value is passed through.
                maybe_reacts!(
                    true,
                    assert_eq!(
                        client
                            .update_query_async(&fetcher, key, async |value| {
                                *value += 1;
                                *value
                            })
                            .await,
                        3
                    )
                );
                assert_eq!(client.get_cached_query(&fetcher, key), Some(3));
                maybe_reacts!(
                    true,
                    client
                        .update_query_async(&fetcher, key, async |_value| {})
                        .await
                );
                maybe_reacts!(
                    false,
                    client
                        .update_query_async(&fetcher, key, async |_value| {
                            client.untrack_update_query()
                        })
                        .await
                );

                // Same three checks for the `_local` flavour.
                maybe_reacts!(
                    true,
                    assert_eq!(
                        client
                            .update_query_async_local(&fetcher, key, async |value| {
                                *value += 1;
                                *value
                            })
                            .await,
                        4
                    )
                );
                assert_eq!(client.get_cached_query(&fetcher, key), Some(4));
                maybe_reacts!(
                    true,
                    client
                        .update_query_async_local(&fetcher, key, async |_value| {})
                        .await
                );
                maybe_reacts!(
                    false,
                    client
                        .update_query_async_local(&fetcher, key, async |_value| {
                            client.untrack_update_query()
                        })
                        .await
                );

                // While a 30 ms async update is running, the `is_fetching` signal is expected
                // to read true; it is polled concurrently from 5 ms until 25 ms after the start.
                let is_fetching = client.subscribe_is_fetching_arc(fetcher.clone(), move || key);
                assert!(!is_fetching.get_untracked());
                tokio::join!(
                    async {
                        assert_eq!(
                            client
                                .update_query_async(&fetcher, key, async |value| {
                                    tokio::time::sleep(tokio::time::Duration::from_millis(30))
                                        .await;
                                    *value += 1;
                                    *value
                                })
                                .await,
                            5
                        );
                    },
                    async {
                        let elapsed = std::time::Instant::now();
                        tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
                        tick!();
                        while elapsed.elapsed().as_millis() < 25 {
                            assert!(is_fetching.get_untracked());
                            tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
                        }
                    }
                );
                // Give the signal a moment to settle before checking it is false again.
                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                assert!(!is_fetching.get_untracked());
            })
            .await;
    }
847
    /// Checks interval refetching for a scope configured with `with_refetch_interval`.
    ///
    /// The fetch counter is asserted at each step: it is expected to grow only while a resource
    /// is held inside a temporary owner, and (when `set_refetch_enabled` is true) to stop
    /// growing while the refetch toggle signal is `false`.
    #[rstest]
    #[tokio::test]
    async fn test_refetch(
        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
        #[values(false, true)] arc: bool,
        #[values(false, true)] set_refetch_enabled: bool,
    ) {
        identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                const REFETCH_TIME_MS: u64 = 100;
                const FETCH_TIME_MS: u64 = 10;

                // Unlike `default_fetcher`, the counter is bumped when the query function is
                // called, i.e. before the simulated work starts.
                let fetch_calls = Arc::new(AtomicUsize::new(0));
                let fetcher = {
                    let fetch_calls = fetch_calls.clone();
                    move |key: u64| {
                        fetch_calls.fetch_add(1, Ordering::Relaxed);
                        async move {
                            tokio::time::sleep(tokio::time::Duration::from_millis(FETCH_TIME_MS)).await;
                            key * 2
                        }
                    }
                };
                let fetcher = QueryScope::new(
                    fetcher
                ).with_options(QueryOptions::new().with_refetch_interval(std::time::Duration::from_millis(REFETCH_TIME_MS)));

                let (mut client, _guard, owner) = prep_vari!(false);
                let refetch_enabled = ArcRwSignal::new(true);
                if set_refetch_enabled {
                    client = client.with_refetch_enabled_toggle(refetch_enabled.clone());
                }

                // Runs `$body` with a child of `owner` set as the current owner, then unsets
                // the child and makes `owner` current again. The child owner is a local of the
                // expansion and goes out of scope when it ends.
                macro_rules! with_tmp_owner {
                    ($body:block) => {{
                        let tmp_owner = owner.child();
                        tmp_owner.set();
                        let result = $body;
                        tmp_owner.unset();
                        owner.set();
                        result
                    }};
                }

                macro_rules! check {
                    ($get_resource:expr) => {{

                        // Local resources are skipped when the "ssr" feature is enabled.
                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {

                            // First use fetches once; the second use is served from the cache.
                            with_tmp_owner! {{
                                assert_eq!($get_resource().await, 4);
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                                assert_eq!(client.total_cached_queries(), 1);

                                assert_eq!($get_resource().await, 4);
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                                assert_eq!(client.total_cached_queries(), 1);
                            }}

                            // Outside the temporary owner: no refetch is expected even though a
                            // full interval passes.
                            tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
                            tick!();
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);

                            with_tmp_owner! {{
                                // Creating a resource again is expected to trigger a fetch,
                                // since the interval has already elapsed.
                                let _resource = $get_resource();
                                tokio::time::sleep(tokio::time::Duration::from_millis(FETCH_TIME_MS)).await;
                                tick!();
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);

                                // While the resource is held, each elapsed interval is expected
                                // to add exactly one fetch.
                                tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
                                tick!();

                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 3);
                                assert_eq!($get_resource().await, 4);
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 3);

                                if set_refetch_enabled {
                                    // With the toggle off, an elapsed interval must not fetch.
                                    refetch_enabled.set(false);
                                    tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
                                    tick!();

                                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 3);
                                    assert_eq!($get_resource().await, 4);
                                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 3);

                                    refetch_enabled.set(true);
                                }

                                tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
                                tick!();

                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 4);
                                assert_eq!($get_resource().await, 4);
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 4);
                            }}

                            // After leaving the temporary owner, refetching is expected to stop.
                            tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
                            tick!();
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 4);
                        }
                    }};
                }

                vari_new_resource_with_cb!(
                    check,
                    client,
                    fetcher.clone(),
                    || 2,
                    resource_type,
                    arc
                );
            })
            .await;
    }
976
    /// Checks that a value captured by an `ArcAsyncDerived` stored in an arena item is dropped
    /// after the temporary owner it was created under has gone away.
    ///
    /// `local` selects `ArcAsyncDerived::new_unsync` (true) or `ArcAsyncDerived::new` (false).
    ///
    /// NOTE(review): unlike the other tests here, this one neither calls
    /// `identify_parking_lot_deadlocks()` nor goes through a `prep_*` macro (which call
    /// `Executor::init_tokio()`) — confirm that is intentional.
    #[rstest]
    #[tokio::test]
    async fn test_drop_semantics(#[values(false, true)] local: bool) {
        tokio::task::LocalSet::new()
            .run_until(async move {
                let owner = Owner::default();

                // Runs `$body` with a child of `owner` set as the current owner, then unsets
                // the child and makes `owner` current again. The child owner is a local of the
                // expansion and goes out of scope when it ends.
                macro_rules! with_tmp_owner {
                    ($body:block) => {{
                        let tmp_owner = owner.child();
                        tmp_owner.set();
                        let result = $body;
                        tmp_owner.unset();
                        owner.set();
                        result
                    }};
                }

                let dropped = with_tmp_owner! {{
                    // `on_drop` flips `dropped` when its last `Arc` clone is released.
                    let dropped = Arc::new(AtomicBool::new(false));
                    let on_drop = Arc::new(OnDrop::new({
                        let dropped = dropped.clone();
                        move || {
                            dropped.store(true, Ordering::Relaxed);
                        }}));
                    if local {
                        ArenaItem::<_, SyncStorage>::new_with_storage(ArcAsyncDerived::new_unsync(
                            move || {
                                let _on_drop = on_drop.clone();
                                async move {
                                }
                            })
                        );
                    } else {
                        ArenaItem::<_, SyncStorage>::new_with_storage(ArcAsyncDerived::new(
                            move || {
                                let _on_drop = on_drop.clone();
                                async move {
                                }
                            })
                        );
                    }
                    // Still alive while the temporary owner is in scope.
                    assert!(!dropped.load(Ordering::Relaxed));
                    dropped
                }};
                tick!();
                assert!(dropped.load(Ordering::Relaxed));
            })
            .await;
    }
1029
    /// Checks garbage collection for a scope configured with `with_gc_time`.
    ///
    /// Tracks the fetch counter, the number of cached queries, a value subscription and the
    /// per-key counts recorded by the `on_gc` callback across several temporary owners.
    #[rstest]
    #[tokio::test]
    async fn test_gc(
        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
        #[values(false, true)] arc: bool,
    ) {
        identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                const GC_TIME_MS: u64 = 30;

                // The counter is bumped when the query function is called, before the
                // simulated 10 ms of work.
                let fetch_calls = Arc::new(AtomicUsize::new(0));
                let fetcher = {
                    let fetch_calls = fetch_calls.clone();
                    move |key: u64| {
                        fetch_calls.fetch_add(1, Ordering::Relaxed);
                        async move {
                            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                            key * 2
                        }
                    }
                };



                let fetcher = QueryScope::new(
                    fetcher
                ).with_options(QueryOptions::new().with_gc_time(std::time::Duration::from_millis(GC_TIME_MS)));

                // key -> number of times the `on_gc` callback ran for it.
                let gc_counts = Arc::new(parking_lot::Mutex::new(HashMap::new()));
                let fetcher = fetcher.on_gc({
                    let gc_counts = gc_counts.clone();
                    move |key| {
                        let mut counts = gc_counts.lock();
                        *counts.entry(*key).or_insert(0) += 1;
                    }
                });

                let (client, _guard, owner) = prep_vari!(false);

                // Runs `$body` with a child of `owner` set as the current owner, then unsets
                // the child and makes `owner` current again. The child owner is a local of the
                // expansion and goes out of scope when it ends.
                macro_rules! with_tmp_owner {
                    ($body:block) => {{
                        let tmp_owner = owner.child();
                        tmp_owner.set();
                        let result = $body;
                        tmp_owner.unset();
                        owner.set();
                        result
                    }};
                }

                macro_rules! check {
                    ($get_resource:expr) => {{
                        let subscribed = client.subscribe_value(fetcher.clone(), move || 2);

                        // Local resources are skipped when the "ssr" feature is enabled.
                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {

                            // First use fetches once; the repeat is served from the cache and
                            // nothing has been collected yet.
                            with_tmp_owner! {{
                                assert_eq!($get_resource().await, 4);
                                assert_eq!(subscribed.get_untracked(), Some(4));
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                                assert_eq!(client.total_cached_queries(), 1);

                                tick!();
                                assert_eq!($get_resource().await, 4);
                                assert_eq!(subscribed.get_untracked(), Some(4));
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                                assert_eq!(client.total_cached_queries(), 1);
                                assert_eq!(*gc_counts.lock().get(&2).unwrap_or(&0), 0);
                            }}

                            // A fresh owner shortly afterwards still sees the cached entry.
                            with_tmp_owner! {{
                                tick!();
                                assert_eq!($get_resource().await, 4);
                                assert_eq!(subscribed.get_untracked(), Some(4));
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                                assert_eq!(client.total_cached_queries(), 1);
                                assert_eq!(*gc_counts.lock().get(&2).unwrap_or(&0), 0);
                            }}

                            // While a resource is held, the entry is expected to survive a full
                            // gc period.
                            with_tmp_owner! {{
                                let _resource = $get_resource();

                                tokio::time::sleep(tokio::time::Duration::from_millis(GC_TIME_MS)).await;
                                tick!();

                                assert_eq!($get_resource().await, 4);
                                assert_eq!(subscribed.get_untracked(), Some(4));
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                                assert_eq!(client.total_cached_queries(), 1);
                                assert_eq!(*gc_counts.lock().get(&2).unwrap_or(&0), 0);
                            }}

                            // With no resource held for a gc period, the entry is expected to be
                            // collected once (the subscription alone does not keep it), and the
                            // next use fetches again.
                            with_tmp_owner! {{
                                assert_eq!(client.total_cached_queries(), 1);

                                assert_eq!(subscribed.get_untracked(), Some(4));
                                tokio::time::sleep(tokio::time::Duration::from_millis(GC_TIME_MS)).await;
                                tick!();
                                assert_eq!(subscribed.get_untracked(), None);
                                assert_eq!(*gc_counts.lock().get(&2).unwrap_or(&0), 1);

                                assert_eq!($get_resource().await, 4);
                                assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
                            }}

                            // After another idle gc period the refetched entry is expected to be
                            // collected as well.
                            tokio::time::sleep(tokio::time::Duration::from_millis(GC_TIME_MS)).await;
                            tick!();
                            assert_eq!(client.total_cached_queries(), 0);
                            assert_eq!(subscribed.get_untracked(), None);
                            assert_eq!(*gc_counts.lock().get(&2).unwrap_or(&0), 2);
                        }
                    }};
                }

                vari_new_resource_with_cb!(
                    check,
                    client,
                    fetcher.clone(),
                    || 2,
                    resource_type,
                    arc
                );
            })
            .await;
    }
1165
    /// Checks that local resources and local value subscriptions work with a value type that is
    /// neither `Send` nor `Sync`.
    #[rstest]
    #[tokio::test]
    async fn test_unsync(#[values(false, true)] arc: bool) {
        identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                // `PhantomData<NonNull<()>>` makes the type `!Send` and `!Sync` without storing
                // a pointer. Equality and cloning only consider the `u64`.
                #[derive(Debug)]
                struct UnsyncValue(u64, PhantomData<NonNull<()>>);
                impl PartialEq for UnsyncValue {
                    fn eq(&self, other: &Self) -> bool {
                        self.0 == other.0
                    }
                }
                impl Eq for UnsyncValue {}
                impl Clone for UnsyncValue {
                    fn clone(&self) -> Self {
                        Self(self.0, PhantomData)
                    }
                }
                impl UnsyncValue {
                    fn new(value: u64) -> Self {
                        Self(value, PhantomData)
                    }
                }

                // The counter is bumped when the query function is called, before the
                // simulated 10 ms of work.
                let fetch_calls = Arc::new(AtomicUsize::new(0));
                let fetcher = {
                    let fetch_calls = fetch_calls.clone();
                    move |key: u64| {
                        fetch_calls.fetch_add(1, Ordering::Relaxed);
                        async move {
                            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
                            UnsyncValue::new(key * 2)
                        }
                    }
                };
                let fetcher = QueryScopeLocal::new(fetcher);

                let (client, _guard, _owner) = prep_vari!(false);

                macro_rules! check {
                    ($get_resource:expr) => {{
                        let resource = $get_resource();
                        let subscribed = client.subscribe_value_local(fetcher.clone(), move || 2);

                        // Before the fetch has completed, every synchronous accessor is
                        // expected to report "no value yet".
                        assert!(resource.get_untracked().is_none());
                        assert!(resource.try_get_untracked().unwrap().is_none());
                        assert!(resource.get().is_none());
                        assert!(resource.try_get().unwrap().is_none());
                        assert!(resource.read().is_none());
                        assert!(resource.try_read().as_deref().unwrap().is_none());
                        assert!(subscribed.get_untracked().is_none());

                        // The resource is only awaited when the "ssr" feature is disabled.
                        if cfg!(not(feature = "ssr")) {
                            assert_eq!(resource.await, UnsyncValue::new(4));
                            assert_eq!(subscribed.get_untracked(), Some(UnsyncValue::new(4)));
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);

                            tick!();

                            // A second resource for the same key is served from the cache.
                            assert_eq!($get_resource().await, UnsyncValue::new(4));
                            assert_eq!(subscribed.get_untracked(), Some(UnsyncValue::new(4)));
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                        }
                    }};
                }

                match arc {
                    true => {
                        check!(|| client.arc_local_resource(fetcher.clone(), || 2))
                    }
                    false => {
                        check!(|| client.local_resource(fetcher.clone(), || 2))
                    }
                }
            })
            .await;
    }
1247
    // Exercises the `subscribe_is_fetching*` / `subscribe_is_loading*` signals of the client:
    // subscriber bookkeeping, the first fetch of a key, cache hits, invalidation-triggered
    // refetches, subscribers created while a fetch is already running, and subscribers whose
    // key comes from a reactive signal.
    //
    // Parameterised over the resource kind, the arc/non-arc resource flavour and whether a
    // server context is prepared (`prep_vari!(server_ctx)`).
    #[rstest]
    #[tokio::test]
    async fn test_subscribe_is_fetching_and_loading(
        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
        #[values(false, true)] arc: bool,
        #[values(false, true)] server_ctx: bool,
    ) {
        identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (fetcher, fetch_calls) = default_fetcher();
                let (client, _guard, _owner) = prep_vari!(server_ctx);

                // `$get_resource` is a closure expression that builds the resource under test;
                // it is supplied by `vari_new_resource_with_cb!` at the bottom of this test.
                macro_rules! check {
                    ($get_resource:expr) => {{
                        // The whole body is skipped for local resources when the "ssr" feature is on.
                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
                            // --- Subscriber bookkeeping ---
                            // A second subscription of the same kind for the same key is expected
                            // not to raise the subscriber count; a different key or a different
                            // kind (fetching vs. loading) is expected to add one each.
                            assert_eq!(client.subscriber_count(), 0);
                            let is_fetching = client.subscribe_is_fetching_arc(fetcher.clone(), || 2);
                            assert_eq!(is_fetching.get_untracked(), false);
                            assert_eq!(client.subscriber_count(), 1);
                            let is_fetching_clone = client.subscribe_is_fetching_arc(fetcher.clone(), || 2);
                            assert_eq!(is_fetching_clone.get_untracked(), false);
                            assert_eq!(client.subscriber_count(), 1);
                            let is_fetching_other = client.subscribe_is_fetching_arc(fetcher.clone(), || 3);
                            assert_eq!(is_fetching_other.get_untracked(), false);
                            assert_eq!(client.subscriber_count(), 2);
                            let is_loading = client.subscribe_is_loading_arc(fetcher.clone(), || 2);
                            assert_eq!(is_loading.get_untracked(), false);
                            assert_eq!(client.subscriber_count(), 3);
                            let is_loading_other = client.subscribe_is_loading_arc(fetcher.clone(), || 3);
                            assert_eq!(is_loading_other.get_untracked(), false);
                            assert_eq!(client.subscriber_count(), 4);


                            // Asserts that all four signals (both keys, both kinds) hold `$expected`.
                            macro_rules! check_all {
                                ($expected:expr) => {{
                                    assert_eq!(is_fetching.get_untracked(), $expected);
                                    assert_eq!(is_fetching_other.get_untracked(), $expected);
                                    assert_eq!(is_loading.get_untracked(), $expected);
                                    assert_eq!(is_loading_other.get_untracked(), $expected);
                                }};
                            }

                            check_all!(false);

                            // --- First fetch of key 2 ---
                            // While the resource is being awaited, the key-2 signals must read
                            // `true` and the key-3 signals must stay `false` for the whole
                            // `DEFAULT_FETCHER_MS` window.
                            tokio::join!(
                                async {
                                    assert_eq!($get_resource().await, 4);
                                },
                                async {
                                    let elapsed = std::time::Instant::now();
                                    tick!();
                                    while elapsed.elapsed().as_millis() < DEFAULT_FETCHER_MS.into() {
                                        assert_eq!(is_fetching.get_untracked(), true);
                                        assert_eq!(is_fetching_other.get_untracked(), false);
                                        assert_eq!(is_loading.get_untracked(), true);
                                        assert_eq!(is_loading_other.get_untracked(), false);
                                        tick!();
                                    }
                                }
                            );
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);

                            check_all!(false);

                            // --- Cache hit ---
                            // Awaiting the resource again must not call the fetcher a second time.
                            assert_eq!($get_resource().await, 4);
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);

                            check_all!(false);

                            // Same cache hit, but polling the signals concurrently: none of them
                            // may flip to `true` during the window.
                            tokio::join!(
                                async {
                                    assert_eq!($get_resource().await, 4);
                                },
                                async {
                                    let elapsed = std::time::Instant::now();
                                    tick!();
                                    while elapsed.elapsed().as_millis() < DEFAULT_FETCHER_MS.into() {
                                        check_all!(false);
                                        tick!();
                                    }
                                }
                            );
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);

                            // --- Invalidation with data already cached ---
                            client.invalidate_query(fetcher.clone(), &2);

                            // The resource is expected to yield the cached 4 before the second
                            // fetch is counted; meanwhile `is_fetching` reads `true` while
                            // `is_loading` stays `false`.
                            tokio::join!(
                                async {
                                    assert_eq!($get_resource().await, 4);
                                    assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                                    tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                                },
                                async {
                                    let elapsed = std::time::Instant::now();
                                    tick!();
                                    while elapsed.elapsed().as_millis() < DEFAULT_FETCHER_MS.into() {
                                        assert_eq!(is_fetching.get_untracked(), true);
                                        assert_eq!(is_fetching_other.get_untracked(), false);
                                        assert_eq!(is_loading.get_untracked(), false);
                                        assert_eq!(is_loading_other.get_untracked(), false);
                                        tick!();
                                    }
                                }
                            );
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);

                            // --- Dropping subscribers ---
                            // Dropping `is_fetching` alone leaves the count at 4 because
                            // `is_fetching_clone` still subscribes to the same key; every later
                            // drop is expected to lower the count by one.
                            drop(is_fetching);
                            assert_eq!(client.subscriber_count(), 4);
                            drop(is_fetching_clone);
                            assert_eq!(client.subscriber_count(), 3);
                            drop(is_loading);
                            assert_eq!(client.subscriber_count(), 2);
                            drop(is_fetching_other);
                            assert_eq!(client.subscriber_count(), 1);
                            drop(is_loading_other);
                            assert_eq!(client.subscriber_count(), 0);

                            client.clear();
                            assert_eq!(client.total_cached_queries(), 0);

                            // --- Subscribers created while an initial fetch is already running ---
                            // After `clear()` nothing is cached, so both signals must read `true`
                            // right away and `false` once the fetch window has passed.
                            tokio::join!(
                                async {
                                    assert_eq!($get_resource().await, 4);
                                },
                                async {
                                    tick!();
                                    let is_fetching = client.subscribe_is_fetching_arc(fetcher.clone(), || 2);
                                    let is_loading = client.subscribe_is_loading_arc(fetcher.clone(), || 2);
                                    assert_eq!(is_fetching.get_untracked(), true);
                                    assert_eq!(is_loading.get_untracked(), true);
                                    assert_eq!(client.subscriber_count(), 2);
                                    tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                                    assert_eq!(is_fetching.get_untracked(), false);
                                    assert_eq!(is_loading.get_untracked(), false);
                                }
                            );
                            // The two subscribers above were locals of the async block and are gone.
                            assert_eq!(client.subscriber_count(), 0);

                            // --- Subscribers created while a refetch of cached data is running ---
                            // Same as above, except data exists already: fetching `true`,
                            // loading `false`.
                            client.invalidate_query(fetcher.clone(), &2);
                            tokio::join!(
                                async {
                                    assert_eq!($get_resource().await, 4);
                                    tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                                },
                                async {
                                    tick!();
                                    let is_fetching = client.subscribe_is_fetching_arc(fetcher.clone(), || 2);
                                    let is_loading = client.subscribe_is_loading_arc(fetcher.clone(), || 2);
                                    assert_eq!(is_fetching.get_untracked(), true);
                                    assert_eq!(is_loading.get_untracked(), false);
                                    assert_eq!(client.subscriber_count(), 2);
                                    tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                                    assert_eq!(is_fetching.get_untracked(), false);
                                    assert_eq!(is_loading.get_untracked(), false);
                                }
                            );
                            assert_eq!(client.subscriber_count(), 0);
                            client.clear();

                            // --- Reactive keys ---
                            // The subscriptions and the resource each read their key from their
                            // own signal, so the two can be moved independently. Note that this
                            // part always uses `client.resource`, not `$get_resource`.
                            let sub_key_signal = RwSignal::new(2);
                            let resource_key_signal = RwSignal::new(2);
                            let is_fetching = client.subscribe_is_fetching(fetcher.clone(), move || sub_key_signal.get());
                            let is_loading = client.subscribe_is_loading(fetcher.clone(), move || sub_key_signal.get());
                            assert_eq!(is_fetching.get_untracked(), false);
                            assert_eq!(is_loading.get_untracked(), false);

                            let _resource = client.resource(fetcher.clone(), move || resource_key_signal.get());

                            // Creating the resource is expected to flip both signals without a tick.
                            assert_eq!(is_fetching.get_untracked(), true);
                            assert_eq!(is_loading.get_untracked(), true);
                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                            assert_eq!(is_fetching.get_untracked(), false);
                            assert_eq!(is_loading.get_untracked(), false);

                            // Re-setting both signals to the value they already hold must not
                            // start a fetch.
                            sub_key_signal.set(2);
                            resource_key_signal.set(2);
                            tick!();
                            assert_eq!(is_fetching.get_untracked(), false);
                            assert_eq!(is_loading.get_untracked(), false);

                            // Moving both to the uncached key 3 starts an initial load for it.
                            resource_key_signal.set(3);
                            sub_key_signal.set(3);
                            tick!();
                            assert_eq!(is_fetching.get_untracked(), true);
                            assert_eq!(is_loading.get_untracked(), true);
                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                            assert_eq!(is_fetching.get_untracked(), false);
                            assert_eq!(is_loading.get_untracked(), false);
                            assert_eq!(client.get_cached_query(fetcher.clone(), &3), Some(6));

                            // Invalidating the now-cached key 3: fetching `true`, loading `false`.
                            client.invalidate_query(fetcher.clone(), &3);
                            tick!();
                            assert_eq!(is_fetching.get_untracked(), true);
                            assert_eq!(is_loading.get_untracked(), false);
                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                            assert_eq!(is_fetching.get_untracked(), false);
                            assert_eq!(is_loading.get_untracked(), false);

                            // Only the resource moves to key 4; the subscriptions still watch
                            // key 3 and must not react, although key 4 does get fetched and cached.
                            resource_key_signal.set(4);
                            tick!();
                            assert_eq!(is_fetching.get_untracked(), false);
                            assert_eq!(is_loading.get_untracked(), false);
                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                            assert_eq!(client.get_cached_query(fetcher.clone(), &4), Some(8));

                            // --- Tracked reads ---
                            // An effect records every value it observes through the tracked
                            // `is_fetching.get()`, proving the signal notifies its dependents.
                            let last_is_fetching_value = Arc::new(parking_lot::Mutex::new(None));
                            Effect::new_isomorphic({
                                let last_is_fetching_value = last_is_fetching_value.clone();
                                move || {
                                    *last_is_fetching_value.lock() = Some(is_fetching.get());
                            }});
                            // The effect has not run yet; its first run is observed after a tick.
                            assert_eq!(*last_is_fetching_value.lock(), None);
                            tick!();
                            assert_eq!(*last_is_fetching_value.lock(), Some(false));
                            resource_key_signal.set(6);
                            sub_key_signal.set(6);
                            // Still the old value until the next tick lets the effect re-run.
                            assert_eq!(*last_is_fetching_value.lock(), Some(false));
                            tick!();
                            assert_eq!(*last_is_fetching_value.lock(), Some(true));
                            assert_eq!(is_fetching.get_untracked(), true);
                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                            assert_eq!(*last_is_fetching_value.lock(), Some(false));
                            assert_eq!(is_fetching.get_untracked(), false);
                            assert_eq!(client.get_cached_query(fetcher.clone(), &6), Some(12));
                        }
                    }};
                }

                // Runs `check!` with the resource constructor selected by `resource_type` / `arc`.
                vari_new_resource_with_cb!(
                    check,
                    client,
                    fetcher.clone(),
                    || 2,
                    resource_type,
                    arc
                );
            })
            .await;
    }
1505
    // Checks resources and subscriptions whose key function returns an `Option`: while the key
    // is `None` nothing is fetched and everything reads as empty; once the key becomes
    // `Some(2)` the query runs and the value shows up everywhere.
    #[rstest]
    #[tokio::test]
    async fn test_optional_key(
        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
        #[values(false, true)] arc: bool,
        #[values(false, true)] server_ctx: bool,
    ) {
        identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (fetcher, fetch_calls) = default_fetcher();
                let (client, _guard, _owner) = prep_vari!(server_ctx);

                // The key starts out absent; `keyer` is shared by the resource and all subscriptions.
                let key_value = RwSignal::new(None);
                let keyer = move || key_value.get();

                // `$get_resource` is the resource-building closure provided by
                // `vari_new_resource_with_cb!` below.
                macro_rules! check {
                    ($get_resource:expr) => {{
                        let resource = $get_resource();
                        assert_eq!(resource.get_untracked().flatten(), None);
                        assert_eq!(resource.try_get_untracked().flatten().flatten(), None);

                        // Skipped for local resources when the "ssr" feature is enabled.
                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
                            // With a `None` key the resource resolves to `None`.
                            assert_eq!($get_resource().await, None);

                            let sub_is_loading =
                                client.subscribe_is_loading(fetcher.clone(), keyer);
                            let sub_is_fetching =
                                client.subscribe_is_fetching(fetcher.clone(), keyer);
                            let sub_value = client.subscribe_value(fetcher.clone(), keyer);

                            assert_eq!(sub_is_loading.get_untracked(), false);
                            assert_eq!(sub_is_fetching.get_untracked(), false);
                            assert_eq!(sub_value.get_untracked(), None);

                            // Providing a key is expected to start the first fetch for it.
                            key_value.set(Some(2));
                            tick!();

                            assert_eq!(sub_is_loading.get_untracked(), true);
                            assert_eq!(sub_is_fetching.get_untracked(), true);

                            // Exactly one fetch, and every view agrees on the result.
                            assert_eq!($get_resource().await, Some(4));
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
                            assert_eq!(sub_value.get_untracked(), Some(4));
                            assert_eq!(sub_is_loading.get_untracked(), false);
                            assert_eq!(sub_is_fetching.get_untracked(), false);
                        }
                    }};
                }

                vari_new_resource_with_cb!(
                    check,
                    client,
                    fetcher.clone(),
                    keyer,
                    resource_type,
                    arc
                );
            })
            .await;
    }
1569
    // Checks every `InvalidationType` against every resource flavour: the `on_invalidation`
    // callback count, what a resource serves right after invalidation, and that exactly one
    // refetch follows.
    #[rstest]
    #[tokio::test]
    async fn test_invalidation(
        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
        #[values(false, true)] arc: bool,
        #[values(false, true)] server_ctx: bool,
        #[values(
            InvalidationType::Query,
            InvalidationType::Scope,
            InvalidationType::Predicate,
            InvalidationType::All,
            InvalidationType::Clear
        )]
        invalidation_type: InvalidationType,
    ) {
        identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (fetcher, fetch_calls) = default_fetcher();
                let (client, _guard, _owner) = prep_vari!(server_ctx);

                // Per-key tally of how often the scope's `on_invalidation` callback ran.
                let invalidation_counts = Arc::new(parking_lot::Mutex::new(HashMap::new()));

                let fetcher = fetcher.on_invalidation({
                    let invalidation_counts = invalidation_counts.clone();
                    move |key| {
                        let mut counts = invalidation_counts.lock();
                        *counts.entry(*key).or_insert(0) += 1;
                    }
                });

                // `$get_resource` is the resource-building closure provided by
                // `vari_new_resource_with_cb!` below.
                macro_rules! check {
                    ($get_resource:expr) => {{
                        let resource = $get_resource();

                        // Nothing has resolved yet: every synchronous accessor reports "no value".
                        assert!(resource.get_untracked().is_none());
                        assert!(resource.try_get_untracked().unwrap().is_none());
                        assert!(resource.get().is_none());
                        assert!(resource.try_get().unwrap().is_none());
                        assert!(resource.read().is_none());
                        assert!(resource.try_read().as_deref().unwrap().is_none());

                        // Skipped for local resources when the "ssr" feature is enabled.
                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
                            assert_eq!(resource.await, 4);
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);

                            tick!();

                            // A second resource for the same key is served from the cache.
                            assert_eq!($get_resource().await, 4);
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);

                            // The first invalidation runs the callback once for key 2.
                            assert_eq!(
                                *invalidation_counts.lock().get(&2).unwrap_or(&0),
                                0
                            );
                            invalidation_type.invalidate(&client, fetcher.clone(), &2);
                            assert_eq!(
                                *invalidation_counts.lock().get(&2).unwrap_or(&0),
                                1
                            );

                            // Invalidating again before any refetch must not run the callback again.
                            invalidation_type.invalidate(&client, fetcher.clone(), &2);
                            assert_eq!(
                                *invalidation_counts.lock().get(&2).unwrap_or(&0),
                                1
                            );

                            // Right after invalidation a new resource has no value for `Clear`,
                            // and still shows the old value 4 for every other type; no refetch
                            // has been counted yet.
                            let resource2 = $get_resource();
                            tick!();
                            if matches!(invalidation_type, InvalidationType::Clear) {
                                assert_eq!(resource2.get_untracked(), None);
                            } else {
                                assert_eq!(resource2.get_untracked(), Some(4));
                            }
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);

                            // After waiting out the fetcher delay exactly one refetch has
                            // happened, and a further await does not add another.
                            tick!();
                            tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
                            assert_eq!($get_resource().await, 4);
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);

                            // Once fresh data is in place, invalidating runs the callback again.
                            assert_eq!(
                                *invalidation_counts.lock().get(&2).unwrap_or(&0),
                                1
                            );
                            invalidation_type.invalidate(&client, fetcher.clone(), &2);
                            assert_eq!(
                                *invalidation_counts.lock().get(&2).unwrap_or(&0),
                                2
                            );
                        }
                    }};
                }

                vari_new_resource_with_cb!(
                    check,
                    client,
                    fetcher.clone(),
                    || 2,
                    resource_type,
                    arc
                );
            })
            .await;
    }
1686
1687 enum FetchQueryType {
1688 Fetch,
1689 Prefetch,
1690 UpdateAsync,
1691 UpdateAsyncLocal,
1692 }
1693
1694 #[rstest]
1695 #[tokio::test]
1696 async fn test_invalidation_during_inflight_queries(
1697 #[values(
1698 FetchQueryType::Fetch,
1699 FetchQueryType::Prefetch,
1700 FetchQueryType::UpdateAsync,
1701 FetchQueryType::UpdateAsyncLocal
1702 )]
1703 fetch_query_type: FetchQueryType,
1704 #[values(
1705 InvalidationType::Query,
1706 InvalidationType::Scope,
1707 InvalidationType::Predicate,
1708 InvalidationType::All,
1709 InvalidationType::Clear
1710 )]
1711 invalidation_type: InvalidationType,
1712 ) {
1713 identify_parking_lot_deadlocks();
1714 tokio::task::LocalSet::new()
1715 .run_until(async move {
1716 let (client, _guard, _owner) = prep_vari!(false);
1717
1718 const FETCH_SLEEP_MS: u64 = 200;
1719 const UPDATE_SLEEP_MS: u64 = 200;
1720
1721 let num_calls = Arc::new(AtomicUsize::new(0));
1722 let num_completed_calls = Arc::new(AtomicUsize::new(0));
1723 let query_scope = {
1724 let num_calls = num_calls.clone();
1725 let num_completed_calls = num_completed_calls.clone();
1726 move || {
1727 let num_calls = num_calls.clone();
1728 let num_completed_calls = num_completed_calls.clone();
1729 async move {
1730 num_calls.fetch_add(1, Ordering::Relaxed);
1731 tokio::time::sleep(tokio::time::Duration::from_millis(FETCH_SLEEP_MS))
1732 .await;
1733 num_completed_calls.fetch_add(1, Ordering::Relaxed);
1734 "initial_value"
1735 }
1736 }
1737 };
1738
1739 let get_fetch_fut = || match fetch_query_type {
1740 FetchQueryType::Fetch => Either::Left(Either::Left(async {
1741 client.fetch_query(query_scope.clone(), ()).await;
1742 })),
1743 FetchQueryType::Prefetch => Either::Left(Either::Right(async {
1744 client.prefetch_query(query_scope.clone(), ()).await;
1745 })),
1746 FetchQueryType::UpdateAsync => Either::Right(Either::Left(async {
1747 client
1748 .update_query_async(query_scope.clone(), (), async |value| {
1749 tokio::time::sleep(tokio::time::Duration::from_millis(
1750 UPDATE_SLEEP_MS,
1751 ))
1752 .await;
1753 *value = "modified_value";
1754 })
1755 .await;
1756 })),
1757 FetchQueryType::UpdateAsyncLocal => Either::Right(Either::Right(async {
1758 client
1759 .update_query_async_local(query_scope.clone(), (), async |value| {
1760 tokio::time::sleep(tokio::time::Duration::from_millis(
1761 UPDATE_SLEEP_MS,
1762 ))
1763 .await;
1764 *value = "modified_value";
1765 })
1766 .await;
1767 })),
1768 };
1769
1770 let (_, _) = tokio::join!(
1771 async {
1772 tokio::time::sleep(tokio::time::Duration::from_millis(FETCH_SLEEP_MS / 2))
1774 .await;
1775 invalidation_type.invalidate(&client, query_scope.clone(), &());
1776 },
1777 async {
1778 get_fetch_fut().await;
1779 }
1780 );
1781
1782 assert_eq!(num_calls.load(Ordering::Relaxed), 2);
1784 assert_eq!(num_completed_calls.load(Ordering::Relaxed), 1);
1785 assert!(!client.is_key_invalid(query_scope.clone(), ()));
1787
1788 if matches!(fetch_query_type, FetchQueryType::UpdateAsync)
1793 || matches!(fetch_query_type, FetchQueryType::UpdateAsyncLocal)
1794 {
1795 assert_eq!(
1796 client.get_cached_query(query_scope.clone(), ()),
1797 Some("modified_value")
1798 );
1799 client.clear();
1801 let (_, _) = tokio::join!(
1802 async {
1803 tokio::time::sleep(tokio::time::Duration::from_millis(
1805 FETCH_SLEEP_MS + (UPDATE_SLEEP_MS / 2),
1806 ))
1807 .await;
1808 invalidation_type.invalidate(&client, query_scope.clone(), &());
1809 },
1810 async {
1811 get_fetch_fut().await;
1812 }
1813 );
1814 if matches!(invalidation_type, InvalidationType::Clear) {
1815 assert_eq!(client.get_cached_query(query_scope.clone(), ()), None);
1816 } else {
1817 assert_eq!(
1818 client.get_cached_query(query_scope.clone(), ()),
1819 Some("modified_value")
1820 );
1821 assert!(client.is_key_invalid(query_scope.clone(), ()));
1823 }
1824 }
1825 })
1826 .await;
1827 }
1828
1829 #[rstest]
1830 #[tokio::test]
1831 async fn test_invalidation_hierarchy(#[values(false, true)] server_ctx: bool) {
1832 identify_parking_lot_deadlocks();
1833 tokio::task::LocalSet::new()
1834 .run_until(async move {
1835 let (fetcher, _fetch_calls) = default_fetcher();
1836 let (client, _guard, _owner) = prep_vari!(server_ctx);
1837
1838 let fetcher = fetcher.with_invalidation_link(|_key| vec!["base", "users"]);
1840 client.fetch_query(&fetcher, &2).await;
1841 assert!(!client.is_key_invalid(&fetcher, 2));
1842
1843 let hierarchy_parent_scope =
1845 QueryScope::new(async || ()).with_invalidation_link(|_k| ["base"]);
1846 client
1847 .fetch_query(hierarchy_parent_scope.clone(), &())
1848 .await;
1849 assert!(!client.is_key_invalid(&hierarchy_parent_scope, ()));
1850
1851 let hierarchy_child_scope = QueryScope::new(async |user_id| user_id)
1853 .with_invalidation_link(|user_id: &usize| {
1854 ["base".to_string(), "users".to_string(), user_id.to_string()]
1855 });
1856 client
1857 .fetch_query(hierarchy_child_scope.clone(), &100)
1858 .await;
1859 assert!(!client.is_key_invalid(&hierarchy_child_scope, 100));
1860
1861 let hierarchy_sibling_scope =
1863 QueryScope::new(async || ()).with_invalidation_link(|_k| ["base"]);
1864 client
1865 .fetch_query(hierarchy_sibling_scope.clone(), &())
1866 .await;
1867 assert!(!client.is_key_invalid(&hierarchy_sibling_scope, ()));
1868
1869 client.invalidate_query(&hierarchy_parent_scope, ());
1870 tick!();
1871 assert!(client.is_key_invalid(&hierarchy_parent_scope, ()));
1872 assert!(client.is_key_invalid(&fetcher, 2));
1873 assert!(client.is_key_invalid(&hierarchy_child_scope, 100));
1874
1875 client.fetch_query(&hierarchy_parent_scope, &()).await;
1877 client.fetch_query(&fetcher, &2).await;
1878 client
1879 .fetch_query(hierarchy_child_scope.clone(), &100)
1880 .await;
1881
1882 client.invalidate_query(&fetcher, 2);
1883 tick!();
1884 assert!(!client.is_key_invalid(&hierarchy_parent_scope, ()));
1885 assert!(client.is_key_invalid(&fetcher, 2));
1886 assert!(client.is_key_invalid(&hierarchy_child_scope, 100));
1887
1888 client.fetch_query(&hierarchy_parent_scope, &()).await;
1890 client.fetch_query(&fetcher, &2).await;
1891 client
1892 .fetch_query(hierarchy_child_scope.clone(), &100)
1893 .await;
1894
1895 client.invalidate_query(&hierarchy_child_scope, 100);
1896 tick!();
1897 assert!(!client.is_key_invalid(&hierarchy_parent_scope, ()));
1898 assert!(!client.is_key_invalid(&fetcher, 2));
1899 assert!(client.is_key_invalid(&hierarchy_child_scope, 100));
1900
1901 client.fetch_query(&hierarchy_parent_scope, &()).await;
1904 client.fetch_query(&fetcher, &2).await;
1905 client
1906 .fetch_query(hierarchy_sibling_scope.clone(), &())
1907 .await;
1908 client.invalidate_query(&hierarchy_sibling_scope, ());
1909 tick!();
1910 assert!(client.is_key_invalid(&hierarchy_parent_scope, ()));
1911 assert!(client.is_key_invalid(&hierarchy_sibling_scope, ()));
1912
1913 client.fetch_query(&hierarchy_parent_scope, &()).await;
1916 client.fetch_query(&fetcher, &2).await;
1917 client
1918 .fetch_query(hierarchy_sibling_scope.clone(), &())
1919 .await;
1920 client.invalidate_query(&hierarchy_parent_scope, ());
1921 tick!();
1922 assert!(client.is_key_invalid(&hierarchy_parent_scope, ()));
1923 assert!(client.is_key_invalid(&hierarchy_sibling_scope, ()));
1924 })
1925 .await;
1926 }
1927
1928 #[rstest]
1929 #[tokio::test]
1930 async fn test_same_key_invalidation(#[values(false, true)] server_ctx: bool) {
1931 identify_parking_lot_deadlocks();
1932 tokio::task::LocalSet::new()
1933 .run_until(async move {
1934 let (client, _guard, _owner) = prep_vari!(server_ctx);
1935
1936 let scope_a = QueryScope::new(async |id: usize| format!("result_a_{}", id))
1938 .with_invalidation_link(|_id: &usize| ["foo"]);
1939
1940 let scope_b = QueryScope::new(async |id: usize| format!("result_b_{}", id))
1941 .with_invalidation_link(|_id: &usize| ["foo"]);
1942
1943 client.fetch_query(&scope_a, &1).await;
1945 client.fetch_query(&scope_b, &2).await;
1946
1947 assert!(!client.is_key_invalid(&scope_a, 1));
1949 assert!(!client.is_key_invalid(&scope_b, 2));
1950
1951 client.invalidate_query(&scope_a, 1);
1954 tick!();
1955
1956 assert!(client.is_key_invalid(&scope_a, 1));
1958 assert!(client.is_key_invalid(&scope_b, 2));
1959
1960 client.fetch_query(&scope_a, &1).await;
1962 client.fetch_query(&scope_b, &2).await;
1963
1964 assert!(!client.is_key_invalid(&scope_a, 1));
1965 assert!(!client.is_key_invalid(&scope_b, 2));
1966
1967 client.invalidate_query(&scope_b, 2);
1969 tick!();
1970
1971 assert!(client.is_key_invalid(&scope_a, 1));
1972 assert!(client.is_key_invalid(&scope_b, 2));
1973
1974 let scope_c = QueryScope::new(async |id: usize| format!("result_c_{}", id))
1976 .with_invalidation_link(|_id: &usize| ["bar"]);
1977
1978 client.fetch_query(&scope_a, &1).await;
1979 client.fetch_query(&scope_b, &2).await;
1980 client.fetch_query(&scope_c, &3).await;
1981
1982 client.invalidate_query(&scope_c, 3);
1984 tick!();
1985
1986 assert!(!client.is_key_invalid(&scope_a, 1));
1987 assert!(!client.is_key_invalid(&scope_b, 2));
1988 assert!(client.is_key_invalid(&scope_c, 3));
1989
1990 client.invalidate_query(&scope_a, 1);
1992 tick!();
1993
1994 assert!(client.is_key_invalid(&scope_a, 1));
1995 assert!(client.is_key_invalid(&scope_b, 2));
1996 assert!(client.is_key_invalid(&scope_c, 3)); })
1998 .await;
1999 }
2000
    // Checks that a resource whose key function reads a signal reloads by itself when that
    // signal changes, and that the previous value stays readable until the new one arrives.
    #[rstest]
    #[tokio::test]
    async fn test_key_tracked_autoreload(
        #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
        #[values(false, true)] arc: bool,
        #[values(false, true)] server_ctx: bool,
    ) {
        identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (fetcher, fetch_calls) = default_fetcher();

                let (client, _guard, _owner) = prep_vari!(server_ctx);

                // The query key; both the resource and the value subscription read it reactively.
                let add_size = RwSignal::new(1);

                // `$get_resource` is the resource-building closure provided by
                // `vari_new_resource_with_cb!` below.
                macro_rules! check {
                    ($get_resource:expr) => {{
                        let resource = $get_resource();
                        let subscribed =
                            client.subscribe_value(fetcher.clone(), move || add_size.get());

                        // Nothing has resolved yet: every synchronous accessor reports "no value".
                        assert!(resource.get_untracked().is_none());
                        assert!(resource.try_get_untracked().unwrap().is_none());
                        assert!(resource.get().is_none());
                        assert!(resource.try_get().unwrap().is_none());
                        assert!(resource.read().is_none());
                        assert!(resource.try_read().as_deref().unwrap().is_none());
                        assert_eq!(subscribed.get_untracked(), None);

                        // Skipped for local resources when the "ssr" feature is enabled.
                        if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
                            // Shadows the outer `resource`; this handle is kept so it can be
                            // read synchronously after the key changes.
                            let resource = $get_resource();
                            assert_eq!($get_resource().await, 2);
                            assert_eq!(subscribed.get_untracked(), Some(2));
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);

                            // Change the key from 1 to 2.
                            add_size.set(2);

                            // Right after the change the old value is still served and the new
                            // fetch has not been counted.
                            tick!();
                            assert_eq!(resource.get(), Some(2));
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);

                            tokio::time::sleep(std::time::Duration::from_millis(
                                DEFAULT_FETCHER_MS + 10,
                            ))
                            .await;
                            tick!();

                            // Once the fetcher delay has passed, the value for the new key is
                            // visible everywhere after exactly one additional fetch.
                            assert_eq!(resource.get(), Some(4));
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
                            assert_eq!($get_resource().await, 4);
                            assert_eq!(subscribed.get_untracked(), Some(4));
                            assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
                        }
                    }};
                }

                vari_new_resource_with_cb!(
                    check,
                    client,
                    fetcher.clone(),
                    move || add_size.get(),
                    resource_type,
                    arc
                );
            })
            .await;
    }
2075
2076 #[rstest]
2078 #[tokio::test]
2079 async fn test_key_integrity(
2080 #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
2081 #[values(false, true)] arc: bool,
2082 #[values(false, true)] server_ctx: bool,
2083 ) {
2084 identify_parking_lot_deadlocks();
2085 tokio::task::LocalSet::new()
2086 .run_until(async move {
2087 if cfg!(feature = "ssr") && resource_type == ResourceType::Local {
2089 return;
2090 }
2091
2092 let (fetcher, fetch_calls) = default_fetcher();
2093 let (client, _guard, _owner) = prep_vari!(server_ctx);
2094
2095 let keys = [1, 2, 3, 4, 5];
2096 let results = futures::future::join_all(keys.iter().cloned().map(|key| {
2097 let fetcher = fetcher.clone();
2098 async move {
2099 macro_rules! cb {
2100 ($get_resource:expr) => {{
2101 let resource = $get_resource();
2102 resource.await
2103 }};
2104 }
2105 vari_new_resource_with_cb!(
2106 cb,
2107 client,
2108 fetcher,
2109 move || key,
2110 resource_type,
2111 arc
2112 )
2113 }
2114 }))
2115 .await;
2116 assert_eq!(results, vec![2, 4, 6, 8, 10]);
2117 assert_eq!(fetch_calls.load(Ordering::Relaxed), 5);
2118
2119 let results = futures::future::join_all(keys.iter().cloned().map(|key| {
2121 let fetcher = fetcher.clone();
2122 async move {
2123 macro_rules! cb {
2124 ($get_resource:expr) => {{
2125 let resource = $get_resource();
2126 resource.await
2127 }};
2128 }
2129 vari_new_resource_with_cb!(
2130 cb,
2131 client,
2132 fetcher,
2133 move || key,
2134 resource_type,
2135 arc
2136 )
2137 }
2138 }))
2139 .await;
2140 assert_eq!(results, vec![2, 4, 6, 8, 10]);
2141 assert_eq!(fetch_calls.load(Ordering::Relaxed), 5);
2142 })
2143 .await;
2144 }
2145
2146 #[rstest]
2148 #[tokio::test]
2149 async fn test_resource_race(
2150 #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
2151 #[values(false, true)] arc: bool,
2152 #[values(false, true)] server_ctx: bool,
2153 ) {
2154 identify_parking_lot_deadlocks();
2155 tokio::task::LocalSet::new()
2156 .run_until(async move {
2157 if cfg!(feature = "ssr") && resource_type == ResourceType::Local {
2159 return;
2160 }
2161
2162 let (fetcher, fetch_calls) = default_fetcher();
2163 let (client, _guard, _owner) = prep_vari!(server_ctx);
2164
2165 let keyer = || 1;
2166 let results = futures::future::join_all((0..10).map(|_| {
2167 let fetcher = fetcher.clone();
2168 async move {
2169 macro_rules! cb {
2170 ($get_resource:expr) => {{
2171 let resource = $get_resource();
2172 resource.await
2173 }};
2174 }
2175 vari_new_resource_with_cb!(cb, client, fetcher, keyer, resource_type, arc)
2176 }
2177 }))
2178 .await
2179 .into_iter()
2180 .collect::<Vec<_>>();
2181 assert_eq!(results, vec![2; 10]);
2182 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
2183 })
2184 .await;
2185 }
2186
2187 #[cfg(feature = "ssr")]
2188 #[tokio::test]
2189 async fn test_resource_cross_stream_caching() {
2190 identify_parking_lot_deadlocks();
2191 tokio::task::LocalSet::new()
2192 .run_until(async move {
2193 for maybe_sleep_ms in &[None, Some(10), Some(30)] {
2194 let (client, ssr_ctx, _owner) = prep_server!();
2195
2196 let fetch_calls = Arc::new(AtomicUsize::new(0));
2197 let fetcher = {
2198 let fetch_calls = fetch_calls.clone();
2199 move |key: u64| {
2200 fetch_calls.fetch_add(1, Ordering::Relaxed);
2201 async move {
2202 if let Some(sleep_ms) = maybe_sleep_ms {
2203 tokio::time::sleep(tokio::time::Duration::from_millis(
2204 *sleep_ms as u64,
2205 ))
2206 .await;
2207 }
2208 key * 2
2209 }
2210 }
2211 };
2212 let fetcher = QueryScope::new(fetcher);
2213
2214 let keyer = || 1;
2215
2216 assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
2218 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
2219
2220 assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
2222 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
2223
2224 let (client, _owner) = prep_client!(ssr_ctx);
2226
2227 assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
2229 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
2230
2231 assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
2233 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
2234
2235 tick!();
2237
2238 assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
2242 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
2243
2244 let (ssr_client, ssr_ctx, _owner) = prep_server!();
2246 fetch_calls.store(0, Ordering::Relaxed);
2247
2248 let ssr_resource_1 = ssr_client.arc_resource(fetcher.clone(), keyer);
2250 let ssr_resource_2 = ssr_client.arc_resource(fetcher.clone(), keyer);
2251
2252 let (hydrate_client, _owner) = prep_client!(ssr_ctx);
2253
2254 let hydrate_resource_1 = hydrate_client.arc_resource(fetcher.clone(), keyer);
2256 let hydrate_resource_2 = hydrate_client.arc_resource(fetcher.clone(), keyer);
2257
2258 let results = futures::future::join_all(
2260 vec![
2261 hydrate_resource_2,
2262 ssr_resource_1,
2263 ssr_resource_2,
2264 hydrate_resource_1,
2265 ]
2266 .into_iter()
2267 .map(|resource| async move { resource.await }),
2268 )
2269 .await
2270 .into_iter()
2271 .collect::<Vec<_>>();
2272
2273 assert_eq!(results, vec![2, 2, 2, 2]);
2274 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
2275
2276 tick!();
2277
2278 assert_eq!(hydrate_client.arc_resource(fetcher.clone(), keyer).await, 2);
2280 assert_eq!(
2281 fetch_calls.load(Ordering::Relaxed),
2282 1,
2283 "{maybe_sleep_ms:?}ms"
2284 );
2285 }
2286 })
2287 .await;
2288 }
2289}