use crate::{
error::ErrorKind,
http::{
headers::HeaderName, policies::create_public_api_span, response::Response, Context,
DeserializeWith, Format, JsonFormat,
},
tracing::{Span, SpanStatus},
};
use async_trait::async_trait;
use futures::{stream::unfold, FutureExt, Stream};
use std::{
fmt,
future::Future,
ops::Deref,
pin::Pin,
str::FromStr,
sync::{Arc, Mutex},
task,
};
/// Describes which request a pager callback is being asked to make.
///
/// The page stream in this file passes `Initial` for the first request and
/// `More(C)` with a continuation value for every later request.
#[derive(Debug, Default, PartialEq, Eq)]
pub enum PagerState<C: AsRef<str>> {
    /// No continuation value is available yet; this is the default state.
    #[default]
    Initial,
    /// A continuation value identifying the next page to request.
    More(C),
}
impl<C: AsRef<str>> PagerState<C> {
    /// Converts the continuation value with `f`, leaving `Initial` untouched.
    ///
    /// `f` is only invoked when `self` is [`PagerState::More`].
    #[inline]
    pub fn map<U, F>(self, f: F) -> PagerState<U>
    where
        U: AsRef<str>,
        F: FnOnce(C) -> U,
    {
        match self {
            Self::More(token) => PagerState::More(f(token)),
            Self::Initial => PagerState::Initial,
        }
    }

    /// Borrows the continuation value, producing a `PagerState<&C>` without
    /// consuming `self`.
    #[inline]
    pub const fn as_ref(&self) -> PagerState<&C> {
        match self {
            Self::More(token) => PagerState::More(token),
            Self::Initial => PagerState::Initial,
        }
    }

    /// Borrows the continuation value through its [`Deref`] implementation,
    /// e.g. turning a `PagerState<String>` into a `PagerState<&str>`.
    #[inline]
    pub fn as_deref(&self) -> PagerState<&C::Target>
    where
        C: Deref,
        C::Target: AsRef<str>,
    {
        match self {
            Self::More(token) => PagerState::More(Deref::deref(token)),
            Self::Initial => PagerState::Initial,
        }
    }
}
impl<C: Clone + AsRef<str>> Clone for PagerState<C> {
#[inline]
fn clone(&self) -> Self {
match self {
PagerState::Initial => PagerState::Initial,
PagerState::More(c) => PagerState::More(c.clone()),
}
}
}
/// The outcome of a single page request made by a pager callback.
pub enum PagerResult<P, C: AsRef<str>> {
    /// A page was fetched and another page can be requested.
    More {
        /// The page that was fetched.
        response: P,
        /// The continuation value to use for the next request.
        continuation: C,
    },
    /// A page was fetched and it is the last one.
    Done {
        /// The page that was fetched.
        response: P,
    },
}
impl<P, F> PagerResult<Response<P, F>, String> {
pub fn from_response_header(response: Response<P, F>, header_name: &HeaderName) -> Self {
match response.headers().get_optional_string(header_name) {
Some(continuation) => PagerResult::More {
response,
continuation,
},
None => PagerResult::Done { response },
}
}
}
impl<P, C: fmt::Debug + AsRef<str>> fmt::Debug for PagerResult<P, C> {
    /// Formats the variant name and, for `More`, the continuation value.
    ///
    /// The response is never printed, which is why `P` needs no `Debug` bound.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = match self {
            Self::Done { .. } => f.debug_struct("Done"),
            Self::More { continuation, .. } => {
                let mut more = f.debug_struct("More");
                more.field("continuation", continuation);
                more
            }
        };
        out.finish_non_exhaustive()
    }
}
/// A single page of results that can be turned into an iterator over its items.
///
/// The `async_trait` attribute is applied without a `Send` requirement on `wasm32`.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait Page {
    /// The type of each item in the page.
    type Item;
    /// The iterator type produced by [`Page::into_items`].
    type IntoIter: Iterator<Item = Self::Item>;
    /// Consumes the page and returns an iterator over its items.
    async fn into_items(self) -> crate::Result<Self::IntoIter>;
}
/// Lets a [`Response`] act as a [`Page`] whenever its model type is itself a page.
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<P, F> Page for Response<P, F>
where
    P: DeserializeWith<F> + Page + Send,
    F: Format + Send,
{
    type Item = P::Item;
    type IntoIter = P::IntoIter;

    /// Converts the response into its model `P` and returns that model's items.
    ///
    /// Fails if the conversion to `P` fails or if `P::into_items` fails.
    async fn into_items(self) -> crate::Result<Self::IntoIter> {
        P::into_items(self.into_model()?).await
    }
}
/// An [`ItemIterator`] over pages delivered as [`Response`]s, using [`JsonFormat`] by default.
pub type Pager<P, F = JsonFormat> = ItemIterator<Response<P, F>>;
/// Options accepted by the `from_callback` constructors of the pagers in this file.
#[derive(Clone, Debug, Default)]
pub struct PagerOptions<'a> {
    /// Context handed to the request callback for every page request.
    pub context: Context<'a>,
}
/// Boxed stream of page results; required to be `Send` on every target except `wasm32`.
#[cfg(not(target_arch = "wasm32"))]
type BoxedStream<P> = Box<dyn Stream<Item = crate::Result<P>> + Send>;
/// Boxed stream of page results; no `Send` requirement on `wasm32`.
#[cfg(target_arch = "wasm32")]
type BoxedStream<P> = Box<dyn Stream<Item = crate::Result<P>>>;
/// A stream of individual items that fetches pages from an underlying page stream as needed.
#[pin_project::pin_project]
pub struct ItemIterator<P: Page> {
    /// The underlying stream of pages.
    #[pin]
    stream: Pin<BoxedStream<P>>,
    /// Snapshot of `next_token`, refreshed just before each poll of `stream`.
    continuation_token: Option<String>,
    /// Token slot shared with the callback-driven page stream created by `from_callback`.
    next_token: Arc<Mutex<Option<String>>>,
    /// Iterator over the items of the most recently fetched page, if any remain.
    current: Option<P::IntoIter>,
}
impl<P: Page> ItemIterator<P> {
    /// Creates an item iterator whose pages are fetched by calling `make_request`.
    ///
    /// `make_request` is called with [`PagerState::Initial`] for the first page and
    /// [`PagerState::More`] for later pages. When `options` is `None`, default
    /// [`PagerOptions`] are used.
    pub fn from_callback<
        #[cfg(not(target_arch = "wasm32"))] C: AsRef<str> + FromStr + Send + 'static,
        #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState<C>, Context<'static>) -> Fut + Send + 'static,
        #[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + Send + 'static,
        #[cfg(target_arch = "wasm32")] C: AsRef<str> + FromStr + 'static,
        #[cfg(target_arch = "wasm32")] F: Fn(PagerState<C>, Context<'static>) -> Fut + 'static,
        #[cfg(target_arch = "wasm32")] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + 'static,
    >(
        make_request: F,
        options: Option<PagerOptions<'static>>,
    ) -> Self
    where
        <C as FromStr>::Err: std::error::Error,
    {
        // The token slot is shared between this iterator and the page stream.
        let shared_token = Arc::new(Mutex::new(None::<String>));
        let options = options.unwrap_or_default();
        let pages = iter_from_callback(
            make_request,
            options.context.clone(),
            Arc::clone(&shared_token),
        );
        Self {
            stream: Box::pin(pages),
            continuation_token: None,
            next_token: shared_token,
            current: None,
        }
    }

