1use crate::{
2 fmt::{format_size, format_time},
3 persist::{self, DatabaseEntry, Status},
4 ui::DownloadConfig,
5 utils::{auto_ext, sanitize},
6};
7use fast_down_ffi::{Event, Total, create_channel, prefetch, unique_path::gen_unique_path};
8use file_alloc::FileAlloc;
9use parking_lot::Mutex;
10use slint::SharedString;
11use std::{
12 borrow::Cow,
13 ops::Range,
14 sync::Arc,
15 time::{Duration, Instant},
16};
17use tokio::fs::{self, OpenOptions};
18use tokio_util::sync::CancellationToken;
19use tracing::{error, info, warn};
20use url::Url;
21
22pub enum DownloadEvent {
23 Info(Box<DatabaseEntry>),
24 Progress(ProgressInfo),
25 Flushing,
26 FlushError(SharedString),
27 End { is_cancelled: bool },
28}
29
30pub struct ProgressInfo {
31 pub downloaded: SharedString,
32 pub speed: SharedString,
33 pub avg_speed: SharedString,
34 pub time: SharedString,
35 pub remaining_time: SharedString,
36 pub remaining_size: SharedString,
37 pub percentage: SharedString,
38 pub elapsed: Duration,
39 pub progress: Vec<Range<u64>>,
40}
41
42pub async fn download(
43 url: Url,
44 config: &DownloadConfig,
45 cancel_token: CancellationToken,
46 mut entry: Option<DatabaseEntry>,
47 mut on_event: impl FnMut(DownloadEvent) + Send + Sync + 'static,
48) -> color_eyre::Result<()> {
49 info!(url = url.as_str(), config = ?config, "启动下载");
50 let result = async {
51 let file_exists = matches!(&entry, Some(entry) if fs::try_exists(&entry.file_path).await.unwrap_or(false));
52 if !file_exists {
53 entry = None
54 }
55 let config: persist::DownloadConfig = config.into();
56 let progress = Arc::new(Mutex::new(
57 entry
58 .as_ref()
59 .map(|e| e.progress.clone())
60 .unwrap_or_default(),
61 ));
62 let pre_allocate = config.pre_allocate;
63 let download_config = fast_down_ffi::Config {
64 retry_times: config.retry_times,
65 threads: config.threads,
66 proxy: config.proxy.clone(),
67 headers: config.headers.clone(),
68 min_chunk_size: config.min_chunk_size,
69 write_buffer_size: config.write_buffer_size,
70 write_queue_cap: config.write_queue_cap,
71 retry_gap: config.retry_gap,
72 pull_timeout: config.pull_timeout,
73 accept_invalid_certs: config.accept_invalid_certs,
74 accept_invalid_hostnames: config.accept_invalid_hostnames,
75 write_method: config.write_method.clone(),
76 local_address: config.local_address.clone(),
77 max_speculative: config.max_speculative,
78 downloaded_chunk: progress.clone(),
79 chunk_window: config.chunk_window,
80 };
81 let elapsed = entry.as_ref().map(|e| e.elapsed).unwrap_or_default();
82 let (tx, rx) = create_channel();
83 let task = prefetch(url.clone(), download_config, tx).await?;
84 info!(info = ?task.info, "获取元数据成功");
85 let total_size = task.info.size;
86 let (save_path, entry) = if let Some(entry) = entry
87 && fs::try_exists(&entry.file_path).await.unwrap_or(false)
88 {
89 (entry.file_path.clone(), entry)
90 } else {
91 let file_name = sanitize(
92 if config.file_name.is_empty() {
93 auto_ext(&task.info.raw_name, task.info.content_type.as_deref())
94 } else {
95 Cow::Borrowed(config.file_name.as_str())
96 },
97 248,
98 );
99 let save_dir = soft_canonicalize::soft_canonicalize(
100 if config.save_dir.to_string_lossy().is_empty() {
101 dirs::download_dir().unwrap_or_default()
102 } else {
103 config.save_dir.clone()
104 },
105 )?;
106 let _ = fs::create_dir_all(&save_dir).await;
107 let save_path = gen_unique_path(&save_dir.join(&file_name)).await?;
108 let file_name = save_path.file_name().unwrap().to_string_lossy().to_string();
109 (
110 save_path.clone(),
111 DatabaseEntry {
112 file_name,
113 file_path: save_path,
114 file_size: total_size,
115 file_id: task.info.file_id.clone(),
116 progress: Vec::new(),
117 elapsed: Duration::ZERO,
118 url,
119 config,
120 status: Status::Paused,
121 },
122 )
123 };
124 on_event(DownloadEvent::Info(Box::new(entry)));
125 if pre_allocate && total_size > 1024 * 1024 && progress.lock().is_empty() {
126 let mut file = OpenOptions::new()
127 .write(true)
128 .create(true)
129 .truncate(false)
130 .open(&save_path)
131 .await?;
132 file.allocate(total_size).await?;
133 }
134 Ok::<_, color_eyre::Report>((
135 task,
136 save_path,
137 cancel_token.clone(),
138 elapsed,
139 total_size,
140 rx,
141 ))
142 };
143 let (task, save_path, cancel_token, elapsed, total_size, rx) = tokio::select! {
144 _ = cancel_token.cancelled() => {
145 on_event(DownloadEvent::End { is_cancelled: true });
146 return Ok(());
147 },
148 res = result => res?,
149 };
150 tokio::pin! {
151 let fut = task.start(save_path, cancel_token.clone());
152 };
153
154 let progress = &task.config.downloaded_chunk;
155 let mut smoothed_speed = 0.;
156 let alpha = 0.3;
157 let mut last_bytes = progress.lock().total();
158 let mut last_update = Instant::now();
159 let mut start = last_update - elapsed;
160
161 macro_rules! update_progress {
162 ($now:expr, $elapsed:expr, $total_elapsed:expr) => {{
163 let downloaded = progress.lock().total();
164 let bytes_diff = downloaded - last_bytes;
165 let instant_speed = bytes_diff as f64 / $elapsed;
166 smoothed_speed = if smoothed_speed == 0. {
167 instant_speed
168 } else {
169 alpha * instant_speed + (1.0 - alpha) * smoothed_speed
170 };
171 let avg_speed = downloaded as f64 / $total_elapsed.as_secs_f64();
172 let remaining_size = total_size.saturating_sub(downloaded);
173 let remaining_time = remaining_size as f64 / smoothed_speed;
174 let percentage = format!("{:.2}%", downloaded as f64 / total_size as f64 * 100.0);
175 on_event(DownloadEvent::Progress(ProgressInfo {
176 downloaded: format_size(downloaded as f64).into(),
177 speed: format!("{}/s", format_size(smoothed_speed)).into(),
178 avg_speed: format!("{}/s", format_size(avg_speed)).into(),
179 time: format_time($total_elapsed.as_secs()).into(),
180 remaining_time: format_time(remaining_time as u64).into(),
181 remaining_size: format_size(remaining_size as f64).into(),
182 percentage: percentage.into(),
183 elapsed: $total_elapsed,
184 progress: progress.lock().clone(),
185 }));
186 downloaded
187 }};
188 }
189
190 let mut is_first = true;
191 loop {
192 tokio::select! {
193 res = &mut fut => {
194 res?;
195 break;
196 }
197 event = rx.recv() => {
198 let e = match event {
199 Ok(e) => e,
200 Err(_) => break,
201 };
202 match e {
203 Event::PrefetchError(e) => error!(err = e, "获取元数据失败"),
204 Event::Pulling(id) => info!(id = id, "开始下载"),
205 Event::PullProgress(_, _) => {}
206 Event::PullError(id, e) => warn!(err = e, id = id, "下载数据出错"),
207 Event::PullTimeout(id) => warn!("拉取数据超时 {id}"),
208 Event::Pushing(_, _) => {},
209 Event::PushError(id, r, e) => error!(err = e, id = id, start = r.start, end = r.end, "写入数据出错"),
210 Event::Flushing => {
211 info!("开始刷写磁盘");
212 if is_first {
213 is_first = false;
214 let now = Instant::now();
215 let elapsed = (now - last_update).as_secs_f64();
216 let total_elapsed = now - start;
217 update_progress!(now, elapsed, total_elapsed);
218 }
219 on_event(DownloadEvent::Flushing);
220 },
221 Event::FlushError(e) => {
222 error!(err = e, "磁盘刷写失败");
223 on_event(DownloadEvent::FlushError(e.into()));
224 },
225 Event::Finished(id) => info!(id = id, "下载完成"),
226 Event::PushProgress(_, p) => {
227 if p.start == 0 {
228 smoothed_speed = 0.;
229 last_update = Instant::now();
230 start = last_update;
231 last_bytes = 0;
232 }
233 let now = Instant::now();
234 let elapsed = (now - last_update).as_secs_f64();
235 let total_elapsed = now - start;
236 if elapsed > 1. {
237 last_bytes = update_progress!(now, elapsed, total_elapsed);
238 last_update = now;
239 }
240 }
241 }
242 }
243 }
244 }
245 on_event(DownloadEvent::End {
246 is_cancelled: cancel_token.is_cancelled(),
247 });
248 Ok(())
249}