use crate::auth::AuthToken;
use crate::common::ContinuationParams;
use crate::parse::ParseFrom;
use crate::query::{GetContinuationsQuery, PostMethod, PostQuery, Query, QueryMethod};
use crate::{ProcessedResult, Result};
use futures::Stream;
use std::fmt::Debug;
/// Parsing for outputs of a query `Q` whose results may carry continuations.
///
/// Both methods return the parsed value together with an optional set of
/// [`ContinuationParams`]; `None` means no continuation params were produced.
/// NOTE(review): presumably these params are what is needed to request the
/// next page of results - confirm against `GetContinuationsQuery`.
pub trait ParseFromContinuable<Q>: Debug + Sized {
/// Parses the processed result of a `Q` query into `Self`, plus any
/// continuation params found alongside it.
fn parse_from_continuable(
p: ProcessedResult<Q>,
) -> crate::Result<(Self, Option<ContinuationParams<'static>>)>;
/// Parses the processed result of a [`GetContinuationsQuery`] wrapping `Q`
/// into `Self`, plus any continuation params found alongside it.
fn parse_continuation(
p: ProcessedResult<GetContinuationsQuery<'_, Q>>,
) -> crate::Result<(Self, Option<ContinuationParams<'static>>)>;
}
/// Blanket implementation: every type that is [`ParseFromContinuable`] for `Q`
/// is also [`ParseFrom`] for `Q`.
impl<T, Q> ParseFrom<Q> for T
where
    T: ParseFromContinuable<Q>,
{
    /// Delegates to [`ParseFromContinuable::parse_from_continuable`] and keeps
    /// only the parsed value; any continuation params are discarded.
    fn parse_from(p: ProcessedResult<Q>) -> crate::Result<Self> {
        let (parsed, _continuation) = T::parse_from_continuable(p)?;
        Ok(parsed)
    }
}
/// Builds a stream of parsed outputs for `query`, following continuations.
///
/// The stream is driven by a `(first_query_run, maybe_next_query)` state:
/// - while `first_query_run` is `false`, `query` itself is called, processed
///   and split into its output and an optional continuation query;
/// - afterwards, the stored continuation query (if any) is called via
///   [`PostMethod`], processed and split the same way;
/// - once no continuation query is stored, the stream ends.
///
/// Any error is yielded as an `Err` item and clears the stored continuation,
/// so the stream ends after that item.
pub(crate) fn stream<'a, Q, A>(
    query: &'a Q,
    client: &'a crate::client::Client,
    tok: &'a A,
) -> impl Stream<Item = Result<Q::Output>> + 'a
where
    A: AuthToken,
    Q: Query<A>,
    Q: PostQuery,
    Q::Output: ParseFromContinuable<Q>,
{
    futures::stream::unfold(
        (false, None::<GetContinuationsQuery<'a, Q>>),
        move |(first_query_run, maybe_next_query)| async move {
            match (first_query_run, maybe_next_query) {
                // Nothing has been requested yet: run the original query.
                (false, _) => {
                    let outcome = Q::Method::call(query, client, tok)
                        .await
                        .and_then(|raw| raw.process())
                        .and_then(|processed| GetContinuationsQuery::from_first_result(processed));
                    let (item, continuation) = match outcome {
                        Ok((parsed, following)) => (Ok(parsed), following),
                        Err(err) => (Err(err), None),
                    };
                    Some((item, (true, continuation)))
                }
                // A continuation is pending: request it.
                (true, Some(next_query)) => {
                    let outcome = PostMethod::call(&next_query, client, tok)
                        .await
                        .and_then(|raw| raw.process())
                        .and_then(|processed| GetContinuationsQuery::from_continuation(processed));
                    let (item, continuation) = match outcome {
                        Ok((parsed, following)) => (Ok(parsed), following),
                        Err(err) => (Err(err), None),
                    };
                    Some((item, (true, continuation)))
                }
                // First query already ran and no continuation is left.
                (true, None) => None,
            }
        },
    )
}
/// Builds a stream of the raw JSON source of each response for `query`,
/// following continuations.
///
/// Uses the same `(first_query_run, maybe_next_query)` state as a parsed
/// stream would, but each item is a clone of the raw response's `json` field
/// rather than a parsed output.
///
/// The response is still processed and parsed as `Q::Output`, but only to
/// discover the next continuation query. A failure in that step is discarded
/// (not yielded): the JSON for the current response is still emitted and the
/// stream then ends. An error from the call itself is yielded as an `Err`
/// item, after which the stream ends.
pub(crate) fn raw_json_stream<'a, Q, A>(
    query: &'a Q,
    client: &'a crate::client::Client,
    tok: &'a A,
) -> impl Stream<Item = Result<String>> + 'a
where
    A: AuthToken,
    Q: Query<A>,
    Q: PostQuery,
    Q::Output: ParseFromContinuable<Q>,
{
    futures::stream::unfold(
        (false, None::<GetContinuationsQuery<'a, Q>>),
        move |(first_query_run, maybe_next_query)| async move {
            match (first_query_run, maybe_next_query) {
                // Nothing has been requested yet: run the original query.
                (false, _) => {
                    let emitted = match Q::Method::call(query, client, tok).await {
                        Ok(raw) => {
                            // Copy the source out first; `process` consumes `raw`.
                            let source = raw.json.clone();
                            let continuation = raw
                                .process()
                                .and_then(GetContinuationsQuery::from_first_result::<Q::Output>)
                                .ok()
                                .and_then(|(_, following)| following);
                            (Ok(source), (true, continuation))
                        }
                        Err(err) => (Err(err), (true, None)),
                    };
                    Some(emitted)
                }
                // A continuation is pending: request it.
                (true, Some(next_query)) => {
                    let response =
                        <GetContinuationsQuery<Q> as Query<A>>::Method::call(
                            &next_query,
                            client,
                            tok,
                        )
                        .await;
                    let emitted = match response {
                        Ok(raw) => {
                            // Copy the source out first; `process` consumes `raw`.
                            let source = raw.json.clone();
                            let continuation = raw
                                .process()
                                .and_then(GetContinuationsQuery::from_continuation::<Q::Output>)
                                .ok()
                                .and_then(|(_, following)| following);
                            (Ok(source), (true, continuation))
                        }
                        Err(err) => (Err(err), (true, None)),
                    };
                    Some(emitted)
                }
                // First query already ran and no continuation is left.
                (true, None) => None,
            }
        },
    )
}