// leptos_fetch/pagination/paginated_cursor.rs
1use std::time::Duration;
2use std::{hash::Hash, sync::Arc};
3
4use leptos::prelude::*;
5
6use crate::{
7 PaginatedPageKey, QueryOptions, QueryScope, QueryScopeLocal, UntypedQueryClient,
8 cache::OnScopeMissing, debug_if_devtools_enabled::DebugIfDevtoolsEnabled,
9 query_scope::ScopeCacheKey,
10};
11
// Generates the cursor-pagination constructor for both the threadsafe and the
// local (non-`Send`) scope types:
// - `$impl_fut_generics`: extra bounds spliced onto the getter's future
//   (`+ Send` for the threadsafe variant, nothing for the local one).
// - `$impl_fn_generics`: extra bounds spliced onto Key/PageItem/Cursor/getter
//   (`+ Send + Sync` for the threadsafe variant).
// - `$name`: the scope type to implement on (`QueryScope` / `QueryScopeLocal`).
// - `$fetch_fn`: the matching client fetch method (`fetch_query` / `fetch_query_local`).
// NOTE(review): `$sname` is captured but never used anywhere in the macro body —
// confirm whether it can be dropped from both invocations.
macro_rules! define {
    ([$($impl_fut_generics:tt)*], [$($impl_fn_generics:tt)*], $name:ident, $fetch_fn:ident, $sname:literal) => {

        impl<Key, PageItem> $name<PaginatedPageKey<Key>, Option<(Vec<PageItem>, bool)>>
        where
            Key: DebugIfDevtoolsEnabled + Clone + Hash + PartialEq + 'static $($impl_fn_generics)*,
            PageItem: DebugIfDevtoolsEnabled + Clone + 'static $($impl_fn_generics)*,
        {
            /// Create a cursor-based paginated query scope.
            ///
            /// Use this when your API uses continuation tokens/cursors rather than numeric offsets.
            /// Good for infinite scroll patterns or when your API doesn't support offset-based pagination.
            ///
            /// # Arguments
            ///
            /// The getter receives:
            /// - `query_key: Key` - Your custom key, same across all pages
            /// - `nb_items_requested: usize` - Target number of items to return, will call again if not enough items returned
            /// - `cursor: Option<Cursor>` - Cursor from previous fetch, `None` on first page
            ///
            /// The getter must return:
            /// - `Vec<Item>` - Items for this page
            /// - `Option<Cursor>` - Next cursor token, `None` when no more data
            ///
            /// # Example
            ///
            /// ```rust,ignore
            /// use leptos_fetch::{QueryScope, PaginatedPageKey};
            ///
            /// // Create paginated scope with cursor-based API
            /// let scope = QueryScope::new_paginated_with_cursor(
            ///     |_key: (), nb_items, cursor: Option<String>| async move {
            ///         // Call your API with cursor token
            ///         let (items, next_cursor) = fetch_from_api(cursor, nb_items).await;
            ///         (items, next_cursor)
            ///     }
            /// );
            ///
            /// // Use like any other scope - fetch pages with PaginatedPageKey
            /// let (items, has_more) = client.fetch_query(scope, PaginatedPageKey {
            ///     key: (),
            ///     page_index: 0,
            ///     page_size: 20,
            /// }).await.expect("Page exists");
            ///
            /// // has_more: bool indicates if another page is available
            /// if has_more {
            ///     let (next_items, _) = client.fetch_query(scope, PaginatedPageKey {
            ///         key: (),
            ///         page_index: 1,
            ///         page_size: 20,
            ///     }).await.expect("Next page exists");
            /// }
            /// ```
            pub fn new_paginated_with_cursor<Cursor, Fut>(
                getter: impl Fn(Key, usize, Option<Cursor>) -> Fut + 'static $($impl_fn_generics)*,
            ) -> $name<PaginatedPageKey<Key>, Option<(Vec<PageItem>, bool)>>
            where
                Cursor: DebugIfDevtoolsEnabled + Clone + 'static $($impl_fn_generics)*,
                Fut: Future<Output = (Vec<PageItem>, Option<Cursor>)> $($impl_fut_generics)*,
            {
                let getter = Arc::new(getter);

                // Hidden backing scope: one entry per `Key` (the requested item count
                // is deliberately excluded from the hash), accumulating every item
                // fetched so far plus the cursor for the next fetch.
                let backing_cache_scope = $name::new({
                    let getter = getter.clone();
                    move |key: KeyWithItemCountRequestedUnhashed<Key>| {
                        let getter = getter.clone();
                        async move {
                            // Initial fill: first fetch for this key, no cursor yet.
                            let (items, mut cursor) = getter(key.key, key.item_count_requested, None).await;

                            // Protect incorrect Some(cursor) when clearly no items left:
                            if items.is_empty() {
                                cursor = None;
                            }

                            BackingCache {
                                inner: Arc::new(BackingCacheInner {
                                    items: parking_lot::Mutex::new(items),
                                    cursor: parking_lot::Mutex::new(cursor),
                                    update_lock: futures::lock::Mutex::new(()),
                                }),
                            }
                        }
                    }
                })
                // Shouldn't itself expire, the paginated query will control that:
                .with_options(
                    QueryOptions::default()
                        .with_stale_time(Duration::MAX)
                        .with_gc_time(Duration::MAX)
                        .with_refetch_interval(Duration::MAX)
                );

                // Public per-page scope: each `PaginatedPageKey` resolves to a slice of
                // the shared backing cache, topping it up via the getter when needed.
                $name::new({
                    let backing_cache_scope = backing_cache_scope.clone();
                    move |page_key: PaginatedPageKey<Key>| {
                        let backing_cache_scope = backing_cache_scope.clone();
                        let getter = getter.clone();
                        async move {
                            let untyped_client = use_context::<UntypedQueryClient>()
                                .expect(
                                    "leptos-fetch bug, UntypedQueryClient should always have been \
                                    provided to the query context internally"
                                );
                            let scope_cache_key = use_context::<ScopeCacheKey>()
                                .expect(
                                    "leptos-fetch bug, ScopeCacheKey itself should always have been \
                                    provided to the query context internally"
                                );

                            // If this query is reloading because it was stale, should invalidate the backing cache before reading it,
                            // otherwise will still get back the same stale data again:
                            if let Some(metadata) = untyped_client.query_metadata::<PaginatedPageKey<Key>, Option<(Vec<PageItem>, bool)>>(
                                scope_cache_key,
                                &page_key,
                            )
                                && metadata.stale_or_invalidated
                                && let Some(backing_metadata) = untyped_client.query_metadata::<KeyWithItemCountRequestedUnhashed<Key>, BackingCache<PageItem, Cursor>>(
                                    backing_cache_scope.cache_key,
                                    &KeyWithItemCountRequestedUnhashed {
                                        key: page_key.key.clone(),
                                        item_count_requested: 0, // Doesn't matter, it's not part of the hash
                                    },
                                )
                                // Only invalidate if the backing cache hasn't already been
                                // refreshed since this page was last updated:
                                && backing_metadata.updated_at <= metadata.updated_at
                            {
                                untyped_client.invalidate_query(
                                    &backing_cache_scope,
                                    KeyWithItemCountRequestedUnhashed {
                                        key: page_key.key.clone(),
                                        item_count_requested: 0, // Doesn't matter, it's not part of the hash
                                    },
                                );
                            }

                            // Fetch (or reuse) the shared backing cache for this key:
                            let infinite_cache = untyped_client
                                .$fetch_fn(
                                    backing_cache_scope,
                                    KeyWithItemCountRequestedUnhashed {
                                        key: page_key.key.clone(),
                                        item_count_requested: page_key.page_size,
                                    },
                                )
                                .await;

                            let target_idx_start = page_key.page_index * (page_key.page_size as usize);
                            let target_idx_end_exclusive = (page_key.page_index + 1) * (page_key.page_size as usize);

                            // Load x more if needed:
                            // Returns Some(shortfall) while the cache holds fewer items than
                            // this page needs AND the source isn't exhausted (cursor present).
                            let should_request_x_more = || {
                                let items = infinite_cache.inner.items.lock();
                                if items.len() < target_idx_end_exclusive
                                    && infinite_cache.inner.cursor.lock().is_some() {
                                    Some(target_idx_end_exclusive - items.len())
                                } else {
                                    None
                                }
                            };

                            if should_request_x_more().is_some() {
                                // Preventing multiple simultaneous fetches by holding an async lock,
                                // but note making sure to not hold any sync locks across await boundaries.
                                let mut _guard = infinite_cache.inner.update_lock.lock().await;
                                // Re-check inside the lock: another task may have already topped up.
                                while let Some(amount_needed) = should_request_x_more() {
                                    // Clone the cursor out so the sync lock is released before awaiting.
                                    let cur_token = (&*infinite_cache.inner.cursor.lock()).clone();
                                    let (items, cursor) =
                                        getter(page_key.key.clone(), amount_needed, cur_token).await;
                                    if !items.is_empty() {
                                        infinite_cache.inner.items.lock().extend(items);
                                        *infinite_cache.inner.cursor.lock() = cursor;
                                    } else {
                                        // Protect incorrect Some(cursor) when clearly no items left:
                                        *infinite_cache.inner.cursor.lock() = None;
                                    }
                                }
                                drop(_guard);
                            }

                            // Slice this page out of the accumulated items; None means the
                            // requested page starts beyond the end of the data.
                            let items_guard = infinite_cache.inner.items.lock();
                            if target_idx_start >= items_guard.len() {
                                None
                            } else {
                                let items = items_guard
                                    [target_idx_start..std::cmp::min(target_idx_end_exclusive, items_guard.len())]
                                    .to_vec();
                                // Another page exists if we hold items past this page already,
                                // or the source still has a live cursor.
                                let next_page_exists = items_guard.len() > target_idx_end_exclusive
                                    || infinite_cache.inner.cursor.lock().is_some();
                                Some((items, next_page_exists))
                            }
                        }
                    }
                })
                // An invalidation or clear of any page should invalidate the backing cache:
                .on_invalidation({
                    let backing_cache_scope = backing_cache_scope.clone();
                    move |key| {
                        let untyped_client = use_context::<UntypedQueryClient>()
                            .expect(
                                "leptos-fetch bug, UntypedQueryClient should always have been \
                                provided to the on_invalidation context internally"
                            );
                        untyped_client.invalidate_query(
                            &backing_cache_scope,
                            KeyWithItemCountRequestedUnhashed {
                                key: key.key.clone(),
                                item_count_requested: 0, // Doesn't matter, it's not part of the hash
                            },
                        );
                    }
                })
                // If this was the last page existing now being gc'd, clear the backing cache too:
                .on_gc(move |key| {
                    let untyped_client = use_context::<UntypedQueryClient>()
                        .expect(
                            "leptos-fetch bug, UntypedQueryClient should always have been \
                            provided to the on_gc context internally"
                        );
                    let scope_cache_key = use_context::<ScopeCacheKey>()
                        .expect(
                            "leptos-fetch bug, ScopeCacheKey itself should always have been \
                            provided to the on_gc context internally"
                        );
                    // Count pages (live or pending) that still share this pagination key:
                    let mut found_nb = 0;
                    untyped_client
                        .scope_lookup
                        .with_cached_scope_mut::<PaginatedPageKey<Key>, Option<(Vec<PageItem>, bool)>, _, _>(
                            &mut untyped_client.scope_lookup.scopes_mut(),
                            scope_cache_key,
                            OnScopeMissing::Skip,
                            |_| {},
                            |maybe_scope, _| {
                                if let Some(scope) = maybe_scope {
                                    for query_or_pending in scope.all_queries_mut_include_pending() {
                                        if query_or_pending.key().value_if_safe().map(|test_key| test_key.key == key.key).unwrap_or(false) {
                                            found_nb += 1;
                                        }
                                    }
                                }
                            },
                        );
                    // No pages left for this key -> the backing cache can be dropped:
                    if found_nb == 0 {
                        untyped_client.clear_query(
                            &backing_cache_scope,
                            KeyWithItemCountRequestedUnhashed {
                                key: key.key.clone(),
                                item_count_requested: 0, // Doesn't matter, it's not part of the hash
                            },
                        );
                    }
                })
            }
        }
    };
}
265
// Threadsafe variant: getter futures must be `Send`; key/item/cursor types `Send + Sync`.
define! { [+ Send], [+ Send + Sync], QueryScope, fetch_query, "QueryScope" }
// Local (non-`Send`) variant: no extra bounds, fetched via the `_local` client method.
define! { [], [], QueryScopeLocal, fetch_query_local, "QueryScopeLocal" }
268
/// Backing-cache key: the user's pagination key plus the number of items the
/// current request wants fetched.
#[derive(Debug, Clone)]
struct KeyWithItemCountRequestedUnhashed<Key> {
    key: Key,
    item_count_requested: usize,
}

