use std::vec;
use async_stream::try_stream;
use async_trait::async_trait;
use futures::pin_mut;
use futures::stream::{Stream, TryStreamExt};
use super::super::{Error, ErrorKind, Result};
#[async_trait]
pub trait ResourceQuery {
type Item;
const DEFAULT_LIMIT: usize;
async fn can_paginate(&self) -> Result<bool>;
fn extract_marker(&self, resource: &Self::Item) -> String;
async fn fetch_chunk(
&self,
limit: Option<usize>,
marker: Option<String>,
) -> Result<Vec<Self::Item>>;
async fn validate(&mut self) -> Result<()> {
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct ResourceIterator<Q: ResourceQuery> {
query: Q,
cache: Option<vec::IntoIter<Q::Item>>,
marker: Option<String>,
can_paginate: Option<bool>,
validated: bool,
}
impl<Q> ResourceIterator<Q>
where
Q: ResourceQuery,
{
#[allow(dead_code)] pub(crate) fn new(query: Q) -> ResourceIterator<Q> {
ResourceIterator {
query,
cache: None,
marker: None,
can_paginate: None, validated: false,
}
}
}
impl<Q> ResourceIterator<Q>
where
Q: ResourceQuery + Send,
{
pub async fn one(self) -> Result<Q::Item> {
let stream = self.into_stream();
pin_mut!(stream);
match stream.try_next().await? {
Some(result) => {
if stream.try_next().await?.is_some() {
Err(Error::new(
ErrorKind::TooManyItems,
"Query returned more than one result",
))
} else {
Ok(result)
}
}
None => Err(Error::new(
ErrorKind::ResourceNotFound,
"Query returned no results",
)),
}
}
pub fn into_stream(mut self) -> impl Stream<Item = Result<Q::Item>> {
try_stream! {
if !self.validated {
self.query.validate().await?;
self.validated = true;
}
if self.can_paginate.is_none() {
self.can_paginate = Some(self.query.can_paginate().await?);
}
loop {
let maybe_next = self.cache.as_mut().and_then(|cache| cache.next());
if let Some(next) = maybe_next {
self.marker = Some(self.query.extract_marker(&next));
yield next;
} else if self.cache.is_some() && self.can_paginate == Some(false) {
break;
} else {
let (marker, limit) = if self.can_paginate == Some(true) {
(self.marker.clone(), Some(Q::DEFAULT_LIMIT))
} else {
(None, None)
};
let mut iter = self.query.fetch_chunk(limit, marker).await?.into_iter();
let maybe_next = iter.next();
self.cache = Some(iter);
if let Some(next) = maybe_next {
self.marker = Some(self.query.extract_marker(&next));
yield next;
} else {
break;
}
}
}
}
}
}
#[cfg(test)]
mod test {
use async_trait::async_trait;
use futures::stream::TryStreamExt;
use super::super::super::Result;
use super::{ResourceIterator, ResourceQuery};
#[derive(Debug, PartialEq, Eq)]
struct Test(u8);
#[derive(Debug)]
struct TestQuery;
#[async_trait]
impl ResourceQuery for TestQuery {
type Item = Test;
const DEFAULT_LIMIT: usize = 2;
async fn can_paginate(&self) -> Result<bool> {
Ok(true)
}
fn extract_marker(&self, resource: &Test) -> String {
resource.0.to_string()
}
async fn fetch_chunk(
&self,
limit: Option<usize>,
marker: Option<String>,
) -> Result<Vec<Self::Item>> {
assert_eq!(limit, Some(2));
Ok(match marker.map(|s| s.parse::<u8>().unwrap()) {
Some(1) => vec![Test(2), Test(3)],
Some(3) => Vec::new(),
None => vec![Test(0), Test(1)],
Some(x) => panic!("unexpected marker {:?}", x),
})
}
}
#[derive(Debug)]
struct NoPagination;
#[async_trait]
impl ResourceQuery for NoPagination {
type Item = Test;
const DEFAULT_LIMIT: usize = 2;
async fn can_paginate(&self) -> Result<bool> {
Ok(false)
}
fn extract_marker(&self, resource: &Test) -> String {
resource.0.to_string()
}
async fn fetch_chunk(
&self,
limit: Option<usize>,
marker: Option<String>,
) -> Result<Vec<Self::Item>> {
assert!(limit.is_none());
assert!(marker.is_none());
Ok(vec![Test(0), Test(1), Test(2)])
}
}
#[tokio::test]
async fn test_resource_iterator() {
let it: ResourceIterator<TestQuery> = ResourceIterator::new(TestQuery);
assert_eq!(
it.into_stream().try_collect::<Vec<Test>>().await.unwrap(),
vec![Test(0), Test(1), Test(2), Test(3)]
);
}
#[tokio::test]
async fn test_resource_iterator_no_pagination() {
let it: ResourceIterator<NoPagination> = ResourceIterator::new(NoPagination);
assert_eq!(
it.into_stream().try_collect::<Vec<Test>>().await.unwrap(),
vec![Test(0), Test(1), Test(2)]
);
}
}