http_downloader/
downloader.rs

1use std::future::Future;
2use std::io::SeekFrom;
3use std::num::{NonZeroU64, NonZeroU8, NonZeroUsize};
4use std::path::PathBuf;
5use std::sync::{Arc, Weak};
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use anyhow::Result;
9use futures_util::future::BoxFuture;
10use futures_util::FutureExt;
11#[cfg(feature = "async-stream")]
12use futures_util::Stream;
13use headers::{Header, HeaderMapExt};
14use parking_lot::RwLock;
15use thiserror::Error;
16use tokio::{io, sync};
17use tokio::io::AsyncSeekExt;
18use tokio::sync::watch::error::SendError;
19use tokio::task::JoinError;
20use tokio::time::Instant;
21use tokio_util::sync::CancellationToken;
22
23use crate::{ChunkData, ChunkItem, ChunkIterator, ChunkManager, ChunksInfo, DownloadArchiveData, DownloadedLenChangeNotify, DownloaderWrapper, DownloadFuture, DownloadWay, HttpDownloadConfig, HttpRedirectionHandle, RemainingChunks, SingleDownload};
24use crate::exclusive::Exclusive;
25
26#[derive(Clone, Copy, Eq, PartialEq, Debug)]
27pub enum DownloadingEndCause {
28    DownloadFinished,
29    Cancelled,
30}
31
32#[derive(Error, Debug)]
33pub enum DownloadStartError {
34    #[error("file create failed,{:?}", .0)]
35    FileCrateFailed(#[from] io::Error),
36    #[error("{:?}", .0)]
37    Other(#[from] anyhow::Error),
38
39    #[error("already downloading")]
40    AlreadyDownloading,
41    #[error("Directory does not exist")]
42    DirectoryDoesNotExist,
43
44    #[cfg(feature = "status-tracker")]
45    #[error("Initializing")]
46    Initializing,
47    #[cfg(feature = "status-tracker")]
48    #[error("Starting")]
49    Starting,
50    #[cfg(feature = "status-tracker")]
51    #[error("Stopping")]
52    Stopping,
53}
54
55#[derive(Debug, Copy, Clone)]
56pub enum HttpResponseInvalidCause {
57    ContentLengthInvalid,
58    StatusCodeUnsuccessful,
59    RedirectionNoLocation,
60}
61
62#[derive(Error, Debug)]
63pub enum DownloadError {
64    #[error("{:?}", .0)]
65    Other(#[from] anyhow::Error),
66    #[error("ArchiveDataLoadError {:?}", .0)]
67    ArchiveDataLoadError(anyhow::Error),
68    #[error("IoError,{:?}", .0)]
69    IoError(#[from] io::Error),
70    #[error("JoinError,{:?}", .0)]
71    JoinError(#[from] JoinError),
72    #[error("chunk remove failed,{:?}", .0)]
73    ChunkRemoveFailed(usize),
74    #[error("downloading chunk remove failed,{:?}", .0)]
75    DownloadingChunkRemoveFailed(usize),
76    #[error("http request failed,{:?}", .0)]
77    HttpRequestFailed(#[from] reqwest::Error),
78    #[error("http request response invalid,{:?}", .0)]
79    HttpRequestResponseInvalid(HttpResponseInvalidCause, reqwest::Response),
80    #[error("The server file has changed.")]
81    ServerFileAlreadyChanged,
82    #[error("The redirection times are too many.")]
83    RedirectionTimesTooMany,
84}
85
86#[derive(Error, Debug)]
87pub enum DownloadToEndError {
88    #[error("{:?}", .0)]
89    DownloadStartError(#[from] DownloadStartError),
90    #[error("{:?}", .0)]
91    DownloadError(#[from] DownloadError),
92}
93
94#[derive(Error, Debug)]
95pub enum ChangeConnectionCountError {
96    #[error("SendError")]
97    SendError(#[from] SendError<u8>),
98    #[error("it is no start")]
99    NoStart,
100    #[error("The download target is not supported")]
101    DownloadTargetNotSupported,
102}
103
104#[derive(Error, Debug)]
105pub enum ChangeChunkSizeError {
106    #[error("it is no start")]
107    NoStart,
108    #[error("The download target is not supported")]
109    DownloadTargetNotSupported,
110}
111
112pub struct DownloadingState {
113    pub downloading_duration: u32,
114    pub download_instant: Instant,
115    pub download_way: DownloadWay,
116}
117
118impl DownloadingState {
119    pub fn get_current_downloading_duration(&self) -> u32 {
120        self.downloading_duration + self.download_instant.elapsed().as_secs() as u32
121    }
122}
123
124#[cfg(feature = "breakpoint-resume")]
125#[derive(Default)]
126pub struct BreakpointResume {
127    pub data_archive_notify: sync::Notify,
128    pub archive_complete_notify: sync::Notify,
129}
130
131pub struct HttpFileDownloader {
132    pub downloading_state_oneshot_vec: Vec<sync::oneshot::Sender<Arc<DownloadingState>>>,
133    pub downloaded_len_change_notify: Option<Arc<dyn DownloadedLenChangeNotify>>,
134    pub archive_data_future: Option<Exclusive<BoxFuture<'static, Result<Option<Box<DownloadArchiveData>>>>>>,
135    #[cfg(feature = "breakpoint-resume")]
136    pub breakpoint_resume: Option<Arc<BreakpointResume>>,
137    pub config: Arc<HttpDownloadConfig>,
138    pub downloaded_len_receiver: sync::watch::Receiver<u64>,
139    pub content_length: Arc<AtomicU64>,
140    client: reqwest::Client,
141    downloading_state: Arc<RwLock<
142        Option<(
143            sync::oneshot::Receiver<DownloadingEndCause>,
144            Arc<DownloadingState>,
145        )>,
146    >>,
147    downloaded_len_sender: Arc<sync::watch::Sender<u64>>,
148    pub cancel_token: CancellationToken,
149    total_size_semaphore: Arc<sync::Semaphore>,
150}
151
152impl HttpFileDownloader {
153    pub fn new(client: reqwest::Client, config: Arc<HttpDownloadConfig>) -> Self {
154        let cancel_token = config.cancel_token.clone().unwrap_or_default();
155        let (downloaded_len_sender, downloaded_len_receiver) = sync::watch::channel::<u64>(0);
156        let total_size_semaphore = Arc::new(sync::Semaphore::new(0));
157
158        Self {
159            downloading_state_oneshot_vec: vec![],
160            downloaded_len_change_notify: None,
161            archive_data_future: None,
162            #[cfg(feature = "breakpoint-resume")]
163            breakpoint_resume: None,
164            config,
165            total_size_semaphore,
166            content_length: Default::default(),
167            client,
168            downloading_state: Default::default(),
169            downloaded_len_receiver,
170            downloaded_len_sender: Arc::new(downloaded_len_sender),
171            cancel_token,
172        }
173    }
174
175    pub fn is_downloading(&self) -> bool {
176        self.downloading_state.read().is_some()
177    }
178
179    pub fn change_connection_count(
180        &self,
181        connection_count: NonZeroU8,
182    ) -> Result<(), ChangeConnectionCountError> {
183        match self.downloading_state.read().as_ref() {
184            None => Err(ChangeConnectionCountError::NoStart),
185            Some((_, downloading_state)) => match &downloading_state.download_way {
186                DownloadWay::Single(_) => {
187                    Err(ChangeConnectionCountError::DownloadTargetNotSupported)
188                }
189                DownloadWay::Ranges(chunk_manager) => {
190                    chunk_manager.change_connection_count(connection_count)?;
191                    Ok(())
192                }
193            },
194        }
195    }
196    pub fn change_chunk_size(&self, chunk_size: NonZeroUsize) -> Result<(), ChangeChunkSizeError> {
197        match self.downloading_state.read().as_ref() {
198            None => Err(ChangeChunkSizeError::NoStart),
199            Some((_, downloading_state)) => match &downloading_state.download_way {
200                DownloadWay::Single(_) => Err(ChangeChunkSizeError::DownloadTargetNotSupported),
201                DownloadWay::Ranges(chunk_manager) => {
202                    chunk_manager.change_chunk_size(chunk_size);
203                    Ok(())
204                }
205            },
206        }
207    }
208
209    #[cfg(feature = "async-stream")]
210    pub fn downloaded_len_stream(&self) -> impl Stream<Item=u64> + 'static {
211        let mut downloaded_len_receiver = self.downloaded_len_receiver.clone();
212        let duration = self.config.downloaded_len_send_interval.clone();
213        async_stream::stream! {
214            let downloaded_len = *downloaded_len_receiver.borrow();
215            yield downloaded_len;
216            while downloaded_len_receiver.changed().await.is_ok() {
217                let downloaded_len = *downloaded_len_receiver.borrow();
218                yield downloaded_len;
219                if let Some(duration) = duration{
220                   tokio::time::sleep(duration).await;
221                }
222            }
223        }
224    }
225
226    #[cfg(feature = "async-stream")]
227    pub fn chunks_stream(&self) -> Option<impl Stream<Item=Vec<Arc<ChunkItem>>> + 'static> {
228        match self.downloading_state.read().as_ref() {
229            None => {
230                // tracing::info!("downloading_state is null!");
231                None
232            }
233            Some((_, downloading_state)) => match &downloading_state.download_way {
234                DownloadWay::Single(_) => {
235                    // tracing::info!("DownloadWay is Single!");
236                    None
237                }
238                DownloadWay::Ranges(chunk_manager) => {
239                    let mut downloaded_len_receiver = self.downloaded_len_receiver.clone();
240                    let chunk_manager = chunk_manager.to_owned();
241                    let duration = self.config.chunks_send_interval.clone();
242                    Some(async_stream::stream! {
243                          yield chunk_manager.get_chunks().await;
244                          while downloaded_len_receiver.changed().await.is_ok() {
245                              yield chunk_manager.get_chunks().await;
246                              if let Some(duration) = duration {
247                                 tokio::time::sleep(duration).await;
248                              }
249                          }
250                    })
251                }
252            },
253        }
254    }
255
256    #[cfg(feature = "async-stream")]
257    pub fn chunks_info_stream(&self) -> Option<impl Stream<Item=ChunksInfo> + 'static> {
258        match self.downloading_state.read().as_ref() {
259            None => {
260                // tracing::info!("downloading_state is null!");
261                None
262            }
263            Some((_, downloading_state)) => match &downloading_state.download_way {
264                DownloadWay::Single(_) => {
265                    // tracing::info!("DownloadWay is Single!");
266                    None
267                }
268                DownloadWay::Ranges(chunk_manager) => {
269                    let mut downloaded_len_receiver = self.downloaded_len_receiver.clone();
270                    let chunk_manager = chunk_manager.to_owned();
271                    let duration = self.config.chunks_send_interval.clone();
272                    Some(async_stream::stream! {
273                          yield chunk_manager.get_chunks_info().await;
274                          while downloaded_len_receiver.changed().await.is_ok() {
275                              yield chunk_manager.get_chunks_info().await;
276                              if let Some(duration) = duration {
277                                 tokio::time::sleep(duration).await;
278                              }
279                          }
280                    })
281                }
282            },
283        }
284    }
285
286    pub fn downloaded_len(&self) -> u64 {
287        *self.downloaded_len_receiver.borrow()
288    }
289
290    pub fn total_size_future(&self) -> impl Future<Output=Option<NonZeroU64>> + 'static {
291        let total_size_semaphore = self.total_size_semaphore.clone();
292        let content_length = self.content_length.clone();
293        async move {
294            let _ = total_size_semaphore.acquire().await;
295            let content_length = content_length.load(Ordering::Relaxed);
296            if content_length == 0 {
297                None
298            } else {
299                Some(NonZeroU64::new(content_length).unwrap())
300            }
301        }
302    }
303
304    pub fn current_total_size(&self) -> Option<NonZeroU64> {
305        let content_length = self.content_length.load(Ordering::Relaxed);
306        if content_length == 0 {
307            None
308        } else {
309            Some(NonZeroU64::new(content_length).unwrap())
310        }
311    }
312
313    pub fn get_chunk_manager(&self) -> Option<Weak<ChunkManager>> {
314        self.get_downloading_state().and_then(|n|n.upgrade()).and_then(|downloading_state| {
315            if let DownloadWay::Ranges(item) = &downloading_state.download_way {
316                Some(Arc::downgrade(item))
317            } else {
318                None
319            }
320        })
321    }
322    pub fn get_downloading_state(&self) -> Option<Weak<DownloadingState>> {
323        let guard = &self.downloading_state.read();
324        guard.as_ref().map(|n| Arc::downgrade(&n.1))
325    }
326
327    pub async fn get_chunks(&self) -> Vec<Arc<ChunkItem>> {
328        match self.get_chunk_manager().and_then(|n|n.upgrade()) {
329            None => Vec::new(),
330            Some(n) => n.get_chunks().await,
331        }
332    }
333
334    pub async fn get_chunks_info(&self) -> Option<ChunksInfo> {
335        match self.get_chunk_manager().and_then(|n|n.upgrade()) {
336            None => None,
337            Some(n) => Some(n.get_chunks_info().await),
338        }
339    }
340
341    pub fn get_file_path(&self) -> PathBuf {
342        self.config.file_path()
343    }
344
345    fn reset(&self) {
346        self.downloaded_len_sender.send(0).unwrap_or_else(|_err| {
347            #[cfg(feature = "tracing")]
348            tracing::trace!("send downloaded_len failed! {}", _err);
349        });
350    }
351
352    pub(crate) fn download(
353        &mut self,
354    ) -> Result<
355        impl Future<Output=Result<DownloadingEndCause, DownloadError>> + Send + 'static,
356        DownloadStartError,
357    > {
358        self.reset();
359        if self.is_downloading() {
360            return Err(DownloadStartError::AlreadyDownloading);
361        }
362
363        if self.config.create_dir {
364            std::fs::create_dir_all(&self.config.save_dir)?;
365        } else if !self.config.save_dir.exists() {
366            return Err(DownloadStartError::DirectoryDoesNotExist);
367        }
368        Ok(self.start_download())
369    }
370
371    pub fn take_downloading_state(
372        &self,
373    ) -> Option<(
374        sync::oneshot::Receiver<DownloadingEndCause>,
375        Arc<DownloadingState>,
376    )> {
377        let mut guard = self.downloading_state.write();
378        guard.take()
379    }
380
381    pub fn cancel(&self) -> impl Future<Output=()> + 'static {
382        let downloading_state = self.downloading_state.clone();
383        let token = self.cancel_token.clone();
384        async move {
385            let (receiver, _) = {
386                let mut guard = downloading_state.write();
387                let option = guard.take();
388                if option.is_none() {
389                    return;
390                }
391                option.unwrap()
392            };
393            token.cancel();
394            receiver.await.unwrap();
395        }
396    }
397
398    //noinspection RsExternalLinter
399    fn start_download(
400        &mut self,
401    ) -> impl Future<Output=Result<DownloadingEndCause, DownloadError>> + 'static {
402        if self.cancel_token.is_cancelled() {
403            self.cancel_token = CancellationToken::new();
404        }
405        let config = self.config.clone();
406        let client = self.client.clone();
407        let total_size_semaphore = self.total_size_semaphore.clone();
408        let content_length_arc = self.content_length.clone();
409        let downloading_state = self.downloading_state.clone();
410        let downloaded_len_change_notify = self.downloaded_len_change_notify.take();
411        let archive_data_future = self.archive_data_future.take();
412        let downloading_state_oneshot_vec: Vec<sync::oneshot::Sender<Arc<DownloadingState>>> = self.downloading_state_oneshot_vec.drain(..).collect();
413        let downloaded_len_sender = self.downloaded_len_sender.clone();
414        let cancel_token = self.cancel_token.clone();
415        #[cfg(feature = "breakpoint-resume")]
416            let breakpoint_resume = self.breakpoint_resume.take();
417
418
419        async move {
420            fn request<'a>(client: &'a reqwest::Client, config: &'a HttpDownloadConfig, location: Option<String>, redirection_times: usize) -> BoxFuture<'a, Result<(reqwest::Response, Option<String>), DownloadError>> {
421                async move {
422                    if let HttpRedirectionHandle::RequestNewLocation { max_times } = config.handle_redirection {
423                        if redirection_times >= max_times {
424                            return Err(DownloadError::RedirectionTimesTooMany);
425                        }
426                    }
427                    let mut retry_count = 0;
428                    let response = loop {
429                        let response = client.execute(config.create_http_request(location.as_ref().map(|n| n.as_str())))
430                            .await.and_then(|n| n.error_for_status());
431
432                        if response.is_err() && retry_count < config.request_retry_count {
433                            retry_count += 1;
434                            #[cfg(feature = "tracing")]
435                            tracing::trace!(
436                            "Request error! {:?},retry_info: {}/{}",
437                            response.unwrap_err(),
438                            retry_count,
439                            config.request_retry_count
440                        );
441                            continue;
442                        }
443                        break response;
444                    };
445                    // todo: 删除重定向,reqwest 本身可以处理重定向
446                    match response {
447                        Ok(response) if config.handle_redirection != HttpRedirectionHandle::Invalid && response.status().is_redirection() => {
448                            let Some(location) = response.headers().get(headers::Location::name()) else {
449                                return Err(DownloadError::HttpRequestResponseInvalid(HttpResponseInvalidCause::RedirectionNoLocation, response));
450                            };
451                            let Ok(location) = location.to_str().map(|n| n.to_string()) else {
452                                return Err(DownloadError::HttpRequestResponseInvalid(HttpResponseInvalidCause::RedirectionNoLocation, response));
453                            };
454                            println!("handle_redirection!!!!!!! {}",location);
455                            request(client, config, Some(location), redirection_times + 1).await
456                        }
457                        Ok(response) if !response.status().is_success() => {
458                            Err(DownloadError::HttpRequestResponseInvalid(HttpResponseInvalidCause::StatusCodeUnsuccessful, response))
459                        }
460                        Err(err) => {
461                            Err(DownloadError::HttpRequestFailed(err))
462                        }
463                        Ok(response) => Ok((response, location)),
464                    }
465                }.boxed()
466            }
467
468            let (end_sender, end_receiver) = sync::oneshot::channel();
469            let dec = {
470                let (response, location) = match request(&client, &config, None, 0).await {
471                    Ok(r) => r,
472                    Err(err) => {
473                        total_size_semaphore.add_permits(1);
474                        return Err(err.into());
475                    }
476                };
477                let etag = {
478                    if config.etag.is_some() {
479                        let cur_etag = response.headers().typed_get::<headers::ETag>();
480                        if cur_etag == config.etag {
481                            #[cfg(feature = "tracing")]
482                            tracing::trace!(
483                        "etag mismatching,your etag: {:?} , current etag:{:?}",
484                        config.etag,
485                        cur_etag
486                    );
487                            total_size_semaphore.add_permits(1);
488                            return Err(DownloadError::ServerFileAlreadyChanged);
489                        }
490                        cur_etag
491                    } else {
492                        None
493                    }
494                };
495                let content_length = response
496                    .headers()
497                    .typed_get::<headers::ContentLength>()
498                    .map(|n| n.0)
499                    .and_then(|n| if n == 0 { None } else { Some(n) });
500
501                if let Some(0) = content_length {
502                    total_size_semaphore.add_permits(1);
503                    return Err(DownloadError::HttpRequestResponseInvalid(HttpResponseInvalidCause::ContentLengthInvalid, response));
504                }
505                content_length_arc.store(content_length.unwrap_or(0), Ordering::Relaxed);
506
507                let accept_ranges = response.headers().typed_get::<headers::AcceptRanges>();
508
509                let is_ranges_bytes_none = accept_ranges.is_none();
510                let is_ranges_bytes =
511                    !is_ranges_bytes_none && accept_ranges.unwrap() == headers::AcceptRanges::bytes();
512                let archive_data = match archive_data_future {
513                    None => { None }
514                    Some(archive_data_future) => {
515                        archive_data_future.await.map_err(DownloadError::ArchiveDataLoadError)?
516                    }
517                };
518                let downloading_duration = archive_data.as_ref()
519                    .map(|n| n.downloading_duration)
520                    .unwrap_or(0);
521                let download_way = {
522                    if content_length.is_some()
523                        && (if config.strict_check_accept_ranges {
524                        is_ranges_bytes
525                    } else {
526                        is_ranges_bytes_none || is_ranges_bytes
527                    })
528                    {
529                        let content_length = content_length.unwrap();
530                        let chunk_data = archive_data
531                            .and_then(|archive_data| {
532                                downloaded_len_sender
533                                    .send(archive_data.downloaded_len)
534                                    .unwrap_or_else(|_err| {
535                                        #[cfg(feature = "tracing")]
536                                        tracing::error!("send downloaded_len failed! {}", _err);
537                                    });
538                                archive_data.chunk_data.map(|mut data| {
539                                    data.remaining.chunk_size = config.chunk_size.get();
540                                    data
541                                })
542                            })
543                            .unwrap_or_else(|| ChunkData {
544                                iter_count: 0,
545                                remaining: RemainingChunks::new(config.chunk_size, content_length),
546                                last_incomplete_chunks: Default::default(),
547                            });
548
549                        let chunk_iterator = ChunkIterator::new(content_length, chunk_data);
550                        let chunk_manager = Arc::new(ChunkManager::new(
551                            config.download_connection_count,
552                            client,
553                            cancel_token,
554                            downloaded_len_sender,
555                            chunk_iterator,
556                            etag,
557                            config.request_retry_count,
558                        ));
559                        DownloadWay::Ranges(chunk_manager)
560                    } else {
561                        DownloadWay::Single(SingleDownload::new(
562                            cancel_token,
563                            downloaded_len_sender,
564                            content_length,
565                        ))
566                    }
567                };
568
569                let state = DownloadingState {
570                    downloading_duration,
571                    download_instant: Instant::now(),
572                    download_way,
573                };
574
575
576                let state = Arc::new(state);
577                {
578                    let mut guard = downloading_state.write();
579                    *guard = Some((end_receiver, state.clone()));
580                }
581
582                total_size_semaphore.add_permits(1);
583
584                let file = {
585                    let mut options = std::fs::OpenOptions::new();
586                    (config.open_option)(&mut options);
587                    let mut file = tokio::fs::OpenOptions::from(options)
588                        .open(config.file_path())
589                        .await?;
590                    if config.set_len_in_advance {
591                        file.set_len(content_length.unwrap()).await?
592                    }
593                    file.seek(SeekFrom::Start(0)).await?;
594                    file
595                };
596
597                for oneshot in downloading_state_oneshot_vec.into_iter() {
598                    oneshot.send(state.clone()).unwrap_or_else(|_| {
599                        #[cfg(feature = "tracing")]
600                        tracing::trace!("send download_way failed!");
601                    });
602                }
603
604                let dec_result = match &state.download_way {
605                    DownloadWay::Ranges(item) => {
606                        let request = Box::new(config.create_http_request(location.as_ref().map(|n| n.as_str())));
607                        item.start_download(
608                            file,
609                            request,
610                            downloaded_len_change_notify,
611                            #[cfg(feature = "breakpoint-resume")]
612                                breakpoint_resume,
613                        )
614                            .await
615                    }
616                    DownloadWay::Single(item) => {
617                        item.download(
618                            file,
619                            Box::new(response),
620                            downloaded_len_change_notify,
621                            config.chunk_size.get(),
622                        )
623                            .await
624                    }
625                };
626
627                if {
628                    let r = downloading_state.read().is_some();
629                    r
630                } {
631                    let mut guard = downloading_state.write();
632                    *guard = None;
633                }
634
635                dec_result?
636            };
637
638            end_sender.send(dec).unwrap_or_else(|_err| {
639                #[cfg(feature = "tracing")]
640                tracing::trace!("DownloadingEndCause Send Failed! {:?}", _err);
641            });
642
643            Ok(dec)
644        }
645    }
646}
647
648pub struct ExtendedHttpFileDownloader {
649    pub inner: HttpFileDownloader,
650    downloader_wrapper: Box<dyn DownloaderWrapper>,
651}
652
653impl ExtendedHttpFileDownloader {
654    pub fn new(
655        downloader: HttpFileDownloader,
656        downloader_wrapper: Box<dyn DownloaderWrapper>,
657    ) -> Self {
658        Self {
659            inner: downloader,
660            downloader_wrapper,
661        }
662    }
663
664    /// 准备下载,返回了用于下载用的 'static 的 Future
665    pub fn prepare_download(&mut self) -> Result<DownloadFuture, DownloadStartError> {
666        self.downloader_wrapper.prepare_download(&mut self.inner)?;
667        let prepare_download_result = self.inner.download();
668        let download_future = self.downloader_wrapper.handle_prepare_download_result(&mut self.inner, prepare_download_result.map(|n| n.boxed()))?;
669
670        self.downloader_wrapper.download(&mut self.inner, download_future)
671    }
672
673    /// 取消下载
674    pub fn cancel(&self) -> impl Future<Output=()> + 'static {
675        let cancel = self.downloader_wrapper.on_cancel();
676        let cancel_future = self.inner.cancel();
677        async move {
678            cancel.await;
679            cancel_future.await;
680        }
681    }
682
683    /// 是否正在下载
684    #[inline]
685    pub fn is_downloading(&self) -> bool {
686        self.inner.is_downloading()
687    }
688
689    /// 已下载长度流
690    #[cfg(feature = "async-stream")]
691    #[inline]
692    pub fn downloaded_len_stream(&self) -> impl Stream<Item=u64> + 'static {
693        self.inner.downloaded_len_stream()
694    }
695
696    /// 更改连接数
697    #[inline]
698    pub fn change_connection_count(
699        &self,
700        connection_count: NonZeroU8,
701    ) -> Result<(), ChangeConnectionCountError> {
702        self.inner.change_connection_count(connection_count)
703    }
704
705    /// 更改 chunk 大小
706    #[inline]
707    pub fn change_chunk_size(&self, chunk_size: NonZeroUsize) -> Result<(), ChangeChunkSizeError> {
708        self.inner.change_chunk_size(chunk_size)
709    }
710
711    /// chunks 流,如果还真正的开始下载(获取了请求响应内容)会返回 None,可通过 `total_size_future().await` 等待获取它,避免得到 None
712    #[cfg(feature = "async-stream")]
713    #[inline]
714    pub fn chunks_stream(&self) -> Option<impl Stream<Item=Vec<Arc<ChunkItem>>> + 'static> {
715        self.inner.chunks_stream()
716    }
717
718    /// chunks 信息流,如果还真正的开始下载(获取了请求响应内容)会返回 None,可通过 `total_size_future().await` 等待获取它,避免得到 None
719    #[cfg(feature = "async-stream")]
720    #[inline]
721    pub fn chunks_info_stream(&self) -> Option<impl Stream<Item=ChunksInfo>> {
722        self.inner.chunks_info_stream()
723    }
724
725    /// 已下载长度
726    #[inline]
727    pub fn downloaded_len(&self) -> u64 {
728        self.inner.downloaded_len()
729    }
730
731    /// 总大小,会等待服务器响应,如果文件无大小则返回 None
732    #[inline]
733    pub fn total_size_future(&self) -> impl Future<Output=Option<NonZeroU64>> + 'static {
734        self.inner.total_size_future()
735    }
736
737    /// 总大小,如果文件无大小或者还没有得到服务器响应时返回 None
738    #[inline]
739    pub fn current_total_size(&self) -> Option<NonZeroU64> {
740        self.inner.current_total_size()
741    }
742
743    /// 总大小的`Arc`引用
744    #[inline]
745    pub fn atomic_total_size(&self) -> Arc<AtomicU64> {
746        self.inner.content_length.clone()
747    }
748
749    /// 获取 chunks
750    #[inline]
751    pub async fn get_chunks(&self) -> Vec<Arc<ChunkItem>> {
752        self.inner.get_chunks().await
753    }
754
755    /// 获取文件路径
756    #[inline]
757    pub fn get_file_path(&self) -> PathBuf {
758        self.inner.get_file_path()
759    }
760
761    /// 获取 DownloadingState,如果下载没有开始则返回 None
762    #[inline]
763    pub fn get_downloading_state(&self) -> Option<Weak<DownloadingState>> {
764        self.inner.get_downloading_state()
765    }
766
767    /// 配置
768    #[inline]
769    pub fn config(&self) -> &HttpDownloadConfig {
770        &self.inner.config
771    }
772
773    /// 已下载长度接收器
774    #[inline]
775    pub fn downloaded_len_receiver(&self) -> &sync::watch::Receiver<u64> {
776        &self.inner.downloaded_len_receiver
777    }
778
779    /// DownloadingState 接收器
780    pub fn downloading_state_receiver(&mut self) -> sync::oneshot::Receiver<Arc<DownloadingState>> {
781        let (sender, receiver) = sync::oneshot::channel();
782        self.inner.downloading_state_oneshot_vec.push(sender);
783        receiver
784    }
785}
786
787#[cfg(test)]
788mod tests {
789    use super::*;
790
791    fn sync_send<T: Send>() {}
792
793    fn sync_sync<T: Sync>() {}
794
795    #[test]
796    fn assert_sync_send() {
797        sync_send::<HttpFileDownloader>();
798        sync_sync::<HttpFileDownloader>();
799        sync_send::<ExtendedHttpFileDownloader>();
800        sync_sync::<ExtendedHttpFileDownloader>();
801    }
802}