1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, OnceLock};
5
6static EMIT_COUNT: AtomicU64 = AtomicU64::new(0);
7
8use serde::Serialize;
9use tokio::sync::mpsc;
10use tokio_util::sync::CancellationToken;
11
12use crate::core::traits::SharedReporter;
13use crate::models::media::MediaInfo;
14use crate::models::queue::{QueueItemInfo, QueueStatus};
15use crate::platforms::traits::PlatformDownloader;
16
17fn shared_http_client() -> &'static reqwest::Client {
18 static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
19 CLIENT.get_or_init(|| {
20 crate::core::http_client::apply_global_proxy(reqwest::Client::builder())
21 .build()
22 .unwrap_or_default()
23 })
24}
25
26struct CachedInfo {
27 info: MediaInfo,
28 cached_at: std::time::Instant,
29}
30
31static INFO_CACHE: OnceLock<tokio::sync::Mutex<HashMap<String, CachedInfo>>> = OnceLock::new();
32
33fn info_cache() -> &'static tokio::sync::Mutex<HashMap<String, CachedInfo>> {
34 INFO_CACHE.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
35}
36
37const INFO_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(600);
38
39static IN_FLIGHT_FETCHES: OnceLock<
40 tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
41> = OnceLock::new();
42
43fn in_flight_map() -> &'static tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
44 IN_FLIGHT_FETCHES.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
45}
46
47#[derive(Debug, Clone, Serialize)]
48pub struct MediaPreviewEvent {
49 pub url: String,
50 pub title: String,
51 pub author: String,
52 pub thumbnail_url: Option<String>,
53 pub duration_seconds: Option<f64>,
54}
55
56pub struct QueueItem {
57 pub id: u64,
58 pub url: String,
59 pub platform: String,
60 pub title: String,
61 pub status: QueueStatus,
62 pub cancel_token: CancellationToken,
63 pub output_dir: String,
64 pub download_mode: Option<String>,
65 pub quality: Option<String>,
66 pub format_id: Option<String>,
67 pub referer: Option<String>,
68 pub extra_headers: Option<std::collections::HashMap<String, String>>,
69 pub page_url: Option<String>,
70 pub user_agent: Option<String>,
71 pub percent: f64,
72 pub speed_bytes_per_sec: f64,
73 pub downloaded_bytes: u64,
74 pub total_bytes: Option<u64>,
75 pub file_path: Option<String>,
76 pub file_size_bytes: Option<u64>,
77 pub file_count: Option<u32>,
78 pub media_info: Option<MediaInfo>,
79 pub downloader: Arc<dyn PlatformDownloader>,
80 pub ytdlp_path: Option<PathBuf>,
81 pub from_hotkey: bool,
82 pub torrent_id: Option<usize>,
83 pub phase: String,
84}
85
86impl QueueItem {
87 pub fn to_info(&self) -> QueueItemInfo {
88 QueueItemInfo {
89 id: self.id,
90 url: self.url.clone(),
91 platform: self.platform.clone(),
92 title: self.title.clone(),
93 status: self.status.clone(),
94 percent: self.percent,
95 speed_bytes_per_sec: self.speed_bytes_per_sec,
96 downloaded_bytes: self.downloaded_bytes,
97 total_bytes: self.total_bytes,
98 phase: self.phase.clone(),
99 file_path: self.file_path.clone(),
100 file_size_bytes: self.file_size_bytes,
101 file_count: self.file_count,
102 thumbnail_url: self
103 .media_info
104 .as_ref()
105 .and_then(|m| m.thumbnail_url.clone()),
106 }
107 }
108}
109
110pub struct DownloadQueue {
111 pub items: Vec<QueueItem>,
112 pub max_concurrent: u32,
113 pub stagger_delay_ms: u64,
114 pub reporter: Option<SharedReporter>,
115}
116
117impl DownloadQueue {
118 pub fn new(max_concurrent: u32, reporter: Option<SharedReporter>) -> Self {
119 Self {
120 items: Vec::new(),
121 max_concurrent,
122 stagger_delay_ms: 150,
123 reporter,
124 }
125 }
126
127 pub fn set_reporter(&mut self, reporter: SharedReporter) {
128 self.reporter = Some(reporter);
129 }
130
131 pub fn load_from_recovery(&mut self, registry: &crate::core::registry::PlatformRegistry) {
132 let recovery_items = crate::core::manager::recovery::list();
133 for item in recovery_items {
134 let downloader = registry
135 .find_platform(&item.url)
136 .or_else(|| registry.find_by_name(&item.platform));
137
138 if let Some(dl) = downloader {
139 let percent = if matches!(item.status, QueueStatus::Complete { success: true }) {
140 100.0
141 } else {
142 0.0
143 };
144
145 let q_item = QueueItem {
146 id: item.id,
147 url: item.url,
148 platform: item.platform,
149 title: item.title,
150 status: item.status,
151 cancel_token: CancellationToken::new(),
152 output_dir: item.output_dir,
153 download_mode: item.download_mode,
154 quality: item.quality,
155 format_id: item.format_id,
156 referer: item.referer,
157 extra_headers: None,
158 page_url: None,
159 user_agent: None,
160 percent,
161 speed_bytes_per_sec: 0.0,
162 downloaded_bytes: 0,
163 total_bytes: item.file_size_bytes,
164 file_path: item.file_path,
165 file_size_bytes: item.file_size_bytes,
166 file_count: None,
167 media_info: None,
168 downloader: dl,
169 ytdlp_path: None,
170 from_hotkey: false,
171 torrent_id: None,
172 phase: "Restored".to_string(),
173 };
174 self.items.push(q_item);
175 }
176 }
177 }
178
179 fn sync_recovery(&self, item: &QueueItem) {
180 crate::core::manager::recovery::persist(crate::core::manager::recovery::RecoveryItem {
181 id: item.id,
182 url: item.url.clone(),
183 title: item.title.clone(),
184 platform: item.platform.clone(),
185 output_dir: item.output_dir.clone(),
186 status: item.status.clone(),
187 download_mode: item.download_mode.clone(),
188 quality: item.quality.clone(),
189 format_id: item.format_id.clone(),
190 referer: item.referer.clone(),
191 file_path: item.file_path.clone(),
192 file_size_bytes: item.file_size_bytes,
193 });
194 }
195
196 #[allow(clippy::too_many_arguments)]
197 pub fn enqueue(
198 &mut self,
199 id: u64,
200 url: String,
201 platform: String,
202 title: String,
203 output_dir: String,
204 download_mode: Option<String>,
205 quality: Option<String>,
206 format_id: Option<String>,
207 referer: Option<String>,
208 extra_headers: Option<std::collections::HashMap<String, String>>,
209 page_url: Option<String>,
210 user_agent: Option<String>,
211 media_info: Option<MediaInfo>,
212 total_bytes: Option<u64>,
213 file_count: Option<u32>,
214 downloader: Arc<dyn PlatformDownloader>,
215 ytdlp_path: Option<PathBuf>,
216 from_hotkey: bool,
217 ) {
218 let item = QueueItem {
219 id,
220 url,
221 platform,
222 title,
223 status: QueueStatus::Queued,
224 cancel_token: CancellationToken::new(),
225 output_dir,
226 download_mode,
227 quality,
228 format_id,
229 referer,
230 extra_headers,
231 page_url,
232 user_agent,
233 percent: 0.0,
234 speed_bytes_per_sec: 0.0,
235 downloaded_bytes: 0,
236 total_bytes,
237 file_path: None,
238 file_size_bytes: None,
239 file_count,
240 media_info,
241 downloader,
242 ytdlp_path,
243 from_hotkey,
244 torrent_id: None,
245 phase: "Queued".to_string(),
246 };
247 self.items.push(item);
248 self.sync_recovery(self.items.last().unwrap());
249 }
250
251 pub fn active_count(&self) -> u32 {
252 self.items
253 .iter()
254 .filter(|i| i.status == QueueStatus::Active)
255 .count() as u32
256 }
257
258 pub fn next_queued_ids(&self) -> Vec<u64> {
259 let slots = self.max_concurrent.saturating_sub(self.active_count()) as usize;
260 self.items
261 .iter()
262 .filter(|i| i.status == QueueStatus::Queued)
263 .take(slots)
264 .map(|i| i.id)
265 .collect()
266 }
267
268 pub fn mark_active(&mut self, id: u64) {
269 let item = self.items.iter_mut().find(|i| i.id == id);
270 if let Some(item) = item {
271 item.status = QueueStatus::Active;
272 item.cancel_token = CancellationToken::new();
273 let item_ref = item as *const QueueItem;
274 self.sync_recovery(unsafe { &*item_ref });
275 }
276 }
277
278 pub fn mark_complete(
279 &mut self,
280 id: u64,
281 success: bool,
282 error: Option<String>,
283 file_path: Option<String>,
284 file_size_bytes: Option<u64>,
285 ) {
286 let item = self.items.iter_mut().find(|i| i.id == id);
287 if let Some(item) = item {
288 if success {
289 item.status = QueueStatus::Complete { success: true };
290 item.percent = 100.0;
291 } else {
292 item.status = QueueStatus::Error {
293 message: error.unwrap_or_default(),
294 };
295 }
296 item.file_path = file_path;
297 item.file_size_bytes = file_size_bytes;
298 item.speed_bytes_per_sec = 0.0;
299 let item_ref = item as *const QueueItem;
300 self.sync_recovery(unsafe { &*item_ref });
301 }
302 }
303
304 pub fn mark_seeding(
305 &mut self,
306 id: u64,
307 file_path: Option<String>,
308 file_size_bytes: Option<u64>,
309 torrent_id: Option<usize>,
310 ) {
311 let item = self.items.iter_mut().find(|i| i.id == id);
312 if let Some(item) = item {
313 item.status = QueueStatus::Seeding;
314 item.percent = 100.0;
315 item.file_path = file_path;
316 item.file_size_bytes = file_size_bytes;
317 item.speed_bytes_per_sec = 0.0;
318 item.torrent_id = torrent_id;
319 let item_ref = item as *const QueueItem;
320 self.sync_recovery(unsafe { &*item_ref });
321 }
322 }
323
324 pub fn update_progress(
325 &mut self,
326 id: u64,
327 percent: f64,
328 speed: f64,
329 downloaded: u64,
330 total: Option<u64>,
331 torrent_id: Option<usize>,
332 ) {
333 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
334 item.percent = percent;
335 item.speed_bytes_per_sec = speed;
336 item.downloaded_bytes = downloaded;
337 if let Some(t) = total {
338 item.total_bytes = Some(t);
339 }
340 if torrent_id.is_some() && item.torrent_id.is_none() {
341 item.torrent_id = torrent_id;
342 }
343 }
344 }
345
346 pub fn pause(&mut self, id: u64) -> bool {
347 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
348 if item.status == QueueStatus::Active {
349 if item.platform != "magnet" {
350 item.cancel_token.cancel();
351 }
352 item.status = QueueStatus::Paused;
353 item.speed_bytes_per_sec = 0.0;
354 let item_ref = item as *const QueueItem;
355 self.sync_recovery(unsafe { &*item_ref });
356 return true;
357 }
358 }
359 false
360 }
361
362 pub fn resume(&mut self, id: u64) -> bool {
363 let item = self.items.iter_mut().find(|i| i.id == id);
364 if let Some(item) = item {
365 if item.status == QueueStatus::Paused {
366 if item.platform == "magnet" {
367 item.status = QueueStatus::Active;
368 } else {
369 item.status = QueueStatus::Queued;
370 item.cancel_token = CancellationToken::new();
371 }
372 let item_ref = item as *const QueueItem;
373 self.sync_recovery(unsafe { &*item_ref });
374 return true;
375 }
376 }
377 false
378 }
379
380 pub fn cancel(&mut self, id: u64) -> (bool, Option<usize>) {
381 let result = self.cancel_inner(id);
382 if result.0 {
383 crate::core::manager::recovery::remove(id);
384 }
385 result
386 }
387
388 fn cancel_inner(&mut self, id: u64) -> (bool, Option<usize>) {
389 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
390 match &item.status {
391 QueueStatus::Active => {
392 item.cancel_token.cancel();
393 item.status = QueueStatus::Error {
394 message: "Cancelled".to_string(),
395 };
396 item.speed_bytes_per_sec = 0.0;
397 let item_ref = item as *const QueueItem;
398 self.sync_recovery(unsafe { &*item_ref });
399 return (true, None);
400 }
401 QueueStatus::Seeding => {
402 let tid = item.torrent_id;
403 item.status = QueueStatus::Error {
404 message: "Cancelled".to_string(),
405 };
406 item.speed_bytes_per_sec = 0.0;
407 let item_ref = item as *const QueueItem;
408 self.sync_recovery(unsafe { &*item_ref });
409 return (true, tid);
410 }
411 QueueStatus::Paused => {
412 item.cancel_token.cancel();
413 let tid = if item.platform == "magnet" {
414 item.torrent_id
415 } else {
416 None
417 };
418 item.status = QueueStatus::Error {
419 message: "Cancelled".to_string(),
420 };
421 item.speed_bytes_per_sec = 0.0;
422 let item_ref = item as *const QueueItem;
423 self.sync_recovery(unsafe { &*item_ref });
424 return (true, tid);
425 }
426 QueueStatus::Queued => {
427 item.status = QueueStatus::Error {
428 message: "Cancelled".to_string(),
429 };
430 let item_ref = item as *const QueueItem;
431 self.sync_recovery(unsafe { &*item_ref });
432 return (true, None);
433 }
434 _ => {}
435 }
436 }
437 (false, None)
438 }
439
440 pub fn retry(&mut self, id: u64) -> bool {
441 if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
442 if matches!(item.status, QueueStatus::Error { .. }) {
443 item.status = QueueStatus::Queued;
444 item.cancel_token = CancellationToken::new();
445 item.percent = 0.0;
446 item.speed_bytes_per_sec = 0.0;
447 item.downloaded_bytes = 0;
448 item.file_path = None;
449 item.file_size_bytes = None;
450 let item_ref = item as *const QueueItem;
451 self.sync_recovery(unsafe { &*item_ref });
452 return true;
453 }
454 }
455 false
456 }
457
458 pub fn remove(&mut self, id: u64) -> Option<Option<usize>> {
459 let result = self.remove_inner(id);
460 if result.is_some() {
461 crate::core::manager::recovery::remove(id);
462 }
463 result
464 }
465
466 fn remove_inner(&mut self, id: u64) -> Option<Option<usize>> {
467 if let Some(pos) = self.items.iter().position(|i| i.id == id) {
468 let item = &self.items[pos];
469 if item.status == QueueStatus::Active {
470 item.cancel_token.cancel();
471 }
472 if item.status == QueueStatus::Paused && item.platform == "magnet" {
473 item.cancel_token.cancel();
474 }
475 let torrent_id = if item.status == QueueStatus::Seeding
476 || (item.status == QueueStatus::Paused && item.platform == "magnet")
477 {
478 item.torrent_id
479 } else {
480 None
481 };
482 self.items.remove(pos);
483 return Some(torrent_id);
484 }
485 None
486 }
487
488 pub fn clear_finished(&mut self) {
489 let to_remove: Vec<u64> = self
490 .items
491 .iter()
492 .filter(|i| {
493 matches!(
494 i.status,
495 QueueStatus::Complete { .. } | QueueStatus::Error { .. }
496 )
497 })
498 .map(|i| i.id)
499 .collect();
500 for id in &to_remove {
501 crate::core::manager::recovery::remove(*id);
502 }
503 self.items.retain(|i| {
504 !matches!(
505 i.status,
506 QueueStatus::Complete { .. } | QueueStatus::Error { .. }
507 )
508 });
509 }
510
511 pub fn get_state(&self) -> Vec<QueueItemInfo> {
512 self.items.iter().map(|i| i.to_info()).collect()
513 }
514
515 pub fn has_url(&self, url: &str) -> bool {
516 self.items.iter().any(|i| {
517 i.url == url
518 && matches!(
519 i.status,
520 QueueStatus::Queued
521 | QueueStatus::Active
522 | QueueStatus::Paused
523 | QueueStatus::Seeding
524 )
525 })
526 }
527}
528
529pub struct ProgressThrottle {
530 last_emit: std::time::Instant,
531 min_interval: std::time::Duration,
532}
533
534impl ProgressThrottle {
535 pub fn new(min_interval_ms: u64) -> Self {
536 Self {
537 last_emit: std::time::Instant::now() - std::time::Duration::from_secs(10),
538 min_interval: std::time::Duration::from_millis(min_interval_ms),
539 }
540 }
541
542 pub fn should_emit(&mut self) -> bool {
543 let now = std::time::Instant::now();
544 if now.duration_since(self.last_emit) >= self.min_interval {
545 self.last_emit = now;
546 true
547 } else {
548 false
549 }
550 }
551}
552
553pub fn emit_queue_state_from_state(reporter: &Option<SharedReporter>, state: Vec<QueueItemInfo>) {
554 let n = EMIT_COUNT.fetch_add(1, Ordering::Relaxed);
555 if n.is_multiple_of(10) {
556 tracing::debug!("[perf] emit_queue_state called {} times", n);
557 }
558 if let Some(reporter) = reporter {
559 reporter.on_queue_update(state);
560 }
561}
562
563pub fn emit_queue_state(queue: &DownloadQueue) {
564 let state = queue.get_state();
565 emit_queue_state_from_state(&queue.reporter, state);
566}
567
568struct ActiveJobSlot {
570 queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
571 item_id: u64,
572 armed: bool,
573}
574
575impl ActiveJobSlot {
576 fn new(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) -> Self {
577 Self {
578 queue,
579 item_id,
580 armed: true,
581 }
582 }
583
584 fn disarm(mut self) {
585 self.armed = false;
586 }
587}
588
589impl Drop for ActiveJobSlot {
590 fn drop(&mut self) {
591 if !self.armed {
592 return;
593 }
594 let queue = self.queue.clone();
595 let item_id = self.item_id;
596 tokio::spawn(async move {
597 let state = {
598 let mut q = queue.lock().await;
599 let still_active = q
600 .items
601 .iter()
602 .find(|i| i.id == item_id)
603 .map(|i| i.status == QueueStatus::Active)
604 .unwrap_or(false);
605 if !still_active {
606 return;
607 }
608 tracing::warn!(
609 "[queue] ActiveJobSlot guard firing for {} — download ended without clean release",
610 item_id
611 );
612 q.mark_complete(
613 item_id,
614 false,
615 Some("Download interrupted".to_string()),
616 None,
617 None,
618 );
619 q.get_state()
620 };
621 let reporter = { queue.lock().await.reporter.clone() };
622 emit_queue_state_from_state(&reporter, state);
623 tokio::spawn(try_start_next(queue));
624 });
625 }
626}
627
628pub fn spawn_download(
629 queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
630 item_id: u64,
631) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
632 Box::pin(async move {
633 let _timer_start = std::time::Instant::now();
634 let slot = ActiveJobSlot::new(queue.clone(), item_id);
635 tokio::spawn(spawn_download_inner(queue.clone(), item_id));
636 slot.disarm();
637 tracing::debug!(
638 "[perf] spawn_download {} took {:?}",
639 item_id,
640 _timer_start.elapsed()
641 );
642 })
643}
644
645async fn spawn_download_inner(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) {
646 tracing::info!("[queue] download {} started", item_id);
647
648 let reporter = { queue.lock().await.reporter.clone() };
649
650 if let Some(r) = &reporter {
651 r.on_progress(
652 item_id,
653 crate::core::events::QueueItemProgress {
654 id: item_id,
655 title: "".to_string(),
656 platform: "".to_string(),
657 percent: 0.0,
658 speed_bytes_per_sec: 0.0,
659 downloaded_bytes: 0,
660 total_bytes: None,
661 phase: "preparing".to_string(),
662 },
663 );
664 }
665
666 let (
667 url,
668 output_dir,
669 download_mode,
670 quality,
671 format_id,
672 referer,
673 extra_headers,
674 page_url,
675 user_agent,
676 cancel_token,
677 media_info,
678 platform_name,
679 downloader,
680 ytdlp_path,
681 from_hotkey,
682 ) = {
683 let q = queue.lock().await;
684 let item = match q.items.iter().find(|i| i.id == item_id) {
685 Some(i) => i,
686 None => return,
687 };
688 (
689 item.url.clone(),
690 item.output_dir.clone(),
691 item.download_mode.clone(),
692 item.quality.clone(),
693 item.format_id.clone(),
694 item.referer.clone(),
695 item.extra_headers.clone(),
696 item.page_url.clone(),
697 item.user_agent.clone(),
698 item.cancel_token.clone(),
699 item.media_info.clone(),
700 item.platform.clone(),
701 item.downloader.clone(),
702 item.ytdlp_path.clone(),
703 item.from_hotkey,
704 )
705 };
706
707 let info_start = std::time::Instant::now();
708 let info = match media_info {
709 Some(i) => {
710 tracing::info!(
711 "[queue] info for {} from cache/pre-fetched in {:?}",
712 item_id,
713 info_start.elapsed()
714 );
715 i
716 }
717 None => {
718 tracing::debug!(
719 "[perf] spawn_download_inner {}: media_info is None, fetching info",
720 item_id
721 );
722 if let Some(r) = &reporter {
723 r.on_progress(
724 item_id,
725 crate::core::events::QueueItemProgress {
726 id: item_id,
727 title: url.clone(),
728 platform: platform_name.clone(),
729 percent: 0.0,
730 speed_bytes_per_sec: 0.0,
731 downloaded_bytes: 0,
732 total_bytes: None,
733 phase: "fetching_info".to_string(),
734 },
735 );
736 }
737
738 let info_result = tokio::time::timeout(
739 std::time::Duration::from_secs(60),
740 fetch_and_cache_info(&url, &*downloader, &platform_name),
741 )
742 .await;
743
744 match info_result {
745 Ok(Ok(i)) => i,
746 Ok(Err(e)) => {
747 let state = {
748 let mut q = queue.lock().await;
749 q.mark_complete(item_id, false, Some(e.to_string()), None, None);
750 q.get_state()
751 };
752 emit_queue_state_from_state(&reporter, state);
753 tokio::spawn(try_start_next(queue));
754 return;
755 }
756 Err(_) => {
757 tracing::warn!("[queue] info fetch timed out for {} after 60s", item_id);
758 let state = {
759 let mut q = queue.lock().await;
760 q.mark_complete(
761 item_id,
762 false,
763 Some("Timed out fetching video info".to_string()),
764 None,
765 None,
766 );
767 q.get_state()
768 };
769 emit_queue_state_from_state(&reporter, state);
770 tokio::spawn(try_start_next(queue));
771 return;
772 }
773 }
774 }
775 };
776 tracing::info!(
777 "[queue] info fetch for {} took {:?}",
778 item_id,
779 info_start.elapsed()
780 );
781
782 let state = {
783 let mut q = queue.lock().await;
784 if let Some(item) = q.items.iter_mut().find(|i| i.id == item_id) {
785 item.title = info.title.clone();
786 item.total_bytes = info.file_size_bytes;
787 let fc = if info.media_type == crate::models::media::MediaType::Carousel
788 || info.media_type == crate::models::media::MediaType::Playlist
789 {
790 info.available_qualities.len() as u32
791 } else {
792 1
793 };
794 item.file_count = Some(fc);
795 item.media_info = Some(info.clone());
796 }
797 q.get_state()
798 };
799 emit_queue_state_from_state(&reporter, state);
800
801 if let Some(r) = &reporter {
802 r.on_progress(
803 item_id,
804 crate::core::events::QueueItemProgress {
805 id: item_id,
806 title: info.title.clone(),
807 platform: platform_name.clone(),
808 percent: 0.5,
809 speed_bytes_per_sec: 0.0,
810 downloaded_bytes: 0,
811 total_bytes: info.file_size_bytes,
812 phase: "starting".to_string(),
813 },
814 );
815 }
816
817 let settings = crate::models::settings::AppSettings::load_from_disk();
818 let tmpl = settings.download.filename_template.clone();
819 let mut final_output_dir = std::path::PathBuf::from(&output_dir);
820 if settings.download.organize_by_platform {
821 final_output_dir = final_output_dir.join(&platform_name);
822 }
823 let torrent_id_slot = Arc::new(tokio::sync::Mutex::new(None));
824 let opts = crate::models::media::DownloadOptions {
825 quality: quality.or_else(|| Some(settings.download.video_quality.clone())),
826 output_dir: final_output_dir,
827 filename_template: Some(tmpl),
828 download_subtitles: settings.download.download_subtitles,
829 include_auto_subtitles: settings.download.include_auto_subtitles,
830 download_mode,
831 format_id,
832 referer,
833 extra_headers,
834 page_url,
835 user_agent,
836 cancel_token: cancel_token.clone(),
837 concurrent_fragments: settings.advanced.concurrent_fragments,
838 ytdlp_path,
839 torrent_listen_port: Some(settings.advanced.torrent_listen_port),
840 torrent_id_slot: Some(torrent_id_slot.clone()),
841 };
842
843 let total_bytes = info.file_size_bytes;
844 let item_title = info.title.clone();
845 let item_platform = platform_name.clone();
846 let (tx, mut rx) = mpsc::channel::<f64>(32);
847
848 let reporter_progress = reporter.clone();
849 let queue_progress = queue.clone();
850 let torrent_id_slot_progress = torrent_id_slot.clone();
851 let progress_forwarder = tokio::spawn(async move {
852 let mut last_bytes: u64 = 0;
853 let mut last_time = std::time::Instant::now();
854 let mut throttle = ProgressThrottle::new(250);
855 let mut current_speed: f64 = 0.0;
856
857 while let Some(percent) = rx.recv().await {
858 if !throttle.should_emit() && percent < 100.0 {
859 continue;
860 }
861
862 let now = std::time::Instant::now();
863 let clamped = percent.max(0.0);
864 let downloaded_bytes = total_bytes
865 .map(|total| (clamped / 100.0 * total as f64) as u64)
866 .unwrap_or(0);
867
868 if total_bytes.is_some() && downloaded_bytes > last_bytes {
869 let dt = now.duration_since(last_time).as_secs_f64();
870 if dt > 0.1 {
871 let instant_speed = (downloaded_bytes - last_bytes) as f64 / dt;
872 current_speed = if current_speed > 0.0 {
873 current_speed * 0.7 + instant_speed * 0.3
874 } else {
875 instant_speed
876 };
877 }
878 }
879
880 last_bytes = downloaded_bytes;
881 last_time = now;
882
883 let phase = match percent {
884 p if p < -1.5 => "connecting",
885 p if p < -0.5 => "starting",
886 p if p > 0.0 => "downloading",
887 _ => "starting",
888 };
889
890 {
891 let mut q = queue_progress.lock().await;
892 let tid = { *torrent_id_slot_progress.lock().await };
893 q.update_progress(
894 item_id,
895 clamped,
896 current_speed,
897 downloaded_bytes,
898 total_bytes,
899 tid,
900 );
901 }
902
903 if let Some(r) = &reporter_progress {
904 r.on_progress(
905 item_id,
906 crate::core::events::QueueItemProgress {
907 id: item_id,
908 title: item_title.clone(),
909 platform: item_platform.clone(),
910 percent: clamped,
911 speed_bytes_per_sec: current_speed,
912 downloaded_bytes,
913 total_bytes,
914 phase: phase.to_string(),
915 },
916 );
917 }
918 }
919 });
920
921 if let Some(ua) = opts.user_agent.clone() {
922 crate::core::ytdlp::register_ext_user_agent(url.clone(), ua);
923 }
924 if let Some(hdrs) = opts.extra_headers.clone() {
925 crate::core::ytdlp::register_ext_headers(url.clone(), hdrs);
926 }
927
928 let dl_start = std::time::Instant::now();
929 let dl_future = async {
930 tokio::select! {
931 r = downloader.download(&info, &opts, tx) => r,
932 _ = cancel_token.cancelled() => {
933 Err(anyhow::anyhow!("Download cancelado"))
934 }
935 }
936 };
937 let result = crate::core::log_hook::CURRENT_DOWNLOAD_ID
938 .scope(item_id, dl_future)
939 .await;
940 crate::core::ytdlp::clear_ext_user_agent(&url);
941 crate::core::ytdlp::clear_ext_headers(&url);
942 tracing::info!(
943 "[queue] download {} completed in {:?}",
944 item_id,
945 dl_start.elapsed()
946 );
947
948 let _ = progress_forwarder.await;
949
950 let was_paused = {
951 let q = queue.lock().await;
952 q.items
953 .iter()
954 .find(|i| i.id == item_id)
955 .map(|i| i.status == QueueStatus::Paused)
956 .unwrap_or(false)
957 };
958
959 if was_paused {
960 let state = {
961 let q = queue.lock().await;
962 q.get_state()
963 };
964 emit_queue_state_from_state(&reporter, state);
965 tokio::spawn(try_start_next(queue));
966 return;
967 }
968
969 match result {
970 Ok(dl) => {
971 if settings.download.embed_metadata
972 && platform_name != "magnet"
973 && crate::core::ffmpeg::is_ffmpeg_available().await
974 {
975 let metadata = crate::core::ffmpeg::MetadataEmbed {
976 title: Some(info.title.clone()),
977 artist: Some(info.author.clone()),
978 thumbnail_url: info.thumbnail_url.clone(),
979 ..Default::default()
980 };
981 if let Err(e) = crate::core::ffmpeg::embed_metadata(
982 &dl.file_path,
983 &metadata,
984 settings.download.embed_thumbnail,
985 shared_http_client(),
986 )
987 .await
988 {
989 tracing::warn!("Metadata embed failed for '{}': {}", info.title, e);
990 }
991 }
992
993 if from_hotkey && settings.download.copy_to_clipboard_on_hotkey {
994 #[cfg(not(target_os = "android"))]
995 {
996 match crate::core::clipboard::copy_file_to_clipboard(&dl.file_path).await {
997 Ok(()) => {
998 tracing::info!("[clipboard] file copied: {:?}", dl.file_path);
999 }
1002 Err(e) => {
1003 tracing::warn!("[clipboard] failed to copy file: {}", e);
1004 }
1005 }
1006 }
1007 }
1008
1009 let state = {
1010 let mut q = queue.lock().await;
1011 if platform_name == "magnet" && dl.torrent_id.is_some() {
1012 q.mark_seeding(
1013 item_id,
1014 Some(dl.file_path.to_string_lossy().to_string()),
1015 Some(dl.file_size_bytes),
1016 dl.torrent_id,
1017 );
1018 } else {
1019 q.mark_complete(
1020 item_id,
1021 true,
1022 None,
1023 Some(dl.file_path.to_string_lossy().to_string()),
1024 Some(dl.file_size_bytes),
1025 );
1026 }
1027 q.get_state()
1028 };
1029 if let Some(r) = &reporter {
1030 r.on_complete(
1031 item_id,
1032 Some(dl.file_path.to_string_lossy().to_string()),
1033 Some(dl.file_size_bytes),
1034 );
1035 }
1036 emit_queue_state_from_state(&reporter, state);
1037 }
1038 Err(e) => {
1039 let raw_err = e.to_string();
1040 let (category, hint) = crate::core::errors::classify_download_error(&raw_err);
1041 let user_msg = if category != "unknown" {
1042 format!("{} ({})", hint, raw_err)
1043 } else {
1044 raw_err.clone()
1045 };
1046 tracing::error!(
1047 "Download error '{}' [{}]: {}",
1048 platform_name,
1049 category,
1050 raw_err
1051 );
1052 let state = {
1053 let mut q = queue.lock().await;
1054 q.mark_complete(item_id, false, Some(user_msg.clone()), None, None);
1055 q.get_state()
1056 };
1057 if let Some(r) = &reporter {
1058 r.on_error(item_id, user_msg);
1059 }
1060 emit_queue_state_from_state(&reporter, state);
1061 }
1062 }
1063
1064 tokio::spawn(try_start_next(queue));
1065}
1066
1067pub async fn fetch_and_cache_info(
1068 url: &str,
1069 downloader: &dyn PlatformDownloader,
1070 platform: &str,
1071) -> anyhow::Result<MediaInfo> {
1072 {
1073 let cache = info_cache().lock().await;
1074 if let Some(entry) = cache.get(url) {
1075 if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1076 tracing::debug!("[perf] fetch_and_cache_info: cache hit for {}", platform);
1077 return Ok(entry.info.clone());
1078 }
1079 }
1080 }
1081
1082 let url_lock = {
1083 let mut map = in_flight_map().lock().await;
1084 map.entry(url.to_string())
1085 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
1086 .clone()
1087 };
1088 let _guard = url_lock.lock().await;
1089
1090 {
1091 let cache = info_cache().lock().await;
1092 if let Some(entry) = cache.get(url) {
1093 if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1094 tracing::debug!(
1095 "[perf] fetch_and_cache_info: dedup cache hit for {}",
1096 platform
1097 );
1098 return Ok(entry.info.clone());
1099 }
1100 }
1101 }
1102
1103 tracing::debug!("[perf] fetch_and_cache_info: fetching for {}", platform);
1104 let mut info = downloader.get_media_info(url).await?;
1105
1106 if is_generic_title(&info.title) {
1107 let name = crate::core::random_names::get_random_name();
1108 info.title = format!("video_{}", name);
1109 }
1110
1111 let mut cache = info_cache().lock().await;
1112 cache.insert(
1113 url.to_string(),
1114 CachedInfo {
1115 info: info.clone(),
1116 cached_at: std::time::Instant::now(),
1117 },
1118 );
1119 if cache.len() > 50 {
1120 cache.retain(|_, v| v.cached_at.elapsed() < INFO_CACHE_TTL);
1121 }
1122 Ok(info)
1123}
1124
1125pub async fn try_get_cached_info(url: &str) -> Option<MediaInfo> {
1126 let cache = info_cache().lock().await;
1127 cache
1128 .get(url)
1129 .filter(|entry| entry.cached_at.elapsed() < INFO_CACHE_TTL)
1130 .map(|entry| entry.info.clone())
1131}
1132
1133pub async fn prefetch_info(url: &str, downloader: &dyn PlatformDownloader, platform: &str) {
1134 prefetch_info_with_emit(url, downloader, platform, None).await;
1135}
1136
1137pub async fn prefetch_info_with_emit(
1138 url: &str,
1139 downloader: &dyn PlatformDownloader,
1140 platform: &str,
1141 reporter: Option<SharedReporter>,
1142) {
1143 let _timer_start = std::time::Instant::now();
1144 tracing::debug!("[perf] prefetch_info: started");
1145 match fetch_and_cache_info(url, downloader, platform).await {
1146 Ok(info) => {
1147 tracing::debug!(
1148 "[perf] prefetch_info: completed in {:?} — {}",
1149 _timer_start.elapsed(),
1150 info.title
1151 );
1152 if let Some(r) = reporter {
1153 r.on_media_preview(
1154 url.to_string(),
1155 info.title.clone(),
1156 info.author.clone(),
1157 info.thumbnail_url.clone(),
1158 info.duration_seconds,
1159 );
1160 }
1161 }
1162 Err(e) => tracing::warn!(
1163 "[perf] prefetch_info: failed in {:?} — {}",
1164 _timer_start.elapsed(),
1165 e
1166 ),
1167 }
1168}
1169
1170pub async fn try_start_next(queue: Arc<tokio::sync::Mutex<DownloadQueue>>) {
1171 let _timer_start = std::time::Instant::now();
1172 let (next_ids, stagger, state_to_emit, reporter) = {
1173 let mut q = queue.lock().await;
1174 let ids = q.next_queued_ids();
1175 for nid in &ids {
1176 q.mark_active(*nid);
1177 }
1178 let state = if !ids.is_empty() {
1179 Some(q.get_state())
1180 } else {
1181 None
1182 };
1183 (ids, q.stagger_delay_ms, state, q.reporter.clone())
1184 };
1185
1186 if let Some(state) = state_to_emit {
1187 emit_queue_state_from_state(&reporter, state);
1188 }
1189
1190 let batch_size = next_ids.len();
1191 for (i, nid) in next_ids.into_iter().enumerate() {
1192 if let Some(r) = &reporter {
1193 r.on_progress(
1194 nid,
1195 crate::core::events::QueueItemProgress {
1196 id: nid,
1197 title: String::new(),
1198 platform: String::new(),
1199 percent: 0.0,
1200 speed_bytes_per_sec: 0.0,
1201 downloaded_bytes: 0,
1202 total_bytes: None,
1203 phase: "queued_starting".to_string(),
1204 },
1205 );
1206 }
1207
1208 if i > 0 {
1209 let item_platform = {
1210 let q = queue.lock().await;
1211 q.items
1212 .iter()
1213 .find(|item| item.id == nid)
1214 .map(|item| item.platform.clone())
1215 };
1216 let delay_ms = if item_platform.as_deref() == Some("youtube") {
1217 2000
1218 } else if batch_size > 3 {
1219 stagger.max(1000)
1220 } else {
1221 stagger
1222 };
1223 if delay_ms > 0 {
1224 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1225 }
1226 }
1227 let queue_c = queue.clone();
1228 tokio::spawn(async move {
1229 tokio::spawn(spawn_download(queue_c, nid));
1230 });
1231 }
1232 tracing::debug!("[perf] try_start_next took {:?}", _timer_start.elapsed());
1233}
1234
1235fn is_generic_title(title: &str) -> bool {
1236 let t = title.to_lowercase();
1237 let t = t.trim();
1238 t.is_empty()
1239 || t == "video"
1240 || t == "media"
1241 || t == "untitled"
1242 || t == "unknown"
1243 || t.starts_with("video [video]")
1244 || t.starts_with("media [media]")
1245}