cobble_core/utils/
download.rs1use crate::error::{DownloadError, DownloadResult};
2use futures::{stream, StreamExt, TryStreamExt};
3use sha1::Digest;
4use sha1::Sha1;
5use std::cmp::min;
6use std::path::PathBuf;
7use time::OffsetDateTime;
8use tokio::fs::create_dir_all;
9use tokio::fs::File;
10use tokio::io::AsyncWriteExt;
11use tokio::sync::mpsc::channel;
12use tokio::sync::mpsc::Receiver;
13use tokio::sync::mpsc::Sender;
14use tokio::task;
15
/// Describes one file to fetch: its source, its destination on disk and,
/// optionally, the digest the stored file is expected to have.
#[derive(Debug, Clone)]
pub struct Download {
    /// Address the file is requested from.
    pub url: String,
    /// Path the response body is written to.
    pub file: PathBuf,
    /// Expected SHA-1 digest as raw bytes. When `None`, verification only
    /// checks that `file` exists as a regular file.
    pub sha1: Option<Vec<u8>>,
}
22
23impl Download {
24 #[instrument(
25 name = "download_file",
26 level = "trace",
27 skip_all,
28 fields(
29 url = self.url,
30 file = %self.file.to_string_lossy(),
31 current_file,
32 total_files,
33 )
34 )]
35 pub async fn download(
36 &self,
37 client: reqwest::Client,
38 mut progress_sender: Option<Sender<DownloadProgress>>,
39 current_file: usize,
40 total_files: usize,
41 ) -> DownloadResult<()> {
42 if let Some(parent) = self.file.parent() {
44 trace!("Creating parent folder");
45 create_dir_all(parent).await?;
46 }
47
48 let response = client.get(&self.url).send().await?.error_for_status()?;
50
51 trace!("Sending request to get content-length");
53 let total_bytes = response
54 .content_length()
55 .ok_or(DownloadError::NoContentLength)?;
56 let mut progress = DownloadProgress {
57 url: self.url.clone(),
58 file: self.file.clone(),
59 current_file,
60 total_files,
61 downloaded_bytes: 0,
62 total_bytes,
63 };
64
65 trace!("Send initial progress");
66 progress.send(&mut progress_sender).await;
67
68 let mut file = File::create(&self.file).await?;
69 let mut stream = response.bytes_stream();
70
71 trace!("Writing content to disk.");
72 let mut last_chunk_time = OffsetDateTime::now_utc().unix_timestamp_nanos();
73 while let Some(item) = stream.next().await {
74 let chunk = item?;
75 file.write_all(&chunk).await?;
76
77 progress.downloaded_bytes = min(
78 progress.downloaded_bytes + (chunk.len() as u64),
79 progress.total_bytes,
80 );
81
82 let now = OffsetDateTime::now_utc().unix_timestamp_nanos();
83 if now - last_chunk_time > 500000000 {
84 last_chunk_time = now;
85
86 trace!("Send progress");
88 progress.send(&mut progress_sender).await;
89 }
90 }
91
92 file.sync_all().await?;
93
94 trace!("Send progress");
96 progress.downloaded_bytes = progress.total_bytes;
97 progress.send(&mut progress_sender).await;
98
99 Ok(())
100 }
101
102 #[instrument(
103 name = "verify_file",
104 level = "trace",
105 skip_all,
106 fields(
107 url = self.url,
108 file = %self.file.to_string_lossy(),
109 )
110 )]
111 pub async fn verify(&self) -> DownloadResult<bool> {
112 let this = self.clone();
113
114 task::spawn_blocking(move || this.blocking_verify())
115 .await
116 .unwrap()
117 }
118
119 fn blocking_verify(self) -> DownloadResult<bool> {
120 if let Some(sha) = self.sha1 {
121 if !self.file.is_file() {
122 return Ok(false);
123 }
124
125 let mut file = std::fs::File::open(self.file)?;
126 let mut hasher = Sha1::new();
127 std::io::copy(&mut file, &mut hasher)?;
128 let hash = hasher.finalize().to_vec();
129
130 Ok(sha == hash)
131 } else {
132 Ok(self.file.is_file())
133 }
134 }
135}
136
/// Snapshot of how far a single file download has progressed.
#[derive(Debug, Clone)]
pub struct DownloadProgress {
    /// Address the file is requested from.
    pub url: String,
    /// Path the file is stored at.
    pub file: PathBuf,
    /// Position of this file within the batch, as handed to the download.
    pub current_file: usize,
    /// Number of files in the batch.
    pub total_files: usize,
    /// Bytes written so far; never reported above `total_bytes`.
    pub downloaded_bytes: u64,
    /// Expected size of the file in bytes.
    pub total_bytes: u64,
}
153
154impl DownloadProgress {
155 pub(crate) async fn send(&self, sender: &mut Option<Sender<Self>>) {
156 if let Some(s) = &sender {
157 if s.send(self.clone()).await.is_err() {
158 trace!("Sending failed because receiver is no longer around. Dropping sender...");
159 *sender = None;
160 }
161 }
162 }
163}
164
165#[instrument(
166 name = "download",
167 level = "trace",
168 skip_all,
169 fields(parallel_downloads, verify)
170)]
171pub async fn download(
172 downloads: Vec<Download>,
173 progress_sender: Option<Sender<DownloadProgress>>,
174 parallel_downloads: u16,
175 retries: u16,
176 verify: bool,
177) -> DownloadResult<()> {
178 let client = reqwest::Client::new();
179 let total = downloads.len();
180 let downloads = downloads.into_iter().enumerate();
181
182 stream::iter(downloads)
183 .map(move |(n, d)| {
184 let client = client.clone();
185 let mut sender = progress_sender.clone();
186 async move {
187 for x in 0..=retries {
189 if x > 0 {
190 trace!("Retrying to download file for the {}th time", x);
191 }
192
193 if !d.file.exists() || x > 0 {
195 trace!("File does not exist or retrying");
196 d.download(client.clone(), sender.clone(), n, total).await?;
197 } else {
198 trace!("File does exist, sending progress update");
199 let file = File::open(&d.file).await?;
200 let size = file.metadata().await?.len();
201
202 DownloadProgress {
204 url: d.url.clone(),
205 file: d.file.clone(),
206 current_file: n,
207 total_files: total,
208 downloaded_bytes: size,
209 total_bytes: size,
210 }
211 .send(&mut sender)
212 .await;
213 }
214
215 if verify && !d.verify().await? {
217 if x == retries {
219 debug!("Verification of file failed");
220 return Err(DownloadError::ChecksumMismatch);
221 } else {
222 debug!("Verification of file failed, retrying...");
223 }
224 } else {
225 return Ok(());
227 }
228 }
229
230 Ok(())
231 }
232 })
233 .buffer_unordered(parallel_downloads as usize)
234 .try_collect::<()>()
235 .await?;
236
237 Ok(())
238}
239
240pub fn download_progress_channel(
241 buffer: usize,
242) -> (Sender<DownloadProgress>, Receiver<DownloadProgress>) {
243 channel(buffer)
244}