    /// Creates an item iterator over an existing stream of pages.
    ///
    /// The token slot starts empty and is not connected to `stream`.
    pub fn from_stream<
        #[cfg(not(target_arch = "wasm32"))] S: Stream<Item = crate::Result<P>> + Send + 'static,
        #[cfg(target_arch = "wasm32")] S: Stream<Item = crate::Result<P>> + 'static,
    >(
        stream: S,
    ) -> Self {
        Self {
            stream: Box::pin(stream),
            continuation_token: None,
            next_token: Arc::new(Mutex::new(None)),
            current: None,
        }
    }

    /// Converts this item iterator into a [`PageIterator`] over the same page stream.
    ///
    /// The shared token slot is first overwritten with this iterator's current
    /// continuation token snapshot. Items of the current page that were not yet
    /// yielded are discarded. If the token lock is poisoned the slot is left as is.
    pub fn into_pages(self) -> PageIterator<P> {
        let Self {
            stream,
            continuation_token,
            next_token,
            current: _,
        } = self;
        if let Ok(mut slot) = next_token.lock() {
            *slot = continuation_token;
        }
        PageIterator {
            stream,
            continuation_token: next_token,
        }
    }

    /// Stores `continuation_token` in the shared token slot and returns `self`.
    ///
    /// If the token lock is poisoned the slot is left unchanged.
    pub fn with_continuation_token(self, continuation_token: Option<String>) -> Self {
        if let Ok(mut slot) = self.next_token.lock() {
            *slot = continuation_token;
        }
        self
    }

