Skip to main content

agentic_tools_utils/
pagination.rs

//! Generic TTL-based pagination cache with two-level locking.
//!
//! This module provides a thread-safe pagination cache that MCP servers can
//! use to implement implicit pagination, where repeated calls with the same
//! parameters automatically advance through pages.
6//!
//! # Architecture
//!
//! Thread safety comes from two levels of locking:
//! - Level 1: a brief lock on the outer `HashMap` to get or create the
//!   per-query state.
//! - Level 2: a per-query mutex that guards the shared `QueryState`. Callers
//!   typically lock it only to read or update pagination state, and may
//!   perform expensive work outside the lock.
//!
//! The cache does not coordinate in-flight work for the same key:
//! concurrent callers using the same key may fetch redundantly unless the
//! caller adds its own coordination.
18//!
19//! # Example
20//!
21//! ```
22//! use agentic_tools_utils::pagination::{PaginationCache, paginate_slice};
23//!
24//! let cache: PaginationCache<i32> = PaginationCache::new();
25//! let lock = cache.get_or_create("my-query-key");
26//!
27//! let needs_fetch = {
28//!     let state = lock.lock_state();
29//!     state.is_empty() || state.is_expired()
30//! };
31//!
32//! if needs_fetch {
33//!     // Do expensive work outside the lock.
34//!     let fetched_entries = vec![1, 2, 3, 4, 5];
35//!     let page_size = 2;
36//!
37//!     let mut state = lock.lock_state();
38//!     if state.is_empty() || state.is_expired() {
39//!         state.reset(fetched_entries, (), page_size);
40//!     }
41//! }
42//!
43//! let (page, has_more) = {
44//!     let mut state = lock.lock_state();
45//!     let (page, has_more) =
46//!         paginate_slice(&state.results, state.next_offset, state.page_size);
47//!     state.next_offset += page.len();
48//!     (page, has_more)
49//! };
50//! ```
51
52use std::collections::HashMap;
53use std::sync::{Arc, Mutex};
54use std::time::{Duration, Instant};
55
/// Default lifetime of cached pagination state (five minutes).
pub const DEFAULT_TTL: Duration = Duration::from_secs(300);
58
59/// Two-level locking pagination cache generic over result T and optional meta M.
60///
61/// The meta type M allows storing additional per-query context alongside
62/// results, such as warnings or metadata from the original query.
63#[derive(Default)]
64pub struct PaginationCache<T, M = ()> {
65    map: Mutex<HashMap<String, Arc<QueryLock<T, M>>>>,
66}
67
68impl<T, M> PaginationCache<T, M> {
69    /// Create a new empty pagination cache.
70    pub fn new() -> Self {
71        Self {
72            map: Mutex::new(HashMap::new()),
73        }
74    }
75
76    // TODO(3): Consider returning Result or recovering from poison for better
77    // MCP server resilience.
78    #[expect(
79        clippy::unwrap_used,
80        reason = "Mutex poisoning indicates a prior panic. Fail fast for pagination cache map."
81    )]
82    fn lock_map(&self) -> std::sync::MutexGuard<'_, HashMap<String, Arc<QueryLock<T, M>>>> {
83        self.map.lock().unwrap()
84    }
85
86    /// Remove entry if it still points to the provided Arc.
87    ///
88    /// This is safe for concurrent access - only removes if the current
89    /// entry is the exact same Arc, preventing removal of a replaced entry.
90    pub fn remove_if_same(&self, key: &str, candidate: &Arc<QueryLock<T, M>>) {
91        let mut m = self.lock_map();
92        if let Some(existing) = m.get(key)
93            && Arc::ptr_eq(existing, candidate)
94        {
95            m.remove(key);
96        }
97    }
98}
99
100impl<T, M: Default> PaginationCache<T, M> {
101    /// Get or create the per-query lock for the given key.
102    ///
103    /// If a lock already exists for this key, returns a clone of its Arc.
104    /// Otherwise creates a new QueryLock and returns it.
105    pub fn get_or_create(&self, key: &str) -> Arc<QueryLock<T, M>> {
106        let mut m = self.lock_map();
107        m.entry(key.to_string())
108            .or_insert_with(|| Arc::new(QueryLock::new()))
109            .clone()
110    }
111
112    /// Opportunistic sweep: remove expired entries.
113    ///
114    /// Call this periodically to clean up stale cache entries.
115    /// Each expired entry is only removed if it hasn't been replaced.
116    pub fn sweep_expired(&self) {
117        let entries: Vec<(String, Arc<QueryLock<T, M>>)> = {
118            let m = self.lock_map();
119            m.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
120        };
121
122        for (k, lk) in entries {
123            let expired = { lk.lock_state().is_expired() };
124            if expired {
125                let mut m = self.lock_map();
126                if let Some(existing) = m.get(&k)
127                    && Arc::ptr_eq(existing, &lk)
128                {
129                    m.remove(&k);
130                }
131            }
132        }
133    }
134}
135
136/// Per-query lock protecting the query state.
137pub struct QueryLock<T, M = ()> {
138    pub state: Mutex<QueryState<T, M>>,
139}
140
141impl<T, M> QueryLock<T, M> {
142    // TODO(3): Consider returning Result or recovering from poison for better
143    // MCP server resilience. Pagination state is reconstructible.
144    #[expect(
145        clippy::unwrap_used,
146        reason = "Mutex poisoning indicates a prior panic. Fail fast to avoid \
147                  inconsistent pagination state."
148    )]
149    pub fn lock_state(&self) -> std::sync::MutexGuard<'_, QueryState<T, M>> {
150        self.state.lock().unwrap()
151    }
152}
153
154impl<T, M: Default> QueryLock<T, M> {
155    /// Create a new QueryLock with empty state.
156    pub fn new() -> Self {
157        Self {
158            state: Mutex::new(QueryState::with_ttl(DEFAULT_TTL)),
159        }
160    }
161}
162
163impl<T, M: Default> Default for QueryLock<T, M> {
164    fn default() -> Self {
165        Self::new()
166    }
167}
168
/// Cached state for one query: its full results, optional metadata, and the
/// current pagination position.
#[derive(Debug)]
pub struct QueryState<T, M = ()> {
    /// Cached full results
    pub results: Vec<T>,
    /// Optional metadata (e.g., warnings)
    pub meta: M,
    /// Offset at which the next page starts
    pub next_offset: usize,
    /// Page size for this query
    pub page_size: usize,
    /// When results were last (re)computed; the TTL is measured from here
    pub created_at: Instant,
    /// How long after `created_at` this state counts as expired
    ttl: Duration,
}
184
185impl<T> QueryState<T, ()> {
186    /// Create empty state with default TTL and unit meta.
187    pub fn empty() -> Self {
188        Self {
189            results: Vec::new(),
190            meta: (),
191            next_offset: 0,
192            page_size: 0,
193            created_at: Instant::now(),
194            ttl: DEFAULT_TTL,
195        }
196    }
197}
198
199impl<T, M: Default> QueryState<T, M> {
200    /// Create empty state with custom TTL.
201    pub fn with_ttl(ttl: Duration) -> Self {
202        Self {
203            results: Vec::new(),
204            meta: M::default(),
205            next_offset: 0,
206            page_size: 0,
207            created_at: Instant::now(),
208            ttl,
209        }
210    }
211
212    /// Reset state with fresh results.
213    pub fn reset(&mut self, entries: Vec<T>, meta: M, page_size: usize) {
214        self.results = entries;
215        self.meta = meta;
216        self.next_offset = 0;
217        self.page_size = page_size;
218        self.created_at = Instant::now();
219    }
220
221    /// Check if this state has expired (beyond TTL).
222    pub fn is_expired(&self) -> bool {
223        self.created_at.elapsed() >= self.ttl
224    }
225
226    /// Check if state is empty (never populated).
227    pub fn is_empty(&self) -> bool {
228        self.results.is_empty() && self.page_size == 0
229    }
230}
231
/// Paginate a slice without consuming it.
///
/// Returns one page of `entries` starting at `offset`, holding at most
/// `page_size` cloned elements.
///
/// # Arguments
/// * `entries` - The full list of entries to paginate
/// * `offset` - Starting offset (0-based)
/// * `page_size` - Maximum entries to return
///
/// # Returns
/// A tuple `(page, has_more)`: the cloned page, and whether any entries
/// remain after it. An `offset` at or past the end yields `(vec![], false)`.
///
/// A `page_size` of `0` yields an empty page with `has_more == true`
/// whenever `offset < entries.len()`. Callers that advance by `page.len()`
/// must guard against that case to avoid looping forever.
pub fn paginate_slice<T: Clone>(entries: &[T], offset: usize, page_size: usize) -> (Vec<T>, bool) {
    let Some(remaining) = entries.get(offset..) else {
        // `offset` is past the end of the slice.
        return (Vec::new(), false);
    };
    // Clamp against the remaining length instead of computing
    // `offset + page_size`, which overflows for huge page sizes (e.g.
    // `usize::MAX`).
    let take = page_size.min(remaining.len());
    let has_more = take < remaining.len();
    (remaining[..take].to_vec(), has_more)
}
251
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture for the slice tests: the integers `0..n`.
    fn numbers(n: i32) -> Vec<i32> {
        (0..n).collect()
    }

    /// A fresh cache with unit metadata.
    fn new_cache() -> PaginationCache<i32> {
        PaginationCache::new()
    }

    #[test]
    fn paginate_slice_first_page() {
        let (page, has_more) = paginate_slice(&numbers(25), 0, 10);
        assert_eq!(page.len(), 10);
        assert!(has_more);
        assert_eq!((page[0], page[9]), (0, 9));
    }

    #[test]
    fn paginate_slice_second_page() {
        let (page, has_more) = paginate_slice(&numbers(25), 10, 10);
        assert_eq!(page.len(), 10);
        assert!(has_more);
        assert_eq!((page[0], page[9]), (10, 19));
    }

    #[test]
    fn paginate_slice_last_page() {
        let (page, has_more) = paginate_slice(&numbers(25), 20, 10);
        assert_eq!(page.len(), 5);
        assert!(!has_more);
        assert_eq!((page[0], page[4]), (20, 24));
    }

    #[test]
    fn paginate_slice_empty_at_end() {
        let (page, has_more) = paginate_slice(&numbers(10), 10, 10);
        assert!(page.is_empty());
        assert!(!has_more);
    }

    #[test]
    fn paginate_slice_empty_input() {
        let (page, has_more) = paginate_slice(&numbers(0), 0, 10);
        assert!(page.is_empty());
        assert!(!has_more);
    }

    #[test]
    fn query_state_empty_detection() {
        let state: QueryState<i32> = QueryState::empty();
        assert!(state.is_empty());
        assert!(!state.is_expired());
    }

    #[test]
    fn query_state_reset() {
        let mut state: QueryState<i32> = QueryState::empty();
        assert!(state.is_empty());

        state.reset(vec![1, 2, 3], (), 10);
        assert!(!state.is_empty());
        assert_eq!(state.results.len(), 3);
        assert_eq!((state.page_size, state.next_offset), (10, 0));
    }

    #[test]
    fn query_state_with_meta() {
        let mut state: QueryState<i32, Vec<String>> = QueryState::with_ttl(DEFAULT_TTL);
        state.reset(vec![1, 2], vec![String::from("warning")], 10);
        assert_eq!(state.meta.len(), 1);
        assert_eq!(state.meta[0], "warning");
    }

    #[test]
    fn pagination_cache_get_or_create() {
        let cache = new_cache();

        // The first lookup creates the entry; a repeat lookup shares its Arc.
        let first = cache.get_or_create("key1");
        let again = cache.get_or_create("key1");
        assert!(Arc::ptr_eq(&first, &again));

        // A different key gets an entry of its own.
        let other = cache.get_or_create("key2");
        assert!(!Arc::ptr_eq(&first, &other));
    }

    #[test]
    fn pagination_cache_remove_if_same() {
        let cache = new_cache();
        let original = cache.get_or_create("key1");

        // Removing with the matching Arc drops the entry...
        cache.remove_if_same("key1", &original);

        // ...so the next lookup builds a fresh one.
        let replacement = cache.get_or_create("key1");
        assert!(!Arc::ptr_eq(&original, &replacement));
    }

    #[test]
    fn pagination_cache_remove_if_same_ignores_mismatch() {
        let cache = new_cache();
        let original = cache.get_or_create("key1");

        // An unrelated Arc must not evict the cached entry.
        let stranger = Arc::new(QueryLock::<i32>::new());
        cache.remove_if_same("key1", &stranger);

        let again = cache.get_or_create("key1");
        assert!(Arc::ptr_eq(&original, &again));
    }

    #[test]
    fn sweep_expired_removes_expired_entries() {
        let cache = new_cache();
        let stale = cache.get_or_create("key1");

        // Backdate creation past the 5-minute default TTL; the guard is
        // released at the end of this statement.
        stale.lock_state().created_at = Instant::now() - Duration::from_secs(6 * 60);

        cache.sweep_expired();

        // The expired entry is gone, so a new Arc is created.
        let fresh = cache.get_or_create("key1");
        assert!(!Arc::ptr_eq(&stale, &fresh));
    }

    #[test]
    fn sweep_expired_keeps_fresh_entries() {
        let cache = new_cache();

        // Entries start fresh, so a sweep must leave them in place.
        let before = cache.get_or_create("key1");
        cache.sweep_expired();
        let after = cache.get_or_create("key1");
        assert!(Arc::ptr_eq(&before, &after));
    }
}