Skip to main content

agentic_tools_utils/
pagination.rs

1//! Generic two-level locking TTL-based pagination cache.
2//!
3//! This module provides a thread-safe pagination cache that can be used by
4//! MCP servers to implement implicit pagination - where repeated calls with
5//! the same parameters automatically advance through pages.
6//!
7//! # Architecture
8//!
9//! Uses two-level locking for thread safety:
10//! - Level 1: Brief lock on outer `HashMap` to get/create per-query state
11//! - Level 2: Per-query mutex protects shared `QueryState` access. Callers
12//!   typically lock only to read or update pagination state and may perform
13//!   expensive work outside the lock.
14//!
15//! This cache does not automatically coordinate in-flight work for the same
16//! key; concurrent same-key callers may do redundant fetching unless the
17//! caller adds its own coordination.
18//!
19//! # Example
20//!
21//! ```
22//! use agentic_tools_utils::pagination::{PaginationCache, paginate_slice};
23//!
24//! let cache: PaginationCache<i32> = PaginationCache::new();
25//! let lock = cache.get_or_create("my-query-key");
26//!
27//! let needs_fetch = {
28//!     let state = lock.lock_state();
29//!     state.is_empty() || state.is_expired()
30//! };
31//!
32//! if needs_fetch {
33//!     // Do expensive work outside the lock.
34//!     let fetched_entries = vec![1, 2, 3, 4, 5];
35//!     let page_size = 2;
36//!
37//!     let mut state = lock.lock_state();
38//!     if state.is_empty() || state.is_expired() {
39//!         state.reset(fetched_entries, (), page_size);
40//!     }
41//! }
42//!
43//! let (page, has_more) = {
44//!     let mut state = lock.lock_state();
45//!     let (page, has_more) =
46//!         paginate_slice(&state.results, state.next_offset, state.page_size);
47//!     state.next_offset += page.len();
48//!     (page, has_more)
49//! };
50//! ```
51
52use std::collections::HashMap;
53use std::sync::Arc;
54use std::sync::Mutex;
55use std::time::Duration;
56use std::time::Instant;
57
/// Lifetime given to pagination state when no explicit TTL is supplied:
/// 300 seconds, i.e. five minutes.
pub const DEFAULT_TTL: Duration = Duration::from_secs(300);
60
61/// Two-level locking pagination cache generic over result T and optional meta M.
62///
63/// The meta type M allows storing additional per-query context alongside
64/// results, such as warnings or metadata from the original query.
65#[derive(Default)]
66pub struct PaginationCache<T, M = ()> {
67    map: Mutex<HashMap<String, Arc<QueryLock<T, M>>>>,
68}
69
70impl<T, M> PaginationCache<T, M> {
71    /// Create a new empty pagination cache.
72    pub fn new() -> Self {
73        Self {
74            map: Mutex::new(HashMap::new()),
75        }
76    }
77
78    // TODO(3): Consider returning Result or recovering from poison for better
79    // MCP server resilience.
80    #[expect(
81        clippy::unwrap_used,
82        reason = "Mutex poisoning indicates a prior panic. Fail fast for pagination cache map."
83    )]
84    fn lock_map(&self) -> std::sync::MutexGuard<'_, HashMap<String, Arc<QueryLock<T, M>>>> {
85        self.map.lock().unwrap()
86    }
87
88    /// Remove entry if it still points to the provided Arc.
89    ///
90    /// This is safe for concurrent access - only removes if the current
91    /// entry is the exact same Arc, preventing removal of a replaced entry.
92    pub fn remove_if_same(&self, key: &str, candidate: &Arc<QueryLock<T, M>>) {
93        let mut m = self.lock_map();
94        if let Some(existing) = m.get(key)
95            && Arc::ptr_eq(existing, candidate)
96        {
97            m.remove(key);
98        }
99    }
100}
101
102impl<T, M: Default> PaginationCache<T, M> {
103    /// Get or create the per-query lock for the given key.
104    ///
105    /// If a lock already exists for this key, returns a clone of its Arc.
106    /// Otherwise creates a new `QueryLock` and returns it.
107    pub fn get_or_create(&self, key: &str) -> Arc<QueryLock<T, M>> {
108        let mut m = self.lock_map();
109        let arc = m
110            .entry(key.to_string())
111            .or_insert_with(|| Arc::new(QueryLock::new()));
112        Arc::clone(arc)
113    }
114
115    /// Opportunistic sweep: remove expired entries.
116    ///
117    /// Call this periodically to clean up stale cache entries.
118    /// Each expired entry is only removed if it hasn't been replaced.
119    pub fn sweep_expired(&self) {
120        let entries: Vec<(String, Arc<QueryLock<T, M>>)> = {
121            let m = self.lock_map();
122            m.iter().map(|(k, v)| (k.clone(), Arc::clone(v))).collect()
123        };
124
125        for (k, lk) in entries {
126            let expired = { lk.lock_state().is_expired() };
127            if expired {
128                let mut m = self.lock_map();
129                if let Some(existing) = m.get(&k)
130                    && Arc::ptr_eq(existing, &lk)
131                {
132                    m.remove(&k);
133                }
134            }
135        }
136    }
137}
138
139/// Per-query lock protecting the query state.
140pub struct QueryLock<T, M = ()> {
141    pub state: Mutex<QueryState<T, M>>,
142}
143
144impl<T, M> QueryLock<T, M> {
145    // TODO(3): Consider returning Result or recovering from poison for better
146    // MCP server resilience. Pagination state is reconstructible.
147    #[expect(
148        clippy::unwrap_used,
149        reason = "Mutex poisoning indicates a prior panic. Fail fast to avoid \
150                  inconsistent pagination state."
151    )]
152    pub fn lock_state(&self) -> std::sync::MutexGuard<'_, QueryState<T, M>> {
153        self.state.lock().unwrap()
154    }
155}
156
157impl<T, M: Default> QueryLock<T, M> {
158    /// Create a new `QueryLock` with empty state.
159    pub fn new() -> Self {
160        Self {
161            state: Mutex::new(QueryState::with_ttl(DEFAULT_TTL)),
162        }
163    }
164}
165
166impl<T, M: Default> Default for QueryLock<T, M> {
167    fn default() -> Self {
168        Self::new()
169    }
170}
171
172/// State for a cached query including full results and pagination offset.
173pub struct QueryState<T, M = ()> {
174    /// Cached full results
175    pub results: Vec<T>,
176    /// Optional metadata (e.g., warnings)
177    pub meta: M,
178    /// Next page start offset
179    pub next_offset: usize,
180    /// Page size for this query
181    pub page_size: usize,
182    /// When results were (re)computed
183    pub created_at: Instant,
184    /// TTL for this state
185    ttl: Duration,
186}
187
188impl<T> QueryState<T, ()> {
189    /// Create empty state with default TTL and unit meta.
190    pub fn empty() -> Self {
191        Self {
192            results: Vec::new(),
193            meta: (),
194            next_offset: 0,
195            page_size: 0,
196            created_at: Instant::now(),
197            ttl: DEFAULT_TTL,
198        }
199    }
200}
201
202impl<T, M: Default> QueryState<T, M> {
203    /// Create empty state with custom TTL.
204    pub fn with_ttl(ttl: Duration) -> Self {
205        Self {
206            results: Vec::new(),
207            meta: M::default(),
208            next_offset: 0,
209            page_size: 0,
210            created_at: Instant::now(),
211            ttl,
212        }
213    }
214
215    /// Reset state with fresh results.
216    pub fn reset(&mut self, entries: Vec<T>, meta: M, page_size: usize) {
217        self.results = entries;
218        self.meta = meta;
219        self.next_offset = 0;
220        self.page_size = page_size;
221        self.created_at = Instant::now();
222    }
223
224    /// Check if this state has expired (beyond TTL).
225    pub fn is_expired(&self) -> bool {
226        self.created_at.elapsed() >= self.ttl
227    }
228
229    /// Check if state is empty (never populated).
230    pub fn is_empty(&self) -> bool {
231        self.results.is_empty() && self.page_size == 0
232    }
233}
234
/// Paginate a slice without consuming it.
///
/// Returns (`page_entries`, `has_more`).
///
/// # Arguments
/// * `entries` - The full list of entries to paginate
/// * `offset` - Starting offset (0-based)
/// * `page_size` - Maximum entries to return
///
/// # Returns
/// A tuple of (paginated entries, whether more entries remain).
///
/// An `offset` at or past the end yields `(vec![], false)`. A `page_size`
/// of `0` with an in-range `offset` yields an empty page with
/// `has_more == true`, so a caller that advances by `page.len()` makes no
/// progress in that case.
///
/// The page length is computed from the remaining tail rather than from
/// `offset + page_size`, so arbitrarily large `page_size` values (including
/// `usize::MAX`) cannot overflow.
pub fn paginate_slice<T: Clone>(entries: &[T], offset: usize, page_size: usize) -> (Vec<T>, bool) {
    // `get(offset..)` is `None` past the end and an empty slice exactly at
    // the end; both mean there is nothing left to return.
    let remaining = match entries.get(offset..) {
        Some(tail) if !tail.is_empty() => tail,
        _ => return (Vec::new(), false),
    };

    let take = page_size.min(remaining.len());
    let has_more = take < remaining.len();
    (remaining[..take].to_vec(), has_more)
}
254
#[cfg(test)]
mod tests {
    use super::*;

