leptos_fetch/pagination/
paginated_offset.rs

1use std::collections::BTreeMap;
2use std::time::Duration;
3use std::{hash::Hash, sync::Arc};
4
5use leptos::prelude::*;
6
7use crate::{
8    PaginatedPageKey, QueryOptions, QueryScope, QueryScopeLocal, UntypedQueryClient,
9    cache::OnScopeMissing, debug_if_devtools_enabled::DebugIfDevtoolsEnabled,
10    query_scope::ScopeCacheKey,
11};
12
// Generates `new_paginated_with_offset` for one scope type.
//
// Macro arguments, in order:
// 1. extra bounds appended to the getter's future (`Fut`),
// 2. extra bounds appended to the getter itself and to `Key` / `PageItem`,
// 3. the scope type the constructor is implemented on,
// 4. the client method used to fetch the backing cache,
// 5. the scope's name as a literal (not referenced by the expansion below).
macro_rules! define {
    ([$($impl_fut_generics:tt)*], [$($impl_fn_generics:tt)*], $name:ident, $fetch_fn:ident, $sname:literal) => {

        impl<Key, PageItem> $name<PaginatedPageKey<Key>, Option<(Vec<PageItem>, Option<u64>)>>
        where
            Key: DebugIfDevtoolsEnabled + Clone + Hash + PartialEq + 'static $($impl_fn_generics)*,
            PageItem: DebugIfDevtoolsEnabled + Clone + 'static $($impl_fn_generics)*,
        {
            /// Create an offset-based paginated query scope.
            ///
            /// Preferred when your API supports numeric offsets. Enables jumping to any page without
            /// loading intermediate pages and can return total item count for page calculations.
            ///
            /// # Arguments
            ///
            /// The getter receives:
            /// - `query_key: Key` - Your custom key, same across all pages
            /// - `nb_items_requested: usize` - Target number of items to return
            /// - `offset: u64` - Starting position/offset for this fetch
            ///
            /// The getter must return:
            /// - `Vec<Item>` - Items for this page
            /// - `Option<u64>` - Total item count if known (enables page count calculations)
            ///
            /// If the getter returns fewer items than requested it is called again, starting at
            /// the first missing offset, until the page is full or an empty response is returned.
            ///
            /// # Returns
            ///
            /// A scope whose query yields, per [`PaginatedPageKey`]:
            /// - `Some((items, total))` when the page holds at least one item. The last page
            ///   may hold fewer than `page_size` items.
            /// - `None` when the page starts at or beyond the end of the data.
            ///
            /// # Example
            ///
            /// ```rust,ignore
            /// use leptos_fetch::{QueryScope, PaginatedPageKey};
            ///
            /// // Create paginated scope with offset-based API
            /// let scope = QueryScope::new_paginated_with_offset(
            ///     |_key: (), nb_items, offset| async move {
            ///         // Call your API with offset and limit
            ///         let (items, total) = fetch_from_api(offset, nb_items).await;
            ///         (items, Some(total))
            ///     }
            /// );
            ///
            /// // Fetch page 0
            /// let (items, total) = client.fetch_query(scope, PaginatedPageKey {
            ///     key: (),
            ///     page_index: 0,
            ///     page_size: 20,
            /// }).await.expect("Page exists");
            ///
            /// // Jump directly to page 10 (no need to load pages 1-9)
            /// let (items, total) = client.fetch_query(scope, PaginatedPageKey {
            ///     key: (),
            ///     page_index: 10,
            ///     page_size: 20,
            /// }).await.expect("Page exists");
            ///
            /// // Calculate total pages from returned total count
            /// if let Some(total_items) = total {
            ///     let total_pages = (total_items as f64 / 20.0).ceil() as u64;
            /// }
            /// ```
            pub fn new_paginated_with_offset<Fut>(
                getter: impl Fn(Key, usize, u64) -> Fut + 'static $($impl_fn_generics)*,
            ) -> $name<PaginatedPageKey<Key>, Option<(Vec<PageItem>, Option<u64>)>>
            where
                Fut: Future<Output = (Vec<PageItem>, Option<u64>)> $($impl_fut_generics)*,
            {
                let getter = Arc::new(getter);

                // One backing cache entry per `key` (page size and offset are not hashed),
                // holding every item fetched so far for that key. It is seeded by the first
                // page request that reaches it.
                let backing_cache_scope = $name::new({
                    let getter = getter.clone();
                    move |key: KeyWithUnhashedItemCountAndOffsetRequested<Key>| {
                        let getter = getter.clone();
                        async move {
                            let (items, mut maybe_total_items) = getter(key.key, key.item_count_requested, key.offset_requested).await;

                            // Protect against incorrect item count when clearly no items left:
                            if items.is_empty() {
                                if let Some(cur_total_items) = &mut maybe_total_items {
                                    if *cur_total_items > key.offset_requested {
                                        *cur_total_items = key.offset_requested;
                                    }
                                } else {
                                    maybe_total_items = Some(key.offset_requested);
                                }
                            }

                            BackingCache {
                                inner: Arc::new(BackingCacheInner {
                                    items: SparseItems {
                                        items: parking_lot::Mutex::new(
                                            items
                                                .into_iter()
                                                .enumerate()
                                                .map(|(idx, item)| (key.offset_requested + (idx as u64), item))
                                                .collect(),
                                        ),
                                    },
                                    maybe_total_items: parking_lot::Mutex::new(maybe_total_items),
                                    update_lock: futures::lock::Mutex::new(()),
                                }),
                            }
                        }
                    }
                })
                // Shouldn't itself expire, the paginated query will control that:
                .with_options(
                    QueryOptions::default()
                        .with_stale_time(Duration::MAX)
                        .with_gc_time(Duration::MAX)
                        .with_refetch_interval(Duration::MAX)
                );

                $name::new({
                    let backing_cache_scope = backing_cache_scope.clone();
                    move |page_key: PaginatedPageKey<Key>| {
                        let backing_cache_scope = backing_cache_scope.clone();
                        let getter = getter.clone();
                        async move {
                            let untyped_client = use_context::<UntypedQueryClient>()
                                .expect(
                                    "leptos-fetch bug, UntypedQueryClient should always have been \
                                    provided to the query context internally"
                            );
                            let scope_cache_key = use_context::<ScopeCacheKey>()
                                .expect(
                                    "leptos-fetch bug, ScopeCacheKey itself should always have been \
                                    provided to the query context internally"
                                );

                            // If this query is reloading because it was stale, should invalidate the backing cache before reading it,
                            // otherwise will still get back the same stale data again:
                            if let Some(metadata) = untyped_client.query_metadata::<PaginatedPageKey<Key>, Option<(Vec<PageItem>, Option<u64>)>>(
                                scope_cache_key,
                                &page_key,
                            )
                            && metadata.stale_or_invalidated
                            && let Some(backing_metadata) = untyped_client.query_metadata::<KeyWithUnhashedItemCountAndOffsetRequested<Key>, BackingCache<PageItem>>(
                                backing_cache_scope.cache_key,
                                &KeyWithUnhashedItemCountAndOffsetRequested {
                                    key: page_key.key.clone(),
                                    item_count_requested: 0, // Doesn't matter, it's not part of the hash
                                    offset_requested: 0, // Doesn't matter, it's not part of the hash
                                },
                            )
                            && backing_metadata.updated_at <= metadata.updated_at
                            {
                                untyped_client.invalidate_query(
                                    &backing_cache_scope,
                                    KeyWithUnhashedItemCountAndOffsetRequested {
                                        key: page_key.key.clone(),
                                        item_count_requested: 0, // Doesn't matter, it's not part of the hash
                                        offset_requested: 0, // Doesn't matter, it's not part of the hash
                                    },
                                );
                            }

                            // Absolute offset of the first item of the requested page:
                            let target_idx_start = (page_key.page_index as u64) * (page_key.page_size as u64);

                            let infinite_cache = untyped_client
                                .$fetch_fn(
                                    backing_cache_scope,
                                    KeyWithUnhashedItemCountAndOffsetRequested {
                                        key: page_key.key.clone(),
                                        item_count_requested: page_key.page_size,
                                        offset_requested: target_idx_start,
                                    },
                                )
                                .await;

                            let mut target_idx_end_exclusive = ((page_key.page_index as u64) + 1) * (page_key.page_size as u64);
                            if let Some(maybe_total_items) = *infinite_cache.inner.maybe_total_items.lock() {
                                // If page starts beyond available data, return None
                                if target_idx_start >= maybe_total_items {
                                    return None;
                                }
                                if target_idx_end_exclusive > maybe_total_items {
                                    target_idx_end_exclusive = maybe_total_items;
                                }
                            }

                            // Load x more if needed:
                            if !infinite_cache.inner.items.check_range(target_idx_start, target_idx_end_exclusive) {
                                // Preventing multiple simultaneous fetches by holding an async lock,
                                // but note making sure to not hold any sync locks across await boundaries.
                                let update_guard = infinite_cache.inner.update_lock.lock().await;
                                loop {
                                    // Find the next offset we need to fetch
                                    let next_offset = {
                                        let items_lock = infinite_cache.inner.items.items.lock();
                                        // Find first missing offset in our target range
                                        let mut offset = target_idx_start;
                                        while offset < target_idx_end_exclusive {
                                            if !items_lock.contains_key(&offset) {
                                                break;
                                            }
                                            offset += 1;
                                        }
                                        offset
                                    };

                                    if next_offset >= target_idx_end_exclusive {
                                        // All items in range are present
                                        break;
                                    }

                                    let items_needed = (target_idx_end_exclusive - next_offset) as usize;
                                    let (items, maybe_total_items) =
                                        getter(page_key.key.clone(), items_needed, next_offset).await;
                                    if !items.is_empty() {
                                        infinite_cache.inner.items.extend(next_offset, items);
                                        *infinite_cache.inner.maybe_total_items.lock() = maybe_total_items;
                                    } else {
                                        // Protect against incorrect item count when clearly no items left:
                                        {
                                            let mut guard = infinite_cache.inner.maybe_total_items.lock();
                                            if let Some(cur_total_items) = &mut* guard {
                                                if *cur_total_items > next_offset {
                                                    *cur_total_items = next_offset;
                                                }
                                            } else {
                                                *guard = Some(next_offset);
                                            }
                                        }

                                        // The data ends at `next_offset`, which is the first missing
                                        // offset of this page. If that is the very start of the page,
                                        // the page lies beyond the end of the data:
                                        if next_offset == target_idx_start {
                                            return None;
                                        }
                                        // Otherwise this is a short final page. Shrink the range to
                                        // the items that exist, so the read below returns them
                                        // instead of failing on the offsets that never will.
                                        target_idx_end_exclusive = next_offset;

                                        // Can't get more items, break out
                                        break;
                                    }
                                }
                                drop(update_guard);
                            }

                            infinite_cache.inner.items.get_range(target_idx_start, (target_idx_end_exclusive - target_idx_start) as usize).map(|items| {
                                (items, infinite_cache.inner.maybe_total_items.lock().clone())
                            })
                        }
                    }
                })
                // An invalidation or clear of any page should invalidate the backing cache:
                .on_invalidation({
                    let backing_cache_scope = backing_cache_scope.clone();
                    move |key| {
                        let untyped_client = use_context::<UntypedQueryClient>()
                            .expect(
                                "leptos-fetch bug, UntypedQueryClient should always have been \
                                provided to the on_invalidation context internally"
                            );
                        untyped_client.invalidate_query(
                            &backing_cache_scope,
                            KeyWithUnhashedItemCountAndOffsetRequested {
                                key: key.key.clone(),
                                item_count_requested: 0, // Doesn't matter, it's not part of the hash
                                offset_requested: 0, // Doesn't matter, it's not part of the hash
                            },
                        );
                    }
                })
                // If this was the last page existing now being gc'd, clear the backing cache too:
                .on_gc(move |key| {
                    let untyped_client = use_context::<UntypedQueryClient>()
                        .expect(
                            "leptos-fetch bug, UntypedQueryClient should always have been \
                            provided to the on_gc context internally"
                        );
                    let scope_cache_key = use_context::<ScopeCacheKey>()
                        .expect(
                            "leptos-fetch bug, ScopeCacheKey itself should always have been \
                            provided to the on_gc context internally"
                        );
                    // Count the page queries (including pending ones) that still share this `key`:
                    let mut found_nb = 0;
                    untyped_client
                        .scope_lookup
                        .with_cached_scope_mut::<PaginatedPageKey<Key>, Option<(Vec<PageItem>, Option<u64>)>, _, _>(
                            &mut untyped_client.scope_lookup.scopes_mut(),
                            scope_cache_key,
                            OnScopeMissing::Skip,
                            |_| {},
                            |maybe_scope, _| {
                                if let Some(scope) = maybe_scope {
                                    for query_or_pending in scope.all_queries_mut_include_pending() {
                                        if query_or_pending.key().value_if_safe().map(|test_key| test_key.key == key.key).unwrap_or(false) {
                                            found_nb += 1;
                                        }
                                    }
                                }
                            },
                        );
                    if found_nb == 0 {
                        untyped_client.clear_query(
                            &backing_cache_scope,
                            KeyWithUnhashedItemCountAndOffsetRequested {
                                key: key.key.clone(),
                                item_count_requested: 0, // Doesn't matter, it's not part of the hash
                                offset_requested: 0, // Doesn't matter, it's not part of the hash
                            },
                        );
                    }
                })
            }
        }
    };
}
306
// `QueryScope` variant: the getter's future must additionally be `Send`, the getter, key and
// item types must be `Send + Sync`, and the backing cache is read with `fetch_query`.
define! { [+ Send], [+ Send + Sync], QueryScope, fetch_query, "QueryScope" }
// `QueryScopeLocal` variant: no extra bounds, and the backing cache is read with
// `fetch_query_local`.
define! { [], [], QueryScopeLocal, fetch_query_local, "QueryScopeLocal" }
309
/// Key of the backing cache: the user's `key` plus the parameters of the request that
/// should seed the cache when no entry exists yet.
///
/// Only `key` takes part in hashing (see the `Hash` impl below), so requests for any page
/// size or offset hash identically for the same `key`.
#[derive(Debug, Clone)]
struct KeyWithUnhashedItemCountAndOffsetRequested<Key> {
    /// The user-supplied key, shared by all pages.
    key: Key,
    /// Number of items to request from the getter; not hashed.
    item_count_requested: usize,
    /// Offset to request from the getter; not hashed.
    offset_requested: u64,
}

