Skip to main content

mangofetch_core/core/manager/
queue.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, OnceLock};
5
/// Global call counter incremented by `emit_queue_state_from_state` to drive its
/// periodic debug log.
static EMIT_COUNT: AtomicU64 = AtomicU64::new(0);
7
8use serde::Serialize;
9use tokio_util::sync::CancellationToken;
10
11use crate::core::traits::SharedReporter;
12use crate::models::media::MediaInfo;
13use crate::models::queue::{QueueItemInfo, QueueStatus};
14use crate::platforms::traits::PlatformDownloader;
15
16fn shared_http_client() -> &'static reqwest::Client {
17    static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
18    CLIENT.get_or_init(|| {
19        crate::core::http_client::apply_global_proxy(reqwest::Client::builder())
20            .build()
21            .unwrap_or_default()
22    })
23}
24
25struct CachedInfo {
26    info: MediaInfo,
27    cached_at: std::time::Instant,
28}
29
30static INFO_CACHE: OnceLock<tokio::sync::Mutex<HashMap<String, CachedInfo>>> = OnceLock::new();
31
32fn info_cache() -> &'static tokio::sync::Mutex<HashMap<String, CachedInfo>> {
33    INFO_CACHE.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
34}
35
36const INFO_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(600);
37
38static IN_FLIGHT_FETCHES: OnceLock<
39    tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
40> = OnceLock::new();
41
42fn in_flight_map() -> &'static tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
43    IN_FLIGHT_FETCHES.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
44}
45
/// Serializable summary of a media item: its URL, title and author, plus an optional
/// thumbnail URL and an optional duration.
#[derive(Debug, Clone, Serialize)]
pub struct MediaPreviewEvent {
    pub url: String,
    pub title: String,
    pub author: String,
    pub thumbnail_url: Option<String>,
    pub duration_seconds: Option<f64>,
}
54
55pub struct QueueItem {
56    pub id: u64,
57    pub url: String,
58    pub platform: String,
59    pub title: String,
60    pub status: QueueStatus,
61    pub cancel_token: CancellationToken,
62    pub output_dir: String,
63    pub download_mode: Option<String>,
64    pub quality: Option<String>,
65    pub format_id: Option<String>,
66    pub referer: Option<String>,
67    pub extra_headers: Option<std::collections::HashMap<String, String>>,
68    pub page_url: Option<String>,
69    pub user_agent: Option<String>,
70    pub percent: f64,
71    pub speed_bytes_per_sec: f64,
72    pub downloaded_bytes: u64,
73    pub total_bytes: Option<u64>,
74    pub file_path: Option<String>,
75    pub file_size_bytes: Option<u64>,
76    pub file_count: Option<u32>,
77    pub media_info: Option<MediaInfo>,
78    pub downloader: Arc<dyn PlatformDownloader>,
79    pub ytdlp_path: Option<PathBuf>,
80    pub from_hotkey: bool,
81    pub torrent_id: Option<usize>,
82    pub download_subtitles: Option<bool>,
83    pub phase: String,
84}
85
86impl QueueItem {
87    pub fn to_info(&self) -> QueueItemInfo {
88        QueueItemInfo {
89            id: self.id,
90            url: self.url.clone(),
91            platform: self.platform.clone(),
92            title: self.title.clone(),
93            status: self.status.clone(),
94            percent: self.percent,
95            speed_bytes_per_sec: self.speed_bytes_per_sec,
96            downloaded_bytes: self.downloaded_bytes,
97            total_bytes: self.total_bytes,
98            phase: self.phase.clone(),
99            file_path: self.file_path.clone(),
100            file_size_bytes: self.file_size_bytes,
101            file_count: self.file_count,
102            thumbnail_url: self
103                .media_info
104                .as_ref()
105                .and_then(|m| m.thumbnail_url.clone()),
106        }
107    }
108}
109
/// In-memory download queue together with the settings that govern how items start.
pub struct DownloadQueue {
    /// Items in insertion order; `next_queued_ids` scans them front to back.
    pub items: Vec<QueueItem>,
    /// Upper bound on simultaneously `Active` items used by `next_queued_ids`.
    pub max_concurrent: u32,
    /// Set to 150 by `new`; presumably a delay between job starts — confirm at the use site.
    pub stagger_delay_ms: u64,
    /// Optional reporter that receives queue-state snapshots.
    pub reporter: Option<SharedReporter>,
}
116
117impl DownloadQueue {
118    pub fn new(max_concurrent: u32, reporter: Option<SharedReporter>) -> Self {
119        Self {
120            items: Vec::new(),
121            max_concurrent,
122            stagger_delay_ms: 150,
123            reporter,
124        }
125    }
126
127    pub fn set_reporter(&mut self, reporter: SharedReporter) {
128        self.reporter = Some(reporter);
129    }
130
131    pub fn load_from_recovery(&mut self, registry: &crate::core::registry::PlatformRegistry) {
132        let recovery_items = crate::core::manager::recovery::list();
133        for item in recovery_items {
134            let downloader = registry
135                .find_platform(&item.url)
136                .or_else(|| registry.find_by_name(&item.platform));
137
138            if let Some(dl) = downloader {
139                let percent = if matches!(item.status, QueueStatus::Complete { success: true }) {
140                    100.0
141                } else {
142                    0.0
143                };
144
145                let q_item = QueueItem {
146                    id: item.id,
147                    url: item.url,
148                    platform: item.platform,
149                    title: item.title,
150                    status: item.status,
151                    cancel_token: CancellationToken::new(),
152                    output_dir: item.output_dir,
153                    download_mode: item.download_mode,
154                    quality: item.quality,
155                    format_id: item.format_id,
156                    referer: item.referer,
157                    extra_headers: None,
158                    page_url: None,
159                    user_agent: None,
160                    percent,
161                    speed_bytes_per_sec: 0.0,
162                    downloaded_bytes: 0,
163                    total_bytes: item.file_size_bytes,
164                    file_path: item.file_path,
165                    file_size_bytes: item.file_size_bytes,
166                    file_count: None,
167                    media_info: None,
168                    downloader: dl,
169                    ytdlp_path: None,
170                    from_hotkey: false,
171                    torrent_id: None,
172                    download_subtitles: None,
173                    phase: "Restored".to_string(),
174                };
175                self.items.push(q_item);
176            }
177        }
178    }
179
180    fn sync_recovery(item: &QueueItem) {
181        crate::core::manager::recovery::persist(crate::core::manager::recovery::RecoveryItem {
182            id: item.id,
183            url: item.url.clone(),
184            title: item.title.clone(),
185            platform: item.platform.clone(),
186            output_dir: item.output_dir.clone(),
187            status: item.status.clone(),
188            download_mode: item.download_mode.clone(),
189            quality: item.quality.clone(),
190            format_id: item.format_id.clone(),
191            referer: item.referer.clone(),
192            file_path: item.file_path.clone(),
193            file_size_bytes: item.file_size_bytes,
194        });
195    }
196
197    #[allow(clippy::too_many_arguments)]
198    pub fn enqueue(
199        &mut self,
200        id: u64,
201        url: String,
202        platform: String,
203        title: String,
204        output_dir: String,
205        download_mode: Option<String>,
206        quality: Option<String>,
207        download_subtitles: Option<bool>,
208        format_id: Option<String>,
209        referer: Option<String>,
210        extra_headers: Option<std::collections::HashMap<String, String>>,
211        page_url: Option<String>,
212        user_agent: Option<String>,
213        media_info: Option<MediaInfo>,
214        total_bytes: Option<u64>,
215        file_count: Option<u32>,
216        downloader: Arc<dyn PlatformDownloader>,
217        ytdlp_path: Option<PathBuf>,
218        from_hotkey: bool,
219    ) {
220        let item = QueueItem {
221            id,
222            url,
223            platform,
224            title,
225            status: QueueStatus::Queued,
226            cancel_token: CancellationToken::new(),
227            output_dir,
228            download_mode,
229            quality,
230            format_id,
231            referer,
232            extra_headers,
233            page_url,
234            user_agent,
235            percent: 0.0,
236            speed_bytes_per_sec: 0.0,
237            downloaded_bytes: 0,
238            total_bytes,
239            file_path: None,
240            file_size_bytes: None,
241            file_count,
242            media_info,
243            downloader,
244            ytdlp_path,
245            from_hotkey,
246            torrent_id: None,
247            download_subtitles,
248            phase: "Queued".to_string(),
249        };
250        self.items.push(item);
251        Self::sync_recovery(self.items.last().unwrap());
252    }
253
254    pub fn active_count(&self) -> u32 {
255        self.items
256            .iter()
257            .filter(|i| i.status == QueueStatus::Active)
258            .count() as u32
259    }
260
261    pub fn next_queued_ids(&self) -> Vec<u64> {
262        let slots = self.max_concurrent.saturating_sub(self.active_count()) as usize;
263        self.items
264            .iter()
265            .filter(|i| i.status == QueueStatus::Queued)
266            .take(slots)
267            .map(|i| i.id)
268            .collect()
269    }
270
271    pub fn mark_active(&mut self, id: u64) {
272        let item = self.items.iter_mut().find(|i| i.id == id);
273        if let Some(item) = item {
274            item.status = QueueStatus::Active;
275            item.cancel_token = CancellationToken::new();
276            Self::sync_recovery(item);
277        }
278    }
279
280    pub fn mark_complete(
281        &mut self,
282        id: u64,
283        success: bool,
284        error: Option<String>,
285        file_path: Option<String>,
286        file_size_bytes: Option<u64>,
287    ) {
288        let item = self.items.iter_mut().find(|i| i.id == id);
289        if let Some(item) = item {
290            if success {
291                item.status = QueueStatus::Complete { success: true };
292                item.percent = 100.0;
293            } else {
294                item.status = QueueStatus::Error {
295                    message: error.unwrap_or_default(),
296                };
297            }
298            item.file_path = file_path;
299            item.file_size_bytes = file_size_bytes;
300            item.speed_bytes_per_sec = 0.0;
301            Self::sync_recovery(item);
302        }
303    }
304
305    pub fn mark_seeding(
306        &mut self,
307        id: u64,
308        file_path: Option<String>,
309        file_size_bytes: Option<u64>,
310        torrent_id: Option<usize>,
311    ) {
312        let item = self.items.iter_mut().find(|i| i.id == id);
313        if let Some(item) = item {
314            item.status = QueueStatus::Seeding;
315            item.percent = 100.0;
316            item.file_path = file_path;
317            item.file_size_bytes = file_size_bytes;
318            item.speed_bytes_per_sec = 0.0;
319            item.torrent_id = torrent_id;
320            Self::sync_recovery(item);
321        }
322    }
323
324    pub fn update_progress(
325        &mut self,
326        id: u64,
327        percent: f64,
328        speed: f64,
329        downloaded: u64,
330        total: Option<u64>,
331        torrent_id: Option<usize>,
332    ) {
333        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
334            item.percent = percent;
335            item.speed_bytes_per_sec = speed;
336            item.downloaded_bytes = downloaded;
337            if let Some(t) = total {
338                item.total_bytes = Some(t);
339            }
340            if torrent_id.is_some() && item.torrent_id.is_none() {
341                item.torrent_id = torrent_id;
342            }
343        }
344    }
345
346    pub fn pause(&mut self, id: u64) -> bool {
347        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
348            if item.status == QueueStatus::Active {
349                if item.platform != "magnet" {
350                    item.cancel_token.cancel();
351                }
352                item.status = QueueStatus::Paused;
353                item.speed_bytes_per_sec = 0.0;
354                Self::sync_recovery(item);
355                return true;
356            }
357        }
358        false
359    }
360
361    pub fn resume(&mut self, id: u64) -> bool {
362        let item = self.items.iter_mut().find(|i| i.id == id);
363        if let Some(item) = item {
364            if item.status == QueueStatus::Paused {
365                if item.platform == "magnet" {
366                    item.status = QueueStatus::Active;
367                } else {
368                    item.status = QueueStatus::Queued;
369                    item.cancel_token = CancellationToken::new();
370                }
371                Self::sync_recovery(item);
372                return true;
373            }
374        }
375        false
376    }
377
378    pub fn cancel(&mut self, id: u64) -> (bool, Option<usize>) {
379        let result = self.cancel_inner(id);
380        if result.0 {
381            crate::core::manager::recovery::remove(id);
382        }
383        result
384    }
385
386    fn cancel_inner(&mut self, id: u64) -> (bool, Option<usize>) {
387        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
388            match &item.status {
389                QueueStatus::Active => {
390                    item.cancel_token.cancel();
391                    item.status = QueueStatus::Error {
392                        message: "Cancelled".to_string(),
393                    };
394                    item.speed_bytes_per_sec = 0.0;
395                    Self::sync_recovery(item);
396                    return (true, None);
397                }
398                QueueStatus::Seeding => {
399                    let tid = item.torrent_id;
400                    item.status = QueueStatus::Error {
401                        message: "Cancelled".to_string(),
402                    };
403                    item.speed_bytes_per_sec = 0.0;
404                    Self::sync_recovery(item);
405                    return (true, tid);
406                }
407                QueueStatus::Paused => {
408                    item.cancel_token.cancel();
409                    let tid = if item.platform == "magnet" {
410                        item.torrent_id
411                    } else {
412                        None
413                    };
414                    item.status = QueueStatus::Error {
415                        message: "Cancelled".to_string(),
416                    };
417                    item.speed_bytes_per_sec = 0.0;
418                    Self::sync_recovery(item);
419                    return (true, tid);
420                }
421                QueueStatus::Queued => {
422                    item.status = QueueStatus::Error {
423                        message: "Cancelled".to_string(),
424                    };
425                    Self::sync_recovery(item);
426                    return (true, None);
427                }
428                _ => {}
429            }
430        }
431        (false, None)
432    }
433
434    pub fn retry(&mut self, id: u64) -> bool {
435        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
436            if matches!(item.status, QueueStatus::Error { .. }) {
437                item.status = QueueStatus::Queued;
438                item.cancel_token = CancellationToken::new();
439                item.percent = 0.0;
440                item.speed_bytes_per_sec = 0.0;
441                item.downloaded_bytes = 0;
442                item.file_path = None;
443                item.file_size_bytes = None;
444                Self::sync_recovery(item);
445                return true;
446            }
447        }
448        false
449    }
450
451    pub fn remove(&mut self, id: u64) -> Option<Option<usize>> {
452        let result = self.remove_inner(id);
453        if result.is_some() {
454            crate::core::manager::recovery::remove(id);
455        }
456        result
457    }
458
459    fn remove_inner(&mut self, id: u64) -> Option<Option<usize>> {
460        if let Some(pos) = self.items.iter().position(|i| i.id == id) {
461            let item = &self.items[pos];
462            if item.status == QueueStatus::Active {
463                item.cancel_token.cancel();
464            }
465            if item.status == QueueStatus::Paused && item.platform == "magnet" {
466                item.cancel_token.cancel();
467            }
468            let torrent_id = if item.status == QueueStatus::Seeding
469                || (item.status == QueueStatus::Paused && item.platform == "magnet")
470            {
471                item.torrent_id
472            } else {
473                None
474            };
475            self.items.remove(pos);
476            return Some(torrent_id);
477        }
478        None
479    }
480
481    pub fn clear_finished(&mut self) {
482        let to_remove: Vec<u64> = self
483            .items
484            .iter()
485            .filter(|i| {
486                matches!(
487                    i.status,
488                    QueueStatus::Complete { .. } | QueueStatus::Error { .. }
489                )
490            })
491            .map(|i| i.id)
492            .collect();
493        for id in &to_remove {
494            crate::core::manager::recovery::remove(*id);
495        }
496        self.items.retain(|i| {
497            !matches!(
498                i.status,
499                QueueStatus::Complete { .. } | QueueStatus::Error { .. }
500            )
501        });
502    }
503
504    pub fn get_state(&self) -> Vec<QueueItemInfo> {
505        self.items.iter().map(|i| i.to_info()).collect()
506    }
507
508    pub fn has_url(&self, url: &str) -> bool {
509        self.items.iter().any(|i| {
510            i.url == url
511                && matches!(
512                    i.status,
513                    QueueStatus::Queued
514                        | QueueStatus::Active
515                        | QueueStatus::Paused
516                        | QueueStatus::Seeding
517                )
518        })
519    }
520}
521
/// Rate limiter for progress notifications: `should_emit` returns `true` at most once
/// per `min_interval`.
pub struct ProgressThrottle {
    /// Reference instant of the last allowed emission. `None` means no reference could
    /// be computed yet, in which case the next `should_emit` call is allowed.
    last_emit: Option<std::time::Instant>,
    min_interval: std::time::Duration,
}