// `item_count_requested` is deliberately excluded from the hash: every request
// for the same `key` must resolve to the same backing-cache entry, no matter
// how many items it asked for.
impl<Key: Hash> Hash for KeyWithItemCountRequestedUnhashed<Key> {
    fn hash<H: std::hash::Hasher>(&self, hasher: &mut H) {
        Hash::hash(&self.key, hasher);
    }
}
280
281#[derive(Debug, Clone)]
282struct BackingCache<Item, Cursor> {
283 inner: Arc<BackingCacheInner<Item, Cursor>>,
284}
285
/// Interior state of [`BackingCache`], shared behind an `Arc`.
#[derive(Debug)]
struct BackingCacheInner<Item, Cursor> {
    // All items fetched so far for this key, in fetch order, across all pages.
    items: parking_lot::Mutex<Vec<Item>>,
    // Cursor to pass to the next getter call; `None` once the source is exhausted.
    cursor: parking_lot::Mutex<Option<Cursor>>,
    // Async lock serialising top-up fetches; the sync mutexes above are never
    // held across an `.await`, only this one is.
    update_lock: futures::lock::Mutex<()>,
}
292
293#[cfg(test)]
294mod tests {
295 use any_spawner::Executor;
296 use hydration_context::SsrSharedContext;
297 use leptos::prelude::*;
298 use rstest::*;
299 use std::sync::Arc;
300 use std::sync::atomic::{AtomicUsize, Ordering};
301
302 use crate::test::prep_vari;
303 use crate::{PaginatedPageKey, QueryClient, QueryScope};
304
    /// We don't know the serialization mechanism the user is using, so cannot return wrapping types from the query function.
    /// Hence returning (Vec<Item>, has_more_pages: bool) instead of a custom wrapping Page<Item> type.
    /// This test is just checking compilation.
    #[tokio::test]
    async fn test_paginated_serialization_works() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);
                // Unit key/item/cursor types: only the type-level plumbing matters here.
                let scope = QueryScope::new_paginated_with_cursor(|_: (), _, _| async {
                    (vec![()], Some(()))
                });
                // Creating the resource is enough - it's never polled or asserted on.
                client.resource(scope, || PaginatedPageKey {
                    key: (),
                    page_index: 0,
                    page_size: 10,
                });
            })
            .await;
    }
