1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, OnceLock};
5
6static EMIT_COUNT: AtomicU64 = AtomicU64::new(0);
7
8use serde::Serialize;
9use tokio_util::sync::CancellationToken;
10
11use crate::core::traits::SharedReporter;
12use crate::models::media::MediaInfo;
13use crate::models::queue::{QueueItemInfo, QueueStatus};
14use crate::platforms::traits::PlatformDownloader;
15
16fn shared_http_client() -> &'static reqwest::Client {
17 static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
18 CLIENT.get_or_init(|| {
19 crate::core::http_client::apply_global_proxy(reqwest::Client::builder())
20 .build()
21 .unwrap_or_default()
22 })
23}
24
25struct CachedInfo {
26 info: MediaInfo,
27 cached_at: std::time::Instant,
28}
29
30static INFO_CACHE: OnceLock<tokio::sync::Mutex<HashMap<String, CachedInfo>>> = OnceLock::new();
31
32fn info_cache() -> &'static tokio::sync::Mutex<HashMap<String, CachedInfo>> {
33 INFO_CACHE.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
34}
35
36const INFO_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(600);
37
38static IN_FLIGHT_FETCHES: OnceLock<
39 tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
40> = OnceLock::new();
41
42fn in_flight_map() -> &'static tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
43 IN_FLIGHT_FETCHES.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
44}
45
46#[derive(Debug, Clone, Serialize)]
47pub struct MediaPreviewEvent {
48 pub url: String,
49 pub title: String,
50 pub author: String,
51 pub thumbnail_url: Option<String>,
52 pub duration_seconds: Option<f64>,
53}
54
55pub struct QueueItem {
56 pub id: u64,
57 pub url: String,
58 pub platform: String,
59 pub title: String,
60 pub status: QueueStatus,
61 pub cancel_token: CancellationToken,
62 pub output_dir: String,
63 pub download_mode: Option<String>,
64 pub quality: Option<String>,
65 pub video_format: Option<String>,
66 pub audio_format: Option<String>,
67 pub audio_quality: Option<String>,
68 pub format_id: Option<String>,
69 pub referer: Option<String>,
70 pub extra_headers: Option<std::collections::HashMap<String, String>>,
71 pub page_url: Option<String>,
72 pub user_agent: Option<String>,
73 pub percent: f64,
74 pub speed_bytes_per_sec: f64,
75 pub downloaded_bytes: u64,
76 pub total_bytes: Option<u64>,
77 pub file_path: Option<String>,
78 pub file_size_bytes: Option<u64>,
79 pub file_count: Option<u32>,
80 pub media_info: Option<MediaInfo>,
81 pub downloader: Arc<dyn PlatformDownloader>,
82 pub ytdlp_path: Option<PathBuf>,
83 pub from_hotkey: bool,
84 pub torrent_id: Option<usize>,
85 pub download_subtitles: Option<bool>,
86 pub phase: String,
87}
88
89impl QueueItem {
90 pub fn to_info(&self) -> QueueItemInfo {
91 QueueItemInfo {
92 id: self.id,
93 url: self.url.clone(),
94 platform: self.platform.clone(),
95 title: self.title.clone(),
96 status: self.status.clone(),
97 percent: self.percent,
98 speed_bytes_per_sec: self.speed_bytes_per_sec,
99 downloaded_bytes: self.downloaded_bytes,
100 total_bytes: self.total_bytes,
101 phase: self.phase.clone(),
102 file_path: self.file_path.clone(),
103 file_size_bytes: self.file_size_bytes,
104 file_count: self.file_count,
105 thumbnail_url: self
106 .media_info
107 .as_ref()
108 .and_then(|m| m.thumbnail_url.clone()),
109 }
110 }
111}
112
113pub struct DownloadQueue {
114 pub items: Vec<QueueItem>,
115 pub max_concurrent: u32,
116 pub stagger_delay_ms: u64,
117 pub reporter: Option<SharedReporter>,
118}
119
120impl DownloadQueue {
121 pub fn new(max_concurrent: u32, reporter: Option<SharedReporter>) -> Self {
122 Self {
123 items: Vec::new(),
124 max_concurrent,
125 stagger_delay_ms: 150,
126 reporter,
127 }
128 }
129
130 pub fn set_reporter(&mut self, reporter: SharedReporter) {
131 self.reporter = Some(reporter);
132 }
133
134 pub fn load_from_recovery(&mut self, registry: &crate::core::registry::PlatformRegistry) {
135 let recovery_items = crate::core::manager::recovery::list();
136 for item in recovery_items {
137 let downloader = registry
138 .find_platform(&item.url)
139 .or_else(|| registry.find_by_name(&item.platform));
140
141 if let Some(dl) = downloader {
142 let percent = if matches!(item.status, QueueStatus::Complete { success: true }) {
143 100.0
144 } else {
145 0.0
146 };
147
148 let q_item = QueueItem {
149 id: item.id,
150 url: item.url,
151 platform: item.platform,
152 title: item.title,
153 status: item.status,
154 cancel_token: CancellationToken::new(),
155 output_dir: item.output_dir,
156 download_mode: item.download_mode,
157 quality: item.quality,
158 video_format: None,
159 audio_format: None,
160 audio_quality: None,
161 format_id: item.format_id,
162 referer: item.referer,
163 extra_headers: None,
164 page_url: None,
165 user_agent: None,
166 percent,
167 speed_bytes_per_sec: 0.0,
168 downloaded_bytes: 0,
169 total_bytes: item.file_size_bytes,
170 file_path: item.file_path,
171 file_size_bytes: item.file_size_bytes,
172 file_count: None,
173 media_info: None,
174 downloader: dl,
175 ytdlp_path: None,
176 from_hotkey: false,
177 torrent_id: None,
178 download_subtitles: None,
179 phase: "Restored".to_string(),
180 };
181 self.items.push(q_item);
182 }
183 }
184 }
185
186 fn sync_recovery(item: &QueueItem) {
187 crate::core::manager::recovery::persist(crate::core::manager::recovery::RecoveryItem {
188 id: item.id,
189 url: item.url.clone(),
190 title: item.title.clone(),
191 platform: item.platform.clone(),
192 output_dir: item.output_dir.clone(),
193 status: item.status.clone(),
194 download_mode: item.download_mode.clone(),
195 quality: item.quality.clone(),
196 format_id: item.format_id.clone(),
197 referer: item.referer.clone(),
198 file_path: item.file_path.clone(),
199 file_size_bytes: item.file_size_bytes,
200 });
201 }
202
203 #[allow(clippy::too_many_arguments)]
204 pub fn enqueue(
205 &mut self,
206 id: u64,
207 url: String,
208 platform: String,
209 title: String,
210 output_dir: String,
211 download_mode: Option<String>,
212 quality: Option<String>,
213 video_format: Option<String>,
214 audio_format: Option<String>,
215 audio_quality: Option<String>,
216 download_subtitles: Option<bool>,
217 format_id: Option<String>,
218 referer: Option<String>,
219 extra_headers: Option<std::collections::HashMap<String, String>>,
220 page_url: Option<String>,
221 user_agent: Option<String>,
222 media_info: Option<MediaInfo>,
223 total_bytes: Option<u64>,
224 file_count: Option<u32>,
225 downloader: Arc<dyn PlatformDownloader>,
226 ytdlp_path: Option<PathBuf>,
227 from_hotkey: bool,
228 ) {
229 let item = QueueItem {
230 id,
231 url,
232 platform,
233 title,
234 status: QueueStatus::Queued,
235 cancel_token: CancellationToken::new(),
236 output_dir,
237 download_mode,
238 quality,
239 video_format,
240 audio_format,
241 audio_quality,
242 format_id,
243 referer,
244 extra_headers,
245 page_url,
246 user_agent,
247 percent: 0.0,
248 speed_bytes_per_sec: 0.0,
249 downloaded_bytes: 0,
250 total_bytes,
251 file_path: None,
252 file_size_bytes: None,
253 file_count,
254 media_info,
255 downloader,
256 ytdlp_path,
257 from_hotkey,
258 torrent_id: None,
259 download_subtitles,
260 phase: "Queued".to_string(),
261 };
262 self.items.push(item);
263 Self::sync_recovery(self.items.last().unwrap());
264 }
265
266 pub fn active_count(&self) -> u32 {
267 self.items
268 .iter()
269 .filter(|i| i.status == QueueStatus::Active)
270 .count() as u32
271 }
272
273 pub fn next_queued_ids(&self) -> Vec<u64> {
274 let slots = self.max_concurrent.saturating_sub(self.active_count()) as usize;
275 self.items
276 .iter()
277 .filter(|i| i.status == QueueStatus::Queued)
278 .take(slots)
279 .map(|i| i.id)
280 .collect()
281 }
282
283 pub fn mark_active(&mut self, id: u64) {
284 let item = self.items.iter_mut().find(|i| i.id == id);
285 if let Some(item) = item {
286 item.status = QueueStatus::Active;
287 item.cancel_token = CancellationToken::new();
288 Self::sync_recovery(item);
289 }
290 }
291
292 pub fn mark_complete(
293 &mut self,
294 id: u64,
295 success: bool,
296 error: Option<String>,
297 file_path: Option<String>,
298 file_size_bytes: Option<u64>,
299 ) {
300 let item = self.items.iter_mut().find(|i| i.id == id);
301 if let Some(item) = item {
302 if success {
303 item.status = QueueStatus::Complete { success: true };
304 item.percent = 100.0;
305 } else {
306 item.status = QueueStatus::Error {
307 message: error.unwrap_or_default(),
308 };
309 }
310 item.file_path = file_path;
311 item.file_size_bytes = file_size_bytes;
312 item.speed_bytes_per_sec = 0.0;
313 Self::sync_recovery(item);
314 }
315 }
316
317 pub fn mark_seeding(
318 &mut self,
319 id: u64,
320 file_path: Option<String>,
321 file_size_bytes: Option<u64>,
322 torrent_id: Option<usize>,
323 ) {
324 let item = self.items.iter_mut().find(|i| i.id == id);
325 if let Some(item) = item {
326 item.status = QueueStatus::Seeding;
327 item.percent = 100.0;
328 item.file_path = file_path;
329 item.file_size_bytes = file_size_bytes;
330 item.speed_bytes_per_sec = 0.0;
331 item.torrent_id = torrent_id;
332 Self::sync_recovery(item);
333 }
334 }
335
336 pub fn update_progress(
337 &mut self,
338 id: u64,
339 percent: f64,
340 speed: f64,
341 downloaded: u64,
342 total: Option<u64>,
343 torrent_id: Option<usize>,
344 ) {
345 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
346 item.percent = percent;
347 item.speed_bytes_per_sec = speed;
348 item.downloaded_bytes = downloaded;
349 if let Some(t) = total {
350 item.total_bytes = Some(t);
351 }
352 if torrent_id.is_some() && item.torrent_id.is_none() {
353 item.torrent_id = torrent_id;
354 }
355 }
356 }
357
358 pub fn pause(&mut self, id: u64) -> bool {
359 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
360 if item.status == QueueStatus::Active {
361 if item.platform != "magnet" {
362 item.cancel_token.cancel();
363 }
364 item.status = QueueStatus::Paused;
365 item.speed_bytes_per_sec = 0.0;
366 Self::sync_recovery(item);
367 return true;
368 }
369 }
370 false
371 }
372
373 pub fn resume(&mut self, id: u64) -> bool {
374 let item = self.items.iter_mut().find(|i| i.id == id);
375 if let Some(item) = item {
376 if item.status == QueueStatus::Paused {
377 if item.platform == "magnet" {
378 item.status = QueueStatus::Active;
379 } else {
380 item.status = QueueStatus::Queued;
381 item.cancel_token = CancellationToken::new();
382 }
383 Self::sync_recovery(item);
384 return true;
385 }
386 }
387 false
388 }
389
390 pub fn cancel(&mut self, id: u64) -> (bool, Option<usize>) {
391 let result = self.cancel_inner(id);
392 if result.0 {
393 crate::core::manager::recovery::remove(id);
394 }
395 result
396 }
397
398 fn cancel_inner(&mut self, id: u64) -> (bool, Option<usize>) {
399 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
400 match &item.status {
401 QueueStatus::Active => {
402 item.cancel_token.cancel();
403 item.status = QueueStatus::Error {
404 message: "Cancelled".to_string(),
405 };
406 item.speed_bytes_per_sec = 0.0;
407 Self::sync_recovery(item);
408 return (true, None);
409 }
410 QueueStatus::Seeding => {
411 let tid = item.torrent_id;
412 item.status = QueueStatus::Error {
413 message: "Cancelled".to_string(),
414 };
415 item.speed_bytes_per_sec = 0.0;
416 Self::sync_recovery(item);
417 return (true, tid);
418 }
419 QueueStatus::Paused => {
420 item.cancel_token.cancel();
421 let tid = if item.platform == "magnet" {
422 item.torrent_id
423 } else {
424 None
425 };
426 item.status = QueueStatus::Error {
427 message: "Cancelled".to_string(),
428 };
429 item.speed_bytes_per_sec = 0.0;
430 Self::sync_recovery(item);
431 return (true, tid);
432 }
433 QueueStatus::Queued => {
434 item.status = QueueStatus::Error {
435 message: "Cancelled".to_string(),
436 };
437 Self::sync_recovery(item);
438 return (true, None);
439 }
440 _ => {}
441 }
442 }
443 (false, None)
444 }
445
446 pub fn retry(&mut self, id: u64) -> bool {
447 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
448 if matches!(item.status, QueueStatus::Error { .. }) {
449 item.status = QueueStatus::Queued;
450 item.cancel_token = CancellationToken::new();
451 item.percent = 0.0;
452 item.speed_bytes_per_sec = 0.0;
453 item.downloaded_bytes = 0;
454 item.file_path = None;
455 item.file_size_bytes = None;
456 Self::sync_recovery(item);
457 return true;
458 }
459 }
460 false
461 }
462
463 pub fn remove(&mut self, id: u64) -> Option<Option<usize>> {
464 let result = self.remove_inner(id);
465 if result.is_some() {
466 crate::core::manager::recovery::remove(id);
467 }
468 result
469 }
470
471 fn remove_inner(&mut self, id: u64) -> Option<Option<usize>> {
472 if let Some(pos) = self.items.iter().position(|i| i.id == id) {
473 let item = &self.items[pos];
474 if item.status == QueueStatus::Active {
475 item.cancel_token.cancel();
476 }
477 if item.status == QueueStatus::Paused && item.platform == "magnet" {
478 item.cancel_token.cancel();
479 }
480 let torrent_id = if item.status == QueueStatus::Seeding
481 || (item.status == QueueStatus::Paused && item.platform == "magnet")
482 {
483 item.torrent_id
484 } else {
485 None
486 };
487 self.items.remove(pos);
488 return Some(torrent_id);
489 }
490 None
491 }
492
493 pub fn clear_finished(&mut self) {
494 let to_remove: Vec<u64> = self
495 .items
496 .iter()
497 .filter(|i| {
498 matches!(
499 i.status,
500 QueueStatus::Complete { .. } | QueueStatus::Error { .. }
501 )
502 })
503 .map(|i| i.id)
504 .collect();
505 for id in &to_remove {
506 crate::core::manager::recovery::remove(*id);
507 }
508 self.items.retain(|i| {
509 !matches!(
510 i.status,
511 QueueStatus::Complete { .. } | QueueStatus::Error { .. }
512 )
513 });
514 }
515
516 pub fn get_state(&self) -> Vec<QueueItemInfo> {
517 self.items.iter().map(|i| i.to_info()).collect()
518 }
519
520 pub fn has_url(&self, url: &str) -> bool {
521 self.items.iter().any(|i| {
522 i.url == url
523 && matches!(
524 i.status,
525 QueueStatus::Queued
526 | QueueStatus::Active
527 | QueueStatus::Paused
528 | QueueStatus::Seeding
529 )
530 })
531 }
532}
533
534pub struct ProgressThrottle {
535 last_emit: std::time::Instant,
536 min_interval: std::time::Duration,
537}
538
539impl ProgressThrottle {
540 pub fn new(min_interval_ms: u64) -> Self {
541 Self {
542 last_emit: std::time::Instant::now() - std::time::Duration::from_secs(10),
543 min_interval: std::time::Duration::from_millis(min_interval_ms),
544 }
545 }
546
547 pub fn should_emit(&mut self) -> bool {
548 let now = std::time::Instant::now();
549 if now.duration_since(self.last_emit) >= self.min_interval {
550 self.last_emit = now;
551 true
552 } else {
553 false
554 }
555 }
556}
557
558pub fn emit_queue_state_from_state(reporter: &Option<SharedReporter>, state: Vec<QueueItemInfo>) {
559 let n = EMIT_COUNT.fetch_add(1, Ordering::Relaxed);
560 if n.is_multiple_of(10) {
561 tracing::debug!("[perf] emit_queue_state called {} times", n);
562 }
563 if let Some(reporter) = reporter {
564 reporter.on_queue_update(state);
565 }
566}
567
568pub fn emit_queue_state(queue: &DownloadQueue) {
569 let state = queue.get_state();
570 emit_queue_state_from_state(&queue.reporter, state);
571}
572
573struct ActiveJobSlot {
575 queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
576 item_id: u64,
577 armed: bool,
578}
579
580impl ActiveJobSlot {
581 fn new(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) -> Self {
582 Self {
583 queue,
584 item_id,
585 armed: true,
586 }
587 }
588
589 fn disarm(mut self) {
590 self.armed = false;
591 }
592}
593
594impl Drop for ActiveJobSlot {
595 fn drop(&mut self) {
596 if !self.armed {
597 return;
598 }
599 let queue = self.queue.clone();
600 let item_id = self.item_id;
601 tokio::spawn(async move {
602 let state = {
603 let mut q = queue.lock().await;
604 let still_active = q
605 .items
606 .iter()
607 .find(|i| i.id == item_id)
608 .map(|i| i.status == QueueStatus::Active)
609 .unwrap_or(false);
610 if !still_active {
611 return;
612 }
613 tracing::warn!(
614 "[queue] ActiveJobSlot guard firing for {} — download ended without clean release",
615 item_id
616 );
617 q.mark_complete(
618 item_id,
619 false,
620 Some("Download interrupted".to_string()),
621 None,
622 None,
623 );
624 q.get_state()
625 };
626 let reporter = { queue.lock().await.reporter.clone() };
627 emit_queue_state_from_state(&reporter, state);
628 tokio::spawn(try_start_next(queue));
629 });
630 }
631}
632
633pub fn spawn_download(
634 queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
635 item_id: u64,
636) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
637 Box::pin(async move {
638 let _timer_start = std::time::Instant::now();
639 let slot = ActiveJobSlot::new(queue.clone(), item_id);
640 tokio::spawn(spawn_download_inner(queue.clone(), item_id));
641 slot.disarm();
642 tracing::debug!(
643 "[perf] spawn_download {} took {:?}",
644 item_id,
645 _timer_start.elapsed()
646 );
647 })
648}
649
650struct DownloadContext {
651 url: String,
652 output_dir: String,
653 download_mode: Option<String>,
654 quality: Option<String>,
655 video_format: Option<String>,
656 audio_format: Option<String>,
657 audio_quality: Option<String>,
658 download_subtitles: Option<bool>,
659 format_id: Option<String>,
660 referer: Option<String>,
661 extra_headers: Option<std::collections::HashMap<String, String>>,
662 page_url: Option<String>,
663 user_agent: Option<String>,
664 cancel_token: tokio_util::sync::CancellationToken,
665 media_info: Option<crate::models::media::MediaInfo>,
666 platform_name: String,
667 downloader: std::sync::Arc<dyn crate::platforms::traits::PlatformDownloader>,
668 ytdlp_path: Option<std::path::PathBuf>,
669 from_hotkey: bool,
670}
671
672async fn extract_download_context(
673 queue: &Arc<tokio::sync::Mutex<DownloadQueue>>,
674 item_id: u64,
675) -> Option<DownloadContext> {
676 let q = queue.lock().await;
677 let item = q.items.iter().find(|i| i.id == item_id)?;
678 Some(DownloadContext {
679 url: item.url.clone(),
680 output_dir: item.output_dir.clone(),
681 download_mode: item.download_mode.clone(),
682 quality: item.quality.clone(),
683 video_format: item.video_format.clone(),
684 audio_format: item.audio_format.clone(),
685 audio_quality: item.audio_quality.clone(),
686 download_subtitles: item.download_subtitles,
687 format_id: item.format_id.clone(),
688 referer: item.referer.clone(),
689 extra_headers: item.extra_headers.clone(),
690 page_url: item.page_url.clone(),
691 user_agent: item.user_agent.clone(),
692 cancel_token: item.cancel_token.clone(),
693 media_info: item.media_info.clone(),
694 platform_name: item.platform.clone(),
695 downloader: item.downloader.clone(),
696 ytdlp_path: item.ytdlp_path.clone(),
697 from_hotkey: item.from_hotkey,
698 })
699}
700
701async fn prepare_media_info(
702 queue: &Arc<tokio::sync::Mutex<DownloadQueue>>,
703 item_id: u64,
704 ctx: &DownloadContext,
705 reporter: &Option<crate::core::traits::SharedReporter>,
706) -> Option<crate::models::media::MediaInfo> {
707 let info_start = std::time::Instant::now();
708 let info = match &ctx.media_info {
709 Some(i) => {
710 tracing::info!(
711 "[queue] info for {} from cache/pre-fetched in {:?}",
712 item_id,
713 info_start.elapsed()
714 );
715 i.clone()
716 }
717 None => {
718 tracing::debug!(
719 "[perf] spawn_download_inner {}: media_info is None, fetching info",
720 item_id
721 );
722 if let Some(r) = reporter {
723 r.on_progress(
724 item_id,
725 crate::core::events::QueueItemProgress {
726 id: item_id,
727 title: ctx.url.clone(),
728 platform: ctx.platform_name.clone(),
729 percent: 0.0,
730 speed_bytes_per_sec: 0.0,
731 downloaded_bytes: 0,
732 total_bytes: None,
733 phase: "fetching_info".to_string(),
734 },
735 );
736 }
737
738 let info_result = tokio::time::timeout(
739 std::time::Duration::from_secs(60),
740 fetch_and_cache_info(&ctx.url, &*ctx.downloader, &ctx.platform_name),
741 )
742 .await;
743
744 match info_result {
745 Ok(Ok(i)) => i,
746 Ok(Err(e)) => {
747 let state = {
748 let mut q = queue.lock().await;
749 q.mark_complete(item_id, false, Some(e.to_string()), None, None);
750 q.get_state()
751 };
752 emit_queue_state_from_state(reporter, state);
753 tokio::spawn(try_start_next(queue.clone()));
754 return None;
755 }
756 Err(_) => {
757 tracing::warn!("[queue] info fetch timed out for {} after 60s", item_id);
758 let state = {
759 let mut q = queue.lock().await;
760 q.mark_complete(
761 item_id,
762 false,
763 Some("Timed out fetching video info".to_string()),
764 None,
765 None,
766 );
767 q.get_state()
768 };
769 emit_queue_state_from_state(reporter, state);
770 tokio::spawn(try_start_next(queue.clone()));
771 return None;
772 }
773 }
774 }
775 };
776 tracing::info!(
777 "[queue] info fetch for {} took {:?}",
778 item_id,
779 info_start.elapsed()
780 );
781
782 let state = {
783 let mut q = queue.lock().await;
784 if let Some(item) = q.items.iter_mut().find(|i| i.id == item_id) {
785 item.title = info.title.clone();
786 item.total_bytes = info.file_size_bytes;
787 let fc = if info.media_type == crate::models::media::MediaType::Carousel
788 || info.media_type == crate::models::media::MediaType::Playlist
789 {
790 info.available_qualities.len() as u32
791 } else {
792 1
793 };
794 item.file_count = Some(fc);
795 item.media_info = Some(info.clone());
796 }
797 q.get_state()
798 };
799 emit_queue_state_from_state(reporter, state);
800
801 if let Some(r) = reporter {
802 r.on_progress(
803 item_id,
804 crate::core::events::QueueItemProgress {
805 id: item_id,
806 title: info.title.clone(),
807 platform: ctx.platform_name.clone(),
808 percent: 0.5,
809 speed_bytes_per_sec: 0.0,
810 downloaded_bytes: 0,
811 total_bytes: info.file_size_bytes,
812 phase: "starting".to_string(),
813 },
814 );
815 }
816
817 Some(info)
818}
819
820fn build_download_options(
821 ctx: &DownloadContext,
822) -> (
823 crate::models::media::DownloadOptions,
824 std::sync::Arc<tokio::sync::Mutex<Option<usize>>>,
825) {
826 let settings = crate::models::settings::AppSettings::load_from_disk();
827 let tmpl = settings.download.filename_template.clone();
828 let mut final_output_dir = std::path::PathBuf::from(&ctx.output_dir);
829 if settings.download.organize_by_platform {
830 final_output_dir = final_output_dir.join(&ctx.platform_name);
831 }
832 let torrent_id_slot = std::sync::Arc::new(tokio::sync::Mutex::new(None));
833 let opts = crate::models::media::DownloadOptions {
834 quality: ctx
835 .quality
836 .clone()
837 .or_else(|| Some(settings.download.video_quality.clone())),
838 video_format: ctx
839 .video_format
840 .clone()
841 .or_else(|| Some(settings.download.video_format.clone())),
842 audio_format: ctx
843 .audio_format
844 .clone()
845 .or_else(|| Some(settings.download.audio_format.clone())),
846 audio_quality: ctx
847 .audio_quality
848 .clone()
849 .or_else(|| Some(settings.download.audio_quality.clone())),
850 output_dir: final_output_dir,
851 filename_template: Some(tmpl),
852 download_subtitles: ctx
853 .download_subtitles
854 .unwrap_or(settings.download.download_subtitles),
855 include_auto_subtitles: settings.download.include_auto_subtitles,
856 download_mode: ctx.download_mode.clone(),
857 format_id: ctx.format_id.clone(),
858 referer: ctx.referer.clone(),
859 extra_headers: ctx.extra_headers.clone(),
860 page_url: ctx.page_url.clone(),
861 user_agent: ctx.user_agent.clone(),
862 cancel_token: ctx.cancel_token.clone(),
863 concurrent_fragments: settings.advanced.concurrent_fragments,
864 ytdlp_path: ctx.ytdlp_path.clone(),
865 torrent_listen_port: Some(settings.advanced.torrent_listen_port),
866 torrent_id_slot: Some(torrent_id_slot.clone()),
867 };
868 (opts, torrent_id_slot)
869}
870
871async fn handle_download_result(
872 queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
873 item_id: u64,
874 ctx: DownloadContext,
875 info: crate::models::media::MediaInfo,
876 reporter: Option<crate::core::traits::SharedReporter>,
877 result: anyhow::Result<crate::models::media::DownloadResult>,
878) {
879 let settings = crate::models::settings::AppSettings::load_from_disk();
880 match result {
881 Ok(dl) => {
882 if settings.download.embed_metadata
883 && ctx.platform_name != "magnet"
884 && crate::core::ffmpeg::is_ffmpeg_available().await
885 {
886 let metadata = crate::core::ffmpeg::MetadataEmbed {
887 title: Some(info.title.clone()),
888 artist: Some(info.author.clone()),
889 thumbnail_url: info.thumbnail_url.clone(),
890 ..Default::default()
891 };
892 if let Err(e) = crate::core::ffmpeg::embed_metadata(
893 &dl.file_path,
894 &metadata,
895 settings.download.embed_thumbnail,
896 shared_http_client(),
897 )
898 .await
899 {
900 tracing::warn!("Metadata embed failed for '{}': {}", info.title, e);
901 }
902 }
903
904 if ctx.from_hotkey && settings.download.copy_to_clipboard_on_hotkey {
905 #[cfg(not(target_os = "android"))]
906 {
907 match crate::core::clipboard::copy_file_to_clipboard(&dl.file_path).await {
908 Ok(()) => {
909 tracing::info!("[clipboard] file copied: {:?}", dl.file_path);
910 }
911 Err(e) => {
912 tracing::warn!("[clipboard] failed to copy file: {}", e);
913 }
914 }
915 }
916 }
917
918 let state = {
919 let mut q = queue.lock().await;
920 if ctx.platform_name == "magnet" && dl.torrent_id.is_some() {
921 q.mark_seeding(
922 item_id,
923 Some(dl.file_path.to_string_lossy().to_string()),
924 Some(dl.file_size_bytes),
925 dl.torrent_id,
926 );
927 } else {
928 q.mark_complete(
929 item_id,
930 true,
931 None,
932 Some(dl.file_path.to_string_lossy().to_string()),
933 Some(dl.file_size_bytes),
934 );
935 }
936 q.get_state()
937 };
938 if let Some(r) = &reporter {
939 r.on_complete(
940 item_id,
941 Some(dl.file_path.to_string_lossy().to_string()),
942 Some(dl.file_size_bytes),
943 );
944 }
945 emit_queue_state_from_state(&reporter, state);
946 }
947 Err(e) => {
948 let raw_err = format!("{}", e);
949 let (category, hint) = crate::core::errors::classify_download_error(&raw_err);
950 let user_msg = if category != "unknown" {
951 format!("{} ({})", hint, raw_err)
952 } else {
953 raw_err.clone()
954 };
955 tracing::error!(
956 "Download error '{}' [{}]: {}",
957 ctx.platform_name,
958 category,
959 raw_err
960 );
961 let state = {
962 let mut q = queue.lock().await;
963 q.mark_complete(item_id, false, Some(user_msg.clone()), None, None);
964 q.get_state()
965 };
966 if let Some(r) = &reporter {
967 r.on_error(item_id, user_msg);
968 }
969 emit_queue_state_from_state(&reporter, state);
970 }
971 }
972}
973
974async fn spawn_download_inner(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) {
975 tracing::info!("[queue] download {} started", item_id);
976
977 let reporter = { queue.lock().await.reporter.clone() };
978
979 if let Some(r) = &reporter {
980 r.on_progress(
981 item_id,
982 crate::core::events::QueueItemProgress {
983 id: item_id,
984 title: "".to_string(),
985 platform: "".to_string(),
986 percent: 0.0,
987 speed_bytes_per_sec: 0.0,
988 downloaded_bytes: 0,
989 total_bytes: None,
990 phase: "preparing".to_string(),
991 },
992 );
993 }
994
995 let ctx = match extract_download_context(&queue, item_id).await {
996 Some(c) => c,
997 None => return,
998 };
999
1000 let info = match prepare_media_info(&queue, item_id, &ctx, &reporter).await {
1001 Some(i) => i,
1002 None => return,
1003 };
1004
1005 let (opts, torrent_id_slot) = build_download_options(&ctx);
1006
1007 let total_bytes = info.file_size_bytes;
1008 let item_title = info.title.clone();
1009 let item_platform = ctx.platform_name.clone();
1010 let (tx, mut rx) = tokio::sync::mpsc::channel::<f64>(32);
1011
1012 let reporter_progress = reporter.clone();
1013 let queue_progress = queue.clone();
1014 let torrent_id_slot_progress = torrent_id_slot.clone();
1015 let progress_forwarder = tokio::spawn(async move {
1016 let mut last_bytes: u64 = 0;
1017 let mut last_time = std::time::Instant::now();
1018 let mut throttle = ProgressThrottle::new(250);
1019 let mut current_speed: f64 = 0.0;
1020
1021 while let Some(percent) = rx.recv().await {
1022 if !throttle.should_emit() && percent < 100.0 {
1023 continue;
1024 }
1025
1026 let now = std::time::Instant::now();
1027 let clamped = percent.max(0.0);
1028 let downloaded_bytes = total_bytes
1029 .map(|total| (clamped / 100.0 * total as f64) as u64)
1030 .unwrap_or(0);
1031
1032 if total_bytes.is_some() && downloaded_bytes > last_bytes {
1033 let dt = now.duration_since(last_time).as_secs_f64();
1034 if dt > 0.1 {
1035 let instant_speed = (downloaded_bytes - last_bytes) as f64 / dt;
1036 current_speed = if current_speed > 0.0 {
1037 current_speed * 0.7 + instant_speed * 0.3
1038 } else {
1039 instant_speed
1040 };
1041 }
1042 }
1043
1044 last_bytes = downloaded_bytes;
1045 last_time = now;
1046
1047 let phase = match percent {
1048 p if p < -1.5 => "connecting",
1049 p if p < -0.5 => "starting",
1050 p if p > 0.0 => "downloading",
1051 _ => "starting",
1052 };
1053
1054 {
1055 let mut q = queue_progress.lock().await;
1056 let tid = { *torrent_id_slot_progress.lock().await };
1057 q.update_progress(
1058 item_id,
1059 clamped,
1060 current_speed,
1061 downloaded_bytes,
1062 total_bytes,
1063 tid,
1064 );
1065 }
1066
1067 if let Some(r) = &reporter_progress {
1068 r.on_progress(
1069 item_id,
1070 crate::core::events::QueueItemProgress {
1071 id: item_id,
1072 title: item_title.clone(),
1073 platform: item_platform.clone(),
1074 percent: clamped,
1075 speed_bytes_per_sec: current_speed,
1076 downloaded_bytes,
1077 total_bytes,
1078 phase: phase.to_string(),
1079 },
1080 );
1081 }
1082 }
1083 });
1084
1085 if let Some(ua) = opts.user_agent.clone() {
1086 crate::core::ytdlp::register_ext_user_agent(ctx.url.clone(), ua);
1087 }
1088 if let Some(hdrs) = opts.extra_headers.clone() {
1089 crate::core::ytdlp::register_ext_headers(ctx.url.clone(), hdrs);
1090 }
1091
1092 if !crate::core::ffmpeg::is_ffmpeg_available().await {
1094 let rep_ref = reporter.as_ref().map(|r| r.as_ref());
1095 match crate::core::dependencies::ensure_ffmpeg(rep_ref).await {
1096 Ok(path) => {
1097 tracing::info!("[ffmpeg] auto-installed to {:?}", path);
1098 crate::core::ffmpeg::reset_ffmpeg_available_cache();
1099 }
1100 Err(e) => tracing::warn!("[ffmpeg] auto-install failed: {}", e),
1101 }
1102 }
1103
1104 let dl_start = std::time::Instant::now();
1105 let dl_future = async {
1106 tokio::select! {
1107 r = ctx.downloader.download(&info, &opts, tx) => r,
1108 _ = ctx.cancel_token.cancelled() => {
1109 Err(anyhow::anyhow!("Download cancelled"))
1110 }
1111 }
1112 };
1113 let result = crate::core::log_hook::CURRENT_DOWNLOAD_ID
1114 .scope(item_id, dl_future)
1115 .await;
1116 crate::core::ytdlp::clear_ext_user_agent(&ctx.url);
1117 crate::core::ytdlp::clear_ext_headers(&ctx.url);
1118 tracing::info!(
1119 "[queue] download {} completed in {:?}",
1120 item_id,
1121 dl_start.elapsed()
1122 );
1123
1124 let _ = progress_forwarder.await;
1125
1126 let was_paused = {
1127 let q = queue.lock().await;
1128 q.items
1129 .iter()
1130 .find(|i| i.id == item_id)
1131 .map(|i| i.status == QueueStatus::Paused)
1132 .unwrap_or(false)
1133 };
1134
1135 if was_paused {
1136 let state = {
1137 let q = queue.lock().await;
1138 q.get_state()
1139 };
1140 emit_queue_state_from_state(&reporter, state);
1141 tokio::spawn(try_start_next(queue));
1142 return;
1143 }
1144
1145 handle_download_result(queue.clone(), item_id, ctx, info, reporter, result).await;
1146
1147 tokio::spawn(try_start_next(queue));
1148}
1149
1150pub async fn fetch_and_cache_info(
1151 url: &str,
1152 downloader: &dyn PlatformDownloader,
1153 platform: &str,
1154) -> anyhow::Result<MediaInfo> {
1155 {
1156 let cache = info_cache().lock().await;
1157 if let Some(entry) = cache.get(url) {
1158 if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1159 tracing::debug!("[perf] fetch_and_cache_info: cache hit for {}", platform);
1160 return Ok(entry.info.clone());
1161 }
1162 }
1163 }
1164
1165 let url_lock = {
1166 let mut map = in_flight_map().lock().await;
1167 map.entry(url.to_string())
1168 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
1169 .clone()
1170 };
1171 let _guard = url_lock.lock().await;
1172
1173 {
1174 let cache = info_cache().lock().await;
1175 if let Some(entry) = cache.get(url) {
1176 if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1177 tracing::debug!(
1178 "[perf] fetch_and_cache_info: dedup cache hit for {}",
1179 platform
1180 );
1181 return Ok(entry.info.clone());
1182 }
1183 }
1184 }
1185
1186 tracing::debug!("[perf] fetch_and_cache_info: fetching for {}", platform);
1187 let mut info = downloader.get_media_info(url).await?;
1188
1189 if is_generic_title(&info.title) {
1190 let name = crate::core::random_names::get_random_name();
1191 info.title = format!("video_{}", name);
1192 }
1193
1194 let mut cache = info_cache().lock().await;
1195 cache.insert(
1196 url.to_string(),
1197 CachedInfo {
1198 info: info.clone(),
1199 cached_at: std::time::Instant::now(),
1200 },
1201 );
1202 if cache.len() > 50 {
1203 cache.retain(|_, v| v.cached_at.elapsed() < INFO_CACHE_TTL);
1204 }
1205 Ok(info)
1206}
1207
1208pub async fn try_get_cached_info(url: &str) -> Option<MediaInfo> {
1209 let cache = info_cache().lock().await;
1210 cache
1211 .get(url)
1212 .filter(|entry| entry.cached_at.elapsed() < INFO_CACHE_TTL)
1213 .map(|entry| entry.info.clone())
1214}
1215
1216pub async fn prefetch_info(url: &str, downloader: &dyn PlatformDownloader, platform: &str) {
1217 prefetch_info_with_emit(url, downloader, platform, None).await;
1218}
1219
1220pub async fn prefetch_info_with_emit(
1221 url: &str,
1222 downloader: &dyn PlatformDownloader,
1223 platform: &str,
1224 reporter: Option<SharedReporter>,
1225) {
1226 let _timer_start = std::time::Instant::now();
1227 tracing::debug!("[perf] prefetch_info: started");
1228 match fetch_and_cache_info(url, downloader, platform).await {
1229 Ok(info) => {
1230 tracing::debug!(
1231 "[perf] prefetch_info: completed in {:?} — {}",
1232 _timer_start.elapsed(),
1233 info.title
1234 );
1235 if let Some(r) = reporter {
1236 r.on_media_preview(
1237 url.to_string(),
1238 info.title.clone(),
1239 info.author.clone(),
1240 info.thumbnail_url.clone(),
1241 info.duration_seconds,
1242 );
1243 }
1244 }
1245 Err(e) => tracing::warn!(
1246 "[perf] prefetch_info: failed in {:?} — {}",
1247 _timer_start.elapsed(),
1248 e
1249 ),
1250 }
1251}
1252
1253pub async fn try_start_next(queue: Arc<tokio::sync::Mutex<DownloadQueue>>) {
1254 let _timer_start = std::time::Instant::now();
1255 let (next_ids, stagger, state_to_emit, reporter) = {
1256 let mut q = queue.lock().await;
1257 let ids = q.next_queued_ids();
1258 for nid in &ids {
1259 q.mark_active(*nid);
1260 }
1261 let state = if !ids.is_empty() {
1262 Some(q.get_state())
1263 } else {
1264 None
1265 };
1266 (ids, q.stagger_delay_ms, state, q.reporter.clone())
1267 };
1268
1269 if let Some(state) = state_to_emit {
1270 emit_queue_state_from_state(&reporter, state);
1271 }
1272
1273 let batch_size = next_ids.len();
1274 for (i, nid) in next_ids.into_iter().enumerate() {
1275 if let Some(r) = &reporter {
1276 r.on_progress(
1277 nid,
1278 crate::core::events::QueueItemProgress {
1279 id: nid,
1280 title: String::new(),
1281 platform: String::new(),
1282 percent: 0.0,
1283 speed_bytes_per_sec: 0.0,
1284 downloaded_bytes: 0,
1285 total_bytes: None,
1286 phase: "queued_starting".to_string(),
1287 },
1288 );
1289 }
1290
1291 if i > 0 {
1292 let item_platform = {
1293 let q = queue.lock().await;
1294 q.items
1295 .iter()
1296 .find(|item| item.id == nid)
1297 .map(|item| item.platform.clone())
1298 };
1299 let delay_ms = if item_platform.as_deref() == Some("youtube") {
1300 2000
1301 } else if batch_size > 3 {
1302 stagger.max(1000)
1303 } else {
1304 stagger
1305 };
1306 if delay_ms > 0 {
1307 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1308 }
1309 }
1310 let queue_c = queue.clone();
1311 tokio::spawn(async move {
1312 tokio::spawn(spawn_download(queue_c, nid));
1313 });
1314 }
1315 tracing::debug!("[perf] try_start_next took {:?}", _timer_start.elapsed());
1316}
1317
1318fn is_generic_title(title: &str) -> bool {
1319 let t = title.to_lowercase();
1320 let t = t.trim();
1321 t.is_empty()
1322 || t == "video"
1323 || t == "media"
1324 || t == "untitled"
1325 || t == "unknown"
1326 || t.starts_with("video [video]")
1327 || t.starts_with("media [media]")
1328}