ytmapi_rs/
continuations.rs

1//! This module contains the `Continuable` trait, allowing streaming of results
2//! that contain continuations.
3use crate::{
4    auth::AuthToken,
5    common::ContinuationParams,
6    parse::ParseFrom,
7    query::{GetContinuationsQuery, PostMethod, PostQuery, Query, QueryMethod},
8    ProcessedResult, Result,
9};
10use futures::Stream;
11use std::fmt::Debug;
12
13/// This trait represents a result that can be streamed to get more results.
14/// It will contain continuation params, and a parsing function for its
15/// continuations.
16// TODO: Implementation example.
17// TODO: Documement _why_ we need to take_continuation_params and we can't just
18// use a reference.
19pub trait Continuable<Q>: Sized {
20    fn take_continuation_params(&mut self) -> Option<ContinuationParams<'static>>;
21    fn parse_continuation(p: ProcessedResult<GetContinuationsQuery<'_, Q>>) -> Result<Self>;
22}
23
24// Implementing Continuable<Q> for T implies ParseFrom<GetContinuationsQuery<Q>
25// for T.
26// TODO: Consider if this lives here, or in parse module.
27impl<'a, T, Q> ParseFrom<GetContinuationsQuery<'a, Q>> for T
28where
29    T: Continuable<Q>,
30    T: Debug,
31{
32    fn parse_from(p: ProcessedResult<GetContinuationsQuery<'a, Q>>) -> Result<Self> {
33        T::parse_continuation(p)
34    }
35}
36
37/// Stream a query that can be streamed.
38/// This function has quite complicated trait bounds. To step through them;
39/// - query must meet the standard trait bounds for a query - Q: Query<A:
40///   AuthToken>.
41/// - only PostQuery queries can be streamed - therefore we add the trait bound
42///   Q: PostQuery - this simplifies code within this function.
43/// - a query can only be streamed if the output is Continuable - therefore we
44///   specify Q::Output: Continuable<Q>.
45// TODO: It may be possible to remove the Q: PostQuery bound,
46// instead calling QueryMethod<...>::Call directly.
47pub(crate) fn stream<'a, Q, A>(
48    query: &'a Q,
49    client: &'a crate::client::Client,
50    tok: &'a A,
51) -> impl Stream<Item = Result<Q::Output>> + 'a
52where
53    A: AuthToken,
54    Q: Query<A>,
55    Q: PostQuery,
56    Q::Output: Continuable<Q>,
57{
58    futures::stream::unfold(
59        (false, None::<GetContinuationsQuery<Q>>),
60        move |(first, maybe_next_query)| async move {
61            if !first {
62                let first_res: Result<Q::Output> = Q::Method::call(query, client, tok)
63                    .await
64                    .and_then(|res| res.process())
65                    .and_then(|res| res.parse_into());
66                match first_res {
67                    Ok(mut first) => {
68                        let maybe_next_query = GetContinuationsQuery::<Q>::new(&mut first, query);
69                        return Some((Ok(first), (true, maybe_next_query)));
70                    }
71                    Err(e) => return Some((Err(e), (true, None))),
72                }
73            }
74            if let Some(next_query) = maybe_next_query {
75                let next = PostMethod::call(&next_query, client, tok)
76                    .await
77                    .and_then(|res| res.process())
78                    .and_then(|res| res.parse_into());
79
80                match next {
81                    Ok(mut next) => {
82                        let maybe_next_query = GetContinuationsQuery::<Q>::new(&mut next, query);
83                        return Some((Ok(next), (true, maybe_next_query)));
84                    }
85                    Err(e) => return Some((Err(e), (true, None))),
86                }
87            }
88            None
89        },
90    )
91}