Skip to main content

fast_down_gui/core/
download.rs

1use crate::{
2    fmt::{format_size, format_time},
3    persist::{self, DatabaseEntry, Status},
4    ui::DownloadConfig,
5    utils::sanitize,
6};
7use fast_down_ffi::{
8    Event, create_channel,
9    fast_down::{Merge, Total, utils::gen_unique_path},
10    prefetch,
11};
12use slint::SharedString;
13use std::{ops::Range, time::Duration};
14use tokio::{fs, time::Instant};
15use tokio_util::sync::CancellationToken;
16use tracing::{error, info, warn};
17use url::Url;
18
19pub enum DownloadEvent {
20    Info(Box<DatabaseEntry>),
21    Progress(ProgressInfo),
22    End { is_cancelled: bool },
23}
24
25pub struct ProgressInfo {
26    pub downloaded: SharedString,
27    pub speed: SharedString,
28    pub avg_speed: SharedString,
29    pub time: SharedString,
30    pub remaining_time: SharedString,
31    pub remaining_size: SharedString,
32    pub percentage: SharedString,
33    pub elapsed: Duration,
34    pub progress: Vec<Range<u64>>,
35}
36
37pub async fn download(
38    url: Url,
39    config: &DownloadConfig,
40    cancel_token: CancellationToken,
41    mut entry: Option<DatabaseEntry>,
42    mut on_event: impl FnMut(DownloadEvent) + Send + Sync + 'static,
43) -> color_eyre::Result<()> {
44    info!(url = url.as_str(), config = ?config, "启动下载");
45    let result = async {
46        let file_exists = matches!(&entry, Some(entry) if fs::try_exists(&entry.file_path).await.unwrap_or(false));
47        if !file_exists {
48            entry = None
49        }
50        let config: persist::DownloadConfig = config.into();
51        let progress = entry
52            .as_ref()
53            .map(|e| e.progress.clone())
54            .unwrap_or_default();
55        let download_config = fast_down_ffi::Config {
56            retry_times: config.retry_times,
57            threads: config.threads,
58            proxy: config.proxy.clone(),
59            headers: config.headers.clone(),
60            min_chunk_size: config.min_chunk_size,
61            write_buffer_size: config.write_buffer_size,
62            write_queue_cap: config.write_queue_cap,
63            retry_gap: config.retry_gap,
64            pull_timeout: config.pull_timeout,
65            accept_invalid_certs: config.accept_invalid_certs,
66            accept_invalid_hostnames: config.accept_invalid_hostnames,
67            write_method: config.write_method.clone(),
68            local_address: config.local_address.clone(),
69            max_speculative: config.max_speculative,
70            downloaded_chunk: progress.clone(),
71            chunk_window: config.chunk_window,
72        };
73        let elapsed = entry.as_ref().map(|e| e.elapsed).unwrap_or_default();
74        let (tx, rx) = create_channel();
75        let task = prefetch(url.clone(), download_config, tx).await?;
76        info!(info = ?task.info, "获取元数据成功");
77        let total_size = task.info.size;
78        let (save_path, entry) = if let Some(entry) = entry
79            && fs::try_exists(&entry.file_path).await.unwrap_or(false)
80        {
81            (entry.file_path.clone(), entry)
82        } else {
83            let file_name = sanitize(task.info.raw_name.clone(), 248);
84            let save_dir = soft_canonicalize::soft_canonicalize(
85                if config.save_dir.to_string_lossy().is_empty() {
86                    dirs::download_dir().unwrap_or_default()
87                } else {
88                    config.save_dir.clone()
89                },
90            )?;
91            let _ = fs::create_dir_all(&save_dir).await;
92            let save_path = gen_unique_path(&save_dir.join(&file_name)).await?;
93            let file_name = save_path.file_name().unwrap().to_string_lossy().to_string();
94            (
95                save_path.clone(),
96                DatabaseEntry {
97                    file_name,
98                    file_path: save_path,
99                    file_size: total_size,
100                    file_id: task.info.file_id.clone(),
101                    progress: Vec::new(),
102                    elapsed: Duration::ZERO,
103                    url,
104                    config,
105                    status: Status::Paused,
106                },
107            )
108        };
109        on_event(DownloadEvent::Info(Box::new(entry)));
110        let fut = task.start(save_path, cancel_token.clone());
111        Ok::<_, color_eyre::Report>((fut, progress, elapsed, total_size, rx))
112    };
113    let (fut, mut progress, elapsed, total_size, rx) = tokio::select! {
114        _ = cancel_token.cancelled() => {
115            on_event(DownloadEvent::End { is_cancelled: true });
116            return Ok(());
117        },
118        res = result => res?,
119    };
120    tokio::pin!(fut);
121    let mut smoothed_speed = 0.;
122    let alpha = 0.3;
123    let mut last_bytes = progress.total();
124    let mut last_update = Instant::now();
125    let mut start = last_update - elapsed;
126
127    macro_rules! update_progress {
128        ($now:expr, $elapsed:expr, $total_elapsed:expr) => {{
129            let downloaded = progress.total();
130            let bytes_diff = downloaded - last_bytes;
131            let instant_speed = bytes_diff as f64 / $elapsed;
132            smoothed_speed = if smoothed_speed == 0. {
133                instant_speed
134            } else {
135                alpha * instant_speed + (1.0 - alpha) * smoothed_speed
136            };
137            let avg_speed = downloaded as f64 / $total_elapsed.as_secs_f64();
138            let remaining_size = total_size.saturating_sub(downloaded);
139            let remaining_time = remaining_size as f64 / smoothed_speed;
140            let percentage = format!("{:.2}%", downloaded as f64 / total_size as f64 * 100.0);
141            on_event(DownloadEvent::Progress(ProgressInfo {
142                downloaded: format_size(downloaded as f64).into(),
143                speed: format!("{}/s", format_size(smoothed_speed)).into(),
144                avg_speed: format!("{}/s", format_size(avg_speed)).into(),
145                time: format_time($total_elapsed.as_secs()).into(),
146                remaining_time: format_time(remaining_time as u64).into(),
147                remaining_size: format_size(remaining_size as f64).into(),
148                percentage: percentage.into(),
149                elapsed: $total_elapsed,
150                progress: progress.clone(),
151            }));
152            downloaded
153        }};
154    }
155
156    loop {
157        tokio::select! {
158            res = &mut fut => {
159                res?;
160                break;
161            }
162            event = rx.recv() => {
163                let e = match event {
164                    Ok(e) => e,
165                    Err(_) => break,
166                };
167                match e {
168                    Event::PrefetchError(e) => error!(err = e, "获取元数据失败"),
169                    Event::Pulling(id) => info!(id = id, "开始下载"),
170                    Event::PullProgress(_, _) => {}
171                    Event::PullError(id, e) => warn!(err = e, id = id, "下载数据出错"),
172                    Event::PullTimeout(id) => warn!("拉取数据超时 {id}"),
173                    Event::PushError(id, e) => error!(err = e, id = id, "写入数据出错"),
174                    Event::FlushError(e) => error!(err = e, "磁盘刷写失败"),
175                    Event::Finished(id) => info!(id = id, "下载完成"),
176                    Event::PushProgress(_, p) => {
177                        if p.start == 0 {
178                            progress.clear();
179                            smoothed_speed = 0.;
180                            last_update = Instant::now();
181                            start = last_update;
182                            last_bytes = 0;
183                        }
184                        progress.merge_progress(p);
185                        let now = Instant::now();
186                        let elapsed = (now - last_update).as_secs_f64();
187                        let total_elapsed = now - start;
188                        if elapsed > 1. {
189                            last_bytes = update_progress!(now, elapsed, total_elapsed);
190                            last_update = now;
191                        }
192                    }
193                }
194            }
195        }
196    }
197
198    let now = Instant::now();
199    let elapsed = (now - last_update).as_secs_f64();
200    let total_elapsed = now - start;
201    update_progress!(now, elapsed, total_elapsed);
202    on_event(DownloadEvent::End {
203        is_cancelled: cancel_token.is_cancelled(),
204    });
205    Ok(())
206}