Skip to main content

fast_down_gui/core/
download.rs

1use crate::{
2    fmt::{format_size, format_time},
3    persist::{self, DatabaseEntry, Status},
4    ui::DownloadConfig,
5    utils::{auto_ext, sanitize},
6};
7use fast_down_ffi::{Event, Total, create_channel, prefetch, unique_path::gen_unique_path};
8use file_alloc::FileAlloc;
9use parking_lot::Mutex;
10use slint::SharedString;
11use std::{
12    borrow::Cow,
13    ops::Range,
14    sync::Arc,
15    time::{Duration, Instant},
16};
17use tokio::fs::{self, OpenOptions};
18use tokio_util::sync::CancellationToken;
19use tracing::{error, info, warn};
20use url::Url;
21
22pub enum DownloadEvent {
23    Info(Box<DatabaseEntry>),
24    Progress(ProgressInfo),
25    Flushing,
26    FlushError(SharedString),
27    End { is_cancelled: bool },
28}
29
30pub struct ProgressInfo {
31    pub downloaded: SharedString,
32    pub speed: SharedString,
33    pub avg_speed: SharedString,
34    pub time: SharedString,
35    pub remaining_time: SharedString,
36    pub remaining_size: SharedString,
37    pub percentage: SharedString,
38    pub elapsed: Duration,
39    pub progress: Vec<Range<u64>>,
40}
41
42pub async fn download(
43    url: Url,
44    config: &DownloadConfig,
45    cancel_token: CancellationToken,
46    mut entry: Option<DatabaseEntry>,
47    mut on_event: impl FnMut(DownloadEvent) + Send + Sync + 'static,
48) -> color_eyre::Result<()> {
49    info!(url = url.as_str(), config = ?config, "启动下载");
50    let result = async {
51        let file_exists = matches!(&entry, Some(entry) if fs::try_exists(&entry.file_path).await.unwrap_or(false));
52        if !file_exists {
53            entry = None
54        }
55        let config: persist::DownloadConfig = config.into();
56        let progress = Arc::new(Mutex::new(
57            entry
58                .as_ref()
59                .map(|e| e.progress.clone())
60                .unwrap_or_default(),
61        ));
62        let pre_allocate = config.pre_allocate;
63        let download_config = fast_down_ffi::Config {
64            retry_times: config.retry_times,
65            threads: config.threads,
66            proxy: config.proxy.clone(),
67            headers: config.headers.clone(),
68            min_chunk_size: config.min_chunk_size,
69            write_buffer_size: config.write_buffer_size,
70            write_queue_cap: config.write_queue_cap,
71            retry_gap: config.retry_gap,
72            pull_timeout: config.pull_timeout,
73            accept_invalid_certs: config.accept_invalid_certs,
74            accept_invalid_hostnames: config.accept_invalid_hostnames,
75            write_method: config.write_method.clone(),
76            local_address: config.local_address.clone(),
77            max_speculative: config.max_speculative,
78            downloaded_chunk: progress.clone(),
79            chunk_window: config.chunk_window,
80        };
81        let elapsed = entry.as_ref().map(|e| e.elapsed).unwrap_or_default();
82        let (tx, rx) = create_channel();
83        let task = prefetch(url.clone(), download_config, tx).await?;
84        info!(info = ?task.info, "获取元数据成功");
85        let total_size = task.info.size;
86        let (save_path, entry) = if let Some(entry) = entry
87            && fs::try_exists(&entry.file_path).await.unwrap_or(false)
88        {
89            (entry.file_path.clone(), entry)
90        } else {
91            let file_name = sanitize(
92                if config.file_name.is_empty() {
93                    auto_ext(&task.info.raw_name, task.info.content_type.as_deref())
94                } else {
95                    Cow::Borrowed(config.file_name.as_str())
96                },
97                248,
98            );
99            let save_dir = soft_canonicalize::soft_canonicalize(
100                if config.save_dir.to_string_lossy().is_empty() {
101                    dirs::download_dir().unwrap_or_default()
102                } else {
103                    config.save_dir.clone()
104                },
105            )?;
106            let _ = fs::create_dir_all(&save_dir).await;
107            let save_path = gen_unique_path(&save_dir.join(&file_name)).await?;
108            let file_name = save_path.file_name().unwrap().to_string_lossy().to_string();
109            (
110                save_path.clone(),
111                DatabaseEntry {
112                    file_name,
113                    file_path: save_path,
114                    file_size: total_size,
115                    file_id: task.info.file_id.clone(),
116                    progress: Vec::new(),
117                    elapsed: Duration::ZERO,
118                    url,
119                    config,
120                    status: Status::Paused,
121                },
122            )
123        };
124        on_event(DownloadEvent::Info(Box::new(entry)));
125        if pre_allocate && total_size > 1024 * 1024 && progress.lock().is_empty() {
126            let mut file = OpenOptions::new()
127                .write(true)
128                .create(true)
129                .truncate(false)
130                .open(&save_path)
131                .await?;
132            file.allocate(total_size).await?;
133        }
134        Ok::<_, color_eyre::Report>((
135            task,
136            save_path,
137            cancel_token.clone(),
138            elapsed,
139            total_size,
140            rx,
141        ))
142    };
143    let (task, save_path, cancel_token, elapsed, total_size, rx) = tokio::select! {
144        _ = cancel_token.cancelled() => {
145            on_event(DownloadEvent::End { is_cancelled: true });
146            return Ok(());
147        },
148        res = result => res?,
149    };
150    tokio::pin! {
151        let fut = task.start(save_path, cancel_token.clone());
152    };
153
154    let progress = &task.config.downloaded_chunk;
155    let mut smoothed_speed = 0.;
156    let alpha = 0.3;
157    let mut last_bytes = progress.lock().total();
158    let mut last_update = Instant::now();
159    let mut start = last_update - elapsed;
160
161    macro_rules! update_progress {
162        ($now:expr, $elapsed:expr, $total_elapsed:expr) => {{
163            let downloaded = progress.lock().total();
164            let bytes_diff = downloaded - last_bytes;
165            let instant_speed = bytes_diff as f64 / $elapsed;
166            smoothed_speed = if smoothed_speed == 0. {
167                instant_speed
168            } else {
169                alpha * instant_speed + (1.0 - alpha) * smoothed_speed
170            };
171            let avg_speed = downloaded as f64 / $total_elapsed.as_secs_f64();
172            let remaining_size = total_size.saturating_sub(downloaded);
173            let remaining_time = remaining_size as f64 / smoothed_speed;
174            let percentage = format!("{:.2}%", downloaded as f64 / total_size as f64 * 100.0);
175            on_event(DownloadEvent::Progress(ProgressInfo {
176                downloaded: format_size(downloaded as f64).into(),
177                speed: format!("{}/s", format_size(smoothed_speed)).into(),
178                avg_speed: format!("{}/s", format_size(avg_speed)).into(),
179                time: format_time($total_elapsed.as_secs()).into(),
180                remaining_time: format_time(remaining_time as u64).into(),
181                remaining_size: format_size(remaining_size as f64).into(),
182                percentage: percentage.into(),
183                elapsed: $total_elapsed,
184                progress: progress.lock().clone(),
185            }));
186            downloaded
187        }};
188    }
189
190    let mut is_first = true;
191    loop {
192        tokio::select! {
193            res = &mut fut => {
194                res?;
195                break;
196            }
197            event = rx.recv() => {
198                let e = match event {
199                    Ok(e) => e,
200                    Err(_) => break,
201                };
202                match e {
203                    Event::PrefetchError(e) => error!(err = e, "获取元数据失败"),
204                    Event::Pulling(id) => info!(id = id, "开始下载"),
205                    Event::PullProgress(_, _) => {}
206                    Event::PullError(id, e) => warn!(err = e, id = id, "下载数据出错"),
207                    Event::PullTimeout(id) => warn!("拉取数据超时 {id}"),
208                    Event::Pushing(_, _) => {},
209                    Event::PushError(id, r, e) => error!(err = e, id = id, start = r.start, end = r.end, "写入数据出错"),
210                    Event::Flushing => {
211                        info!("开始刷写磁盘");
212                        if is_first {
213                            is_first = false;
214                            let now = Instant::now();
215                            let elapsed = (now - last_update).as_secs_f64();
216                            let total_elapsed = now - start;
217                            update_progress!(now, elapsed, total_elapsed);
218                        }
219                        on_event(DownloadEvent::Flushing);
220                    },
221                    Event::FlushError(e) => {
222                        error!(err = e, "磁盘刷写失败");
223                        on_event(DownloadEvent::FlushError(e.into()));
224                    },
225                    Event::Finished(id) => info!(id = id, "下载完成"),
226                    Event::PushProgress(_, p) => {
227                        if p.start == 0 {
228                            smoothed_speed = 0.;
229                            last_update = Instant::now();
230                            start = last_update;
231                            last_bytes = 0;
232                        }
233                        let now = Instant::now();
234                        let elapsed = (now - last_update).as_secs_f64();
235                        let total_elapsed = now - start;
236                        if elapsed > 1. {
237                            last_bytes = update_progress!(now, elapsed, total_elapsed);
238                            last_update = now;
239                        }
240                    }
241                }
242            }
243        }
244    }
245    on_event(DownloadEvent::End {
246        is_cancelled: cancel_token.is_cancelled(),
247    });
248    Ok(())
249}