1use std::collections::BTreeMap;
2use std::time::Duration;
3use std::{hash::Hash, sync::Arc};
4
5use leptos::prelude::*;
6
7use crate::{
8 PaginatedPageKey, QueryOptions, QueryScope, QueryScopeLocal, UntypedQueryClient,
9 cache::OnScopeMissing, debug_if_devtools_enabled::DebugIfDevtoolsEnabled,
10 query_scope::ScopeCacheKey,
11};
12
// Generates the `new_paginated_with_offset` constructor for one scope type.
//
// Arguments, in order:
// - `[impl_fut_generics]`: extra bounds appended to the getter's future (`Fut`).
// - `[impl_fn_generics]`: extra bounds appended to `Key`, `PageItem` and the getter closure.
// - `name`: the scope type the constructor is implemented on and returns.
// - `fetch_fn`: the `UntypedQueryClient` method used to fetch the backing cache.
// - `sname`: the scope type name as a string literal (not referenced in the macro body).
macro_rules! define {
    ([$($impl_fut_generics:tt)*], [$($impl_fn_generics:tt)*], $name:ident, $fetch_fn:ident, $sname:literal) => {

        impl<Key, PageItem> $name<PaginatedPageKey<Key>, Option<(Vec<PageItem>, Option<u64>)>>
        where
            Key: DebugIfDevtoolsEnabled + Clone + Hash + PartialEq + 'static $($impl_fn_generics)*,
            PageItem: DebugIfDevtoolsEnabled + Clone + 'static $($impl_fn_generics)*,
        {
            /// Builds a paginated scope on top of an offset-based getter.
            ///
            /// `getter(key, item_count, offset)` is asked for up to `item_count` items
            /// starting at `offset`, and returns the items together with the total
            /// number of items if it is known.
            ///
            /// Two scopes are created:
            /// - an internal backing scope, holding one shared sparse item cache per
            ///   `key` (its cache key hashes only `key`), configured with
            ///   `Duration::MAX` stale time, gc time and refetch interval;
            /// - the returned page scope, keyed by `PaginatedPageKey<Key>`, which
            ///   slices pages out of the backing cache and calls `getter` again only
            ///   for offsets that are still missing.
            ///
            /// A page query yields `None` when the page starts at or beyond the known
            /// end of the data, otherwise `Some((items, maybe_total_items))`. The
            /// final page may hold fewer than `page_size` items.
            ///
            /// # Panics
            ///
            /// The generated query, invalidation and gc callbacks panic if the
            /// `UntypedQueryClient` / `ScopeCacheKey` contexts they `expect` are absent.
            pub fn new_paginated_with_offset<Fut>(
                getter: impl Fn(Key, usize, u64) -> Fut + 'static $($impl_fn_generics)*,
            ) -> $name<PaginatedPageKey<Key>, Option<(Vec<PageItem>, Option<u64>)>>
            where
                Fut: Future<Output = (Vec<PageItem>, Option<u64>)> $($impl_fut_generics)*,
            {
                let getter = Arc::new(getter);

                // Backing scope: its query creates the shared cache for a key,
                // seeded with the first requested slice.
                let backing_cache_scope = $name::new({
                    let getter = getter.clone();
                    move |key: KeyWithUnhashedItemCountAndOffsetRequested<Key>| {
                        let getter = getter.clone();
                        async move {
                            let (items, mut maybe_total_items) = getter(key.key, key.item_count_requested, key.offset_requested).await;

                            // An empty response means there is nothing at `offset_requested`,
                            // so the total can be at most that offset.
                            if items.is_empty() {
                                if let Some(cur_total_items) = &mut maybe_total_items {
                                    if *cur_total_items > key.offset_requested {
                                        *cur_total_items = key.offset_requested;
                                    }
                                } else {
                                    maybe_total_items = Some(key.offset_requested);
                                }
                            }

                            BackingCache {
                                inner: Arc::new(BackingCacheInner {
                                    items: SparseItems {
                                        items: parking_lot::Mutex::new(
                                            items
                                                .into_iter()
                                                .enumerate()
                                                .map(|(idx, item)| (key.offset_requested + (idx as u64), item))
                                                .collect(),
                                        ),
                                    },
                                    maybe_total_items: parking_lot::Mutex::new(maybe_total_items),
                                    update_lock: futures::lock::Mutex::new(()),
                                }),
                            }
                        }
                    }
                })
                .with_options(
                    // The backing cache's lifetime is driven from the page scope's
                    // `on_invalidation` / `on_gc` hooks below instead of timers.
                    QueryOptions::default()
                        .with_stale_time(Duration::MAX)
                        .with_gc_time(Duration::MAX)
                        .with_refetch_interval(Duration::MAX)
                );

                $name::new({
                    let backing_cache_scope = backing_cache_scope.clone();
                    move |page_key: PaginatedPageKey<Key>| {
                        let backing_cache_scope = backing_cache_scope.clone();
                        let getter = getter.clone();
                        async move {
                            let untyped_client = use_context::<UntypedQueryClient>()
                                .expect(
                                    "leptos-fetch bug, UntypedQueryClient should always have been \
                                    provided to the query context internally"
                                );
                            let scope_cache_key = use_context::<ScopeCacheKey>()
                                .expect(
                                    "leptos-fetch bug, ScopeCacheKey itself should always have been \
                                    provided to the query context internally"
                                );

                            // If this page is being refetched because it is stale/invalidated
                            // and the backing cache has not been updated since the page was,
                            // invalidate the backing cache so fresh data is fetched below.
                            if let Some(metadata) = untyped_client.query_metadata::<PaginatedPageKey<Key>, Option<(Vec<PageItem>, Option<u64>)>>(
                                scope_cache_key,
                                &page_key,
                            )
                                && metadata.stale_or_invalidated
                                && let Some(backing_metadata) = untyped_client.query_metadata::<KeyWithUnhashedItemCountAndOffsetRequested<Key>, BackingCache<PageItem>>(
                                    backing_cache_scope.cache_key,
                                    &KeyWithUnhashedItemCountAndOffsetRequested {
                                        key: page_key.key.clone(),
                                        // Count and offset are not hashed, so zeros are
                                        // used as placeholders for the lookup.
                                        item_count_requested: 0,
                                        offset_requested: 0,
                                    },
                                )
                                && backing_metadata.updated_at <= metadata.updated_at
                            {
                                untyped_client.invalidate_query(
                                    &backing_cache_scope,
                                    KeyWithUnhashedItemCountAndOffsetRequested {
                                        key: page_key.key.clone(),
                                        item_count_requested: 0,
                                        offset_requested: 0,
                                    },
                                );
                            }

                            // Fetch the shared backing cache for this key; the count and
                            // offset passed here are what the backing query forwards to the
                            // getter when it has to run.
                            let infinite_cache = untyped_client
                                .$fetch_fn(
                                    backing_cache_scope,
                                    KeyWithUnhashedItemCountAndOffsetRequested {
                                        key: page_key.key.clone(),
                                        item_count_requested: page_key.page_size,
                                        offset_requested: (page_key.page_index as u64) * (page_key.page_size as u64)
                                    },
                                )
                                .await;

                            let target_idx_start = (page_key.page_index as u64) * (page_key.page_size as u64);
                            let mut target_idx_end_exclusive = ((page_key.page_index as u64) + 1) * (page_key.page_size as u64);
                            if let Some(maybe_total_items) = *infinite_cache.inner.maybe_total_items.lock() {
                                // The page starts at or past the known end of the data.
                                if target_idx_start >= maybe_total_items {
                                    return None;
                                }
                                // The last page may be shorter than `page_size`.
                                if target_idx_end_exclusive > maybe_total_items {
                                    target_idx_end_exclusive = maybe_total_items;
                                }
                            }

                            if !infinite_cache.inner.items.check_range(target_idx_start, target_idx_end_exclusive) {
                                // Only one filler per backing cache at a time: the async lock
                                // is held across the getter awaits below.
                                let _guard = infinite_cache.inner.update_lock.lock().await;
                                loop {
                                    // First offset of the requested range that is still missing.
                                    // The synchronous lock is scoped so it is released before awaiting.
                                    let next_offset = {
                                        let items_lock = infinite_cache.inner.items.items.lock();
                                        let mut offset = target_idx_start;
                                        while offset < target_idx_end_exclusive {
                                            if !items_lock.contains_key(&offset) {
                                                break;
                                            }
                                            offset += 1;
                                        }
                                        offset
                                    };

                                    if next_offset >= target_idx_end_exclusive {
                                        break;
                                    }

                                    let items_needed = (target_idx_end_exclusive - next_offset) as usize;
                                    let (items, maybe_total_items) =
                                        getter(page_key.key.clone(), items_needed, next_offset).await;
                                    if !items.is_empty() {
                                        infinite_cache.inner.items.extend(next_offset, items);
                                        *infinite_cache.inner.maybe_total_items.lock() = maybe_total_items;
                                    } else {
                                        // Nothing exists at `next_offset`: cap the total there.
                                        let mut guard = infinite_cache.inner.maybe_total_items.lock();
                                        if let Some(cur_total_items) = &mut *guard {
                                            if *cur_total_items > next_offset {
                                                *cur_total_items = next_offset;
                                            }
                                        } else {
                                            *guard = Some(next_offset);
                                        }
                                        // Shrink the requested range to the capped total as well,
                                        // otherwise the final `get_range` would ask for offsets that
                                        // do not exist and drop the items that were found.
                                        if let Some(total_items) = *guard {
                                            if target_idx_end_exclusive > total_items {
                                                target_idx_end_exclusive = total_items;
                                            }
                                        }
                                        break;
                                    }
                                }
                                drop(_guard);

                                // After capping, nothing at or after the page start exists.
                                if target_idx_start >= target_idx_end_exclusive {
                                    return None;
                                }
                            }

                            infinite_cache.inner.items.get_range(target_idx_start, (target_idx_end_exclusive - target_idx_start) as usize).map(|items| {
                                (items, infinite_cache.inner.maybe_total_items.lock().clone())
                            })
                        }
                    }
                })
                .on_invalidation({
                    // Invalidating a page also invalidates the backing cache of its key.
                    let backing_cache_scope = backing_cache_scope.clone();
                    move |key| {
                        let untyped_client = use_context::<UntypedQueryClient>()
                            .expect(
                                "leptos-fetch bug, UntypedQueryClient should always have been \
                                provided to the on_invalidation context internally"
                            );
                        untyped_client.invalidate_query(
                            &backing_cache_scope,
                            KeyWithUnhashedItemCountAndOffsetRequested {
                                key: key.key.clone(),
                                item_count_requested: 0,
                                offset_requested: 0,
                            },
                        );
                    }
                })
                .on_gc(move |key| {
                    let untyped_client = use_context::<UntypedQueryClient>()
                        .expect(
                            "leptos-fetch bug, UntypedQueryClient should always have been \
                            provided to the on_gc context internally"
                        );
                    let scope_cache_key = use_context::<ScopeCacheKey>()
                        .expect(
                            "leptos-fetch bug, ScopeCacheKey itself should always have been \
                            provided to the on_gc context internally"
                        );
                    // Count the queries (including pending ones) in the page scope that
                    // share this `key`.
                    let mut found_nb = 0;
                    untyped_client
                        .scope_lookup
                        .with_cached_scope_mut::<PaginatedPageKey<Key>, Option<(Vec<PageItem>, Option<u64>)>, _, _>(
                            &mut untyped_client.scope_lookup.scopes_mut(),
                            scope_cache_key,
                            OnScopeMissing::Skip,
                            |_| {},
                            |maybe_scope, _| {
                                if let Some(scope) = maybe_scope {
                                    for query_or_pending in scope.all_queries_mut_include_pending() {
                                        if query_or_pending.key().value_if_safe().map(|test_key| test_key.key == key.key).unwrap_or(false) {
                                            found_nb += 1;
                                        }
                                    }
                                }
                            },
                        );
                    // No page of this key is left: drop the shared backing cache too.
                    if found_nb == 0 {
                        untyped_client.clear_query(
                            &backing_cache_scope,
                            KeyWithUnhashedItemCountAndOffsetRequested {
                                key: key.key.clone(),
                                item_count_requested: 0,
                                offset_requested: 0,
                            },
                        );
                    }
                })
            }
        }
    };
}
306
// `QueryScope` flavour: the getter's future gets `+ Send`; the key, item and
// getter closure get `+ Send + Sync`. The backing cache is fetched with `fetch_query`.
define! { [+ Send], [+ Send + Sync], QueryScope, fetch_query, "QueryScope" }
// `QueryScopeLocal` flavour: no extra bounds. The backing cache is fetched with
// `fetch_query_local`.
define! { [], [], QueryScopeLocal, fetch_query_local, "QueryScopeLocal" }
309
/// Cache key of the backing scope.
///
/// Carries the user's `key` plus the slice (count and offset) handed to the
/// getter when the backing query runs. Only `key` feeds the hash, so two values
/// that differ solely in the requested slice hash identically.
#[derive(Debug, Clone)]
struct KeyWithUnhashedItemCountAndOffsetRequested<Key> {
    /// The user-supplied key; the only field that is hashed.
    key: Key,
    /// Number of items to request from the getter; not hashed.
    item_count_requested: usize,
    /// Offset to request from the getter; not hashed.
    offset_requested: u64,
}

