use reqwest::header::CONTENT_LENGTH;
use reqwest::StatusCode;
use std::io::Read;
use std::str::FromStr;
use std::sync::mpsc::SyncSender;
use std::sync::{Arc, Mutex};
use std::thread;
use crate::options::PipeDownloaderOptions;
use anyhow::anyhow;
use reqwest::blocking::Response;
use std::time::Duration;
use crate::pipe_progress::InternalProgress;
use crate::pipe_utils::bytes_to_human;
use crate::pipe_wrapper::DataChunk;
fn download_chunk(
thread_no: usize,
progress_context: Arc<Mutex<InternalProgress>>,
range: &std::ops::Range<usize>,
max_speed: Option<usize>,
response: &mut reqwest::blocking::Response,
) -> anyhow::Result<Vec<u8>> {
let mut buf_vec: Vec<u8> = Vec::with_capacity(range.end - range.start);
let mut buf = vec![0; 1024 * 1024];
let mut total_downloaded: usize = 0;
let start_time = std::time::Instant::now();
loop {
let left_to_download = (range.end - range.start) - total_downloaded;
let max_buf_size = std::cmp::min(buf.len(), left_to_download);
if max_buf_size == 0 {
break;
}
let n = response.read(&mut buf[..max_buf_size])?;
if n == 0 {
return Err(anyhow!("Unexpected end of file"));
}
total_downloaded += n;
buf_vec.extend_from_slice(&buf[..n]);
{
let mut progress_context = progress_context.lock().unwrap();
if let Some(cd) = progress_context.chunk_downloaded.get_mut(thread_no) {
*cd += n;
}
progress_context.progress_buckets_download.add_bytes(n);
if progress_context.paused {
return Err(anyhow::anyhow!("Download paused"));
}
if progress_context.stop_requested {
return Err(anyhow::anyhow!("Stop requested"));
}
}
if let Some(max_speed) = max_speed {
let should_take_time =
Duration::from_secs_f64(total_downloaded as f64 / max_speed as f64);
log::debug!("Should take time: {:?}", should_take_time);
loop {
let elapsed = std::time::Instant::now().duration_since(start_time);
if should_take_time > elapsed {
std::thread::sleep(Duration::from_millis(1));
} else {
break;
}
}
}
}
if buf_vec.len() != range.end - range.start {
return Err(anyhow::anyhow!(
"unexpected content length: {}",
buf_vec.len()
));
}
log::debug!(
"Chunk downloaded: range {:?} / {}",
range,
range.end - range.start
);
Ok(buf_vec)
}
fn request_chunk(
url: &str,
client: &reqwest::blocking::Client,
range: &std::ops::Range<usize>,
) -> anyhow::Result<Response> {
log::debug!(
"Downloading chunk: range {:?} / {}",
range,
range.end - range.start
);
let header = format!("bytes={}-{}", range.start, range.end - 1);
let response = client.get(url).header("Range", header).send()?;
let status = response.status();
let content_length = response
.headers()
.get("Content-Length")
.ok_or_else(|| anyhow::anyhow!("Content-Length header not found"))?
.to_str()?;
let content_length = usize::from_str(content_length)?;
if status == StatusCode::OK && range.start != 0 {
return Err(anyhow::anyhow!(
"Seems like server does not support partial content: {}",
status
));
}
if status != StatusCode::PARTIAL_CONTENT && range.start != 0 {
return Err(anyhow::anyhow!("unexpected status code: {}", status));
} else {
log::info!(
"Received status: {:?}, starting downloading chunk data...",
status
);
}
if content_length != range.end - range.start {
return Err(anyhow::anyhow!(
"unexpected content length: {}",
content_length
));
}
Ok(response)
}
pub fn decode_loop<T: Read>(
progress_context: Arc<Mutex<InternalProgress>>,
options: &PipeDownloaderOptions,
decoder: &mut T,
send: std::sync::mpsc::SyncSender<DataChunk>,
) -> anyhow::Result<()> {
let mut unpacked_size = 0;
loop {
let mut buf = vec![0u8; options.chunk_size_decoder];
let bytes_read = match decoder.read(&mut buf) {
Ok(bytes_read) => bytes_read,
Err(err) => {
log::error!("Error while reading from decoder {:?}", err);
return Err(anyhow::anyhow!(
"Error while reading from decoder {:?}",
err
));
}
};
if bytes_read == 0 {
break;
}
unpacked_size += bytes_read;
{
let mut progress = progress_context.lock().unwrap();
progress.total_unpacked = unpacked_size;
progress.progress_buckets_unpack.add_bytes(bytes_read);
if progress.stop_requested {
break;
}
}
log::debug!(
"Decode loop, Unpacked size: {}",
bytes_to_human(unpacked_size)
);
buf.resize(bytes_read, 0);
let data_chunk = DataChunk {
data: buf,
range: unpacked_size - bytes_read..unpacked_size,
};
send.send(data_chunk)?;
}
log::info!("Finishing decode loop");
Ok(())
}
pub fn download_loop(
thread_no: usize,
thread_count: usize,
options: PipeDownloaderOptions,
progress_context: Arc<Mutex<InternalProgress>>,
send_download_chunks: SyncSender<DataChunk>,
download_url: &str,
) -> anyhow::Result<()> {
let mut use_chunks = !options.force_no_chunks;
let client = reqwest::blocking::Client::new();
let download_url = if download_url.ends_with(".link") {
let url = client.get(download_url).send()?.text()?.trim().to_string();
if url.is_empty() {
return Err(anyhow::anyhow!("Empty url from link"));
}
if !url.starts_with("http://") && !url.starts_with("https://") {
return Err(anyhow::anyhow!(
"Wrong url, should start from http:// or https://"
));
}
log::info!("Got download address from link: {}", url);
url
} else {
download_url.to_string()
};
progress_context.lock().unwrap().download_url = Some(download_url.clone());
let response = client.head(&download_url).send()?;
let total_length = match response
.headers()
.get(CONTENT_LENGTH)
.ok_or_else(|| anyhow!("response doesn't include the content length"))
{
Ok(length) => Some(
usize::from_str(length.to_str()?)
.map_err(|_| anyhow!("invalid Content-Length header"))?,
),
Err(_) => {
log::warn!("Content-Length header not found, continue download without knowledge about file size...");
use_chunks = false;
None
}
};
progress_context.lock().unwrap().total_download_size = total_length;
if total_length
.map(|total_length| total_length == 0)
.unwrap_or(false)
{
return Err(anyhow::anyhow!(
"Content-Length is 0, empty files not supported"
));
}
if total_length
.map(|total_length| total_length < 10000)
.unwrap_or(false)
{
log::info!("Single request mode");
use_chunks = false;
}
if use_chunks {
let header = format!("bytes={}-{}", 1000, 2000);
let response = client.head(&download_url).header("Range", header).send()?;
if response.status() != StatusCode::PARTIAL_CONTENT {
log::warn!("Server does not support partial content, falling back to single request. No retries after connection error will be possible");
use_chunks = false;
}
}
let thread_count = if use_chunks {
thread_count
} else if thread_no != 0 {
log::warn!("Thread number is set, but server does not support partial content, falling back to single request");
return Ok(());
} else {
1
};
progress_context
.lock()
.unwrap()
.chunk_downloaded
.resize(thread_count, 0);
let mut download_response = if !use_chunks {
Some(client.get(&download_url).send()?)
} else {
None
};
let chunk_size = options.chunk_size_downloader;
let chunk_count = match total_length {
Some(total_length) => ((total_length - 1) / chunk_size + 1) as usize,
None => 1,
};
let total_length = total_length.ok_or_else(|| anyhow!("Content length unknown"))?;
{
let mut pc = progress_context.lock().unwrap();
if pc.unfinished_chunks.is_empty() {
pc.total_chunks = chunk_count;
pc.unfinished_chunks.reserve(chunk_count);
for i in (0..chunk_count).rev() {
pc.unfinished_chunks.push(i);
}
log::warn!("Unfinished chunks: {:?}", pc.unfinished_chunks);
}
}
for i in 0..chunk_count {
let max_length = std::cmp::min(chunk_size, total_length - i * chunk_size);
if max_length == 0 {
break;
}
if i % thread_count != thread_no {
continue;
}
loop {
let smallest_unfinished = *progress_context
.lock()
.unwrap()
.unfinished_chunks
.last()
.expect("Critical error unfinished chunks have to be here");
if i - smallest_unfinished > thread_count {
std::thread::sleep(std::time::Duration::from_millis(100));
continue;
}
break;
}
let range = std::ops::Range {
start: i * chunk_size,
end: i * chunk_size + max_length,
};
let client = reqwest::blocking::Client::new();
loop {
let progress = { progress_context.lock().unwrap().clone() };
if progress.stop_requested {
return Err(anyhow::anyhow!("Stop requested"));
}
if progress.paused {
log::info!("Download still paused...");
thread::sleep(Duration::from_secs(5));
continue;
}
if thread_count > 1 {
download_response = None;
}
let result = if let Some(download_response) = &mut download_response {
download_chunk(
thread_no,
progress_context.clone(),
&range,
options.max_download_speed,
download_response,
)
} else {
let new_range = if thread_count == 1 {
std::ops::Range {
start: range.start,
end: total_length,
}
} else {
range.clone()
};
match request_chunk(&download_url, &client, &new_range) {
Ok(mut new_response) => {
let res = download_chunk(
thread_no,
progress_context.clone(),
&range,
options.max_download_speed,
&mut new_response,
);
download_response = Some(new_response);
res
}
Err(err) => {
log::error!("Error while requesting chunk: {:?}", err);
Err(err)
}
}
};
match result {
Ok(buf) => {
{
let mut progress = progress_context.lock().unwrap();
progress.total_downloaded += progress.chunk_downloaded[thread_no];
progress.chunk_downloaded[thread_no] = 0;
if progress.stop_requested {
return Err(anyhow::anyhow!("Stop requested"));
}
}
let dc = DataChunk {
range: range.clone(),
data: buf,
};
{
let mut pc = progress_context.lock().unwrap();
let idx_to_remove = pc
.unfinished_chunks
.iter()
.rposition(|el| *el == i)
.unwrap_or_else(|| {
panic!("Critical error, chunk {} should be in unfinished chunks", i)
});
assert!(i == pc.unfinished_chunks[idx_to_remove]);
log::warn!("Removing chunk {} at idx {}", i, idx_to_remove);
pc.unfinished_chunks.remove(idx_to_remove);
}
if let Err(err) = send_download_chunks.send(dc) {
log::error!("Error while sending chunk: {:?}", err);
return Err(anyhow::anyhow!("Error while sending chunk: {:?}", err));
}
break;
}
Err(err) => {
download_response = None;
let progress = {
let mut progress = progress_context.lock().unwrap();
progress.chunk_downloaded[thread_no] = 0;
progress.clone()
};
if progress.stop_requested {
return Err(anyhow::anyhow!("Stop requested"));
}
if !use_chunks {
return Err(anyhow::anyhow!("Error while downloading: {:?}", err));
}
if progress.paused {
log::info!("Download paused, trying again");
} else {
log::warn!("Error while downloading chunk, trying again: {:?}", err);
}
thread::sleep(Duration::from_secs(5));
}
}
}
}
Ok(())
}