1use crate::config::EngineConfig;
8use crate::error::{EngineError, Result};
9use crate::http::{HttpDownloader, SegmentedDownload};
10use crate::priority_queue::{DownloadPriority, PriorityQueue};
11use crate::scheduler::{BandwidthLimits, BandwidthScheduler};
12use crate::storage::{SqliteStorage, Storage};
13use crate::torrent::{MagnetUri, Metainfo, TorrentConfig, TorrentDownloader};
14use crate::types::{
15 DownloadEvent, DownloadId, DownloadKind, DownloadMetadata, DownloadOptions, DownloadProgress,
16 DownloadState, DownloadStatus, GlobalStats, TorrentFile, TorrentStatusInfo,
17};
18
19use chrono::Utc;
20use parking_lot::RwLock;
21use std::collections::HashMap;
22use std::sync::Arc;
23use std::time::Duration;
24use tokio::sync::broadcast;
25use url::Url;
26
27const EVENT_CHANNEL_CAPACITY: usize = 1024;
29
30struct ManagedDownload {
32 status: DownloadStatus,
33 handle: Option<DownloadHandle>,
34}
35
36enum DownloadHandle {
38 Http(HttpDownloadHandle),
39 Torrent(TorrentDownloadHandle),
40}
41
42struct HttpDownloadHandle {
44 cancel_token: tokio_util::sync::CancellationToken,
45 task: tokio::task::JoinHandle<Result<()>>,
46 segmented_download: Arc<RwLock<Option<Arc<SegmentedDownload>>>>,
49}
50
51struct TorrentDownloadHandle {
53 downloader: Arc<TorrentDownloader>,
54 task: tokio::task::JoinHandle<Result<()>>,
55 progress_task: tokio::task::JoinHandle<()>,
56}
57
58pub struct DownloadEngine {
60 config: RwLock<EngineConfig>,
62
63 downloads: RwLock<HashMap<DownloadId, ManagedDownload>>,
65
66 http: Arc<HttpDownloader>,
68
69 event_tx: broadcast::Sender<DownloadEvent>,
71
72 priority_queue: Arc<PriorityQueue>,
74
75 scheduler: Arc<RwLock<BandwidthScheduler>>,
77
78 shutdown: tokio_util::sync::CancellationToken,
80
81 storage: Option<Arc<dyn Storage>>,
83}
84
85impl DownloadEngine {
86 pub async fn new(config: EngineConfig) -> Result<Arc<Self>> {
88 config.validate()?;
90
91 let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);
93
94 let http = Arc::new(HttpDownloader::new(&config)?);
96
97 let priority_queue = PriorityQueue::new(config.max_concurrent_downloads);
99
100 let scheduler = Arc::new(RwLock::new(BandwidthScheduler::new(
102 config.schedule_rules.clone(),
103 BandwidthLimits {
104 download: config.global_download_limit,
105 upload: config.global_upload_limit,
106 },
107 )));
108
109 let storage: Option<Arc<dyn Storage>> = if let Some(ref db_path) = config.database_path {
111 match SqliteStorage::new(db_path).await {
112 Ok(s) => Some(Arc::new(s)),
113 Err(e) => {
114 tracing::warn!("Failed to initialize database storage: {}. Downloads will not be persisted.", e);
115 None
116 }
117 }
118 } else {
119 None
120 };
121
122 let engine = Arc::new(Self {
123 config: RwLock::new(config),
124 downloads: RwLock::new(HashMap::new()),
125 http,
126 event_tx,
127 priority_queue,
128 scheduler,
129 shutdown: tokio_util::sync::CancellationToken::new(),
130 storage,
131 });
132
133 engine.load_persisted_downloads().await?;
135
136 Self::start_persistence_task(Arc::clone(&engine));
138
139 Self::start_scheduler_task(Arc::clone(&engine));
141
142 Ok(engine)
143 }
144
145 fn start_persistence_task(engine: Arc<Self>) {
150 if engine.storage.is_none() {
151 return; }
153
154 let shutdown = engine.shutdown.clone();
155 tokio::spawn(async move {
156 const PERSISTENCE_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
157 let mut interval = tokio::time::interval(PERSISTENCE_INTERVAL);
158
159 loop {
160 tokio::select! {
161 _ = interval.tick() => {
162 if let Err(e) = engine.persist_active_downloads().await {
163 tracing::warn!("Failed to persist active downloads: {}", e);
164 }
165 }
166 _ = shutdown.cancelled() => {
167 if let Err(e) = engine.persist_active_downloads().await {
169 tracing::warn!("Failed to persist downloads on shutdown: {}", e);
170 }
171 break;
172 }
173 }
174 }
175 });
176 }
177
178 fn start_scheduler_task(engine: Arc<Self>) {
183 let shutdown = engine.shutdown.clone();
184 tokio::spawn(async move {
185 const SCHEDULER_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
186 let mut interval = tokio::time::interval(SCHEDULER_INTERVAL);
187
188 loop {
189 tokio::select! {
190 _ = interval.tick() => {
191 engine.scheduler.read().update();
192 }
193 _ = shutdown.cancelled() => {
194 break;
195 }
196 }
197 }
198 });
199 }
200
201 async fn persist_active_downloads(&self) -> Result<()> {
203 let storage = match &self.storage {
204 Some(s) => s,
205 None => return Ok(()),
206 };
207
208 let active_downloads: Vec<(DownloadStatus, Option<Vec<crate::storage::Segment>>)> = {
210 let downloads = self.downloads.read();
211 downloads
212 .values()
213 .filter(|d| d.status.state.is_active())
214 .map(|d| {
215 let segments = match &d.handle {
216 Some(DownloadHandle::Http(h)) => h
217 .segmented_download
218 .read()
219 .as_ref()
220 .map(|sd| sd.segments_with_progress()),
221 _ => None,
222 };
223 (d.status.clone(), segments)
224 })
225 .collect()
226 };
227
228 for (status, segments_opt) in active_downloads {
230 if let Err(e) = storage.save_download(&status).await {
231 tracing::debug!("Failed to persist download {}: {}", status.id, e);
232 }
233
234 if let Some(segments) = segments_opt {
236 if let Err(e) = storage.save_segments(status.id, &segments).await {
237 tracing::debug!("Failed to persist segments for {}: {}", status.id, e);
238 }
239 }
240 }
241
242 Ok(())
243 }
244
245 async fn load_persisted_downloads(self: &Arc<Self>) -> Result<()> {
247 let storage = match &self.storage {
248 Some(s) => s,
249 None => return Ok(()), };
251
252 let persisted = storage.load_all().await?;
253
254 for status in persisted {
255 if matches!(status.state, DownloadState::Completed) {
257 continue;
258 }
259
260 let restored_state = match &status.state {
262 DownloadState::Downloading | DownloadState::Connecting => DownloadState::Paused,
263 DownloadState::Seeding => DownloadState::Paused, other => other.clone(),
265 };
266
267 let mut restored_status = status.clone();
268 restored_status.state = restored_state;
269 restored_status.progress.download_speed = 0;
271 restored_status.progress.upload_speed = 0;
272 restored_status.progress.connections = 0;
273
274 self.downloads.write().insert(
276 status.id,
277 ManagedDownload {
278 status: restored_status,
279 handle: None,
280 },
281 );
282
283 tracing::info!(
284 "Restored download {} ({}) in state {:?}",
285 status.id,
286 status.metadata.name,
287 status.state
288 );
289 }
290
291 Ok(())
292 }
293
294 fn build_torrent_status_info(metainfo: &Metainfo) -> TorrentStatusInfo {
295 TorrentStatusInfo {
296 files: metainfo
297 .info
298 .files
299 .iter()
300 .enumerate()
301 .map(|(i, f)| TorrentFile {
302 index: i,
303 path: f.path.clone(),
304 size: f.length,
305 completed: 0,
306 selected: true,
307 })
308 .collect(),
309 piece_length: metainfo.info.piece_length,
310 pieces_count: metainfo.info.pieces.len(),
311 private: metainfo.info.private,
312 }
313 }
314
315 pub async fn add_http(
317 self: &Arc<Self>,
318 url: &str,
319 options: DownloadOptions,
320 ) -> Result<DownloadId> {
321 let parsed_url = Url::parse(url)
323 .map_err(|e| EngineError::invalid_input("url", format!("Invalid URL: {}", e)))?;
324
325 match parsed_url.scheme() {
327 "http" | "https" => {}
328 scheme => {
329 return Err(EngineError::invalid_input(
330 "url",
331 format!("Unsupported scheme: {}", scheme),
332 ));
333 }
334 }
335
336 let id = DownloadId::new();
338
339 let save_dir = options
341 .save_dir
342 .clone()
343 .unwrap_or_else(|| self.config.read().download_dir.clone());
344
345 let filename = options.filename.clone().or_else(|| {
347 parsed_url
348 .path_segments()
349 .and_then(|mut segments| segments.next_back())
350 .map(|s| s.to_string())
351 .filter(|s| !s.is_empty())
352 });
353
354 let name = filename.clone().unwrap_or_else(|| "download".to_string());
355
356 let status = DownloadStatus {
358 id,
359 kind: DownloadKind::Http,
360 state: DownloadState::Queued,
361 priority: options.priority,
362 progress: DownloadProgress::default(),
363 metadata: DownloadMetadata {
364 name,
365 url: Some(url.to_string()),
366 magnet_uri: None,
367 info_hash: None,
368 save_dir,
369 filename,
370 user_agent: options.user_agent.clone(),
371 referer: options.referer.clone(),
372 headers: options.headers.clone(),
373 cookies: options.cookies.clone().unwrap_or_default(),
374 checksum: options.checksum.clone(),
375 mirrors: options.mirrors.clone(),
376 etag: None,
377 last_modified: None,
378 },
379 torrent_info: None,
380 peers: None,
381 created_at: Utc::now(),
382 completed_at: None,
383 };
384
385 {
387 let mut downloads = self.downloads.write();
388 downloads.insert(
389 id,
390 ManagedDownload {
391 status: status.clone(),
392 handle: None,
393 },
394 );
395 }
396
397 if let Some(ref storage) = self.storage {
399 if let Err(e) = storage.save_download(&status).await {
400 tracing::warn!("Failed to persist new download {}: {}", id, e);
401 }
402 }
403
404 let _ = self.event_tx.send(DownloadEvent::Added { id });
406
407 self.start_download(id, url.to_string(), options, None)
409 .await?;
410
411 Ok(id)
412 }
413
414 pub async fn add_torrent(
416 self: &Arc<Self>,
417 torrent_data: &[u8],
418 options: DownloadOptions,
419 ) -> Result<DownloadId> {
420 let metainfo = Metainfo::parse(torrent_data)?;
422
423 let id = DownloadId::new();
425
426 let save_dir = options
428 .save_dir
429 .clone()
430 .unwrap_or_else(|| self.config.read().download_dir.clone());
431
432 let status = DownloadStatus {
434 id,
435 kind: DownloadKind::Torrent,
436 state: DownloadState::Queued,
437 priority: options.priority,
438 progress: DownloadProgress::default(),
439 metadata: DownloadMetadata {
440 name: metainfo.info.name.clone(),
441 url: None,
442 magnet_uri: None,
443 info_hash: Some(hex::encode(metainfo.info_hash)),
444 save_dir: save_dir.clone(),
445 filename: Some(metainfo.info.name.clone()),
446 user_agent: options.user_agent.clone(),
447 referer: None,
448 headers: Vec::new(),
449 cookies: Vec::new(),
450 checksum: None,
451 mirrors: Vec::new(),
452 etag: None,
453 last_modified: None,
454 },
455 torrent_info: Some(Self::build_torrent_status_info(&metainfo)),
456 peers: Some(Vec::new()),
457 created_at: Utc::now(),
458 completed_at: None,
459 };
460
461 {
463 let mut downloads = self.downloads.write();
464 downloads.insert(
465 id,
466 ManagedDownload {
467 status: status.clone(),
468 handle: None,
469 },
470 );
471 }
472
473 if let Some(ref storage) = self.storage {
475 if let Err(e) = storage.save_download(&status).await {
476 tracing::warn!("Failed to persist new torrent download {}: {}", id, e);
477 }
478 }
479
480 let _ = self.event_tx.send(DownloadEvent::Added { id });
482
483 self.start_torrent(id, metainfo, save_dir, options).await?;
485
486 Ok(id)
487 }
488
489 pub async fn add_magnet(
491 self: &Arc<Self>,
492 magnet_uri: &str,
493 options: DownloadOptions,
494 ) -> Result<DownloadId> {
495 let magnet = MagnetUri::parse(magnet_uri)?;
497
498 let id = DownloadId::new();
500
501 let save_dir = options
503 .save_dir
504 .clone()
505 .unwrap_or_else(|| self.config.read().download_dir.clone());
506
507 let status = DownloadStatus {
509 id,
510 kind: DownloadKind::Magnet,
511 state: DownloadState::Queued,
512 priority: options.priority,
513 progress: DownloadProgress::default(),
514 metadata: DownloadMetadata {
515 name: magnet.name(),
516 url: None,
517 magnet_uri: Some(magnet_uri.to_string()),
518 info_hash: Some(hex::encode(magnet.info_hash)),
519 save_dir: save_dir.clone(),
520 filename: magnet.display_name.clone(),
521 user_agent: options.user_agent.clone(),
522 referer: None,
523 headers: Vec::new(),
524 cookies: Vec::new(),
525 checksum: None,
526 mirrors: Vec::new(),
527 etag: None,
528 last_modified: None,
529 },
530 torrent_info: None, peers: Some(Vec::new()),
532 created_at: Utc::now(),
533 completed_at: None,
534 };
535
536 {
538 let mut downloads = self.downloads.write();
539 downloads.insert(
540 id,
541 ManagedDownload {
542 status: status.clone(),
543 handle: None,
544 },
545 );
546 }
547
548 if let Some(ref storage) = self.storage {
550 if let Err(e) = storage.save_download(&status).await {
551 tracing::warn!("Failed to persist new magnet download {}: {}", id, e);
552 }
553 }
554
555 let _ = self.event_tx.send(DownloadEvent::Added { id });
557
558 self.start_magnet(id, magnet, save_dir, options).await?;
560
561 Ok(id)
562 }
563
564 async fn start_torrent(
566 self: &Arc<Self>,
567 id: DownloadId,
568 metainfo: Metainfo,
569 save_dir: std::path::PathBuf,
570 options: DownloadOptions,
571 ) -> Result<()> {
572 let config = self.config.read();
573 let torrent_config = TorrentConfig {
574 max_peers: config.max_peers,
575 enable_dht: config.enable_dht,
576 enable_pex: config.enable_pex,
577 enable_lpd: config.enable_lpd,
578 seed_ratio: Some(config.seed_ratio),
579 max_upload_speed: config.global_upload_limit.unwrap_or(0),
580 max_download_speed: config.global_download_limit.unwrap_or(0),
581 ..TorrentConfig::default()
582 };
583 drop(config);
584
585 let downloader = Arc::new(TorrentDownloader::from_torrent(
586 id,
587 metainfo,
588 save_dir,
589 torrent_config,
590 self.event_tx.clone(),
591 )?);
592
593 if let Some(ref selected) = options.selected_files {
595 downloader.set_selected_files(Some(selected));
596 }
597
598 if let Some(sequential) = options.sequential {
600 downloader.set_sequential(sequential);
601 }
602
603 let downloader_clone = Arc::clone(&downloader);
604 let engine = Arc::clone(self);
605
606 self.update_state(id, DownloadState::Connecting)?;
608
609 let task = tokio::spawn(async move {
610 if let Err(e) = Arc::clone(&downloader_clone).start().await {
612 let error_msg = e.to_string();
613 engine.update_state(
614 id,
615 DownloadState::Error {
616 kind: format!("{:?}", e),
617 message: error_msg.clone(),
618 retryable: e.is_retryable(),
619 },
620 )?;
621 let _ = engine.event_tx.send(DownloadEvent::Failed {
622 id,
623 error: error_msg,
624 retryable: e.is_retryable(),
625 });
626 return Ok(());
627 }
628
629 engine.update_state(id, DownloadState::Downloading)?;
631 let _ = engine.event_tx.send(DownloadEvent::Started { id });
632
633 if let Err(e) = downloader_clone.run_peer_loop().await {
635 let error_msg = e.to_string();
636 engine.update_state(
637 id,
638 DownloadState::Error {
639 kind: format!("{:?}", e),
640 message: error_msg.clone(),
641 retryable: e.is_retryable(),
642 },
643 )?;
644 let _ = engine.event_tx.send(DownloadEvent::Failed {
645 id,
646 error: error_msg,
647 retryable: e.is_retryable(),
648 });
649 }
650
651 Ok(())
652 });
653
654 let progress_task =
655 Self::spawn_torrent_progress_task(Arc::clone(self), id, Arc::clone(&downloader));
656
657 {
659 let mut downloads = self.downloads.write();
660 if let Some(download) = downloads.get_mut(&id) {
661 download.handle = Some(DownloadHandle::Torrent(TorrentDownloadHandle {
662 downloader,
663 task,
664 progress_task,
665 }));
666 }
667 }
668
669 Ok(())
670 }
671
672 async fn start_magnet(
674 self: &Arc<Self>,
675 id: DownloadId,
676 magnet: MagnetUri,
677 save_dir: std::path::PathBuf,
678 options: DownloadOptions,
679 ) -> Result<()> {
680 let config = self.config.read();
681 let torrent_config = TorrentConfig {
682 max_peers: config.max_peers,
683 enable_dht: config.enable_dht,
684 enable_pex: config.enable_pex,
685 enable_lpd: config.enable_lpd,
686 seed_ratio: Some(config.seed_ratio),
687 max_upload_speed: config.global_upload_limit.unwrap_or(0),
688 max_download_speed: config.global_download_limit.unwrap_or(0),
689 ..TorrentConfig::default()
690 };
691 drop(config);
692
693 let downloader = Arc::new(TorrentDownloader::from_magnet(
694 id,
695 magnet,
696 save_dir,
697 torrent_config,
698 self.event_tx.clone(),
699 )?);
700
701 if let Some(sequential) = options.sequential {
703 downloader.set_sequential(sequential);
704 }
705
706 let downloader_clone = Arc::clone(&downloader);
707 let engine = Arc::clone(self);
708
709 self.update_state(id, DownloadState::Connecting)?;
711
712 let task = tokio::spawn(async move {
713 if let Err(e) = Arc::clone(&downloader_clone).start().await {
715 let error_msg = e.to_string();
716 engine.update_state(
717 id,
718 DownloadState::Error {
719 kind: format!("{:?}", e),
720 message: error_msg.clone(),
721 retryable: e.is_retryable(),
722 },
723 )?;
724 let _ = engine.event_tx.send(DownloadEvent::Failed {
725 id,
726 error: error_msg,
727 retryable: e.is_retryable(),
728 });
729 return Ok(());
730 }
731
732 engine.update_state(id, DownloadState::Downloading)?;
734 let _ = engine.event_tx.send(DownloadEvent::Started { id });
735
736 if let Err(e) = downloader_clone.run_peer_loop().await {
738 let error_msg = e.to_string();
739 engine.update_state(
740 id,
741 DownloadState::Error {
742 kind: format!("{:?}", e),
743 message: error_msg.clone(),
744 retryable: e.is_retryable(),
745 },
746 )?;
747 let _ = engine.event_tx.send(DownloadEvent::Failed {
748 id,
749 error: error_msg,
750 retryable: e.is_retryable(),
751 });
752 }
753
754 Ok(())
755 });
756
757 let progress_task =
758 Self::spawn_torrent_progress_task(Arc::clone(self), id, Arc::clone(&downloader));
759
760 {
762 let mut downloads = self.downloads.write();
763 if let Some(download) = downloads.get_mut(&id) {
764 download.handle = Some(DownloadHandle::Torrent(TorrentDownloadHandle {
765 downloader,
766 task,
767 progress_task,
768 }));
769 }
770 }
771
772 Ok(())
773 }
774
775 fn spawn_torrent_progress_task(
776 engine: Arc<Self>,
777 id: DownloadId,
778 downloader: Arc<TorrentDownloader>,
779 ) -> tokio::task::JoinHandle<()> {
780 let shutdown = engine.shutdown.clone();
781 tokio::spawn(async move {
782 let mut interval = tokio::time::interval(Duration::from_millis(500));
783 loop {
784 tokio::select! {
785 _ = shutdown.cancelled() => break,
786 _ = interval.tick() => {}
787 }
788
789 let progress = downloader.progress();
790 let metainfo = downloader.metainfo();
791 let send_progress = {
792 let mut downloads = engine.downloads.write();
793 let download = match downloads.get_mut(&id) {
794 Some(download) => download,
795 None => break,
796 };
797
798 if matches!(
799 download.status.state,
800 DownloadState::Error { .. } | DownloadState::Completed
801 ) {
802 break;
803 }
804
805 if let Some(ref metainfo) = metainfo {
806 if download.status.torrent_info.is_none() {
807 download.status.torrent_info =
808 Some(Self::build_torrent_status_info(metainfo));
809 }
810 if download.status.metadata.name != metainfo.info.name {
811 download.status.metadata.name = metainfo.info.name.clone();
812 }
813 if download.status.metadata.filename.as_deref()
814 != Some(metainfo.info.name.as_str())
815 {
816 download.status.metadata.filename = Some(metainfo.info.name.clone());
817 }
818 }
819
820 download.status.progress = progress.clone();
821 download.status.state.is_active()
822 };
823
824 if send_progress {
825 let _ = engine
826 .event_tx
827 .send(DownloadEvent::Progress { id, progress });
828 }
829 }
830 })
831 }
832
833 async fn start_download(
835 self: &Arc<Self>,
836 id: DownloadId,
837 url: String,
838 options: DownloadOptions,
839 saved_segments: Option<Vec<crate::storage::Segment>>,
840 ) -> Result<()> {
841 let engine = Arc::clone(self);
842 let http = Arc::clone(&self.http);
843 let priority_queue = Arc::clone(&self.priority_queue);
844 let priority = options.priority;
845 let cancel_token = tokio_util::sync::CancellationToken::new();
846 let cancel_token_clone = cancel_token.clone();
847
848 let segmented_ref: Arc<RwLock<Option<Arc<SegmentedDownload>>>> =
850 Arc::new(RwLock::new(None));
851 let segmented_ref_for_task = Arc::clone(&segmented_ref);
852
853 self.update_state(id, DownloadState::Queued)?;
855
856 let task = tokio::spawn(async move {
857 let _permit = priority_queue.acquire(id, priority).await;
859
860 if cancel_token_clone.is_cancelled() {
862 return Ok(());
863 }
864
865 engine.update_state(id, DownloadState::Connecting)?;
867 engine.update_state(id, DownloadState::Downloading)?;
868 let _ = engine.event_tx.send(DownloadEvent::Started { id });
869
870 let (save_dir, filename, user_agent, referer, headers, cookies, checksum) = {
872 let downloads = engine.downloads.read();
873 let download = downloads
874 .get(&id)
875 .ok_or_else(|| EngineError::NotFound(id.to_string()))?;
876 (
877 download.status.metadata.save_dir.clone(),
878 download.status.metadata.filename.clone(),
879 download.status.metadata.user_agent.clone(),
880 download.status.metadata.referer.clone(),
881 download.status.metadata.headers.clone(),
882 download.status.metadata.cookies.clone(),
883 download.status.metadata.checksum.clone(),
884 )
885 };
886
887 let engine_clone = Arc::clone(&engine);
889 let progress_callback = move |progress: DownloadProgress| {
890 {
892 let mut downloads = engine_clone.downloads.write();
893 if let Some(download) = downloads.get_mut(&id) {
894 download.status.progress = progress.clone();
895 }
896 }
897 let _ = engine_clone
899 .event_tx
900 .send(DownloadEvent::Progress { id, progress });
901 };
902
903 let (max_connections, min_segment_size) = {
905 let config = engine.config.read();
906 (config.max_connections_per_download, config.min_segment_size)
907 };
908
909 let cookies_opt = if cookies.is_empty() {
911 None
912 } else {
913 Some(cookies.as_slice())
914 };
915 let result = http
916 .download_segmented(
917 &url,
918 &save_dir,
919 filename.as_deref(),
920 user_agent.as_deref(),
921 referer.as_deref(),
922 &headers,
923 cookies_opt,
924 checksum.as_ref(),
925 max_connections,
926 min_segment_size,
927 cancel_token_clone.clone(),
928 saved_segments,
929 progress_callback,
930 Some(segmented_ref_for_task),
931 )
932 .await;
933
934 match result {
935 Ok((final_path, _segmented_download)) => {
936 let should_complete = {
938 let mut downloads = engine.downloads.write();
939 if let Some(download) = downloads.get_mut(&id) {
940 if download.status.state == DownloadState::Paused {
942 false
943 } else {
944 download.status.state = DownloadState::Completed;
945 download.status.completed_at = Some(Utc::now());
946 download.status.metadata.filename = final_path
947 .file_name()
948 .map(|s| s.to_string_lossy().to_string());
949 true
950 }
951 } else {
952 false
953 }
954 };
955
956 if should_complete {
957 if let Some(ref storage) = engine.storage {
959 if let Err(e) = storage.delete_segments(id).await {
960 tracing::debug!("Failed to clean up segments for {}: {}", id, e);
961 }
962 }
963
964 let _ = engine.event_tx.send(DownloadEvent::Completed { id });
965 }
966 }
967 Err(e) if cancel_token_clone.is_cancelled() => {
968 }
970 Err(e) => {
971 let retryable = e.is_retryable();
972 let error_msg = e.to_string();
973
974 engine.update_state(
976 id,
977 DownloadState::Error {
978 kind: format!("{:?}", e),
979 message: error_msg.clone(),
980 retryable,
981 },
982 )?;
983
984 let _ = engine.event_tx.send(DownloadEvent::Failed {
985 id,
986 error: error_msg,
987 retryable,
988 });
989 }
990 }
991
992 Ok(())
993 });
994
995 {
997 let mut downloads = self.downloads.write();
998 if let Some(download) = downloads.get_mut(&id) {
999 download.handle = Some(DownloadHandle::Http(HttpDownloadHandle {
1000 cancel_token,
1001 task,
1002 segmented_download: segmented_ref,
1003 }));
1004 }
1005 }
1006
1007 Ok(())
1008 }
1009
1010 pub async fn pause(&self, id: DownloadId) -> Result<()> {
1012 let (status_to_save, segments_to_save) = {
1013 let mut downloads = self.downloads.write();
1014 let download = downloads
1015 .get_mut(&id)
1016 .ok_or_else(|| EngineError::NotFound(id.to_string()))?;
1017
1018 if !download.status.state.is_active() {
1020 return Err(EngineError::InvalidState {
1021 action: "pause",
1022 current_state: format!("{:?}", download.status.state),
1023 });
1024 }
1025
1026 let segments = match &download.handle {
1028 Some(DownloadHandle::Http(h)) => h
1029 .segmented_download
1030 .read()
1031 .as_ref()
1032 .map(|sd| sd.segments_with_progress()),
1033 _ => None,
1034 };
1035
1036 if let Some(handle) = download.handle.take() {
1038 match handle {
1039 DownloadHandle::Http(h) => {
1040 h.cancel_token.cancel();
1041 }
1043 DownloadHandle::Torrent(h) => {
1044 h.downloader.pause();
1045 download.handle = Some(DownloadHandle::Torrent(h));
1046 }
1048 }
1049 }
1050
1051 let old_state = download.status.state.clone();
1053 download.status.state = DownloadState::Paused;
1054
1055 let _ = self.event_tx.send(DownloadEvent::StateChanged {
1057 id,
1058 old_state,
1059 new_state: DownloadState::Paused,
1060 });
1061 let _ = self.event_tx.send(DownloadEvent::Paused { id });
1062
1063 (download.status.clone(), segments)
1064 };
1065
1066 if let Some(ref storage) = self.storage {
1068 if let Err(e) = storage.save_download(&status_to_save).await {
1069 tracing::warn!("Failed to persist paused download {}: {}", id, e);
1070 }
1071 if let Some(segments) = segments_to_save {
1073 if let Err(e) = storage.save_segments(id, &segments).await {
1074 tracing::warn!(
1075 "Failed to persist segments for paused download {}: {}",
1076 id,
1077 e
1078 );
1079 }
1080 }
1081 }
1082
1083 Ok(())
1084 }
1085
1086 pub async fn resume(self: &Arc<Self>, id: DownloadId) -> Result<()> {
1088 let (kind, url, options, has_torrent_handle) = {
1090 let downloads = self.downloads.read();
1091 let download = downloads
1092 .get(&id)
1093 .ok_or_else(|| EngineError::NotFound(id.to_string()))?;
1094
1095 if download.status.state != DownloadState::Paused {
1097 return Err(EngineError::InvalidState {
1098 action: "resume",
1099 current_state: format!("{:?}", download.status.state),
1100 });
1101 }
1102
1103 let has_torrent_handle = matches!(download.handle, Some(DownloadHandle::Torrent(_)));
1104
1105 let options = DownloadOptions {
1106 save_dir: Some(download.status.metadata.save_dir.clone()),
1107 filename: download.status.metadata.filename.clone(),
1108 user_agent: download.status.metadata.user_agent.clone(),
1109 referer: download.status.metadata.referer.clone(),
1110 headers: download.status.metadata.headers.clone(),
1111 cookies: if download.status.metadata.cookies.is_empty() {
1112 None
1113 } else {
1114 Some(download.status.metadata.cookies.clone())
1115 },
1116 ..Default::default()
1117 };
1118
1119 (
1120 download.status.kind,
1121 download.status.metadata.url.clone(),
1122 options,
1123 has_torrent_handle,
1124 )
1125 };
1126
1127 match kind {
1128 DownloadKind::Http => {
1129 let url = url.ok_or_else(|| {
1131 EngineError::Internal("HTTP download missing URL".to_string())
1132 })?;
1133
1134 let saved_segments = if let Some(ref storage) = self.storage {
1136 match storage.load_segments(id).await {
1137 Ok(segments) if !segments.is_empty() => {
1138 tracing::debug!(
1139 "Loaded {} saved segments for download {}",
1140 segments.len(),
1141 id
1142 );
1143 Some(segments)
1144 }
1145 Ok(_) => None,
1146 Err(e) => {
1147 tracing::debug!("Failed to load segments for {}: {}", id, e);
1148 None
1149 }
1150 }
1151 } else {
1152 None
1153 };
1154
1155 self.start_download(id, url, options, saved_segments)
1156 .await?;
1157 }
1158 DownloadKind::Torrent | DownloadKind::Magnet => {
1159 if has_torrent_handle {
1161 let mut downloads = self.downloads.write();
1162 if let Some(download) = downloads.get_mut(&id) {
1163 if let Some(DownloadHandle::Torrent(ref h)) = download.handle {
1164 h.downloader.resume();
1165 download.status.state = DownloadState::Downloading;
1166 }
1167 }
1168 } else {
1169 return Err(EngineError::Internal(
1170 "Torrent download has no active handle to resume".to_string(),
1171 ));
1172 }
1173 }
1174 }
1175
1176 let _ = self.event_tx.send(DownloadEvent::Resumed { id });
1177
1178 Ok(())
1179 }
1180
1181 pub async fn cancel(&self, id: DownloadId, delete_files: bool) -> Result<()> {
1183 let (handle, save_path) = {
1184 let mut downloads = self.downloads.write();
1185 let download = downloads
1186 .remove(&id)
1187 .ok_or_else(|| EngineError::NotFound(id.to_string()))?;
1188
1189 let save_path = if delete_files {
1190 Some(
1191 download.status.metadata.save_dir.join(
1192 download
1193 .status
1194 .metadata
1195 .filename
1196 .as_deref()
1197 .unwrap_or("download"),
1198 ),
1199 )
1200 } else {
1201 None
1202 };
1203
1204 (download.handle, save_path)
1205 };
1206
1207 if let Some(handle) = handle {
1209 match handle {
1210 DownloadHandle::Http(h) => {
1211 h.cancel_token.cancel();
1212 }
1213 DownloadHandle::Torrent(h) => {
1214 drop(h.downloader.stop());
1215 h.progress_task.abort();
1216 h.task.abort();
1217 }
1218 }
1219 }
1220
1221 if let Some(path) = save_path {
1223 if path.exists() {
1224 if path.is_dir() {
1225 tokio::fs::remove_dir_all(&path).await.ok();
1227 } else {
1228 tokio::fs::remove_file(&path).await.ok();
1230 }
1231 }
1232 let partial_path = path.with_extension("part");
1234 if partial_path.exists() {
1235 tokio::fs::remove_file(&partial_path).await.ok();
1236 }
1237 }
1238
1239 if let Some(ref storage) = self.storage {
1241 if let Err(e) = storage.delete_segments(id).await {
1242 tracing::debug!(
1243 "Failed to clean up segments for cancelled download {}: {}",
1244 id,
1245 e
1246 );
1247 }
1248 if let Err(e) = storage.delete_download(id).await {
1249 tracing::debug!("Failed to delete download record for {}: {}", id, e);
1250 }
1251 }
1252
1253 let _ = self.event_tx.send(DownloadEvent::Removed { id });
1254
1255 Ok(())
1256 }
1257
1258 pub fn status(&self, id: DownloadId) -> Option<DownloadStatus> {
1260 self.downloads.read().get(&id).map(|d| d.status.clone())
1261 }
1262
1263 pub fn list(&self) -> Vec<DownloadStatus> {
1265 self.downloads
1266 .read()
1267 .values()
1268 .map(|d| d.status.clone())
1269 .collect()
1270 }
1271
1272 pub fn active(&self) -> Vec<DownloadStatus> {
1274 self.downloads
1275 .read()
1276 .values()
1277 .filter(|d| d.status.state.is_active())
1278 .map(|d| d.status.clone())
1279 .collect()
1280 }
1281
1282 pub fn waiting(&self) -> Vec<DownloadStatus> {
1284 self.downloads
1285 .read()
1286 .values()
1287 .filter(|d| matches!(d.status.state, DownloadState::Queued))
1288 .map(|d| d.status.clone())
1289 .collect()
1290 }
1291
1292 pub fn stopped(&self) -> Vec<DownloadStatus> {
1294 self.downloads
1295 .read()
1296 .values()
1297 .filter(|d| {
1298 matches!(
1299 d.status.state,
1300 DownloadState::Paused | DownloadState::Completed | DownloadState::Error { .. }
1301 )
1302 })
1303 .map(|d| d.status.clone())
1304 .collect()
1305 }
1306
1307 pub fn global_stats(&self) -> GlobalStats {
1309 let downloads = self.downloads.read();
1310 let mut stats = GlobalStats::default();
1311
1312 for download in downloads.values() {
1313 match &download.status.state {
1314 DownloadState::Downloading | DownloadState::Seeding | DownloadState::Connecting => {
1315 stats.num_active += 1;
1316 stats.download_speed += download.status.progress.download_speed;
1317 stats.upload_speed += download.status.progress.upload_speed;
1318 }
1319 DownloadState::Queued => {
1320 stats.num_waiting += 1;
1321 }
1322 DownloadState::Paused | DownloadState::Completed | DownloadState::Error { .. } => {
1323 stats.num_stopped += 1;
1324 }
1325 }
1326 }
1327
1328 stats
1329 }
1330
1331 pub fn subscribe(&self) -> broadcast::Receiver<DownloadEvent> {
1333 self.event_tx.subscribe()
1334 }
1335
1336 pub fn set_config(&self, config: EngineConfig) -> Result<()> {
1338 config.validate()?;
1339
1340 *self.config.write() = config;
1344 Ok(())
1345 }
1346
1347 pub fn get_config(&self) -> EngineConfig {
1349 self.config.read().clone()
1350 }
1351
1352 pub fn set_priority(&self, id: DownloadId, priority: DownloadPriority) -> Result<()> {
1360 let status_to_save = {
1362 let mut downloads = self.downloads.write();
1363 let download = downloads
1364 .get_mut(&id)
1365 .ok_or_else(|| EngineError::NotFound(id.to_string()))?;
1366 download.status.priority = priority;
1367 download.status.clone()
1368 };
1369
1370 self.priority_queue.set_priority(id, priority);
1372
1373 if let Some(storage) = self.storage.as_ref().map(Arc::clone) {
1375 tokio::spawn(async move {
1376 if let Err(e) = storage.save_download(&status_to_save).await {
1377 tracing::debug!("Failed to persist priority change for {}: {}", id, e);
1378 }
1379 });
1380 }
1381
1382 Ok(())
1383 }
1384
1385 pub fn get_priority(&self, id: DownloadId) -> Option<DownloadPriority> {
1387 self.downloads.read().get(&id).map(|d| d.status.priority)
1388 }
1389
1390 pub fn get_bandwidth_limits(&self) -> BandwidthLimits {
1392 self.scheduler.read().get_limits()
1393 }
1394
1395 pub fn set_schedule_rules(&self, rules: Vec<crate::scheduler::ScheduleRule>) {
1399 self.scheduler.write().set_rules(rules);
1400 }
1401
1402 pub fn get_schedule_rules(&self) -> Vec<crate::scheduler::ScheduleRule> {
1404 self.scheduler.read().rules().to_vec()
1405 }
1406
1407 pub async fn shutdown(&self) -> Result<()> {
1409 self.shutdown.cancel();
1411
1412 let handles: Vec<_> = {
1414 let mut downloads = self.downloads.write();
1415 downloads
1416 .values_mut()
1417 .filter_map(|d| d.handle.take())
1418 .collect()
1419 };
1420
1421 for handle in handles {
1422 match handle {
1423 DownloadHandle::Http(h) => {
1424 h.cancel_token.cancel();
1425 let _ = tokio::time::timeout(std::time::Duration::from_secs(5), h.task).await;
1427 }
1428 DownloadHandle::Torrent(h) => {
1429 drop(h.downloader.stop());
1430 h.progress_task.abort();
1431 let _ = tokio::time::timeout(std::time::Duration::from_secs(5), h.task).await;
1433 }
1434 }
1435 }
1436
1437 Ok(())
1438 }
1439
1440 fn update_state(&self, id: DownloadId, new_state: DownloadState) -> Result<()> {
1442 let mut downloads = self.downloads.write();
1443 let download = downloads
1444 .get_mut(&id)
1445 .ok_or_else(|| EngineError::NotFound(id.to_string()))?;
1446
1447 let old_state = download.status.state.clone();
1448 download.status.state = new_state.clone();
1449
1450 let _ = self.event_tx.send(DownloadEvent::StateChanged {
1451 id,
1452 old_state,
1453 new_state,
1454 });
1455
1456 Ok(())
1457 }
1458}
1459
1460impl Drop for DownloadEngine {
1461 fn drop(&mut self) {
1462 self.shutdown.cancel();
1464 }
1465}