use futures_core::Stream;
use modkit_odata::{ODataQuery, Page};
use pin_project_lite::pin_project;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
#[derive(Debug)]
pub enum PagerError<E> {
Fetch(E),
InvalidCursor(String),
}
impl<E: fmt::Display> fmt::Display for PagerError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Fetch(e) => write!(f, "Fetch error: {e}"),
Self::InvalidCursor(cursor) => write!(f, "Invalid cursor: {cursor}"),
}
}
}
impl<E: std::error::Error + 'static> std::error::Error for PagerError<E> {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Fetch(e) => Some(e),
Self::InvalidCursor(_) => None,
}
}
}
pin_project! {
pub struct CursorPager<T, E, F, Fut>
where
F: FnMut(ODataQuery) -> Fut,
Fut: Future<Output = Result<Page<T>, E>>,
{
base_query: ODataQuery,
next_cursor: Option<String>,
buffer: VecDeque<T>,
done: bool,
fetcher: F,
#[pin]
current_fetch: Option<Fut>,
}
}
impl<T, E, F, Fut> CursorPager<T, E, F, Fut>
where
F: FnMut(ODataQuery) -> Fut,
Fut: Future<Output = Result<Page<T>, E>>,
{
pub fn new(base_query: ODataQuery, fetcher: F) -> Self {
Self {
base_query,
next_cursor: None,
buffer: VecDeque::new(),
done: false,
fetcher,
current_fetch: None,
}
}
}
impl<T, E, F, Fut> Stream for CursorPager<T, E, F, Fut>
where
F: FnMut(ODataQuery) -> Fut,
Fut: Future<Output = Result<Page<T>, E>>,
{
type Item = Result<T, PagerError<E>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if let Some(item) = this.buffer.pop_front() {
return Poll::Ready(Some(Ok(item)));
}
if *this.done {
return Poll::Ready(None);
}
if let Some(fut) = this.current_fetch.as_mut().as_pin_mut() {
match fut.poll(cx) {
Poll::Ready(Ok(page)) => {
this.current_fetch.set(None);
this.next_cursor.clone_from(&page.page_info.next_cursor);
if this.next_cursor.is_none() {
*this.done = true;
}
this.buffer.extend(page.items);
continue;
}
Poll::Ready(Err(e)) => {
this.current_fetch.set(None);
*this.done = true;
return Poll::Ready(Some(Err(PagerError::Fetch(e))));
}
Poll::Pending => return Poll::Pending,
}
}
let mut query = this.base_query.clone();
if let Some(cursor_str) = this.next_cursor.as_ref() {
if let Ok(cursor) = modkit_odata::CursorV1::decode(cursor_str) {
query = query.with_cursor(cursor);
} else {
*this.done = true;
return Poll::Ready(Some(Err(PagerError::InvalidCursor(cursor_str.clone()))));
}
}
let fut = (this.fetcher)(query);
this.current_fetch.set(Some(fut));
}
}
}
pin_project! {
pub struct PagesPager<T, E, F, Fut>
where
F: FnMut(ODataQuery) -> Fut,
Fut: Future<Output = Result<Page<T>, E>>,
{
base_query: ODataQuery,
next_cursor: Option<String>,
done: bool,
fetcher: F,
#[pin]
current_fetch: Option<Fut>,
}
}
impl<T, E, F, Fut> PagesPager<T, E, F, Fut>
where
F: FnMut(ODataQuery) -> Fut,
Fut: Future<Output = Result<Page<T>, E>>,
{
pub fn new(base_query: ODataQuery, fetcher: F) -> Self {
Self {
base_query,
next_cursor: None,
done: false,
fetcher,
current_fetch: None,
}
}
}
impl<T, E, F, Fut> Stream for PagesPager<T, E, F, Fut>
where
F: FnMut(ODataQuery) -> Fut,
Fut: Future<Output = Result<Page<T>, E>>,
{
type Item = Result<Page<T>, PagerError<E>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if *this.done {
return Poll::Ready(None);
}
if let Some(fut) = this.current_fetch.as_mut().as_pin_mut() {
match fut.poll(cx) {
Poll::Ready(Ok(page)) => {
this.current_fetch.set(None);
this.next_cursor.clone_from(&page.page_info.next_cursor);
if this.next_cursor.is_none() {
*this.done = true;
}
return Poll::Ready(Some(Ok(page)));
}
Poll::Ready(Err(e)) => {
this.current_fetch.set(None);
*this.done = true;
return Poll::Ready(Some(Err(PagerError::Fetch(e))));
}
Poll::Pending => return Poll::Pending,
}
}
let mut query = this.base_query.clone();
if let Some(cursor_str) = this.next_cursor.as_ref() {
if let Ok(cursor) = modkit_odata::CursorV1::decode(cursor_str) {
query = query.with_cursor(cursor);
} else {
*this.done = true;
return Poll::Ready(Some(Err(PagerError::InvalidCursor(cursor_str.clone()))));
}
}
let fut = (this.fetcher)(query);
this.current_fetch.set(Some(fut));
}
}
}
#[cfg(test)]
#[path = "pager_tests.rs"]
mod pager_tests;