Skip to main content

mangofetch_core/core/manager/
queue.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, OnceLock};
5
/// Running count of calls to `emit_queue_state_from_state`; that function
/// uses it to write a debug log line on every tenth call.
static EMIT_COUNT: AtomicU64 = AtomicU64::new(0);
7
8use serde::Serialize;
9use tokio::sync::mpsc;
10use tokio_util::sync::CancellationToken;
11
12use crate::core::traits::SharedReporter;
13use crate::models::media::MediaInfo;
14use crate::models::queue::{QueueItemInfo, QueueStatus};
15use crate::platforms::traits::PlatformDownloader;
16
17fn shared_http_client() -> &'static reqwest::Client {
18    static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
19    CLIENT.get_or_init(|| {
20        crate::core::http_client::apply_global_proxy(reqwest::Client::builder())
21            .build()
22            .unwrap_or_default()
23    })
24}
25
/// A `MediaInfo` value paired with the instant at which it was cached.
struct CachedInfo {
    /// The cached media metadata.
    info: MediaInfo,
    /// When the entry was stored.
    // NOTE(review): presumably compared against `INFO_CACHE_TTL` by the lookup
    // code, which is not in the visible part of this file — confirm.
    cached_at: std::time::Instant,
}
30
31static INFO_CACHE: OnceLock<tokio::sync::Mutex<HashMap<String, CachedInfo>>> = OnceLock::new();
32
33fn info_cache() -> &'static tokio::sync::Mutex<HashMap<String, CachedInfo>> {
34    INFO_CACHE.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
35}
36
37const INFO_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(600);
38
39static IN_FLIGHT_FETCHES: OnceLock<
40    tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
41> = OnceLock::new();
42
43fn in_flight_map() -> &'static tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
44    IN_FLIGHT_FETCHES.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
45}
46
/// Serializable summary of a media item: URL, title, author and optional
/// thumbnail and duration.
// NOTE(review): not constructed in the visible part of this file — presumably
// sent to the UI as a preview payload; confirm against callers.
#[derive(Debug, Clone, Serialize)]
pub struct MediaPreviewEvent {
    pub url: String,
    pub title: String,
    pub author: String,
    /// Thumbnail location, when one is known.
    pub thumbnail_url: Option<String>,
    /// Duration in seconds, when one is known.
    pub duration_seconds: Option<f64>,
}
55
/// One entry of the download queue: the request parameters, the live progress
/// counters and the handles needed to run or cancel the download.
pub struct QueueItem {
    /// Identifier used by every queue operation to locate this item.
    pub id: u64,
    pub url: String,
    /// Platform name; the value `"magnet"` gets special pause/resume/cancel handling.
    pub platform: String,
    pub title: String,
    pub status: QueueStatus,
    /// Cancelled to stop an in-flight download; replaced by a fresh token when
    /// the item is activated, resumed (non-magnet) or retried.
    pub cancel_token: CancellationToken,
    pub output_dir: String,
    pub download_mode: Option<String>,
    pub quality: Option<String>,
    pub format_id: Option<String>,
    pub referer: Option<String>,
    pub extra_headers: Option<std::collections::HashMap<String, String>>,
    pub page_url: Option<String>,
    pub user_agent: Option<String>,
    /// Progress value; set to `100.0` when the item completes successfully or starts seeding.
    pub percent: f64,
    /// Current speed; reset to `0.0` whenever the item stops transferring.
    pub speed_bytes_per_sec: f64,
    pub downloaded_bytes: u64,
    pub total_bytes: Option<u64>,
    /// Path of the finished download, once known.
    pub file_path: Option<String>,
    pub file_size_bytes: Option<u64>,
    pub file_count: Option<u32>,
    /// Media metadata; also the source of the thumbnail exposed by `to_info`.
    pub media_info: Option<MediaInfo>,
    /// Platform implementation that performs the download.
    pub downloader: Arc<dyn PlatformDownloader>,
    pub ytdlp_path: Option<PathBuf>,
    pub from_hotkey: bool,
    /// Torrent handle id; returned by cancel/remove for seeding and paused magnet items.
    pub torrent_id: Option<usize>,
    /// Free-form phase label (e.g. `"Queued"`, `"Restored"`).
    pub phase: String,
}
85
86impl QueueItem {
87    pub fn to_info(&self) -> QueueItemInfo {
88        QueueItemInfo {
89            id: self.id,
90            url: self.url.clone(),
91            platform: self.platform.clone(),
92            title: self.title.clone(),
93            status: self.status.clone(),
94            percent: self.percent,
95            speed_bytes_per_sec: self.speed_bytes_per_sec,
96            downloaded_bytes: self.downloaded_bytes,
97            total_bytes: self.total_bytes,
98            phase: self.phase.clone(),
99            file_path: self.file_path.clone(),
100            file_size_bytes: self.file_size_bytes,
101            file_count: self.file_count,
102            thumbnail_url: self
103                .media_info
104                .as_ref()
105                .and_then(|m| m.thumbnail_url.clone()),
106        }
107    }
108}
109
/// In-memory download queue plus the settings that govern scheduling.
pub struct DownloadQueue {
    /// All known items, in insertion order.
    pub items: Vec<QueueItem>,
    /// Upper bound on simultaneously `Active` items, used by `next_queued_ids`.
    pub max_concurrent: u32,
    /// Initialised to 150 by `new`.
    // NOTE(review): not read in the visible part of this file — presumably the
    // delay between consecutive download starts; confirm.
    pub stagger_delay_ms: u64,
    /// Sink for queue/progress notifications, if one is attached.
    pub reporter: Option<SharedReporter>,
}
116
117impl DownloadQueue {
118    pub fn new(max_concurrent: u32, reporter: Option<SharedReporter>) -> Self {
119        Self {
120            items: Vec::new(),
121            max_concurrent,
122            stagger_delay_ms: 150,
123            reporter,
124        }
125    }
126
127    pub fn set_reporter(&mut self, reporter: SharedReporter) {
128        self.reporter = Some(reporter);
129    }
130
131    pub fn load_from_recovery(&mut self, registry: &crate::core::registry::PlatformRegistry) {
132        let recovery_items = crate::core::manager::recovery::list();
133        for item in recovery_items {
134            let downloader = registry
135                .find_platform(&item.url)
136                .or_else(|| registry.find_by_name(&item.platform));
137
138            if let Some(dl) = downloader {
139                let percent = if matches!(item.status, QueueStatus::Complete { success: true }) {
140                    100.0
141                } else {
142                    0.0
143                };
144
145                let q_item = QueueItem {
146                    id: item.id,
147                    url: item.url,
148                    platform: item.platform,
149                    title: item.title,
150                    status: item.status,
151                    cancel_token: CancellationToken::new(),
152                    output_dir: item.output_dir,
153                    download_mode: item.download_mode,
154                    quality: item.quality,
155                    format_id: item.format_id,
156                    referer: item.referer,
157                    extra_headers: None,
158                    page_url: None,
159                    user_agent: None,
160                    percent,
161                    speed_bytes_per_sec: 0.0,
162                    downloaded_bytes: 0,
163                    total_bytes: item.file_size_bytes,
164                    file_path: item.file_path,
165                    file_size_bytes: item.file_size_bytes,
166                    file_count: None,
167                    media_info: None,
168                    downloader: dl,
169                    ytdlp_path: None,
170                    from_hotkey: false,
171                    torrent_id: None,
172                    phase: "Restored".to_string(),
173                };
174                self.items.push(q_item);
175            }
176        }
177    }
178
179    fn sync_recovery(&self, item: &QueueItem) {
180        crate::core::manager::recovery::persist(crate::core::manager::recovery::RecoveryItem {
181            id: item.id,
182            url: item.url.clone(),
183            title: item.title.clone(),
184            platform: item.platform.clone(),
185            output_dir: item.output_dir.clone(),
186            status: item.status.clone(),
187            download_mode: item.download_mode.clone(),
188            quality: item.quality.clone(),
189            format_id: item.format_id.clone(),
190            referer: item.referer.clone(),
191            file_path: item.file_path.clone(),
192            file_size_bytes: item.file_size_bytes,
193        });
194    }
195
196    #[allow(clippy::too_many_arguments)]
197    pub fn enqueue(
198        &mut self,
199        id: u64,
200        url: String,
201        platform: String,
202        title: String,
203        output_dir: String,
204        download_mode: Option<String>,
205        quality: Option<String>,
206        format_id: Option<String>,
207        referer: Option<String>,
208        extra_headers: Option<std::collections::HashMap<String, String>>,
209        page_url: Option<String>,
210        user_agent: Option<String>,
211        media_info: Option<MediaInfo>,
212        total_bytes: Option<u64>,
213        file_count: Option<u32>,
214        downloader: Arc<dyn PlatformDownloader>,
215        ytdlp_path: Option<PathBuf>,
216        from_hotkey: bool,
217    ) {
218        let item = QueueItem {
219            id,
220            url,
221            platform,
222            title,
223            status: QueueStatus::Queued,
224            cancel_token: CancellationToken::new(),
225            output_dir,
226            download_mode,
227            quality,
228            format_id,
229            referer,
230            extra_headers,
231            page_url,
232            user_agent,
233            percent: 0.0,
234            speed_bytes_per_sec: 0.0,
235            downloaded_bytes: 0,
236            total_bytes,
237            file_path: None,
238            file_size_bytes: None,
239            file_count,
240            media_info,
241            downloader,
242            ytdlp_path,
243            from_hotkey,
244            torrent_id: None,
245            phase: "Queued".to_string(),
246        };
247        self.items.push(item);
248        self.sync_recovery(self.items.last().unwrap());
249    }
250
251    pub fn active_count(&self) -> u32 {
252        self.items
253            .iter()
254            .filter(|i| i.status == QueueStatus::Active)
255            .count() as u32
256    }
257
258    pub fn next_queued_ids(&self) -> Vec<u64> {
259        let slots = self.max_concurrent.saturating_sub(self.active_count()) as usize;
260        self.items
261            .iter()
262            .filter(|i| i.status == QueueStatus::Queued)
263            .take(slots)
264            .map(|i| i.id)
265            .collect()
266    }
267
268    pub fn mark_active(&mut self, id: u64) {
269        let item = self.items.iter_mut().find(|i| i.id == id);
270        if let Some(item) = item {
271            item.status = QueueStatus::Active;
272            item.cancel_token = CancellationToken::new();
273            let item_ref = item as *const QueueItem;
274            self.sync_recovery(unsafe { &*item_ref });
275        }
276    }
277
278    pub fn mark_complete(
279        &mut self,
280        id: u64,
281        success: bool,
282        error: Option<String>,
283        file_path: Option<String>,
284        file_size_bytes: Option<u64>,
285    ) {
286        let item = self.items.iter_mut().find(|i| i.id == id);
287        if let Some(item) = item {
288            if success {
289                item.status = QueueStatus::Complete { success: true };
290                item.percent = 100.0;
291            } else {
292                item.status = QueueStatus::Error {
293                    message: error.unwrap_or_default(),
294                };
295            }
296            item.file_path = file_path;
297            item.file_size_bytes = file_size_bytes;
298            item.speed_bytes_per_sec = 0.0;
299            let item_ref = item as *const QueueItem;
300            self.sync_recovery(unsafe { &*item_ref });
301        }
302    }
303
304    pub fn mark_seeding(
305        &mut self,
306        id: u64,
307        file_path: Option<String>,
308        file_size_bytes: Option<u64>,
309        torrent_id: Option<usize>,
310    ) {
311        let item = self.items.iter_mut().find(|i| i.id == id);
312        if let Some(item) = item {
313            item.status = QueueStatus::Seeding;
314            item.percent = 100.0;
315            item.file_path = file_path;
316            item.file_size_bytes = file_size_bytes;
317            item.speed_bytes_per_sec = 0.0;
318            item.torrent_id = torrent_id;
319            let item_ref = item as *const QueueItem;
320            self.sync_recovery(unsafe { &*item_ref });
321        }
322    }
323
324    pub fn update_progress(
325        &mut self,
326        id: u64,
327        percent: f64,
328        speed: f64,
329        downloaded: u64,
330        total: Option<u64>,
331        torrent_id: Option<usize>,
332    ) {
333        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
334            item.percent = percent;
335            item.speed_bytes_per_sec = speed;
336            item.downloaded_bytes = downloaded;
337            if let Some(t) = total {
338                item.total_bytes = Some(t);
339            }
340            if torrent_id.is_some() && item.torrent_id.is_none() {
341                item.torrent_id = torrent_id;
342            }
343        }
344    }
345
346    pub fn pause(&mut self, id: u64) -> bool {
347        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
348            if item.status == QueueStatus::Active {
349                if item.platform != "magnet" {
350                    item.cancel_token.cancel();
351                }
352                item.status = QueueStatus::Paused;
353                item.speed_bytes_per_sec = 0.0;
354                let item_ref = item as *const QueueItem;
355                self.sync_recovery(unsafe { &*item_ref });
356                return true;
357            }
358        }
359        false
360    }
361
362    pub fn resume(&mut self, id: u64) -> bool {
363        let item = self.items.iter_mut().find(|i| i.id == id);
364        if let Some(item) = item {
365            if item.status == QueueStatus::Paused {
366                if item.platform == "magnet" {
367                    item.status = QueueStatus::Active;
368                } else {
369                    item.status = QueueStatus::Queued;
370                    item.cancel_token = CancellationToken::new();
371                }
372                let item_ref = item as *const QueueItem;
373                self.sync_recovery(unsafe { &*item_ref });
374                return true;
375            }
376        }
377        false
378    }
379
380    pub fn cancel(&mut self, id: u64) -> (bool, Option<usize>) {
381        let result = self.cancel_inner(id);
382        if result.0 {
383            crate::core::manager::recovery::remove(id);
384        }
385        result
386    }
387
388    fn cancel_inner(&mut self, id: u64) -> (bool, Option<usize>) {
389        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
390            match &item.status {
391                QueueStatus::Active => {
392                    item.cancel_token.cancel();
393                    item.status = QueueStatus::Error {
394                        message: "Cancelled".to_string(),
395                    };
396                    item.speed_bytes_per_sec = 0.0;
397                    let item_ref = item as *const QueueItem;
398                    self.sync_recovery(unsafe { &*item_ref });
399                    return (true, None);
400                }
401                QueueStatus::Seeding => {
402                    let tid = item.torrent_id;
403                    item.status = QueueStatus::Error {
404                        message: "Cancelled".to_string(),
405                    };
406                    item.speed_bytes_per_sec = 0.0;
407                    let item_ref = item as *const QueueItem;
408                    self.sync_recovery(unsafe { &*item_ref });
409                    return (true, tid);
410                }
411                QueueStatus::Paused => {
412                    item.cancel_token.cancel();
413                    let tid = if item.platform == "magnet" {
414                        item.torrent_id
415                    } else {
416                        None
417                    };
418                    item.status = QueueStatus::Error {
419                        message: "Cancelled".to_string(),
420                    };
421                    item.speed_bytes_per_sec = 0.0;
422                    let item_ref = item as *const QueueItem;
423                    self.sync_recovery(unsafe { &*item_ref });
424                    return (true, tid);
425                }
426                QueueStatus::Queued => {
427                    item.status = QueueStatus::Error {
428                        message: "Cancelled".to_string(),
429                    };
430                    let item_ref = item as *const QueueItem;
431                    self.sync_recovery(unsafe { &*item_ref });
432                    return (true, None);
433                }
434                _ => {}
435            }
436        }
437        (false, None)
438    }
439
440    pub fn retry(&mut self, id: u64) -> bool {
441        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
442            if matches!(item.status, QueueStatus::Error { .. }) {
443                item.status = QueueStatus::Queued;
444                item.cancel_token = CancellationToken::new();
445                item.percent = 0.0;
446                item.speed_bytes_per_sec = 0.0;
447                item.downloaded_bytes = 0;
448                item.file_path = None;
449                item.file_size_bytes = None;
450                let item_ref = item as *const QueueItem;
451                self.sync_recovery(unsafe { &*item_ref });
452                return true;
453            }
454        }
455        false
456    }
457
458    pub fn remove(&mut self, id: u64) -> Option<Option<usize>> {
459        let result = self.remove_inner(id);
460        if result.is_some() {
461            crate::core::manager::recovery::remove(id);
462        }
463        result
464    }
465
466    fn remove_inner(&mut self, id: u64) -> Option<Option<usize>> {
467        if let Some(pos) = self.items.iter().position(|i| i.id == id) {
468            let item = &self.items[pos];
469            if item.status == QueueStatus::Active {
470                item.cancel_token.cancel();
471            }
472            if item.status == QueueStatus::Paused && item.platform == "magnet" {
473                item.cancel_token.cancel();
474            }
475            let torrent_id = if item.status == QueueStatus::Seeding
476                || (item.status == QueueStatus::Paused && item.platform == "magnet")
477            {
478                item.torrent_id
479            } else {
480                None
481            };
482            self.items.remove(pos);
483            return Some(torrent_id);
484        }
485        None
486    }
487
488    pub fn clear_finished(&mut self) {
489        let to_remove: Vec<u64> = self
490            .items
491            .iter()
492            .filter(|i| {
493                matches!(
494                    i.status,
495                    QueueStatus::Complete { .. } | QueueStatus::Error { .. }
496                )
497            })
498            .map(|i| i.id)
499            .collect();
500        for id in &to_remove {
501            crate::core::manager::recovery::remove(*id);
502        }
503        self.items.retain(|i| {
504            !matches!(
505                i.status,
506                QueueStatus::Complete { .. } | QueueStatus::Error { .. }
507            )
508        });
509    }
510
511    pub fn get_state(&self) -> Vec<QueueItemInfo> {
512        self.items.iter().map(|i| i.to_info()).collect()
513    }
514
515    pub fn has_url(&self, url: &str) -> bool {
516        self.items.iter().any(|i| {
517            i.url == url
518                && matches!(
519                    i.status,
520                    QueueStatus::Queued
521                        | QueueStatus::Active
522                        | QueueStatus::Paused
523                        | QueueStatus::Seeding
524                )
525        })
526    }
527}
528
/// Rate limiter for progress notifications: `should_emit` answers `true` at
/// most once per `min_interval`.
pub struct ProgressThrottle {
    /// Instant of the most recent accepted emission; `None` until the first one.
    last_emit: Option<std::time::Instant>,
    /// Minimum time that must pass between two accepted emissions.
    min_interval: std::time::Duration,
}