impl<Key> Hash for KeyWithUnhashedItemCountAndOffsetRequested<Key>
where
    Key: Hash,
{
    /// Feeds only `key` into the hasher; the request parameters are deliberately left out.
    fn hash<S>(&self, state: &mut S)
    where
        S: std::hash::Hasher,
    {
        // Destructure exhaustively so that a newly added field forces a decision here.
        let Self {
            key,
            item_count_requested: _,
            offset_requested: _,
        } = self;
        Hash::hash(key, state);
    }
}
322
/// Value stored in the backing cache scope: a handle to the item store used by every page
/// query with the same `key`. Cloning only clones the `Arc`, so all clones see the same data.
#[derive(Debug, Clone)]
struct BackingCache<Item> {
    inner: Arc<BackingCacheInner<Item>>,
}
327
/// Shared state behind a [`BackingCache`].
#[derive(Debug)]
struct BackingCacheInner<Item> {
    /// Every item fetched so far, stored by absolute offset.
    items: SparseItems<Item>,
    /// Most recent total item count, if any: either what the getter last reported, or the
    /// offset at which a request came back empty.
    maybe_total_items: parking_lot::Mutex<Option<u64>>,
    /// Async lock held while a page query tops up `items`, so only one fill runs at a time.
    update_lock: futures::lock::Mutex<()>,
}
334
/// Items stored by absolute offset. The map may contain gaps, since pages can be fetched in
/// any order.
#[derive(Debug)]
struct SparseItems<Item> {
    /// Offset -> item, behind a synchronous mutex.
    items: parking_lot::Mutex<BTreeMap<u64, Item>>,
}
339
340impl<Item> SparseItems<Item> {
341    fn extend(&self, offset: u64, new_items: Vec<Item>) {
342        let mut items = self.items.lock();
343        for (idx, item) in new_items.into_iter().enumerate() {
344            items.insert(offset + (idx as u64), item);
345        }
346    }
347
348    /// Return the range requested, or None if any items are missing.
349    fn get_range(&self, start_offset: u64, count: usize) -> Option<Vec<Item>>
350    where
351        Item: Clone,
352    {
353        let items = self.items.lock();
354
355        // Check if we have all items in the range
356        let mut result = Vec::with_capacity(count);
357        for offset in start_offset..(start_offset + (count as u64)) {
358            match items.get(&offset) {
359                Some(item) => result.push(item),
360                None => return None, // Missing item, return None
361            }
362        }
363
364        Some(result.into_iter().cloned().collect())
365    }
366
367    fn check_range(&self, start_offset: u64, end_index_exclusive: u64) -> bool
368    where
369        Item: Clone,
370    {
371        let items = self.items.lock();
372        for offset in start_offset..end_index_exclusive {
373            if !items.contains_key(&offset) {
374                return false;
375            }
376        }
377        true
378    }
379}
380
381#[cfg(test)]
382mod tests {
383    use any_spawner::Executor;
384    use hydration_context::SsrSharedContext;
385    use leptos::prelude::*;
386    use rstest::*;
387    use std::sync::Arc;
388    use std::sync::atomic::{AtomicUsize, Ordering};
389
390    use crate::test::prep_vari;
391    use crate::{PaginatedPageKey, QueryClient, QueryScope};
392
    /// We don't know the serialization mechanism the user is using, so cannot return wrapping types from the query function.
    /// Hence returning (Vec<Item>, Option<total_items>) instead of a custom wrapping Page<Item> type.
    /// This test is just checking compilation.
    #[tokio::test]
    async fn test_paginated_serialization_works() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);
                let scope = QueryScope::new_paginated_with_offset(|_: (), _, _| async {
                    (vec![()], Some(10))
                });
                // Only needs to type-check: the returned resource is never read.
                client.resource(scope, || PaginatedPageKey {
                    key: (),
                    page_index: 0,
                    page_size: 10,
                });
            })
            .await;
    }
