Skip to main content

mangofetch_core/core/manager/
queue.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, OnceLock};
5
/// Running count of queue-state emissions; incremented by
/// `emit_queue_state_from_state` and used only for its periodic debug log.
static EMIT_COUNT: AtomicU64 = AtomicU64::new(0);
7
8use serde::Serialize;
9use tokio::sync::mpsc;
10use tokio_util::sync::CancellationToken;
11
12use crate::core::traits::SharedReporter;
13use crate::models::media::MediaInfo;
14use crate::models::queue::{QueueItemInfo, QueueStatus};
15use crate::platforms::traits::PlatformDownloader;
16
17fn shared_http_client() -> &'static reqwest::Client {
18    static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
19    CLIENT.get_or_init(|| {
20        crate::core::http_client::apply_global_proxy(reqwest::Client::builder())
21            .build()
22            .unwrap_or_default()
23    })
24}
25
26struct CachedInfo {
27    info: MediaInfo,
28    cached_at: std::time::Instant,
29}
30
31static INFO_CACHE: OnceLock<tokio::sync::Mutex<HashMap<String, CachedInfo>>> = OnceLock::new();
32
33fn info_cache() -> &'static tokio::sync::Mutex<HashMap<String, CachedInfo>> {
34    INFO_CACHE.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
35}
36
37const INFO_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(600);
38
39static IN_FLIGHT_FETCHES: OnceLock<
40    tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
41> = OnceLock::new();
42
43fn in_flight_map() -> &'static tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
44    IN_FLIGHT_FETCHES.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
45}
46
47#[derive(Debug, Clone, Serialize)]
48pub struct MediaPreviewEvent {
49    pub url: String,
50    pub title: String,
51    pub author: String,
52    pub thumbnail_url: Option<String>,
53    pub duration_seconds: Option<f64>,
54}
55
56pub struct QueueItem {
57    pub id: u64,
58    pub url: String,
59    pub platform: String,
60    pub title: String,
61    pub status: QueueStatus,
62    pub cancel_token: CancellationToken,
63    pub output_dir: String,
64    pub download_mode: Option<String>,
65    pub quality: Option<String>,
66    pub format_id: Option<String>,
67    pub referer: Option<String>,
68    pub extra_headers: Option<std::collections::HashMap<String, String>>,
69    pub page_url: Option<String>,
70    pub user_agent: Option<String>,
71    pub percent: f64,
72    pub speed_bytes_per_sec: f64,
73    pub downloaded_bytes: u64,
74    pub total_bytes: Option<u64>,
75    pub file_path: Option<String>,
76    pub file_size_bytes: Option<u64>,
77    pub file_count: Option<u32>,
78    pub media_info: Option<MediaInfo>,
79    pub downloader: Arc<dyn PlatformDownloader>,
80    pub ytdlp_path: Option<PathBuf>,
81    pub from_hotkey: bool,
82    pub torrent_id: Option<usize>,
83    pub phase: String,
84}
85
86impl QueueItem {
87    pub fn to_info(&self) -> QueueItemInfo {
88        QueueItemInfo {
89            id: self.id,
90            url: self.url.clone(),
91            platform: self.platform.clone(),
92            title: self.title.clone(),
93            status: self.status.clone(),
94            percent: self.percent,
95            speed_bytes_per_sec: self.speed_bytes_per_sec,
96            downloaded_bytes: self.downloaded_bytes,
97            total_bytes: self.total_bytes,
98            phase: self.phase.clone(),
99            file_path: self.file_path.clone(),
100            file_size_bytes: self.file_size_bytes,
101            file_count: self.file_count,
102            thumbnail_url: self
103                .media_info
104                .as_ref()
105                .and_then(|m| m.thumbnail_url.clone()),
106        }
107    }
108}
109
110pub struct DownloadQueue {
111    pub items: Vec<QueueItem>,
112    pub max_concurrent: u32,
113    pub stagger_delay_ms: u64,
114    pub reporter: Option<SharedReporter>,
115}
116
117impl DownloadQueue {
118    pub fn new(max_concurrent: u32, reporter: Option<SharedReporter>) -> Self {
119        Self {
120            items: Vec::new(),
121            max_concurrent,
122            stagger_delay_ms: 150,
123            reporter,
124        }
125    }
126
127    pub fn set_reporter(&mut self, reporter: SharedReporter) {
128        self.reporter = Some(reporter);
129    }
130
131    pub fn load_from_recovery(&mut self, registry: &crate::core::registry::PlatformRegistry) {
132        let recovery_items = crate::core::manager::recovery::list();
133        for item in recovery_items {
134            let downloader = registry
135                .find_platform(&item.url)
136                .or_else(|| registry.find_by_name(&item.platform));
137
138            if let Some(dl) = downloader {
139                let percent = if matches!(item.status, QueueStatus::Complete { success: true }) {
140                    100.0
141                } else {
142                    0.0
143                };
144
145                let q_item = QueueItem {
146                    id: item.id,
147                    url: item.url,
148                    platform: item.platform,
149                    title: item.title,
150                    status: item.status,
151                    cancel_token: CancellationToken::new(),
152                    output_dir: item.output_dir,
153                    download_mode: item.download_mode,
154                    quality: item.quality,
155                    format_id: item.format_id,
156                    referer: item.referer,
157                    extra_headers: None,
158                    page_url: None,
159                    user_agent: None,
160                    percent,
161                    speed_bytes_per_sec: 0.0,
162                    downloaded_bytes: 0,
163                    total_bytes: item.file_size_bytes,
164                    file_path: item.file_path,
165                    file_size_bytes: item.file_size_bytes,
166                    file_count: None,
167                    media_info: None,
168                    downloader: dl,
169                    ytdlp_path: None,
170                    from_hotkey: false,
171                    torrent_id: None,
172                    phase: "Restored".to_string(),
173                };
174                self.items.push(q_item);
175            }
176        }
177    }
178
179    fn sync_recovery(item: &QueueItem) {
180        crate::core::manager::recovery::persist(crate::core::manager::recovery::RecoveryItem {
181            id: item.id,
182            url: item.url.clone(),
183            title: item.title.clone(),
184            platform: item.platform.clone(),
185            output_dir: item.output_dir.clone(),
186            status: item.status.clone(),
187            download_mode: item.download_mode.clone(),
188            quality: item.quality.clone(),
189            format_id: item.format_id.clone(),
190            referer: item.referer.clone(),
191            file_path: item.file_path.clone(),
192            file_size_bytes: item.file_size_bytes,
193        });
194    }
195
196    #[allow(clippy::too_many_arguments)]
197    pub fn enqueue(
198        &mut self,
199        id: u64,
200        url: String,
201        platform: String,
202        title: String,
203        output_dir: String,
204        download_mode: Option<String>,
205        quality: Option<String>,
206        format_id: Option<String>,
207        referer: Option<String>,
208        extra_headers: Option<std::collections::HashMap<String, String>>,
209        page_url: Option<String>,
210        user_agent: Option<String>,
211        media_info: Option<MediaInfo>,
212        total_bytes: Option<u64>,
213        file_count: Option<u32>,
214        downloader: Arc<dyn PlatformDownloader>,
215        ytdlp_path: Option<PathBuf>,
216        from_hotkey: bool,
217    ) {
218        let item = QueueItem {
219            id,
220            url,
221            platform,
222            title,
223            status: QueueStatus::Queued,
224            cancel_token: CancellationToken::new(),
225            output_dir,
226            download_mode,
227            quality,
228            format_id,
229            referer,
230            extra_headers,
231            page_url,
232            user_agent,
233            percent: 0.0,
234            speed_bytes_per_sec: 0.0,
235            downloaded_bytes: 0,
236            total_bytes,
237            file_path: None,
238            file_size_bytes: None,
239            file_count,
240            media_info,
241            downloader,
242            ytdlp_path,
243            from_hotkey,
244            torrent_id: None,
245            phase: "Queued".to_string(),
246        };
247        self.items.push(item);
248        Self::sync_recovery(self.items.last().unwrap());
249    }
250
251    pub fn active_count(&self) -> u32 {
252        self.items
253            .iter()
254            .filter(|i| i.status == QueueStatus::Active)
255            .count() as u32
256    }
257
258    pub fn next_queued_ids(&self) -> Vec<u64> {
259        let slots = self.max_concurrent.saturating_sub(self.active_count()) as usize;
260        self.items
261            .iter()
262            .filter(|i| i.status == QueueStatus::Queued)
263            .take(slots)
264            .map(|i| i.id)
265            .collect()
266    }
267
268    pub fn mark_active(&mut self, id: u64) {
269        let item = self.items.iter_mut().find(|i| i.id == id);
270        if let Some(item) = item {
271            item.status = QueueStatus::Active;
272            item.cancel_token = CancellationToken::new();
273            Self::sync_recovery(item);
274        }
275    }
276
277    pub fn mark_complete(
278        &mut self,
279        id: u64,
280        success: bool,
281        error: Option<String>,
282        file_path: Option<String>,
283        file_size_bytes: Option<u64>,
284    ) {
285        let item = self.items.iter_mut().find(|i| i.id == id);
286        if let Some(item) = item {
287            if success {
288                item.status = QueueStatus::Complete { success: true };
289                item.percent = 100.0;
290            } else {
291                item.status = QueueStatus::Error {
292                    message: error.unwrap_or_default(),
293                };
294            }
295            item.file_path = file_path;
296            item.file_size_bytes = file_size_bytes;
297            item.speed_bytes_per_sec = 0.0;
298            Self::sync_recovery(item);
299        }
300    }
301
302    pub fn mark_seeding(
303        &mut self,
304        id: u64,
305        file_path: Option<String>,
306        file_size_bytes: Option<u64>,
307        torrent_id: Option<usize>,
308    ) {
309        let item = self.items.iter_mut().find(|i| i.id == id);
310        if let Some(item) = item {
311            item.status = QueueStatus::Seeding;
312            item.percent = 100.0;
313            item.file_path = file_path;
314            item.file_size_bytes = file_size_bytes;
315            item.speed_bytes_per_sec = 0.0;
316            item.torrent_id = torrent_id;
317            Self::sync_recovery(item);
318        }
319    }
320
321    pub fn update_progress(
322        &mut self,
323        id: u64,
324        percent: f64,
325        speed: f64,
326        downloaded: u64,
327        total: Option<u64>,
328        torrent_id: Option<usize>,
329    ) {
330        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
331            item.percent = percent;
332            item.speed_bytes_per_sec = speed;
333            item.downloaded_bytes = downloaded;
334            if let Some(t) = total {
335                item.total_bytes = Some(t);
336            }
337            if torrent_id.is_some() && item.torrent_id.is_none() {
338                item.torrent_id = torrent_id;
339            }
340        }
341    }
342
343    pub fn pause(&mut self, id: u64) -> bool {
344        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
345            if item.status == QueueStatus::Active {
346                if item.platform != "magnet" {
347                    item.cancel_token.cancel();
348                }
349                item.status = QueueStatus::Paused;
350                item.speed_bytes_per_sec = 0.0;
351                Self::sync_recovery(item);
352                return true;
353            }
354        }
355        false
356    }
357
358    pub fn resume(&mut self, id: u64) -> bool {
359        let item = self.items.iter_mut().find(|i| i.id == id);
360        if let Some(item) = item {
361            if item.status == QueueStatus::Paused {
362                if item.platform == "magnet" {
363                    item.status = QueueStatus::Active;
364                } else {
365                    item.status = QueueStatus::Queued;
366                    item.cancel_token = CancellationToken::new();
367                }
368                Self::sync_recovery(item);
369                return true;
370            }
371        }
372        false
373    }
374
375    pub fn cancel(&mut self, id: u64) -> (bool, Option<usize>) {
376        let result = self.cancel_inner(id);
377        if result.0 {
378            crate::core::manager::recovery::remove(id);
379        }
380        result
381    }
382
383    fn cancel_inner(&mut self, id: u64) -> (bool, Option<usize>) {
384        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
385            match &item.status {
386                QueueStatus::Active => {
387                    item.cancel_token.cancel();
388                    item.status = QueueStatus::Error {
389                        message: "Cancelled".to_string(),
390                    };
391                    item.speed_bytes_per_sec = 0.0;
392                    Self::sync_recovery(item);
393                    return (true, None);
394                }
395                QueueStatus::Seeding => {
396                    let tid = item.torrent_id;
397                    item.status = QueueStatus::Error {
398                        message: "Cancelled".to_string(),
399                    };
400                    item.speed_bytes_per_sec = 0.0;
401                    Self::sync_recovery(item);
402                    return (true, tid);
403                }
404                QueueStatus::Paused => {
405                    item.cancel_token.cancel();
406                    let tid = if item.platform == "magnet" {
407                        item.torrent_id
408                    } else {
409                        None
410                    };
411                    item.status = QueueStatus::Error {
412                        message: "Cancelled".to_string(),
413                    };
414                    item.speed_bytes_per_sec = 0.0;
415                    Self::sync_recovery(item);
416                    return (true, tid);
417                }
418                QueueStatus::Queued => {
419                    item.status = QueueStatus::Error {
420                        message: "Cancelled".to_string(),
421                    };
422                    Self::sync_recovery(item);
423                    return (true, None);
424                }
425                _ => {}
426            }
427        }
428        (false, None)
429    }
430
431    pub fn retry(&mut self, id: u64) -> bool {
432        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
433            if matches!(item.status, QueueStatus::Error { .. }) {
434                item.status = QueueStatus::Queued;
435                item.cancel_token = CancellationToken::new();
436                item.percent = 0.0;
437                item.speed_bytes_per_sec = 0.0;
438                item.downloaded_bytes = 0;
439                item.file_path = None;
440                item.file_size_bytes = None;
441                Self::sync_recovery(item);
442                return true;
443            }
444        }
445        false
446    }
447
448    pub fn remove(&mut self, id: u64) -> Option<Option<usize>> {
449        let result = self.remove_inner(id);
450        if result.is_some() {
451            crate::core::manager::recovery::remove(id);
452        }
453        result
454    }
455
456    fn remove_inner(&mut self, id: u64) -> Option<Option<usize>> {
457        if let Some(pos) = self.items.iter().position(|i| i.id == id) {
458            let item = &self.items[pos];
459            if item.status == QueueStatus::Active {
460                item.cancel_token.cancel();
461            }
462            if item.status == QueueStatus::Paused && item.platform == "magnet" {
463                item.cancel_token.cancel();
464            }
465            let torrent_id = if item.status == QueueStatus::Seeding
466                || (item.status == QueueStatus::Paused && item.platform == "magnet")
467            {
468                item.torrent_id
469            } else {
470                None
471            };
472            self.items.remove(pos);
473            return Some(torrent_id);
474        }
475        None
476    }
477
478    pub fn clear_finished(&mut self) {
479        let to_remove: Vec<u64> = self
480            .items
481            .iter()
482            .filter(|i| {
483                matches!(
484                    i.status,
485                    QueueStatus::Complete { .. } | QueueStatus::Error { .. }
486                )
487            })
488            .map(|i| i.id)
489            .collect();
490        for id in &to_remove {
491            crate::core::manager::recovery::remove(*id);
492        }
493        self.items.retain(|i| {
494            !matches!(
495                i.status,
496                QueueStatus::Complete { .. } | QueueStatus::Error { .. }
497            )
498        });
499    }
500
501    pub fn get_state(&self) -> Vec<QueueItemInfo> {
502        self.items.iter().map(|i| i.to_info()).collect()
503    }
504
505    pub fn has_url(&self, url: &str) -> bool {
506        self.items.iter().any(|i| {
507            i.url == url
508                && matches!(
509                    i.status,
510                    QueueStatus::Queued
511                        | QueueStatus::Active
512                        | QueueStatus::Paused
513                        | QueueStatus::Seeding
514                )
515        })
516    }
517}
518
/// Simple rate limiter for progress events: `should_emit` returns `true` at
/// most once per `min_interval`.
pub struct ProgressThrottle {
    /// Instant of the last accepted emit (back-dated at construction).
    last_emit: std::time::Instant,
    /// Minimum time that must pass between two accepted emits.
    min_interval: std::time::Duration,
}

