ytmapi_rs/continuations.rs
//! This module contains the `ParseFromContinuable` trait, allowing streaming of
//! results that contain continuations.
//! # Implementation example - Incomplete
//! ```no_run
//! # struct GetDateQuery;
//! use serde::Deserialize;
//! use ytmapi_rs::common::ContinuationParams;
//! use ytmapi_rs::query::GetContinuationsQuery;
//!
//! #[derive(Debug, Deserialize)]
//! struct Date {
//!     date_string: String,
//!     date_timestamp: usize,
//! }
//! impl ytmapi_rs::continuations::ParseFromContinuable<GetDateQuery> for () {
//!     fn parse_from_continuable(
//!         p: ytmapi_rs::ProcessedResult<GetDateQuery>,
//!     ) -> ytmapi_rs::Result<(Self, Option<ContinuationParams<'static>>)> {
//!         todo!();
//!     }
//!     fn parse_continuation(
//!         p: ytmapi_rs::ProcessedResult<GetContinuationsQuery<'_, GetDateQuery>>,
//!     ) -> ytmapi_rs::Result<(Self, Option<ContinuationParams<'static>>)> {
//!         todo!();
//!     }
//! }
//! ```
//! # Alternative implementation
//! An alternative to working directly with [`crate::json::Json`] is to add
//! `json-crawler` as a dependency and use the provided
//! `From<ProcessedResult> for JsonCrawlerOwned` implementation.
32use crate::auth::AuthToken;
33use crate::common::ContinuationParams;
34use crate::parse::ParseFrom;
35use crate::query::{GetContinuationsQuery, PostMethod, PostQuery, Query, QueryMethod};
36use crate::{ProcessedResult, Result};
37use futures::Stream;
38use std::fmt::Debug;
39
40/// This trait represents a result that can be streamed to get more results.
41/// It will contain continuation params, and a parsing function for its
42/// continuations.
43pub trait ParseFromContinuable<Q>: Debug + Sized {
44 fn parse_from_continuable(
45 p: ProcessedResult<Q>,
46 ) -> crate::Result<(Self, Option<ContinuationParams<'static>>)>;
47 fn parse_continuation(
48 p: ProcessedResult<GetContinuationsQuery<'_, Q>>,
49 ) -> crate::Result<(Self, Option<ContinuationParams<'static>>)>;
50}
51
52/// Blanket implementation of ParseFrom where T implements ParseFromContinuable
53/// so that caller can write T::ParseFrom.
54impl<T, Q> ParseFrom<Q> for T
55where
56 T: ParseFromContinuable<Q>,
57{
58 fn parse_from(p: ProcessedResult<Q>) -> crate::Result<Self> {
59 T::parse_from_continuable(p).map(|t| t.0)
60 }
61}
62
63/// Stream a query that can be streamed.
64/// This function has quite complicated trait bounds. To step through them;
65/// - query must meet the standard trait bounds for a query - Q: Query<A:
66/// AuthToken>.
67/// - only PostQuery queries can be streamed - therefore we add the trait bound
68/// Q: PostQuery - this simplifies code within this function.
69/// - a query can only be streamed if the output is Continuable - therefore we
70/// specify Q::Output: ParseFromContinuable<Q>.
71pub(crate) fn stream<'a, Q, A>(
72 query: &'a Q,
73 client: &'a crate::client::Client,
74 tok: &'a A,
75) -> impl Stream<Item = Result<Q::Output>> + 'a
76where
77 A: AuthToken,
78 Q: Query<A>,
79 Q: PostQuery,
80 Q::Output: ParseFromContinuable<Q>,
81{
82 futures::stream::unfold(
83 // Initial state for unfold
84 // The first component is that the first query hasn't been run.
85 // The second component of state represents if there are continuations
86 // (this is ignored on first run)
87 (false, None::<GetContinuationsQuery<'a, Q>>),
88 move |(first_query_run, maybe_next_query)| async move {
89 if !first_query_run {
90 let first_res = Q::Method::call(query, client, tok)
91 .await
92 .and_then(|res| res.process())
93 .and_then(|res| GetContinuationsQuery::from_first_result(res));
94 match first_res {
95 Ok((first, next)) => {
96 return Some((Ok(first), (true, next)));
97 }
98 Err(e) => return Some((Err(e), (true, None))),
99 }
100 }
101 if let Some(ref next_query) = maybe_next_query {
102 let next_res = PostMethod::call(next_query, client, tok)
103 .await
104 .and_then(|res| res.process());
105 let next_res =
106 next_res.and_then(|res| GetContinuationsQuery::from_continuation(res));
107 match next_res {
108 Ok((this, next)) => {
109 return Some((Ok(this), (true, next)));
110 }
111 Err(e) => return Some((Err(e), (true, None))),
112 }
113 }
114 None
115 },
116 )
117}
118
119/// Stream a query that can be streamed, returning the source as well as the
120/// output, by cloning the source before yielding it.
121/// Note that the stream will stop if an error is detected (after returning
122/// the source string that produced the error).
123/// This function has quite complicated trait bounds. To step through them;
124/// - query must meet the standard trait bounds for a query - Q: Query<A:
125/// AuthToken>.
126/// - only PostQuery queries can be streamed - therefore we add the trait bound
127/// Q: PostQuery - this simplifies code within this function.
128/// - a query can only be streamed if the output is Continuable - therefore we
129/// specify Q::Output: ParseFromContinuable<Q>.
130pub(crate) fn raw_json_stream<'a, Q, A>(
131 query: &'a Q,
132 client: &'a crate::client::Client,
133 tok: &'a A,
134) -> impl Stream<Item = Result<String>> + 'a
135where
136 A: AuthToken,
137 Q: Query<A>,
138 Q: PostQuery,
139 Q::Output: ParseFromContinuable<Q>,
140{
141 futures::stream::unfold(
142 // Initial state for unfold
143 // The first component is that the first query hasn't been run.
144 // The second component of state represents if there are continuations
145 // (this is ignored on first run)
146 (false, None::<GetContinuationsQuery<'a, Q>>),
147 move |(first_query_run, maybe_next_query)| async move {
148 if !first_query_run {
149 let first_raw_res = Q::Method::call(query, client, tok).await;
150 match first_raw_res {
151 Ok(first_raw_res) => {
152 let first_source = first_raw_res.json.clone();
153 let next_query = first_raw_res
154 .process()
155 .and_then(GetContinuationsQuery::from_first_result::<Q::Output>)
156 .ok()
157 .and_then(|(_, q)| q);
158 return Some((Ok(first_source), (true, next_query)));
159 }
160 Err(e) => return Some((Err(e), (true, None))),
161 }
162 }
163 if let Some(ref next_query) = maybe_next_query {
164 let next_raw_res =
165 <GetContinuationsQuery<Q> as Query<A>>::Method::call(next_query, client, tok)
166 .await;
167 match next_raw_res {
168 Ok(next_raw_res) => {
169 let next_source = next_raw_res.json.clone();
170 let next_query = next_raw_res
171 .process()
172 .and_then(GetContinuationsQuery::from_continuation::<Q::Output>)
173 .ok()
174 .and_then(|(_, q)| q);
175 return Some((Ok(next_source), (true, next_query)));
176 }
177 Err(e) => return Some((Err(e), (true, None))),
178 }
179 }
180 None
181 },
182 )
183}