413
414    fn get_simple_api_fn(
415        num_rows: usize,
416    ) -> (
417        Arc<AtomicUsize>,
418        impl Fn(usize, u64) -> (Vec<usize>, Option<u64>) + Clone + 'static,
419    ) {
420        let call_count = Arc::new(AtomicUsize::new(0));
421
422        let api_fn = {
423            let call_count = call_count.clone();
424            move |target_return_count: usize, offset: u64| {
425                let call_count = call_count.clone();
426                call_count.fetch_add(1, Ordering::Relaxed);
427
428                let offset_usize = offset as usize;
429                let items = (0..num_rows)
430                    .skip(offset_usize)
431                    .take(target_return_count)
432                    .collect::<Vec<_>>();
433                let total_items = num_rows as u64;
434                (items, Some(total_items))
435            }
436        };
437
438        (call_count, api_fn)
439    }
440
    /// Basic walk over a 30-row data set with a page size of 20: a full first page, a short
    /// second page, and `None` for a page that starts past the end.
    #[tokio::test]
    async fn test_paginated_offset() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let (_call_count, my_api_fn) = get_simple_api_fn(30);

                let scope =
                    QueryScope::new_paginated_with_offset(move |_query_key, page_size, offset| {
                        let my_api_fn = my_api_fn.clone();
                        async move {
                            let (items, total_items) = my_api_fn(page_size, offset);
                            (items, total_items)
                        }
                    });

                // Page 0: rows 0..20.
                let (first_page_logs, maybe_total) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 20,
                        },
                    )
                    .await
                    .expect(
                        "This page should exist (Some()), \
                        None when a page is requested beyond the end of the data.",
                    );

                assert_eq!(first_page_logs, (0..20).collect::<Vec<_>>());
                assert_eq!(maybe_total, Some(30));

                // Page 1: only rows 20..30 remain, so the page is shorter than page_size.
                let (second_page_logs, maybe_total) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 1,
                            page_size: 20,
                        },
                    )
                    .await
                    .expect(
                        "This page should exist (Some()), \
                        None when a page is requested beyond the end of the data.",
                    );

                assert_eq!(second_page_logs, (20..30).collect::<Vec<_>>());
                assert_eq!(maybe_total, Some(30));

                // Page 2 would start at offset 40, beyond the 30 rows available.
                assert!(
                    client
                        .fetch_query(
                            scope.clone(),
                            PaginatedPageKey {
                                key: (),
                                page_index: 2,
                                page_size: 20,
                            },
                        )
                        .await
                        .is_none()
                );
            })
            .await;
    }
