Skip to main content

mesa_dev/
pagination.rs

1//! Cursor-based pagination support.
2//!
3//! Paginated API endpoints return results in pages. This module provides
4//! [`PageStream`], a lazy async iterator that fetches pages on demand.
5//!
6//! # Usage
7//!
8//! Most users should use the `list_all()` methods on resource types, which
9//! return a `PageStream`:
10//!
11//! ```rust,no_run
12//! # use mesa_dev::{Mesa, MesaError};
13//! # async fn run() -> Result<(), MesaError> {
14//! # let client = Mesa::new("key");
15//! // Collect all items
16//! let repos = client.repos("org").list_all().collect().await?;
17//!
18//! // Or iterate one at a time
19//! let mut stream = client.repos("org").list_all();
20//! while let Some(repo) = stream.next().await? {
21//!     println!("{}", repo.name);
22//! }
23//! # Ok(())
24//! # }
25//! ```
26//!
27//! # As a `futures::Stream`
28//!
29//! `PageStream` implements [`futures_core::Stream`], so you can use it with
30//! any stream combinator:
31//!
32//! ```rust,no_run
33//! # use mesa_dev::{Mesa, MesaError};
34//! use futures::{pin_mut, StreamExt};
35//!
36//! # async fn run() -> Result<(), MesaError> {
37//! # let client = Mesa::new("key");
38//! let stream = client.repos("org").list_all();
39//! pin_mut!(stream);
40//!
41//! while let Some(result) = stream.next().await {
42//!     let repo = result?;
43//!     println!("{}", repo.name);
44//! }
45//! # Ok(())
46//! # }
47//! ```
48//!
49//! For manual page-by-page control, use the `list()` methods with
50//! [`PaginationParams`](crate::models::PaginationParams).
51
52use std::collections::VecDeque;
53use std::future::Future;
54use std::pin::Pin;
55use std::sync::Arc;
56use std::task::{Context, Poll};
57
58use http::Method;
59use serde::de::DeserializeOwned;
60
61use crate::client::ClientInner;
62use crate::error::MesaError;
63use crate::http_client::HttpClient;
64use crate::models::Paginated;
65
66/// A boxed, Send future that resolves to an optional page or error.
67type FetchFuture<Page> = Pin<Box<dyn Future<Output = Result<Option<Page>, MesaError>> + Send>>;
68
69/// An async page stream that lazily fetches pages from a paginated endpoint.
70///
71/// Created by `list_all()` methods on resource types. Owns all its state via
72/// `Arc`, so it has no lifetime parameters and can be stored freely.
73///
74/// # Methods
75///
76/// - [`next()`](Self::next) — Get the next individual item, fetching new pages
77///   as needed. Returns `Ok(None)` when exhausted.
78/// - [`next_page()`](Self::next_page) — Get the next full page of results.
79///   Returns `Ok(None)` when exhausted.
80/// - [`collect()`](Self::collect) — Consume the stream and collect all
81///   remaining items into a `Vec`.
82///
83/// # `futures::Stream`
84///
85/// This type also implements [`futures_core::Stream`] with
86/// `Item = Result<Page::Item, MesaError>`, enabling use with `StreamExt`
87/// combinators like `.map()`, `.filter()`, `.take()`, etc.
88pub struct PageStream<C: HttpClient, Page: Paginated + DeserializeOwned> {
89    inner: Arc<ClientInner<C>>,
90    path: String,
91    extra_query: Vec<(String, String)>,
92    cursor: Option<String>,
93    buffer: VecDeque<Page::Item>,
94    done: bool,
95    fetch_future: Option<FetchFuture<Page>>,
96}
97
98// All fields are Unpin (Arc, String, Vec, Option, VecDeque, bool, and
99// Pin<Box<..>> which is Unpin because Box is Unpin). This lets us access
100// &mut self freely inside poll_next without pin projection.
101impl<C: HttpClient, Page: Paginated + DeserializeOwned> Unpin for PageStream<C, Page> {}
102
103impl<C: HttpClient, Page: Paginated + DeserializeOwned> PageStream<C, Page> {
104    /// Create a new page stream.
105    pub(crate) fn new(
106        inner: Arc<ClientInner<C>>,
107        path: String,
108        extra_query: Vec<(String, String)>,
109    ) -> Self {
110        Self {
111            inner,
112            path,
113            extra_query,
114            cursor: None,
115            buffer: VecDeque::new(),
116            done: false,
117            fetch_future: None,
118        }
119    }
120
121    /// Fetch the next individual item, requesting new pages as needed.
122    ///
123    /// Returns `Ok(None)` when all pages have been exhausted.
124    ///
125    /// # Errors
126    ///
127    /// Returns [`MesaError`] if the API request fails.
128    pub async fn next(&mut self) -> Result<Option<Page::Item>, MesaError> {
129        if let Some(item) = self.buffer.pop_front() {
130            return Ok(Some(item));
131        }
132
133        if self.done {
134            return Ok(None);
135        }
136
137        if let Some(page) = self.fetch_page().await? {
138            let has_more = page.has_more();
139            self.cursor = page.next_cursor().map(ToOwned::to_owned);
140            self.buffer = VecDeque::from(page.items());
141            self.done = !has_more || self.cursor.is_none();
142            Ok(self.buffer.pop_front())
143        } else {
144            self.done = true;
145            Ok(None)
146        }
147    }
148
149    /// Collect all remaining items into a `Vec`.
150    ///
151    /// # Errors
152    ///
153    /// Returns [`MesaError`] if any API request fails.
154    pub async fn collect(mut self) -> Result<Vec<Page::Item>, MesaError> {
155        let mut all = Vec::new();
156        while let Some(item) = self.next().await? {
157            all.push(item);
158        }
159        Ok(all)
160    }
161
162    /// Fetch the next full page.
163    ///
164    /// Returns `Ok(None)` when all pages have been exhausted.
165    ///
166    /// # Errors
167    ///
168    /// Returns [`MesaError`] if the API request fails.
169    pub async fn next_page(&mut self) -> Result<Option<Page>, MesaError> {
170        if self.done {
171            return Ok(None);
172        }
173        let page = self.fetch_page().await?;
174        if let Some(ref p) = page {
175            let has_more = p.has_more();
176            self.cursor = p.next_cursor().map(ToOwned::to_owned);
177            self.done = !has_more || self.cursor.is_none();
178        } else {
179            self.done = true;
180        }
181        Ok(page)
182    }
183
184    /// Internal: fetch a single page from the API.
185    async fn fetch_page(&self) -> Result<Option<Page>, MesaError> {
186        let mut query: Vec<(&str, &str)> = self
187            .extra_query
188            .iter()
189            .map(|(k, v)| (k.as_str(), v.as_str()))
190            .collect();
191
192        if let Some(ref cursor) = self.cursor {
193            query.push(("cursor", cursor));
194        }
195
196        let page: Page = self
197            .inner
198            .request(Method::GET, &self.path, &query, None)
199            .await?;
200
201        Ok(Some(page))
202    }
203}
204
205/// Fetch a single page from the API with all owned arguments.
206///
207/// This standalone function captures owned data so the returned future is
208/// `'static`, which is required for the `Stream` implementation.
209async fn fetch_page_owned<C: HttpClient, Page: Paginated + DeserializeOwned>(
210    inner: Arc<ClientInner<C>>,
211    path: String,
212    extra_query: Vec<(String, String)>,
213    cursor: Option<String>,
214) -> Result<Option<Page>, MesaError> {
215    let mut query: Vec<(&str, &str)> = extra_query
216        .iter()
217        .map(|(k, v)| (k.as_str(), v.as_str()))
218        .collect();
219
220    if let Some(ref c) = cursor {
221        query.push(("cursor", c));
222    }
223
224    let page: Page = inner.request(Method::GET, &path, &query, None).await?;
225    Ok(Some(page))
226}
227
228impl<C, Page> futures_core::Stream for PageStream<C, Page>
229where
230    C: HttpClient + 'static,
231    Page: Paginated + DeserializeOwned + 'static,
232{
233    type Item = Result<Page::Item, MesaError>;
234
235    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
236        let this = self.get_mut();
237
238        // Return buffered items first.
239        if let Some(item) = this.buffer.pop_front() {
240            return Poll::Ready(Some(Ok(item)));
241        }
242
243        // If done, signal end of stream.
244        if this.done {
245            return Poll::Ready(None);
246        }
247
248        // If no fetch is in flight, start one.
249        if this.fetch_future.is_none() {
250            this.fetch_future = Some(Box::pin(fetch_page_owned::<C, Page>(
251                Arc::clone(&this.inner),
252                this.path.clone(),
253                this.extra_query.clone(),
254                this.cursor.clone(),
255            )));
256        }
257
258        // Poll the in-flight fetch.
259        let Some(fut) = this.fetch_future.as_mut() else {
260            this.done = true;
261            return Poll::Ready(None);
262        };
263
264        match fut.as_mut().poll(cx) {
265            Poll::Pending => Poll::Pending,
266            Poll::Ready(result) => {
267                this.fetch_future = None;
268
269                match result {
270                    Ok(Some(page)) => {
271                        let has_more = page.has_more();
272                        this.cursor = page.next_cursor().map(ToOwned::to_owned);
273                        this.buffer = VecDeque::from(page.items());
274                        this.done = !has_more || this.cursor.is_none();
275                        if let Some(item) = this.buffer.pop_front() {
276                            Poll::Ready(Some(Ok(item)))
277                        } else {
278                            this.done = true;
279                            Poll::Ready(None)
280                        }
281                    }
282                    Ok(None) => {
283                        this.done = true;
284                        Poll::Ready(None)
285                    }
286                    Err(e) => {
287                        this.done = true;
288                        Poll::Ready(Some(Err(e)))
289                    }
290                }
291            }
292        }
293    }
294}