use futures::Stream;
use serde::{de::DeserializeOwned, Deserialize};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::common::*;
use crate::error::Result;
use crate::{auth, list, user};
/// Common interface for one page of results returned by a cursored API call.
///
/// `CursorIter` uses this trait to read a page's paging cursors and to extract the items it
/// holds without knowing the concrete response type.
pub trait Cursor {
/// The type of a single result contained in a page.
type Item;
/// Returns the page's "previous" cursor ID.
fn previous_cursor_id(&self) -> i64;
/// Returns the page's "next" cursor ID; `CursorIter` sends it as the `cursor` parameter of
/// the following request.
fn next_cursor_id(&self) -> i64;
/// Consumes the page and returns the items it contains.
fn into_inner(self) -> Vec<Self::Item>;
}
/// One page of user results, deserialized from a cursored response.
#[derive(Deserialize)]
pub struct UserCursor {
/// Cursor ID the response reported for the previous page.
pub previous_cursor: i64,
/// Cursor ID the response reported for the next page.
pub next_cursor: i64,
/// The users contained in this page.
pub users: Vec<user::TwitterUser>,
}
impl Cursor for UserCursor {
type Item = user::TwitterUser;
fn previous_cursor_id(&self) -> i64 {
self.previous_cursor
}
fn next_cursor_id(&self) -> i64 {
self.next_cursor
}
fn into_inner(self) -> Vec<Self::Item> {
self.users
}
}
/// One page of numeric ID results, deserialized from a cursored response.
#[derive(Deserialize)]
pub struct IDCursor {
/// Cursor ID the response reported for the previous page.
pub previous_cursor: i64,
/// Cursor ID the response reported for the next page.
pub next_cursor: i64,
/// The IDs contained in this page.
pub ids: Vec<u64>,
}
impl Cursor for IDCursor {
type Item = u64;
fn previous_cursor_id(&self) -> i64 {
self.previous_cursor
}
fn next_cursor_id(&self) -> i64 {
self.next_cursor
}
fn into_inner(self) -> Vec<Self::Item> {
self.ids
}
}
/// One page of list results, deserialized from a cursored response.
#[derive(Deserialize)]
pub struct ListCursor {
/// Cursor ID the response reported for the previous page.
pub previous_cursor: i64,
/// Cursor ID the response reported for the next page.
pub next_cursor: i64,
/// The lists contained in this page.
pub lists: Vec<list::List>,
}
impl Cursor for ListCursor {
type Item = list::List;
fn previous_cursor_id(&self) -> i64 {
self.previous_cursor
}
fn next_cursor_id(&self) -> i64 {
self.next_cursor
}
fn into_inner(self) -> Vec<Self::Item> {
self.lists
}
}
/// Lazy stream over the items of a cursored API call, loading one page at a time.
///
/// `T` is the page type: it must expose its cursors and items through `Cursor` and be
/// deserializable from the response. Nothing is requested until the stream is polled (or
/// `call` is invoked directly).
#[must_use = "cursor iterators are lazy and do nothing unless consumed"]
pub struct CursorIter<T>
where
T: Cursor + DeserializeOwned,
{
/// Request target handed to `get` for every page request (presumably the endpoint URL).
link: &'static str,
/// Authentication token attached to every page request.
token: auth::Token,
/// Optional base parameters; the `cursor` and `count` parameters are added on top of a
/// clone of these for each request.
params_base: Option<ParamList>,
/// Number of results requested per page, sent as the `count` parameter when present.
/// `with_page_size` only takes effect when this is already `Some`.
pub page_size: Option<i32>,
/// "Previous" cursor ID from the most recently loaded page; starts at `-1`.
pub previous_cursor: i64,
/// Cursor ID sent with the next page request; starts at `-1`, and the stream ends once a
/// loaded page reports `0` and its items are exhausted.
pub next_cursor: i64,
/// In-flight page request, if one has been started and has not yet completed.
loader: Option<FutureResponse<T>>,
/// Remaining items of the most recently loaded page, each wrapped in a `Response`
/// carrying that page's rate-limit status.
iter: Option<Box<dyn Iterator<Item = Response<T::Item>> + Send>>,
}
impl<T> CursorIter<T>
where
    T: Cursor + DeserializeOwned,
{
    /// Sets the number of results requested per network call.
    ///
    /// The new size is applied only when this iterator already carries a page size
    /// (`page_size` is `Some`); otherwise the iterator is returned unchanged. When the size
    /// is applied, both cursors are reset to `-1` and any in-flight request or buffered page
    /// is dropped, so previously loaded results are invalidated.
    pub fn with_page_size(self, page_size: i32) -> CursorIter<T> {
        if self.page_size.is_some() {
            CursorIter {
                page_size: Some(page_size),
                previous_cursor: -1,
                next_cursor: -1,
                loader: None,
                iter: None,
                ..self
            }
        } else {
            self
        }
    }

    /// Builds the request for the page identified by `next_cursor` and returns the future
    /// that resolves to that page.
    ///
    /// The request parameters are a clone of `params_base` (or the default parameter list
    /// when there is none) plus `cursor`, and `count` when a page size is set. This method
    /// does not update the cursors stored in `self`; the `Stream` implementation does that
    /// when the returned future completes.
    pub fn call(&self) -> impl Future<Output = Result<Response<T>>> {
        let params = self
            .params_base
            .as_ref()
            .cloned()
            .unwrap_or_default()
            .add_param("cursor", self.next_cursor.to_string())
            .add_opt_param("count", self.page_size.map_string());
        // Restored from a mis-encoded `¶ms` token: the parameter list is passed by reference.
        let req = get(self.link, &self.token, Some(&params));
        request_with_json_response(req)
    }

    /// Creates a cursor iterator positioned before the first page.
    ///
    /// Both cursors start at `-1`, and no request is started until the iterator is polled
    /// or `call` is invoked. The token is cloned so the iterator owns its own copy.
    pub(crate) fn new(
        link: &'static str,
        token: &auth::Token,
        params_base: Option<ParamList>,
        page_size: Option<i32>,
    ) -> CursorIter<T> {
        CursorIter {
            link,
            token: token.clone(),
            params_base,
            page_size,
            previous_cursor: -1,
            next_cursor: -1,
            loader: None,
            iter: None,
        }
    }
}
impl<T> Stream for CursorIter<T>
where
    T: Cursor + DeserializeOwned + 'static,
    T::Item: Unpin + Send,
{
    type Item = Result<Response<T::Item>>;

    /// Yields the next item, loading a new page whenever the buffered one is exhausted.
    ///
    /// Order of operations on each poll:
    /// 1. If a page request is in flight, poll it. While it is pending the stream is pending;
    ///    an error is yielded as an `Err` item; a loaded page updates both cursors, becomes
    ///    the new buffer, and its first item (or `None` for an empty page) is returned.
    /// 2. Otherwise, hand out the next buffered item, or finish with `None` when the buffer
    ///    is empty and `next_cursor` is `0`.
    /// 3. Otherwise, start a request for the next page and poll again right away.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        if let Some(mut pending) = self.loader.take() {
            let page = match Pin::new(&mut pending).poll(cx) {
                Poll::Pending => {
                    // Not finished yet: put the future back so the next poll resumes it.
                    self.loader = Some(pending);
                    return Poll::Pending;
                }
                Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
                Poll::Ready(Ok(page)) => page,
            };

            // Record the cursors before the page is consumed by `into_inner`.
            self.previous_cursor = page.previous_cursor_id();
            self.next_cursor = page.next_cursor_id();

            let page = Response::map(page, |cursor| cursor.into_inner());
            let rate = page.rate_limit_status;

            // Every item is re-wrapped with the rate-limit status of the page it came from.
            let mut items = Box::new(page.response.into_iter().map(move |item| Response {
                rate_limit_status: rate,
                response: item,
            }));
            let first = items.next();
            self.iter = Some(items);

            // An empty page yields `None` here regardless of the value of `next_cursor`.
            return Poll::Ready(first.map(Ok));
        }

        if let Some(buffered) = self.iter.as_mut() {
            match buffered.next() {
                Some(item) => return Poll::Ready(Some(Ok(item))),
                None if self.next_cursor == 0 => return Poll::Ready(None),
                // Buffer drained but more pages remain: fall through and load the next one.
                None => {}
            }
        }

        let request = self.call();
        self.loader = Some(Box::pin(request));
        // Poll once more so the new request is driven (and its waker registered) immediately.
        self.poll_next(cx)
    }
}