1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, OnceLock};
5
/// Process-wide counter bumped on every call to `emit_queue_state_from_state`;
/// in this file it is only read to decide when to write a periodic debug log line.
static EMIT_COUNT: AtomicU64 = AtomicU64::new(0);
7
8use serde::Serialize;
9use tokio_util::sync::CancellationToken;
10
11use crate::core::traits::SharedReporter;
12use crate::models::media::MediaInfo;
13use crate::models::queue::{QueueItemInfo, QueueStatus};
14use crate::platforms::traits::PlatformDownloader;
15
16fn shared_http_client() -> &'static reqwest::Client {
17 static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
18 CLIENT.get_or_init(|| {
19 crate::core::http_client::apply_global_proxy(reqwest::Client::builder())
20 .build()
21 .unwrap_or_default()
22 })
23}
24
/// One entry of the media-info cache: the fetched info plus the moment it was stored.
struct CachedInfo {
    /// The media info that was fetched for the URL used as the cache key.
    info: MediaInfo,
    /// When the entry was inserted; compared against `INFO_CACHE_TTL` on lookup.
    cached_at: std::time::Instant,
}
29
30static INFO_CACHE: OnceLock<tokio::sync::Mutex<HashMap<String, CachedInfo>>> = OnceLock::new();
31
32fn info_cache() -> &'static tokio::sync::Mutex<HashMap<String, CachedInfo>> {
33 INFO_CACHE.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
34}
35
/// Maximum age (600 seconds) at which a cached `MediaInfo` entry is still served.
const INFO_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(600);
37
38static IN_FLIGHT_FETCHES: OnceLock<
39 tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
40> = OnceLock::new();
41
42fn in_flight_map() -> &'static tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
43 IN_FLIGHT_FETCHES.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
44}
45
/// Serializable summary of a media item (URL, title, author, optional
/// thumbnail and duration).
// NOTE(review): not constructed anywhere in this file — presumably the payload
// a reporter sends for `on_media_preview`; confirm against the reporter impl.
#[derive(Debug, Clone, Serialize)]
pub struct MediaPreviewEvent {
    pub url: String,
    pub title: String,
    pub author: String,
    pub thumbnail_url: Option<String>,
    pub duration_seconds: Option<f64>,
}
54
55pub struct QueueItem {
56 pub id: u64,
57 pub url: String,
58 pub platform: String,
59 pub title: String,
60 pub status: QueueStatus,
61 pub cancel_token: CancellationToken,
62 pub output_dir: String,
63 pub download_mode: Option<String>,
64 pub quality: Option<String>,
65 pub format_id: Option<String>,
66 pub referer: Option<String>,
67 pub extra_headers: Option<std::collections::HashMap<String, String>>,
68 pub page_url: Option<String>,
69 pub user_agent: Option<String>,
70 pub percent: f64,
71 pub speed_bytes_per_sec: f64,
72 pub downloaded_bytes: u64,
73 pub total_bytes: Option<u64>,
74 pub file_path: Option<String>,
75 pub file_size_bytes: Option<u64>,
76 pub file_count: Option<u32>,
77 pub media_info: Option<MediaInfo>,
78 pub downloader: Arc<dyn PlatformDownloader>,
79 pub ytdlp_path: Option<PathBuf>,
80 pub from_hotkey: bool,
81 pub torrent_id: Option<usize>,
82 pub download_subtitles: Option<bool>,
83 pub phase: String,
84}
85
86impl QueueItem {
87 pub fn to_info(&self) -> QueueItemInfo {
88 QueueItemInfo {
89 id: self.id,
90 url: self.url.clone(),
91 platform: self.platform.clone(),
92 title: self.title.clone(),
93 status: self.status.clone(),
94 percent: self.percent,
95 speed_bytes_per_sec: self.speed_bytes_per_sec,
96 downloaded_bytes: self.downloaded_bytes,
97 total_bytes: self.total_bytes,
98 phase: self.phase.clone(),
99 file_path: self.file_path.clone(),
100 file_size_bytes: self.file_size_bytes,
101 file_count: self.file_count,
102 thumbnail_url: self
103 .media_info
104 .as_ref()
105 .and_then(|m| m.thumbnail_url.clone()),
106 }
107 }
108}
109
/// The download queue: every known item plus the scheduling parameters.
pub struct DownloadQueue {
    /// All items, in insertion order, regardless of status.
    pub items: Vec<QueueItem>,
    /// Upper bound on the number of items that may be `Active` at once.
    pub max_concurrent: u32,
    /// Base delay in milliseconds between starting consecutive downloads of a batch.
    pub stagger_delay_ms: u64,
    /// Optional sink for queue/progress events.
    pub reporter: Option<SharedReporter>,
}
116
117impl DownloadQueue {
118 pub fn new(max_concurrent: u32, reporter: Option<SharedReporter>) -> Self {
119 Self {
120 items: Vec::new(),
121 max_concurrent,
122 stagger_delay_ms: 150,
123 reporter,
124 }
125 }
126
127 pub fn set_reporter(&mut self, reporter: SharedReporter) {
128 self.reporter = Some(reporter);
129 }
130
131 pub fn load_from_recovery(&mut self, registry: &crate::core::registry::PlatformRegistry) {
132 let recovery_items = crate::core::manager::recovery::list();
133 for item in recovery_items {
134 let downloader = registry
135 .find_platform(&item.url)
136 .or_else(|| registry.find_by_name(&item.platform));
137
138 if let Some(dl) = downloader {
139 let percent = if matches!(item.status, QueueStatus::Complete { success: true }) {
140 100.0
141 } else {
142 0.0
143 };
144
145 let q_item = QueueItem {
146 id: item.id,
147 url: item.url,
148 platform: item.platform,
149 title: item.title,
150 status: item.status,
151 cancel_token: CancellationToken::new(),
152 output_dir: item.output_dir,
153 download_mode: item.download_mode,
154 quality: item.quality,
155 format_id: item.format_id,
156 referer: item.referer,
157 extra_headers: None,
158 page_url: None,
159 user_agent: None,
160 percent,
161 speed_bytes_per_sec: 0.0,
162 downloaded_bytes: 0,
163 total_bytes: item.file_size_bytes,
164 file_path: item.file_path,
165 file_size_bytes: item.file_size_bytes,
166 file_count: None,
167 media_info: None,
168 downloader: dl,
169 ytdlp_path: None,
170 from_hotkey: false,
171 torrent_id: None,
172 download_subtitles: None,
173 phase: "Restored".to_string(),
174 };
175 self.items.push(q_item);
176 }
177 }
178 }
179
180 fn sync_recovery(item: &QueueItem) {
181 crate::core::manager::recovery::persist(crate::core::manager::recovery::RecoveryItem {
182 id: item.id,
183 url: item.url.clone(),
184 title: item.title.clone(),
185 platform: item.platform.clone(),
186 output_dir: item.output_dir.clone(),
187 status: item.status.clone(),
188 download_mode: item.download_mode.clone(),
189 quality: item.quality.clone(),
190 format_id: item.format_id.clone(),
191 referer: item.referer.clone(),
192 file_path: item.file_path.clone(),
193 file_size_bytes: item.file_size_bytes,
194 });
195 }
196
197 #[allow(clippy::too_many_arguments)]
198 pub fn enqueue(
199 &mut self,
200 id: u64,
201 url: String,
202 platform: String,
203 title: String,
204 output_dir: String,
205 download_mode: Option<String>,
206 quality: Option<String>,
207 download_subtitles: Option<bool>,
208 format_id: Option<String>,
209 referer: Option<String>,
210 extra_headers: Option<std::collections::HashMap<String, String>>,
211 page_url: Option<String>,
212 user_agent: Option<String>,
213 media_info: Option<MediaInfo>,
214 total_bytes: Option<u64>,
215 file_count: Option<u32>,
216 downloader: Arc<dyn PlatformDownloader>,
217 ytdlp_path: Option<PathBuf>,
218 from_hotkey: bool,
219 ) {
220 let item = QueueItem {
221 id,
222 url,
223 platform,
224 title,
225 status: QueueStatus::Queued,
226 cancel_token: CancellationToken::new(),
227 output_dir,
228 download_mode,
229 quality,
230 format_id,
231 referer,
232 extra_headers,
233 page_url,
234 user_agent,
235 percent: 0.0,
236 speed_bytes_per_sec: 0.0,
237 downloaded_bytes: 0,
238 total_bytes,
239 file_path: None,
240 file_size_bytes: None,
241 file_count,
242 media_info,
243 downloader,
244 ytdlp_path,
245 from_hotkey,
246 torrent_id: None,
247 download_subtitles,
248 phase: "Queued".to_string(),
249 };
250 self.items.push(item);
251 Self::sync_recovery(self.items.last().unwrap());
252 }
253
254 pub fn active_count(&self) -> u32 {
255 self.items
256 .iter()
257 .filter(|i| i.status == QueueStatus::Active)
258 .count() as u32
259 }
260
261 pub fn next_queued_ids(&self) -> Vec<u64> {
262 let slots = self.max_concurrent.saturating_sub(self.active_count()) as usize;
263 self.items
264 .iter()
265 .filter(|i| i.status == QueueStatus::Queued)
266 .take(slots)
267 .map(|i| i.id)
268 .collect()
269 }
270
271 pub fn mark_active(&mut self, id: u64) {
272 let item = self.items.iter_mut().find(|i| i.id == id);
273 if let Some(item) = item {
274 item.status = QueueStatus::Active;
275 item.cancel_token = CancellationToken::new();
276 Self::sync_recovery(item);
277 }
278 }
279
280 pub fn mark_complete(
281 &mut self,
282 id: u64,
283 success: bool,
284 error: Option<String>,
285 file_path: Option<String>,
286 file_size_bytes: Option<u64>,
287 ) {
288 let item = self.items.iter_mut().find(|i| i.id == id);
289 if let Some(item) = item {
290 if success {
291 item.status = QueueStatus::Complete { success: true };
292 item.percent = 100.0;
293 } else {
294 item.status = QueueStatus::Error {
295 message: error.unwrap_or_default(),
296 };
297 }
298 item.file_path = file_path;
299 item.file_size_bytes = file_size_bytes;
300 item.speed_bytes_per_sec = 0.0;
301 Self::sync_recovery(item);
302 }
303 }
304
305 pub fn mark_seeding(
306 &mut self,
307 id: u64,
308 file_path: Option<String>,
309 file_size_bytes: Option<u64>,
310 torrent_id: Option<usize>,
311 ) {
312 let item = self.items.iter_mut().find(|i| i.id == id);
313 if let Some(item) = item {
314 item.status = QueueStatus::Seeding;
315 item.percent = 100.0;
316 item.file_path = file_path;
317 item.file_size_bytes = file_size_bytes;
318 item.speed_bytes_per_sec = 0.0;
319 item.torrent_id = torrent_id;
320 Self::sync_recovery(item);
321 }
322 }
323
324 pub fn update_progress(
325 &mut self,
326 id: u64,
327 percent: f64,
328 speed: f64,
329 downloaded: u64,
330 total: Option<u64>,
331 torrent_id: Option<usize>,
332 ) {
333 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
334 item.percent = percent;
335 item.speed_bytes_per_sec = speed;
336 item.downloaded_bytes = downloaded;
337 if let Some(t) = total {
338 item.total_bytes = Some(t);
339 }
340 if torrent_id.is_some() && item.torrent_id.is_none() {
341 item.torrent_id = torrent_id;
342 }
343 }
344 }
345
346 pub fn pause(&mut self, id: u64) -> bool {
347 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
348 if item.status == QueueStatus::Active {
349 if item.platform != "magnet" {
350 item.cancel_token.cancel();
351 }
352 item.status = QueueStatus::Paused;
353 item.speed_bytes_per_sec = 0.0;
354 Self::sync_recovery(item);
355 return true;
356 }
357 }
358 false
359 }
360
361 pub fn resume(&mut self, id: u64) -> bool {
362 let item = self.items.iter_mut().find(|i| i.id == id);
363 if let Some(item) = item {
364 if item.status == QueueStatus::Paused {
365 if item.platform == "magnet" {
366 item.status = QueueStatus::Active;
367 } else {
368 item.status = QueueStatus::Queued;
369 item.cancel_token = CancellationToken::new();
370 }
371 Self::sync_recovery(item);
372 return true;
373 }
374 }
375 false
376 }
377
378 pub fn cancel(&mut self, id: u64) -> (bool, Option<usize>) {
379 let result = self.cancel_inner(id);
380 if result.0 {
381 crate::core::manager::recovery::remove(id);
382 }
383 result
384 }
385
386 fn cancel_inner(&mut self, id: u64) -> (bool, Option<usize>) {
387 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
388 match &item.status {
389 QueueStatus::Active => {
390 item.cancel_token.cancel();
391 item.status = QueueStatus::Error {
392 message: "Cancelled".to_string(),
393 };
394 item.speed_bytes_per_sec = 0.0;
395 Self::sync_recovery(item);
396 return (true, None);
397 }
398 QueueStatus::Seeding => {
399 let tid = item.torrent_id;
400 item.status = QueueStatus::Error {
401 message: "Cancelled".to_string(),
402 };
403 item.speed_bytes_per_sec = 0.0;
404 Self::sync_recovery(item);
405 return (true, tid);
406 }
407 QueueStatus::Paused => {
408 item.cancel_token.cancel();
409 let tid = if item.platform == "magnet" {
410 item.torrent_id
411 } else {
412 None
413 };
414 item.status = QueueStatus::Error {
415 message: "Cancelled".to_string(),
416 };
417 item.speed_bytes_per_sec = 0.0;
418 Self::sync_recovery(item);
419 return (true, tid);
420 }
421 QueueStatus::Queued => {
422 item.status = QueueStatus::Error {
423 message: "Cancelled".to_string(),
424 };
425 Self::sync_recovery(item);
426 return (true, None);
427 }
428 _ => {}
429 }
430 }
431 (false, None)
432 }
433
434 pub fn retry(&mut self, id: u64) -> bool {
435 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
436 if matches!(item.status, QueueStatus::Error { .. }) {
437 item.status = QueueStatus::Queued;
438 item.cancel_token = CancellationToken::new();
439 item.percent = 0.0;
440 item.speed_bytes_per_sec = 0.0;
441 item.downloaded_bytes = 0;
442 item.file_path = None;
443 item.file_size_bytes = None;
444 Self::sync_recovery(item);
445 return true;
446 }
447 }
448 false
449 }
450
451 pub fn remove(&mut self, id: u64) -> Option<Option<usize>> {
452 let result = self.remove_inner(id);
453 if result.is_some() {
454 crate::core::manager::recovery::remove(id);
455 }
456 result
457 }
458
459 fn remove_inner(&mut self, id: u64) -> Option<Option<usize>> {
460 if let Some(pos) = self.items.iter().position(|i| i.id == id) {
461 let item = &self.items[pos];
462 if item.status == QueueStatus::Active {
463 item.cancel_token.cancel();
464 }
465 if item.status == QueueStatus::Paused && item.platform == "magnet" {
466 item.cancel_token.cancel();
467 }
468 let torrent_id = if item.status == QueueStatus::Seeding
469 || (item.status == QueueStatus::Paused && item.platform == "magnet")
470 {
471 item.torrent_id
472 } else {
473 None
474 };
475 self.items.remove(pos);
476 return Some(torrent_id);
477 }
478 None
479 }
480
481 pub fn clear_finished(&mut self) {
482 let to_remove: Vec<u64> = self
483 .items
484 .iter()
485 .filter(|i| {
486 matches!(
487 i.status,
488 QueueStatus::Complete { .. } | QueueStatus::Error { .. }
489 )
490 })
491 .map(|i| i.id)
492 .collect();
493 for id in &to_remove {
494 crate::core::manager::recovery::remove(*id);
495 }
496 self.items.retain(|i| {
497 !matches!(
498 i.status,
499 QueueStatus::Complete { .. } | QueueStatus::Error { .. }
500 )
501 });
502 }
503
504 pub fn get_state(&self) -> Vec<QueueItemInfo> {
505 self.items.iter().map(|i| i.to_info()).collect()
506 }
507
508 pub fn has_url(&self, url: &str) -> bool {
509 self.items.iter().any(|i| {
510 i.url == url
511 && matches!(
512 i.status,
513 QueueStatus::Queued
514 | QueueStatus::Active
515 | QueueStatus::Paused
516 | QueueStatus::Seeding
517 )
518 })
519 }
520}
521
/// Rate limiter for progress events: lets at most one event through per
/// `min_interval`.
///
/// The first call to [`ProgressThrottle::should_emit`] always succeeds,
/// whatever the interval is.
#[derive(Debug)]
pub struct ProgressThrottle {
    /// Time of the most recent accepted emission; `None` until the first one.
    ///
    /// Tracking "never emitted" explicitly avoids back-dating an `Instant`,
    /// which panics when the subtraction underflows the platform's monotonic
    /// clock and silently throttles the first event for intervals above the
    /// back-dating amount.
    last_emit: Option<std::time::Instant>,
    /// Minimum time that must pass between two accepted emissions.
    min_interval: std::time::Duration,
}

