yta_rs/
worker.rs

1use futures::{join, stream::FuturesOrdered, try_join, StreamExt};
2use std::{path::Path, sync::Arc};
3use tokio::{select, sync::RwLock};
4use tokio_retry::Retry;
5
6use crate::{dash, hls, player_response, util};
7
8#[derive(thiserror::Error, Debug)]
9pub enum WorkerError {
10    #[error("Error getting initial player response")]
11    InitialPlayerResponseError(#[from] player_response::PlayerResponseError),
12    #[error("Could not find representation")]
13    MissingRepresentation(String),
14    #[error("I/O error")]
15    IoError(#[from] std::io::Error),
16    #[error("Download error")]
17    DownloadError(#[from] util::DownloadError),
18    #[error("No thumbnail found")]
19    NoThumbnail,
20}
21
22pub async fn start(
23    client: &util::HttpClient,
24    ipr: &player_response::InitialPlayerResponse,
25    workdir: &Path,
26) -> Result<(), WorkerError> {
27    let (manifest, thumbnail) = join!(
28        ipr.get_dash_representations(&client),
29        thumbnail_dl(&client, &ipr, workdir),
30    );
31
32    let manifest = manifest?;
33    if let Err(e) = thumbnail {
34        warn!("Could not download thumbnail: {}", e);
35    }
36
37    let stats = Arc::new(RwLock::new(crate::stats::DownloadStatistics::new()));
38    let (tx_seq, rx_seq) = tokio::sync::mpsc::unbounded_channel();
39
40    try_join!(
41        thread_seq(&client, stats.clone(), tx_seq, &ipr),
42        thread_download(&client, stats.clone(), rx_seq, &manifest, workdir, 4),
43    )?;
44
45    Ok(())
46}
47
48async fn thumbnail_dl(
49    client: &util::HttpClient,
50    ipr: &player_response::InitialPlayerResponse,
51    workdir: &Path,
52) -> Result<(), WorkerError> {
53    let url = || -> Option<String> {
54        Some(
55            ipr.microformat
56                .as_ref()?
57                .player_microformat_renderer
58                .thumbnail
59                .thumbnails
60                .last()?
61                .url
62                .clone(),
63        )
64    }()
65    .ok_or(WorkerError::NoThumbnail)?;
66
67    let fname = workdir.join("thumbnail.jpg");
68    let fname = fname.to_string_lossy();
69
70    client
71        .download_file(&url, &fname)
72        .await
73        .map_err(WorkerError::DownloadError)?;
74
75    info!("Thumbnail saved to {}", fname);
76
77    Ok(())
78}
79
80async fn thread_seq(
81    client: &util::HttpClient,
82    stats: Arc<RwLock<crate::stats::DownloadStatistics>>,
83    tx_seq: tokio::sync::mpsc::UnboundedSender<i64>,
84    ipr: &player_response::InitialPlayerResponse,
85) -> Result<(), WorkerError> {
86    let mut seq = 0;
87    let mut last_seq_time = std::time::Instant::now();
88
89    let retry_strategy = tokio_retry::strategy::ExponentialBackoff::from_millis(200)
90        .map(tokio_retry::strategy::jitter)
91        .take(5);
92
93    'out: loop {
94        let manifest = Retry::spawn(retry_strategy.clone(), || {
95            ipr.get_dash_representations(&client)
96        })
97        .await
98        .map_err(WorkerError::InitialPlayerResponseError)?;
99
100        if manifest.latest_segment_number > seq {
101            for s in seq..manifest.latest_segment_number {
102                if seq > 0 {
103                    last_seq_time = std::time::Instant::now();
104                }
105                if tx_seq.send(s).is_err() {
106                    error!("Failed to send segment number to download thread");
107                    break 'out;
108                }
109
110                let mut st = stats.write().await;
111                st.segments_total = 1 + s as u64;
112                st.print();
113            }
114            seq = manifest.latest_segment_number;
115        }
116
117        if !ipr.is_usable() {
118            info!("Video is no longer live");
119            break;
120        }
121
122        if last_seq_time.elapsed().as_secs() > 30 {
123            warn!("No new segments found for 30 seconds, stopping");
124            break;
125        }
126
127        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
128    }
129
130    debug!("Sequence thread exited");
131
132    Ok(())
133}
134
135async fn thread_download(
136    client: &util::HttpClient,
137    stats: Arc<RwLock<crate::stats::DownloadStatistics>>,
138    rx_seq: tokio::sync::mpsc::UnboundedReceiver<i64>,
139    manifest: &dash::Manifest,
140    workdir: &Path,
141    concurrency: usize,
142) -> Result<(), WorkerError> {
143    // Get the highest quality audio
144    let mut audio = manifest
145        .representations
146        .iter()
147        .filter(|r| r.height.is_none())
148        .collect::<Vec<_>>();
149    audio.sort_by(|a, b| a.bandwidth.cmp(&b.bandwidth));
150    let audio = *audio
151        .last()
152        .ok_or(WorkerError::MissingRepresentation("audio".to_string()))?;
153
154    // Get the highest quality video
155    let mut video = manifest
156        .representations
157        .iter()
158        .filter(|r| r.height.is_some())
159        .collect::<Vec<_>>();
160    video.sort_by(|a, b| a.bandwidth.cmp(&b.bandwidth));
161    let video = *video
162        .last()
163        .ok_or(WorkerError::MissingRepresentation("video".to_string()))?;
164
165    info!(
166        "Video: {}x{} {}fps ({}, f{})",
167        video.width.ok_or(WorkerError::MissingRepresentation(
168            "video width".to_string()
169        ))?,
170        video.height.ok_or(WorkerError::MissingRepresentation(
171            "video height".to_string()
172        ))?,
173        video.frame_rate.ok_or(WorkerError::MissingRepresentation(
174            "video frame rate".to_string()
175        ))?,
176        video.codecs,
177        video.id,
178    );
179    info!(
180        "Audio: {}kbps ({}, f{})",
181        audio.bandwidth / 1000,
182        audio.codecs,
183        audio.id
184    );
185
186    // Write the m3u8 file
187    let segment_duration = std::time::Duration::from_millis(manifest.segment_duration as u64);
188    let playlist_path = workdir.join("index.m3u8");
189    let mut playlist =
190        hls::IndexPlaylist::new(&playlist_path.to_string_lossy(), &manifest, &audio, &video)
191            .await
192            .map_err(WorkerError::IoError)?;
193
194    let mut tasks = FuturesOrdered::new();
195    let mut seq_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx_seq);
196    let mut is_done = false;
197
198    loop {
199        // Start new downloads if we have room
200        while tasks.len() < concurrency && !is_done {
201            select! {
202                seq = seq_stream.next() => {
203                    match seq {
204                        Some(seq) => tasks.push_back(util::download_av_segment(
205                            &client, workdir, &audio, &video, seq,
206                        )),
207                        None => {
208                            is_done = true;
209                            break;
210                        }
211                    }
212                },
213                // If no new segments are available after 1ms, continue
214                _ = tokio::time::sleep(std::time::Duration::from_millis(1)) => {
215                    break;
216                }
217            }
218        }
219
220        // Exit if there's nothing left to do
221        if tasks.is_empty() && is_done {
222            break;
223        }
224
225        // Write finished segments to playlist file
226        match tasks.next().await {
227            Some(Ok((fname_audio, fname_video, size_total))) => {
228                playlist
229                    .add_segment(&fname_audio, &fname_video, segment_duration)
230                    .await
231                    .map_err(WorkerError::IoError)?;
232
233                let mut st = stats.write().await;
234                st.segments_downloaded += 1;
235                st.bytes_downloaded += size_total as u64;
236                st.print();
237            }
238            Some(Err(e)) => {
239                error!("Could not download segment: {}", e);
240            }
241            None => (),
242        }
243    }
244
245    // Close the playlist
246    playlist.finish().await.map_err(WorkerError::IoError)?;
247
248    Ok(())
249}