1mod error;
2mod file_save;
3mod reqwest_file;
4
5use aqueue::Actor;
6pub use error::DownloadError;
7use error::Result;
8use file_save::FileSave;
9use file_save::IFileSave;
10use reqwest::{IntoUrl, Response, StatusCode, Url};
11use reqwest_file::ReqwestFile;
12use std::cmp::{max, min};
13use std::path::PathBuf;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use log::info;
18use tokio::sync::OnceCell;
19use tokio::task::JoinHandle;
20use tokio::time::sleep;
21
22pub struct DownloadFile {
24 task_count: u64,
25 save_file: Arc<Actor<FileSave>>,
26 inner_status: Arc<DownloadInner>,
27}
28
29impl DownloadFile {
30 #[inline]
32 pub async fn start_download<U: IntoUrl>(
33 url: U,
34 mut save_path: PathBuf,
35 task_count: u64,
36 block: u64,
37 ) -> Result<Self> {
38 let url = url.into_url()?;
39 let (size,file_name, response) = Self::get_size_and_filename(&url).await?;
40 if save_path.is_dir() {
41 if let Some(filename)=file_name{
42 save_path.push(filename);
43 }else{
44 let file_name = url
45 .path_segments()
46 .ok_or_else(|| DownloadError::NotFileName(url.clone()))?
47 .rev()
48 .next()
49 .ok_or_else(|| DownloadError::NotFileName(url.clone()))?;
50 save_path.push(file_name);
51 }
52 }
53
54 let task_count = { max(min(task_count, size / block), 1) };
55
56 let file = Self {
57 task_count,
58 save_file: Arc::new(FileSave::create(save_path, size)?),
59 inner_status: Arc::new(DownloadInner {
60 size,
61 url,
62 is_start: Default::default(),
63 is_finish: Default::default(),
64 down_size: Default::default(),
65 byte_sec_total: Default::default(),
66 byte_sec: Default::default(),
67 error: OnceCell::default(),
68 }),
69 };
70 file.save_file.init().await?;
71 log::trace!("url file:{} init ok size:{}", file.inner_status.url, size);
72 if file.size() > 0 {
73 let size = file.size();
74 file.inner_status.is_start.store(true, Ordering::Release);
75 let connect_count = file.task_count;
76
77 if connect_count > 1 {
78 drop(response);
79 let block_size = size / connect_count;
80 let end_add_size = size % block_size;
81 assert_eq!(block_size * connect_count + end_add_size, size);
82 log::trace!(
83 "computer task count:{} block size:{} end add size:{}",
84 connect_count,
85 block_size,
86 end_add_size
87 );
88 let save_file = file.save_file.clone();
89 let inner_status = file.inner_status.clone();
90 tokio::spawn(async move {
91 let mut join_vec = Vec::with_capacity(connect_count as usize);
92 for i in 0..connect_count {
93 let down_size = if i == connect_count - 1 {
94 block_size + end_add_size
95 } else {
96 block_size
97 };
98 let start = i * block_size;
99
100 let save_file = save_file.clone();
101 let inner_status = inner_status.clone();
102 let join: JoinHandle<Result<()>> = tokio::spawn(async move {
103 let end = start + down_size - 1;
104
105 log::trace!(
106 "task:{} start:{} down size:{} end:{} init",
107 i,
108 start,
109 down_size,
110 end
111 );
112
113 ReqwestFile::new(save_file, inner_status, start, end)
114 .run()
115 .await?;
116 log::trace!("task:{} finish", i);
117 Ok(())
118 });
119 join_vec.push(join);
120 }
121
122 let inner_status_sec = inner_status.clone();
123 tokio::spawn(async move {
124 while !inner_status_sec.is_finish() {
125 inner_status_sec.byte_sec.store(
126 inner_status_sec.byte_sec_total.swap(0, Ordering::Release),
127 Ordering::Release,
128 );
129 sleep(Duration::from_secs(1)).await
130 }
131 });
132
133 for task in join_vec {
134 match task.await {
135 Ok(Err(err)) => {
136 log::error!("http download error:{:?}", err);
137 if !inner_status.error.initialized() {
138 if let Err(err) = inner_status.error.set(err) {
139 log::error!("set error fail:{}", err)
140 }
141 }
142 }
143 Err(err) => {
144 log::error!("join error:{:?}", err);
145 if !inner_status.error.initialized() {
146 if let Err(err) =
147 inner_status.error.set(DownloadError::JoinInError(err))
148 {
149 log::error!("set error fail:{}", err)
150 }
151 }
152 }
153 _ => {}
154 }
155 }
156 if let Err(err) = save_file.finish().await {
157 log::error!("save file finish error:{:?}", err);
158 if !inner_status.error.initialized() {
159 if let Err(err) = inner_status.error.set(err) {
160 log::error!("set error fail:{}", err)
161 }
162 }
163 }
164 inner_status
165 .down_size
166 .store(inner_status.size, Ordering::Release);
167 inner_status.is_finish.store(true, Ordering::Release);
168 });
169 } else {
170 let save_file = file.save_file.clone();
171 let inner_status = file.inner_status.clone();
172
173 tokio::spawn(async move {
174 let inner_status_sec = inner_status.clone();
175 tokio::spawn(async move {
176 while !inner_status_sec.is_finish() {
177 inner_status_sec.byte_sec.store(
178 inner_status_sec.byte_sec_total.swap(0, Ordering::Release),
179 Ordering::Release,
180 );
181 sleep(Duration::from_secs(1)).await
182 }
183 });
184
185 log::trace!(
186 "start once task download url:{} size:{}",
187 inner_status.url,
188 size
189 );
190
191 match ReqwestFile::new(save_file.clone(), inner_status.clone(), 0, size - 1)
192 .run_once(response)
193 .await
194 {
195 Err(err) => {
196 log::error!("http download error:{:?}", err);
197 if !inner_status.error.initialized() {
198 if let Err(err) = inner_status.error.set(err) {
199 log::error!("set error fail:{}", err)
200 }
201 }
202 }
203 _ => {}
204 }
205
206 if let Err(err) = save_file.finish().await {
207 log::error!("save file finish error:{:?}", err);
208 if !inner_status.error.initialized() {
209 if let Err(err) = inner_status.error.set(err) {
210 log::error!("set error fail:{}", err)
211 }
212 }
213 }
214
215 inner_status
216 .down_size
217 .store(inner_status.size, Ordering::Release);
218 inner_status.is_finish.store(true, Ordering::Release);
219 });
220 }
221 } else {
222 file.save_file.finish().await?;
223 file.inner_status.is_finish.store(true, Ordering::Release);
224 }
225
226 Ok(file)
227 }
228
229 #[inline]
231 async fn get_size_and_filename(url: &Url) -> Result<(u64, Option<String>, Response)> {
232 let response = reqwest::Client::new().get(url.as_str()).send().await?;
233 if response.status() == StatusCode::OK {
234 let filename=Self::parse_content_filename(response.headers());
235 let size= Self::parse_content_length(response.headers())
236 .ok_or_else(|| DownloadError::NotGetFileSize(url.clone()))?;
237 Ok((
238 size,
239 filename,
240 response,
241 ))
242 } else {
243 Err(DownloadError::HttpStatusError(
244 response.status().to_string(),
245 ))
246 }
247 }
248
249 #[inline]
250 fn parse_content_length(headers: &reqwest::header::HeaderMap) -> Option<u64> {
251 headers
252 .get(reqwest::header::CONTENT_LENGTH)?
253 .to_str()
254 .ok()?
255 .parse::<u64>()
256 .ok()
257 }
258
259 #[inline]
260 fn parse_content_filename(headers: &reqwest::header::HeaderMap)->Option<String> {
261 headers
262 .get(reqwest::header::CONTENT_DISPOSITION)?
263 .to_str()
264 .ok()?
265 .trim()
266 .split(';')
267 .find_map(|content| {
268 let content = content.trim();
269 if content.find("filename") == Some(0) {
270 content.split('=').last()
271 } else {
272 None
273 }
274 }).map_or(None, |x| Some(x.to_string()))
275 }
276
277 #[inline]
279 pub fn url(&self) -> &str {
280 self.inner_status.url()
281 }
282
283 #[inline]
285 pub fn get_status(&self) -> Arc<DownloadInner> {
286 self.inner_status.clone()
287 }
288
289 #[inline]
291 pub fn size(&self) -> u64 {
292 self.inner_status.size
293 }
294
295 #[inline]
297 pub fn get_down_size(&self) -> u64 {
298 self.inner_status.get_down_size()
299 }
300
301 #[inline]
303 pub fn is_start(&self) -> bool {
304 self.inner_status.is_start()
305 }
306
307 #[inline]
309 pub fn is_finish(&self) -> bool {
310 self.inner_status.is_finish()
311 }
312
313 #[inline]
315 pub fn is_error(&self) -> bool {
316 self.inner_status.is_error()
317 }
318
319 #[inline]
321 pub fn get_error(&self) -> Option<&DownloadError> {
322 self.inner_status.get_error()
323 }
324
325 #[inline]
327 pub fn get_real_file_path(&self) -> String {
328 self.save_file.get_real_file_path()
329 }
330
331 #[inline]
333 pub fn suspend(&self) {
334 self.inner_status.is_start.store(false, Ordering::Release);
335 }
336
337 #[inline]
339 pub fn restart(&self) {
340 self.inner_status.is_start.store(true, Ordering::Release);
341 }
342}
343
344pub struct DownloadInner {
346 url: Url,
347 size: u64,
348 down_size: AtomicU64,
349 is_start: AtomicBool,
350 is_finish: AtomicBool,
351 error: OnceCell<DownloadError>,
352 byte_sec: AtomicU64,
353 byte_sec_total: AtomicU64,
354}
355
356impl DownloadInner {
357 #[inline]
359 pub fn url(&self) -> &str {
360 self.url.as_str()
361 }
362
363 #[inline]
365 pub fn is_start(&self) -> bool {
366 self.is_start.load(Ordering::Acquire)
367 }
368
369 #[inline]
371 pub fn is_finish(&self) -> bool {
372 self.is_finish.load(Ordering::Acquire)
373 }
374
375 #[inline]
377 pub fn is_error(&self) -> bool {
378 self.error.initialized()
379 }
380
381 #[inline]
383 pub fn get_error(&self) -> Option<&DownloadError> {
384 self.error.get()
385 }
386
387 #[inline]
389 pub fn get_percent_complete(&self) -> f64 {
390 let current =
391 self.down_size.load(Ordering::Acquire) as f64 / self.size.max(1) as f64 * 100.0;
392 (current * 100.0).round() / 100.0
393 }
394
395 #[inline]
397 pub fn get_byte_sec(&self) -> u64 {
398 self.byte_sec.load(Ordering::Acquire)
399 }
400
401 #[inline]
403 pub fn get_down_size(&self) -> u64 {
404 self.down_size.load(Ordering::Acquire)
405 }
406
407 #[inline]
409 fn add_down_size(&self, len: u64) {
410 self.down_size.fetch_add(len, Ordering::Release);
411 self.byte_sec_total.fetch_add(len, Ordering::Release);
412 }
413}