Skip to main content

leptos_fetch/pagination/
paginated_offset.rs

1use std::collections::BTreeMap;
2use std::time::Duration;
3use std::{hash::Hash, sync::Arc};
4
5use leptos::prelude::*;
6
7use crate::{
8    PaginatedPageKey, QueryOptions, QueryScope, QueryScopeLocal, UntypedQueryClient, cache::OnScopeMissing,
9    debug_if_devtools_enabled::DebugIfDevtoolsEnabled, query_scope::ScopeCacheKey,
10};
11
12macro_rules! define {
13    ([$($impl_fut_generics:tt)*], [$($impl_fn_generics:tt)*], $name:ident, $fetch_fn:ident, $sname:literal) => {
14
15        impl<Key, PageItem> $name<PaginatedPageKey<Key>, Option<(Vec<PageItem>, Option<u64>)>>
16        where
17            Key: DebugIfDevtoolsEnabled + Clone + Hash + PartialEq + 'static $($impl_fn_generics)*,
18            PageItem: DebugIfDevtoolsEnabled + Clone + 'static $($impl_fn_generics)*,
19        {
20            /// Create an offset-based paginated query scope.
21            ///
22            /// Preferred when your API supports numeric offsets. Enables jumping to any page without
23            /// loading intermediate pages and can return total item count for page calculations.
24            ///
25            /// # Arguments
26            ///
27            /// The getter receives:
28            /// - `query_key: Key` - Your custom key, same across all pages
29            /// - `nb_items_requested: usize` - Target number of items to return
30            /// - `offset: u64` - Starting position/offset for this fetch
31            ///
32            /// The getter must return:
33            /// - `Vec<Item>` - Items for this page
34            /// - `Option<u64>` - Total item count if known (enables page count calculations)
35            ///
36            /// # Example
37            ///
38            /// ```rust,ignore
39            /// use leptos_fetch::{QueryScope, PaginatedPageKey};
40            ///
41            /// // Create paginated scope with offset-based API
42            /// let scope = QueryScope::new_paginated_with_offset(
43            ///     |_key: (), nb_items, offset| async move {
44            ///         // Call your API with offset and limit
45            ///         let (items, total) = fetch_from_api(offset, nb_items).await;
46            ///         (items, Some(total))
47            ///     }
48            /// );
49            ///
50            /// // Fetch page 0
51            /// let (items, total) = client.fetch_query(scope, PaginatedPageKey {
52            ///     key: (),
53            ///     page_index: 0,
54            ///     page_size: 20,
55            /// }).await.expect("Page exists");
56            ///
57            /// // Jump directly to page 10 (no need to load pages 1-9)
58            /// let (items, total) = client.fetch_query(scope, PaginatedPageKey {
59            ///     key: (),
60            ///     page_index: 10,
61            ///     page_size: 20,
62            /// }).await.expect("Page exists");
63            ///
64            /// // Calculate total pages from returned total count
65            /// if let Some(total_items) = total {
66            ///     let total_pages = (total_items as f64 / 20.0).ceil() as u64;
67            /// }
68            /// ```
69            pub fn new_paginated_with_offset<Fut>(
70                getter: impl Fn(Key, usize, u64) -> Fut + 'static $($impl_fn_generics)*,
71            ) -> $name<PaginatedPageKey<Key>, Option<(Vec<PageItem>, Option<u64>)>>
72            where
73                Fut: Future<Output = (Vec<PageItem>, Option<u64>)> $($impl_fut_generics)*,
74            {
75                let getter = Arc::new(getter);
76                let backing_cache_scope = $name::new({
77                    let getter = getter.clone();
78                    move |key: KeyWithUnhashedItemCountAndOffsetRequested<Key>| {
79                        let getter = getter.clone();
80                        async move {
81                            let (items, mut maybe_total_items) =
82                                getter(key.key, key.item_count_requested, key.offset_requested)
83                                    .await;
84
85                            // Protect against incorrect item count when clearly no items left:
86                            if items.is_empty() {
87                                if let Some(cur_total_items) = &mut maybe_total_items {
88                                    if *cur_total_items > key.offset_requested {
89                                        *cur_total_items = key.offset_requested;
90                                    }
91                                } else {
92                                    maybe_total_items = Some(key.offset_requested);
93                                }
94                            }
95
96                            BackingCache {
97                                inner: Arc::new(BackingCacheInner {
98                                    items: SparseItems {
99                                        items: parking_lot::Mutex::new(
100                                            items
101                                                .into_iter()
102                                                .enumerate()
103                                                .map(|(idx, item)| (key.offset_requested + (idx as u64), item))
104                                                .collect(),
105                                        ),
106                                    },
107                                    maybe_total_items: parking_lot::Mutex::new(maybe_total_items),
108                                    update_lock: futures::lock::Mutex::new(()),
109                                }),
110                            }
111                        }
112                    }
113                })
114                // Shouldn't itself expire, the paginated query will control that:
115                .with_options(
116                    QueryOptions::default()
117                        .with_stale_time(Duration::MAX)
118                        .with_gc_time(Duration::MAX)
119                        .with_refetch_interval(Duration::MAX)
120                );
121
122                $name::new({
123                    let backing_cache_scope = backing_cache_scope.clone();
124                    move |page_key: PaginatedPageKey<Key>| {
125                        let backing_cache_scope = backing_cache_scope.clone();
126                        let getter = getter.clone();
127                        async move {
128                            let untyped_client = use_context::<UntypedQueryClient>()
129                                .expect(
130                                    "leptos-fetch bug, UntypedQueryClient should always have been \
131                                    provided to the query context internally"
132                            );
133                            let scope_cache_key = use_context::<ScopeCacheKey>()
134                                .expect(
135                                    "leptos-fetch bug, ScopeCacheKey itself should always have been \
136                                    provided to the query context internally"
137                                );
138
139                            // If this query is reloading because it was stale, should
140                            // invalidate the backing cache before reading it,
141                            // otherwise will still get back the same stale data again:
142                            if let Some(metadata) = untyped_client
143                                .query_metadata::<PaginatedPageKey<Key>, Option<(Vec<PageItem>, Option<u64>)>>(
144                                scope_cache_key,
145                                &page_key,
146                            )
147                            && metadata.stale_or_invalidated
148                            && let Some(backing_metadata) = untyped_client
149                                .query_metadata::<KeyWithUnhashedItemCountAndOffsetRequested<Key>, BackingCache<PageItem>>(
150                                backing_cache_scope.cache_key,
151                                &KeyWithUnhashedItemCountAndOffsetRequested {
152                                    key: page_key.key.clone(),
153                                    item_count_requested: 0, // Doesn't matter, it's not part of the hash
154                                    offset_requested: 0, // Doesn't matter, it's not part of the hash
155                                },
156                            )
157                            && backing_metadata.updated_at <= metadata.updated_at
158                            {
159                                untyped_client.invalidate_query(
160                                    &backing_cache_scope,
161                                    KeyWithUnhashedItemCountAndOffsetRequested {
162                                        key: page_key.key.clone(),
163                                        item_count_requested: 0, // Doesn't matter, it's not part of the hash
164                                        offset_requested: 0, // Doesn't matter, it's not part of the hash
165                                    },
166                                );
167                            }
168
169                            let infinite_cache = untyped_client
170                                .$fetch_fn(
171                                    backing_cache_scope,
172                                    KeyWithUnhashedItemCountAndOffsetRequested {
173                                        key: page_key.key.clone(),
174                                        item_count_requested: page_key.page_size,
175                                        offset_requested: (page_key.page_index as u64) * (page_key.page_size as u64)
176                                    },
177                                )
178                                .await;
179
180                            let target_idx_start = (page_key.page_index as u64) * (page_key.page_size as u64);
181                            let mut target_idx_end_exclusive =
182                                ((page_key.page_index as u64) + 1) * (page_key.page_size as u64);
183                            if let Some(maybe_total_items) = *infinite_cache.inner.maybe_total_items.lock() {
184                                // If page starts beyond available data, return None
185                                if target_idx_start >= maybe_total_items {
186                                    return None;
187                                }
188                                if target_idx_end_exclusive > maybe_total_items {
189                                    target_idx_end_exclusive = maybe_total_items;
190                                }
191                            }
192
193                            // Load x more if needed:
194                            if !infinite_cache.inner.items.check_range(target_idx_start, target_idx_end_exclusive) {
195                                // Preventing multiple simultaneous fetches by holding an async lock,
196                                // but note making sure to not hold any sync locks across await boundaries.
197                                let mut _guard = infinite_cache.inner.update_lock.lock().await;
198                                loop {
199                                    // Find the next offset we need to fetch
200                                    let next_offset = {
201                                        let items_lock = infinite_cache.inner.items.items.lock();
202                                        // Find first missing offset in our target range
203                                        let mut offset = target_idx_start;
204                                        while offset < target_idx_end_exclusive {
205                                            if !items_lock.contains_key(&offset) {
206                                                break;
207                                            }
208                                            offset += 1;
209                                        }
210                                        offset
211                                    };
212
213                                    if next_offset >= target_idx_end_exclusive {
214                                        // All items in range are present
215                                        break;
216                                    }
217
218                                    let items_needed = (target_idx_end_exclusive - next_offset) as usize;
219                                    let (items, maybe_total_items) =
220                                        getter(page_key.key.clone(), items_needed, next_offset).await;
221                                    if !items.is_empty() {
222                                        infinite_cache.inner.items.extend(next_offset, items);
223                                        *infinite_cache.inner.maybe_total_items.lock() = maybe_total_items;
224                                    } else {
225                                        // Protect against incorrect item count when clearly no items left:
226                                        let mut guard = infinite_cache.inner.maybe_total_items.lock();
227                                        if let Some(cur_total_items) = &mut* guard {
228                                            if *cur_total_items > next_offset {
229                                                *cur_total_items = next_offset;
230                                            }
231                                        } else {
232                                            *guard = Some(next_offset);
233                                        }
234                                        // Can't get more items, break out
235                                        break;
236                                    }
237                                }
238                                drop(_guard);
239                            }
240
241                            infinite_cache
242                                .inner
243                                .items
244                                .get_range(
245                                    target_idx_start,
246                                    (target_idx_end_exclusive - target_idx_start) as usize,
247                                )
248                                .map(|items| {
249                                (items, infinite_cache.inner.maybe_total_items.lock().clone())
250                            })
251                        }
252                    }
253                })
254                // An invalidation or clear of any page should invalidate the backing cache:
255                .on_invalidation({
256                    let backing_cache_scope = backing_cache_scope.clone();
257                    move |key| {
258                        let untyped_client = use_context::<UntypedQueryClient>()
259                            .expect(
260                                "leptos-fetch bug, UntypedQueryClient should always have been \
261                                provided to the on_invalidation context internally"
262                            );
263                        untyped_client.invalidate_query(
264                            &backing_cache_scope,
265                            KeyWithUnhashedItemCountAndOffsetRequested {
266                                key: key.key.clone(),
267                                item_count_requested: 0, // Doesn't matter, it's not part of the hash
268                                offset_requested: 0, // Doesn't matter, it's not part of the hash
269                            },
270                        );
271                    }
272                })
273                // If this was the last page existing now being gc'd, clear the backing cache too:
274                .on_gc(move |key| {
275                    let untyped_client = use_context::<UntypedQueryClient>()
276                        .expect(
277                            "leptos-fetch bug, UntypedQueryClient should always have been \
278                            provided to the on_gc context internally"
279                        );
280                    let scope_cache_key = use_context::<ScopeCacheKey>()
281                        .expect(
282                            "leptos-fetch bug, ScopeCacheKey itself should always have been \
283                            provided to the on_gc context internally"
284                        );
285                    let mut found_nb = 0;
286                    untyped_client
287                        .scope_lookup
288                        .with_cached_scope_mut::<PaginatedPageKey<Key>, Option<(Vec<PageItem>, Option<u64>)>, _, _>(
289                            &mut untyped_client.scope_lookup.scopes_mut(),
290                            scope_cache_key,
291                            OnScopeMissing::Skip,
292                            |_| {},
293                            |maybe_scope, _| {
294                                if let Some(scope) = maybe_scope {
295                                    for query_or_pending in scope.all_queries_mut_include_pending() {
296                                        if query_or_pending
297                                            .key()
298                                            .value_if_safe()
299                                            .map(|test_key| test_key.key == key.key)
300                                            .unwrap_or(false)
301                                        {
302                                            found_nb += 1;
303                                        }
304                                    }
305                                }
306                            },
307                        );
308                    if found_nb == 0 {
309                        untyped_client.clear_query(
310                            &backing_cache_scope,
311                            KeyWithUnhashedItemCountAndOffsetRequested {
312                                key: key.key.clone(),
313                                item_count_requested: 0, // Doesn't matter, it's not part of the hash
314                                offset_requested: 0, // Doesn't matter, it's not part of the hash
315                            },
316                        );
317                    }
318                })
319            }
320        }
321    };
322}
323
324define! { [+ Send], [+ Send + Sync], QueryScope, fetch_query, "QueryScope" }
325define! { [], [], QueryScopeLocal, fetch_query_local, "QueryScopeLocal" }
326
327#[derive(Debug, Clone)]
328struct KeyWithUnhashedItemCountAndOffsetRequested<Key> {
329    key: Key,
330    item_count_requested: usize,
331    offset_requested: u64,
332}
333
334impl<Key: Hash> Hash for KeyWithUnhashedItemCountAndOffsetRequested<Key> {
335    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
336        self.key.hash(state);
337    }
338}
339
340#[derive(Debug, Clone)]
341struct BackingCache<Item> {
342    inner: Arc<BackingCacheInner<Item>>,
343}
344
345#[derive(Debug)]
346struct BackingCacheInner<Item> {
347    items: SparseItems<Item>,
348    maybe_total_items: parking_lot::Mutex<Option<u64>>,
349    update_lock: futures::lock::Mutex<()>,
350}
351
352#[derive(Debug)]
353struct SparseItems<Item> {
354    items: parking_lot::Mutex<BTreeMap<u64, Item>>,
355}
356
357impl<Item> SparseItems<Item> {
358    fn extend(&self, offset: u64, new_items: Vec<Item>) {
359        let mut items = self.items.lock();
360        for (idx, item) in new_items.into_iter().enumerate() {
361            items.insert(offset + (idx as u64), item);
362        }
363    }
364
365    /// Return the range requested, or None if any items are missing.
366    fn get_range(&self, start_offset: u64, count: usize) -> Option<Vec<Item>>
367    where
368        Item: Clone,
369    {
370        let items = self.items.lock();
371
372        // Check if we have all items in the range
373        let mut result = Vec::with_capacity(count);
374        for offset in start_offset..(start_offset + (count as u64)) {
375            match items.get(&offset) {
376                Some(item) => result.push(item),
377                None => return None, // Missing item, return None
378            }
379        }
380
381        Some(result.into_iter().cloned().collect())
382    }
383
384    fn check_range(&self, start_offset: u64, end_index_exclusive: u64) -> bool
385    where
386        Item: Clone,
387    {
388        let items = self.items.lock();
389        for offset in start_offset..end_index_exclusive {
390            if !items.contains_key(&offset) {
391                return false;
392            }
393        }
394        true
395    }
396}
397
398#[cfg(test)]
399mod tests {
400    use any_spawner::Executor;
401    use hydration_context::SsrSharedContext;
402    use leptos::prelude::*;
403    use rstest::*;
404    use std::sync::Arc;
405    use std::sync::atomic::{AtomicUsize, Ordering};
406
407    use crate::test::prep_vari;
408    use crate::{PaginatedPageKey, QueryClient, QueryScope};
409
410    /// We don't know the serialization mechanism the user is using, so cannot return
411    /// wrapping types from the query function.
412    /// Hence returning (Vec<Item>, Option<total_items>) instead of a custom wrapping Page<Item> type.
413    /// This test is just checking compilation.
414    #[tokio::test]
415    async fn test_paginated_serialization_works() {
416        crate::test::identify_parking_lot_deadlocks();
417        tokio::task::LocalSet::new()
418            .run_until(async move {
419                let (client, _guard, _owner) = prep_vari!(true);
420                let scope = QueryScope::new_paginated_with_offset(|_: (), _, _| async { (vec![()], Some(10)) });
421                client.resource(scope, || PaginatedPageKey {
422                    key: (),
423                    page_index: 0,
424                    page_size: 10,
425                });
426            })
427            .await;
428    }
429
430    fn get_simple_api_fn(
431        num_rows: usize,
432    ) -> (
433        Arc<AtomicUsize>,
434        impl Fn(usize, u64) -> (Vec<usize>, Option<u64>) + Clone + 'static,
435    ) {
436        let call_count = Arc::new(AtomicUsize::new(0));
437
438        let api_fn = {
439            let call_count = call_count.clone();
440            move |target_return_count: usize, offset: u64| {
441                let call_count = call_count.clone();
442                call_count.fetch_add(1, Ordering::Relaxed);
443
444                let offset_usize = offset as usize;
445                let items = (0..num_rows)
446                    .skip(offset_usize)
447                    .take(target_return_count)
448                    .collect::<Vec<_>>();
449                let total_items = num_rows as u64;
450                (items, Some(total_items))
451            }
452        };
453
454        (call_count, api_fn)
455    }
456
457    #[tokio::test]
458    async fn test_paginated_offset() {
459        crate::test::identify_parking_lot_deadlocks();
460        tokio::task::LocalSet::new()
461            .run_until(async move {
462                let (client, _guard, _owner) = prep_vari!(true);
463
464                let (_call_count, my_api_fn) = get_simple_api_fn(30);
465
466                let scope = QueryScope::new_paginated_with_offset(move |_query_key, page_size, offset| {
467                    let my_api_fn = my_api_fn.clone();
468                    async move {
469                        let (items, total_items) = my_api_fn(page_size, offset);
470                        (items, total_items)
471                    }
472                });
473
474                let (first_page_logs, maybe_total) = client
475                    .fetch_query(
476                        scope.clone(),
477                        PaginatedPageKey {
478                            key: (),
479                            page_index: 0,
480                            page_size: 20,
481                        },
482                    )
483                    .await
484                    .expect(
485                        "This page should exist (Some()), \
486                        None when a page is requested beyond the end of the data.",
487                    );
488
489                assert_eq!(first_page_logs, (0..20).collect::<Vec<_>>());
490                assert_eq!(maybe_total, Some(30));
491
492                let (second_page_logs, maybe_total) = client
493                    .fetch_query(
494                        scope.clone(),
495                        PaginatedPageKey {
496                            key: (),
497                            page_index: 1,
498                            page_size: 20,
499                        },
500                    )
501                    .await
502                    .expect(
503                        "This page should exist (Some()), \
504                        None when a page is requested beyond the end of the data.",
505                    );
506
507                assert_eq!(second_page_logs, (20..30).collect::<Vec<_>>());
508                assert_eq!(maybe_total, Some(30));
509
510                assert!(
511                    client
512                        .fetch_query(
513                            scope.clone(),
514                            PaginatedPageKey {
515                                key: (),
516                                page_index: 2,
517                                page_size: 20,
518                            },
519                        )
520                        .await
521                        .is_none()
522                );
523            })
524            .await;
525    }
526
527    /// Test that the pagination logic keeps calling the API until page_size items are available
528    #[tokio::test]
529    async fn test_paginated_offset_fills_page_size_with_multiple_calls() {
530        crate::test::identify_parking_lot_deadlocks();
531        tokio::task::LocalSet::new()
532            .run_until(async move {
533                let (client, _guard, _owner) = prep_vari!(true);
534
535                let call_count = Arc::new(AtomicUsize::new(0));
536                let call_count_clone = call_count.clone();
537
538                // API that returns fewer items than requested
539                let scope = QueryScope::new_paginated_with_offset(move |_key: (), _page_size, offset| {
540                    let call_count = call_count_clone.clone();
541                    async move {
542                        call_count.fetch_add(1, Ordering::Relaxed);
543
544                        // Return only 3 items per call, even if more are requested
545                        let items = match offset {
546                            0 => vec![0, 1, 2],
547                            3 => vec![3, 4, 5],
548                            6 => vec![6, 7, 8],
549                            9 => vec![9, 10],
550                            _ => vec![],
551                        };
552                        (items, Some(11))
553                    }
554                });
555
556                // Request page with size 10 - should make 4 API calls to get 10 items
557                let (items, total) = client
558                    .fetch_query(
559                        scope.clone(),
560                        PaginatedPageKey {
561                            key: (),
562                            page_index: 0,
563                            page_size: 10,
564                        },
565                    )
566                    .await
567                    .expect("Page should exist");
568
569                assert_eq!(items.len(), 10, "Should have filled to page_size");
570                assert_eq!(items, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
571                assert_eq!(total, Some(11));
572                assert_eq!(
573                    call_count.load(Ordering::Relaxed),
574                    4,
575                    "Should have made 4 API calls to fill page"
576                );
577
578                // Request second page - item 10 was already fetched during first page
579                let (items, total) = client
580                    .fetch_query(
581                        scope.clone(),
582                        PaginatedPageKey {
583                            key: (),
584                            page_index: 1,
585                            page_size: 10,
586                        },
587                    )
588                    .await
589                    .expect("Page should exist");
590
591                assert_eq!(items.len(), 1, "Only 1 item left");
592                assert_eq!(items, vec![10]);
593                assert_eq!(total, Some(11));
594                assert_eq!(
595                    call_count.load(Ordering::Relaxed),
596                    4,
597                    "Should not make additional calls - item 10 was already cached"
598                );
599            })
600            .await;
601    }
602
603    /// Test that API calls are cached properly and not repeated unnecessarily
604    #[tokio::test]
605    async fn test_paginated_offset_no_unnecessary_api_calls() {
606        crate::test::identify_parking_lot_deadlocks();
607        tokio::task::LocalSet::new()
608            .run_until(async move {
609                let (client, _guard, _owner) = prep_vari!(true);
610
611                let call_count = Arc::new(AtomicUsize::new(0));
612                let call_count_clone = call_count.clone();
613
614                let scope = QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
615                    let call_count = call_count_clone.clone();
616                    async move {
617                        call_count.fetch_add(1, Ordering::Relaxed);
618
619                        let offset_usize = offset as usize;
620                        let items: Vec<usize> = (offset_usize..offset_usize + page_size).collect();
621                        (items, Some(100))
622                    }
623                });
624
625                // First fetch
626                let _ = client
627                    .fetch_query(
628                        scope.clone(),
629                        PaginatedPageKey {
630                            key: (),
631                            page_index: 0,
632                            page_size: 10,
633                        },
634                    )
635                    .await;
636                assert_eq!(
637                    call_count.load(Ordering::Relaxed),
638                    1,
639                    "First fetch should make 1 API call"
640                );
641
642                // Fetch same page again - should use cache
643                let _ = client
644                    .fetch_query(
645                        scope.clone(),
646                        PaginatedPageKey {
647                            key: (),
648                            page_index: 0,
649                            page_size: 10,
650                        },
651                    )
652                    .await;
653                assert_eq!(call_count.load(Ordering::Relaxed), 1, "Should still be 1 call - cached");
654
655                // Fetch next page
656                let _ = client
657                    .fetch_query(
658                        scope.clone(),
659                        PaginatedPageKey {
660                            key: (),
661                            page_index: 1,
662                            page_size: 10,
663                        },
664                    )
665                    .await;
666                assert_eq!(call_count.load(Ordering::Relaxed), 2, "Second page needs new API call");
667
668                // Fetch first page again - should still be cached
669                let _ = client
670                    .fetch_query(
671                        scope.clone(),
672                        PaginatedPageKey {
673                            key: (),
674                            page_index: 0,
675                            page_size: 10,
676                        },
677                    )
678                    .await;
679                assert_eq!(
680                    call_count.load(Ordering::Relaxed),
681                    2,
682                    "Should still be 2 calls - first page cached"
683                );
684            })
685            .await;
686    }
687
688    /// Test linked invalidation and clear between pages with same key
689    #[rstest]
690    #[tokio::test]
691    async fn test_paginated_offset_linked_invalidation_and_clear(#[values(true, false)] clear: bool) {
692        crate::test::identify_parking_lot_deadlocks();
693        tokio::task::LocalSet::new()
694            .run_until(async move {
695                let (client, _guard, _owner) = prep_vari!(true);
696
697                let version = Arc::new(AtomicUsize::new(0));
698                let version_clone = version.clone();
699
700                let scope = QueryScope::new_paginated_with_offset(move |key: String, page_size, offset| {
701                    let v = version_clone.clone();
702                    async move {
703                        let current_version = v.load(Ordering::Relaxed);
704                        let offset_usize = offset as usize;
705
706                        // Return different data based on version
707                        let items: Vec<String> = (offset_usize..offset_usize + page_size)
708                            .map(|i| format!("{}_v{}_{}", key, current_version, i))
709                            .collect();
710
711                        (items, Some(30))
712                    }
713                });
714
715                // Fetch first page
716                let (items1, _) = client
717                    .fetch_query(
718                        scope.clone(),
719                        PaginatedPageKey {
720                            key: "test".to_string(),
721                            page_index: 0,
722                            page_size: 10,
723                        },
724                    )
725                    .await
726                    .expect("Page should exist");
727
728                assert_eq!(items1[0], "test_v0_0");
729
730                // Fetch second page
731                let (items2, _) = client
732                    .fetch_query(
733                        scope.clone(),
734                        PaginatedPageKey {
735                            key: "test".to_string(),
736                            page_index: 1,
737                            page_size: 10,
738                        },
739                    )
740                    .await
741                    .expect("Page should exist");
742
743                assert_eq!(items2[0], "test_v0_10");
744
745                // Increment version to simulate data change
746                version.store(1, Ordering::Relaxed);
747
748                // Fetch first page again - should still be old data
749                let (items1_new, _) = client
750                    .fetch_query(
751                        scope.clone(),
752                        PaginatedPageKey {
753                            key: "test".to_string(),
754                            page_index: 0,
755                            page_size: 10,
756                        },
757                    )
758                    .await
759                    .expect("Page should exist");
760
761                assert_eq!(
762                    items1_new[0], "test_v0_0",
763                    "Should still have old version before invalidation/clear"
764                );
765
766                // Invalidation/clear should lead to new data:
767                if clear {
768                    // Currently not public, but still want to check as used internally for some things:
769                    client.untyped_client.clear_query_scope(scope.clone());
770                } else {
771                    client.invalidate_query_scope(scope.clone());
772                }
773
774                // Fetch first page again - should get new data
775                let (items1_new, _) = client
776                    .fetch_query(
777                        scope.clone(),
778                        PaginatedPageKey {
779                            key: "test".to_string(),
780                            page_index: 0,
781                            page_size: 10,
782                        },
783                    )
784                    .await
785                    .expect("Page should exist");
786
787                assert_eq!(
788                    items1_new[0], "test_v1_0",
789                    "Should have new version after invalidation/clear"
790                );
791
792                // Second page should also be invalidated/cleared
793                let (items2_new, _) = client
794                    .fetch_query(
795                        scope.clone(),
796                        PaginatedPageKey {
797                            key: "test".to_string(),
798                            page_index: 1,
799                            page_size: 10,
800                        },
801                    )
802                    .await
803                    .expect("Page should exist");
804
805                assert_eq!(items2_new[0], "test_v1_10", "Second page should also have new version");
806            })
807            .await;
808    }
809
810    /// Test that empty responses are handled correctly
811    #[tokio::test]
812    async fn test_paginated_offset_empty_response_handling() {
813        crate::test::identify_parking_lot_deadlocks();
814        tokio::task::LocalSet::new()
815            .run_until(async move {
816                let (client, _guard, _owner) = prep_vari!(true);
817
818                let scope = QueryScope::new_paginated_with_offset(|_key: (), _page_size, offset| async move {
819                    match offset {
820                        0 => (vec![1, 2, 3], Some(3)),
821                        _ => (vec![], Some(3)),
822                    }
823                });
824
825                let (items, total) = client
826                    .fetch_query(
827                        scope.clone(),
828                        PaginatedPageKey {
829                            key: (),
830                            page_index: 0,
831                            page_size: 10,
832                        },
833                    )
834                    .await
835                    .expect("Page should exist");
836
837                // Should only have 3 items since total is 3
838                assert_eq!(items.len(), 3);
839                assert_eq!(total, Some(3));
840            })
841            .await;
842    }
843
844    /// Test concurrent page fetches don't cause duplicate API calls
845    #[tokio::test]
846    async fn test_paginated_offset_concurrent_fetches() {
847        crate::test::identify_parking_lot_deadlocks();
848        tokio::task::LocalSet::new()
849            .run_until(async move {
850                let (client, _guard, _owner) = prep_vari!(true);
851
852                let call_count = Arc::new(AtomicUsize::new(0));
853                let call_count_clone = call_count.clone();
854
855                let scope = QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
856                    let call_count = call_count_clone.clone();
857                    async move {
858                        call_count.fetch_add(1, Ordering::Relaxed);
859
860                        // Add a small delay to simulate network latency
861                        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
862
863                        let offset_usize = offset as usize;
864                        let items: Vec<usize> = (offset_usize..offset_usize + page_size).collect();
865                        (items, Some(100))
866                    }
867                });
868
869                // Launch multiple concurrent fetches for the same page
870                let futures = (0..5).map(|_| {
871                    client.fetch_query(
872                        scope.clone(),
873                        PaginatedPageKey {
874                            key: (),
875                            page_index: 0,
876                            page_size: 10,
877                        },
878                    )
879                });
880
881                let results = futures::future::join_all(futures).await;
882
883                // All should succeed
884                for result in results {
885                    assert!(result.is_some());
886                }
887
888                // Should only make 1 API call despite 5 concurrent requests
889                assert_eq!(
890                    call_count.load(Ordering::Relaxed),
891                    1,
892                    "Concurrent fetches should share single API call"
893                );
894            })
895            .await;
896    }
897
898    /// Test different page sizes work correctly with shared cache
899    #[tokio::test]
900    async fn test_paginated_offset_different_page_sizes() {
901        crate::test::identify_parking_lot_deadlocks();
902        tokio::task::LocalSet::new()
903            .run_until(async move {
904                let (client, _guard, _owner) = prep_vari!(true);
905
906                let call_count = Arc::new(AtomicUsize::new(0));
907                let call_count_clone = call_count.clone();
908
909                let scope = QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
910                    let call_count = call_count_clone.clone();
911                    async move {
912                        call_count.fetch_add(1, Ordering::Relaxed);
913
914                        let offset_usize = offset as usize;
915                        let items: Vec<usize> = (offset_usize..std::cmp::min(offset_usize + page_size, 50)).collect();
916                        (items, Some(50))
917                    }
918                });
919
920                // Fetch with page size 5
921                let (items1, _) = client
922                    .fetch_query(
923                        scope.clone(),
924                        PaginatedPageKey {
925                            key: (),
926                            page_index: 0,
927                            page_size: 5,
928                        },
929                    )
930                    .await
931                    .expect("Page should exist");
932                assert_eq!(items1.len(), 5);
933                assert_eq!(call_count.load(Ordering::Relaxed), 1);
934
935                // Fetch with page size 15 - should reuse cached data and fetch more
936                let (items2, _) = client
937                    .fetch_query(
938                        scope.clone(),
939                        PaginatedPageKey {
940                            key: (),
941                            page_index: 0,
942                            page_size: 15,
943                        },
944                    )
945                    .await
946                    .expect("Page should exist");
947                assert_eq!(items2.len(), 15);
948                assert_eq!(
949                    call_count.load(Ordering::Relaxed),
950                    2,
951                    "Should fetch more data for larger page"
952                );
953
954                // Fetch with page size 10 - should use cached data
955                let (items3, _) = client
956                    .fetch_query(
957                        scope.clone(),
958                        PaginatedPageKey {
959                            key: (),
960                            page_index: 0,
961                            page_size: 10,
962                        },
963                    )
964                    .await
965                    .expect("Page should exist");
966                assert_eq!(items3.len(), 10);
967                assert_eq!(
968                    call_count.load(Ordering::Relaxed),
969                    2,
970                    "Should use cached data, no new fetch"
971                );
972            })
973            .await;
974    }
975
976    /// Test that backing cache is properly managed when paginated pages expire
977    /// Tests both GC (garbage collection) and stale time scenarios
978    #[rstest]
979    #[case::gc_time(TestMode::GcTime)]
980    #[case::stale_time(TestMode::StaleTime)]
981    #[tokio::test]
982    async fn test_paginated_offset_backing_cache_lifecycle(#[case] mode: TestMode) {
983        crate::test::identify_parking_lot_deadlocks();
984        tokio::task::LocalSet::new()
985            .run_until(async move {
986                let (client, _guard, _owner) = prep_vari!(true);
987
988                let call_count = Arc::new(AtomicUsize::new(0));
989                let call_count_clone = call_count.clone();
990
991                let scope = QueryScope::new_paginated_with_offset(move |key: String, page_size, offset| {
992                    let call_count = call_count_clone.clone();
993                    async move {
994                        call_count.fetch_add(1, Ordering::Relaxed);
995
996                        let offset_usize = offset as usize;
997                        let items: Vec<String> = (offset_usize..offset_usize + page_size)
998                            .map(|i| format!("{}_{}", key, i))
999                            .collect();
1000                        (items, Some(30))
1001                    }
1002                })
1003                .with_options(match mode {
1004                    TestMode::GcTime => {
1005                        crate::QueryOptions::default().with_gc_time(std::time::Duration::from_millis(100))
1006                    }
1007                    TestMode::StaleTime => {
1008                        crate::QueryOptions::default().with_stale_time(std::time::Duration::from_millis(100))
1009                    }
1010                });
1011
1012                // Fetch first page
1013                let (items, _) = client
1014                    .fetch_query(
1015                        scope.clone(),
1016                        PaginatedPageKey {
1017                            key: "test".to_string(),
1018                            page_index: 0,
1019                            page_size: 10,
1020                        },
1021                    )
1022                    .await
1023                    .expect("Page should exist");
1024                assert_eq!(items[0], "test_0");
1025                assert_eq!(call_count.load(Ordering::Relaxed), 1);
1026
1027                // Wait for 50ms, 50ms left till gc/stale:
1028                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1029
1030                // Fetch second page
1031                let (items, _) = client
1032                    .fetch_query(
1033                        scope.clone(),
1034                        PaginatedPageKey {
1035                            key: "test".to_string(),
1036                            page_index: 1,
1037                            page_size: 10,
1038                        },
1039                    )
1040                    .await
1041                    .expect("Page should exist");
1042                assert_eq!(items[0], "test_10");
1043                assert_eq!(call_count.load(Ordering::Relaxed), 2);
1044
1045                // Wait another 50ms, so the first query is stale/gc'd, but the second
1046                // is valid even when gc'd because 50ms left:
1047                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1048
1049                // Fetch first page again
1050                let (items, _) = client
1051                    .fetch_query(
1052                        scope.clone(),
1053                        PaginatedPageKey {
1054                            key: "test".to_string(),
1055                            page_index: 0,
1056                            page_size: 10,
1057                        },
1058                    )
1059                    .await
1060                    .expect("Page should exist");
1061                assert_eq!(items[0], "test_0");
1062
1063                // Fetch second page again
1064                let (items, _) = client
1065                    .fetch_query(
1066                        scope.clone(),
1067                        PaginatedPageKey {
1068                            key: "test".to_string(),
1069                            page_index: 1,
1070                            page_size: 10,
1071                        },
1072                    )
1073                    .await
1074                    .expect("Page should exist");
1075                assert_eq!(items[0], "test_10");
1076
1077                let expected_calls = match mode {
1078                    TestMode::GcTime => {
1079                        // GC cleared page 0, but page 1 kept backing cache alive
1080                        // So page 0 refetch reuses backing cache, plus page 1 still alive so also reused
1081                        2
1082                    }
1083                    TestMode::StaleTime => {
1084                        // Stale page triggers invalidation which clears backing cache
1085                        // So page 0 refetch needs new API call, but page 1 is still valid so reused
1086                        3
1087                    }
1088                };
1089                assert_eq!(call_count.load(Ordering::Relaxed), expected_calls);
1090
1091                // If gc, should clear the backing cache only when all pages are gc'd:
1092                if matches!(mode, TestMode::GcTime) {
1093                    tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;
1094
1095                    // Now fetch again - backing cache should be gone
1096                    let (items, _) = client
1097                        .fetch_query(
1098                            scope.clone(),
1099                            PaginatedPageKey {
1100                                key: "test".to_string(),
1101                                page_index: 0,
1102                                page_size: 10,
1103                            },
1104                        )
1105                        .await
1106                        .expect("Page should exist");
1107                    assert_eq!(items[0], "test_0");
1108                    assert_eq!(
1109                        call_count.load(Ordering::Relaxed),
1110                        3,
1111                        "Backing cache should be cleared when all pages are GC'd"
1112                    );
1113                }
1114            })
1115            .await;
1116    }
1117
1118    /// Test jumping to distant pages without loading intermediate pages (offset-specific feature)
1119    #[tokio::test]
1120    async fn test_paginated_offset_jump_to_distant_page() {
1121        crate::test::identify_parking_lot_deadlocks();
1122        tokio::task::LocalSet::new()
1123            .run_until(async move {
1124                let (client, _guard, _owner) = prep_vari!(true);
1125
1126                let call_count = Arc::new(AtomicUsize::new(0));
1127                let call_count_clone = call_count.clone();
1128
1129                let scope = QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
1130                    let call_count = call_count_clone.clone();
1131                    async move {
1132                        call_count.fetch_add(1, Ordering::Relaxed);
1133
1134                        let offset_usize = offset as usize;
1135                        let items: Vec<usize> = (offset_usize..offset_usize + page_size).collect();
1136                        (items, Some(1000))
1137                    }
1138                });
1139
1140                // Jump directly to page 10 without loading pages 0-9
1141                let (items, total) = client
1142                    .fetch_query(
1143                        scope.clone(),
1144                        PaginatedPageKey {
1145                            key: (),
1146                            page_index: 10,
1147                            page_size: 10,
1148                        },
1149                    )
1150                    .await
1151                    .expect("Page should exist");
1152
1153                assert_eq!(items, (100..110).collect::<Vec<_>>());
1154                assert_eq!(total, Some(1000));
1155                assert_eq!(
1156                    call_count.load(Ordering::Relaxed),
1157                    1,
1158                    "Should only make 1 API call to jump to distant page"
1159                );
1160
1161                // Now jump back to page 0
1162                let (items, total) = client
1163                    .fetch_query(
1164                        scope.clone(),
1165                        PaginatedPageKey {
1166                            key: (),
1167                            page_index: 0,
1168                            page_size: 10,
1169                        },
1170                    )
1171                    .await
1172                    .expect("Page should exist");
1173
1174                assert_eq!(items, (0..10).collect::<Vec<_>>());
1175                assert_eq!(total, Some(1000));
1176                assert_eq!(
1177                    call_count.load(Ordering::Relaxed),
1178                    2,
1179                    "Should make 1 more API call to jump back"
1180                );
1181
1182                // Jump to page 50
1183                let (items, total) = client
1184                    .fetch_query(
1185                        scope.clone(),
1186                        PaginatedPageKey {
1187                            key: (),
1188                            page_index: 50,
1189                            page_size: 10,
1190                        },
1191                    )
1192                    .await
1193                    .expect("Page should exist");
1194
1195                assert_eq!(items, (500..510).collect::<Vec<_>>());
1196                assert_eq!(total, Some(1000));
1197                assert_eq!(
1198                    call_count.load(Ordering::Relaxed),
1199                    3,
1200                    "Should make 1 more API call for distant page 50"
1201                );
1202            })
1203            .await;
1204    }
1205
1206    /// Test total items calculation and page count (offset-specific feature)
1207    #[tokio::test]
1208    async fn test_paginated_offset_total_items_and_page_count() {
1209        crate::test::identify_parking_lot_deadlocks();
1210        tokio::task::LocalSet::new()
1211            .run_until(async move {
1212                let (client, _guard, _owner) = prep_vari!(true);
1213
1214                let scope = QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| async move {
1215                    let offset_usize = offset as usize;
1216                    let total_items = 47u64;
1217                    let items: Vec<usize> =
1218                        (offset_usize..std::cmp::min(offset_usize + page_size, total_items as usize)).collect();
1219                    (items, Some(total_items))
1220                });
1221
1222                // Fetch first page
1223                let (items, total) = client
1224                    .fetch_query(
1225                        scope.clone(),
1226                        PaginatedPageKey {
1227                            key: (),
1228                            page_index: 0,
1229                            page_size: 10,
1230                        },
1231                    )
1232                    .await
1233                    .expect("Page should exist");
1234
1235                assert_eq!(items.len(), 10);
1236                assert_eq!(total, Some(47));
1237
1238                // Calculate total pages: ceil(47 / 10) = 5
1239                let total_pages = total.map(|t| (t as f64 / 10.0).ceil() as usize);
1240                assert_eq!(total_pages, Some(5));
1241
1242                // Test page 4 (last page with partial data)
1243                let (items, total) = client
1244                    .fetch_query(
1245                        scope.clone(),
1246                        PaginatedPageKey {
1247                            key: (),
1248                            page_index: 4,
1249                            page_size: 10,
1250                        },
1251                    )
1252                    .await
1253                    .expect("Page should exist");
1254
1255                assert_eq!(items.len(), 7, "Last page should have 7 items (47 % 10)");
1256                assert_eq!(items, (40..47).collect::<Vec<_>>());
1257                assert_eq!(total, Some(47));
1258
1259                // Test beyond last page
1260                let result = client
1261                    .fetch_query(
1262                        scope.clone(),
1263                        PaginatedPageKey {
1264                            key: (),
1265                            page_index: 5,
1266                            page_size: 10,
1267                        },
1268                    )
1269                    .await;
1270
1271                assert!(result.is_none(), "Should return None for page beyond total");
1272            })
1273            .await;
1274    }
1275
1276    /// Test total items update when it changes
1277    #[tokio::test]
1278    async fn test_paginated_offset_total_items_update() {
1279        crate::test::identify_parking_lot_deadlocks();
1280        tokio::task::LocalSet::new()
1281            .run_until(async move {
1282                let (client, _guard, _owner) = prep_vari!(true);
1283
1284                let total = Arc::new(AtomicUsize::new(100));
1285                let total_clone = total.clone();
1286
1287                let scope = QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
1288                    let total = total_clone.clone();
1289                    async move {
1290                        let current_total = total.load(Ordering::Relaxed) as u64;
1291                        let offset_usize = offset as usize;
1292                        let items: Vec<usize> =
1293                            (offset_usize..std::cmp::min(offset_usize + page_size, current_total as usize)).collect();
1294                        (items, Some(current_total))
1295                    }
1296                });
1297
1298                // Fetch with initial total of 100
1299                let (items, returned_total) = client
1300                    .fetch_query(
1301                        scope.clone(),
1302                        PaginatedPageKey {
1303                            key: (),
1304                            page_index: 0,
1305                            page_size: 10,
1306                        },
1307                    )
1308                    .await
1309                    .expect("Page should exist");
1310
1311                assert_eq!(items, (0..10).collect::<Vec<_>>());
1312                assert_eq!(returned_total, Some(100));
1313
1314                // Update total to 50
1315                total.store(50, Ordering::Relaxed);
1316
1317                // Invalidate to get fresh data
1318                client.invalidate_query_scope(scope.clone());
1319
1320                // Fetch again - should get updated total
1321                let (items, returned_total) = client
1322                    .fetch_query(
1323                        scope.clone(),
1324                        PaginatedPageKey {
1325                            key: (),
1326                            page_index: 0,
1327                            page_size: 10,
1328                        },
1329                    )
1330                    .await
1331                    .expect("Page should exist");
1332
1333                assert_eq!(items, (0..10).collect::<Vec<_>>());
1334                assert_eq!(returned_total, Some(50));
1335            })
1336            .await;
1337    }
1338
1339    /// Test sparse item storage - items at non-contiguous offsets
1340    #[tokio::test]
1341    async fn test_paginated_offset_sparse_item_storage() {
1342        crate::test::identify_parking_lot_deadlocks();
1343        tokio::task::LocalSet::new()
1344            .run_until(async move {
1345                let (client, _guard, _owner) = prep_vari!(true);
1346
1347                let scope = QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| async move {
1348                    let offset_usize = offset as usize;
1349                    let items: Vec<usize> = (offset_usize..offset_usize + page_size).collect();
1350                    (items, Some(1000))
1351                });
1352
1353                // Fetch page 0
1354                let _ = client
1355                    .fetch_query(
1356                        scope.clone(),
1357                        PaginatedPageKey {
1358                            key: (),
1359                            page_index: 0,
1360                            page_size: 10,
1361                        },
1362                    )
1363                    .await;
1364
1365                // Fetch page 10 (offset 100) - creates sparse storage
1366                let (items, _) = client
1367                    .fetch_query(
1368                        scope.clone(),
1369                        PaginatedPageKey {
1370                            key: (),
1371                            page_index: 10,
1372                            page_size: 10,
1373                        },
1374                    )
1375                    .await
1376                    .expect("Page should exist");
1377
1378                assert_eq!(items, (100..110).collect::<Vec<_>>());
1379
1380                // Fetch page 5 (offset 50) - fills in the middle
1381                let (items, _) = client
1382                    .fetch_query(
1383                        scope.clone(),
1384                        PaginatedPageKey {
1385                            key: (),
1386                            page_index: 5,
1387                            page_size: 10,
1388                        },
1389                    )
1390                    .await
1391                    .expect("Page should exist");
1392
1393                assert_eq!(items, (50..60).collect::<Vec<_>>());
1394
1395                // All three pages should still be accessible
1396                let (items, _) = client
1397                    .fetch_query(
1398                        scope.clone(),
1399                        PaginatedPageKey {
1400                            key: (),
1401                            page_index: 0,
1402                            page_size: 10,
1403                        },
1404                    )
1405                    .await
1406                    .expect("Page should exist");
1407                assert_eq!(items, (0..10).collect::<Vec<_>>());
1408            })
1409            .await;
1410    }
1411
1412    #[derive(Debug, Clone, Copy)]
1413    enum TestMode {
1414        /// Test GC behavior: backing cache should only be cleared when all pages are GC'd
1415        GcTime,
1416        /// Test stale time behavior: backing cache should be invalidated when any page goes stale
1417        StaleTime,
1418    }
1419}