gosh_dl/
engine.rs

1//! Download Engine - Main coordinator
2//!
3//! The `DownloadEngine` is the primary entry point for the library.
4//! It manages all downloads, coordinates between HTTP and BitTorrent
5//! engines, handles persistence, and emits events.
6
7use crate::config::EngineConfig;
8use crate::error::{EngineError, Result};
9use crate::http::{HttpDownloader, SegmentedDownload};
10use crate::priority_queue::{DownloadPriority, PriorityQueue};
11use crate::scheduler::{BandwidthLimits, BandwidthScheduler};
12use crate::storage::{SqliteStorage, Storage};
13use crate::torrent::{MagnetUri, Metainfo, TorrentConfig, TorrentDownloader};
14use crate::types::{
15    DownloadEvent, DownloadId, DownloadKind, DownloadMetadata, DownloadOptions, DownloadProgress,
16    DownloadState, DownloadStatus, GlobalStats, TorrentFile, TorrentStatusInfo,
17};
18
19use chrono::Utc;
20use parking_lot::RwLock;
21use std::collections::HashMap;
22use std::sync::Arc;
23use std::time::Duration;
24use tokio::sync::broadcast;
25use url::Url;
26
/// Maximum number of events to buffer.
///
/// Passed as the capacity to `broadcast::channel` when the engine creates
/// its [`DownloadEvent`] channel.
const EVENT_CHANNEL_CAPACITY: usize = 1024;
29
/// Internal representation of a managed download.
///
/// One entry of this type is stored per [`DownloadId`] in the engine's
/// `downloads` map.
struct ManagedDownload {
    /// Current status record (state, progress, metadata) of the download.
    status: DownloadStatus,
    /// Handle to the background task(s) driving the download. `None` when no
    /// task has been attached yet, e.g. directly after insertion or for an
    /// entry restored from storage.
    handle: Option<DownloadHandle>,
}
35
/// Handle to control a running download.
///
/// The variant records which kind of background task backs the download.
enum DownloadHandle {
    /// Download driven by an HTTP task.
    Http(HttpDownloadHandle),
    /// Download driven by a [`TorrentDownloader`]; used for both torrent-file
    /// and magnet downloads.
    Torrent(TorrentDownloadHandle),
}
41
/// Handle for an HTTP download task
struct HttpDownloadHandle {
    /// Token created for this download; the spawned task checks it before it
    /// starts transferring.
    cancel_token: tokio_util::sync::CancellationToken,
    /// Join handle of the spawned download task.
    task: tokio::task::JoinHandle<Result<()>>,
    /// Reference to segmented download for persistence (if using segmented download).
    /// Wrapped in RwLock so it can be populated from inside the spawned task.
    /// Stays `None` until then; the persistence pass reads it to save
    /// per-segment progress.
    segmented_download: Arc<RwLock<Option<Arc<SegmentedDownload>>>>,
}
50
/// Handle for a torrent download
struct TorrentDownloadHandle {
    /// Downloader shared between this handle, the main task and the progress task.
    downloader: Arc<TorrentDownloader>,
    /// Main task: calls `start()` on the downloader and then runs its peer loop.
    task: tokio::task::JoinHandle<Result<()>>,
    /// Companion task that periodically copies the downloader's progress into
    /// the download's status and emits progress events.
    progress_task: tokio::task::JoinHandle<()>,
}
57
/// The main download engine
///
/// Owns the registry of downloads and the shared services (HTTP downloader,
/// event channel, priority queue, bandwidth scheduler, optional storage).
/// Constructed through [`DownloadEngine::new`], which returns it in an `Arc`.
pub struct DownloadEngine {
    /// Configuration
    config: RwLock<EngineConfig>,

    /// All managed downloads
    downloads: RwLock<HashMap<DownloadId, ManagedDownload>>,

    /// HTTP downloader
    http: Arc<HttpDownloader>,

    /// Event broadcaster
    event_tx: broadcast::Sender<DownloadEvent>,

    /// Priority queue for limiting and ordering concurrent downloads
    priority_queue: Arc<PriorityQueue>,

    /// Bandwidth scheduler for time-based limits
    scheduler: Arc<RwLock<BandwidthScheduler>>,

    /// Shutdown flag: a cancellation token observed by the background tasks
    /// (persistence, scheduler and torrent progress) so they can stop.
    shutdown: tokio_util::sync::CancellationToken,

