1use crate::client::Client;
23use crate::error::{Error, Result};
24use futures::stream::Stream;
25use serde::Deserialize;
26use std::future::Future;
27use std::pin::Pin;
28use std::task::{Context, Poll};
29
/// One decoded page of a paginated list response.
///
/// Values are normalized by `Page::decode`; see the individual fields for the
/// rules that are applied.
#[derive(Debug, Clone)]
pub struct Page<T> {
    /// Item count taken from the payload's `count` field. `Page::decode`
    /// substitutes `results.len()` when the field is missing, zero, or cannot
    /// be read as a `u64`.
    pub count: u64,
    /// Link to the following page; `None` when the payload omits it or
    /// supplies an empty string.
    pub next: Option<String>,
    /// Link to the preceding page; `None` when the payload omits it or
    /// supplies an empty string.
    pub previous: Option<String>,
    /// The payload's `page_metadata` value, passed through undecoded.
    pub page_metadata: Option<serde_json::Value>,
    /// Non-empty value of the `cursor` query parameter found in the `next`
    /// link, when that link parses as a URL and carries one.
    pub cursor: Option<String>,
    /// Items contained in this page.
    pub results: Vec<T>,
}
53
/// Wire-level shape of a page, deserialized by `Page::decode` before
/// normalization. Every field falls back to its default when absent.
#[derive(Deserialize)]
struct RawPage<T> {
    /// Kept as a raw JSON value because `Page::decode` accepts both a JSON
    /// number and a numeric string here.
    #[serde(default)]
    count: Option<serde_json::Value>,
    /// Link to the following page, exactly as received.
    #[serde(default)]
    next: Option<String>,
    /// Link to the preceding page, exactly as received.
    #[serde(default)]
    previous: Option<String>,
    /// Free-form metadata, carried over to `Page` untouched.
    #[serde(default)]
    page_metadata: Option<serde_json::Value>,
    /// Items of the page; an absent field decodes to an empty vector.
    #[serde(default = "Vec::new")]
    results: Vec<T>,
}
67
68impl<T> Page<T> {
69 pub(crate) fn decode(bytes: &[u8]) -> Result<Self>
71 where
72 T: for<'de> Deserialize<'de>,
73 {
74 let raw: RawPage<T> = serde_json::from_slice(bytes).map_err(Error::Decode)?;
75 let count = match raw.count {
76 Some(serde_json::Value::Number(n)) => n.as_u64().unwrap_or(0),
77 Some(serde_json::Value::String(s)) => s.parse::<u64>().unwrap_or(0),
78 _ => 0,
79 };
80 let cursor = raw.next.as_deref().and_then(extract_cursor);
81 let mut out = Self {
82 count,
83 next: raw.next.filter(|s| !s.is_empty()),
84 previous: raw.previous.filter(|s| !s.is_empty()),
85 page_metadata: raw.page_metadata,
86 cursor,
87 results: raw.results,
88 };
89 if out.count == 0 {
90 out.count = out.results.len() as u64;
91 }
92 Ok(out)
93 }
94}
95
96fn extract_cursor(url: &str) -> Option<String> {
97 let parsed = reqwest::Url::parse(url).ok()?;
98 parsed
99 .query_pairs()
100 .find(|(k, _)| k == "cursor")
101 .map(|(_, v)| v.into_owned())
102 .filter(|v| !v.is_empty())
103}
104
105fn extract_page(url: &str) -> Option<u32> {
106 let parsed = reqwest::Url::parse(url).ok()?;
107 parsed
108 .query_pairs()
109 .find(|(k, _)| k == "page")
110 .and_then(|(_, v)| v.parse::<u32>().ok())
111}
112
/// Boxed, pinned, `Send` future that resolves to one decoded [`Page`] or an
/// error.
type FetchFut<T> = Pin<Box<dyn Future<Output = Result<Page<T>>> + Send>>;
115
/// Callback that starts the fetch of a single page.
///
/// `PageStream` calls it with a clone of its client plus the page number and
/// cursor recorded from the previous page (both `None` for the first call)
/// and polls the returned future to completion.
pub(crate) type FetchFn<T> =
    Box<dyn FnMut(Client, Option<u32>, Option<String>) -> FetchFut<T> + Send>;
