Skip to main content

mangofetch_core/core/manager/
queue.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::{Arc, OnceLock};
5
/// Number of times `emit_queue_state_from_state` has been called; only read
/// back there to drive a periodic `[perf]` debug log line.
static EMIT_COUNT: AtomicU64 = AtomicU64::new(0);
7
8use serde::Serialize;
9use tokio::sync::mpsc;
10use tokio_util::sync::CancellationToken;
11
12use crate::core::traits::SharedReporter;
13use crate::models::media::MediaInfo;
14use crate::models::queue::{QueueItemInfo, QueueStatus};
15use crate::platforms::traits::PlatformDownloader;
16
17fn shared_http_client() -> &'static reqwest::Client {
18    static CLIENT: OnceLock<reqwest::Client> = OnceLock::new();
19    CLIENT.get_or_init(|| {
20        crate::core::http_client::apply_global_proxy(reqwest::Client::builder())
21            .build()
22            .unwrap_or_default()
23    })
24}
25
/// A `MediaInfo` value together with the instant at which it was stored.
struct CachedInfo {
    /// The cached media information.
    info: MediaInfo,
    /// When this entry was cached (monotonic clock).
    cached_at: std::time::Instant,
}
30
31static INFO_CACHE: OnceLock<tokio::sync::Mutex<HashMap<String, CachedInfo>>> = OnceLock::new();
32
33fn info_cache() -> &'static tokio::sync::Mutex<HashMap<String, CachedInfo>> {
34    INFO_CACHE.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
35}
36
/// Ten minutes (600 seconds).
/// NOTE(review): presumably the maximum age of a `CachedInfo` entry — the
/// consumer is not visible in this part of the file.
const INFO_CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(600);
38
39static IN_FLIGHT_FETCHES: OnceLock<
40    tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>>,
41> = OnceLock::new();
42
43fn in_flight_map() -> &'static tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::Mutex<()>>>> {
44    IN_FLIGHT_FETCHES.get_or_init(|| tokio::sync::Mutex::new(HashMap::new()))
45}
46
/// Serialisable preview data for a single media URL.
#[derive(Debug, Clone, Serialize)]
pub struct MediaPreviewEvent {
    /// URL the preview refers to.
    pub url: String,
    /// Media title.
    pub title: String,
    /// Media author.
    pub author: String,
    /// Thumbnail URL, if one is known.
    pub thumbnail_url: Option<String>,
    /// Duration in seconds, if known.
    pub duration_seconds: Option<f64>,
}
55
/// One entry of the download queue: the request parameters plus the mutable
/// progress/result state that `DownloadQueue` updates over the item's life.
pub struct QueueItem {
    /// Identifier used by every `DownloadQueue` method to look the item up.
    pub id: u64,
    /// URL to download.
    pub url: String,
    /// Platform name; the value `"magnet"` gets special pause/resume/cancel handling.
    pub platform: String,
    /// Display title.
    pub title: String,
    /// Current lifecycle state; `enqueue` starts items as `QueueStatus::Queued`.
    pub status: QueueStatus,
    /// Cancelled by pause/cancel/remove; replaced with a fresh token by
    /// `mark_active`, `resume` (non-magnet) and `retry`.
    pub cancel_token: CancellationToken,
    /// Base output directory for the download.
    pub output_dir: String,
    /// Optional download mode forwarded to the downloader options.
    pub download_mode: Option<String>,
    /// Optional quality selection forwarded to the downloader options.
    pub quality: Option<String>,
    /// Optional format id forwarded to the downloader options.
    pub format_id: Option<String>,
    /// Optional referer forwarded to the downloader options.
    pub referer: Option<String>,
    /// Optional extra headers forwarded to the downloader options.
    pub extra_headers: Option<std::collections::HashMap<String, String>>,
    /// Optional page URL forwarded to the downloader options.
    pub page_url: Option<String>,
    /// Optional user agent forwarded to the downloader options.
    pub user_agent: Option<String>,
    /// Progress percentage; set to 100.0 on successful completion or seeding.
    pub percent: f64,
    /// Last reported transfer speed; reset to 0.0 when the item stops transferring.
    pub speed_bytes_per_sec: f64,
    /// Bytes downloaded so far, as last reported through `update_progress`.
    pub downloaded_bytes: u64,
    /// Total size in bytes, if known.
    pub total_bytes: Option<u64>,
    /// Resulting file path, set by `mark_complete` / `mark_seeding`.
    pub file_path: Option<String>,
    /// Resulting file size, set by `mark_complete` / `mark_seeding`.
    pub file_size_bytes: Option<u64>,
    /// Number of files this item covers, if known.
    pub file_count: Option<u32>,
    /// Media metadata, if already fetched; `to_info` reads the thumbnail from it.
    pub media_info: Option<MediaInfo>,
    /// Downloader implementation used to perform the download.
    pub downloader: Arc<dyn PlatformDownloader>,
    /// Optional yt-dlp path forwarded to the downloader options.
    pub ytdlp_path: Option<PathBuf>,
    /// Whether the item was enqueued via a hotkey.
    pub from_hotkey: bool,
    /// Torrent identifier, filled in by `mark_seeding` or `update_progress`.
    pub torrent_id: Option<usize>,
    /// Free-form phase label; `enqueue` initialises it to `"Queued"`.
    pub phase: String,
}
85
86impl QueueItem {
87    pub fn to_info(&self) -> QueueItemInfo {
88        QueueItemInfo {
89            id: self.id,
90            url: self.url.clone(),
91            platform: self.platform.clone(),
92            title: self.title.clone(),
93            status: self.status.clone(),
94            percent: self.percent,
95            speed_bytes_per_sec: self.speed_bytes_per_sec,
96            downloaded_bytes: self.downloaded_bytes,
97            total_bytes: self.total_bytes,
98            phase: self.phase.clone(),
99            file_path: self.file_path.clone(),
100            file_size_bytes: self.file_size_bytes,
101            file_count: self.file_count,
102            thumbnail_url: self
103                .media_info
104                .as_ref()
105                .and_then(|m| m.thumbnail_url.clone()),
106        }
107    }
108}
109
/// In-memory download queue: the list of items plus scheduling settings and
/// an optional reporter that receives queue-state updates.
pub struct DownloadQueue {
    /// All known items, in insertion order.
    pub items: Vec<QueueItem>,
    /// Upper bound on simultaneously `Active` items, used by `next_queued_ids`.
    pub max_concurrent: u32,
    /// Set to 150 by `new`.
    /// NOTE(review): presumably the delay between successive download starts —
    /// the consumer is not visible in this part of the file.
    pub stagger_delay_ms: u64,
    /// Receiver of queue updates, if one is attached.
    pub reporter: Option<SharedReporter>,
}
116
117impl DownloadQueue {
118    pub fn new(max_concurrent: u32, reporter: Option<SharedReporter>) -> Self {
119        Self {
120            items: Vec::new(),
121            max_concurrent,
122            stagger_delay_ms: 150,
123            reporter,
124        }
125    }
126
127    pub fn set_reporter(&mut self, reporter: SharedReporter) {
128        self.reporter = Some(reporter);
129    }
130
131    fn sync_recovery(&self, item: &QueueItem) {
132        crate::core::manager::recovery::persist(crate::core::manager::recovery::RecoveryItem {
133            id: item.id,
134            url: item.url.clone(),
135            title: item.title.clone(),
136            platform: item.platform.clone(),
137            output_dir: item.output_dir.clone(),
138            status: item.status.clone(),
139            download_mode: item.download_mode.clone(),
140            quality: item.quality.clone(),
141            format_id: item.format_id.clone(),
142            referer: item.referer.clone(),
143        });
144    }
145
146    #[allow(clippy::too_many_arguments)]
147    pub fn enqueue(
148        &mut self,
149        id: u64,
150        url: String,
151        platform: String,
152        title: String,
153        output_dir: String,
154        download_mode: Option<String>,
155        quality: Option<String>,
156        format_id: Option<String>,
157        referer: Option<String>,
158        extra_headers: Option<std::collections::HashMap<String, String>>,
159        page_url: Option<String>,
160        user_agent: Option<String>,
161        media_info: Option<MediaInfo>,
162        total_bytes: Option<u64>,
163        file_count: Option<u32>,
164        downloader: Arc<dyn PlatformDownloader>,
165        ytdlp_path: Option<PathBuf>,
166        from_hotkey: bool,
167    ) {
168        let item = QueueItem {
169            id,
170            url,
171            platform,
172            title,
173            status: QueueStatus::Queued,
174            cancel_token: CancellationToken::new(),
175            output_dir,
176            download_mode,
177            quality,
178            format_id,
179            referer,
180            extra_headers,
181            page_url,
182            user_agent,
183            percent: 0.0,
184            speed_bytes_per_sec: 0.0,
185            downloaded_bytes: 0,
186            total_bytes,
187            file_path: None,
188            file_size_bytes: None,
189            file_count,
190            media_info,
191            downloader,
192            ytdlp_path,
193            from_hotkey,
194            torrent_id: None,
195            phase: "Queued".to_string(),
196        };
197        self.items.push(item);
198        self.sync_recovery(self.items.last().unwrap());
199    }
200
201    pub fn active_count(&self) -> u32 {
202        self.items
203            .iter()
204            .filter(|i| i.status == QueueStatus::Active)
205            .count() as u32
206    }
207
208    pub fn next_queued_ids(&self) -> Vec<u64> {
209        let slots = self.max_concurrent.saturating_sub(self.active_count()) as usize;
210        self.items
211            .iter()
212            .filter(|i| i.status == QueueStatus::Queued)
213            .take(slots)
214            .map(|i| i.id)
215            .collect()
216    }
217
218    pub fn mark_active(&mut self, id: u64) {
219        let item = self.items.iter_mut().find(|i| i.id == id);
220        if let Some(item) = item {
221            item.status = QueueStatus::Active;
222            item.cancel_token = CancellationToken::new();
223            let item_ref = item as *const QueueItem;
224            self.sync_recovery(unsafe { &*item_ref });
225        }
226    }
227
228    pub fn mark_complete(
229        &mut self,
230        id: u64,
231        success: bool,
232        error: Option<String>,
233        file_path: Option<String>,
234        file_size_bytes: Option<u64>,
235    ) {
236        let item = self.items.iter_mut().find(|i| i.id == id);
237        if let Some(item) = item {
238            if success {
239                item.status = QueueStatus::Complete { success: true };
240                item.percent = 100.0;
241            } else {
242                item.status = QueueStatus::Error {
243                    message: error.unwrap_or_default(),
244                };
245            }
246            item.file_path = file_path;
247            item.file_size_bytes = file_size_bytes;
248            item.speed_bytes_per_sec = 0.0;
249            let item_ref = item as *const QueueItem;
250            self.sync_recovery(unsafe { &*item_ref });
251        }
252    }
253
254    pub fn mark_seeding(
255        &mut self,
256        id: u64,
257        file_path: Option<String>,
258        file_size_bytes: Option<u64>,
259        torrent_id: Option<usize>,
260    ) {
261        let item = self.items.iter_mut().find(|i| i.id == id);
262        if let Some(item) = item {
263            item.status = QueueStatus::Seeding;
264            item.percent = 100.0;
265            item.file_path = file_path;
266            item.file_size_bytes = file_size_bytes;
267            item.speed_bytes_per_sec = 0.0;
268            item.torrent_id = torrent_id;
269            let item_ref = item as *const QueueItem;
270            self.sync_recovery(unsafe { &*item_ref });
271        }
272    }
273
274    pub fn update_progress(
275        &mut self,
276        id: u64,
277        percent: f64,
278        speed: f64,
279        downloaded: u64,
280        total: Option<u64>,
281        torrent_id: Option<usize>,
282    ) {
283        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
284            item.percent = percent;
285            item.speed_bytes_per_sec = speed;
286            item.downloaded_bytes = downloaded;
287            if let Some(t) = total {
288                item.total_bytes = Some(t);
289            }
290            if torrent_id.is_some() && item.torrent_id.is_none() {
291                item.torrent_id = torrent_id;
292            }
293        }
294    }
295
296    pub fn pause(&mut self, id: u64) -> bool {
297        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
298            if item.status == QueueStatus::Active {
299                if item.platform != "magnet" {
300                    item.cancel_token.cancel();
301                }
302                item.status = QueueStatus::Paused;
303                item.speed_bytes_per_sec = 0.0;
304                let item_ref = item as *const QueueItem;
305                self.sync_recovery(unsafe { &*item_ref });
306                return true;
307            }
308        }
309        false
310    }
311
312    pub fn resume(&mut self, id: u64) -> bool {
313        let item = self.items.iter_mut().find(|i| i.id == id);
314        if let Some(item) = item {
315            if item.status == QueueStatus::Paused {
316                if item.platform == "magnet" {
317                    item.status = QueueStatus::Active;
318                } else {
319                    item.status = QueueStatus::Queued;
320                    item.cancel_token = CancellationToken::new();
321                }
322                let item_ref = item as *const QueueItem;
323                self.sync_recovery(unsafe { &*item_ref });
324                return true;
325            }
326        }
327        false
328    }
329
330    pub fn cancel(&mut self, id: u64) -> (bool, Option<usize>) {
331        let result = self.cancel_inner(id);
332        if result.0 {
333            crate::core::manager::recovery::remove(id);
334        }
335        result
336    }
337
338    fn cancel_inner(&mut self, id: u64) -> (bool, Option<usize>) {
339        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
340            match &item.status {
341                QueueStatus::Active => {
342                    item.cancel_token.cancel();
343                    item.status = QueueStatus::Error {
344                        message: "Cancelled".to_string(),
345                    };
346                    item.speed_bytes_per_sec = 0.0;
347                    let item_ref = item as *const QueueItem;
348                    self.sync_recovery(unsafe { &*item_ref });
349                    return (true, None);
350                }
351                QueueStatus::Seeding => {
352                    let tid = item.torrent_id;
353                    item.status = QueueStatus::Error {
354                        message: "Cancelled".to_string(),
355                    };
356                    item.speed_bytes_per_sec = 0.0;
357                    let item_ref = item as *const QueueItem;
358                    self.sync_recovery(unsafe { &*item_ref });
359                    return (true, tid);
360                }
361                QueueStatus::Paused => {
362                    item.cancel_token.cancel();
363                    let tid = if item.platform == "magnet" {
364                        item.torrent_id
365                    } else {
366                        None
367                    };
368                    item.status = QueueStatus::Error {
369                        message: "Cancelled".to_string(),
370                    };
371                    item.speed_bytes_per_sec = 0.0;
372                    let item_ref = item as *const QueueItem;
373                    self.sync_recovery(unsafe { &*item_ref });
374                    return (true, tid);
375                }
376                QueueStatus::Queued => {
377                    item.status = QueueStatus::Error {
378                        message: "Cancelled".to_string(),
379                    };
380                    let item_ref = item as *const QueueItem;
381                    self.sync_recovery(unsafe { &*item_ref });
382                    return (true, None);
383                }
384                _ => {}
385            }
386        }
387        (false, None)
388    }
389
390    pub fn retry(&mut self, id: u64) -> bool {
391        if let Some(item) = self.items.iter_mut().find(|i| i.id == id) {
392            if matches!(item.status, QueueStatus::Error { .. }) {
393                item.status = QueueStatus::Queued;
394                item.cancel_token = CancellationToken::new();
395                item.percent = 0.0;
396                item.speed_bytes_per_sec = 0.0;
397                item.downloaded_bytes = 0;
398                item.file_path = None;
399                item.file_size_bytes = None;
400                let item_ref = item as *const QueueItem;
401                self.sync_recovery(unsafe { &*item_ref });
402                return true;
403            }
404        }
405        false
406    }
407
408    pub fn remove(&mut self, id: u64) -> Option<Option<usize>> {
409        let result = self.remove_inner(id);
410        if result.is_some() {
411            crate::core::manager::recovery::remove(id);
412        }
413        result
414    }
415
416    fn remove_inner(&mut self, id: u64) -> Option<Option<usize>> {
417        if let Some(pos) = self.items.iter().position(|i| i.id == id) {
418            let item = &self.items[pos];
419            if item.status == QueueStatus::Active {
420                item.cancel_token.cancel();
421            }
422            if item.status == QueueStatus::Paused && item.platform == "magnet" {
423                item.cancel_token.cancel();
424            }
425            let torrent_id = if item.status == QueueStatus::Seeding
426                || (item.status == QueueStatus::Paused && item.platform == "magnet")
427            {
428                item.torrent_id
429            } else {
430                None
431            };
432            self.items.remove(pos);
433            return Some(torrent_id);
434        }
435        None
436    }
437
438    pub fn clear_finished(&mut self) {
439        let to_remove: Vec<u64> = self
440            .items
441            .iter()
442            .filter(|i| {
443                matches!(
444                    i.status,
445                    QueueStatus::Complete { .. } | QueueStatus::Error { .. }
446                )
447            })
448            .map(|i| i.id)
449            .collect();
450        for id in &to_remove {
451            crate::core::manager::recovery::remove(*id);
452        }
453        self.items.retain(|i| {
454            !matches!(
455                i.status,
456                QueueStatus::Complete { .. } | QueueStatus::Error { .. }
457            )
458        });
459    }
460
461    pub fn get_state(&self) -> Vec<QueueItemInfo> {
462        self.items.iter().map(|i| i.to_info()).collect()
463    }
464
465    pub fn has_url(&self, url: &str) -> bool {
466        self.items.iter().any(|i| {
467            i.url == url
468                && matches!(
469                    i.status,
470                    QueueStatus::Queued
471                        | QueueStatus::Active
472                        | QueueStatus::Paused
473                        | QueueStatus::Seeding
474                )
475        })
476    }
477}
478
/// Rate limiter for progress events: lets at most one event through per
/// `min_interval`.
pub struct ProgressThrottle {
    /// Instant of the last emitted event; `None` until the first emission.
    last_emit: Option<std::time::Instant>,
    /// Minimum time that must pass between two emitted events.
    min_interval: std::time::Duration,
}

