Skip to main content

mangofetch_core/core/manager/
queue.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, OnceLock};
5
6static EMIT_COUNT: AtomicU64 = AtomicU64::new(0);
7
8use serde::Serialize;
9use tokio_util::sync::CancellationToken;
10
11use crate::core::traits::SharedReporter;
12use crate::models::media::MediaInfo;
13use crate::models::queue::{QueueItemInfo, QueueStatus};
14use crate::platforms::traits::PlatformDownloader;
15
16fn shared_http_client() -> &'static reqwest::Client {
17    static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
18    CLIENT.get_or_init(|| {
19        crate::core::http_client::apply_global_proxy(reqwest::Client::builder())
20            .build()
21            .unwrap_or_default()
22    })
23}
24
25struct CachedInfo {
26    info: MediaInfo,
27    cached_at: std::time::Instant,
28}
29
30static INFO_CACHE: OnceLock<tokio::sync::Mutex<HashMap<String, CachedInfo>>> = OnceLock::new();
31
32fn info_cache() -> &'static tokio::sync::Mutex<HashMap<String, CachedInfo>> {
33    INFO_CACHE.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
34}
35
36const INFO_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(600);
37
38static IN_FLIGHT_FETCHES: OnceLock<
39    tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
40> = OnceLock::new();
41
42fn in_flight_map() -> &'static tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
43    IN_FLIGHT_FETCHES.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
44}
45
46#[derive(Debug, Clone, Serialize)]
47pub struct MediaPreviewEvent {
48    pub url: String,
49    pub title: String,
50    pub author: String,
51    pub thumbnail_url: Option<String>,
52    pub duration_seconds: Option<f64>,
53}
54
55pub struct QueueItem {
56    pub id: u64,
57    pub url: String,
58    pub platform: String,
59    pub title: String,
60    pub status: QueueStatus,
61    pub cancel_token: CancellationToken,
62    pub output_dir: String,
63    pub download_mode: Option<String>,
64    pub quality: Option<String>,
65    pub video_format: Option<String>,
66    pub audio_format: Option<String>,
67    pub audio_quality: Option<String>,
68    pub format_id: Option<String>,
69    pub referer: Option<String>,
70    pub extra_headers: Option<std::collections::HashMap<String, String>>,
71    pub page_url: Option<String>,
72    pub user_agent: Option<String>,
73    pub percent: f64,
74    pub speed_bytes_per_sec: f64,
75    pub downloaded_bytes: u64,
76    pub total_bytes: Option<u64>,
77    pub file_path: Option<String>,
78    pub file_size_bytes: Option<u64>,
79    pub file_count: Option<u32>,
80    pub media_info: Option<MediaInfo>,
81    pub downloader: Arc<dyn PlatformDownloader>,
82    pub ytdlp_path: Option<PathBuf>,
83    pub from_hotkey: bool,
84    pub torrent_id: Option<usize>,
85    pub download_subtitles: Option<bool>,
86    pub phase: String,
87}
88
89impl QueueItem {
90    pub fn to_info(&self) -> QueueItemInfo {
91        QueueItemInfo {
92            id: self.id,
93            url: self.url.clone(),
94            platform: self.platform.clone(),
95            title: self.title.clone(),
96            status: self.status.clone(),
97            percent: self.percent,
98            speed_bytes_per_sec: self.speed_bytes_per_sec,
99            downloaded_bytes: self.downloaded_bytes,
100            total_bytes: self.total_bytes,
101            phase: self.phase.clone(),
102            file_path: self.file_path.clone(),
103            file_size_bytes: self.file_size_bytes,
104            file_count: self.file_count,
105            thumbnail_url: self
106                .media_info
107                .as_ref()
108                .and_then(|m| m.thumbnail_url.clone()),
109        }
110    }
111}
112
113pub struct DownloadQueue {
114    pub items: Vec<QueueItem>,
115    pub max_concurrent: u32,
116    pub stagger_delay_ms: u64,
117    pub reporter: Option<SharedReporter>,
118}
119
120impl DownloadQueue {
121    pub fn new(max_concurrent: u32, reporter: Option<SharedReporter>) -> Self {
122        Self {
123            items: Vec::new(),
124            max_concurrent,
125            stagger_delay_ms: 150,
126            reporter,
127        }
128    }
129
130    pub fn set_reporter(&mut self, reporter: SharedReporter) {
131        self.reporter = Some(reporter);
132    }
133
134    pub fn load_from_recovery(&mut self, registry: &crate::core::registry::PlatformRegistry) {
135        let recovery_items = crate::core::manager::recovery::list();
136        for item in recovery_items {
137            let downloader = registry
138                .find_platform(&item.url)
139                .or_else(|| registry.find_by_name(&item.platform));
140
141            if let Some(dl) = downloader {
142                let percent = if matches!(item.status, QueueStatus::Complete { success: true }) {
143                    100.0
144                } else {
145                    0.0
146                };
147
148                let q_item = QueueItem {
149                    id: item.id,
150                    url: item.url,
151                    platform: item.platform,
152                    title: item.title,
153                    status: item.status,
154                    cancel_token: CancellationToken::new(),
155                    output_dir: item.output_dir,
156                    download_mode: item.download_mode,
157                    quality: item.quality,
158                    video_format: None,
159                    audio_format: None,
160                    audio_quality: None,
161                    format_id: item.format_id,
162                    referer: item.referer,
163                    extra_headers: None,
164                    page_url: None,
165                    user_agent: None,
166                    percent,
167                    speed_bytes_per_sec: 0.0,
168                    downloaded_bytes: 0,
169                    total_bytes: item.file_size_bytes,
170                    file_path: item.file_path,
171                    file_size_bytes: item.file_size_bytes,
172                    file_count: None,
173                    media_info: None,
174                    downloader: dl,
175                    ytdlp_path: None,
176                    from_hotkey: false,
177                    torrent_id: None,
178                    download_subtitles: None,
179                    phase: "Restored".to_string(),
180                };
181                self.items.push(q_item);
182            }
183        }
184    }
185
186    fn sync_recovery(item: &QueueItem) {
187        crate::core::manager::recovery::persist(crate::core::manager::recovery::RecoveryItem {
188            id: item.id,
189            url: item.url.clone(),
190            title: item.title.clone(),
191            platform: item.platform.clone(),
192            output_dir: item.output_dir.clone(),
193            status: item.status.clone(),
194            download_mode: item.download_mode.clone(),
195            quality: item.quality.clone(),
196            format_id: item.format_id.clone(),
197            referer: item.referer.clone(),
198            file_path: item.file_path.clone(),
199            file_size_bytes: item.file_size_bytes,
200        });
201    }
202
203    #[allow(clippy::too_many_arguments)]
204    pub fn enqueue(
205        &mut self,
206        id: u64,
207        url: String,
208        platform: String,
209        title: String,
210        output_dir: String,
211        download_mode: Option<String>,
212        quality: Option<String>,
213        video_format: Option<String>,
214        audio_format: Option<String>,
215        audio_quality: Option<String>,
216        download_subtitles: Option<bool>,
217        format_id: Option<String>,
218        referer: Option<String>,
219        extra_headers: Option<std::collections::HashMap<String, String>>,
220        page_url: Option<String>,
221        user_agent: Option<String>,
222        media_info: Option<MediaInfo>,
223        total_bytes: Option<u64>,
224        file_count: Option<u32>,
225        downloader: Arc<dyn PlatformDownloader>,
226        ytdlp_path: Option<PathBuf>,
227        from_hotkey: bool,
228    ) {
229        let item = QueueItem {
230            id,
231            url,
232            platform,
233            title,
234            status: QueueStatus::Queued,
235            cancel_token: CancellationToken::new(),
236            output_dir,
237            download_mode,
238            quality,
239            video_format,
240            audio_format,
241            audio_quality,
242            format_id,
243            referer,
244            extra_headers,
245            page_url,
246            user_agent,
247            percent: 0.0,
248            speed_bytes_per_sec: 0.0,
249            downloaded_bytes: 0,
250            total_bytes,
251            file_path: None,
252            file_size_bytes: None,
253            file_count,
254            media_info,
255            downloader,
256            ytdlp_path,
257            from_hotkey,
258            torrent_id: None,
259            download_subtitles,
260            phase: "Queued".to_string(),
261        };
262        self.items.push(item);
263        Self::sync_recovery(self.items.last().unwrap());
264    }
265
266    pub fn active_count(&self) -> u32 {
267        self.items
268            .iter()
269            .filter(|i| i.status == QueueStatus::Active)
270            .count() as u32
271    }
272
273    pub fn next_queued_ids(&self) -> Vec<u64> {
274        let slots = self.max_concurrent.saturating_sub(self.active_count()) as usize;
275        self.items
276            .iter()
277            .filter(|i| i.status == QueueStatus::Queued)
278            .take(slots)
279            .map(|i| i.id)
280            .collect()
281    }
282
283    pub fn mark_active(&mut self, id: u64) {
284        let item = self.items.iter_mut().find(|i| i.id == id);
285        if let Some(item) = item {
286            item.status = QueueStatus::Active;
287            item.cancel_token = CancellationToken::new();
288            Self::sync_recovery(item);
289        }
290    }
291
292    pub fn mark_complete(
293        &mut self,
294        id: u64,
295        success: bool,
296        error: Option<String>,
297        file_path: Option<String>,
298        file_size_bytes: Option<u64>,
299    ) {
300        let item = self.items.iter_mut().find(|i| i.id == id);
301        if let Some(item) = item {
302            if success {
303                item.status = QueueStatus::Complete { success: true };
304                item.percent = 100.0;
305            } else {
306                item.status = QueueStatus::Error {
307                    message: error.unwrap_or_default(),
308                };
309            }
310            item.file_path = file_path;
311            item.file_size_bytes = file_size_bytes;
312            item.speed_bytes_per_sec = 0.0;
313            Self::sync_recovery(item);
314        }
315    }
316
317    pub fn mark_seeding(
318        &mut self,
319        id: u64,
320        file_path: Option<String>,
321        file_size_bytes: Option<u64>,
322        torrent_id: Option<usize>,
323    ) {
324        let item = self.items.iter_mut().find(|i| i.id == id);
325        if let Some(item) = item {
326            item.status = QueueStatus::Seeding;
327            item.percent = 100.0;
328            item.file_path = file_path;
329            item.file_size_bytes = file_size_bytes;
330            item.speed_bytes_per_sec = 0.0;
331            item.torrent_id = torrent_id;
332            Self::sync_recovery(item);
333        }
334    }
335
336    pub fn update_progress(
337        &mut self,
338        id: u64,
339        percent: f64,
340        speed: f64,
341        downloaded: u64,
342        total: Option<u64>,
343        torrent_id: Option<usize>,
344    ) {
345        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
346            item.percent = percent;
347            item.speed_bytes_per_sec = speed;
348            item.downloaded_bytes = downloaded;
349            if let Some(t) = total {
350                item.total_bytes = Some(t);
351            }
352            if torrent_id.is_some() && item.torrent_id.is_none() {
353                item.torrent_id = torrent_id;
354            }
355        }
356    }
357
358    pub fn pause(&mut self, id: u64) -> bool {
359        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
360            if item.status == QueueStatus::Active {
361                if item.platform != "magnet" {
362                    item.cancel_token.cancel();
363                }
364                item.status = QueueStatus::Paused;
365                item.speed_bytes_per_sec = 0.0;
366                Self::sync_recovery(item);
367                return true;
368            }
369        }
370        false
371    }
372
373    pub fn resume(&mut self, id: u64) -> bool {
374        let item = self.items.iter_mut().find(|i| i.id == id);
375        if let Some(item) = item {
376            if item.status == QueueStatus::Paused {
377                if item.platform == "magnet" {
378                    item.status = QueueStatus::Active;
379                } else {
380                    item.status = QueueStatus::Queued;
381                    item.cancel_token = CancellationToken::new();
382                }
383                Self::sync_recovery(item);
384                return true;
385            }
386        }
387        false
388    }
389
390    pub fn cancel(&mut self, id: u64) -> (bool, Option<usize>) {
391        let result = self.cancel_inner(id);
392        if result.0 {
393            crate::core::manager::recovery::remove(id);
394        }
395        result
396    }
397
398    fn cancel_inner(&mut self, id: u64) -> (bool, Option<usize>) {
399        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
400            match &item.status {
401                QueueStatus::Active => {
402                    item.cancel_token.cancel();
403                    item.status = QueueStatus::Error {
404                        message: "Cancelled".to_string(),
405                    };
406                    item.speed_bytes_per_sec = 0.0;
407                    Self::sync_recovery(item);
408                    return (true, None);
409                }
410                QueueStatus::Seeding => {
411                    let tid = item.torrent_id;
412                    item.status = QueueStatus::Error {
413                        message: "Cancelled".to_string(),
414                    };
415                    item.speed_bytes_per_sec = 0.0;
416                    Self::sync_recovery(item);
417                    return (true, tid);
418                }
419                QueueStatus::Paused => {
420                    item.cancel_token.cancel();
421                    let tid = if item.platform == "magnet" {
422                        item.torrent_id
423                    } else {
424                        None
425                    };
426                    item.status = QueueStatus::Error {
427                        message: "Cancelled".to_string(),
428                    };
429                    item.speed_bytes_per_sec = 0.0;
430                    Self::sync_recovery(item);
431                    return (true, tid);
432                }
433                QueueStatus::Queued => {
434                    item.status = QueueStatus::Error {
435                        message: "Cancelled".to_string(),
436                    };
437                    Self::sync_recovery(item);
438                    return (true, None);
439                }
440                _ => {}
441            }
442        }
443        (false, None)
444    }
445
446    pub fn retry(&mut self, id: u64) -> bool {
447        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
448            if matches!(item.status, QueueStatus::Error { .. }) {
449                item.status = QueueStatus::Queued;
450                item.cancel_token = CancellationToken::new();
451                item.percent = 0.0;
452                item.speed_bytes_per_sec = 0.0;
453                item.downloaded_bytes = 0;
454                item.file_path = None;
455                item.file_size_bytes = None;
456                Self::sync_recovery(item);
457                return true;
458            }
459        }
460        false
461    }
462
463    pub fn remove(&mut self, id: u64) -> Option<Option<usize>> {
464        let result = self.remove_inner(id);
465        if result.is_some() {
466            crate::core::manager::recovery::remove(id);
467        }
468        result
469    }
470
471    fn remove_inner(&mut self, id: u64) -> Option<Option<usize>> {
472        if let Some(pos) = self.items.iter().position(|i| i.id == id) {
473            let item = &self.items[pos];
474            if item.status == QueueStatus::Active {
475                item.cancel_token.cancel();
476            }
477            if item.status == QueueStatus::Paused && item.platform == "magnet" {
478                item.cancel_token.cancel();
479            }
480            let torrent_id = if item.status == QueueStatus::Seeding
481                || (item.status == QueueStatus::Paused && item.platform == "magnet")
482            {
483                item.torrent_id
484            } else {
485                None
486            };
487            self.items.remove(pos);
488            return Some(torrent_id);
489        }
490        None
491    }
492
493    pub fn clear_finished(&mut self) {
494        let to_remove: Vec<u64> = self
495            .items
496            .iter()
497            .filter(|i| {
498                matches!(
499                    i.status,
500                    QueueStatus::Complete { .. } | QueueStatus::Error { .. }
501                )
502            })
503            .map(|i| i.id)
504            .collect();
505        for id in &to_remove {
506            crate::core::manager::recovery::remove(*id);
507        }
508        self.items.retain(|i| {
509            !matches!(
510                i.status,
511                QueueStatus::Complete { .. } | QueueStatus::Error { .. }
512            )
513        });
514    }
515
516    pub fn get_state(&self) -> Vec<QueueItemInfo> {
517        self.items.iter().map(|i| i.to_info()).collect()
518    }
519
520    pub fn has_url(&self, url: &str) -> bool {
521        self.items.iter().any(|i| {
522            i.url == url
523                && matches!(
524                    i.status,
525                    QueueStatus::Queued
526                        | QueueStatus::Active
527                        | QueueStatus::Paused
528                        | QueueStatus::Seeding
529                )
530        })
531    }
532}
533
534pub struct ProgressThrottle {
535    last_emit: std::time::Instant,
536    min_interval: std::time::Duration,
537}
538
539impl ProgressThrottle {
540    pub fn new(min_interval_ms: u64) -> Self {
541        Self {
542            last_emit: std::time::Instant::now() - std::time::Duration::from_secs(10),
543            min_interval: std::time::Duration::from_millis(min_interval_ms),
544        }
545    }
546
547    pub fn should_emit(&mut self) -> bool {
548        let now = std::time::Instant::now();
549        if now.duration_since(self.last_emit) >= self.min_interval {
550            self.last_emit = now;
551            true
552        } else {
553            false
554        }
555    }
556}
557
558pub fn emit_queue_state_from_state(reporter: &Option<SharedReporter>, state: Vec<QueueItemInfo>) {
559    let n = EMIT_COUNT.fetch_add(1, Ordering::Relaxed);
560    if n.is_multiple_of(10) {
561        tracing::debug!("[perf] emit_queue_state called {} times", n);
562    }
563    if let Some(reporter) = reporter {
564        reporter.on_queue_update(state);
565    }
566}
567
568pub fn emit_queue_state(queue: &DownloadQueue) {
569    let state = queue.get_state();
570    emit_queue_state_from_state(&queue.reporter, state);
571}
572
573/// RAII guard that ensures an Active queue item never leaks a slot.
574struct ActiveJobSlot {
575    queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
576    item_id: u64,
577    armed: bool,
578}
579
580impl ActiveJobSlot {
581    fn new(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) -> Self {
582        Self {
583            queue,
584            item_id,
585            armed: true,
586        }
587    }
588
589    fn disarm(mut self) {
590        self.armed = false;
591    }
592}
593
594impl Drop for ActiveJobSlot {
595    fn drop(&mut self) {
596        if !self.armed {
597            return;
598        }
599        let queue = self.queue.clone();
600        let item_id = self.item_id;
601        tokio::spawn(async move {
602            let state = {
603                let mut q = queue.lock().await;
604                let still_active = q
605                    .items
606                    .iter()
607                    .find(|i| i.id == item_id)
608                    .map(|i| i.status == QueueStatus::Active)
609                    .unwrap_or(false);
610                if !still_active {
611                    return;
612                }
613                tracing::warn!(
614                    "[queue] ActiveJobSlot guard firing for {} — download ended without clean release",
615                    item_id
616                );
617                q.mark_complete(
618                    item_id,
619                    false,
620                    Some("Download interrupted".to_string()),
621                    None,
622                    None,
623                );
624                q.get_state()
625            };
626            let reporter = { queue.lock().await.reporter.clone() };
627            emit_queue_state_from_state(&reporter, state);
628            tokio::spawn(try_start_next(queue));
629        });
630    }
631}
632
633pub fn spawn_download(
634    queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
635    item_id: u64,
636) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
637    Box::pin(async move {
638        let _timer_start = std::time::Instant::now();
639        let slot = ActiveJobSlot::new(queue.clone(), item_id);
640        tokio::spawn(spawn_download_inner(queue.clone(), item_id));
641        slot.disarm();
642        tracing::debug!(
643            "[perf] spawn_download {} took {:?}",
644            item_id,
645            _timer_start.elapsed()
646        );
647    })
648}
649
650struct DownloadContext {
651    url: String,
652    output_dir: String,
653    download_mode: Option<String>,
654    quality: Option<String>,
655    video_format: Option<String>,
656    audio_format: Option<String>,
657    audio_quality: Option<String>,
658    download_subtitles: Option<bool>,
659    format_id: Option<String>,
660    referer: Option<String>,
661    extra_headers: Option<std::collections::HashMap<String, String>>,
662    page_url: Option<String>,
663    user_agent: Option<String>,
664    cancel_token: tokio_util::sync::CancellationToken,
665    media_info: Option<crate::models::media::MediaInfo>,
666    platform_name: String,
667    downloader: std::sync::Arc<dyn crate::platforms::traits::PlatformDownloader>,
668    ytdlp_path: Option<std::path::PathBuf>,
669    from_hotkey: bool,
670}
671
672async fn extract_download_context(
673    queue: &Arc<tokio::sync::Mutex<DownloadQueue>>,
674    item_id: u64,
675) -> Option<DownloadContext> {
676    let q = queue.lock().await;
677    let item = q.items.iter().find(|i| i.id == item_id)?;
678    Some(DownloadContext {
679        url: item.url.clone(),
680        output_dir: item.output_dir.clone(),
681        download_mode: item.download_mode.clone(),
682        quality: item.quality.clone(),
683        video_format: item.video_format.clone(),
684        audio_format: item.audio_format.clone(),
685        audio_quality: item.audio_quality.clone(),
686        download_subtitles: item.download_subtitles,
687        format_id: item.format_id.clone(),
688        referer: item.referer.clone(),
689        extra_headers: item.extra_headers.clone(),
690        page_url: item.page_url.clone(),
691        user_agent: item.user_agent.clone(),
692        cancel_token: item.cancel_token.clone(),
693        media_info: item.media_info.clone(),
694        platform_name: item.platform.clone(),
695        downloader: item.downloader.clone(),
696        ytdlp_path: item.ytdlp_path.clone(),
697        from_hotkey: item.from_hotkey,
698    })
699}
700
701async fn prepare_media_info(
702    queue: &Arc<tokio::sync::Mutex<DownloadQueue>>,
703    item_id: u64,
704    ctx: &DownloadContext,
705    reporter: &Option<crate::core::traits::SharedReporter>,
706) -> Option<crate::models::media::MediaInfo> {
707    let info_start = std::time::Instant::now();
708    let info = match &ctx.media_info {
709        Some(i) => {
710            tracing::info!(
711                "[queue] info for {} from cache/pre-fetched in {:?}",
712                item_id,
713                info_start.elapsed()
714            );
715            i.clone()
716        }
717        None => {
718            tracing::debug!(
719                "[perf] spawn_download_inner {}: media_info is None, fetching info",
720                item_id
721            );
722            if let Some(r) = reporter {
723                r.on_progress(
724                    item_id,
725                    crate::core::events::QueueItemProgress {
726                        id: item_id,
727                        title: ctx.url.clone(),
728                        platform: ctx.platform_name.clone(),
729                        percent: 0.0,
730                        speed_bytes_per_sec: 0.0,
731                        downloaded_bytes: 0,
732                        total_bytes: None,
733                        phase: "fetching_info".to_string(),
734                    },
735                );
736            }
737
738            let info_result = tokio::time::timeout(
739                std::time::Duration::from_secs(60),
740                fetch_and_cache_info(&ctx.url, &*ctx.downloader, &ctx.platform_name),
741            )
742            .await;
743
744            match info_result {
745                Ok(Ok(i)) => i,
746                Ok(Err(e)) => {
747                    let state = {
748                        let mut q = queue.lock().await;
749                        q.mark_complete(item_id, false, Some(e.to_string()), None, None);
750                        q.get_state()
751                    };
752                    emit_queue_state_from_state(reporter, state);
753                    tokio::spawn(try_start_next(queue.clone()));
754                    return None;
755                }
756                Err(_) => {
757                    tracing::warn!("[queue] info fetch timed out for {} after 60s", item_id);
758                    let state = {
759                        let mut q = queue.lock().await;
760                        q.mark_complete(
761                            item_id,
762                            false,
763                            Some("Timed out fetching video info".to_string()),
764                            None,
765                            None,
766                        );
767                        q.get_state()
768                    };
769                    emit_queue_state_from_state(reporter, state);
770                    tokio::spawn(try_start_next(queue.clone()));
771                    return None;
772                }
773            }
774        }
775    };
776    tracing::info!(
777        "[queue] info fetch for {} took {:?}",
778        item_id,
779        info_start.elapsed()
780    );
781
782    let state = {
783        let mut q = queue.lock().await;
784        if let Some(item) = q.items.iter_mut().find(|i| i.id == item_id) {
785            item.title = info.title.clone();
786            item.total_bytes = info.file_size_bytes;
787            let fc = if info.media_type == crate::models::media::MediaType::Carousel
788                || info.media_type == crate::models::media::MediaType::Playlist
789            {
790                info.available_qualities.len() as u32
791            } else {
792                1
793            };
794            item.file_count = Some(fc);
795            item.media_info = Some(info.clone());
796        }
797        q.get_state()
798    };
799    emit_queue_state_from_state(reporter, state);
800
801    if let Some(r) = reporter {
802        r.on_progress(
803            item_id,
804            crate::core::events::QueueItemProgress {
805                id: item_id,
806                title: info.title.clone(),
807                platform: ctx.platform_name.clone(),
808                percent: 0.5,
809                speed_bytes_per_sec: 0.0,
810                downloaded_bytes: 0,
811                total_bytes: info.file_size_bytes,
812                phase: "starting".to_string(),
813            },
814        );
815    }
816
817    Some(info)
818}
819
820fn build_download_options(
821    ctx: &DownloadContext,
822) -> (
823    crate::models::media::DownloadOptions,
824    std::sync::Arc<tokio::sync::Mutex<Option<usize>>>,
825) {
826    let settings = crate::models::settings::AppSettings::load_from_disk();
827    let tmpl = settings.download.filename_template.clone();
828    let mut final_output_dir = std::path::PathBuf::from(&ctx.output_dir);
829    if settings.download.organize_by_platform {
830        final_output_dir = final_output_dir.join(&ctx.platform_name);
831    }
832    let torrent_id_slot = std::sync::Arc::new(tokio::sync::Mutex::new(None));
833    let opts = crate::models::media::DownloadOptions {
834        quality: ctx
835            .quality
836            .clone()
837            .or_else(|| Some(settings.download.video_quality.clone())),
838        video_format: ctx
839            .video_format
840            .clone()
841            .or_else(|| Some(settings.download.video_format.clone())),
842        audio_format: ctx
843            .audio_format
844            .clone()
845            .or_else(|| Some(settings.download.audio_format.clone())),
846        audio_quality: ctx
847            .audio_quality
848            .clone()
849            .or_else(|| Some(settings.download.audio_quality.clone())),
850        output_dir: final_output_dir,
851        filename_template: Some(tmpl),
852        download_subtitles: ctx
853            .download_subtitles
854            .unwrap_or(settings.download.download_subtitles),
855        include_auto_subtitles: settings.download.include_auto_subtitles,
856        download_mode: ctx.download_mode.clone(),
857        format_id: ctx.format_id.clone(),
858        referer: ctx.referer.clone(),
859        extra_headers: ctx.extra_headers.clone(),
860        page_url: ctx.page_url.clone(),
861        user_agent: ctx.user_agent.clone(),
862        cancel_token: ctx.cancel_token.clone(),
863        concurrent_fragments: settings.advanced.concurrent_fragments,
864        ytdlp_path: ctx.ytdlp_path.clone(),
865        torrent_listen_port: Some(settings.advanced.torrent_listen_port),
866        torrent_id_slot: Some(torrent_id_slot.clone()),
867    };
868    (opts, torrent_id_slot)
869}
870
871async fn handle_download_result(
872    queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
873    item_id: u64,
874    ctx: DownloadContext,
875    info: crate::models::media::MediaInfo,
876    reporter: Option<crate::core::traits::SharedReporter>,
877    result: anyhow::Result<crate::models::media::DownloadResult>,
878) {
879    let settings = crate::models::settings::AppSettings::load_from_disk();
880    match result {
881        Ok(dl) => {
882            if settings.download.embed_metadata
883                && ctx.platform_name != "magnet"
884                && crate::core::ffmpeg::is_ffmpeg_available().await
885            {
886                let metadata = crate::core::ffmpeg::MetadataEmbed {
887                    title: Some(info.title.clone()),
888                    artist: Some(info.author.clone()),
889                    thumbnail_url: info.thumbnail_url.clone(),
890                    ..Default::default()
891                };
892                if let Err(e) = crate::core::ffmpeg::embed_metadata(
893                    &dl.file_path,
894                    &metadata,
895                    settings.download.embed_thumbnail,
896                    shared_http_client(),
897                )
898                .await
899                {
900                    tracing::warn!("Metadata embed failed for '{}': {}", info.title, e);
901                }
902            }
903
904            if ctx.from_hotkey && settings.download.copy_to_clipboard_on_hotkey {
905                #[cfg(not(target_os = "android"))]
906                {
907                    match crate::core::clipboard::copy_file_to_clipboard(&dl.file_path).await {
908                        Ok(()) => {
909                            tracing::info!("[clipboard] file copied: {:?}", dl.file_path);
910                        }
911                        Err(e) => {
912                            tracing::warn!("[clipboard] failed to copy file: {}", e);
913                        }
914                    }
915                }
916            }
917
918            let state = {
919                let mut q = queue.lock().await;
920                if ctx.platform_name == "magnet" && dl.torrent_id.is_some() {
921                    q.mark_seeding(
922                        item_id,
923                        Some(dl.file_path.to_string_lossy().to_string()),
924                        Some(dl.file_size_bytes),
925                        dl.torrent_id,
926                    );
927                } else {
928                    q.mark_complete(
929                        item_id,
930                        true,
931                        None,
932                        Some(dl.file_path.to_string_lossy().to_string()),
933                        Some(dl.file_size_bytes),
934                    );
935                }
936                q.get_state()
937            };
938            if let Some(r) = &reporter {
939                r.on_complete(
940                    item_id,
941                    Some(dl.file_path.to_string_lossy().to_string()),
942                    Some(dl.file_size_bytes),
943                );
944            }
945            emit_queue_state_from_state(&reporter, state);
946        }
947        Err(e) => {
948            let raw_err = format!("{}", e);
949            let (category, hint) = crate::core::errors::classify_download_error(&raw_err);
950            let user_msg = if category != "unknown" {
951                format!("{} ({})", hint, raw_err)
952            } else {
953                raw_err.clone()
954            };
955            tracing::error!(
956                "Download error '{}' [{}]: {}",
957                ctx.platform_name,
958                category,
959                raw_err
960            );
961            let state = {
962                let mut q = queue.lock().await;
963                q.mark_complete(item_id, false, Some(user_msg.clone()), None, None);
964                q.get_state()
965            };
966            if let Some(r) = &reporter {
967                r.on_error(item_id, user_msg);
968            }
969            emit_queue_state_from_state(&reporter, state);
970        }
971    }
972}
973
974async fn spawn_download_inner(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) {
975    tracing::info!("[queue] download {} started", item_id);
976
977    let reporter = { queue.lock().await.reporter.clone() };
978
979    if let Some(r) = &reporter {
980        r.on_progress(
981            item_id,
982            crate::core::events::QueueItemProgress {
983                id: item_id,
984                title: "".to_string(),
985                platform: "".to_string(),
986                percent: 0.0,
987                speed_bytes_per_sec: 0.0,
988                downloaded_bytes: 0,
989                total_bytes: None,
990                phase: "preparing".to_string(),
991            },
992        );
993    }
994
995    let ctx = match extract_download_context(&queue, item_id).await {
996        Some(c) => c,
997        None => return,
998    };
999
1000    let info = match prepare_media_info(&queue, item_id, &ctx, &reporter).await {
1001        Some(i) => i,
1002        None => return,
1003    };
1004
1005    let (opts, torrent_id_slot) = build_download_options(&ctx);
1006
1007    let total_bytes = info.file_size_bytes;
1008    let item_title = info.title.clone();
1009    let item_platform = ctx.platform_name.clone();
1010    let (tx, mut rx) = tokio::sync::mpsc::channel::<f64>(32);
1011
1012    let reporter_progress = reporter.clone();
1013    let queue_progress = queue.clone();
1014    let torrent_id_slot_progress = torrent_id_slot.clone();
1015    let progress_forwarder = tokio::spawn(async move {
1016        let mut last_bytes: u64 = 0;
1017        let mut last_time = std::time::Instant::now();
1018        let mut throttle = ProgressThrottle::new(250);
1019        let mut current_speed: f64 = 0.0;
1020
1021        while let Some(percent) = rx.recv().await {
1022            if !throttle.should_emit() && percent < 100.0 {
1023                continue;
1024            }
1025
1026            let now = std::time::Instant::now();
1027            let clamped = percent.max(0.0);
1028            let downloaded_bytes = total_bytes
1029                .map(|total| (clamped / 100.0 * total as f64) as u64)
1030                .unwrap_or(0);
1031
1032            if total_bytes.is_some() && downloaded_bytes > last_bytes {
1033                let dt = now.duration_since(last_time).as_secs_f64();
1034                if dt > 0.1 {
1035                    let instant_speed = (downloaded_bytes - last_bytes) as f64 / dt;
1036                    current_speed = if current_speed > 0.0 {
1037                        current_speed * 0.7 + instant_speed * 0.3
1038                    } else {
1039                        instant_speed
1040                    };
1041                }
1042            }
1043
1044            last_bytes = downloaded_bytes;
1045            last_time = now;
1046
1047            let phase = match percent {
1048                p if p < -1.5 => "connecting",
1049                p if p < -0.5 => "starting",
1050                p if p > 0.0 => "downloading",
1051                _ => "starting",
1052            };
1053
1054            {
1055                let mut q = queue_progress.lock().await;
1056                let tid = { *torrent_id_slot_progress.lock().await };
1057                q.update_progress(
1058                    item_id,
1059                    clamped,
1060                    current_speed,
1061                    downloaded_bytes,
1062                    total_bytes,
1063                    tid,
1064                );
1065            }
1066
1067            if let Some(r) = &reporter_progress {
1068                r.on_progress(
1069                    item_id,
1070                    crate::core::events::QueueItemProgress {
1071                        id: item_id,
1072                        title: item_title.clone(),
1073                        platform: item_platform.clone(),
1074                        percent: clamped,
1075                        speed_bytes_per_sec: current_speed,
1076                        downloaded_bytes,
1077                        total_bytes,
1078                        phase: phase.to_string(),
1079                    },
1080                );
1081            }
1082        }
1083    });
1084
1085    if let Some(ua) = opts.user_agent.clone() {
1086        crate::core::ytdlp::register_ext_user_agent(ctx.url.clone(), ua);
1087    }
1088    if let Some(hdrs) = opts.extra_headers.clone() {
1089        crate::core::ytdlp::register_ext_headers(ctx.url.clone(), hdrs);
1090    }
1091
1092    // Ensure FFmpeg is installed if needed and try to surface progress to reporter (TUI/CLI)
1093    if !crate::core::ffmpeg::is_ffmpeg_available().await {
1094        let rep_ref = reporter.as_ref().map(|r| r.as_ref());
1095        match crate::core::dependencies::ensure_ffmpeg(rep_ref).await {
1096            Ok(path) => {
1097                tracing::info!("[ffmpeg] auto-installed to {:?}", path);
1098                crate::core::ffmpeg::reset_ffmpeg_available_cache();
1099            }
1100            Err(e) => tracing::warn!("[ffmpeg] auto-install failed: {}", e),
1101        }
1102    }
1103
1104    let dl_start = std::time::Instant::now();
1105    let dl_future = async {
1106        tokio::select! {
1107            r = ctx.downloader.download(&info, &opts, tx) => r,
1108            _ = ctx.cancel_token.cancelled() => {
1109                Err(anyhow::anyhow!("Download cancelled"))
1110            }
1111        }
1112    };
1113    let result = crate::core::log_hook::CURRENT_DOWNLOAD_ID
1114        .scope(item_id, dl_future)
1115        .await;
1116    crate::core::ytdlp::clear_ext_user_agent(&ctx.url);
1117    crate::core::ytdlp::clear_ext_headers(&ctx.url);
1118    tracing::info!(
1119        "[queue] download {} completed in {:?}",
1120        item_id,
1121        dl_start.elapsed()
1122    );
1123
1124    let _ = progress_forwarder.await;
1125
1126    let was_paused = {
1127        let q = queue.lock().await;
1128        q.items
1129            .iter()
1130            .find(|i| i.id == item_id)
1131            .map(|i| i.status == QueueStatus::Paused)
1132            .unwrap_or(false)
1133    };
1134
1135    if was_paused {
1136        let state = {
1137            let q = queue.lock().await;
1138            q.get_state()
1139        };
1140        emit_queue_state_from_state(&reporter, state);
1141        tokio::spawn(try_start_next(queue));
1142        return;
1143    }
1144
1145    handle_download_result(queue.clone(), item_id, ctx, info, reporter, result).await;
1146
1147    tokio::spawn(try_start_next(queue));
1148}
1149
1150pub async fn fetch_and_cache_info(
1151    url: &str,
1152    downloader: &dyn PlatformDownloader,
1153    platform: &str,
1154) -> anyhow::Result<MediaInfo> {
1155    {
1156        let cache = info_cache().lock().await;
1157        if let Some(entry) = cache.get(url) {
1158            if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1159                tracing::debug!("[perf] fetch_and_cache_info: cache hit for {}", platform);
1160                return Ok(entry.info.clone());
1161            }
1162        }
1163    }
1164
1165    let url_lock = {
1166        let mut map = in_flight_map().lock().await;
1167        map.entry(url.to_string())
1168            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
1169            .clone()
1170    };
1171    let _guard = url_lock.lock().await;
1172
1173    {
1174        let cache = info_cache().lock().await;
1175        if let Some(entry) = cache.get(url) {
1176            if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1177                tracing::debug!(
1178                    "[perf] fetch_and_cache_info: dedup cache hit for {}",
1179                    platform
1180                );
1181                return Ok(entry.info.clone());
1182            }
1183        }
1184    }
1185
1186    tracing::debug!("[perf] fetch_and_cache_info: fetching for {}", platform);
1187    let mut info = downloader.get_media_info(url).await?;
1188
1189    if is_generic_title(&info.title) {
1190        let name = crate::core::random_names::get_random_name();
1191        info.title = format!("video_{}", name);
1192    }
1193
1194    let mut cache = info_cache().lock().await;
1195    cache.insert(
1196        url.to_string(),
1197        CachedInfo {
1198            info: info.clone(),
1199            cached_at: std::time::Instant::now(),
1200        },
1201    );
1202    if cache.len() > 50 {
1203        cache.retain(|_, v| v.cached_at.elapsed() < INFO_CACHE_TTL);
1204    }
1205    Ok(info)
1206}
1207
1208pub async fn try_get_cached_info(url: &str) -> Option<MediaInfo> {
1209    let cache = info_cache().lock().await;
1210    cache
1211        .get(url)
1212        .filter(|entry| entry.cached_at.elapsed() < INFO_CACHE_TTL)
1213        .map(|entry| entry.info.clone())
1214}
1215
1216pub async fn prefetch_info(url: &str, downloader: &dyn PlatformDownloader, platform: &str) {
1217    prefetch_info_with_emit(url, downloader, platform, None).await;
1218}
1219
1220pub async fn prefetch_info_with_emit(
1221    url: &str,
1222    downloader: &dyn PlatformDownloader,
1223    platform: &str,
1224    reporter: Option<SharedReporter>,
1225) {
1226    let _timer_start = std::time::Instant::now();
1227    tracing::debug!("[perf] prefetch_info: started");
1228    match fetch_and_cache_info(url, downloader, platform).await {
1229        Ok(info) => {
1230            tracing::debug!(
1231                "[perf] prefetch_info: completed in {:?} — {}",
1232                _timer_start.elapsed(),
1233                info.title
1234            );
1235            if let Some(r) = reporter {
1236                r.on_media_preview(
1237                    url.to_string(),
1238                    info.title.clone(),
1239                    info.author.clone(),
1240                    info.thumbnail_url.clone(),
1241                    info.duration_seconds,
1242                );
1243            }
1244        }
1245        Err(e) => tracing::warn!(
1246            "[perf] prefetch_info: failed in {:?} — {}",
1247            _timer_start.elapsed(),
1248            e
1249        ),
1250    }
1251}
1252
1253pub async fn try_start_next(queue: Arc<tokio::sync::Mutex<DownloadQueue>>) {
1254    let _timer_start = std::time::Instant::now();
1255    let (next_ids, stagger, state_to_emit, reporter) = {
1256        let mut q = queue.lock().await;
1257        let ids = q.next_queued_ids();
1258        for nid in &ids {
1259            q.mark_active(*nid);
1260        }
1261        let state = if !ids.is_empty() {
1262            Some(q.get_state())
1263        } else {
1264            None
1265        };
1266        (ids, q.stagger_delay_ms, state, q.reporter.clone())
1267    };
1268
1269    if let Some(state) = state_to_emit {
1270        emit_queue_state_from_state(&reporter, state);
1271    }
1272
1273    let batch_size = next_ids.len();
1274    for (i, nid) in next_ids.into_iter().enumerate() {
1275        if let Some(r) = &reporter {
1276            r.on_progress(
1277                nid,
1278                crate::core::events::QueueItemProgress {
1279                    id: nid,
1280                    title: String::new(),
1281                    platform: String::new(),
1282                    percent: 0.0,
1283                    speed_bytes_per_sec: 0.0,
1284                    downloaded_bytes: 0,
1285                    total_bytes: None,
1286                    phase: "queued_starting".to_string(),
1287                },
1288            );
1289        }
1290
1291        if i > 0 {
1292            let item_platform = {
1293                let q = queue.lock().await;
1294                q.items
1295                    .iter()
1296                    .find(|item| item.id == nid)
1297                    .map(|item| item.platform.clone())
1298            };
1299            let delay_ms = if item_platform.as_deref() == Some("youtube") {
1300                2000
1301            } else if batch_size > 3 {
1302                stagger.max(1000)
1303            } else {
1304                stagger
1305            };
1306            if delay_ms > 0 {
1307                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1308            }
1309        }
1310        let queue_c = queue.clone();
1311        tokio::spawn(async move {
1312            tokio::spawn(spawn_download(queue_c, nid));
1313        });
1314    }
1315    tracing::debug!("[perf] try_start_next took {:?}", _timer_start.elapsed());
1316}
1317
1318fn is_generic_title(title: &str) -> bool {
1319    let t = title.to_lowercase();
1320    let t = t.trim();
1321    t.is_empty()
1322        || t == "video"
1323        || t == "media"
1324        || t == "untitled"
1325        || t == "unknown"
1326        || t.starts_with("video [video]")
1327        || t.starts_with("media [media]")
1328}