impl<Key: Hash> Hash for KeyWithUnhashedItemCountAndOffsetRequested<Key> {
    /// Feeds only `key` into the hasher; the requested count and offset are
    /// deliberately left out.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        Hash::hash(&self.key, state);
    }
}
322
/// Value stored by the backing scope: a cloneable handle to the shared cache.
/// Cloning copies the `Arc`, so every clone refers to the same
/// `BackingCacheInner`.
#[derive(Debug, Clone)]
struct BackingCache<Item> {
    // Shared state; mutated in place through the locks inside `BackingCacheInner`.
    inner: Arc<BackingCacheInner<Item>>,
}
327
/// Shared state behind a `BackingCache`.
#[derive(Debug)]
struct BackingCacheInner<Item> {
    // Items fetched so far, keyed by their absolute offset.
    items: SparseItems<Item>,
    // Total item count if known: taken from getter responses, or capped to the
    // offset at which a getter response came back empty.
    maybe_total_items: parking_lot::Mutex<Option<u64>>,
    // Async lock held by a page query while it fills missing offsets (it is
    // held across the getter awaits).
    update_lock: futures::lock::Mutex<()>,
}
334
335#[derive(Debug)]
336struct SparseItems<Item> {
337 items: parking_lot::Mutex<BTreeMap<u64, Item>>,
338}
339
340impl<Item> SparseItems<Item> {
341 fn extend(&self, offset: u64, new_items: Vec<Item>) {
342 let mut items = self.items.lock();
343 for (idx, item) in new_items.into_iter().enumerate() {
344 items.insert(offset + (idx as u64), item);
345 }
346 }
347
348 fn get_range(&self, start_offset: u64, count: usize) -> Option<Vec<Item>>
350 where
351 Item: Clone,
352 {
353 let items = self.items.lock();
354
355 let mut result = Vec::with_capacity(count);
357 for offset in start_offset..(start_offset + (count as u64)) {
358 match items.get(&offset) {
359 Some(item) => result.push(item),
360 None => return None, }
362 }
363
364 Some(result.into_iter().cloned().collect())
365 }
366
367 fn check_range(&self, start_offset: u64, end_index_exclusive: u64) -> bool
368 where
369 Item: Clone,
370 {
371 let items = self.items.lock();
372 for offset in start_offset..end_index_exclusive {
373 if !items.contains_key(&offset) {
374 return false;
375 }
376 }
377 true
378 }
379}
380
381#[cfg(test)]
382mod tests {
383 use any_spawner::Executor;
384 use hydration_context::SsrSharedContext;
385 use leptos::prelude::*;
386 use rstest::*;
387 use std::sync::Arc;
388 use std::sync::atomic::{AtomicUsize, Ordering};
389
390 use crate::test::prep_vari;
391 use crate::{PaginatedPageKey, QueryClient, QueryScope};
392
    /// Smoke test: a paginated scope can be built and handed to
    /// `client.resource` without panicking. There are no assertions; the test
    /// passes if this compiles and runs.
    #[tokio::test]
    async fn test_paginated_serialization_works() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);
                // Getter ignores its arguments and always returns one unit item.
                let scope = QueryScope::new_paginated_with_offset(|_: (), _, _| async {
                    (vec![()], Some(10))
                });
                client.resource(scope, || PaginatedPageKey {
                    key: (),
                    page_index: 0,
                    page_size: 10,
                });
            })
            .await;
    }
