Skip to main content

mangofetch_core/core/manager/
queue.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, OnceLock};
5
6static EMIT_COUNT: AtomicU64 = AtomicU64::new(0);
7
8use serde::Serialize;
9use tokio_util::sync::CancellationToken;
10
11use crate::core::traits::SharedReporter;
12use crate::models::media::MediaInfo;
13use crate::models::queue::{QueueItemInfo, QueueStatus};
14use crate::platforms::traits::PlatformDownloader;
15
16fn shared_http_client() -> &'static reqwest::Client {
17    static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
18    CLIENT.get_or_init(|| {
19        crate::core::http_client::apply_global_proxy(reqwest::Client::builder())
20            .build()
21            .unwrap_or_default()
22    })
23}
24
/// A `MediaInfo` value paired with the instant it was stored; this is the
/// value type held by the `INFO_CACHE` map.
struct CachedInfo {
    /// The cached media metadata.
    info: MediaInfo,
    /// When this entry was stored.
    /// NOTE(review): presumably compared against `INFO_CACHE_TTL` to expire
    /// entries — the lookup code is not part of this section, confirm there.
    cached_at: std::time::Instant,
}
29
30static INFO_CACHE: OnceLock<tokio::sync::Mutex<HashMap<String, CachedInfo>>> = OnceLock::new();
31
32fn info_cache() -> &'static tokio::sync::Mutex<HashMap<String, CachedInfo>> {
33    INFO_CACHE.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
34}
35
/// Time-to-live value for the info cache: 600 seconds (10 minutes).
/// NOTE(review): the code that applies this limit is outside this section —
/// confirm it is checked against `CachedInfo::cached_at`.
const INFO_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(600);
37
38static IN_FLIGHT_FETCHES: OnceLock<
39    tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
40> = OnceLock::new();
41
42fn in_flight_map() -> &'static tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
43    IN_FLIGHT_FETCHES.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
44}
45
/// Serializable summary of a media item: its URL, title, author and, when
/// known, a thumbnail URL and a duration.
/// NOTE(review): presumably emitted to the front end as a preview payload —
/// the emitter is not visible in this section.
#[derive(Debug, Clone, Serialize)]
pub struct MediaPreviewEvent {
    /// URL the preview refers to.
    pub url: String,
    /// Display title of the media.
    pub title: String,
    /// Author / uploader name.
    pub author: String,
    /// Thumbnail URL, if one is available.
    pub thumbnail_url: Option<String>,
    /// Duration in seconds, if known.
    pub duration_seconds: Option<f64>,
}
54
/// One entry of the download queue: the request parameters, the live
/// progress counters and the handles needed to run or cancel the download.
pub struct QueueItem {
    /// Identifier used by every `DownloadQueue` method to look the item up.
    pub id: u64,
    pub url: String,
    /// Platform name; the literal `"magnet"` gets special pause/resume/cancel handling.
    pub platform: String,
    pub title: String,
    /// Current lifecycle state (queued, active, paused, seeding, complete, error).
    pub status: QueueStatus,
    /// Token used to stop the running download. It is replaced with a fresh
    /// token whenever the item is (re)activated, resumed or retried.
    pub cancel_token: CancellationToken,
    pub output_dir: String,
    // --- Per-item download preferences; `None` means "use the settings default"
    // --- for the fields that `build_download_options` falls back on.
    pub download_mode: Option<String>,
    pub quality: Option<String>,
    pub video_format: Option<String>,
    pub audio_format: Option<String>,
    pub audio_quality: Option<String>,
    pub format_id: Option<String>,
    // --- Optional HTTP request context forwarded to the downloader.
    pub referer: Option<String>,
    pub extra_headers: Option<std::collections::HashMap<String, String>>,
    pub page_url: Option<String>,
    pub user_agent: Option<String>,
    // --- Live progress counters.
    /// Progress on a 0–100 scale (set to 100.0 on success or when seeding).
    pub percent: f64,
    pub speed_bytes_per_sec: f64,
    pub downloaded_bytes: u64,
    pub total_bytes: Option<u64>,
    // --- Result fields, filled in when the download finishes.
    pub file_path: Option<String>,
    pub file_size_bytes: Option<u64>,
    pub file_count: Option<u32>,
    /// Media metadata, either supplied at enqueue time or fetched later.
    pub media_info: Option<MediaInfo>,
    /// Platform implementation that performs the download.
    pub downloader: Arc<dyn PlatformDownloader>,
    pub ytdlp_path: Option<PathBuf>,
    /// Whether the item was enqueued via a hotkey.
    pub from_hotkey: bool,
    /// Torrent handle id, recorded by `mark_seeding` / `update_progress`.
    pub torrent_id: Option<usize>,
    pub download_subtitles: Option<bool>,
    /// Free-form phase label (`"Queued"` on enqueue, `"Restored"` after recovery).
    pub phase: String,
}
88
89impl QueueItem {
90    pub fn to_info(&self) -> QueueItemInfo {
91        QueueItemInfo {
92            id: self.id,
93            url: self.url.clone(),
94            platform: self.platform.clone(),
95            title: self.title.clone(),
96            status: self.status.clone(),
97            percent: self.percent,
98            speed_bytes_per_sec: self.speed_bytes_per_sec,
99            downloaded_bytes: self.downloaded_bytes,
100            total_bytes: self.total_bytes,
101            phase: self.phase.clone(),
102            file_path: self.file_path.clone(),
103            file_size_bytes: self.file_size_bytes,
104            file_count: self.file_count,
105            thumbnail_url: self
106                .media_info
107                .as_ref()
108                .and_then(|m| m.thumbnail_url.clone()),
109        }
110    }
111}
112
/// The download queue: an ordered list of items plus scheduling settings and
/// an optional reporter that receives state updates.
pub struct DownloadQueue {
    /// Items in insertion order; queued items are started front to back.
    pub items: Vec<QueueItem>,
    /// Upper bound on simultaneously `Active` items used by `next_queued_ids`.
    pub max_concurrent: u32,
    /// Delay in milliseconds (initialised to 150 by `new`).
    /// NOTE(review): presumably the spacing between consecutive download
    /// starts — the consumer is not visible in this section.
    pub stagger_delay_ms: u64,
    /// Sink for queue/progress notifications; `None` disables reporting.
    pub reporter: Option<SharedReporter>,
}
119
120impl DownloadQueue {
121    pub fn new(max_concurrent: u32, reporter: Option<SharedReporter>) -> Self {
122        Self {
123            items: Vec::new(),
124            max_concurrent,
125            stagger_delay_ms: 150,
126            reporter,
127        }
128    }
129
130    pub fn set_reporter(&mut self, reporter: SharedReporter) {
131        self.reporter = Some(reporter);
132    }
133
134    pub fn load_from_recovery(&mut self, registry: &crate::core::registry::PlatformRegistry) {
135        let recovery_items = crate::core::manager::recovery::list();
136        for item in recovery_items {
137            let downloader = registry
138                .find_platform(&item.url)
139                .or_else(|| registry.find_by_name(&item.platform));
140
141            if let Some(dl) = downloader {
142                let percent = if matches!(item.status, QueueStatus::Complete { success: true }) {
143                    100.0
144                } else {
145                    0.0
146                };
147
148                let status = match item.status {
149                    QueueStatus::Active | QueueStatus::Seeding => QueueStatus::Paused,
150                    other => other,
151                };
152
153                let q_item = QueueItem {
154                    id: item.id,
155                    url: item.url,
156                    platform: item.platform,
157                    title: item.title,
158                    status,
159                    cancel_token: CancellationToken::new(),
160                    output_dir: item.output_dir,
161                    download_mode: item.download_mode,
162                    quality: item.quality,
163                    video_format: None,
164                    audio_format: None,
165                    audio_quality: None,
166                    format_id: item.format_id,
167                    referer: item.referer,
168                    extra_headers: None,
169                    page_url: None,
170                    user_agent: None,
171                    percent,
172                    speed_bytes_per_sec: 0.0,
173                    downloaded_bytes: 0,
174                    total_bytes: item.file_size_bytes,
175                    file_path: item.file_path,
176                    file_size_bytes: item.file_size_bytes,
177                    file_count: None,
178                    media_info: None,
179                    downloader: dl,
180                    ytdlp_path: None,
181                    from_hotkey: false,
182                    torrent_id: None,
183                    download_subtitles: None,
184                    phase: "Restored".to_string(),
185                };
186                self.items.push(q_item);
187            }
188        }
189    }
190
191    fn sync_recovery(item: &QueueItem) {
192        crate::core::manager::recovery::persist(crate::core::manager::recovery::RecoveryItem {
193            id: item.id,
194            url: item.url.clone(),
195            title: item.title.clone(),
196            platform: item.platform.clone(),
197            output_dir: item.output_dir.clone(),
198            status: item.status.clone(),
199            download_mode: item.download_mode.clone(),
200            quality: item.quality.clone(),
201            format_id: item.format_id.clone(),
202            referer: item.referer.clone(),
203            file_path: item.file_path.clone(),
204            file_size_bytes: item.file_size_bytes,
205        });
206    }
207
208    #[allow(clippy::too_many_arguments)]
209    pub fn enqueue(
210        &mut self,
211        id: u64,
212        url: String,
213        platform: String,
214        title: String,
215        output_dir: String,
216        download_mode: Option<String>,
217        quality: Option<String>,
218        video_format: Option<String>,
219        audio_format: Option<String>,
220        audio_quality: Option<String>,
221        download_subtitles: Option<bool>,
222        format_id: Option<String>,
223        referer: Option<String>,
224        extra_headers: Option<std::collections::HashMap<String, String>>,
225        page_url: Option<String>,
226        user_agent: Option<String>,
227        media_info: Option<MediaInfo>,
228        total_bytes: Option<u64>,
229        file_count: Option<u32>,
230        downloader: Arc<dyn PlatformDownloader>,
231        ytdlp_path: Option<PathBuf>,
232        from_hotkey: bool,
233    ) {
234        let item = QueueItem {
235            id,
236            url,
237            platform,
238            title,
239            status: QueueStatus::Queued,
240            cancel_token: CancellationToken::new(),
241            output_dir,
242            download_mode,
243            quality,
244            video_format,
245            audio_format,
246            audio_quality,
247            format_id,
248            referer,
249            extra_headers,
250            page_url,
251            user_agent,
252            percent: 0.0,
253            speed_bytes_per_sec: 0.0,
254            downloaded_bytes: 0,
255            total_bytes,
256            file_path: None,
257            file_size_bytes: None,
258            file_count,
259            media_info,
260            downloader,
261            ytdlp_path,
262            from_hotkey,
263            torrent_id: None,
264            download_subtitles,
265            phase: "Queued".to_string(),
266        };
267        self.items.push(item);
268        Self::sync_recovery(self.items.last().unwrap());
269    }
270
271    pub fn active_count(&self) -> u32 {
272        self.items
273            .iter()
274            .filter(|i| i.status == QueueStatus::Active)
275            .count() as u32
276    }
277
278    pub fn next_queued_ids(&self) -> Vec<u64> {
279        let slots = self.max_concurrent.saturating_sub(self.active_count()) as usize;
280        self.items
281            .iter()
282            .filter(|i| i.status == QueueStatus::Queued)
283            .take(slots)
284            .map(|i| i.id)
285            .collect()
286    }
287
288    pub fn mark_active(&mut self, id: u64) {
289        let item = self.items.iter_mut().find(|i| i.id == id);
290        if let Some(item) = item {
291            item.status = QueueStatus::Active;
292            item.cancel_token = CancellationToken::new();
293            Self::sync_recovery(item);
294        }
295    }
296
297    pub fn mark_complete(
298        &mut self,
299        id: u64,
300        success: bool,
301        error: Option<String>,
302        file_path: Option<String>,
303        file_size_bytes: Option<u64>,
304    ) {
305        let item = self.items.iter_mut().find(|i| i.id == id);
306        if let Some(item) = item {
307            if success {
308                item.status = QueueStatus::Complete { success: true };
309                item.percent = 100.0;
310            } else {
311                item.status = QueueStatus::Error {
312                    message: error.unwrap_or_default(),
313                };
314            }
315            item.file_path = file_path;
316            item.file_size_bytes = file_size_bytes;
317            item.speed_bytes_per_sec = 0.0;
318            Self::sync_recovery(item);
319        }
320    }
321
322    pub fn mark_seeding(
323        &mut self,
324        id: u64,
325        file_path: Option<String>,
326        file_size_bytes: Option<u64>,
327        torrent_id: Option<usize>,
328    ) {
329        let item = self.items.iter_mut().find(|i| i.id == id);
330        if let Some(item) = item {
331            item.status = QueueStatus::Seeding;
332            item.percent = 100.0;
333            item.file_path = file_path;
334            item.file_size_bytes = file_size_bytes;
335            item.speed_bytes_per_sec = 0.0;
336            item.torrent_id = torrent_id;
337            Self::sync_recovery(item);
338        }
339    }
340
341    pub fn update_progress(
342        &mut self,
343        id: u64,
344        percent: f64,
345        speed: f64,
346        downloaded: u64,
347        total: Option<u64>,
348        torrent_id: Option<usize>,
349    ) {
350        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
351            item.percent = percent;
352            item.speed_bytes_per_sec = speed;
353            item.downloaded_bytes = downloaded;
354            if let Some(t) = total {
355                item.total_bytes = Some(t);
356            }
357            if torrent_id.is_some() && item.torrent_id.is_none() {
358                item.torrent_id = torrent_id;
359            }
360        }
361    }
362
363    pub fn pause(&mut self, id: u64) -> bool {
364        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
365            if item.status == QueueStatus::Active {
366                if item.platform != "magnet" {
367                    item.cancel_token.cancel();
368                }
369                item.status = QueueStatus::Paused;
370                item.speed_bytes_per_sec = 0.0;
371                Self::sync_recovery(item);
372                return true;
373            }
374        }
375        false
376    }
377
378    pub fn resume(&mut self, id: u64) -> bool {
379        let item = self.items.iter_mut().find(|i| i.id == id);
380        if let Some(item) = item {
381            if item.status == QueueStatus::Paused {
382                if item.platform == "magnet" {
383                    item.status = QueueStatus::Active;
384                } else {
385                    item.status = QueueStatus::Queued;
386                    item.cancel_token = CancellationToken::new();
387                }
388                Self::sync_recovery(item);
389                return true;
390            }
391        }
392        false
393    }
394
395    pub fn cancel(&mut self, id: u64) -> (bool, Option<usize>) {
396        let result = self.cancel_inner(id);
397        if result.0 {
398            crate::core::manager::recovery::remove(id);
399        }
400        result
401    }
402
403    fn cancel_inner(&mut self, id: u64) -> (bool, Option<usize>) {
404        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
405            match &item.status {
406                QueueStatus::Active => {
407                    item.cancel_token.cancel();
408                    item.status = QueueStatus::Error {
409                        message: "Cancelled".to_string(),
410                    };
411                    item.speed_bytes_per_sec = 0.0;
412                    Self::sync_recovery(item);
413                    return (true, None);
414                }
415                QueueStatus::Seeding => {
416                    let tid = item.torrent_id;
417                    item.status = QueueStatus::Error {
418                        message: "Cancelled".to_string(),
419                    };
420                    item.speed_bytes_per_sec = 0.0;
421                    Self::sync_recovery(item);
422                    return (true, tid);
423                }
424                QueueStatus::Paused => {
425                    item.cancel_token.cancel();
426                    let tid = if item.platform == "magnet" {
427                        item.torrent_id
428                    } else {
429                        None
430                    };
431                    item.status = QueueStatus::Error {
432                        message: "Cancelled".to_string(),
433                    };
434                    item.speed_bytes_per_sec = 0.0;
435                    Self::sync_recovery(item);
436                    return (true, tid);
437                }
438                QueueStatus::Queued => {
439                    item.status = QueueStatus::Error {
440                        message: "Cancelled".to_string(),
441                    };
442                    Self::sync_recovery(item);
443                    return (true, None);
444                }
445                _ => {}
446            }
447        }
448        (false, None)
449    }
450
451    pub fn retry(&mut self, id: u64) -> bool {
452        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
453            if matches!(item.status, QueueStatus::Error { .. }) {
454                item.status = QueueStatus::Queued;
455                item.cancel_token = CancellationToken::new();
456                item.percent = 0.0;
457                item.speed_bytes_per_sec = 0.0;
458                item.downloaded_bytes = 0;
459                item.file_path = None;
460                item.file_size_bytes = None;
461                Self::sync_recovery(item);
462                return true;
463            }
464        }
465        false
466    }
467
468    pub fn remove(&mut self, id: u64) -> Option<Option<usize>> {
469        let result = self.remove_inner(id);
470        if result.is_some() {
471            crate::core::manager::recovery::remove(id);
472        }
473        result
474    }
475
476    fn remove_inner(&mut self, id: u64) -> Option<Option<usize>> {
477        if let Some(pos) = self.items.iter().position(|i| i.id == id) {
478            let item = &self.items[pos];
479            if item.status == QueueStatus::Active {
480                item.cancel_token.cancel();
481            }
482            if item.status == QueueStatus::Paused && item.platform == "magnet" {
483                item.cancel_token.cancel();
484            }
485            let torrent_id = if item.status == QueueStatus::Seeding
486                || (item.status == QueueStatus::Paused && item.platform == "magnet")
487            {
488                item.torrent_id
489            } else {
490                None
491            };
492            self.items.remove(pos);
493            return Some(torrent_id);
494        }
495        None
496    }
497
498    pub fn clear_finished(&mut self) {
499        let to_remove: Vec<u64> = self
500            .items
501            .iter()
502            .filter(|i| {
503                matches!(
504                    i.status,
505                    QueueStatus::Complete { .. } | QueueStatus::Error { .. }
506                )
507            })
508            .map(|i| i.id)
509            .collect();
510        for id in &to_remove {
511            crate::core::manager::recovery::remove(*id);
512        }
513        self.items.retain(|i| {
514            !matches!(
515                i.status,
516                QueueStatus::Complete { .. } | QueueStatus::Error { .. }
517            )
518        });
519    }
520
521    pub fn get_state(&self) -> Vec<QueueItemInfo> {
522        self.items.iter().map(|i| i.to_info()).collect()
523    }
524
525    pub fn has_url(&self, url: &str) -> bool {
526        self.items.iter().any(|i| {
527            i.url == url
528                && matches!(
529                    i.status,
530                    QueueStatus::Queued
531                        | QueueStatus::Active
532                        | QueueStatus::Paused
533                        | QueueStatus::Seeding
534                )
535        })
536    }
537}
538
/// Rate limiter for progress notifications: `should_emit` returns `true` at
/// most once per `min_interval`.
pub struct ProgressThrottle {
    /// Instant of the last accepted emission.
    last_emit: std::time::Instant,
    /// Minimum time that must pass between two accepted emissions.
    min_interval: std::time::Duration,
}