325
326 fn get_simple_api_fn(
327 num_rows: usize,
328 ) -> (
329 Arc<AtomicUsize>,
330 impl Fn(usize, Option<usize>) -> (Vec<usize>, Option<usize>) + Clone + 'static,
331 ) {
332 let call_count = Arc::new(AtomicUsize::new(0));
333
334 let api_fn = {
335 let call_count = call_count.clone();
336 move |target_return_count: usize, offset: Option<usize>| {
337 let call_count = call_count.clone();
338 call_count.fetch_add(1, Ordering::Relaxed);
339
340 let offset = offset.unwrap_or(0);
341 let items = (0..num_rows)
342 .skip(offset)
343 .take(target_return_count)
344 .collect::<Vec<_>>();
345 let next_offset = if offset + target_return_count < num_rows {
346 Some(offset + items.len())
347 } else {
348 None
349 };
350 (items, next_offset)
351 }
352 };
353
354 (call_count, api_fn)
355 }
356
    /// Happy path: sequential pages return the right slices, `has_more` flips to
    /// false on the final page, and requesting past the end yields `None`.
    #[tokio::test]
    async fn test_paginated_cursor() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                // 30 rows total, served via the shared fake offset API.
                let (_call_count, my_api_fn) = get_simple_api_fn(30);

                // The scope/queryer can now be used like any other QueryScope, just with PaginatedPageKey<YourKey> as the key type.
                // In resources, fetch_query, etc.
                let scope =
                    QueryScope::new_paginated_with_cursor(move |_query_key, page_size, offset| {
                        let my_api_fn = my_api_fn.clone();
                        async move {
                            let (items, maybe_next_offset) = my_api_fn(page_size, offset);
                            (items, maybe_next_offset)
                        }
                    });

                let (first_page_logs, more_pages) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 20,
                        },
                    )
                    .await
                    .expect(
                        "This page should exist (Some()), \
                        None when a page is requested beyond the end of the data.",
                    );

                assert_eq!(first_page_logs, (0..20).collect::<Vec<_>>());

                assert!(
                    more_pages,
                    "There should be more pages after the first page, \
                    ROW_COUNT=30 which is > page size of 20."
                );

                let (second_page_logs, more_pages) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 1,
                            page_size: 20,
                        },
                    )
                    .await
                    .expect(
                        "This page should exist (Some()), \
                        None when a page is requested beyond the end of the data.",
                    );

                // Second page is a partial page: only rows 20..30 remain.
                assert_eq!(second_page_logs, (20..30).collect::<Vec<_>>());

                assert!(
                    !more_pages,
                    "20+20=40 which is > ROW_COUNT=30, so no more pages after the second page."
                );

                // Page index 2 would start at row 40, past the 30 available rows:
                assert!(
                    client
                        .fetch_query(
                            scope.clone(),
                            PaginatedPageKey {
                                key: (),
                                page_index: 2,
                                page_size: 20,
                            },
                        )
                        .await
                        .is_none()
                );
            })
            .await;
    }