impl ProgressThrottle {
    /// Creates a throttle that allows one emit every `min_interval_ms`
    /// milliseconds, with the first call to `should_emit` passing immediately.
    ///
    /// The last-emit instant is back-dated by at least `min_interval` (and at
    /// least 10 s). `Instant - Duration` panics when the result is not
    /// representable, so `checked_sub` is used; if no back-dating is possible
    /// the throttle simply starts from "now".
    pub fn new(min_interval_ms: u64) -> Self {
        let min_interval = std::time::Duration::from_millis(min_interval_ms);
        let now = std::time::Instant::now();

        // Back-date far enough that the first `should_emit` is never throttled,
        // even for intervals longer than the historical 10 s offset.
        let backdate = min_interval.max(std::time::Duration::from_secs(10));
        let last_emit = now
            .checked_sub(backdate)
            .or_else(|| now.checked_sub(min_interval))
            .unwrap_or(now);

        Self {
            last_emit,
            min_interval,
        }
    }

    /// Returns `true` (and restarts the interval) when at least `min_interval`
    /// has elapsed since the last accepted emit; otherwise returns `false`.
    pub fn should_emit(&mut self) -> bool {
        let now = std::time::Instant::now();
        if now.duration_since(self.last_emit) >= self.min_interval {
            self.last_emit = now;
            true
        } else {
            false
        }
    }
}
542
543pub fn emit_queue_state_from_state(reporter: &Option<SharedReporter>, state: Vec<QueueItemInfo>) {
544    let n = EMIT_COUNT.fetch_add(1, Ordering::Relaxed);
545    if n.is_multiple_of(10) {
546        tracing::debug!("[perf] emit_queue_state called {} times", n);
547    }
548    if let Some(reporter) = reporter {
549        reporter.on_queue_update(state);
550    }
551}
552
553pub fn emit_queue_state(queue: &DownloadQueue) {
554    let state = queue.get_state();
555    emit_queue_state_from_state(&queue.reporter, state);
556}
557
558/// RAII guard that ensures an Active queue item never leaks a slot.
559struct ActiveJobSlot {
560    queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
561    item_id: u64,
562    armed: bool,
563}
564
565impl ActiveJobSlot {
566    fn new(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) -> Self {
567        Self {
568            queue,
569            item_id,
570            armed: true,
571        }
572    }
573
574    fn disarm(mut self) {
575        self.armed = false;
576    }
577}
578
579impl Drop for ActiveJobSlot {
580    fn drop(&mut self) {
581        if !self.armed {
582            return;
583        }
584        let queue = self.queue.clone();
585        let item_id = self.item_id;
586        tokio::spawn(async move {
587            let state = {
588                let mut q = queue.lock().await;
589                let still_active = q
590                    .items
591                    .iter()
592                    .find(|i| i.id == item_id)
593                    .map(|i| i.status == QueueStatus::Active)
594                    .unwrap_or(false);
595                if !still_active {
596                    return;
597                }
598                tracing::warn!(
599                    "[queue] ActiveJobSlot guard firing for {} — download ended without clean release",
600                    item_id
601                );
602                q.mark_complete(
603                    item_id,
604                    false,
605                    Some("Download interrupted".to_string()),
606                    None,
607                    None,
608                );
609                q.get_state()
610            };
611            let reporter = { queue.lock().await.reporter.clone() };
612            emit_queue_state_from_state(&reporter, state);
613            tokio::spawn(try_start_next(queue));
614        });
615    }
616}
617
618pub fn spawn_download(
619    queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
620    item_id: u64,
621) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
622    Box::pin(async move {
623        let _timer_start = std::time::Instant::now();
624        let slot = ActiveJobSlot::new(queue.clone(), item_id);
625        tokio::spawn(spawn_download_inner(queue.clone(), item_id));
626        slot.disarm();
627        tracing::debug!(
628            "[perf] spawn_download {} took {:?}",
629            item_id,
630            _timer_start.elapsed()
631        );
632    })
633}
634
635async fn spawn_download_inner(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) {
636    tracing::info!("[queue] download {} started", item_id);
637
638    let reporter = { queue.lock().await.reporter.clone() };
639
640    if let Some(r) = &reporter {
641        r.on_progress(
642            item_id,
643            crate::core::events::QueueItemProgress {
644                id: item_id,
645                title: "".to_string(),
646                platform: "".to_string(),
647                percent: 0.0,
648                speed_bytes_per_sec: 0.0,
649                downloaded_bytes: 0,
650                total_bytes: None,
651                phase: "preparing".to_string(),
652            },
653        );
654    }
655
656    let (
657        url,
658        output_dir,
659        download_mode,
660        quality,
661        format_id,
662        referer,
663        extra_headers,
664        page_url,
665        user_agent,
666        cancel_token,
667        media_info,
668        platform_name,
669        downloader,
670        ytdlp_path,
671        from_hotkey,
672    ) = {
673        let q = queue.lock().await;
674        let item = match q.items.iter().find(|i| i.id == item_id) {
675            Some(i) => i,
676            None => return,
677        };
678        (
679            item.url.clone(),
680            item.output_dir.clone(),
681            item.download_mode.clone(),
682            item.quality.clone(),
683            item.format_id.clone(),
684            item.referer.clone(),
685            item.extra_headers.clone(),
686            item.page_url.clone(),
687            item.user_agent.clone(),
688            item.cancel_token.clone(),
689            item.media_info.clone(),
690            item.platform.clone(),
691            item.downloader.clone(),
692            item.ytdlp_path.clone(),
693            item.from_hotkey,
694        )
695    };
696
697    let info_start = std::time::Instant::now();
698    let info = match media_info {
699        Some(i) => {
700            tracing::info!(
701                "[queue] info for {} from cache/pre-fetched in {:?}",
702                item_id,
703                info_start.elapsed()
704            );
705            i
706        }
707        None => {
708            tracing::debug!(
709                "[perf] spawn_download_inner {}: media_info is None, fetching info",
710                item_id
711            );
712            if let Some(r) = &reporter {
713                r.on_progress(
714                    item_id,
715                    crate::core::events::QueueItemProgress {
716                        id: item_id,
717                        title: url.clone(),
718                        platform: platform_name.clone(),
719                        percent: 0.0,
720                        speed_bytes_per_sec: 0.0,
721                        downloaded_bytes: 0,
722                        total_bytes: None,
723                        phase: "fetching_info".to_string(),
724                    },
725                );
726            }
727
728            let info_result = tokio::time::timeout(
729                std::time::Duration::from_secs(60),
730                fetch_and_cache_info(&url, &*downloader, &platform_name),
731            )
732            .await;
733
734            match info_result {
735                Ok(Ok(i)) => i,
736                Ok(Err(e)) => {
737                    let state = {
738                        let mut q = queue.lock().await;
739                        q.mark_complete(item_id, false, Some(e.to_string()), None, None);
740                        q.get_state()
741                    };
742                    emit_queue_state_from_state(&reporter, state);
743                    tokio::spawn(try_start_next(queue));
744                    return;
745                }
746                Err(_) => {
747                    tracing::warn!("[queue] info fetch timed out for {} after 60s", item_id);
748                    let state = {
749                        let mut q = queue.lock().await;
750                        q.mark_complete(
751                            item_id,
752                            false,
753                            Some("Timed out fetching video info".to_string()),
754                            None,
755                            None,
756                        );
757                        q.get_state()
758                    };
759                    emit_queue_state_from_state(&reporter, state);
760                    tokio::spawn(try_start_next(queue));
761                    return;
762                }
763            }
764        }
765    };
766    tracing::info!(
767        "[queue] info fetch for {} took {:?}",
768        item_id,
769        info_start.elapsed()
770    );
771
772    let state = {
773        let mut q = queue.lock().await;
774        if let Some(item) = q.items.iter_mut().find(|i| i.id == item_id) {
775            item.title = info.title.clone();
776            item.total_bytes = info.file_size_bytes;
777            let fc = if info.media_type == crate::models::media::MediaType::Carousel
778                || info.media_type == crate::models::media::MediaType::Playlist
779            {
780                info.available_qualities.len() as u32
781            } else {
782                1
783            };
784            item.file_count = Some(fc);
785            item.media_info = Some(info.clone());
786        }
787        q.get_state()
788    };
789    emit_queue_state_from_state(&reporter, state);
790
791    if let Some(r) = &reporter {
792        r.on_progress(
793            item_id,
794            crate::core::events::QueueItemProgress {
795                id: item_id,
796                title: info.title.clone(),
797                platform: platform_name.clone(),
798                percent: 0.5,
799                speed_bytes_per_sec: 0.0,
800                downloaded_bytes: 0,
801                total_bytes: info.file_size_bytes,
802                phase: "starting".to_string(),
803            },
804        );
805    }
806
807    let settings = crate::models::settings::AppSettings::load_from_disk();
808    let tmpl = settings.download.filename_template.clone();
809    let mut final_output_dir = std::path::PathBuf::from(&output_dir);
810    if settings.download.organize_by_platform {
811        final_output_dir = final_output_dir.join(&platform_name);
812    }
813    let torrent_id_slot = Arc::new(tokio::sync::Mutex::new(None));
814    let opts = crate::models::media::DownloadOptions {
815        quality: quality.or_else(|| Some(settings.download.video_quality.clone())),
816        output_dir: final_output_dir,
817        filename_template: Some(tmpl),
818        download_subtitles: settings.download.download_subtitles,
819        include_auto_subtitles: settings.download.include_auto_subtitles,
820        download_mode,
821        format_id,
822        referer,
823        extra_headers,
824        page_url,
825        user_agent,
826        cancel_token: cancel_token.clone(),
827        concurrent_fragments: settings.advanced.concurrent_fragments,
828        ytdlp_path,
829        torrent_listen_port: Some(settings.advanced.torrent_listen_port),
830        torrent_id_slot: Some(torrent_id_slot.clone()),
831    };
832
833    let total_bytes = info.file_size_bytes;
834    let item_title = info.title.clone();
835    let item_platform = platform_name.clone();
836    let (tx, mut rx) = mpsc::channel::<f64>(32);
837
838    let reporter_progress = reporter.clone();
839    let queue_progress = queue.clone();
840    let torrent_id_slot_progress = torrent_id_slot.clone();
841    let progress_forwarder = tokio::spawn(async move {
842        let mut last_bytes: u64 = 0;
843        let mut last_time = std::time::Instant::now();
844        let mut throttle = ProgressThrottle::new(250);
845        let mut current_speed: f64 = 0.0;
846
847        while let Some(percent) = rx.recv().await {
848            if !throttle.should_emit() && percent < 100.0 {
849                continue;
850            }
851
852            let now = std::time::Instant::now();
853            let clamped = percent.max(0.0);
854            let downloaded_bytes = total_bytes
855                .map(|total| (clamped / 100.0 * total as f64) as u64)
856                .unwrap_or(0);
857
858            if total_bytes.is_some() && downloaded_bytes > last_bytes {
859                let dt = now.duration_since(last_time).as_secs_f64();
860                if dt > 0.1 {
861                    let instant_speed = (downloaded_bytes - last_bytes) as f64 / dt;
862                    current_speed = if current_speed > 0.0 {
863                        current_speed * 0.7 + instant_speed * 0.3
864                    } else {
865                        instant_speed
866                    };
867                }
868            }
869
870            last_bytes = downloaded_bytes;
871            last_time = now;
872
873            let phase = match percent {
874                p if p < -1.5 => "connecting",
875                p if p < -0.5 => "starting",
876                p if p > 0.0 => "downloading",
877                _ => "starting",
878            };
879
880            {
881                let mut q = queue_progress.lock().await;
882                let tid = { *torrent_id_slot_progress.lock().await };
883                q.update_progress(
884                    item_id,
885                    clamped,
886                    current_speed,
887                    downloaded_bytes,
888                    total_bytes,
889                    tid,
890                );
891            }
892
893            if let Some(r) = &reporter_progress {
894                r.on_progress(
895                    item_id,
896                    crate::core::events::QueueItemProgress {
897                        id: item_id,
898                        title: item_title.clone(),
899                        platform: item_platform.clone(),
900                        percent: clamped,
901                        speed_bytes_per_sec: current_speed,
902                        downloaded_bytes,
903                        total_bytes,
904                        phase: phase.to_string(),
905                    },
906                );
907            }
908        }
909    });
910
911    if let Some(ua) = opts.user_agent.clone() {
912        crate::core::ytdlp::register_ext_user_agent(url.clone(), ua);
913    }
914    if let Some(hdrs) = opts.extra_headers.clone() {
915        crate::core::ytdlp::register_ext_headers(url.clone(), hdrs);
916    }
917
918    let dl_start = std::time::Instant::now();
919    let dl_future = async {
920        tokio::select! {
921            r = downloader.download(&info, &opts, tx) => r,
922            _ = cancel_token.cancelled() => {
923                Err(anyhow::anyhow!("Download cancelado"))
924            }
925        }
926    };
927    let result = crate::core::log_hook::CURRENT_DOWNLOAD_ID
928        .scope(item_id, dl_future)
929        .await;
930    crate::core::ytdlp::clear_ext_user_agent(&url);
931    crate::core::ytdlp::clear_ext_headers(&url);
932    tracing::info!(
933        "[queue] download {} completed in {:?}",
934        item_id,
935        dl_start.elapsed()
936    );
937
938    let _ = progress_forwarder.await;
939
940    let was_paused = {
941        let q = queue.lock().await;
942        q.items
943            .iter()
944            .find(|i| i.id == item_id)
945            .map(|i| i.status == QueueStatus::Paused)
946            .unwrap_or(false)
947    };
948
949    if was_paused {
950        let state = {
951            let q = queue.lock().await;
952            q.get_state()
953        };
954        emit_queue_state_from_state(&reporter, state);
955        tokio::spawn(try_start_next(queue));
956        return;
957    }
958
959    match result {
960        Ok(dl) => {
961            if settings.download.embed_metadata
962                && platform_name != "magnet"
963                && crate::core::ffmpeg::is_ffmpeg_available().await
964            {
965                let metadata = crate::core::ffmpeg::MetadataEmbed {
966                    title: Some(info.title.clone()),
967                    artist: Some(info.author.clone()),
968                    thumbnail_url: info.thumbnail_url.clone(),
969                    ..Default::default()
970                };
971                if let Err(e) = crate::core::ffmpeg::embed_metadata(
972                    &dl.file_path,
973                    &metadata,
974                    settings.download.embed_thumbnail,
975                    shared_http_client(),
976                )
977                .await
978                {
979                    tracing::warn!("Metadata embed failed for '{}': {}", info.title, e);
980                }
981            }
982
983            if from_hotkey && settings.download.copy_to_clipboard_on_hotkey {
984                #[cfg(not(target_os = "android"))]
985                {
986                    match crate::core::clipboard::copy_file_to_clipboard(&dl.file_path).await {
987                        Ok(()) => {
988                            tracing::info!("[clipboard] file copied: {:?}", dl.file_path);
989                            // Notified via general event if needed, or by reporter
990                            // If needed: if let Some(r) = &reporter { r.on_complete(...) }
991                        }
992                        Err(e) => {
993                            tracing::warn!("[clipboard] failed to copy file: {}", e);
994                        }
995                    }
996                }
997            }
998
999            let state = {
1000                let mut q = queue.lock().await;
1001                if platform_name == "magnet" && dl.torrent_id.is_some() {
1002                    q.mark_seeding(
1003                        item_id,
1004                        Some(dl.file_path.to_string_lossy().to_string()),
1005                        Some(dl.file_size_bytes),
1006                        dl.torrent_id,
1007                    );
1008                } else {
1009                    q.mark_complete(
1010                        item_id,
1011                        true,
1012                        None,
1013                        Some(dl.file_path.to_string_lossy().to_string()),
1014                        Some(dl.file_size_bytes),
1015                    );
1016                }
1017                q.get_state()
1018            };
1019            if let Some(r) = &reporter {
1020                r.on_complete(
1021                    item_id,
1022                    Some(dl.file_path.to_string_lossy().to_string()),
1023                    Some(dl.file_size_bytes),
1024                );
1025            }
1026            emit_queue_state_from_state(&reporter, state);
1027        }
1028        Err(e) => {
1029            let raw_err = e.to_string();
1030            let (category, hint) = crate::core::errors::classify_download_error(&raw_err);
1031            let user_msg = if category != "unknown" {
1032                format!("{} ({})", hint, raw_err)
1033            } else {
1034                raw_err.clone()
1035            };
1036            tracing::error!(
1037                "Download error '{}' [{}]: {}",
1038                platform_name,
1039                category,
1040                raw_err
1041            );
1042            let state = {
1043                let mut q = queue.lock().await;
1044                q.mark_complete(item_id, false, Some(user_msg.clone()), None, None);
1045                q.get_state()
1046            };
1047            if let Some(r) = &reporter {
1048                r.on_error(item_id, user_msg);
1049            }
1050            emit_queue_state_from_state(&reporter, state);
1051        }
1052    }
1053
1054    tokio::spawn(try_start_next(queue));
1055}
1056
1057pub async fn fetch_and_cache_info(
1058    url: &str,
1059    downloader: &dyn PlatformDownloader,
1060    platform: &str,
1061) -> anyhow::Result<MediaInfo> {
1062    {
1063        let cache = info_cache().lock().await;
1064        if let Some(entry) = cache.get(url) {
1065            if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1066                tracing::debug!("[perf] fetch_and_cache_info: cache hit for {}", platform);
1067                return Ok(entry.info.clone());
1068            }
1069        }
1070    }
1071
1072    let url_lock = {
1073        let mut map = in_flight_map().lock().await;
1074        map.entry(url.to_string())
1075            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
1076            .clone()
1077    };
1078    let _guard = url_lock.lock().await;
1079
1080    {
1081        let cache = info_cache().lock().await;
1082        if let Some(entry) = cache.get(url) {
1083            if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1084                tracing::debug!(
1085                    "[perf] fetch_and_cache_info: dedup cache hit for {}",
1086                    platform
1087                );
1088                return Ok(entry.info.clone());
1089            }
1090        }
1091    }
1092
1093    tracing::debug!("[perf] fetch_and_cache_info: fetching for {}", platform);
1094    let mut info = downloader.get_media_info(url).await?;
1095
1096    if is_generic_title(&info.title) {
1097        let name = crate::core::random_names::get_random_name();
1098        info.title = format!("video_{}", name);
1099    }
1100
1101    let mut cache = info_cache().lock().await;
1102    cache.insert(
1103        url.to_string(),
1104        CachedInfo {
1105            info: info.clone(),
1106            cached_at: std::time::Instant::now(),
1107        },
1108    );
1109    if cache.len() > 50 {
1110        cache.retain(|_, v| v.cached_at.elapsed() < INFO_CACHE_TTL);
1111    }
1112    Ok(info)
1113}
1114
1115pub async fn try_get_cached_info(url: &str) -> Option<MediaInfo> {
1116    let cache = info_cache().lock().await;
1117    cache
1118        .get(url)
1119        .filter(|entry| entry.cached_at.elapsed() < INFO_CACHE_TTL)
1120        .map(|entry| entry.info.clone())
1121}
1122
1123pub async fn prefetch_info(url: &str, downloader: &dyn PlatformDownloader, platform: &str) {
1124    prefetch_info_with_emit(url, downloader, platform, None).await;
1125}
1126
1127pub async fn prefetch_info_with_emit(
1128    url: &str,
1129    downloader: &dyn PlatformDownloader,
1130    platform: &str,
1131    reporter: Option<SharedReporter>,
1132) {
1133    let _timer_start = std::time::Instant::now();
1134    tracing::debug!("[perf] prefetch_info: started");
1135    match fetch_and_cache_info(url, downloader, platform).await {
1136        Ok(info) => {
1137            tracing::debug!(
1138                "[perf] prefetch_info: completed in {:?} — {}",
1139                _timer_start.elapsed(),
1140                info.title
1141            );
1142            if let Some(r) = reporter {
1143                r.on_media_preview(
1144                    url.to_string(),
1145                    info.title.clone(),
1146                    info.author.clone(),
1147                    info.thumbnail_url.clone(),
1148                    info.duration_seconds,
1149                );
1150            }
1151        }
1152        Err(e) => tracing::warn!(
1153            "[perf] prefetch_info: failed in {:?} — {}",
1154            _timer_start.elapsed(),
1155            e
1156        ),
1157    }
1158}
1159
1160pub async fn try_start_next(queue: Arc<tokio::sync::Mutex<DownloadQueue>>) {
1161    let _timer_start = std::time::Instant::now();
1162    let (next_ids, stagger, state_to_emit, reporter) = {
1163        let mut q = queue.lock().await;
1164        let ids = q.next_queued_ids();
1165        for nid in &ids {
1166            q.mark_active(*nid);
1167        }
1168        let state = if !ids.is_empty() {
1169            Some(q.get_state())
1170        } else {
1171            None
1172        };
1173        (ids, q.stagger_delay_ms, state, q.reporter.clone())
1174    };
1175
1176    if let Some(state) = state_to_emit {
1177        emit_queue_state_from_state(&reporter, state);
1178    }
1179
1180    let batch_size = next_ids.len();
1181    for (i, nid) in next_ids.into_iter().enumerate() {
1182        if let Some(r) = &reporter {
1183            r.on_progress(
1184                nid,
1185                crate::core::events::QueueItemProgress {
1186                    id: nid,
1187                    title: String::new(),
1188                    platform: String::new(),
1189                    percent: 0.0,
1190                    speed_bytes_per_sec: 0.0,
1191                    downloaded_bytes: 0,
1192                    total_bytes: None,
1193                    phase: "queued_starting".to_string(),
1194                },
1195            );
1196        }
1197
1198        if i > 0 {
1199            let item_platform = {
1200                let q = queue.lock().await;
1201                q.items
1202                    .iter()
1203                    .find(|item| item.id == nid)
1204                    .map(|item| item.platform.clone())
1205            };
1206            let delay_ms = if item_platform.as_deref() == Some("youtube") {
1207                2000
1208            } else if batch_size > 3 {
1209                stagger.max(1000)
1210            } else {
1211                stagger
1212            };
1213            if delay_ms > 0 {
1214                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1215            }
1216        }
1217        let queue_c = queue.clone();
1218        tokio::spawn(async move {
1219            tokio::spawn(spawn_download(queue_c, nid));
1220        });
1221    }
1222    tracing::debug!("[perf] try_start_next took {:?}", _timer_start.elapsed());
1223}
1224
/// Reports whether `title` is a placeholder rather than a real media title.
///
/// The comparison ignores case and surrounding whitespace. A title is generic when
/// it is empty, equals one of `video`, `media`, `untitled`, `unknown`, or starts
/// with `video [video]` or `media [media]`.
fn is_generic_title(title: &str) -> bool {
    const GENERIC_PREFIXES: [&str; 2] = ["video [video]", "media [media]"];

    let lowered = title.to_lowercase();
    let normalized = lowered.trim();

    if matches!(normalized, "" | "video" | "media" | "untitled" | "unknown") {
        return true;
    }
    GENERIC_PREFIXES
        .iter()
        .any(|prefix| normalized.starts_with(*prefix))
}