use std::{iter::Iterator, path::PathBuf, sync::Arc, time::Duration};
use color_eyre::{eyre::eyre, Result};
use futures::StreamExt;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use log::{debug, error, info, warn};
use lychee_lib::{CacheStatus, Status};
use reqwest::Client;
use tokio::{
fs::{self, File},
io::AsyncWriteExt,
time,
};
use url::Url;
use crate::validate::UnvalidatedFile;
pub async fn request_dataset(
file_to_request: UnvalidatedFile,
client: Client,
target_dir: Arc<PathBuf>,
multi_progbar: Arc<MultiProgress>,
) -> Result<UnvalidatedFile> {
let url = file_to_request.url();
let valid_url = check_url(url).await?;
debug!("Downloading dataset file from {:?}", valid_url);
let response = match download_with_retries(&client, valid_url.as_str()).await {
Ok(r) => {
debug!("Successfully downloaded from {:?}", valid_url);
r
}
Err(e) => {
return Err(eyre!(
"The request encountered an error: {:?}. Skipping.",
e
));
}
};
let total_size = response.content_length().unwrap_or(0);
let filename = uri_to_filename(&valid_url)?;
let downloaded_file = if response.status().is_success() {
let file_path = target_dir.join(filename);
if let Some(parent) = file_path.parent() {
fs::create_dir_all(parent).await?;
}
let prog_bar = multi_progbar.add(ProgressBar::new(total_size));
prog_bar.set_style(
ProgressStyle::default_bar()
.template(
"{msg} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})",
)?
.progress_chars("##-"),
);
prog_bar.set_message(format!("Writing data into {filename}..."));
let mut file = File::create(&file_path).await?;
let mut stream = response.bytes_stream();
while let Some(chunk_result) = stream.next().await {
match chunk_result {
Ok(chunk) => {
file.write_all(&chunk).await?;
prog_bar.inc(chunk.len() as u64);
}
Err(e) => {
error!("Error while reading chunk from {}: {}", url, e);
return Err(e.into());
}
}
}
prog_bar.set_message(format!("Writing data into {filename}...Done!"));
file_path
} else if response.status().as_u16() == 404 {
warn!("File not found: {}", url);
return Err(eyre!(
"Failed to download {}: HTTP {}",
filename,
response.status()
));
} else {
error!(
"Failed to download {}: HTTP {}",
filename,
response.status()
);
return Err(eyre!(
"Failed to download {}: HTTP {}",
filename,
response.status()
));
};
let downloaded = file_to_request.set_path(downloaded_file);
Ok(downloaded)
}
/// Sends a GET request for `url` via [`run_http_request`], retrying failed
/// attempts with exponential backoff.
///
/// At most five attempts are made. After failed attempt `n` (for `n < 5`) the
/// function sleeps for `2^n` seconds (2, 4, 8, then 16) before trying again.
///
/// # Errors
///
/// Returns the error from the final attempt once all five attempts have failed.
async fn download_with_retries(client: &Client, url: &str) -> Result<reqwest::Response> {
    const MAX_ATTEMPTS: u32 = 5;
    let mut attempt: u32 = 1;
    loop {
        debug!("Performing attempt #{} to download from {}.", attempt, url);
        let failure = match run_http_request(client, url).await {
            Ok(response) => {
                debug!("Successfully downloaded files for URL {}", url);
                return Ok(response);
            }
            Err(failure) => failure,
        };

        // Out of attempts: report and propagate the last error.
        if attempt >= MAX_ATTEMPTS {
            error!(
                "Failed to download files for URL {} after {} attempts:\n\n{}",
                url, attempt, failure
            );
            return Err(failure);
        }

        let backoff = Duration::from_secs(2_u64.pow(attempt));
        warn!(
            "Attempt {} failed for URL {}: {}. Retrying in {} seconds...",
            attempt,
            url,
            failure,
            backoff.as_secs()
        );
        time::sleep(backoff).await;
        attempt += 1;
    }
}
/// Sends a single GET request for `url` and accepts only successful (2xx) responses.
///
/// The body is not read here; the [`reqwest::Response`] is returned so the
/// caller can consume it.
///
/// # Errors
///
/// Returns an error if sending the request fails or if the response status is
/// not a success status.
async fn run_http_request(client: &Client, url: &str) -> Result<reqwest::Response> {
    debug!("Downloading {}", url);
    let resp = client.get(url).send().await?;
    let status = resp.status();
    if !status.is_success() {
        return Err(eyre!("Failed to download from URL {}: {}", url, status));
    }
    debug!("Downloaded successful for {}", url);
    Ok(resp)
}
/// Validates `url` with `lychee_lib::check` and returns the URI reported by the
/// checker, parsed as a [`Url`].
///
/// Accepted statuses: `Ok`, `Redirected` (logs a warning), `Unsupported` (logs a
/// warning) and `Cached` (logs info for a cached `Ok`, a warning otherwise).
/// Rejected statuses: `Error`, `Timeout`, `UnknownStatusCode` and `Excluded`.
///
/// # Errors
///
/// Returns an error if the check itself fails, if the status is one of the
/// rejected kinds above, or if the checked URI cannot be parsed as a [`Url`].
#[inline]
pub async fn check_url(url: &str) -> Result<Url> {
    debug!("Checking the requested URL '{url}' to make sure it's valid");
    let checked = lychee_lib::check(url).await?;
    let body = checked.body();
    // Every accepted outcome hands back the URI lychee reported, parsed as a `Url`.
    let parse_checked_uri = || -> Result<Url> { Ok(Url::parse(body.uri.as_str())?) };

    match &body.status {
        Status::Ok(status_code) => {
            info!(
                "The URL {url} has been successfully checked with status code {}, and is thus valid and not broken.",
                status_code.as_str()
            );
            parse_checked_uri()
        }
        Status::Redirected(status_code) => {
            warn!(
                "The provided URI resulted in a redirect to a different resource with status code {:?}. `refman` will proceed, though it may download a different file than is expected.",
                status_code.as_str()
            );
            parse_checked_uri()
        }
        Status::Unsupported(error_kind) => {
            warn!(
                "The requested URL '{url}' is valid but unsupported by the validator. Proceed with downloading it at your own risk. Here's the validator error: {:?}",
                error_kind
            );
            parse_checked_uri()
        }
        Status::Cached(cache_status) => {
            if matches!(cache_status, CacheStatus::Ok(_)) {
                info!("A cached response is being used instead of a fresh download.");
            } else {
                warn!(
                    "The requested url '{url}' appears to be valid and was cached, but the cache has become invalid."
                );
            }
            parse_checked_uri()
        }
        Status::Error(error_kind) => Err(eyre!(
            "An error was encountered when checking the provided URI, '{url}': {:?}",
            error_kind
        )),
        Status::Timeout(Some(code)) => Err(eyre!(
            "The request for the provided URI, '{url}', timed out with the status code {}.",
            code.as_str()
        )),
        Status::Timeout(None) => Err(eyre!(
            "The request for the provided URI, '{url}', timed out without a status code."
        )),
        Status::UnknownStatusCode(status_code) => Err(eyre!(
            "An unknown status code was received: {:?}",
            status_code.as_str()
        )),
        Status::Excluded => Err(eyre!(
            "The requested URL '{url}' has been excluded by the host."
        )),
    }
}
/// Returns the last path segment of `url`, used as the on-disk filename.
///
/// The returned `&str` borrows from `url`.
///
/// # Errors
///
/// Returns an error if `url.path_segments()` yields `None`, or if the last
/// segment is empty (for example when the path ends with `/`).
#[inline]
pub fn uri_to_filename(url: &Url) -> Result<&str> {
    url.path_segments()
        .and_then(Iterator::last)
        .filter(|segment| !segment.is_empty())
        .ok_or_else(|| {
            eyre!(
                "Failed to extract filename from URL, which may be corrupted or may not end with the name of a file: {}",
                url
            )
        })
}