use crate::{
error::ErrorKind,
http::{
headers::HeaderName, policies::create_public_api_span, response::Response, Context,
DeserializeWith, Format, JsonFormat, Url,
},
tracing::{Span, SpanStatus},
};
use async_trait::async_trait;
use futures::{stream::FusedStream, FutureExt, Stream};
use pin_project::pin_project;
use std::{fmt, future::Future, pin::Pin, sync::Arc, task};
use typespec::error::ResultExt;
/// Tells the request callback which page a pager wants next.
///
/// A value of this type is the first argument passed to the `make_request` callback
/// given to [`PageIterator::new`] and [`ItemIterator::new`].
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub enum PagerState {
/// No continuation is known; the callback is being asked for the first page. This is the default.
#[default]
Initial,
/// The callback is being asked for the page identified by the enclosed continuation.
More(PagerContinuation),
}
/// The outcome of a single page request made by a pager callback.
///
/// `P` is the page (response) type handed back to the caller of the pager.
pub enum PagerResult<P> {
/// A page was returned and at least one more page may follow.
More {
/// The page that was just fetched.
response: P,
/// The continuation to pass to the callback to fetch the following page.
continuation: PagerContinuation,
},
/// A page was returned and it is the last one.
Done {
/// The final page.
response: P,
},
}
impl<P, F> PagerResult<Response<P, F>> {
    /// Builds a [`PagerResult`] by looking for a continuation token in a response header.
    ///
    /// If `header_name` is present on `response`, its string value becomes a
    /// [`PagerContinuation::Token`] and the result is [`PagerResult::More`];
    /// otherwise the result is [`PagerResult::Done`]. The response is moved into
    /// the result either way.
    pub fn from_response_header(response: Response<P, F>, header_name: &HeaderName) -> Self {
        // The lookup yields an owned value, so `response` is free to be moved afterwards.
        let next_token = response.headers().get_optional_string(header_name);
        if let Some(token) = next_token {
            return Self::More {
                response,
                continuation: PagerContinuation::Token(token),
            };
        }
        Self::Done { response }
    }
}
impl<P> fmt::Debug for PagerResult<P> {
    /// Formats the variant name and, for `More`, the continuation.
    ///
    /// The response is never printed (`P` carries no `Debug` bound), so both
    /// variants are rendered as non-exhaustive structs.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = match self {
            Self::Done { .. } => f.debug_struct("Done"),
            Self::More { continuation, .. } => {
                let mut builder = f.debug_struct("More");
                builder.field("continuation", continuation);
                builder
            }
        };
        builder.finish_non_exhaustive()
    }
}
/// Identifies the next page to request from a paged operation.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a wildcard arm when matching.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum PagerContinuation {
/// The continuation is carried as a [`Url`].
Link(Url),
/// The continuation is carried as an opaque string token.
Token(String),
}
impl AsRef<str> for PagerContinuation {
fn as_ref(&self) -> &str {
match self {
Self::Link(next_link) => next_link.as_str(),
Self::Token(continuation_token) => continuation_token.as_str(),
}
}
}
impl fmt::Display for PagerContinuation {
    /// Writes the same text that [`AsRef<str>`] exposes, with no extra decoration.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text: &str = self.as_ref();
        f.write_str(text)
    }
}
impl From<PagerContinuation> for String {
    /// Converts the continuation into an owned string.
    ///
    /// A `Token` gives up its string without copying; a `Link` is copied from the URL text.
    fn from(value: PagerContinuation) -> Self {
        match value {
            PagerContinuation::Token(token) => token,
            PagerContinuation::Link(url) => url.as_str().to_owned(),
        }
    }
}
impl TryFrom<PagerContinuation> for Url {
    type Error = crate::Error;

