http_downloader/
chunk_item.rs1use std::io::SeekFrom;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use anyhow::Result;
6use bytes::Bytes;
7use futures_util::future::{BoxFuture, OptionFuture};
8use futures_util::StreamExt;
9use headers::HeaderMapExt;
10use reqwest::Request;
11use tokio::fs::File;
12use tokio::io::{AsyncSeekExt, AsyncWriteExt};
13use tokio::select;
14use tokio::sync::Mutex;
15use tokio_util::sync::CancellationToken;
16#[cfg(feature = "tracing")]
17use tracing::Instrument;
18
19use crate::{ChunkInfo, ChunkManager, ChunkRange, DownloadError, DownloadingEndCause};
20
21pub trait DownloadedLenChangeNotify: Send + Sync {
22 fn receive_len(&self, len: usize) -> OptionFuture<BoxFuture<()>>;
23}
24
25pub struct ChunkItem {
26 pub chunk_info: ChunkInfo,
27 pub downloaded_len: AtomicU64,
28 cancel_token: CancellationToken,
29 client: reqwest::Client,
30 file: Arc<Mutex<File>>,
31 etag: Option<headers::ETag>,
32}
33
34impl ChunkItem {
35 pub fn new(
36 chunk_info: ChunkInfo,
37 cancel_token: CancellationToken,
38 client: reqwest::Client,
39 file: Arc<Mutex<File>>,
40 etag: Option<headers::ETag>,
41 ) -> Self {
42 Self {
43 downloaded_len: AtomicU64::new(0),
44 cancel_token,
45 client,
46 chunk_info,
47 file,
48 etag,
49 }
50 }
51
52 #[inline]
53 fn add_downloaded_len(&self, len: usize) {
54 self.downloaded_len.fetch_add(len as u64, Ordering::Relaxed);
55 debug_assert!(
56 self.downloaded_len.load(Ordering::SeqCst) <= self.chunk_info.range.len(),
57 "downloaded_len:{},chunk_info.range.len():{}",
58 self.downloaded_len.load(Ordering::SeqCst),
59 self.chunk_info.range.len()
60 );
61 }
62
63 #[cfg_attr(feature = "tracing", tracing::instrument(name = "download chunk", skip_all, fields(chunk_index = self.chunk_info.index)))]
64 pub(crate) async fn download_chunk(
65 self: Arc<Self>,
66 mut request: Box<Request>,
67 retry_count: u8,
68 downloaded_len_receiver: Option<impl DownloadedLenChangeNotify>,
69 ) -> Result<DownloadingEndCause, DownloadError> {
70 let cancel_token = self.cancel_token.clone();
71 let mut chunk_bytes = Vec::with_capacity(self.chunk_info.range.len() as usize);
72
73 let mut cur_retry_count = 0;
74 let future = async {
75 'r: loop {
76 request.headers_mut().typed_insert(
77 ChunkRange::new(
78 self.chunk_info.range.start + chunk_bytes.len() as u64,
79 self.chunk_info.range.end,
80 )
81 .to_range_header(),
82 );
83 let response = self.client.execute(*ChunkManager::clone_request(&request));
85 #[cfg(feature = "tracing")]
86 let response = response.instrument(tracing::info_span!("chunk's http request"));
87 let response = match response.await {
88 Ok(response) => {
89 cur_retry_count = 0;
90 response
91 }
92 Err(err) => {
93 cur_retry_count += 1;
94 #[cfg(feature = "tracing")]
95 tracing::trace!(
96 "Request error! {:?},retry_info: {}/{}",
97 err,
98 cur_retry_count,
99 retry_count
100 );
101 if cur_retry_count > retry_count {
102 return Err(DownloadError::HttpRequestFailed(err));
103 }
104 continue 'r;
105 }
106 };
107 if self.etag.is_some() {
108 let etag = response.headers().typed_get::<headers::ETag>();
109 if etag != self.etag {
110 #[cfg(feature = "tracing")]
111 tracing::trace!(
112 "etag mismatching,your etag: {:?} , current etag:{:?}",
113 self.etag,
114 etag
115 );
116 return Err(DownloadError::ServerFileAlreadyChanged);
117 }
118 }
119 let mut stream = response.bytes_stream();
120 while let Some(bytes) = stream.next().await {
121 #[cfg(feature = "tracing")]
122 let span = tracing::info_span!("process received bytes", is_ok = bytes.is_ok());
123 #[cfg(feature = "tracing")]
124 let _ = span.enter();
125 let bytes: Bytes = {
126 match bytes {
127 Ok(bytes) => {
128 cur_retry_count = 0;
129 bytes
130 }
131 Err(err) => {
132 cur_retry_count += 1;
133 #[cfg(feature = "tracing")]
134 tracing::trace!(
135 "Request error! {:?},retry_info: {}/{}",
136 err,
137 cur_retry_count,
138 retry_count
139 );
140 if cur_retry_count > retry_count {
141 let mut file = self.file.lock().await;
143 file.seek(SeekFrom::Start(self.chunk_info.range.start))
144 .await?;
145 debug_assert!(
146 chunk_bytes.len() as u64 <= self.chunk_info.range.len(),
147 "chunk_bytes.len() = {}, self.chunk_info.range.len() = {}",
148 chunk_bytes.len(),
149 self.chunk_info.range.len()
150 );
151 file.write_all(chunk_bytes.as_ref()).await?;
152 file.flush().await?;
153 file.sync_all().await?;
154 return Err(DownloadError::HttpRequestFailed(err));
155 }
156 continue 'r;
157 }
158 }
159 };
160 let len = bytes.len();
161 chunk_bytes.extend(bytes);
162 self.add_downloaded_len(len);
163 if let Some(downloaded_len_receiver) = downloaded_len_receiver.as_ref() {
164 downloaded_len_receiver.receive_len(len).await;
165 }
166 }
167 break;
168 }
169 Result::<(), DownloadError>::Ok(())
170 };
171
172 select! {
173 r = future => {
174 r?;
175 let mut file = self.file.lock().await;
176 file.seek(SeekFrom::Start(self.chunk_info.range.start)).await?;
177 debug_assert_eq!(chunk_bytes.len() as u64,self.chunk_info.range.len());
178 file.write_all(chunk_bytes.as_ref()).await?;
179 file.flush().await?;
180 file.sync_all().await?;
181 Ok(DownloadingEndCause::DownloadFinished)
182 }
183 _ = cancel_token.cancelled() => {
184 let mut file = self.file.lock().await;
185 file.seek(SeekFrom::Start(self.chunk_info.range.start)).await?;
186 debug_assert!(chunk_bytes.len() as u64 <= self.chunk_info.range.len(),"chunk_bytes.len() = {}, self.chunk_info.range.len() = {}", chunk_bytes.len(), self.chunk_info.range.len());
187 file.write_all(chunk_bytes.as_ref()).await?;
188 file.flush().await?;
189 file.sync_all().await?;
190 Ok(DownloadingEndCause::Cancelled)
191 }
192 }
193 }
194 }
229
230