http_downloader/
chunk_item.rs

1use std::io::SeekFrom;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use anyhow::Result;
6use bytes::Bytes;
7use futures_util::future::{BoxFuture, OptionFuture};
8use futures_util::StreamExt;
9use headers::HeaderMapExt;
10use reqwest::Request;
11use tokio::fs::File;
12use tokio::io::{AsyncSeekExt, AsyncWriteExt};
13use tokio::select;
14use tokio::sync::Mutex;
15use tokio_util::sync::CancellationToken;
16#[cfg(feature = "tracing")]
17use tracing::Instrument;
18
19use crate::{ChunkInfo, ChunkManager, ChunkRange, DownloadError, DownloadingEndCause};
20
21pub trait DownloadedLenChangeNotify: Send + Sync {
22    fn receive_len(&self, len: usize) -> OptionFuture<BoxFuture<()>>;
23}
24
25pub struct ChunkItem {
26    pub chunk_info: ChunkInfo,
27    pub downloaded_len: AtomicU64,
28    cancel_token: CancellationToken,
29    client: reqwest::Client,
30    file: Arc<Mutex<File>>,
31    etag: Option<headers::ETag>,
32}
33
34impl ChunkItem {
35    pub fn new(
36        chunk_info: ChunkInfo,
37        cancel_token: CancellationToken,
38        client: reqwest::Client,
39        file: Arc<Mutex<File>>,
40        etag: Option<headers::ETag>,
41    ) -> Self {
42        Self {
43            downloaded_len: AtomicU64::new(0),
44            cancel_token,
45            client,
46            chunk_info,
47            file,
48            etag,
49        }
50    }
51
52    #[inline]
53    fn add_downloaded_len(&self, len: usize) {
54        self.downloaded_len.fetch_add(len as u64, Ordering::Relaxed);
55        debug_assert!(
56            self.downloaded_len.load(Ordering::SeqCst) <= self.chunk_info.range.len(),
57            "downloaded_len:{},chunk_info.range.len():{}",
58            self.downloaded_len.load(Ordering::SeqCst),
59            self.chunk_info.range.len()
60        );
61    }
62
63    #[cfg_attr(feature = "tracing", tracing::instrument(name = "download chunk", skip_all, fields(chunk_index = self.chunk_info.index)))]
64    pub(crate) async fn download_chunk(
65        self: Arc<Self>,
66        mut request: Box<Request>,
67        retry_count: u8,
68        downloaded_len_receiver: Option<impl DownloadedLenChangeNotify>,
69    ) -> Result<DownloadingEndCause, DownloadError> {
70        let cancel_token = self.cancel_token.clone();
71        let mut chunk_bytes = Vec::with_capacity(self.chunk_info.range.len() as usize);
72
73        let mut cur_retry_count = 0;
74        let future = async {
75            'r: loop {
76                request.headers_mut().typed_insert(
77                    ChunkRange::new(
78                        self.chunk_info.range.start + chunk_bytes.len() as u64,
79                        self.chunk_info.range.end,
80                    )
81                        .to_range_header(),
82                );
83                // 避免 clone request ?
84                let response = self.client.execute(*ChunkManager::clone_request(&request));
85                #[cfg(feature = "tracing")]
86                    let response = response.instrument(tracing::info_span!("chunk's http request"));
87                let response = match response.await {
88                    Ok(response) => {
89                        cur_retry_count = 0;
90                        response
91                    }
92                    Err(err) => {
93                        cur_retry_count += 1;
94                        #[cfg(feature = "tracing")]
95                        tracing::trace!(
96                            "Request error! {:?},retry_info: {}/{}",
97                            err,
98                            cur_retry_count,
99                            retry_count
100                        );
101                        if cur_retry_count > retry_count {
102                            return Err(DownloadError::HttpRequestFailed(err));
103                        }
104                        continue 'r;
105                    }
106                };
107                if self.etag.is_some() {
108                    let etag = response.headers().typed_get::<headers::ETag>();
109                    if etag != self.etag {
110                        #[cfg(feature = "tracing")]
111                        tracing::trace!(
112                            "etag mismatching,your etag: {:?} , current etag:{:?}",
113                            self.etag,
114                            etag
115                        );
116                        return Err(DownloadError::ServerFileAlreadyChanged);
117                    }
118                }
119                let mut stream = response.bytes_stream();
120                while let Some(bytes) = stream.next().await {
121                    #[cfg(feature = "tracing")]
122                        let span = tracing::info_span!("process received bytes", is_ok = bytes.is_ok());
123                    #[cfg(feature = "tracing")]
124                        let _ = span.enter();
125                    let bytes: Bytes = {
126                        match bytes {
127                            Ok(bytes) => {
128                                cur_retry_count = 0;
129                                bytes
130                            }
131                            Err(err) => {
132                                cur_retry_count += 1;
133                                #[cfg(feature = "tracing")]
134                                tracing::trace!(
135                                    "Request error! {:?},retry_info: {}/{}",
136                                    err,
137                                    cur_retry_count,
138                                    retry_count
139                                );
140                                if cur_retry_count > retry_count {
141                                    // 出错后与取消一样处理:将缓冲中的数据写入磁盘并持久化数据
142                                    let mut file = self.file.lock().await;
143                                    file.seek(SeekFrom::Start(self.chunk_info.range.start))
144                                        .await?;
145                                    debug_assert!(
146                                        chunk_bytes.len() as u64 <= self.chunk_info.range.len(),
147                                        "chunk_bytes.len() = {}, self.chunk_info.range.len() = {}",
148                                        chunk_bytes.len(),
149                                        self.chunk_info.range.len()
150                                    );
151                                    file.write_all(chunk_bytes.as_ref()).await?;
152                                    file.flush().await?;
153                                    file.sync_all().await?;
154                                    return Err(DownloadError::HttpRequestFailed(err));
155                                }
156                                continue 'r;
157                            }
158                        }
159                    };
160                    let len = bytes.len();
161                    chunk_bytes.extend(bytes);
162                    self.add_downloaded_len(len);
163                    if let Some(downloaded_len_receiver) = downloaded_len_receiver.as_ref() {
164                        downloaded_len_receiver.receive_len(len).await;
165                    }
166                }
167                break;
168            }
169            Result::<(), DownloadError>::Ok(())
170        };
171
172        select! {
173            r = future => {
174                r?;
175                let mut file = self.file.lock().await;
176                file.seek(SeekFrom::Start(self.chunk_info.range.start)).await?;
177                debug_assert_eq!(chunk_bytes.len() as u64,self.chunk_info.range.len());
178                file.write_all(chunk_bytes.as_ref()).await?;
179                file.flush().await?;
180                file.sync_all().await?;
181                Ok(DownloadingEndCause::DownloadFinished)
182            }
183            _ = cancel_token.cancelled() => {
184                let mut file = self.file.lock().await;
185                file.seek(SeekFrom::Start(self.chunk_info.range.start)).await?;
186                debug_assert!(chunk_bytes.len() as u64 <= self.chunk_info.range.len(),"chunk_bytes.len() = {}, self.chunk_info.range.len() = {}", chunk_bytes.len(), self.chunk_info.range.len());
187                file.write_all(chunk_bytes.as_ref()).await?;
188                file.flush().await?;
189                file.sync_all().await?;
190                Ok(DownloadingEndCause::Cancelled)
191            }
192        }
193    }
194    /*
195        pub(crate) fn start_download(
196            self: Arc<Self>,
197            request: Box<Request>,
198            retry_count: u8,
199        ) -> DownloadedChunkItem {
200            use futures_util::FutureExt;
201
202            let chunk_item = self.clone();
203            let join_handle = tokio::spawn(self.download_chunk(request, retry_count).then(
204                |result| async move {
205                    match result {
206                        Ok(is_finished) => {
207                            if is_finished {
208                                self.send_message(ChunkMessageKind::DownloadFinished)
209                                    .await
210                                    .unwrap_or_else(|_err| {
211                                        #[cfg(feature = "tracing")]
212                                        tracing::trace!("ChunkMessageInfoSendFailed! {:?}", _err);
213                                    })
214                            }
215                        }
216                        Err(err) => self
217                            .send_message(ChunkMessageKind::Error(err))
218                            .await
219                            .unwrap_or_else(|_err| {
220                                #[cfg(feature = "tracing")]
221                                tracing::trace!("ChunkMessageInfoSendFailed! {:?}", _err);
222                            }),
223                    };
224                },
225            ));
226            DownloadedChunkItem::new(chunk_item, join_handle)
227        }*/
228}
229
230/*pub struct DownloadedChunkItem {
231    pub chunk_item: Arc<ChunkItem>,
232    pub join_handle: JoinHandle<()>,
233}
234
235impl DownloadedChunkItem {
236    pub fn new(chunk_item: Arc<ChunkItem>, join_handle: JoinHandle<()>) -> Self {
237        Self {
238            chunk_item,
239            join_handle,
240        }
241    }
242}
243
244impl Deref for DownloadedChunkItem {
245    type Target = Arc<ChunkItem>;
246
247    fn deref(&self) -> &Self::Target {
248        &self.chunk_item
249    }
250}*/