413
414 fn get_simple_api_fn(
415 num_rows: usize,
416 ) -> (
417 Arc<AtomicUsize>,
418 impl Fn(usize, u64) -> (Vec<usize>, Option<u64>) + Clone + 'static,
419 ) {
420 let call_count = Arc::new(AtomicUsize::new(0));
421
422 let api_fn = {
423 let call_count = call_count.clone();
424 move |target_return_count: usize, offset: u64| {
425 let call_count = call_count.clone();
426 call_count.fetch_add(1, Ordering::Relaxed);
427
428 let offset_usize = offset as usize;
429 let items = (0..num_rows)
430 .skip(offset_usize)
431 .take(target_return_count)
432 .collect::<Vec<_>>();
433 let total_items = num_rows as u64;
434 (items, Some(total_items))
435 }
436 };
437
438 (call_count, api_fn)
439 }
440
    /// Basic paging over 30 rows with a page size of 20: page 0 is full, page 1
    /// holds the remaining 10 rows, and page 2 is `None` because it starts
    /// beyond the end of the data.
    #[tokio::test]
    async fn test_paginated_offset() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let (_call_count, my_api_fn) = get_simple_api_fn(30);

                let scope =
                    QueryScope::new_paginated_with_offset(move |_query_key, page_size, offset| {
                        let my_api_fn = my_api_fn.clone();
                        async move {
                            let (items, total_items) = my_api_fn(page_size, offset);
                            (items, total_items)
                        }
                    });

                // Page 0: rows 0..20.
                let (first_page_logs, maybe_total) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 20,
                        },
                    )
                    .await
                    .expect(
                        "This page should exist (Some()), \
                        None when a page is requested beyond the end of the data.",
                    );

                assert_eq!(first_page_logs, (0..20).collect::<Vec<_>>());
                assert_eq!(maybe_total, Some(30));

                // Page 1: the short final page, rows 20..30.
                let (second_page_logs, maybe_total) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 1,
                            page_size: 20,
                        },
                    )
                    .await
                    .expect(
                        "This page should exist (Some()), \
                        None when a page is requested beyond the end of the data.",
                    );

                assert_eq!(second_page_logs, (20..30).collect::<Vec<_>>());
                assert_eq!(maybe_total, Some(30));

                // Page 2 starts at offset 40, past the 30 rows available.
                assert!(
                    client
                        .fetch_query(
                            scope.clone(),
                            PaginatedPageKey {
                                key: (),
                                page_index: 2,
                                page_size: 20,
                            },
                        )
                        .await
                        .is_none()
                );
            })
            .await;
    }