438
    /// Test that the pagination logic keeps calling the API until page_size items are available
    #[tokio::test]
    async fn test_paginated_cursor_fills_page_size_with_multiple_calls() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let call_count = Arc::new(AtomicUsize::new(0));
                let call_count_clone = call_count.clone();

                // API that returns fewer items than requested
                let scope = QueryScope::new_paginated_with_cursor(
                    move |_key: (), _page_size, continuation| {
                        let call_count = call_count_clone.clone();
                        async move {
                            call_count.fetch_add(1, Ordering::Relaxed);

                            // Return only 3 items per call, even if more are requested
                            match continuation {
                                None => {
                                    // First call
                                    (vec![0, 1, 2], Some(3))
                                }
                                Some(3) => {
                                    // Second call
                                    (vec![3, 4, 5], Some(6))
                                }
                                Some(6) => {
                                    // Third call
                                    (vec![6, 7, 8], Some(9))
                                }
                                Some(9) => {
                                    // Fourth call
                                    (vec![9, 10], None) // Only 2 items left
                                }
                                _ => (vec![], None),
                            }
                        }
                    },
                );

                // Request page with size 10 - should make 4 API calls to get 10 items
                let (items, has_more) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items.len(), 10, "Should have filled to page_size");
                assert_eq!(items, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
                assert!(has_more, "Should indicate more pages available");
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    4,
                    "Should have made 4 API calls to fill page"
                );

                // Request second page - should only need 1 more API call
                let (items, has_more) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 1,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                // Item 10 was already pulled into the backing cache by the 4th call above.
                assert_eq!(items.len(), 1, "Only 1 item left");
                assert_eq!(items, vec![10]);
                assert!(!has_more, "No more pages");
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    4,
                    "Should not make additional calls - data already cached"
                );
            })
            .await;

        // NOTE(review): everything below duplicates `test_paginated_cursor` (same scope,
        // same assertions) but runs outside the LocalSet with a fresh `QueryClient::new()`.
        // It looks like a copy-paste leftover unrelated to the "fills page size" scenario
        // above - confirm intent; consider removing it or moving it to its own test.
        let (_call_count, my_api_fn) = get_simple_api_fn(30);

        // Create the scope
        let scope = QueryScope::new_paginated_with_cursor(move |_query_key, page_size, offset| {
            let my_api_fn = my_api_fn.clone();
            async move {
                let (items, maybe_next_offset) = my_api_fn(page_size, offset);
                (items, maybe_next_offset)
            }
        });

        let client = QueryClient::new();

        // Fetch first page
        let (first_page, more_pages) = client
            .fetch_query(
                scope.clone(),
                PaginatedPageKey {
                    key: (),
                    page_index: 0,
                    page_size: 20,
                },
            )
            .await
            .expect("First page should exist");

        assert_eq!(first_page, (0..20).collect::<Vec<_>>());
        assert!(more_pages);

        // Fetch second page
        let (second_page, more_pages) = client
            .fetch_query(
                scope.clone(),
                PaginatedPageKey {
                    key: (),
                    page_index: 1,
                    page_size: 20,
                },
            )
            .await
            .expect("Second page should exist");

        assert_eq!(second_page, (20..30).collect::<Vec<_>>());
        assert!(!more_pages);

        // Requesting beyond available data returns None
        assert!(
            client
                .fetch_query(
                    scope.clone(),
                    PaginatedPageKey {
                        key: (),
                        page_index: 2,
                        page_size: 20,
                    },
                )
                .await
                .is_none()
        );
    }
