Skip to main content

leptos_fetch/pagination/
paginated_cursor.rs

1use std::time::Duration;
2use std::{hash::Hash, sync::Arc};
3
4use leptos::prelude::*;
5
6use crate::{
7    PaginatedPageKey, QueryOptions, QueryScope, QueryScopeLocal, UntypedQueryClient, cache::OnScopeMissing,
8    debug_if_devtools_enabled::DebugIfDevtoolsEnabled, query_scope::ScopeCacheKey,
9};
10
11macro_rules! define {
12    ([$($impl_fut_generics:tt)*], [$($impl_fn_generics:tt)*], $name:ident, $fetch_fn:ident, $sname:literal) => {
13
14        impl<Key, PageItem> $name<PaginatedPageKey<Key>, Option<(Vec<PageItem>, bool)>>
15        where
16            Key: DebugIfDevtoolsEnabled + Clone + Hash + PartialEq + 'static $($impl_fn_generics)*,
17            PageItem: DebugIfDevtoolsEnabled + Clone + 'static $($impl_fn_generics)*,
18        {
19            /// Create a cursor-based paginated query scope.
20            ///
21            /// Use this when your API uses continuation tokens/cursors rather than numeric offsets.
22            /// Good for infinite scroll patterns or when your API doesn't support offset-based pagination.
23            ///
24            /// # Arguments
25            ///
26            /// The getter receives:
27            /// - `query_key: Key` - Your custom key, same across all pages
28            /// - `nb_items_requested: usize` - Target number of items to return,
29            ///   will call again if not enough items returned
30            /// - `cursor: Option<Cursor>` - Cursor from previous fetch, `None` on first page
31            ///
32            /// The getter must return:
33            /// - `Vec<Item>` - Items for this page
34            /// - `Option<Cursor>` - Next cursor token, `None` when no more data
35            ///
36            /// # Example
37            ///
38            /// ```rust,ignore
39            /// use leptos_fetch::{QueryScope, PaginatedPageKey};
40            ///
41            /// // Create paginated scope with cursor-based API
42            /// let scope = QueryScope::new_paginated_with_cursor(
43            ///     |_key: (), nb_items, cursor: Option<String>| async move {
44            ///         // Call your API with cursor token
45            ///         let (items, next_cursor) = fetch_from_api(cursor, nb_items).await;
46            ///         (items, next_cursor)
47            ///     }
48            /// );
49            ///
50            /// // Use like any other scope - fetch pages with PaginatedPageKey
51            /// let (items, has_more) = client.fetch_query(scope, PaginatedPageKey {
52            ///     key: (),
53            ///     page_index: 0,
54            ///     page_size: 20,
55            /// }).await.expect("Page exists");
56            ///
57            /// // has_more: bool indicates if another page is available
58            /// if has_more {
59            ///     let (next_items, _) = client.fetch_query(scope, PaginatedPageKey {
60            ///         key: (),
61            ///         page_index: 1,
62            ///         page_size: 20,
63            ///     }).await.expect("Next page exists");
64            /// }
65            /// ```
66            pub fn new_paginated_with_cursor<Cursor, Fut>(
67                getter: impl Fn(Key, usize, Option<Cursor>) -> Fut + 'static $($impl_fn_generics)*,
68            ) -> $name<PaginatedPageKey<Key>, Option<(Vec<PageItem>, bool)>>
69            where
70                Cursor: DebugIfDevtoolsEnabled + Clone + 'static $($impl_fn_generics)*,
71                Fut: Future<Output = (Vec<PageItem>, Option<Cursor>)> $($impl_fut_generics)*,
72            {
73                let getter = Arc::new(getter);
74                let backing_cache_scope = $name::new({
75                    let getter = getter.clone();
76                    move |key: KeyWithItemCountRequestedUnhashed<Key>| {
77                        let getter = getter.clone();
78                        async move {
79                            let (items, mut cursor) = getter(key.key, key.item_count_requested, None).await;
80
81                            // Protect incorrect Some(cursor) when clearly no items left:
82                            if items.is_empty() {
83                                cursor = None;
84                            }
85
86                            BackingCache {
87                                inner: Arc::new(BackingCacheInner {
88                                    items: parking_lot::Mutex::new(items),
89                                    cursor: parking_lot::Mutex::new(cursor),
90                                    update_lock: futures::lock::Mutex::new(()),
91                                }),
92                            }
93                        }
94                    }
95                })
96                // Shouldn't itself expire, the paginated query will control that:
97                .with_options(
98                    QueryOptions::default()
99                        .with_stale_time(Duration::MAX)
100                        .with_gc_time(Duration::MAX)
101                        .with_refetch_interval(Duration::MAX)
102                );
103
104                $name::new({
105                    let backing_cache_scope = backing_cache_scope.clone();
106                    move |page_key: PaginatedPageKey<Key>| {
107                        let backing_cache_scope = backing_cache_scope.clone();
108                        let getter = getter.clone();
109                        async move {
110                            let untyped_client = use_context::<UntypedQueryClient>()
111                                .expect(
112                                    "leptos-fetch bug, UntypedQueryClient should always have been \
113                                    provided to the query context internally"
114                            );
115                            let scope_cache_key = use_context::<ScopeCacheKey>()
116                                .expect(
117                                    "leptos-fetch bug, ScopeCacheKey itself should always have been \
118                                    provided to the query context internally"
119                                );
120
121                            // If this query is reloading because it was stale, should
122                            // invalidate the backing cache before reading it,
123                            // otherwise will still get back the same stale data again:
124                            if let Some(metadata) = untyped_client
125                                .query_metadata::<PaginatedPageKey<Key>, Option<(Vec<PageItem>, bool)>>(
126                                scope_cache_key,
127                                &page_key,
128                            )
129                            && metadata.stale_or_invalidated
130                            && let Some(backing_metadata) = untyped_client
131                                .query_metadata::<KeyWithItemCountRequestedUnhashed<Key>, BackingCache<PageItem, Cursor>>(
132                                backing_cache_scope.cache_key,
133                                &KeyWithItemCountRequestedUnhashed {
134                                    key: page_key.key.clone(),
135                                    item_count_requested: 0, // Doesn't matter, it's not part of the hash
136                                },
137                            )
138                            && backing_metadata.updated_at <= metadata.updated_at
139                            {
140                                untyped_client.invalidate_query(
141                                    &backing_cache_scope,
142                                    KeyWithItemCountRequestedUnhashed {
143                                        key: page_key.key.clone(),
144                                        item_count_requested: 0, // Doesn't matter, it's not part of the hash
145                                    },
146                                );
147                            }
148
149                            let infinite_cache = untyped_client
150                                .$fetch_fn(
151                                    backing_cache_scope,
152                                    KeyWithItemCountRequestedUnhashed {
153                                        key: page_key.key.clone(),
154                                        item_count_requested: page_key.page_size,
155                                    },
156                                )
157                                .await;
158
159                            let target_idx_start = page_key.page_index * (page_key.page_size as usize);
160                            let target_idx_end_exclusive = (page_key.page_index + 1) * (page_key.page_size as usize);
161
162                            // Load x more if needed:
163                            let should_request_x_more = || {
164                                let items = infinite_cache.inner.items.lock();
165                                if items.len() < target_idx_end_exclusive
166                                    && infinite_cache.inner.cursor.lock().is_some() {
167                                        Some(target_idx_end_exclusive - items.len())
168                                    } else {
169                                        None
170                                    }
171                            };
172
173                            if should_request_x_more().is_some() {
174                                // Preventing multiple simultaneous fetches by holding an async lock,
175                                // but note making sure to not hold any sync locks across await boundaries.
176                                let mut _guard = infinite_cache.inner.update_lock.lock().await;
177                                while let Some(amount_needed) = should_request_x_more() {
178                                    let cur_token = (&*infinite_cache.inner.cursor.lock()).clone();
179                                    let (items, cursor) =
180                                        getter(page_key.key.clone(), amount_needed, cur_token).await;
181                                    if !items.is_empty() {
182                                        infinite_cache.inner.items.lock().extend(items);
183                                        *infinite_cache.inner.cursor.lock() = cursor;
184                                    } else {
185                                        // Protect incorrect Some(cursor) when clearly no items left:
186                                        *infinite_cache.inner.cursor.lock() = None;
187                                    }
188                                }
189                                drop(_guard);
190                            }
191
192                            let items_guard = infinite_cache.inner.items.lock();
193                            if target_idx_start >= items_guard.len() {
194                                None
195                            } else {
196                                let items = items_guard
197                                    [target_idx_start..std::cmp::min(target_idx_end_exclusive, items_guard.len())]
198                                    .to_vec();
199                                let next_page_exists = items_guard.len() > target_idx_end_exclusive
200                                    || infinite_cache.inner.cursor.lock().is_some();
201                                Some((items, next_page_exists))
202                            }
203                        }
204                    }
205                })
206                // An invalidation or clear of any page should invalidate the backing cache:
207                .on_invalidation({
208                    let backing_cache_scope = backing_cache_scope.clone();
209                    move |key| {
210                        let untyped_client = use_context::<UntypedQueryClient>()
211                            .expect(
212                                "leptos-fetch bug, UntypedQueryClient should always have been \
213                                provided to the on_invalidation context internally"
214                            );
215                        untyped_client.invalidate_query(
216                            &backing_cache_scope,
217                            KeyWithItemCountRequestedUnhashed {
218                                key: key.key.clone(),
219                                item_count_requested: 0, // Doesn't matter, it's not part of the hash
220                            },
221                        );
222                    }
223                })
224                // If this was the last page existing now being gc'd, clear the backing cache too:
225                .on_gc(move |key| {
226                    let untyped_client = use_context::<UntypedQueryClient>()
227                        .expect(
228                            "leptos-fetch bug, UntypedQueryClient should always have been \
229                            provided to the on_gc context internally"
230                        );
231                    let scope_cache_key = use_context::<ScopeCacheKey>()
232                        .expect(
233                            "leptos-fetch bug, ScopeCacheKey itself should always have been \
234                            provided to the on_gc context internally"
235                        );
236                    let mut found_nb = 0;
237                    untyped_client
238                        .scope_lookup
239                        .with_cached_scope_mut::<PaginatedPageKey<Key>, Option<(Vec<PageItem>, bool)>, _, _>(
240                            &mut untyped_client.scope_lookup.scopes_mut(),
241                            scope_cache_key,
242                            OnScopeMissing::Skip,
243                            |_| {},
244                            |maybe_scope, _| {
245                                if let Some(scope) = maybe_scope {
246                                    for query_or_pending in scope.all_queries_mut_include_pending() {
247                                        if query_or_pending
248                                            .key()
249                                            .value_if_safe()
250                                            .map(|test_key| test_key.key == key.key)
251                                            .unwrap_or(false)
252                                        {
253                                            found_nb += 1;
254                                        }
255                                    }
256                                }
257                            },
258                        );
259                    if found_nb == 0 {
260                        untyped_client.clear_query(
261                            &backing_cache_scope,
262                            KeyWithItemCountRequestedUnhashed {
263                                key: key.key.clone(),
264                                item_count_requested: 0, // Doesn't matter, it's not part of the hash
265                            },
266                        );
267                    }
268                })
269            }
270        }
271    };
272}
273
274define! { [+ Send], [+ Send + Sync], QueryScope, fetch_query, "QueryScope" }
275define! { [], [], QueryScopeLocal, fetch_query_local, "QueryScopeLocal" }
276
277#[derive(Debug, Clone)]
278struct KeyWithItemCountRequestedUnhashed<Key> {
279    key: Key,
280    item_count_requested: usize,
281}
282
283impl<Key: Hash> Hash for KeyWithItemCountRequestedUnhashed<Key> {
284    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
285        self.key.hash(state);
286    }
287}
288
289#[derive(Debug, Clone)]
290struct BackingCache<Item, Cursor> {
291    inner: Arc<BackingCacheInner<Item, Cursor>>,
292}
293
294#[derive(Debug)]
295struct BackingCacheInner<Item, Cursor> {
296    items: parking_lot::Mutex<Vec<Item>>,
297    cursor: parking_lot::Mutex<Option<Cursor>>,
298    update_lock: futures::lock::Mutex<()>,
299}
300
301#[cfg(test)]
302mod tests {
303    use any_spawner::Executor;
304    use hydration_context::SsrSharedContext;
305    use leptos::prelude::*;
306    use rstest::*;
307    use std::sync::Arc;
308    use std::sync::atomic::{AtomicUsize, Ordering};
309
310    use crate::test::prep_vari;
311    use crate::{PaginatedPageKey, QueryClient, QueryScope};
312
313    /// We don't know the serialization mechanism the user is using, so cannot return
314    /// wrapping types from the query function.
315    /// Hence returning (Vec<Item>, has_more_pages: bool) instead of a custom wrapping Page<Item> type.
316    /// This test is just checking compilation.
317    #[tokio::test]
318    async fn test_paginated_serialization_works() {
319        crate::test::identify_parking_lot_deadlocks();
320        tokio::task::LocalSet::new()
321            .run_until(async move {
322                let (client, _guard, _owner) = prep_vari!(true);
323                let scope = QueryScope::new_paginated_with_cursor(|_: (), _, _| async { (vec![()], Some(())) });
324                client.resource(scope, || PaginatedPageKey {
325                    key: (),
326                    page_index: 0,
327                    page_size: 10,
328                });
329            })
330            .await;
331    }
332
333    fn get_simple_api_fn(
334        num_rows: usize,
335    ) -> (
336        Arc<AtomicUsize>,
337        impl Fn(usize, Option<usize>) -> (Vec<usize>, Option<usize>) + Clone + 'static,
338    ) {
339        let call_count = Arc::new(AtomicUsize::new(0));
340
341        let api_fn = {
342            let call_count = call_count.clone();
343            move |target_return_count: usize, offset: Option<usize>| {
344                let call_count = call_count.clone();
345                call_count.fetch_add(1, Ordering::Relaxed);
346
347                let offset = offset.unwrap_or(0);
348                let items = (0..num_rows).skip(offset).take(target_return_count).collect::<Vec<_>>();
349                let next_offset = if offset + target_return_count < num_rows {
350                    Some(offset + items.len())
351                } else {
352                    None
353                };
354                (items, next_offset)
355            }
356        };
357
358        (call_count, api_fn)
359    }
360
361    #[tokio::test]
362    async fn test_paginated_cursor() {
363        crate::test::identify_parking_lot_deadlocks();
364        tokio::task::LocalSet::new()
365            .run_until(async move {
366                let (client, _guard, _owner) = prep_vari!(true);
367
368                let (_call_count, my_api_fn) = get_simple_api_fn(30);
369
370                // The scope/queryer can now be used like any other QueryScope,
371                // just with PaginatedPageKey<YourKey> as the key type.
372                // In resources, fetch_query, etc.
373                let scope = QueryScope::new_paginated_with_cursor(move |_query_key, page_size, offset| {
374                    let my_api_fn = my_api_fn.clone();
375                    async move {
376                        let (items, maybe_next_offset) = my_api_fn(page_size, offset);
377                        (items, maybe_next_offset)
378                    }
379                });
380
381                let (first_page_logs, more_pages) = client
382                    .fetch_query(
383                        scope.clone(),
384                        PaginatedPageKey {
385                            key: (),
386                            page_index: 0,
387                            page_size: 20,
388                        },
389                    )
390                    .await
391                    .expect(
392                        "This page should exist (Some()), \
393                        None when a page is requested beyond the end of the data.",
394                    );
395
396                assert_eq!(first_page_logs, (0..20).collect::<Vec<_>>());
397
398                assert!(
399                    more_pages,
400                    "There should be more pages after the first page, \
401                    ROW_COUNT=30 which is > page size of 20."
402                );
403
404                let (second_page_logs, more_pages) = client
405                    .fetch_query(
406                        scope.clone(),
407                        PaginatedPageKey {
408                            key: (),
409                            page_index: 1,
410                            page_size: 20,
411                        },
412                    )
413                    .await
414                    .expect(
415                        "This page should exist (Some()), \
416                        None when a page is requested beyond the end of the data.",
417                    );
418
419                assert_eq!(second_page_logs, (20..30).collect::<Vec<_>>());
420
421                assert!(
422                    !more_pages,
423                    "20+20=40 which is > ROW_COUNT=30, so no more pages after the second page."
424                );
425
426                assert!(
427                    client
428                        .fetch_query(
429                            scope.clone(),
430                            PaginatedPageKey {
431                                key: (),
432                                page_index: 2,
433                                page_size: 20,
434                            },
435                        )
436                        .await
437                        .is_none()
438                );
439            })
440            .await;
441    }
442
443    /// Test that the pagination logic keeps calling the API until page_size items are available
444    #[tokio::test]
445    async fn test_paginated_cursor_fills_page_size_with_multiple_calls() {
446        crate::test::identify_parking_lot_deadlocks();
447        tokio::task::LocalSet::new()
448            .run_until(async move {
449                let (client, _guard, _owner) = prep_vari!(true);
450
451                let call_count = Arc::new(AtomicUsize::new(0));
452                let call_count_clone = call_count.clone();
453
454                // API that returns fewer items than requested
455                let scope = QueryScope::new_paginated_with_cursor(move |_key: (), _page_size, continuation| {
456                    let call_count = call_count_clone.clone();
457                    async move {
458                        call_count.fetch_add(1, Ordering::Relaxed);
459
460                        // Return only 3 items per call, even if more are requested
461                        match continuation {
462                            None => {
463                                // First call
464                                (vec![0, 1, 2], Some(3))
465                            }
466                            Some(3) => {
467                                // Second call
468                                (vec![3, 4, 5], Some(6))
469                            }
470                            Some(6) => {
471                                // Third call
472                                (vec![6, 7, 8], Some(9))
473                            }
474                            Some(9) => {
475                                // Fourth call
476                                (vec![9, 10], None) // Only 2 items left
477                            }
478                            _ => (vec![], None),
479                        }
480                    }
481                });
482
483                // Request page with size 10 - should make 4 API calls to get 10 items
484                let (items, has_more) = client
485                    .fetch_query(
486                        scope.clone(),
487                        PaginatedPageKey {
488                            key: (),
489                            page_index: 0,
490                            page_size: 10,
491                        },
492                    )
493                    .await
494                    .expect("Page should exist");
495
496                assert_eq!(items.len(), 10, "Should have filled to page_size");
497                assert_eq!(items, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
498                assert!(has_more, "Should indicate more pages available");
499                assert_eq!(
500                    call_count.load(Ordering::Relaxed),
501                    4,
502                    "Should have made 4 API calls to fill page"
503                );
504
505                // Request second page - should only need 1 more API call
506                let (items, has_more) = client
507                    .fetch_query(
508                        scope.clone(),
509                        PaginatedPageKey {
510                            key: (),
511                            page_index: 1,
512                            page_size: 10,
513                        },
514                    )
515                    .await
516                    .expect("Page should exist");
517
518                assert_eq!(items.len(), 1, "Only 1 item left");
519                assert_eq!(items, vec![10]);
520                assert!(!has_more, "No more pages");
521                assert_eq!(
522                    call_count.load(Ordering::Relaxed),
523                    4,
524                    "Should not make additional calls - data already cached"
525                );
526            })
527            .await;
528
529        let (_call_count, my_api_fn) = get_simple_api_fn(30);
530
531        // Create the scope
532        let scope = QueryScope::new_paginated_with_cursor(move |_query_key, page_size, offset| {
533            let my_api_fn = my_api_fn.clone();
534            async move {
535                let (items, maybe_next_offset) = my_api_fn(page_size, offset);
536                (items, maybe_next_offset)
537            }
538        });
539
540        let client = QueryClient::new();
541
542        // Fetch first page
543        let (first_page, more_pages) = client
544            .fetch_query(
545                scope.clone(),
546                PaginatedPageKey {
547                    key: (),
548                    page_index: 0,
549                    page_size: 20,
550                },
551            )
552            .await
553            .expect("First page should exist");
554
555        assert_eq!(first_page, (0..20).collect::<Vec<_>>());
556        assert!(more_pages);
557
558        // Fetch second page
559        let (second_page, more_pages) = client
560            .fetch_query(
561                scope.clone(),
562                PaginatedPageKey {
563                    key: (),
564                    page_index: 1,
565                    page_size: 20,
566                },
567            )
568            .await
569            .expect("Second page should exist");
570
571        assert_eq!(second_page, (20..30).collect::<Vec<_>>());
572        assert!(!more_pages);
573
574        // Requesting beyond available data returns None
575        assert!(
576            client
577                .fetch_query(
578                    scope.clone(),
579                    PaginatedPageKey {
580                        key: (),
581                        page_index: 2,
582                        page_size: 20,
583                    },
584                )
585                .await
586                .is_none()
587        );
588    }
589
590    /// Test that API calls are cached properly and not repeated unnecessarily
591    #[tokio::test]
592    async fn test_paginated_cursor_no_unnecessary_api_calls() {
593        crate::test::identify_parking_lot_deadlocks();
594        tokio::task::LocalSet::new()
595            .run_until(async move {
596                let (client, _guard, _owner) = prep_vari!(true);
597
598                let call_count = Arc::new(AtomicUsize::new(0));
599                let call_count_clone = call_count.clone();
600
601                let scope = QueryScope::new_paginated_with_cursor(move |_key: (), page_size, continuation| {
602                    let call_count = call_count_clone.clone();
603                    async move {
604                        call_count.fetch_add(1, Ordering::Relaxed);
605
606                        let offset = continuation.unwrap_or(0);
607                        let items: Vec<usize> = (offset..offset + page_size).collect();
608                        let next = if offset + page_size < 100 {
609                            Some(offset + page_size)
610                        } else {
611                            None
612                        };
613                        (items, next)
614                    }
615                });
616
617                // First fetch
618                let _ = client
619                    .fetch_query(
620                        scope.clone(),
621                        PaginatedPageKey {
622                            key: (),
623                            page_index: 0,
624                            page_size: 10,
625                        },
626                    )
627                    .await;
628                assert_eq!(
629                    call_count.load(Ordering::Relaxed),
630                    1,
631                    "First fetch should make 1 API call"
632                );
633
634                // Fetch same page again - should use cache
635                let _ = client
636                    .fetch_query(
637                        scope.clone(),
638                        PaginatedPageKey {
639                            key: (),
640                            page_index: 0,
641                            page_size: 10,
642                        },
643                    )
644                    .await;
645                assert_eq!(call_count.load(Ordering::Relaxed), 1, "Should still be 1 call - cached");
646
647                // Fetch next page
648                let _ = client
649                    .fetch_query(
650                        scope.clone(),
651                        PaginatedPageKey {
652                            key: (),
653                            page_index: 1,
654                            page_size: 10,
655                        },
656                    )
657                    .await;
658                assert_eq!(call_count.load(Ordering::Relaxed), 2, "Second page needs new API call");
659
660                // Fetch first page again - should still be cached
661                let _ = client
662                    .fetch_query(
663                        scope.clone(),
664                        PaginatedPageKey {
665                            key: (),
666                            page_index: 0,
667                            page_size: 10,
668                        },
669                    )
670                    .await;
671                assert_eq!(
672                    call_count.load(Ordering::Relaxed),
673                    2,
674                    "Should still be 2 calls - first page cached"
675                );
676            })
677            .await;
678    }
679
680    /// Test linked invalidation and clear between pages with same key
681    #[rstest]
682    #[tokio::test]
683    async fn test_paginated_cursor_linked_invalidation_and_clear(#[values(true, false)] clear: bool) {
684        crate::test::identify_parking_lot_deadlocks();
685        tokio::task::LocalSet::new()
686            .run_until(async move {
687                let (client, _guard, _owner) = prep_vari!(true);
688
689                let version = Arc::new(AtomicUsize::new(0));
690                let version_clone = version.clone();
691
692                let scope = QueryScope::new_paginated_with_cursor(move |key: String, page_size, continuation| {
693                    let v = version_clone.clone();
694                    async move {
695                        let current_version = v.load(Ordering::Relaxed);
696                        let offset = continuation.unwrap_or(0);
697
698                        // Return different data based on version
699                        let items: Vec<String> = (offset..offset + page_size)
700                            .map(|i| format!("{}_v{}_{}", key, current_version, i))
701                            .collect();
702
703                        let next = if offset + page_size < 30 {
704                            Some(offset + page_size)
705                        } else {
706                            None
707                        };
708                        (items, next)
709                    }
710                });
711
712                // Fetch first page
713                let (items1, _) = client
714                    .fetch_query(
715                        scope.clone(),
716                        PaginatedPageKey {
717                            key: "test".to_string(),
718                            page_index: 0,
719                            page_size: 10,
720                        },
721                    )
722                    .await
723                    .expect("Page should exist");
724
725                assert_eq!(items1[0], "test_v0_0");
726
727                // Fetch second page
728                let (items2, _) = client
729                    .fetch_query(
730                        scope.clone(),
731                        PaginatedPageKey {
732                            key: "test".to_string(),
733                            page_index: 1,
734                            page_size: 10,
735                        },
736                    )
737                    .await
738                    .expect("Page should exist");
739
740                assert_eq!(items2[0], "test_v0_10");
741
742                // Increment version to simulate data change
743                version.store(1, Ordering::Relaxed);
744
745                // Fetch first page again - should still be old data
746                let (items1_new, _) = client
747                    .fetch_query(
748                        scope.clone(),
749                        PaginatedPageKey {
750                            key: "test".to_string(),
751                            page_index: 0,
752                            page_size: 10,
753                        },
754                    )
755                    .await
756                    .expect("Page should exist");
757
758                assert_eq!(
759                    items1_new[0], "test_v0_0",
760                    "Should have new version after invalidation/clear"
761                );
762
763                // Invalidation/clear should lead to new data:
764                if clear {
765                    // Currently not public, but still want to check as used internally for some things:
766                    client.untyped_client.clear_query_scope(scope.clone());
767                } else {
768                    client.invalidate_query_scope(scope.clone());
769                }
770
771                // Fetch first page again - should get new data
772                let (items1_new, _) = client
773                    .fetch_query(
774                        scope.clone(),
775                        PaginatedPageKey {
776                            key: "test".to_string(),
777                            page_index: 0,
778                            page_size: 10,
779                        },
780                    )
781                    .await
782                    .expect("Page should exist");
783
784                assert_eq!(
785                    items1_new[0], "test_v1_0",
786                    "Should have new version after invalidation/clear"
787                );
788
789                // Second page should also be invalidated/cleared
790                let (items2_new, _) = client
791                    .fetch_query(
792                        scope.clone(),
793                        PaginatedPageKey {
794                            key: "test".to_string(),
795                            page_index: 1,
796                            page_size: 10,
797                        },
798                    )
799                    .await
800                    .expect("Page should exist");
801
802                assert_eq!(items2_new[0], "test_v1_10", "Second page should also have new version");
803            })
804            .await;
805    }
806
807    /// Test that empty responses are handled correctly
808    #[tokio::test]
809    async fn test_paginated_cursor_empty_response_handling() {
810        crate::test::identify_parking_lot_deadlocks();
811        tokio::task::LocalSet::new()
812            .run_until(async move {
813                let (client, _guard, _owner) = prep_vari!(true);
814
815                let scope = QueryScope::new_paginated_with_cursor(|_key: (), _page_size, continuation| async move {
816                    match continuation {
817                        None => (vec![1, 2, 3], Some(3)),
818                        Some(3) => (vec![], Some(6)), // Empty response but with cursor
819                        _ => (vec![], None),
820                    }
821                });
822
823                let (items, has_more) = client
824                    .fetch_query(
825                        scope.clone(),
826                        PaginatedPageKey {
827                            key: (),
828                            page_index: 0,
829                            page_size: 10,
830                        },
831                    )
832                    .await
833                    .expect("Page should exist");
834
835                // Should only have 3 items since second call returned empty
836                assert_eq!(items.len(), 3);
837                assert!(!has_more, "Should not have more pages when empty response received");
838            })
839            .await;
840    }
841
842    /// Test concurrent page fetches don't cause duplicate API calls
843    #[tokio::test]
844    async fn test_paginated_cursor_concurrent_fetches() {
845        crate::test::identify_parking_lot_deadlocks();
846        tokio::task::LocalSet::new()
847            .run_until(async move {
848                let (client, _guard, _owner) = prep_vari!(true);
849
850                let call_count = Arc::new(AtomicUsize::new(0));
851                let call_count_clone = call_count.clone();
852
853                let scope = QueryScope::new_paginated_with_cursor(move |_key: (), page_size, continuation| {
854                    let call_count = call_count_clone.clone();
855                    async move {
856                        call_count.fetch_add(1, Ordering::Relaxed);
857
858                        // Add a small delay to simulate network latency
859                        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
860
861                        let offset = continuation.unwrap_or(0);
862                        let items: Vec<usize> = (offset..offset + page_size).collect();
863                        let next = if offset + page_size < 100 {
864                            Some(offset + page_size)
865                        } else {
866                            None
867                        };
868                        (items, next)
869                    }
870                });
871
872                // Launch multiple concurrent fetches for the same page
873                let futures = (0..5).map(|_| {
874                    client.fetch_query(
875                        scope.clone(),
876                        PaginatedPageKey {
877                            key: (),
878                            page_index: 0,
879                            page_size: 10,
880                        },
881                    )
882                });
883
884                let results = futures::future::join_all(futures).await;
885
886                // All should succeed
887                for result in results {
888                    assert!(result.is_some());
889                }
890
891                // Should only make 1 API call despite 5 concurrent requests
892                assert_eq!(
893                    call_count.load(Ordering::Relaxed),
894                    1,
895                    "Concurrent fetches should share single API call"
896                );
897            })
898            .await;
899    }
900
901    /// Test different page sizes work correctly with shared cache
902    #[tokio::test]
903    async fn test_paginated_cursor_different_page_sizes() {
904        crate::test::identify_parking_lot_deadlocks();
905        tokio::task::LocalSet::new()
906            .run_until(async move {
907                let (client, _guard, _owner) = prep_vari!(true);
908
909                let call_count = Arc::new(AtomicUsize::new(0));
910                let call_count_clone = call_count.clone();
911
912                let scope = QueryScope::new_paginated_with_cursor(move |_key: (), page_size, continuation| {
913                    let call_count = call_count_clone.clone();
914                    async move {
915                        call_count.fetch_add(1, Ordering::Relaxed);
916
917                        let offset = continuation.unwrap_or(0);
918                        let items: Vec<usize> = (offset..std::cmp::min(offset + page_size, 50)).collect();
919                        let next = if offset + page_size < 50 {
920                            Some(offset + page_size)
921                        } else {
922                            None
923                        };
924                        (items, next)
925                    }
926                });
927
928                // Fetch with page size 5
929                let (items1, _) = client
930                    .fetch_query(
931                        scope.clone(),
932                        PaginatedPageKey {
933                            key: (),
934                            page_index: 0,
935                            page_size: 5,
936                        },
937                    )
938                    .await
939                    .expect("Page should exist");
940                assert_eq!(items1.len(), 5);
941                assert_eq!(call_count.load(Ordering::Relaxed), 1);
942
943                // Fetch with page size 15 - should reuse cached data and fetch more
944                let (items2, _) = client
945                    .fetch_query(
946                        scope.clone(),
947                        PaginatedPageKey {
948                            key: (),
949                            page_index: 0,
950                            page_size: 15,
951                        },
952                    )
953                    .await
954                    .expect("Page should exist");
955                assert_eq!(items2.len(), 15);
956                assert_eq!(
957                    call_count.load(Ordering::Relaxed),
958                    2,
959                    "Should fetch more data for larger page"
960                );
961
962                // Fetch with page size 10 - should use cached data
963                let (items3, _) = client
964                    .fetch_query(
965                        scope.clone(),
966                        PaginatedPageKey {
967                            key: (),
968                            page_index: 0,
969                            page_size: 10,
970                        },
971                    )
972                    .await
973                    .expect("Page should exist");
974                assert_eq!(items3.len(), 10);
975                assert_eq!(
976                    call_count.load(Ordering::Relaxed),
977                    2,
978                    "Should use cached data, no new fetch"
979                );
980            })
981            .await;
982    }
983
984    /// Test that backing cache is properly managed when paginated pages expire
985    /// Tests both GC (garbage collection) and stale time scenarios
986    #[rstest]
987    #[case::gc_time(TestMode::GcTime)]
988    #[case::stale_time(TestMode::StaleTime)]
989    #[tokio::test]
990    async fn test_paginated_cursor_backing_cache_lifecycle(#[case] mode: TestMode) {
991        crate::test::identify_parking_lot_deadlocks();
992        tokio::task::LocalSet::new()
993            .run_until(async move {
994                let (client, _guard, _owner) = prep_vari!(true);
995
996                let call_count = Arc::new(AtomicUsize::new(0));
997                let call_count_clone = call_count.clone();
998
999                let scope = QueryScope::new_paginated_with_cursor(move |key: String, page_size, continuation| {
1000                    let call_count = call_count_clone.clone();
1001                    async move {
1002                        call_count.fetch_add(1, Ordering::Relaxed);
1003
1004                        let offset = continuation.unwrap_or(0);
1005                        let items: Vec<String> =
1006                            (offset..offset + page_size).map(|i| format!("{}_{}", key, i)).collect();
1007                        let next = if offset + page_size < 30 {
1008                            Some(offset + page_size)
1009                        } else {
1010                            None
1011                        };
1012                        (items, next)
1013                    }
1014                })
1015                .with_options(match mode {
1016                    TestMode::GcTime => {
1017                        crate::QueryOptions::default().with_gc_time(std::time::Duration::from_millis(100))
1018                    }
1019                    TestMode::StaleTime => {
1020                        crate::QueryOptions::default().with_stale_time(std::time::Duration::from_millis(100))
1021                    }
1022                });
1023
1024                // Fetch first page
1025                let (items, _) = client
1026                    .fetch_query(
1027                        scope.clone(),
1028                        PaginatedPageKey {
1029                            key: "test".to_string(),
1030                            page_index: 0,
1031                            page_size: 10,
1032                        },
1033                    )
1034                    .await
1035                    .expect("Page should exist");
1036                assert_eq!(items[0], "test_0");
1037                assert_eq!(call_count.load(Ordering::Relaxed), 1);
1038
1039                // Wait for 50ms, 50ms left till gc/stale:
1040                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1041
1042                // Fetch second page
1043                let (items, _) = client
1044                    .fetch_query(
1045                        scope.clone(),
1046                        PaginatedPageKey {
1047                            key: "test".to_string(),
1048                            page_index: 1,
1049                            page_size: 10,
1050                        },
1051                    )
1052                    .await
1053                    .expect("Page should exist");
1054                assert_eq!(items[0], "test_10");
1055                assert_eq!(call_count.load(Ordering::Relaxed), 2);
1056
1057                // Wait another 50ms, so the first query is stale/gc'd, but the second
1058                // is valid even when gc'd because 50ms left:
1059                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1060
1061                // Fetch first page again
1062                let (items, _) = client
1063                    .fetch_query(
1064                        scope.clone(),
1065                        PaginatedPageKey {
1066                            key: "test".to_string(),
1067                            page_index: 0,
1068                            page_size: 10,
1069                        },
1070                    )
1071                    .await
1072                    .expect("Page should exist");
1073                assert_eq!(items[0], "test_0");
1074
1075                // Fetch second page again
1076                let (items, _) = client
1077                    .fetch_query(
1078                        scope.clone(),
1079                        PaginatedPageKey {
1080                            key: "test".to_string(),
1081                            page_index: 1,
1082                            page_size: 10,
1083                        },
1084                    )
1085                    .await
1086                    .expect("Page should exist");
1087                assert_eq!(items[0], "test_10");
1088
1089                let expected_calls = match mode {
1090                    TestMode::GcTime => {
1091                        // GC cleared page 0, but page 1 kept backing cache alive
1092                        // So page 0 refetch reuses backing cache, plus page 1 still alive so also reused
1093                        2
1094                    }
1095                    TestMode::StaleTime => {
1096                        // Stale page triggers invalidation which clears backing cache
1097                        // So page 0 refetch needs new API call, but page 1 is still valid so reused
1098                        3
1099                    }
1100                };
1101                assert_eq!(call_count.load(Ordering::Relaxed), expected_calls);
1102
1103                // If gc, should clear the backing cache only when all pages are gc'd:
1104                if matches!(mode, TestMode::GcTime) {
1105                    tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;
1106
1107                    // Now fetch again - backing cache should be gone
1108                    let (items, _) = client
1109                        .fetch_query(
1110                            scope.clone(),
1111                            PaginatedPageKey {
1112                                key: "test".to_string(),
1113                                page_index: 0,
1114                                page_size: 10,
1115                            },
1116                        )
1117                        .await
1118                        .expect("Page should exist");
1119                    assert_eq!(items[0], "test_0");
1120                    assert_eq!(
1121                        call_count.load(Ordering::Relaxed),
1122                        3,
1123                        "Backing cache should be cleared when all pages are GC'd"
1124                    );
1125                }
1126            })
1127            .await;
1128    }
1129
1130    #[derive(Debug, Clone, Copy)]
1131    enum TestMode {
1132        /// Test GC behavior: backing cache should only be cleared when all pages are GC'd
1133        GcTime,
1134        /// Test stale time behavior: backing cache should be invalidated when any page goes stale
1135        StaleTime,
1136    }
1137}