1use std::collections::BTreeMap;
2use std::time::Duration;
3use std::{hash::Hash, sync::Arc};
4
5use leptos::prelude::*;
6
7use crate::{
8 PaginatedPageKey, QueryOptions, QueryScope, QueryScopeLocal, UntypedQueryClient, cache::OnScopeMissing,
9 debug_if_devtools_enabled::DebugIfDevtoolsEnabled, query_scope::ScopeCacheKey,
10};
11
12macro_rules! define {
13 ([$($impl_fut_generics:tt)*], [$($impl_fn_generics:tt)*], $name:ident, $fetch_fn:ident, $sname:literal) => {
14
15 impl<Key, PageItem> $name<PaginatedPageKey<Key>, Option<(Vec<PageItem>, Option<u64>)>>
16 where
17 Key: DebugIfDevtoolsEnabled + Clone + Hash + PartialEq + 'static $($impl_fn_generics)*,
18 PageItem: DebugIfDevtoolsEnabled + Clone + 'static $($impl_fn_generics)*,
19 {
20 pub fn new_paginated_with_offset<Fut>(
70 getter: impl Fn(Key, usize, u64) -> Fut + 'static $($impl_fn_generics)*,
71 ) -> $name<PaginatedPageKey<Key>, Option<(Vec<PageItem>, Option<u64>)>>
72 where
73 Fut: Future<Output = (Vec<PageItem>, Option<u64>)> $($impl_fut_generics)*,
74 {
75 let getter = Arc::new(getter);
76 let backing_cache_scope = $name::new({
77 let getter = getter.clone();
78 move |key: KeyWithUnhashedItemCountAndOffsetRequested<Key>| {
79 let getter = getter.clone();
80 async move {
81 let (items, mut maybe_total_items) =
82 getter(key.key, key.item_count_requested, key.offset_requested)
83 .await;
84
85 if items.is_empty() {
87 if let Some(cur_total_items) = &mut maybe_total_items {
88 if *cur_total_items > key.offset_requested {
89 *cur_total_items = key.offset_requested;
90 }
91 } else {
92 maybe_total_items = Some(key.offset_requested);
93 }
94 }
95
96 BackingCache {
97 inner: Arc::new(BackingCacheInner {
98 items: SparseItems {
99 items: parking_lot::Mutex::new(
100 items
101 .into_iter()
102 .enumerate()
103 .map(|(idx, item)| (key.offset_requested + (idx as u64), item))
104 .collect(),
105 ),
106 },
107 maybe_total_items: parking_lot::Mutex::new(maybe_total_items),
108 update_lock: futures::lock::Mutex::new(()),
109 }),
110 }
111 }
112 }
113 })
114 .with_options(
116 QueryOptions::default()
117 .with_stale_time(Duration::MAX)
118 .with_gc_time(Duration::MAX)
119 .with_refetch_interval(Duration::MAX)
120 );
121
122 $name::new({
123 let backing_cache_scope = backing_cache_scope.clone();
124 move |page_key: PaginatedPageKey<Key>| {
125 let backing_cache_scope = backing_cache_scope.clone();
126 let getter = getter.clone();
127 async move {
128 let untyped_client = use_context::<UntypedQueryClient>()
129 .expect(
130 "leptos-fetch bug, UntypedQueryClient should always have been \
131 provided to the query context internally"
132 );
133 let scope_cache_key = use_context::<ScopeCacheKey>()
134 .expect(
135 "leptos-fetch bug, ScopeCacheKey itself should always have been \
136 provided to the query context internally"
137 );
138
139 if let Some(metadata) = untyped_client
143 .query_metadata::<PaginatedPageKey<Key>, Option<(Vec<PageItem>, Option<u64>)>>(
144 scope_cache_key,
145 &page_key,
146 )
147 && metadata.stale_or_invalidated
148 && let Some(backing_metadata) = untyped_client
149 .query_metadata::<KeyWithUnhashedItemCountAndOffsetRequested<Key>, BackingCache<PageItem>>(
150 backing_cache_scope.cache_key,
151 &KeyWithUnhashedItemCountAndOffsetRequested {
152 key: page_key.key.clone(),
153 item_count_requested: 0, offset_requested: 0, },
156 )
157 && backing_metadata.updated_at <= metadata.updated_at
158 {
159 untyped_client.invalidate_query(
160 &backing_cache_scope,
161 KeyWithUnhashedItemCountAndOffsetRequested {
162 key: page_key.key.clone(),
163 item_count_requested: 0, offset_requested: 0, },
166 );
167 }
168
169 let infinite_cache = untyped_client
170 .$fetch_fn(
171 backing_cache_scope,
172 KeyWithUnhashedItemCountAndOffsetRequested {
173 key: page_key.key.clone(),
174 item_count_requested: page_key.page_size,
175 offset_requested: (page_key.page_index as u64) * (page_key.page_size as u64)
176 },
177 )
178 .await;
179
180 let target_idx_start = (page_key.page_index as u64) * (page_key.page_size as u64);
181 let mut target_idx_end_exclusive =
182 ((page_key.page_index as u64) + 1) * (page_key.page_size as u64);
183 if let Some(maybe_total_items) = *infinite_cache.inner.maybe_total_items.lock() {
184 if target_idx_start >= maybe_total_items {
186 return None;
187 }
188 if target_idx_end_exclusive > maybe_total_items {
189 target_idx_end_exclusive = maybe_total_items;
190 }
191 }
192
193 if !infinite_cache.inner.items.check_range(target_idx_start, target_idx_end_exclusive) {
195 let mut _guard = infinite_cache.inner.update_lock.lock().await;
198 loop {
199 let next_offset = {
201 let items_lock = infinite_cache.inner.items.items.lock();
202 let mut offset = target_idx_start;
204 while offset < target_idx_end_exclusive {
205 if !items_lock.contains_key(&offset) {
206 break;
207 }
208 offset += 1;
209 }
210 offset
211 };
212
213 if next_offset >= target_idx_end_exclusive {
214 break;
216 }
217
218 let items_needed = (target_idx_end_exclusive - next_offset) as usize;
219 let (items, maybe_total_items) =
220 getter(page_key.key.clone(), items_needed, next_offset).await;
221 if !items.is_empty() {
222 infinite_cache.inner.items.extend(next_offset, items);
223 *infinite_cache.inner.maybe_total_items.lock() = maybe_total_items;
224 } else {
225 let mut guard = infinite_cache.inner.maybe_total_items.lock();
227 if let Some(cur_total_items) = &mut* guard {
228 if *cur_total_items > next_offset {
229 *cur_total_items = next_offset;
230 }
231 } else {
232 *guard = Some(next_offset);
233 }
234 break;
236 }
237 }
238 drop(_guard);
239 }
240
241 infinite_cache
242 .inner
243 .items
244 .get_range(
245 target_idx_start,
246 (target_idx_end_exclusive - target_idx_start) as usize,
247 )
248 .map(|items| {
249 (items, infinite_cache.inner.maybe_total_items.lock().clone())
250 })
251 }
252 }
253 })
254 .on_invalidation({
256 let backing_cache_scope = backing_cache_scope.clone();
257 move |key| {
258 let untyped_client = use_context::<UntypedQueryClient>()
259 .expect(
260 "leptos-fetch bug, UntypedQueryClient should always have been \
261 provided to the on_invalidation context internally"
262 );
263 untyped_client.invalidate_query(
264 &backing_cache_scope,
265 KeyWithUnhashedItemCountAndOffsetRequested {
266 key: key.key.clone(),
267 item_count_requested: 0, offset_requested: 0, },
270 );
271 }
272 })
273 .on_gc(move |key| {
275 let untyped_client = use_context::<UntypedQueryClient>()
276 .expect(
277 "leptos-fetch bug, UntypedQueryClient should always have been \
278 provided to the on_gc context internally"
279 );
280 let scope_cache_key = use_context::<ScopeCacheKey>()
281 .expect(
282 "leptos-fetch bug, ScopeCacheKey itself should always have been \
283 provided to the on_gc context internally"
284 );
285 let mut found_nb = 0;
286 untyped_client
287 .scope_lookup
288 .with_cached_scope_mut::<PaginatedPageKey<Key>, Option<(Vec<PageItem>, Option<u64>)>, _, _>(
289 &mut untyped_client.scope_lookup.scopes_mut(),
290 scope_cache_key,
291 OnScopeMissing::Skip,
292 |_| {},
293 |maybe_scope, _| {
294 if let Some(scope) = maybe_scope {
295 for query_or_pending in scope.all_queries_mut_include_pending() {
296 if query_or_pending
297 .key()
298 .value_if_safe()
299 .map(|test_key| test_key.key == key.key)
300 .unwrap_or(false)
301 {
302 found_nb += 1;
303 }
304 }
305 }
306 },
307 );
308 if found_nb == 0 {
309 untyped_client.clear_query(
310 &backing_cache_scope,
311 KeyWithUnhashedItemCountAndOffsetRequested {
312 key: key.key.clone(),
313 item_count_requested: 0, offset_requested: 0, },
316 );
317 }
318 })
319 }
320 }
321 };
322}
323
324define! { [+ Send], [+ Send + Sync], QueryScope, fetch_query, "QueryScope" }
325define! { [], [], QueryScopeLocal, fetch_query_local, "QueryScopeLocal" }
326
327#[derive(Debug, Clone)]
328struct KeyWithUnhashedItemCountAndOffsetRequested<Key> {
329 key: Key,
330 item_count_requested: usize,
331 offset_requested: u64,
332}
333
334impl<Key: Hash> Hash for KeyWithUnhashedItemCountAndOffsetRequested<Key> {
335 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
336 self.key.hash(state);
337 }
338}
339
340#[derive(Debug, Clone)]
341struct BackingCache<Item> {
342 inner: Arc<BackingCacheInner<Item>>,
343}
344
345#[derive(Debug)]
346struct BackingCacheInner<Item> {
347 items: SparseItems<Item>,
348 maybe_total_items: parking_lot::Mutex<Option<u64>>,
349 update_lock: futures::lock::Mutex<()>,
350}
351
352#[derive(Debug)]
353struct SparseItems<Item> {
354 items: parking_lot::Mutex<BTreeMap<u64, Item>>,
355}
356
357impl<Item> SparseItems<Item> {
358 fn extend(&self, offset: u64, new_items: Vec<Item>) {
359 let mut items = self.items.lock();
360 for (idx, item) in new_items.into_iter().enumerate() {
361 items.insert(offset + (idx as u64), item);
362 }
363 }
364
365 fn get_range(&self, start_offset: u64, count: usize) -> Option<Vec<Item>>
367 where
368 Item: Clone,
369 {
370 let items = self.items.lock();
371
372 let mut result = Vec::with_capacity(count);
374 for offset in start_offset..(start_offset + (count as u64)) {
375 match items.get(&offset) {
376 Some(item) => result.push(item),
377 None => return None, }
379 }
380
381 Some(result.into_iter().cloned().collect())
382 }
383
384 fn check_range(&self, start_offset: u64, end_index_exclusive: u64) -> bool
385 where
386 Item: Clone,
387 {
388 let items = self.items.lock();
389 for offset in start_offset..end_index_exclusive {
390 if !items.contains_key(&offset) {
391 return false;
392 }
393 }
394 true
395 }
396}
397
398#[cfg(test)]
399mod tests {
400 use any_spawner::Executor;
401 use hydration_context::SsrSharedContext;
402 use leptos::prelude::*;
403 use rstest::*;
404 use std::sync::Arc;
405 use std::sync::atomic::{AtomicUsize, Ordering};
406
407 use crate::test::prep_vari;
408 use crate::{PaginatedPageKey, QueryClient, QueryScope};
409
410 #[tokio::test]
415 async fn test_paginated_serialization_works() {
416 crate::test::identify_parking_lot_deadlocks();
417 tokio::task::LocalSet::new()
418 .run_until(async move {
419 let (client, _guard, _owner) = prep_vari!(true);
420 let scope = QueryScope::new_paginated_with_offset(|_: (), _, _| async { (vec![()], Some(10)) });
421 client.resource(scope, || PaginatedPageKey {
422 key: (),
423 page_index: 0,
424 page_size: 10,
425 });
426 })
427 .await;
428 }
429
430 fn get_simple_api_fn(
431 num_rows: usize,
432 ) -> (
433 Arc<AtomicUsize>,
434 impl Fn(usize, u64) -> (Vec<usize>, Option<u64>) + Clone + 'static,
435 ) {
436 let call_count = Arc::new(AtomicUsize::new(0));
437
438 let api_fn = {
439 let call_count = call_count.clone();
440 move |target_return_count: usize, offset: u64| {
441 let call_count = call_count.clone();
442 call_count.fetch_add(1, Ordering::Relaxed);
443
444 let offset_usize = offset as usize;
445 let items = (0..num_rows)
446 .skip(offset_usize)
447 .take(target_return_count)
448 .collect::<Vec<_>>();
449 let total_items = num_rows as u64;
450 (items, Some(total_items))
451 }
452 };
453
454 (call_count, api_fn)
455 }
456
457 #[tokio::test]
458 async fn test_paginated_offset() {
459 crate::test::identify_parking_lot_deadlocks();
460 tokio::task::LocalSet::new()
461 .run_until(async move {
462 let (client, _guard, _owner) = prep_vari!(true);
463
464 let (_call_count, my_api_fn) = get_simple_api_fn(30);
465
466 let scope = QueryScope::new_paginated_with_offset(move |_query_key, page_size, offset| {
467 let my_api_fn = my_api_fn.clone();
468 async move {
469 let (items, total_items) = my_api_fn(page_size, offset);
470 (items, total_items)
471 }
472 });
473
474 let (first_page_logs, maybe_total) = client
475 .fetch_query(
476 scope.clone(),
477 PaginatedPageKey {
478 key: (),
479 page_index: 0,
480 page_size: 20,
481 },
482 )
483 .await
484 .expect(
485 "This page should exist (Some()), \
486 None when a page is requested beyond the end of the data.",
487 );
488
489 assert_eq!(first_page_logs, (0..20).collect::<Vec<_>>());
490 assert_eq!(maybe_total, Some(30));
491
492 let (second_page_logs, maybe_total) = client
493 .fetch_query(
494 scope.clone(),
495 PaginatedPageKey {
496 key: (),
497 page_index: 1,
498 page_size: 20,
499 },
500 )
501 .await
502 .expect(
503 "This page should exist (Some()), \
504 None when a page is requested beyond the end of the data.",
505 );
506
507 assert_eq!(second_page_logs, (20..30).collect::<Vec<_>>());
508 assert_eq!(maybe_total, Some(30));
509
510 assert!(
511 client
512 .fetch_query(
513 scope.clone(),
514 PaginatedPageKey {
515 key: (),
516 page_index: 2,
517 page_size: 20,
518 },
519 )
520 .await
521 .is_none()
522 );
523 })
524 .await;
525 }
526
527 #[tokio::test]
529 async fn test_paginated_offset_fills_page_size_with_multiple_calls() {
530 crate::test::identify_parking_lot_deadlocks();
531 tokio::task::LocalSet::new()
532 .run_until(async move {
533 let (client, _guard, _owner) = prep_vari!(true);
534
535 let call_count = Arc::new(AtomicUsize::new(0));
536 let call_count_clone = call_count.clone();
537
538 let scope = QueryScope::new_paginated_with_offset(move |_key: (), _page_size, offset| {
540 let call_count = call_count_clone.clone();
541 async move {
542 call_count.fetch_add(1, Ordering::Relaxed);
543
544 let items = match offset {
546 0 => vec![0, 1, 2],
547 3 => vec![3, 4, 5],
548 6 => vec![6, 7, 8],
549 9 => vec![9, 10],
550 _ => vec![],
551 };
552 (items, Some(11))
553 }
554 });
555
556 let (items, total) = client
558 .fetch_query(
559 scope.clone(),
560 PaginatedPageKey {
561 key: (),
562 page_index: 0,
563 page_size: 10,
564 },
565 )
566 .await
567 .expect("Page should exist");
568
569 assert_eq!(items.len(), 10, "Should have filled to page_size");
570 assert_eq!(items, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
571 assert_eq!(total, Some(11));
572 assert_eq!(
573 call_count.load(Ordering::Relaxed),
574 4,
575 "Should have made 4 API calls to fill page"
576 );
577
578 let (items, total) = client
580 .fetch_query(
581 scope.clone(),
582 PaginatedPageKey {
583 key: (),
584 page_index: 1,
585 page_size: 10,
586 },
587 )
588 .await
589 .expect("Page should exist");
590
591 assert_eq!(items.len(), 1, "Only 1 item left");
592 assert_eq!(items, vec![10]);
593 assert_eq!(total, Some(11));
594 assert_eq!(
595 call_count.load(Ordering::Relaxed),
596 4,
597 "Should not make additional calls - item 10 was already cached"
598 );
599 })
600 .await;
601 }
602
603 #[tokio::test]
605 async fn test_paginated_offset_no_unnecessary_api_calls() {
606 crate::test::identify_parking_lot_deadlocks();
607 tokio::task::LocalSet::new()
608 .run_until(async move {
609 let (client, _guard, _owner) = prep_vari!(true);
610
611 let call_count = Arc::new(AtomicUsize::new(0));
612 let call_count_clone = call_count.clone();
613
614 let scope = QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
615 let call_count = call_count_clone.clone();
616 async move {
617 call_count.fetch_add(1, Ordering::Relaxed);
618
619 let offset_usize = offset as usize;
620 let items: Vec<usize> = (offset_usize..offset_usize + page_size).collect();
621 (items, Some(100))
622 }
623 });
624
625 let _ = client
627 .fetch_query(
628 scope.clone(),
629 PaginatedPageKey {
630 key: (),
631 page_index: 0,
632 page_size: 10,
633 },
634 )
635 .await;
636 assert_eq!(
637 call_count.load(Ordering::Relaxed),
638 1,
639 "First fetch should make 1 API call"
640 );
641
642 let _ = client
644 .fetch_query(
645 scope.clone(),
646 PaginatedPageKey {
647 key: (),
648 page_index: 0,
649 page_size: 10,
650 },
651 )
652 .await;
653 assert_eq!(call_count.load(Ordering::Relaxed), 1, "Should still be 1 call - cached");
654
655 let _ = client
657 .fetch_query(
658 scope.clone(),
659 PaginatedPageKey {
660 key: (),
661 page_index: 1,
662 page_size: 10,
663 },
664 )
665 .await;
666 assert_eq!(call_count.load(Ordering::Relaxed), 2, "Second page needs new API call");
667
668 let _ = client
670 .fetch_query(
671 scope.clone(),
672 PaginatedPageKey {
673 key: (),
674 page_index: 0,
675 page_size: 10,
676 },
677 )
678 .await;
679 assert_eq!(
680 call_count.load(Ordering::Relaxed),
681 2,
682 "Should still be 2 calls - first page cached"
683 );
684 })
685 .await;
686 }
687
688 #[rstest]
690 #[tokio::test]
691 async fn test_paginated_offset_linked_invalidation_and_clear(#[values(true, false)] clear: bool) {
692 crate::test::identify_parking_lot_deadlocks();
693 tokio::task::LocalSet::new()
694 .run_until(async move {
695 let (client, _guard, _owner) = prep_vari!(true);
696
697 let version = Arc::new(AtomicUsize::new(0));
698 let version_clone = version.clone();
699
700 let scope = QueryScope::new_paginated_with_offset(move |key: String, page_size, offset| {
701 let v = version_clone.clone();
702 async move {
703 let current_version = v.load(Ordering::Relaxed);
704 let offset_usize = offset as usize;
705
706 let items: Vec<String> = (offset_usize..offset_usize + page_size)
708 .map(|i| format!("{}_v{}_{}", key, current_version, i))
709 .collect();
710
711 (items, Some(30))
712 }
713 });
714
715 let (items1, _) = client
717 .fetch_query(
718 scope.clone(),
719 PaginatedPageKey {
720 key: "test".to_string(),
721 page_index: 0,
722 page_size: 10,
723 },
724 )
725 .await
726 .expect("Page should exist");
727
728 assert_eq!(items1[0], "test_v0_0");
729
730 let (items2, _) = client
732 .fetch_query(
733 scope.clone(),
734 PaginatedPageKey {
735 key: "test".to_string(),
736 page_index: 1,
737 page_size: 10,
738 },
739 )
740 .await
741 .expect("Page should exist");
742
743 assert_eq!(items2[0], "test_v0_10");
744
745 version.store(1, Ordering::Relaxed);
747
748 let (items1_new, _) = client
750 .fetch_query(
751 scope.clone(),
752 PaginatedPageKey {
753 key: "test".to_string(),
754 page_index: 0,
755 page_size: 10,
756 },
757 )
758 .await
759 .expect("Page should exist");
760
761 assert_eq!(
762 items1_new[0], "test_v0_0",
763 "Should still have old version before invalidation/clear"
764 );
765
766 if clear {
768 client.untyped_client.clear_query_scope(scope.clone());
770 } else {
771 client.invalidate_query_scope(scope.clone());
772 }
773
774 let (items1_new, _) = client
776 .fetch_query(
777 scope.clone(),
778 PaginatedPageKey {
779 key: "test".to_string(),
780 page_index: 0,
781 page_size: 10,
782 },
783 )
784 .await
785 .expect("Page should exist");
786
787 assert_eq!(
788 items1_new[0], "test_v1_0",
789 "Should have new version after invalidation/clear"
790 );
791
792 let (items2_new, _) = client
794 .fetch_query(
795 scope.clone(),
796 PaginatedPageKey {
797 key: "test".to_string(),
798 page_index: 1,
799 page_size: 10,
800 },
801 )
802 .await
803 .expect("Page should exist");
804
805 assert_eq!(items2_new[0], "test_v1_10", "Second page should also have new version");
806 })
807 .await;
808 }
809
810 #[tokio::test]
812 async fn test_paginated_offset_empty_response_handling() {
813 crate::test::identify_parking_lot_deadlocks();
814 tokio::task::LocalSet::new()
815 .run_until(async move {
816 let (client, _guard, _owner) = prep_vari!(true);
817
818 let scope = QueryScope::new_paginated_with_offset(|_key: (), _page_size, offset| async move {
819 match offset {
820 0 => (vec![1, 2, 3], Some(3)),
821 _ => (vec![], Some(3)),
822 }
823 });
824
825 let (items, total) = client
826 .fetch_query(
827 scope.clone(),
828 PaginatedPageKey {
829 key: (),
830 page_index: 0,
831 page_size: 10,
832 },
833 )
834 .await
835 .expect("Page should exist");
836
837 assert_eq!(items.len(), 3);
839 assert_eq!(total, Some(3));
840 })
841 .await;
842 }
843
844 #[tokio::test]
846 async fn test_paginated_offset_concurrent_fetches() {
847 crate::test::identify_parking_lot_deadlocks();
848 tokio::task::LocalSet::new()
849 .run_until(async move {
850 let (client, _guard, _owner) = prep_vari!(true);
851
852 let call_count = Arc::new(AtomicUsize::new(0));
853 let call_count_clone = call_count.clone();
854
855 let scope = QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
856 let call_count = call_count_clone.clone();
857 async move {
858 call_count.fetch_add(1, Ordering::Relaxed);
859
860 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
862
863 let offset_usize = offset as usize;
864 let items: Vec<usize> = (offset_usize..offset_usize + page_size).collect();
865 (items, Some(100))
866 }
867 });
868
869 let futures = (0..5).map(|_| {
871 client.fetch_query(
872 scope.clone(),
873 PaginatedPageKey {
874 key: (),
875 page_index: 0,
876 page_size: 10,
877 },
878 )
879 });
880
881 let results = futures::future::join_all(futures).await;
882
883 for result in results {
885 assert!(result.is_some());
886 }
887
888 assert_eq!(
890 call_count.load(Ordering::Relaxed),
891 1,
892 "Concurrent fetches should share single API call"
893 );
894 })
895 .await;
896 }
897
898 #[tokio::test]
900 async fn test_paginated_offset_different_page_sizes() {
901 crate::test::identify_parking_lot_deadlocks();
902 tokio::task::LocalSet::new()
903 .run_until(async move {
904 let (client, _guard, _owner) = prep_vari!(true);
905
906 let call_count = Arc::new(AtomicUsize::new(0));
907 let call_count_clone = call_count.clone();
908
909 let scope = QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
910 let call_count = call_count_clone.clone();
911 async move {
912 call_count.fetch_add(1, Ordering::Relaxed);
913
914 let offset_usize = offset as usize;
915 let items: Vec<usize> = (offset_usize..std::cmp::min(offset_usize + page_size, 50)).collect();
916 (items, Some(50))
917 }
918 });
919
920 let (items1, _) = client
922 .fetch_query(
923 scope.clone(),
924 PaginatedPageKey {
925 key: (),
926 page_index: 0,
927 page_size: 5,
928 },
929 )
930 .await
931 .expect("Page should exist");
932 assert_eq!(items1.len(), 5);
933 assert_eq!(call_count.load(Ordering::Relaxed), 1);
934
935 let (items2, _) = client
937 .fetch_query(
938 scope.clone(),
939 PaginatedPageKey {
940 key: (),
941 page_index: 0,
942 page_size: 15,
943 },
944 )
945 .await
946 .expect("Page should exist");
947 assert_eq!(items2.len(), 15);
948 assert_eq!(
949 call_count.load(Ordering::Relaxed),
950 2,
951 "Should fetch more data for larger page"
952 );
953
954 let (items3, _) = client
956 .fetch_query(
957 scope.clone(),
958 PaginatedPageKey {
959 key: (),
960 page_index: 0,
961 page_size: 10,
962 },
963 )
964 .await
965 .expect("Page should exist");
966 assert_eq!(items3.len(), 10);
967 assert_eq!(
968 call_count.load(Ordering::Relaxed),
969 2,
970 "Should use cached data, no new fetch"
971 );
972 })
973 .await;
974 }
975
976 #[rstest]
979 #[case::gc_time(TestMode::GcTime)]
980 #[case::stale_time(TestMode::StaleTime)]
981 #[tokio::test]
982 async fn test_paginated_offset_backing_cache_lifecycle(#[case] mode: TestMode) {
983 crate::test::identify_parking_lot_deadlocks();
984 tokio::task::LocalSet::new()
985 .run_until(async move {
986 let (client, _guard, _owner) = prep_vari!(true);
987
988 let call_count = Arc::new(AtomicUsize::new(0));
989 let call_count_clone = call_count.clone();
990
991 let scope = QueryScope::new_paginated_with_offset(move |key: String, page_size, offset| {
992 let call_count = call_count_clone.clone();
993 async move {
994 call_count.fetch_add(1, Ordering::Relaxed);
995
996 let offset_usize = offset as usize;
997 let items: Vec<String> = (offset_usize..offset_usize + page_size)
998 .map(|i| format!("{}_{}", key, i))
999 .collect();
1000 (items, Some(30))
1001 }
1002 })
1003 .with_options(match mode {
1004 TestMode::GcTime => {
1005 crate::QueryOptions::default().with_gc_time(std::time::Duration::from_millis(100))
1006 }
1007 TestMode::StaleTime => {
1008 crate::QueryOptions::default().with_stale_time(std::time::Duration::from_millis(100))
1009 }
1010 });
1011
1012 let (items, _) = client
1014 .fetch_query(
1015 scope.clone(),
1016 PaginatedPageKey {
1017 key: "test".to_string(),
1018 page_index: 0,
1019 page_size: 10,
1020 },
1021 )
1022 .await
1023 .expect("Page should exist");
1024 assert_eq!(items[0], "test_0");
1025 assert_eq!(call_count.load(Ordering::Relaxed), 1);
1026
1027 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1029
1030 let (items, _) = client
1032 .fetch_query(
1033 scope.clone(),
1034 PaginatedPageKey {
1035 key: "test".to_string(),
1036 page_index: 1,
1037 page_size: 10,
1038 },
1039 )
1040 .await
1041 .expect("Page should exist");
1042 assert_eq!(items[0], "test_10");
1043 assert_eq!(call_count.load(Ordering::Relaxed), 2);
1044
1045 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1048
1049 let (items, _) = client
1051 .fetch_query(
1052 scope.clone(),
1053 PaginatedPageKey {
1054 key: "test".to_string(),
1055 page_index: 0,
1056 page_size: 10,
1057 },
1058 )
1059 .await
1060 .expect("Page should exist");
1061 assert_eq!(items[0], "test_0");
1062
1063 let (items, _) = client
1065 .fetch_query(
1066 scope.clone(),
1067 PaginatedPageKey {
1068 key: "test".to_string(),
1069 page_index: 1,
1070 page_size: 10,
1071 },
1072 )
1073 .await
1074 .expect("Page should exist");
1075 assert_eq!(items[0], "test_10");
1076
1077 let expected_calls = match mode {
1078 TestMode::GcTime => {
1079 2
1082 }
1083 TestMode::StaleTime => {
1084 3
1087 }
1088 };
1089 assert_eq!(call_count.load(Ordering::Relaxed), expected_calls);
1090
1091 if matches!(mode, TestMode::GcTime) {
1093 tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;
1094
1095 let (items, _) = client
1097 .fetch_query(
1098 scope.clone(),
1099 PaginatedPageKey {
1100 key: "test".to_string(),
1101 page_index: 0,
1102 page_size: 10,
1103 },
1104 )
1105 .await
1106 .expect("Page should exist");
1107 assert_eq!(items[0], "test_0");
1108 assert_eq!(
1109 call_count.load(Ordering::Relaxed),
1110 3,
1111 "Backing cache should be cleared when all pages are GC'd"
1112 );
1113 }
1114 })
1115 .await;
1116 }
1117
1118 #[tokio::test]
1120 async fn test_paginated_offset_jump_to_distant_page() {
1121 crate::test::identify_parking_lot_deadlocks();
1122 tokio::task::LocalSet::new()
1123 .run_until(async move {
1124 let (client, _guard, _owner) = prep_vari!(true);
1125
1126 let call_count = Arc::new(AtomicUsize::new(0));
1127 let call_count_clone = call_count.clone();
1128
1129 let scope = QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
1130 let call_count = call_count_clone.clone();
1131 async move {
1132 call_count.fetch_add(1, Ordering::Relaxed);
1133
1134 let offset_usize = offset as usize;
1135 let items: Vec<usize> = (offset_usize..offset_usize + page_size).collect();
1136 (items, Some(1000))
1137 }
1138 });
1139
1140 let (items, total) = client
1142 .fetch_query(
1143 scope.clone(),
1144 PaginatedPageKey {
1145 key: (),
1146 page_index: 10,
1147 page_size: 10,
1148 },
1149 )
1150 .await
1151 .expect("Page should exist");
1152
1153 assert_eq!(items, (100..110).collect::<Vec<_>>());
1154 assert_eq!(total, Some(1000));
1155 assert_eq!(
1156 call_count.load(Ordering::Relaxed),
1157 1,
1158 "Should only make 1 API call to jump to distant page"
1159 );
1160
1161 let (items, total) = client
1163 .fetch_query(
1164 scope.clone(),
1165 PaginatedPageKey {
1166 key: (),
1167 page_index: 0,
1168 page_size: 10,
1169 },
1170 )
1171 .await
1172 .expect("Page should exist");
1173
1174 assert_eq!(items, (0..10).collect::<Vec<_>>());
1175 assert_eq!(total, Some(1000));
1176 assert_eq!(
1177 call_count.load(Ordering::Relaxed),
1178 2,
1179 "Should make 1 more API call to jump back"
1180 );
1181
1182 let (items, total) = client
1184 .fetch_query(
1185 scope.clone(),
1186 PaginatedPageKey {
1187 key: (),
1188 page_index: 50,
1189 page_size: 10,
1190 },
1191 )
1192 .await
1193 .expect("Page should exist");
1194
1195 assert_eq!(items, (500..510).collect::<Vec<_>>());
1196 assert_eq!(total, Some(1000));
1197 assert_eq!(
1198 call_count.load(Ordering::Relaxed),
1199 3,
1200 "Should make 1 more API call for distant page 50"
1201 );
1202 })
1203 .await;
1204 }
1205
1206 #[tokio::test]
1208 async fn test_paginated_offset_total_items_and_page_count() {
1209 crate::test::identify_parking_lot_deadlocks();
1210 tokio::task::LocalSet::new()
1211 .run_until(async move {
1212 let (client, _guard, _owner) = prep_vari!(true);
1213
1214 let scope = QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| async move {
1215 let offset_usize = offset as usize;
1216 let total_items = 47u64;
1217 let items: Vec<usize> =
1218 (offset_usize..std::cmp::min(offset_usize + page_size, total_items as usize)).collect();
1219 (items, Some(total_items))
1220 });
1221
1222 let (items, total) = client
1224 .fetch_query(
1225 scope.clone(),
1226 PaginatedPageKey {
1227 key: (),
1228 page_index: 0,
1229 page_size: 10,
1230 },
1231 )
1232 .await
1233 .expect("Page should exist");
1234
1235 assert_eq!(items.len(), 10);
1236 assert_eq!(total, Some(47));
1237
1238 let total_pages = total.map(|t| (t as f64 / 10.0).ceil() as usize);
1240 assert_eq!(total_pages, Some(5));
1241
1242 let (items, total) = client
1244 .fetch_query(
1245 scope.clone(),
1246 PaginatedPageKey {
1247 key: (),
1248 page_index: 4,
1249 page_size: 10,
1250 },
1251 )
1252 .await
1253 .expect("Page should exist");
1254
1255 assert_eq!(items.len(), 7, "Last page should have 7 items (47 % 10)");
1256 assert_eq!(items, (40..47).collect::<Vec<_>>());
1257 assert_eq!(total, Some(47));
1258
1259 let result = client
1261 .fetch_query(
1262 scope.clone(),
1263 PaginatedPageKey {
1264 key: (),
1265 page_index: 5,
1266 page_size: 10,
1267 },
1268 )
1269 .await;
1270
1271 assert!(result.is_none(), "Should return None for page beyond total");
1272 })
1273 .await;
1274 }
1275
1276 #[tokio::test]
1278 async fn test_paginated_offset_total_items_update() {
1279 crate::test::identify_parking_lot_deadlocks();
1280 tokio::task::LocalSet::new()
1281 .run_until(async move {
1282 let (client, _guard, _owner) = prep_vari!(true);
1283
1284 let total = Arc::new(AtomicUsize::new(100));
1285 let total_clone = total.clone();
1286
1287 let scope = QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
1288 let total = total_clone.clone();
1289 async move {
1290 let current_total = total.load(Ordering::Relaxed) as u64;
1291 let offset_usize = offset as usize;
1292 let items: Vec<usize> =
1293 (offset_usize..std::cmp::min(offset_usize + page_size, current_total as usize)).collect();
1294 (items, Some(current_total))
1295 }
1296 });
1297
1298 let (items, returned_total) = client
1300 .fetch_query(
1301 scope.clone(),
1302 PaginatedPageKey {
1303 key: (),
1304 page_index: 0,
1305 page_size: 10,
1306 },
1307 )
1308 .await
1309 .expect("Page should exist");
1310
1311 assert_eq!(items, (0..10).collect::<Vec<_>>());
1312 assert_eq!(returned_total, Some(100));
1313
1314 total.store(50, Ordering::Relaxed);
1316
1317 client.invalidate_query_scope(scope.clone());
1319
1320 let (items, returned_total) = client
1322 .fetch_query(
1323 scope.clone(),
1324 PaginatedPageKey {
1325 key: (),
1326 page_index: 0,
1327 page_size: 10,
1328 },
1329 )
1330 .await
1331 .expect("Page should exist");
1332
1333 assert_eq!(items, (0..10).collect::<Vec<_>>());
1334 assert_eq!(returned_total, Some(50));
1335 })
1336 .await;
1337 }
1338
1339 #[tokio::test]
1341 async fn test_paginated_offset_sparse_item_storage() {
1342 crate::test::identify_parking_lot_deadlocks();
1343 tokio::task::LocalSet::new()
1344 .run_until(async move {
1345 let (client, _guard, _owner) = prep_vari!(true);
1346
1347 let scope = QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| async move {
1348 let offset_usize = offset as usize;
1349 let items: Vec<usize> = (offset_usize..offset_usize + page_size).collect();
1350 (items, Some(1000))
1351 });
1352
1353 let _ = client
1355 .fetch_query(
1356 scope.clone(),
1357 PaginatedPageKey {
1358 key: (),
1359 page_index: 0,
1360 page_size: 10,
1361 },
1362 )
1363 .await;
1364
1365 let (items, _) = client
1367 .fetch_query(
1368 scope.clone(),
1369 PaginatedPageKey {
1370 key: (),
1371 page_index: 10,
1372 page_size: 10,
1373 },
1374 )
1375 .await
1376 .expect("Page should exist");
1377
1378 assert_eq!(items, (100..110).collect::<Vec<_>>());
1379
1380 let (items, _) = client
1382 .fetch_query(
1383 scope.clone(),
1384 PaginatedPageKey {
1385 key: (),
1386 page_index: 5,
1387 page_size: 10,
1388 },
1389 )
1390 .await
1391 .expect("Page should exist");
1392
1393 assert_eq!(items, (50..60).collect::<Vec<_>>());
1394
1395 let (items, _) = client
1397 .fetch_query(
1398 scope.clone(),
1399 PaginatedPageKey {
1400 key: (),
1401 page_index: 0,
1402 page_size: 10,
1403 },
1404 )
1405 .await
1406 .expect("Page should exist");
1407 assert_eq!(items, (0..10).collect::<Vec<_>>());
1408 })
1409 .await;
1410 }
1411
1412 #[derive(Debug, Clone, Copy)]
1413 enum TestMode {
1414 GcTime,
1416 StaleTime,
1418 }
1419}