    /// Returns a copy of the continuation token snapshot held by this iterator.
    ///
    /// The snapshot is refreshed from the shared slot just before each poll of the
    /// page stream.
    pub fn continuation_token(&self) -> Option<String> {
        self.continuation_token.as_deref().map(String::from)
    }
}
/// Yields the items of each page in order, fetching the next page when the
/// current one is exhausted.
impl<P: Page> futures::Stream for ItemIterator<P> {
    type Item = crate::Result<P::Item>;
    /// Returns the next item, an error from the page stream or from
    /// `Page::into_items`, or `None` once the page stream ends.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> task::Poll<Option<Self::Item>> {
        let mut projected_self = self.project();
        loop {
            // Drain the current page first; drop its iterator once it is empty.
            if let Some(current) = projected_self.current.as_mut() {
                if let Some(item) = current.next() {
                    return task::Poll::Ready(Some(Ok(item)));
                }
                *projected_self.current = None;
            }
            // Snapshot the shared token BEFORE polling for the next page, so the
            // snapshot is the token that was in effect when that page was requested.
            // A poisoned lock leaves the previous snapshot in place.
            if let Ok(token) = projected_self.next_token.lock() {
                tracing::trace!(
                    "updating continuation_token from {:?} to {:?}",
                    projected_self.continuation_token,
                    token
                );
                *projected_self.continuation_token = token.clone();
            }
            match projected_self.stream.as_mut().poll_next(cx) {
                task::Poll::Ready(page) => match page {
                    // NOTE(review): the `into_items()` future is a temporary that is
                    // polled exactly once. If it returns `Pending`, the future is dropped
                    // together with the page it consumed, and the next call polls the
                    // page stream again. Fixing this requires storing the in-flight
                    // future on the struct.
                    Some(Ok(page)) => match page.into_items().poll_unpin(cx) {
                        task::Poll::Ready(Ok(iter)) => {
                            *projected_self.current = Some(iter);
                            // Loop back to yield the first item of the new page.
                            continue;
                        }
                        task::Poll::Ready(Err(err)) => return task::Poll::Ready(Some(Err(err))),
                        task::Poll::Pending => return task::Poll::Pending,
                    },
                    Some(Err(err)) => return task::Poll::Ready(Some(Err(err))),
                    None => return task::Poll::Ready(None),
                },
                task::Poll::Pending => return task::Poll::Pending,
            }
        }
    }
}
impl<P: Page> fmt::Debug for ItemIterator<P> {
    /// Prints only the type name; no fields are shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut opaque = f.debug_struct("ItemIterator");
        opaque.finish_non_exhaustive()
    }
}
/// A stream of whole pages that also exposes the current continuation token.
#[pin_project::pin_project]
pub struct PageIterator<P> {
    /// The underlying stream of pages.
    #[pin]
    stream: Pin<BoxedStream<P>>,
    /// Token slot shared with the callback-driven page stream created by `from_callback`.
    continuation_token: Arc<Mutex<Option<String>>>,
}
impl<P> PageIterator<P> {
    /// Creates a page iterator whose pages are fetched by calling `make_request`.
    ///
    /// `make_request` is called with [`PagerState::Initial`] for the first page and
    /// [`PagerState::More`] for later pages. When `options` is `None`, default
    /// [`PagerOptions`] are used.
    pub fn from_callback<
        #[cfg(not(target_arch = "wasm32"))] C: AsRef<str> + FromStr + Send + 'static,
        #[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState<C>, Context<'static>) -> Fut + Send + 'static,
        #[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + Send + 'static,
        #[cfg(target_arch = "wasm32")] C: AsRef<str> + FromStr + 'static,
        #[cfg(target_arch = "wasm32")] F: Fn(PagerState<C>, Context<'static>) -> Fut + 'static,
        #[cfg(target_arch = "wasm32")] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + 'static,
    >(
        make_request: F,
        options: Option<PagerOptions<'static>>,
    ) -> Self
    where
        <C as FromStr>::Err: std::error::Error,
    {
        // The token slot is shared between this iterator and the page stream.
        let shared_token = Arc::new(Mutex::new(None::<String>));
        let options = options.unwrap_or_default();
        let pages = iter_from_callback(
            make_request,
            options.context.clone(),
            Arc::clone(&shared_token),
        );
        Self {
            stream: Box::pin(pages),
            continuation_token: shared_token,
        }
    }

    /// Creates a page iterator over an existing stream of pages.
    ///
    /// The token slot starts empty and is not connected to `stream`.
    pub fn from_stream<
        #[cfg(not(target_arch = "wasm32"))] S: Stream<Item = crate::Result<P>> + Send + 'static,
        #[cfg(target_arch = "wasm32")] S: Stream<Item = crate::Result<P>> + 'static,
    >(
        stream: S,
    ) -> Self {
        Self {
            stream: Box::pin(stream),
            continuation_token: Arc::new(Mutex::new(None)),
        }
    }

