1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, OnceLock};
5
6static EMIT_COUNT: AtomicU64 = AtomicU64::new(0);
7
8use serde::Serialize;
9use tokio_util::sync::CancellationToken;
10
11use crate::core::traits::SharedReporter;
12use crate::models::media::MediaInfo;
13use crate::models::queue::{QueueItemInfo, QueueStatus};
14use crate::platforms::traits::PlatformDownloader;
15
16fn shared_http_client() -> &'static reqwest::Client {
17 static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
18 CLIENT.get_or_init(|| {
19 crate::core::http_client::apply_global_proxy(reqwest::Client::builder())
20 .build()
21 .unwrap_or_default()
22 })
23}
24
25struct CachedInfo {
26 info: MediaInfo,
27 cached_at: std::time::Instant,
28}
29
30static INFO_CACHE: OnceLock<tokio::sync::Mutex<HashMap<String, CachedInfo>>> = OnceLock::new();
31
32fn info_cache() -> &'static tokio::sync::Mutex<HashMap<String, CachedInfo>> {
33 INFO_CACHE.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
34}
35
36const INFO_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(600);
37
38static IN_FLIGHT_FETCHES: OnceLock<
39 tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
40> = OnceLock::new();
41
42fn in_flight_map() -> &'static tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
43 IN_FLIGHT_FETCHES.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
44}
45
46#[derive(Debug, Clone, Serialize)]
47pub struct MediaPreviewEvent {
48 pub url: String,
49 pub title: String,
50 pub author: String,
51 pub thumbnail_url: Option<String>,
52 pub duration_seconds: Option<f64>,
53}
54
55pub struct QueueItem {
56 pub id: u64,
57 pub url: String,
58 pub platform: String,
59 pub title: String,
60 pub status: QueueStatus,
61 pub cancel_token: CancellationToken,
62 pub output_dir: String,
63 pub download_mode: Option<String>,
64 pub quality: Option<String>,
65 pub video_format: Option<String>,
66 pub audio_format: Option<String>,
67 pub audio_quality: Option<String>,
68 pub format_id: Option<String>,
69 pub referer: Option<String>,
70 pub extra_headers: Option<std::collections::HashMap<String, String>>,
71 pub page_url: Option<String>,
72 pub user_agent: Option<String>,
73 pub percent: f64,
74 pub speed_bytes_per_sec: f64,
75 pub downloaded_bytes: u64,
76 pub total_bytes: Option<u64>,
77 pub file_path: Option<String>,
78 pub file_size_bytes: Option<u64>,
79 pub file_count: Option<u32>,
80 pub media_info: Option<MediaInfo>,
81 pub downloader: Arc<dyn PlatformDownloader>,
82 pub ytdlp_path: Option<PathBuf>,
83 pub from_hotkey: bool,
84 pub torrent_id: Option<usize>,
85 pub download_subtitles: Option<bool>,
86 pub phase: String,
87}
88
89impl QueueItem {
90 pub fn to_info(&self) -> QueueItemInfo {
91 QueueItemInfo {
92 id: self.id,
93 url: self.url.clone(),
94 platform: self.platform.clone(),
95 title: self.title.clone(),
96 status: self.status.clone(),
97 percent: self.percent,
98 speed_bytes_per_sec: self.speed_bytes_per_sec,
99 downloaded_bytes: self.downloaded_bytes,
100 total_bytes: self.total_bytes,
101 phase: self.phase.clone(),
102 file_path: self.file_path.clone(),
103 file_size_bytes: self.file_size_bytes,
104 file_count: self.file_count,
105 thumbnail_url: self
106 .media_info
107 .as_ref()
108 .and_then(|m| m.thumbnail_url.clone()),
109 }
110 }
111}
112
113pub struct DownloadQueue {
114 pub items: Vec<QueueItem>,
115 pub max_concurrent: u32,
116 pub stagger_delay_ms: u64,
117 pub reporter: Option<SharedReporter>,
118}
119
120impl DownloadQueue {
121 pub fn new(max_concurrent: u32, reporter: Option<SharedReporter>) -> Self {
122 Self {
123 items: Vec::new(),
124 max_concurrent,
125 stagger_delay_ms: 150,
126 reporter,
127 }
128 }
129
130 pub fn set_reporter(&mut self, reporter: SharedReporter) {
131 self.reporter = Some(reporter);
132 }
133
134 pub fn load_from_recovery(&mut self, registry: &crate::core::registry::PlatformRegistry) {
135 let recovery_items = crate::core::manager::recovery::list();
136 for item in recovery_items {
137 let downloader = registry
138 .find_platform(&item.url)
139 .or_else(|| registry.find_by_name(&item.platform));
140
141 if let Some(dl) = downloader {
142 let percent = if matches!(item.status, QueueStatus::Complete { success: true }) {
143 100.0
144 } else {
145 0.0
146 };
147
148 let status = match item.status {
149 QueueStatus::Active | QueueStatus::Seeding => QueueStatus::Paused,
150 other => other,
151 };
152
153 let q_item = QueueItem {
154 id: item.id,
155 url: item.url,
156 platform: item.platform,
157 title: item.title,
158 status,
159 cancel_token: CancellationToken::new(),
160 output_dir: item.output_dir,
161 download_mode: item.download_mode,
162 quality: item.quality,
163 video_format: None,
164 audio_format: None,
165 audio_quality: None,
166 format_id: item.format_id,
167 referer: item.referer,
168 extra_headers: None,
169 page_url: None,
170 user_agent: None,
171 percent,
172 speed_bytes_per_sec: 0.0,
173 downloaded_bytes: 0,
174 total_bytes: item.file_size_bytes,
175 file_path: item.file_path,
176 file_size_bytes: item.file_size_bytes,
177 file_count: None,
178 media_info: None,
179 downloader: dl,
180 ytdlp_path: None,
181 from_hotkey: false,
182 torrent_id: None,
183 download_subtitles: None,
184 phase: "Restored".to_string(),
185 };
186 self.items.push(q_item);
187 }
188 }
189 }
190
191 fn sync_recovery(item: &QueueItem) {
192 crate::core::manager::recovery::persist(crate::core::manager::recovery::RecoveryItem {
193 id: item.id,
194 url: item.url.clone(),
195 title: item.title.clone(),
196 platform: item.platform.clone(),
197 output_dir: item.output_dir.clone(),
198 status: item.status.clone(),
199 download_mode: item.download_mode.clone(),
200 quality: item.quality.clone(),
201 format_id: item.format_id.clone(),
202 referer: item.referer.clone(),
203 file_path: item.file_path.clone(),
204 file_size_bytes: item.file_size_bytes,
205 });
206 }
207
208 #[allow(clippy::too_many_arguments)]
209 pub fn enqueue(
210 &mut self,
211 id: u64,
212 url: String,
213 platform: String,
214 title: String,
215 output_dir: String,
216 download_mode: Option<String>,
217 quality: Option<String>,
218 video_format: Option<String>,
219 audio_format: Option<String>,
220 audio_quality: Option<String>,
221 download_subtitles: Option<bool>,
222 format_id: Option<String>,
223 referer: Option<String>,
224 extra_headers: Option<std::collections::HashMap<String, String>>,
225 page_url: Option<String>,
226 user_agent: Option<String>,
227 media_info: Option<MediaInfo>,
228 total_bytes: Option<u64>,
229 file_count: Option<u32>,
230 downloader: Arc<dyn PlatformDownloader>,
231 ytdlp_path: Option<PathBuf>,
232 from_hotkey: bool,
233 ) {
234 let item = QueueItem {
235 id,
236 url,
237 platform,
238 title,
239 status: QueueStatus::Queued,
240 cancel_token: CancellationToken::new(),
241 output_dir,
242 download_mode,
243 quality,
244 video_format,
245 audio_format,
246 audio_quality,
247 format_id,
248 referer,
249 extra_headers,
250 page_url,
251 user_agent,
252 percent: 0.0,
253 speed_bytes_per_sec: 0.0,
254 downloaded_bytes: 0,
255 total_bytes,
256 file_path: None,
257 file_size_bytes: None,
258 file_count,
259 media_info,
260 downloader,
261 ytdlp_path,
262 from_hotkey,
263 torrent_id: None,
264 download_subtitles,
265 phase: "Queued".to_string(),
266 };
267 self.items.push(item);
268 Self::sync_recovery(self.items.last().unwrap());
269 }
270
271 pub fn active_count(&self) -> u32 {
272 self.items
273 .iter()
274 .filter(|i| i.status == QueueStatus::Active)
275 .count() as u32
276 }
277
278 pub fn next_queued_ids(&self) -> Vec<u64> {
279 let slots = self.max_concurrent.saturating_sub(self.active_count()) as usize;
280 self.items
281 .iter()
282 .filter(|i| i.status == QueueStatus::Queued)
283 .take(slots)
284 .map(|i| i.id)
285 .collect()
286 }
287
288 pub fn mark_active(&mut self, id: u64) {
289 let item = self.items.iter_mut().find(|i| i.id == id);
290 if let Some(item) = item {
291 item.status = QueueStatus::Active;
292 item.cancel_token = CancellationToken::new();
293 Self::sync_recovery(item);
294 }
295 }
296
297 pub fn mark_complete(
298 &mut self,
299 id: u64,
300 success: bool,
301 error: Option<String>,
302 file_path: Option<String>,
303 file_size_bytes: Option<u64>,
304 ) {
305 let item = self.items.iter_mut().find(|i| i.id == id);
306 if let Some(item) = item {
307 if success {
308 item.status = QueueStatus::Complete { success: true };
309 item.percent = 100.0;
310 } else {
311 item.status = QueueStatus::Error {
312 message: error.unwrap_or_default(),
313 };
314 }
315 item.file_path = file_path;
316 item.file_size_bytes = file_size_bytes;
317 item.speed_bytes_per_sec = 0.0;
318 Self::sync_recovery(item);
319 }
320 }
321
322 pub fn mark_seeding(
323 &mut self,
324 id: u64,
325 file_path: Option<String>,
326 file_size_bytes: Option<u64>,
327 torrent_id: Option<usize>,
328 ) {
329 let item = self.items.iter_mut().find(|i| i.id == id);
330 if let Some(item) = item {
331 item.status = QueueStatus::Seeding;
332 item.percent = 100.0;
333 item.file_path = file_path;
334 item.file_size_bytes = file_size_bytes;
335 item.speed_bytes_per_sec = 0.0;
336 item.torrent_id = torrent_id;
337 Self::sync_recovery(item);
338 }
339 }
340
341 pub fn update_progress(
342 &mut self,
343 id: u64,
344 percent: f64,
345 speed: f64,
346 downloaded: u64,
347 total: Option<u64>,
348 torrent_id: Option<usize>,
349 ) {
350 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
351 item.percent = percent;
352 item.speed_bytes_per_sec = speed;
353 item.downloaded_bytes = downloaded;
354 if let Some(t) = total {
355 item.total_bytes = Some(t);
356 }
357 if torrent_id.is_some() && item.torrent_id.is_none() {
358 item.torrent_id = torrent_id;
359 }
360 }
361 }
362
363 pub fn pause(&mut self, id: u64) -> bool {
364 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
365 if item.status == QueueStatus::Active {
366 if item.platform != "magnet" {
367 item.cancel_token.cancel();
368 }
369 item.status = QueueStatus::Paused;
370 item.speed_bytes_per_sec = 0.0;
371 Self::sync_recovery(item);
372 return true;
373 }
374 }
375 false
376 }
377
378 pub fn resume(&mut self, id: u64) -> bool {
379 let item = self.items.iter_mut().find(|i| i.id == id);
380 if let Some(item) = item {
381 if item.status == QueueStatus::Paused {
382 if item.platform == "magnet" {
383 item.status = QueueStatus::Active;
384 } else {
385 item.status = QueueStatus::Queued;
386 item.cancel_token = CancellationToken::new();
387 }
388 Self::sync_recovery(item);
389 return true;
390 }
391 }
392 false
393 }
394
395 pub fn cancel(&mut self, id: u64) -> (bool, Option<usize>) {
396 let result = self.cancel_inner(id);
397 if result.0 {
398 crate::core::manager::recovery::remove(id);
399 }
400 result
401 }
402
403 fn cancel_inner(&mut self, id: u64) -> (bool, Option<usize>) {
404 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
405 match &item.status {
406 QueueStatus::Active => {
407 item.cancel_token.cancel();
408 item.status = QueueStatus::Error {
409 message: "Cancelled".to_string(),
410 };
411 item.speed_bytes_per_sec = 0.0;
412 Self::sync_recovery(item);
413 return (true, None);
414 }
415 QueueStatus::Seeding => {
416 let tid = item.torrent_id;
417 item.status = QueueStatus::Error {
418 message: "Cancelled".to_string(),
419 };
420 item.speed_bytes_per_sec = 0.0;
421 Self::sync_recovery(item);
422 return (true, tid);
423 }
424 QueueStatus::Paused => {
425 item.cancel_token.cancel();
426 let tid = if item.platform == "magnet" {
427 item.torrent_id
428 } else {
429 None
430 };
431 item.status = QueueStatus::Error {
432 message: "Cancelled".to_string(),
433 };
434 item.speed_bytes_per_sec = 0.0;
435 Self::sync_recovery(item);
436 return (true, tid);
437 }
438 QueueStatus::Queued => {
439 item.status = QueueStatus::Error {
440 message: "Cancelled".to_string(),
441 };
442 Self::sync_recovery(item);
443 return (true, None);
444 }
445 _ => {}
446 }
447 }
448 (false, None)
449 }
450
451 pub fn retry(&mut self, id: u64) -> bool {
452 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
453 if matches!(item.status, QueueStatus::Error { .. }) {
454 item.status = QueueStatus::Queued;
455 item.cancel_token = CancellationToken::new();
456 item.percent = 0.0;
457 item.speed_bytes_per_sec = 0.0;
458 item.downloaded_bytes = 0;
459 item.file_path = None;
460 item.file_size_bytes = None;
461 Self::sync_recovery(item);
462 return true;
463 }
464 }
465 false
466 }
467
468 pub fn remove(&mut self, id: u64) -> Option<Option<usize>> {
469 let result = self.remove_inner(id);
470 if result.is_some() {
471 crate::core::manager::recovery::remove(id);
472 }
473 result
474 }
475
476 fn remove_inner(&mut self, id: u64) -> Option<Option<usize>> {
477 if let Some(pos) = self.items.iter().position(|i| i.id == id) {
478 let item = &self.items[pos];
479 if item.status == QueueStatus::Active {
480 item.cancel_token.cancel();
481 }
482 if item.status == QueueStatus::Paused && item.platform == "magnet" {
483 item.cancel_token.cancel();
484 }
485 let torrent_id = if item.status == QueueStatus::Seeding
486 || (item.status == QueueStatus::Paused && item.platform == "magnet")
487 {
488 item.torrent_id
489 } else {
490 None
491 };
492 self.items.remove(pos);
493 return Some(torrent_id);
494 }
495 None
496 }
497
498 pub fn clear_finished(&mut self) {
499 let to_remove: Vec<u64> = self
500 .items
501 .iter()
502 .filter(|i| {
503 matches!(
504 i.status,
505 QueueStatus::Complete { .. } | QueueStatus::Error { .. }
506 )
507 })
508 .map(|i| i.id)
509 .collect();
510 for id in &to_remove {
511 crate::core::manager::recovery::remove(*id);
512 }
513 self.items.retain(|i| {
514 !matches!(
515 i.status,
516 QueueStatus::Complete { .. } | QueueStatus::Error { .. }
517 )
518 });
519 }
520
521 pub fn get_state(&self) -> Vec<QueueItemInfo> {
522 self.items.iter().map(|i| i.to_info()).collect()
523 }
524
525 pub fn has_url(&self, url: &str) -> bool {
526 self.items.iter().any(|i| {
527 i.url == url
528 && matches!(
529 i.status,
530 QueueStatus::Queued
531 | QueueStatus::Active
532 | QueueStatus::Paused
533 | QueueStatus::Seeding
534 )
535 })
536 }
537}
538
539pub struct ProgressThrottle {
540 last_emit: std::time::Instant,
541 min_interval: std::time::Duration,
542}
543
544impl ProgressThrottle {
545 pub fn new(min_interval_ms: u64) -> Self {
546 Self {
547 last_emit: std::time::Instant::now() - std::time::Duration::from_secs(10),
548 min_interval: std::time::Duration::from_millis(min_interval_ms),
549 }
550 }
551
552 pub fn should_emit(&mut self) -> bool {
553 let now = std::time::Instant::now();
554 if now.duration_since(self.last_emit) >= self.min_interval {
555 self.last_emit = now;
556 true
557 } else {
558 false
559 }
560 }
561}
562
563pub fn emit_queue_state_from_state(reporter: &Option<SharedReporter>, state: Vec<QueueItemInfo>) {
564 let n = EMIT_COUNT.fetch_add(1, Ordering::Relaxed);
565 if n.is_multiple_of(10) {
566 tracing::debug!("[perf] emit_queue_state called {} times", n);
567 }
568 if let Some(reporter) = reporter {
569 reporter.on_queue_update(state);
570 }
571}
572
573pub fn emit_queue_state(queue: &DownloadQueue) {
574 let state = queue.get_state();
575 emit_queue_state_from_state(&queue.reporter, state);
576}
577
578struct ActiveJobSlot {
580 queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
581 item_id: u64,
582 armed: bool,
583}
584
585impl ActiveJobSlot {
586 fn new(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) -> Self {
587 Self {
588 queue,
589 item_id,
590 armed: true,
591 }
592 }
593
594 fn disarm(mut self) {
595 self.armed = false;
596 }
597}
598
599impl Drop for ActiveJobSlot {
600 fn drop(&mut self) {
601 if !self.armed {
602 return;
603 }
604 let queue = self.queue.clone();
605 let item_id = self.item_id;
606 tokio::spawn(async move {
607 let state = {
608 let mut q = queue.lock().await;
609 let still_active = q
610 .items
611 .iter()
612 .find(|i| i.id == item_id)
613 .map(|i| i.status == QueueStatus::Active)
614 .unwrap_or(false);
615 if !still_active {
616 return;
617 }
618 tracing::warn!(
619 "[queue] ActiveJobSlot guard firing for {} — download ended without clean release",
620 item_id
621 );
622 q.mark_complete(
623 item_id,
624 false,
625 Some("Download interrupted".to_string()),
626 None,
627 None,
628 );
629 q.get_state()
630 };
631 let reporter = { queue.lock().await.reporter.clone() };
632 emit_queue_state_from_state(&reporter, state);
633 tokio::spawn(try_start_next(queue));
634 });
635 }
636}
637
638pub fn spawn_download(
639 queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
640 item_id: u64,
641) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
642 Box::pin(async move {
643 let _timer_start = std::time::Instant::now();
644 let slot = ActiveJobSlot::new(queue.clone(), item_id);
645 tokio::spawn(spawn_download_inner(queue.clone(), item_id));
646 slot.disarm();
647 tracing::debug!(
648 "[perf] spawn_download {} took {:?}",
649 item_id,
650 _timer_start.elapsed()
651 );
652 })
653}
654
655struct DownloadContext {
656 url: String,
657 output_dir: String,
658 download_mode: Option<String>,
659 quality: Option<String>,
660 video_format: Option<String>,
661 audio_format: Option<String>,
662 audio_quality: Option<String>,
663 download_subtitles: Option<bool>,
664 format_id: Option<String>,
665 referer: Option<String>,
666 extra_headers: Option<std::collections::HashMap<String, String>>,
667 page_url: Option<String>,
668 user_agent: Option<String>,
669 cancel_token: tokio_util::sync::CancellationToken,
670 media_info: Option<crate::models::media::MediaInfo>,
671 platform_name: String,
672 downloader: std::sync::Arc<dyn crate::platforms::traits::PlatformDownloader>,
673 ytdlp_path: Option<std::path::PathBuf>,
674 from_hotkey: bool,
675}
676
677async fn extract_download_context(
678 queue: &Arc<tokio::sync::Mutex<DownloadQueue>>,
679 item_id: u64,
680) -> Option<DownloadContext> {
681 let q = queue.lock().await;
682 let item = q.items.iter().find(|i| i.id == item_id)?;
683 Some(DownloadContext {
684 url: item.url.clone(),
685 output_dir: item.output_dir.clone(),
686 download_mode: item.download_mode.clone(),
687 quality: item.quality.clone(),
688 video_format: item.video_format.clone(),
689 audio_format: item.audio_format.clone(),
690 audio_quality: item.audio_quality.clone(),
691 download_subtitles: item.download_subtitles,
692 format_id: item.format_id.clone(),
693 referer: item.referer.clone(),
694 extra_headers: item.extra_headers.clone(),
695 page_url: item.page_url.clone(),
696 user_agent: item.user_agent.clone(),
697 cancel_token: item.cancel_token.clone(),
698 media_info: item.media_info.clone(),
699 platform_name: item.platform.clone(),
700 downloader: item.downloader.clone(),
701 ytdlp_path: item.ytdlp_path.clone(),
702 from_hotkey: item.from_hotkey,
703 })
704}
705
706async fn prepare_media_info(
707 queue: &Arc<tokio::sync::Mutex<DownloadQueue>>,
708 item_id: u64,
709 ctx: &DownloadContext,
710 reporter: &Option<crate::core::traits::SharedReporter>,
711) -> Option<crate::models::media::MediaInfo> {
712 let info_start = std::time::Instant::now();
713 let info = match &ctx.media_info {
714 Some(i) => {
715 tracing::info!(
716 "[queue] info for {} from cache/pre-fetched in {:?}",
717 item_id,
718 info_start.elapsed()
719 );
720 i.clone()
721 }
722 None => {
723 tracing::debug!(
724 "[perf] spawn_download_inner {}: media_info is None, fetching info",
725 item_id
726 );
727 if let Some(r) = reporter {
728 r.on_progress(
729 item_id,
730 crate::core::events::QueueItemProgress {
731 id: item_id,
732 title: ctx.url.clone(),
733 platform: ctx.platform_name.clone(),
734 percent: 0.0,
735 speed_bytes_per_sec: 0.0,
736 downloaded_bytes: 0,
737 total_bytes: None,
738 phase: "fetching_info".to_string(),
739 },
740 );
741 }
742
743 let info_result = tokio::time::timeout(
744 std::time::Duration::from_secs(60),
745 fetch_and_cache_info(&ctx.url, &*ctx.downloader, &ctx.platform_name),
746 )
747 .await;
748
749 match info_result {
750 Ok(Ok(i)) => i,
751 Ok(Err(e)) => {
752 let state = {
753 let mut q = queue.lock().await;
754 q.mark_complete(item_id, false, Some(e.to_string()), None, None);
755 q.get_state()
756 };
757 emit_queue_state_from_state(reporter, state);
758 tokio::spawn(try_start_next(queue.clone()));
759 return None;
760 }
761 Err(_) => {
762 tracing::warn!("[queue] info fetch timed out for {} after 60s", item_id);
763 let state = {
764 let mut q = queue.lock().await;
765 q.mark_complete(
766 item_id,
767 false,
768 Some("Timed out fetching video info".to_string()),
769 None,
770 None,
771 );
772 q.get_state()
773 };
774 emit_queue_state_from_state(reporter, state);
775 tokio::spawn(try_start_next(queue.clone()));
776 return None;
777 }
778 }
779 }
780 };
781 tracing::info!(
782 "[queue] info fetch for {} took {:?}",
783 item_id,
784 info_start.elapsed()
785 );
786
787 let state = {
788 let mut q = queue.lock().await;
789 if let Some(item) = q.items.iter_mut().find(|i| i.id == item_id) {
790 item.title = info.title.clone();
791 item.total_bytes = info.file_size_bytes;
792 let fc = if info.media_type == crate::models::media::MediaType::Carousel
793 || info.media_type == crate::models::media::MediaType::Playlist
794 {
795 info.available_qualities.len() as u32
796 } else {
797 1
798 };
799 item.file_count = Some(fc);
800 item.media_info = Some(info.clone());
801 }
802 q.get_state()
803 };
804 emit_queue_state_from_state(reporter, state);
805
806 if let Some(r) = reporter {
807 r.on_progress(
808 item_id,
809 crate::core::events::QueueItemProgress {
810 id: item_id,
811 title: info.title.clone(),
812 platform: ctx.platform_name.clone(),
813 percent: 0.5,
814 speed_bytes_per_sec: 0.0,
815 downloaded_bytes: 0,
816 total_bytes: info.file_size_bytes,
817 phase: "starting".to_string(),
818 },
819 );
820 }
821
822 Some(info)
823}
824
825fn build_download_options(
826 ctx: &DownloadContext,
827) -> (
828 crate::models::media::DownloadOptions,
829 std::sync::Arc<tokio::sync::Mutex<Option<usize>>>,
830) {
831 let settings = crate::models::settings::AppSettings::load_from_disk();
832 let tmpl = settings.download.filename_template.clone();
833 let mut final_output_dir = std::path::PathBuf::from(&ctx.output_dir);
834 if settings.download.organize_by_platform {
835 final_output_dir = final_output_dir.join(&ctx.platform_name);
836 }
837 let torrent_id_slot = std::sync::Arc::new(tokio::sync::Mutex::new(None));
838 let opts = crate::models::media::DownloadOptions {
839 quality: ctx
840 .quality
841 .clone()
842 .or_else(|| Some(settings.download.video_quality.clone())),
843 video_format: ctx
844 .video_format
845 .clone()
846 .or_else(|| Some(settings.download.video_format.clone())),
847 audio_format: ctx
848 .audio_format
849 .clone()
850 .or_else(|| Some(settings.download.audio_format.clone())),
851 audio_quality: ctx
852 .audio_quality
853 .clone()
854 .or_else(|| Some(settings.download.audio_quality.clone())),
855 output_dir: final_output_dir,
856 filename_template: Some(tmpl),
857 download_subtitles: ctx
858 .download_subtitles
859 .unwrap_or(settings.download.download_subtitles),
860 include_auto_subtitles: settings.download.include_auto_subtitles,
861 download_mode: ctx.download_mode.clone(),
862 format_id: ctx.format_id.clone(),
863 referer: ctx.referer.clone(),
864 extra_headers: ctx.extra_headers.clone(),
865 page_url: ctx.page_url.clone(),
866 user_agent: ctx.user_agent.clone(),
867 cancel_token: ctx.cancel_token.clone(),
868 concurrent_fragments: settings.advanced.concurrent_fragments,
869 ytdlp_path: ctx.ytdlp_path.clone(),
870 torrent_listen_port: Some(settings.advanced.torrent_listen_port),
871 torrent_id_slot: Some(torrent_id_slot.clone()),
872 };
873 (opts, torrent_id_slot)
874}
875
876async fn handle_download_result(
877 queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
878 item_id: u64,
879 ctx: DownloadContext,
880 info: crate::models::media::MediaInfo,
881 reporter: Option<crate::core::traits::SharedReporter>,
882 result: anyhow::Result<crate::models::media::DownloadResult>,
883) {
884 let settings = crate::models::settings::AppSettings::load_from_disk();
885 match result {
886 Ok(dl) => {
887 if settings.download.embed_metadata
888 && ctx.platform_name != "magnet"
889 && crate::core::ffmpeg::is_ffmpeg_available().await
890 {
891 let metadata = crate::core::ffmpeg::MetadataEmbed {
892 title: Some(info.title.clone()),
893 artist: Some(info.author.clone()),
894 thumbnail_url: info.thumbnail_url.clone(),
895 ..Default::default()
896 };
897 if let Err(e) = crate::core::ffmpeg::embed_metadata(
898 &dl.file_path,
899 &metadata,
900 settings.download.embed_thumbnail,
901 shared_http_client(),
902 )
903 .await
904 {
905 tracing::warn!("Metadata embed failed for '{}': {}", info.title, e);
906 }
907 }
908
909 if ctx.from_hotkey && settings.download.copy_to_clipboard_on_hotkey {
910 #[cfg(not(target_os = "android"))]
911 {
912 match crate::core::clipboard::copy_file_to_clipboard(&dl.file_path).await {
913 Ok(()) => {
914 tracing::info!("[clipboard] file copied: {:?}", dl.file_path);
915 }
916 Err(e) => {
917 tracing::warn!("[clipboard] failed to copy file: {}", e);
918 }
919 }
920 }
921 }
922
923 let state = {
924 let mut q = queue.lock().await;
925 if ctx.platform_name == "magnet" && dl.torrent_id.is_some() {
926 q.mark_seeding(
927 item_id,
928 Some(dl.file_path.to_string_lossy().to_string()),
929 Some(dl.file_size_bytes),
930 dl.torrent_id,
931 );
932 } else {
933 q.mark_complete(
934 item_id,
935 true,
936 None,
937 Some(dl.file_path.to_string_lossy().to_string()),
938 Some(dl.file_size_bytes),
939 );
940 }
941 q.get_state()
942 };
943 if let Some(r) = &reporter {
944 r.on_complete(
945 item_id,
946 Some(dl.file_path.to_string_lossy().to_string()),
947 Some(dl.file_size_bytes),
948 );
949 }
950 emit_queue_state_from_state(&reporter, state);
951 }
952 Err(e) => {
953 let raw_err = format!("{}", e);
954 let (category, hint) = crate::core::errors::classify_download_error(&raw_err);
955 let user_msg = if category != "unknown" {
956 format!("{} ({})", hint, raw_err)
957 } else {
958 raw_err.clone()
959 };
960 tracing::error!(
961 "Download error '{}' [{}]: {}",
962 ctx.platform_name,
963 category,
964 raw_err
965 );
966 let state = {
967 let mut q = queue.lock().await;
968 q.mark_complete(item_id, false, Some(user_msg.clone()), None, None);
969 q.get_state()
970 };
971 if let Some(r) = &reporter {
972 r.on_error(item_id, user_msg);
973 }
974 emit_queue_state_from_state(&reporter, state);
975 }
976 }
977}
978
979async fn spawn_download_inner(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) {
980 tracing::info!("[queue] download {} started", item_id);
981
982 let reporter = { queue.lock().await.reporter.clone() };
983
984 if let Some(r) = &reporter {
985 r.on_progress(
986 item_id,
987 crate::core::events::QueueItemProgress {
988 id: item_id,
989 title: "".to_string(),
990 platform: "".to_string(),
991 percent: 0.0,
992 speed_bytes_per_sec: 0.0,
993 downloaded_bytes: 0,
994 total_bytes: None,
995 phase: "preparing".to_string(),
996 },
997 );
998 }
999
1000 let ctx = match extract_download_context(&queue, item_id).await {
1001 Some(c) => c,
1002 None => return,
1003 };
1004
1005 let info = match prepare_media_info(&queue, item_id, &ctx, &reporter).await {
1006 Some(i) => i,
1007 None => return,
1008 };
1009
1010 let (opts, torrent_id_slot) = build_download_options(&ctx);
1011
1012 let total_bytes = info.file_size_bytes;
1013 let item_title = info.title.clone();
1014 let item_platform = ctx.platform_name.clone();
1015 let (tx, mut rx) = tokio::sync::mpsc::channel::<f64>(32);
1016
1017 let reporter_progress = reporter.clone();
1018 let queue_progress = queue.clone();
1019 let torrent_id_slot_progress = torrent_id_slot.clone();
1020 let progress_forwarder = tokio::spawn(async move {
1021 let mut last_bytes: u64 = 0;
1022 let mut last_time = std::time::Instant::now();
1023 let mut throttle = ProgressThrottle::new(250);
1024 let mut current_speed: f64 = 0.0;
1025
1026 while let Some(percent) = rx.recv().await {
1027 if !throttle.should_emit() && percent < 100.0 {
1028 continue;
1029 }
1030
1031 let now = std::time::Instant::now();
1032 let clamped = percent.max(0.0);
1033 let downloaded_bytes = total_bytes
1034 .map(|total| (clamped / 100.0 * total as f64) as u64)
1035 .unwrap_or(0);
1036
1037 if total_bytes.is_some() && downloaded_bytes > last_bytes {
1038 let dt = now.duration_since(last_time).as_secs_f64();
1039 if dt > 0.1 {
1040 let instant_speed = (downloaded_bytes - last_bytes) as f64 / dt;
1041 current_speed = if current_speed > 0.0 {
1042 current_speed * 0.7 + instant_speed * 0.3
1043 } else {
1044 instant_speed
1045 };
1046 }
1047 }
1048
1049 last_bytes = downloaded_bytes;
1050 last_time = now;
1051
1052 let phase = match percent {
1053 p if p < -1.5 => "connecting",
1054 p if p < -0.5 => "starting",
1055 p if p > 0.0 => "downloading",
1056 _ => "starting",
1057 };
1058
1059 {
1060 let mut q = queue_progress.lock().await;
1061 let tid = { *torrent_id_slot_progress.lock().await };
1062 q.update_progress(
1063 item_id,
1064 clamped,
1065 current_speed,
1066 downloaded_bytes,
1067 total_bytes,
1068 tid,
1069 );
1070 }
1071
1072 if let Some(r) = &reporter_progress {
1073 r.on_progress(
1074 item_id,
1075 crate::core::events::QueueItemProgress {
1076 id: item_id,
1077 title: item_title.clone(),
1078 platform: item_platform.clone(),
1079 percent: clamped,
1080 speed_bytes_per_sec: current_speed,
1081 downloaded_bytes,
1082 total_bytes,
1083 phase: phase.to_string(),
1084 },
1085 );
1086 }
1087 }
1088 });
1089
1090 if let Some(ua) = opts.user_agent.clone() {
1091 crate::core::ytdlp::register_ext_user_agent(ctx.url.clone(), ua);
1092 }
1093 if let Some(hdrs) = opts.extra_headers.clone() {
1094 crate::core::ytdlp::register_ext_headers(ctx.url.clone(), hdrs);
1095 }
1096
1097 if !crate::core::ffmpeg::is_ffmpeg_available().await {
1099 let rep_ref = reporter.as_ref().map(|r| r.as_ref());
1100 match crate::core::dependencies::ensure_ffmpeg(rep_ref).await {
1101 Ok(path) => {
1102 tracing::info!("[ffmpeg] auto-installed to {:?}", path);
1103 crate::core::ffmpeg::reset_ffmpeg_available_cache();
1104 }
1105 Err(e) => tracing::warn!("[ffmpeg] auto-install failed: {}", e),
1106 }
1107 }
1108
1109 let dl_start = std::time::Instant::now();
1110 let dl_future = async {
1111 tokio::select! {
1112 r = ctx.downloader.download(&info, &opts, tx) => r,
1113 _ = ctx.cancel_token.cancelled() => {
1114 Err(anyhow::anyhow!("Download cancelled"))
1115 }
1116 }
1117 };
1118 let result = crate::core::log_hook::CURRENT_DOWNLOAD_ID
1119 .scope(item_id, dl_future)
1120 .await;
1121 crate::core::ytdlp::clear_ext_user_agent(&ctx.url);
1122 crate::core::ytdlp::clear_ext_headers(&ctx.url);
1123 tracing::info!(
1124 "[queue] download {} completed in {:?}",
1125 item_id,
1126 dl_start.elapsed()
1127 );
1128
1129 let _ = progress_forwarder.await;
1130
1131 let was_paused = {
1132 let q = queue.lock().await;
1133 q.items
1134 .iter()
1135 .find(|i| i.id == item_id)
1136 .map(|i| i.status == QueueStatus::Paused)
1137 .unwrap_or(false)
1138 };
1139
1140 if was_paused {
1141 let state = {
1142 let q = queue.lock().await;
1143 q.get_state()
1144 };
1145 emit_queue_state_from_state(&reporter, state);
1146 tokio::spawn(try_start_next(queue));
1147 return;
1148 }
1149
1150 handle_download_result(queue.clone(), item_id, ctx, info, reporter, result).await;
1151
1152 tokio::spawn(try_start_next(queue));
1153}
1154
1155pub async fn fetch_and_cache_info(
1156 url: &str,
1157 downloader: &dyn PlatformDownloader,
1158 platform: &str,
1159) -> anyhow::Result<MediaInfo> {
1160 {
1161 let cache = info_cache().lock().await;
1162 if let Some(entry) = cache.get(url) {
1163 if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1164 tracing::debug!("[perf] fetch_and_cache_info: cache hit for {}", platform);
1165 return Ok(entry.info.clone());
1166 }
1167 }
1168 }
1169
1170 let url_lock = {
1171 let mut map = in_flight_map().lock().await;
1172 map.entry(url.to_string())
1173 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
1174 .clone()
1175 };
1176 let _guard = url_lock.lock().await;
1177
1178 {
1179 let cache = info_cache().lock().await;
1180 if let Some(entry) = cache.get(url) {
1181 if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1182 tracing::debug!(
1183 "[perf] fetch_and_cache_info: dedup cache hit for {}",
1184 platform
1185 );
1186 return Ok(entry.info.clone());
1187 }
1188 }
1189 }
1190
1191 tracing::debug!("[perf] fetch_and_cache_info: fetching for {}", platform);
1192 let mut info = downloader.get_media_info(url).await?;
1193
1194 if is_generic_title(&info.title) {
1195 let name = crate::core::random_names::get_random_name();
1196 info.title = format!("video_{}", name);
1197 }
1198
1199 let mut cache = info_cache().lock().await;
1200 cache.insert(
1201 url.to_string(),
1202 CachedInfo {
1203 info: info.clone(),
1204 cached_at: std::time::Instant::now(),
1205 },
1206 );
1207 if cache.len() > 50 {
1208 cache.retain(|_, v| v.cached_at.elapsed() < INFO_CACHE_TTL);
1209 }
1210 Ok(info)
1211}
1212
1213pub async fn try_get_cached_info(url: &str) -> Option<MediaInfo> {
1214 let cache = info_cache().lock().await;
1215 cache
1216 .get(url)
1217 .filter(|entry| entry.cached_at.elapsed() < INFO_CACHE_TTL)
1218 .map(|entry| entry.info.clone())
1219}
1220
1221pub async fn prefetch_info(url: &str, downloader: &dyn PlatformDownloader, platform: &str) {
1222 prefetch_info_with_emit(url, downloader, platform, None).await;
1223}
1224
1225pub async fn prefetch_info_with_emit(
1226 url: &str,
1227 downloader: &dyn PlatformDownloader,
1228 platform: &str,
1229 reporter: Option<SharedReporter>,
1230) {
1231 let _timer_start = std::time::Instant::now();
1232 tracing::debug!("[perf] prefetch_info: started");
1233 match fetch_and_cache_info(url, downloader, platform).await {
1234 Ok(info) => {
1235 tracing::debug!(
1236 "[perf] prefetch_info: completed in {:?} — {}",
1237 _timer_start.elapsed(),
1238 info.title
1239 );
1240 if let Some(r) = reporter {
1241 r.on_media_preview(
1242 url.to_string(),
1243 info.title.clone(),
1244 info.author.clone(),
1245 info.thumbnail_url.clone(),
1246 info.duration_seconds,
1247 );
1248 }
1249 }
1250 Err(e) => tracing::warn!(
1251 "[perf] prefetch_info: failed in {:?} — {}",
1252 _timer_start.elapsed(),
1253 e
1254 ),
1255 }
1256}
1257
1258pub async fn try_start_next(queue: Arc<tokio::sync::Mutex<DownloadQueue>>) {
1259 let _timer_start = std::time::Instant::now();
1260 let (next_ids, stagger, state_to_emit, reporter) = {
1261 let mut q = queue.lock().await;
1262 let ids = q.next_queued_ids();
1263 for nid in &ids {
1264 q.mark_active(*nid);
1265 }
1266 let state = if !ids.is_empty() {
1267 Some(q.get_state())
1268 } else {
1269 None
1270 };
1271 (ids, q.stagger_delay_ms, state, q.reporter.clone())
1272 };
1273
1274 if let Some(state) = state_to_emit {
1275 emit_queue_state_from_state(&reporter, state);
1276 }
1277
1278 let batch_size = next_ids.len();
1279 for (i, nid) in next_ids.into_iter().enumerate() {
1280 if let Some(r) = &reporter {
1281 r.on_progress(
1282 nid,
1283 crate::core::events::QueueItemProgress {
1284 id: nid,
1285 title: String::new(),
1286 platform: String::new(),
1287 percent: 0.0,
1288 speed_bytes_per_sec: 0.0,
1289 downloaded_bytes: 0,
1290 total_bytes: None,
1291 phase: "queued_starting".to_string(),
1292 },
1293 );
1294 }
1295
1296 if i > 0 {
1297 let item_platform = {
1298 let q = queue.lock().await;
1299 q.items
1300 .iter()
1301 .find(|item| item.id == nid)
1302 .map(|item| item.platform.clone())
1303 };
1304 let delay_ms = if item_platform.as_deref() == Some("youtube") {
1305 2000
1306 } else if batch_size > 3 {
1307 stagger.max(1000)
1308 } else {
1309 stagger
1310 };
1311 if delay_ms > 0 {
1312 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1313 }
1314 }
1315 let queue_c = queue.clone();
1316 tokio::spawn(async move {
1317 tokio::spawn(spawn_download(queue_c, nid));
1318 });
1319 }
1320 tracing::debug!("[perf] try_start_next took {:?}", _timer_start.elapsed());
1321}
1322
1323fn is_generic_title(title: &str) -> bool {
1324 let t = title.to_lowercase();
1325 let t = t.trim();
1326 t.is_empty()
1327 || t == "video"
1328 || t == "media"
1329 || t == "untitled"
1330 || t == "unknown"
1331 || t.starts_with("video [video]")
1332 || t.starts_with("media [media]")
1333}