// leptos_fetch/pagination/paginated_cursor.rs

use std::time::Duration;
use std::{hash::Hash, sync::Arc};

use leptos::prelude::*;

use crate::{
    PaginatedPageKey, QueryOptions, QueryScope, QueryScopeLocal, UntypedQueryClient,
    cache::OnScopeMissing, debug_if_devtools_enabled::DebugIfDevtoolsEnabled,
    query_scope::ScopeCacheKey,
};
11
// Generates `new_paginated_with_cursor` for one scope type.
//
// Arguments:
// - `$impl_fut_generics`: extra bounds appended to the getter's future (e.g. `+ Send`).
// - `$impl_fn_generics`: extra bounds appended to the getter, key, item and cursor types.
// - `$name`: the scope type the constructor is implemented on.
// - `$fetch_fn`: the `UntypedQueryClient` method used to fetch the backing cache.
// - `$sname`: the scope name as a literal; not referenced by the expansion below.
macro_rules! define {
    ([$($impl_fut_generics:tt)*], [$($impl_fn_generics:tt)*], $name:ident, $fetch_fn:ident, $sname:literal) => {

        impl<Key, PageItem> $name<PaginatedPageKey<Key>, Option<(Vec<PageItem>, bool)>>
        where
            Key: DebugIfDevtoolsEnabled + Clone + Hash + PartialEq + 'static $($impl_fn_generics)*,
            PageItem: DebugIfDevtoolsEnabled + Clone + 'static $($impl_fn_generics)*,
        {
            /// Create a cursor-based paginated query scope.
            ///
            /// Use this when your API uses continuation tokens/cursors rather than numeric offsets.
            /// Good for infinite scroll patterns or when your API doesn't support offset-based pagination.
            ///
            /// # Arguments
            ///
            /// The getter receives:
            /// - `query_key: Key` - Your custom key, same across all pages
            /// - `nb_items_requested: usize` - Target number of items to return, will call again if not enough items returned
            /// - `cursor: Option<Cursor>` - Cursor from previous fetch, `None` on first page
            ///
            /// The getter must return:
            /// - `Vec<Item>` - Items for this page
            /// - `Option<Cursor>` - Next cursor token, `None` when no more data
            ///
            /// An empty `Vec` is treated as the end of the data, even if a cursor is returned with it.
            ///
            /// # Returns
            ///
            /// A scope keyed by `PaginatedPageKey<Key>` whose value is:
            /// - `Some((items, has_more))` - the items of the requested page and whether a further page exists
            /// - `None` - when the requested page starts beyond the end of the available items
            ///
            /// # Example
            ///
            /// ```rust,ignore
            /// use leptos_fetch::{QueryScope, PaginatedPageKey};
            ///
            /// // Create paginated scope with cursor-based API
            /// let scope = QueryScope::new_paginated_with_cursor(
            ///     |_key: (), nb_items, cursor: Option<String>| async move {
            ///         // Call your API with cursor token
            ///         let (items, next_cursor) = fetch_from_api(cursor, nb_items).await;
            ///         (items, next_cursor)
            ///     }
            /// );
            ///
            /// // Use like any other scope - fetch pages with PaginatedPageKey
            /// let (items, has_more) = client.fetch_query(scope.clone(), PaginatedPageKey {
            ///     key: (),
            ///     page_index: 0,
            ///     page_size: 20,
            /// }).await.expect("Page exists");
            ///
            /// // has_more: bool indicates if another page is available
            /// if has_more {
            ///     let (next_items, _) = client.fetch_query(scope, PaginatedPageKey {
            ///         key: (),
            ///         page_index: 1,
            ///         page_size: 20,
            ///     }).await.expect("Next page exists");
            /// }
            /// ```
            pub fn new_paginated_with_cursor<Cursor, Fut>(
                getter: impl Fn(Key, usize, Option<Cursor>) -> Fut + 'static $($impl_fn_generics)*,
            ) -> $name<PaginatedPageKey<Key>, Option<(Vec<PageItem>, bool)>>
            where
                Cursor: DebugIfDevtoolsEnabled + Clone + 'static $($impl_fn_generics)*,
                Fut: Future<Output = (Vec<PageItem>, Option<Cursor>)> $($impl_fut_generics)*,
            {
                let getter = Arc::new(getter);

                // Backing scope: one entry per user key, holding every item fetched so far
                // together with the cursor needed to continue.
                let backing_cache_scope = $name::new({
                    let getter = getter.clone();
                    move |key: KeyWithItemCountRequestedUnhashed<Key>| {
                        let getter = getter.clone();
                        async move {
                            let (items, mut cursor) = getter(key.key, key.item_count_requested, None).await;

                            // Protect incorrect Some(cursor) when clearly no items left:
                            if items.is_empty() {
                                cursor = None;
                            }

                            BackingCache {
                                inner: Arc::new(BackingCacheInner {
                                    items: parking_lot::Mutex::new(items),
                                    cursor: parking_lot::Mutex::new(cursor),
                                    update_lock: futures::lock::Mutex::new(()),
                                }),
                            }
                        }
                    }
                })
                // Shouldn't itself expire, the paginated query will control that:
                .with_options(
                    QueryOptions::default()
                        .with_stale_time(Duration::MAX)
                        .with_gc_time(Duration::MAX)
                        .with_refetch_interval(Duration::MAX)
                );

                // Page scope: slices the requested page out of the backing cache, extending the
                // backing cache through the getter first when it does not reach far enough.
                $name::new({
                    let backing_cache_scope = backing_cache_scope.clone();
                    move |page_key: PaginatedPageKey<Key>| {
                        let backing_cache_scope = backing_cache_scope.clone();
                        let getter = getter.clone();
                        async move {
                            let untyped_client = use_context::<UntypedQueryClient>()
                                .expect(
                                    "leptos-fetch bug, UntypedQueryClient should always have been \
                                    provided to the query context internally"
                            );
                            let scope_cache_key = use_context::<ScopeCacheKey>()
                                .expect(
                                    "leptos-fetch bug, ScopeCacheKey itself should always have been \
                                    provided to the query context internally"
                                );

                            // If this query is reloading because it was stale, should invalidate the backing cache before reading it,
                            // otherwise will still get back the same stale data again:
                            if let Some(metadata) = untyped_client.query_metadata::<PaginatedPageKey<Key>, Option<(Vec<PageItem>, bool)>>(
                                scope_cache_key,
                                &page_key,
                            )
                            && metadata.stale_or_invalidated
                            && let Some(backing_metadata) = untyped_client.query_metadata::<KeyWithItemCountRequestedUnhashed<Key>, BackingCache<PageItem, Cursor>>(
                                backing_cache_scope.cache_key,
                                &KeyWithItemCountRequestedUnhashed {
                                    key: page_key.key.clone(),
                                    item_count_requested: 0, // Doesn't matter, it's not part of the hash
                                },
                            )
                            // Only when the backing cache hasn't been refreshed since this page was:
                            && backing_metadata.updated_at <= metadata.updated_at
                            {
                                untyped_client.invalidate_query(
                                    &backing_cache_scope,
                                    KeyWithItemCountRequestedUnhashed {
                                        key: page_key.key.clone(),
                                        item_count_requested: 0, // Doesn't matter, it's not part of the hash
                                    },
                                );
                            }

                            let infinite_cache = untyped_client
                                .$fetch_fn(
                                    backing_cache_scope,
                                    KeyWithItemCountRequestedUnhashed {
                                        key: page_key.key.clone(),
                                        item_count_requested: page_key.page_size,
                                    },
                                )
                                .await;

                            // Saturating arithmetic so an extreme page_index/page_size can neither
                            // panic (overflow checks on) nor wrap into a bogus range (checks off):
                            let target_idx_start = page_key.page_index.saturating_mul(page_key.page_size);
                            let target_idx_end_exclusive = page_key
                                .page_index
                                .saturating_add(1)
                                .saturating_mul(page_key.page_size);

                            // Load x more if needed:
                            let should_request_x_more = || {
                                let items = infinite_cache.inner.items.lock();
                                if items.len() < target_idx_end_exclusive
                                    && infinite_cache.inner.cursor.lock().is_some()
                                {
                                    Some(target_idx_end_exclusive - items.len())
                                } else {
                                    None
                                }
                            };

                            if should_request_x_more().is_some() {
                                // Preventing multiple simultaneous fetches by holding an async lock,
                                // but note making sure to not hold any sync locks across await boundaries.
                                let update_guard = infinite_cache.inner.update_lock.lock().await;
                                // Re-check on every iteration: another task may have extended the
                                // cache while this one was waiting for the async lock.
                                while let Some(amount_needed) = should_request_x_more() {
                                    // The sync guard is a temporary dropped at the end of this statement:
                                    let cur_token = infinite_cache.inner.cursor.lock().clone();
                                    let (items, cursor) =
                                        getter(page_key.key.clone(), amount_needed, cur_token).await;
                                    if !items.is_empty() {
                                        infinite_cache.inner.items.lock().extend(items);
                                        *infinite_cache.inner.cursor.lock() = cursor;
                                    } else {
                                        // Protect incorrect Some(cursor) when clearly no items left:
                                        *infinite_cache.inner.cursor.lock() = None;
                                    }
                                }
                                drop(update_guard);
                            }

                            let items_guard = infinite_cache.inner.items.lock();
                            if target_idx_start >= items_guard.len() {
                                // Requested page starts beyond the end of the data.
                                None
                            } else {
                                let items = items_guard
                                    [target_idx_start..target_idx_end_exclusive.min(items_guard.len())]
                                    .to_vec();
                                let next_page_exists = items_guard.len() > target_idx_end_exclusive
                                    || infinite_cache.inner.cursor.lock().is_some();
                                Some((items, next_page_exists))
                            }
                        }
                    }
                })
                // An invalidation or clear of any page should invalidate the backing cache:
                .on_invalidation({
                    let backing_cache_scope = backing_cache_scope.clone();
                    move |key| {
                        let untyped_client = use_context::<UntypedQueryClient>()
                            .expect(
                                "leptos-fetch bug, UntypedQueryClient should always have been \
                                provided to the on_invalidation context internally"
                            );
                        untyped_client.invalidate_query(
                            &backing_cache_scope,
                            KeyWithItemCountRequestedUnhashed {
                                key: key.key.clone(),
                                item_count_requested: 0, // Doesn't matter, it's not part of the hash
                            },
                        );
                    }
                })
                // If this was the last page existing now being gc'd, clear the backing cache too:
                .on_gc(move |key| {
                    let untyped_client = use_context::<UntypedQueryClient>()
                        .expect(
                            "leptos-fetch bug, UntypedQueryClient should always have been \
                            provided to the on_gc context internally"
                        );
                    let scope_cache_key = use_context::<ScopeCacheKey>()
                        .expect(
                            "leptos-fetch bug, ScopeCacheKey itself should always have been \
                            provided to the on_gc context internally"
                        );
                    // Count the pages (including pending ones) that still share this user key.
                    let mut found_nb = 0;
                    untyped_client
                        .scope_lookup
                        .with_cached_scope_mut::<PaginatedPageKey<Key>, Option<(Vec<PageItem>, bool)>, _, _>(
                            &mut untyped_client.scope_lookup.scopes_mut(),
                            scope_cache_key,
                            OnScopeMissing::Skip,
                            |_| {},
                            |maybe_scope, _| {
                                if let Some(scope) = maybe_scope {
                                    for query_or_pending in scope.all_queries_mut_include_pending() {
                                        if query_or_pending.key().value_if_safe().map(|test_key| test_key.key == key.key).unwrap_or(false) {
                                            found_nb += 1;
                                        }
                                    }
                                }
                            },
                        );
                    if found_nb == 0 {
                        untyped_client.clear_query(
                            &backing_cache_scope,
                            KeyWithItemCountRequestedUnhashed {
                                key: key.key.clone(),
                                item_count_requested: 0, // Doesn't matter, it's not part of the hash
                            },
                        );
                    }
                })
            }
        }
    };
}
265
266define! { [+ Send], [+ Send + Sync], QueryScope, fetch_query, "QueryScope" }
267define! { [], [], QueryScopeLocal, fetch_query_local, "QueryScopeLocal" }
268
/// Key of the backing cache: the user's `key` plus the number of items to request on the
/// initial fetch.
///
/// `item_count_requested` is deliberately left out of the [`Hash`] implementation, so two values
/// that differ only in that field hash identically.
#[derive(Debug, Clone)]
struct KeyWithItemCountRequestedUnhashed<Key> {
    /// The user-supplied query key, shared by all pages.
    key: Key,
    /// Item count handed to the getter on the initial fetch; not part of the hash.
    item_count_requested: usize,
}