587
    /// Test that API calls are cached properly and not repeated unnecessarily
    #[tokio::test]
    async fn test_paginated_cursor_no_unnecessary_api_calls() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let call_count = Arc::new(AtomicUsize::new(0));
                let call_count_clone = call_count.clone();

                // 100 rows total; each call returns exactly `page_size` items from the offset.
                let scope = QueryScope::new_paginated_with_cursor(
                    move |_key: (), page_size, continuation| {
                        let call_count = call_count_clone.clone();
                        async move {
                            call_count.fetch_add(1, Ordering::Relaxed);

                            let offset = continuation.unwrap_or(0);
                            let items: Vec<usize> = (offset..offset + page_size).collect();
                            let next = if offset + page_size < 100 {
                                Some(offset + page_size)
                            } else {
                                None
                            };
                            (items, next)
                        }
                    },
                );

                // First fetch
                let _ = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await;
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    1,
                    "First fetch should make 1 API call"
                );

                // Fetch same page again - should use cache
                let _ = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await;
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    1,
                    "Should still be 1 call - cached"
                );

                // Fetch next page
                let _ = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 1,
                            page_size: 10,
                        },
                    )
                    .await;
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    2,
                    "Second page needs new API call"
                );

                // Fetch first page again - should still be cached
                let _ = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await;
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    2,
                    "Should still be 2 calls - first page cached"
                );
            })
            .await;
    }