    /// Stores `continuation_token` in the shared token slot and returns `self`.
    ///
    /// If the token lock is poisoned the slot is left unchanged.
    pub fn with_continuation_token(self, continuation_token: String) -> Self {
        if let Ok(mut slot) = self.continuation_token.lock() {
            *slot = Some(continuation_token);
        }
        self
    }

    /// Returns a copy of the token currently in the shared slot.
    ///
    /// Returns `None` when the slot is empty or the token lock is poisoned.
    pub fn continuation_token(&self) -> Option<String> {
        match self.continuation_token.lock() {
            Ok(slot) => slot.clone(),
            Err(_) => None,
        }
    }
}
/// Forwards polling to the underlying page stream unchanged.
impl<P> futures::Stream for PageIterator<P> {
    type Item = crate::Result<P>;

    /// Returns whatever the wrapped page stream returns for this poll.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = self.project();
        this.stream.poll_next(cx)
    }
}
impl<P> fmt::Debug for PageIterator<P> {
    /// Prints only the type name; no fields are shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut opaque = f.debug_struct("PageIterator");
        opaque.finish_non_exhaustive()
    }
}
/// Internal progress marker for the callback-driven page stream.
///
/// `Eq` is deliberately not derived: the hand-written `PartialEq` for this type
/// never treats two `More` values as equal, so equality is not reflexive and the
/// `Eq` contract would not hold.
#[derive(Debug, Clone)]
enum State<C> {
    /// No page has been requested with a continuation value yet.
    Init,
    /// The next request should use this continuation value.
    More(C),
    /// The stream has finished, either normally or after an error.
    Done,
}
impl<C> PartialEq for State<C> {
    /// Compares states by variant only, without requiring `C: PartialEq`.
    ///
    /// `Init` equals `Init` and `Done` equals `Done`; a `More` value never
    /// compares equal to anything, including another `More`.
    fn eq(&self, other: &Self) -> bool {
        let carries_payload = matches!(self, State::More(_));
        !carries_payload && std::mem::discriminant(self) == std::mem::discriminant(other)
    }
}
/// State threaded through each step of the callback-driven page stream.
#[derive(Debug)]
struct StreamState<'a, C, F> {
    /// Where the stream is in the sequence of page requests.
    state: State<C>,
    /// Callback that performs a single page request.
    make_request: F,
    /// Token slot shared with the iterator that owns the stream.
    continuation_token: Arc<Mutex<Option<String>>>,
    /// Context passed to `make_request`; may be replaced with one carrying a span.
    ctx: Context<'a>,
    /// Whether this stream added a span to `ctx` and is therefore responsible for ending it.
    added_span: bool,
}
fn iter_from_callback<
P,
#[cfg(not(target_arch = "wasm32"))] C: AsRef<str> + FromStr + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] F: Fn(PagerState<C>, Context<'static>) -> Fut + Send + 'static,
#[cfg(not(target_arch = "wasm32"))] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + Send + 'static,
#[cfg(target_arch = "wasm32")] C: AsRef<str> + FromStr + 'static,
#[cfg(target_arch = "wasm32")] F: Fn(PagerState<C>, Context<'static>) -> Fut + 'static,
#[cfg(target_arch = "wasm32")] Fut: Future<Output = crate::Result<PagerResult<P, C>>> + 'static,
>(
make_request: F,
ctx: Context<'static>,
continuation_token: Arc<Mutex<Option<String>>>,
) -> impl Stream<Item = crate::Result<P>> + 'static
where
<C as FromStr>::Err: std::error::Error,
{
unfold(
StreamState {
state: State::Init,
make_request,
continuation_token,
ctx,
added_span: false,
},
|mut stream_state| async move {
if stream_state.state != State::Done {
let result = match stream_state.continuation_token.lock() {
Ok(next_token) => match next_token.as_deref() {
Some(n) => match n.parse() {
Ok(s) => Ok(State::More(s)),
Err(err) => Err(crate::Error::with_message_fn(
ErrorKind::DataConversion,
|| format!("invalid continuation token: {err}"),
)),
},
None => Ok(State::Init),
},
Err(err) => Err(crate::Error::with_message_fn(ErrorKind::Other, || {
format!("continuation token lock: {err}")
})),
};
match result {
Ok(state) => stream_state.state = state,
Err(err) => {
stream_state.state = State::Done;
return Some((Err(err), stream_state));
}
}
}
let result = match stream_state.state {
State::Init => {
tracing::debug!("initial page request");
let span = create_public_api_span(&stream_state.ctx, None, None);
if let Some(ref s) = span {
stream_state.added_span = true;
stream_state.ctx = stream_state.ctx.with_value(s.clone());
}
(stream_state.make_request)(PagerState::Initial, stream_state.ctx.clone()).await
}
State::More(n) => {
tracing::debug!("subsequent page request to {:?}", AsRef::<str>::as_ref(&n));
(stream_state.make_request)(PagerState::More(n), stream_state.ctx.clone()).await
}
State::Done => {
tracing::debug!("done");
if let Ok(mut token) = stream_state.continuation_token.lock() {
*token = None;
}
return None;
}
};
let (item, next_state) = match result {
Err(e) => {
if stream_state.added_span {
if let Some(span) = stream_state.ctx.value::<Arc<dyn Span>>() {
span.set_status(SpanStatus::Error {
description: e.to_string(),
});
span.set_attribute("error.type", e.kind().to_string().into());
span.end();
}
}
stream_state.state = State::Done;
return Some((Err(e), stream_state));
}
Ok(PagerResult::More {
response,
continuation: next_token,
}) => {
if let Ok(mut token) = stream_state.continuation_token.lock() {
*token = Some(next_token.as_ref().into());
}
(Ok(response), State::More(next_token))
}
Ok(PagerResult::Done { response }) => {
if let Ok(mut token) = stream_state.continuation_token.lock() {
*token = None;
}
if stream_state.added_span {
if let Some(span) = stream_state.ctx.value::<Arc<dyn Span>>() {
span.end();
}
}
(Ok(response), State::Done)
}
};
stream_state.state = next_state;
Some((item, stream_state))
},
)
}
#[cfg(test)]
mod tests {
use crate::{
error::ErrorKind,
http::{
headers::{HeaderName, HeaderValue},
pager::{PageIterator, Pager, PagerResult, PagerState},
Context, RawResponse, Response, StatusCode,
},
};
use async_trait::async_trait;
use futures::{StreamExt as _, TryStreamExt as _};
use serde::Deserialize;
use std::{collections::HashMap, future::Future, pin::Pin};
/// Test page model deserialized from the JSON bodies used in these tests.
#[derive(Deserialize, Debug, PartialEq, Eq)]
struct Page {
    /// Items carried by the page.
    pub items: Vec<i32>,
    /// Page number; absent from the last page body of the three-page callback.
    pub page: Option<i32>,
}
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl super::Page for Page {
type Item = i32;
type IntoIter = <Vec<i32> as IntoIterator>::IntoIter;
async fn into_items(self) -> crate::Result<Self::IntoIter> {
Ok(self.items.into_iter())
}
}
/// Collecting a callback-driven pager yields the items of all three pages in order.
#[tokio::test]
async fn callback_item_pagination() {
    let pager: Pager<Page> = Pager::from_callback(
        |continuation: PagerState<String>, _ctx| async move {
            match continuation {
                PagerState::Initial => Ok(PagerResult::More {
                    response: RawResponse::from_bytes(
                        StatusCode::Ok,
                        HashMap::from([(
                            HeaderName::from_static("x-test-header"),
                            HeaderValue::from_static("page-1"),
                        )])
                        .into(),
                        r#"{"items":[1],"page":1}"#,
                    )
                    .into(),
                    continuation: "1".into(),
                }),
                PagerState::More(ref i) if i == "1" => Ok(PagerResult::More {
                    response: RawResponse::from_bytes(
                        StatusCode::Ok,
                        HashMap::from([(
                            HeaderName::from_static("x-test-header"),
                            HeaderValue::from_static("page-2"),
                        )])
                        .into(),
                        r#"{"items":[2],"page":2}"#,
                    )
                    .into(),
                    continuation: "2".into(),
                }),
                PagerState::More(ref i) if i == "2" => Ok(PagerResult::Done {
                    response: RawResponse::from_bytes(
                        StatusCode::Ok,
                        HashMap::from([(
                            HeaderName::from_static("x-test-header"),
                            HeaderValue::from_static("page-3"),
                        )])
                        .into(),
                        r#"{"items":[3],"page":3}"#,
                    )
                    .into(),
                }),
                _ => {
                    panic!("Unexpected continuation value")
                }
            }
        },
        None,
    );
    let items: Vec<i32> = pager.try_collect().await.unwrap();
    assert_eq!(vec![1, 2, 3], items.as_slice())
}
/// A callback error on the second request is yielded as the second element,
/// after which the page stream ends.
#[tokio::test]
async fn callback_item_pagination_error() {
    let pager: Pager<Page> = Pager::from_callback(
        |continuation: PagerState<String>, _ctx| async move {
            match continuation {
                PagerState::Initial => Ok(PagerResult::More {
                    response: RawResponse::from_bytes(
                        StatusCode::Ok,
                        HashMap::from([(
                            HeaderName::from_static("x-test-header"),
                            HeaderValue::from_static("page-1"),
                        )])
                        .into(),
                        r#"{"items":[1],"page":1}"#,
                    )
                    .into(),
                    continuation: "1".into(),
                }),
                PagerState::More(ref i) if i == "1" => Err(crate::Error::with_message(
                    crate::error::ErrorKind::Other,
                    "yon request didst fail",
                )),
                _ => {
                    panic!("Unexpected continuation value")
                }
            }
        },
        None,
    );
    // Pair each page's test header with its deserialized body.
    let pages: Vec<Result<(String, Page), crate::Error>> = pager
        .into_pages()
        .then(|r| async move {
            let r = r?;
            let header = r
                .headers()
                .get_optional_string(&HeaderName::from_static("x-test-header"))
                .unwrap();
            let body = r.into_model()?;
            Ok((header, body))
        })
        .collect()
        .await;
    assert_eq!(2, pages.len());
    assert_eq!(
        &(
            "page-1".to_string(),
            Page {
                items: vec![1],
                page: Some(1)
            }
        ),
        pages[0].as_ref().unwrap()
    );
    let err = pages[1].as_ref().unwrap_err();
    assert_eq!(&crate::error::ErrorKind::Other, err.kind());
    assert_eq!("yon request didst fail", format!("{}", err));
}
/// A token taken from one `PageIterator` lets a fresh `PageIterator` resume at the
/// second page, and the token is cleared after the last page.
#[tokio::test]
async fn page_iterator_with_continuation_token() {
    let mut first_pager: PageIterator<Response<Page>> =
        PageIterator::from_callback(make_three_page_callback(), None);
    assert_eq!(first_pager.continuation_token(), None);
    let first_page = first_pager
        .next()
        .await
        .expect("expected first page")
        .expect("expected successful first page")
        .into_model()
        .expect("expected page");
    assert_eq!(first_page.page, Some(1));
    assert_eq!(first_page.items, vec![1, 2, 3]);
    let continuation_token = first_pager
        .continuation_token()
        .expect("expected continuation_token from first page");
    assert_eq!(continuation_token, "next-token-1");
    // Resume from the captured token with a brand-new pager.
    let mut second_pager: PageIterator<Response<Page>> =
        PageIterator::from_callback(make_three_page_callback(), None)
            .with_continuation_token(continuation_token);
    assert_eq!(
        second_pager.continuation_token(),
        Some("next-token-1".into())
    );
    let second_page = second_pager
        .next()
        .await
        .expect("expected second page")
        .expect("expected successful second page")
        .into_model()
        .expect("expected page");
    assert_eq!(second_page.page, Some(2));
    assert_eq!(second_page.items, vec![4, 5, 6]);
    assert_eq!(
        second_pager.continuation_token(),
        Some("next-token-2".into())
    );
    let last_page = second_pager
        .next()
        .await
        .expect("expected last page")
        .expect("expected successful last page")
        .into_model()
        .expect("expected page");
    assert_eq!(last_page.page, None);
    assert_eq!(last_page.items, vec![7, 8, 9]);
    assert_eq!(second_pager.continuation_token(), None);
}
/// Converting to pages after consuming exactly the first page's items restarts
/// at the first page, because no further page had been requested yet.
#[tokio::test]
async fn page_iterator_from_item_iterator_after_first_page() {
    let mut item_pager: Pager<Page> = Pager::from_callback(make_three_page_callback(), None);
    assert_eq!(item_pager.continuation_token(), None);
    let first_item = item_pager
        .next()
        .await
        .expect("expected first item")
        .expect("expected successful first item");
    assert_eq!(first_item, 1);
    let second_item = item_pager
        .next()
        .await
        .expect("expected second item")
        .expect("expected successful second item");
    assert_eq!(second_item, 2);
    let third_item = item_pager
        .next()
        .await
        .expect("expected third item")
        .expect("expected successful third item");
    assert_eq!(third_item, 3);
    let mut page_pager = item_pager.into_pages();
    assert_eq!(page_pager.continuation_token(), None);
    let first_page = page_pager
        .next()
        .await
        .expect("expected first page")
        .expect("expected successful first page")
        .into_model()
        .expect("expected page");
    assert_eq!(first_page.page, Some(1));
    assert_eq!(first_page.items, vec![1, 2, 3]);
    let continuation_token = page_pager
        .continuation_token()
        .expect("expected continuation_token from first page");
    assert_eq!(continuation_token, "next-token-1");
}
/// Converting to pages after reading the first item of the second page resumes
/// at the second page.
#[tokio::test]
async fn page_iterator_from_item_iterator_second_page_first_item() {
    let mut item_pager: Pager<Page> = Pager::from_callback(make_three_page_callback(), None);
    assert_eq!(item_pager.continuation_token(), None);
    let first_item = item_pager
        .next()
        .await
        .expect("expected first item")
        .expect("expected successful first item");
    assert_eq!(first_item, 1);
    let second_item = item_pager
        .next()
        .await
        .expect("expected second item")
        .expect("expected successful second item");
    assert_eq!(second_item, 2);
    let third_item = item_pager
        .next()
        .await
        .expect("expected third item")
        .expect("expected successful third item");
    assert_eq!(third_item, 3);
    let fourth_item = item_pager
        .next()
        .await
        .expect("expected fourth item")
        .expect("expected successful fourth item");
    assert_eq!(fourth_item, 4);
    let mut page_pager = item_pager.into_pages();
    assert_eq!(page_pager.continuation_token(), Some("next-token-1".into()));
    let second_page = page_pager
        .next()
        .await
        .expect("expected second page")
        .expect("expected successful second page")
        .into_model()
        .expect("expected page");
    assert_eq!(second_page.page, Some(2));
    assert_eq!(second_page.items, vec![4, 5, 6]);
    let continuation_token = page_pager
        .continuation_token()
        .expect("expected continuation_token from second page");
    assert_eq!(continuation_token, "next-token-2");
}
/// While still inside the first page the item iterator reports no token, and
/// resuming a new pager with that `None` token starts again from the first item.
#[tokio::test]
async fn item_iterator_with_continuation_token() {
    let mut first_pager: Pager<Page> = Pager::from_callback(make_three_page_callback(), None);
    assert_eq!(first_pager.continuation_token(), None);
    let first_item = first_pager
        .next()
        .await
        .expect("expected first item")
        .expect("expected successful first item");
    assert_eq!(first_item, 1);
    let second_item = first_pager
        .next()
        .await
        .expect("expected second item")
        .expect("expected successful second item");
    assert_eq!(second_item, 2);
    let continuation_token = first_pager.continuation_token();
    assert_eq!(continuation_token, None);
    let mut second_pager: Pager<Page> = Pager::from_callback(make_three_page_callback(), None)
        .with_continuation_token(continuation_token);
    assert_eq!(second_pager.continuation_token(), None);
    let first_item_second_pager = second_pager
        .next()
        .await
        .expect("expected first item from second pager")
        .expect("expected successful first item from second pager");
    assert_eq!(first_item_second_pager, 1);
    let items: Vec<i32> = second_pager.try_collect().await.unwrap();
    assert_eq!(items.as_slice(), vec![2, 3, 4, 5, 6, 7, 8, 9]);
}
/// A token captured in the middle of the second page makes a new pager resume at
/// the start of the second page (item 4), not at the item where the first stopped.
#[tokio::test]
async fn item_iterator_continuation_second_page_second_item() {
    let mut first_pager: Pager<Page> = Pager::from_callback(make_three_page_callback(), None);
    assert_eq!(first_pager.continuation_token(), None);
    let first_item = first_pager
        .next()
        .await
        .expect("expected first item")
        .expect("expected successful first item");
    assert_eq!(first_item, 1);
    let second_item = first_pager
        .next()
        .await
        .expect("expected second item")
        .expect("expected successful second item");
    assert_eq!(second_item, 2);
    let third_item = first_pager
        .next()
        .await
        .expect("expected third item")
        .expect("expected successful third item");
    assert_eq!(third_item, 3);
    let fourth_item = first_pager
        .next()
        .await
        .expect("expected fourth item")
        .expect("expected successful fourth item");
    assert_eq!(fourth_item, 4);
    let fifth_item = first_pager
        .next()
        .await
        .expect("expected fifth item")
        .expect("expected successful fifth item");
    assert_eq!(fifth_item, 5);
    let continuation_token = first_pager.continuation_token();
    assert_eq!(continuation_token.as_deref(), Some("next-token-1"));
    let mut second_pager: Pager<Page> = Pager::from_callback(make_three_page_callback(), None)
        .with_continuation_token(continuation_token);
    let first_item_second_pager = second_pager
        .next()
        .await
        .expect("expected first item from second pager")
        .expect("expected successful first item from second pager");
    assert_eq!(first_item_second_pager, 4);
    let items: Vec<i32> = second_pager.try_collect().await.unwrap();
    assert_eq!(items.as_slice(), vec![5, 6, 7, 8, 9]);
}
/// After consuming exactly the first page's items the token is still `None`, so a
/// new pager resumed with it starts again from the first item.
#[tokio::test]
async fn item_iterator_continuation_after_first_page() {
    let mut first_pager: Pager<Page> = Pager::from_callback(make_three_page_callback(), None);
    assert_eq!(first_pager.continuation_token(), None);
    let first_item = first_pager
        .next()
        .await
        .expect("expected first item")
        .expect("expected successful first item");
    assert_eq!(first_item, 1);
    let second_item = first_pager
        .next()
        .await
        .expect("expected second item")
        .expect("expected successful second item");
    assert_eq!(second_item, 2);
    let third_item = first_pager
        .next()
        .await
        .expect("expected third item")
        .expect("expected successful third item");
    assert_eq!(third_item, 3);
    // The next page has not been requested yet, so no token has been captured.
    let continuation_token = first_pager.continuation_token();
    assert_eq!(continuation_token, None);
    let mut second_pager: Pager<Page> = Pager::from_callback(make_three_page_callback(), None)
        .with_continuation_token(continuation_token);
    // These messages name the pager actually being polled (the second one).
    let first_item_second_pager = second_pager
        .next()
        .await
        .expect("expected first item from second pager")
        .expect("expected successful first item from second pager");
    assert_eq!(first_item_second_pager, 1);
    let items: Vec<i32> = second_pager.try_collect().await.unwrap();
    assert_eq!(items.as_slice(), vec![2, 3, 4, 5, 6, 7, 8, 9]);
}
/// Test continuation type whose `FromStr` implementation always fails.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ContinuationToken(String);
impl AsRef<str> for ContinuationToken {
fn as_ref(&self) -> &str {
&self.0
}
}
impl std::str::FromStr for ContinuationToken {
    type Err = std::io::Error;

