use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::Stream;
use crate::error::Result;
use crate::model::{Page, Pagination};
type FetchFn<T> =
Box<dyn Fn(Pagination) -> Pin<Box<dyn Future<Output = Result<Page<T>>> + Send>> + Send>;
pub struct PageStream<T> {
fetch: FetchFn<T>,
page_size: u32,
offset: u32,
total: Option<u32>,
done: bool,
buffer: VecDeque<T>,
inflight: Option<Pin<Box<dyn Future<Output = Result<Page<T>>> + Send>>>,
}
impl<T: Send + 'static> PageStream<T> {
pub(crate) fn new(fetch: FetchFn<T>, page_size: u32) -> Self {
Self {
fetch,
page_size,
offset: 0,
buffer: VecDeque::new(),
total: None,
done: false,
inflight: None,
}
}
}
impl<T: Send + Unpin + 'static> Stream for PageStream<T> {
type Item = Result<T>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(item) = this.buffer.pop_front() {
return Poll::Ready(Some(Ok(item)));
}
if this.done {
return Poll::Ready(None);
}
if this.inflight.is_none() {
let pagination = Pagination {
offset: Some(this.offset),
limit: Some(this.page_size),
};
this.inflight = Some((this.fetch)(pagination));
}
match this.inflight.as_mut().unwrap().as_mut().poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(e)) => {
this.inflight = None;
this.done = true;
Poll::Ready(Some(Err(e)))
}
Poll::Ready(Ok(page)) => {
this.inflight = None;
this.total = Some(page.total);
this.done = !page.has_more;
this.offset = this.offset.saturating_add(this.page_size);
this.buffer.extend(page.items);
match this.buffer.pop_front() {
Some(item) => Poll::Ready(Some(Ok(item))),
None => Poll::Ready(None),
}
}
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let buffered = self.buffer.len();
match self.total {
Some(total) => {
let remaining = (total as usize).saturating_sub(self.offset as usize) + buffered;
(buffered, Some(remaining))
}
None => (buffered, None),
}
}
}