Skip to main content

tango/
pagination.rs

//! Pagination: [`Page<T>`] envelope + async [`PageStream<T>`].
//!
//! Every list endpoint returns a [`Page<T>`] for a single page of results.
//! For walking every page, resource methods expose an `iterate_*` constructor
//! that returns a [`PageStream<T>`], which implements [`futures::Stream`].
//!
//! ```no_run
//! use futures::TryStreamExt;
//! use tango::Client;
//! # async fn run() -> tango::Result<()> {
//! let client = Client::builder().api_key("…").build()?;
//! let opts = tango::ListContractsOptions::builder()
//!     .awarding_agency("9700")
//!     .build();
//! let mut stream = client.iterate_contracts(opts);
//! while let Some(record) = stream.try_next().await? {
//!     println!("{:?}", record.get("piid"));
//! }
//! # Ok(()) }
//! ```
21
22use crate::client::Client;
23use crate::error::{Error, Result};
24use futures::stream::Stream;
25use serde::Deserialize;
26use std::future::Future;
27use std::pin::Pin;
28use std::task::{Context, Poll};
29
30/// A single page of results from a list endpoint.
31///
32/// `next`/`previous` are the server-supplied URLs to the adjacent pages;
33/// `cursor` is the keyset cursor extracted from `next` for convenience (empty
34/// on offset-paginated endpoints).
35#[derive(Debug, Clone)]
36pub struct Page<T> {
37    /// Total number of items across all pages, when the server reports it.
38    /// For some endpoints this matches `results.len()`.
39    pub count: u64,
40    /// URL of the next page, or `None` when on the last page.
41    pub next: Option<String>,
42    /// URL of the previous page, or `None` when on the first page.
43    pub previous: Option<String>,
44    /// Opaque page metadata for keyset endpoints.
45    pub page_metadata: Option<serde_json::Value>,
46    /// Cursor extracted from `next` on cursor-paginated endpoints. Pass it
47    /// back via the `cursor` option to fetch the next page. Empty when the
48    /// endpoint is offset-paginated or no cursor is set.
49    pub cursor: Option<String>,
50    /// The actual data for this page.
51    pub results: Vec<T>,
52}
53
54#[derive(Deserialize)]
55struct RawPage<T> {
56    #[serde(default)]
57    count: Option<serde_json::Value>,
58    #[serde(default)]
59    next: Option<String>,
60    #[serde(default)]
61    previous: Option<String>,
62    #[serde(default)]
63    page_metadata: Option<serde_json::Value>,
64    #[serde(default = "Vec::new")]
65    results: Vec<T>,
66}
67
68impl<T> Page<T> {
69    /// Decode a JSON byte slice into a [`Page<T>`].
70    pub(crate) fn decode(bytes: &[u8]) -> Result<Self>
71    where
72        T: for<'de> Deserialize<'de>,
73    {
74        let raw: RawPage<T> = serde_json::from_slice(bytes).map_err(Error::Decode)?;
75        let count = match raw.count {
76            Some(serde_json::Value::Number(n)) => n.as_u64().unwrap_or(0),
77            Some(serde_json::Value::String(s)) => s.parse::<u64>().unwrap_or(0),
78            _ => 0,
79        };
80        let cursor = raw.next.as_deref().and_then(extract_cursor);
81        let mut out = Self {
82            count,
83            next: raw.next.filter(|s| !s.is_empty()),
84            previous: raw.previous.filter(|s| !s.is_empty()),
85            page_metadata: raw.page_metadata,
86            cursor,
87            results: raw.results,
88        };
89        if out.count == 0 {
90            out.count = out.results.len() as u64;
91        }
92        Ok(out)
93    }
94}
95
96fn extract_cursor(url: &str) -> Option<String> {
97    let parsed = reqwest::Url::parse(url).ok()?;
98    parsed
99        .query_pairs()
100        .find(|(k, _)| k == "cursor")
101        .map(|(_, v)| v.into_owned())
102        .filter(|v| !v.is_empty())
103}
104
105fn extract_page(url: &str) -> Option<u32> {
106    let parsed = reqwest::Url::parse(url).ok()?;
107    parsed
108        .query_pairs()
109        .find(|(k, _)| k == "page")
110        .and_then(|(_, v)| v.parse::<u32>().ok())
111}
112
113/// Future produced by a [`FetchFn`].
114type FetchFut<T> = Pin<Box<dyn Future<Output = Result<Page<T>>> + Send>>;
115
116/// Fetcher signature for [`PageStream`]: given the next `(page, cursor)`,
117/// return a future for the next [`Page<T>`].
118pub(crate) type FetchFn<T> =
119    Box<dyn FnMut(Client, Option<u32>, Option<String>) -> FetchFut<T> + Send>;
120
121/// An async stream over every result of a paginated list endpoint.
122///
123/// Yields `T` items one at a time, fetching successive pages on demand. The
124/// stream follows `?cursor=` when the server returns one in `next`, falling
125/// back to `?page=` for offset-paginated endpoints.
126///
127/// Construct with the `iterate_*` methods on [`Client`]; you typically use
128/// `try_next().await` (via [`futures::TryStreamExt`]) or `collect_all().await`.
129///
130/// # Cancellation
131///
132/// Dropping the stream stops fetching mid-page. In-flight requests are
133/// cancelled when their future is dropped (`reqwest`'s standard behavior).
134pub struct PageStream<T> {
135    client: Client,
136    fetch: FetchFn<T>,
137    next_page: Option<u32>,
138    next_cursor: Option<String>,
139    buf: std::vec::IntoIter<T>,
140    done: bool,
141    in_flight: Option<FetchFut<T>>,
142}
143
144// All fields are themselves Unpin (Pin<Box<dyn Future>> is Unpin, IntoIter is
145// Unpin, the rest are inert). The Stream impl uses `Pin<&mut Self>` purely as
146// a type-system requirement, not because the struct holds self-referential
147// state. Marking Unpin lets us call `get_mut` in `poll_next` and freely mutate
148// the buffered iterator/page-tracking fields.
149impl<T> Unpin for PageStream<T> {}
150
151impl<T> std::fmt::Debug for PageStream<T> {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        f.debug_struct("PageStream")
154            .field("next_page", &self.next_page)
155            .field("next_cursor", &self.next_cursor)
156            .field("done", &self.done)
157            .finish_non_exhaustive()
158    }
159}
160
161impl<T> PageStream<T>
162where
163    T: Send + 'static,
164{
165    pub(crate) fn new(client: Client, fetch: FetchFn<T>) -> Self {
166        Self {
167            client,
168            fetch,
169            next_page: None,
170            next_cursor: None,
171            buf: Vec::new().into_iter(),
172            done: false,
173            in_flight: None,
174        }
175    }
176
177    /// Drain the stream into a single `Vec<T>`.
178    ///
179    /// Convenience for callers who want every result. For huge result sets,
180    /// prefer iterating with `try_next` to bound memory.
181    pub async fn collect_all(self) -> Result<Vec<T>> {
182        use futures::TryStreamExt;
183        self.try_collect().await
184    }
185}
186
187impl<T> Stream for PageStream<T>
188where
189    T: Send + 'static,
190{
191    type Item = Result<T>;
192
193    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
194        // PageStream is Unpin (see impl above) — safe to project to &mut Self.
195        let this = self.get_mut();
196        loop {
197            // 1. Drain the buffer FIRST, even after `done` was set. The last
198            //    page legitimately holds results after the server says next=null;
199            //    yielding them before honoring done is what makes single-page
200            //    iteration correct. (Mirrors the F1 Go-iterator bug fix.)
201            if let Some(item) = this.buf.next() {
202                return Poll::Ready(Some(Ok(item)));
203            }
204            if this.done {
205                return Poll::Ready(None);
206            }
207
208            // 2. Drive any in-flight fetch, or start a new one.
209            if this.in_flight.is_none() {
210                let client = this.client.clone();
211                let page = this.next_page;
212                let cursor = this.next_cursor.clone();
213                let fut = (this.fetch)(client, page, cursor);
214                this.in_flight = Some(fut);
215            }
216
217            let fut = this
218                .in_flight
219                .as_mut()
220                .expect("we just set in_flight to Some");
221            match fut.as_mut().poll(cx) {
222                Poll::Pending => return Poll::Pending,
223                Poll::Ready(Err(e)) => {
224                    this.in_flight = None;
225                    this.done = true;
226                    return Poll::Ready(Some(Err(e)));
227                }
228                Poll::Ready(Ok(page)) => {
229                    this.in_flight = None;
230                    let Page {
231                        next,
232                        cursor,
233                        results,
234                        ..
235                    } = page;
236
237                    if results.is_empty() {
238                        this.done = true;
239                        return Poll::Ready(None);
240                    }
241                    this.buf = results.into_iter();
242
243                    // Decide how to advance to the next page.
244                    match next.as_deref() {
245                        None => {
246                            this.done = true;
247                            this.next_page = None;
248                            this.next_cursor = None;
249                        }
250                        Some(next_url) => {
251                            if let Some(c) = cursor {
252                                this.next_cursor = Some(c);
253                                this.next_page = None;
254                            } else if let Some(p) = extract_page(next_url) {
255                                this.next_page = Some(p);
256                                this.next_cursor = None;
257                            } else {
258                                this.done = true;
259                                this.next_page = None;
260                                this.next_cursor = None;
261                            }
262                        }
263                    }
264                    // Loop back: yield the first buffered item.
265                }
266            }
267        }
268    }
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    /// Pages in these tests carry untyped JSON records.
    type JsonPage = Page<serde_json::Value>;

    /// Decode `body`, panicking with a short message when decoding fails.
    fn decode_json(body: &[u8]) -> JsonPage {
        Page::decode(body).expect("decode")
    }

    /// A plain single page keeps its count and results and has no links.
    #[test]
    fn decode_basic_page() {
        let decoded = decode_json(
            br#"{
            "count": 2,
            "next": null,
            "previous": null,
            "results": [{"a": 1}, {"a": 2}]
        }"#,
        );
        assert_eq!(decoded.count, 2);
        assert!(decoded.next.is_none());
        assert_eq!(decoded.results.len(), 2);
        assert!(decoded.cursor.is_none());
    }

    /// A count sent as a numeric string is still understood.
    #[test]
    fn decode_string_count() {
        let decoded = decode_json(br#"{"count": "42", "results": []}"#);
        assert_eq!(decoded.count, 42);
    }

    /// The cursor is lifted out of the `next` link's query string.
    #[test]
    fn decode_extracts_cursor_from_next() {
        let decoded = decode_json(
            br#"{
            "count": 1,
            "next": "https://tango.example/api/contracts/?cursor=abc123&limit=25",
            "results": [{"piid": "X"}]
        }"#,
        );
        assert_eq!(decoded.cursor.as_deref(), Some("abc123"));
        assert!(decoded.next.is_some());
    }

    /// Without a count in the payload, the page length is used instead.
    #[test]
    fn count_defaults_to_results_len_when_zero() {
        let decoded = decode_json(br#"{"results": [{"a": 1}, {"a": 2}, {"a": 3}]}"#);
        assert_eq!(decoded.count, 3);
    }

    /// `extract_page` reads `page` when present and yields `None` otherwise.
    #[test]
    fn extract_page_parses_query_param() {
        let with_page = "https://tango.example/api/contracts/?page=4&limit=25";
        let without_page = "https://tango.example/api/contracts/";
        assert_eq!(extract_page(with_page), Some(4));
        assert_eq!(extract_page(without_page), None);
    }
}