use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use async_trait::async_trait;
use futures_core::{Future, Stream};
#[async_trait]
pub trait PaginationDelegate {
type Item;
type Error;
async fn next_page(&mut self) -> Result<Vec<Self::Item>, Self::Error>;
fn offset(&self) -> usize;
fn set_offset(&mut self, value: usize);
fn total_items(&self) -> Option<usize>;
}
pub enum PaginatedStream<'f, D: PaginationDelegate> {
Request {
delegate: D,
},
Pending {
#[allow(clippy::type_complexity)]
future: Pin<Box<dyn Future<Output = Result<(D, Vec<D::Item>), D::Error>> + 'f>>,
},
Ready {
delegate: D,
items: VecDeque<D::Item>,
},
Closed,
Indeterminate,
}
impl<'f, D> From<D> for PaginatedStream<'f, D>
where
D: PaginationDelegate,
{
fn from(other: D) -> PaginatedStream<'f, D> {
PaginatedStream::Request { delegate: other }
}
}
impl<'f, D> Stream for PaginatedStream<'f, D>
where
D: PaginationDelegate + Unpin + 'f,
D::Item: Unpin,
{
type Item = Result<D::Item, D::Error>;
fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
use PaginatedStream::*;
let this = std::mem::replace(&mut *self, Indeterminate);
match this {
Request { mut delegate } => {
self.set(Pending {
future: Box::pin(async {
let result = delegate.next_page().await;
result.map(|items| (delegate, items))
}),
});
ctx.waker().wake_by_ref();
Poll::Pending
}
Pending { mut future } => match future.as_mut().poll(ctx) {
Poll::Ready(Ok((mut delegate, items))) => {
delegate.set_offset(delegate.offset() + items.len());
let mut items = VecDeque::from(items);
let popped = items.pop_front().unwrap();
self.set(Ready { delegate, items });
Poll::Ready(Some(Ok(popped)))
}
Poll::Ready(Err(error)) => {
self.set(Closed);
Poll::Ready(Some(Err(error)))
}
Poll::Pending => {
self.set(Pending { future });
Poll::Pending
}
},
Ready {
delegate,
mut items,
} => match items.pop_front() {
Some(item) => {
self.set(Ready { delegate, items });
Poll::Ready(Some(Ok(item)))
}
None => {
if delegate.offset() >= delegate.total_items().unwrap_or(usize::MAX) {
self.set(Closed);
Poll::Ready(None)
} else {
self.set(Request { delegate });
self.poll_next(ctx)
}
}
},
Closed => Poll::Ready(None),
Indeterminate => unreachable!(),
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
use PaginatedStream::*;
match self {
Request { delegate } | Ready { delegate, .. } => (0, delegate.total_items()),
_ => (0, None),
}
}
}