use crate::client::Client;
use crate::error::{Error, Result};
use futures::stream::Stream;
use serde::Deserialize;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// One decoded page of a paginated API response.
///
/// Values are produced by `Page::decode` from the raw JSON body.
#[derive(Debug, Clone)]
pub struct Page<T> {
/// Item count. `decode` reads the payload's `count` (a JSON number or a
/// numeric string) and substitutes `results.len()` when that yields zero.
pub count: u64,
/// Link to the following page; `None` when the payload has no `next` or it
/// is an empty string.
pub next: Option<String>,
/// Link to the preceding page; `None` when the payload has no `previous`
/// or it is an empty string.
pub previous: Option<String>,
/// The payload's `page_metadata` value, passed through undecoded.
pub page_metadata: Option<serde_json::Value>,
/// Non-empty `cursor` query parameter taken from the `next` link, if any.
pub cursor: Option<String>,
/// Items carried by this page.
pub results: Vec<T>,
}
/// Wire-level shape of a page, deserialized before normalisation into `Page`.
///
/// Every field carries a serde default, so a key that is missing from the
/// payload does not make deserialization fail.
#[derive(Deserialize)]
struct RawPage<T> {
// Kept as an untyped JSON value: `Page::decode` accepts either a number or
// a string here and treats anything else as zero.
#[serde(default)]
count: Option<serde_json::Value>,
#[serde(default)]
next: Option<String>,
#[serde(default)]
previous: Option<String>,
#[serde(default)]
page_metadata: Option<serde_json::Value>,
// A missing `results` key becomes an empty vector.
#[serde(default = "Vec::new")]
results: Vec<T>,
}
impl<T> Page<T> {
pub(crate) fn decode(bytes: &[u8]) -> Result<Self>
where
T: for<'de> Deserialize<'de>,
{
let raw: RawPage<T> = serde_json::from_slice(bytes).map_err(Error::Decode)?;
let count = match raw.count {
Some(serde_json::Value::Number(n)) => n.as_u64().unwrap_or(0),
Some(serde_json::Value::String(s)) => s.parse::<u64>().unwrap_or(0),
_ => 0,
};
let cursor = raw.next.as_deref().and_then(extract_cursor);
let mut out = Self {
count,
next: raw.next.filter(|s| !s.is_empty()),
previous: raw.previous.filter(|s| !s.is_empty()),
page_metadata: raw.page_metadata,
cursor,
results: raw.results,
};
if out.count == 0 {
out.count = out.results.len() as u64;
}
Ok(out)
}
}
/// Parses a pagination link, accepting relative references as well as
/// absolute URLs.
///
/// `Url::parse` rejects links such as `/api/items/?page=2` because they have
/// no base. The callers only care about the query string, so such a link is
/// resolved against a throwaway base purely to make it parseable. Absolute
/// URLs take the first branch and are handled exactly as before.
fn parse_link(link: &str) -> Option<reqwest::Url> {
    reqwest::Url::parse(link)
        .or_else(|_| reqwest::Url::parse("http://placeholder.invalid/")?.join(link))
        .ok()
}

/// Returns the value of the first `cursor` query parameter in `url`.
///
/// Yields `None` when the link cannot be parsed, has no `cursor` parameter,
/// or the first `cursor` parameter is empty. Both absolute and relative links
/// are understood.
fn extract_cursor(url: &str) -> Option<String> {
    parse_link(url)?
        .query_pairs()
        .find(|(key, _)| key == "cursor")
        .map(|(_, value)| value.into_owned())
        .filter(|value| !value.is_empty())
}

