ytmapi_rs/
continuations.rs

1//! This module contains the `ParseFromContinuable` trait, allowing streaming of
2//! results that contain continuations.
3//! # Implementation example - Incomplete
4//! ```no_run
5//! # struct GetDateQuery;
6//! use serde::Deserialize;
7//! use ytmapi_rs::common::ContinuationParams;
8//! use ytmapi_rs::query::GetContinuationsQuery;
9//!
10//! #[derive(Debug, Deserialize)]
11//! struct Date {
12//!     date_string: String,
13//!     date_timestamp: usize,
14//! }
15//! impl ytmapi_rs::continuations::ParseFromContinuable<GetDateQuery> for () {
16//!     fn parse_from_continuable(
17//!         p: ytmapi_rs::ProcessedResult<GetDateQuery>,
18//!     ) -> ytmapi_rs::Result<(Self, Option<ContinuationParams<'static>>)> {
19//!         todo!();
20//!     }
21//!     fn parse_continuation(
22//!         p: ytmapi_rs::ProcessedResult<GetContinuationsQuery<'_, GetDateQuery>>,
23//!     ) -> ytmapi_rs::Result<(Self, Option<ContinuationParams<'static>>)> {
24//!         todo!();
25//!     }
26//! }
27//! ```
28//! # Alternative implementation
29//! An alternative to working directly with [`crate::json::Json`] is to add
30//! `json-crawler` as a dependency and use the provided
31//! `From<ProcessedResult> for JsonCrawlerOwned` implementation.
32use crate::auth::AuthToken;
33use crate::common::ContinuationParams;
34use crate::parse::ParseFrom;
35use crate::query::{GetContinuationsQuery, PostMethod, PostQuery, Query, QueryMethod};
36use crate::{ProcessedResult, Result};
37use futures::Stream;
38use std::fmt::Debug;
39
40/// This trait represents a result that can be streamed to get more results.
41/// It will contain continuation params, and a parsing function for its
42/// continuations.
43pub trait ParseFromContinuable<Q>: Debug + Sized {
44    fn parse_from_continuable(
45        p: ProcessedResult<Q>,
46    ) -> crate::Result<(Self, Option<ContinuationParams<'static>>)>;
47    fn parse_continuation(
48        p: ProcessedResult<GetContinuationsQuery<'_, Q>>,
49    ) -> crate::Result<(Self, Option<ContinuationParams<'static>>)>;
50}
51
52/// Blanket implementation of ParseFrom where T implements ParseFromContinuable
53/// so that caller can write T::ParseFrom.
54impl<T, Q> ParseFrom<Q> for T
55where
56    T: ParseFromContinuable<Q>,
57{
58    fn parse_from(p: ProcessedResult<Q>) -> crate::Result<Self> {
59        T::parse_from_continuable(p).map(|t| t.0)
60    }
61}
62
63/// Stream a query that can be streamed.
64/// This function has quite complicated trait bounds. To step through them;
65/// - query must meet the standard trait bounds for a query - Q: Query<A:
66///   AuthToken>.
67/// - only PostQuery queries can be streamed - therefore we add the trait bound
68///   Q: PostQuery - this simplifies code within this function.
69/// - a query can only be streamed if the output is Continuable - therefore we
70///   specify Q::Output: ParseFromContinuable<Q>.
71pub(crate) fn stream<'a, Q, A>(
72    query: &'a Q,
73    client: &'a crate::client::Client,
74    tok: &'a A,
75) -> impl Stream<Item = Result<Q::Output>> + 'a
76where
77    A: AuthToken,
78    Q: Query<A>,
79    Q: PostQuery,
80    Q::Output: ParseFromContinuable<Q>,
81{
82    futures::stream::unfold(
83        // Initial state for unfold
84        // The first component is that the first query hasn't been run.
85        // The second component of state represents if there are continuations
86        // (this is ignored on first run)
87        (false, None::<GetContinuationsQuery<'a, Q>>),
88        move |(first_query_run, maybe_next_query)| async move {
89            if !first_query_run {
90                let first_res = Q::Method::call(query, client, tok)
91                    .await
92                    .and_then(|res| res.process())
93                    .and_then(|res| GetContinuationsQuery::from_first_result(res));
94                match first_res {
95                    Ok((first, next)) => {
96                        return Some((Ok(first), (true, next)));
97                    }
98                    Err(e) => return Some((Err(e), (true, None))),
99                }
100            }
101            if let Some(ref next_query) = maybe_next_query {
102                let next_res = PostMethod::call(next_query, client, tok)
103                    .await
104                    .and_then(|res| res.process());
105                let next_res =
106                    next_res.and_then(|res| GetContinuationsQuery::from_continuation(res));
107                match next_res {
108                    Ok((this, next)) => {
109                        return Some((Ok(this), (true, next)));
110                    }
111                    Err(e) => return Some((Err(e), (true, None))),
112                }
113            }
114            None
115        },
116    )
117}
118
119/// Stream a query that can be streamed, returning the source as well as the
120/// output, by cloning the source before yielding it.
121/// Note that the stream will stop if an error is detected (after returning
122/// the source string that produced the error).
123/// This function has quite complicated trait bounds. To step through them;
124/// - query must meet the standard trait bounds for a query - Q: Query<A:
125///   AuthToken>.
126/// - only PostQuery queries can be streamed - therefore we add the trait bound
127///   Q: PostQuery - this simplifies code within this function.
128/// - a query can only be streamed if the output is Continuable - therefore we
129///   specify Q::Output: ParseFromContinuable<Q>.
130pub(crate) fn raw_json_stream<'a, Q, A>(
131    query: &'a Q,
132    client: &'a crate::client::Client,
133    tok: &'a A,
134) -> impl Stream<Item = Result<String>> + 'a
135where
136    A: AuthToken,
137    Q: Query<A>,
138    Q: PostQuery,
139    Q::Output: ParseFromContinuable<Q>,
140{
141    futures::stream::unfold(
142        // Initial state for unfold
143        // The first component is that the first query hasn't been run.
144        // The second component of state represents if there are continuations
145        // (this is ignored on first run)
146        (false, None::<GetContinuationsQuery<'a, Q>>),
147        move |(first_query_run, maybe_next_query)| async move {
148            if !first_query_run {
149                let first_raw_res = Q::Method::call(query, client, tok).await;
150                match first_raw_res {
151                    Ok(first_raw_res) => {
152                        let first_source = first_raw_res.json.clone();
153                        let next_query = first_raw_res
154                            .process()
155                            .and_then(GetContinuationsQuery::from_first_result::<Q::Output>)
156                            .ok()
157                            .and_then(|(_, q)| q);
158                        return Some((Ok(first_source), (true, next_query)));
159                    }
160                    Err(e) => return Some((Err(e), (true, None))),
161                }
162            }
163            if let Some(ref next_query) = maybe_next_query {
164                let next_raw_res =
165                    <GetContinuationsQuery<Q> as Query<A>>::Method::call(next_query, client, tok)
166                        .await;
167                match next_raw_res {
168                    Ok(next_raw_res) => {
169                        let next_source = next_raw_res.json.clone();
170                        let next_query = next_raw_res
171                            .process()
172                            .and_then(GetContinuationsQuery::from_continuation::<Q::Output>)
173                            .ok()
174                            .and_then(|(_, q)| q);
175                        return Some((Ok(next_source), (true, next_query)));
176                    }
177                    Err(e) => return Some((Err(e), (true, None))),
178                }
179            }
180            None
181        },
182    )
183}