1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, OnceLock};
5
6static EMIT_COUNT: AtomicU64 = AtomicU64::new(0);
7
8use serde::Serialize;
9use tokio::sync::mpsc;
10use tokio_util::sync::CancellationToken;
11
12use crate::core::traits::SharedReporter;
13use crate::models::media::MediaInfo;
14use crate::models::queue::{QueueItemInfo, QueueStatus};
15use crate::platforms::traits::PlatformDownloader;
16
17fn shared_http_client() -> &'static reqwest::Client {
18 static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
19 CLIENT.get_or_init(|| {
20 crate::core::http_client::apply_global_proxy(reqwest::Client::builder())
21 .build()
22 .unwrap_or_default()
23 })
24}
25
26struct CachedInfo {
27 info: MediaInfo,
28 cached_at: std::time::Instant,
29}
30
31static INFO_CACHE: OnceLock<tokio::sync::Mutex<HashMap<String, CachedInfo>>> = OnceLock::new();
32
33fn info_cache() -> &'static tokio::sync::Mutex<HashMap<String, CachedInfo>> {
34 INFO_CACHE.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
35}
36
37const INFO_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(600);
38
39static IN_FLIGHT_FETCHES: OnceLock<
40 tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
41> = OnceLock::new();
42
43fn in_flight_map() -> &'static tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
44 IN_FLIGHT_FETCHES.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
45}
46
47#[derive(Debug, Clone, Serialize)]
48pub struct MediaPreviewEvent {
49 pub url: String,
50 pub title: String,
51 pub author: String,
52 pub thumbnail_url: Option<String>,
53 pub duration_seconds: Option<f64>,
54}
55
56pub struct QueueItem {
57 pub id: u64,
58 pub url: String,
59 pub platform: String,
60 pub title: String,
61 pub status: QueueStatus,
62 pub cancel_token: CancellationToken,
63 pub output_dir: String,
64 pub download_mode: Option<String>,
65 pub quality: Option<String>,
66 pub format_id: Option<String>,
67 pub referer: Option<String>,
68 pub extra_headers: Option<std::collections::HashMap<String, String>>,
69 pub page_url: Option<String>,
70 pub user_agent: Option<String>,
71 pub percent: f64,
72 pub speed_bytes_per_sec: f64,
73 pub downloaded_bytes: u64,
74 pub total_bytes: Option<u64>,
75 pub file_path: Option<String>,
76 pub file_size_bytes: Option<u64>,
77 pub file_count: Option<u32>,
78 pub media_info: Option<MediaInfo>,
79 pub downloader: Arc<dyn PlatformDownloader>,
80 pub ytdlp_path: Option<PathBuf>,
81 pub from_hotkey: bool,
82 pub torrent_id: Option<usize>,
83 pub phase: String,
84}
85
86impl QueueItem {
87 pub fn to_info(&self) -> QueueItemInfo {
88 QueueItemInfo {
89 id: self.id,
90 url: self.url.clone(),
91 platform: self.platform.clone(),
92 title: self.title.clone(),
93 status: self.status.clone(),
94 percent: self.percent,
95 speed_bytes_per_sec: self.speed_bytes_per_sec,
96 downloaded_bytes: self.downloaded_bytes,
97 total_bytes: self.total_bytes,
98 phase: self.phase.clone(),
99 file_path: self.file_path.clone(),
100 file_size_bytes: self.file_size_bytes,
101 file_count: self.file_count,
102 thumbnail_url: self
103 .media_info
104 .as_ref()
105 .and_then(|m| m.thumbnail_url.clone()),
106 }
107 }
108}
109
110pub struct DownloadQueue {
111 pub items: Vec<QueueItem>,
112 pub max_concurrent: u32,
113 pub stagger_delay_ms: u64,
114 pub reporter: Option<SharedReporter>,
115}
116
117impl DownloadQueue {
118 pub fn new(max_concurrent: u32, reporter: Option<SharedReporter>) -> Self {
119 Self {
120 items: Vec::new(),
121 max_concurrent,
122 stagger_delay_ms: 150,
123 reporter,
124 }
125 }
126
127 pub fn set_reporter(&mut self, reporter: SharedReporter) {
128 self.reporter = Some(reporter);
129 }
130
131 pub fn load_from_recovery(&mut self, registry: &crate::core::registry::PlatformRegistry) {
132 let recovery_items = crate::core::manager::recovery::list();
133 for item in recovery_items {
134 let downloader = registry
135 .find_platform(&item.url)
136 .or_else(|| registry.find_by_name(&item.platform));
137
138 if let Some(dl) = downloader {
139 let percent = if matches!(item.status, QueueStatus::Complete { success: true }) {
140 100.0
141 } else {
142 0.0
143 };
144
145 let q_item = QueueItem {
146 id: item.id,
147 url: item.url,
148 platform: item.platform,
149 title: item.title,
150 status: item.status,
151 cancel_token: CancellationToken::new(),
152 output_dir: item.output_dir,
153 download_mode: item.download_mode,
154 quality: item.quality,
155 format_id: item.format_id,
156 referer: item.referer,
157 extra_headers: None,
158 page_url: None,
159 user_agent: None,
160 percent,
161 speed_bytes_per_sec: 0.0,
162 downloaded_bytes: 0,
163 total_bytes: item.file_size_bytes,
164 file_path: item.file_path,
165 file_size_bytes: item.file_size_bytes,
166 file_count: None,
167 media_info: None,
168 downloader: dl,
169 ytdlp_path: None,
170 from_hotkey: false,
171 torrent_id: None,
172 phase: "Restored".to_string(),
173 };
174 self.items.push(q_item);
175 }
176 }
177 }
178
179 fn sync_recovery(item: &QueueItem) {
180 crate::core::manager::recovery::persist(crate::core::manager::recovery::RecoveryItem {
181 id: item.id,
182 url: item.url.clone(),
183 title: item.title.clone(),
184 platform: item.platform.clone(),
185 output_dir: item.output_dir.clone(),
186 status: item.status.clone(),
187 download_mode: item.download_mode.clone(),
188 quality: item.quality.clone(),
189 format_id: item.format_id.clone(),
190 referer: item.referer.clone(),
191 file_path: item.file_path.clone(),
192 file_size_bytes: item.file_size_bytes,
193 });
194 }
195
196 #[allow(clippy::too_many_arguments)]
197 pub fn enqueue(
198 &mut self,
199 id: u64,
200 url: String,
201 platform: String,
202 title: String,
203 output_dir: String,
204 download_mode: Option<String>,
205 quality: Option<String>,
206 format_id: Option<String>,
207 referer: Option<String>,
208 extra_headers: Option<std::collections::HashMap<String, String>>,
209 page_url: Option<String>,
210 user_agent: Option<String>,
211 media_info: Option<MediaInfo>,
212 total_bytes: Option<u64>,
213 file_count: Option<u32>,
214 downloader: Arc<dyn PlatformDownloader>,
215 ytdlp_path: Option<PathBuf>,
216 from_hotkey: bool,
217 ) {
218 let item = QueueItem {
219 id,
220 url,
221 platform,
222 title,
223 status: QueueStatus::Queued,
224 cancel_token: CancellationToken::new(),
225 output_dir,
226 download_mode,
227 quality,
228 format_id,
229 referer,
230 extra_headers,
231 page_url,
232 user_agent,
233 percent: 0.0,
234 speed_bytes_per_sec: 0.0,
235 downloaded_bytes: 0,
236 total_bytes,
237 file_path: None,
238 file_size_bytes: None,
239 file_count,
240 media_info,
241 downloader,
242 ytdlp_path,
243 from_hotkey,
244 torrent_id: None,
245 phase: "Queued".to_string(),
246 };
247 self.items.push(item);
248 Self::sync_recovery(self.items.last().unwrap());
249 }
250
251 pub fn active_count(&self) -> u32 {
252 self.items
253 .iter()
254 .filter(|i| i.status == QueueStatus::Active)
255 .count() as u32
256 }
257
258 pub fn next_queued_ids(&self) -> Vec<u64> {
259 let slots = self.max_concurrent.saturating_sub(self.active_count()) as usize;
260 self.items
261 .iter()
262 .filter(|i| i.status == QueueStatus::Queued)
263 .take(slots)
264 .map(|i| i.id)
265 .collect()
266 }
267
268 pub fn mark_active(&mut self, id: u64) {
269 let item = self.items.iter_mut().find(|i| i.id == id);
270 if let Some(item) = item {
271 item.status = QueueStatus::Active;
272 item.cancel_token = CancellationToken::new();
273 Self::sync_recovery(item);
274 }
275 }
276
277 pub fn mark_complete(
278 &mut self,
279 id: u64,
280 success: bool,
281 error: Option<String>,
282 file_path: Option<String>,
283 file_size_bytes: Option<u64>,
284 ) {
285 let item = self.items.iter_mut().find(|i| i.id == id);
286 if let Some(item) = item {
287 if success {
288 item.status = QueueStatus::Complete { success: true };
289 item.percent = 100.0;
290 } else {
291 item.status = QueueStatus::Error {
292 message: error.unwrap_or_default(),
293 };
294 }
295 item.file_path = file_path;
296 item.file_size_bytes = file_size_bytes;
297 item.speed_bytes_per_sec = 0.0;
298 Self::sync_recovery(item);
299 }
300 }
301
302 pub fn mark_seeding(
303 &mut self,
304 id: u64,
305 file_path: Option<String>,
306 file_size_bytes: Option<u64>,
307 torrent_id: Option<usize>,
308 ) {
309 let item = self.items.iter_mut().find(|i| i.id == id);
310 if let Some(item) = item {
311 item.status = QueueStatus::Seeding;
312 item.percent = 100.0;
313 item.file_path = file_path;
314 item.file_size_bytes = file_size_bytes;
315 item.speed_bytes_per_sec = 0.0;
316 item.torrent_id = torrent_id;
317 Self::sync_recovery(item);
318 }
319 }
320
321 pub fn update_progress(
322 &mut self,
323 id: u64,
324 percent: f64,
325 speed: f64,
326 downloaded: u64,
327 total: Option<u64>,
328 torrent_id: Option<usize>,
329 ) {
330 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
331 item.percent = percent;
332 item.speed_bytes_per_sec = speed;
333 item.downloaded_bytes = downloaded;
334 if let Some(t) = total {
335 item.total_bytes = Some(t);
336 }
337 if torrent_id.is_some() && item.torrent_id.is_none() {
338 item.torrent_id = torrent_id;
339 }
340 }
341 }
342
343 pub fn pause(&mut self, id: u64) -> bool {
344 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
345 if item.status == QueueStatus::Active {
346 if item.platform != "magnet" {
347 item.cancel_token.cancel();
348 }
349 item.status = QueueStatus::Paused;
350 item.speed_bytes_per_sec = 0.0;
351 Self::sync_recovery(item);
352 return true;
353 }
354 }
355 false
356 }
357
358 pub fn resume(&mut self, id: u64) -> bool {
359 let item = self.items.iter_mut().find(|i| i.id == id);
360 if let Some(item) = item {
361 if item.status == QueueStatus::Paused {
362 if item.platform == "magnet" {
363 item.status = QueueStatus::Active;
364 } else {
365 item.status = QueueStatus::Queued;
366 item.cancel_token = CancellationToken::new();
367 }
368 Self::sync_recovery(item);
369 return true;
370 }
371 }
372 false
373 }
374
375 pub fn cancel(&mut self, id: u64) -> (bool, Option<usize>) {
376 let result = self.cancel_inner(id);
377 if result.0 {
378 crate::core::manager::recovery::remove(id);
379 }
380 result
381 }
382
383 fn cancel_inner(&mut self, id: u64) -> (bool, Option<usize>) {
384 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
385 match &item.status {
386 QueueStatus::Active => {
387 item.cancel_token.cancel();
388 item.status = QueueStatus::Error {
389 message: "Cancelled".to_string(),
390 };
391 item.speed_bytes_per_sec = 0.0;
392 Self::sync_recovery(item);
393 return (true, None);
394 }
395 QueueStatus::Seeding => {
396 let tid = item.torrent_id;
397 item.status = QueueStatus::Error {
398 message: "Cancelled".to_string(),
399 };
400 item.speed_bytes_per_sec = 0.0;
401 Self::sync_recovery(item);
402 return (true, tid);
403 }
404 QueueStatus::Paused => {
405 item.cancel_token.cancel();
406 let tid = if item.platform == "magnet" {
407 item.torrent_id
408 } else {
409 None
410 };
411 item.status = QueueStatus::Error {
412 message: "Cancelled".to_string(),
413 };
414 item.speed_bytes_per_sec = 0.0;
415 Self::sync_recovery(item);
416 return (true, tid);
417 }
418 QueueStatus::Queued => {
419 item.status = QueueStatus::Error {
420 message: "Cancelled".to_string(),
421 };
422 Self::sync_recovery(item);
423 return (true, None);
424 }
425 _ => {}
426 }
427 }
428 (false, None)
429 }
430
431 pub fn retry(&mut self, id: u64) -> bool {
432 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
433 if matches!(item.status, QueueStatus::Error { .. }) {
434 item.status = QueueStatus::Queued;
435 item.cancel_token = CancellationToken::new();
436 item.percent = 0.0;
437 item.speed_bytes_per_sec = 0.0;
438 item.downloaded_bytes = 0;
439 item.file_path = None;
440 item.file_size_bytes = None;
441 Self::sync_recovery(item);
442 return true;
443 }
444 }
445 false
446 }
447
448 pub fn remove(&mut self, id: u64) -> Option<Option<usize>> {
449 let result = self.remove_inner(id);
450 if result.is_some() {
451 crate::core::manager::recovery::remove(id);
452 }
453 result
454 }
455
456 fn remove_inner(&mut self, id: u64) -> Option<Option<usize>> {
457 if let Some(pos) = self.items.iter().position(|i| i.id == id) {
458 let item = &self.items[pos];
459 if item.status == QueueStatus::Active {
460 item.cancel_token.cancel();
461 }
462 if item.status == QueueStatus::Paused && item.platform == "magnet" {
463 item.cancel_token.cancel();
464 }
465 let torrent_id = if item.status == QueueStatus::Seeding
466 || (item.status == QueueStatus::Paused && item.platform == "magnet")
467 {
468 item.torrent_id
469 } else {
470 None
471 };
472 self.items.remove(pos);
473 return Some(torrent_id);
474 }
475 None
476 }
477
478 pub fn clear_finished(&mut self) {
479 let to_remove: Vec<u64> = self
480 .items
481 .iter()
482 .filter(|i| {
483 matches!(
484 i.status,
485 QueueStatus::Complete { .. } | QueueStatus::Error { .. }
486 )
487 })
488 .map(|i| i.id)
489 .collect();
490 for id in &to_remove {
491 crate::core::manager::recovery::remove(*id);
492 }
493 self.items.retain(|i| {
494 !matches!(
495 i.status,
496 QueueStatus::Complete { .. } | QueueStatus::Error { .. }
497 )
498 });
499 }
500
501 pub fn get_state(&self) -> Vec<QueueItemInfo> {
502 self.items.iter().map(|i| i.to_info()).collect()
503 }
504
505 pub fn has_url(&self, url: &str) -> bool {
506 self.items.iter().any(|i| {
507 i.url == url
508 && matches!(
509 i.status,
510 QueueStatus::Queued
511 | QueueStatus::Active
512 | QueueStatus::Paused
513 | QueueStatus::Seeding
514 )
515 })
516 }
517}
518
519pub struct ProgressThrottle {
520 last_emit: std::time::Instant,
521 min_interval: std::time::Duration,
522}
523
524impl ProgressThrottle {
525 pub fn new(min_interval_ms: u64) -> Self {
526 Self {
527 last_emit: std::time::Instant::now() - std::time::Duration::from_secs(10),
528 min_interval: std::time::Duration::from_millis(min_interval_ms),
529 }
530 }
531
532 pub fn should_emit(&mut self) -> bool {
533 let now = std::time::Instant::now();
534 if now.duration_since(self.last_emit) >= self.min_interval {
535 self.last_emit = now;
536 true
537 } else {
538 false
539 }
540 }
541}
542
543pub fn emit_queue_state_from_state(reporter: &Option<SharedReporter>, state: Vec<QueueItemInfo>) {
544 let n = EMIT_COUNT.fetch_add(1, Ordering::Relaxed);
545 if n.is_multiple_of(10) {
546 tracing::debug!("[perf] emit_queue_state called {} times", n);
547 }
548 if let Some(reporter) = reporter {
549 reporter.on_queue_update(state);
550 }
551}
552
553pub fn emit_queue_state(queue: &DownloadQueue) {
554 let state = queue.get_state();
555 emit_queue_state_from_state(&queue.reporter, state);
556}
557
558struct ActiveJobSlot {
560 queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
561 item_id: u64,
562 armed: bool,
563}
564
565impl ActiveJobSlot {
566 fn new(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) -> Self {
567 Self {
568 queue,
569 item_id,
570 armed: true,
571 }
572 }
573
574 fn disarm(mut self) {
575 self.armed = false;
576 }
577}
578
579impl Drop for ActiveJobSlot {
580 fn drop(&mut self) {
581 if !self.armed {
582 return;
583 }
584 let queue = self.queue.clone();
585 let item_id = self.item_id;
586 tokio::spawn(async move {
587 let state = {
588 let mut q = queue.lock().await;
589 let still_active = q
590 .items
591 .iter()
592 .find(|i| i.id == item_id)
593 .map(|i| i.status == QueueStatus::Active)
594 .unwrap_or(false);
595 if !still_active {
596 return;
597 }
598 tracing::warn!(
599 "[queue] ActiveJobSlot guard firing for {} — download ended without clean release",
600 item_id
601 );
602 q.mark_complete(
603 item_id,
604 false,
605 Some("Download interrupted".to_string()),
606 None,
607 None,
608 );
609 q.get_state()
610 };
611 let reporter = { queue.lock().await.reporter.clone() };
612 emit_queue_state_from_state(&reporter, state);
613 tokio::spawn(try_start_next(queue));
614 });
615 }
616}
617
618pub fn spawn_download(
619 queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
620 item_id: u64,
621) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
622 Box::pin(async move {
623 let _timer_start = std::time::Instant::now();
624 let slot = ActiveJobSlot::new(queue.clone(), item_id);
625 tokio::spawn(spawn_download_inner(queue.clone(), item_id));
626 slot.disarm();
627 tracing::debug!(
628 "[perf] spawn_download {} took {:?}",
629 item_id,
630 _timer_start.elapsed()
631 );
632 })
633}
634
635async fn spawn_download_inner(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) {
636 tracing::info!("[queue] download {} started", item_id);
637
638 let reporter = { queue.lock().await.reporter.clone() };
639
640 if let Some(r) = &reporter {
641 r.on_progress(
642 item_id,
643 crate::core::events::QueueItemProgress {
644 id: item_id,
645 title: "".to_string(),
646 platform: "".to_string(),
647 percent: 0.0,
648 speed_bytes_per_sec: 0.0,
649 downloaded_bytes: 0,
650 total_bytes: None,
651 phase: "preparing".to_string(),
652 },
653 );
654 }
655
656 let (
657 url,
658 output_dir,
659 download_mode,
660 quality,
661 format_id,
662 referer,
663 extra_headers,
664 page_url,
665 user_agent,
666 cancel_token,
667 media_info,
668 platform_name,
669 downloader,
670 ytdlp_path,
671 from_hotkey,
672 ) = {
673 let q = queue.lock().await;
674 let item = match q.items.iter().find(|i| i.id == item_id) {
675 Some(i) => i,
676 None => return,
677 };
678 (
679 item.url.clone(),
680 item.output_dir.clone(),
681 item.download_mode.clone(),
682 item.quality.clone(),
683 item.format_id.clone(),
684 item.referer.clone(),
685 item.extra_headers.clone(),
686 item.page_url.clone(),
687 item.user_agent.clone(),
688 item.cancel_token.clone(),
689 item.media_info.clone(),
690 item.platform.clone(),
691 item.downloader.clone(),
692 item.ytdlp_path.clone(),
693 item.from_hotkey,
694 )
695 };
696
697 let info_start = std::time::Instant::now();
698 let info = match media_info {
699 Some(i) => {
700 tracing::info!(
701 "[queue] info for {} from cache/pre-fetched in {:?}",
702 item_id,
703 info_start.elapsed()
704 );
705 i
706 }
707 None => {
708 tracing::debug!(
709 "[perf] spawn_download_inner {}: media_info is None, fetching info",
710 item_id
711 );
712 if let Some(r) = &reporter {
713 r.on_progress(
714 item_id,
715 crate::core::events::QueueItemProgress {
716 id: item_id,
717 title: url.clone(),
718 platform: platform_name.clone(),
719 percent: 0.0,
720 speed_bytes_per_sec: 0.0,
721 downloaded_bytes: 0,
722 total_bytes: None,
723 phase: "fetching_info".to_string(),
724 },
725 );
726 }
727
728 let info_result = tokio::time::timeout(
729 std::time::Duration::from_secs(60),
730 fetch_and_cache_info(&url, &*downloader, &platform_name),
731 )
732 .await;
733
734 match info_result {
735 Ok(Ok(i)) => i,
736 Ok(Err(e)) => {
737 let state = {
738 let mut q = queue.lock().await;
739 q.mark_complete(item_id, false, Some(e.to_string()), None, None);
740 q.get_state()
741 };
742 emit_queue_state_from_state(&reporter, state);
743 tokio::spawn(try_start_next(queue));
744 return;
745 }
746 Err(_) => {
747 tracing::warn!("[queue] info fetch timed out for {} after 60s", item_id);
748 let state = {
749 let mut q = queue.lock().await;
750 q.mark_complete(
751 item_id,
752 false,
753 Some("Timed out fetching video info".to_string()),
754 None,
755 None,
756 );
757 q.get_state()
758 };
759 emit_queue_state_from_state(&reporter, state);
760 tokio::spawn(try_start_next(queue));
761 return;
762 }
763 }
764 }
765 };
766 tracing::info!(
767 "[queue] info fetch for {} took {:?}",
768 item_id,
769 info_start.elapsed()
770 );
771
772 let state = {
773 let mut q = queue.lock().await;
774 if let Some(item) = q.items.iter_mut().find(|i| i.id == item_id) {
775 item.title = info.title.clone();
776 item.total_bytes = info.file_size_bytes;
777 let fc = if info.media_type == crate::models::media::MediaType::Carousel
778 || info.media_type == crate::models::media::MediaType::Playlist
779 {
780 info.available_qualities.len() as u32
781 } else {
782 1
783 };
784 item.file_count = Some(fc);
785 item.media_info = Some(info.clone());
786 }
787 q.get_state()
788 };
789 emit_queue_state_from_state(&reporter, state);
790
791 if let Some(r) = &reporter {
792 r.on_progress(
793 item_id,
794 crate::core::events::QueueItemProgress {
795 id: item_id,
796 title: info.title.clone(),
797 platform: platform_name.clone(),
798 percent: 0.5,
799 speed_bytes_per_sec: 0.0,
800 downloaded_bytes: 0,
801 total_bytes: info.file_size_bytes,
802 phase: "starting".to_string(),
803 },
804 );
805 }
806
807 let settings = crate::models::settings::AppSettings::load_from_disk();
808 let tmpl = settings.download.filename_template.clone();
809 let mut final_output_dir = std::path::PathBuf::from(&output_dir);
810 if settings.download.organize_by_platform {
811 final_output_dir = final_output_dir.join(&platform_name);
812 }
813 let torrent_id_slot = Arc::new(tokio::sync::Mutex::new(None));
814 let opts = crate::models::media::DownloadOptions {
815 quality: quality.or_else(|| Some(settings.download.video_quality.clone())),
816 output_dir: final_output_dir,
817 filename_template: Some(tmpl),
818 download_subtitles: settings.download.download_subtitles,
819 include_auto_subtitles: settings.download.include_auto_subtitles,
820 download_mode,
821 format_id,
822 referer,
823 extra_headers,
824 page_url,
825 user_agent,
826 cancel_token: cancel_token.clone(),
827 concurrent_fragments: settings.advanced.concurrent_fragments,
828 ytdlp_path,
829 torrent_listen_port: Some(settings.advanced.torrent_listen_port),
830 torrent_id_slot: Some(torrent_id_slot.clone()),
831 };
832
833 let total_bytes = info.file_size_bytes;
834 let item_title = info.title.clone();
835 let item_platform = platform_name.clone();
836 let (tx, mut rx) = mpsc::channel::<f64>(32);
837
838 let reporter_progress = reporter.clone();
839 let queue_progress = queue.clone();
840 let torrent_id_slot_progress = torrent_id_slot.clone();
841 let progress_forwarder = tokio::spawn(async move {
842 let mut last_bytes: u64 = 0;
843 let mut last_time = std::time::Instant::now();
844 let mut throttle = ProgressThrottle::new(250);
845 let mut current_speed: f64 = 0.0;
846
847 while let Some(percent) = rx.recv().await {
848 if !throttle.should_emit() && percent < 100.0 {
849 continue;
850 }
851
852 let now = std::time::Instant::now();
853 let clamped = percent.max(0.0);
854 let downloaded_bytes = total_bytes
855 .map(|total| (clamped / 100.0 * total as f64) as u64)
856 .unwrap_or(0);
857
858 if total_bytes.is_some() && downloaded_bytes > last_bytes {
859 let dt = now.duration_since(last_time).as_secs_f64();
860 if dt > 0.1 {
861 let instant_speed = (downloaded_bytes - last_bytes) as f64 / dt;
862 current_speed = if current_speed > 0.0 {
863 current_speed * 0.7 + instant_speed * 0.3
864 } else {
865 instant_speed
866 };
867 }
868 }
869
870 last_bytes = downloaded_bytes;
871 last_time = now;
872
873 let phase = match percent {
874 p if p < -1.5 => "connecting",
875 p if p < -0.5 => "starting",
876 p if p > 0.0 => "downloading",
877 _ => "starting",
878 };
879
880 {
881 let mut q = queue_progress.lock().await;
882 let tid = { *torrent_id_slot_progress.lock().await };
883 q.update_progress(
884 item_id,
885 clamped,
886 current_speed,
887 downloaded_bytes,
888 total_bytes,
889 tid,
890 );
891 }
892
893 if let Some(r) = &reporter_progress {
894 r.on_progress(
895 item_id,
896 crate::core::events::QueueItemProgress {
897 id: item_id,
898 title: item_title.clone(),
899 platform: item_platform.clone(),
900 percent: clamped,
901 speed_bytes_per_sec: current_speed,
902 downloaded_bytes,
903 total_bytes,
904 phase: phase.to_string(),
905 },
906 );
907 }
908 }
909 });
910
911 if let Some(ua) = opts.user_agent.clone() {
912 crate::core::ytdlp::register_ext_user_agent(url.clone(), ua);
913 }
914 if let Some(hdrs) = opts.extra_headers.clone() {
915 crate::core::ytdlp::register_ext_headers(url.clone(), hdrs);
916 }
917
918 let dl_start = std::time::Instant::now();
919 let dl_future = async {
920 tokio::select! {
921 r = downloader.download(&info, &opts, tx) => r,
922 _ = cancel_token.cancelled() => {
923 Err(anyhow::anyhow!("Download cancelado"))
924 }
925 }
926 };
927 let result = crate::core::log_hook::CURRENT_DOWNLOAD_ID
928 .scope(item_id, dl_future)
929 .await;
930 crate::core::ytdlp::clear_ext_user_agent(&url);
931 crate::core::ytdlp::clear_ext_headers(&url);
932 tracing::info!(
933 "[queue] download {} completed in {:?}",
934 item_id,
935 dl_start.elapsed()
936 );
937
938 let _ = progress_forwarder.await;
939
940 let was_paused = {
941 let q = queue.lock().await;
942 q.items
943 .iter()
944 .find(|i| i.id == item_id)
945 .map(|i| i.status == QueueStatus::Paused)
946 .unwrap_or(false)
947 };
948
949 if was_paused {
950 let state = {
951 let q = queue.lock().await;
952 q.get_state()
953 };
954 emit_queue_state_from_state(&reporter, state);
955 tokio::spawn(try_start_next(queue));
956 return;
957 }
958
959 match result {
960 Ok(dl) => {
961 if settings.download.embed_metadata
962 && platform_name != "magnet"
963 && crate::core::ffmpeg::is_ffmpeg_available().await
964 {
965 let metadata = crate::core::ffmpeg::MetadataEmbed {
966 title: Some(info.title.clone()),
967 artist: Some(info.author.clone()),
968 thumbnail_url: info.thumbnail_url.clone(),
969 ..Default::default()
970 };
971 if let Err(e) = crate::core::ffmpeg::embed_metadata(
972 &dl.file_path,
973 &metadata,
974 settings.download.embed_thumbnail,
975 shared_http_client(),
976 )
977 .await
978 {
979 tracing::warn!("Metadata embed failed for '{}': {}", info.title, e);
980 }
981 }
982
983 if from_hotkey && settings.download.copy_to_clipboard_on_hotkey {
984 #[cfg(not(target_os = "android"))]
985 {
986 match crate::core::clipboard::copy_file_to_clipboard(&dl.file_path).await {
987 Ok(()) => {
988 tracing::info!("[clipboard] file copied: {:?}", dl.file_path);
989 }
992 Err(e) => {
993 tracing::warn!("[clipboard] failed to copy file: {}", e);
994 }
995 }
996 }
997 }
998
999 let state = {
1000 let mut q = queue.lock().await;
1001 if platform_name == "magnet" && dl.torrent_id.is_some() {
1002 q.mark_seeding(
1003 item_id,
1004 Some(dl.file_path.to_string_lossy().to_string()),
1005 Some(dl.file_size_bytes),
1006 dl.torrent_id,
1007 );
1008 } else {
1009 q.mark_complete(
1010 item_id,
1011 true,
1012 None,
1013 Some(dl.file_path.to_string_lossy().to_string()),
1014 Some(dl.file_size_bytes),
1015 );
1016 }
1017 q.get_state()
1018 };
1019 if let Some(r) = &reporter {
1020 r.on_complete(
1021 item_id,
1022 Some(dl.file_path.to_string_lossy().to_string()),
1023 Some(dl.file_size_bytes),
1024 );
1025 }
1026 emit_queue_state_from_state(&reporter, state);
1027 }
1028 Err(e) => {
1029 let raw_err = e.to_string();
1030 let (category, hint) = crate::core::errors::classify_download_error(&raw_err);
1031 let user_msg = if category != "unknown" {
1032 format!("{} ({})", hint, raw_err)
1033 } else {
1034 raw_err.clone()
1035 };
1036 tracing::error!(
1037 "Download error '{}' [{}]: {}",
1038 platform_name,
1039 category,
1040 raw_err
1041 );
1042 let state = {
1043 let mut q = queue.lock().await;
1044 q.mark_complete(item_id, false, Some(user_msg.clone()), None, None);
1045 q.get_state()
1046 };
1047 if let Some(r) = &reporter {
1048 r.on_error(item_id, user_msg);
1049 }
1050 emit_queue_state_from_state(&reporter, state);
1051 }
1052 }
1053
1054 tokio::spawn(try_start_next(queue));
1055}
1056
1057pub async fn fetch_and_cache_info(
1058 url: &str,
1059 downloader: &dyn PlatformDownloader,
1060 platform: &str,
1061) -> anyhow::Result<MediaInfo> {
1062 {
1063 let cache = info_cache().lock().await;
1064 if let Some(entry) = cache.get(url) {
1065 if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1066 tracing::debug!("[perf] fetch_and_cache_info: cache hit for {}", platform);
1067 return Ok(entry.info.clone());
1068 }
1069 }
1070 }
1071
1072 let url_lock = {
1073 let mut map = in_flight_map().lock().await;
1074 map.entry(url.to_string())
1075 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
1076 .clone()
1077 };
1078 let _guard = url_lock.lock().await;
1079
1080 {
1081 let cache = info_cache().lock().await;
1082 if let Some(entry) = cache.get(url) {
1083 if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1084 tracing::debug!(
1085 "[perf] fetch_and_cache_info: dedup cache hit for {}",
1086 platform
1087 );
1088 return Ok(entry.info.clone());
1089 }
1090 }
1091 }
1092
1093 tracing::debug!("[perf] fetch_and_cache_info: fetching for {}", platform);
1094 let mut info = downloader.get_media_info(url).await?;
1095
1096 if is_generic_title(&info.title) {
1097 let name = crate::core::random_names::get_random_name();
1098 info.title = format!("video_{}", name);
1099 }
1100
1101 let mut cache = info_cache().lock().await;
1102 cache.insert(
1103 url.to_string(),
1104 CachedInfo {
1105 info: info.clone(),
1106 cached_at: std::time::Instant::now(),
1107 },
1108 );
1109 if cache.len() > 50 {
1110 cache.retain(|_, v| v.cached_at.elapsed() < INFO_CACHE_TTL);
1111 }
1112 Ok(info)
1113}
1114
1115pub async fn try_get_cached_info(url: &str) -> Option<MediaInfo> {
1116 let cache = info_cache().lock().await;
1117 cache
1118 .get(url)
1119 .filter(|entry| entry.cached_at.elapsed() < INFO_CACHE_TTL)
1120 .map(|entry| entry.info.clone())
1121}
1122
1123pub async fn prefetch_info(url: &str, downloader: &dyn PlatformDownloader, platform: &str) {
1124 prefetch_info_with_emit(url, downloader, platform, None).await;
1125}
1126
1127pub async fn prefetch_info_with_emit(
1128 url: &str,
1129 downloader: &dyn PlatformDownloader,
1130 platform: &str,
1131 reporter: Option<SharedReporter>,
1132) {
1133 let _timer_start = std::time::Instant::now();
1134 tracing::debug!("[perf] prefetch_info: started");
1135 match fetch_and_cache_info(url, downloader, platform).await {
1136 Ok(info) => {
1137 tracing::debug!(
1138 "[perf] prefetch_info: completed in {:?} — {}",
1139 _timer_start.elapsed(),
1140 info.title
1141 );
1142 if let Some(r) = reporter {
1143 r.on_media_preview(
1144 url.to_string(),
1145 info.title.clone(),
1146 info.author.clone(),
1147 info.thumbnail_url.clone(),
1148 info.duration_seconds,
1149 );
1150 }
1151 }
1152 Err(e) => tracing::warn!(
1153 "[perf] prefetch_info: failed in {:?} — {}",
1154 _timer_start.elapsed(),
1155 e
1156 ),
1157 }
1158}
1159
1160pub async fn try_start_next(queue: Arc<tokio::sync::Mutex<DownloadQueue>>) {
1161 let _timer_start = std::time::Instant::now();
1162 let (next_ids, stagger, state_to_emit, reporter) = {
1163 let mut q = queue.lock().await;
1164 let ids = q.next_queued_ids();
1165 for nid in &ids {
1166 q.mark_active(*nid);
1167 }
1168 let state = if !ids.is_empty() {
1169 Some(q.get_state())
1170 } else {
1171 None
1172 };
1173 (ids, q.stagger_delay_ms, state, q.reporter.clone())
1174 };
1175
1176 if let Some(state) = state_to_emit {
1177 emit_queue_state_from_state(&reporter, state);
1178 }
1179
1180 let batch_size = next_ids.len();
1181 for (i, nid) in next_ids.into_iter().enumerate() {
1182 if let Some(r) = &reporter {
1183 r.on_progress(
1184 nid,
1185 crate::core::events::QueueItemProgress {
1186 id: nid,
1187 title: String::new(),
1188 platform: String::new(),
1189 percent: 0.0,
1190 speed_bytes_per_sec: 0.0,
1191 downloaded_bytes: 0,
1192 total_bytes: None,
1193 phase: "queued_starting".to_string(),
1194 },
1195 );
1196 }
1197
1198 if i > 0 {
1199 let item_platform = {
1200 let q = queue.lock().await;
1201 q.items
1202 .iter()
1203 .find(|item| item.id == nid)
1204 .map(|item| item.platform.clone())
1205 };
1206 let delay_ms = if item_platform.as_deref() == Some("youtube") {
1207 2000
1208 } else if batch_size > 3 {
1209 stagger.max(1000)
1210 } else {
1211 stagger
1212 };
1213 if delay_ms > 0 {
1214 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1215 }
1216 }
1217 let queue_c = queue.clone();
1218 tokio::spawn(async move {
1219 tokio::spawn(spawn_download(queue_c, nid));
1220 });
1221 }
1222 tracing::debug!("[perf] try_start_next took {:?}", _timer_start.elapsed());
1223}
1224
1225fn is_generic_title(title: &str) -> bool {
1226 let t = title.to_lowercase();
1227 let t = t.trim();
1228 t.is_empty()
1229 || t == "video"
1230 || t == "media"
1231 || t == "untitled"
1232 || t == "unknown"
1233 || t.starts_with("video [video]")
1234 || t.starts_with("media [media]")
1235}