687
688 /// Test linked invalidation and clear between pages with same key
689 #[rstest]
690 #[tokio::test]
691 async fn test_paginated_cursor_linked_invalidation_and_clear(
692 #[values(true, false)] clear: bool,
693 ) {
694 crate::test::identify_parking_lot_deadlocks();
695 tokio::task::LocalSet::new()
696 .run_until(async move {
697 let (client, _guard, _owner) = prep_vari!(true);
698
699 let version = Arc::new(AtomicUsize::new(0));
700 let version_clone = version.clone();
701
702 let scope = QueryScope::new_paginated_with_cursor(
703 move |key: String, page_size, continuation| {
704 let v = version_clone.clone();
705 async move {
706 let current_version = v.load(Ordering::Relaxed);
707 let offset = continuation.unwrap_or(0);
708
709 // Return different data based on version
710 let items: Vec<String> = (offset..offset + page_size)
711 .map(|i| format!("{}_v{}_{}", key, current_version, i))
712 .collect();
713
714 let next = if offset + page_size < 30 {
715 Some(offset + page_size)
716 } else {
717 None
718 };
719 (items, next)
720 }
721 },
722 );
723
724 // Fetch first page
725 let (items1, _) = client
726 .fetch_query(
727 scope.clone(),
728 PaginatedPageKey {
729 key: "test".to_string(),
730 page_index: 0,
731 page_size: 10,
732 },
733 )
734 .await
735 .expect("Page should exist");
736
737 assert_eq!(items1[0], "test_v0_0");
738
739 // Fetch second page
740 let (items2, _) = client
741 .fetch_query(
742 scope.clone(),
743 PaginatedPageKey {
744 key: "test".to_string(),
745 page_index: 1,
746 page_size: 10,
747 },
748 )
749 .await
750 .expect("Page should exist");
751
752 assert_eq!(items2[0], "test_v0_10");
753
754 // Increment version to simulate data change
755 version.store(1, Ordering::Relaxed);
756
757 // Fetch first page again - should still be old data
758 let (items1_new, _) = client
759 .fetch_query(
760 scope.clone(),
761 PaginatedPageKey {
762 key: "test".to_string(),
763 page_index: 0,
764 page_size: 10,
765 },
766 )
767 .await
768 .expect("Page should exist");
769
770 assert_eq!(
771 items1_new[0], "test_v0_0",
772 "Should have new version after invalidation/clear"
773 );
774
775 // Invalidation/clear should lead to new data:
776 if clear {
777 // Currently not public, but still want to check as used internally for some things:
778 client.untyped_client.clear_query_scope(scope.clone());
779 } else {
780 client.invalidate_query_scope(scope.clone());
781 }
782
783 // Fetch first page again - should get new data
784 let (items1_new, _) = client
785 .fetch_query(
786 scope.clone(),
787 PaginatedPageKey {
788 key: "test".to_string(),
789 page_index: 0,
790 page_size: 10,
791 },
792 )
793 .await
794 .expect("Page should exist");
795
796 assert_eq!(
797 items1_new[0], "test_v1_0",
798 "Should have new version after invalidation/clear"
799 );
800
801 // Second page should also be invalidated/cleared
802 let (items2_new, _) = client
803 .fetch_query(
804 scope.clone(),
805 PaginatedPageKey {
806 key: "test".to_string(),
807 page_index: 1,
808 page_size: 10,
809 },
810 )
811 .await
812 .expect("Page should exist");
813
814 assert_eq!(
815 items2_new[0], "test_v1_10",
816 "Second page should also have new version"
817 );
818 })
819 .await;
820 }
821
    /// Test that empty responses are handled correctly
    #[tokio::test]
    async fn test_paginated_cursor_empty_response_handling() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                // The second call misbehaves: empty batch but still hands back a cursor.
                // The implementation must treat "empty items" as end-of-data so it
                // doesn't loop forever chasing the bogus cursor.
                let scope = QueryScope::new_paginated_with_cursor(
                    |_key: (), _page_size, continuation| async move {
                        match continuation {
                            None => (vec![1, 2, 3], Some(3)),
                            Some(3) => (vec![], Some(6)), // Empty response but with cursor
                            _ => (vec![], None),
                        }
                    },
                );

                let (items, has_more) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                // Should only have 3 items since second call returned empty
                assert_eq!(items.len(), 3);
                assert!(
                    !has_more,
                    "Should not have more pages when empty response received"
                );
            })
            .await;
    }
