1use std::future::Future;
2use std::io::SeekFrom;
3use std::num::{NonZeroU64, NonZeroU8, NonZeroUsize};
4use std::path::PathBuf;
5use std::sync::{Arc, Weak};
6use std::sync::atomic::{AtomicU64, Ordering};
7
8use anyhow::Result;
9use futures_util::future::BoxFuture;
10use futures_util::FutureExt;
11#[cfg(feature = "async-stream")]
12use futures_util::Stream;
13use headers::{Header, HeaderMapExt};
14use parking_lot::RwLock;
15use thiserror::Error;
16use tokio::{io, sync};
17use tokio::io::AsyncSeekExt;
18use tokio::sync::watch::error::SendError;
19use tokio::task::JoinError;
20use tokio::time::Instant;
21use tokio_util::sync::CancellationToken;
22
23use crate::{ChunkData, ChunkItem, ChunkIterator, ChunkManager, ChunksInfo, DownloadArchiveData, DownloadedLenChangeNotify, DownloaderWrapper, DownloadFuture, DownloadWay, HttpDownloadConfig, HttpRedirectionHandle, RemainingChunks, SingleDownload};
24use crate::exclusive::Exclusive;
25
/// Non-error outcome of a download run.
///
/// This is the success value of the download future and is also sent over the
/// end-of-download oneshot channel that `HttpFileDownloader::cancel` waits on.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum DownloadingEndCause {
    /// The transfer ran to completion.
    DownloadFinished,
    /// The transfer stopped because it was cancelled.
    Cancelled,
}
31
/// Errors reported synchronously when a download is being started, i.e. before
/// the download future itself is handed back to the caller.
#[derive(Error, Debug)]
pub enum DownloadStartError {
    /// An I/O error; in this file it is produced when creating the save directory fails.
    // NOTE(review): the variant name is spelled "Crate" (sic); renaming it would break the public API.
    #[error("file create failed,{:?}", .0)]
    FileCrateFailed(#[from] io::Error),
    /// Any other failure, carried as an `anyhow::Error`.
    #[error("{:?}", .0)]
    Other(#[from] anyhow::Error),

    /// A download is already registered on this downloader.
    #[error("already downloading")]
    AlreadyDownloading,
    /// `create_dir` is disabled in the config and the save directory does not exist.
    #[error("Directory does not exist")]
    DirectoryDoesNotExist,

    // The three variants below only exist with the "status-tracker" feature.
    // They are not produced in this file; presumably the status tracker
    // extension returns them — confirm against that module.
    #[cfg(feature = "status-tracker")]
    #[error("Initializing")]
    Initializing,
    #[cfg(feature = "status-tracker")]
    #[error("Starting")]
    Starting,
    #[cfg(feature = "status-tracker")]
    #[error("Stopping")]
    Stopping,
}
54
/// Why an HTTP response was rejected; carried by
/// `DownloadError::HttpRequestResponseInvalid` together with the response.
#[derive(Debug, Copy, Clone)]
pub enum HttpResponseInvalidCause {
    /// The `Content-Length` header value was not acceptable.
    ContentLengthInvalid,
    /// The final response status was not a success status.
    StatusCodeUnsuccessful,
    /// A redirection response had no usable `Location` header
    /// (missing, or not representable as a string).
    RedirectionNoLocation,
}
61
/// Errors the download future can resolve to once a download has been started.
#[derive(Error, Debug)]
pub enum DownloadError {
    /// Any other failure, carried as an `anyhow::Error`.
    #[error("{:?}", .0)]
    Other(#[from] anyhow::Error),
    /// The archive-data future (previously saved progress) resolved to an error.
    #[error("ArchiveDataLoadError {:?}", .0)]
    ArchiveDataLoadError(anyhow::Error),
    /// An I/O error, e.g. while opening, resizing or seeking the target file.
    #[error("IoError,{:?}", .0)]
    IoError(#[from] io::Error),
    /// A spawned tokio task failed to join.
    #[error("JoinError,{:?}", .0)]
    JoinError(#[from] JoinError),
    // The two chunk variants carry a `usize`; presumably the chunk index —
    // they are not constructed in this file, confirm against the chunk manager.
    #[error("chunk remove failed,{:?}", .0)]
    ChunkRemoveFailed(usize),
    #[error("downloading chunk remove failed,{:?}", .0)]
    DownloadingChunkRemoveFailed(usize),
    /// The HTTP request failed (transport error or error status) after all retries.
    #[error("http request failed,{:?}", .0)]
    HttpRequestFailed(#[from] reqwest::Error),
    /// The response was received but rejected; only the cause (field 0) is
    /// included in the message, the response itself is carried for inspection.
    #[error("http request response invalid,{:?}", .0)]
    HttpRequestResponseInvalid(HttpResponseInvalidCause, reqwest::Response),
    /// The ETag check against the configured ETag failed.
    #[error("The server file has changed.")]
    ServerFileAlreadyChanged,
    /// The redirection counter reached the configured `max_times`.
    #[error("The redirection times are too many.")]
    RedirectionTimesTooMany,
}
85
/// Union of the start-time and run-time download errors, so that a caller
/// driving a download from start to end can use a single error type with `?`.
#[derive(Error, Debug)]
pub enum DownloadToEndError {
    /// The download could not be started.
    #[error("{:?}", .0)]
    DownloadStartError(#[from] DownloadStartError),
    /// The download started but failed while running.
    #[error("{:?}", .0)]
    DownloadError(#[from] DownloadError),
}
93
/// Errors returned by `change_connection_count`.
#[derive(Error, Debug)]
pub enum ChangeConnectionCountError {
    /// Forwarded from the chunk manager: publishing the new count on its watch channel failed.
    #[error("SendError")]
    SendError(#[from] SendError<u8>),
    /// No download is currently registered on the downloader.
    #[error("it is no start")]
    NoStart,
    /// The current download uses a single connection (`DownloadWay::Single`).
    #[error("The download target is not supported")]
    DownloadTargetNotSupported,
}
103
/// Errors returned by `change_chunk_size`.
#[derive(Error, Debug)]
pub enum ChangeChunkSizeError {
    /// No download is currently registered on the downloader.
    #[error("it is no start")]
    NoStart,
    /// The current download uses a single connection (`DownloadWay::Single`).
    #[error("The download target is not supported")]
    DownloadTargetNotSupported,
}
111
/// Snapshot describing a running download; created once the response headers
/// have been inspected and shared behind an `Arc`.
pub struct DownloadingState {
    /// Seconds already spent downloading before this run; seeded from the
    /// archive data when present, otherwise 0.
    pub downloading_duration: u32,
    /// Moment this state was created (`Instant::now()` at construction).
    pub download_instant: Instant,
    /// Whether the download runs as ranged chunks or as a single stream.
    pub download_way: DownloadWay,
}
117
118impl DownloadingState {
119 pub fn get_current_downloading_duration(&self) -> u32 {
120 self.downloading_duration + self.download_instant.elapsed().as_secs() as u32
121 }
122}
123
/// Notification handles for the "breakpoint-resume" feature.
///
/// In this file the value is only taken from the downloader and handed to the
/// chunk manager when a ranged download starts.
// NOTE(review): who signals and who awaits each `Notify` is not visible here —
// presumably the breakpoint-resume extension; confirm before relying on it.
#[cfg(feature = "breakpoint-resume")]
#[derive(Default)]
pub struct BreakpointResume {
    pub data_archive_notify: sync::Notify,
    pub archive_complete_notify: sync::Notify,
}
130
/// Core downloader: owns the HTTP client, the configuration and the shared
/// state (progress channel, total size, cancellation) of at most one download.
pub struct HttpFileDownloader {
    /// Pending requests for the `DownloadingState`; drained when a download
    /// starts and each sender receives the state once it has been created.
    pub downloading_state_oneshot_vec: Vec<sync::oneshot::Sender<Arc<DownloadingState>>>,
    /// Optional progress callback; taken when a download starts and handed to
    /// the selected download way.
    pub downloaded_len_change_notify: Option<Arc<dyn DownloadedLenChangeNotify>>,
    /// Optional future yielding previously archived progress; taken and awaited
    /// when a download starts.
    pub archive_data_future: Option<Exclusive<BoxFuture<'static, Result<Option<Box<DownloadArchiveData>>>>>>,
    /// Breakpoint-resume notifiers; taken when a download starts.
    #[cfg(feature = "breakpoint-resume")]
    pub breakpoint_resume: Option<Arc<BreakpointResume>>,
    /// Download configuration shared with the download future.
    pub config: Arc<HttpDownloadConfig>,
    /// Receiving side of the downloaded-length watch channel (initial value 0).
    pub downloaded_len_receiver: sync::watch::Receiver<u64>,
    /// Total size taken from `Content-Length`; 0 means "unknown".
    pub content_length: Arc<AtomicU64>,
    client: reqwest::Client,
    /// `Some` while a download is registered: the receiver for its end cause
    /// plus the shared state. Cleared when the download ends or is cancelled.
    downloading_state: Arc<RwLock<
        Option<(
            sync::oneshot::Receiver<DownloadingEndCause>,
            Arc<DownloadingState>,
        )>,
    >>,
    /// Sending side of the downloaded-length watch channel.
    downloaded_len_sender: Arc<sync::watch::Sender<u64>>,
    /// Token used to cancel the download; replaced by a fresh token when a new
    /// download starts after the previous token was cancelled.
    pub cancel_token: CancellationToken,
    /// Starts with zero permits; a permit is added once the total size has been
    /// determined (or the initial request failed), releasing `total_size_future`.
    total_size_semaphore: Arc<sync::Semaphore>,
}
151
152impl HttpFileDownloader {
153 pub fn new(client: reqwest::Client, config: Arc<HttpDownloadConfig>) -> Self {
154 let cancel_token = config.cancel_token.clone().unwrap_or_default();
155 let (downloaded_len_sender, downloaded_len_receiver) = sync::watch::channel::<u64>(0);
156 let total_size_semaphore = Arc::new(sync::Semaphore::new(0));
157
158 Self {
159 downloading_state_oneshot_vec: vec![],
160 downloaded_len_change_notify: None,
161 archive_data_future: None,
162 #[cfg(feature = "breakpoint-resume")]
163 breakpoint_resume: None,
164 config,
165 total_size_semaphore,
166 content_length: Default::default(),
167 client,
168 downloading_state: Default::default(),
169 downloaded_len_receiver,
170 downloaded_len_sender: Arc::new(downloaded_len_sender),
171 cancel_token,
172 }
173 }
174
175 pub fn is_downloading(&self) -> bool {
176 self.downloading_state.read().is_some()
177 }
178
179 pub fn change_connection_count(
180 &self,
181 connection_count: NonZeroU8,
182 ) -> Result<(), ChangeConnectionCountError> {
183 match self.downloading_state.read().as_ref() {
184 None => Err(ChangeConnectionCountError::NoStart),
185 Some((_, downloading_state)) => match &downloading_state.download_way {
186 DownloadWay::Single(_) => {
187 Err(ChangeConnectionCountError::DownloadTargetNotSupported)
188 }
189 DownloadWay::Ranges(chunk_manager) => {
190 chunk_manager.change_connection_count(connection_count)?;
191 Ok(())
192 }
193 },
194 }
195 }
196 pub fn change_chunk_size(&self, chunk_size: NonZeroUsize) -> Result<(), ChangeChunkSizeError> {
197 match self.downloading_state.read().as_ref() {
198 None => Err(ChangeChunkSizeError::NoStart),
199 Some((_, downloading_state)) => match &downloading_state.download_way {
200 DownloadWay::Single(_) => Err(ChangeChunkSizeError::DownloadTargetNotSupported),
201 DownloadWay::Ranges(chunk_manager) => {
202 chunk_manager.change_chunk_size(chunk_size);
203 Ok(())
204 }
205 },
206 }
207 }
208
209 #[cfg(feature = "async-stream")]
210 pub fn downloaded_len_stream(&self) -> impl Stream<Item=u64> + 'static {
211 let mut downloaded_len_receiver = self.downloaded_len_receiver.clone();
212 let duration = self.config.downloaded_len_send_interval.clone();
213 async_stream::stream! {
214 let downloaded_len = *downloaded_len_receiver.borrow();
215 yield downloaded_len;
216 while downloaded_len_receiver.changed().await.is_ok() {
217 let downloaded_len = *downloaded_len_receiver.borrow();
218 yield downloaded_len;
219 if let Some(duration) = duration{
220 tokio::time::sleep(duration).await;
221 }
222 }
223 }
224 }
225
226 #[cfg(feature = "async-stream")]
227 pub fn chunks_stream(&self) -> Option<impl Stream<Item=Vec<Arc<ChunkItem>>> + 'static> {
228 match self.downloading_state.read().as_ref() {
229 None => {
230 None
232 }
233 Some((_, downloading_state)) => match &downloading_state.download_way {
234 DownloadWay::Single(_) => {
235 None
237 }
238 DownloadWay::Ranges(chunk_manager) => {
239 let mut downloaded_len_receiver = self.downloaded_len_receiver.clone();
240 let chunk_manager = chunk_manager.to_owned();
241 let duration = self.config.chunks_send_interval.clone();
242 Some(async_stream::stream! {
243 yield chunk_manager.get_chunks().await;
244 while downloaded_len_receiver.changed().await.is_ok() {
245 yield chunk_manager.get_chunks().await;
246 if let Some(duration) = duration {
247 tokio::time::sleep(duration).await;
248 }
249 }
250 })
251 }
252 },
253 }
254 }
255
256 #[cfg(feature = "async-stream")]
257 pub fn chunks_info_stream(&self) -> Option<impl Stream<Item=ChunksInfo> + 'static> {
258 match self.downloading_state.read().as_ref() {
259 None => {
260 None
262 }
263 Some((_, downloading_state)) => match &downloading_state.download_way {
264 DownloadWay::Single(_) => {
265 None
267 }
268 DownloadWay::Ranges(chunk_manager) => {
269 let mut downloaded_len_receiver = self.downloaded_len_receiver.clone();
270 let chunk_manager = chunk_manager.to_owned();
271 let duration = self.config.chunks_send_interval.clone();
272 Some(async_stream::stream! {
273 yield chunk_manager.get_chunks_info().await;
274 while downloaded_len_receiver.changed().await.is_ok() {
275 yield chunk_manager.get_chunks_info().await;
276 if let Some(duration) = duration {
277 tokio::time::sleep(duration).await;
278 }
279 }
280 })
281 }
282 },
283 }
284 }
285
286 pub fn downloaded_len(&self) -> u64 {
287 *self.downloaded_len_receiver.borrow()
288 }
289
290 pub fn total_size_future(&self) -> impl Future<Output=Option<NonZeroU64>> + 'static {
291 let total_size_semaphore = self.total_size_semaphore.clone();
292 let content_length = self.content_length.clone();
293 async move {
294 let _ = total_size_semaphore.acquire().await;
295 let content_length = content_length.load(Ordering::Relaxed);
296 if content_length == 0 {
297 None
298 } else {
299 Some(NonZeroU64::new(content_length).unwrap())
300 }
301 }
302 }
303
304 pub fn current_total_size(&self) -> Option<NonZeroU64> {
305 let content_length = self.content_length.load(Ordering::Relaxed);
306 if content_length == 0 {
307 None
308 } else {
309 Some(NonZeroU64::new(content_length).unwrap())
310 }
311 }
312
313 pub fn get_chunk_manager(&self) -> Option<Weak<ChunkManager>> {
314 self.get_downloading_state().and_then(|n|n.upgrade()).and_then(|downloading_state| {
315 if let DownloadWay::Ranges(item) = &downloading_state.download_way {
316 Some(Arc::downgrade(item))
317 } else {
318 None
319 }
320 })
321 }
322 pub fn get_downloading_state(&self) -> Option<Weak<DownloadingState>> {
323 let guard = &self.downloading_state.read();
324 guard.as_ref().map(|n| Arc::downgrade(&n.1))
325 }
326
327 pub async fn get_chunks(&self) -> Vec<Arc<ChunkItem>> {
328 match self.get_chunk_manager().and_then(|n|n.upgrade()) {
329 None => Vec::new(),
330 Some(n) => n.get_chunks().await,
331 }
332 }
333
334 pub async fn get_chunks_info(&self) -> Option<ChunksInfo> {
335 match self.get_chunk_manager().and_then(|n|n.upgrade()) {
336 None => None,
337 Some(n) => Some(n.get_chunks_info().await),
338 }
339 }
340
341 pub fn get_file_path(&self) -> PathBuf {
342 self.config.file_path()
343 }
344
345 fn reset(&self) {
346 self.downloaded_len_sender.send(0).unwrap_or_else(|_err| {
347 #[cfg(feature = "tracing")]
348 tracing::trace!("send downloaded_len failed! {}", _err);
349 });
350 }
351
352 pub(crate) fn download(
353 &mut self,
354 ) -> Result<
355 impl Future<Output=Result<DownloadingEndCause, DownloadError>> + Send + 'static,
356 DownloadStartError,
357 > {
358 self.reset();
359 if self.is_downloading() {
360 return Err(DownloadStartError::AlreadyDownloading);
361 }
362
363 if self.config.create_dir {
364 std::fs::create_dir_all(&self.config.save_dir)?;
365 } else if !self.config.save_dir.exists() {
366 return Err(DownloadStartError::DirectoryDoesNotExist);
367 }
368 Ok(self.start_download())
369 }
370
371 pub fn take_downloading_state(
372 &self,
373 ) -> Option<(
374 sync::oneshot::Receiver<DownloadingEndCause>,
375 Arc<DownloadingState>,
376 )> {
377 let mut guard = self.downloading_state.write();
378 guard.take()
379 }
380
381 pub fn cancel(&self) -> impl Future<Output=()> + 'static {
382 let downloading_state = self.downloading_state.clone();
383 let token = self.cancel_token.clone();
384 async move {
385 let (receiver, _) = {
386 let mut guard = downloading_state.write();
387 let option = guard.take();
388 if option.is_none() {
389 return;
390 }
391 option.unwrap()
392 };
393 token.cancel();
394 receiver.await.unwrap();
395 }
396 }
397
398 fn start_download(
400 &mut self,
401 ) -> impl Future<Output=Result<DownloadingEndCause, DownloadError>> + 'static {
402 if self.cancel_token.is_cancelled() {
403 self.cancel_token = CancellationToken::new();
404 }
405 let config = self.config.clone();
406 let client = self.client.clone();
407 let total_size_semaphore = self.total_size_semaphore.clone();
408 let content_length_arc = self.content_length.clone();
409 let downloading_state = self.downloading_state.clone();
410 let downloaded_len_change_notify = self.downloaded_len_change_notify.take();
411 let archive_data_future = self.archive_data_future.take();
412 let downloading_state_oneshot_vec: Vec<sync::oneshot::Sender<Arc<DownloadingState>>> = self.downloading_state_oneshot_vec.drain(..).collect();
413 let downloaded_len_sender = self.downloaded_len_sender.clone();
414 let cancel_token = self.cancel_token.clone();
415 #[cfg(feature = "breakpoint-resume")]
416 let breakpoint_resume = self.breakpoint_resume.take();
417
418
419 async move {
420 fn request<'a>(client: &'a reqwest::Client, config: &'a HttpDownloadConfig, location: Option<String>, redirection_times: usize) -> BoxFuture<'a, Result<(reqwest::Response, Option<String>), DownloadError>> {
421 async move {
422 if let HttpRedirectionHandle::RequestNewLocation { max_times } = config.handle_redirection {
423 if redirection_times >= max_times {
424 return Err(DownloadError::RedirectionTimesTooMany);
425 }
426 }
427 let mut retry_count = 0;
428 let response = loop {
429 let response = client.execute(config.create_http_request(location.as_ref().map(|n| n.as_str())))
430 .await.and_then(|n| n.error_for_status());
431
432 if response.is_err() && retry_count < config.request_retry_count {
433 retry_count += 1;
434 #[cfg(feature = "tracing")]
435 tracing::trace!(
436 "Request error! {:?},retry_info: {}/{}",
437 response.unwrap_err(),
438 retry_count,
439 config.request_retry_count
440 );
441 continue;
442 }
443 break response;
444 };
445 match response {
447 Ok(response) if config.handle_redirection != HttpRedirectionHandle::Invalid && response.status().is_redirection() => {
448 let Some(location) = response.headers().get(headers::Location::name()) else {
449 return Err(DownloadError::HttpRequestResponseInvalid(HttpResponseInvalidCause::RedirectionNoLocation, response));
450 };
451 let Ok(location) = location.to_str().map(|n| n.to_string()) else {
452 return Err(DownloadError::HttpRequestResponseInvalid(HttpResponseInvalidCause::RedirectionNoLocation, response));
453 };
454 println!("handle_redirection!!!!!!! {}",location);
455 request(client, config, Some(location), redirection_times + 1).await
456 }
457 Ok(response) if !response.status().is_success() => {
458 Err(DownloadError::HttpRequestResponseInvalid(HttpResponseInvalidCause::StatusCodeUnsuccessful, response))
459 }
460 Err(err) => {
461 Err(DownloadError::HttpRequestFailed(err))
462 }
463 Ok(response) => Ok((response, location)),
464 }
465 }.boxed()
466 }
467
468 let (end_sender, end_receiver) = sync::oneshot::channel();
469 let dec = {
470 let (response, location) = match request(&client, &config, None, 0).await {
471 Ok(r) => r,
472 Err(err) => {
473 total_size_semaphore.add_permits(1);
474 return Err(err.into());
475 }
476 };
477 let etag = {
478 if config.etag.is_some() {
479 let cur_etag = response.headers().typed_get::<headers::ETag>();
480 if cur_etag == config.etag {
481 #[cfg(feature = "tracing")]
482 tracing::trace!(
483 "etag mismatching,your etag: {:?} , current etag:{:?}",
484 config.etag,
485 cur_etag
486 );
487 total_size_semaphore.add_permits(1);
488 return Err(DownloadError::ServerFileAlreadyChanged);
489 }
490 cur_etag
491 } else {
492 None
493 }
494 };
495 let content_length = response
496 .headers()
497 .typed_get::<headers::ContentLength>()
498 .map(|n| n.0)
499 .and_then(|n| if n == 0 { None } else { Some(n) });
500
501 if let Some(0) = content_length {
502 total_size_semaphore.add_permits(1);
503 return Err(DownloadError::HttpRequestResponseInvalid(HttpResponseInvalidCause::ContentLengthInvalid, response));
504 }
505 content_length_arc.store(content_length.unwrap_or(0), Ordering::Relaxed);
506
507 let accept_ranges = response.headers().typed_get::<headers::AcceptRanges>();
508
509 let is_ranges_bytes_none = accept_ranges.is_none();
510 let is_ranges_bytes =
511 !is_ranges_bytes_none && accept_ranges.unwrap() == headers::AcceptRanges::bytes();
512 let archive_data = match archive_data_future {
513 None => { None }
514 Some(archive_data_future) => {
515 archive_data_future.await.map_err(DownloadError::ArchiveDataLoadError)?
516 }
517 };
518 let downloading_duration = archive_data.as_ref()
519 .map(|n| n.downloading_duration)
520 .unwrap_or(0);
521 let download_way = {
522 if content_length.is_some()
523 && (if config.strict_check_accept_ranges {
524 is_ranges_bytes
525 } else {
526 is_ranges_bytes_none || is_ranges_bytes
527 })
528 {
529 let content_length = content_length.unwrap();
530 let chunk_data = archive_data
531 .and_then(|archive_data| {
532 downloaded_len_sender
533 .send(archive_data.downloaded_len)
534 .unwrap_or_else(|_err| {
535 #[cfg(feature = "tracing")]
536 tracing::error!("send downloaded_len failed! {}", _err);
537 });
538 archive_data.chunk_data.map(|mut data| {
539 data.remaining.chunk_size = config.chunk_size.get();
540 data
541 })
542 })
543 .unwrap_or_else(|| ChunkData {
544 iter_count: 0,
545 remaining: RemainingChunks::new(config.chunk_size, content_length),
546 last_incomplete_chunks: Default::default(),
547 });
548
549 let chunk_iterator = ChunkIterator::new(content_length, chunk_data);
550 let chunk_manager = Arc::new(ChunkManager::new(
551 config.download_connection_count,
552 client,
553 cancel_token,
554 downloaded_len_sender,
555 chunk_iterator,
556 etag,
557 config.request_retry_count,
558 ));
559 DownloadWay::Ranges(chunk_manager)
560 } else {
561 DownloadWay::Single(SingleDownload::new(
562 cancel_token,
563 downloaded_len_sender,
564 content_length,
565 ))
566 }
567 };
568
569 let state = DownloadingState {
570 downloading_duration,
571 download_instant: Instant::now(),
572 download_way,
573 };
574
575
576 let state = Arc::new(state);
577 {
578 let mut guard = downloading_state.write();
579 *guard = Some((end_receiver, state.clone()));
580 }
581
582 total_size_semaphore.add_permits(1);
583
584 let file = {
585 let mut options = std::fs::OpenOptions::new();
586 (config.open_option)(&mut options);
587 let mut file = tokio::fs::OpenOptions::from(options)
588 .open(config.file_path())
589 .await?;
590 if config.set_len_in_advance {
591 file.set_len(content_length.unwrap()).await?
592 }
593 file.seek(SeekFrom::Start(0)).await?;
594 file
595 };
596
597 for oneshot in downloading_state_oneshot_vec.into_iter() {
598 oneshot.send(state.clone()).unwrap_or_else(|_| {
599 #[cfg(feature = "tracing")]
600 tracing::trace!("send download_way failed!");
601 });
602 }
603
604 let dec_result = match &state.download_way {
605 DownloadWay::Ranges(item) => {
606 let request = Box::new(config.create_http_request(location.as_ref().map(|n| n.as_str())));
607 item.start_download(
608 file,
609 request,
610 downloaded_len_change_notify,
611 #[cfg(feature = "breakpoint-resume")]
612 breakpoint_resume,
613 )
614 .await
615 }
616 DownloadWay::Single(item) => {
617 item.download(
618 file,
619 Box::new(response),
620 downloaded_len_change_notify,
621 config.chunk_size.get(),
622 )
623 .await
624 }
625 };
626
627 if {
628 let r = downloading_state.read().is_some();
629 r
630 } {
631 let mut guard = downloading_state.write();
632 *guard = None;
633 }
634
635 dec_result?
636 };
637
638 end_sender.send(dec).unwrap_or_else(|_err| {
639 #[cfg(feature = "tracing")]
640 tracing::trace!("DownloadingEndCause Send Failed! {:?}", _err);
641 });
642
643 Ok(dec)
644 }
645 }
646}
647
/// An `HttpFileDownloader` combined with a `DownloaderWrapper` whose hooks are
/// invoked around preparing, running and cancelling a download.
pub struct ExtendedHttpFileDownloader {
    /// The wrapped core downloader.
    pub inner: HttpFileDownloader,
    /// Hooks called by `prepare_download` and `cancel`.
    downloader_wrapper: Box<dyn DownloaderWrapper>,
}
652
653impl ExtendedHttpFileDownloader {
654 pub fn new(
655 downloader: HttpFileDownloader,
656 downloader_wrapper: Box<dyn DownloaderWrapper>,
657 ) -> Self {
658 Self {
659 inner: downloader,
660 downloader_wrapper,
661 }
662 }
663
664 pub fn prepare_download(&mut self) -> Result<DownloadFuture, DownloadStartError> {
666 self.downloader_wrapper.prepare_download(&mut self.inner)?;
667 let prepare_download_result = self.inner.download();
668 let download_future = self.downloader_wrapper.handle_prepare_download_result(&mut self.inner, prepare_download_result.map(|n| n.boxed()))?;
669
670 self.downloader_wrapper.download(&mut self.inner, download_future)
671 }
672
673 pub fn cancel(&self) -> impl Future<Output=()> + 'static {
675 let cancel = self.downloader_wrapper.on_cancel();
676 let cancel_future = self.inner.cancel();
677 async move {
678 cancel.await;
679 cancel_future.await;
680 }
681 }
682
683 #[inline]
685 pub fn is_downloading(&self) -> bool {
686 self.inner.is_downloading()
687 }
688
689 #[cfg(feature = "async-stream")]
691 #[inline]
692 pub fn downloaded_len_stream(&self) -> impl Stream<Item=u64> + 'static {
693 self.inner.downloaded_len_stream()
694 }
695
696 #[inline]
698 pub fn change_connection_count(
699 &self,
700 connection_count: NonZeroU8,
701 ) -> Result<(), ChangeConnectionCountError> {
702 self.inner.change_connection_count(connection_count)
703 }
704
705 #[inline]
707 pub fn change_chunk_size(&self, chunk_size: NonZeroUsize) -> Result<(), ChangeChunkSizeError> {
708 self.inner.change_chunk_size(chunk_size)
709 }
710
711 #[cfg(feature = "async-stream")]
713 #[inline]
714 pub fn chunks_stream(&self) -> Option<impl Stream<Item=Vec<Arc<ChunkItem>>> + 'static> {
715 self.inner.chunks_stream()
716 }
717
718 #[cfg(feature = "async-stream")]
720 #[inline]
721 pub fn chunks_info_stream(&self) -> Option<impl Stream<Item=ChunksInfo>> {
722 self.inner.chunks_info_stream()
723 }
724
725 #[inline]
727 pub fn downloaded_len(&self) -> u64 {
728 self.inner.downloaded_len()
729 }
730
731 #[inline]
733 pub fn total_size_future(&self) -> impl Future<Output=Option<NonZeroU64>> + 'static {
734 self.inner.total_size_future()
735 }
736
737 #[inline]
739 pub fn current_total_size(&self) -> Option<NonZeroU64> {
740 self.inner.current_total_size()
741 }
742
743 #[inline]
745 pub fn atomic_total_size(&self) -> Arc<AtomicU64> {
746 self.inner.content_length.clone()
747 }
748
749 #[inline]
751 pub async fn get_chunks(&self) -> Vec<Arc<ChunkItem>> {
752 self.inner.get_chunks().await
753 }
754
755 #[inline]
757 pub fn get_file_path(&self) -> PathBuf {
758 self.inner.get_file_path()
759 }
760
761 #[inline]
763 pub fn get_downloading_state(&self) -> Option<Weak<DownloadingState>> {
764 self.inner.get_downloading_state()
765 }
766
767 #[inline]
769 pub fn config(&self) -> &HttpDownloadConfig {
770 &self.inner.config
771 }
772
773 #[inline]
775 pub fn downloaded_len_receiver(&self) -> &sync::watch::Receiver<u64> {
776 &self.inner.downloaded_len_receiver
777 }
778
779 pub fn downloading_state_receiver(&mut self) -> sync::oneshot::Receiver<Arc<DownloadingState>> {
781 let (sender, receiver) = sync::oneshot::channel();
782 self.inner.downloading_state_oneshot_vec.push(sender);
783 receiver
784 }
785}
786
// Compile-time checks that the downloader types can be moved to and shared
// between threads.
#[cfg(test)]
mod tests {
    use super::*;

    // Only compiles for `T: Send`.
    fn sync_send<T: Send>() {}

    // Only compiles for `T: Sync`.
    fn sync_sync<T: Sync>() {}

    // The test body does no work at runtime; a missing `Send`/`Sync` impl
    // surfaces as a compile error here.
    #[test]
    fn assert_sync_send() {
        sync_send::<HttpFileDownloader>();
        sync_sync::<HttpFileDownloader>();
        sync_send::<ExtendedHttpFileDownloader>();
        sync_sync::<ExtendedHttpFileDownloader>();
    }
}