1use std::time::Duration;
2use std::{hash::Hash, sync::Arc};
3
4use leptos::prelude::*;
5
6use crate::{
7 PaginatedPageKey, QueryOptions, QueryScope, QueryScopeLocal, UntypedQueryClient, cache::OnScopeMissing,
8 debug_if_devtools_enabled::DebugIfDevtoolsEnabled, query_scope::ScopeCacheKey,
9};
10
// Generates the `new_paginated_with_cursor` constructor for one query-scope flavour.
//
// Arguments:
// - `[$impl_fut_generics]`: extra bound tokens appended to the getter's future type.
// - `[$impl_fn_generics]`: extra bound tokens appended to `Key`, `PageItem`, `Cursor` and the
//   getter closure itself.
// - `$name`: the scope type that receives the constructor.
// - `$fetch_fn`: the `UntypedQueryClient` method used to fetch the backing entry.
// - `$sname`: the scope name as a literal; it is not referenced by the expansion below.
macro_rules! define {
    ([$($impl_fut_generics:tt)*], [$($impl_fn_generics:tt)*], $name:ident, $fetch_fn:ident, $sname:literal) => {

        impl<Key, PageItem> $name<PaginatedPageKey<Key>, Option<(Vec<PageItem>, bool)>>
        where
            Key: DebugIfDevtoolsEnabled + Clone + Hash + PartialEq + 'static $($impl_fn_generics)*,
            PageItem: DebugIfDevtoolsEnabled + Clone + 'static $($impl_fn_generics)*,
        {
            /// Builds a page-based query scope on top of a cursor-based API.
            ///
            /// `getter` is called as `getter(key, item_count_wanted, cursor)`. The cursor is
            /// `None` for the first request of a key, and otherwise the cursor returned by the
            /// previous call for that key. It resolves to the loaded items plus the cursor for
            /// the next call, where `None` means there is nothing more to load. An empty item
            /// list is also treated as the end of the data, whatever cursor came with it.
            ///
            /// The returned scope is keyed by [`PaginatedPageKey`]. Each page resolves to:
            /// - `Some((items, next_page_exists))` when the page starts inside the data; the
            ///   last page may hold fewer than `page_size` items.
            /// - `None` when the page starts at or beyond the end of the data.
            ///
            /// Items loaded for a key accumulate in an internal backing query that is shared
            /// by every page (and page size) of that key, so a page is served from already
            /// loaded items whenever possible and `getter` is only called for the missing
            /// tail. `getter` may be called several times for one page when it returns fewer
            /// items than requested.
            ///
            /// Invalidating a page invalidates the backing entry of its key, and the backing
            /// entry is cleared once the gc hook finds no other page of that key in the cache.
            ///
            /// # Panics
            ///
            /// The generated query functions and hooks panic if the `UntypedQueryClient` or
            /// `ScopeCacheKey` context is missing, which the messages describe as an internal
            /// leptos-fetch bug.
            pub fn new_paginated_with_cursor<Cursor, Fut>(
                getter: impl Fn(Key, usize, Option<Cursor>) -> Fut + 'static $($impl_fn_generics)*,
            ) -> $name<PaginatedPageKey<Key>, Option<(Vec<PageItem>, bool)>>
            where
                Cursor: DebugIfDevtoolsEnabled + Clone + 'static $($impl_fn_generics)*,
                Fut: Future<Output = (Vec<PageItem>, Option<Cursor>)> $($impl_fut_generics)*,
            {
                let getter = Arc::new(getter);

                // Backing query: holds every item loaded so far for a key, plus the cursor
                // needed to load more. Its key hashes only the inner `key`, so all pages of a
                // key resolve to the same entry.
                let backing_cache_scope = $name::new({
                    let getter = getter.clone();
                    move |key: KeyWithItemCountRequestedUnhashed<Key>| {
                        let getter = getter.clone();
                        async move {
                            let (items, mut cursor) =
                                getter(key.key, key.item_count_requested, None).await;

                            // An empty response ends the data even if a cursor came with it;
                            // keeping the cursor would make pages retry forever.
                            if items.is_empty() {
                                cursor = None;
                            }

                            BackingCache {
                                inner: Arc::new(BackingCacheInner {
                                    items: parking_lot::Mutex::new(items),
                                    cursor: parking_lot::Mutex::new(cursor),
                                    update_lock: futures::lock::Mutex::new(()),
                                }),
                            }
                        }
                    }
                })
                .with_options(
                    // Longest possible durations: the backing entry is instead invalidated
                    // and cleared explicitly by the page scope below.
                    QueryOptions::default()
                        .with_stale_time(Duration::MAX)
                        .with_gc_time(Duration::MAX)
                        .with_refetch_interval(Duration::MAX)
                );

                $name::new({
                    let backing_cache_scope = backing_cache_scope.clone();
                    move |page_key: PaginatedPageKey<Key>| {
                        let backing_cache_scope = backing_cache_scope.clone();
                        let getter = getter.clone();
                        async move {
                            let untyped_client = use_context::<UntypedQueryClient>()
                                .expect(
                                    "leptos-fetch bug, UntypedQueryClient should always have been \
                                    provided to the query context internally"
                                );
                            let scope_cache_key = use_context::<ScopeCacheKey>()
                                .expect(
                                    "leptos-fetch bug, ScopeCacheKey itself should always have been \
                                    provided to the query context internally"
                                );

                            // If this page is being refetched because it is stale/invalidated,
                            // and the backing entry is not newer than the page, the backing
                            // items are what went out of date: invalidate them so the fetch
                            // below reloads from the start.
                            if let Some(metadata) = untyped_client
                                .query_metadata::<PaginatedPageKey<Key>, Option<(Vec<PageItem>, bool)>>(
                                    scope_cache_key,
                                    &page_key,
                                )
                                && metadata.stale_or_invalidated
                                && let Some(backing_metadata) = untyped_client
                                    .query_metadata::<KeyWithItemCountRequestedUnhashed<Key>, BackingCache<PageItem, Cursor>>(
                                        backing_cache_scope.cache_key,
                                        &KeyWithItemCountRequestedUnhashed {
                                            key: page_key.key.clone(),
                                            // Not part of the hash, any value finds the entry.
                                            item_count_requested: 0,
                                        },
                                    )
                                && backing_metadata.updated_at <= metadata.updated_at
                            {
                                untyped_client.invalidate_query(
                                    &backing_cache_scope,
                                    KeyWithItemCountRequestedUnhashed {
                                        key: page_key.key.clone(),
                                        // Not part of the hash, any value finds the entry.
                                        item_count_requested: 0,
                                    },
                                );
                            }

                            // Load (or reuse) the backing entry. The requested count is only
                            // used by the backing fetcher when the entry has to be created.
                            let infinite_cache = untyped_client
                                .$fetch_fn(
                                    backing_cache_scope,
                                    KeyWithItemCountRequestedUnhashed {
                                        key: page_key.key.clone(),
                                        item_count_requested: page_key.page_size,
                                    },
                                )
                                .await;

                            // Saturating arithmetic: an absurdly large page index must not
                            // panic (debug) or wrap around to a bogus range (release).
                            let target_idx_start =
                                page_key.page_index.saturating_mul(page_key.page_size);
                            let target_idx_end_exclusive = page_key
                                .page_index
                                .saturating_add(1)
                                .saturating_mul(page_key.page_size);

                            // `Some(n)` when `n` more items are needed to fill this page and
                            // the API still has data (a cursor is present). Both locks are
                            // released before the closure returns.
                            let should_request_x_more = || {
                                let items = infinite_cache.inner.items.lock();
                                if items.len() < target_idx_end_exclusive
                                    && infinite_cache.inner.cursor.lock().is_some() {
                                    Some(target_idx_end_exclusive - items.len())
                                } else {
                                    None
                                }
                            };

                            if should_request_x_more().is_some() {
                                // Only one task extends the backing items at a time. The need
                                // is re-checked after the lock is acquired because another
                                // page may have loaded the items in the meantime.
                                let update_guard = infinite_cache.inner.update_lock.lock().await;
                                while let Some(amount_needed) = should_request_x_more() {
                                    // Clone the cursor out so no parking_lot guard is held
                                    // across the await below.
                                    let cur_token = infinite_cache.inner.cursor.lock().clone();
                                    let (items, cursor) =
                                        getter(page_key.key.clone(), amount_needed, cur_token).await;
                                    if !items.is_empty() {
                                        infinite_cache.inner.items.lock().extend(items);
                                        *infinite_cache.inner.cursor.lock() = cursor;
                                    } else {
                                        // Empty response: end of data, ignore any cursor.
                                        *infinite_cache.inner.cursor.lock() = None;
                                    }
                                }
                                drop(update_guard);
                            }

                            let items_guard = infinite_cache.inner.items.lock();
                            if target_idx_start >= items_guard.len() {
                                // The page starts at or past the end of the data.
                                None
                            } else {
                                let items = items_guard
                                    [target_idx_start..std::cmp::min(target_idx_end_exclusive, items_guard.len())]
                                    .to_vec();
                                // More exists if items beyond this page are already loaded,
                                // or the API can still be asked for more.
                                let next_page_exists = items_guard.len() > target_idx_end_exclusive
                                    || infinite_cache.inner.cursor.lock().is_some();
                                Some((items, next_page_exists))
                            }
                        }
                    }
                })
                .on_invalidation({
                    // Invalidating any page invalidates the backing entry of its key.
                    let backing_cache_scope = backing_cache_scope.clone();
                    move |key| {
                        let untyped_client = use_context::<UntypedQueryClient>()
                            .expect(
                                "leptos-fetch bug, UntypedQueryClient should always have been \
                                provided to the on_invalidation context internally"
                            );
                        untyped_client.invalidate_query(
                            &backing_cache_scope,
                            KeyWithItemCountRequestedUnhashed {
                                key: key.key.clone(),
                                // Not part of the hash, any value finds the entry.
                                item_count_requested: 0,
                            },
                        );
                    }
                })
                .on_gc(move |key| {
                    let untyped_client = use_context::<UntypedQueryClient>()
                        .expect(
                            "leptos-fetch bug, UntypedQueryClient should always have been \
                            provided to the on_gc context internally"
                        );
                    let scope_cache_key = use_context::<ScopeCacheKey>()
                        .expect(
                            "leptos-fetch bug, ScopeCacheKey itself should always have been \
                            provided to the on_gc context internally"
                        );
                    // Count the page queries (pending ones included) in this scope that
                    // belong to the same underlying key.
                    // NOTE(review): clearing on zero assumes the page being collected is no
                    // longer listed in the scope when this hook runs; confirm in the cache.
                    let mut found_nb = 0;
                    untyped_client
                        .scope_lookup
                        .with_cached_scope_mut::<PaginatedPageKey<Key>, Option<(Vec<PageItem>, bool)>, _, _>(
                            &mut untyped_client.scope_lookup.scopes_mut(),
                            scope_cache_key,
                            OnScopeMissing::Skip,
                            |_| {},
                            |maybe_scope, _| {
                                if let Some(scope) = maybe_scope {
                                    for query_or_pending in scope.all_queries_mut_include_pending() {
                                        if query_or_pending
                                            .key()
                                            .value_if_safe()
                                            .map(|test_key| test_key.key == key.key)
                                            .unwrap_or(false)
                                        {
                                            found_nb += 1;
                                        }
                                    }
                                }
                            },
                        );
                    // No page of this key is left: drop the shared backing items too.
                    if found_nb == 0 {
                        untyped_client.clear_query(
                            &backing_cache_scope,
                            KeyWithItemCountRequestedUnhashed {
                                key: key.key.clone(),
                                // Not part of the hash, any value finds the entry.
                                item_count_requested: 0,
                            },
                        );
                    }
                })
            }
        }
    };
}
273
// Thread-safe flavour: the getter's future gets `+ Send`; keys, items, cursors and the getter
// closure get `+ Send + Sync`. The backing entry is fetched with `fetch_query`.
define! { [+ Send], [+ Send + Sync], QueryScope, fetch_query, "QueryScope" }
// Local flavour: no extra bounds are added. The backing entry is fetched with
// `fetch_query_local`.
define! { [], [], QueryScopeLocal, fetch_query_local, "QueryScopeLocal" }
276
/// Key of the backing query: the user's key plus the number of items wanted by the request.
///
/// `item_count_requested` travels with the key so the fetcher can read it, but it is left out
/// of the hash (see the `Hash` impl below).
#[derive(Debug, Clone)]
struct KeyWithItemCountRequestedUnhashed<Key> {
    key: Key,
    item_count_requested: usize,
}