511
    /// The getter here returns fewer items than requested (at most 3 per call).
    /// The page query must keep calling it until the page is full, and surplus
    /// items from the last call must be cached for the next page.
    #[tokio::test]
    async fn test_paginated_offset_fills_page_size_with_multiple_calls() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let call_count = Arc::new(AtomicUsize::new(0));
                let call_count_clone = call_count.clone();

                let scope =
                    QueryScope::new_paginated_with_offset(move |_key: (), _page_size, offset| {
                        let call_count = call_count_clone.clone();
                        async move {
                            call_count.fetch_add(1, Ordering::Relaxed);

                            // Ignores the requested size: fixed small chunks, 11 items total.
                            let items = match offset {
                                0 => vec![0, 1, 2],
                                3 => vec![3, 4, 5],
                                6 => vec![6, 7, 8],
                                9 => vec![9, 10],
                                _ => vec![],
                            };
                            (items, Some(11))
                        }
                    });

                // Page 0 needs offsets 0..10, i.e. the chunks at 0, 3, 6 and 9.
                let (items, total) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items.len(), 10, "Should have filled to page_size");
                assert_eq!(items, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
                assert_eq!(total, Some(11));
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    4,
                    "Should have made 4 API calls to fill page"
                );

                // Page 1 only contains item 10, which the chunk at offset 9 already delivered.
                let (items, total) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 1,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items.len(), 1, "Only 1 item left");
                assert_eq!(items, vec![10]);
                assert_eq!(total, Some(11));
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    4,
                    "Should not make additional calls - item 10 was already cached"
                );
            })
            .await;
    }
