Skip to main content

modkit_sdk/
pager.rs

1// Updated: 2026-04-07 by Constructor Tech
2//! Cursor-based pagination with Stream API
3//!
4//! This module provides a reusable cursor-based pager that converts a page-fetching function
5//! into a Stream of pages or items, hiding cursor management from SDK users.
6//!
7//! # Example
8//!
9//! ```rust,ignore
10//! use modkit_sdk::pager::{CursorPager, PagerError};
11//! use modkit_sdk::odata::{items_stream, pages_stream, QueryBuilder};
12//! use futures_util::StreamExt;
13//!
14//! // Stream of pages
15//! let pages = pages_stream(
16//!     QueryBuilder::<UserSchema>::new()
17//!         .filter(NAME.contains("john"))
18//!         .page_size(50),
19//!     |query| async move { client.list_users(query).await },
20//! );
21//!
22//! // Stream of items
23//! let items = items_stream(
24//!     QueryBuilder::<UserSchema>::new()
25//!         .filter(NAME.contains("john"))
26//!         .page_size(50),
27//!     |query| async move { client.list_users(query).await },
28//! );
29//!
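//! // Consume the stream. `StreamExt::next` needs a mutable, `Unpin` stream, so pin it first.
//! futures_util::pin_mut!(items);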
31//! while let Some(result) = items.next().await {
32//!     match result {
33//!         Ok(user) => println!("User: {:?}", user),
34//!         Err(PagerError::Fetch(e)) => eprintln!("Fetch error: {}", e),
35//!         Err(PagerError::InvalidCursor(c)) => eprintln!("Invalid cursor: {}", c),
36//!     }
37//! }
38//! ```
39
40use futures_core::Stream;
41use modkit_odata::{ODataQuery, Page};
42use pin_project_lite::pin_project;
43use std::collections::VecDeque;
44use std::fmt;
45use std::future::Future;
46use std::pin::Pin;
47use std::task::{Context, Poll};
48
/// Error type for pagination operations.
///
/// This enum wraps both fetcher errors and cursor decoding failures,
/// ensuring that invalid cursors are not silently ignored.
///
/// `Clone`, `PartialEq` and `Eq` are available whenever the fetcher error `E`
/// implements them, so errors can be duplicated and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PagerError<E> {
    /// Error returned by the fetcher function while requesting a page.
    Fetch(E),
    /// The raw cursor string that failed to decode.
    InvalidCursor(String),
}
60
61impl<E: fmt::Display> fmt::Display for PagerError<E> {
62    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63        match self {
64            Self::Fetch(e) => write!(f, "Fetch error: {e}"),
65            Self::InvalidCursor(cursor) => write!(f, "Invalid cursor: {cursor}"),
66        }
67    }
68}
69
70impl<E: std::error::Error + 'static> std::error::Error for PagerError<E> {
71    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
72        match self {
73            Self::Fetch(e) => Some(e),
74            Self::InvalidCursor(_) => None,
75        }
76    }
77}
78
79pin_project! {
80    /// A cursor-based pager that implements `Stream` for paginated items.
81    ///
82    /// This pager manages cursor state internally and fetches pages on-demand,
83    /// yielding individual items from the stream.
84    ///
85    /// # Type Parameters
86    ///
87    /// * `T` - The item type
88    /// * `E` - The error type
89    /// * `F` - The fetcher function type
90    /// * `Fut` - The future returned by the fetcher
91    pub struct CursorPager<T, E, F, Fut>
92    where
93        F: FnMut(ODataQuery) -> Fut,
94        Fut: Future<Output = Result<Page<T>, E>>,
95    {
96        base_query: ODataQuery,
97        next_cursor: Option<String>,
98        buffer: VecDeque<T>,
99        done: bool,
100        fetcher: F,
101        #[pin]
102        current_fetch: Option<Fut>,
103    }
104}
105
106impl<T, E, F, Fut> CursorPager<T, E, F, Fut>
107where
108    F: FnMut(ODataQuery) -> Fut,
109    Fut: Future<Output = Result<Page<T>, E>>,
110{
111    /// Create a new cursor pager with the given base query and fetcher function.
112    ///
113    /// # Arguments
114    ///
115    /// * `base_query` - The base `OData` query (without cursor)
116    /// * `fetcher` - Function that fetches a page given an `ODataQuery`
117    ///
118    /// # Example
119    ///
120    /// ```rust,ignore
121    /// let pager = CursorPager::new(query, |q| async move {
122    ///     client.list_users(q).await
123    /// });
124    /// ```
125    pub fn new(base_query: ODataQuery, fetcher: F) -> Self {
126        Self {
127            base_query,
128            next_cursor: None,
129            buffer: VecDeque::new(),
130            done: false,
131            fetcher,
132            current_fetch: None,
133        }
134    }
135}
136
137impl<T, E, F, Fut> Stream for CursorPager<T, E, F, Fut>
138where
139    F: FnMut(ODataQuery) -> Fut,
140    Fut: Future<Output = Result<Page<T>, E>>,
141{
142    type Item = Result<T, PagerError<E>>;
143
144    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
145        let mut this = self.project();
146
147        loop {
148            if let Some(item) = this.buffer.pop_front() {
149                return Poll::Ready(Some(Ok(item)));
150            }
151
152            if *this.done {
153                return Poll::Ready(None);
154            }
155
156            if let Some(fut) = this.current_fetch.as_mut().as_pin_mut() {
157                match fut.poll(cx) {
158                    Poll::Ready(Ok(page)) => {
159                        this.current_fetch.set(None);
160
161                        this.next_cursor.clone_from(&page.page_info.next_cursor);
162
163                        if this.next_cursor.is_none() {
164                            *this.done = true;
165                        }
166
167                        this.buffer.extend(page.items);
168
169                        continue;
170                    }
171                    Poll::Ready(Err(e)) => {
172                        this.current_fetch.set(None);
173                        *this.done = true;
174                        return Poll::Ready(Some(Err(PagerError::Fetch(e))));
175                    }
176                    Poll::Pending => return Poll::Pending,
177                }
178            }
179
180            // Allocation strategy: base_query cloned once per page fetch.
181            // Filter AST is built once in QueryBuilder and reused here.
182            let mut query = this.base_query.clone();
183            if let Some(cursor_str) = this.next_cursor.as_ref() {
184                if let Ok(cursor) = modkit_odata::CursorV1::decode(cursor_str) {
185                    query = query.with_cursor(cursor);
186                } else {
187                    *this.done = true;
188                    return Poll::Ready(Some(Err(PagerError::InvalidCursor(cursor_str.clone()))));
189                }
190            }
191
192            let fut = (this.fetcher)(query);
193            this.current_fetch.set(Some(fut));
194        }
195    }
196}
197
198pin_project! {
199    /// A cursor-based pager that implements `Stream` for pages.
200    ///
201    /// This pager yields entire pages instead of individual items.
202    ///
203    /// # Type Parameters
204    ///
205    /// * `T` - The item type
206    /// * `E` - The error type
207    /// * `F` - The fetcher function type
208    /// * `Fut` - The future returned by the fetcher
209    pub struct PagesPager<T, E, F, Fut>
210    where
211        F: FnMut(ODataQuery) -> Fut,
212        Fut: Future<Output = Result<Page<T>, E>>,
213    {
214        base_query: ODataQuery,
215        next_cursor: Option<String>,
216        done: bool,
217        fetcher: F,
218        #[pin]
219        current_fetch: Option<Fut>,
220    }
221}
222
223impl<T, E, F, Fut> PagesPager<T, E, F, Fut>
224where
225    F: FnMut(ODataQuery) -> Fut,
226    Fut: Future<Output = Result<Page<T>, E>>,
227{
228    /// Create a new pages pager with the given base query and fetcher function.
229    ///
230    /// # Arguments
231    ///
232    /// * `base_query` - The base `OData` query (without cursor)
233    /// * `fetcher` - Function that fetches a page given an `ODataQuery`
234    ///
235    /// # Example
236    ///
237    /// ```rust,ignore
238    /// let pager = PagesPager::new(query, |q| async move {
239    ///     client.list_users(q).await
240    /// });
241    /// ```
242    pub fn new(base_query: ODataQuery, fetcher: F) -> Self {
243        Self {
244            base_query,
245            next_cursor: None,
246            done: false,
247            fetcher,
248            current_fetch: None,
249        }
250    }
251}
252
253impl<T, E, F, Fut> Stream for PagesPager<T, E, F, Fut>
254where
255    F: FnMut(ODataQuery) -> Fut,
256    Fut: Future<Output = Result<Page<T>, E>>,
257{
258    type Item = Result<Page<T>, PagerError<E>>;
259
260    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
261        let mut this = self.project();
262
263        loop {
264            if *this.done {
265                return Poll::Ready(None);
266            }
267
268            if let Some(fut) = this.current_fetch.as_mut().as_pin_mut() {
269                match fut.poll(cx) {
270                    Poll::Ready(Ok(page)) => {
271                        this.current_fetch.set(None);
272
273                        this.next_cursor.clone_from(&page.page_info.next_cursor);
274
275                        if this.next_cursor.is_none() {
276                            *this.done = true;
277                        }
278
279                        return Poll::Ready(Some(Ok(page)));
280                    }
281                    Poll::Ready(Err(e)) => {
282                        this.current_fetch.set(None);
283                        *this.done = true;
284                        return Poll::Ready(Some(Err(PagerError::Fetch(e))));
285                    }
286                    Poll::Pending => return Poll::Pending,
287                }
288            }
289
290            // Allocation strategy: base_query cloned once per page fetch.
291            // Filter AST is built once in QueryBuilder and reused here.
292            let mut query = this.base_query.clone();
293            if let Some(cursor_str) = this.next_cursor.as_ref() {
294                if let Ok(cursor) = modkit_odata::CursorV1::decode(cursor_str) {
295                    query = query.with_cursor(cursor);
296                } else {
297                    *this.done = true;
298                    return Poll::Ready(Some(Err(PagerError::InvalidCursor(cursor_str.clone()))));
299                }
300            }
301
302            let fut = (this.fetcher)(query);
303            this.current_fetch.set(Some(fut));
304
305            // Poll the newly-installed future immediately so it can register the current waker
306            // naturally, avoiding a manual `wake_by_ref()` and the associated spurious wakeup.
307        }
308    }
309}
310
311#[cfg(test)]
312#[path = "pager_tests.rs"]
313mod pager_tests;