Skip to main content

agentic_tools_utils/
pagination.rs

1//! Generic two-level locking TTL-based pagination cache.
2//!
3//! This module provides a thread-safe pagination cache that can be used by
4//! MCP servers to implement implicit pagination - where repeated calls with
5//! the same parameters automatically advance through pages.
6//!
7//! # Architecture
8//!
9//! Uses two-level locking for thread safety:
10//! - Level 1: Brief lock on outer HashMap to get/create per-query state
11//! - Level 2: Per-query lock held during work, serializing same-param calls
12//!
13//! # Example
14//!
15//! ```
16//! use agentic_tools_utils::pagination::{PaginationCache, paginate_slice};
17//!
18//! // Create a cache for your result type
19//! let cache: PaginationCache<i32> = PaginationCache::new();
20//!
21//! // Get or create a lock for a query
22//! let lock = cache.get_or_create("my-query-key");
23//!
24//! // Work with the query state
25//! {
26//!     let mut state = lock.state.lock().unwrap();
27//!     if state.is_empty() {
28//!         // Fetch results and populate state
29//!         state.reset(vec![1, 2, 3, 4, 5], (), 2);
30//!     }
31//! }
32//! ```
33
34use std::collections::HashMap;
35use std::sync::{Arc, Mutex};
36use std::time::{Duration, Instant};
37
38/// Default TTL for pagination state: 5 minutes.
39pub const DEFAULT_TTL: Duration = Duration::from_secs(5 * 60);
40
41/// Two-level locking pagination cache generic over result T and optional meta M.
42///
43/// The meta type M allows storing additional per-query context alongside
44/// results, such as warnings or metadata from the original query.
45#[derive(Default)]
46pub struct PaginationCache<T, M = ()> {
47    map: Mutex<HashMap<String, Arc<QueryLock<T, M>>>>,
48}
49
50impl<T, M> PaginationCache<T, M> {
51    /// Create a new empty pagination cache.
52    pub fn new() -> Self {
53        Self {
54            map: Mutex::new(HashMap::new()),
55        }
56    }
57
58    // TODO(3): Consider returning Result or recovering from poison for better
59    // MCP server resilience.
60    #[expect(
61        clippy::unwrap_used,
62        reason = "Mutex poisoning indicates a prior panic. Fail fast for pagination cache map."
63    )]
64    fn lock_map(&self) -> std::sync::MutexGuard<'_, HashMap<String, Arc<QueryLock<T, M>>>> {
65        self.map.lock().unwrap()
66    }
67
68    /// Remove entry if it still points to the provided Arc.
69    ///
70    /// This is safe for concurrent access - only removes if the current
71    /// entry is the exact same Arc, preventing removal of a replaced entry.
72    pub fn remove_if_same(&self, key: &str, candidate: &Arc<QueryLock<T, M>>) {
73        let mut m = self.lock_map();
74        if let Some(existing) = m.get(key)
75            && Arc::ptr_eq(existing, candidate)
76        {
77            m.remove(key);
78        }
79    }
80}
81
82impl<T, M: Default> PaginationCache<T, M> {
83    /// Get or create the per-query lock for the given key.
84    ///
85    /// If a lock already exists for this key, returns a clone of its Arc.
86    /// Otherwise creates a new QueryLock and returns it.
87    pub fn get_or_create(&self, key: &str) -> Arc<QueryLock<T, M>> {
88        let mut m = self.lock_map();
89        m.entry(key.to_string())
90            .or_insert_with(|| Arc::new(QueryLock::new()))
91            .clone()
92    }
93
94    /// Opportunistic sweep: remove expired entries.
95    ///
96    /// Call this periodically to clean up stale cache entries.
97    /// Each expired entry is only removed if it hasn't been replaced.
98    pub fn sweep_expired(&self) {
99        let entries: Vec<(String, Arc<QueryLock<T, M>>)> = {
100            let m = self.lock_map();
101            m.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
102        };
103
104        for (k, lk) in entries {
105            let expired = { lk.lock_state().is_expired() };
106            if expired {
107                let mut m = self.lock_map();
108                if let Some(existing) = m.get(&k)
109                    && Arc::ptr_eq(existing, &lk)
110                {
111                    m.remove(&k);
112                }
113            }
114        }
115    }
116}
117
118/// Per-query lock protecting the query state.
119pub struct QueryLock<T, M = ()> {
120    pub state: Mutex<QueryState<T, M>>,
121}
122
123impl<T, M> QueryLock<T, M> {
124    // TODO(3): Consider returning Result or recovering from poison for better
125    // MCP server resilience. Pagination state is reconstructible.
126    #[expect(
127        clippy::unwrap_used,
128        reason = "Mutex poisoning indicates a prior panic. Fail fast to avoid \
129                  inconsistent pagination state."
130    )]
131    pub fn lock_state(&self) -> std::sync::MutexGuard<'_, QueryState<T, M>> {
132        self.state.lock().unwrap()
133    }
134}
135
136impl<T, M: Default> QueryLock<T, M> {
137    /// Create a new QueryLock with empty state.
138    pub fn new() -> Self {
139        Self {
140            state: Mutex::new(QueryState::with_ttl(DEFAULT_TTL)),
141        }
142    }
143}
144
145impl<T, M: Default> Default for QueryLock<T, M> {
146    fn default() -> Self {
147        Self::new()
148    }
149}
150
151/// State for a cached query including full results and pagination offset.
152pub struct QueryState<T, M = ()> {
153    /// Cached full results
154    pub results: Vec<T>,
155    /// Optional metadata (e.g., warnings)
156    pub meta: M,
157    /// Next page start offset
158    pub next_offset: usize,
159    /// Page size for this query
160    pub page_size: usize,
161    /// When results were (re)computed
162    pub created_at: Instant,
163    /// TTL for this state
164    ttl: Duration,
165}
166
167impl<T> QueryState<T, ()> {
168    /// Create empty state with default TTL and unit meta.
169    pub fn empty() -> Self {
170        Self {
171            results: Vec::new(),
172            meta: (),
173            next_offset: 0,
174            page_size: 0,
175            created_at: Instant::now(),
176            ttl: DEFAULT_TTL,
177        }
178    }
179}
180
181impl<T, M: Default> QueryState<T, M> {
182    /// Create empty state with custom TTL.
183    pub fn with_ttl(ttl: Duration) -> Self {
184        Self {
185            results: Vec::new(),
186            meta: M::default(),
187            next_offset: 0,
188            page_size: 0,
189            created_at: Instant::now(),
190            ttl,
191        }
192    }
193
194    /// Reset state with fresh results.
195    pub fn reset(&mut self, entries: Vec<T>, meta: M, page_size: usize) {
196        self.results = entries;
197        self.meta = meta;
198        self.next_offset = 0;
199        self.page_size = page_size;
200        self.created_at = Instant::now();
201    }
202
203    /// Check if this state has expired (beyond TTL).
204    pub fn is_expired(&self) -> bool {
205        self.created_at.elapsed() >= self.ttl
206    }
207
208    /// Check if state is empty (never populated).
209    pub fn is_empty(&self) -> bool {
210        self.results.is_empty() && self.page_size == 0
211    }
212}
213
214/// Paginate a slice without consuming it.
215///
216/// Returns (page_entries, has_more).
217///
218/// # Arguments
219/// * `entries` - The full list of entries to paginate
220/// * `offset` - Starting offset (0-based)
221/// * `page_size` - Maximum entries to return
222///
223/// # Returns
224/// A tuple of (paginated entries, whether more entries remain)
225pub fn paginate_slice<T: Clone>(entries: &[T], offset: usize, page_size: usize) -> (Vec<T>, bool) {
226    if offset >= entries.len() {
227        return (vec![], false);
228    }
229    let end = (offset + page_size).min(entries.len());
230    let has_more = end < entries.len();
231    (entries[offset..end].to_vec(), has_more)
232}
233
234#[cfg(test)]
235mod tests {
236    use super::*;
237
238    #[test]
239    fn paginate_slice_first_page() {
240        let items: Vec<i32> = (0..25).collect();
241        let (page, has_more) = paginate_slice(&items, 0, 10);
242        assert_eq!(page.len(), 10);
243        assert!(has_more);
244        assert_eq!(page[0], 0);
245        assert_eq!(page[9], 9);
246    }
247
248    #[test]
249    fn paginate_slice_second_page() {
250        let items: Vec<i32> = (0..25).collect();
251        let (page, has_more) = paginate_slice(&items, 10, 10);
252        assert_eq!(page.len(), 10);
253        assert!(has_more);
254        assert_eq!(page[0], 10);
255        assert_eq!(page[9], 19);
256    }
257
258    #[test]
259    fn paginate_slice_last_page() {
260        let items: Vec<i32> = (0..25).collect();
261        let (page, has_more) = paginate_slice(&items, 20, 10);
262        assert_eq!(page.len(), 5);
263        assert!(!has_more);
264        assert_eq!(page[0], 20);
265        assert_eq!(page[4], 24);
266    }
267
268    #[test]
269    fn paginate_slice_empty_at_end() {
270        let items: Vec<i32> = (0..10).collect();
271        let (page, has_more) = paginate_slice(&items, 10, 10);
272        assert!(page.is_empty());
273        assert!(!has_more);
274    }
275
276    #[test]
277    fn paginate_slice_empty_input() {
278        let items: Vec<i32> = vec![];
279        let (page, has_more) = paginate_slice(&items, 0, 10);
280        assert!(page.is_empty());
281        assert!(!has_more);
282    }
283
284    #[test]
285    fn query_state_empty_detection() {
286        let state: QueryState<i32> = QueryState::empty();
287        assert!(state.is_empty());
288        assert!(!state.is_expired());
289    }
290
291    #[test]
292    fn query_state_reset() {
293        let mut state: QueryState<i32> = QueryState::empty();
294        assert!(state.is_empty());
295
296        state.reset(vec![1, 2, 3], (), 10);
297        assert!(!state.is_empty());
298        assert_eq!(state.results.len(), 3);
299        assert_eq!(state.page_size, 10);
300        assert_eq!(state.next_offset, 0);
301    }
302
303    #[test]
304    fn query_state_with_meta() {
305        let mut state: QueryState<i32, Vec<String>> = QueryState::with_ttl(DEFAULT_TTL);
306        state.reset(vec![1, 2], vec!["warning".into()], 10);
307        assert_eq!(state.meta.len(), 1);
308        assert_eq!(state.meta[0], "warning");
309    }
310
311    #[test]
312    fn pagination_cache_get_or_create() {
313        let cache: PaginationCache<i32> = PaginationCache::new();
314
315        // First access creates new entry
316        let lock1 = cache.get_or_create("key1");
317
318        // Second access returns same Arc
319        let lock2 = cache.get_or_create("key1");
320        assert!(Arc::ptr_eq(&lock1, &lock2));
321
322        // Different key creates different entry
323        let lock3 = cache.get_or_create("key2");
324        assert!(!Arc::ptr_eq(&lock1, &lock3));
325    }
326
327    #[test]
328    fn pagination_cache_remove_if_same() {
329        let cache: PaginationCache<i32> = PaginationCache::new();
330
331        let lock1 = cache.get_or_create("key1");
332
333        // Remove with matching Arc should succeed
334        cache.remove_if_same("key1", &lock1);
335
336        // New get_or_create should return different Arc
337        let lock2 = cache.get_or_create("key1");
338        assert!(!Arc::ptr_eq(&lock1, &lock2));
339    }
340
341    #[test]
342    fn pagination_cache_remove_if_same_ignores_mismatch() {
343        let cache: PaginationCache<i32> = PaginationCache::new();
344
345        let lock1 = cache.get_or_create("key1");
346
347        // Create a different Arc
348        let different_lock = Arc::new(QueryLock::<i32>::new());
349
350        // Remove with non-matching Arc should not remove
351        cache.remove_if_same("key1", &different_lock);
352
353        // Original lock should still be there
354        let lock2 = cache.get_or_create("key1");
355        assert!(Arc::ptr_eq(&lock1, &lock2));
356    }
357
358    #[test]
359    fn sweep_expired_removes_expired_entries() {
360        let cache: PaginationCache<i32> = PaginationCache::new();
361
362        // Create an entry
363        let lock = cache.get_or_create("key1");
364
365        // Manually expire it by setting created_at to the past
366        {
367            let mut st = lock.state.lock().unwrap();
368            st.created_at = Instant::now() - Duration::from_secs(6 * 60);
369        }
370
371        // Sweep should remove expired entry
372        cache.sweep_expired();
373
374        // New get_or_create should return a different Arc
375        let lock2 = cache.get_or_create("key1");
376        assert!(!Arc::ptr_eq(&lock, &lock2));
377    }
378
379    #[test]
380    fn sweep_expired_keeps_fresh_entries() {
381        let cache: PaginationCache<i32> = PaginationCache::new();
382
383        // Create an entry (fresh by default)
384        let lock1 = cache.get_or_create("key1");
385
386        // Sweep should not remove fresh entries
387        cache.sweep_expired();
388
389        // Same Arc should still be there
390        let lock2 = cache.get_or_create("key1");
391        assert!(Arc::ptr_eq(&lock1, &lock2));
392    }
393}