861
    /// Test concurrent page fetches don't cause duplicate API calls
    #[tokio::test]
    async fn test_paginated_cursor_concurrent_fetches() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let call_count = Arc::new(AtomicUsize::new(0));
                let call_count_clone = call_count.clone();

                let scope = QueryScope::new_paginated_with_cursor(
                    move |_key: (), page_size, continuation| {
                        let call_count = call_count_clone.clone();
                        async move {
                            call_count.fetch_add(1, Ordering::Relaxed);

                            // Add a small delay to simulate network latency
                            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

                            let offset = continuation.unwrap_or(0);
                            let items: Vec<usize> = (offset..offset + page_size).collect();
                            let next = if offset + page_size < 100 {
                                Some(offset + page_size)
                            } else {
                                None
                            };
                            (items, next)
                        }
                    },
                );

                // Launch multiple concurrent fetches for the same page
                let futures = (0..5).map(|_| {
                    client.fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                });

                let results = futures::future::join_all(futures).await;

                // All should succeed
                for result in results {
                    assert!(result.is_some());
                }

                // Should only make 1 API call despite 5 concurrent requests
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    1,
                    "Concurrent fetches should share single API call"
                );
            })
            .await;
    }
922
    /// Test different page sizes work correctly with shared cache
    #[tokio::test]
    async fn test_paginated_cursor_different_page_sizes() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let call_count = Arc::new(AtomicUsize::new(0));
                let call_count_clone = call_count.clone();

                // 50 rows total; pages of differing sizes all share one backing cache per key.
                let scope = QueryScope::new_paginated_with_cursor(
                    move |_key: (), page_size, continuation| {
                        let call_count = call_count_clone.clone();
                        async move {
                            call_count.fetch_add(1, Ordering::Relaxed);

                            let offset = continuation.unwrap_or(0);
                            let items: Vec<usize> =
                                (offset..std::cmp::min(offset + page_size, 50)).collect();
                            let next = if offset + page_size < 50 {
                                Some(offset + page_size)
                            } else {
                                None
                            };
                            (items, next)
                        }
                    },
                );

                // Fetch with page size 5
                let (items1, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 5,
                        },
                    )
                    .await
                    .expect("Page should exist");
                assert_eq!(items1.len(), 5);
                assert_eq!(call_count.load(Ordering::Relaxed), 1);

                // Fetch with page size 15 - should reuse cached data and fetch more
                let (items2, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 15,
                        },
                    )
                    .await
                    .expect("Page should exist");
                assert_eq!(items2.len(), 15);
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    2,
                    "Should fetch more data for larger page"
                );

                // Fetch with page size 10 - should use cached data
                let (items3, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");
                assert_eq!(items3.len(), 10);
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    2,
                    "Should use cached data, no new fetch"
                );
            })
            .await;
    }
