mesa_dev/pagination.rs
1//! Cursor-based pagination support.
2//!
3//! Paginated API endpoints return results in pages. This module provides
4//! [`PageStream`], a lazy async iterator that fetches pages on demand.
5//!
6//! # Usage
7//!
8//! Most users should use the `list_all()` methods on resource types, which
9//! return a `PageStream`:
10//!
11//! ```rust,no_run
12//! # use mesa_dev::{Mesa, MesaError};
13//! # async fn run() -> Result<(), MesaError> {
14//! # let client = Mesa::new("key");
15//! // Collect all items
16//! let repos = client.repos("org").list_all().collect().await?;
17//!
18//! // Or iterate one at a time
19//! let mut stream = client.repos("org").list_all();
20//! while let Some(repo) = stream.next().await? {
21//! println!("{}", repo.name);
22//! }
23//! # Ok(())
24//! # }
25//! ```
26//!
27//! # As a `futures::Stream`
28//!
29//! `PageStream` implements [`futures_core::Stream`], so you can use it with
30//! any stream combinator:
31//!
32//! ```rust,no_run
33//! # use mesa_dev::{Mesa, MesaError};
34//! use futures::{pin_mut, StreamExt};
35//!
36//! # async fn run() -> Result<(), MesaError> {
37//! # let client = Mesa::new("key");
38//! let stream = client.repos("org").list_all();
39//! pin_mut!(stream);
40//!
41//! while let Some(result) = stream.next().await {
42//! let repo = result?;
43//! println!("{}", repo.name);
44//! }
45//! # Ok(())
46//! # }
47//! ```
48//!
49//! For manual page-by-page control, use the `list()` methods with
50//! [`PaginationParams`](crate::models::PaginationParams).
51
52use std::collections::VecDeque;
53use std::future::Future;
54use std::pin::Pin;
55use std::sync::Arc;
56use std::task::{Context, Poll};
57
58use http::Method;
59use serde::de::DeserializeOwned;
60
61use crate::client::ClientInner;
62use crate::error::MesaError;
63use crate::http_client::HttpClient;
64use crate::models::Paginated;
65
66/// A boxed, Send future that resolves to an optional page or error.
67type FetchFuture<Page> = Pin<Box<dyn Future<Output = Result<Option<Page>, MesaError>> + Send>>;
68
69/// An async page stream that lazily fetches pages from a paginated endpoint.
70///
71/// Created by `list_all()` methods on resource types. Owns all its state via
72/// `Arc`, so it has no lifetime parameters and can be stored freely.
73///
74/// # Methods
75///
76/// - [`next()`](Self::next) — Get the next individual item, fetching new pages
77/// as needed. Returns `Ok(None)` when exhausted.
78/// - [`next_page()`](Self::next_page) — Get the next full page of results.
79/// Returns `Ok(None)` when exhausted.
80/// - [`collect()`](Self::collect) — Consume the stream and collect all
81/// remaining items into a `Vec`.
82///
83/// # `futures::Stream`
84///
85/// This type also implements [`futures_core::Stream`] with
86/// `Item = Result<Page::Item, MesaError>`, enabling use with `StreamExt`
87/// combinators like `.map()`, `.filter()`, `.take()`, etc.
88pub struct PageStream<C: HttpClient, Page: Paginated + DeserializeOwned> {
89 inner: Arc<ClientInner<C>>,
90 path: String,
91 extra_query: Vec<(String, String)>,
92 cursor: Option<String>,
93 buffer: VecDeque<Page::Item>,
94 done: bool,
95 fetch_future: Option<FetchFuture<Page>>,
96}
97
98// All fields are Unpin (Arc, String, Vec, Option, VecDeque, bool, and
99// Pin<Box<..>> which is Unpin because Box is Unpin). This lets us access
100// &mut self freely inside poll_next without pin projection.
101impl<C: HttpClient, Page: Paginated + DeserializeOwned> Unpin for PageStream<C, Page> {}
102
103impl<C: HttpClient, Page: Paginated + DeserializeOwned> PageStream<C, Page> {
104 /// Create a new page stream.
105 pub(crate) fn new(
106 inner: Arc<ClientInner<C>>,
107 path: String,
108 extra_query: Vec<(String, String)>,
109 ) -> Self {
110 Self {
111 inner,
112 path,
113 extra_query,
114 cursor: None,
115 buffer: VecDeque::new(),
116 done: false,
117 fetch_future: None,
118 }
119 }
120
121 /// Fetch the next individual item, requesting new pages as needed.
122 ///
123 /// Returns `Ok(None)` when all pages have been exhausted.
124 ///
125 /// # Errors
126 ///
127 /// Returns [`MesaError`] if the API request fails.
128 pub async fn next(&mut self) -> Result<Option<Page::Item>, MesaError> {
129 if let Some(item) = self.buffer.pop_front() {
130 return Ok(Some(item));
131 }
132
133 if self.done {
134 return Ok(None);
135 }
136
137 if let Some(page) = self.fetch_page().await? {
138 let has_more = page.has_more();
139 self.cursor = page.next_cursor().map(ToOwned::to_owned);
140 self.buffer = VecDeque::from(page.items());
141 self.done = !has_more || self.cursor.is_none();
142 Ok(self.buffer.pop_front())
143 } else {
144 self.done = true;
145 Ok(None)
146 }
147 }
148
149 /// Collect all remaining items into a `Vec`.
150 ///
151 /// # Errors
152 ///
153 /// Returns [`MesaError`] if any API request fails.
154 pub async fn collect(mut self) -> Result<Vec<Page::Item>, MesaError> {
155 let mut all = Vec::new();
156 while let Some(item) = self.next().await? {
157 all.push(item);
158 }
159 Ok(all)
160 }
161
162 /// Fetch the next full page.
163 ///
164 /// Returns `Ok(None)` when all pages have been exhausted.
165 ///
166 /// # Errors
167 ///
168 /// Returns [`MesaError`] if the API request fails.
169 pub async fn next_page(&mut self) -> Result<Option<Page>, MesaError> {
170 if self.done {
171 return Ok(None);
172 }
173 let page = self.fetch_page().await?;
174 if let Some(ref p) = page {
175 let has_more = p.has_more();
176 self.cursor = p.next_cursor().map(ToOwned::to_owned);
177 self.done = !has_more || self.cursor.is_none();
178 } else {
179 self.done = true;
180 }
181 Ok(page)
182 }
183
184 /// Internal: fetch a single page from the API.
185 async fn fetch_page(&self) -> Result<Option<Page>, MesaError> {
186 let mut query: Vec<(&str, &str)> = self
187 .extra_query
188 .iter()
189 .map(|(k, v)| (k.as_str(), v.as_str()))
190 .collect();
191
192 if let Some(ref cursor) = self.cursor {
193 query.push(("cursor", cursor));
194 }
195
196 let page: Page = self
197 .inner
198 .request(Method::GET, &self.path, &query, None)
199 .await?;
200
201 Ok(Some(page))
202 }
203}
204
205/// Fetch a single page from the API with all owned arguments.
206///
207/// This standalone function captures owned data so the returned future is
208/// `'static`, which is required for the `Stream` implementation.
209async fn fetch_page_owned<C: HttpClient, Page: Paginated + DeserializeOwned>(
210 inner: Arc<ClientInner<C>>,
211 path: String,
212 extra_query: Vec<(String, String)>,
213 cursor: Option<String>,
214) -> Result<Option<Page>, MesaError> {
215 let mut query: Vec<(&str, &str)> = extra_query
216 .iter()
217 .map(|(k, v)| (k.as_str(), v.as_str()))
218 .collect();
219
220 if let Some(ref c) = cursor {
221 query.push(("cursor", c));
222 }
223
224 let page: Page = inner.request(Method::GET, &path, &query, None).await?;
225 Ok(Some(page))
226}
227
228impl<C, Page> futures_core::Stream for PageStream<C, Page>
229where
230 C: HttpClient + 'static,
231 Page: Paginated + DeserializeOwned + 'static,
232{
233 type Item = Result<Page::Item, MesaError>;
234
235 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
236 let this = self.get_mut();
237
238 // Return buffered items first.
239 if let Some(item) = this.buffer.pop_front() {
240 return Poll::Ready(Some(Ok(item)));
241 }
242
243 // If done, signal end of stream.
244 if this.done {
245 return Poll::Ready(None);
246 }
247
248 // If no fetch is in flight, start one.
249 if this.fetch_future.is_none() {
250 this.fetch_future = Some(Box::pin(fetch_page_owned::<C, Page>(
251 Arc::clone(&this.inner),
252 this.path.clone(),
253 this.extra_query.clone(),
254 this.cursor.clone(),
255 )));
256 }
257
258 // Poll the in-flight fetch.
259 let Some(fut) = this.fetch_future.as_mut() else {
260 this.done = true;
261 return Poll::Ready(None);
262 };
263
264 match fut.as_mut().poll(cx) {
265 Poll::Pending => Poll::Pending,
266 Poll::Ready(result) => {
267 this.fetch_future = None;
268
269 match result {
270 Ok(Some(page)) => {
271 let has_more = page.has_more();
272 this.cursor = page.next_cursor().map(ToOwned::to_owned);
273 this.buffer = VecDeque::from(page.items());
274 this.done = !has_more || this.cursor.is_none();
275 if let Some(item) = this.buffer.pop_front() {
276 Poll::Ready(Some(Ok(item)))
277 } else {
278 this.done = true;
279 Poll::Ready(None)
280 }
281 }
282 Ok(None) => {
283 this.done = true;
284 Poll::Ready(None)
285 }
286 Err(e) => {
287 this.done = true;
288 Poll::Ready(Some(Err(e)))
289 }
290 }
291 }
292 }
293 }
294}