impl ProgressThrottle {
    /// Creates a throttle that allows one emission every `min_interval_ms`
    /// milliseconds. The very first call to `should_emit` always succeeds.
    pub fn new(min_interval_ms: u64) -> Self {
        Self {
            // No back-dated `Instant`: subtracting from `Instant::now()` can
            // panic when the result is not representable, and a fixed offset
            // breaks for intervals longer than that offset.
            last_emit: None,
            min_interval: std::time::Duration::from_millis(min_interval_ms),
        }
    }

    /// Returns `true` (and records the current time) when at least
    /// `min_interval` has elapsed since the last emission, or when nothing
    /// has been emitted yet; returns `false` otherwise.
    pub fn should_emit(&mut self) -> bool {
        let now = std::time::Instant::now();
        let due = match self.last_emit {
            None => true,
            Some(previous) => now.duration_since(previous) >= self.min_interval,
        };
        if due {
            self.last_emit = Some(now);
        }
        due
    }
}
502
503pub fn emit_queue_state_from_state(reporter: &Option<SharedReporter>, state: Vec<QueueItemInfo>) {
504    let n = EMIT_COUNT.fetch_add(1, Ordering::Relaxed);
505    if n % 10 == 0 {
506        tracing::debug!("[perf] emit_queue_state called {} times", n);
507    }
508    if let Some(reporter) = reporter {
509        reporter.on_queue_update(state);
510    }
511}
512
513pub fn emit_queue_state(queue: &DownloadQueue) {
514    let state = queue.get_state();
515    emit_queue_state_from_state(&queue.reporter, state);
516}
517
518/// RAII guard that ensures an Active queue item never leaks a slot.
519struct ActiveJobSlot {
520    queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
521    item_id: u64,
522    armed: bool,
523}
524
525impl ActiveJobSlot {
526    fn new(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) -> Self {
527        Self {
528            queue,
529            item_id,
530            armed: true,
531        }
532    }
533
534    fn disarm(mut self) {
535        self.armed = false;
536    }
537}
538
539impl Drop for ActiveJobSlot {
540    fn drop(&mut self) {
541        if !self.armed {
542            return;
543        }
544        let queue = self.queue.clone();
545        let item_id = self.item_id;
546        tokio::spawn(async move {
547            let state = {
548                let mut q = queue.lock().await;
549                let still_active = q
550                    .items
551                    .iter()
552                    .find(|i| i.id == item_id)
553                    .map(|i| i.status == QueueStatus::Active)
554                    .unwrap_or(false);
555                if !still_active {
556                    return;
557                }
558                tracing::warn!(
559                    "[queue] ActiveJobSlot guard firing for {} — download ended without clean release",
560                    item_id
561                );
562                q.mark_complete(
563                    item_id,
564                    false,
565                    Some("Download interrupted".to_string()),
566                    None,
567                    None,
568                );
569                q.get_state()
570            };
571            let reporter = { queue.lock().await.reporter.clone() };
572            emit_queue_state_from_state(&reporter, state);
573            tokio::spawn(try_start_next(queue));
574        });
575    }
576}
577
578pub fn spawn_download(
579    queue: Arc<tokio::sync::Mutex<DownloadQueue>>,
580    item_id: u64,
581) -> std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> {
582    Box::pin(async move {
583        let _timer_start = std::time::Instant::now();
584        let slot = ActiveJobSlot::new(queue.clone(), item_id);
585        tokio::spawn(spawn_download_inner(queue.clone(), item_id));
586        slot.disarm();
587        tracing::debug!(
588            "[perf] spawn_download {} took {:?}",
589            item_id,
590            _timer_start.elapsed()
591        );
592    })
593}
594
595async fn spawn_download_inner(queue: Arc<tokio::sync::Mutex<DownloadQueue>>, item_id: u64) {
596    tracing::info!("[queue] download {} started", item_id);
597
598    let reporter = { queue.lock().await.reporter.clone() };
599
600    if let Some(r) = &reporter {
601        r.on_progress(
602            item_id,
603            crate::core::events::QueueItemProgress {
604                id: item_id,
605                title: "".to_string(),
606                platform: "".to_string(),
607                percent: 0.0,
608                speed_bytes_per_sec: 0.0,
609                downloaded_bytes: 0,
610                total_bytes: None,
611                phase: "preparing".to_string(),
612            },
613        );
614    }
615
616    let (
617        url,
618        output_dir,
619        download_mode,
620        quality,
621        format_id,
622        referer,
623        extra_headers,
624        page_url,
625        user_agent,
626        cancel_token,
627        media_info,
628        platform_name,
629        downloader,
630        ytdlp_path,
631        from_hotkey,
632    ) = {
633        let q = queue.lock().await;
634        let item = match q.items.iter().find(|i| i.id == item_id) {
635            Some(i) => i,
636            None => return,
637        };
638        (
639            item.url.clone(),
640            item.output_dir.clone(),
641            item.download_mode.clone(),
642            item.quality.clone(),
643            item.format_id.clone(),
644            item.referer.clone(),
645            item.extra_headers.clone(),
646            item.page_url.clone(),
647            item.user_agent.clone(),
648            item.cancel_token.clone(),
649            item.media_info.clone(),
650            item.platform.clone(),
651            item.downloader.clone(),
652            item.ytdlp_path.clone(),
653            item.from_hotkey,
654        )
655    };
656
657    let info_start = std::time::Instant::now();
658    let info = match media_info {
659        Some(i) => {
660            tracing::info!(
661                "[queue] info for {} from cache/pre-fetched in {:?}",
662                item_id,
663                info_start.elapsed()
664            );
665            i
666        }
667        None => {
668            tracing::debug!(
669                "[perf] spawn_download_inner {}: media_info is None, fetching info",
670                item_id
671            );
672            if let Some(r) = &reporter {
673                r.on_progress(
674                    item_id,
675                    crate::core::events::QueueItemProgress {
676                        id: item_id,
677                        title: url.clone(),
678                        platform: platform_name.clone(),
679                        percent: 0.0,
680                        speed_bytes_per_sec: 0.0,
681                        downloaded_bytes: 0,
682                        total_bytes: None,
683                        phase: "fetching_info".to_string(),
684                    },
685                );
686            }
687
688            let info_result = tokio::time::timeout(
689                std::time::Duration::from_secs(60),
690                fetch_and_cache_info(&url, &*downloader, &platform_name),
691            )
692            .await;
693
694            match info_result {
695                Ok(Ok(i)) => i,
696                Ok(Err(e)) => {
697                    let state = {
698                        let mut q = queue.lock().await;
699                        q.mark_complete(item_id, false, Some(e.to_string()), None, None);
700                        q.get_state()
701                    };
702                    emit_queue_state_from_state(&reporter, state);
703                    tokio::spawn(try_start_next(queue));
704                    return;
705                }
706                Err(_) => {
707                    tracing::warn!("[queue] info fetch timed out for {} after 60s", item_id);
708                    let state = {
709                        let mut q = queue.lock().await;
710                        q.mark_complete(
711                            item_id,
712                            false,
713                            Some("Timed out fetching video info".to_string()),
714                            None,
715                            None,
716                        );
717                        q.get_state()
718                    };
719                    emit_queue_state_from_state(&reporter, state);
720                    tokio::spawn(try_start_next(queue));
721                    return;
722                }
723            }
724        }
725    };
726    tracing::info!(
727        "[queue] info fetch for {} took {:?}",
728        item_id,
729        info_start.elapsed()
730    );
731
732    let mut info = info;
733    if is_generic_title(&info.title) {
734        let pokemon = crate::core::pokemon_names::random_pokemon_name();
735        info.title = format!("video_{}", pokemon);
736    }
737
738    let state = {
739        let mut q = queue.lock().await;
740        if let Some(item) = q.items.iter_mut().find(|i| i.id == item_id) {
741            item.title = info.title.clone();
742            item.total_bytes = info.file_size_bytes;
743            let fc = if info.media_type == crate::models::media::MediaType::Carousel
744                || info.media_type == crate::models::media::MediaType::Playlist
745            {
746                info.available_qualities.len() as u32
747            } else {
748                1
749            };
750            item.file_count = Some(fc);
751            item.media_info = Some(info.clone());
752        }
753        q.get_state()
754    };
755    emit_queue_state_from_state(&reporter, state);
756
757    if let Some(r) = &reporter {
758        r.on_progress(
759            item_id,
760            crate::core::events::QueueItemProgress {
761                id: item_id,
762                title: info.title.clone(),
763                platform: platform_name.clone(),
764                percent: 0.5,
765                speed_bytes_per_sec: 0.0,
766                downloaded_bytes: 0,
767                total_bytes: info.file_size_bytes,
768                phase: "starting".to_string(),
769            },
770        );
771    }
772
773    let settings = crate::models::settings::AppSettings::load_from_disk();
774    let tmpl = settings.download.filename_template.clone();
775    let mut final_output_dir = std::path::PathBuf::from(&output_dir);
776    if settings.download.organize_by_platform {
777        final_output_dir = final_output_dir.join(&platform_name);
778    }
779    let torrent_id_slot = Arc::new(tokio::sync::Mutex::new(None));
780    let opts = crate::models::media::DownloadOptions {
781        quality: quality.or_else(|| Some(settings.download.video_quality.clone())),
782        output_dir: final_output_dir,
783        filename_template: Some(tmpl),
784        download_subtitles: settings.download.download_subtitles,
785        include_auto_subtitles: settings.download.include_auto_subtitles,
786        download_mode,
787        format_id,
788        referer,
789        extra_headers,
790        page_url,
791        user_agent,
792        cancel_token: cancel_token.clone(),
793        concurrent_fragments: settings.advanced.concurrent_fragments,
794        ytdlp_path,
795        torrent_listen_port: Some(settings.advanced.torrent_listen_port),
796        torrent_id_slot: Some(torrent_id_slot.clone()),
797    };
798
799    let total_bytes = info.file_size_bytes;
800    let item_title = info.title.clone();
801    let item_platform = platform_name.clone();
802    let (tx, mut rx) = mpsc::channel::<f64>(32);
803
804    let reporter_progress = reporter.clone();
805    let queue_progress = queue.clone();
806    let torrent_id_slot_progress = torrent_id_slot.clone();
807    let progress_forwarder = tokio::spawn(async move {
808        let mut last_bytes: u64 = 0;
809        let mut last_time = std::time::Instant::now();
810        let mut throttle = ProgressThrottle::new(250);
811        let mut current_speed: f64 = 0.0;
812
813        while let Some(percent) = rx.recv().await {
814            if !throttle.should_emit() && percent < 100.0 {
815                continue;
816            }
817
818            let now = std::time::Instant::now();
819            let clamped = percent.max(0.0);
820            let downloaded_bytes = total_bytes
821                .map(|total| (clamped / 100.0 * total as f64) as u64)
822                .unwrap_or(0);
823
824            if total_bytes.is_some() && downloaded_bytes > last_bytes {
825                let dt = now.duration_since(last_time).as_secs_f64();
826                if dt > 0.1 {
827                    let instant_speed = (downloaded_bytes - last_bytes) as f64 / dt;
828                    current_speed = if current_speed > 0.0 {
829                        current_speed * 0.7 + instant_speed * 0.3
830                    } else {
831                        instant_speed
832                    };
833                }
834            }
835
836            last_bytes = downloaded_bytes;
837            last_time = now;
838
839            let phase = match percent {
840                p if p < -1.5 => "connecting",
841                p if p < -0.5 => "starting",
842                p if p > 0.0 => "downloading",
843                _ => "starting",
844            };
845
846            {
847                let mut q = queue_progress.lock().await;
848                let tid = { *torrent_id_slot_progress.lock().await };
849                q.update_progress(
850                    item_id,
851                    clamped,
852                    current_speed,
853                    downloaded_bytes,
854                    total_bytes,
855                    tid,
856                );
857            }
858
859            if let Some(r) = &reporter_progress {
860                r.on_progress(
861                    item_id,
862                    crate::core::events::QueueItemProgress {
863                        id: item_id,
864                        title: item_title.clone(),
865                        platform: item_platform.clone(),
866                        percent: clamped,
867                        speed_bytes_per_sec: current_speed,
868                        downloaded_bytes,
869                        total_bytes,
870                        phase: phase.to_string(),
871                    },
872                );
873            }
874        }
875    });
876
877    if let Some(ua) = opts.user_agent.clone() {
878        crate::core::ytdlp::register_ext_user_agent(url.clone(), ua);
879    }
880    if let Some(hdrs) = opts.extra_headers.clone() {
881        crate::core::ytdlp::register_ext_headers(url.clone(), hdrs);
882    }
883
884    let dl_start = std::time::Instant::now();
885    let dl_future = async {
886        tokio::select! {
887            r = downloader.download(&info, &opts, tx) => r,
888            _ = cancel_token.cancelled() => {
889                Err(anyhow::anyhow!("Download cancelado"))
890            }
891        }
892    };
893    let result = crate::core::log_hook::CURRENT_DOWNLOAD_ID
894        .scope(item_id, dl_future)
895        .await;
896    crate::core::ytdlp::clear_ext_user_agent(&url);
897    crate::core::ytdlp::clear_ext_headers(&url);
898    tracing::info!(
899        "[queue] download {} completed in {:?}",
900        item_id,
901        dl_start.elapsed()
902    );
903
904    let _ = progress_forwarder.await;
905
906    let was_paused = {
907        let q = queue.lock().await;
908        q.items
909            .iter()
910            .find(|i| i.id == item_id)
911            .map(|i| i.status == QueueStatus::Paused)
912            .unwrap_or(false)
913    };
914
915    if was_paused {
916        let state = {
917            let q = queue.lock().await;
918            q.get_state()
919        };
920        emit_queue_state_from_state(&reporter, state);
921        tokio::spawn(try_start_next(queue));
922        return;
923    }
924
925    match result {
926        Ok(dl) => {
927            if settings.download.embed_metadata
928                && platform_name != "magnet"
929                && crate::core::ffmpeg::is_ffmpeg_available().await
930            {
931                let metadata = crate::core::ffmpeg::MetadataEmbed {
932                    title: Some(info.title.clone()),
933                    artist: Some(info.author.clone()),
934                    thumbnail_url: info.thumbnail_url.clone(),
935                    ..Default::default()
936                };
937                if let Err(e) = crate::core::ffmpeg::embed_metadata(
938                    &dl.file_path,
939                    &metadata,
940                    settings.download.embed_thumbnail,
941                    shared_http_client(),
942                )
943                .await
944                {
945                    tracing::warn!("Metadata embed failed for '{}': {}", info.title, e);
946                }
947            }
948
949            if from_hotkey && settings.download.copy_to_clipboard_on_hotkey {
950                #[cfg(not(target_os = "android"))]
951                {
952                    match crate::core::clipboard::copy_file_to_clipboard(&dl.file_path).await {
953                        Ok(()) => {
954                            tracing::info!("[clipboard] file copied: {:?}", dl.file_path);
955                            // Notified via general event if needed, or by reporter
956                            // If needed: if let Some(r) = &reporter { r.on_complete(...) }
957                        }
958                        Err(e) => {
959                            tracing::warn!("[clipboard] failed to copy file: {}", e);
960                        }
961                    }
962                }
963            }
964
965            let state = {
966                let mut q = queue.lock().await;
967                if platform_name == "magnet" && dl.torrent_id.is_some() {
968                    q.mark_seeding(
969                        item_id,
970                        Some(dl.file_path.to_string_lossy().to_string()),
971                        Some(dl.file_size_bytes),
972                        dl.torrent_id,
973                    );
974                } else {
975                    q.mark_complete(
976                        item_id,
977                        true,
978                        None,
979                        Some(dl.file_path.to_string_lossy().to_string()),
980                        Some(dl.file_size_bytes),
981                    );
982                }
983                q.get_state()
984            };
985            if let Some(r) = &reporter {
986                r.on_complete(
987                    item_id,
988                    Some(dl.file_path.to_string_lossy().to_string()),
989                    Some(dl.file_size_bytes),
990                );
991            }
992            emit_queue_state_from_state(&reporter, state);
993        }
994        Err(e) => {
995            let raw_err = e.to_string();
996            let (category, hint) = crate::core::errors::classify_download_error(&raw_err);
997            let user_msg = if category != "unknown" {
998                format!("{} ({})", hint, raw_err)
999            } else {
1000                raw_err.clone()
1001            };
1002            tracing::error!(
1003                "Download error '{}' [{}]: {}",
1004                platform_name,
1005                category,
1006                raw_err
1007            );
1008            let state = {
1009                let mut q = queue.lock().await;
1010                q.mark_complete(item_id, false, Some(user_msg.clone()), None, None);
1011                q.get_state()
1012            };
1013            if let Some(r) = &reporter {
1014                r.on_error(item_id, user_msg);
1015            }
1016            emit_queue_state_from_state(&reporter, state);
1017        }
1018    }
1019
1020    tokio::spawn(try_start_next(queue));
1021}
1022
1023pub async fn fetch_and_cache_info(
1024    url: &str,
1025    downloader: &dyn PlatformDownloader,
1026    platform: &str,
1027) -> anyhow::Result<MediaInfo> {
1028    {
1029        let cache = info_cache().lock().await;
1030        if let Some(entry) = cache.get(url) {
1031            if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1032                tracing::debug!("[perf] fetch_and_cache_info: cache hit for {}", platform);
1033                return Ok(entry.info.clone());
1034            }
1035        }
1036    }
1037
1038    let url_lock = {
1039        let mut map = in_flight_map().lock().await;
1040        map.entry(url.to_string())
1041            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())))
1042            .clone()
1043    };
1044    let _guard = url_lock.lock().await;
1045
1046    {
1047        let cache = info_cache().lock().await;
1048        if let Some(entry) = cache.get(url) {
1049            if entry.cached_at.elapsed() < INFO_CACHE_TTL {
1050                tracing::debug!(
1051                    "[perf] fetch_and_cache_info: dedup cache hit for {}",
1052                    platform
1053                );
1054                return Ok(entry.info.clone());
1055            }
1056        }
1057    }
1058
1059    tracing::debug!("[perf] fetch_and_cache_info: fetching for {}", platform);
1060    let info = downloader.get_media_info(url).await?;
1061
1062    let mut cache = info_cache().lock().await;
1063    cache.insert(
1064        url.to_string(),
1065        CachedInfo {
1066            info: info.clone(),
1067            cached_at: std::time::Instant::now(),
1068        },
1069    );
1070    if cache.len() > 50 {
1071        cache.retain(|_, v| v.cached_at.elapsed() < INFO_CACHE_TTL);
1072    }
1073    Ok(info)
1074}
1075
1076pub async fn try_get_cached_info(url: &str) -> Option<MediaInfo> {
1077    let cache = info_cache().lock().await;
1078    cache
1079        .get(url)
1080        .filter(|entry| entry.cached_at.elapsed() < INFO_CACHE_TTL)
1081        .map(|entry| entry.info.clone())
1082}
1083
1084pub async fn prefetch_info(url: &str, downloader: &dyn PlatformDownloader, platform: &str) {
1085    prefetch_info_with_emit(url, downloader, platform, None).await;
1086}
1087
1088pub async fn prefetch_info_with_emit(
1089    url: &str,
1090    downloader: &dyn PlatformDownloader,
1091    platform: &str,
1092    reporter: Option<SharedReporter>,
1093) {
1094    let _timer_start = std::time::Instant::now();
1095    tracing::debug!("[perf] prefetch_info: started");
1096    match fetch_and_cache_info(url, downloader, platform).await {
1097        Ok(info) => {
1098            tracing::debug!(
1099                "[perf] prefetch_info: completed in {:?} — {}",
1100                _timer_start.elapsed(),
1101                info.title
1102            );
1103            if let Some(r) = reporter {
1104                r.on_media_preview(
1105                    url.to_string(),
1106                    info.title.clone(),
1107                    info.author.clone(),
1108                    info.thumbnail_url.clone(),
1109                    info.duration_seconds,
1110                );
1111            }
1112        }
1113        Err(e) => tracing::warn!(
1114            "[perf] prefetch_info: failed in {:?} — {}",
1115            _timer_start.elapsed(),
1116            e
1117        ),
1118    }
1119}
1120
1121pub async fn try_start_next(queue: Arc<tokio::sync::Mutex<DownloadQueue>>) {
1122    let _timer_start = std::time::Instant::now();
1123    let (next_ids, stagger, state_to_emit, reporter) = {
1124        let mut q = queue.lock().await;
1125        let ids = q.next_queued_ids();
1126        for nid in &ids {
1127            q.mark_active(*nid);
1128        }
1129        let state = if !ids.is_empty() {
1130            Some(q.get_state())
1131        } else {
1132            None
1133        };
1134        (ids, q.stagger_delay_ms, state, q.reporter.clone())
1135    };
1136
1137    if let Some(state) = state_to_emit {
1138        emit_queue_state_from_state(&reporter, state);
1139    }
1140
1141    let batch_size = next_ids.len();
1142    for (i, nid) in next_ids.into_iter().enumerate() {
1143        if let Some(r) = &reporter {
1144            r.on_progress(
1145                nid,
1146                crate::core::events::QueueItemProgress {
1147                    id: nid,
1148                    title: String::new(),
1149                    platform: String::new(),
1150                    percent: 0.0,
1151                    speed_bytes_per_sec: 0.0,
1152                    downloaded_bytes: 0,
1153                    total_bytes: None,
1154                    phase: "queued_starting".to_string(),
1155                },
1156            );
1157        }
1158
1159        if i > 0 {
1160            let item_platform = {
1161                let q = queue.lock().await;
1162                q.items
1163                    .iter()
1164                    .find(|item| item.id == nid)
1165                    .map(|item| item.platform.clone())
1166            };
1167            let delay_ms = if item_platform.as_deref() == Some("youtube") {
1168                2000
1169            } else if batch_size > 3 {
1170                stagger.max(1000)
1171            } else {
1172                stagger
1173            };
1174            if delay_ms > 0 {
1175                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
1176            }
1177        }
1178        let queue_c = queue.clone();
1179        tokio::spawn(async move {
1180            tokio::spawn(spawn_download(queue_c, nid));
1181        });
1182    }
1183    tracing::debug!("[perf] try_start_next took {:?}", _timer_start.elapsed());
1184}
1185
/// Reports whether `title` is a placeholder rather than a real media title.
///
/// The title is lowercased and trimmed first. It counts as generic when the result is
/// empty, is exactly one of the known placeholder words, or begins with one of the known
/// placeholder prefixes.
fn is_generic_title(title: &str) -> bool {
    const PLACEHOLDER_WORDS: [&str; 5] = ["", "video", "media", "untitled", "unknown"];
    const PLACEHOLDER_PREFIXES: [&str; 2] = ["video [video]", "media [media]"];

    let lowered = title.to_lowercase();
    let normalized = lowered.trim();

    PLACEHOLDER_WORDS.contains(&normalized)
        || PLACEHOLDER_PREFIXES
            .iter()
            .any(|prefix| normalized.starts_with(*prefix))
}