1008
    /// Test that backing cache is properly managed when paginated pages expire
    /// Tests both GC (garbage collection) and stale time scenarios
    #[rstest]
    #[case::gc_time(TestMode::GcTime)]
    #[case::stale_time(TestMode::StaleTime)]
    #[tokio::test]
    async fn test_paginated_cursor_backing_cache_lifecycle(#[case] mode: TestMode) {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let call_count = Arc::new(AtomicUsize::new(0));
                let call_count_clone = call_count.clone();

                let scope = QueryScope::new_paginated_with_cursor(
                    move |key: String, page_size, continuation| {
                        let call_count = call_count_clone.clone();
                        async move {
                            call_count.fetch_add(1, Ordering::Relaxed);

                            let offset = continuation.unwrap_or(0);
                            let items: Vec<String> = (offset..offset + page_size)
                                .map(|i| format!("{}_{}", key, i))
                                .collect();
                            let next = if offset + page_size < 30 {
                                Some(offset + page_size)
                            } else {
                                None
                            };
                            (items, next)
                        }
                    },
                )
                // Short gc/stale windows so expiry can be driven with small sleeps below.
                .with_options(match mode {
                    TestMode::GcTime => crate::QueryOptions::default()
                        .with_gc_time(std::time::Duration::from_millis(100)),
                    TestMode::StaleTime => crate::QueryOptions::default()
                        .with_stale_time(std::time::Duration::from_millis(100)),
                });

                // Fetch first page
                let (items, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: "test".to_string(),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");
                assert_eq!(items[0], "test_0");
                assert_eq!(call_count.load(Ordering::Relaxed), 1);

                // Wait for 50ms, 50ms left till gc/stale:
                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

                // Fetch second page
                let (items, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: "test".to_string(),
                            page_index: 1,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");
                assert_eq!(items[0], "test_10");
                assert_eq!(call_count.load(Ordering::Relaxed), 2);

                // Wait another 50ms, so the first query is stale/gc'd, but the second is valid even when gc'd because 50ms left:
                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

                // Fetch first page again
                let (items, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: "test".to_string(),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");
                assert_eq!(items[0], "test_0");

                // Fetch second page again
                let (items, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: "test".to_string(),
                            page_index: 1,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");
                assert_eq!(items[0], "test_10");

                let expected_calls = match mode {
                    TestMode::GcTime => {
                        // GC cleared page 0, but page 1 kept backing cache alive
                        // So page 0 refetch reuses backing cache, plus page 1 still alive so also reused
                        2
                    }
                    TestMode::StaleTime => {
                        // Stale page triggers invalidation which clears backing cache
                        // So page 0 refetch needs new API call, but page 1 is still valid so reused
                        3
                    }
                };
                assert_eq!(call_count.load(Ordering::Relaxed), expected_calls);

                // If gc, should clear the backing cache only when all pages are gc'd:
                if matches!(mode, TestMode::GcTime) {
                    // 150ms > the 100ms gc window for every remaining page.
                    tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;

                    // Now fetch again - backing cache should be gone
                    let (items, _) = client
                        .fetch_query(
                            scope.clone(),
                            PaginatedPageKey {
                                key: "test".to_string(),
                                page_index: 0,
                                page_size: 10,
                            },
                        )
                        .await
                        .expect("Page should exist");
                    assert_eq!(items[0], "test_0");
                    assert_eq!(
                        call_count.load(Ordering::Relaxed),
                        3,
                        "Backing cache should be cleared when all pages are GC'd"
                    );
                }
            })
            .await;
    }
1154
    /// Which expiry mechanism `test_paginated_cursor_backing_cache_lifecycle` exercises.
    #[derive(Debug, Clone, Copy)]
    enum TestMode {
        /// Test GC behavior: backing cache should only be cleared when all pages are GC'd
        GcTime,
        /// Test stale time behavior: backing cache should be invalidated when any page goes stale
        StaleTime,
    }
1162}