/// Hashes `key` only, so two values that differ just in `item_count_requested` produce the
/// same hash.
impl<Key> Hash for KeyWithItemCountRequestedUnhashed<Key>
where
    Key: Hash,
{
    fn hash<H>(&self, state: &mut H)
    where
        H: std::hash::Hasher,
    {
        Hash::hash(&self.key, state);
    }
}
288
/// Value stored in the backing query: a handle to the state shared by all pages of one key.
///
/// Cloning only clones the `Arc`, so every clone sees the same items and cursor.
#[derive(Debug, Clone)]
struct BackingCache<Item, Cursor> {
    inner: Arc<BackingCacheInner<Item, Cursor>>,
}
293
/// Shared state behind a `BackingCache`.
#[derive(Debug)]
struct BackingCacheInner<Item, Cursor> {
    // Every item loaded so far for the key, in the order the getter returned them.
    items: parking_lot::Mutex<Vec<Item>>,
    // Cursor to pass to the next getter call; `None` once the data is treated as exhausted.
    cursor: parking_lot::Mutex<Option<Cursor>>,
    // Async lock held while a page query loads additional items into `items`.
    update_lock: futures::lock::Mutex<()>,
}
300
301#[cfg(test)]
302mod tests {
303 use any_spawner::Executor;
304 use hydration_context::SsrSharedContext;
305 use leptos::prelude::*;
306 use rstest::*;
307 use std::sync::Arc;
308 use std::sync::atomic::{AtomicUsize, Ordering};
309
310 use crate::test::prep_vari;
311 use crate::{PaginatedPageKey, QueryClient, QueryScope};
312
313 #[tokio::test]
318 async fn test_paginated_serialization_works() {
319 crate::test::identify_parking_lot_deadlocks();
320 tokio::task::LocalSet::new()
321 .run_until(async move {
322 let (client, _guard, _owner) = prep_vari!(true);
323 let scope = QueryScope::new_paginated_with_cursor(|_: (), _, _| async { (vec![()], Some(())) });
324 client.resource(scope, || PaginatedPageKey {
325 key: (),
326 page_index: 0,
327 page_size: 10,
328 });
329 })
330 .await;
331 }
332
333 fn get_simple_api_fn(
334 num_rows: usize,
335 ) -> (
336 Arc<AtomicUsize>,
337 impl Fn(usize, Option<usize>) -> (Vec<usize>, Option<usize>) + Clone + 'static,
338 ) {
339 let call_count = Arc::new(AtomicUsize::new(0));
340
341 let api_fn = {
342 let call_count = call_count.clone();
343 move |target_return_count: usize, offset: Option<usize>| {
344 let call_count = call_count.clone();
345 call_count.fetch_add(1, Ordering::Relaxed);
346
347 let offset = offset.unwrap_or(0);
348 let items = (0..num_rows).skip(offset).take(target_return_count).collect::<Vec<_>>();
349 let next_offset = if offset + target_return_count < num_rows {
350 Some(offset + items.len())
351 } else {
352 None
353 };
354 (items, next_offset)
355 }
356 };
357
358 (call_count, api_fn)
359 }
360
361 #[tokio::test]
362 async fn test_paginated_cursor() {
363 crate::test::identify_parking_lot_deadlocks();
364 tokio::task::LocalSet::new()
365 .run_until(async move {
366 let (client, _guard, _owner) = prep_vari!(true);
367
368 let (_call_count, my_api_fn) = get_simple_api_fn(30);
369
370 let scope = QueryScope::new_paginated_with_cursor(move |_query_key, page_size, offset| {
374 let my_api_fn = my_api_fn.clone();
375 async move {
376 let (items, maybe_next_offset) = my_api_fn(page_size, offset);
377 (items, maybe_next_offset)
378 }
379 });
380
381 let (first_page_logs, more_pages) = client
382 .fetch_query(
383 scope.clone(),
384 PaginatedPageKey {
385 key: (),
386 page_index: 0,
387 page_size: 20,
388 },
389 )
390 .await
391 .expect(
392 "This page should exist (Some()), \
393 None when a page is requested beyond the end of the data.",
394 );
395
396 assert_eq!(first_page_logs, (0..20).collect::<Vec<_>>());
397
398 assert!(
399 more_pages,
400 "There should be more pages after the first page, \
401 ROW_COUNT=30 which is > page size of 20."
402 );
403
404 let (second_page_logs, more_pages) = client
405 .fetch_query(
406 scope.clone(),
407 PaginatedPageKey {
408 key: (),
409 page_index: 1,
410 page_size: 20,
411 },
412 )
413 .await
414 .expect(
415 "This page should exist (Some()), \
416 None when a page is requested beyond the end of the data.",
417 );
418
419 assert_eq!(second_page_logs, (20..30).collect::<Vec<_>>());
420
421 assert!(
422 !more_pages,
423 "20+20=40 which is > ROW_COUNT=30, so no more pages after the second page."
424 );
425
426 assert!(
427 client
428 .fetch_query(
429 scope.clone(),
430 PaginatedPageKey {
431 key: (),
432 page_index: 2,
433 page_size: 20,
434 },
435 )
436 .await
437 .is_none()
438 );
439 })
440 .await;
441 }
442
443 #[tokio::test]
445 async fn test_paginated_cursor_fills_page_size_with_multiple_calls() {
446 crate::test::identify_parking_lot_deadlocks();
447 tokio::task::LocalSet::new()
448 .run_until(async move {
449 let (client, _guard, _owner) = prep_vari!(true);
450
451 let call_count = Arc::new(AtomicUsize::new(0));
452 let call_count_clone = call_count.clone();
453
454 let scope = QueryScope::new_paginated_with_cursor(move |_key: (), _page_size, continuation| {
456 let call_count = call_count_clone.clone();
457 async move {
458 call_count.fetch_add(1, Ordering::Relaxed);
459
460 match continuation {
462 None => {
463 (vec![0, 1, 2], Some(3))
465 }
466 Some(3) => {
467 (vec![3, 4, 5], Some(6))
469 }
470 Some(6) => {
471 (vec![6, 7, 8], Some(9))
473 }
474 Some(9) => {
475 (vec![9, 10], None) }
478 _ => (vec![], None),
479 }
480 }
481 });
482
483 let (items, has_more) = client
485 .fetch_query(
486 scope.clone(),
487 PaginatedPageKey {
488 key: (),
489 page_index: 0,
490 page_size: 10,
491 },
492 )
493 .await
494 .expect("Page should exist");
495
496 assert_eq!(items.len(), 10, "Should have filled to page_size");
497 assert_eq!(items, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
498 assert!(has_more, "Should indicate more pages available");
499 assert_eq!(
500 call_count.load(Ordering::Relaxed),
501 4,
502 "Should have made 4 API calls to fill page"
503 );
504
505 let (items, has_more) = client
507 .fetch_query(
508 scope.clone(),
509 PaginatedPageKey {
510 key: (),
511 page_index: 1,
512 page_size: 10,
513 },
514 )
515 .await
516 .expect("Page should exist");
517
518 assert_eq!(items.len(), 1, "Only 1 item left");
519 assert_eq!(items, vec![10]);
520 assert!(!has_more, "No more pages");
521 assert_eq!(
522 call_count.load(Ordering::Relaxed),
523 4,
524 "Should not make additional calls - data already cached"
525 );
526 })
527 .await;
528
529 let (_call_count, my_api_fn) = get_simple_api_fn(30);
530
531 let scope = QueryScope::new_paginated_with_cursor(move |_query_key, page_size, offset| {
533 let my_api_fn = my_api_fn.clone();
534 async move {
535 let (items, maybe_next_offset) = my_api_fn(page_size, offset);
536 (items, maybe_next_offset)
537 }
538 });
539
540 let client = QueryClient::new();
541
542 let (first_page, more_pages) = client
544 .fetch_query(
545 scope.clone(),
546 PaginatedPageKey {
547 key: (),
548 page_index: 0,
549 page_size: 20,
550 },
551 )
552 .await
553 .expect("First page should exist");
554
555 assert_eq!(first_page, (0..20).collect::<Vec<_>>());
556 assert!(more_pages);
557
558 let (second_page, more_pages) = client
560 .fetch_query(
561 scope.clone(),
562 PaginatedPageKey {
563 key: (),
564 page_index: 1,
565 page_size: 20,
566 },
567 )
568 .await
569 .expect("Second page should exist");
570
571 assert_eq!(second_page, (20..30).collect::<Vec<_>>());
572 assert!(!more_pages);
573
574 assert!(
576 client
577 .fetch_query(
578 scope.clone(),
579 PaginatedPageKey {
580 key: (),
581 page_index: 2,
582 page_size: 20,
583 },
584 )
585 .await
586 .is_none()
587 );
588 }
589
590 #[tokio::test]
592 async fn test_paginated_cursor_no_unnecessary_api_calls() {
593 crate::test::identify_parking_lot_deadlocks();
594 tokio::task::LocalSet::new()
595 .run_until(async move {
596 let (client, _guard, _owner) = prep_vari!(true);
597
598 let call_count = Arc::new(AtomicUsize::new(0));
599 let call_count_clone = call_count.clone();
600
601 let scope = QueryScope::new_paginated_with_cursor(move |_key: (), page_size, continuation| {
602 let call_count = call_count_clone.clone();
603 async move {
604 call_count.fetch_add(1, Ordering::Relaxed);
605
606 let offset = continuation.unwrap_or(0);
607 let items: Vec<usize> = (offset..offset + page_size).collect();
608 let next = if offset + page_size < 100 {
609 Some(offset + page_size)
610 } else {
611 None
612 };
613 (items, next)
614 }
615 });
616
617 let _ = client
619 .fetch_query(
620 scope.clone(),
621 PaginatedPageKey {
622 key: (),
623 page_index: 0,
624 page_size: 10,
625 },
626 )
627 .await;
628 assert_eq!(
629 call_count.load(Ordering::Relaxed),
630 1,
631 "First fetch should make 1 API call"
632 );
633
634 let _ = client
636 .fetch_query(
637 scope.clone(),
638 PaginatedPageKey {
639 key: (),
640 page_index: 0,
641 page_size: 10,
642 },
643 )
644 .await;
645 assert_eq!(call_count.load(Ordering::Relaxed), 1, "Should still be 1 call - cached");
646
647 let _ = client
649 .fetch_query(
650 scope.clone(),
651 PaginatedPageKey {
652 key: (),
653 page_index: 1,
654 page_size: 10,
655 },
656 )
657 .await;
658 assert_eq!(call_count.load(Ordering::Relaxed), 2, "Second page needs new API call");
659
660 let _ = client
662 .fetch_query(
663 scope.clone(),
664 PaginatedPageKey {
665 key: (),
666 page_index: 0,
667 page_size: 10,
668 },
669 )
670 .await;
671 assert_eq!(
672 call_count.load(Ordering::Relaxed),
673 2,
674 "Should still be 2 calls - first page cached"
675 );
676 })
677 .await;
678 }
679
680 #[rstest]
682 #[tokio::test]
683 async fn test_paginated_cursor_linked_invalidation_and_clear(#[values(true, false)] clear: bool) {
684 crate::test::identify_parking_lot_deadlocks();
685 tokio::task::LocalSet::new()
686 .run_until(async move {
687 let (client, _guard, _owner) = prep_vari!(true);
688
689 let version = Arc::new(AtomicUsize::new(0));
690 let version_clone = version.clone();
691
692 let scope = QueryScope::new_paginated_with_cursor(move |key: String, page_size, continuation| {
693 let v = version_clone.clone();
694 async move {
695 let current_version = v.load(Ordering::Relaxed);
696 let offset = continuation.unwrap_or(0);
697
698 let items: Vec<String> = (offset..offset + page_size)
700 .map(|i| format!("{}_v{}_{}", key, current_version, i))
701 .collect();
702
703 let next = if offset + page_size < 30 {
704 Some(offset + page_size)
705 } else {
706 None
707 };
708 (items, next)
709 }
710 });
711
712 let (items1, _) = client
714 .fetch_query(
715 scope.clone(),
716 PaginatedPageKey {
717 key: "test".to_string(),
718 page_index: 0,
719 page_size: 10,
720 },
721 )
722 .await
723 .expect("Page should exist");
724
725 assert_eq!(items1[0], "test_v0_0");
726
727 let (items2, _) = client
729 .fetch_query(
730 scope.clone(),
731 PaginatedPageKey {
732 key: "test".to_string(),
733 page_index: 1,
734 page_size: 10,
735 },
736 )
737 .await
738 .expect("Page should exist");
739
740 assert_eq!(items2[0], "test_v0_10");
741
742 version.store(1, Ordering::Relaxed);
744
745 let (items1_new, _) = client
747 .fetch_query(
748 scope.clone(),
749 PaginatedPageKey {
750 key: "test".to_string(),
751 page_index: 0,
752 page_size: 10,
753 },
754 )
755 .await
756 .expect("Page should exist");
757
758 assert_eq!(
759 items1_new[0], "test_v0_0",
760 "Should have new version after invalidation/clear"
761 );
762
763 if clear {
765 client.untyped_client.clear_query_scope(scope.clone());
767 } else {
768 client.invalidate_query_scope(scope.clone());
769 }
770
771 let (items1_new, _) = client
773 .fetch_query(
774 scope.clone(),
775 PaginatedPageKey {
776 key: "test".to_string(),
777 page_index: 0,
778 page_size: 10,
779 },
780 )
781 .await
782 .expect("Page should exist");
783
784 assert_eq!(
785 items1_new[0], "test_v1_0",
786 "Should have new version after invalidation/clear"
787 );
788
789 let (items2_new, _) = client
791 .fetch_query(
792 scope.clone(),
793 PaginatedPageKey {
794 key: "test".to_string(),
795 page_index: 1,
796 page_size: 10,
797 },
798 )
799 .await
800 .expect("Page should exist");
801
802 assert_eq!(items2_new[0], "test_v1_10", "Second page should also have new version");
803 })
804 .await;
805 }
806
807 #[tokio::test]
809 async fn test_paginated_cursor_empty_response_handling() {
810 crate::test::identify_parking_lot_deadlocks();
811 tokio::task::LocalSet::new()
812 .run_until(async move {
813 let (client, _guard, _owner) = prep_vari!(true);
814
815 let scope = QueryScope::new_paginated_with_cursor(|_key: (), _page_size, continuation| async move {
816 match continuation {
817 None => (vec![1, 2, 3], Some(3)),
818 Some(3) => (vec![], Some(6)), _ => (vec![], None),
820 }
821 });
822
823 let (items, has_more) = client
824 .fetch_query(
825 scope.clone(),
826 PaginatedPageKey {
827 key: (),
828 page_index: 0,
829 page_size: 10,
830 },
831 )
832 .await
833 .expect("Page should exist");
834
835 assert_eq!(items.len(), 3);
837 assert!(!has_more, "Should not have more pages when empty response received");
838 })
839 .await;
840 }
841
842 #[tokio::test]
844 async fn test_paginated_cursor_concurrent_fetches() {
845 crate::test::identify_parking_lot_deadlocks();
846 tokio::task::LocalSet::new()
847 .run_until(async move {
848 let (client, _guard, _owner) = prep_vari!(true);
849
850 let call_count = Arc::new(AtomicUsize::new(0));
851 let call_count_clone = call_count.clone();
852
853 let scope = QueryScope::new_paginated_with_cursor(move |_key: (), page_size, continuation| {
854 let call_count = call_count_clone.clone();
855 async move {
856 call_count.fetch_add(1, Ordering::Relaxed);
857
858 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
860
861 let offset = continuation.unwrap_or(0);
862 let items: Vec<usize> = (offset..offset + page_size).collect();
863 let next = if offset + page_size < 100 {
864 Some(offset + page_size)
865 } else {
866 None
867 };
868 (items, next)
869 }
870 });
871
872 let futures = (0..5).map(|_| {
874 client.fetch_query(
875 scope.clone(),
876 PaginatedPageKey {
877 key: (),
878 page_index: 0,
879 page_size: 10,
880 },
881 )
882 });
883
884 let results = futures::future::join_all(futures).await;
885
886 for result in results {
888 assert!(result.is_some());
889 }
890
891 assert_eq!(
893 call_count.load(Ordering::Relaxed),
894 1,
895 "Concurrent fetches should share single API call"
896 );
897 })
898 .await;
899 }
900
901 #[tokio::test]
903 async fn test_paginated_cursor_different_page_sizes() {
904 crate::test::identify_parking_lot_deadlocks();
905 tokio::task::LocalSet::new()
906 .run_until(async move {
907 let (client, _guard, _owner) = prep_vari!(true);
908
909 let call_count = Arc::new(AtomicUsize::new(0));
910 let call_count_clone = call_count.clone();
911
912 let scope = QueryScope::new_paginated_with_cursor(move |_key: (), page_size, continuation| {
913 let call_count = call_count_clone.clone();
914 async move {
915 call_count.fetch_add(1, Ordering::Relaxed);
916
917 let offset = continuation.unwrap_or(0);
918 let items: Vec<usize> = (offset..std::cmp::min(offset + page_size, 50)).collect();
919 let next = if offset + page_size < 50 {
920 Some(offset + page_size)
921 } else {
922 None
923 };
924 (items, next)
925 }
926 });
927
928 let (items1, _) = client
930 .fetch_query(
931 scope.clone(),
932 PaginatedPageKey {
933 key: (),
934 page_index: 0,
935 page_size: 5,
936 },
937 )
938 .await
939 .expect("Page should exist");
940 assert_eq!(items1.len(), 5);
941 assert_eq!(call_count.load(Ordering::Relaxed), 1);
942
943 let (items2, _) = client
945 .fetch_query(
946 scope.clone(),
947 PaginatedPageKey {
948 key: (),
949 page_index: 0,
950 page_size: 15,
951 },
952 )
953 .await
954 .expect("Page should exist");
955 assert_eq!(items2.len(), 15);
956 assert_eq!(
957 call_count.load(Ordering::Relaxed),
958 2,
959 "Should fetch more data for larger page"
960 );
961
962 let (items3, _) = client
964 .fetch_query(
965 scope.clone(),
966 PaginatedPageKey {
967 key: (),
968 page_index: 0,
969 page_size: 10,
970 },
971 )
972 .await
973 .expect("Page should exist");
974 assert_eq!(items3.len(), 10);
975 assert_eq!(
976 call_count.load(Ordering::Relaxed),
977 2,
978 "Should use cached data, no new fetch"
979 );
980 })
981 .await;
982 }
983
984 #[rstest]
987 #[case::gc_time(TestMode::GcTime)]
988 #[case::stale_time(TestMode::StaleTime)]
989 #[tokio::test]
990 async fn test_paginated_cursor_backing_cache_lifecycle(#[case] mode: TestMode) {
991 crate::test::identify_parking_lot_deadlocks();
992 tokio::task::LocalSet::new()
993 .run_until(async move {
994 let (client, _guard, _owner) = prep_vari!(true);
995
996 let call_count = Arc::new(AtomicUsize::new(0));
997 let call_count_clone = call_count.clone();
998
999 let scope = QueryScope::new_paginated_with_cursor(move |key: String, page_size, continuation| {
1000 let call_count = call_count_clone.clone();
1001 async move {
1002 call_count.fetch_add(1, Ordering::Relaxed);
1003
1004 let offset = continuation.unwrap_or(0);
1005 let items: Vec<String> =
1006 (offset..offset + page_size).map(|i| format!("{}_{}", key, i)).collect();
1007 let next = if offset + page_size < 30 {
1008 Some(offset + page_size)
1009 } else {
1010 None
1011 };
1012 (items, next)
1013 }
1014 })
1015 .with_options(match mode {
1016 TestMode::GcTime => {
1017 crate::QueryOptions::default().with_gc_time(std::time::Duration::from_millis(100))
1018 }
1019 TestMode::StaleTime => {
1020 crate::QueryOptions::default().with_stale_time(std::time::Duration::from_millis(100))
1021 }
1022 });
1023
1024 let (items, _) = client
1026 .fetch_query(
1027 scope.clone(),
1028 PaginatedPageKey {
1029 key: "test".to_string(),
1030 page_index: 0,
1031 page_size: 10,
1032 },
1033 )
1034 .await
1035 .expect("Page should exist");
1036 assert_eq!(items[0], "test_0");
1037 assert_eq!(call_count.load(Ordering::Relaxed), 1);
1038
1039 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1041
1042 let (items, _) = client
1044 .fetch_query(
1045 scope.clone(),
1046 PaginatedPageKey {
1047 key: "test".to_string(),
1048 page_index: 1,
1049 page_size: 10,
1050 },
1051 )
1052 .await
1053 .expect("Page should exist");
1054 assert_eq!(items[0], "test_10");
1055 assert_eq!(call_count.load(Ordering::Relaxed), 2);
1056
1057 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1060
1061 let (items, _) = client
1063 .fetch_query(
1064 scope.clone(),
1065 PaginatedPageKey {
1066 key: "test".to_string(),
1067 page_index: 0,
1068 page_size: 10,
1069 },
1070 )
1071 .await
1072 .expect("Page should exist");
1073 assert_eq!(items[0], "test_0");
1074
1075 let (items, _) = client
1077 .fetch_query(
1078 scope.clone(),
1079 PaginatedPageKey {
1080 key: "test".to_string(),
1081 page_index: 1,
1082 page_size: 10,
1083 },
1084 )
1085 .await
1086 .expect("Page should exist");
1087 assert_eq!(items[0], "test_10");
1088
1089 let expected_calls = match mode {
1090 TestMode::GcTime => {
1091 2
1094 }
1095 TestMode::StaleTime => {
1096 3
1099 }
1100 };
1101 assert_eq!(call_count.load(Ordering::Relaxed), expected_calls);
1102
1103 if matches!(mode, TestMode::GcTime) {
1105 tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;
1106
1107 let (items, _) = client
1109 .fetch_query(
1110 scope.clone(),
1111 PaginatedPageKey {
1112 key: "test".to_string(),
1113 page_index: 0,
1114 page_size: 10,
1115 },
1116 )
1117 .await
1118 .expect("Page should exist");
1119 assert_eq!(items[0], "test_0");
1120 assert_eq!(
1121 call_count.load(Ordering::Relaxed),
1122 3,
1123 "Backing cache should be cleared when all pages are GC'd"
1124 );
1125 }
1126 })
1127 .await;
1128 }
1129
    /// Selects which expiry option `test_paginated_cursor_backing_cache_lifecycle` shortens.
    #[derive(Debug, Clone, Copy)]
    enum TestMode {
        // The page scope gets a 100ms gc time.
        GcTime,
        // The page scope gets a 100ms stale time.
        StaleTime,
    }
1137}