modkit_sdk/pager.rs
1// Updated: 2026-04-07 by Constructor Tech
2//! Cursor-based pagination with Stream API
3//!
4//! This module provides a reusable cursor-based pager that converts a page-fetching function
5//! into a Stream of pages or items, hiding cursor management from SDK users.
6//!
7//! # Example
8//!
9//! ```rust,ignore
10//! use modkit_sdk::pager::{CursorPager, PagerError};
11//! use modkit_sdk::odata::{items_stream, pages_stream, QueryBuilder};
12//! use futures_util::StreamExt;
13//!
14//! // Stream of pages
15//! let pages = pages_stream(
16//! QueryBuilder::<UserSchema>::new()
17//! .filter(NAME.contains("john"))
18//! .page_size(50),
19//! |query| async move { client.list_users(query).await },
20//! );
21//!
22//! // Stream of items
23//! let items = items_stream(
24//! QueryBuilder::<UserSchema>::new()
25//! .filter(NAME.contains("john"))
26//! .page_size(50),
27//! |query| async move { client.list_users(query).await },
28//! );
29//!
30//! // Consume the stream
31//! while let Some(result) = items.next().await {
32//! match result {
33//! Ok(user) => println!("User: {:?}", user),
34//! Err(PagerError::Fetch(e)) => eprintln!("Fetch error: {}", e),
35//! Err(PagerError::InvalidCursor(c)) => eprintln!("Invalid cursor: {}", c),
36//! }
37//! }
38//! ```
39
40use futures_core::Stream;
41use modkit_odata::{ODataQuery, Page};
42use pin_project_lite::pin_project;
43use std::collections::VecDeque;
44use std::fmt;
45use std::future::Future;
46use std::pin::Pin;
47use std::task::{Context, Poll};
48
49/// Error type for pagination operations.
50///
51/// This enum wraps both fetcher errors and cursor decoding failures,
52/// ensuring that invalid cursors are not silently ignored.
53#[derive(Debug)]
54pub enum PagerError<E> {
55 /// Error from the fetcher function.
56 Fetch(E),
57 /// Invalid cursor string that failed to decode.
58 InvalidCursor(String),
59}
60
61impl<E: fmt::Display> fmt::Display for PagerError<E> {
62 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
63 match self {
64 Self::Fetch(e) => write!(f, "Fetch error: {e}"),
65 Self::InvalidCursor(cursor) => write!(f, "Invalid cursor: {cursor}"),
66 }
67 }
68}
69
70impl<E: std::error::Error + 'static> std::error::Error for PagerError<E> {
71 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
72 match self {
73 Self::Fetch(e) => Some(e),
74 Self::InvalidCursor(_) => None,
75 }
76 }
77}
78
79pin_project! {
80 /// A cursor-based pager that implements `Stream` for paginated items.
81 ///
82 /// This pager manages cursor state internally and fetches pages on-demand,
83 /// yielding individual items from the stream.
84 ///
85 /// # Type Parameters
86 ///
87 /// * `T` - The item type
88 /// * `E` - The error type
89 /// * `F` - The fetcher function type
90 /// * `Fut` - The future returned by the fetcher
91 pub struct CursorPager<T, E, F, Fut>
92 where
93 F: FnMut(ODataQuery) -> Fut,
94 Fut: Future<Output = Result<Page<T>, E>>,
95 {
96 base_query: ODataQuery,
97 next_cursor: Option<String>,
98 buffer: VecDeque<T>,
99 done: bool,
100 fetcher: F,
101 #[pin]
102 current_fetch: Option<Fut>,
103 }
104}
105
106impl<T, E, F, Fut> CursorPager<T, E, F, Fut>
107where
108 F: FnMut(ODataQuery) -> Fut,
109 Fut: Future<Output = Result<Page<T>, E>>,
110{
111 /// Create a new cursor pager with the given base query and fetcher function.
112 ///
113 /// # Arguments
114 ///
115 /// * `base_query` - The base `OData` query (without cursor)
116 /// * `fetcher` - Function that fetches a page given an `ODataQuery`
117 ///
118 /// # Example
119 ///
120 /// ```rust,ignore
121 /// let pager = CursorPager::new(query, |q| async move {
122 /// client.list_users(q).await
123 /// });
124 /// ```
125 pub fn new(base_query: ODataQuery, fetcher: F) -> Self {
126 Self {
127 base_query,
128 next_cursor: None,
129 buffer: VecDeque::new(),
130 done: false,
131 fetcher,
132 current_fetch: None,
133 }
134 }
135}
136
137impl<T, E, F, Fut> Stream for CursorPager<T, E, F, Fut>
138where
139 F: FnMut(ODataQuery) -> Fut,
140 Fut: Future<Output = Result<Page<T>, E>>,
141{
142 type Item = Result<T, PagerError<E>>;
143
144 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
145 let mut this = self.project();
146
147 loop {
148 if let Some(item) = this.buffer.pop_front() {
149 return Poll::Ready(Some(Ok(item)));
150 }
151
152 if *this.done {
153 return Poll::Ready(None);
154 }
155
156 if let Some(fut) = this.current_fetch.as_mut().as_pin_mut() {
157 match fut.poll(cx) {
158 Poll::Ready(Ok(page)) => {
159 this.current_fetch.set(None);
160
161 this.next_cursor.clone_from(&page.page_info.next_cursor);
162
163 if this.next_cursor.is_none() {
164 *this.done = true;
165 }
166
167 this.buffer.extend(page.items);
168
169 continue;
170 }
171 Poll::Ready(Err(e)) => {
172 this.current_fetch.set(None);
173 *this.done = true;
174 return Poll::Ready(Some(Err(PagerError::Fetch(e))));
175 }
176 Poll::Pending => return Poll::Pending,
177 }
178 }
179
180 // Allocation strategy: base_query cloned once per page fetch.
181 // Filter AST is built once in QueryBuilder and reused here.
182 let mut query = this.base_query.clone();
183 if let Some(cursor_str) = this.next_cursor.as_ref() {
184 if let Ok(cursor) = modkit_odata::CursorV1::decode(cursor_str) {
185 query = query.with_cursor(cursor);
186 } else {
187 *this.done = true;
188 return Poll::Ready(Some(Err(PagerError::InvalidCursor(cursor_str.clone()))));
189 }
190 }
191
192 let fut = (this.fetcher)(query);
193 this.current_fetch.set(Some(fut));
194 }
195 }
196}
197
198pin_project! {
199 /// A cursor-based pager that implements `Stream` for pages.
200 ///
201 /// This pager yields entire pages instead of individual items.
202 ///
203 /// # Type Parameters
204 ///
205 /// * `T` - The item type
206 /// * `E` - The error type
207 /// * `F` - The fetcher function type
208 /// * `Fut` - The future returned by the fetcher
209 pub struct PagesPager<T, E, F, Fut>
210 where
211 F: FnMut(ODataQuery) -> Fut,
212 Fut: Future<Output = Result<Page<T>, E>>,
213 {
214 base_query: ODataQuery,
215 next_cursor: Option<String>,
216 done: bool,
217 fetcher: F,
218 #[pin]
219 current_fetch: Option<Fut>,
220 }
221}
222
223impl<T, E, F, Fut> PagesPager<T, E, F, Fut>
224where
225 F: FnMut(ODataQuery) -> Fut,
226 Fut: Future<Output = Result<Page<T>, E>>,
227{
228 /// Create a new pages pager with the given base query and fetcher function.
229 ///
230 /// # Arguments
231 ///
232 /// * `base_query` - The base `OData` query (without cursor)
233 /// * `fetcher` - Function that fetches a page given an `ODataQuery`
234 ///
235 /// # Example
236 ///
237 /// ```rust,ignore
238 /// let pager = PagesPager::new(query, |q| async move {
239 /// client.list_users(q).await
240 /// });
241 /// ```
242 pub fn new(base_query: ODataQuery, fetcher: F) -> Self {
243 Self {
244 base_query,
245 next_cursor: None,
246 done: false,
247 fetcher,
248 current_fetch: None,
249 }
250 }
251}
252
253impl<T, E, F, Fut> Stream for PagesPager<T, E, F, Fut>
254where
255 F: FnMut(ODataQuery) -> Fut,
256 Fut: Future<Output = Result<Page<T>, E>>,
257{
258 type Item = Result<Page<T>, PagerError<E>>;
259
260 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
261 let mut this = self.project();
262
263 loop {
264 if *this.done {
265 return Poll::Ready(None);
266 }
267
268 if let Some(fut) = this.current_fetch.as_mut().as_pin_mut() {
269 match fut.poll(cx) {
270 Poll::Ready(Ok(page)) => {
271 this.current_fetch.set(None);
272
273 this.next_cursor.clone_from(&page.page_info.next_cursor);
274
275 if this.next_cursor.is_none() {
276 *this.done = true;
277 }
278
279 return Poll::Ready(Some(Ok(page)));
280 }
281 Poll::Ready(Err(e)) => {
282 this.current_fetch.set(None);
283 *this.done = true;
284 return Poll::Ready(Some(Err(PagerError::Fetch(e))));
285 }
286 Poll::Pending => return Poll::Pending,
287 }
288 }
289
290 // Allocation strategy: base_query cloned once per page fetch.
291 // Filter AST is built once in QueryBuilder and reused here.
292 let mut query = this.base_query.clone();
293 if let Some(cursor_str) = this.next_cursor.as_ref() {
294 if let Ok(cursor) = modkit_odata::CursorV1::decode(cursor_str) {
295 query = query.with_cursor(cursor);
296 } else {
297 *this.done = true;
298 return Poll::Ready(Some(Err(PagerError::InvalidCursor(cursor_str.clone()))));
299 }
300 }
301
302 let fut = (this.fetcher)(query);
303 this.current_fetch.set(Some(fut));
304
305 // Poll the newly-installed future immediately so it can register the current waker
306 // naturally, avoiding a manual `wake_by_ref()` and the associated spurious wakeup.
307 }
308 }
309}
310
311#[cfg(test)]
312#[path = "pager_tests.rs"]
313mod pager_tests;