impl ProgressThrottle {
    /// Creates a throttle that lets the first call through immediately and
    /// then enforces `min_interval_ms` milliseconds between emissions.
    pub fn new(min_interval_ms: u64) -> Self {
        Self {
            // No back-dated `Instant` is computed here: subtracting a duration
            // from `Instant::now()` can panic when the result is not
            // representable, and a fixed back-date would not cover intervals
            // longer than itself.
            last_emit: None,
            min_interval: std::time::Duration::from_millis(min_interval_ms),
        }
    }

    /// Returns `true` and records the current instant when no emission has
    /// happened yet or at least `min_interval` has elapsed since the last
    /// accepted one; returns `false` otherwise.
    pub fn should_emit(&mut self) -> bool {
        let now = std::time::Instant::now();
        let due = match self.last_emit {
            None => true,
            Some(previous) => now.duration_since(previous) >= self.min_interval,
        };
        if due {
            self.last_emit = Some(now);
        }
        due
    }
}
552
553pub fn emit_queue_state_from_state(reporter: &Option<SharedReporter>, state: Vec<QueueItemInfo>) {
554    let n = EMIT_COUNT.fetch_add(1, Ordering::Relaxed);
555    if n.is_multiple_of(10) {
556        tracing::debug!("[perf] emit_queue_state called {} times", n);
557    }
558    if let Some(reporter) = reporter {
559        reporter.on_queue_update(state);
560    }
561}
562
563pub fn emit_queue_state(queue: &DownloadQueue) {
564    let state = queue.get_state();
565    emit_queue_state_from_state(&queue.reporter, state);
566}
567
568/// RAII guard that ensures an Active queue item never leaks a slot.
569struct ActiveJobSlot {
570    queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
571    item_id: u64,
572    armed: bool,
573}
574
575impl ActiveJobSlot {
576    fn new(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) -> Self {
577        Self {
578            queue,
579            item_id,
580            armed: true,
581        }
582    }
583
584    fn disarm(mut self) {
585        self.armed = false;
586    }
587}
588
589impl Drop for ActiveJobSlot {
590    fn drop(&mut self) {
591        if !self.armed {
592            return;
593        }
594        let queue = self.queue.clone();
595        let item_id = self.item_id;
596        tokio::spawn(async move {
597            let state = {
598                let mut q = queue.lock().await;
599                let still_active = q
600                    .items
601                    .iter()
602                    .find(|i| i.id == item_id)
603                    .map(|i| i.status == QueueStatus::Active)
604                    .unwrap_or(false);
605                if !still_active {
606                    return;
607                }
608                tracing::warn!(
609                    "[queue] ActiveJobSlot guard firing for {} — download ended without clean release",
610                    item_id
611                );
612                q.mark_complete(
613                    item_id,
614                    false,
615                    Some("Download interrupted".to_string()),
616                    None,
617                    None,
618                );
619                q.get_state()
620            };
621            let reporter = { queue.lock().await.reporter.clone() };
622            emit_queue_state_from_state(&reporter, state);
623            tokio::spawn(try_start_next(queue));
624        });
625    }
626}
627
628pub fn spawn_download(
629    queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
630    item_id: u64,
631) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
632    Box::pin(async move {
633        let _timer_start = std::time::Instant::now();
634        let slot = ActiveJobSlot::new(queue.clone(), item_id);
635        tokio::spawn(spawn_download_inner(queue.clone(), item_id));
636        slot.disarm();
637        tracing::debug!(
638            "[perf] spawn_download {} took {:?}",
639            item_id,
640            _timer_start.elapsed()
641        );
642    })
643}
644
645async fn spawn_download_inner(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) {
646    tracing::info!("[queue] download {} started", item_id);
647
648    let reporter = { queue.lock().await.reporter.clone() };
649
650    if let Some(r) = &reporter {
651        r.on_progress(
652            item_id,
653            crate::core::events::QueueItemProgress {
654                id: item_id,
655                title: "".to_string(),
656                platform: "".to_string(),
657                percent: 0.0,
658                speed_bytes_per_sec: 0.0,
659                downloaded_bytes: 0,
660                total_bytes: None,
661                phase: "preparing".to_string(),
662            },
663        );
664    }
665
666    let (
667        url,
668        output_dir,
669        download_mode,
670        quality,
671        format_id,
672        referer,
673        extra_headers,
674        page_url,
675        user_agent,
676        cancel_token,
677        media_info,
678        platform_name,
679        downloader,
680        ytdlp_path,
681        from_hotkey,
682    ) = {
683        let q = queue.lock().await;
684        let item = match q.items.iter().find(|i| i.id == item_id) {
685            Some(i) => i,
686            None => return,
687        };
688        (
689            item.url.clone(),
690            item.output_dir.clone(),
691            item.download_mode.clone(),
692            item.quality.clone(),
693            item.format_id.clone(),
694            item.referer.clone(),
695            item.extra_headers.clone(),
696            item.page_url.clone(),
697            item.user_agent.clone(),
698            item.cancel_token.clone(),
699            item.media_info.clone(),
700            item.platform.clone(),
701            item.downloader.clone(),
702            item.ytdlp_path.clone(),
703            item.from_hotkey,
704        )
705    };
706
707    let info_start = std::time::Instant::now();
708    let info = match media_info {
709        Some(i) => {
710            tracing::info!(
711                "[queue] info for {} from cache/pre-fetched in {:?}",
712                item_id,
713                info_start.elapsed()
714            );
715            i
716        }
717        None => {
718            tracing::debug!(
719                "[perf] spawn_download_inner {}: media_info is None, fetching info",
720                item_id
721            );
722            if let Some(r) = &reporter {
723                r.on_progress(
724                    item_id,
725                    crate::core::events::QueueItemProgress {
726                        id: item_id,
727                        title: url.clone(),
728                        platform: platform_name.clone(),
729                        percent: 0.0,
730                        speed_bytes_per_sec: 0.0,
731                        downloaded_bytes: 0,
732                        total_bytes: None,
733                        phase: "fetching_info".to_string(),
734                    },
735                );
736            }
737
738            let info_result = tokio::time::timeout(
739                std::time::Duration::from_secs(60),
740                fetch_and_cache_info(&url, &*downloader, &platform_name),
741            )
742            .await;
743
744            match info_result {
745                Ok(Ok(i)) => i,
746                Ok(Err(e)) => {
747                    let state = {
748                        let mut q = queue.lock().await;
749                        q.mark_complete(item_id, false, Some(e.to_string()), None, None);
750                        q.get_state()
751                    };
752                    emit_queue_state_from_state(&reporter, state);
753                    tokio::spawn(try_start_next(queue));
754                    return;
755                }
756                Err(_) => {
757                    tracing::warn!("[queue] info fetch timed out for {} after 60s", item_id);
758                    let state = {
759                        let mut q = queue.lock().await;
760                        q.mark_complete(
761                            item_id,
762                            false,
763                            Some("Timed out fetching video info".to_string()),
764                            None,
765                            None,
766                        );
767                        q.get_state()
768                    };
769                    emit_queue_state_from_state(&reporter, state);
770                    tokio::spawn(try_start_next(queue));
771                    return;
772                }
773            }
774        }
775    };
776    tracing::info!(
777        "[queue] info fetch for {} took {:?}",
778        item_id,
779        info_start.elapsed()
780    );
781
782    let state = {
783        let mut q = queue.lock().await;
784        if let Some(item) = q.items.iter_mut().find(|i| i.id == item_id) {
785            item.title = info.title.clone();
786            item.total_bytes = info.file_size_bytes;
787            let fc = if info.media_type == crate::models::media::MediaType::Carousel
788                || info.media_type == crate::models::media::MediaType::Playlist
789            {
790                info.available_qualities.len() as u32
791            } else {
792                1
793            };
794            item.file_count = Some(fc);
795            item.media_info = Some(info.clone());
796        }
797        q.get_state()
798    };
799    emit_queue_state_from_state(&reporter, state);
800
801    if let Some(r) = &reporter {
802        r.on_progress(
803            item_id,
804            crate::core::events::QueueItemProgress {
805                id: item_id,
806                title: info.title.clone(),
807                platform: platform_name.clone(),
808                percent: 0.5,
809                speed_bytes_per_sec: 0.0,
810                downloaded_bytes: 0,
811                total_bytes: info.file_size_bytes,
812                phase: "starting".to_string(),
813            },
814        );
815    }
816
817    let settings = crate::models::settings::AppSettings::load_from_disk();
818    let tmpl = settings.download.filename_template.clone();
819    let mut final_output_dir = std::path::PathBuf::from(&output_dir);
820    if settings.download.organize_by_platform {
821        final_output_dir = final_output_dir.join(&platform_name);
822    }
823    let torrent_id_slot = Arc::new(tokio::sync::Mutex::new(None));
824    let opts = crate::models::media::DownloadOptions {
825        quality: quality.or_else(|| Some(settings.download.video_quality.clone())),
826        output_dir: final_output_dir,
827        filename_template: Some(tmpl),
828        download_subtitles: settings.download.download_subtitles,
829        include_auto_subtitles: settings.download.include_auto_subtitles,
830        download_mode,
831        format_id,
832        referer,
833        extra_headers,
834        page_url,
835        user_agent,
836        cancel_token: cancel_token.clone(),
837        concurrent_fragments: settings.advanced.concurrent_fragments,
838        ytdlp_path,
839        torrent_listen_port: Some(settings.advanced.torrent_listen_port),
840        torrent_id_slot: Some(torrent_id_slot.clone()),
841    };
842
843    let total_bytes = info.file_size_bytes;
844    let item_title = info.title.clone();
845    let item_platform = platform_name.clone();
846    let (tx, mut rx) = mpsc::channel::<f64>(32);
847
848    let reporter_progress = reporter.clone();
849    let queue_progress = queue.clone();
850    let torrent_id_slot_progress = torrent_id_slot.clone();
851    let progress_forwarder = tokio::spawn(async move {
852        let mut last_bytes: u64 = 0;
853        let mut last_time = std::time::Instant::now();
854        let mut throttle = ProgressThrottle::new(250);
855        let mut current_speed: f64 = 0.0;
856
857        while let Some(percent) = rx.recv().await {
858            if !throttle.should_emit() && percent < 100.0 {
859                continue;
860            }
861
862            let now = std::time::Instant::now();
863            let clamped = percent.max(0.0);
864            let downloaded_bytes = total_bytes
865                .map(|total| (clamped / 100.0 * total as f64) as u64)
866                .unwrap_or(0);
867
868            if total_bytes.is_some() && downloaded_bytes > last_bytes {
869                let dt = now.duration_since(last_time).as_secs_f64();
870                if dt > 0.1 {
871                    let instant_speed = (downloaded_bytes - last_bytes) as f64 / dt;
872                    current_speed = if current_speed > 0.0 {
873                        current_speed * 0.7 + instant_speed * 0.3
874                    } else {
875                        instant_speed
876                    };
877                }
878            }
879
880            last_bytes = downloaded_bytes;
881            last_time = now;
882
883            let phase = match percent {
884                p if p < -1.5 => "connecting",
885                p if p < -0.5 => "starting",
886                p if p > 0.0 => "downloading",
887                _ => "starting",
888            };
889
890            {
891                let mut q = queue_progress.lock().await;
892                let tid = { *torrent_id_slot_progress.lock().await };
893                q.update_progress(
894                    item_id,
895                    clamped,
896                    current_speed,
897                    downloaded_bytes,
898                    total_bytes,
899                    tid,
900                );
901            }
902
903            if let Some(r) = &reporter_progress {
904                r.on_progress(
905                    item_id,
906                    crate::core::events::QueueItemProgress {
907                        id: item_id,
908                        title: item_title.clone(),
909                        platform: item_platform.clone(),
910                        percent: clamped,
911                        speed_bytes_per_sec: current_speed,
912                        downloaded_bytes,
913                        total_bytes,
914                        phase: phase.to_string(),
915                    },
916                );
917            }
918        }
919    });
920
921    if let Some(ua) = opts.user_agent.clone() {
922        crate::core::ytdlp::register_ext_user_agent(url.clone(), ua);
923    }
924    if let Some(hdrs) = opts.extra_headers.clone() {
925        crate::core::ytdlp::register_ext_headers(url.clone(), hdrs);
926    }
927
928    let dl_start = std::time::Instant::now();
929    let dl_future = async {
930        tokio::select! {
931            r = downloader.download(&info, &opts, tx) => r,
932            _ = cancel_token.cancelled() => {
933                Err(anyhow::anyhow!("Download cancelado"))
934            }
935        }
936    };
937    let result = crate::core::log_hook::CURRENT_DOWNLOAD_ID
938        .scope(item_id, dl_future)
939        .await;
940    crate::core::ytdlp::clear_ext_user_agent(&url);
941    crate::core::ytdlp::clear_ext_headers(&url);
942    tracing::info!(
943        "[queue] download {} completed in {:?}",
944        item_id,
945        dl_start.elapsed()
946    );
947
948    let _ = progress_forwarder.await;
949
950    let was_paused = {
951        let q = queue.lock().await;
952        q.items
953            .iter()
954            .find(|i| i.id == item_id)
955            .map(|i| i.status == QueueStatus::Paused)
956            .unwrap_or(false)
957    };
958
959    if was_paused {
960        let state = {
961            let q = queue.lock().await;
962            q.get_state()
963        };
964        emit_queue_state_from_state(&reporter, state);
965        tokio::spawn(try_start_next(queue));
966        return;
967    }
968
969    match result {
970        Ok(dl) => {
971            if settings.download.embed_metadata
972                && platform_name != "magnet"
973                && crate::core::ffmpeg::is_ffmpeg_available().await
974            {
975                let metadata = crate::core::ffmpeg::MetadataEmbed {
976                    title: Some(info.title.clone()),
977                    artist: Some(info.author.clone()),
978                    thumbnail_url: info.thumbnail_url.clone(),
979                    ..Default::default()
980                };
981                if let Err(e) = crate::core::ffmpeg::embed_metadata(
982                    &dl.file_path,
983                    &metadata,
984                    settings.download.embed_thumbnail,
985                    shared_http_client(),
986                )
987                .await
988                {
989                    tracing::warn!("Metadata embed failed for '{}': {}", info.title, e);
990                }
991            }
992
993            if from_hotkey && settings.download.copy_to_clipboard_on_hotkey {
994                #[cfg(not(target_os = "android"))]
995                {
996                    match crate::core::clipboard::copy_file_to_clipboard(&dl.file_path).await {
997                        Ok(()) => {
998                            tracing::info!("[clipboard] file copied: {:?}", dl.file_path);
999                            // Notified via general event if needed, or by reporter
1000                            // If needed: if let Some(r) = &reporter { r.on_complete(...) }
1001                        }
1002                        Err(e) => {
1003                            tracing::warn!("[clipboard] failed to copy file: {}", e);
1004                        }
1005                    }
1006                }
1007            }
1008
1009            let state = {
1010                let mut q = queue.lock().await;
1011                if platform_name == "magnet" && dl.torrent_id.is_some() {
1012                    q.mark_seeding(
1013                        item_id,
1014                        Some(dl.file_path.to_string_lossy().to_string()),
1015                        Some(dl.file_size_bytes),
1016                        dl.torrent_id,
1017                    );
1018                } else {
1019                    q.mark_complete(
1020                        item_id,
1021                        true,
1022                        None,
1023                        Some(dl.file_path.to_string_lossy().to_string()),
1024                        Some(dl.file_size_bytes),
1025                    );
1026                }
1027                q.get_state()
1028            };
1029            if let Some(r) = &reporter {
1030                r.on_complete(
1031                    item_id,
1032                    Some(dl.file_path.to_string_lossy().to_string()),
1033                    Some(dl.file_size_bytes),
1034                );
1035            }
1036            emit_queue_state_from_state(&reporter, state);
1037        }
1038        Err(e) => {
1039            let raw_err = e.to_string();
1040            let (category, hint) = crate::core::errors::classify_download_error(&raw_err);
1041            let user_msg = if category != "unknown" {
1042                format!("{} ({})", hint, raw_err)
1043            } else {
1044                raw_err.clone()
1045            };
1046            tracing::error!(
1047                "Download error '{}' [{}]: {}",
1048                platform_name,
1049                category,
1050                raw_err
1051            );
1052            let state = {
1053                let mut q = queue.lock().await;
1054                q.mark_complete(item_id, false, Some(user_msg.clone()), None, None);
1055                q.get_state()
1056            };
1057            if let Some(r) = &reporter {
1058                r.on_error(item_id, user_msg);
1059            }
1060            emit_queue_state_from_state(&reporter, state);
1061        }
1062    }
1063
1064    tokio::spawn(try_start_next(queue));
1065}
1066
1067pub async fn fetch_and_cache_info(
1068    url: &str,
1069    downloader: &dyn PlatformDownloader,
1070    platform: &str,
1071) -> anyhow::Result<MediaInfo> {
1072    {
1073        let cache = info_cache().lock().await;
1074        if let Some(entry) = cache.get(url) {
1075            if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1076                tracing::debug!("[perf] fetch_and_cache_info: cache hit for {}", platform);
1077                return Ok(entry.info.clone());
1078            }
1079        }
1080    }
1081
1082    let url_lock = {
1083        let mut map = in_flight_map().lock().await;
1084        map.entry(url.to_string())
1085            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
1086            .clone()
1087    };
1088    let _guard = url_lock.lock().await;
1089
1090    {
1091        let cache = info_cache().lock().await;
1092        if let Some(entry) = cache.get(url) {
1093            if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1094                tracing::debug!(
1095                    "[perf] fetch_and_cache_info: dedup cache hit for {}",
1096                    platform
1097                );
1098                return Ok(entry.info.clone());
1099            }
1100        }
1101    }
1102
1103    tracing::debug!("[perf] fetch_and_cache_info: fetching for {}", platform);
1104    let mut info = downloader.get_media_info(url).await?;
1105
1106    if is_generic_title(&info.title) {
1107        let name = crate::core::random_names::get_random_name();
1108        info.title = format!("video_{}", name);
1109    }
1110
1111    let mut cache = info_cache().lock().await;
1112    cache.insert(
1113        url.to_string(),
1114        CachedInfo {
1115            info: info.clone(),
1116            cached_at: std::time::Instant::now(),
1117        },
1118    );
1119    if cache.len() > 50 {
1120        cache.retain(|_, v| v.cached_at.elapsed() < INFO_CACHE_TTL);
1121    }
1122    Ok(info)
1123}
1124
1125pub async fn try_get_cached_info(url: &str) -> Option<MediaInfo> {
1126    let cache = info_cache().lock().await;
1127    cache
1128        .get(url)
1129        .filter(|entry| entry.cached_at.elapsed() < INFO_CACHE_TTL)
1130        .map(|entry| entry.info.clone())
1131}
1132
1133pub async fn prefetch_info(url: &str, downloader: &dyn PlatformDownloader, platform: &str) {
1134    prefetch_info_with_emit(url, downloader, platform, None).await;
1135}
1136
1137pub async fn prefetch_info_with_emit(
1138    url: &str,
1139    downloader: &dyn PlatformDownloader,
1140    platform: &str,
1141    reporter: Option<SharedReporter>,
1142) {
1143    let _timer_start = std::time::Instant::now();
1144    tracing::debug!("[perf] prefetch_info: started");
1145    match fetch_and_cache_info(url, downloader, platform).await {
1146        Ok(info) => {
1147            tracing::debug!(
1148                "[perf] prefetch_info: completed in {:?} — {}",
1149                _timer_start.elapsed(),
1150                info.title
1151            );
1152            if let Some(r) = reporter {
1153                r.on_media_preview(
1154                    url.to_string(),
1155                    info.title.clone(),
1156                    info.author.clone(),
1157                    info.thumbnail_url.clone(),
1158                    info.duration_seconds,
1159                );
1160            }
1161        }
1162        Err(e) => tracing::warn!(
1163            "[perf] prefetch_info: failed in {:?} — {}",
1164            _timer_start.elapsed(),
1165            e
1166        ),
1167    }
1168}
1169
1170pub async fn try_start_next(queue: Arc<tokio::sync::Mutex<DownloadQueue>>) {
1171    let _timer_start = std::time::Instant::now();
1172    let (next_ids, stagger, state_to_emit, reporter) = {
1173        let mut q = queue.lock().await;
1174        let ids = q.next_queued_ids();
1175        for nid in &ids {
1176            q.mark_active(*nid);
1177        }
1178        let state = if !ids.is_empty() {
1179            Some(q.get_state())
1180        } else {
1181            None
1182        };
1183        (ids, q.stagger_delay_ms, state, q.reporter.clone())
1184    };
1185
1186    if let Some(state) = state_to_emit {
1187        emit_queue_state_from_state(&reporter, state);
1188    }
1189
1190    let batch_size = next_ids.len();
1191    for (i, nid) in next_ids.into_iter().enumerate() {
1192        if let Some(r) = &reporter {
1193            r.on_progress(
1194                nid,
1195                crate::core::events::QueueItemProgress {
1196                    id: nid,
1197                    title: String::new(),
1198                    platform: String::new(),
1199                    percent: 0.0,
1200                    speed_bytes_per_sec: 0.0,
1201                    downloaded_bytes: 0,
1202                    total_bytes: None,
1203                    phase: "queued_starting".to_string(),
1204                },
1205            );
1206        }
1207
1208        if i > 0 {
1209            let item_platform = {
1210                let q = queue.lock().await;
1211                q.items
1212                    .iter()
1213                    .find(|item| item.id == nid)
1214                    .map(|item| item.platform.clone())
1215            };
1216            let delay_ms = if item_platform.as_deref() == Some("youtube") {
1217                2000
1218            } else if batch_size > 3 {
1219                stagger.max(1000)
1220            } else {
1221                stagger
1222            };
1223            if delay_ms > 0 {
1224                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1225            }
1226        }
1227        let queue_c = queue.clone();
1228        tokio::spawn(async move {
1229            tokio::spawn(spawn_download(queue_c, nid));
1230        });
1231    }
1232    tracing::debug!("[perf] try_start_next took {:?}", _timer_start.elapsed());
1233}
1234
/// Reports whether `title` is a placeholder rather than a meaningful name.
///
/// The title is lowercased and then trimmed. It counts as generic when the result is
/// empty, is exactly one of `video`, `media`, `untitled`, `unknown`, or starts with
/// `video [video]` or `media [media]`.
fn is_generic_title(title: &str) -> bool {
    const EXACT: [&str; 5] = ["", "video", "media", "untitled", "unknown"];
    const PREFIXES: [&str; 2] = ["video [video]", "media [media]"];

    let lowered = title.to_lowercase();
    let normalized = lowered.trim();

    EXACT.contains(&normalized) || PREFIXES.iter().any(|prefix| normalized.starts_with(prefix))
}