downloader_rs/
downloader.rs1use std::ops::Deref;
2use std::sync::Arc;
3use std::time::Duration;
4use reqwest::Client;
5use parking_lot::RwLock;
6use tokio::{fs, spawn};
7use tokio::sync::watch::Receiver;
8use tokio::task::JoinHandle;
9use tokio::time::sleep;
10use tokio_util::sync::CancellationToken;
11use crate::download_status::{DownloadStatus};
12use crate::download_configuration::DownloadConfiguration;
13use crate::download_sender::DownloadSender;
14use crate::{chunk, chunk_hub, remote_file};
15use crate::error::DownloadError;
16use crate::verify::file_verify::FileVerify;
17use crate::verify::file_verify;
18
19pub struct Downloader {
20 config: Arc<DownloadConfiguration>,
21 client: Arc<Client>,
22 download_status: Arc<RwLock<DownloadStatus>>,
23 cancel_token: CancellationToken,
24 sender: Arc<DownloadSender>,
25 thread_handle: RwLock<Option<JoinHandle<()>>>,
26}
27
28impl Downloader {
29 pub fn new(config: DownloadConfiguration, client: Arc<Client>, sender: Arc<DownloadSender>) -> Downloader {
30 let config = Arc::new(config);
31 let downloader = Downloader {
32 config: config.clone(),
33 client,
34 download_status: Arc::new(RwLock::new(DownloadStatus::None)),
35 cancel_token: CancellationToken::new(),
36 sender,
37 thread_handle: RwLock::new(None),
38 };
39 downloader
40 }
41
42 pub fn start_download(&self) {
43 let config = self.config.clone();
44 let client = self.client.clone();
45 let cancel_token = self.cancel_token.clone();
46 let sender = self.sender.clone();
47 let download_status = self.download_status.clone();
48 let handle = spawn(async move {
49 if let Err(e) = start_download_file(config.clone(),
50 client.clone(),
51 cancel_token.clone(),
52 sender.clone(),
53 download_status.clone()).await {
54 sender.error_sender.send(e).unwrap();
55 *download_status.write() = DownloadStatus::Failed;
56 return;
57 }
58
59 if cancel_token.is_cancelled() {
60 return;
61 }
62
63 if !config.download_in_memory {
64 if config.file_verify != FileVerify::None {
65 *download_status.write() = DownloadStatus::FileVerify;
66 if let Err(e) = file_verify::file_validate(&config.file_verify, config.get_file_temp_path()).await {
67 sender.error_sender.send(e).unwrap();
68 *download_status.write() = DownloadStatus::Failed;
69 return;
70 }
71 }
72
73 if let Err(e) = fs::rename(config.get_file_temp_path(), config.get_file_path()).await {
74 sender.error_sender.send(DownloadError::FileRename(format!("file rename failed {}", e))).unwrap();
75 *download_status.write() = DownloadStatus::Failed;
76 return;
77 }
78 }
79
80 *download_status.write() = DownloadStatus::Complete;
81 });
82 *self.thread_handle.write() = Some(handle);
83 }
84
85 pub fn is_done(&self) -> bool {
86 if let Some(handle) = self.thread_handle.read().as_ref() {
87 return handle.is_finished();
88 }
89 return false;
90 }
91
92 pub fn status(&self) -> DownloadStatus {
93 *self.download_status.read()
94 }
95
96 pub async fn is_pending_async(&self) -> bool {
97 return *self.download_status.read() == DownloadStatus::Pending;
98 }
99
100 pub fn pending(&mut self) {
101 *self.download_status.write() = DownloadStatus::Pending;
102 }
103
104 pub async fn pending_async(&self) {
105 *self.download_status.write() = DownloadStatus::Pending;
106 }
107
108 pub fn stop(&self) {
109 self.cancel_token.cancel();
110 *self.download_status.write() = DownloadStatus::Stop;
111 }
112
113 pub async fn stop_async(&self) {
114 self.cancel_token.cancel();
115 *self.download_status.write() = DownloadStatus::Stop;
116 }
117}
118
119async fn start_download_file(
120 config: Arc<DownloadConfiguration>,
121 client: Arc<Client>,
122 cancel_token: CancellationToken,
123 sender: Arc<DownloadSender>,
124 status: Arc<RwLock<DownloadStatus>>) -> crate::error::Result<()> {
125 if cancel_token.is_cancelled() {
126 return Ok(());
127 }
128
129 *status.write() = DownloadStatus::Head;
130
131 let remote_file = remote_file::head(&client, &config).await?;
132
133 if cancel_token.is_cancelled() {
134 return Ok(());
135 }
136
137 *status.write() = DownloadStatus::Download;
138
139 let _ = sender.download_total_size_sender.send(remote_file.total_length);
140
141 let (chunks, receivers) = chunk_hub::validate(&config, remote_file).await?;
142
143 let mut handles = Vec::with_capacity(chunks.len());
144 let chunk_length = chunks.len();
145 for chunk in chunks {
146 if chunk.valid {
147 continue;
148 }
149 let sender = sender.clone();
150 let handle = spawn(
151 chunk::start_download(
152 config.clone(),
153 client.clone(),
154 chunk,
155 sender,
156 cancel_token.clone())
157 );
158 handles.push(handle);
159 }
160
161 fn sync_downloaded_size(receivers: &Arc<Vec<Receiver<u64>>>, sender: &DownloadSender) {
162 let mut downloaded_size = 0u64;
163 for receiver in receivers.deref() {
164 downloaded_size += *receiver.borrow();
165 }
166 let _ = sender.downloaded_size_sender.send(downloaded_size);
167 }
168
169 let receivers = Arc::new(receivers);
170 let sync_downloaded_canceltoken = CancellationToken::new();
171 let downloaded_size_handle = {
172 sync_downloaded_size(&receivers, &sender);
173 let cancel_token = sync_downloaded_canceltoken.clone();
174 let sender = sender.clone();
175 let receivers = receivers.clone();
176 let handle = spawn(async move {
177 loop {
178 if cancel_token.is_cancelled() {
179 break;
180 }
181 sync_downloaded_size(&receivers, &sender);
182 sleep(Duration::from_millis(100)).await;
183 }
184 });
185 handle
186 };
187
188
189 for handle in handles {
190 match handle.await {
191 Ok(result) => {
192 if let Err(e) = result {
193 sync_downloaded_canceltoken.cancel();
194 let _ = downloaded_size_handle.await;
195 return Err(e);
196 }
197 }
198 Err(_) => {
199 sync_downloaded_canceltoken.cancel();
200 let _ = downloaded_size_handle.await;
201 return Err(DownloadError::ChunkDownloadHandle);
202 }
203 }
204 }
205
206 sync_downloaded_canceltoken.cancel();
207 let _ = downloaded_size_handle.await;
208
209 if cancel_token.is_cancelled() {
210 return Ok(());
211 }
212
213 sync_downloaded_size(&receivers, &sender);
214
215 *status.write() = DownloadStatus::DownloadPost;
216 chunk_hub::on_download_post(&config, chunk_length).await?;
217
218 Ok(())
219}