    /// Persistent storage for download state. `None` when no database path is
    /// configured or when opening the database failed at construction time.
    storage: Option<Arc<dyn Storage>>,
}
84
85impl DownloadEngine {
86    /// Create a new download engine with the given configuration
87    pub async fn new(config: EngineConfig) -> Result<Arc<Self>> {
88        // Validate configuration
89        config.validate()?;
90
91        // Create event channel
92        let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
93
94        // Create HTTP downloader
95        let http = Arc::new(HttpDownloader::new(&config)?);
96
97        // Create priority queue for concurrent download limiting
98        let priority_queue = PriorityQueue::new(config.max_concurrent_downloads);
99
100        // Create bandwidth scheduler with configured rules
101        let scheduler = Arc::new(RwLock::new(BandwidthScheduler::new(
102            config.schedule_rules.clone(),
103            BandwidthLimits {
104                download: config.global_download_limit,
105                upload: config.global_upload_limit,
106            },
107        )));
108
109        // Initialize persistent storage
110        let storage: Option<Arc<dyn Storage>> = if let Some(ref db_path) = config.database_path {
111            match SqliteStorage::new(db_path).await {
112                Ok(s) => Some(Arc::new(s)),
113                Err(e) => {
114                    tracing::warn!("Failed to initialize database storage: {}. Downloads will not be persisted.", e);
115                    None
116                }
117            }
118        } else {
119            None
120        };
121
122        let engine = Arc::new(Self {
123            config: RwLock::new(config),
124            downloads: RwLock::new(HashMap::new()),
125            http,
126            event_tx,
127            priority_queue,
128            scheduler,
129            shutdown: tokio_util::sync::CancellationToken::new(),
130            storage,
131        });
132
133        // Load persisted downloads from database
134        engine.load_persisted_downloads().await?;
135
136        // Start background persistence task
137        Self::start_persistence_task(Arc::clone(&engine));
138
139        // Start bandwidth scheduler update task
140        Self::start_scheduler_task(Arc::clone(&engine));
141
142        Ok(engine)
143    }
144
145    /// Start background task that periodically persists active download states.
146    ///
147    /// This ensures that if the process crashes, downloads can be resumed
148    /// from approximately where they left off.
149    fn start_persistence_task(engine: Arc<Self>) {
150        if engine.storage.is_none() {
151            return; // No storage configured
152        }
153
154        let shutdown = engine.shutdown.clone();
155        tokio::spawn(async move {
156            const PERSISTENCE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
157            let mut interval = tokio::time::interval(PERSISTENCE_INTERVAL);
158
159            loop {
160                tokio::select! {
161                    _ = interval.tick() => {
162                        if let Err(e) = engine.persist_active_downloads().await {
163                            tracing::warn!("Failed to persist active downloads: {}", e);
164                        }
165                    }
166                    _ = shutdown.cancelled() => {
167                        // Final persistence on shutdown
168                        if let Err(e) = engine.persist_active_downloads().await {
169                            tracing::warn!("Failed to persist downloads on shutdown: {}", e);
170                        }
171                        break;
172                    }
173                }
174            }
175        });
176    }
177
178    /// Start background task that updates bandwidth limits based on schedule.
179    ///
180    /// This checks the schedule rules every minute and updates the current
181    /// bandwidth limits if they have changed.
182    fn start_scheduler_task(engine: Arc<Self>) {
183        let shutdown = engine.shutdown.clone();
184        tokio::spawn(async move {
185            const SCHEDULER_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
186            let mut interval = tokio::time::interval(SCHEDULER_INTERVAL);
187
188            loop {
189                tokio::select! {
190                    _ = interval.tick() => {
191                        engine.scheduler.read().update();
192                    }
193                    _ = shutdown.cancelled() => {
194                        break;
195                    }
196                }
197            }
198        });
199    }
200
201    /// Persist all active (non-completed, non-error) downloads to storage.
202    async fn persist_active_downloads(&self) -> Result<()> {
203        let storage = match &self.storage {
204            Some(s) => s,
205            None => return Ok(()),
206        };
207
208        // Collect active downloads and their segment info
209        let active_downloads: Vec<(DownloadStatus, Option<Vec<crate::storage::Segment>>)> = {
210            let downloads = self.downloads.read();
211            downloads
212                .values()
213                .filter(|d| d.status.state.is_active())
214                .map(|d| {
215                    let segments = match &d.handle {
216                        Some(DownloadHandle::Http(h)) => h
217                            .segmented_download
218                            .read()
219                            .as_ref()
220                            .map(|sd| sd.segments_with_progress()),
221                        _ => None,
222                    };
223                    (d.status.clone(), segments)
224                })
225                .collect()
226        };
227
228        // Save each active download and its segments
229        for (status, segments_opt) in active_downloads {
230            if let Err(e) = storage.save_download(&status).await {
231                tracing::debug!("Failed to persist download {}: {}", status.id, e);
232            }
233
234            // Save segments if this is a segmented HTTP download
235            if let Some(segments) = segments_opt {
236                if let Err(e) = storage.save_segments(status.id, &segments).await {
237                    tracing::debug!("Failed to persist segments for {}: {}", status.id, e);
238                }
239            }
240        }
241
242        Ok(())
243    }
244
245    /// Load persisted downloads from database on startup
246    async fn load_persisted_downloads(self: &Arc<Self>) -> Result<()> {
247        let storage = match &self.storage {
248            Some(s) => s,
249            None => return Ok(()), // No storage configured
250        };
251
252        let persisted = storage.load_all().await?;
253
254        for status in persisted {
255            // Skip completed downloads - they don't need resumption
256            if matches!(status.state, DownloadState::Completed) {
257                continue;
258            }
259
260            // For active/downloading states, mark as paused (crashed mid-download)
261            let restored_state = match &status.state {
262                DownloadState::Downloading | DownloadState::Connecting => DownloadState::Paused,
263                DownloadState::Seeding => DownloadState::Paused, // Torrents that were seeding
264                other => other.clone(),
265            };
266
267            let mut restored_status = status.clone();
268            restored_status.state = restored_state;
269            // Reset speeds (they're stale)
270            restored_status.progress.download_speed = 0;
271            restored_status.progress.upload_speed = 0;
272            restored_status.progress.connections = 0;
273
274            // Insert into downloads map
275            self.downloads.write().insert(
276                status.id,
277                ManagedDownload {
278                    status: restored_status,
279                    handle: None,
280                },
281            );
282
283            tracing::info!(
284                "Restored download {} ({}) in state {:?}",
285                status.id,
286                status.metadata.name,
287                status.state
288            );
289        }
290
291        Ok(())
292    }
293
294    fn build_torrent_status_info(metainfo: &Metainfo) -> TorrentStatusInfo {
295        TorrentStatusInfo {
296            files: metainfo
297                .info
298                .files
299                .iter()
300                .enumerate()
301                .map(|(i, f)| TorrentFile {
302                    index: i,
303                    path: f.path.clone(),
304                    size: f.length,
305                    completed: 0,
306                    selected: true,
307                })
308                .collect(),
309            piece_length: metainfo.info.piece_length,
310            pieces_count: metainfo.info.pieces.len(),
311            private: metainfo.info.private,
312        }
313    }
314
315    /// Add an HTTP/HTTPS download
316    pub async fn add_http(
317        self: &Arc<Self>,
318        url: &str,
319        options: DownloadOptions,
320    ) -> Result<DownloadId> {
321        // Validate URL
322        let parsed_url = Url::parse(url)
323            .map_err(|e| EngineError::invalid_input("url", format!("Invalid URL: {}", e)))?;
324
325        // Only allow http and https
326        match parsed_url.scheme() {
327            "http" | "https" => {}
328            scheme => {
329                return Err(EngineError::invalid_input(
330                    "url",
331                    format!("Unsupported scheme: {}", scheme),
332                ));
333            }
334        }
335
336        // Generate download ID
337        let id = DownloadId::new();
338
339        // Determine save directory
340        let save_dir = options
341            .save_dir
342            .clone()
343            .unwrap_or_else(|| self.config.read().download_dir.clone());
344
345        // Extract filename from URL or options
346        let filename = options.filename.clone().or_else(|| {
347            parsed_url
348                .path_segments()
349                .and_then(|mut segments| segments.next_back())
350                .map(|s| s.to_string())
351                .filter(|s| !s.is_empty())
352        });
353
354        let name = filename.clone().unwrap_or_else(|| "download".to_string());
355
356        // Create download status
357        let status = DownloadStatus {
358            id,
359            kind: DownloadKind::Http,
360            state: DownloadState::Queued,
361            priority: options.priority,
362            progress: DownloadProgress::default(),
363            metadata: DownloadMetadata {
364                name,
365                url: Some(url.to_string()),
366                magnet_uri: None,
367                info_hash: None,
368                save_dir,
369                filename,
370                user_agent: options.user_agent.clone(),
371                referer: options.referer.clone(),
372                headers: options.headers.clone(),
373                cookies: options.cookies.clone().unwrap_or_default(),
374                checksum: options.checksum.clone(),
375                mirrors: options.mirrors.clone(),
376                etag: None,
377                last_modified: None,
378            },
379            torrent_info: None,
380            peers: None,
381            created_at: Utc::now(),
382            completed_at: None,
383        };
384
385        // Insert into downloads map
386        {
387            let mut downloads = self.downloads.write();
388            downloads.insert(
389                id,
390                ManagedDownload {
391                    status: status.clone(),
392                    handle: None,
393                },
394            );
395        }
396
397        // Persist to database
398        if let Some(ref storage) = self.storage {
399            if let Err(e) = storage.save_download(&status).await {
400                tracing::warn!("Failed to persist new download {}: {}", id, e);
401            }
402        }
403
404        // Emit event
405        let _ = self.event_tx.send(DownloadEvent::Added { id });
406
407        // Start the download (no saved segments for new downloads)
408        self.start_download(id, url.to_string(), options, None)
409            .await?;
410
411        Ok(id)
412    }
413
414    /// Add a BitTorrent download from torrent file data
415    pub async fn add_torrent(
416        self: &Arc<Self>,
417        torrent_data: &[u8],
418        options: DownloadOptions,
419    ) -> Result<DownloadId> {
420        // Parse torrent file
421        let metainfo = Metainfo::parse(torrent_data)?;
422
423        // Generate download ID
424        let id = DownloadId::new();
425
426        // Determine save directory
427        let save_dir = options
428            .save_dir
429            .clone()
430            .unwrap_or_else(|| self.config.read().download_dir.clone());
431
432        // Create download status
433        let status = DownloadStatus {
434            id,
435            kind: DownloadKind::Torrent,
436            state: DownloadState::Queued,
437            priority: options.priority,
438            progress: DownloadProgress::default(),
439            metadata: DownloadMetadata {
440                name: metainfo.info.name.clone(),
441                url: None,
442                magnet_uri: None,
443                info_hash: Some(hex::encode(metainfo.info_hash)),
444                save_dir: save_dir.clone(),
445                filename: Some(metainfo.info.name.clone()),
446                user_agent: options.user_agent.clone(),
447                referer: None,
448                headers: Vec::new(),
449                cookies: Vec::new(),
450                checksum: None,
451                mirrors: Vec::new(),
452                etag: None,
453                last_modified: None,
454            },
455            torrent_info: Some(Self::build_torrent_status_info(&metainfo)),
456            peers: Some(Vec::new()),
457            created_at: Utc::now(),
458            completed_at: None,
459        };
460
461        // Insert into downloads map
462        {
463            let mut downloads = self.downloads.write();
464            downloads.insert(
465                id,
466                ManagedDownload {
467                    status: status.clone(),
468                    handle: None,
469                },
470            );
471        }
472
473        // Persist to database
474        if let Some(ref storage) = self.storage {
475            if let Err(e) = storage.save_download(&status).await {
476                tracing::warn!("Failed to persist new torrent download {}: {}", id, e);
477            }
478        }
479
480        // Emit event
481        let _ = self.event_tx.send(DownloadEvent::Added { id });
482
483        // Start the torrent download
484        self.start_torrent(id, metainfo, save_dir, options).await?;
485
486        Ok(id)
487    }
488
489    /// Add a BitTorrent download from a magnet URI
490    pub async fn add_magnet(
491        self: &Arc<Self>,
492        magnet_uri: &str,
493        options: DownloadOptions,
494    ) -> Result<DownloadId> {
495        // Parse magnet URI
496        let magnet = MagnetUri::parse(magnet_uri)?;
497
498        // Generate download ID
499        let id = DownloadId::new();
500
501        // Determine save directory
502        let save_dir = options
503            .save_dir
504            .clone()
505            .unwrap_or_else(|| self.config.read().download_dir.clone());
506
507        // Create download status
508        let status = DownloadStatus {
509            id,
510            kind: DownloadKind::Magnet,
511            state: DownloadState::Queued,
512            priority: options.priority,
513            progress: DownloadProgress::default(),
514            metadata: DownloadMetadata {
515                name: magnet.name(),
516                url: None,
517                magnet_uri: Some(magnet_uri.to_string()),
518                info_hash: Some(hex::encode(magnet.info_hash)),
519                save_dir: save_dir.clone(),
520                filename: magnet.display_name.clone(),
521                user_agent: options.user_agent.clone(),
522                referer: None,
523                headers: Vec::new(),
524                cookies: Vec::new(),
525                checksum: None,
526                mirrors: Vec::new(),
527                etag: None,
528                last_modified: None,
529            },
530            torrent_info: None, // Will be populated when metadata is received
531            peers: Some(Vec::new()),
532            created_at: Utc::now(),
533            completed_at: None,
534        };
535
536        // Insert into downloads map
537        {
538            let mut downloads = self.downloads.write();
539            downloads.insert(
540                id,
541                ManagedDownload {
542                    status: status.clone(),
543                    handle: None,
544                },
545            );
546        }
547
548        // Persist to database
549        if let Some(ref storage) = self.storage {
550            if let Err(e) = storage.save_download(&status).await {
551                tracing::warn!("Failed to persist new magnet download {}: {}", id, e);
552            }
553        }
554
555        // Emit event
556        let _ = self.event_tx.send(DownloadEvent::Added { id });
557
558        // Start the magnet download
559        self.start_magnet(id, magnet, save_dir, options).await?;
560
561        Ok(id)
562    }
563
564    /// Start a torrent download task
565    async fn start_torrent(
566        self: &Arc<Self>,
567        id: DownloadId,
568        metainfo: Metainfo,
569        save_dir: std::path::PathBuf,
570        options: DownloadOptions,
571    ) -> Result<()> {
572        let config = self.config.read();
573        let torrent_config = TorrentConfig {
574            max_peers: config.max_peers,
575            enable_dht: config.enable_dht,
576            enable_pex: config.enable_pex,
577            enable_lpd: config.enable_lpd,
578            seed_ratio: Some(config.seed_ratio),
579            max_upload_speed: config.global_upload_limit.unwrap_or(0),
580            max_download_speed: config.global_download_limit.unwrap_or(0),
581            ..TorrentConfig::default()
582        };
583        drop(config);
584
585        let downloader = Arc::new(TorrentDownloader::from_torrent(
586            id,
587            metainfo,
588            save_dir,
589            torrent_config,
590            self.event_tx.clone(),
591        )?);
592
593        // Apply selected files for partial download
594        if let Some(ref selected) = options.selected_files {
595            downloader.set_selected_files(Some(selected));
596        }
597
598        // Apply sequential download mode if requested
599        if let Some(sequential) = options.sequential {
600            downloader.set_sequential(sequential);
601        }
602
603        let downloader_clone = Arc::clone(&downloader);
604        let engine = Arc::clone(self);
605
606        // Update state
607        self.update_state(id, DownloadState::Connecting)?;
608
609        let task = tokio::spawn(async move {
610            // Start the download (announces to trackers, verifies existing pieces)
611            if let Err(e) = Arc::clone(&downloader_clone).start().await {
612                let error_msg = e.to_string();
613                engine.update_state(
614                    id,
615                    DownloadState::Error {
616                        kind: format!("{:?}", e),
617                        message: error_msg.clone(),
618                        retryable: e.is_retryable(),
619                    },
620                )?;
621                let _ = engine.event_tx.send(DownloadEvent::Failed {
622                    id,
623                    error: error_msg,
624                    retryable: e.is_retryable(),
625                });
626                return Ok(());
627            }
628
629            // Update state to downloading
630            engine.update_state(id, DownloadState::Downloading)?;
631            let _ = engine.event_tx.send(DownloadEvent::Started { id });
632
633            // Run the peer connection loop
634            if let Err(e) = downloader_clone.run_peer_loop().await {
635                let error_msg = e.to_string();
636                engine.update_state(
637                    id,
638                    DownloadState::Error {
639                        kind: format!("{:?}", e),
640                        message: error_msg.clone(),
641                        retryable: e.is_retryable(),
642                    },
643                )?;
644                let _ = engine.event_tx.send(DownloadEvent::Failed {
645                    id,
646                    error: error_msg,
647                    retryable: e.is_retryable(),
648                });
649            }
650
651            Ok(())
652        });
653
654        let progress_task =
655            Self::spawn_torrent_progress_task(Arc::clone(self), id, Arc::clone(&downloader));
656
657        // Store the handle
658        {
659            let mut downloads = self.downloads.write();
660            if let Some(download) = downloads.get_mut(&id) {
661                download.handle = Some(DownloadHandle::Torrent(TorrentDownloadHandle {
662                    downloader,
663                    task,
664                    progress_task,
665                }));
666            }
667        }
668
669        Ok(())
670    }
671
672    /// Start a magnet download task
673    async fn start_magnet(
674        self: &Arc<Self>,
675        id: DownloadId,
676        magnet: MagnetUri,
677        save_dir: std::path::PathBuf,
678        options: DownloadOptions,
679    ) -> Result<()> {
680        let config = self.config.read();
681        let torrent_config = TorrentConfig {
682            max_peers: config.max_peers,
683            enable_dht: config.enable_dht,
684            enable_pex: config.enable_pex,
685            enable_lpd: config.enable_lpd,
686            seed_ratio: Some(config.seed_ratio),
687            max_upload_speed: config.global_upload_limit.unwrap_or(0),
688            max_download_speed: config.global_download_limit.unwrap_or(0),
689            ..TorrentConfig::default()
690        };
691        drop(config);
692
693        let downloader = Arc::new(TorrentDownloader::from_magnet(
694            id,
695            magnet,
696            save_dir,
697            torrent_config,
698            self.event_tx.clone(),
699        )?);
700
701        // Apply sequential download mode if requested
702        if let Some(sequential) = options.sequential {
703            downloader.set_sequential(sequential);
704        }
705
706        let downloader_clone = Arc::clone(&downloader);
707        let engine = Arc::clone(self);
708
709        // Update state
710        self.update_state(id, DownloadState::Connecting)?;
711
712        let task = tokio::spawn(async move {
713            // Start the download (announces to trackers)
714            if let Err(e) = Arc::clone(&downloader_clone).start().await {
715                let error_msg = e.to_string();
716                engine.update_state(
717                    id,
718                    DownloadState::Error {
719                        kind: format!("{:?}", e),
720                        message: error_msg.clone(),
721                        retryable: e.is_retryable(),
722                    },
723                )?;
724                let _ = engine.event_tx.send(DownloadEvent::Failed {
725                    id,
726                    error: error_msg,
727                    retryable: e.is_retryable(),
728                });
729                return Ok(());
730            }
731
732            // Update state - for magnets, we're initially fetching metadata
733            engine.update_state(id, DownloadState::Downloading)?;
734            let _ = engine.event_tx.send(DownloadEvent::Started { id });
735
736            // Run the peer connection loop (handles both downloading and metadata fetching for magnets)
737            if let Err(e) = downloader_clone.run_peer_loop().await {
738                let error_msg = e.to_string();
739                engine.update_state(
740                    id,
741                    DownloadState::Error {
742                        kind: format!("{:?}", e),
743                        message: error_msg.clone(),
744                        retryable: e.is_retryable(),
745                    },
746                )?;
747                let _ = engine.event_tx.send(DownloadEvent::Failed {
748                    id,
749                    error: error_msg,
750                    retryable: e.is_retryable(),
751                });
752            }
753
754            Ok(())
755        });
756
757        let progress_task =
758            Self::spawn_torrent_progress_task(Arc::clone(self), id, Arc::clone(&downloader));
759
760        // Store the handle
761        {
762            let mut downloads = self.downloads.write();
763            if let Some(download) = downloads.get_mut(&id) {
764                download.handle = Some(DownloadHandle::Torrent(TorrentDownloadHandle {
765                    downloader,
766                    task,
767                    progress_task,
768                }));
769            }
770        }
771
772        Ok(())
773    }
774
775    fn spawn_torrent_progress_task(
776        engine: Arc<Self>,
777        id: DownloadId,
778        downloader: Arc<TorrentDownloader>,
779    ) -> tokio::task::JoinHandle<()> {
780        let shutdown = engine.shutdown.clone();
781        tokio::spawn(async move {
782            let mut interval = tokio::time::interval(Duration::from_millis(500));
783            loop {
784                tokio::select! {
785                    _ = shutdown.cancelled() => break,
786                    _ = interval.tick() => {}
787                }
788
789                let progress = downloader.progress();
790                let metainfo = downloader.metainfo();
791                let send_progress = {
792                    let mut downloads = engine.downloads.write();
793                    let download = match downloads.get_mut(&id) {
794                        Some(download) => download,
795                        None => break,
796                    };
797
798                    if matches!(
799                        download.status.state,
800                        DownloadState::Error { .. } | DownloadState::Completed
801                    ) {
802                        break;
803                    }
804
805                    if let Some(ref metainfo) = metainfo {
806                        if download.status.torrent_info.is_none() {
807                            download.status.torrent_info =
808                                Some(Self::build_torrent_status_info(metainfo));
809                        }
810                        if download.status.metadata.name != metainfo.info.name {
811                            download.status.metadata.name = metainfo.info.name.clone();
812                        }
813                        if download.status.metadata.filename.as_deref()
814                            != Some(metainfo.info.name.as_str())
815                        {
816                            download.status.metadata.filename = Some(metainfo.info.name.clone());
817                        }
818                    }
819
820                    download.status.progress = progress.clone();
821                    download.status.state.is_active()
822                };
823
824                if send_progress {
825                    let _ = engine
826                        .event_tx
827                        .send(DownloadEvent::Progress { id, progress });
828                }
829            }
830        })
831    }
832
833    /// Start a download task
834    async fn start_download(
835        self: &Arc<Self>,
836        id: DownloadId,
837        url: String,
838        options: DownloadOptions,
839        saved_segments: Option<Vec<crate::storage::Segment>>,
840    ) -> Result<()> {
841        let engine = Arc::clone(self);
842        let http = Arc::clone(&self.http);
843        let priority_queue = Arc::clone(&self.priority_queue);
844        let priority = options.priority;
845        let cancel_token = tokio_util::sync::CancellationToken::new();
846        let cancel_token_clone = cancel_token.clone();
847
848        // Create shared reference for segmented download (populated by download_segmented)
849        let segmented_ref: Arc<RwLock<Option<Arc<SegmentedDownload>>>> =
850            Arc::new(RwLock::new(None));
851        let segmented_ref_for_task = Arc::clone(&segmented_ref);
852
853        // Update state to connecting
854        self.update_state(id, DownloadState::Queued)?;
855
856        let task = tokio::spawn(async move {
857            // Acquire priority queue permit for concurrent limit
858            let _permit = priority_queue.acquire(id, priority).await;
859
860            // Check if cancelled before starting
861            if cancel_token_clone.is_cancelled() {
862                return Ok(());
863            }
864
865            // Update state to connecting then downloading
866            engine.update_state(id, DownloadState::Connecting)?;
867            engine.update_state(id, DownloadState::Downloading)?;
868            let _ = engine.event_tx.send(DownloadEvent::Started { id });
869
870            // Get save path and options
871            let (save_dir, filename, user_agent, referer, headers, cookies, checksum) = {
872                let downloads = engine.downloads.read();
873                let download = downloads
874                    .get(&id)
875                    .ok_or_else(|| EngineError::NotFound(id.to_string()))?;
876                (
877                    download.status.metadata.save_dir.clone(),
878                    download.status.metadata.filename.clone(),
879                    download.status.metadata.user_agent.clone(),
880                    download.status.metadata.referer.clone(),
881                    download.status.metadata.headers.clone(),
882                    download.status.metadata.cookies.clone(),
883                    download.status.metadata.checksum.clone(),
884                )
885            };
886
887            // Create progress callback
888            let engine_clone = Arc::clone(&engine);
889            let progress_callback = move |progress: DownloadProgress| {
890                // Update progress in download status
891                {
892                    let mut downloads = engine_clone.downloads.write();
893                    if let Some(download) = downloads.get_mut(&id) {
894                        download.status.progress = progress.clone();
895                    }
896                }
897                // Emit progress event
898                let _ = engine_clone
899                    .event_tx
900                    .send(DownloadEvent::Progress { id, progress });
901            };
902
903            // Get config for segmented downloads
904            let (max_connections, min_segment_size) = {
905                let config = engine.config.read();
906                (config.max_connections_per_download, config.min_segment_size)
907            };
908
909            // Perform the download (uses segmented if server supports it)
910            let cookies_opt = if cookies.is_empty() {
911                None
912            } else {
913                Some(cookies.as_slice())
914            };
915            let result = http
916                .download_segmented(
917                    &url,
918                    &save_dir,
919                    filename.as_deref(),
920                    user_agent.as_deref(),
921                    referer.as_deref(),
922                    &headers,
923                    cookies_opt,
924                    checksum.as_ref(),
925                    max_connections,
926                    min_segment_size,
927                    cancel_token_clone.clone(),
928                    saved_segments,
929                    progress_callback,
930                    Some(segmented_ref_for_task),
931                )
932                .await;
933
934            match result {
935                Ok((final_path, _segmented_download)) => {
936                    // Update status to completed (but not if paused - race condition)
937                    let should_complete = {
938                        let mut downloads = engine.downloads.write();
939                        if let Some(download) = downloads.get_mut(&id) {
940                            // Don't overwrite Paused state - user paused before completion
941                            if download.status.state == DownloadState::Paused {
942                                false
943                            } else {
944                                download.status.state = DownloadState::Completed;
945                                download.status.completed_at = Some(Utc::now());
946                                download.status.metadata.filename = final_path
947                                    .file_name()
948                                    .map(|s| s.to_string_lossy().to_string());
949                                true
950                            }
951                        } else {
952                            false
953                        }
954                    };
955
956                    if should_complete {
957                        // Clean up saved segments from storage
958                        if let Some(ref storage) = engine.storage {
959                            if let Err(e) = storage.delete_segments(id).await {
960                                tracing::debug!("Failed to clean up segments for {}: {}", id, e);
961                            }
962                        }
963
964                        let _ = engine.event_tx.send(DownloadEvent::Completed { id });
965                    }
966                }
967                Err(e) if cancel_token_clone.is_cancelled() => {
968                    // Cancelled, already handled
969                }
970                Err(e) => {
971                    let retryable = e.is_retryable();
972                    let error_msg = e.to_string();
973
974                    // Update status to error
975                    engine.update_state(
976                        id,
977                        DownloadState::Error {
978                            kind: format!("{:?}", e),
979                            message: error_msg.clone(),
980                            retryable,
981                        },
982                    )?;
983
984                    let _ = engine.event_tx.send(DownloadEvent::Failed {
985                        id,
986                        error: error_msg,
987                        retryable,
988                    });
989                }
990            }
991
992            Ok(())
993        });
994
995        // Store the handle
996        {
997            let mut downloads = self.downloads.write();
998            if let Some(download) = downloads.get_mut(&id) {
999                download.handle = Some(DownloadHandle::Http(HttpDownloadHandle {
1000                    cancel_token,
1001                    task,
1002                    segmented_download: segmented_ref,
1003                }));
1004            }
1005        }
1006
1007        Ok(())
1008    }
1009
1010    /// Pause a download
1011    pub async fn pause(&self, id: DownloadId) -> Result<()> {
1012        let (status_to_save, segments_to_save) = {
1013            let mut downloads = self.downloads.write();
1014            let download = downloads
1015                .get_mut(&id)
1016                .ok_or_else(|| EngineError::NotFound(id.to_string()))?;
1017
1018            // Check if can be paused
1019            if !download.status.state.is_active() {
1020                return Err(EngineError::InvalidState {
1021                    action: "pause",
1022                    current_state: format!("{:?}", download.status.state),
1023                });
1024            }
1025
1026            // Extract segments before taking the handle (for HTTP resume)
1027            let segments = match &download.handle {
1028                Some(DownloadHandle::Http(h)) => h
1029                    .segmented_download
1030                    .read()
1031                    .as_ref()
1032                    .map(|sd| sd.segments_with_progress()),
1033                _ => None,
1034            };
1035
1036            // Cancel the task
1037            if let Some(handle) = download.handle.take() {
1038                match handle {
1039                    DownloadHandle::Http(h) => {
1040                        h.cancel_token.cancel();
1041                        // Don't await the task here to avoid blocking
1042                    }
1043                    DownloadHandle::Torrent(h) => {
1044                        h.downloader.pause();
1045                        download.handle = Some(DownloadHandle::Torrent(h));
1046                        // Don't await the task
1047                    }
1048                }
1049            }
1050
1051            // Update state
1052            let old_state = download.status.state.clone();
1053            download.status.state = DownloadState::Paused;
1054
1055            // Emit events
1056            let _ = self.event_tx.send(DownloadEvent::StateChanged {
1057                id,
1058                old_state,
1059                new_state: DownloadState::Paused,
1060            });
1061            let _ = self.event_tx.send(DownloadEvent::Paused { id });
1062
1063            (download.status.clone(), segments)
1064        };
1065
1066        // Persist to database
1067        if let Some(ref storage) = self.storage {
1068            if let Err(e) = storage.save_download(&status_to_save).await {
1069                tracing::warn!("Failed to persist paused download {}: {}", id, e);
1070            }
1071            // Save HTTP segments for resume
1072            if let Some(segments) = segments_to_save {
1073                if let Err(e) = storage.save_segments(id, &segments).await {
1074                    tracing::warn!(
1075                        "Failed to persist segments for paused download {}: {}",
1076                        id,
1077                        e
1078                    );
1079                }
1080            }
1081        }
1082
1083        Ok(())
1084    }
1085
1086    /// Resume a paused download
1087    pub async fn resume(self: &Arc<Self>, id: DownloadId) -> Result<()> {
1088        // Get download info and determine type
1089        let (kind, url, options, has_torrent_handle) = {
1090            let downloads = self.downloads.read();
1091            let download = downloads
1092                .get(&id)
1093                .ok_or_else(|| EngineError::NotFound(id.to_string()))?;
1094
1095            // Check if can be resumed
1096            if download.status.state != DownloadState::Paused {
1097                return Err(EngineError::InvalidState {
1098                    action: "resume",
1099                    current_state: format!("{:?}", download.status.state),
1100                });
1101            }
1102
1103            let has_torrent_handle = matches!(download.handle, Some(DownloadHandle::Torrent(_)));
1104
1105            let options = DownloadOptions {
1106                save_dir: Some(download.status.metadata.save_dir.clone()),
1107                filename: download.status.metadata.filename.clone(),
1108                user_agent: download.status.metadata.user_agent.clone(),
1109                referer: download.status.metadata.referer.clone(),
1110                headers: download.status.metadata.headers.clone(),
1111                cookies: if download.status.metadata.cookies.is_empty() {
1112                    None
1113                } else {
1114                    Some(download.status.metadata.cookies.clone())
1115                },
1116                ..Default::default()
1117            };
1118
1119            (
1120                download.status.kind,
1121                download.status.metadata.url.clone(),
1122                options,
1123                has_torrent_handle,
1124            )
1125        };
1126
1127        match kind {
1128            DownloadKind::Http => {
1129                // HTTP: restart download with saved segments
1130                let url = url.ok_or_else(|| {
1131                    EngineError::Internal("HTTP download missing URL".to_string())
1132                })?;
1133
1134                // Load saved segments from storage if available
1135                let saved_segments = if let Some(ref storage) = self.storage {
1136                    match storage.load_segments(id).await {
1137                        Ok(segments) if !segments.is_empty() => {
1138                            tracing::debug!(
1139                                "Loaded {} saved segments for download {}",
1140                                segments.len(),
1141                                id
1142                            );
1143                            Some(segments)
1144                        }
1145                        Ok(_) => None,
1146                        Err(e) => {
1147                            tracing::debug!("Failed to load segments for {}: {}", id, e);
1148                            None
1149                        }
1150                    }
1151                } else {
1152                    None
1153                };
1154
1155                self.start_download(id, url, options, saved_segments)
1156                    .await?;
1157            }
1158            DownloadKind::Torrent | DownloadKind::Magnet => {
1159                // Torrent/Magnet: resume the existing downloader
1160                if has_torrent_handle {
1161                    let mut downloads = self.downloads.write();
1162                    if let Some(download) = downloads.get_mut(&id) {
1163                        if let Some(DownloadHandle::Torrent(ref h)) = download.handle {
1164                            h.downloader.resume();
1165                            download.status.state = DownloadState::Downloading;
1166                        }
1167                    }
1168                } else {
1169                    return Err(EngineError::Internal(
1170                        "Torrent download has no active handle to resume".to_string(),
1171                    ));
1172                }
1173            }
1174        }
1175
1176        let _ = self.event_tx.send(DownloadEvent::Resumed { id });
1177
1178        Ok(())
1179    }
1180
1181    /// Cancel a download and optionally delete files
1182    pub async fn cancel(&self, id: DownloadId, delete_files: bool) -> Result<()> {
1183        let (handle, save_path) = {
1184            let mut downloads = self.downloads.write();
1185            let download = downloads
1186                .remove(&id)
1187                .ok_or_else(|| EngineError::NotFound(id.to_string()))?;
1188
1189            let save_path = if delete_files {
1190                Some(
1191                    download.status.metadata.save_dir.join(
1192                        download
1193                            .status
1194                            .metadata
1195                            .filename
1196                            .as_deref()
1197                            .unwrap_or("download"),
1198                    ),
1199                )
1200            } else {
1201                None
1202            };
1203
1204            (download.handle, save_path)
1205        };
1206
1207        // Cancel the task if running
1208        if let Some(handle) = handle {
1209            match handle {
1210                DownloadHandle::Http(h) => {
1211                    h.cancel_token.cancel();
1212                }
1213                DownloadHandle::Torrent(h) => {
1214                    drop(h.downloader.stop());
1215                    h.progress_task.abort();
1216                    h.task.abort();
1217                }
1218            }
1219        }
1220
1221        // Delete files and segments if requested
1222        if let Some(path) = save_path {
1223            if path.exists() {
1224                if path.is_dir() {
1225                    // Multi-file torrent: remove entire directory
1226                    tokio::fs::remove_dir_all(&path).await.ok();
1227                } else {
1228                    // Single file: remove the file
1229                    tokio::fs::remove_file(&path).await.ok();
1230                }
1231            }
1232            // Also try to remove partial file
1233            let partial_path = path.with_extension("part");
1234            if partial_path.exists() {
1235                tokio::fs::remove_file(&partial_path).await.ok();
1236            }
1237        }
1238
1239        // Clean up saved segments and download record from storage
1240        if let Some(ref storage) = self.storage {
1241            if let Err(e) = storage.delete_segments(id).await {
1242                tracing::debug!(
1243                    "Failed to clean up segments for cancelled download {}: {}",
1244                    id,
1245                    e
1246                );
1247            }
1248            if let Err(e) = storage.delete_download(id).await {
1249                tracing::debug!("Failed to delete download record for {}: {}", id, e);
1250            }
1251        }
1252
1253        let _ = self.event_tx.send(DownloadEvent::Removed { id });
1254
1255        Ok(())
1256    }
1257
1258    /// Get the status of a download
1259    pub fn status(&self, id: DownloadId) -> Option<DownloadStatus> {
1260        self.downloads.read().get(&id).map(|d| d.status.clone())
1261    }
1262
1263    /// List all downloads
1264    pub fn list(&self) -> Vec<DownloadStatus> {
1265        self.downloads
1266            .read()
1267            .values()
1268            .map(|d| d.status.clone())
1269            .collect()
1270    }
1271
1272    /// Get active downloads
1273    pub fn active(&self) -> Vec<DownloadStatus> {
1274        self.downloads
1275            .read()
1276            .values()
1277            .filter(|d| d.status.state.is_active())
1278            .map(|d| d.status.clone())
1279            .collect()
1280    }
1281
1282    /// Get waiting/queued downloads
1283    pub fn waiting(&self) -> Vec<DownloadStatus> {
1284        self.downloads
1285            .read()
1286            .values()
1287            .filter(|d| matches!(d.status.state, DownloadState::Queued))
1288            .map(|d| d.status.clone())
1289            .collect()
1290    }
1291
1292    /// Get stopped downloads (paused, completed, error)
1293    pub fn stopped(&self) -> Vec<DownloadStatus> {
1294        self.downloads
1295            .read()
1296            .values()
1297            .filter(|d| {
1298                matches!(
1299                    d.status.state,
1300                    DownloadState::Paused | DownloadState::Completed | DownloadState::Error { .. }
1301                )
1302            })
1303            .map(|d| d.status.clone())
1304            .collect()
1305    }
1306
1307    /// Get global statistics
1308    pub fn global_stats(&self) -> GlobalStats {
1309        let downloads = self.downloads.read();
1310        let mut stats = GlobalStats::default();
1311
1312        for download in downloads.values() {
1313            match &download.status.state {
1314                DownloadState::Downloading | DownloadState::Seeding | DownloadState::Connecting => {
1315                    stats.num_active += 1;
1316                    stats.download_speed += download.status.progress.download_speed;
1317                    stats.upload_speed += download.status.progress.upload_speed;
1318                }
1319                DownloadState::Queued => {
1320                    stats.num_waiting += 1;
1321                }
1322                DownloadState::Paused | DownloadState::Completed | DownloadState::Error { .. } => {
1323                    stats.num_stopped += 1;
1324                }
1325            }
1326        }
1327
1328        stats
1329    }
1330
1331    /// Subscribe to download events
1332    pub fn subscribe(&self) -> broadcast::Receiver<DownloadEvent> {
1333        self.event_tx.subscribe()
1334    }
1335
1336    /// Update engine configuration
1337    pub fn set_config(&self, config: EngineConfig) -> Result<()> {
1338        config.validate()?;
1339
1340        // Update concurrent download limit
1341        // Note: This doesn't affect currently running downloads
1342
1343        *self.config.write() = config;
1344        Ok(())
1345    }
1346
1347    /// Get current configuration
1348    pub fn get_config(&self) -> EngineConfig {
1349        self.config.read().clone()
1350    }
1351
1352    /// Set the priority of a download
1353    ///
1354    /// This affects the order in which downloads acquire slots when queued.
1355    /// If the download is already active, the priority is updated but
1356    /// won't affect scheduling until the download is paused and resumed.
1357    ///
1358    /// The priority change is persisted to storage immediately (non-blocking).
1359    pub fn set_priority(&self, id: DownloadId, priority: DownloadPriority) -> Result<()> {
1360        // Update in downloads map and get status for persistence
1361        let status_to_save = {
1362            let mut downloads = self.downloads.write();
1363            let download = downloads
1364                .get_mut(&id)
1365                .ok_or_else(|| EngineError::NotFound(id.to_string()))?;
1366            download.status.priority = priority;
1367            download.status.clone()
1368        };
1369
1370        // Update in priority queue (affects scheduling if waiting)
1371        self.priority_queue.set_priority(id, priority);
1372
1373        // Persist in background (fire-and-forget)
1374        if let Some(storage) = self.storage.as_ref().map(Arc::clone) {
1375            tokio::spawn(async move {
1376                if let Err(e) = storage.save_download(&status_to_save).await {
1377                    tracing::debug!("Failed to persist priority change for {}: {}", id, e);
1378                }
1379            });
1380        }
1381
1382        Ok(())
1383    }
1384
1385    /// Get the current priority of a download
1386    pub fn get_priority(&self, id: DownloadId) -> Option<DownloadPriority> {
1387        self.downloads.read().get(&id).map(|d| d.status.priority)
1388    }
1389
1390    /// Get current bandwidth limits (accounting for schedule rules)
1391    pub fn get_bandwidth_limits(&self) -> BandwidthLimits {
1392        self.scheduler.read().get_limits()
1393    }
1394
1395    /// Update the bandwidth schedule rules
1396    ///
1397    /// The new rules take effect immediately after evaluation.
1398    pub fn set_schedule_rules(&self, rules: Vec<crate::scheduler::ScheduleRule>) {
1399        self.scheduler.write().set_rules(rules);
1400    }
1401
1402    /// Get the current schedule rules
1403    pub fn get_schedule_rules(&self) -> Vec<crate::scheduler::ScheduleRule> {
1404        self.scheduler.read().rules().to_vec()
1405    }
1406
1407    /// Graceful shutdown
1408    pub async fn shutdown(&self) -> Result<()> {
1409        // Signal shutdown
1410        self.shutdown.cancel();
1411
1412        // Cancel all active downloads
1413        let handles: Vec<_> = {
1414            let mut downloads = self.downloads.write();
1415            downloads
1416                .values_mut()
1417                .filter_map(|d| d.handle.take())
1418                .collect()
1419        };
1420
1421        for handle in handles {
1422            match handle {
1423                DownloadHandle::Http(h) => {
1424                    h.cancel_token.cancel();
1425                    // Wait for task to finish (with timeout)
1426                    let _ = tokio::time::timeout(std::time::Duration::from_secs(5), h.task).await;
1427                }
1428                DownloadHandle::Torrent(h) => {
1429                    drop(h.downloader.stop());
1430                    h.progress_task.abort();
1431                    // Wait for task to finish (with timeout)
1432                    let _ = tokio::time::timeout(std::time::Duration::from_secs(5), h.task).await;
1433                }
1434            }
1435        }
1436
1437        Ok(())
1438    }
1439
1440    /// Helper to update download state
1441    fn update_state(&self, id: DownloadId, new_state: DownloadState) -> Result<()> {
1442        let mut downloads = self.downloads.write();
1443        let download = downloads
1444            .get_mut(&id)
1445            .ok_or_else(|| EngineError::NotFound(id.to_string()))?;
1446
1447        let old_state = download.status.state.clone();
1448        download.status.state = new_state.clone();
1449
1450        let _ = self.event_tx.send(DownloadEvent::StateChanged {
1451            id,
1452            old_state,
1453            new_state,
1454        });
1455
1456        Ok(())
1457    }
1458}
1459
1460impl Drop for DownloadEngine {
1461    fn drop(&mut self) {
1462        // Signal shutdown on drop
1463        self.shutdown.cancel();
1464    }
1465}