1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, OnceLock};
5
6static EMIT_COUNT: AtomicU64 = AtomicU64::new(0);
7
8use serde::Serialize;
9use tokio::sync::mpsc;
10use tokio_util::sync::CancellationToken;
11
12use crate::core::traits::SharedReporter;
13use crate::models::media::MediaInfo;
14use crate::models::queue::{QueueItemInfo, QueueStatus};
15use crate::platforms::traits::PlatformDownloader;
16
17fn shared_http_client() -> &'static reqwest::Client {
18 static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
19 CLIENT.get_or_init(|| {
20 crate::core::http_client::apply_global_proxy(reqwest::Client::builder())
21 .build()
22 .unwrap_or_default()
23 })
24}
25
26struct CachedInfo {
27 info: MediaInfo,
28 cached_at: std::time::Instant,
29}
30
31static INFO_CACHE: OnceLock<tokio::sync::Mutex<HashMap<String, CachedInfo>>> = OnceLock::new();
32
33fn info_cache() -> &'static tokio::sync::Mutex<HashMap<String, CachedInfo>> {
34 INFO_CACHE.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
35}
36
37const INFO_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(600);
38
39static IN_FLIGHT_FETCHES: OnceLock<
40 tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
41> = OnceLock::new();
42
43fn in_flight_map() -> &'static tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
44 IN_FLIGHT_FETCHES.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
45}
46
47#[derive(Debug, Clone, Serialize)]
48pub struct MediaPreviewEvent {
49 pub url: String,
50 pub title: String,
51 pub author: String,
52 pub thumbnail_url: Option<String>,
53 pub duration_seconds: Option<f64>,
54}
55
56pub struct QueueItem {
57 pub id: u64,
58 pub url: String,
59 pub platform: String,
60 pub title: String,
61 pub status: QueueStatus,
62 pub cancel_token: CancellationToken,
63 pub output_dir: String,
64 pub download_mode: Option<String>,
65 pub quality: Option<String>,
66 pub format_id: Option<String>,
67 pub referer: Option<String>,
68 pub extra_headers: Option<std::collections::HashMap<String, String>>,
69 pub page_url: Option<String>,
70 pub user_agent: Option<String>,
71 pub percent: f64,
72 pub speed_bytes_per_sec: f64,
73 pub downloaded_bytes: u64,
74 pub total_bytes: Option<u64>,
75 pub file_path: Option<String>,
76 pub file_size_bytes: Option<u64>,
77 pub file_count: Option<u32>,
78 pub media_info: Option<MediaInfo>,
79 pub downloader: Arc<dyn PlatformDownloader>,
80 pub ytdlp_path: Option<PathBuf>,
81 pub from_hotkey: bool,
82 pub torrent_id: Option<usize>,
83 pub phase: String,
84}
85
86impl QueueItem {
87 pub fn to_info(&self) -> QueueItemInfo {
88 QueueItemInfo {
89 id: self.id,
90 url: self.url.clone(),
91 platform: self.platform.clone(),
92 title: self.title.clone(),
93 status: self.status.clone(),
94 percent: self.percent,
95 speed_bytes_per_sec: self.speed_bytes_per_sec,
96 downloaded_bytes: self.downloaded_bytes,
97 total_bytes: self.total_bytes,
98 phase: self.phase.clone(),
99 file_path: self.file_path.clone(),
100 file_size_bytes: self.file_size_bytes,
101 file_count: self.file_count,
102 thumbnail_url: self
103 .media_info
104 .as_ref()
105 .and_then(|m| m.thumbnail_url.clone()),
106 }
107 }
108}
109
110pub struct DownloadQueue {
111 pub items: Vec<QueueItem>,
112 pub max_concurrent: u32,
113 pub stagger_delay_ms: u64,
114 pub reporter: Option<SharedReporter>,
115}
116
117impl DownloadQueue {
118 pub fn new(max_concurrent: u32, reporter: Option<SharedReporter>) -> Self {
119 Self {
120 items: Vec::new(),
121 max_concurrent,
122 stagger_delay_ms: 150,
123 reporter,
124 }
125 }
126
127 pub fn set_reporter(&mut self, reporter: SharedReporter) {
128 self.reporter = Some(reporter);
129 }
130
131 fn sync_recovery(&self, item: &QueueItem) {
132 crate::core::manager::recovery::persist(crate::core::manager::recovery::RecoveryItem {
133 id: item.id,
134 url: item.url.clone(),
135 title: item.title.clone(),
136 platform: item.platform.clone(),
137 output_dir: item.output_dir.clone(),
138 status: item.status.clone(),
139 download_mode: item.download_mode.clone(),
140 quality: item.quality.clone(),
141 format_id: item.format_id.clone(),
142 referer: item.referer.clone(),
143 });
144 }
145
146 #[allow(clippy::too_many_arguments)]
147 pub fn enqueue(
148 &mut self,
149 id: u64,
150 url: String,
151 platform: String,
152 title: String,
153 output_dir: String,
154 download_mode: Option<String>,
155 quality: Option<String>,
156 format_id: Option<String>,
157 referer: Option<String>,
158 extra_headers: Option<std::collections::HashMap<String, String>>,
159 page_url: Option<String>,
160 user_agent: Option<String>,
161 media_info: Option<MediaInfo>,
162 total_bytes: Option<u64>,
163 file_count: Option<u32>,
164 downloader: Arc<dyn PlatformDownloader>,
165 ytdlp_path: Option<PathBuf>,
166 from_hotkey: bool,
167 ) {
168 let item = QueueItem {
169 id,
170 url,
171 platform,
172 title,
173 status: QueueStatus::Queued,
174 cancel_token: CancellationToken::new(),
175 output_dir,
176 download_mode,
177 quality,
178 format_id,
179 referer,
180 extra_headers,
181 page_url,
182 user_agent,
183 percent: 0.0,
184 speed_bytes_per_sec: 0.0,
185 downloaded_bytes: 0,
186 total_bytes,
187 file_path: None,
188 file_size_bytes: None,
189 file_count,
190 media_info,
191 downloader,
192 ytdlp_path,
193 from_hotkey,
194 torrent_id: None,
195 phase: "Queued".to_string(),
196 };
197 self.items.push(item);
198 self.sync_recovery(self.items.last().unwrap());
199 }
200
201 pub fn active_count(&self) -> u32 {
202 self.items
203 .iter()
204 .filter(|i| i.status == QueueStatus::Active)
205 .count() as u32
206 }
207
208 pub fn next_queued_ids(&self) -> Vec<u64> {
209 let slots = self.max_concurrent.saturating_sub(self.active_count()) as usize;
210 self.items
211 .iter()
212 .filter(|i| i.status == QueueStatus::Queued)
213 .take(slots)
214 .map(|i| i.id)
215 .collect()
216 }
217
218 pub fn mark_active(&mut self, id: u64) {
219 let item = self.items.iter_mut().find(|i| i.id == id);
220 if let Some(item) = item {
221 item.status = QueueStatus::Active;
222 item.cancel_token = CancellationToken::new();
223 let item_ref = item as *const QueueItem;
224 self.sync_recovery(unsafe { &*item_ref });
225 }
226 }
227
228 pub fn mark_complete(
229 &mut self,
230 id: u64,
231 success: bool,
232 error: Option<String>,
233 file_path: Option<String>,
234 file_size_bytes: Option<u64>,
235 ) {
236 let item = self.items.iter_mut().find(|i| i.id == id);
237 if let Some(item) = item {
238 if success {
239 item.status = QueueStatus::Complete { success: true };
240 item.percent = 100.0;
241 } else {
242 item.status = QueueStatus::Error {
243 message: error.unwrap_or_default(),
244 };
245 }
246 item.file_path = file_path;
247 item.file_size_bytes = file_size_bytes;
248 item.speed_bytes_per_sec = 0.0;
249 let item_ref = item as *const QueueItem;
250 self.sync_recovery(unsafe { &*item_ref });
251 }
252 }
253
254 pub fn mark_seeding(
255 &mut self,
256 id: u64,
257 file_path: Option<String>,
258 file_size_bytes: Option<u64>,
259 torrent_id: Option<usize>,
260 ) {
261 let item = self.items.iter_mut().find(|i| i.id == id);
262 if let Some(item) = item {
263 item.status = QueueStatus::Seeding;
264 item.percent = 100.0;
265 item.file_path = file_path;
266 item.file_size_bytes = file_size_bytes;
267 item.speed_bytes_per_sec = 0.0;
268 item.torrent_id = torrent_id;
269 let item_ref = item as *const QueueItem;
270 self.sync_recovery(unsafe { &*item_ref });
271 }
272 }
273
274 pub fn update_progress(
275 &mut self,
276 id: u64,
277 percent: f64,
278 speed: f64,
279 downloaded: u64,
280 total: Option<u64>,
281 torrent_id: Option<usize>,
282 ) {
283 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
284 item.percent = percent;
285 item.speed_bytes_per_sec = speed;
286 item.downloaded_bytes = downloaded;
287 if let Some(t) = total {
288 item.total_bytes = Some(t);
289 }
290 if torrent_id.is_some() && item.torrent_id.is_none() {
291 item.torrent_id = torrent_id;
292 }
293 }
294 }
295
296 pub fn pause(&mut self, id: u64) -> bool {
297 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
298 if item.status == QueueStatus::Active {
299 if item.platform != "magnet" {
300 item.cancel_token.cancel();
301 }
302 item.status = QueueStatus::Paused;
303 item.speed_bytes_per_sec = 0.0;
304 let item_ref = item as *const QueueItem;
305 self.sync_recovery(unsafe { &*item_ref });
306 return true;
307 }
308 }
309 false
310 }
311
312 pub fn resume(&mut self, id: u64) -> bool {
313 let item = self.items.iter_mut().find(|i| i.id == id);
314 if let Some(item) = item {
315 if item.status == QueueStatus::Paused {
316 if item.platform == "magnet" {
317 item.status = QueueStatus::Active;
318 } else {
319 item.status = QueueStatus::Queued;
320 item.cancel_token = CancellationToken::new();
321 }
322 let item_ref = item as *const QueueItem;
323 self.sync_recovery(unsafe { &*item_ref });
324 return true;
325 }
326 }
327 false
328 }
329
330 pub fn cancel(&mut self, id: u64) -> (bool, Option<usize>) {
331 let result = self.cancel_inner(id);
332 if result.0 {
333 crate::core::manager::recovery::remove(id);
334 }
335 result
336 }
337
338 fn cancel_inner(&mut self, id: u64) -> (bool, Option<usize>) {
339 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
340 match &item.status {
341 QueueStatus::Active => {
342 item.cancel_token.cancel();
343 item.status = QueueStatus::Error {
344 message: "Cancelled".to_string(),
345 };
346 item.speed_bytes_per_sec = 0.0;
347 let item_ref = item as *const QueueItem;
348 self.sync_recovery(unsafe { &*item_ref });
349 return (true, None);
350 }
351 QueueStatus::Seeding => {
352 let tid = item.torrent_id;
353 item.status = QueueStatus::Error {
354 message: "Cancelled".to_string(),
355 };
356 item.speed_bytes_per_sec = 0.0;
357 let item_ref = item as *const QueueItem;
358 self.sync_recovery(unsafe { &*item_ref });
359 return (true, tid);
360 }
361 QueueStatus::Paused => {
362 item.cancel_token.cancel();
363 let tid = if item.platform == "magnet" {
364 item.torrent_id
365 } else {
366 None
367 };
368 item.status = QueueStatus::Error {
369 message: "Cancelled".to_string(),
370 };
371 item.speed_bytes_per_sec = 0.0;
372 let item_ref = item as *const QueueItem;
373 self.sync_recovery(unsafe { &*item_ref });
374 return (true, tid);
375 }
376 QueueStatus::Queued => {
377 item.status = QueueStatus::Error {
378 message: "Cancelled".to_string(),
379 };
380 let item_ref = item as *const QueueItem;
381 self.sync_recovery(unsafe { &*item_ref });
382 return (true, None);
383 }
384 _ => {}
385 }
386 }
387 (false, None)
388 }
389
390 pub fn retry(&mut self, id: u64) -> bool {
391 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
392 if matches!(item.status, QueueStatus::Error { .. }) {
393 item.status = QueueStatus::Queued;
394 item.cancel_token = CancellationToken::new();
395 item.percent = 0.0;
396 item.speed_bytes_per_sec = 0.0;
397 item.downloaded_bytes = 0;
398 item.file_path = None;
399 item.file_size_bytes = None;
400 let item_ref = item as *const QueueItem;
401 self.sync_recovery(unsafe { &*item_ref });
402 return true;
403 }
404 }
405 false
406 }
407
408 pub fn remove(&mut self, id: u64) -> Option<Option<usize>> {
409 let result = self.remove_inner(id);
410 if result.is_some() {
411 crate::core::manager::recovery::remove(id);
412 }
413 result
414 }
415
416 fn remove_inner(&mut self, id: u64) -> Option<Option<usize>> {
417 if let Some(pos) = self.items.iter().position(|i| i.id == id) {
418 let item = &self.items[pos];
419 if item.status == QueueStatus::Active {
420 item.cancel_token.cancel();
421 }
422 if item.status == QueueStatus::Paused && item.platform == "magnet" {
423 item.cancel_token.cancel();
424 }
425 let torrent_id = if item.status == QueueStatus::Seeding
426 || (item.status == QueueStatus::Paused && item.platform == "magnet")
427 {
428 item.torrent_id
429 } else {
430 None
431 };
432 self.items.remove(pos);
433 return Some(torrent_id);
434 }
435 None
436 }
437
438 pub fn clear_finished(&mut self) {
439 let to_remove: Vec<u64> = self
440 .items
441 .iter()
442 .filter(|i| {
443 matches!(
444 i.status,
445 QueueStatus::Complete { .. } | QueueStatus::Error { .. }
446 )
447 })
448 .map(|i| i.id)
449 .collect();
450 for id in &to_remove {
451 crate::core::manager::recovery::remove(*id);
452 }
453 self.items.retain(|i| {
454 !matches!(
455 i.status,
456 QueueStatus::Complete { .. } | QueueStatus::Error { .. }
457 )
458 });
459 }
460
461 pub fn get_state(&self) -> Vec<QueueItemInfo> {
462 self.items.iter().map(|i| i.to_info()).collect()
463 }
464
465 pub fn has_url(&self, url: &str) -> bool {
466 self.items.iter().any(|i| {
467 i.url == url
468 && matches!(
469 i.status,
470 QueueStatus::Queued
471 | QueueStatus::Active
472 | QueueStatus::Paused
473 | QueueStatus::Seeding
474 )
475 })
476 }
477}
478
479pub struct ProgressThrottle {
480 last_emit: std::time::Instant,
481 min_interval: std::time::Duration,
482}
483
484impl ProgressThrottle {
485 pub fn new(min_interval_ms: u64) -> Self {
486 Self {
487 last_emit: std::time::Instant::now() - std::time::Duration::from_secs(10),
488 min_interval: std::time::Duration::from_millis(min_interval_ms),
489 }
490 }
491
492 pub fn should_emit(&mut self) -> bool {
493 let now = std::time::Instant::now();
494 if now.duration_since(self.last_emit) >= self.min_interval {
495 self.last_emit = now;
496 true
497 } else {
498 false
499 }
500 }
501}
502
503pub fn emit_queue_state_from_state(reporter: &Option<SharedReporter>, state: Vec<QueueItemInfo>) {
504 let n = EMIT_COUNT.fetch_add(1, Ordering::Relaxed);
505 if n.is_multiple_of(10) {
506 tracing::debug!("[perf] emit_queue_state called {} times", n);
507 }
508 if let Some(reporter) = reporter {
509 reporter.on_queue_update(state);
510 }
511}
512
513pub fn emit_queue_state(queue: &DownloadQueue) {
514 let state = queue.get_state();
515 emit_queue_state_from_state(&queue.reporter, state);
516}
517
518struct ActiveJobSlot {
520 queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
521 item_id: u64,
522 armed: bool,
523}
524
525impl ActiveJobSlot {
526 fn new(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) -> Self {
527 Self {
528 queue,
529 item_id,
530 armed: true,
531 }
532 }
533
534 fn disarm(mut self) {
535 self.armed = false;
536 }
537}
538
539impl Drop for ActiveJobSlot {
540 fn drop(&mut self) {
541 if !self.armed {
542 return;
543 }
544 let queue = self.queue.clone();
545 let item_id = self.item_id;
546 tokio::spawn(async move {
547 let state = {
548 let mut q = queue.lock().await;
549 let still_active = q
550 .items
551 .iter()
552 .find(|i| i.id == item_id)
553 .map(|i| i.status == QueueStatus::Active)
554 .unwrap_or(false);
555 if !still_active {
556 return;
557 }
558 tracing::warn!(
559 "[queue] ActiveJobSlot guard firing for {} — download ended without clean release",
560 item_id
561 );
562 q.mark_complete(
563 item_id,
564 false,
565 Some("Download interrupted".to_string()),
566 None,
567 None,
568 );
569 q.get_state()
570 };
571 let reporter = { queue.lock().await.reporter.clone() };
572 emit_queue_state_from_state(&reporter, state);
573 tokio::spawn(try_start_next(queue));
574 });
575 }
576}
577
578pub fn spawn_download(
579 queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
580 item_id: u64,
581) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
582 Box::pin(async move {
583 let _timer_start = std::time::Instant::now();
584 let slot = ActiveJobSlot::new(queue.clone(), item_id);
585 tokio::spawn(spawn_download_inner(queue.clone(), item_id));
586 slot.disarm();
587 tracing::debug!(
588 "[perf] spawn_download {} took {:?}",
589 item_id,
590 _timer_start.elapsed()
591 );
592 })
593}
594
595async fn spawn_download_inner(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) {
596 tracing::info!("[queue] download {} started", item_id);
597
598 let reporter = { queue.lock().await.reporter.clone() };
599
600 if let Some(r) = &reporter {
601 r.on_progress(
602 item_id,
603 crate::core::events::QueueItemProgress {
604 id: item_id,
605 title: "".to_string(),
606 platform: "".to_string(),
607 percent: 0.0,
608 speed_bytes_per_sec: 0.0,
609 downloaded_bytes: 0,
610 total_bytes: None,
611 phase: "preparing".to_string(),
612 },
613 );
614 }
615
616 let (
617 url,
618 output_dir,
619 download_mode,
620 quality,
621 format_id,
622 referer,
623 extra_headers,
624 page_url,
625 user_agent,
626 cancel_token,
627 media_info,
628 platform_name,
629 downloader,
630 ytdlp_path,
631 from_hotkey,
632 ) = {
633 let q = queue.lock().await;
634 let item = match q.items.iter().find(|i| i.id == item_id) {
635 Some(i) => i,
636 None => return,
637 };
638 (
639 item.url.clone(),
640 item.output_dir.clone(),
641 item.download_mode.clone(),
642 item.quality.clone(),
643 item.format_id.clone(),
644 item.referer.clone(),
645 item.extra_headers.clone(),
646 item.page_url.clone(),
647 item.user_agent.clone(),
648 item.cancel_token.clone(),
649 item.media_info.clone(),
650 item.platform.clone(),
651 item.downloader.clone(),
652 item.ytdlp_path.clone(),
653 item.from_hotkey,
654 )
655 };
656
657 let info_start = std::time::Instant::now();
658 let info = match media_info {
659 Some(i) => {
660 tracing::info!(
661 "[queue] info for {} from cache/pre-fetched in {:?}",
662 item_id,
663 info_start.elapsed()
664 );
665 i
666 }
667 None => {
668 tracing::debug!(
669 "[perf] spawn_download_inner {}: media_info is None, fetching info",
670 item_id
671 );
672 if let Some(r) = &reporter {
673 r.on_progress(
674 item_id,
675 crate::core::events::QueueItemProgress {
676 id: item_id,
677 title: url.clone(),
678 platform: platform_name.clone(),
679 percent: 0.0,
680 speed_bytes_per_sec: 0.0,
681 downloaded_bytes: 0,
682 total_bytes: None,
683 phase: "fetching_info".to_string(),
684 },
685 );
686 }
687
688 let info_result = tokio::time::timeout(
689 std::time::Duration::from_secs(60),
690 fetch_and_cache_info(&url, &*downloader, &platform_name),
691 )
692 .await;
693
694 match info_result {
695 Ok(Ok(i)) => i,
696 Ok(Err(e)) => {
697 let state = {
698 let mut q = queue.lock().await;
699 q.mark_complete(item_id, false, Some(e.to_string()), None, None);
700 q.get_state()
701 };
702 emit_queue_state_from_state(&reporter, state);
703 tokio::spawn(try_start_next(queue));
704 return;
705 }
706 Err(_) => {
707 tracing::warn!("[queue] info fetch timed out for {} after 60s", item_id);
708 let state = {
709 let mut q = queue.lock().await;
710 q.mark_complete(
711 item_id,
712 false,
713 Some("Timed out fetching video info".to_string()),
714 None,
715 None,
716 );
717 q.get_state()
718 };
719 emit_queue_state_from_state(&reporter, state);
720 tokio::spawn(try_start_next(queue));
721 return;
722 }
723 }
724 }
725 };
726 tracing::info!(
727 "[queue] info fetch for {} took {:?}",
728 item_id,
729 info_start.elapsed()
730 );
731
732 let mut info = info;
733 if is_generic_title(&info.title) {
734 let pokemon = crate::core::pokemon_names::random_pokemon_name();
735 info.title = format!("video_{}", pokemon);
736 }
737
738 let state = {
739 let mut q = queue.lock().await;
740 if let Some(item) = q.items.iter_mut().find(|i| i.id == item_id) {
741 item.title = info.title.clone();
742 item.total_bytes = info.file_size_bytes;
743 let fc = if info.media_type == crate::models::media::MediaType::Carousel
744 || info.media_type == crate::models::media::MediaType::Playlist
745 {
746 info.available_qualities.len() as u32
747 } else {
748 1
749 };
750 item.file_count = Some(fc);
751 item.media_info = Some(info.clone());
752 }
753 q.get_state()
754 };
755 emit_queue_state_from_state(&reporter, state);
756
757 if let Some(r) = &reporter {
758 r.on_progress(
759 item_id,
760 crate::core::events::QueueItemProgress {
761 id: item_id,
762 title: info.title.clone(),
763 platform: platform_name.clone(),
764 percent: 0.5,
765 speed_bytes_per_sec: 0.0,
766 downloaded_bytes: 0,
767 total_bytes: info.file_size_bytes,
768 phase: "starting".to_string(),
769 },
770 );
771 }
772
773 let settings = crate::models::settings::AppSettings::load_from_disk();
774 let tmpl = settings.download.filename_template.clone();
775 let mut final_output_dir = std::path::PathBuf::from(&output_dir);
776 if settings.download.organize_by_platform {
777 final_output_dir = final_output_dir.join(&platform_name);
778 }
779 let torrent_id_slot = Arc::new(tokio::sync::Mutex::new(None));
780 let opts = crate::models::media::DownloadOptions {
781 quality: quality.or_else(|| Some(settings.download.video_quality.clone())),
782 output_dir: final_output_dir,
783 filename_template: Some(tmpl),
784 download_subtitles: settings.download.download_subtitles,
785 include_auto_subtitles: settings.download.include_auto_subtitles,
786 download_mode,
787 format_id,
788 referer,
789 extra_headers,
790 page_url,
791 user_agent,
792 cancel_token: cancel_token.clone(),
793 concurrent_fragments: settings.advanced.concurrent_fragments,
794 ytdlp_path,
795 torrent_listen_port: Some(settings.advanced.torrent_listen_port),
796 torrent_id_slot: Some(torrent_id_slot.clone()),
797 };
798
799 let total_bytes = info.file_size_bytes;
800 let item_title = info.title.clone();
801 let item_platform = platform_name.clone();
802 let (tx, mut rx) = mpsc::channel::<f64>(32);
803
804 let reporter_progress = reporter.clone();
805 let queue_progress = queue.clone();
806 let torrent_id_slot_progress = torrent_id_slot.clone();
807 let progress_forwarder = tokio::spawn(async move {
808 let mut last_bytes: u64 = 0;
809 let mut last_time = std::time::Instant::now();
810 let mut throttle = ProgressThrottle::new(250);
811 let mut current_speed: f64 = 0.0;
812
813 while let Some(percent) = rx.recv().await {
814 if !throttle.should_emit() && percent < 100.0 {
815 continue;
816 }
817
818 let now = std::time::Instant::now();
819 let clamped = percent.max(0.0);
820 let downloaded_bytes = total_bytes
821 .map(|total| (clamped / 100.0 * total as f64) as u64)
822 .unwrap_or(0);
823
824 if total_bytes.is_some() && downloaded_bytes > last_bytes {
825 let dt = now.duration_since(last_time).as_secs_f64();
826 if dt > 0.1 {
827 let instant_speed = (downloaded_bytes - last_bytes) as f64 / dt;
828 current_speed = if current_speed > 0.0 {
829 current_speed * 0.7 + instant_speed * 0.3
830 } else {
831 instant_speed
832 };
833 }
834 }
835
836 last_bytes = downloaded_bytes;
837 last_time = now;
838
839 let phase = match percent {
840 p if p < -1.5 => "connecting",
841 p if p < -0.5 => "starting",
842 p if p > 0.0 => "downloading",
843 _ => "starting",
844 };
845
846 {
847 let mut q = queue_progress.lock().await;
848 let tid = { *torrent_id_slot_progress.lock().await };
849 q.update_progress(
850 item_id,
851 clamped,
852 current_speed,
853 downloaded_bytes,
854 total_bytes,
855 tid,
856 );
857 }
858
859 if let Some(r) = &reporter_progress {
860 r.on_progress(
861 item_id,
862 crate::core::events::QueueItemProgress {
863 id: item_id,
864 title: item_title.clone(),
865 platform: item_platform.clone(),
866 percent: clamped,
867 speed_bytes_per_sec: current_speed,
868 downloaded_bytes,
869 total_bytes,
870 phase: phase.to_string(),
871 },
872 );
873 }
874 }
875 });
876
877 if let Some(ua) = opts.user_agent.clone() {
878 crate::core::ytdlp::register_ext_user_agent(url.clone(), ua);
879 }
880 if let Some(hdrs) = opts.extra_headers.clone() {
881 crate::core::ytdlp::register_ext_headers(url.clone(), hdrs);
882 }
883
884 let dl_start = std::time::Instant::now();
885 let dl_future = async {
886 tokio::select! {
887 r = downloader.download(&info, &opts, tx) => r,
888 _ = cancel_token.cancelled() => {
889 Err(anyhow::anyhow!("Download cancelado"))
890 }
891 }
892 };
893 let result = crate::core::log_hook::CURRENT_DOWNLOAD_ID
894 .scope(item_id, dl_future)
895 .await;
896 crate::core::ytdlp::clear_ext_user_agent(&url);
897 crate::core::ytdlp::clear_ext_headers(&url);
898 tracing::info!(
899 "[queue] download {} completed in {:?}",
900 item_id,
901 dl_start.elapsed()
902 );
903
904 let _ = progress_forwarder.await;
905
906 let was_paused = {
907 let q = queue.lock().await;
908 q.items
909 .iter()
910 .find(|i| i.id == item_id)
911 .map(|i| i.status == QueueStatus::Paused)
912 .unwrap_or(false)
913 };
914
915 if was_paused {
916 let state = {
917 let q = queue.lock().await;
918 q.get_state()
919 };
920 emit_queue_state_from_state(&reporter, state);
921 tokio::spawn(try_start_next(queue));
922 return;
923 }
924
925 match result {
926 Ok(dl) => {
927 if settings.download.embed_metadata
928 && platform_name != "magnet"
929 && crate::core::ffmpeg::is_ffmpeg_available().await
930 {
931 let metadata = crate::core::ffmpeg::MetadataEmbed {
932 title: Some(info.title.clone()),
933 artist: Some(info.author.clone()),
934 thumbnail_url: info.thumbnail_url.clone(),
935 ..Default::default()
936 };
937 if let Err(e) = crate::core::ffmpeg::embed_metadata(
938 &dl.file_path,
939 &metadata,
940 settings.download.embed_thumbnail,
941 shared_http_client(),
942 )
943 .await
944 {
945 tracing::warn!("Metadata embed failed for '{}': {}", info.title, e);
946 }
947 }
948
949 if from_hotkey && settings.download.copy_to_clipboard_on_hotkey {
950 #[cfg(not(target_os = "android"))]
951 {
952 match crate::core::clipboard::copy_file_to_clipboard(&dl.file_path).await {
953 Ok(()) => {
954 tracing::info!("[clipboard] file copied: {:?}", dl.file_path);
955 }
958 Err(e) => {
959 tracing::warn!("[clipboard] failed to copy file: {}", e);
960 }
961 }
962 }
963 }
964
965 let state = {
966 let mut q = queue.lock().await;
967 if platform_name == "magnet" && dl.torrent_id.is_some() {
968 q.mark_seeding(
969 item_id,
970 Some(dl.file_path.to_string_lossy().to_string()),
971 Some(dl.file_size_bytes),
972 dl.torrent_id,
973 );
974 } else {
975 q.mark_complete(
976 item_id,
977 true,
978 None,
979 Some(dl.file_path.to_string_lossy().to_string()),
980 Some(dl.file_size_bytes),
981 );
982 }
983 q.get_state()
984 };
985 if let Some(r) = &reporter {
986 r.on_complete(
987 item_id,
988 Some(dl.file_path.to_string_lossy().to_string()),
989 Some(dl.file_size_bytes),
990 );
991 }
992 emit_queue_state_from_state(&reporter, state);
993 }
994 Err(e) => {
995 let raw_err = e.to_string();
996 let (category, hint) = crate::core::errors::classify_download_error(&raw_err);
997 let user_msg = if category != "unknown" {
998 format!("{} ({})", hint, raw_err)
999 } else {
1000 raw_err.clone()
1001 };
1002 tracing::error!(
1003 "Download error '{}' [{}]: {}",
1004 platform_name,
1005 category,
1006 raw_err
1007 );
1008 let state = {
1009 let mut q = queue.lock().await;
1010 q.mark_complete(item_id, false, Some(user_msg.clone()), None, None);
1011 q.get_state()
1012 };
1013 if let Some(r) = &reporter {
1014 r.on_error(item_id, user_msg);
1015 }
1016 emit_queue_state_from_state(&reporter, state);
1017 }
1018 }
1019
1020 tokio::spawn(try_start_next(queue));
1021}
1022
1023pub async fn fetch_and_cache_info(
1024 url: &str,
1025 downloader: &dyn PlatformDownloader,
1026 platform: &str,
1027) -> anyhow::Result<MediaInfo> {
1028 {
1029 let cache = info_cache().lock().await;
1030 if let Some(entry) = cache.get(url) {
1031 if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1032 tracing::debug!("[perf] fetch_and_cache_info: cache hit for {}", platform);
1033 return Ok(entry.info.clone());
1034 }
1035 }
1036 }
1037
1038 let url_lock = {
1039 let mut map = in_flight_map().lock().await;
1040 map.entry(url.to_string())
1041 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
1042 .clone()
1043 };
1044 let _guard = url_lock.lock().await;
1045
1046 {
1047 let cache = info_cache().lock().await;
1048 if let Some(entry) = cache.get(url) {
1049 if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1050 tracing::debug!(
1051 "[perf] fetch_and_cache_info: dedup cache hit for {}",
1052 platform
1053 );
1054 return Ok(entry.info.clone());
1055 }
1056 }
1057 }
1058
1059 tracing::debug!("[perf] fetch_and_cache_info: fetching for {}", platform);
1060 let info = downloader.get_media_info(url).await?;
1061
1062 let mut cache = info_cache().lock().await;
1063 cache.insert(
1064 url.to_string(),
1065 CachedInfo {
1066 info: info.clone(),
1067 cached_at: std::time::Instant::now(),
1068 },
1069 );
1070 if cache.len() > 50 {
1071 cache.retain(|_, v| v.cached_at.elapsed() < INFO_CACHE_TTL);
1072 }
1073 Ok(info)
1074}
1075
1076pub async fn try_get_cached_info(url: &str) -> Option<MediaInfo> {
1077 let cache = info_cache().lock().await;
1078 cache
1079 .get(url)
1080 .filter(|entry| entry.cached_at.elapsed() < INFO_CACHE_TTL)
1081 .map(|entry| entry.info.clone())
1082}
1083
1084pub async fn prefetch_info(url: &str, downloader: &dyn PlatformDownloader, platform: &str) {
1085 prefetch_info_with_emit(url, downloader, platform, None).await;
1086}
1087
1088pub async fn prefetch_info_with_emit(
1089 url: &str,
1090 downloader: &dyn PlatformDownloader,
1091 platform: &str,
1092 reporter: Option<SharedReporter>,
1093) {
1094 let _timer_start = std::time::Instant::now();
1095 tracing::debug!("[perf] prefetch_info: started");
1096 match fetch_and_cache_info(url, downloader, platform).await {
1097 Ok(info) => {
1098 tracing::debug!(
1099 "[perf] prefetch_info: completed in {:?} — {}",
1100 _timer_start.elapsed(),
1101 info.title
1102 );
1103 if let Some(r) = reporter {
1104 r.on_media_preview(
1105 url.to_string(),
1106 info.title.clone(),
1107 info.author.clone(),
1108 info.thumbnail_url.clone(),
1109 info.duration_seconds,
1110 );
1111 }
1112 }
1113 Err(e) => tracing::warn!(
1114 "[perf] prefetch_info: failed in {:?} — {}",
1115 _timer_start.elapsed(),
1116 e
1117 ),
1118 }
1119}
1120
1121pub async fn try_start_next(queue: Arc<tokio::sync::Mutex<DownloadQueue>>) {
1122 let _timer_start = std::time::Instant::now();
1123 let (next_ids, stagger, state_to_emit, reporter) = {
1124 let mut q = queue.lock().await;
1125 let ids = q.next_queued_ids();
1126 for nid in &ids {
1127 q.mark_active(*nid);
1128 }
1129 let state = if !ids.is_empty() {
1130 Some(q.get_state())
1131 } else {
1132 None
1133 };
1134 (ids, q.stagger_delay_ms, state, q.reporter.clone())
1135 };
1136
1137 if let Some(state) = state_to_emit {
1138 emit_queue_state_from_state(&reporter, state);
1139 }
1140
1141 let batch_size = next_ids.len();
1142 for (i, nid) in next_ids.into_iter().enumerate() {
1143 if let Some(r) = &reporter {
1144 r.on_progress(
1145 nid,
1146 crate::core::events::QueueItemProgress {
1147 id: nid,
1148 title: String::new(),
1149 platform: String::new(),
1150 percent: 0.0,
1151 speed_bytes_per_sec: 0.0,
1152 downloaded_bytes: 0,
1153 total_bytes: None,
1154 phase: "queued_starting".to_string(),
1155 },
1156 );
1157 }
1158
1159 if i > 0 {
1160 let item_platform = {
1161 let q = queue.lock().await;
1162 q.items
1163 .iter()
1164 .find(|item| item.id == nid)
1165 .map(|item| item.platform.clone())
1166 };
1167 let delay_ms = if item_platform.as_deref() == Some("youtube") {
1168 2000
1169 } else if batch_size > 3 {
1170 stagger.max(1000)
1171 } else {
1172 stagger
1173 };
1174 if delay_ms > 0 {
1175 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1176 }
1177 }
1178 let queue_c = queue.clone();
1179 tokio::spawn(async move {
1180 tokio::spawn(spawn_download(queue_c, nid));
1181 });
1182 }
1183 tracing::debug!("[perf] try_start_next took {:?}", _timer_start.elapsed());
1184}
1185
1186fn is_generic_title(title: &str) -> bool {
1187 let t = title.to_lowercase();
1188 let t = t.trim();
1189 t.is_empty()
1190 || t == "video"
1191 || t == "media"
1192 || t == "untitled"
1193 || t == "unknown"
1194 || t.starts_with("video [video]")
1195 || t.starts_with("media [media]")
1196}