1use futures::{join, stream::FuturesOrdered, try_join, StreamExt};
2use std::{path::Path, sync::Arc};
3use tokio::{select, sync::RwLock};
4use tokio_retry::Retry;
5
6use crate::{dash, hls, player_response, util};
7
8#[derive(thiserror::Error, Debug)]
9pub enum WorkerError {
10 #[error("Error getting initial player response")]
11 InitialPlayerResponseError(#[from] player_response::PlayerResponseError),
12 #[error("Could not find representation")]
13 MissingRepresentation(String),
14 #[error("I/O error")]
15 IoError(#[from] std::io::Error),
16 #[error("Download error")]
17 DownloadError(#[from] util::DownloadError),
18 #[error("No thumbnail found")]
19 NoThumbnail,
20}
21
22pub async fn start(
23 client: &util::HttpClient,
24 ipr: &player_response::InitialPlayerResponse,
25 workdir: &Path,
26) -> Result<(), WorkerError> {
27 let (manifest, thumbnail) = join!(
28 ipr.get_dash_representations(&client),
29 thumbnail_dl(&client, &ipr, workdir),
30 );
31
32 let manifest = manifest?;
33 if let Err(e) = thumbnail {
34 warn!("Could not download thumbnail: {}", e);
35 }
36
37 let stats = Arc::new(RwLock::new(crate::stats::DownloadStatistics::new()));
38 let (tx_seq, rx_seq) = tokio::sync::mpsc::unbounded_channel();
39
40 try_join!(
41 thread_seq(&client, stats.clone(), tx_seq, &ipr),
42 thread_download(&client, stats.clone(), rx_seq, &manifest, workdir, 4),
43 )?;
44
45 Ok(())
46}
47
48async fn thumbnail_dl(
49 client: &util::HttpClient,
50 ipr: &player_response::InitialPlayerResponse,
51 workdir: &Path,
52) -> Result<(), WorkerError> {
53 let url = || -> Option<String> {
54 Some(
55 ipr.microformat
56 .as_ref()?
57 .player_microformat_renderer
58 .thumbnail
59 .thumbnails
60 .last()?
61 .url
62 .clone(),
63 )
64 }()
65 .ok_or(WorkerError::NoThumbnail)?;
66
67 let fname = workdir.join("thumbnail.jpg");
68 let fname = fname.to_string_lossy();
69
70 client
71 .download_file(&url, &fname)
72 .await
73 .map_err(WorkerError::DownloadError)?;
74
75 info!("Thumbnail saved to {}", fname);
76
77 Ok(())
78}
79
80async fn thread_seq(
81 client: &util::HttpClient,
82 stats: Arc<RwLock<crate::stats::DownloadStatistics>>,
83 tx_seq: tokio::sync::mpsc::UnboundedSender<i64>,
84 ipr: &player_response::InitialPlayerResponse,
85) -> Result<(), WorkerError> {
86 let mut seq = 0;
87 let mut last_seq_time = std::time::Instant::now();
88
89 let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(200)
90 .map(tokio_retry::strategy::jitter)
91 .take(5);
92
93 'out: loop {
94 let manifest = Retry::spawn(retry_strategy.clone(), || {
95 ipr.get_dash_representations(&client)
96 })
97 .await
98 .map_err(WorkerError::InitialPlayerResponseError)?;
99
100 if manifest.latest_segment_number > seq {
101 for s in seq..manifest.latest_segment_number {
102 if seq > 0 {
103 last_seq_time = std::time::Instant::now();
104 }
105 if tx_seq.send(s).is_err() {
106 error!("Failed to send segment number to download thread");
107 break 'out;
108 }
109
110 let mut st = stats.write().await;
111 st.segments_total = 1 + s as u64;
112 st.print();
113 }
114 seq = manifest.latest_segment_number;
115 }
116
117 if !ipr.is_usable() {
118 info!("Video is no longer live");
119 break;
120 }
121
122 if last_seq_time.elapsed().as_secs() > 30 {
123 warn!("No new segments found for 30 seconds, stopping");
124 break;
125 }
126
127 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
128 }
129
130 debug!("Sequence thread exited");
131
132 Ok(())
133}
134
135async fn thread_download(
136 client: &util::HttpClient,
137 stats: Arc<RwLock<crate::stats::DownloadStatistics>>,
138 rx_seq: tokio::sync::mpsc::UnboundedReceiver<i64>,
139 manifest: &dash::Manifest,
140 workdir: &Path,
141 concurrency: usize,
142) -> Result<(), WorkerError> {
143 let mut audio = manifest
145 .representations
146 .iter()
147 .filter(|r| r.height.is_none())
148 .collect::<Vec<_>>();
149 audio.sort_by(|a, b| a.bandwidth.cmp(&b.bandwidth));
150 let audio = *audio
151 .last()
152 .ok_or(WorkerError::MissingRepresentation("audio".to_string()))?;
153
154 let mut video = manifest
156 .representations
157 .iter()
158 .filter(|r| r.height.is_some())
159 .collect::<Vec<_>>();
160 video.sort_by(|a, b| a.bandwidth.cmp(&b.bandwidth));
161 let video = *video
162 .last()
163 .ok_or(WorkerError::MissingRepresentation("video".to_string()))?;
164
165 info!(
166 "Video: {}x{} {}fps ({}, f{})",
167 video.width.ok_or(WorkerError::MissingRepresentation(
168 "video width".to_string()
169 ))?,
170 video.height.ok_or(WorkerError::MissingRepresentation(
171 "video height".to_string()
172 ))?,
173 video.frame_rate.ok_or(WorkerError::MissingRepresentation(
174 "video frame rate".to_string()
175 ))?,
176 video.codecs,
177 video.id,
178 );
179 info!(
180 "Audio: {}kbps ({}, f{})",
181 audio.bandwidth / 1000,
182 audio.codecs,
183 audio.id
184 );
185
186 let segment_duration = std::time::Duration::from_millis(manifest.segment_duration as u64);
188 let playlist_path = workdir.join("index.m3u8");
189 let mut playlist =
190 hls::IndexPlaylist::new(&playlist_path.to_string_lossy(), &manifest, &audio, &video)
191 .await
192 .map_err(WorkerError::IoError)?;
193
194 let mut tasks = FuturesOrdered::new();
195 let mut seq_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx_seq);
196 let mut is_done = false;
197
198 loop {
199 while tasks.len() < concurrency && !is_done {
201 select! {
202 seq = seq_stream.next() => {
203 match seq {
204 Some(seq) => tasks.push_back(util::download_av_segment(
205 &client, workdir, &audio, &video, seq,
206 )),
207 None => {
208 is_done = true;
209 break;
210 }
211 }
212 },
213 _ = tokio::time::sleep(std::time::Duration::from_millis(1)) => {
215 break;
216 }
217 }
218 }
219
220 if tasks.is_empty() && is_done {
222 break;
223 }
224
225 match tasks.next().await {
227 Some(Ok((fname_audio, fname_video, size_total))) => {
228 playlist
229 .add_segment(&fname_audio, &fname_video, segment_duration)
230 .await
231 .map_err(WorkerError::IoError)?;
232
233 let mut st = stats.write().await;
234 st.segments_downloaded += 1;
235 st.bytes_downloaded += size_total as u64;
236 st.print();
237 }
238 Some(Err(e)) => {
239 error!("Could not download segment: {}", e);
240 }
241 None => (),
242 }
243 }
244
245 playlist.finish().await.map_err(WorkerError::IoError)?;
247
248 Ok(())
249}