1#![allow(clippy::module_inception)]
2#![allow(clippy::type_complexity)]
3#![allow(clippy::too_many_arguments)]
4#![warn(clippy::disallowed_types)]
5#![warn(missing_docs)]
6#![doc = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/README.md"))]
7#![cfg_attr(all(doc, CHANNEL_NIGHTLY), feature(doc_auto_cfg))]
9
10mod arc_local_signal;
11mod cache;
12mod cache_scope;
13mod debug_if_devtools_enabled;
14#[cfg(any(
15 all(debug_assertions, feature = "devtools"),
16 feature = "devtools-always"
17))]
18mod events;
19mod maybe_local;
20mod query;
21mod query_client;
22mod query_maybe_key;
23mod query_options;
24mod query_scope;
25mod resource_drop_guard;
26#[cfg(any(
27 all(debug_assertions, feature = "devtools"),
28 feature = "devtools-always"
29))]
30mod subs_client;
31mod subs_scope;
32mod trie;
33mod utils;
34mod value_with_callbacks;
35
36#[cfg(any(feature = "devtools", feature = "devtools-always"))]
37mod dev_tools;
38#[cfg(any(feature = "devtools", feature = "devtools-always"))]
39pub use dev_tools::QueryDevtools;
40
41pub use arc_local_signal::*;
42pub use query_client::*;
43pub use query_options::*;
44pub use query_scope::{QueryScope, QueryScopeLocal};
45
46#[cfg(test)]
47mod test {
48 use std::{
49 fmt::Debug,
50 marker::PhantomData,
51 ptr::NonNull,
52 sync::{
53 Arc,
54 atomic::{AtomicBool, AtomicUsize, Ordering},
55 },
56 };
57
58 use hydration_context::{
59 PinnedFuture, PinnedStream, SerializedDataId, SharedContext, SsrSharedContext,
60 };
61
62 use any_spawner::Executor;
63 use leptos::{error::ErrorId, prelude::*};
64
65 use rstest::*;
66
67 use crate::utils::OnDrop;
68
69 use super::*;
70
71 pub struct MockHydrateSharedContext {
72 id: AtomicUsize,
73 is_hydrating: AtomicBool,
74 during_hydration: AtomicBool,
75
76 resolved_resources: Vec<(SerializedDataId, String)>,
81 }
82
83 impl MockHydrateSharedContext {
84 async fn new(ssr_ctx: Option<&SsrSharedContext>) -> Self {
85 Self {
86 id: AtomicUsize::new(0),
87 is_hydrating: AtomicBool::new(true),
88 during_hydration: AtomicBool::new(true),
89 resolved_resources: if let Some(ssr_ctx) = ssr_ctx {
92 ssr_ctx.consume_buffers().await
93 } else {
94 vec![]
95 },
96 }
97 }
98 }
99
100 impl Debug for MockHydrateSharedContext {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 f.debug_struct("MockHydrateSharedContext").finish()
103 }
104 }
105
106 impl SharedContext for MockHydrateSharedContext {
107 fn is_browser(&self) -> bool {
108 true
109 }
110
111 fn next_id(&self) -> SerializedDataId {
112 let id = self.id.fetch_add(1, Ordering::Relaxed);
113 SerializedDataId::new(id)
114 }
115
116 fn write_async(&self, _id: SerializedDataId, _fut: PinnedFuture<String>) {}
117
118 fn read_data(&self, id: &SerializedDataId) -> Option<String> {
119 self.resolved_resources
120 .get(id.clone().into_inner())
121 .map(|(_, data)| data.to_string())
122 }
123
124 fn await_data(&self, _id: &SerializedDataId) -> Option<String> {
125 todo!()
126 }
127
128 fn pending_data(&self) -> Option<PinnedStream<String>> {
129 None
130 }
131
132 fn during_hydration(&self) -> bool {
133 self.during_hydration.load(Ordering::Relaxed)
134 }
135
136 fn hydration_complete(&self) {
137 self.during_hydration.store(false, Ordering::Relaxed)
138 }
139
140 fn get_is_hydrating(&self) -> bool {
141 self.is_hydrating.load(Ordering::Relaxed)
142 }
143
144 fn set_is_hydrating(&self, is_hydrating: bool) {
145 self.is_hydrating.store(is_hydrating, Ordering::Relaxed)
146 }
147
148 fn errors(&self, _boundary_id: &SerializedDataId) -> Vec<(ErrorId, leptos::error::Error)> {
149 vec![]
150 }
161
162 #[inline(always)]
163 fn register_error(
164 &self,
165 _error_boundary: SerializedDataId,
166 _error_id: ErrorId,
167 _error: leptos::error::Error,
168 ) {
169 }
170
171 #[inline(always)]
172 fn seal_errors(&self, _boundary_id: &SerializedDataId) {}
173
174 fn take_errors(&self) -> Vec<(SerializedDataId, ErrorId, leptos::error::Error)> {
175 vec![]
177 }
178
179 #[inline(always)]
180 fn defer_stream(&self, _wait_for: PinnedFuture<()>) {}
181
182 #[inline(always)]
183 fn await_deferred(&self) -> Option<PinnedFuture<()>> {
184 None
185 }
186
187 #[inline(always)]
188 fn set_incomplete_chunk(&self, _id: SerializedDataId) {}
189
190 fn get_incomplete_chunk(&self, _id: &SerializedDataId) -> bool {
191 false
193 }
194 }
195
196 macro_rules! prep_server {
197 () => {{
198 _ = Executor::init_tokio();
199 let ssr_ctx = Arc::new(SsrSharedContext::new());
200 let owner = Owner::new_root(Some(ssr_ctx.clone()));
201 owner.set();
202 let client = QueryClient::new();
203 (client, ssr_ctx, owner)
204 }};
205 }
206
207 macro_rules! prep_client {
208 () => {{
209 _ = Executor::init_tokio();
210 let owner = Owner::new_root(Some(Arc::new(MockHydrateSharedContext::new(None).await)));
211 owner.set();
212 let client = QueryClient::new();
213 (client, owner)
214 }};
215 ($ssr_ctx:expr) => {{
216 _ = Executor::init_tokio();
217 let owner = Owner::new_root(Some(Arc::new(
218 MockHydrateSharedContext::new(Some(&$ssr_ctx)).await,
219 )));
220 owner.set();
221 let client = QueryClient::new();
222 (client, owner)
223 }};
224 }
225
226 macro_rules! prep_vari {
227 ($server:expr) => {
228 if $server {
229 let (client, ssr_ctx, owner) = prep_server!();
230 (client, Some(ssr_ctx), owner)
231 } else {
232 let (client, owner) = prep_client!();
233 (client, None, owner)
234 }
235 };
236 }
237
238 macro_rules! tick {
239 () => {
240 Executor::tick().await;
243 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
244 };
245 }
246
247 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
248 enum ResourceType {
249 Local,
250 Normal,
251 Blocking,
252 }
253
254 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
255 enum InvalidationType {
256 Query,
257 Scope,
258 Predicate,
259 All,
260 }
261
262 macro_rules! vari_new_resource_with_cb {
263 ($cb:ident, $client:expr, $fetcher:expr, $keyer:expr, $resource_type:expr, $arc:expr) => {
264 match ($resource_type, $arc) {
265 (ResourceType::Local, true) => {
266 $cb!(|| $client.arc_local_resource($fetcher, $keyer))
267 }
268 (ResourceType::Local, false) => {
269 $cb!(|| $client.local_resource($fetcher, $keyer))
270 }
271 (ResourceType::Normal, true) => {
272 $cb!(|| $client.arc_resource($fetcher, $keyer))
273 }
274 (ResourceType::Normal, false) => {
275 $cb!(|| $client.resource($fetcher, $keyer))
276 }
277 (ResourceType::Blocking, true) => {
278 $cb!(|| $client.arc_resource_blocking($fetcher, $keyer))
279 }
280 (ResourceType::Blocking, false) => {
281 $cb!(|| $client.resource_blocking($fetcher, $keyer))
282 }
283 }
284 };
285 }
286
287 const DEFAULT_FETCHER_MS: u64 = 30;
288 fn default_fetcher() -> (QueryScope<u64, u64>, Arc<AtomicUsize>) {
289 let fetch_calls = Arc::new(AtomicUsize::new(0));
290 let fetcher_src = {
291 let fetch_calls = fetch_calls.clone();
292 move |key: u64| {
293 let fetch_calls = fetch_calls.clone();
294 async move {
295 tokio::time::sleep(tokio::time::Duration::from_millis(DEFAULT_FETCHER_MS))
296 .await;
297 fetch_calls.fetch_add(1, Ordering::Relaxed);
298 key * 2
299 }
300 }
301 };
302 (QueryScope::new(fetcher_src), fetch_calls)
303 }
304
305 fn identify_parking_lot_deadlocks() {
306 static ONCE: std::sync::Once = std::sync::Once::new();
307 ONCE.call_once(|| {
308 std::thread::spawn(move || {
309 loop {
310 std::thread::sleep(std::time::Duration::from_secs(5));
311 let deadlocks = parking_lot::deadlock::check_deadlock();
312 if deadlocks.is_empty() {
313 continue;
314 }
315
316 println!("{} deadlocks detected", deadlocks.len());
317 for (i, threads) in deadlocks.iter().enumerate() {
318 println!("Deadlock #{}", i);
319 for t in threads {
320 println!("Thread Id {:#?}", t.thread_id());
321 println!("{:#?}", t.backtrace());
322 }
323 }
324 }
325 });
326 });
327 }
328
329 #[rstest]
330 #[tokio::test]
331 async fn test_codecs(
332 #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
333 #[values(false, true)] arc: bool,
334 #[values(false, true)] server_ctx: bool,
335 ) {
336 identify_parking_lot_deadlocks();
337 tokio::task::LocalSet::new()
338 .run_until(async move {
339 let (fetcher, _fetch_calls) = default_fetcher();
340
341 let (client_default, _guard, _owner) = prep_vari!(server_ctx);
342 let client_custom =
343 QueryClient::new().set_codec::<codee::binary::FromToBytesCodec>();
344 use_context::<QueryClient>();
345 use_context::<QueryClient<codee::binary::FromToBytesCodec>>();
346
347 macro_rules! check {
348 ($get_resource:expr) => {{
349 let resource = $get_resource();
350 if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
352 assert_eq!(resource.await, 4);
353 }
354 }};
355 }
356
357 vari_new_resource_with_cb!(
358 check,
359 client_default,
360 fetcher.clone(),
361 move || 2,
362 resource_type,
363 arc
364 );
365 vari_new_resource_with_cb!(
366 check,
367 client_custom,
368 fetcher.clone(),
369 move || 2,
370 resource_type,
371 arc
372 );
373 })
374 .await;
375 }
376
377 #[rstest]
378 #[tokio::test]
379 async fn test_no_query_args() {
380 identify_parking_lot_deadlocks();
381 tokio::task::LocalSet::new()
382 .run_until(async move {
383 let (client, _guard, _owner) = prep_vari!(false);
384
385 async fn fn_no_arg() -> &'static str {
386 "no_arg"
387 }
388
389 async fn fn_with_arg(arg: &'static str) -> &'static str {
390 arg
391 }
392
393 assert_eq!(client.fetch_query(fn_no_arg, ()).await, "no_arg");
394 assert_eq!(
395 client.fetch_query(fn_with_arg, "with_arg").await,
396 "with_arg"
397 );
398
399 assert_eq!(
400 client.fetch_query(QueryScope::new(fn_no_arg), ()).await,
401 "no_arg"
402 );
403 assert_eq!(
404 client
405 .fetch_query(QueryScope::new(fn_with_arg), "with_arg")
406 .await,
407 "with_arg"
408 );
409
410 assert_eq!(
411 client
412 .fetch_query_local(QueryScopeLocal::new(fn_no_arg), ())
413 .await,
414 "no_arg"
415 );
416 assert_eq!(
417 client
418 .fetch_query_local(QueryScopeLocal::new(fn_with_arg), "with_arg")
419 .await,
420 "with_arg"
421 );
422 })
423 .await;
424 }
425
426 #[rstest]
430 #[tokio::test]
431 async fn test_shared_cache() {
432 identify_parking_lot_deadlocks();
433 tokio::task::LocalSet::new()
434 .run_until(async move {
435 let (fetcher, _fetch_calls) = default_fetcher();
436 let (client, _guard, _owner) = prep_vari!(false);
437
438 client.set_query_local(&fetcher, 2, 1);
440 assert_eq!(client.get_cached_query(&fetcher, 2), Some(1));
441
442 std::thread::spawn({
444 let fetcher = fetcher.clone();
445 move || {
446 tokio::runtime::Builder::new_current_thread()
447 .build()
448 .unwrap()
449 .block_on(async move {
450 assert_eq!(client.get_cached_query(&fetcher, 2), None);
452
453 client.set_query(&fetcher, 2, 3);
455 client.set_query_local(&fetcher, 2, 2);
456 });
457 }
458 })
459 .join()
460 .unwrap();
461 assert_eq!(client.get_cached_query(&fetcher, 2), Some(3));
463
464 let fetcher = fetcher.clone();
466 assert_eq!(client.get_cached_query(&fetcher, 2), Some(3));
467
468 let (fetcher_2, _fetcher_2_calls) = default_fetcher();
470 assert_eq!(client.get_cached_query(&fetcher_2, 2), Some(3));
471
472 let fetcher = QueryScope::new(move |key| {
474 let fetcher = fetcher.clone();
475 async move { query_scope::QueryScopeTrait::query(&fetcher, key).await }
476 });
477 assert_eq!(client.get_cached_query(&fetcher, 2), None);
478 })
479 .await;
480 }
481
482 #[rstest]
484 #[tokio::test]
485 async fn test_infinite() {
486 identify_parking_lot_deadlocks();
487
488 #[cfg(not(feature = "ssr"))]
490 tokio::task::LocalSet::new()
491 .run_until(async move {
492 #[derive(Clone, Debug, PartialEq, Eq)]
493 struct InfiniteItem(usize);
494
495 #[derive(Clone, Debug, PartialEq, Eq)]
496 struct InfiniteList {
497 items: Vec<InfiniteItem>,
498 offset: usize,
499 more_available: bool,
500 }
501
502 async fn get_list_items(offset: usize) -> Vec<InfiniteItem> {
503 (offset..offset + 10).map(InfiniteItem).collect()
504 }
505
506 async fn get_list_query(_key: ()) -> InfiniteList {
507 let items = get_list_items(0).await;
508 InfiniteList {
509 offset: items.len(),
510 more_available: !items.is_empty(),
511 items,
512 }
513 }
514
515 let (client, _guard, _owner) = prep_vari!(false);
516
517 let resource = client.local_resource(get_list_query, || ());
520 assert_eq!(
521 resource.await,
522 InfiniteList {
523 items: (0..10).map(InfiniteItem).collect::<Vec<_>>(),
524 offset: 10,
525 more_available: true
526 }
527 );
528
529 client
531 .update_query_async(get_list_query, (), async |last| {
532 if last.more_available {
533 let next_items = get_list_items(last.offset).await;
534 last.offset += next_items.len();
535 last.more_available = !next_items.is_empty();
536 last.items.extend(next_items);
537 }
538 })
539 .await;
540
541 assert_eq!(
543 client.get_cached_query(get_list_query, ()),
544 Some(InfiniteList {
545 items: (0..20).map(InfiniteItem).collect::<Vec<_>>(),
546 offset: 20,
547 more_available: true
548 })
549 );
550 })
551 .await;
552 }
553
554 #[rstest]
562 #[tokio::test]
563 async fn test_declaratives() {
564 identify_parking_lot_deadlocks();
565 tokio::task::LocalSet::new()
566 .run_until(async move {
567 let (fetcher, _fetch_calls) = default_fetcher();
568 let (client, _guard, _owner) = prep_vari!(false);
569
570 let key = 1;
571
572 let value_sub_react_count = Arc::new(AtomicUsize::new(0));
574 let value_sub = client.subscribe_value(&fetcher, move || key);
575 Effect::new_isomorphic({
576 let value_sub_react_count = value_sub_react_count.clone();
577 move || {
578 value_sub.get();
579 value_sub_react_count.fetch_add(1, Ordering::Relaxed);
580 }
581 });
582
583 macro_rules! maybe_reacts {
584 ($reacts:expr, $block:expr) => {{
585 tick!();
586 let before = value_sub_react_count.load(Ordering::Relaxed);
587 let result = $block;
588 tick!();
589 let after = value_sub_react_count.load(Ordering::Relaxed);
590 if $reacts {
591 assert_eq!(
592 after,
593 before + 1,
594 "{} != {}, didn't react like expected",
595 after,
596 before
597 );
598 } else {
599 assert_eq!(
600 after, before,
601 "{} != {}, reacted when it shouldn't",
602 after, before
603 );
604 }
605 result
606 }};
607 }
608
609 assert!(!client.query_exists(&fetcher, key));
610 client.set_query_local(&fetcher, key, 1);
611 assert_eq!(client.get_cached_query(&fetcher, key), Some(1));
612
613 maybe_reacts!(
614 true,
615 assert!(client.update_query(&fetcher, key, |value| {
616 value
617 .map(|v| {
618 *v = 2;
619 true
620 })
621 .unwrap_or(false)
622 }))
623 );
624
625 assert_eq!(client.get_cached_query(&fetcher, key), Some(2));
626
627 maybe_reacts!(true, client.set_query(&fetcher, key, 3));
628
629 assert_eq!(client.get_cached_query(&fetcher, key), Some(3));
630
631 maybe_reacts!(
632 true,
633 assert!(client.update_query(&fetcher, key, |value| {
634 value
635 .map(|v| {
636 *v *= 2;
637 true
638 })
639 .unwrap_or(false)
640 }))
641 );
642
643 maybe_reacts!(true, client.update_query(&fetcher, key, |_value| {}));
645 maybe_reacts!(
646 false,
647 client.update_query(&fetcher, key, |_value| { client.untrack_update_query() })
648 );
649
650 assert_eq!(client.get_cached_query(&fetcher, key), Some(6));
651 assert!(client.query_exists(&fetcher, key));
652
653 assert!(client.clear_query(&fetcher, key));
654 assert!(!client.query_exists(&fetcher, key));
655
656 maybe_reacts!(true, client.prefetch_query_local(&fetcher, key).await);
657
658 assert_eq!(client.get_cached_query(&fetcher, key), Some(2));
659 client.clear();
660 assert_eq!(client.size(), 0);
661 maybe_reacts!(true, client.prefetch_query(&fetcher, key).await);
662 assert_eq!(client.get_cached_query(&fetcher, key), Some(2));
663
664 assert!(client.clear_query(&fetcher, key));
665 assert!(!client.query_exists(&fetcher, key));
666 maybe_reacts!(
667 true,
668 assert_eq!(client.fetch_query_local(&fetcher, key).await, 2)
669 );
670 assert!(client.query_exists(&fetcher, key));
671 client.clear();
672 assert_eq!(client.size(), 0);
673 maybe_reacts!(true, assert_eq!(client.fetch_query(&fetcher, key).await, 2));
674
675 maybe_reacts!(
677 true,
678 assert_eq!(
679 client
680 .update_query_async(&fetcher, key, async |value| {
681 *value += 1;
682 *value
683 })
684 .await,
685 3
686 )
687 );
688 assert_eq!(client.get_cached_query(&fetcher, key), Some(3));
689 maybe_reacts!(
691 true,
692 client
693 .update_query_async(&fetcher, key, async |_value| {})
694 .await
695 );
696 maybe_reacts!(
697 false,
698 client
699 .update_query_async(&fetcher, key, async |_value| {
700 client.untrack_update_query()
701 })
702 .await
703 );
704
705 maybe_reacts!(
706 true,
707 assert_eq!(
708 client
709 .update_query_async_local(&fetcher, key, async |value| {
710 *value += 1;
711 *value
712 })
713 .await,
714 4
715 )
716 );
717 assert_eq!(client.get_cached_query(&fetcher, key), Some(4));
718 maybe_reacts!(
720 true,
721 client
722 .update_query_async_local(&fetcher, key, async |_value| {})
723 .await
724 );
725 maybe_reacts!(
726 false,
727 client
728 .update_query_async_local(&fetcher, key, async |_value| {
729 client.untrack_update_query()
730 })
731 .await
732 );
733
734 let is_fetching = client.subscribe_is_fetching_arc(fetcher.clone(), move || key);
736 assert!(!is_fetching.get_untracked());
737 tokio::join!(
738 async {
739 assert_eq!(
740 client
741 .update_query_async(&fetcher, key, async |value| {
742 tokio::time::sleep(tokio::time::Duration::from_millis(30))
743 .await;
744 *value += 1;
745 *value
746 })
747 .await,
748 5
749 );
750 },
751 async {
752 let elapsed = std::time::Instant::now();
753 tokio::time::sleep(tokio::time::Duration::from_millis(5)).await;
754 tick!();
755 while elapsed.elapsed().as_millis() < 25 {
756 assert!(is_fetching.get_untracked());
757 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
758 }
759 }
760 );
761 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
763 assert!(!is_fetching.get_untracked());
764 })
765 .await;
766 }
767
768 #[rstest]
770 #[tokio::test]
771 async fn test_refetch(
772 #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
773 #[values(false, true)] arc: bool,
774 ) {
775 identify_parking_lot_deadlocks();
776 tokio::task::LocalSet::new()
777 .run_until(async move {
778 const REFETCH_TIME_MS: u64 = 100;
779 const FETCH_TIME_MS: u64 = 10;
780
781 let fetch_calls = Arc::new(AtomicUsize::new(0));
782 let fetcher = {
783 let fetch_calls = fetch_calls.clone();
784 move |key: u64| {
785 fetch_calls.fetch_add(1, Ordering::Relaxed);
786 async move {
787 tokio::time::sleep(tokio::time::Duration::from_millis(FETCH_TIME_MS)).await;
788 key * 2
789 }
790 }
791 };
792 let fetcher = QueryScope::new(
793 fetcher
794 ).with_options(QueryOptions::new().with_refetch_interval(std::time::Duration::from_millis(REFETCH_TIME_MS)));
795
796 let (client, _guard, owner) = prep_vari!(false);
797
798 macro_rules! with_tmp_owner {
799 ($body:block) => {{
800 let tmp_owner = owner.child();
801 tmp_owner.set();
802 let result = $body;
803 tmp_owner.unset();
804 owner.set();
805 result
806 }};
807 }
808
809 macro_rules! check {
810 ($get_resource:expr) => {{
811
812 if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
814
815 with_tmp_owner! {{
817 assert_eq!($get_resource().await, 4);
818 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
819 assert_eq!(client.size(), 1);
820
821 assert_eq!($get_resource().await, 4);
823 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
824 assert_eq!(client.size(), 1);
825 }}
826
827 tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
829 tick!();
830 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
831
832 with_tmp_owner! {{
834 let _resource = $get_resource();
836 tokio::time::sleep(tokio::time::Duration::from_millis(FETCH_TIME_MS)).await;
837 tick!();
838 assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
839
840 tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
842 tick!();
843
844 assert_eq!(fetch_calls.load(Ordering::Relaxed), 3);
845 assert_eq!($get_resource().await, 4);
846 assert_eq!(fetch_calls.load(Ordering::Relaxed), 3);
847
848 tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
850 tick!();
851
852 assert_eq!(fetch_calls.load(Ordering::Relaxed), 4);
853 assert_eq!($get_resource().await, 4);
854 assert_eq!(fetch_calls.load(Ordering::Relaxed), 4);
855 }}
856
857 tokio::time::sleep(tokio::time::Duration::from_millis(REFETCH_TIME_MS + FETCH_TIME_MS)).await;
859 tick!();
860 assert_eq!(fetch_calls.load(Ordering::Relaxed), 4);
861 }
862 }};
863 }
864
865 vari_new_resource_with_cb!(
866 check,
867 client,
868 fetcher.clone(),
869 || 2,
870 resource_type,
871 arc
872 );
873 })
874 .await;
875 }
876
877 #[rstest]
880 #[tokio::test]
881 async fn test_drop_semantics(#[values(false, true)] local: bool) {
882 tokio::task::LocalSet::new()
883 .run_until(async move {
884 let owner = Owner::default();
885
886 macro_rules! with_tmp_owner {
887 ($body:block) => {{
888 let tmp_owner = owner.child();
889 tmp_owner.set();
890 let result = $body;
891 tmp_owner.unset();
892 owner.set();
893 result
894 }};
895 }
896
897 let dropped = with_tmp_owner! {{
898 let dropped = Arc::new(AtomicBool::new(false));
899 let on_drop = Arc::new(OnDrop::new({
900 let dropped = dropped.clone();
901 move || {
902 dropped.store(true, Ordering::Relaxed);
903 }}));
904 if local {
905 ArenaItem::<_, SyncStorage>::new_with_storage(ArcAsyncDerived::new_unsync(
906 move || {
907 let _on_drop = on_drop.clone();
908 async move {
909 }
910 })
911 );
912 } else {
913 ArenaItem::<_, SyncStorage>::new_with_storage(ArcAsyncDerived::new(
914 move || {
915 let _on_drop = on_drop.clone();
916 async move {
917 }
918 })
919 );
920 }
921 assert!(!dropped.load(Ordering::Relaxed));
922 dropped
923 }};
924 tick!();
925 assert!(dropped.load(Ordering::Relaxed));
926 })
927 .await;
928 }
929
930 #[rstest]
932 #[tokio::test]
933 async fn test_gc(
934 #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
935 #[values(false, true)] arc: bool,
936 ) {
937 identify_parking_lot_deadlocks();
938 tokio::task::LocalSet::new()
939 .run_until(async move {
940 const GC_TIME_MS: u64 = 30;
941
942 let fetch_calls = Arc::new(AtomicUsize::new(0));
943 let fetcher = {
944 let fetch_calls = fetch_calls.clone();
945 move |key: u64| {
946 fetch_calls.fetch_add(1, Ordering::Relaxed);
947 async move {
948 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
949 key * 2
950 }
951 }
952 };
953 let fetcher = QueryScope::new(
954 fetcher
955 ).with_options(QueryOptions::new().with_gc_time(std::time::Duration::from_millis(GC_TIME_MS)));
956
957 let (client, _guard, owner) = prep_vari!(false);
958
959 macro_rules! with_tmp_owner {
960 ($body:block) => {{
961 let tmp_owner = owner.child();
962 tmp_owner.set();
963 let result = $body;
964 tmp_owner.unset();
965 owner.set();
966 result
967 }};
968 }
969
970 macro_rules! check {
971 ($get_resource:expr) => {{
972 let subscribed = client.subscribe_value(fetcher.clone(), move || 2);
973
974 if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
976
977 with_tmp_owner! {{
979 assert_eq!($get_resource().await, 4);
980 assert_eq!(subscribed.get_untracked(), Some(4));
981 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
982 assert_eq!(client.size(), 1);
983
984 tick!();
986 assert_eq!($get_resource().await, 4);
987 assert_eq!(subscribed.get_untracked(), Some(4));
988 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
989 assert_eq!(client.size(), 1);
990 }}
991
992 with_tmp_owner! {{
994 tick!();
995 assert_eq!($get_resource().await, 4);
996 assert_eq!(subscribed.get_untracked(), Some(4));
997 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
998 assert_eq!(client.size(), 1);
999 }}
1000
1001 with_tmp_owner! {{
1003 let _resource = $get_resource();
1004
1005 tokio::time::sleep(tokio::time::Duration::from_millis(GC_TIME_MS)).await;
1006 tick!();
1007
1008 assert_eq!($get_resource().await, 4);
1010 assert_eq!(subscribed.get_untracked(), Some(4));
1011 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1012 assert_eq!(client.size(), 1);
1013 }}
1014
1015 with_tmp_owner! {{
1017 assert_eq!(client.size(), 1);
1018
1019 assert_eq!(subscribed.get_untracked(), Some(4));
1020 tokio::time::sleep(tokio::time::Duration::from_millis(GC_TIME_MS)).await;
1021 tick!();
1022 assert_eq!(subscribed.get_untracked(), None);
1023
1024 assert_eq!($get_resource().await, 4);
1025 assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
1026 }}
1027
1028 tokio::time::sleep(tokio::time::Duration::from_millis(GC_TIME_MS)).await;
1030 tick!();
1031 assert_eq!(client.size(), 0);
1032 assert_eq!(subscribed.get_untracked(), None);
1033 }
1034 }};
1035 }
1036
1037 vari_new_resource_with_cb!(
1038 check,
1039 client,
1040 fetcher.clone(),
1041 || 2,
1042 resource_type,
1043 arc
1044 );
1045 })
1046 .await;
1047 }
1048
1049 #[rstest]
1051 #[tokio::test]
1052 async fn test_unsync(#[values(false, true)] arc: bool) {
1053 identify_parking_lot_deadlocks();
1054 tokio::task::LocalSet::new()
1055 .run_until(async move {
1056 #[derive(Debug)]
1057 struct UnsyncValue(u64, PhantomData<NonNull<()>>);
1058 impl PartialEq for UnsyncValue {
1059 fn eq(&self, other: &Self) -> bool {
1060 self.0 == other.0
1061 }
1062 }
1063 impl Eq for UnsyncValue {}
1064 impl Clone for UnsyncValue {
1065 fn clone(&self) -> Self {
1066 Self(self.0, PhantomData)
1067 }
1068 }
1069 impl UnsyncValue {
1070 fn new(value: u64) -> Self {
1071 Self(value, PhantomData)
1072 }
1073 }
1074
1075 let fetch_calls = Arc::new(AtomicUsize::new(0));
1076 let fetcher = {
1077 let fetch_calls = fetch_calls.clone();
1078 move |key: u64| {
1079 fetch_calls.fetch_add(1, Ordering::Relaxed);
1080 async move {
1081 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1082 UnsyncValue::new(key * 2)
1083 }
1084 }
1085 };
1086 let fetcher = QueryScopeLocal::new(fetcher);
1087
1088 let (client, _guard, _owner) = prep_vari!(false);
1089
1090 macro_rules! check {
1091 ($get_resource:expr) => {{
1092 let resource = $get_resource();
1093 let subscribed = client.subscribe_value_local(fetcher.clone(), move || 2);
1094
1095 assert!(resource.get_untracked().is_none());
1097 assert!(resource.try_get_untracked().unwrap().is_none());
1098 assert!(resource.get().is_none());
1099 assert!(resource.try_get().unwrap().is_none());
1100 assert!(resource.read().is_none());
1101 assert!(resource.try_read().as_deref().unwrap().is_none());
1102 assert!(subscribed.get_untracked().is_none());
1103
1104 if cfg!(not(feature = "ssr")) {
1106 assert_eq!(resource.await, UnsyncValue::new(4));
1107 assert_eq!(subscribed.get_untracked(), Some(UnsyncValue::new(4)));
1108 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1109
1110 tick!();
1111
1112 assert_eq!($get_resource().await, UnsyncValue::new(4));
1113 assert_eq!(subscribed.get_untracked(), Some(UnsyncValue::new(4)));
1114 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1115 }
1116 }};
1117 }
1118
1119 match arc {
1120 true => {
1121 check!(|| client.arc_local_resource(fetcher.clone(), || 2))
1122 }
1123 false => {
1124 check!(|| client.local_resource(fetcher.clone(), || 2))
1125 }
1126 }
1127 })
1128 .await;
1129 }
1130
1131 #[rstest]
1132 #[tokio::test]
1133 async fn test_subscribe_is_fetching_and_loading(
1134 #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
1135 #[values(false, true)] arc: bool,
1136 #[values(false, true)] server_ctx: bool,
1137 ) {
1138 identify_parking_lot_deadlocks();
1139 tokio::task::LocalSet::new()
1140 .run_until(async move {
1141 let (fetcher, fetch_calls) = default_fetcher();
1142 let (client, _guard, _owner) = prep_vari!(server_ctx);
1143
1144 macro_rules! check {
1145 ($get_resource:expr) => {{
1146 if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
1148 assert_eq!(client.subscriber_count(), 0);
1149 let is_fetching = client.subscribe_is_fetching_arc(fetcher.clone(), || 2);
1150 assert_eq!(is_fetching.get_untracked(), false);
1151 assert_eq!(client.subscriber_count(), 1);
1152 let is_fetching_clone = client.subscribe_is_fetching_arc(fetcher.clone(), || 2);
1153 assert_eq!(is_fetching_clone.get_untracked(), false);
1154 assert_eq!(client.subscriber_count(), 1);
1156 let is_fetching_other = client.subscribe_is_fetching_arc(fetcher.clone(), || 3);
1157 assert_eq!(is_fetching_other.get_untracked(), false);
1158 assert_eq!(client.subscriber_count(), 2);
1159 let is_loading = client.subscribe_is_loading_arc(fetcher.clone(), || 2);
1160 assert_eq!(is_loading.get_untracked(), false);
1161 assert_eq!(client.subscriber_count(), 3);
1162 let is_loading_other = client.subscribe_is_loading_arc(fetcher.clone(), || 3);
1163 assert_eq!(is_loading_other.get_untracked(), false);
1164 assert_eq!(client.subscriber_count(), 4);
1165
1166
1167 macro_rules! check_all {
1168 ($expected:expr) => {{
1169 assert_eq!(is_fetching.get_untracked(), $expected);
1170 assert_eq!(is_fetching_other.get_untracked(), $expected);
1171 assert_eq!(is_loading.get_untracked(), $expected);
1172 assert_eq!(is_loading_other.get_untracked(), $expected);
1173 }};
1174 }
1175
1176 check_all!(false);
1177
1178 tokio::join!(
1179 async {
1180 assert_eq!($get_resource().await, 4);
1181 },
1182 async {
1183 let elapsed = std::time::Instant::now();
1184 tick!();
1185 while elapsed.elapsed().as_millis() < DEFAULT_FETCHER_MS.into() {
1186 assert_eq!(is_fetching.get_untracked(), true);
1187 assert_eq!(is_fetching_other.get_untracked(), false);
1188 assert_eq!(is_loading.get_untracked(), true);
1189 assert_eq!(is_loading_other.get_untracked(), false);
1190 tick!();
1191 }
1192 }
1193 );
1194 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1195
1196 check_all!(false);
1197
1198 assert_eq!($get_resource().await, 4);
1199 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1200
1201 check_all!(false);
1202
1203 tokio::join!(
1205 async {
1206 assert_eq!($get_resource().await, 4);
1207 },
1208 async {
1209 let elapsed = std::time::Instant::now();
1210 tick!();
1211 while elapsed.elapsed().as_millis() < DEFAULT_FETCHER_MS.into() {
1212 check_all!(false);
1213 tick!();
1214 }
1215 }
1216 );
1217 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1218
1219 client.invalidate_query(fetcher.clone(), &2);
1220
1221 tokio::join!(
1222 async {
1223 assert_eq!($get_resource().await, 4);
1224 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1226 tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1227 },
1228 async {
1229 let elapsed = std::time::Instant::now();
1230 tick!();
1231 while elapsed.elapsed().as_millis() < DEFAULT_FETCHER_MS.into() {
1232 assert_eq!(is_fetching.get_untracked(), true);
1233 assert_eq!(is_fetching_other.get_untracked(), false);
1234 assert_eq!(is_loading.get_untracked(), false);
1237 assert_eq!(is_loading_other.get_untracked(), false);
1238 tick!();
1239 }
1240 }
1241 );
1242 assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
1243
1244 drop(is_fetching);
1245 assert_eq!(client.subscriber_count(), 4);
1247 drop(is_fetching_clone);
1248 assert_eq!(client.subscriber_count(), 3);
1249 drop(is_loading);
1250 assert_eq!(client.subscriber_count(), 2);
1251 drop(is_fetching_other);
1252 assert_eq!(client.subscriber_count(), 1);
1253 drop(is_loading_other);
1254 assert_eq!(client.subscriber_count(), 0);
1255
1256 client.clear();
1257 assert_eq!(client.size(), 0);
1258
1259 tokio::join!(
1261 async {
1262 assert_eq!($get_resource().await, 4);
1263 },
1264 async {
1265 tick!();
1266 let is_fetching = client.subscribe_is_fetching_arc(fetcher.clone(), || 2);
1267 let is_loading = client.subscribe_is_loading_arc(fetcher.clone(), || 2);
1268 assert_eq!(is_fetching.get_untracked(), true);
1269 assert_eq!(is_loading.get_untracked(), true);
1270 assert_eq!(client.subscriber_count(), 2);
1271 tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1272 assert_eq!(is_fetching.get_untracked(), false);
1273 assert_eq!(is_loading.get_untracked(), false);
1274 }
1275 );
1276 assert_eq!(client.subscriber_count(), 0);
1277
1278 client.invalidate_query(fetcher.clone(), &2);
1280 tokio::join!(
1281 async {
1282 assert_eq!($get_resource().await, 4);
1283 tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1284 },
1285 async {
1286 tick!();
1287 let is_fetching = client.subscribe_is_fetching_arc(fetcher.clone(), || 2);
1288 let is_loading = client.subscribe_is_loading_arc(fetcher.clone(), || 2);
1289 assert_eq!(is_fetching.get_untracked(), true);
1290 assert_eq!(is_loading.get_untracked(), false);
1291 assert_eq!(client.subscriber_count(), 2);
1292 tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1293 assert_eq!(is_fetching.get_untracked(), false);
1294 assert_eq!(is_loading.get_untracked(), false);
1295 }
1296 );
1297 assert_eq!(client.subscriber_count(), 0);
1298 client.clear();
1299
1300 let sub_key_signal = RwSignal::new(2);
1302 let resource_key_signal = RwSignal::new(2);
1303 let is_fetching = client.subscribe_is_fetching(fetcher.clone(), move || sub_key_signal.get());
1304 let is_loading = client.subscribe_is_loading(fetcher.clone(), move || sub_key_signal.get());
1305 assert_eq!(is_fetching.get_untracked(), false);
1306 assert_eq!(is_loading.get_untracked(), false);
1307
1308 let _resource = client.resource(fetcher.clone(), move || resource_key_signal.get());
1309
1310 assert_eq!(is_fetching.get_untracked(), true);
1312 assert_eq!(is_loading.get_untracked(), true);
1313 tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1314 assert_eq!(is_fetching.get_untracked(), false);
1315 assert_eq!(is_loading.get_untracked(), false);
1316
1317 sub_key_signal.set(2);
1319 resource_key_signal.set(2);
1320 tick!();
1321 assert_eq!(is_fetching.get_untracked(), false);
1322 assert_eq!(is_loading.get_untracked(), false);
1323
1324 resource_key_signal.set(3);
1326 sub_key_signal.set(3);
1327 tick!();
1328 assert_eq!(is_fetching.get_untracked(), true);
1329 assert_eq!(is_loading.get_untracked(), true);
1330 tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1331 assert_eq!(is_fetching.get_untracked(), false);
1332 assert_eq!(is_loading.get_untracked(), false);
1333 assert_eq!(client.get_cached_query(fetcher.clone(), &3), Some(6));
1334
1335 client.invalidate_query(fetcher.clone(), &3);
1337 tick!();
1338 assert_eq!(is_fetching.get_untracked(), true);
1339 assert_eq!(is_loading.get_untracked(), false);
1340 tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1341 assert_eq!(is_fetching.get_untracked(), false);
1342 assert_eq!(is_loading.get_untracked(), false);
1343
1344 resource_key_signal.set(4);
1346 tick!();
1347 assert_eq!(is_fetching.get_untracked(), false);
1348 assert_eq!(is_loading.get_untracked(), false);
1349 tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1350 assert_eq!(client.get_cached_query(fetcher.clone(), &4), Some(8));
1351
1352 let last_is_fetching_value = Arc::new(parking_lot::Mutex::new(None));
1355 Effect::new_isomorphic({
1356 let last_is_fetching_value = last_is_fetching_value.clone();
1357 move || {
1358 *last_is_fetching_value.lock() = Some(is_fetching.get());
1359 }});
1360 assert_eq!(*last_is_fetching_value.lock(), None);
1361 tick!();
1362 assert_eq!(*last_is_fetching_value.lock(), Some(false));
1363 resource_key_signal.set(6);
1364 sub_key_signal.set(6);
1365 assert_eq!(*last_is_fetching_value.lock(), Some(false));
1366 tick!();
1367 assert_eq!(*last_is_fetching_value.lock(), Some(true));
1368 assert_eq!(is_fetching.get_untracked(), true);
1369 tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1370 assert_eq!(*last_is_fetching_value.lock(), Some(false));
1371 assert_eq!(is_fetching.get_untracked(), false);
1372 assert_eq!(client.get_cached_query(fetcher.clone(), &6), Some(12));
1373 }
1374 }};
1375 }
1376
1377 vari_new_resource_with_cb!(
1378 check,
1379 client,
1380 fetcher.clone(),
1381 || 2,
1382 resource_type,
1383 arc
1384 );
1385 })
1386 .await;
1387 }
1388
1389 #[rstest]
1391 #[tokio::test]
1392 async fn test_optional_key(
1393 #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
1394 #[values(false, true)] arc: bool,
1395 #[values(false, true)] server_ctx: bool,
1396 ) {
1397 identify_parking_lot_deadlocks();
1398 tokio::task::LocalSet::new()
1399 .run_until(async move {
1400 let (fetcher, fetch_calls) = default_fetcher();
1401 let (client, _guard, _owner) = prep_vari!(server_ctx);
1402
1403 let key_value = RwSignal::new(None);
1404 let keyer = move || key_value.get();
1405
1406 macro_rules! check {
1407 ($get_resource:expr) => {{
1408 let resource = $get_resource();
1409 assert_eq!(resource.get_untracked().flatten(), None);
1410 assert_eq!(resource.try_get_untracked().flatten().flatten(), None);
1411
1412 if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
1414 assert_eq!($get_resource().await, None);
1415
1416 let sub_is_loading =
1417 client.subscribe_is_loading(fetcher.clone(), keyer);
1418 let sub_is_fetching =
1419 client.subscribe_is_fetching(fetcher.clone(), keyer);
1420 let sub_value = client.subscribe_value(fetcher.clone(), keyer);
1421
1422 assert_eq!(sub_is_loading.get_untracked(), false);
1423 assert_eq!(sub_is_fetching.get_untracked(), false);
1424 assert_eq!(sub_value.get_untracked(), None);
1425
1426 key_value.set(Some(2));
1427 tick!();
1428
1429 assert_eq!(sub_is_loading.get_untracked(), true);
1430 assert_eq!(sub_is_fetching.get_untracked(), true);
1431
1432 assert_eq!($get_resource().await, Some(4));
1433 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1434 assert_eq!(sub_value.get_untracked(), Some(4));
1435 assert_eq!(sub_is_loading.get_untracked(), false);
1436 assert_eq!(sub_is_fetching.get_untracked(), false);
1437 }
1438 }};
1439 }
1440
1441 vari_new_resource_with_cb!(
1442 check,
1443 client,
1444 fetcher.clone(),
1445 keyer,
1446 resource_type,
1447 arc
1448 );
1449 })
1450 .await;
1451 }
1452
1453 #[rstest]
1455 #[tokio::test]
1456 async fn test_invalidation(
1457 #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
1458 #[values(false, true)] arc: bool,
1459 #[values(false, true)] server_ctx: bool,
1460 #[values(
1461 InvalidationType::Query,
1462 InvalidationType::Scope,
1463 InvalidationType::Predicate,
1464 InvalidationType::All
1465 )]
1466 invalidation_type: InvalidationType,
1467 ) {
1468 identify_parking_lot_deadlocks();
1469 tokio::task::LocalSet::new()
1470 .run_until(async move {
1471 let (fetcher, fetch_calls) = default_fetcher();
1472 let (client, _guard, _owner) = prep_vari!(server_ctx);
1473
1474 macro_rules! check {
1475 ($get_resource:expr) => {{
1476 let resource = $get_resource();
1477
1478 assert!(resource.get_untracked().is_none());
1480 assert!(resource.try_get_untracked().unwrap().is_none());
1481 assert!(resource.get().is_none());
1482 assert!(resource.try_get().unwrap().is_none());
1483 assert!(resource.read().is_none());
1484 assert!(resource.try_read().as_deref().unwrap().is_none());
1485
1486 if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
1488 assert_eq!(resource.await, 4);
1489 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1490
1491 tick!();
1492
1493 assert_eq!($get_resource().await, 4);
1495 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1496
1497 match invalidation_type {
1498 InvalidationType::Query => {
1499 client.invalidate_query(fetcher.clone(), &2);
1500 }
1501 InvalidationType::Scope => {
1502 client.invalidate_query_scope(fetcher.clone());
1503 }
1504 InvalidationType::Predicate => {
1505 client.invalidate_queries_with_predicate(fetcher.clone(), |key| key == &2);
1506 }
1507 InvalidationType::All => {
1508 client.invalidate_all_queries();
1509 }
1510 }
1511
1512 let resource2 = $get_resource();
1515 tick!();
1516 assert_eq!(resource2.get_untracked(), Some(4));
1517 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1518
1519 tick!();
1521 tokio::time::sleep(std::time::Duration::from_millis(DEFAULT_FETCHER_MS + 10)).await;
1522 assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
1523 assert_eq!($get_resource().await, 4);
1524 assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
1525 }
1526 }};
1527 }
1528
1529 vari_new_resource_with_cb!(
1530 check,
1531 client,
1532 fetcher.clone(),
1533 || 2,
1534 resource_type,
1535 arc
1536 );
1537 })
1538 .await;
1539 }
1540
1541 #[rstest]
1542 #[tokio::test]
1543 async fn test_invalidation_hierarchy(#[values(false, true)] server_ctx: bool) {
1544 identify_parking_lot_deadlocks();
1545 tokio::task::LocalSet::new()
1546 .run_until(async move {
1547 let (fetcher, _fetch_calls) = default_fetcher();
1548 let (client, _guard, _owner) = prep_vari!(server_ctx);
1549
1550 let fetcher = fetcher.with_invalidation_link(|_key| vec!["base", "users"]);
1552 client.fetch_query(&fetcher, &2).await;
1553 assert!(!client.is_key_invalid(&fetcher, &2));
1554
1555 let hierarchy_parent_scope =
1557 QueryScope::new(async || ()).with_invalidation_link(|_k| ["base"]);
1558 client
1559 .fetch_query(hierarchy_parent_scope.clone(), &())
1560 .await;
1561 assert!(!client.is_key_invalid(&hierarchy_parent_scope, &()));
1562
1563 let hierarchy_child_scope = QueryScope::new(async |user_id| user_id)
1565 .with_invalidation_link(|user_id: &usize| {
1566 ["base".to_string(), "users".to_string(), user_id.to_string()]
1567 });
1568 client
1569 .fetch_query(hierarchy_child_scope.clone(), &100)
1570 .await;
1571 assert!(!client.is_key_invalid(&hierarchy_child_scope, &100));
1572
1573 client.invalidate_query(&hierarchy_parent_scope, ());
1574 tick!();
1575 assert!(client.is_key_invalid(&hierarchy_parent_scope, &()));
1576 assert!(client.is_key_invalid(&fetcher, &2));
1577 assert!(client.is_key_invalid(&hierarchy_child_scope, &100));
1578
1579 client.fetch_query(&hierarchy_parent_scope, &()).await;
1581 client.fetch_query(&fetcher, &2).await;
1582 client
1583 .fetch_query(hierarchy_child_scope.clone(), &100)
1584 .await;
1585
1586 client.invalidate_query(&fetcher, 2);
1587 tick!();
1588 assert!(!client.is_key_invalid(&hierarchy_parent_scope, &()));
1589 assert!(client.is_key_invalid(&fetcher, &2));
1590 assert!(client.is_key_invalid(&hierarchy_child_scope, &100));
1591
1592 client.fetch_query(&hierarchy_parent_scope, &()).await;
1594 client.fetch_query(&fetcher, &2).await;
1595 client
1596 .fetch_query(hierarchy_child_scope.clone(), &100)
1597 .await;
1598
1599 client.invalidate_query(&hierarchy_child_scope, 100);
1600 tick!();
1601 assert!(!client.is_key_invalid(&hierarchy_parent_scope, &()));
1602 assert!(!client.is_key_invalid(&fetcher, &2));
1603 assert!(client.is_key_invalid(&hierarchy_child_scope, &100));
1604 })
1605 .await;
1606 }
1607
1608 #[rstest]
1609 #[tokio::test]
1610 async fn test_key_tracked_autoreload(
1611 #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
1612 #[values(false, true)] arc: bool,
1613 #[values(false, true)] server_ctx: bool,
1614 ) {
1615 identify_parking_lot_deadlocks();
1616 tokio::task::LocalSet::new()
1617 .run_until(async move {
1618 let (fetcher, fetch_calls) = default_fetcher();
1619
1620 let (client, _guard, _owner) = prep_vari!(server_ctx);
1621
1622 let add_size = RwSignal::new(1);
1623
1624 macro_rules! check {
1625 ($get_resource:expr) => {{
1626 let resource = $get_resource();
1627 let subscribed =
1628 client.subscribe_value(fetcher.clone(), move || add_size.get());
1629
1630 assert!(resource.get_untracked().is_none());
1632 assert!(resource.try_get_untracked().unwrap().is_none());
1633 assert!(resource.get().is_none());
1634 assert!(resource.try_get().unwrap().is_none());
1635 assert!(resource.read().is_none());
1636 assert!(resource.try_read().as_deref().unwrap().is_none());
1637 assert_eq!(subscribed.get_untracked(), None);
1638
1639 if cfg!(not(feature = "ssr")) || resource_type != ResourceType::Local {
1641 let resource = $get_resource();
1642 assert_eq!($get_resource().await, 2);
1643 assert_eq!(subscribed.get_untracked(), Some(2));
1644 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1645
1646 add_size.set(2);
1648
1649 tick!();
1651 assert_eq!(resource.get(), Some(2));
1652 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1653
1654 tokio::time::sleep(std::time::Duration::from_millis(
1656 DEFAULT_FETCHER_MS + 10,
1657 ))
1658 .await;
1659 tick!();
1660
1661 assert_eq!(resource.get(), Some(4));
1663 assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
1664 assert_eq!($get_resource().await, 4);
1665 assert_eq!(subscribed.get_untracked(), Some(4));
1666 assert_eq!(fetch_calls.load(Ordering::Relaxed), 2);
1667 }
1668 }};
1669 }
1670
1671 vari_new_resource_with_cb!(
1672 check,
1673 client,
1674 fetcher.clone(),
1675 move || add_size.get(),
1676 resource_type,
1677 arc
1678 );
1679 })
1680 .await;
1681 }
1682
1683 #[rstest]
1685 #[tokio::test]
1686 async fn test_key_integrity(
1687 #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
1688 #[values(false, true)] arc: bool,
1689 #[values(false, true)] server_ctx: bool,
1690 ) {
1691 identify_parking_lot_deadlocks();
1692 tokio::task::LocalSet::new()
1693 .run_until(async move {
1694 if cfg!(feature = "ssr") && resource_type == ResourceType::Local {
1696 return;
1697 }
1698
1699 let (fetcher, fetch_calls) = default_fetcher();
1700 let (client, _guard, _owner) = prep_vari!(server_ctx);
1701
1702 let keys = [1, 2, 3, 4, 5];
1703 let results = futures::future::join_all(keys.iter().cloned().map(|key| {
1704 let fetcher = fetcher.clone();
1705 async move {
1706 macro_rules! cb {
1707 ($get_resource:expr) => {{
1708 let resource = $get_resource();
1709 resource.await
1710 }};
1711 }
1712 vari_new_resource_with_cb!(
1713 cb,
1714 client,
1715 fetcher,
1716 move || key,
1717 resource_type,
1718 arc
1719 )
1720 }
1721 }))
1722 .await;
1723 assert_eq!(results, vec![2, 4, 6, 8, 10]);
1724 assert_eq!(fetch_calls.load(Ordering::Relaxed), 5);
1725
1726 let results = futures::future::join_all(keys.iter().cloned().map(|key| {
1728 let fetcher = fetcher.clone();
1729 async move {
1730 macro_rules! cb {
1731 ($get_resource:expr) => {{
1732 let resource = $get_resource();
1733 resource.await
1734 }};
1735 }
1736 vari_new_resource_with_cb!(
1737 cb,
1738 client,
1739 fetcher,
1740 move || key,
1741 resource_type,
1742 arc
1743 )
1744 }
1745 }))
1746 .await;
1747 assert_eq!(results, vec![2, 4, 6, 8, 10]);
1748 assert_eq!(fetch_calls.load(Ordering::Relaxed), 5);
1749 })
1750 .await;
1751 }
1752
1753 #[rstest]
1755 #[tokio::test]
1756 async fn test_resource_race(
1757 #[values(ResourceType::Local, ResourceType::Blocking, ResourceType::Normal)] resource_type: ResourceType,
1758 #[values(false, true)] arc: bool,
1759 #[values(false, true)] server_ctx: bool,
1760 ) {
1761 identify_parking_lot_deadlocks();
1762 tokio::task::LocalSet::new()
1763 .run_until(async move {
1764 if cfg!(feature = "ssr") && resource_type == ResourceType::Local {
1766 return;
1767 }
1768
1769 let (fetcher, fetch_calls) = default_fetcher();
1770 let (client, _guard, _owner) = prep_vari!(server_ctx);
1771
1772 let keyer = || 1;
1773 let results = futures::future::join_all((0..10).map(|_| {
1774 let fetcher = fetcher.clone();
1775 async move {
1776 macro_rules! cb {
1777 ($get_resource:expr) => {{
1778 let resource = $get_resource();
1779 resource.await
1780 }};
1781 }
1782 vari_new_resource_with_cb!(cb, client, fetcher, keyer, resource_type, arc)
1783 }
1784 }))
1785 .await
1786 .into_iter()
1787 .collect::<Vec<_>>();
1788 assert_eq!(results, vec![2; 10]);
1789 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1790 })
1791 .await;
1792 }
1793
1794 #[cfg(feature = "ssr")]
1795 #[tokio::test]
1796 async fn test_resource_cross_stream_caching() {
1797 identify_parking_lot_deadlocks();
1798 tokio::task::LocalSet::new()
1799 .run_until(async move {
1800 for maybe_sleep_ms in &[None, Some(10), Some(30)] {
1801 let (client, ssr_ctx, _owner) = prep_server!();
1802
1803 let fetch_calls = Arc::new(AtomicUsize::new(0));
1804 let fetcher = {
1805 let fetch_calls = fetch_calls.clone();
1806 move |key: u64| {
1807 fetch_calls.fetch_add(1, Ordering::Relaxed);
1808 async move {
1809 if let Some(sleep_ms) = maybe_sleep_ms {
1810 tokio::time::sleep(tokio::time::Duration::from_millis(
1811 *sleep_ms as u64,
1812 ))
1813 .await;
1814 }
1815 key * 2
1816 }
1817 }
1818 };
1819 let fetcher = QueryScope::new(fetcher);
1820
1821 let keyer = || 1;
1822
1823 assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
1825 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1826
1827 assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
1829 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1830
1831 let (client, _owner) = prep_client!(ssr_ctx);
1833
1834 assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
1836 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1837
1838 assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
1840 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1841
1842 tick!();
1844
1845 assert_eq!(client.arc_resource(fetcher.clone(), keyer).await, 2);
1849 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1850
1851 let (ssr_client, ssr_ctx, _owner) = prep_server!();
1853 fetch_calls.store(0, Ordering::Relaxed);
1854
1855 let ssr_resource_1 = ssr_client.arc_resource(fetcher.clone(), keyer);
1857 let ssr_resource_2 = ssr_client.arc_resource(fetcher.clone(), keyer);
1858
1859 let (hydrate_client, _owner) = prep_client!(ssr_ctx);
1860
1861 let hydrate_resource_1 = hydrate_client.arc_resource(fetcher.clone(), keyer);
1863 let hydrate_resource_2 = hydrate_client.arc_resource(fetcher.clone(), keyer);
1864
1865 let results = futures::future::join_all(
1867 vec![
1868 hydrate_resource_2,
1869 ssr_resource_1,
1870 ssr_resource_2,
1871 hydrate_resource_1,
1872 ]
1873 .into_iter()
1874 .map(|resource| async move { resource.await }),
1875 )
1876 .await
1877 .into_iter()
1878 .collect::<Vec<_>>();
1879
1880 assert_eq!(results, vec![2, 2, 2, 2]);
1881 assert_eq!(fetch_calls.load(Ordering::Relaxed), 1);
1882
1883 tick!();
1884
1885 assert_eq!(hydrate_client.arc_resource(fetcher.clone(), keyer).await, 2);
1887 assert_eq!(
1888 fetch_calls.load(Ordering::Relaxed),
1889 1,
1890 "{:?}ms",
1891 maybe_sleep_ms
1892 );
1893 }
1894 })
1895 .await;
1896 }
1897}