impl ProgressThrottle {
    /// Creates a throttle with a minimum spacing of `min_interval_ms` milliseconds.
    ///
    /// The reference instant is back-dated by ten seconds so that the first
    /// `should_emit` call passes for intervals up to ten seconds.
    pub fn new(min_interval_ms: u64) -> Self {
        // `Instant - Duration` may panic when the result cannot be represented (for
        // example very shortly after boot on some platforms); `checked_sub` cannot.
        let backdated =
            std::time::Instant::now().checked_sub(std::time::Duration::from_secs(10));
        Self {
            last_emit: backdated,
            min_interval: std::time::Duration::from_millis(min_interval_ms),
        }
    }

    /// Returns `true` and restarts the interval when at least `min_interval` has passed
    /// since the last allowed emission; returns `false` otherwise.
    pub fn should_emit(&mut self) -> bool {
        let now = std::time::Instant::now();
        let due = match self.last_emit {
            Some(previous) => now.duration_since(previous) >= self.min_interval,
            None => true,
        };
        if due {
            self.last_emit = Some(now);
        }
        due
    }
}
545
546pub fn emit_queue_state_from_state(reporter: &Option<SharedReporter>, state: Vec<QueueItemInfo>) {
547    let n = EMIT_COUNT.fetch_add(1, Ordering::Relaxed);
548    if n.is_multiple_of(10) {
549        tracing::debug!("[perf] emit_queue_state called {} times", n);
550    }
551    if let Some(reporter) = reporter {
552        reporter.on_queue_update(state);
553    }
554}
555
556pub fn emit_queue_state(queue: &DownloadQueue) {
557    let state = queue.get_state();
558    emit_queue_state_from_state(&queue.reporter, state);
559}
560
561/// RAII guard that ensures an Active queue item never leaks a slot.
562struct ActiveJobSlot {
563    queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
564    item_id: u64,
565    armed: bool,
566}
567
568impl ActiveJobSlot {
569    fn new(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) -> Self {
570        Self {
571            queue,
572            item_id,
573            armed: true,
574        }
575    }
576
577    fn disarm(mut self) {
578        self.armed = false;
579    }
580}
581
582impl Drop for ActiveJobSlot {
583    fn drop(&mut self) {
584        if !self.armed {
585            return;
586        }
587        let queue = self.queue.clone();
588        let item_id = self.item_id;
589        tokio::spawn(async move {
590            let state = {
591                let mut q = queue.lock().await;
592                let still_active = q
593                    .items
594                    .iter()
595                    .find(|i| i.id == item_id)
596                    .map(|i| i.status == QueueStatus::Active)
597                    .unwrap_or(false);
598                if !still_active {
599                    return;
600                }
601                tracing::warn!(
602                    "[queue] ActiveJobSlot guard firing for {} — download ended without clean release",
603                    item_id
604                );
605                q.mark_complete(
606                    item_id,
607                    false,
608                    Some("Download interrupted".to_string()),
609                    None,
610                    None,
611                );
612                q.get_state()
613            };
614            let reporter = { queue.lock().await.reporter.clone() };
615            emit_queue_state_from_state(&reporter, state);
616            tokio::spawn(try_start_next(queue));
617        });
618    }
619}
620
621pub fn spawn_download(
622    queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
623    item_id: u64,
624) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
625    Box::pin(async move {
626        let _timer_start = std::time::Instant::now();
627        let slot = ActiveJobSlot::new(queue.clone(), item_id);
628        tokio::spawn(spawn_download_inner(queue.clone(), item_id));
629        slot.disarm();
630        tracing::debug!(
631            "[perf] spawn_download {} took {:?}",
632            item_id,
633            _timer_start.elapsed()
634        );
635    })
636}
637
638struct DownloadContext {
639    url: String,
640    output_dir: String,
641    download_mode: Option<String>,
642    quality: Option<String>,
643    download_subtitles: Option<bool>,
644    format_id: Option<String>,
645    referer: Option<String>,
646    extra_headers: Option<std::collections::HashMap<String, String>>,
647    page_url: Option<String>,
648    user_agent: Option<String>,
649    cancel_token: tokio_util::sync::CancellationToken,
650    media_info: Option<crate::models::media::MediaInfo>,
651    platform_name: String,
652    downloader: std::sync::Arc<dyn crate::platforms::traits::PlatformDownloader>,
653    ytdlp_path: Option<std::path::PathBuf>,
654    from_hotkey: bool,
655}
656
657async fn extract_download_context(
658    queue: &Arc<tokio::sync::Mutex<DownloadQueue>>,
659    item_id: u64,
660) -> Option<DownloadContext> {
661    let q = queue.lock().await;
662    let item = q.items.iter().find(|i| i.id == item_id)?;
663    Some(DownloadContext {
664        url: item.url.clone(),
665        output_dir: item.output_dir.clone(),
666        download_mode: item.download_mode.clone(),
667        quality: item.quality.clone(),
668        download_subtitles: item.download_subtitles,
669        format_id: item.format_id.clone(),
670        referer: item.referer.clone(),
671        extra_headers: item.extra_headers.clone(),
672        page_url: item.page_url.clone(),
673        user_agent: item.user_agent.clone(),
674        cancel_token: item.cancel_token.clone(),
675        media_info: item.media_info.clone(),
676        platform_name: item.platform.clone(),
677        downloader: item.downloader.clone(),
678        ytdlp_path: item.ytdlp_path.clone(),
679        from_hotkey: item.from_hotkey,
680    })
681}
682
683async fn prepare_media_info(
684    queue: &Arc<tokio::sync::Mutex<DownloadQueue>>,
685    item_id: u64,
686    ctx: &DownloadContext,
687    reporter: &Option<crate::core::traits::SharedReporter>,
688) -> Option<crate::models::media::MediaInfo> {
689    let info_start = std::time::Instant::now();
690    let info = match &ctx.media_info {
691        Some(i) => {
692            tracing::info!(
693                "[queue] info for {} from cache/pre-fetched in {:?}",
694                item_id,
695                info_start.elapsed()
696            );
697            i.clone()
698        }
699        None => {
700            tracing::debug!(
701                "[perf] spawn_download_inner {}: media_info is None, fetching info",
702                item_id
703            );
704            if let Some(r) = reporter {
705                r.on_progress(
706                    item_id,
707                    crate::core::events::QueueItemProgress {
708                        id: item_id,
709                        title: ctx.url.clone(),
710                        platform: ctx.platform_name.clone(),
711                        percent: 0.0,
712                        speed_bytes_per_sec: 0.0,
713                        downloaded_bytes: 0,
714                        total_bytes: None,
715                        phase: "fetching_info".to_string(),
716                    },
717                );
718            }
719
720            let info_result = tokio::time::timeout(
721                std::time::Duration::from_secs(60),
722                fetch_and_cache_info(&ctx.url, &*ctx.downloader, &ctx.platform_name),
723            )
724            .await;
725
726            match info_result {
727                Ok(Ok(i)) => i,
728                Ok(Err(e)) => {
729                    let state = {
730                        let mut q = queue.lock().await;
731                        q.mark_complete(item_id, false, Some(e.to_string()), None, None);
732                        q.get_state()
733                    };
734                    emit_queue_state_from_state(reporter, state);
735                    tokio::spawn(try_start_next(queue.clone()));
736                    return None;
737                }
738                Err(_) => {
739                    tracing::warn!("[queue] info fetch timed out for {} after 60s", item_id);
740                    let state = {
741                        let mut q = queue.lock().await;
742                        q.mark_complete(
743                            item_id,
744                            false,
745                            Some("Timed out fetching video info".to_string()),
746                            None,
747                            None,
748                        );
749                        q.get_state()
750                    };
751                    emit_queue_state_from_state(reporter, state);
752                    tokio::spawn(try_start_next(queue.clone()));
753                    return None;
754                }
755            }
756        }
757    };
758    tracing::info!(
759        "[queue] info fetch for {} took {:?}",
760        item_id,
761        info_start.elapsed()
762    );
763
764    let state = {
765        let mut q = queue.lock().await;
766        if let Some(item) = q.items.iter_mut().find(|i| i.id == item_id) {
767            item.title = info.title.clone();
768            item.total_bytes = info.file_size_bytes;
769            let fc = if info.media_type == crate::models::media::MediaType::Carousel
770                || info.media_type == crate::models::media::MediaType::Playlist
771            {
772                info.available_qualities.len() as u32
773            } else {
774                1
775            };
776            item.file_count = Some(fc);
777            item.media_info = Some(info.clone());
778        }
779        q.get_state()
780    };
781    emit_queue_state_from_state(reporter, state);
782
783    if let Some(r) = reporter {
784        r.on_progress(
785            item_id,
786            crate::core::events::QueueItemProgress {
787                id: item_id,
788                title: info.title.clone(),
789                platform: ctx.platform_name.clone(),
790                percent: 0.5,
791                speed_bytes_per_sec: 0.0,
792                downloaded_bytes: 0,
793                total_bytes: info.file_size_bytes,
794                phase: "starting".to_string(),
795            },
796        );
797    }
798
799    Some(info)
800}
801
802fn build_download_options(
803    ctx: &DownloadContext,
804) -> (
805    crate::models::media::DownloadOptions,
806    std::sync::Arc<tokio::sync::Mutex<Option<usize>>>,
807) {
808    let settings = crate::models::settings::AppSettings::load_from_disk();
809    let tmpl = settings.download.filename_template.clone();
810    let mut final_output_dir = std::path::PathBuf::from(&ctx.output_dir);
811    if settings.download.organize_by_platform {
812        final_output_dir = final_output_dir.join(&ctx.platform_name);
813    }
814    let torrent_id_slot = std::sync::Arc::new(tokio::sync::Mutex::new(None));
815    let opts = crate::models::media::DownloadOptions {
816        quality: ctx
817            .quality
818            .clone()
819            .or_else(|| Some(settings.download.video_quality.clone())),
820        output_dir: final_output_dir,
821        filename_template: Some(tmpl),
822        download_subtitles: ctx
823            .download_subtitles
824            .unwrap_or(settings.download.download_subtitles),
825        include_auto_subtitles: settings.download.include_auto_subtitles,
826        download_mode: ctx.download_mode.clone(),
827        format_id: ctx.format_id.clone(),
828        referer: ctx.referer.clone(),
829        extra_headers: ctx.extra_headers.clone(),
830        page_url: ctx.page_url.clone(),
831        user_agent: ctx.user_agent.clone(),
832        cancel_token: ctx.cancel_token.clone(),
833        concurrent_fragments: settings.advanced.concurrent_fragments,
834        ytdlp_path: ctx.ytdlp_path.clone(),
835        torrent_listen_port: Some(settings.advanced.torrent_listen_port),
836        torrent_id_slot: Some(torrent_id_slot.clone()),
837    };
838    (opts, torrent_id_slot)
839}
840
841async fn handle_download_result(
842    queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
843    item_id: u64,
844    ctx: DownloadContext,
845    info: crate::models::media::MediaInfo,
846    reporter: Option<crate::core::traits::SharedReporter>,
847    result: anyhow::Result<crate::models::media::DownloadResult>,
848) {
849    let settings = crate::models::settings::AppSettings::load_from_disk();
850    match result {
851        Ok(dl) => {
852            if settings.download.embed_metadata
853                && ctx.platform_name != "magnet"
854                && crate::core::ffmpeg::is_ffmpeg_available().await
855            {
856                let metadata = crate::core::ffmpeg::MetadataEmbed {
857                    title: Some(info.title.clone()),
858                    artist: Some(info.author.clone()),
859                    thumbnail_url: info.thumbnail_url.clone(),
860                    ..Default::default()
861                };
862                if let Err(e) = crate::core::ffmpeg::embed_metadata(
863                    &dl.file_path,
864                    &metadata,
865                    settings.download.embed_thumbnail,
866                    shared_http_client(),
867                )
868                .await
869                {
870                    tracing::warn!("Metadata embed failed for '{}': {}", info.title, e);
871                }
872            }
873
874            if ctx.from_hotkey && settings.download.copy_to_clipboard_on_hotkey {
875                #[cfg(not(target_os = "android"))]
876                {
877                    match crate::core::clipboard::copy_file_to_clipboard(&dl.file_path).await {
878                        Ok(()) => {
879                            tracing::info!("[clipboard] file copied: {:?}", dl.file_path);
880                        }
881                        Err(e) => {
882                            tracing::warn!("[clipboard] failed to copy file: {}", e);
883                        }
884                    }
885                }
886            }
887
888            let state = {
889                let mut q = queue.lock().await;
890                if ctx.platform_name == "magnet" && dl.torrent_id.is_some() {
891                    q.mark_seeding(
892                        item_id,
893                        Some(dl.file_path.to_string_lossy().to_string()),
894                        Some(dl.file_size_bytes),
895                        dl.torrent_id,
896                    );
897                } else {
898                    q.mark_complete(
899                        item_id,
900                        true,
901                        None,
902                        Some(dl.file_path.to_string_lossy().to_string()),
903                        Some(dl.file_size_bytes),
904                    );
905                }
906                q.get_state()
907            };
908            if let Some(r) = &reporter {
909                r.on_complete(
910                    item_id,
911                    Some(dl.file_path.to_string_lossy().to_string()),
912                    Some(dl.file_size_bytes),
913                );
914            }
915            emit_queue_state_from_state(&reporter, state);
916        }
917        Err(e) => {
918            let raw_err = format!("{}", e);
919            let (category, hint) = crate::core::errors::classify_download_error(&raw_err);
920            let user_msg = if category != "unknown" {
921                format!("{} ({})", hint, raw_err)
922            } else {
923                raw_err.clone()
924            };
925            tracing::error!(
926                "Download error '{}' [{}]: {}",
927                ctx.platform_name,
928                category,
929                raw_err
930            );
931            let state = {
932                let mut q = queue.lock().await;
933                q.mark_complete(item_id, false, Some(user_msg.clone()), None, None);
934                q.get_state()
935            };
936            if let Some(r) = &reporter {
937                r.on_error(item_id, user_msg);
938            }
939            emit_queue_state_from_state(&reporter, state);
940        }
941    }
942}
943
944async fn spawn_download_inner(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) {
945    tracing::info!("[queue] download {} started", item_id);
946
947    let reporter = { queue.lock().await.reporter.clone() };
948
949    if let Some(r) = &reporter {
950        r.on_progress(
951            item_id,
952            crate::core::events::QueueItemProgress {
953                id: item_id,
954                title: "".to_string(),
955                platform: "".to_string(),
956                percent: 0.0,
957                speed_bytes_per_sec: 0.0,
958                downloaded_bytes: 0,
959                total_bytes: None,
960                phase: "preparing".to_string(),
961            },
962        );
963    }
964
965    let ctx = match extract_download_context(&queue, item_id).await {
966        Some(c) => c,
967        None => return,
968    };
969
970    let info = match prepare_media_info(&queue, item_id, &ctx, &reporter).await {
971        Some(i) => i,
972        None => return,
973    };
974
975    let (opts, torrent_id_slot) = build_download_options(&ctx);
976
977    let total_bytes = info.file_size_bytes;
978    let item_title = info.title.clone();
979    let item_platform = ctx.platform_name.clone();
980    let (tx, mut rx) = tokio::sync::mpsc::channel::<f64>(32);
981
982    let reporter_progress = reporter.clone();
983    let queue_progress = queue.clone();
984    let torrent_id_slot_progress = torrent_id_slot.clone();
985    let progress_forwarder = tokio::spawn(async move {
986        let mut last_bytes: u64 = 0;
987        let mut last_time = std::time::Instant::now();
988        let mut throttle = ProgressThrottle::new(250);
989        let mut current_speed: f64 = 0.0;
990
991        while let Some(percent) = rx.recv().await {
992            if !throttle.should_emit() && percent < 100.0 {
993                continue;
994            }
995
996            let now = std::time::Instant::now();
997            let clamped = percent.max(0.0);
998            let downloaded_bytes = total_bytes
999                .map(|total| (clamped / 100.0 * total as f64) as u64)
1000                .unwrap_or(0);
1001
1002            if total_bytes.is_some() && downloaded_bytes > last_bytes {
1003                let dt = now.duration_since(last_time).as_secs_f64();
1004                if dt > 0.1 {
1005                    let instant_speed = (downloaded_bytes - last_bytes) as f64 / dt;
1006                    current_speed = if current_speed > 0.0 {
1007                        current_speed * 0.7 + instant_speed * 0.3
1008                    } else {
1009                        instant_speed
1010                    };
1011                }
1012            }
1013
1014            last_bytes = downloaded_bytes;
1015            last_time = now;
1016
1017            let phase = match percent {
1018                p if p < -1.5 => "connecting",
1019                p if p < -0.5 => "starting",
1020                p if p > 0.0 => "downloading",
1021                _ => "starting",
1022            };
1023
1024            {
1025                let mut q = queue_progress.lock().await;
1026                let tid = { *torrent_id_slot_progress.lock().await };
1027                q.update_progress(
1028                    item_id,
1029                    clamped,
1030                    current_speed,
1031                    downloaded_bytes,
1032                    total_bytes,
1033                    tid,
1034                );
1035            }
1036
1037            if let Some(r) = &reporter_progress {
1038                r.on_progress(
1039                    item_id,
1040                    crate::core::events::QueueItemProgress {
1041                        id: item_id,
1042                        title: item_title.clone(),
1043                        platform: item_platform.clone(),
1044                        percent: clamped,
1045                        speed_bytes_per_sec: current_speed,
1046                        downloaded_bytes,
1047                        total_bytes,
1048                        phase: phase.to_string(),
1049                    },
1050                );
1051            }
1052        }
1053    });
1054
1055    if let Some(ua) = opts.user_agent.clone() {
1056        crate::core::ytdlp::register_ext_user_agent(ctx.url.clone(), ua);
1057    }
1058    if let Some(hdrs) = opts.extra_headers.clone() {
1059        crate::core::ytdlp::register_ext_headers(ctx.url.clone(), hdrs);
1060    }
1061
1062    let dl_start = std::time::Instant::now();
1063    let dl_future = async {
1064        tokio::select! {
1065            r = ctx.downloader.download(&info, &opts, tx) => r,
1066            _ = ctx.cancel_token.cancelled() => {
1067                Err(anyhow::anyhow!("Download cancelado"))
1068            }
1069        }
1070    };
1071    let result = crate::core::log_hook::CURRENT_DOWNLOAD_ID
1072        .scope(item_id, dl_future)
1073        .await;
1074    crate::core::ytdlp::clear_ext_user_agent(&ctx.url);
1075    crate::core::ytdlp::clear_ext_headers(&ctx.url);
1076    tracing::info!(
1077        "[queue] download {} completed in {:?}",
1078        item_id,
1079        dl_start.elapsed()
1080    );
1081
1082    let _ = progress_forwarder.await;
1083
1084    let was_paused = {
1085        let q = queue.lock().await;
1086        q.items
1087            .iter()
1088            .find(|i| i.id == item_id)
1089            .map(|i| i.status == QueueStatus::Paused)
1090            .unwrap_or(false)
1091    };
1092
1093    if was_paused {
1094        let state = {
1095            let q = queue.lock().await;
1096            q.get_state()
1097        };
1098        emit_queue_state_from_state(&reporter, state);
1099        tokio::spawn(try_start_next(queue));
1100        return;
1101    }
1102
1103    handle_download_result(queue.clone(), item_id, ctx, info, reporter, result).await;
1104
1105    tokio::spawn(try_start_next(queue));
1106}
1107
1108pub async fn fetch_and_cache_info(
1109    url: &str,
1110    downloader: &dyn PlatformDownloader,
1111    platform: &str,
1112) -> anyhow::Result<MediaInfo> {
1113    {
1114        let cache = info_cache().lock().await;
1115        if let Some(entry) = cache.get(url) {
1116            if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1117                tracing::debug!("[perf] fetch_and_cache_info: cache hit for {}", platform);
1118                return Ok(entry.info.clone());
1119            }
1120        }
1121    }
1122
1123    let url_lock = {
1124        let mut map = in_flight_map().lock().await;
1125        map.entry(url.to_string())
1126            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
1127            .clone()
1128    };
1129    let _guard = url_lock.lock().await;
1130
1131    {
1132        let cache = info_cache().lock().await;
1133        if let Some(entry) = cache.get(url) {
1134            if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1135                tracing::debug!(
1136                    "[perf] fetch_and_cache_info: dedup cache hit for {}",
1137                    platform
1138                );
1139                return Ok(entry.info.clone());
1140            }
1141        }
1142    }
1143
1144    tracing::debug!("[perf] fetch_and_cache_info: fetching for {}", platform);
1145    let mut info = downloader.get_media_info(url).await?;
1146
1147    if is_generic_title(&info.title) {
1148        let name = crate::core::random_names::get_random_name();
1149        info.title = format!("video_{}", name);
1150    }
1151
1152    let mut cache = info_cache().lock().await;
1153    cache.insert(
1154        url.to_string(),
1155        CachedInfo {
1156            info: info.clone(),
1157            cached_at: std::time::Instant::now(),
1158        },
1159    );
1160    if cache.len() > 50 {
1161        cache.retain(|_, v| v.cached_at.elapsed() < INFO_CACHE_TTL);
1162    }
1163    Ok(info)
1164}
1165
1166pub async fn try_get_cached_info(url: &str) -> Option<MediaInfo> {
1167    let cache = info_cache().lock().await;
1168    cache
1169        .get(url)
1170        .filter(|entry| entry.cached_at.elapsed() < INFO_CACHE_TTL)
1171        .map(|entry| entry.info.clone())
1172}
1173
1174pub async fn prefetch_info(url: &str, downloader: &dyn PlatformDownloader, platform: &str) {
1175    prefetch_info_with_emit(url, downloader, platform, None).await;
1176}
1177
1178pub async fn prefetch_info_with_emit(
1179    url: &str,
1180    downloader: &dyn PlatformDownloader,
1181    platform: &str,
1182    reporter: Option<SharedReporter>,
1183) {
1184    let _timer_start = std::time::Instant::now();
1185    tracing::debug!("[perf] prefetch_info: started");
1186    match fetch_and_cache_info(url, downloader, platform).await {
1187        Ok(info) => {
1188            tracing::debug!(
1189                "[perf] prefetch_info: completed in {:?} — {}",
1190                _timer_start.elapsed(),
1191                info.title
1192            );
1193            if let Some(r) = reporter {
1194                r.on_media_preview(
1195                    url.to_string(),
1196                    info.title.clone(),
1197                    info.author.clone(),
1198                    info.thumbnail_url.clone(),
1199                    info.duration_seconds,
1200                );
1201            }
1202        }
1203        Err(e) => tracing::warn!(
1204            "[perf] prefetch_info: failed in {:?} — {}",
1205            _timer_start.elapsed(),
1206            e
1207        ),
1208    }
1209}
1210
1211pub async fn try_start_next(queue: Arc<tokio::sync::Mutex<DownloadQueue>>) {
1212    let _timer_start = std::time::Instant::now();
1213    let (next_ids, stagger, state_to_emit, reporter) = {
1214        let mut q = queue.lock().await;
1215        let ids = q.next_queued_ids();
1216        for nid in &ids {
1217            q.mark_active(*nid);
1218        }
1219        let state = if !ids.is_empty() {
1220            Some(q.get_state())
1221        } else {
1222            None
1223        };
1224        (ids, q.stagger_delay_ms, state, q.reporter.clone())
1225    };
1226
1227    if let Some(state) = state_to_emit {
1228        emit_queue_state_from_state(&reporter, state);
1229    }
1230
1231    let batch_size = next_ids.len();
1232    for (i, nid) in next_ids.into_iter().enumerate() {
1233        if let Some(r) = &reporter {
1234            r.on_progress(
1235                nid,
1236                crate::core::events::QueueItemProgress {
1237                    id: nid,
1238                    title: String::new(),
1239                    platform: String::new(),
1240                    percent: 0.0,
1241                    speed_bytes_per_sec: 0.0,
1242                    downloaded_bytes: 0,
1243                    total_bytes: None,
1244                    phase: "queued_starting".to_string(),
1245                },
1246            );
1247        }
1248
1249        if i > 0 {
1250            let item_platform = {
1251                let q = queue.lock().await;
1252                q.items
1253                    .iter()
1254                    .find(|item| item.id == nid)
1255                    .map(|item| item.platform.clone())
1256            };
1257            let delay_ms = if item_platform.as_deref() == Some("youtube") {
1258                2000
1259            } else if batch_size > 3 {
1260                stagger.max(1000)
1261            } else {
1262                stagger
1263            };
1264            if delay_ms > 0 {
1265                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1266            }
1267        }
1268        let queue_c = queue.clone();
1269        tokio::spawn(async move {
1270            tokio::spawn(spawn_download(queue_c, nid));
1271        });
1272    }
1273    tracing::debug!("[perf] try_start_next took {:?}", _timer_start.elapsed());
1274}
1275
/// Reports whether `title` is a placeholder rather than a real media title.
///
/// The comparison is case-insensitive and ignores surrounding whitespace.
/// A title is generic when it is empty, equals one of `video`, `media`,
/// `untitled` or `unknown`, or starts with `video [video]` / `media [media]`.
fn is_generic_title(title: &str) -> bool {
    const EXACT_PLACEHOLDERS: [&str; 5] = ["", "video", "media", "untitled", "unknown"];
    const PLACEHOLDER_PREFIXES: [&str; 2] = ["video [video]", "media [media]"];

    let lowered = title.to_lowercase();
    let normalized = lowered.trim();

    EXACT_PLACEHOLDERS.contains(&normalized)
        || PLACEHOLDER_PREFIXES
            .iter()
            .any(|prefix| normalized.starts_with(*prefix))
}