use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use futures_util::Stream;
use serde::de::DeserializeOwned;
use crate::error::Result;
use crate::http::client::HttpClient;
type PageFetcher<TRaw> = Arc<
dyn Fn(u64) -> Pin<Box<dyn Future<Output = Result<PageResponse<TRaw>>> + Send>> + Send + Sync,
>;
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct PageResponse<T> {
pub items: Vec<T>,
pub total: u64,
pub page: u64,
pub size: u64,
pub pages: u64,
}
impl<T> PageResponse<T> {
#[must_use]
pub fn new(items: Vec<T>, total: u64, page: u64, size: u64, pages: u64) -> Self {
Self {
items,
total,
page,
size,
pages,
}
}
}
impl<T> Default for PageResponse<T> {
fn default() -> Self {
Self {
items: Vec::new(),
total: 0,
page: 1,
size: 0,
pages: 0,
}
}
}
pub struct Page<TRaw, TOut = TRaw> {
inner: Arc<PageInner<TRaw, TOut>>,
}
struct PageInner<TRaw, TOut> {
items: Vec<TRaw>,
total: u64,
page: u64,
size: u64,
pages: u64,
next_fetcher: Option<PageFetcher<TRaw>>,
transform: Arc<dyn Fn(TRaw) -> TOut + Send + Sync>,
}
impl<TRaw: 'static, TOut: 'static> Page<TRaw, TOut> {
#[must_use]
pub fn raw_items(&self) -> &[TRaw] {
&self.inner.items
}
#[must_use]
pub fn items(&self) -> Vec<TOut>
where
TRaw: Clone,
{
self.inner
.items
.iter()
.cloned()
.map(|v| (self.inner.transform)(v))
.collect()
}
#[must_use]
pub fn total(&self) -> u64 {
self.inner.total
}
#[must_use]
pub fn page(&self) -> u64 {
self.inner.page
}
#[must_use]
pub fn size(&self) -> u64 {
self.inner.size
}
#[must_use]
pub fn pages(&self) -> u64 {
self.inner.pages
}
#[must_use]
pub fn has_next(&self) -> bool {
self.inner.page < self.inner.pages
}
pub async fn next_page(&self) -> Result<Option<Self>> {
if !self.has_next() {
return Ok(None);
}
let fetcher = match self.inner.next_fetcher.as_ref() {
Some(f) => Arc::clone(f),
None => return Ok(None),
};
let next_num = self.inner.page + 1;
let transform = Arc::clone(&self.inner.transform);
let next_fetcher = self.inner.next_fetcher.clone();
let resp = fetcher(next_num).await?;
Ok(Some(Self {
inner: Arc::new(PageInner {
items: resp.items,
total: resp.total,
page: resp.page,
size: resp.size,
pages: resp.pages,
next_fetcher,
transform,
}),
}))
}
#[must_use]
pub fn with_fetcher<F, Fut>(self, fetcher: F) -> Self
where
F: Fn(u64) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<PageResponse<TRaw>>> + Send + 'static,
TRaw: Clone,
{
Self {
inner: Arc::new(PageInner {
items: self.inner.items.clone(),
total: self.inner.total,
page: self.inner.page,
size: self.inner.size,
pages: self.inner.pages,
next_fetcher: Some(Arc::new(move |pn| Box::pin(fetcher(pn)))),
transform: self.inner.transform.clone(),
}),
}
}
pub fn map<TNewOut>(
self,
f: impl Fn(TOut) -> TNewOut + Send + Sync + 'static,
) -> Page<TRaw, TNewOut>
where
TRaw: Clone,
{
let prev = self.inner.transform.clone();
Page {
inner: Arc::new(PageInner {
items: self.inner.items.clone(),
total: self.inner.total,
page: self.inner.page,
size: self.inner.size,
pages: self.inner.pages,
next_fetcher: self.inner.next_fetcher.clone(),
transform: Arc::new(move |raw| f(prev(raw))),
}),
}
}
pub fn into_stream(self) -> impl Stream<Item = Result<TOut>> + Send + 'static
where
TRaw: Clone + Send + 'static,
TOut: Send + 'static,
{
let items = self.inner.items.clone();
let has_next = self.has_next();
let next_page_num = self.inner.page + 1;
let fetcher = self.inner.next_fetcher.clone();
let transform = self.inner.transform.clone();
async_stream::try_stream! {
for item in items {
yield transform(item);
}
if let Some(fetcher) = fetcher
&& has_next
{
let mut current_page = next_page_num;
loop {
let resp = (fetcher)(current_page).await?;
let is_last = resp.page >= resp.pages;
for item in resp.items {
yield transform(item);
}
if is_last {
break;
}
let next = resp.page + 1;
if next <= current_page {
break;
}
current_page = next;
}
}
}
}
}
#[allow(clippy::mismatching_type_param_order)]
impl<TRaw: 'static> Page<TRaw, TRaw> {
#[must_use]
pub fn new(items: Vec<TRaw>, total: u64, page: u64, size: u64, pages: u64) -> Self {
Self {
inner: Arc::new(PageInner {
items,
total,
page,
size,
pages,
next_fetcher: None,
transform: Arc::new(std::convert::identity),
}),
}
}
#[must_use]
pub fn from_page_response(resp: PageResponse<TRaw>) -> Self {
Self::new(resp.items, resp.total, resp.page, resp.size, resp.pages)
}
}
#[allow(clippy::mismatching_type_param_order)]
impl<TRaw: 'static> Default for Page<TRaw, TRaw> {
fn default() -> Self {
Self {
inner: Arc::new(PageInner {
items: Vec::new(),
total: 0,
page: 1,
size: 0,
pages: 0,
next_fetcher: None,
transform: Arc::new(std::convert::identity),
}),
}
}
}
impl<TRaw, TOut> Clone for Page<TRaw, TOut> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<TRaw: fmt::Debug, TOut> fmt::Debug for Page<TRaw, TOut> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Page")
.field("items", &self.inner.items)
.field("total", &self.inner.total)
.field("page", &self.inner.page)
.field("size", &self.inner.size)
.field("pages", &self.inner.pages)
.finish_non_exhaustive()
}
}
#[allow(clippy::mismatching_type_param_order)]
impl<TRaw: PartialEq> PartialEq for Page<TRaw, TRaw> {
fn eq(&self, other: &Self) -> bool {
self.inner.items == other.inner.items
&& self.inner.total == other.inner.total
&& self.inner.page == other.inner.page
&& self.inner.size == other.inner.size
&& self.inner.pages == other.inner.pages
}
}
#[allow(clippy::mismatching_type_param_order)]
impl<TRaw: Eq> Eq for Page<TRaw, TRaw> {}
#[allow(clippy::mismatching_type_param_order)]
impl<TRaw: serde::Serialize> serde::Serialize for Page<TRaw, TRaw> {
fn serialize<S: serde::Serializer>(
&self,
serializer: S,
) -> std::result::Result<S::Ok, S::Error> {
use serde::ser::SerializeStruct;
let mut s = serializer.serialize_struct("Page", 5)?;
s.serialize_field("items", &self.inner.items)?;
s.serialize_field("total", &self.inner.total)?;
s.serialize_field("page", &self.inner.page)?;
s.serialize_field("size", &self.inner.size)?;
s.serialize_field("pages", &self.inner.pages)?;
s.end()
}
}
#[allow(clippy::mismatching_type_param_order)]
impl<'de, TRaw: serde::Deserialize<'de> + 'static> serde::Deserialize<'de> for Page<TRaw, TRaw> {
fn deserialize<D: serde::Deserializer<'de>>(
deserializer: D,
) -> std::result::Result<Self, D::Error> {
let resp = PageResponse::<TRaw>::deserialize(deserializer)?;
Ok(Self::from_page_response(resp))
}
}
#[doc(hidden)]
pub async fn paginate_post<T>(
http: &HttpClient,
route: &str,
body: Option<&serde_json::Value>,
page: u64,
size: u64,
reverse: bool,
) -> Result<Page<T>>
where
T: DeserializeOwned + Clone + Send + 'static,
{
let page_str = page.to_string();
let size_str = size.to_string();
let rev_str;
let mut query: Vec<(&str, &str)> = vec![("page", &page_str), ("size", &size_str)];
if reverse {
rev_str = "true".to_owned();
query.push(("reverse", &rev_str));
}
let resp: PageResponse<T> = http.post(route, body, &query).await?;
let result = Page::from_page_response(resp);
let http_clone = http.clone();
let route_owned = route.to_owned();
let body_clone = body.cloned();
Ok(result.with_fetcher(move |page_num| {
let http = http_clone.clone();
let route = route_owned.clone();
let body = body_clone.clone();
Box::pin(async move {
let page_str = page_num.to_string();
let size_str = size.to_string();
let rev_str;
let mut query: Vec<(&str, &str)> = vec![("page", &page_str), ("size", &size_str)];
if reverse {
rev_str = "true".to_owned();
query.push(("reverse", &rev_str));
}
let resp: PageResponse<T> = http.post(&route, body.as_ref(), &query).await?;
Ok(resp)
})
}))
}