    /// Build the sequence `0, 1, ..., count - 1` used as pagination input.
    fn numbered(count: i32) -> Vec<i32> {
        (0..count).collect()
    }

    // ---- paginate_slice ----------------------------------------------------

    #[test]
    fn paginate_slice_first_page() {
        let items = numbered(25);
        let (page, has_more) = paginate_slice(&items, 0, 10);
        assert!(has_more);
        assert_eq!(page.len(), 10);
        assert_eq!(page.first(), Some(&0));
        assert_eq!(page.last(), Some(&9));
    }

    #[test]
    fn paginate_slice_second_page() {
        let items = numbered(25);
        let (page, has_more) = paginate_slice(&items, 10, 10);
        assert!(has_more);
        assert_eq!(page.len(), 10);
        assert_eq!(page.first(), Some(&10));
        assert_eq!(page.last(), Some(&19));
    }

    #[test]
    fn paginate_slice_last_page() {
        let items = numbered(25);
        let (page, has_more) = paginate_slice(&items, 20, 10);
        assert!(!has_more);
        assert_eq!(page.len(), 5);
        assert_eq!(page.first(), Some(&20));
        assert_eq!(page.last(), Some(&24));
    }

    #[test]
    fn paginate_slice_empty_at_end() {
        let items = numbered(10);
        let (page, has_more) = paginate_slice(&items, 10, 10);
        assert!(!has_more);
        assert!(page.is_empty());
    }