impl<Key: Hash> Hash for KeyWithItemCountRequestedUnhashed<Key> {
    /// Feeds only `key` into the hasher, ignoring `item_count_requested`.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.key.hash(state);
    }
}
280
281#[derive(Debug, Clone)]
282struct BackingCache<Item, Cursor> {
283    inner: Arc<BackingCacheInner<Item, Cursor>>,
284}
285
286#[derive(Debug)]
287struct BackingCacheInner<Item, Cursor> {
288    items: parking_lot::Mutex<Vec<Item>>,
289    cursor: parking_lot::Mutex<Option<Cursor>>,
290    update_lock: futures::lock::Mutex<()>,
291}
292
293#[cfg(test)]
294mod tests {
295    use any_spawner::Executor;
296    use hydration_context::SsrSharedContext;
297    use leptos::prelude::*;
298    use rstest::*;
299    use std::sync::Arc;
300    use std::sync::atomic::{AtomicUsize, Ordering};
301
302    use crate::test::prep_vari;
303    use crate::{PaginatedPageKey, QueryClient, QueryScope};
304
305    /// We don't know the serialization mechanism the user is using, so cannot return wrapping types from the query function.
306    /// Hence returning (Vec<Item>, has_more_pages: bool) instead of a custom wrapping Page<Item> type.
307    /// This test is just checking compilation.
308    #[tokio::test]
309    async fn test_paginated_serialization_works() {
310        crate::test::identify_parking_lot_deadlocks();
311        tokio::task::LocalSet::new()
312            .run_until(async move {
313                let (client, _guard, _owner) = prep_vari!(true);
314                let scope = QueryScope::new_paginated_with_cursor(|_: (), _, _| async {
315                    (vec![()], Some(()))
316                });
317                client.resource(scope, || PaginatedPageKey {
318                    key: (),
319                    page_index: 0,
320                    page_size: 10,
321                });
322            })
323            .await;
324    }
325
326    fn get_simple_api_fn(
327        num_rows: usize,
328    ) -> (
329        Arc<AtomicUsize>,
330        impl Fn(usize, Option<usize>) -> (Vec<usize>, Option<usize>) + Clone + 'static,
331    ) {
332        let call_count = Arc::new(AtomicUsize::new(0));
333
334        let api_fn = {
335            let call_count = call_count.clone();
336            move |target_return_count: usize, offset: Option<usize>| {
337                let call_count = call_count.clone();
338                call_count.fetch_add(1, Ordering::Relaxed);
339
340                let offset = offset.unwrap_or(0);
341                let items = (0..num_rows)
342                    .skip(offset)
343                    .take(target_return_count)
344                    .collect::<Vec<_>>();
345                let next_offset = if offset + target_return_count < num_rows {
346                    Some(offset + items.len())
347                } else {
348                    None
349                };
350                (items, next_offset)
351            }
352        };
353
354        (call_count, api_fn)
355    }
356
357    #[tokio::test]
358    async fn test_paginated_cursor() {
359        crate::test::identify_parking_lot_deadlocks();
360        tokio::task::LocalSet::new()
361            .run_until(async move {
362                let (client, _guard, _owner) = prep_vari!(true);
363
364                let (_call_count, my_api_fn) = get_simple_api_fn(30);
365
366                // The scope/queryer can now be used like any other QueryScope, just with PaginatedPageKey<YourKey> as the key type.
367                // In resources, fetch_query, etc.
368                let scope =
369                    QueryScope::new_paginated_with_cursor(move |_query_key, page_size, offset| {
370                        let my_api_fn = my_api_fn.clone();
371                        async move {
372                            let (items, maybe_next_offset) = my_api_fn(page_size, offset);
373                            (items, maybe_next_offset)
374                        }
375                    });
376
377                let (first_page_logs, more_pages) = client
378                    .fetch_query(
379                        scope.clone(),
380                        PaginatedPageKey {
381                            key: (),
382                            page_index: 0,
383                            page_size: 20,
384                        },
385                    )
386                    .await
387                    .expect(
388                        "This page should exist (Some()), \
389                        None when a page is requested beyond the end of the data.",
390                    );
391
392                assert_eq!(first_page_logs, (0..20).collect::<Vec<_>>());
393
394                assert!(
395                    more_pages,
396                    "There should be more pages after the first page, \
397                    ROW_COUNT=30 which is > page size of 20."
398                );
399
400                let (second_page_logs, more_pages) = client
401                    .fetch_query(
402                        scope.clone(),
403                        PaginatedPageKey {
404                            key: (),
405                            page_index: 1,
406                            page_size: 20,
407                        },
408                    )
409                    .await
410                    .expect(
411                        "This page should exist (Some()), \
412                        None when a page is requested beyond the end of the data.",
413                    );
414
415                assert_eq!(second_page_logs, (20..30).collect::<Vec<_>>());
416
417                assert!(
418                    !more_pages,
419                    "20+20=40 which is > ROW_COUNT=30, so no more pages after the second page."
420                );
421
422                assert!(
423                    client
424                        .fetch_query(
425                            scope.clone(),
426                            PaginatedPageKey {
427                                key: (),
428                                page_index: 2,
429                                page_size: 20,
430                            },
431                        )
432                        .await
433                        .is_none()
434                );
435            })
436            .await;
437    }
438
439    /// Test that the pagination logic keeps calling the API until page_size items are available
440    #[tokio::test]
441    async fn test_paginated_cursor_fills_page_size_with_multiple_calls() {
442        crate::test::identify_parking_lot_deadlocks();
443        tokio::task::LocalSet::new()
444            .run_until(async move {
445                let (client, _guard, _owner) = prep_vari!(true);
446
447                let call_count = Arc::new(AtomicUsize::new(0));
448                let call_count_clone = call_count.clone();
449
450                // API that returns fewer items than requested
451                let scope = QueryScope::new_paginated_with_cursor(
452                    move |_key: (), _page_size, continuation| {
453                        let call_count = call_count_clone.clone();
454                        async move {
455                            call_count.fetch_add(1, Ordering::Relaxed);
456
457                            // Return only 3 items per call, even if more are requested
458                            match continuation {
459                                None => {
460                                    // First call
461                                    (vec![0, 1, 2], Some(3))
462                                }
463                                Some(3) => {
464                                    // Second call
465                                    (vec![3, 4, 5], Some(6))
466                                }
467                                Some(6) => {
468                                    // Third call
469                                    (vec![6, 7, 8], Some(9))
470                                }
471                                Some(9) => {
472                                    // Fourth call
473                                    (vec![9, 10], None) // Only 2 items left
474                                }
475                                _ => (vec![], None),
476                            }
477                        }
478                    },
479                );
480
481                // Request page with size 10 - should make 4 API calls to get 10 items
482                let (items, has_more) = client
483                    .fetch_query(
484                        scope.clone(),
485                        PaginatedPageKey {
486                            key: (),
487                            page_index: 0,
488                            page_size: 10,
489                        },
490                    )
491                    .await
492                    .expect("Page should exist");
493
494                assert_eq!(items.len(), 10, "Should have filled to page_size");
495                assert_eq!(items, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
496                assert!(has_more, "Should indicate more pages available");
497                assert_eq!(
498                    call_count.load(Ordering::Relaxed),
499                    4,
500                    "Should have made 4 API calls to fill page"
501                );
502
503                // Request second page - should only need 1 more API call
504                let (items, has_more) = client
505                    .fetch_query(
506                        scope.clone(),
507                        PaginatedPageKey {
508                            key: (),
509                            page_index: 1,
510                            page_size: 10,
511                        },
512                    )
513                    .await
514                    .expect("Page should exist");
515
516                assert_eq!(items.len(), 1, "Only 1 item left");
517                assert_eq!(items, vec![10]);
518                assert!(!has_more, "No more pages");
519                assert_eq!(
520                    call_count.load(Ordering::Relaxed),
521                    4,
522                    "Should not make additional calls - data already cached"
523                );
524            })
525            .await;
526
527        let (_call_count, my_api_fn) = get_simple_api_fn(30);
528
529        // Create the scope
530        let scope = QueryScope::new_paginated_with_cursor(move |_query_key, page_size, offset| {
531            let my_api_fn = my_api_fn.clone();
532            async move {
533                let (items, maybe_next_offset) = my_api_fn(page_size, offset);
534                (items, maybe_next_offset)
535            }
536        });
537
538        let client = QueryClient::new();
539
540        // Fetch first page
541        let (first_page, more_pages) = client
542            .fetch_query(
543                scope.clone(),
544                PaginatedPageKey {
545                    key: (),
546                    page_index: 0,
547                    page_size: 20,
548                },
549            )
550            .await
551            .expect("First page should exist");
552
553        assert_eq!(first_page, (0..20).collect::<Vec<_>>());
554        assert!(more_pages);
555
556        // Fetch second page
557        let (second_page, more_pages) = client
558            .fetch_query(
559                scope.clone(),
560                PaginatedPageKey {
561                    key: (),
562                    page_index: 1,
563                    page_size: 20,
564                },
565            )
566            .await
567            .expect("Second page should exist");
568
569        assert_eq!(second_page, (20..30).collect::<Vec<_>>());
570        assert!(!more_pages);
571
572        // Requesting beyond available data returns None
573        assert!(
574            client
575                .fetch_query(
576                    scope.clone(),
577                    PaginatedPageKey {
578                        key: (),
579                        page_index: 2,
580                        page_size: 20,
581                    },
582                )
583                .await
584                .is_none()
585        );
586    }
587
588    /// Test that API calls are cached properly and not repeated unnecessarily
589    #[tokio::test]
590    async fn test_paginated_cursor_no_unnecessary_api_calls() {
591        crate::test::identify_parking_lot_deadlocks();
592        tokio::task::LocalSet::new()
593            .run_until(async move {
594                let (client, _guard, _owner) = prep_vari!(true);
595
596                let call_count = Arc::new(AtomicUsize::new(0));
597                let call_count_clone = call_count.clone();
598
599                let scope = QueryScope::new_paginated_with_cursor(
600                    move |_key: (), page_size, continuation| {
601                        let call_count = call_count_clone.clone();
602                        async move {
603                            call_count.fetch_add(1, Ordering::Relaxed);
604
605                            let offset = continuation.unwrap_or(0);
606                            let items: Vec<usize> = (offset..offset + page_size).collect();
607                            let next = if offset + page_size < 100 {
608                                Some(offset + page_size)
609                            } else {
610                                None
611                            };
612                            (items, next)
613                        }
614                    },
615                );
616
617                // First fetch
618                let _ = client
619                    .fetch_query(
620                        scope.clone(),
621                        PaginatedPageKey {
622                            key: (),
623                            page_index: 0,
624                            page_size: 10,
625                        },
626                    )
627                    .await;
628                assert_eq!(
629                    call_count.load(Ordering::Relaxed),
630                    1,
631                    "First fetch should make 1 API call"
632                );
633
634                // Fetch same page again - should use cache
635                let _ = client
636                    .fetch_query(
637                        scope.clone(),
638                        PaginatedPageKey {
639                            key: (),
640                            page_index: 0,
641                            page_size: 10,
642                        },
643                    )
644                    .await;
645                assert_eq!(
646                    call_count.load(Ordering::Relaxed),
647                    1,
648                    "Should still be 1 call - cached"
649                );
650
651                // Fetch next page
652                let _ = client
653                    .fetch_query(
654                        scope.clone(),
655                        PaginatedPageKey {
656                            key: (),
657                            page_index: 1,
658                            page_size: 10,
659                        },
660                    )
661                    .await;
662                assert_eq!(
663                    call_count.load(Ordering::Relaxed),
664                    2,
665                    "Second page needs new API call"
666                );
667
668                // Fetch first page again - should still be cached
669                let _ = client
670                    .fetch_query(
671                        scope.clone(),
672                        PaginatedPageKey {
673                            key: (),
674                            page_index: 0,
675                            page_size: 10,
676                        },
677                    )
678                    .await;
679                assert_eq!(
680                    call_count.load(Ordering::Relaxed),
681                    2,
682                    "Should still be 2 calls - first page cached"
683                );
684            })
685            .await;
686    }
687
688    /// Test linked invalidation and clear between pages with same key
689    #[rstest]
690    #[tokio::test]
691    async fn test_paginated_cursor_linked_invalidation_and_clear(
692        #[values(true, false)] clear: bool,
693    ) {
694        crate::test::identify_parking_lot_deadlocks();
695        tokio::task::LocalSet::new()
696            .run_until(async move {
697                let (client, _guard, _owner) = prep_vari!(true);
698
699                let version = Arc::new(AtomicUsize::new(0));
700                let version_clone = version.clone();
701
702                let scope = QueryScope::new_paginated_with_cursor(
703                    move |key: String, page_size, continuation| {
704                        let v = version_clone.clone();
705                        async move {
706                            let current_version = v.load(Ordering::Relaxed);
707                            let offset = continuation.unwrap_or(0);
708
709                            // Return different data based on version
710                            let items: Vec<String> = (offset..offset + page_size)
711                                .map(|i| format!("{}_v{}_{}", key, current_version, i))
712                                .collect();
713
714                            let next = if offset + page_size < 30 {
715                                Some(offset + page_size)
716                            } else {
717                                None
718                            };
719                            (items, next)
720                        }
721                    },
722                );
723
724                // Fetch first page
725                let (items1, _) = client
726                    .fetch_query(
727                        scope.clone(),
728                        PaginatedPageKey {
729                            key: "test".to_string(),
730                            page_index: 0,
731                            page_size: 10,
732                        },
733                    )
734                    .await
735                    .expect("Page should exist");
736
737                assert_eq!(items1[0], "test_v0_0");
738
739                // Fetch second page
740                let (items2, _) = client
741                    .fetch_query(
742                        scope.clone(),
743                        PaginatedPageKey {
744                            key: "test".to_string(),
745                            page_index: 1,
746                            page_size: 10,
747                        },
748                    )
749                    .await
750                    .expect("Page should exist");
751
752                assert_eq!(items2[0], "test_v0_10");
753
754                // Increment version to simulate data change
755                version.store(1, Ordering::Relaxed);
756
757                // Fetch first page again - should still be old data
758                let (items1_new, _) = client
759                    .fetch_query(
760                        scope.clone(),
761                        PaginatedPageKey {
762                            key: "test".to_string(),
763                            page_index: 0,
764                            page_size: 10,
765                        },
766                    )
767                    .await
768                    .expect("Page should exist");
769
770                assert_eq!(
771                    items1_new[0], "test_v0_0",
772                    "Should have new version after invalidation/clear"
773                );
774
775                // Invalidation/clear should lead to new data:
776                if clear {
777                    // Currently not public, but still want to check as used internally for some things:
778                    client.untyped_client.clear_query_scope(scope.clone());
779                } else {
780                    client.invalidate_query_scope(scope.clone());
781                }
782
783                // Fetch first page again - should get new data
784                let (items1_new, _) = client
785                    .fetch_query(
786                        scope.clone(),
787                        PaginatedPageKey {
788                            key: "test".to_string(),
789                            page_index: 0,
790                            page_size: 10,
791                        },
792                    )
793                    .await
794                    .expect("Page should exist");
795
796                assert_eq!(
797                    items1_new[0], "test_v1_0",
798                    "Should have new version after invalidation/clear"
799                );
800
801                // Second page should also be invalidated/cleared
802                let (items2_new, _) = client
803                    .fetch_query(
804                        scope.clone(),
805                        PaginatedPageKey {
806                            key: "test".to_string(),
807                            page_index: 1,
808                            page_size: 10,
809                        },
810                    )
811                    .await
812                    .expect("Page should exist");
813
814                assert_eq!(
815                    items2_new[0], "test_v1_10",
816                    "Second page should also have new version"
817                );
818            })
819            .await;
820    }
821
822    /// Test that empty responses are handled correctly
823    #[tokio::test]
824    async fn test_paginated_cursor_empty_response_handling() {
825        crate::test::identify_parking_lot_deadlocks();
826        tokio::task::LocalSet::new()
827            .run_until(async move {
828                let (client, _guard, _owner) = prep_vari!(true);
829
830                let scope = QueryScope::new_paginated_with_cursor(
831                    |_key: (), _page_size, continuation| async move {
832                        match continuation {
833                            None => (vec![1, 2, 3], Some(3)),
834                            Some(3) => (vec![], Some(6)), // Empty response but with cursor
835                            _ => (vec![], None),
836                        }
837                    },
838                );
839
840                let (items, has_more) = client
841                    .fetch_query(
842                        scope.clone(),
843                        PaginatedPageKey {
844                            key: (),
845                            page_index: 0,
846                            page_size: 10,
847                        },
848                    )
849                    .await
850                    .expect("Page should exist");
851
852                // Should only have 3 items since second call returned empty
853                assert_eq!(items.len(), 3);
854                assert!(
855                    !has_more,
856                    "Should not have more pages when empty response received"
857                );
858            })
859            .await;
860    }
861
862    /// Test concurrent page fetches don't cause duplicate API calls
863    #[tokio::test]
864    async fn test_paginated_cursor_concurrent_fetches() {
865        crate::test::identify_parking_lot_deadlocks();
866        tokio::task::LocalSet::new()
867            .run_until(async move {
868                let (client, _guard, _owner) = prep_vari!(true);
869
870                let call_count = Arc::new(AtomicUsize::new(0));
871                let call_count_clone = call_count.clone();
872
873                let scope = QueryScope::new_paginated_with_cursor(
874                    move |_key: (), page_size, continuation| {
875                        let call_count = call_count_clone.clone();
876                        async move {
877                            call_count.fetch_add(1, Ordering::Relaxed);
878
879                            // Add a small delay to simulate network latency
880                            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
881
882                            let offset = continuation.unwrap_or(0);
883                            let items: Vec<usize> = (offset..offset + page_size).collect();
884                            let next = if offset + page_size < 100 {
885                                Some(offset + page_size)
886                            } else {
887                                None
888                            };
889                            (items, next)
890                        }
891                    },
892                );
893
894                // Launch multiple concurrent fetches for the same page
895                let futures = (0..5).map(|_| {
896                    client.fetch_query(
897                        scope.clone(),
898                        PaginatedPageKey {
899                            key: (),
900                            page_index: 0,
901                            page_size: 10,
902                        },
903                    )
904                });
905
906                let results = futures::future::join_all(futures).await;
907
908                // All should succeed
909                for result in results {
910                    assert!(result.is_some());
911                }
912
913                // Should only make 1 API call despite 5 concurrent requests
914                assert_eq!(
915                    call_count.load(Ordering::Relaxed),
916                    1,
917                    "Concurrent fetches should share single API call"
918                );
919            })
920            .await;
921    }
922
923    /// Test different page sizes work correctly with shared cache
924    #[tokio::test]
925    async fn test_paginated_cursor_different_page_sizes() {
926        crate::test::identify_parking_lot_deadlocks();
927        tokio::task::LocalSet::new()
928            .run_until(async move {
929                let (client, _guard, _owner) = prep_vari!(true);
930
931                let call_count = Arc::new(AtomicUsize::new(0));
932                let call_count_clone = call_count.clone();
933
934                let scope = QueryScope::new_paginated_with_cursor(
935                    move |_key: (), page_size, continuation| {
936                        let call_count = call_count_clone.clone();
937                        async move {
938                            call_count.fetch_add(1, Ordering::Relaxed);
939
940                            let offset = continuation.unwrap_or(0);
941                            let items: Vec<usize> =
942                                (offset..std::cmp::min(offset + page_size, 50)).collect();
943                            let next = if offset + page_size < 50 {
944                                Some(offset + page_size)
945                            } else {
946                                None
947                            };
948                            (items, next)
949                        }
950                    },
951                );
952
953                // Fetch with page size 5
954                let (items1, _) = client
955                    .fetch_query(
956                        scope.clone(),
957                        PaginatedPageKey {
958                            key: (),
959                            page_index: 0,
960                            page_size: 5,
961                        },
962                    )
963                    .await
964                    .expect("Page should exist");
965                assert_eq!(items1.len(), 5);
966                assert_eq!(call_count.load(Ordering::Relaxed), 1);
967
968                // Fetch with page size 15 - should reuse cached data and fetch more
969                let (items2, _) = client
970                    .fetch_query(
971                        scope.clone(),
972                        PaginatedPageKey {
973                            key: (),
974                            page_index: 0,
975                            page_size: 15,
976                        },
977                    )
978                    .await
979                    .expect("Page should exist");
980                assert_eq!(items2.len(), 15);
981                assert_eq!(
982                    call_count.load(Ordering::Relaxed),
983                    2,
984                    "Should fetch more data for larger page"
985                );
986
987                // Fetch with page size 10 - should use cached data
988                let (items3, _) = client
989                    .fetch_query(
990                        scope.clone(),
991                        PaginatedPageKey {
992                            key: (),
993                            page_index: 0,
994                            page_size: 10,
995                        },
996                    )
997                    .await
998                    .expect("Page should exist");
999                assert_eq!(items3.len(), 10);
1000                assert_eq!(
1001                    call_count.load(Ordering::Relaxed),
1002                    2,
1003                    "Should use cached data, no new fetch"
1004                );
1005            })
1006            .await;
1007    }
1008
1009    /// Test that backing cache is properly managed when paginated pages expire
1010    /// Tests both GC (garbage collection) and stale time scenarios
1011    #[rstest]
1012    #[case::gc_time(TestMode::GcTime)]
1013    #[case::stale_time(TestMode::StaleTime)]
1014    #[tokio::test]
1015    async fn test_paginated_cursor_backing_cache_lifecycle(#[case] mode: TestMode) {
1016        crate::test::identify_parking_lot_deadlocks();
1017        tokio::task::LocalSet::new()
1018            .run_until(async move {
1019                let (client, _guard, _owner) = prep_vari!(true);
1020
1021                let call_count = Arc::new(AtomicUsize::new(0));
1022                let call_count_clone = call_count.clone();
1023
1024                let scope = QueryScope::new_paginated_with_cursor(
1025                    move |key: String, page_size, continuation| {
1026                        let call_count = call_count_clone.clone();
1027                        async move {
1028                            call_count.fetch_add(1, Ordering::Relaxed);
1029
1030                            let offset = continuation.unwrap_or(0);
1031                            let items: Vec<String> = (offset..offset + page_size)
1032                                .map(|i| format!("{}_{}", key, i))
1033                                .collect();
1034                            let next = if offset + page_size < 30 {
1035                                Some(offset + page_size)
1036                            } else {
1037                                None
1038                            };
1039                            (items, next)
1040                        }
1041                    },
1042                )
1043                .with_options(match mode {
1044                    TestMode::GcTime => crate::QueryOptions::default()
1045                        .with_gc_time(std::time::Duration::from_millis(100)),
1046                    TestMode::StaleTime => crate::QueryOptions::default()
1047                        .with_stale_time(std::time::Duration::from_millis(100)),
1048                });
1049
1050                // Fetch first page
1051                let (items, _) = client
1052                    .fetch_query(
1053                        scope.clone(),
1054                        PaginatedPageKey {
1055                            key: "test".to_string(),
1056                            page_index: 0,
1057                            page_size: 10,
1058                        },
1059                    )
1060                    .await
1061                    .expect("Page should exist");
1062                assert_eq!(items[0], "test_0");
1063                assert_eq!(call_count.load(Ordering::Relaxed), 1);
1064
1065                // Wait for 50ms, 50ms left till gc/stale:
1066                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1067
1068                // Fetch second page
1069                let (items, _) = client
1070                    .fetch_query(
1071                        scope.clone(),
1072                        PaginatedPageKey {
1073                            key: "test".to_string(),
1074                            page_index: 1,
1075                            page_size: 10,
1076                        },
1077                    )
1078                    .await
1079                    .expect("Page should exist");
1080                assert_eq!(items[0], "test_10");
1081                assert_eq!(call_count.load(Ordering::Relaxed), 2);
1082
1083                // Wait another 50ms, so the first query is stale/gc'd, but the second is valid even when gc'd because 50ms left:
1084                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1085
1086                // Fetch first page again
1087                let (items, _) = client
1088                    .fetch_query(
1089                        scope.clone(),
1090                        PaginatedPageKey {
1091                            key: "test".to_string(),
1092                            page_index: 0,
1093                            page_size: 10,
1094                        },
1095                    )
1096                    .await
1097                    .expect("Page should exist");
1098                assert_eq!(items[0], "test_0");
1099
1100                // Fetch second page again
1101                let (items, _) = client
1102                    .fetch_query(
1103                        scope.clone(),
1104                        PaginatedPageKey {
1105                            key: "test".to_string(),
1106                            page_index: 1,
1107                            page_size: 10,
1108                        },
1109                    )
1110                    .await
1111                    .expect("Page should exist");
1112                assert_eq!(items[0], "test_10");
1113
1114                let expected_calls = match mode {
1115                    TestMode::GcTime => {
1116                        // GC cleared page 0, but page 1 kept backing cache alive
1117                        // So page 0 refetch reuses backing cache, plus page 1 still alive so also reused
1118                        2
1119                    }
1120                    TestMode::StaleTime => {
1121                        // Stale page triggers invalidation which clears backing cache
1122                        // So page 0 refetch needs new API call, but page 1 is still valid so reused
1123                        3
1124                    }
1125                };
1126                assert_eq!(call_count.load(Ordering::Relaxed), expected_calls);
1127
1128                // If gc, should clear the backing cache only when all pages are gc'd:
1129                if matches!(mode, TestMode::GcTime) {
1130                    tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;
1131
1132                    // Now fetch again - backing cache should be gone
1133                    let (items, _) = client
1134                        .fetch_query(
1135                            scope.clone(),
1136                            PaginatedPageKey {
1137                                key: "test".to_string(),
1138                                page_index: 0,
1139                                page_size: 10,
1140                            },
1141                        )
1142                        .await
1143                        .expect("Page should exist");
1144                    assert_eq!(items[0], "test_0");
1145                    assert_eq!(
1146                        call_count.load(Ordering::Relaxed),
1147                        3,
1148                        "Backing cache should be cleared when all pages are GC'd"
1149                    );
1150                }
1151            })
1152            .await;
1153    }
1154
1155    #[derive(Debug, Clone, Copy)]
1156    enum TestMode {
1157        /// Test GC behavior: backing cache should only be cleared when all pages are GC'd
1158        GcTime,
1159        /// Test stale time behavior: backing cache should be invalidated when any page goes stale
1160        StaleTime,
1161    }
1162}