cobble_core/utils/
download.rs

1use crate::error::{DownloadError, DownloadResult};
2use futures::{stream, StreamExt, TryStreamExt};
3use sha1::Digest;
4use sha1::Sha1;
5use std::cmp::min;
6use std::path::PathBuf;
7use time::OffsetDateTime;
8use tokio::fs::create_dir_all;
9use tokio::fs::File;
10use tokio::io::AsyncWriteExt;
11use tokio::sync::mpsc::channel;
12use tokio::sync::mpsc::Receiver;
13use tokio::sync::mpsc::Sender;
14use tokio::task;
15
16#[derive(Clone, Debug)]
17pub struct Download {
18    pub url: String,
19    pub file: PathBuf,
20    pub sha1: Option<Vec<u8>>,
21}
22
23impl Download {
24    #[instrument(
25        name = "download_file",
26        level = "trace",
27        skip_all,
28        fields(
29            url = self.url,
30            file = %self.file.to_string_lossy(),
31            current_file,
32            total_files,
33        )
34    )]
35    pub async fn download(
36        &self,
37        client: reqwest::Client,
38        mut progress_sender: Option<Sender<DownloadProgress>>,
39        current_file: usize,
40        total_files: usize,
41    ) -> DownloadResult<()> {
42        // Create parent folder
43        if let Some(parent) = self.file.parent() {
44            trace!("Creating parent folder");
45            create_dir_all(parent).await?;
46        }
47
48        // Setup request
49        let response = client.get(&self.url).send().await?.error_for_status()?;
50
51        // Progress Update
52        trace!("Sending request to get content-length");
53        let total_bytes = response
54            .content_length()
55            .ok_or(DownloadError::NoContentLength)?;
56        let mut progress = DownloadProgress {
57            url: self.url.clone(),
58            file: self.file.clone(),
59            current_file,
60            total_files,
61            downloaded_bytes: 0,
62            total_bytes,
63        };
64
65        trace!("Send initial progress");
66        progress.send(&mut progress_sender).await;
67
68        let mut file = File::create(&self.file).await?;
69        let mut stream = response.bytes_stream();
70
71        trace!("Writing content to disk.");
72        let mut last_chunk_time = OffsetDateTime::now_utc().unix_timestamp_nanos();
73        while let Some(item) = stream.next().await {
74            let chunk = item?;
75            file.write_all(&chunk).await?;
76
77            progress.downloaded_bytes = min(
78                progress.downloaded_bytes + (chunk.len() as u64),
79                progress.total_bytes,
80            );
81
82            let now = OffsetDateTime::now_utc().unix_timestamp_nanos();
83            if now - last_chunk_time > 500000000 {
84                last_chunk_time = now;
85
86                // Last update was more than 500000000 nanoseconds (500ms) ago
87                trace!("Send progress");
88                progress.send(&mut progress_sender).await;
89            }
90        }
91
92        file.sync_all().await?;
93
94        // Send finished progress
95        trace!("Send progress");
96        progress.downloaded_bytes = progress.total_bytes;
97        progress.send(&mut progress_sender).await;
98
99        Ok(())
100    }
101
102    #[instrument(
103        name = "verify_file",
104        level = "trace",
105        skip_all,
106        fields(
107            url = self.url,
108            file = %self.file.to_string_lossy(),
109        )
110    )]
111    pub async fn verify(&self) -> DownloadResult<bool> {
112        let this = self.clone();
113
114        task::spawn_blocking(move || this.blocking_verify())
115            .await
116            .unwrap()
117    }
118
119    fn blocking_verify(self) -> DownloadResult<bool> {
120        if let Some(sha) = self.sha1 {
121            if !self.file.is_file() {
122                return Ok(false);
123            }
124
125            let mut file = std::fs::File::open(self.file)?;
126            let mut hasher = Sha1::new();
127            std::io::copy(&mut file, &mut hasher)?;
128            let hash = hasher.finalize().to_vec();
129
130            Ok(sha == hash)
131        } else {
132            Ok(self.file.is_file())
133        }
134    }
135}
136
137/// Progress of an ongoing download.
138#[derive(Clone, Debug)]
139pub struct DownloadProgress {
140    /// The URL of the download.
141    pub url: String,
142    /// The path where the file is saved.
143    pub file: PathBuf,
144    /// Current file index.
145    pub current_file: usize,
146    /// Number of files that are being downloaded.
147    pub total_files: usize,
148    /// Bytes that already got downloaded.
149    pub downloaded_bytes: u64,
150    /// Total bytes of the file.
151    pub total_bytes: u64,
152}
153
154impl DownloadProgress {
155    pub(crate) async fn send(&self, sender: &mut Option<Sender<Self>>) {
156        if let Some(s) = &sender {
157            if s.send(self.clone()).await.is_err() {
158                trace!("Sending failed because receiver is no longer around. Dropping sender...");
159                *sender = None;
160            }
161        }
162    }
163}
164
165#[instrument(
166    name = "download",
167    level = "trace",
168    skip_all,
169    fields(parallel_downloads, verify)
170)]
171pub async fn download(
172    downloads: Vec<Download>,
173    progress_sender: Option<Sender<DownloadProgress>>,
174    parallel_downloads: u16,
175    retries: u16,
176    verify: bool,
177) -> DownloadResult<()> {
178    let client = reqwest::Client::new();
179    let total = downloads.len();
180    let downloads = downloads.into_iter().enumerate();
181
182    stream::iter(downloads)
183        .map(move |(n, d)| {
184            let client = client.clone();
185            let mut sender = progress_sender.clone();
186            async move {
187                // Retries
188                for x in 0..=retries {
189                    if x > 0 {
190                        trace!("Retrying to download file for the {}th time", x);
191                    }
192
193                    // Download file
194                    if !d.file.exists() || x > 0 {
195                        trace!("File does not exist or retrying");
196                        d.download(client.clone(), sender.clone(), n, total).await?;
197                    } else {
198                        trace!("File does exist, sending progress update");
199                        let file = File::open(&d.file).await?;
200                        let size = file.metadata().await?.len();
201
202                        // Send progress
203                        DownloadProgress {
204                            url: d.url.clone(),
205                            file: d.file.clone(),
206                            current_file: n,
207                            total_files: total,
208                            downloaded_bytes: size,
209                            total_bytes: size,
210                        }
211                        .send(&mut sender)
212                        .await;
213                    }
214
215                    // Verify file
216                    if verify && !d.verify().await? {
217                        // Checksum does not match
218                        if x == retries {
219                            debug!("Verification of file failed");
220                            return Err(DownloadError::ChecksumMismatch);
221                        } else {
222                            debug!("Verification of file failed, retrying...");
223                        }
224                    } else {
225                        // Success
226                        return Ok(());
227                    }
228                }
229
230                Ok(())
231            }
232        })
233        .buffer_unordered(parallel_downloads as usize)
234        .try_collect::<()>()
235        .await?;
236
237    Ok(())
238}
239
240pub fn download_progress_channel(
241    buffer: usize,
242) -> (Sender<DownloadProgress>, Receiver<DownloadProgress>) {
243    channel(buffer)
244}