use futures::{future, stream, Future, Stream, StreamExt};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::vec;
use crate::uri::Uri;
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash, Debug)]
#[cfg_attr(test, serde(deny_unknown_fields))]
pub struct List<T> {
pub data: Vec<T>,
pub has_more: bool,
pub next_page: Option<Uri<List<T>>>,
pub total_cards: Option<usize>,
pub warnings: Option<Vec<String>>,
#[cfg(test)]
#[serde(rename = "object")]
_object: String,
}
impl<T: DeserializeOwned + Send + Sync + Unpin> List<T> {
pub fn into_page_iter(self) -> PageIter<T> {
PageIter {
curr: Some(self),
page_num: 1,
}
}
pub fn into_list_iter(self) -> ListIter<T> {
debug_assert!(self.has_more == self.next_page.is_some());
ListIter {
inner: self.data.into_iter(),
next_uri: self.next_page,
page_num: 1,
total: self.total_cards,
remaining: self.total_cards,
}
}
}
#[derive(Debug, Clone)]
pub struct ListIter<T> {
inner: vec::IntoIter<T>,
next_uri: Option<Uri<List<T>>>,
page_num: usize,
total: Option<usize>,
remaining: Option<usize>,
}
impl<T: DeserializeOwned + Send + Sync + Unpin + 'static> ListIter<T> {
pub async fn next_page(&self) -> crate::Result<Option<Self>> {
if let Some(uri) = self.next_uri.as_ref() {
let mut new_iter = uri.fetch_iter().await?;
new_iter.remaining = self.remaining.map(|r| r - self.inner.len());
new_iter.page_num = self.page_num + 1;
debug_assert_eq!(self.total, new_iter.total);
Ok(Some(new_iter))
} else {
Ok(None)
}
}
pub async fn next(&mut self) -> Option<crate::Result<T>> {
match self.inner.next() {
Some(next) => {
self.remaining = self.remaining.map(|r| r - 1);
Some(Ok(next))
},
None => match self.next_page().await {
Ok(Some(new_iter)) => {
*self = new_iter;
match self.inner.next() {
Some(next) => {
self.remaining = self.remaining.map(|r| r - 1);
Some(Ok(next))
},
None => None,
}
},
Ok(None) => None,
Err(e) => {
self.next_uri = None;
self.remaining = Some(0);
Some(Err(e))
},
},
}
}
async fn stream_next(&mut self) -> Option<impl Future<Output = crate::Result<T>>> {
match self.inner.next() {
Some(next) => {
self.remaining = self.remaining.map(|r| r - 1);
Some(future::ready(Ok(next)))
},
None => match self.next_page().await {
Ok(Some(new_iter)) => {
*self = new_iter;
match self.inner.next() {
Some(next) => {
self.remaining = self.remaining.map(|r| r - 1);
Some(future::ready(Ok(next)))
},
None => None,
}
},
Ok(None) => None,
Err(e) => {
self.next_uri = None;
self.remaining = Some(0);
Some(future::ready(Err(e)))
},
},
}
}
pub fn into_stream(self) -> impl Stream<Item = crate::Result<T>> + Unpin {
Box::pin(stream::unfold(self, |mut state| async move {
let item = state.stream_next().await;
if let Some(val) = item {
Some((val.await, state))
} else {
None
}
}))
}
pub fn into_stream_buffered(
self,
buf_factor: usize,
) -> impl Stream<Item = crate::Result<T>> + Unpin {
Box::pin(
stream::unfold(self, |mut state| async move {
let item = state.stream_next().await;
item.map(|val| (val, state))
})
.buffered(buf_factor),
)
}
pub fn into_stream_buffered_unordered(
self,
buf_factor: usize,
) -> impl Stream<Item = crate::Result<T>> + Unpin {
Box::pin(
stream::unfold(self, |mut state| async move {
let item = state.stream_next().await;
item.map(|val| (val, state))
})
.buffer_unordered(buf_factor),
)
}
pub fn size_hint(&self) -> (usize, Option<usize>) {
if let Some(len) = self.remaining {
(len, Some(len))
} else {
let len = self.inner.len();
(
len,
if self.next_uri.is_some() {
None
} else {
Some(len)
},
)
}
}
pub fn into_inner(self) -> vec::IntoIter<T> {
self.inner
}
}
pub struct PageIter<T> {
curr: Option<List<T>>,
page_num: usize,
}
impl<T: DeserializeOwned + Send + Sync + Unpin> PageIter<T> {
async fn stream_next(&mut self) -> Option<impl Future<Output = List<T>>> {
if let Some(curr) = self.curr.take() {
self.curr = match &curr.next_page {
Some(uri) => match uri.fetch().await {
Ok(page) => {
self.page_num += 1;
Some(page)
},
Err(e) => {
eprintln!("Error fetching page {} - {}", self.page_num + 1, e);
None
},
},
None => None,
};
Some(future::ready(curr))
} else {
None
}
}
pub fn into_stream(self) -> impl Stream<Item = List<T>> + Unpin {
Box::pin(stream::unfold(self, |mut state| async move {
if let Some(val) = state.stream_next().await {
Some((val.await, state))
} else {
None
}
}))
}
pub fn into_stream_buffered(self, buf_factor: usize) -> impl Stream<Item = List<T>> + Unpin {
Box::pin(
stream::unfold(self, |mut state| async move {
state.stream_next().await.map(|val| (val, state))
})
.buffered(buf_factor),
)
}
pub fn into_stream_buffered_unordered(
self,
buf_factor: usize,
) -> impl Stream<Item = List<T>> + Unpin {
Box::pin(
stream::unfold(self, |mut state| async move {
state.stream_next().await.map(|val| (val, state))
})
.buffer_unordered(buf_factor),
)
}
}