/// Returns the first `page` query parameter in `url`, parsed as a `u32`.
///
/// Yields `None` when the link cannot be parsed, has no `page` parameter, or
/// the first `page` parameter is not a valid `u32`. Both absolute and relative
/// links are understood.
fn extract_page(url: &str) -> Option<u32> {
    parse_link(url)?
        .query_pairs()
        .find(|(key, _)| key == "page")
        .and_then(|(_, value)| value.parse::<u32>().ok())
}
/// Boxed, pinned, `Send` future that resolves to one fetched page or an error.
type FetchFut<T> = Pin<Box<dyn Future<Output = Result<Page<T>>> + Send>>;
/// Callback used by `PageStream` to request a page.
///
/// It is called with a clone of the stream's client plus the page number and
/// cursor the stream currently holds, and returns the future for that request.
pub(crate) type FetchFn<T> =
Box<dyn FnMut(Client, Option<u32>, Option<String>) -> FetchFut<T> + Send>;
/// Stream that yields the items of a paginated endpoint one by one, fetching
/// a further page whenever the buffered items run out.
pub struct PageStream<T> {
// Cloned and handed to `fetch` for every request.
client: Client,
// Callback that starts the request for a page.
fetch: FetchFn<T>,
// Page number passed to the next `fetch` call; `None` until a page link
// supplies one.
next_page: Option<u32>,
// Cursor passed to the next `fetch` call; `None` until a page supplies one.
next_cursor: Option<String>,
// Items of the most recently fetched page that have not been yielded yet.
buf: std::vec::IntoIter<T>,
// Set once no further fetch will be issued (last page reached or an error
// was reported).
done: bool,
// Request currently being polled, if any.
in_flight: Option<FetchFut<T>>,
}
// Declares the stream `Unpin` for every `T` so that `poll_next` can call
// `Pin::get_mut`. Items are only ever moved out of `buf` by value; nothing in
// this type relies on a `T` staying at a fixed address.
impl<T> Unpin for PageStream<T> {}
/// Debug output covers only the pagination state; the remaining fields
/// (client, fetch callback, buffer, pending request) are left out and the
/// output is marked as non-exhaustive.
impl<T> std::fmt::Debug for PageStream<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("PageStream");
        repr.field("next_page", &self.next_page);
        repr.field("next_cursor", &self.next_cursor);
        repr.field("done", &self.done);
        repr.finish_non_exhaustive()
    }
}
impl<T> PageStream<T>
where
    T: Send + 'static,
{
    /// Creates a stream that has not requested anything yet.
    ///
    /// The first call to `fetch` receives `None` for both the page number and
    /// the cursor.
    pub(crate) fn new(client: Client, fetch: FetchFn<T>) -> Self {
        let buf = Vec::new().into_iter();
        Self {
            in_flight: None,
            done: false,
            buf,
            next_cursor: None,
            next_page: None,
            fetch,
            client,
        }
    }

    /// Drains the stream into a vector.
    ///
    /// # Errors
    ///
    /// Stops at and returns the first error the stream produces; items
    /// gathered up to that point are discarded.
    pub async fn collect_all(self) -> Result<Vec<T>> {
        use futures::TryStreamExt;
        let mut stream = self;
        let mut items = Vec::new();
        while let Some(item) = stream.try_next().await? {
            items.push(item);
        }
        Ok(items)
    }
}
impl<T> Stream for PageStream<T>
where
    T: Send + 'static,
{
    type Item = Result<T>;

    /// Yields buffered items first and fetches another page once the buffer
    /// is empty.
    ///
    /// The stream ends when a page comes back empty, when a page has no
    /// `next` link, or when the `next` link carries neither a cursor nor a
    /// `page` parameter. A fetch error is yielded once, after which the
    /// stream is finished.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let stream = self.get_mut();
        loop {
            if let Some(item) = stream.buf.next() {
                return Poll::Ready(Some(Ok(item)));
            }
            if stream.done {
                return Poll::Ready(None);
            }

            // Start a request only when none is pending.
            if stream.in_flight.is_none() {
                let request = (stream.fetch)(
                    stream.client.clone(),
                    stream.next_page,
                    stream.next_cursor.clone(),
                );
                stream.in_flight = Some(request);
            }
            let request = stream
                .in_flight
                .as_mut()
                .expect("we just set in_flight to Some");

            let outcome = match request.as_mut().poll(cx) {
                Poll::Ready(outcome) => outcome,
                Poll::Pending => return Poll::Pending,
            };
            // The request has completed either way; drop it before acting on the result.
            stream.in_flight = None;

            let page = match outcome {
                Ok(page) => page,
                Err(err) => {
                    stream.done = true;
                    return Poll::Ready(Some(Err(err)));
                }
            };
            let Page {
                next: next_link,
                cursor: token,
                results: items,
                ..
            } = page;

            if items.is_empty() {
                stream.done = true;
                return Poll::Ready(None);
            }
            stream.buf = items.into_iter();

            // A cursor wins over a page number; with neither there is nothing
            // left to request.
            let (page_no, cursor) = match (next_link.as_deref(), token) {
                (None, _) => (None, None),
                (Some(_), Some(token)) => (None, Some(token)),
                (Some(link), None) => (extract_page(link), None),
            };
            stream.done = page_no.is_none() && cursor.is_none();
            stream.next_page = page_no;
            stream.next_cursor = cursor;
            // Loop around to hand out the first item of the new buffer.
        }
    }
}
// Unit tests for page decoding and for the `page` query-parameter helper.
#[cfg(test)]
mod tests {
use super::*;
// A numeric `count` and null links decode to a page without `next` or cursor.
#[test]
fn decode_basic_page() {
let body = br#"{
"count": 2,
"next": null,
"previous": null,
"results": [{"a": 1}, {"a": 2}]
}"#;
let page: Page<serde_json::Value> = Page::decode(body).expect("decode");
assert_eq!(page.count, 2);
assert!(page.next.is_none());
assert_eq!(page.results.len(), 2);
assert!(page.cursor.is_none());
}
// `count` given as a numeric string is parsed rather than rejected.
#[test]
fn decode_string_count() {
let body = br#"{"count": "42", "results": []}"#;
let page: Page<serde_json::Value> = Page::decode(body).expect("decode");
assert_eq!(page.count, 42);
}
// The `cursor` query parameter of `next` ends up in `Page::cursor`, and
// `next` itself is kept.
#[test]
fn decode_extracts_cursor_from_next() {
let body = br#"{
"count": 1,
"next": "https://tango.example/api/contracts/?cursor=abc123&limit=25",
"results": [{"piid": "X"}]
}"#;
let page: Page<serde_json::Value> = Page::decode(body).expect("decode");
assert_eq!(page.cursor.as_deref(), Some("abc123"));
assert!(page.next.is_some());
}
// With no `count` in the payload, the number of results is used instead.
#[test]
fn count_defaults_to_results_len_when_zero() {
let body = br#"{"results": [{"a": 1}, {"a": 2}, {"a": 3}]}"#;
let page: Page<serde_json::Value> = Page::decode(body).expect("decode");
assert_eq!(page.count, 3);
}
// `extract_page` reads the `page` parameter and yields `None` without one.
#[test]
fn extract_page_parses_query_param() {
assert_eq!(
extract_page("https://tango.example/api/contracts/?page=4&limit=25"),
Some(4)
);
assert_eq!(extract_page("https://tango.example/api/contracts/"), None);
}
}