ytmapi_rs/
continuations.rs1use crate::{
4 auth::AuthToken,
5 common::ContinuationParams,
6 parse::ParseFrom,
7 query::{GetContinuationsQuery, PostMethod, PostQuery, Query, QueryMethod},
8 ProcessedResult, Result,
9};
10use futures::Stream;
11use std::fmt::Debug;
12
13pub trait Continuable<Q>: Sized {
20 fn take_continuation_params(&mut self) -> Option<ContinuationParams<'static>>;
21 fn parse_continuation(p: ProcessedResult<GetContinuationsQuery<'_, Q>>) -> Result<Self>;
22}
23
24impl<'a, T, Q> ParseFrom<GetContinuationsQuery<'a, Q>> for T
28where
29 T: Continuable<Q>,
30 T: Debug,
31{
32 fn parse_from(p: ProcessedResult<GetContinuationsQuery<'a, Q>>) -> Result<Self> {
33 T::parse_continuation(p)
34 }
35}
36
37pub(crate) fn stream<'a, Q, A>(
48 query: &'a Q,
49 client: &'a crate::client::Client,
50 tok: &'a A,
51) -> impl Stream<Item = Result<Q::Output>> + 'a
52where
53 A: AuthToken,
54 Q: Query<A>,
55 Q: PostQuery,
56 Q::Output: Continuable<Q>,
57{
58 futures::stream::unfold(
59 (false, None::<GetContinuationsQuery<Q>>),
60 move |(first, maybe_next_query)| async move {
61 if !first {
62 let first_res: Result<Q::Output> = Q::Method::call(query, client, tok)
63 .await
64 .and_then(|res| res.process())
65 .and_then(|res| res.parse_into());
66 match first_res {
67 Ok(mut first) => {
68 let maybe_next_query = GetContinuationsQuery::<Q>::new(&mut first, query);
69 return Some((Ok(first), (true, maybe_next_query)));
70 }
71 Err(e) => return Some((Err(e), (true, None))),
72 }
73 }
74 if let Some(next_query) = maybe_next_query {
75 let next = PostMethod::call(&next_query, client, tok)
76 .await
77 .and_then(|res| res.process())
78 .and_then(|res| res.parse_into());
79
80 match next {
81 Ok(mut next) => {
82 let maybe_next_query = GetContinuationsQuery::<Q>::new(&mut next, query);
83 return Some((Ok(next), (true, maybe_next_query)));
84 }
85 Err(e) => return Some((Err(e), (true, None))),
86 }
87 }
88 None
89 },
90 )
91}