    /// Converts the continuation into a [`Url`].
    ///
    /// A `Link` is returned as-is. A `Token` is parsed as a URL.
    ///
    /// # Errors
    ///
    /// Returns an error of kind [`ErrorKind::DataConversion`] when a `Token` does not parse as a URL.
    fn try_from(value: PagerContinuation) -> Result<Self, Self::Error> {
        let token = match value {
            PagerContinuation::Link(url) => return Ok(url),
            PagerContinuation::Token(token) => token,
        };
        token.parse::<Self>().with_kind(ErrorKind::DataConversion)
    }
}
/// A page of results that can be consumed as an iterator over its items.
#[async_trait]
pub trait Page {
/// The type of each item contained in the page.
type Item;
/// The iterator type produced by [`Page::into_items`].
type IntoIter: Iterator<Item = Self::Item>;
/// Consumes the page and asynchronously produces an iterator over its items.
async fn into_items(self) -> crate::Result<Self::IntoIter>;
}
/// Lets a [`Response`] whose model is itself a [`Page`] be used directly as a page.
#[async_trait]
impl<P, F> Page for Response<P, F>
where
    P: DeserializeWith<F> + Page + Send,
    F: Format + Send,
{
    type Item = P::Item;
    type IntoIter = P::IntoIter;

    /// Converts the response into its model `P`, then delegates to the model's own
    /// [`Page::into_items`]. A failure from either step is returned.
    async fn into_items(self) -> crate::Result<Self::IntoIter> {
        let model = self.into_model()?;
        P::into_items(model).await
    }
}
/// An [`ItemIterator`] whose pages are [`Response`]s carrying model `P` in format `F` (JSON by default).
pub type Pager<P, F = JsonFormat> = ItemIterator<Response<P, F>>;
/// A boxed, pinned, `Send + 'static` future that resolves to the [`PagerResult`] of one page request.
pub type PagerResultFuture<P> =
Pin<Box<dyn Future<Output = crate::Result<PagerResult<P>>> + Send + 'static>>;
/// The boxed request callback stored by [`PageIterator`]: given the pager state and options,
/// it returns the future for one page request.
type PagerFn<P> = Box<dyn Fn(PagerState, PagerOptions<'static>) -> PagerResultFuture<P> + Send>;
/// Options supplied when creating a pager; a clone is handed to the request callback on every page request.
#[derive(Clone)]
pub struct PagerOptions<'a> {
/// The context passed along with each page request.
pub context: Context<'a>,
/// Where to start paging. When `Some`, a new pager begins by requesting the page this
/// continuation identifies instead of the initial page.
pub continuation: Option<PagerContinuation>,
}
impl fmt::Debug for PagerOptions<'_> {
    /// Formats both fields under the struct name `PagerOptions`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            context,
            continuation,
        } = self;
        let mut builder = f.debug_struct("PagerOptions");
        builder.field("context", context);
        builder.field("continuation", continuation);
        builder.finish()
    }
}
impl Default for PagerOptions<'_> {
    /// Creates options with a new, empty [`Context`] and no continuation.
    fn default() -> Self {
        Self {
            continuation: None,
            context: Context::new(),
        }
    }
}
/// A stream of individual items taken from the pages of a paged operation.
///
/// Pages are fetched by an inner [`PageIterator`]; each fetched page is turned into an
/// iterator with [`Page::into_items`] and drained before the next page is requested.
#[pin_project(project = ItemIteratorProjection, project_replace = ItemIteratorProjectionOwned)]
pub struct ItemIterator<P>
where
P: Page + Send,
{
/// The underlying page stream.
#[pin]
iter: PageIterator<P>,
/// The page stream's continuation as it was just before the most recent page request;
/// `None` while that request was the initial one.
continuation: Option<PagerContinuation>,
/// Items of the page currently being drained, if any.
current: Option<P::IntoIter>,
}
impl<P> ItemIterator<P>
where
    P: Page + Send,
{
    /// Creates an item stream driven by the `make_request` callback.
    ///
    /// `make_request` is called once per page with the current [`PagerState`] and a clone
    /// of the options. When `options` is `None`, default options are used. If the options
    /// carry a continuation, it is also reported by [`ItemIterator::continuation`] until
    /// the stream advances.
    pub fn new<F>(make_request: F, options: Option<PagerOptions<'static>>) -> Self
    where
        F: Fn(PagerState, PagerOptions<'static>) -> PagerResultFuture<P> + Send + 'static,
    {
        let resolved = options.unwrap_or_default();
        let starting_point = resolved.continuation.clone();
        let pages = PageIterator::new(make_request, Some(resolved));
        Self {
            iter: pages,
            continuation: starting_point,
            current: None,
        }
    }

    /// Returns the continuation that was in effect before the most recent page request,
    /// or `None` if that request was the initial one (or nothing has been requested yet).
    pub fn continuation(&self) -> Option<&PagerContinuation> {
        Option::as_ref(&self.continuation)
    }

    /// Consumes the stream and returns the continuation described by
    /// [`ItemIterator::continuation`].
    pub fn into_continuation(self) -> Option<PagerContinuation> {
        let Self { continuation, .. } = self;
        continuation
    }

    /// Converts this item stream into a page stream.
    ///
    /// The page stream is rewound to this stream's continuation: it will request the page
    /// that continuation identifies, or the initial page when there is none. Any items
    /// still buffered from the current page are discarded.
    pub fn into_pages(self) -> PageIterator<P> {
        let Self {
            iter: mut pages,
            continuation,
            ..
        } = self;
        pages.state = if continuation.is_some() {
            State::More
        } else {
            State::Init
        };
        pages.options.continuation = continuation;
        pages
    }
}
/// Streams individual items: drains the current page's iterator and pulls the next page from
/// the inner [`PageIterator`] whenever the current one runs out.
impl<P> Stream for ItemIterator<P>
where
P: Page + Send,
{
type Item = crate::Result<P::Item>;
/// Returns the next item, fetching further pages as needed.
///
/// Yields `Some(Err(_))` when a page request or `into_items` fails, and `None` once the
/// inner page stream is finished.
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let mut this = self.project();
let mut iter = this.iter.as_mut();
loop {
// Drain the page that is currently buffered before asking for another one.
if let Some(current) = this.current.as_mut() {
if let Some(item) = current.next() {
return task::Poll::Ready(Some(Ok(item)));
}
// The buffered page is exhausted; clear it so the code below requests the next page.
*this.current = None;
}
tracing::trace!(
"updating continuation from {:?} to {:?}",
&this.continuation,
iter.continuation(),
);
// Snapshot the page stream's continuation BEFORE polling it, so that `continuation()`
// identifies the page about to be iterated rather than the page after it.
*this.continuation = iter.options.continuation.clone();
match iter.as_mut().poll_next(cx) {
task::Poll::Ready(page) => match page {
// NOTE(review): the `into_items()` future is a temporary. If it is not ready on this
// first poll it is dropped together with the page, and the next `poll_next` call
// requests a new page instead of resuming it — confirm implementations never pend.
Some(Ok(page)) => match page.into_items().poll_unpin(cx) {
task::Poll::Ready(Ok(iter)) => {
*this.current = Some(iter);
// Loop around to yield the first item of the new page (or skip an empty page).
continue;
}
task::Poll::Ready(Err(err)) => return task::Poll::Ready(Some(Err(err))),
task::Poll::Pending => return task::Poll::Pending,
},
Some(Err(err)) => return task::Poll::Ready(Some(Err(err))),
None => return task::Poll::Ready(None),
},
task::Poll::Pending => return task::Poll::Pending,
}
}
}
}
impl<P> fmt::Debug for ItemIterator<P>
where
    P: Page + Send,
{
    /// Formats the inner page stream and the continuation. The buffered item iterator
    /// is omitted, so the output is marked non-exhaustive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("ItemIterator");
        builder.field("iter", &self.iter);
        builder.field("continuation", &self.continuation);
        builder.finish_non_exhaustive()
    }
}
impl<P> FusedStream for ItemIterator<P>
where
    P: Page + Send,
{
    /// Returns `true` only when the stream must no longer be polled.
    ///
    /// The inner page stream becomes terminated as soon as the last page has been
    /// fetched, but at that point the items of that page are still buffered in
    /// `current` and have yet to be yielded. Reporting termination then would make
    /// combinators that consult `is_terminated` (such as `select!`) skip those items,
    /// so a buffered page keeps this stream alive. `current` is cleared by `poll_next`
    /// once the page is exhausted, after which the page stream's state decides.
    fn is_terminated(&self) -> bool {
        self.current.is_none() && self.iter.is_terminated()
    }
}
/// A stream of whole pages produced by repeatedly invoking a request callback.
///
/// Each page request receives the current [`PagerState`] and a clone of the options; the
/// continuation returned with a page is stored and used for the following request.
#[must_use = "streams do nothing unless polled"]
#[pin_project(project = PageIteratorProjection, project_replace = PageIteratorProjectionOwned)]
pub struct PageIterator<P>
where
P: Send,
{
/// The callback that starts one page request.
#[pin]
make_request: PagerFn<P>,
/// The options cloned into every request; `options.continuation` tracks the next page to request.
options: PagerOptions<'static>,
/// Where the stream is in its request cycle.
state: State<P>,
/// Set once this iterator has inserted a span into the context, so that it ends that span itself.
added_span: bool,
}
impl<P> PageIterator<P>
where
    P: Send,
{
    /// Creates a page stream driven by the `make_request` callback.
    ///
    /// When `options` is `None`, default options are used. If the options carry a
    /// continuation, the first request asks for the page it identifies
    /// ([`PagerState::More`]); otherwise the first request is [`PagerState::Initial`].
    pub fn new<F>(make_request: F, options: Option<PagerOptions<'static>>) -> Self
    where
        F: Fn(PagerState, PagerOptions<'static>) -> PagerResultFuture<P> + Send + 'static,
    {
        let options = options.unwrap_or_default();
        let state = match options.continuation {
            Some(_) => State::More,
            None => State::Init,
        };
        Self {
            make_request: Box::new(make_request),
            options,
            state,
            added_span: false,
        }
    }

    /// Returns the continuation that the next page request will use, if any.
    pub fn continuation(&self) -> Option<&PagerContinuation> {
        Option::as_ref(&self.options.continuation)
    }

    /// Consumes the stream and returns the continuation that the next page request
    /// would have used, if any.
    pub fn into_continuation(self) -> Option<PagerContinuation> {
        let Self { options, .. } = self;
        options.continuation
    }
}
/// Drives the page request state machine: `Init`/`More` start a request, `Pending` resumes
/// one, and `Done` ends the stream.
impl<P> Stream for PageIterator<P>
where
P: Send,
{
type Item = crate::Result<P>;
/// Polls for the next page.
///
/// Yields `Some(Ok(page))` for each page, `Some(Err(_))` if a request fails (after which
/// the stream is finished), and `None` once the stream is finished.
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.project();
// Ask for a public API span when starting fresh or when a continuation is set; if one is
// returned, remember that this iterator added it and store it in the context so that
// cloned options carry it into the requests.
// NOTE(review): a continuation stays set between pages (and after an error), so this
// branch runs on later polls too. Presumably `create_public_api_span` returns `None`
// once the context already holds a span — confirm, otherwise spans are replaced unended.
if *this.state == State::Init || this.options.continuation.is_some() {
tracing::debug!("establish a public API span for new pager.");
let span = create_public_api_span(&this.options.context, None, None);
if let Some(ref s) = span {
*this.added_span = true;
this.options.context.insert(s.clone());
}
}
let result = match *this.state {
State::Init => {
tracing::debug!("initial page request");
let options = this.options.clone();
let mut fut = (this.make_request)(PagerState::Initial, options);
// Poll once right away; park the future in `Pending` if it is not ready yet.
match fut.poll_unpin(cx) {
task::Poll::Ready(result) => result,
task::Poll::Pending => {
*this.state = State::Pending(fut);
return task::Poll::Pending;
}
}
}
// Resume the in-flight request; `ready!` returns `Pending` from this function if it is still running.
State::Pending(ref mut fut) => task::ready!(fut.poll_unpin(cx)),
State::More => {
let options = this.options.clone();
// `More` is only entered with a continuation stored in the options, so a missing
// value is treated as a broken invariant.
let continuation_token = options
.continuation
.clone()
.expect("expected continuation_token");
tracing::debug!("subsequent page request to {:?}", &continuation_token,);
let mut fut = (this.make_request)(PagerState::More(continuation_token), options);
match fut.poll_unpin(cx) {
task::Poll::Ready(result) => result,
task::Poll::Pending => {
*this.state = State::Pending(fut);
return task::Poll::Pending;
}
}
}
State::Done => {
tracing::debug!("done");
// Clears a continuation that may have been left in place by a failed request.
this.options.continuation = None;
return task::Poll::Ready(None);
}
};
// Every arm below replaces the state, so a completed `Pending` future is never polled again.
match result {
Err(e) => {
// Record the failure on the span this iterator added, then end it.
if *this.added_span {
if let Some(span) = this.options.context.value::<Arc<dyn Span>>() {
span.set_status(SpanStatus::Error {
description: e.to_string(),
});
span.set_attribute("error.type", e.kind().to_string().into());
span.end();
}
}
// The continuation is intentionally not touched here: it still names the page whose
// request failed until the next poll reaches the `Done` arm above.
*this.state = State::Done;
task::Poll::Ready(Some(Err(e)))
}
Ok(PagerResult::More {
response,
continuation: continuation_token,
}) => {
// Remember where to continue and schedule another request for the next poll.
this.options.continuation = Some(continuation_token);
*this.state = State::More;
task::Poll::Ready(Some(Ok(response)))
}
Ok(PagerResult::Done { response }) => {
// Last page: nothing left to continue from; end the span this iterator added.
this.options.continuation = None;
*this.state = State::Done;
if *this.added_span {
if let Some(span) = this.options.context.value::<Arc<dyn Span>>() {
span.end();
}
}
task::Poll::Ready(Some(Ok(response)))
}
}
}
}
impl<P> fmt::Debug for PageIterator<P>
where
    P: Send,
{
    /// Formats the continuation, options, state and span flag. The request callback is
    /// omitted, so the output is marked non-exhaustive.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            options,
            state,
            added_span,
            ..
        } = self;
        let mut builder = f.debug_struct("PageIterator");
        builder.field("continuation_token", &options.continuation);
        builder.field("options", options);
        builder.field("state", state);
        builder.field("added_span", added_span);
        builder.finish_non_exhaustive()
    }
}
impl<P> FusedStream for PageIterator<P>
where
    P: Send,
{
    /// Returns `true` once the stream has reached its final state, i.e. after the last
    /// page or an error has been yielded.
    fn is_terminated(&self) -> bool {
        matches!(self.state, State::Done)
    }
}
/// Internal state of a [`PageIterator`].
enum State<P> {
/// Nothing has been requested yet; the next poll requests the initial page.
Init,
/// A page request is in flight and its future must be polled again.
Pending(PagerResultFuture<P>),
/// A continuation is stored in the options; the next poll requests the page it identifies.
More,
/// The stream is finished; further polls yield `None`.
Done,
}
impl<P> fmt::Debug for State<P> {
    /// Writes the variant name; the in-flight future of `Pending` is not printed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let unit_name = match self {
            State::Pending(..) => return f.debug_tuple("Pending").finish_non_exhaustive(),
            State::Init => "Init",
            State::More => "More",
            State::Done => "Done",
        };
        f.write_str(unit_name)
    }
}
impl<P> PartialEq for State<P> {
    /// Compares states for the checks the pager needs.
    ///
    /// Only `Init == Init` and `Done == Done` are equal. `More` and `Pending` never
    /// compare equal to anything, including themselves.
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (State::Init, State::Init) | (State::Done, State::Done) => true,
            _ => false,
        }
    }
}
#[cfg(test)]
mod tests {
use super::{
ItemIterator, PageIterator, Pager, PagerContinuation, PagerOptions, PagerResult, PagerState,
};
use crate::http::{
headers::{HeaderName, HeaderValue},
pager::PagerResultFuture,
RawResponse, Response, StatusCode,
};
use async_trait::async_trait;
use futures::{StreamExt as _, TryStreamExt as _};
use serde::Deserialize;
use std::collections::HashMap;
/// Test model deserialized from the JSON bodies used by the test callbacks.
#[derive(Deserialize, Debug, PartialEq, Eq)]
struct Page {
/// The items contained in this page.
pub items: Vec<i32>,
/// The page number, when the body includes one.
pub page: Option<i32>,
}
/// Makes the test model usable as a page by iterating over its `items`.
#[async_trait]
impl super::Page for Page {
    type Item = i32;
    type IntoIter = <Vec<i32> as IntoIterator>::IntoIter;

