use futures::{stream, StreamExt};
use reqwest::{Client, IntoUrl};
use std::{
path::Path,
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
fs::{create_dir_all, File},
io::AsyncWriteExt,
sync::Mutex,
time::timeout,
};
use crate::{
error::Error,
minecraft::{
emitter::{Emit, Emitter, Event},
install::FileType,
},
util::retry::retry,
};
pub async fn download<P: AsRef<Path>>(
url: impl IntoUrl,
destination: P,
emitter: Option<&Emitter>,
client: Option<&Client>,
) -> crate::Result<u64> {
let default_client = Client::default();
let client = client.unwrap_or(&default_client);
let response = client.get(url).send().await?;
if !response.status().is_success() {
return Err(Error::Download(response.status().to_string()));
}
let total_size = response.content_length().unwrap_or(0);
let mut downloaded: u64 = 0;
if let Some(parent) = destination.as_ref().parent() {
if !parent.is_dir() {
create_dir_all(parent).await?;
}
}
let mut file = File::create(&destination).await?;
let mut stream = response.bytes_stream();
let mut last_data_received;
while let Some(chunk_result) = timeout(Duration::from_secs(10), stream.next()).await? {
match chunk_result {
Ok(chunk) => {
last_data_received = Instant::now();
downloaded += chunk.len() as u64;
file.write_all(&chunk).await?;
emitter
.emit(
Event::SingleDownloadProgress,
(
destination.as_ref().to_string_lossy().into_owned(),
downloaded,
total_size,
),
)
.await;
}
Err(_) => {
return Err(Error::Download(
"Connection dead, no data for 3 seconds.".to_string(),
));
}
}
if last_data_received.elapsed() > Duration::from_secs(10) {
return Err(Error::Download(
"Connection dead, no data for 3 seconds.".to_string(),
));
}
}
Ok(total_size)
}
/// Downloads a batch of files, running up to ten downloads concurrently.
///
/// Each entry is `(url, destination, file_type)`. Every download is attempted
/// through [`retry`] (configured here with `3` and a 5-second duration). After
/// a file succeeds, an [`Event::MultipleDownloadProgress`] event is emitted
/// carrying `(destination path, files completed so far, total files,
/// file type as a string)`.
///
/// # Arguments
///
/// * `downloads` - Files to fetch.
/// * `emitter` - Optional sink for progress events; also passed on to each
///   individual [`download`].
/// * `client` - Optional HTTP client shared by all downloads.
///
/// # Errors
///
/// Returns the first error produced by any download (after its retries). The
/// remaining in-flight downloads are dropped at that point.
pub async fn download_multiple<U, P>(
    downloads: Vec<(U, P, FileType)>,
    emitter: Option<&Emitter>,
    client: Option<&Client>,
) -> crate::Result<()>
where
    U: IntoUrl + Send,
    P: AsRef<Path> + Send + 'static,
{
    let total_files = downloads.len();
    let total_downloaded = Arc::new(Mutex::new(0_u64));

    let tasks = downloads.into_iter().map(|(url, destination, file_type)| {
        let total_downloaded = Arc::clone(&total_downloaded);
        async move {
            retry(
                || async { download(url.as_str(), destination.as_ref(), emitter, client).await },
                Result::is_ok,
                3,
                Duration::from_secs(5),
            )
            .await?;

            // The lock is held across the emit so progress events are sent in
            // the same order the counter is incremented.
            let mut downloaded = total_downloaded.lock().await;
            *downloaded += 1;
            emitter
                .emit(
                    Event::MultipleDownloadProgress,
                    (
                        destination.as_ref().to_string_lossy().into_owned(),
                        *downloaded,
                        total_files as u64,
                        file_type.to_string(),
                    ),
                )
                .await;
            Ok::<(), Error>(())
        }
    });

    // Results carry no data, so completion order is irrelevant; unordered
    // buffering lets a finished slot be refilled immediately instead of
    // waiting behind a slower download earlier in the list.
    let mut pending = stream::iter(tasks).buffer_unordered(10);
    while let Some(result) = pending.next().await {
        result?;
    }
    Ok(())
}