Skip to main content

fast_down_ffi/
download.rs

1use crate::{Config, Error, Event, Tx, WriteMethod};
2use fast_down::{
3    BoxPusher, UrlInfo,
4    file::FilePusher,
5    http::Prefetch,
6    invert,
7    multi::{self, download_multi},
8    single::{self, download_single},
9    utils::{FastDownPuller, FastDownPullerOptions, build_client},
10};
11use parking_lot::Mutex;
12use reqwest::{Response, header::HeaderMap};
13use std::{
14    net::IpAddr,
15    path::{Path, PathBuf},
16    sync::Arc,
17};
18use tokio::fs::OpenOptions;
19use tokio_util::sync::CancellationToken;
20use url::Url;
21
22pub struct DownloadTask {
23    pub info: UrlInfo,
24    pub config: Config,
25    pub headers: Arc<HeaderMap>,
26    pub local_addr: Arc<[IpAddr]>,
27    pub resp: Option<Arc<Mutex<Option<Response>>>>,
28    pub tx: Tx,
29}
30
31/// 这个函数允许通过 drop Future 的方式来取消
32pub async fn prefetch(url: Url, config: Config, tx: Tx) -> Result<DownloadTask, Error> {
33    let headers: Arc<_> = config
34        .headers
35        .iter()
36        .map(|(k, v)| (k.parse(), v.parse()))
37        .filter_map(|(k, v)| k.ok().zip(v.ok()))
38        .collect::<HeaderMap>()
39        .into();
40    let local_addr: Arc<[_]> = config.local_address.clone().into();
41    let client = build_client(
42        &headers,
43        config.proxy.as_deref(),
44        config.accept_invalid_certs,
45        config.accept_invalid_hostnames,
46        local_addr.first().copied(),
47    )?;
48    let mut retry_count = 0;
49    let (info, resp) = loop {
50        match client.prefetch(url.clone()).await {
51            Ok(t) => break t,
52            Err((e, t)) => {
53                let _ = tx.send(Event::PrefetchError(format!("{e:?}")));
54                retry_count += 1;
55                if retry_count >= config.retry_times {
56                    return Err(Error::PrefetchTimeout(e));
57                }
58                tokio::time::sleep(t.unwrap_or(config.retry_gap)).await;
59            }
60        }
61    };
62    Ok(DownloadTask {
63        config,
64        headers,
65        local_addr,
66        resp: Some(Arc::new(Mutex::new(Some(resp)))),
67        tx,
68        info,
69    })
70}
71
72impl DownloadTask {
73    /// 不能通过 drop Future 来终止这个函数,否则写入内容将会不完整
74    pub async fn start(
75        self,
76        save_path: PathBuf,
77        cancel_token: CancellationToken,
78    ) -> Result<(), Error> {
79        let Self {
80            info,
81            config,
82            headers,
83            local_addr,
84            resp,
85            tx,
86        } = self;
87        let pusher = get_pusher(
88            &info,
89            config.write_method,
90            config.write_buffer_size,
91            &save_path,
92        );
93        let pusher = tokio::select! {
94              () = cancel_token.cancelled() => return Ok(()),
95              pusher = pusher => pusher.map_err(Error::Io)?,
96        };
97        let puller = FastDownPuller::new(FastDownPullerOptions {
98            url: info.final_url,
99            headers,
100            proxy: config.proxy.as_deref(),
101            available_ips: local_addr,
102            accept_invalid_certs: config.accept_invalid_certs,
103            accept_invalid_hostnames: config.accept_invalid_hostnames,
104            file_id: info.file_id,
105            resp,
106        })?;
107        let threads = if info.fast_download {
108            config.threads.max(1)
109        } else {
110            1
111        };
112        let result = if info.fast_download {
113            download_multi(
114                puller,
115                pusher,
116                multi::DownloadOptions {
117                    download_chunks: invert(
118                        config.downloaded_chunk.into_iter(),
119                        info.size,
120                        config.chunk_window,
121                    ),
122                    retry_gap: config.retry_gap,
123                    concurrent: threads,
124                    pull_timeout: config.pull_timeout,
125                    push_queue_cap: config.write_queue_cap,
126                    min_chunk_size: config.min_chunk_size,
127                    max_speculative: config.max_speculative,
128                },
129            )
130        } else {
131            download_single(
132                puller,
133                pusher,
134                single::DownloadOptions {
135                    retry_gap: config.retry_gap,
136                    push_queue_cap: config.write_queue_cap,
137                },
138            )
139        };
140        loop {
141            tokio::select! {
142                () = cancel_token.cancelled() => {
143                    result.abort();
144                    break;
145                },
146                e = result.event_chain.recv() => match e {
147                    Ok(e) => {
148                        let _ = tx.send((&e).into());
149                    },
150                    Err(_) => break,
151                }
152            }
153        }
154        result.join().await?;
155        Ok(())
156    }
157}
158
159pub async fn get_pusher(
160    info: &UrlInfo,
161    write_method: WriteMethod,
162    buffer_size: usize,
163    save_path: &Path,
164) -> Result<BoxPusher, String> {
165    #[cfg(target_pointer_width = "64")]
166    if info.fast_download && write_method == WriteMethod::Mmap {
167        use fast_down::file::MmapFilePusher;
168        let pusher = BoxPusher::new(
169            MmapFilePusher::new(&save_path, info.size)
170                .await
171                .map_err(|e| format!("{e:?}"))?,
172        );
173        return Ok(pusher);
174    }
175    let file = OpenOptions::new()
176        .create(true)
177        .write(true)
178        .read(true)
179        .truncate(false)
180        .open(&save_path)
181        .await
182        .map_err(|e| format!("{e:?}"))?;
183    let pusher = FilePusher::new(file, info.size, buffer_size)
184        .await
185        .map_err(|e| format!("{e:?}"))?;
186    Ok(BoxPusher::new(pusher))
187}