impl ProgressThrottle {
    /// Creates a throttle that accepts one emission every `min_interval_ms`
    /// milliseconds.
    pub fn new(min_interval_ms: u64) -> Self {
        Self {
            last_emit: None,
            min_interval: std::time::Duration::from_millis(min_interval_ms),
        }
    }

    /// Returns `true` if an event may be emitted now, and records the
    /// emission time when it does.
    ///
    /// Returns `false` while less than `min_interval` has elapsed since the
    /// last accepted emission.
    pub fn should_emit(&mut self) -> bool {
        let now = std::time::Instant::now();
        let due = match self.last_emit {
            None => true,
            Some(previous) => now.duration_since(previous) >= self.min_interval,
        };
        if due {
            self.last_emit = Some(now);
        }
        due
    }
}
545
546pub fn emit_queue_state_from_state(reporter: &Option<SharedReporter>, state: Vec<QueueItemInfo>) {
547 let n = EMIT_COUNT.fetch_add(1, Ordering::Relaxed);
548 if n.is_multiple_of(10) {
549 tracing::debug!("[perf] emit_queue_state called {} times", n);
550 }
551 if let Some(reporter) = reporter {
552 reporter.on_queue_update(state);
553 }
554}
555
556pub fn emit_queue_state(queue: &DownloadQueue) {
557 let state = queue.get_state();
558 emit_queue_state_from_state(&queue.reporter, state);
559}
560
561struct ActiveJobSlot {
563 queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
564 item_id: u64,
565 armed: bool,
566}
567
568impl ActiveJobSlot {
569 fn new(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) -> Self {
570 Self {
571 queue,
572 item_id,
573 armed: true,
574 }
575 }
576
577 fn disarm(mut self) {
578 self.armed = false;
579 }
580}
581
582impl Drop for ActiveJobSlot {
583 fn drop(&mut self) {
584 if !self.armed {
585 return;
586 }
587 let queue = self.queue.clone();
588 let item_id = self.item_id;
589 tokio::spawn(async move {
590 let state = {
591 let mut q = queue.lock().await;
592 let still_active = q
593 .items
594 .iter()
595 .find(|i| i.id == item_id)
596 .map(|i| i.status == QueueStatus::Active)
597 .unwrap_or(false);
598 if !still_active {
599 return;
600 }
601 tracing::warn!(
602 "[queue] ActiveJobSlot guard firing for {} — download ended without clean release",
603 item_id
604 );
605 q.mark_complete(
606 item_id,
607 false,
608 Some("Download interrupted".to_string()),
609 None,
610 None,
611 );
612 q.get_state()
613 };
614 let reporter = { queue.lock().await.reporter.clone() };
615 emit_queue_state_from_state(&reporter, state);
616 tokio::spawn(try_start_next(queue));
617 });
618 }
619}
620
621pub fn spawn_download(
622 queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
623 item_id: u64,
624) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
625 Box::pin(async move {
626 let _timer_start = std::time::Instant::now();
627 let slot = ActiveJobSlot::new(queue.clone(), item_id);
628 tokio::spawn(spawn_download_inner(queue.clone(), item_id));
629 slot.disarm();
630 tracing::debug!(
631 "[perf] spawn_download {} took {:?}",
632 item_id,
633 _timer_start.elapsed()
634 );
635 })
636}
637
638struct DownloadContext {
639 url: String,
640 output_dir: String,
641 download_mode: Option<String>,
642 quality: Option<String>,
643 download_subtitles: Option<bool>,
644 format_id: Option<String>,
645 referer: Option<String>,
646 extra_headers: Option<std::collections::HashMap<String, String>>,
647 page_url: Option<String>,
648 user_agent: Option<String>,
649 cancel_token: tokio_util::sync::CancellationToken,
650 media_info: Option<crate::models::media::MediaInfo>,
651 platform_name: String,
652 downloader: std::sync::Arc<dyn crate::platforms::traits::PlatformDownloader>,
653 ytdlp_path: Option<std::path::PathBuf>,
654 from_hotkey: bool,
655}
656
657async fn extract_download_context(
658 queue: &Arc<tokio::sync::Mutex<DownloadQueue>>,
659 item_id: u64,
660) -> Option<DownloadContext> {
661 let q = queue.lock().await;
662 let item = q.items.iter().find(|i| i.id == item_id)?;
663 Some(DownloadContext {
664 url: item.url.clone(),
665 output_dir: item.output_dir.clone(),
666 download_mode: item.download_mode.clone(),
667 quality: item.quality.clone(),
668 download_subtitles: item.download_subtitles,
669 format_id: item.format_id.clone(),
670 referer: item.referer.clone(),
671 extra_headers: item.extra_headers.clone(),
672 page_url: item.page_url.clone(),
673 user_agent: item.user_agent.clone(),
674 cancel_token: item.cancel_token.clone(),
675 media_info: item.media_info.clone(),
676 platform_name: item.platform.clone(),
677 downloader: item.downloader.clone(),
678 ytdlp_path: item.ytdlp_path.clone(),
679 from_hotkey: item.from_hotkey,
680 })
681}
682
683async fn prepare_media_info(
684 queue: &Arc<tokio::sync::Mutex<DownloadQueue>>,
685 item_id: u64,
686 ctx: &DownloadContext,
687 reporter: &Option<crate::core::traits::SharedReporter>,
688) -> Option<crate::models::media::MediaInfo> {
689 let info_start = std::time::Instant::now();
690 let info = match &ctx.media_info {
691 Some(i) => {
692 tracing::info!(
693 "[queue] info for {} from cache/pre-fetched in {:?}",
694 item_id,
695 info_start.elapsed()
696 );
697 i.clone()
698 }
699 None => {
700 tracing::debug!(
701 "[perf] spawn_download_inner {}: media_info is None, fetching info",
702 item_id
703 );
704 if let Some(r) = reporter {
705 r.on_progress(
706 item_id,
707 crate::core::events::QueueItemProgress {
708 id: item_id,
709 title: ctx.url.clone(),
710 platform: ctx.platform_name.clone(),
711 percent: 0.0,
712 speed_bytes_per_sec: 0.0,
713 downloaded_bytes: 0,
714 total_bytes: None,
715 phase: "fetching_info".to_string(),
716 },
717 );
718 }
719
720 let info_result = tokio::time::timeout(
721 std::time::Duration::from_secs(60),
722 fetch_and_cache_info(&ctx.url, &*ctx.downloader, &ctx.platform_name),
723 )
724 .await;
725
726 match info_result {
727 Ok(Ok(i)) => i,
728 Ok(Err(e)) => {
729 let state = {
730 let mut q = queue.lock().await;
731 q.mark_complete(item_id, false, Some(e.to_string()), None, None);
732 q.get_state()
733 };
734 emit_queue_state_from_state(reporter, state);
735 tokio::spawn(try_start_next(queue.clone()));
736 return None;
737 }
738 Err(_) => {
739 tracing::warn!("[queue] info fetch timed out for {} after 60s", item_id);
740 let state = {
741 let mut q = queue.lock().await;
742 q.mark_complete(
743 item_id,
744 false,
745 Some("Timed out fetching video info".to_string()),
746 None,
747 None,
748 );
749 q.get_state()
750 };
751 emit_queue_state_from_state(reporter, state);
752 tokio::spawn(try_start_next(queue.clone()));
753 return None;
754 }
755 }
756 }
757 };
758 tracing::info!(
759 "[queue] info fetch for {} took {:?}",
760 item_id,
761 info_start.elapsed()
762 );
763
764 let state = {
765 let mut q = queue.lock().await;
766 if let Some(item) = q.items.iter_mut().find(|i| i.id == item_id) {
767 item.title = info.title.clone();
768 item.total_bytes = info.file_size_bytes;
769 let fc = if info.media_type == crate::models::media::MediaType::Carousel
770 || info.media_type == crate::models::media::MediaType::Playlist
771 {
772 info.available_qualities.len() as u32
773 } else {
774 1
775 };
776 item.file_count = Some(fc);
777 item.media_info = Some(info.clone());
778 }
779 q.get_state()
780 };
781 emit_queue_state_from_state(reporter, state);
782
783 if let Some(r) = reporter {
784 r.on_progress(
785 item_id,
786 crate::core::events::QueueItemProgress {
787 id: item_id,
788 title: info.title.clone(),
789 platform: ctx.platform_name.clone(),
790 percent: 0.5,
791 speed_bytes_per_sec: 0.0,
792 downloaded_bytes: 0,
793 total_bytes: info.file_size_bytes,
794 phase: "starting".to_string(),
795 },
796 );
797 }
798
799 Some(info)
800}
801
802fn build_download_options(
803 ctx: &DownloadContext,
804) -> (
805 crate::models::media::DownloadOptions,
806 std::sync::Arc<tokio::sync::Mutex<Option<usize>>>,
807) {
808 let settings = crate::models::settings::AppSettings::load_from_disk();
809 let tmpl = settings.download.filename_template.clone();
810 let mut final_output_dir = std::path::PathBuf::from(&ctx.output_dir);
811 if settings.download.organize_by_platform {
812 final_output_dir = final_output_dir.join(&ctx.platform_name);
813 }
814 let torrent_id_slot = std::sync::Arc::new(tokio::sync::Mutex::new(None));
815 let opts = crate::models::media::DownloadOptions {
816 quality: ctx
817 .quality
818 .clone()
819 .or_else(|| Some(settings.download.video_quality.clone())),
820 output_dir: final_output_dir,
821 filename_template: Some(tmpl),
822 download_subtitles: ctx
823 .download_subtitles
824 .unwrap_or(settings.download.download_subtitles),
825 include_auto_subtitles: settings.download.include_auto_subtitles,
826 download_mode: ctx.download_mode.clone(),
827 format_id: ctx.format_id.clone(),
828 referer: ctx.referer.clone(),
829 extra_headers: ctx.extra_headers.clone(),
830 page_url: ctx.page_url.clone(),
831 user_agent: ctx.user_agent.clone(),
832 cancel_token: ctx.cancel_token.clone(),
833 concurrent_fragments: settings.advanced.concurrent_fragments,
834 ytdlp_path: ctx.ytdlp_path.clone(),
835 torrent_listen_port: Some(settings.advanced.torrent_listen_port),
836 torrent_id_slot: Some(torrent_id_slot.clone()),
837 };
838 (opts, torrent_id_slot)
839}
840
841async fn handle_download_result(
842 queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
843 item_id: u64,
844 ctx: DownloadContext,
845 info: crate::models::media::MediaInfo,
846 reporter: Option<crate::core::traits::SharedReporter>,
847 result: anyhow::Result<crate::models::media::DownloadResult>,
848) {
849 let settings = crate::models::settings::AppSettings::load_from_disk();
850 match result {
851 Ok(dl) => {
852 if settings.download.embed_metadata
853 && ctx.platform_name != "magnet"
854 && crate::core::ffmpeg::is_ffmpeg_available().await
855 {
856 let metadata = crate::core::ffmpeg::MetadataEmbed {
857 title: Some(info.title.clone()),
858 artist: Some(info.author.clone()),
859 thumbnail_url: info.thumbnail_url.clone(),
860 ..Default::default()
861 };
862 if let Err(e) = crate::core::ffmpeg::embed_metadata(
863 &dl.file_path,
864 &metadata,
865 settings.download.embed_thumbnail,
866 shared_http_client(),
867 )
868 .await
869 {
870 tracing::warn!("Metadata embed failed for '{}': {}", info.title, e);
871 }
872 }
873
874 if ctx.from_hotkey && settings.download.copy_to_clipboard_on_hotkey {
875 #[cfg(not(target_os = "android"))]
876 {
877 match crate::core::clipboard::copy_file_to_clipboard(&dl.file_path).await {
878 Ok(()) => {
879 tracing::info!("[clipboard] file copied: {:?}", dl.file_path);
880 }
881 Err(e) => {
882 tracing::warn!("[clipboard] failed to copy file: {}", e);
883 }
884 }
885 }
886 }
887
888 let state = {
889 let mut q = queue.lock().await;
890 if ctx.platform_name == "magnet" && dl.torrent_id.is_some() {
891 q.mark_seeding(
892 item_id,
893 Some(dl.file_path.to_string_lossy().to_string()),
894 Some(dl.file_size_bytes),
895 dl.torrent_id,
896 );
897 } else {
898 q.mark_complete(
899 item_id,
900 true,
901 None,
902 Some(dl.file_path.to_string_lossy().to_string()),
903 Some(dl.file_size_bytes),
904 );
905 }
906 q.get_state()
907 };
908 if let Some(r) = &reporter {
909 r.on_complete(
910 item_id,
911 Some(dl.file_path.to_string_lossy().to_string()),
912 Some(dl.file_size_bytes),
913 );
914 }
915 emit_queue_state_from_state(&reporter, state);
916 }
917 Err(e) => {
918 let raw_err = format!("{}", e);
919 let (category, hint) = crate::core::errors::classify_download_error(&raw_err);
920 let user_msg = if category != "unknown" {
921 format!("{} ({})", hint, raw_err)
922 } else {
923 raw_err.clone()
924 };
925 tracing::error!(
926 "Download error '{}' [{}]: {}",
927 ctx.platform_name,
928 category,
929 raw_err
930 );
931 let state = {
932 let mut q = queue.lock().await;
933 q.mark_complete(item_id, false, Some(user_msg.clone()), None, None);
934 q.get_state()
935 };
936 if let Some(r) = &reporter {
937 r.on_error(item_id, user_msg);
938 }
939 emit_queue_state_from_state(&reporter, state);
940 }
941 }
942}
943
/// Runs the complete download of item `item_id`: prepares the media info,
/// starts the downloader, forwards its progress to the queue and reporter,
/// and finally records the outcome and asks the scheduler for the next item.
///
/// Returns early without touching the item when the item no longer exists, or
/// when `prepare_media_info` returned `None` (it has already recorded the
/// failure in that case).
async fn spawn_download_inner(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) {
    tracing::info!("[queue] download {} started", item_id);

    // Clone the reporter under a short-lived lock so it can be used lock-free below.
    let reporter = { queue.lock().await.reporter.clone() };

    // Title and platform are not known at this point, hence the empty strings.
    if let Some(r) = &reporter {
        r.on_progress(
            item_id,
            crate::core::events::QueueItemProgress {
                id: item_id,
                title: "".to_string(),
                platform: "".to_string(),
                percent: 0.0,
                speed_bytes_per_sec: 0.0,
                downloaded_bytes: 0,
                total_bytes: None,
                phase: "preparing".to_string(),
            },
        );
    }

    // `None` means the item is no longer in the queue.
    let ctx = match extract_download_context(&queue, item_id).await {
        Some(c) => c,
        None => return,
    };

    let info = match prepare_media_info(&queue, item_id, &ctx, &reporter).await {
        Some(i) => i,
        None => return,
    };

    let (opts, torrent_id_slot) = build_download_options(&ctx);

    let total_bytes = info.file_size_bytes;
    let item_title = info.title.clone();
    let item_platform = ctx.platform_name.clone();
    // The downloader reports progress as a percentage over this bounded channel.
    let (tx, mut rx) = tokio::sync::mpsc::channel::<f64>(32);

    let reporter_progress = reporter.clone();
    let queue_progress = queue.clone();
    let torrent_id_slot_progress = torrent_id_slot.clone();
    // Forwarder task: turns raw percentages into queue updates and reporter events.
    let progress_forwarder = tokio::spawn(async move {
        let mut last_bytes: u64 = 0;
        let mut last_time = std::time::Instant::now();
        let mut throttle = ProgressThrottle::new(250);
        let mut current_speed: f64 = 0.0;

        // The loop ends once every sender of the channel has been dropped.
        while let Some(percent) = rx.recv().await {
            // Throttled updates are skipped, except values >= 100 which always pass.
            if !throttle.should_emit() && percent < 100.0 {
                continue;
            }

            let now = std::time::Instant::now();
            // Negative values are used for phase signalling (see `phase` below)
            // and are reported as 0 %.
            let clamped = percent.max(0.0);
            // Byte count is estimated from the percentage; 0 when the total is unknown.
            let downloaded_bytes = total_bytes
                .map(|total| (clamped / 100.0 * total as f64) as u64)
                .unwrap_or(0);

            if total_bytes.is_some() && downloaded_bytes > last_bytes {
                let dt = now.duration_since(last_time).as_secs_f64();
                if dt > 0.1 {
                    let instant_speed = (downloaded_bytes - last_bytes) as f64 / dt;
                    // Exponential smoothing: 70 % previous value, 30 % new sample.
                    current_speed = if current_speed > 0.0 {
                        current_speed * 0.7 + instant_speed * 0.3
                    } else {
                        instant_speed
                    };
                }
            }

            last_bytes = downloaded_bytes;
            last_time = now;

            // NOTE(review): values below -1.5 / -0.5 look like sentinel codes sent
            // by the downloaders for "connecting" / "starting" — confirm against
            // the PlatformDownloader implementations.
            let phase = match percent {
                p if p < -1.5 => "connecting",
                p if p < -0.5 => "starting",
                p if p > 0.0 => "downloading",
                _ => "starting",
            };

            {
                let mut q = queue_progress.lock().await;
                // Read the torrent id (if one has been published) while holding the queue lock.
                let tid = { *torrent_id_slot_progress.lock().await };
                q.update_progress(
                    item_id,
                    clamped,
                    current_speed,
                    downloaded_bytes,
                    total_bytes,
                    tid,
                );
            }

            if let Some(r) = &reporter_progress {
                r.on_progress(
                    item_id,
                    crate::core::events::QueueItemProgress {
                        id: item_id,
                        title: item_title.clone(),
                        platform: item_platform.clone(),
                        percent: clamped,
                        speed_bytes_per_sec: current_speed,
                        downloaded_bytes,
                        total_bytes,
                        phase: phase.to_string(),
                    },
                );
            }
        }
    });

    // Register per-URL overrides for the duration of the download; they are
    // cleared again right after the download future finishes.
    if let Some(ua) = opts.user_agent.clone() {
        crate::core::ytdlp::register_ext_user_agent(ctx.url.clone(), ua);
    }
    if let Some(hdrs) = opts.extra_headers.clone() {
        crate::core::ytdlp::register_ext_headers(ctx.url.clone(), hdrs);
    }

    let dl_start = std::time::Instant::now();
    // Race the download against cancellation; whichever finishes first wins.
    let dl_future = async {
        tokio::select! {
            r = ctx.downloader.download(&info, &opts, tx) => r,
            _ = ctx.cancel_token.cancelled() => {
                Err(anyhow::anyhow!("Download cancelado"))
            }
        }
    };
    // Run the download inside the `CURRENT_DOWNLOAD_ID` scope for this item.
    let result = crate::core::log_hook::CURRENT_DOWNLOAD_ID
        .scope(item_id, dl_future)
        .await;
    crate::core::ytdlp::clear_ext_user_agent(&ctx.url);
    crate::core::ytdlp::clear_ext_headers(&ctx.url);
    tracing::info!(
        "[queue] download {} completed in {:?}",
        item_id,
        dl_start.elapsed()
    );

    // Wait until the forwarder has drained the remaining progress messages.
    let _ = progress_forwarder.await;

    // `pause()` sets the status to `Paused` and (except for "magnet") fires the
    // token, so a download that ended because of a pause must not be recorded
    // as a failure.
    let was_paused = {
        let q = queue.lock().await;
        q.items
            .iter()
            .find(|i| i.id == item_id)
            .map(|i| i.status == QueueStatus::Paused)
            .unwrap_or(false)
    };

    if was_paused {
        let state = {
            let q = queue.lock().await;
            q.get_state()
        };
        emit_queue_state_from_state(&reporter, state);
        tokio::spawn(try_start_next(queue));
        return;
    }

    // NOTE(review): a user cancel also lands here with an `Err`, so the item's
    // "Cancelled" message is overwritten and `mark_complete` persists the item
    // again after `cancel()` removed its recovery record — confirm this is intended.
    handle_download_result(queue.clone(), item_id, ctx, info, reporter, result).await;

    tokio::spawn(try_start_next(queue));
}
1107
1108pub async fn fetch_and_cache_info(
1109 url: &str,
1110 downloader: &dyn PlatformDownloader,
1111 platform: &str,
1112) -> anyhow::Result<MediaInfo> {
1113 {
1114 let cache = info_cache().lock().await;
1115 if let Some(entry) = cache.get(url) {
1116 if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1117 tracing::debug!("[perf] fetch_and_cache_info: cache hit for {}", platform);
1118 return Ok(entry.info.clone());
1119 }
1120 }
1121 }
1122
1123 let url_lock = {
1124 let mut map = in_flight_map().lock().await;
1125 map.entry(url.to_string())
1126 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
1127 .clone()
1128 };
1129 let _guard = url_lock.lock().await;
1130
1131 {
1132 let cache = info_cache().lock().await;
1133 if let Some(entry) = cache.get(url) {
1134 if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1135 tracing::debug!(
1136 "[perf] fetch_and_cache_info: dedup cache hit for {}",
1137 platform
1138 );
1139 return Ok(entry.info.clone());
1140 }
1141 }
1142 }
1143
1144 tracing::debug!("[perf] fetch_and_cache_info: fetching for {}", platform);
1145 let mut info = downloader.get_media_info(url).await?;
1146
1147 if is_generic_title(&info.title) {
1148 let name = crate::core::random_names::get_random_name();
1149 info.title = format!("video_{}", name);
1150 }
1151
1152 let mut cache = info_cache().lock().await;
1153 cache.insert(
1154 url.to_string(),
1155 CachedInfo {
1156 info: info.clone(),
1157 cached_at: std::time::Instant::now(),
1158 },
1159 );
1160 if cache.len() > 50 {
1161 cache.retain(|_, v| v.cached_at.elapsed() < INFO_CACHE_TTL);
1162 }
1163 Ok(info)
1164}
1165
1166pub async fn try_get_cached_info(url: &str) -> Option<MediaInfo> {
1167 let cache = info_cache().lock().await;
1168 cache
1169 .get(url)
1170 .filter(|entry| entry.cached_at.elapsed() < INFO_CACHE_TTL)
1171 .map(|entry| entry.info.clone())
1172}
1173
1174pub async fn prefetch_info(url: &str, downloader: &dyn PlatformDownloader, platform: &str) {
1175 prefetch_info_with_emit(url, downloader, platform, None).await;
1176}
1177
1178pub async fn prefetch_info_with_emit(
1179 url: &str,
1180 downloader: &dyn PlatformDownloader,
1181 platform: &str,
1182 reporter: Option<SharedReporter>,
1183) {
1184 let _timer_start = std::time::Instant::now();
1185 tracing::debug!("[perf] prefetch_info: started");
1186 match fetch_and_cache_info(url, downloader, platform).await {
1187 Ok(info) => {
1188 tracing::debug!(
1189 "[perf] prefetch_info: completed in {:?} — {}",
1190 _timer_start.elapsed(),
1191 info.title
1192 );
1193 if let Some(r) = reporter {
1194 r.on_media_preview(
1195 url.to_string(),
1196 info.title.clone(),
1197 info.author.clone(),
1198 info.thumbnail_url.clone(),
1199 info.duration_seconds,
1200 );
1201 }
1202 }
1203 Err(e) => tracing::warn!(
1204 "[perf] prefetch_info: failed in {:?} — {}",
1205 _timer_start.elapsed(),
1206 e
1207 ),
1208 }
1209}
1210
1211pub async fn try_start_next(queue: Arc<tokio::sync::Mutex<DownloadQueue>>) {
1212 let _timer_start = std::time::Instant::now();
1213 let (next_ids, stagger, state_to_emit, reporter) = {
1214 let mut q = queue.lock().await;
1215 let ids = q.next_queued_ids();
1216 for nid in &ids {
1217 q.mark_active(*nid);
1218 }
1219 let state = if !ids.is_empty() {
1220 Some(q.get_state())
1221 } else {
1222 None
1223 };
1224 (ids, q.stagger_delay_ms, state, q.reporter.clone())
1225 };
1226
1227 if let Some(state) = state_to_emit {
1228 emit_queue_state_from_state(&reporter, state);
1229 }
1230
1231 let batch_size = next_ids.len();
1232 for (i, nid) in next_ids.into_iter().enumerate() {
1233 if let Some(r) = &reporter {
1234 r.on_progress(
1235 nid,
1236 crate::core::events::QueueItemProgress {
1237 id: nid,
1238 title: String::new(),
1239 platform: String::new(),
1240 percent: 0.0,
1241 speed_bytes_per_sec: 0.0,
1242 downloaded_bytes: 0,
1243 total_bytes: None,
1244 phase: "queued_starting".to_string(),
1245 },
1246 );
1247 }
1248
1249 if i > 0 {
1250 let item_platform = {
1251 let q = queue.lock().await;
1252 q.items
1253 .iter()
1254 .find(|item| item.id == nid)
1255 .map(|item| item.platform.clone())
1256 };
1257 let delay_ms = if item_platform.as_deref() == Some("youtube") {
1258 2000
1259 } else if batch_size > 3 {
1260 stagger.max(1000)
1261 } else {
1262 stagger
1263 };
1264 if delay_ms > 0 {
1265 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1266 }
1267 }
1268 let queue_c = queue.clone();
1269 tokio::spawn(async move {
1270 tokio::spawn(spawn_download(queue_c, nid));
1271 });
1272 }
1273 tracing::debug!("[perf] try_start_next took {:?}", _timer_start.elapsed());
1274}
1275
/// Reports whether `title` is a placeholder rather than a real media title.
///
/// The comparison ignores case and surrounding whitespace. A title is generic
/// when it is empty, equals one of a few stock words, or starts with one of
/// the stock `"<kind> [<kind>]"` prefixes.
fn is_generic_title(title: &str) -> bool {
    const STOCK_TITLES: [&str; 5] = ["", "video", "media", "untitled", "unknown"];
    const STOCK_PREFIXES: [&str; 2] = ["video [video]", "media [media]"];

    let lowered = title.to_lowercase();
    let normalized = lowered.trim();

    let is_stock_title = STOCK_TITLES.iter().any(|stock| *stock == normalized);
    let has_stock_prefix = STOCK_PREFIXES
        .iter()
        .any(|prefix| normalized.starts_with(*prefix));

    is_stock_title || has_stock_prefix
}