download_lib/
lib.rs

1mod error;
2mod file_save;
3mod reqwest_file;
4
5use aqueue::Actor;
6pub use error::DownloadError;
7use error::Result;
8use file_save::FileSave;
9use file_save::IFileSave;
10use reqwest::{IntoUrl, Response, StatusCode, Url};
11use reqwest_file::ReqwestFile;
12use std::cmp::{max, min};
13use std::path::PathBuf;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::Arc;
16use std::time::Duration;
17use log::info;
18use tokio::sync::OnceCell;
19use tokio::task::JoinHandle;
20use tokio::time::sleep;
21
22/// Down file handler
23pub struct DownloadFile {
24    task_count: u64,
25    save_file: Arc<Actor<FileSave>>,
26    inner_status: Arc<DownloadInner>,
27}
28
29impl DownloadFile {
30    /// start download now
31    #[inline]
32    pub async fn start_download<U: IntoUrl>(
33        url: U,
34        mut save_path: PathBuf,
35        task_count: u64,
36        block: u64,
37    ) -> Result<Self> {
38        let url = url.into_url()?;
39        let (size,file_name, response) = Self::get_size_and_filename(&url).await?;
40        if save_path.is_dir() {
41            if let Some(filename)=file_name{
42                save_path.push(filename);
43            }else{
44                let file_name = url
45                    .path_segments()
46                    .ok_or_else(|| DownloadError::NotFileName(url.clone()))?
47                    .rev()
48                    .next()
49                    .ok_or_else(|| DownloadError::NotFileName(url.clone()))?;
50                save_path.push(file_name);
51            }
52        }
53
54        let task_count = { max(min(task_count, size / block), 1) };
55
56        let file = Self {
57            task_count,
58            save_file: Arc::new(FileSave::create(save_path, size)?),
59            inner_status: Arc::new(DownloadInner {
60                size,
61                url,
62                is_start: Default::default(),
63                is_finish: Default::default(),
64                down_size: Default::default(),
65                byte_sec_total: Default::default(),
66                byte_sec: Default::default(),
67                error: OnceCell::default(),
68            }),
69        };
70        file.save_file.init().await?;
71        log::trace!("url file:{} init ok size:{}", file.inner_status.url, size);
72        if file.size() > 0 {
73            let size = file.size();
74            file.inner_status.is_start.store(true, Ordering::Release);
75            let connect_count = file.task_count;
76
77            if connect_count > 1 {
78                drop(response);
79                let block_size = size / connect_count;
80                let end_add_size = size % block_size;
81                assert_eq!(block_size * connect_count + end_add_size, size);
82                log::trace!(
83                    "computer task count:{}  block size:{} end add size:{}",
84                    connect_count,
85                    block_size,
86                    end_add_size
87                );
88                let save_file = file.save_file.clone();
89                let inner_status = file.inner_status.clone();
90                tokio::spawn(async move {
91                    let mut join_vec = Vec::with_capacity(connect_count as usize);
92                    for i in 0..connect_count {
93                        let down_size = if i == connect_count - 1 {
94                            block_size + end_add_size
95                        } else {
96                            block_size
97                        };
98                        let start = i * block_size;
99
100                        let save_file = save_file.clone();
101                        let inner_status = inner_status.clone();
102                        let join: JoinHandle<Result<()>> = tokio::spawn(async move {
103                            let end = start + down_size - 1;
104
105                            log::trace!(
106                                "task:{} start:{} down size:{} end:{} init",
107                                i,
108                                start,
109                                down_size,
110                                end
111                            );
112
113                            ReqwestFile::new(save_file, inner_status, start, end)
114                                .run()
115                                .await?;
116                            log::trace!("task:{} finish", i);
117                            Ok(())
118                        });
119                        join_vec.push(join);
120                    }
121
122                    let inner_status_sec = inner_status.clone();
123                    tokio::spawn(async move {
124                        while !inner_status_sec.is_finish() {
125                            inner_status_sec.byte_sec.store(
126                                inner_status_sec.byte_sec_total.swap(0, Ordering::Release),
127                                Ordering::Release,
128                            );
129                            sleep(Duration::from_secs(1)).await
130                        }
131                    });
132
133                    for task in join_vec {
134                        match task.await {
135                            Ok(Err(err)) => {
136                                log::error!("http download error:{:?}", err);
137                                if !inner_status.error.initialized() {
138                                    if let Err(err) = inner_status.error.set(err) {
139                                        log::error!("set error fail:{}", err)
140                                    }
141                                }
142                            }
143                            Err(err) => {
144                                log::error!("join error:{:?}", err);
145                                if !inner_status.error.initialized() {
146                                    if let Err(err) =
147                                        inner_status.error.set(DownloadError::JoinInError(err))
148                                    {
149                                        log::error!("set error fail:{}", err)
150                                    }
151                                }
152                            }
153                            _ => {}
154                        }
155                    }
156                    if let Err(err) = save_file.finish().await {
157                        log::error!("save file finish error:{:?}", err);
158                        if !inner_status.error.initialized() {
159                            if let Err(err) = inner_status.error.set(err) {
160                                log::error!("set error fail:{}", err)
161                            }
162                        }
163                    }
164                    inner_status
165                        .down_size
166                        .store(inner_status.size, Ordering::Release);
167                    inner_status.is_finish.store(true, Ordering::Release);
168                });
169            } else {
170                let save_file = file.save_file.clone();
171                let inner_status = file.inner_status.clone();
172
173                tokio::spawn(async move {
174                    let inner_status_sec = inner_status.clone();
175                    tokio::spawn(async move {
176                        while !inner_status_sec.is_finish() {
177                            inner_status_sec.byte_sec.store(
178                                inner_status_sec.byte_sec_total.swap(0, Ordering::Release),
179                                Ordering::Release,
180                            );
181                            sleep(Duration::from_secs(1)).await
182                        }
183                    });
184
185                    log::trace!(
186                        "start once task download url:{} size:{}",
187                        inner_status.url,
188                        size
189                    );
190
191                    match ReqwestFile::new(save_file.clone(), inner_status.clone(), 0, size - 1)
192                        .run_once(response)
193                        .await
194                    {
195                        Err(err) => {
196                            log::error!("http download error:{:?}", err);
197                            if !inner_status.error.initialized() {
198                                if let Err(err) = inner_status.error.set(err) {
199                                    log::error!("set error fail:{}", err)
200                                }
201                            }
202                        }
203                        _ => {}
204                    }
205
206                    if let Err(err) = save_file.finish().await {
207                        log::error!("save file finish error:{:?}", err);
208                        if !inner_status.error.initialized() {
209                            if let Err(err) = inner_status.error.set(err) {
210                                log::error!("set error fail:{}", err)
211                            }
212                        }
213                    }
214
215                    inner_status
216                        .down_size
217                        .store(inner_status.size, Ordering::Release);
218                    inner_status.is_finish.store(true, Ordering::Release);
219                });
220            }
221        } else {
222            file.save_file.finish().await?;
223            file.inner_status.is_finish.store(true, Ordering::Release);
224        }
225
226        Ok(file)
227    }
228
229    /// get url file size and file name
230    #[inline]
231    async fn get_size_and_filename(url: &Url) -> Result<(u64, Option<String>, Response)> {
232        let response = reqwest::Client::new().get(url.as_str()).send().await?;
233        if response.status() == StatusCode::OK {
234            let filename=Self::parse_content_filename(response.headers());
235            let size= Self::parse_content_length(response.headers())
236                .ok_or_else(|| DownloadError::NotGetFileSize(url.clone()))?;
237            Ok((
238                size,
239                filename,
240                response,
241            ))
242        } else {
243            Err(DownloadError::HttpStatusError(
244                response.status().to_string(),
245            ))
246        }
247    }
248
249    #[inline]
250    fn parse_content_length(headers: &reqwest::header::HeaderMap) -> Option<u64> {
251        headers
252            .get(reqwest::header::CONTENT_LENGTH)?
253            .to_str()
254            .ok()?
255            .parse::<u64>()
256            .ok()
257    }
258
259    #[inline]
260    fn parse_content_filename(headers: &reqwest::header::HeaderMap)->Option<String> {
261        headers
262            .get(reqwest::header::CONTENT_DISPOSITION)?
263            .to_str()
264            .ok()?
265            .trim()
266            .split(';')
267            .find_map(|content| {
268                let content = content.trim();
269                if content.find("filename") == Some(0) {
270                    content.split('=').last()
271                } else {
272                    None
273                }
274            }).map_or(None, |x| Some(x.to_string()))
275    }
276
277    /// get url
278    #[inline]
279    pub fn url(&self) -> &str {
280        self.inner_status.url()
281    }
282
283    /// get status arc
284    #[inline]
285    pub fn get_status(&self) -> Arc<DownloadInner> {
286        self.inner_status.clone()
287    }
288
289    /// file size
290    #[inline]
291    pub fn size(&self) -> u64 {
292        self.inner_status.size
293    }
294
295    /// get down size
296    #[inline]
297    pub fn get_down_size(&self) -> u64 {
298        self.inner_status.get_down_size()
299    }
300
301    /// is start
302    #[inline]
303    pub fn is_start(&self) -> bool {
304        self.inner_status.is_start()
305    }
306
307    /// is finish
308    #[inline]
309    pub fn is_finish(&self) -> bool {
310        self.inner_status.is_finish()
311    }
312
313    /// is error
314    #[inline]
315    pub fn is_error(&self) -> bool {
316        self.inner_status.is_error()
317    }
318
319    /// get error
320    #[inline]
321    pub fn get_error(&self) -> Option<&DownloadError> {
322        self.inner_status.get_error()
323    }
324
325    /// get save file real path
326    #[inline]
327    pub fn get_real_file_path(&self) -> String {
328        self.save_file.get_real_file_path()
329    }
330
331    /// suspend download
332    #[inline]
333    pub fn suspend(&self) {
334        self.inner_status.is_start.store(false, Ordering::Release);
335    }
336
337    /// restart download
338    #[inline]
339    pub fn restart(&self) {
340        self.inner_status.is_start.store(true, Ordering::Release);
341    }
342}
343
344/// download status
345pub struct DownloadInner {
346    url: Url,
347    size: u64,
348    down_size: AtomicU64,
349    is_start: AtomicBool,
350    is_finish: AtomicBool,
351    error: OnceCell<DownloadError>,
352    byte_sec: AtomicU64,
353    byte_sec_total: AtomicU64,
354}
355
356impl DownloadInner {
357    /// get url
358    #[inline]
359    pub fn url(&self) -> &str {
360        self.url.as_str()
361    }
362
363    /// is start
364    #[inline]
365    pub fn is_start(&self) -> bool {
366        self.is_start.load(Ordering::Acquire)
367    }
368
369    /// is finish
370    #[inline]
371    pub fn is_finish(&self) -> bool {
372        self.is_finish.load(Ordering::Acquire)
373    }
374
375    /// is error
376    #[inline]
377    pub fn is_error(&self) -> bool {
378        self.error.initialized()
379    }
380
381    /// get error
382    #[inline]
383    pub fn get_error(&self) -> Option<&DownloadError> {
384        self.error.get()
385    }
386
387    /// get complete percent
388    #[inline]
389    pub fn get_percent_complete(&self) -> f64 {
390        let current =
391            self.down_size.load(Ordering::Acquire) as f64 / self.size.max(1) as f64 * 100.0;
392        (current * 100.0).round() / 100.0
393    }
394
395    /// computer bs
396    #[inline]
397    pub fn get_byte_sec(&self) -> u64 {
398        self.byte_sec.load(Ordering::Acquire)
399    }
400
401    /// get size
402    #[inline]
403    pub fn get_down_size(&self) -> u64 {
404        self.down_size.load(Ordering::Acquire)
405    }
406
407    /// add down size
408    #[inline]
409    fn add_down_size(&self, len: u64) {
410        self.down_size.fetch_add(len, Ordering::Release);
411        self.byte_sec_total.fetch_add(len, Ordering::Release);
412    }
413}