511
    /// Test that the pagination logic keeps calling the API until page_size items are available
    ///
    /// The fake API hands out at most 3 items per call (11 items in total), so a page of
    /// 10 needs several calls. Items fetched beyond the page are kept for later pages.
    #[tokio::test]
    async fn test_paginated_offset_fills_page_size_with_multiple_calls() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let call_count = Arc::new(AtomicUsize::new(0));
                let call_count_clone = call_count.clone();

                // API that returns fewer items than requested
                let scope =
                    QueryScope::new_paginated_with_offset(move |_key: (), _page_size, offset| {
                        let call_count = call_count_clone.clone();
                        async move {
                            call_count.fetch_add(1, Ordering::Relaxed);

                            // Return only 3 items per call, even if more are requested
                            let items = match offset {
                                0 => vec![0, 1, 2],
                                3 => vec![3, 4, 5],
                                6 => vec![6, 7, 8],
                                9 => vec![9, 10],
                                _ => vec![],
                            };
                            (items, Some(11))
                        }
                    });

                // Request page with size 10 - should make 4 API calls to get 10 items
                let (items, total) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items.len(), 10, "Should have filled to page_size");
                assert_eq!(items, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
                assert_eq!(total, Some(11));
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    4,
                    "Should have made 4 API calls to fill page"
                );

                // Request second page - item 10 was already fetched during first page
                // (the call at offset 9 returned both 9 and 10).
                let (items, total) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 1,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items.len(), 1, "Only 1 item left");
                assert_eq!(items, vec![10]);
                assert_eq!(total, Some(11));
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    4,
                    "Should not make additional calls - item 10 was already cached"
                );
            })
            .await;
    }
588
    /// Test that API calls are cached properly and not repeated unnecessarily
    ///
    /// The call counter must only grow when a page is requested for the first time;
    /// re-requesting an already fetched page must not reach the API.
    #[tokio::test]
    async fn test_paginated_offset_no_unnecessary_api_calls() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let call_count = Arc::new(AtomicUsize::new(0));
                let call_count_clone = call_count.clone();

                // API that always returns exactly the requested number of items.
                let scope =
                    QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
                        let call_count = call_count_clone.clone();
                        async move {
                            call_count.fetch_add(1, Ordering::Relaxed);

                            let offset_usize = offset as usize;
                            let items: Vec<usize> =
                                (offset_usize..offset_usize + page_size).collect();
                            (items, Some(100))
                        }
                    });

                // First fetch
                let _ = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await;
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    1,
                    "First fetch should make 1 API call"
                );

                // Fetch same page again - should use cache
                let _ = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await;
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    1,
                    "Should still be 1 call - cached"
                );

                // Fetch next page
                let _ = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 1,
                            page_size: 10,
                        },
                    )
                    .await;
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    2,
                    "Second page needs new API call"
                );

                // Fetch first page again - should still be cached
                let _ = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await;
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    2,
                    "Should still be 2 calls - first page cached"
                );
            })
            .await;
    }