588
    /// Checks the getter call count across repeated fetches: a page that was
    /// already fetched must be served from cache, and only a new page triggers
    /// another getter call.
    #[tokio::test]
    async fn test_paginated_offset_no_unnecessary_api_calls() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let call_count = Arc::new(AtomicUsize::new(0));
                let call_count_clone = call_count.clone();

                let scope =
                    QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
                        let call_count = call_count_clone.clone();
                        async move {
                            call_count.fetch_add(1, Ordering::Relaxed);

                            let offset_usize = offset as usize;
                            let items: Vec<usize> =
                                (offset_usize..offset_usize + page_size).collect();
                            (items, Some(100))
                        }
                    });

                // First fetch of page 0.
                let _ = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await;
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    1,
                    "First fetch should make 1 API call"
                );

                // Same page again.
                let _ = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await;
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    1,
                    "Should still be 1 call - cached"
                );

                // A page that has not been fetched yet.
                let _ = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 1,
                            page_size: 10,
                        },
                    )
                    .await;
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    2,
                    "Second page needs new API call"
                );

                // Back to page 0.
                let _ = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await;
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    2,
                    "Should still be 2 calls - first page cached"
                );
            })
            .await;
    }
683
    /// Invalidating (`clear == false`) or clearing (`clear == true`) the page
    /// scope must also refresh the shared backing data: afterwards every page
    /// reflects the new getter output. The getter stamps each item with a
    /// version counter so stale data is detectable.
    #[rstest]
    #[tokio::test]
    async fn test_paginated_offset_linked_invalidation_and_clear(
        #[values(true, false)] clear: bool,
    ) {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let version = Arc::new(AtomicUsize::new(0));
                let version_clone = version.clone();

                let scope =
                    QueryScope::new_paginated_with_offset(move |key: String, page_size, offset| {
                        let v = version_clone.clone();
                        async move {
                            let current_version = v.load(Ordering::Relaxed);
                            let offset_usize = offset as usize;

                            // Items look like "<key>_v<version>_<index>".
                            let items: Vec<String> = (offset_usize..offset_usize + page_size)
                                .map(|i| format!("{}_v{}_{}", key, current_version, i))
                                .collect();

                            (items, Some(30))
                        }
                    });

                // Populate pages 0 and 1 at version 0.
                let (items1, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: "test".to_string(),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items1[0], "test_v0_0");

                let (items2, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: "test".to_string(),
                            page_index: 1,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items2[0], "test_v0_10");

                // Change what the getter would return from now on.
                version.store(1, Ordering::Relaxed);

                // Without invalidation the cached version-0 data is still served.
                let (items1_new, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: "test".to_string(),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(
                    items1_new[0], "test_v0_0",
                    "Should still have old version before invalidation/clear"
                );

                if clear {
                    client.untyped_client.clear_query_scope(scope.clone());
                } else {
                    client.invalidate_query_scope(scope.clone());
                }

                // Both pages must now come back at version 1.
                let (items1_new, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: "test".to_string(),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(
                    items1_new[0], "test_v1_0",
                    "Should have new version after invalidation/clear"
                );

                let (items2_new, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: "test".to_string(),
                            page_index: 1,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(
                    items2_new[0], "test_v1_10",
                    "Second page should also have new version"
                );
            })
            .await;
    }
811
    /// A data set smaller than the page size (3 items, page size 10): the page
    /// must contain just the 3 items and report a total of 3.
    #[tokio::test]
    async fn test_paginated_offset_empty_response_handling() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                // Only offset 0 yields items; any other offset is an empty response.
                let scope = QueryScope::new_paginated_with_offset(
                    |_key: (), _page_size, offset| async move {
                        match offset {
                            0 => (vec![1, 2, 3], Some(3)),
                            _ => (vec![], Some(3)),
                        }
                    },
                );

                let (items, total) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items.len(), 3);
                assert_eq!(total, Some(3));
            })
            .await;
    }
