1use crate::{
2 fmt::{format_size, format_time},
3 persist::{self, DatabaseEntry, Status},
4 ui::DownloadConfig,
5 utils::sanitize,
6};
7use fast_down_ffi::{
8 Event, create_channel,
9 fast_down::{Merge, Total, utils::gen_unique_path},
10 prefetch,
11};
12use slint::SharedString;
13use std::{ops::Range, time::Duration};
14use tokio::{fs, time::Instant};
15use tokio_util::sync::CancellationToken;
16use tracing::{error, info, warn};
17use url::Url;
18
/// Events delivered by [`download`] to its `on_event` callback.
pub enum DownloadEvent {
    /// The database entry describing this download (either the resumed entry
    /// or a freshly created one). Sent once, right before the transfer starts.
    Info(Box<DatabaseEntry>),
    /// A snapshot of the current progress statistics.
    Progress(ProgressInfo),
    /// The download has stopped. `is_cancelled` tells whether the
    /// cancellation token had been triggered at that point.
    End { is_cancelled: bool },
}
24
25pub struct ProgressInfo {
26 pub downloaded: SharedString,
27 pub speed: SharedString,
28 pub avg_speed: SharedString,
29 pub time: SharedString,
30 pub remaining_time: SharedString,
31 pub remaining_size: SharedString,
32 pub percentage: SharedString,
33 pub elapsed: Duration,
34 pub progress: Vec<Range<u64>>,
35}
36
37pub async fn download(
38 url: Url,
39 config: &DownloadConfig,
40 cancel_token: CancellationToken,
41 mut entry: Option<DatabaseEntry>,
42 mut on_event: impl FnMut(DownloadEvent) + Send + Sync + 'static,
43) -> color_eyre::Result<()> {
44 info!(url = url.as_str(), config = ?config, "启动下载");
45 let result = async {
46 let file_exists = matches!(&entry, Some(entry) if fs::try_exists(&entry.file_path).await.unwrap_or(false));
47 if !file_exists {
48 entry = None
49 }
50 let config: persist::DownloadConfig = config.into();
51 let progress = entry
52 .as_ref()
53 .map(|e| e.progress.clone())
54 .unwrap_or_default();
55 let download_config = fast_down_ffi::Config {
56 retry_times: config.retry_times,
57 threads: config.threads,
58 proxy: config.proxy.clone(),
59 headers: config.headers.clone(),
60 min_chunk_size: config.min_chunk_size,
61 write_buffer_size: config.write_buffer_size,
62 write_queue_cap: config.write_queue_cap,
63 retry_gap: config.retry_gap,
64 pull_timeout: config.pull_timeout,
65 accept_invalid_certs: config.accept_invalid_certs,
66 accept_invalid_hostnames: config.accept_invalid_hostnames,
67 write_method: config.write_method.clone(),
68 local_address: config.local_address.clone(),
69 max_speculative: config.max_speculative,
70 downloaded_chunk: progress.clone(),
71 chunk_window: config.chunk_window,
72 };
73 let elapsed = entry.as_ref().map(|e| e.elapsed).unwrap_or_default();
74 let (tx, rx) = create_channel();
75 let task = prefetch(url.clone(), download_config, tx).await?;
76 info!(info = ?task.info, "获取元数据成功");
77 let total_size = task.info.size;
78 let (save_path, entry) = if let Some(entry) = entry
79 && fs::try_exists(&entry.file_path).await.unwrap_or(false)
80 {
81 (entry.file_path.clone(), entry)
82 } else {
83 let file_name = sanitize(task.info.raw_name.clone(), 248);
84 let save_dir = soft_canonicalize::soft_canonicalize(
85 if config.save_dir.to_string_lossy().is_empty() {
86 dirs::download_dir().unwrap_or_default()
87 } else {
88 config.save_dir.clone()
89 },
90 )?;
91 let _ = fs::create_dir_all(&save_dir).await;
92 let save_path = gen_unique_path(&save_dir.join(&file_name)).await?;
93 let file_name = save_path.file_name().unwrap().to_string_lossy().to_string();
94 (
95 save_path.clone(),
96 DatabaseEntry {
97 file_name,
98 file_path: save_path,
99 file_size: total_size,
100 file_id: task.info.file_id.clone(),
101 progress: Vec::new(),
102 elapsed: Duration::ZERO,
103 url,
104 config,
105 status: Status::Paused,
106 },
107 )
108 };
109 on_event(DownloadEvent::Info(Box::new(entry)));
110 let fut = task.start(save_path, cancel_token.clone());
111 Ok::<_, color_eyre::Report>((fut, progress, elapsed, total_size, rx))
112 };
113 let (fut, mut progress, elapsed, total_size, rx) = tokio::select! {
114 _ = cancel_token.cancelled() => {
115 on_event(DownloadEvent::End { is_cancelled: true });
116 return Ok(());
117 },
118 res = result => res?,
119 };
120 tokio::pin!(fut);
121 let mut smoothed_speed = 0.;
122 let alpha = 0.3;
123 let mut last_bytes = progress.total();
124 let mut last_update = Instant::now();
125 let mut start = last_update - elapsed;
126
127 macro_rules! update_progress {
128 ($now:expr, $elapsed:expr, $total_elapsed:expr) => {{
129 let downloaded = progress.total();
130 let bytes_diff = downloaded - last_bytes;
131 let instant_speed = bytes_diff as f64 / $elapsed;
132 smoothed_speed = if smoothed_speed == 0. {
133 instant_speed
134 } else {
135 alpha * instant_speed + (1.0 - alpha) * smoothed_speed
136 };
137 let avg_speed = downloaded as f64 / $total_elapsed.as_secs_f64();
138 let remaining_size = total_size.saturating_sub(downloaded);
139 let remaining_time = remaining_size as f64 / smoothed_speed;
140 let percentage = format!("{:.2}%", downloaded as f64 / total_size as f64 * 100.0);
141 on_event(DownloadEvent::Progress(ProgressInfo {
142 downloaded: format_size(downloaded as f64).into(),
143 speed: format!("{}/s", format_size(smoothed_speed)).into(),
144 avg_speed: format!("{}/s", format_size(avg_speed)).into(),
145 time: format_time($total_elapsed.as_secs()).into(),
146 remaining_time: format_time(remaining_time as u64).into(),
147 remaining_size: format_size(remaining_size as f64).into(),
148 percentage: percentage.into(),
149 elapsed: $total_elapsed,
150 progress: progress.clone(),
151 }));
152 downloaded
153 }};
154 }
155
156 loop {
157 tokio::select! {
158 res = &mut fut => {
159 res?;
160 break;
161 }
162 event = rx.recv() => {
163 let e = match event {
164 Ok(e) => e,
165 Err(_) => break,
166 };
167 match e {
168 Event::PrefetchError(e) => error!(err = e, "获取元数据失败"),
169 Event::Pulling(id) => info!(id = id, "开始下载"),
170 Event::PullProgress(_, _) => {}
171 Event::PullError(id, e) => warn!(err = e, id = id, "下载数据出错"),
172 Event::PullTimeout(id) => warn!("拉取数据超时 {id}"),
173 Event::PushError(id, e) => error!(err = e, id = id, "写入数据出错"),
174 Event::FlushError(e) => error!(err = e, "磁盘刷写失败"),
175 Event::Finished(id) => info!(id = id, "下载完成"),
176 Event::PushProgress(_, p) => {
177 if p.start == 0 {
178 progress.clear();
179 smoothed_speed = 0.;
180 last_update = Instant::now();
181 start = last_update;
182 last_bytes = 0;
183 }
184 progress.merge_progress(p);
185 let now = Instant::now();
186 let elapsed = (now - last_update).as_secs_f64();
187 let total_elapsed = now - start;
188 if elapsed > 1. {
189 last_bytes = update_progress!(now, elapsed, total_elapsed);
190 last_update = now;
191 }
192 }
193 }
194 }
195 }
196 }
197
198 let now = Instant::now();
199 let elapsed = (now - last_update).as_secs_f64();
200 let total_elapsed = now - start;
201 update_progress!(now, elapsed, total_elapsed);
202 on_event(DownloadEvent::End {
203 is_cancelled: cancel_token.is_cancelled(),
204 });
205 Ok(())
206}