120
/// Stream that yields the items of a paginated listing one by one, fetching
/// further pages through a [`FetchFn`] as the buffered items run out.
pub struct PageStream<T> {
    /// Client handed (cloned) to `fetch` on every call.
    client: Client,
    /// Callback that starts the request for one page.
    fetch: FetchFn<T>,
    /// Page number to pass to the next `fetch` call, if paging by number.
    next_page: Option<u32>,
    /// Cursor to pass to the next `fetch` call, if paging by cursor.
    next_cursor: Option<String>,
    /// Items of the most recently fetched page that have not been yielded yet.
    buf: std::vec::IntoIter<T>,
    /// Set once no further fetch should be started; the stream ends after
    /// `buf` is drained.
    done: bool,
    /// The fetch currently being polled, if any.
    in_flight: Option<FetchFut<T>>,
}
143
// `poll_next` obtains `&mut Self` through `Pin::get_mut`, which requires
// `Self: Unpin`; this impl provides that for every `T`. Nothing in the struct
// is pin-projected: the only future it holds is stored behind its own
// `Pin<Box<..>>`, so moving a `PageStream` never moves a pinned value.
impl<T> Unpin for PageStream<T> {}
150
151impl<T> std::fmt::Debug for PageStream<T> {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 f.debug_struct("PageStream")
154 .field("next_page", &self.next_page)
155 .field("next_cursor", &self.next_cursor)
156 .field("done", &self.done)
157 .finish_non_exhaustive()
158 }
159}
160
161impl<T> PageStream<T>
162where
163 T: Send + 'static,
164{
165 pub(crate) fn new(client: Client, fetch: FetchFn<T>) -> Self {
166 Self {
167 client,
168 fetch,
169 next_page: None,
170 next_cursor: None,
171 buf: Vec::new().into_iter(),
172 done: false,
173 in_flight: None,
174 }
175 }
176
177 pub async fn collect_all(self) -> Result<Vec<T>> {
182 use futures::TryStreamExt;
183 self.try_collect().await
184 }
185}
186
187impl<T> Stream for PageStream<T>
188where
189 T: Send + 'static,
190{
191 type Item = Result<T>;
192
193 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
194 let this = self.get_mut();
196 loop {
197 if let Some(item) = this.buf.next() {
202 return Poll::Ready(Some(Ok(item)));
203 }
204 if this.done {
205 return Poll::Ready(None);
206 }
207
208 if this.in_flight.is_none() {
210 let client = this.client.clone();
211 let page = this.next_page;
212 let cursor = this.next_cursor.clone();
213 let fut = (this.fetch)(client, page, cursor);
214 this.in_flight = Some(fut);
215 }
216
217 let fut = this
218 .in_flight
219 .as_mut()
220 .expect("we just set in_flight to Some");
221 match fut.as_mut().poll(cx) {
222 Poll::Pending => return Poll::Pending,
223 Poll::Ready(Err(e)) => {
224 this.in_flight = None;
225 this.done = true;
226 return Poll::Ready(Some(Err(e)));
227 }
228 Poll::Ready(Ok(page)) => {
229 this.in_flight = None;
230 let Page {
231 next,
232 cursor,
233 results,
234 ..
235 } = page;
236
237 if results.is_empty() {
238 this.done = true;
239 return Poll::Ready(None);
240 }
241 this.buf = results.into_iter();
242
243 match next.as_deref() {
245 None => {
246 this.done = true;
247 this.next_page = None;
248 this.next_cursor = None;
249 }
250 Some(next_url) => {
251 if let Some(c) = cursor {
252 this.next_cursor = Some(c);
253 this.next_page = None;
254 } else if let Some(p) = extract_page(next_url) {
255 this.next_page = Some(p);
256 this.next_cursor = None;
257 } else {
258 this.done = true;
259 this.next_page = None;
260 this.next_cursor = None;
261 }
262 }
263 }
264 }
266 }
267 }
268 }
269}
270
#[cfg(test)]
mod tests {
    use super::*;

    /// Decodes `body` into a page of untyped JSON values, panicking on
    /// failure.
    fn decode_json(body: &[u8]) -> Page<serde_json::Value> {
        Page::decode(body).expect("decode")
    }

    /// A plain page with explicit nulls decodes without a cursor.
    #[test]
    fn decode_basic_page() {
        let decoded = decode_json(
            br#"{
                "count": 2,
                "next": null,
                "previous": null,
                "results": [{"a": 1}, {"a": 2}]
            }"#,
        );
        assert_eq!(decoded.count, 2);
        assert!(decoded.next.is_none());
        assert_eq!(decoded.results.len(), 2);
        assert!(decoded.cursor.is_none());
    }

    /// A numeric string is accepted for `count`.
    #[test]
    fn decode_string_count() {
        let decoded = decode_json(br#"{"count": "42", "results": []}"#);
        assert_eq!(decoded.count, 42);
    }

    /// The cursor is lifted out of the `next` link, which is itself kept.
    #[test]
    fn decode_extracts_cursor_from_next() {
        let decoded = decode_json(
            br#"{
                "count": 1,
                "next": "https://tango.example/api/contracts/?cursor=abc123&limit=25",
                "results": [{"piid": "X"}]
            }"#,
        );
        assert_eq!(decoded.cursor.as_deref(), Some("abc123"));
        assert!(decoded.next.is_some());
    }

    /// Without a `count` field the number of results is reported instead.
    #[test]
    fn count_defaults_to_results_len_when_zero() {
        let decoded = decode_json(br#"{"results": [{"a": 1}, {"a": 2}, {"a": 3}]}"#);
        assert_eq!(decoded.count, 3);
    }

    /// `extract_page` reads the `page` parameter and yields `None` without it.
    #[test]
    fn extract_page_parses_query_param() {
        let cases: [(&str, Option<u32>); 2] = [
            (
                "https://tango.example/api/contracts/?page=4&limit=25",
                Some(4),
            ),
            ("https://tango.example/api/contracts/", None),
        ];
        for (link, expected) in cases.iter() {
            assert_eq!(extract_page(link), *expected);
        }
    }
}