    #[test]
    fn paginate_slice_empty_input() {
        let items: Vec<i32> = Vec::new();
        let (page, has_more) = paginate_slice(&items, 0, 10);
        assert!(!has_more);
        assert!(page.is_empty());
    }

    // ---- QueryState --------------------------------------------------------

    #[test]
    fn query_state_empty_detection() {
        let state = QueryState::<i32>::empty();
        assert!(state.is_empty());
        assert!(!state.is_expired());
    }

    #[test]
    fn query_state_reset() {
        let mut state = QueryState::<i32>::empty();
        assert!(state.is_empty());

        state.reset(vec![1, 2, 3], (), 10);

        assert!(!state.is_empty());
        assert_eq!(state.results.len(), 3);
        assert_eq!(state.page_size, 10);
        assert_eq!(state.next_offset, 0);
    }

    #[test]
    fn query_state_with_meta() {
        let mut state = QueryState::<i32, Vec<String>>::with_ttl(DEFAULT_TTL);
        state.reset(vec![1, 2], vec![String::from("warning")], 10);
        assert_eq!(state.meta.len(), 1);
        assert_eq!(state.meta[0], "warning");
    }

    // ---- PaginationCache ---------------------------------------------------

    #[test]
    fn pagination_cache_get_or_create() {
        let cache = PaginationCache::<i32>::new();

        // The first lookup creates the entry; the second must hand back the
        // very same Arc.
        let first = cache.get_or_create("key1");
        let again = cache.get_or_create("key1");
        assert!(Arc::ptr_eq(&first, &again));

        // A different key gets its own entry.
        let other = cache.get_or_create("key2");
        assert!(!Arc::ptr_eq(&first, &other));
    }

    #[test]
    fn pagination_cache_remove_if_same() {
        let cache = PaginationCache::<i32>::new();
        let original = cache.get_or_create("key1");

        // Removing with the stored Arc evicts the entry...
        cache.remove_if_same("key1", &original);

        // ...so the next lookup has to create a fresh one.
        let recreated = cache.get_or_create("key1");
        assert!(!Arc::ptr_eq(&original, &recreated));
    }

    #[test]
    fn pagination_cache_remove_if_same_ignores_mismatch() {
        let cache = PaginationCache::<i32>::new();
        let stored = cache.get_or_create("key1");

        // An Arc that was never inserted into the cache must not evict
        // anything.
        let unrelated = Arc::new(QueryLock::<i32>::new());
        cache.remove_if_same("key1", &unrelated);

        let still_there = cache.get_or_create("key1");
        assert!(Arc::ptr_eq(&stored, &still_there));
    }

    #[test]
    fn sweep_expired_removes_expired_entries() {
        let cache = PaginationCache::<i32>::new();
        let stale = cache.get_or_create("key1");

        // Backdate the entry past the default five-minute TTL.
        let six_minutes_ago = Instant::now()
            .checked_sub(Duration::from_secs(6 * 60))
            .unwrap();
        stale.state.lock().unwrap().created_at = six_minutes_ago;

        cache.sweep_expired();

        // The sweep evicted the entry, so a new Arc is created on lookup.
        let replacement = cache.get_or_create("key1");
        assert!(!Arc::ptr_eq(&stale, &replacement));
    }

    #[test]
    fn sweep_expired_keeps_fresh_entries() {
        let cache = PaginationCache::<i32>::new();

        // A newly created entry is well within its TTL.
        let fresh = cache.get_or_create("key1");

        cache.sweep_expired();

        // The sweep left it alone, so the lookup returns the same Arc.
        let after_sweep = cache.get_or_create("key1");
        assert!(Arc::ptr_eq(&fresh, &after_sweep));
    }
}