use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::Stream;
use serde_json::Value;
use crate::{ApiError, HttpClient};
#[derive(Debug)]
pub struct PaginationResult<T> {
pub items: Vec<T>,
pub next_cursor: Option<String>,
pub has_next_page: bool,
}
pub struct AsyncPaginator<T> {
http_client: Arc<HttpClient>,
page_loader: Box<
dyn Fn(
Arc<HttpClient>,
Option<String>,
)
-> Pin<Box<dyn Future<Output = Result<PaginationResult<T>, ApiError>> + Send>>
+ Send
+ Sync,
>,
current_page: VecDeque<T>,
current_cursor: Option<String>,
has_next_page: bool,
loading_next:
Option<Pin<Box<dyn Future<Output = Result<PaginationResult<T>, ApiError>> + Send>>>,
}
impl<T> AsyncPaginator<T> {
pub fn new<F, Fut>(
http_client: Arc<HttpClient>,
page_loader: F,
initial_cursor: Option<String>,
) -> Result<Self, ApiError>
where
F: Fn(Arc<HttpClient>, Option<String>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Result<PaginationResult<T>, ApiError>> + Send + 'static,
{
Ok(Self {
http_client,
page_loader: Box::new(move |client, cursor| Box::pin(page_loader(client, cursor))),
current_page: VecDeque::new(),
current_cursor: initial_cursor,
has_next_page: true, loading_next: None,
})
}
pub fn has_next_page(&self) -> bool {
!self.current_page.is_empty() || self.has_next_page
}
pub async fn next_page(&mut self) -> Result<Vec<T>, ApiError> {
if !self.has_next_page {
return Ok(Vec::new());
}
let result =
(self.page_loader)(self.http_client.clone(), self.current_cursor.clone()).await?;
self.current_cursor = result.next_cursor;
self.has_next_page = result.has_next_page;
Ok(result.items)
}
}
impl<T> Stream for AsyncPaginator<T>
where
T: Unpin,
{
type Item = Result<T, ApiError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(item) = self.current_page.pop_front() {
return Poll::Ready(Some(Ok(item)));
}
if let Some(ref mut loading_future) = self.loading_next {
match loading_future.as_mut().poll(cx) {
Poll::Ready(Ok(result)) => {
self.current_page.extend(result.items);
self.current_cursor = result.next_cursor;
self.has_next_page = result.has_next_page;
self.loading_next = None;
if let Some(item) = self.current_page.pop_front() {
return Poll::Ready(Some(Ok(item)));
} else if !self.has_next_page {
return Poll::Ready(None);
}
}
Poll::Ready(Err(e)) => {
self.loading_next = None;
return Poll::Ready(Some(Err(e)));
}
Poll::Pending => return Poll::Pending,
}
}
if !self.has_next_page {
return Poll::Ready(None);
}
let future = (self.page_loader)(self.http_client.clone(), self.current_cursor.clone());
self.loading_next = Some(future);
if let Some(ref mut loading_future) = self.loading_next {
match loading_future.as_mut().poll(cx) {
Poll::Ready(Ok(result)) => {
self.current_page.extend(result.items);
self.current_cursor = result.next_cursor;
self.has_next_page = result.has_next_page;
self.loading_next = None;
if let Some(item) = self.current_page.pop_front() {
Poll::Ready(Some(Ok(item)))
} else if !self.has_next_page {
Poll::Ready(None)
} else {
cx.waker().wake_by_ref();
Poll::Pending
}
}
Poll::Ready(Err(e)) => {
self.loading_next = None;
Poll::Ready(Some(Err(e)))
}
Poll::Pending => Poll::Pending,
}
} else {
Poll::Pending
}
}
}
pub struct SyncPaginator<T> {
http_client: Arc<HttpClient>,
page_loader: Box<
dyn Fn(Arc<HttpClient>, Option<String>) -> Result<PaginationResult<T>, ApiError>
+ Send
+ Sync,
>,
current_page: VecDeque<T>,
current_cursor: Option<String>,
has_next_page: bool,
}
impl<T> SyncPaginator<T> {
pub fn new<F>(
http_client: Arc<HttpClient>,
page_loader: F,
initial_cursor: Option<String>,
) -> Result<Self, ApiError>
where
F: Fn(Arc<HttpClient>, Option<String>) -> Result<PaginationResult<T>, ApiError>
+ Send
+ Sync
+ 'static,
{
Ok(Self {
http_client,
page_loader: Box::new(page_loader),
current_page: VecDeque::new(),
current_cursor: initial_cursor,
has_next_page: true, })
}
pub fn has_next_page(&self) -> bool {
!self.current_page.is_empty() || self.has_next_page
}
pub fn next_page(&mut self) -> Result<Vec<T>, ApiError> {
if !self.has_next_page {
return Ok(Vec::new());
}
let result = (self.page_loader)(self.http_client.clone(), self.current_cursor.clone())?;
self.current_cursor = result.next_cursor;
self.has_next_page = result.has_next_page;
Ok(result.items)
}
pub fn collect_all(&mut self) -> Result<Vec<T>, ApiError> {
let mut all_items = Vec::new();
while let Some(item) = self.current_page.pop_front() {
all_items.push(item);
}
while self.has_next_page {
let page_items = self.next_page()?;
all_items.extend(page_items);
}
Ok(all_items)
}
}
impl<T> Iterator for SyncPaginator<T> {
type Item = Result<T, ApiError>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(item) = self.current_page.pop_front() {
return Some(Ok(item));
}
if !self.has_next_page {
return None;
}
match (self.page_loader)(self.http_client.clone(), self.current_cursor.clone()) {
Ok(result) => {
self.current_page.extend(result.items);
self.current_cursor = result.next_cursor;
self.has_next_page = result.has_next_page;
self.current_page.pop_front().map(Ok)
}
Err(e) => Some(Err(e)),
}
}
}
pub trait Paginated<T> {
fn items(&self) -> &[T];
fn next_cursor(&self) -> Option<&str>;
fn has_next_page(&self) -> bool;
}
pub trait OffsetPaginated<T> {
fn items(&self) -> &[T];
fn has_next_page(&self) -> bool;
fn page_size(&self) -> usize {
self.items().len()
}
}