    /// Rejects every input so tests can exercise the token-parsing error path.
    fn from_str(_s: &str) -> Result<Self, Self::Err> {
        let kind = std::io::ErrorKind::InvalidData;
        let failure = std::io::Error::new(kind, "ContinuationToken parsing always fails");
        Err(failure)
    }
}
/// When the stored continuation token cannot be parsed back into `C`, the pager
/// yields the first page's item and then a `DataConversion` error.
#[tokio::test]
async fn callback_item_pagination_from_str_error() {
    let mut pager: Pager<Page> = Pager::from_callback(
        |continuation: PagerState<ContinuationToken>, _ctx| async move {
            match continuation {
                PagerState::Initial => Ok(PagerResult::More {
                    response: RawResponse::from_bytes(
                        StatusCode::Ok,
                        HashMap::from([(
                            HeaderName::from_static("x-test-header"),
                            HeaderValue::from_static("page-1"),
                        )])
                        .into(),
                        r#"{"items":[1],"page":1}"#,
                    )
                    .into(),
                    continuation: ContinuationToken("unparseable-token".to_string()),
                }),
                // The callback must never be reached with a parsed token.
                _ => {
                    panic!("Unexpected continuation value: {:?}", continuation)
                }
            }
        },
        None,
    );
    let first_item = pager.try_next().await.expect("expected first page");
    assert_eq!(first_item, Some(1));
    assert!(
        matches!(pager.try_next().await, Err(err) if err.kind() == &ErrorKind::DataConversion)
    );
}
/// Returns a request callback that serves three pages: items 1-3, 4-6 and 7-9.
///
/// The pages are linked by the tokens `"next-token-1"` and `"next-token-2"`; the
/// last page body has no `page` field. Any other token makes the callback panic.
// The boxed-future return type is inherently verbose, hence the lint allowance.
#[allow(clippy::type_complexity)]
fn make_three_page_callback() -> impl Fn(
    PagerState<String>,
    Context<'_>,
) -> Pin<
    Box<dyn Future<Output = crate::Result<PagerResult<Response<Page>, String>>> + Send>,
> + Send
       + 'static {
    |continuation: PagerState<String>, _ctx| {
        Box::pin(async move {
            match continuation.as_deref() {
                PagerState::Initial => Ok(PagerResult::More {
                    response: RawResponse::from_bytes(
                        StatusCode::Ok,
                        Default::default(),
                        r#"{"items":[1,2,3],"page":1}"#,
                    )
                    .into(),
                    continuation: "next-token-1".to_string(),
                }),
                PagerState::More("next-token-1") => Ok(PagerResult::More {
                    response: RawResponse::from_bytes(
                        StatusCode::Ok,
                        HashMap::from([(
                            HeaderName::from_static("x-test-header"),
                            HeaderValue::from_static("page-2"),
                        )])
                        .into(),
                        r#"{"items":[4,5,6],"page":2}"#,
                    )
                    .into(),
                    continuation: "next-token-2".to_string(),
                }),
                PagerState::More("next-token-2") => Ok(PagerResult::Done {
                    response: RawResponse::from_bytes(
                        StatusCode::Ok,
                        HashMap::from([(
                            HeaderName::from_static("x-test-header"),
                            HeaderValue::from_static("page-3"),
                        )])
                        .into(),
                        r#"{"items":[7,8,9]}"#,
                    )
                    .into(),
                }),
                _ => {
                    panic!("Unexpected continuation value: {:?}", continuation)
                }
            }
        })
    }
}
}