    async fn into_items(self) -> crate::Result<Self::IntoIter> {
        let items = self.items;
        Ok(items.into_iter())
    }
}
// Collects every item from a three-page pager whose callback returns one item per page,
// following the continuation tokens "1" and "2".
#[tokio::test]
async fn callback_item_pagination() {
let pager: Pager<Page> = Pager::new(
|continuation: PagerState, _ctx| {
Box::pin(async move {
match continuation {
PagerState::Initial => Ok(PagerResult::More {
response: RawResponse::from_bytes(
StatusCode::Ok,
HashMap::from([(
HeaderName::from_static("x-test-header"),
HeaderValue::from_static("page-1"),
)])
.into(),
r#"{"items":[1],"page":1}"#,
)
.into(),
continuation: PagerContinuation::Token("1".into()),
}),
PagerState::More(ref i) if i.as_ref() == "1" => Ok(PagerResult::More {
response: RawResponse::from_bytes(
StatusCode::Ok,
HashMap::from([(
HeaderName::from_static("x-test-header"),
HeaderValue::from_static("page-2"),
)])
.into(),
r#"{"items":[2],"page":2}"#,
)
.into(),
continuation: PagerContinuation::Token("2".into()),
}),
PagerState::More(ref i) if i.as_ref() == "2" => Ok(PagerResult::Done {
response: RawResponse::from_bytes(
StatusCode::Ok,
HashMap::from([(
HeaderName::from_static("x-test-header"),
HeaderValue::from_static("page-3"),
)])
.into(),
r#"{"items":[3],"page":3}"#,
)
.into(),
}),
_ => {
panic!("Unexpected continuation value")
}
}
})
},
None,
);
let items: Vec<i32> = pager.try_collect().await.unwrap();
assert_eq!(vec![1, 2, 3], items.as_slice())
}
// Iterates by page when the second request fails: expects exactly two results, the first
// page followed by the error, after which the stream ends.
#[tokio::test]
async fn callback_item_pagination_error() {
let pager: Pager<Page> = ItemIterator::new(
|continuation: PagerState, _options| {
Box::pin(async move {
match continuation {
PagerState::Initial => Ok(PagerResult::More {
response: RawResponse::from_bytes(
StatusCode::Ok,
HashMap::from([(
HeaderName::from_static("x-test-header"),
HeaderValue::from_static("page-1"),
)])
.into(),
r#"{"items":[1],"page":1}"#,
)
.into(),
continuation: PagerContinuation::Token("1".into()),
}),
PagerState::More(ref i) if i.as_ref() == "1" => {
Err(crate::Error::with_message(
crate::error::ErrorKind::Other,
"yon request didst fail",
))
}
_ => {
panic!("Unexpected continuation value")
}
}
})
},
None,
);
// Pair each page's test header with its deserialized body, keeping errors in place.
let pages: Vec<Result<(String, Page), crate::Error>> = pager
.into_pages()
.then(|r| async move {
let r = r?;
let header = r
.headers()
.get_optional_string(&HeaderName::from_static("x-test-header"))
.unwrap();
let body = r.into_model()?;
Ok((header, body))
})
.collect()
.await;
assert_eq!(2, pages.len());
assert_eq!(
&(
"page-1".to_string(),
Page {
items: vec![1],
page: Some(1)
}
),
pages[0].as_ref().unwrap()
);
let err = pages[1].as_ref().unwrap_err();
assert_eq!(&crate::error::ErrorKind::Other, err.kind());
assert_eq!("yon request didst fail", format!("{}", err));
}
// Walks all three pages of a `PageIterator`, checking each page's contents and the
// continuation reported after it, then checks that the stream ends.
#[tokio::test]
async fn page_iterator_iterate_all_pages() {
let mut pager = PageIterator::new(make_three_page_callback(), None);
assert_eq!(pager.continuation(), None);
let first_page = pager
.next()
.await
.expect("expected first page")
.expect("expected successful first page")
.into_model()
.expect("expected page");
assert_eq!(first_page.page, Some(1));
assert_eq!(first_page.items, vec![1, 2, 3]);
assert_eq!(
pager.continuation().map(AsRef::as_ref),
Some("next-token-1")
);
let second_page = pager
.next()
.await
.expect("expected second page")
.expect("expected successful second page")
.into_model()
.expect("expected page");
assert_eq!(second_page.page, Some(2));
assert_eq!(second_page.items, vec![4, 5, 6]);
assert_eq!(
pager.continuation().map(AsRef::as_ref),
Some("next-token-2")
);
let third_page = pager
.next()
.await
.expect("expected third page")
.expect("expected successful third page")
.into_model()
.expect("expected page");
assert_eq!(third_page.page, None);
assert_eq!(third_page.items, vec![7, 8, 9]);
assert_eq!(pager.continuation(), None);
assert!(pager.next().await.is_none());
}
// Takes the continuation from one `PageIterator` after its first page and uses it to start
// a second `PageIterator`, which must resume at the second page.
#[tokio::test]
async fn page_iterator_with_continuation_token() {
let mut first_pager = PageIterator::new(make_three_page_callback(), None);
assert_eq!(first_pager.continuation(), None);
let first_page = first_pager
.next()
.await
.expect("expected first page")
.expect("expected successful first page")
.into_model()
.expect("expected page");
assert_eq!(first_page.page, Some(1));
assert_eq!(first_page.items, vec![1, 2, 3]);
let continuation_token = first_pager
.continuation()
.expect("expected continuation_token from first page");
assert_eq!(continuation_token.as_ref(), "next-token-1");
let mut second_pager = PageIterator::new(
make_three_page_callback(),
Some(PagerOptions {
continuation: Some(continuation_token.clone()),
..Default::default()
}),
);
assert_eq!(
second_pager.continuation().map(AsRef::as_ref),
Some("next-token-1"),
);
let second_page = second_pager
.next()
.await
.expect("expected second page")
.expect("expected successful second page")
.into_model()
.expect("expected page");
assert_eq!(second_page.page, Some(2));
assert_eq!(second_page.items, vec![4, 5, 6]);
assert_eq!(
second_pager.continuation().map(AsRef::as_ref),
Some("next-token-2")
);
let last_page = second_pager
.next()
.await
.expect("expected last page")
.expect("expected successful last page")
.into_model()
.expect("expected page");
assert_eq!(last_page.page, None);
assert_eq!(last_page.items, vec![7, 8, 9]);
assert_eq!(second_pager.continuation(), None);
}
// Consumes exactly the first page's items from an `ItemIterator`, then converts it with
// `into_pages`: the page stream must start over at the first page.
#[tokio::test]
async fn page_iterator_from_item_iterator_after_first_page() {
let mut item_pager = ItemIterator::new(make_three_page_callback(), None);
assert_eq!(item_pager.continuation(), None);
let first_item = item_pager
.next()
.await
.expect("expected first item")
.expect("expected successful first item");
assert_eq!(first_item, 1);
let second_item = item_pager
.next()
.await
.expect("expected second item")
.expect("expected successful second item");
assert_eq!(second_item, 2);
let third_item = item_pager
.next()
.await
.expect("expected third item")
.expect("expected successful third item");
assert_eq!(third_item, 3);
let mut page_pager = item_pager.into_pages();
assert_eq!(page_pager.continuation(), None);
let first_page = page_pager
.next()
.await
.expect("expected first page")
.expect("expected successful first page")
.into_model()
.expect("expected page");
assert_eq!(first_page.page, Some(1));
assert_eq!(first_page.items, vec![1, 2, 3]);
let continuation_token = page_pager
.continuation()
.expect("expected continuation_token from first page");
assert_eq!(continuation_token.as_ref(), "next-token-1");
}
// Consumes the first item of the second page from an `ItemIterator`, then converts it with
// `into_pages`: the page stream must re-request the second page.
#[tokio::test]
async fn page_iterator_from_item_iterator_second_page_first_item() {
let mut item_pager = ItemIterator::new(make_three_page_callback(), None);
assert_eq!(item_pager.continuation(), None);
let first_item = item_pager
.next()
.await
.expect("expected first item")
.expect("expected successful first item");
assert_eq!(first_item, 1);
let second_item = item_pager
.next()
.await
.expect("expected second item")
.expect("expected successful second item");
assert_eq!(second_item, 2);
let third_item = item_pager
.next()
.await
.expect("expected third item")
.expect("expected successful third item");
assert_eq!(third_item, 3);
let fourth_item = item_pager
.next()
.await
.expect("expected fourth item")
.expect("expected successful fourth item");
assert_eq!(fourth_item, 4);
let mut page_pager = item_pager.into_pages();
assert_eq!(
page_pager.continuation().map(AsRef::as_ref),
Some("next-token-1")
);
let second_page = page_pager
.next()
.await
.expect("expected second page")
.expect("expected successful second page")
.into_model()
.expect("expected page");
assert_eq!(second_page.page, Some(2));
assert_eq!(second_page.items, vec![4, 5, 6]);
let continuation_token = page_pager
.continuation()
.expect("expected continuation_token from second page");
assert_eq!(continuation_token.as_ref(), "next-token-2");
}
// While still inside the first page an `ItemIterator` reports no continuation, so a second
// iterator created from that value starts again from the very first item.
#[tokio::test]
async fn item_iterator_with_continuation_token() {
let mut first_pager = ItemIterator::new(make_three_page_callback(), None);
assert_eq!(first_pager.continuation(), None);
let first_item = first_pager
.next()
.await
.expect("expected first item")
.expect("expected successful first item");
assert_eq!(first_item, 1);
let second_item = first_pager
.next()
.await
.expect("expected second item")
.expect("expected successful second item");
assert_eq!(second_item, 2);
let continuation_token = first_pager.continuation();
assert_eq!(continuation_token, None);
let mut second_pager = ItemIterator::new(
make_three_page_callback(),
Some(PagerOptions {
continuation: continuation_token.cloned(),
..Default::default()
}),
);
assert_eq!(second_pager.continuation(), None);
let first_item_second_pager = second_pager
.next()
.await
.expect("expected first item from second pager")
.expect("expected successful first item from second pager");
assert_eq!(first_item_second_pager, 1);
let items: Vec<i32> = second_pager.try_collect().await.unwrap();
assert_eq!(items.as_slice(), vec![2, 3, 4, 5, 6, 7, 8, 9]);
}
// Stops in the middle of the second page; the continuation then identifies the second page,
// so a second iterator created from it restarts at that page's first item (4).
#[tokio::test]
async fn item_iterator_continuation_second_page_second_item() {
let mut first_pager = ItemIterator::new(make_three_page_callback(), None);
assert_eq!(first_pager.continuation(), None);
let first_item = first_pager
.next()
.await
.expect("expected first item")
.expect("expected successful first item");
assert_eq!(first_item, 1);
let second_item = first_pager
.next()
.await
.expect("expected second item")
.expect("expected successful second item");
assert_eq!(second_item, 2);
let third_item = first_pager
.next()
.await
.expect("expected third item")
.expect("expected successful third item");
assert_eq!(third_item, 3);
let fourth_item = first_pager
.next()
.await
.expect("expected fourth item")
.expect("expected successful fourth item");
assert_eq!(fourth_item, 4);
let fifth_item = first_pager
.next()
.await
.expect("expected fifth item")
.expect("expected successful fifth item");
assert_eq!(fifth_item, 5);
let continuation_token = first_pager.into_continuation();
assert_eq!(
continuation_token.as_ref().map(AsRef::as_ref),
Some("next-token-1")
);
let mut second_pager = ItemIterator::new(
make_three_page_callback(),
Some(PagerOptions {
continuation: continuation_token,
..Default::default()
}),
);
let first_item_second_pager = second_pager
.next()
.await
.expect("expected first item from second pager")
.expect("expected successful first item from second pager");
assert_eq!(first_item_second_pager, 4);
let items: Vec<i32> = second_pager.try_collect().await.unwrap();
assert_eq!(items.as_slice(), vec![5, 6, 7, 8, 9]);
}
// After the last item of the first page, and before the second page is requested, the
// continuation is still `None`; a second iterator created from it starts over at item 1.
#[tokio::test]
async fn item_iterator_continuation_after_first_page() {
let mut first_pager = ItemIterator::new(make_three_page_callback(), None);
assert_eq!(first_pager.continuation(), None);
let first_item = first_pager
.next()
.await
.expect("expected first item")
.expect("expected successful first item");
assert_eq!(first_item, 1);
let second_item = first_pager
.next()
.await
.expect("expected second item")
.expect("expected successful second item");
assert_eq!(second_item, 2);
let third_item = first_pager
.next()
.await
.expect("expected third item")
.expect("expected successful third item");
assert_eq!(third_item, 3);
let continuation_token = first_pager.continuation();
assert_eq!(continuation_token, None);
let mut second_pager = ItemIterator::new(
make_three_page_callback(),
Some(PagerOptions {
continuation: continuation_token.cloned(),
..Default::default()
}),
);
let first_item_second_pager = second_pager
.next()
.await
.expect("expected first item from first pager")
.expect("expected successful first item from first pager");
assert_eq!(first_item_second_pager, 1);
let items: Vec<i32> = second_pager.try_collect().await.unwrap();
assert_eq!(items.as_slice(), vec![2, 3, 4, 5, 6, 7, 8, 9]);
}
// Builds a request callback that serves three fixed pages:
//   Initial          -> items [1,2,3], page 1, continuation "next-token-1"
//   "next-token-1"   -> items [4,5,6], page 2, continuation "next-token-2"
//   "next-token-2"   -> items [7,8,9], no page number, final page
// Any other state panics.
#[allow(clippy::type_complexity)]
fn make_three_page_callback(
) -> impl Fn(PagerState, PagerOptions<'_>) -> PagerResultFuture<Response<Page>> {
|continuation: PagerState, _options| {
Box::pin(async move {
match continuation {
PagerState::Initial => Ok(PagerResult::More {
response: RawResponse::from_bytes(
StatusCode::Ok,
Default::default(),
r#"{"items":[1,2,3],"page":1}"#,
)
.into(),
continuation: PagerContinuation::Token("next-token-1".into()),
}),
PagerState::More(continuation) if continuation.as_ref() == "next-token-1" => {
Ok(PagerResult::More {
response: RawResponse::from_bytes(
StatusCode::Ok,
HashMap::from([(
HeaderName::from_static("x-test-header"),
HeaderValue::from_static("page-2"),
)])
.into(),
r#"{"items":[4,5,6],"page":2}"#,
)
.into(),
continuation: PagerContinuation::Token("next-token-2".into()),
})
}
PagerState::More(continuation) if continuation.as_ref() == "next-token-2" => {
Ok(PagerResult::Done {
response: RawResponse::from_bytes(
StatusCode::Ok,
HashMap::from([(
HeaderName::from_static("x-test-header"),
HeaderValue::from_static("page-3"),
)])
.into(),
r#"{"items":[7,8,9]}"#,
)
.into(),
})
}
_ => {
panic!("Unexpected continuation value: {:?}", continuation)
}
}
})
}
}
}