847
    /// Five concurrent fetches of the same page must all succeed while the
    /// (deliberately slow) getter is called only once.
    #[tokio::test]
    async fn test_paginated_offset_concurrent_fetches() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let call_count = Arc::new(AtomicUsize::new(0));
                let call_count_clone = call_count.clone();

                let scope =
                    QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
                        let call_count = call_count_clone.clone();
                        async move {
                            call_count.fetch_add(1, Ordering::Relaxed);

                            // Keep the call in flight long enough for the fetches to overlap.
                            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;

                            let offset_usize = offset as usize;
                            let items: Vec<usize> =
                                (offset_usize..offset_usize + page_size).collect();
                            (items, Some(100))
                        }
                    });

                // Five identical page-0 fetches, awaited together.
                let futures = (0..5).map(|_| {
                    client.fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                });

                let results = futures::future::join_all(futures).await;

                for result in results {
                    assert!(result.is_some());
                }

                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    1,
                    "Concurrent fetches should share single API call"
                );
            })
            .await;
    }
903
    /// Pages of different sizes over the same key share cached items: a larger
    /// page fetches only what is missing, and a smaller page that is already
    /// covered needs no getter call at all.
    #[tokio::test]
    async fn test_paginated_offset_different_page_sizes() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let call_count = Arc::new(AtomicUsize::new(0));
                let call_count_clone = call_count.clone();

                let scope =
                    QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
                        let call_count = call_count_clone.clone();
                        async move {
                            call_count.fetch_add(1, Ordering::Relaxed);

                            // 50 items in total; never return indices past 50.
                            let offset_usize = offset as usize;
                            let items: Vec<usize> = (offset_usize
                                ..std::cmp::min(offset_usize + page_size, 50))
                                .collect();
                            (items, Some(50))
                        }
                    });

                // Page size 5: items 0..5, one call.
                let (items1, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 5,
                        },
                    )
                    .await
                    .expect("Page should exist");
                assert_eq!(items1.len(), 5);
                assert_eq!(call_count.load(Ordering::Relaxed), 1);

                // Page size 15: items 0..15, of which 0..5 are already cached.
                let (items2, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 15,
                        },
                    )
                    .await
                    .expect("Page should exist");
                assert_eq!(items2.len(), 15);
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    2,
                    "Should fetch more data for larger page"
                );

                // Page size 10: items 0..10 are fully covered by the cache.
                let (items3, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");
                assert_eq!(items3.len(), 10);
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    2,
                    "Should use cached data, no new fetch"
                );
            })
            .await;
    }