impl ProgressThrottle {
    /// Creates a throttle that lets one emission through every
    /// `min_interval_ms` milliseconds.
    ///
    /// `last_emit` is back-dated by ten seconds so that the first call to
    /// `should_emit` passes immediately for intervals up to ten seconds.
    pub fn new(min_interval_ms: u64) -> Self {
        let now = std::time::Instant::now();
        // `Instant - Duration` panics when the result cannot be represented
        // (the monotonic clock may be younger than ten seconds). Fall back to
        // `now` in that case: the first emission is then merely delayed by
        // `min_interval` instead of crashing the caller.
        let last_emit = now
            .checked_sub(std::time::Duration::from_secs(10))
            .unwrap_or(now);
        Self {
            last_emit,
            min_interval: std::time::Duration::from_millis(min_interval_ms),
        }
    }

    /// Returns `true` — and restarts the interval — when at least
    /// `min_interval` has elapsed since the last accepted emission.
    pub fn should_emit(&mut self) -> bool {
        let now = std::time::Instant::now();
        if now.duration_since(self.last_emit) >= self.min_interval {
            self.last_emit = now;
            true
        } else {
            false
        }
    }
}
562
563pub fn emit_queue_state_from_state(reporter: &Option<SharedReporter>, state: Vec<QueueItemInfo>) {
564    let n = EMIT_COUNT.fetch_add(1, Ordering::Relaxed);
565    if n.is_multiple_of(10) {
566        tracing::debug!("[perf] emit_queue_state called {} times", n);
567    }
568    if let Some(reporter) = reporter {
569        reporter.on_queue_update(state);
570    }
571}
572
573pub fn emit_queue_state(queue: &DownloadQueue) {
574    let state = queue.get_state();
575    emit_queue_state_from_state(&queue.reporter, state);
576}
577
/// RAII guard that ensures an Active queue item never leaks a slot.
///
/// When the guard is dropped while still armed, its `Drop` impl spawns a task
/// that marks the item as failed if it is still `Active`, emits the new queue
/// state and asks the queue to start the next item. Calling `disarm` turns
/// the drop into a no-op.
struct ActiveJobSlot {
    /// Queue that owns the guarded item.
    queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
    /// Id of the guarded queue item.
    item_id: u64,
    /// `true` until `disarm` is called; only an armed guard acts on drop.
    armed: bool,
}
584
585impl ActiveJobSlot {
586    fn new(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) -> Self {
587        Self {
588            queue,
589            item_id,
590            armed: true,
591        }
592    }
593
594    fn disarm(mut self) {
595        self.armed = false;
596    }
597}
598
599impl Drop for ActiveJobSlot {
600    fn drop(&mut self) {
601        if !self.armed {
602            return;
603        }
604        let queue = self.queue.clone();
605        let item_id = self.item_id;
606        tokio::spawn(async move {
607            let state = {
608                let mut q = queue.lock().await;
609                let still_active = q
610                    .items
611                    .iter()
612                    .find(|i| i.id == item_id)
613                    .map(|i| i.status == QueueStatus::Active)
614                    .unwrap_or(false);
615                if !still_active {
616                    return;
617                }
618                tracing::warn!(
619                    "[queue] ActiveJobSlot guard firing for {} — download ended without clean release",
620                    item_id
621                );
622                q.mark_complete(
623                    item_id,
624                    false,
625                    Some("Download interrupted".to_string()),
626                    None,
627                    None,
628                );
629                q.get_state()
630            };
631            let reporter = { queue.lock().await.reporter.clone() };
632            emit_queue_state_from_state(&reporter, state);
633            tokio::spawn(try_start_next(queue));
634        });
635    }
636}
637
638pub fn spawn_download(
639    queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
640    item_id: u64,
641) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
642    Box::pin(async move {
643        let _timer_start = std::time::Instant::now();
644        let slot = ActiveJobSlot::new(queue.clone(), item_id);
645        tokio::spawn(spawn_download_inner(queue.clone(), item_id));
646        slot.disarm();
647        tracing::debug!(
648            "[perf] spawn_download {} took {:?}",
649            item_id,
650            _timer_start.elapsed()
651        );
652    })
653}
654
/// Owned copy of the `QueueItem` fields a download task needs, produced by
/// `extract_download_context` so the task can work from its own copy of the
/// item's parameters.
struct DownloadContext {
    url: String,
    output_dir: String,
    // Per-item preferences; `build_download_options` substitutes settings
    // defaults for the ones that are `None`.
    download_mode: Option<String>,
    quality: Option<String>,
    video_format: Option<String>,
    audio_format: Option<String>,
    audio_quality: Option<String>,
    download_subtitles: Option<bool>,
    format_id: Option<String>,
    // Optional HTTP request context, forwarded unchanged to the download options.
    referer: Option<String>,
    extra_headers: Option<std::collections::HashMap<String, String>>,
    page_url: Option<String>,
    user_agent: Option<String>,
    /// Clone of the item's cancel token at the time the context was extracted.
    cancel_token: tokio_util::sync::CancellationToken,
    /// Pre-fetched media info, if the item already had it.
    media_info: Option<crate::models::media::MediaInfo>,
    /// Copy of the item's `platform` field.
    platform_name: String,
    downloader: std::sync::Arc<dyn crate::platforms::traits::PlatformDownloader>,
    ytdlp_path: Option<std::path::PathBuf>,
    from_hotkey: bool,
}
676
677async fn extract_download_context(
678    queue: &Arc<tokio::sync::Mutex<DownloadQueue>>,
679    item_id: u64,
680) -> Option<DownloadContext> {
681    let q = queue.lock().await;
682    let item = q.items.iter().find(|i| i.id == item_id)?;
683    Some(DownloadContext {
684        url: item.url.clone(),
685        output_dir: item.output_dir.clone(),
686        download_mode: item.download_mode.clone(),
687        quality: item.quality.clone(),
688        video_format: item.video_format.clone(),
689        audio_format: item.audio_format.clone(),
690        audio_quality: item.audio_quality.clone(),
691        download_subtitles: item.download_subtitles,
692        format_id: item.format_id.clone(),
693        referer: item.referer.clone(),
694        extra_headers: item.extra_headers.clone(),
695        page_url: item.page_url.clone(),
696        user_agent: item.user_agent.clone(),
697        cancel_token: item.cancel_token.clone(),
698        media_info: item.media_info.clone(),
699        platform_name: item.platform.clone(),
700        downloader: item.downloader.clone(),
701        ytdlp_path: item.ytdlp_path.clone(),
702        from_hotkey: item.from_hotkey,
703    })
704}
705
706async fn prepare_media_info(
707    queue: &Arc<tokio::sync::Mutex<DownloadQueue>>,
708    item_id: u64,
709    ctx: &DownloadContext,
710    reporter: &Option<crate::core::traits::SharedReporter>,
711) -> Option<crate::models::media::MediaInfo> {
712    let info_start = std::time::Instant::now();
713    let info = match &ctx.media_info {
714        Some(i) => {
715            tracing::info!(
716                "[queue] info for {} from cache/pre-fetched in {:?}",
717                item_id,
718                info_start.elapsed()
719            );
720            i.clone()
721        }
722        None => {
723            tracing::debug!(
724                "[perf] spawn_download_inner {}: media_info is None, fetching info",
725                item_id
726            );
727            if let Some(r) = reporter {
728                r.on_progress(
729                    item_id,
730                    crate::core::events::QueueItemProgress {
731                        id: item_id,
732                        title: ctx.url.clone(),
733                        platform: ctx.platform_name.clone(),
734                        percent: 0.0,
735                        speed_bytes_per_sec: 0.0,
736                        downloaded_bytes: 0,
737                        total_bytes: None,
738                        phase: "fetching_info".to_string(),
739                    },
740                );
741            }
742
743            let info_result = tokio::time::timeout(
744                std::time::Duration::from_secs(60),
745                fetch_and_cache_info(&ctx.url, &*ctx.downloader, &ctx.platform_name),
746            )
747            .await;
748
749            match info_result {
750                Ok(Ok(i)) => i,
751                Ok(Err(e)) => {
752                    let state = {
753                        let mut q = queue.lock().await;
754                        q.mark_complete(item_id, false, Some(e.to_string()), None, None);
755                        q.get_state()
756                    };
757                    emit_queue_state_from_state(reporter, state);
758                    tokio::spawn(try_start_next(queue.clone()));
759                    return None;
760                }
761                Err(_) => {
762                    tracing::warn!("[queue] info fetch timed out for {} after 60s", item_id);
763                    let state = {
764                        let mut q = queue.lock().await;
765                        q.mark_complete(
766                            item_id,
767                            false,
768                            Some("Timed out fetching video info".to_string()),
769                            None,
770                            None,
771                        );
772                        q.get_state()
773                    };
774                    emit_queue_state_from_state(reporter, state);
775                    tokio::spawn(try_start_next(queue.clone()));
776                    return None;
777                }
778            }
779        }
780    };
781    tracing::info!(
782        "[queue] info fetch for {} took {:?}",
783        item_id,
784        info_start.elapsed()
785    );
786
787    let state = {
788        let mut q = queue.lock().await;
789        if let Some(item) = q.items.iter_mut().find(|i| i.id == item_id) {
790            item.title = info.title.clone();
791            item.total_bytes = info.file_size_bytes;
792            let fc = if info.media_type == crate::models::media::MediaType::Carousel
793                || info.media_type == crate::models::media::MediaType::Playlist
794            {
795                info.available_qualities.len() as u32
796            } else {
797                1
798            };
799            item.file_count = Some(fc);
800            item.media_info = Some(info.clone());
801        }
802        q.get_state()
803    };
804    emit_queue_state_from_state(reporter, state);
805
806    if let Some(r) = reporter {
807        r.on_progress(
808            item_id,
809            crate::core::events::QueueItemProgress {
810                id: item_id,
811                title: info.title.clone(),
812                platform: ctx.platform_name.clone(),
813                percent: 0.5,
814                speed_bytes_per_sec: 0.0,
815                downloaded_bytes: 0,
816                total_bytes: info.file_size_bytes,
817                phase: "starting".to_string(),
818            },
819        );
820    }
821
822    Some(info)
823}
824
825fn build_download_options(
826    ctx: &DownloadContext,
827) -> (
828    crate::models::media::DownloadOptions,
829    std::sync::Arc<tokio::sync::Mutex<Option<usize>>>,
830) {
831    let settings = crate::models::settings::AppSettings::load_from_disk();
832    let tmpl = settings.download.filename_template.clone();
833    let mut final_output_dir = std::path::PathBuf::from(&ctx.output_dir);
834    if settings.download.organize_by_platform {
835        final_output_dir = final_output_dir.join(&ctx.platform_name);
836    }
837    let torrent_id_slot = std::sync::Arc::new(tokio::sync::Mutex::new(None));
838    let opts = crate::models::media::DownloadOptions {
839        quality: ctx
840            .quality
841            .clone()
842            .or_else(|| Some(settings.download.video_quality.clone())),
843        video_format: ctx
844            .video_format
845            .clone()
846            .or_else(|| Some(settings.download.video_format.clone())),
847        audio_format: ctx
848            .audio_format
849            .clone()
850            .or_else(|| Some(settings.download.audio_format.clone())),
851        audio_quality: ctx
852            .audio_quality
853            .clone()
854            .or_else(|| Some(settings.download.audio_quality.clone())),
855        output_dir: final_output_dir,
856        filename_template: Some(tmpl),
857        download_subtitles: ctx
858            .download_subtitles
859            .unwrap_or(settings.download.download_subtitles),
860        include_auto_subtitles: settings.download.include_auto_subtitles,
861        download_mode: ctx.download_mode.clone(),
862        format_id: ctx.format_id.clone(),
863        referer: ctx.referer.clone(),
864        extra_headers: ctx.extra_headers.clone(),
865        page_url: ctx.page_url.clone(),
866        user_agent: ctx.user_agent.clone(),
867        cancel_token: ctx.cancel_token.clone(),
868        concurrent_fragments: settings.advanced.concurrent_fragments,
869        ytdlp_path: ctx.ytdlp_path.clone(),
870        torrent_listen_port: Some(settings.advanced.torrent_listen_port),
871        torrent_id_slot: Some(torrent_id_slot.clone()),
872    };
873    (opts, torrent_id_slot)
874}
875
876async fn handle_download_result(
877    queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
878    item_id: u64,
879    ctx: DownloadContext,
880    info: crate::models::media::MediaInfo,
881    reporter: Option<crate::core::traits::SharedReporter>,
882    result: anyhow::Result<crate::models::media::DownloadResult>,
883) {
884    let settings = crate::models::settings::AppSettings::load_from_disk();
885    match result {
886        Ok(dl) => {
887            if settings.download.embed_metadata
888                && ctx.platform_name != "magnet"
889                && crate::core::ffmpeg::is_ffmpeg_available().await
890            {
891                let metadata = crate::core::ffmpeg::MetadataEmbed {
892                    title: Some(info.title.clone()),
893                    artist: Some(info.author.clone()),
894                    thumbnail_url: info.thumbnail_url.clone(),
895                    ..Default::default()
896                };
897                if let Err(e) = crate::core::ffmpeg::embed_metadata(
898                    &dl.file_path,
899                    &metadata,
900                    settings.download.embed_thumbnail,
901                    shared_http_client(),
902                )
903                .await
904                {
905                    tracing::warn!("Metadata embed failed for '{}': {}", info.title, e);
906                }
907            }
908
909            if ctx.from_hotkey && settings.download.copy_to_clipboard_on_hotkey {
910                #[cfg(not(target_os = "android"))]
911                {
912                    match crate::core::clipboard::copy_file_to_clipboard(&dl.file_path).await {
913                        Ok(()) => {
914                            tracing::info!("[clipboard] file copied: {:?}", dl.file_path);
915                        }
916                        Err(e) => {
917                            tracing::warn!("[clipboard] failed to copy file: {}", e);
918                        }
919                    }
920                }
921            }
922
923            let state = {
924                let mut q = queue.lock().await;
925                if ctx.platform_name == "magnet" && dl.torrent_id.is_some() {
926                    q.mark_seeding(
927                        item_id,
928                        Some(dl.file_path.to_string_lossy().to_string()),
929                        Some(dl.file_size_bytes),
930                        dl.torrent_id,
931                    );
932                } else {
933                    q.mark_complete(
934                        item_id,
935                        true,
936                        None,
937                        Some(dl.file_path.to_string_lossy().to_string()),
938                        Some(dl.file_size_bytes),
939                    );
940                }
941                q.get_state()
942            };
943            if let Some(r) = &reporter {
944                r.on_complete(
945                    item_id,
946                    Some(dl.file_path.to_string_lossy().to_string()),
947                    Some(dl.file_size_bytes),
948                );
949            }
950            emit_queue_state_from_state(&reporter, state);
951        }
952        Err(e) => {
953            let raw_err = format!("{}", e);
954            let (category, hint) = crate::core::errors::classify_download_error(&raw_err);
955            let user_msg = if category != "unknown" {
956                format!("{} ({})", hint, raw_err)
957            } else {
958                raw_err.clone()
959            };
960            tracing::error!(
961                "Download error '{}' [{}]: {}",
962                ctx.platform_name,
963                category,
964                raw_err
965            );
966            let state = {
967                let mut q = queue.lock().await;
968                q.mark_complete(item_id, false, Some(user_msg.clone()), None, None);
969                q.get_state()
970            };
971            if let Some(r) = &reporter {
972                r.on_error(item_id, user_msg);
973            }
974            emit_queue_state_from_state(&reporter, state);
975        }
976    }
977}
978
979async fn spawn_download_inner(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) {
980    tracing::info!("[queue] download {} started", item_id);
981
982    let reporter = { queue.lock().await.reporter.clone() };
983
984    if let Some(r) = &reporter {
985        r.on_progress(
986            item_id,
987            crate::core::events::QueueItemProgress {
988                id: item_id,
989                title: "".to_string(),
990                platform: "".to_string(),
991                percent: 0.0,
992                speed_bytes_per_sec: 0.0,
993                downloaded_bytes: 0,
994                total_bytes: None,
995                phase: "preparing".to_string(),
996            },
997        );
998    }
999
1000    let ctx = match extract_download_context(&queue, item_id).await {
1001        Some(c) => c,
1002        None => return,
1003    };
1004
1005    let info = match prepare_media_info(&queue, item_id, &ctx, &reporter).await {
1006        Some(i) => i,
1007        None => return,
1008    };
1009
1010    let (opts, torrent_id_slot) = build_download_options(&ctx);
1011
1012    let total_bytes = info.file_size_bytes;
1013    let item_title = info.title.clone();
1014    let item_platform = ctx.platform_name.clone();
1015    let (tx, mut rx) = tokio::sync::mpsc::channel::<f64>(32);
1016
1017    let reporter_progress = reporter.clone();
1018    let queue_progress = queue.clone();
1019    let torrent_id_slot_progress = torrent_id_slot.clone();
1020    let progress_forwarder = tokio::spawn(async move {
1021        let mut last_bytes: u64 = 0;
1022        let mut last_time = std::time::Instant::now();
1023        let mut throttle = ProgressThrottle::new(250);
1024        let mut current_speed: f64 = 0.0;
1025
1026        while let Some(percent) = rx.recv().await {
1027            if !throttle.should_emit() && percent < 100.0 {
1028                continue;
1029            }
1030
1031            let now = std::time::Instant::now();
1032            let clamped = percent.max(0.0);
1033            let downloaded_bytes = total_bytes
1034                .map(|total| (clamped / 100.0 * total as f64) as u64)
1035                .unwrap_or(0);
1036
1037            if total_bytes.is_some() && downloaded_bytes > last_bytes {
1038                let dt = now.duration_since(last_time).as_secs_f64();
1039                if dt > 0.1 {
1040                    let instant_speed = (downloaded_bytes - last_bytes) as f64 / dt;
1041                    current_speed = if current_speed > 0.0 {
1042                        current_speed * 0.7 + instant_speed * 0.3
1043                    } else {
1044                        instant_speed
1045                    };
1046                }
1047            }
1048
1049            last_bytes = downloaded_bytes;
1050            last_time = now;
1051
1052            let phase = match percent {
1053                p if p < -1.5 => "connecting",
1054                p if p < -0.5 => "starting",
1055                p if p > 0.0 => "downloading",
1056                _ => "starting",
1057            };
1058
1059            {
1060                let mut q = queue_progress.lock().await;
1061                let tid = { *torrent_id_slot_progress.lock().await };
1062                q.update_progress(
1063                    item_id,
1064                    clamped,
1065                    current_speed,
1066                    downloaded_bytes,
1067                    total_bytes,
1068                    tid,
1069                );
1070            }
1071
1072            if let Some(r) = &reporter_progress {
1073                r.on_progress(
1074                    item_id,
1075                    crate::core::events::QueueItemProgress {
1076                        id: item_id,
1077                        title: item_title.clone(),
1078                        platform: item_platform.clone(),
1079                        percent: clamped,
1080                        speed_bytes_per_sec: current_speed,
1081                        downloaded_bytes,
1082                        total_bytes,
1083                        phase: phase.to_string(),
1084                    },
1085                );
1086            }
1087        }
1088    });
1089
1090    if let Some(ua) = opts.user_agent.clone() {
1091        crate::core::ytdlp::register_ext_user_agent(ctx.url.clone(), ua);
1092    }
1093    if let Some(hdrs) = opts.extra_headers.clone() {
1094        crate::core::ytdlp::register_ext_headers(ctx.url.clone(), hdrs);
1095    }
1096
1097    // Ensure FFmpeg is installed if needed and try to surface progress to reporter (TUI/CLI)
1098    if !crate::core::ffmpeg::is_ffmpeg_available().await {
1099        let rep_ref = reporter.as_ref().map(|r| r.as_ref());
1100        match crate::core::dependencies::ensure_ffmpeg(rep_ref).await {
1101            Ok(path) => {
1102                tracing::info!("[ffmpeg] auto-installed to {:?}", path);
1103                crate::core::ffmpeg::reset_ffmpeg_available_cache();
1104            }
1105            Err(e) => tracing::warn!("[ffmpeg] auto-install failed: {}", e),
1106        }
1107    }
1108
1109    let dl_start = std::time::Instant::now();
1110    let dl_future = async {
1111        tokio::select! {
1112            r = ctx.downloader.download(&info, &opts, tx) => r,
1113            _ = ctx.cancel_token.cancelled() => {
1114                Err(anyhow::anyhow!("Download cancelled"))
1115            }
1116        }
1117    };
1118    let result = crate::core::log_hook::CURRENT_DOWNLOAD_ID
1119        .scope(item_id, dl_future)
1120        .await;
1121    crate::core::ytdlp::clear_ext_user_agent(&ctx.url);
1122    crate::core::ytdlp::clear_ext_headers(&ctx.url);
1123    tracing::info!(
1124        "[queue] download {} completed in {:?}",
1125        item_id,
1126        dl_start.elapsed()
1127    );
1128
1129    let _ = progress_forwarder.await;
1130
1131    let was_paused = {
1132        let q = queue.lock().await;
1133        q.items
1134            .iter()
1135            .find(|i| i.id == item_id)
1136            .map(|i| i.status == QueueStatus::Paused)
1137            .unwrap_or(false)
1138    };
1139
1140    if was_paused {
1141        let state = {
1142            let q = queue.lock().await;
1143            q.get_state()
1144        };
1145        emit_queue_state_from_state(&reporter, state);
1146        tokio::spawn(try_start_next(queue));
1147        return;
1148    }
1149
1150    handle_download_result(queue.clone(), item_id, ctx, info, reporter, result).await;
1151
1152    tokio::spawn(try_start_next(queue));
1153}
1154
1155pub async fn fetch_and_cache_info(
1156    url: &str,
1157    downloader: &dyn PlatformDownloader,
1158    platform: &str,
1159) -> anyhow::Result<MediaInfo> {
1160    {
1161        let cache = info_cache().lock().await;
1162        if let Some(entry) = cache.get(url) {
1163            if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1164                tracing::debug!("[perf] fetch_and_cache_info: cache hit for {}", platform);
1165                return Ok(entry.info.clone());
1166            }
1167        }
1168    }
1169
1170    let url_lock = {
1171        let mut map = in_flight_map().lock().await;
1172        map.entry(url.to_string())
1173            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
1174            .clone()
1175    };
1176    let _guard = url_lock.lock().await;
1177
1178    {
1179        let cache = info_cache().lock().await;
1180        if let Some(entry) = cache.get(url) {
1181            if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1182                tracing::debug!(
1183                    "[perf] fetch_and_cache_info: dedup cache hit for {}",
1184                    platform
1185                );
1186                return Ok(entry.info.clone());
1187            }
1188        }
1189    }
1190
1191    tracing::debug!("[perf] fetch_and_cache_info: fetching for {}", platform);
1192    let mut info = downloader.get_media_info(url).await?;
1193
1194    if is_generic_title(&info.title) {
1195        let name = crate::core::random_names::get_random_name();
1196        info.title = format!("video_{}", name);
1197    }
1198
1199    let mut cache = info_cache().lock().await;
1200    cache.insert(
1201        url.to_string(),
1202        CachedInfo {
1203            info: info.clone(),
1204            cached_at: std::time::Instant::now(),
1205        },
1206    );
1207    if cache.len() > 50 {
1208        cache.retain(|_, v| v.cached_at.elapsed() < INFO_CACHE_TTL);
1209    }
1210    Ok(info)
1211}
1212
1213pub async fn try_get_cached_info(url: &str) -> Option<MediaInfo> {
1214    let cache = info_cache().lock().await;
1215    cache
1216        .get(url)
1217        .filter(|entry| entry.cached_at.elapsed() < INFO_CACHE_TTL)
1218        .map(|entry| entry.info.clone())
1219}
1220
1221pub async fn prefetch_info(url: &str, downloader: &dyn PlatformDownloader, platform: &str) {
1222    prefetch_info_with_emit(url, downloader, platform, None).await;
1223}
1224
1225pub async fn prefetch_info_with_emit(
1226    url: &str,
1227    downloader: &dyn PlatformDownloader,
1228    platform: &str,
1229    reporter: Option<SharedReporter>,
1230) {
1231    let _timer_start = std::time::Instant::now();
1232    tracing::debug!("[perf] prefetch_info: started");
1233    match fetch_and_cache_info(url, downloader, platform).await {
1234        Ok(info) => {
1235            tracing::debug!(
1236                "[perf] prefetch_info: completed in {:?} — {}",
1237                _timer_start.elapsed(),
1238                info.title
1239            );
1240            if let Some(r) = reporter {
1241                r.on_media_preview(
1242                    url.to_string(),
1243                    info.title.clone(),
1244                    info.author.clone(),
1245                    info.thumbnail_url.clone(),
1246                    info.duration_seconds,
1247                );
1248            }
1249        }
1250        Err(e) => tracing::warn!(
1251            "[perf] prefetch_info: failed in {:?} — {}",
1252            _timer_start.elapsed(),
1253            e
1254        ),
1255    }
1256}
1257
1258pub async fn try_start_next(queue: Arc<tokio::sync::Mutex<DownloadQueue>>) {
1259    let _timer_start = std::time::Instant::now();
1260    let (next_ids, stagger, state_to_emit, reporter) = {
1261        let mut q = queue.lock().await;
1262        let ids = q.next_queued_ids();
1263        for nid in &ids {
1264            q.mark_active(*nid);
1265        }
1266        let state = if !ids.is_empty() {
1267            Some(q.get_state())
1268        } else {
1269            None
1270        };
1271        (ids, q.stagger_delay_ms, state, q.reporter.clone())
1272    };
1273
1274    if let Some(state) = state_to_emit {
1275        emit_queue_state_from_state(&reporter, state);
1276    }
1277
1278    let batch_size = next_ids.len();
1279    for (i, nid) in next_ids.into_iter().enumerate() {
1280        if let Some(r) = &reporter {
1281            r.on_progress(
1282                nid,
1283                crate::core::events::QueueItemProgress {
1284                    id: nid,
1285                    title: String::new(),
1286                    platform: String::new(),
1287                    percent: 0.0,
1288                    speed_bytes_per_sec: 0.0,
1289                    downloaded_bytes: 0,
1290                    total_bytes: None,
1291                    phase: "queued_starting".to_string(),
1292                },
1293            );
1294        }
1295
1296        if i > 0 {
1297            let item_platform = {
1298                let q = queue.lock().await;
1299                q.items
1300                    .iter()
1301                    .find(|item| item.id == nid)
1302                    .map(|item| item.platform.clone())
1303            };
1304            let delay_ms = if item_platform.as_deref() == Some("youtube") {
1305                2000
1306            } else if batch_size > 3 {
1307                stagger.max(1000)
1308            } else {
1309                stagger
1310            };
1311            if delay_ms > 0 {
1312                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1313            }
1314        }
1315        let queue_c = queue.clone();
1316        tokio::spawn(async move {
1317            tokio::spawn(spawn_download(queue_c, nid));
1318        });
1319    }
1320    tracing::debug!("[perf] try_start_next took {:?}", _timer_start.elapsed());
1321}
1322
/// Reports whether `title` is a placeholder rather than a real media title.
///
/// The comparison is case-insensitive and ignores surrounding whitespace. A
/// title counts as generic when it is empty, is exactly one of `video`,
/// `media`, `untitled` or `unknown`, or starts with `video [video]` or
/// `media [media]`.
fn is_generic_title(title: &str) -> bool {
    const GENERIC_PREFIXES: [&str; 2] = ["video [video]", "media [media]"];

    let lowered = title.to_lowercase();
    let normalized = lowered.trim();

    matches!(normalized, "" | "video" | "media" | "untitled" | "unknown")
        || GENERIC_PREFIXES
            .iter()
            .any(|prefix| normalized.starts_with(*prefix))
}