683
684    /// Test linked invalidation and clear between pages with same key
685    #[rstest]
686    #[tokio::test]
687    async fn test_paginated_offset_linked_invalidation_and_clear(
688        #[values(true, false)] clear: bool,
689    ) {
690        crate::test::identify_parking_lot_deadlocks();
691        tokio::task::LocalSet::new()
692            .run_until(async move {
693                let (client, _guard, _owner) = prep_vari!(true);
694
695                let version = Arc::new(AtomicUsize::new(0));
696                let version_clone = version.clone();
697
698                let scope =
699                    QueryScope::new_paginated_with_offset(move |key: String, page_size, offset| {
700                        let v = version_clone.clone();
701                        async move {
702                            let current_version = v.load(Ordering::Relaxed);
703                            let offset_usize = offset as usize;
704
705                            // Return different data based on version
706                            let items: Vec<String> = (offset_usize..offset_usize + page_size)
707                                .map(|i| format!("{}_v{}_{}", key, current_version, i))
708                                .collect();
709
710                            (items, Some(30))
711                        }
712                    });
713
714                // Fetch first page
715                let (items1, _) = client
716                    .fetch_query(
717                        scope.clone(),
718                        PaginatedPageKey {
719                            key: "test".to_string(),
720                            page_index: 0,
721                            page_size: 10,
722                        },
723                    )
724                    .await
725                    .expect("Page should exist");
726
727                assert_eq!(items1[0], "test_v0_0");
728
729                // Fetch second page
730                let (items2, _) = client
731                    .fetch_query(
732                        scope.clone(),
733                        PaginatedPageKey {
734                            key: "test".to_string(),
735                            page_index: 1,
736                            page_size: 10,
737                        },
738                    )
739                    .await
740                    .expect("Page should exist");
741
742                assert_eq!(items2[0], "test_v0_10");
743
744                // Increment version to simulate data change
745                version.store(1, Ordering::Relaxed);
746
747                // Fetch first page again - should still be old data
748                let (items1_new, _) = client
749                    .fetch_query(
750                        scope.clone(),
751                        PaginatedPageKey {
752                            key: "test".to_string(),
753                            page_index: 0,
754                            page_size: 10,
755                        },
756                    )
757                    .await
758                    .expect("Page should exist");
759
760                assert_eq!(
761                    items1_new[0], "test_v0_0",
762                    "Should still have old version before invalidation/clear"
763                );
764
765                // Invalidation/clear should lead to new data:
766                if clear {
767                    // Currently not public, but still want to check as used internally for some things:
768                    client.untyped_client.clear_query_scope(scope.clone());
769                } else {
770                    client.invalidate_query_scope(scope.clone());
771                }
772
773                // Fetch first page again - should get new data
774                let (items1_new, _) = client
775                    .fetch_query(
776                        scope.clone(),
777                        PaginatedPageKey {
778                            key: "test".to_string(),
779                            page_index: 0,
780                            page_size: 10,
781                        },
782                    )
783                    .await
784                    .expect("Page should exist");
785
786                assert_eq!(
787                    items1_new[0], "test_v1_0",
788                    "Should have new version after invalidation/clear"
789                );
790
791                // Second page should also be invalidated/cleared
792                let (items2_new, _) = client
793                    .fetch_query(
794                        scope.clone(),
795                        PaginatedPageKey {
796                            key: "test".to_string(),
797                            page_index: 1,
798                            page_size: 10,
799                        },
800                    )
801                    .await
802                    .expect("Page should exist");
803
804                assert_eq!(
805                    items2_new[0], "test_v1_10",
806                    "Second page should also have new version"
807                );
808            })
809            .await;
810    }
811
812    /// Test that empty responses are handled correctly
813    #[tokio::test]
814    async fn test_paginated_offset_empty_response_handling() {
815        crate::test::identify_parking_lot_deadlocks();
816        tokio::task::LocalSet::new()
817            .run_until(async move {
818                let (client, _guard, _owner) = prep_vari!(true);
819
820                let scope = QueryScope::new_paginated_with_offset(
821                    |_key: (), _page_size, offset| async move {
822                        match offset {
823                            0 => (vec![1, 2, 3], Some(3)),
824                            _ => (vec![], Some(3)),
825                        }
826                    },
827                );
828
829                let (items, total) = client
830                    .fetch_query(
831                        scope.clone(),
832                        PaginatedPageKey {
833                            key: (),
834                            page_index: 0,
835                            page_size: 10,
836                        },
837                    )
838                    .await
839                    .expect("Page should exist");
840
841                // Should only have 3 items since total is 3
842                assert_eq!(items.len(), 3);
843                assert_eq!(total, Some(3));
844            })
845            .await;
846    }
847
848    /// Test concurrent page fetches don't cause duplicate API calls
849    #[tokio::test]
850    async fn test_paginated_offset_concurrent_fetches() {
851        crate::test::identify_parking_lot_deadlocks();
852        tokio::task::LocalSet::new()
853            .run_until(async move {
854                let (client, _guard, _owner) = prep_vari!(true);
855
856                let call_count = Arc::new(AtomicUsize::new(0));
857                let call_count_clone = call_count.clone();
858
859                let scope =
860                    QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
861                        let call_count = call_count_clone.clone();
862                        async move {
863                            call_count.fetch_add(1, Ordering::Relaxed);
864
865                            // Add a small delay to simulate network latency
866                            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
867
868                            let offset_usize = offset as usize;
869                            let items: Vec<usize> =
870                                (offset_usize..offset_usize + page_size).collect();
871                            (items, Some(100))
872                        }
873                    });
874
875                // Launch multiple concurrent fetches for the same page
876                let futures = (0..5).map(|_| {
877                    client.fetch_query(
878                        scope.clone(),
879                        PaginatedPageKey {
880                            key: (),
881                            page_index: 0,
882                            page_size: 10,
883                        },
884                    )
885                });
886
887                let results = futures::future::join_all(futures).await;
888
889                // All should succeed
890                for result in results {
891                    assert!(result.is_some());
892                }
893
894                // Should only make 1 API call despite 5 concurrent requests
895                assert_eq!(
896                    call_count.load(Ordering::Relaxed),
897                    1,
898                    "Concurrent fetches should share single API call"
899                );
900            })
901            .await;
902    }
903
904    /// Test different page sizes work correctly with shared cache
905    #[tokio::test]
906    async fn test_paginated_offset_different_page_sizes() {
907        crate::test::identify_parking_lot_deadlocks();
908        tokio::task::LocalSet::new()
909            .run_until(async move {
910                let (client, _guard, _owner) = prep_vari!(true);
911
912                let call_count = Arc::new(AtomicUsize::new(0));
913                let call_count_clone = call_count.clone();
914
915                let scope =
916                    QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
917                        let call_count = call_count_clone.clone();
918                        async move {
919                            call_count.fetch_add(1, Ordering::Relaxed);
920
921                            let offset_usize = offset as usize;
922                            let items: Vec<usize> = (offset_usize
923                                ..std::cmp::min(offset_usize + page_size, 50))
924                                .collect();
925                            (items, Some(50))
926                        }
927                    });
928
929                // Fetch with page size 5
930                let (items1, _) = client
931                    .fetch_query(
932                        scope.clone(),
933                        PaginatedPageKey {
934                            key: (),
935                            page_index: 0,
936                            page_size: 5,
937                        },
938                    )
939                    .await
940                    .expect("Page should exist");
941                assert_eq!(items1.len(), 5);
942                assert_eq!(call_count.load(Ordering::Relaxed), 1);
943
944                // Fetch with page size 15 - should reuse cached data and fetch more
945                let (items2, _) = client
946                    .fetch_query(
947                        scope.clone(),
948                        PaginatedPageKey {
949                            key: (),
950                            page_index: 0,
951                            page_size: 15,
952                        },
953                    )
954                    .await
955                    .expect("Page should exist");
956                assert_eq!(items2.len(), 15);
957                assert_eq!(
958                    call_count.load(Ordering::Relaxed),
959                    2,
960                    "Should fetch more data for larger page"
961                );
962
963                // Fetch with page size 10 - should use cached data
964                let (items3, _) = client
965                    .fetch_query(
966                        scope.clone(),
967                        PaginatedPageKey {
968                            key: (),
969                            page_index: 0,
970                            page_size: 10,
971                        },
972                    )
973                    .await
974                    .expect("Page should exist");
975                assert_eq!(items3.len(), 10);
976                assert_eq!(
977                    call_count.load(Ordering::Relaxed),
978                    2,
979                    "Should use cached data, no new fetch"
980                );
981            })
982            .await;
983    }
984
985    /// Test that backing cache is properly managed when paginated pages expire
986    /// Tests both GC (garbage collection) and stale time scenarios
987    #[rstest]
988    #[case::gc_time(TestMode::GcTime)]
989    #[case::stale_time(TestMode::StaleTime)]
990    #[tokio::test]
991    async fn test_paginated_offset_backing_cache_lifecycle(#[case] mode: TestMode) {
992        crate::test::identify_parking_lot_deadlocks();
993        tokio::task::LocalSet::new()
994            .run_until(async move {
995                let (client, _guard, _owner) = prep_vari!(true);
996
997                let call_count = Arc::new(AtomicUsize::new(0));
998                let call_count_clone = call_count.clone();
999
1000                let scope =
1001                    QueryScope::new_paginated_with_offset(move |key: String, page_size, offset| {
1002                        let call_count = call_count_clone.clone();
1003                        async move {
1004                            call_count.fetch_add(1, Ordering::Relaxed);
1005
1006                            let offset_usize = offset as usize;
1007                            let items: Vec<String> = (offset_usize..offset_usize + page_size)
1008                                .map(|i| format!("{}_{}", key, i))
1009                                .collect();
1010                            (items, Some(30))
1011                        }
1012                    })
1013                    .with_options(match mode {
1014                        TestMode::GcTime => crate::QueryOptions::default()
1015                            .with_gc_time(std::time::Duration::from_millis(100)),
1016                        TestMode::StaleTime => crate::QueryOptions::default()
1017                            .with_stale_time(std::time::Duration::from_millis(100)),
1018                    });
1019
1020                // Fetch first page
1021                let (items, _) = client
1022                    .fetch_query(
1023                        scope.clone(),
1024                        PaginatedPageKey {
1025                            key: "test".to_string(),
1026                            page_index: 0,
1027                            page_size: 10,
1028                        },
1029                    )
1030                    .await
1031                    .expect("Page should exist");
1032                assert_eq!(items[0], "test_0");
1033                assert_eq!(call_count.load(Ordering::Relaxed), 1);
1034
1035                // Wait for 50ms, 50ms left till gc/stale:
1036                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1037
1038                // Fetch second page
1039                let (items, _) = client
1040                    .fetch_query(
1041                        scope.clone(),
1042                        PaginatedPageKey {
1043                            key: "test".to_string(),
1044                            page_index: 1,
1045                            page_size: 10,
1046                        },
1047                    )
1048                    .await
1049                    .expect("Page should exist");
1050                assert_eq!(items[0], "test_10");
1051                assert_eq!(call_count.load(Ordering::Relaxed), 2);
1052
1053                // Wait another 50ms, so the first query is stale/gc'd, but the second is valid even when gc'd because 50ms left:
1054                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1055
1056                // Fetch first page again
1057                let (items, _) = client
1058                    .fetch_query(
1059                        scope.clone(),
1060                        PaginatedPageKey {
1061                            key: "test".to_string(),
1062                            page_index: 0,
1063                            page_size: 10,
1064                        },
1065                    )
1066                    .await
1067                    .expect("Page should exist");
1068                assert_eq!(items[0], "test_0");
1069
1070                // Fetch second page again
1071                let (items, _) = client
1072                    .fetch_query(
1073                        scope.clone(),
1074                        PaginatedPageKey {
1075                            key: "test".to_string(),
1076                            page_index: 1,
1077                            page_size: 10,
1078                        },
1079                    )
1080                    .await
1081                    .expect("Page should exist");
1082                assert_eq!(items[0], "test_10");
1083
1084                let expected_calls = match mode {
1085                    TestMode::GcTime => {
1086                        // GC cleared page 0, but page 1 kept backing cache alive
1087                        // So page 0 refetch reuses backing cache, plus page 1 still alive so also reused
1088                        2
1089                    }
1090                    TestMode::StaleTime => {
1091                        // Stale page triggers invalidation which clears backing cache
1092                        // So page 0 refetch needs new API call, but page 1 is still valid so reused
1093                        3
1094                    }
1095                };
1096                assert_eq!(call_count.load(Ordering::Relaxed), expected_calls);
1097
1098                // If gc, should clear the backing cache only when all pages are gc'd:
1099                if matches!(mode, TestMode::GcTime) {
1100                    tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;
1101
1102                    // Now fetch again - backing cache should be gone
1103                    let (items, _) = client
1104                        .fetch_query(
1105                            scope.clone(),
1106                            PaginatedPageKey {
1107                                key: "test".to_string(),
1108                                page_index: 0,
1109                                page_size: 10,
1110                            },
1111                        )
1112                        .await
1113                        .expect("Page should exist");
1114                    assert_eq!(items[0], "test_0");
1115                    assert_eq!(
1116                        call_count.load(Ordering::Relaxed),
1117                        3,
1118                        "Backing cache should be cleared when all pages are GC'd"
1119                    );
1120                }
1121            })
1122            .await;
1123    }
1124
1125    /// Test jumping to distant pages without loading intermediate pages (offset-specific feature)
1126    #[tokio::test]
1127    async fn test_paginated_offset_jump_to_distant_page() {
1128        crate::test::identify_parking_lot_deadlocks();
1129        tokio::task::LocalSet::new()
1130            .run_until(async move {
1131                let (client, _guard, _owner) = prep_vari!(true);
1132
1133                let call_count = Arc::new(AtomicUsize::new(0));
1134                let call_count_clone = call_count.clone();
1135
1136                let scope =
1137                    QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
1138                        let call_count = call_count_clone.clone();
1139                        async move {
1140                            call_count.fetch_add(1, Ordering::Relaxed);
1141
1142                            let offset_usize = offset as usize;
1143                            let items: Vec<usize> =
1144                                (offset_usize..offset_usize + page_size).collect();
1145                            (items, Some(1000))
1146                        }
1147                    });
1148
1149                // Jump directly to page 10 without loading pages 0-9
1150                let (items, total) = client
1151                    .fetch_query(
1152                        scope.clone(),
1153                        PaginatedPageKey {
1154                            key: (),
1155                            page_index: 10,
1156                            page_size: 10,
1157                        },
1158                    )
1159                    .await
1160                    .expect("Page should exist");
1161
1162                assert_eq!(items, (100..110).collect::<Vec<_>>());
1163                assert_eq!(total, Some(1000));
1164                assert_eq!(
1165                    call_count.load(Ordering::Relaxed),
1166                    1,
1167                    "Should only make 1 API call to jump to distant page"
1168                );
1169
1170                // Now jump back to page 0
1171                let (items, total) = client
1172                    .fetch_query(
1173                        scope.clone(),
1174                        PaginatedPageKey {
1175                            key: (),
1176                            page_index: 0,
1177                            page_size: 10,
1178                        },
1179                    )
1180                    .await
1181                    .expect("Page should exist");
1182
1183                assert_eq!(items, (0..10).collect::<Vec<_>>());
1184                assert_eq!(total, Some(1000));
1185                assert_eq!(
1186                    call_count.load(Ordering::Relaxed),
1187                    2,
1188                    "Should make 1 more API call to jump back"
1189                );
1190
1191                // Jump to page 50
1192                let (items, total) = client
1193                    .fetch_query(
1194                        scope.clone(),
1195                        PaginatedPageKey {
1196                            key: (),
1197                            page_index: 50,
1198                            page_size: 10,
1199                        },
1200                    )
1201                    .await
1202                    .expect("Page should exist");
1203
1204                assert_eq!(items, (500..510).collect::<Vec<_>>());
1205                assert_eq!(total, Some(1000));
1206                assert_eq!(
1207                    call_count.load(Ordering::Relaxed),
1208                    3,
1209                    "Should make 1 more API call for distant page 50"
1210                );
1211            })
1212            .await;
1213    }
1214
1215    /// Test total items calculation and page count (offset-specific feature)
1216    #[tokio::test]
1217    async fn test_paginated_offset_total_items_and_page_count() {
1218        crate::test::identify_parking_lot_deadlocks();
1219        tokio::task::LocalSet::new()
1220            .run_until(async move {
1221                let (client, _guard, _owner) = prep_vari!(true);
1222
1223                let scope = QueryScope::new_paginated_with_offset(
1224                    move |_key: (), page_size, offset| async move {
1225                        let offset_usize = offset as usize;
1226                        let total_items = 47u64;
1227                        let items: Vec<usize> = (offset_usize
1228                            ..std::cmp::min(offset_usize + page_size, total_items as usize))
1229                            .collect();
1230                        (items, Some(total_items))
1231                    },
1232                );
1233
1234                // Fetch first page
1235                let (items, total) = client
1236                    .fetch_query(
1237                        scope.clone(),
1238                        PaginatedPageKey {
1239                            key: (),
1240                            page_index: 0,
1241                            page_size: 10,
1242                        },
1243                    )
1244                    .await
1245                    .expect("Page should exist");
1246
1247                assert_eq!(items.len(), 10);
1248                assert_eq!(total, Some(47));
1249
1250                // Calculate total pages: ceil(47 / 10) = 5
1251                let total_pages = total.map(|t| (t as f64 / 10.0).ceil() as usize);
1252                assert_eq!(total_pages, Some(5));
1253
1254                // Test page 4 (last page with partial data)
1255                let (items, total) = client
1256                    .fetch_query(
1257                        scope.clone(),
1258                        PaginatedPageKey {
1259                            key: (),
1260                            page_index: 4,
1261                            page_size: 10,
1262                        },
1263                    )
1264                    .await
1265                    .expect("Page should exist");
1266
1267                assert_eq!(items.len(), 7, "Last page should have 7 items (47 % 10)");
1268                assert_eq!(items, (40..47).collect::<Vec<_>>());
1269                assert_eq!(total, Some(47));
1270
1271                // Test beyond last page
1272                let result = client
1273                    .fetch_query(
1274                        scope.clone(),
1275                        PaginatedPageKey {
1276                            key: (),
1277                            page_index: 5,
1278                            page_size: 10,
1279                        },
1280                    )
1281                    .await;
1282
1283                assert!(result.is_none(), "Should return None for page beyond total");
1284            })
1285            .await;
1286    }
1287
1288    /// Test total items update when it changes
1289    #[tokio::test]
1290    async fn test_paginated_offset_total_items_update() {
1291        crate::test::identify_parking_lot_deadlocks();
1292        tokio::task::LocalSet::new()
1293            .run_until(async move {
1294                let (client, _guard, _owner) = prep_vari!(true);
1295
1296                let total = Arc::new(AtomicUsize::new(100));
1297                let total_clone = total.clone();
1298
1299                let scope =
1300                    QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
1301                        let total = total_clone.clone();
1302                        async move {
1303                            let current_total = total.load(Ordering::Relaxed) as u64;
1304                            let offset_usize = offset as usize;
1305                            let items: Vec<usize> = (offset_usize
1306                                ..std::cmp::min(offset_usize + page_size, current_total as usize))
1307                                .collect();
1308                            (items, Some(current_total))
1309                        }
1310                    });
1311
1312                // Fetch with initial total of 100
1313                let (items, returned_total) = client
1314                    .fetch_query(
1315                        scope.clone(),
1316                        PaginatedPageKey {
1317                            key: (),
1318                            page_index: 0,
1319                            page_size: 10,
1320                        },
1321                    )
1322                    .await
1323                    .expect("Page should exist");
1324
1325                assert_eq!(items, (0..10).collect::<Vec<_>>());
1326                assert_eq!(returned_total, Some(100));
1327
1328                // Update total to 50
1329                total.store(50, Ordering::Relaxed);
1330
1331                // Invalidate to get fresh data
1332                client.invalidate_query_scope(scope.clone());
1333
1334                // Fetch again - should get updated total
1335                let (items, returned_total) = client
1336                    .fetch_query(
1337                        scope.clone(),
1338                        PaginatedPageKey {
1339                            key: (),
1340                            page_index: 0,
1341                            page_size: 10,
1342                        },
1343                    )
1344                    .await
1345                    .expect("Page should exist");
1346
1347                assert_eq!(items, (0..10).collect::<Vec<_>>());
1348                assert_eq!(returned_total, Some(50));
1349            })
1350            .await;
1351    }
1352
1353    /// Test sparse item storage - items at non-contiguous offsets
1354    #[tokio::test]
1355    async fn test_paginated_offset_sparse_item_storage() {
1356        crate::test::identify_parking_lot_deadlocks();
1357        tokio::task::LocalSet::new()
1358            .run_until(async move {
1359                let (client, _guard, _owner) = prep_vari!(true);
1360
1361                let scope = QueryScope::new_paginated_with_offset(
1362                    move |_key: (), page_size, offset| async move {
1363                        let offset_usize = offset as usize;
1364                        let items: Vec<usize> = (offset_usize..offset_usize + page_size).collect();
1365                        (items, Some(1000))
1366                    },
1367                );
1368
1369                // Fetch page 0
1370                let _ = client
1371                    .fetch_query(
1372                        scope.clone(),
1373                        PaginatedPageKey {
1374                            key: (),
1375                            page_index: 0,
1376                            page_size: 10,
1377                        },
1378                    )
1379                    .await;
1380
1381                // Fetch page 10 (offset 100) - creates sparse storage
1382                let (items, _) = client
1383                    .fetch_query(
1384                        scope.clone(),
1385                        PaginatedPageKey {
1386                            key: (),
1387                            page_index: 10,
1388                            page_size: 10,
1389                        },
1390                    )
1391                    .await
1392                    .expect("Page should exist");
1393
1394                assert_eq!(items, (100..110).collect::<Vec<_>>());
1395
1396                // Fetch page 5 (offset 50) - fills in the middle
1397                let (items, _) = client
1398                    .fetch_query(
1399                        scope.clone(),
1400                        PaginatedPageKey {
1401                            key: (),
1402                            page_index: 5,
1403                            page_size: 10,
1404                        },
1405                    )
1406                    .await
1407                    .expect("Page should exist");
1408
1409                assert_eq!(items, (50..60).collect::<Vec<_>>());
1410
1411                // All three pages should still be accessible
1412                let (items, _) = client
1413                    .fetch_query(
1414                        scope.clone(),
1415                        PaginatedPageKey {
1416                            key: (),
1417                            page_index: 0,
1418                            page_size: 10,
1419                        },
1420                    )
1421                    .await
1422                    .expect("Page should exist");
1423                assert_eq!(items, (0..10).collect::<Vec<_>>());
1424            })
1425            .await;
1426    }
1427
1428    #[derive(Debug, Clone, Copy)]
1429    enum TestMode {
1430        /// Test GC behavior: backing cache should only be cleared when all pages are GC'd
1431        GcTime,
1432        /// Test stale time behavior: backing cache should be invalidated when any page goes stale
1433        StaleTime,
1434    }
1435}