984
    /// Exercises how the page scope's own `gc_time` / `stale_time` (100 ms each,
    /// depending on `mode`) interact with the shared backing cache, by counting
    /// getter calls at fixed points in time.
    ///
    /// NOTE(review): this test depends on wall-clock sleeps (50 ms / 150 ms
    /// against a 100 ms option), so the expected counts presumably rely on the
    /// timing margins holding on the test machine.
    #[rstest]
    #[case::gc_time(TestMode::GcTime)]
    #[case::stale_time(TestMode::StaleTime)]
    #[tokio::test]
    async fn test_paginated_offset_backing_cache_lifecycle(#[case] mode: TestMode) {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let call_count = Arc::new(AtomicUsize::new(0));
                let call_count_clone = call_count.clone();

                let scope =
                    QueryScope::new_paginated_with_offset(move |key: String, page_size, offset| {
                        let call_count = call_count_clone.clone();
                        async move {
                            call_count.fetch_add(1, Ordering::Relaxed);

                            let offset_usize = offset as usize;
                            let items: Vec<String> = (offset_usize..offset_usize + page_size)
                                .map(|i| format!("{}_{}", key, i))
                                .collect();
                            (items, Some(30))
                        }
                    })
                    // Options apply to the returned page scope.
                    .with_options(match mode {
                        TestMode::GcTime => crate::QueryOptions::default()
                            .with_gc_time(std::time::Duration::from_millis(100)),
                        TestMode::StaleTime => crate::QueryOptions::default()
                            .with_stale_time(std::time::Duration::from_millis(100)),
                    });

                // t ~= 0 ms: fetch page 0 (first getter call).
                let (items, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: "test".to_string(),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");
                assert_eq!(items[0], "test_0");
                assert_eq!(call_count.load(Ordering::Relaxed), 1);

                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

                // t ~= 50 ms: fetch page 1 (second getter call).
                let (items, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: "test".to_string(),
                            page_index: 1,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");
                assert_eq!(items[0], "test_10");
                assert_eq!(call_count.load(Ordering::Relaxed), 2);

                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;

                // t ~= 100 ms: page 0 has reached the 100 ms option, page 1 has not.
                let (items, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: "test".to_string(),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");
                assert_eq!(items[0], "test_0");

                let (items, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: "test".to_string(),
                            page_index: 1,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");
                assert_eq!(items[0], "test_10");

                let expected_calls = match mode {
                    TestMode::GcTime => {
                        // No additional getter call is expected in gc mode at this point.
                        2
                    }
                    TestMode::StaleTime => {
                        // One additional getter call is expected in stale mode
                        // (presumably the refetch of the now-stale page 0).
                        3
                    }
                };
                assert_eq!(call_count.load(Ordering::Relaxed), expected_calls);

                if matches!(mode, TestMode::GcTime) {
                    // Wait longer than the gc time so that every page can be collected.
                    tokio::time::sleep(tokio::time::Duration::from_millis(150)).await;

                    let (items, _) = client
                        .fetch_query(
                            scope.clone(),
                            PaginatedPageKey {
                                key: "test".to_string(),
                                page_index: 0,
                                page_size: 10,
                            },
                        )
                        .await
                        .expect("Page should exist");
                    assert_eq!(items[0], "test_0");
                    assert_eq!(
                        call_count.load(Ordering::Relaxed),
                        3,
                        "Backing cache should be cleared when all pages are GC'd"
                    );
                }
            })
            .await;
    }
1124
    /// Jumping straight to a distant page must cost exactly one getter call per
    /// new page; intermediate pages are not fetched.
    #[tokio::test]
    async fn test_paginated_offset_jump_to_distant_page() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let call_count = Arc::new(AtomicUsize::new(0));
                let call_count_clone = call_count.clone();

                let scope =
                    QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
                        let call_count = call_count_clone.clone();
                        async move {
                            call_count.fetch_add(1, Ordering::Relaxed);

                            let offset_usize = offset as usize;
                            let items: Vec<usize> =
                                (offset_usize..offset_usize + page_size).collect();
                            (items, Some(1000))
                        }
                    });

                // First fetch is page 10 (offsets 100..110).
                let (items, total) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 10,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items, (100..110).collect::<Vec<_>>());
                assert_eq!(total, Some(1000));
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    1,
                    "Should only make 1 API call to jump to distant page"
                );

                // Jump back to page 0.
                let (items, total) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items, (0..10).collect::<Vec<_>>());
                assert_eq!(total, Some(1000));
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    2,
                    "Should make 1 more API call to jump back"
                );

                // Jump forward to page 50 (offsets 500..510).
                let (items, total) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 50,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items, (500..510).collect::<Vec<_>>());
                assert_eq!(total, Some(1000));
                assert_eq!(
                    call_count.load(Ordering::Relaxed),
                    3,
                    "Should make 1 more API call for distant page 50"
                );
            })
            .await;
    }
