1use crate::{Config, Error, Event, Tx, WriteMethod};
2use fast_down::{
3 BoxPusher, UrlInfo,
4 file::FilePusher,
5 http::Prefetch,
6 invert,
7 multi::{self, download_multi},
8 single::{self, download_single},
9 utils::{FastDownPuller, FastDownPullerOptions, build_client},
10};
11use parking_lot::Mutex;
12use reqwest::{Response, header::HeaderMap};
13use std::{
14 net::IpAddr,
15 path::{Path, PathBuf},
16 sync::Arc,
17};
18use tokio::fs::OpenOptions;
19use tokio_util::sync::CancellationToken;
20use url::Url;
21
22pub struct DownloadTask {
23 pub info: UrlInfo,
24 pub config: Config,
25 pub headers: Arc<HeaderMap>,
26 pub local_addr: Arc<[IpAddr]>,
27 pub resp: Option<Arc<Mutex<Option<Response>>>>,
28 pub tx: Tx,
29}
30
31pub async fn prefetch(url: Url, config: Config, tx: Tx) -> Result<DownloadTask, Error> {
33 let headers: Arc<_> = config
34 .headers
35 .iter()
36 .map(|(k, v)| (k.parse(), v.parse()))
37 .filter_map(|(k, v)| k.ok().zip(v.ok()))
38 .collect::<HeaderMap>()
39 .into();
40 let local_addr: Arc<[_]> = config.local_address.clone().into();
41 let client = build_client(
42 &headers,
43 config.proxy.as_deref(),
44 config.accept_invalid_certs,
45 config.accept_invalid_hostnames,
46 local_addr.first().copied(),
47 )?;
48 let mut retry_count = 0;
49 let (info, resp) = loop {
50 match client.prefetch(url.clone()).await {
51 Ok(t) => break t,
52 Err((e, t)) => {
53 let _ = tx.send(Event::PrefetchError(format!("{e:?}")));
54 retry_count += 1;
55 if retry_count >= config.retry_times {
56 return Err(Error::PrefetchTimeout(e));
57 }
58 tokio::time::sleep(t.unwrap_or(config.retry_gap)).await;
59 }
60 }
61 };
62 Ok(DownloadTask {
63 config,
64 headers,
65 local_addr,
66 resp: Some(Arc::new(Mutex::new(Some(resp)))),
67 tx,
68 info,
69 })
70}
71
72impl DownloadTask {
73 pub async fn start(
75 self,
76 save_path: PathBuf,
77 cancel_token: CancellationToken,
78 ) -> Result<(), Error> {
79 let Self {
80 info,
81 config,
82 headers,
83 local_addr,
84 resp,
85 tx,
86 } = self;
87 let pusher = get_pusher(
88 &info,
89 config.write_method,
90 config.write_buffer_size,
91 &save_path,
92 );
93 let pusher = tokio::select! {
94 () = cancel_token.cancelled() => return Ok(()),
95 pusher = pusher => pusher.map_err(Error::Io)?,
96 };
97 let puller = FastDownPuller::new(FastDownPullerOptions {
98 url: info.final_url,
99 headers,
100 proxy: config.proxy.as_deref(),
101 available_ips: local_addr,
102 accept_invalid_certs: config.accept_invalid_certs,
103 accept_invalid_hostnames: config.accept_invalid_hostnames,
104 file_id: info.file_id,
105 resp,
106 })?;
107 let threads = if info.fast_download {
108 config.threads.max(1)
109 } else {
110 1
111 };
112 let result = if info.fast_download {
113 download_multi(
114 puller,
115 pusher,
116 multi::DownloadOptions {
117 download_chunks: invert(
118 config.downloaded_chunk.into_iter(),
119 info.size,
120 config.chunk_window,
121 ),
122 retry_gap: config.retry_gap,
123 concurrent: threads,
124 pull_timeout: config.pull_timeout,
125 push_queue_cap: config.write_queue_cap,
126 min_chunk_size: config.min_chunk_size,
127 max_speculative: config.max_speculative,
128 },
129 )
130 } else {
131 download_single(
132 puller,
133 pusher,
134 single::DownloadOptions {
135 retry_gap: config.retry_gap,
136 push_queue_cap: config.write_queue_cap,
137 },
138 )
139 };
140 loop {
141 tokio::select! {
142 () = cancel_token.cancelled() => {
143 result.abort();
144 break;
145 },
146 e = result.event_chain.recv() => match e {
147 Ok(e) => {
148 let _ = tx.send((&e).into());
149 },
150 Err(_) => break,
151 }
152 }
153 }
154 result.join().await?;
155 Ok(())
156 }
157}
158
159pub async fn get_pusher(
160 info: &UrlInfo,
161 write_method: WriteMethod,
162 buffer_size: usize,
163 save_path: &Path,
164) -> Result<BoxPusher, String> {
165 #[cfg(target_pointer_width = "64")]
166 if info.fast_download && write_method == WriteMethod::Mmap {
167 use fast_down::file::MmapFilePusher;
168 let pusher = BoxPusher::new(
169 MmapFilePusher::new(&save_path, info.size)
170 .await
171 .map_err(|e| format!("{e:?}"))?,
172 );
173 return Ok(pusher);
174 }
175 let file = OpenOptions::new()
176 .create(true)
177 .write(true)
178 .read(true)
179 .truncate(false)
180 .open(&save_path)
181 .await
182 .map_err(|e| format!("{e:?}"))?;
183 let pusher = FilePusher::new(file, info.size, buffer_size)
184 .await
185 .map_err(|e| format!("{e:?}"))?;
186 Ok(BoxPusher::new(pusher))
187}