1214
    /// With 47 items and a page size of 10: the reported total is 47 (from which
    /// a caller can derive 5 pages), the last page holds 7 items, and the page
    /// after it is `None`.
    #[tokio::test]
    async fn test_paginated_offset_total_items_and_page_count() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let scope = QueryScope::new_paginated_with_offset(
                    move |_key: (), page_size, offset| async move {
                        let offset_usize = offset as usize;
                        let total_items = 47u64;
                        let items: Vec<usize> = (offset_usize
                            ..std::cmp::min(offset_usize + page_size, total_items as usize))
                            .collect();
                        (items, Some(total_items))
                    },
                );

                let (items, total) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items.len(), 10);
                assert_eq!(total, Some(47));

                // Page count as a caller would compute it: ceil(47 / 10) == 5.
                let total_pages = total.map(|t| (t as f64 / 10.0).ceil() as usize);
                assert_eq!(total_pages, Some(5));

                // Last page (index 4): items 40..47.
                let (items, total) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 4,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items.len(), 7, "Last page should have 7 items (47 % 10)");
                assert_eq!(items, (40..47).collect::<Vec<_>>());
                assert_eq!(total, Some(47));

                // Page 5 starts at offset 50, beyond the total.
                let result = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 5,
                            page_size: 10,
                        },
                    )
                    .await;

                assert!(result.is_none(), "Should return None for page beyond total");
            })
            .await;
    }
1287
    /// The reported total must follow the getter: after the data set shrinks
    /// from 100 to 50 and the scope is invalidated, a refetch reports 50.
    #[tokio::test]
    async fn test_paginated_offset_total_items_update() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let total = Arc::new(AtomicUsize::new(100));
                let total_clone = total.clone();

                let scope =
                    QueryScope::new_paginated_with_offset(move |_key: (), page_size, offset| {
                        let total = total_clone.clone();
                        async move {
                            // The total is read at call time so the test can change it.
                            let current_total = total.load(Ordering::Relaxed) as u64;
                            let offset_usize = offset as usize;
                            let items: Vec<usize> = (offset_usize
                                ..std::cmp::min(offset_usize + page_size, current_total as usize))
                                .collect();
                            (items, Some(current_total))
                        }
                    });

                let (items, returned_total) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items, (0..10).collect::<Vec<_>>());
                assert_eq!(returned_total, Some(100));

                // Shrink the data set, then invalidate so the next fetch hits the getter.
                total.store(50, Ordering::Relaxed);

                client.invalidate_query_scope(scope.clone());

                let (items, returned_total) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items, (0..10).collect::<Vec<_>>());
                assert_eq!(returned_total, Some(50));
            })
            .await;
    }
1352
    /// Pages fetched out of order (0, 10, 5, then 0 again) must each return
    /// their own offsets, i.e. non-contiguous ranges coexist in the cache.
    #[tokio::test]
    async fn test_paginated_offset_sparse_item_storage() {
        crate::test::identify_parking_lot_deadlocks();
        tokio::task::LocalSet::new()
            .run_until(async move {
                let (client, _guard, _owner) = prep_vari!(true);

                let scope = QueryScope::new_paginated_with_offset(
                    move |_key: (), page_size, offset| async move {
                        let offset_usize = offset as usize;
                        let items: Vec<usize> = (offset_usize..offset_usize + page_size).collect();
                        (items, Some(1000))
                    },
                );

                // Page 0 (offsets 0..10); result checked at the end.
                let _ = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await;

                // Page 10 (offsets 100..110), leaving a gap after page 0.
                let (items, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 10,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items, (100..110).collect::<Vec<_>>());

                // Page 5 (offsets 50..60), inside the gap.
                let (items, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 5,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");

                assert_eq!(items, (50..60).collect::<Vec<_>>());

                // Page 0 again still returns its original items.
                let (items, _) = client
                    .fetch_query(
                        scope.clone(),
                        PaginatedPageKey {
                            key: (),
                            page_index: 0,
                            page_size: 10,
                        },
                    )
                    .await
                    .expect("Page should exist");
                assert_eq!(items, (0..10).collect::<Vec<_>>());
            })
            .await;
    }
1427
    /// Selects which page-scope option
    /// `test_paginated_offset_backing_cache_lifecycle` sets to 100 ms.
    #[derive(Debug, Clone, Copy)]
    enum TestMode {
        /// Configure the scope with `with_gc_time`.
        GcTime,
        /// Configure the scope with `with_stale_time`.
        StaleTime,
    }
1435}