use futures_util::TryStreamExt;
use crate::util::create_progress_bar;
use std::path::Path;
use tokio::io::AsyncWriteExt;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
pub async fn stream_response_to_writer<W: AsyncWrite + Unpin>(
resp: reqwest::Response,
mut writer: W,
pb: Option<&indicatif::ProgressBar>,
url: &str,
out_path: Option<&str>,
) -> Result<u64, IskraError> {
use tokio_util::io::StreamReader;
use tokio::io::AsyncReadExt;
let mut downloaded = 0u64;
let mut reader = StreamReader::new(
resp.bytes_stream().map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
);
let mut buf = [0u8; 8192];
loop {
let n = reader.read(&mut buf).await
.map_err(|e| {
if let Some(path) = out_path {
IskraError::Io { source: e, path: path.to_string() }
} else {
IskraError::Other(format!("stream read error for {url}: {e}"))
}
})?;
if n == 0 { break; }
writer.write_all(&buf[..n]).await
.map_err(|e| {
if let Some(path) = out_path {
IskraError::Io { source: e, path: path.to_string() }
} else {
IskraError::Other(format!("stream write error for {url}: {e}"))
}
})?;
downloaded += n as u64;
if let Some(pb) = pb {
pb.set_position(downloaded);
}
}
Ok(downloaded)
}
impl IskraClient {
pub async fn stream_response_to_writer<W: AsyncWrite + Unpin>(
&self,
resp: reqwest::Response,
mut writer: W,
pb: Option<&indicatif::ProgressBar>,
url: &str,
out_path: Option<&str>,
) -> Result<u64, IskraError> {
use tokio_util::io::StreamReader;
let mut downloaded = 0u64;
let mut reader = StreamReader::new(
resp.bytes_stream().map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
);
let mut buf = [0u8; 8192];
loop {
let n = reader.read(&mut buf).await
.map_err(|e| {
if let Some(path) = out_path {
IskraError::Io { source: e, path: path.to_string() }
} else {
IskraError::Other(format!("stream read error for {url}: {e}"))
}
})?;
if n == 0 { break; }
writer.write_all(&buf[..n]).await
.map_err(|e| {
if let Some(path) = out_path {
IskraError::Io { source: e, path: path.to_string() }
} else {
IskraError::Other(format!("stream write error for {url}: {e}"))
}
})?;
downloaded += n as u64;
if let Some(pb) = pb {
pb.set_position(downloaded);
}
}
Ok(downloaded)
}
pub async fn download_with_range(
&self,
method: &str,
url: &str,
body: Option<&str>,
headers: &[(&str, &str)],
queries: &[(&str, &str)],
out_path: &Path,
range: Option<(u64, Option<u64>)>,
resume: bool,
cache: Option<&crate::cache::Cache>,
) -> Result<u64, IskraError> {
use reqwest::header::{RANGE, HeaderValue};
use reqwest::Method;
let method_str = method; let method = Method::from_bytes(method_str.as_bytes())
.map_err(|e| IskraError::Config { msg: format!("invalid method: {e}") })?;
let mut req = self.client.request(method.clone(), url);
if let Some(b) = body {
req = req.body(b.to_owned());
}
for (k, v) in headers {
req = req.header(*k, *v);
}
if !queries.is_empty() {
req = req.query(queries);
}
let cache_key = format!("{}:{}:{}:{}", method_str, url, join_pairs(headers), join_pairs(queries));
if let Some(cache) = cache {
let cache_range = if resume && out_path.exists() {
let meta = std::fs::metadata(out_path).map_err(|e| IskraError::Io { source: e, path: out_path.display().to_string() })?;
let start = meta.len();
Some((start, u64::MAX)) } else if let Some((range_start, range_end)) = range {
Some((range_start, range_end.unwrap_or(u64::MAX)))
} else {
None
};
if let Some(r) = cache_range {
if let Some((data, _meta)) = cache.get(&cache_key, Some((r.0, r.1))) {
if resume && out_path.exists() {
let mut file = tokio::fs::OpenOptions::new().append(true).open(out_path).await
.map_err(|e| IskraError::Io { source: e, path: out_path.display().to_string() })?;
tokio::io::AsyncWriteExt::write_all(&mut file, &data).await
.map_err(|e| IskraError::Io { source: e, path: out_path.display().to_string() })?;
return Ok(std::fs::metadata(out_path).map_err(|e| IskraError::Io { source: e, path: out_path.display().to_string() })?.len());
} else {
let mut file = tokio::fs::File::create(out_path).await
.map_err(|e| IskraError::Io { source: e, path: out_path.display().to_string() })?;
tokio::io::AsyncWriteExt::write_all(&mut file, &data).await
.map_err(|e| IskraError::Io { source: e, path: out_path.display().to_string() })?;
return Ok(data.len() as u64);
}
}
}
}
let (mut file, start, expect_partial) = if resume && out_path.exists() {
let start = std::fs::metadata(out_path)
.map_err(|e| IskraError::Io { source: e, path: out_path.display().to_string() })?
.len();
req = req.header(RANGE, HeaderValue::from_str(&format!("bytes={}-", start)).unwrap());
let file = tokio::fs::OpenOptions::new().append(true).open(out_path).await
.map_err(|e| IskraError::Io { source: e, path: out_path.display().to_string() })?;
(file, start, true)
} else if let Some((range_start, range_end)) = range {
let range_header = match range_end {
Some(end) => format!("bytes={}-{}", range_start, end),
None => format!("bytes={}-", range_start),
};
req = req.header(RANGE, HeaderValue::from_str(&range_header).unwrap());
let file = tokio::fs::OpenOptions::new().write(true).create(true).truncate(true).open(out_path).await
.map_err(|e| IskraError::Io { source: e, path: out_path.display().to_string() })?;
(file, range_start, true)
} else {
let file = tokio::fs::OpenOptions::new().write(true).create(true).truncate(true).open(out_path).await
.map_err(|e| IskraError::Io { source: e, path: out_path.display().to_string() })?;
(file, 0u64, false)
};
let resp = req.send().await.map_err(|e| IskraError::Http { source: e, url: url.to_string() })?;
let status = resp.status();
if expect_partial {
if status == reqwest::StatusCode::PARTIAL_CONTENT {
} else if status == reqwest::StatusCode::RANGE_NOT_SATISFIABLE {
return Err(IskraError::Status { status: status.as_u16(), url: url.to_string() });
} else if status == reqwest::StatusCode::OK {
return Err(IskraError::Other(format!("Server ignored Range header for {url}, refusing to overwrite/append entire file (status 200 OK)")));
} else {
return Err(IskraError::Status { status: status.as_u16(), url: url.to_string() });
}
} else {
if !status.is_success() {
return Err(IskraError::Status { status: status.as_u16(), url: url.to_string() });
}
}
let total = resp.content_length().unwrap_or(0);
let pb = if total > 0 { Some(create_progress_bar(total)) } else { None };
use tokio_util::io::StreamReader;
let mut downloaded = 0u64;
let mut buf = [0u8; 8192];
let resp_headers = resp.headers().clone();
let mut reader = StreamReader::new(
resp.bytes_stream().map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
);
let mut cache_buf: Vec<u8> = Vec::new();
let max_bytes = if let Some((range_start, Some(range_end))) = range {
Some(range_end.saturating_sub(range_start) + 1)
} else {
None
};
loop {
let n = reader.read(&mut buf).await
.map_err(|e| IskraError::Io { source: e, path: out_path.display().to_string() })?;
if n == 0 { break; }
let to_write = if let Some(max) = max_bytes {
let remaining = max.saturating_sub(downloaded);
if remaining == 0 { break; }
std::cmp::min(n as u64, remaining) as usize
} else {
n
};
if to_write == 0 { break; }
file.write_all(&buf[..to_write]).await
.map_err(|e| IskraError::Io { source: e, path: out_path.display().to_string() })?;
if let Some(pb) = pb.as_ref() {
pb.set_position(start + downloaded + to_write as u64);
}
if let Some(_) = cache {
cache_buf.extend_from_slice(&buf[..to_write]);
}
downloaded += to_write as u64;
if let Some(max) = max_bytes {
if downloaded > max {
return Err(IskraError::Other(format!("Server sent more data than requested range for {url}")));
}
if downloaded == max {
let extra = reader.read(&mut buf).await
.map_err(|e| IskraError::Io { source: e, path: out_path.display().to_string() })?;
if extra > 0 {
return Err(IskraError::Other(format!("Server sent more data than requested range for {url}")));
}
break;
}
}
}
if let Some(pb) = pb {
pb.finish_with_message("Download complete");
}
if let Some(cache) = cache {
use crate::cache::CacheMeta;
let mut meta = CacheMeta::default();
if expect_partial {
let req_range = if resume && out_path.exists() {
(start, start + downloaded - 1)
} else if let Some((range_start, range_end)) = range {
(range_start, range_end.unwrap_or(range_start + downloaded - 1))
} else {
(0, downloaded - 1)
};
meta.range = Some(req_range);
}
for (k, v) in resp_headers.iter() {
if let Ok(val) = v.to_str() {
meta.headers.insert(k.to_string(), val.to_string());
}
}
cache.set(&cache_key, meta.range, &cache_buf, &meta);
}
if resume && out_path.exists() {
Ok(start + downloaded)
} else if let Some((_range_start, Some(_range_end))) = range {
Ok(downloaded)
} else {
Ok(downloaded)
}
}
pub async fn download_to_file(&self, method: &str, url: &str, body: Option<&str>, headers: &[(&str, &str)], queries: &[(&str, &str)], out_path: &Path) -> Result<u64, IskraError> {
use reqwest::Method;
let method = Method::from_bytes(method.as_bytes())
.map_err(|e| IskraError::Config { msg: format!("invalid method: {e}") })?;
let mut req = self.client.request(method, url);
if let Some(b) = body {
req = req.body(b.to_owned());
}
for (k, v) in headers {
req = req.header(*k, *v);
}
if !queries.is_empty() {
req = req.query(queries);
}
let resp = req.send().await.map_err(|e| IskraError::Http { source: e, url: url.to_string() })?;
let status = resp.status();
if !status.is_success() {
return Err(IskraError::Status { status: status.as_u16(), url: url.to_string() });
}
let total = resp.content_length().unwrap_or(0);
let pb = if total > 0 { Some(create_progress_bar(total)) } else { None };
let mut file = tokio::fs::File::create(out_path)
.await
.map_err(|e| IskraError::Io { source: e, path: out_path.display().to_string() })?;
let downloaded = stream_response_to_writer(
resp,
&mut file,
pb.as_ref(),
url,
Some(&out_path.display().to_string()),
).await?;
if let Some(pb) = pb {
pb.finish_with_message("Download complete");
}
Ok(downloaded)
}
}
use reqwest::{Client, Response};
use crate::error::IskraError;
use crate::util::join_pairs;
use std::sync::Arc;
#[derive(Clone)]
pub struct IskraClient {
client: Arc<Client>,
}
impl IskraClient {
pub async fn request(&self, method: &str, url: &str, body: Option<&str>, headers: &[(&str, &str)], queries: &[(&str, &str)]) -> Result<Response, IskraError> {
use reqwest::Method;
let method = Method::from_bytes(method.as_bytes())
.map_err(|e| IskraError::Config { msg: format!("invalid method: {e}") })?;
let mut req = self.client.request(method, url);
if let Some(b) = body {
req = req.body(b.to_owned());
}
for (k, v) in headers {
req = req.header(*k, *v);
}
if !queries.is_empty() {
req = req.query(queries);
}
tracing::debug!("Request headers: {}", join_pairs(headers));
tracing::debug!("Request queries: {}", join_pairs(queries));
let resp = req.send().await.map_err(|e| IskraError::Http { source: e, url: url.to_string() })?;
if !resp.status().is_success() {
return Err(IskraError::Status { status: resp.status().as_u16(), url: url.to_string() });
}
Ok(resp)
}
pub fn new_with_timeout_and_decompression(timeout: std::time::Duration, decompress: bool) -> Result<Self, IskraError> {
let mut builder = Client::builder()
.use_rustls_tls()
.timeout(timeout);
if !decompress {
builder = builder.no_gzip().no_deflate().no_brotli();
}
let client = builder.build().map_err(|e| IskraError::Http { source: e, url: "(client build)".to_string() })?;
Ok(Self { client: Arc::new(client) })
}
pub fn new_with_timeout(timeout: std::time::Duration) -> Result<Self, IskraError> {
Self::new_with_timeout_and_decompression(timeout, true)
}
pub fn new() -> Result<Self, IskraError> {
Self::new_with_timeout(std::time::Duration::from_secs(30))
}
pub async fn get(&self, url: &str, headers: &[(&str, &str)], queries: &[(&str, &str)]) -> Result<Response, IskraError> {
let mut req = self.client.get(url);
for (k, v) in headers {
req = req.header(*k, *v);
}
if !queries.is_empty() {
req = req.query(queries);
}
let resp = req.send().await.map_err(|e| IskraError::Http { source: e, url: url.to_string() })?;
if !resp.status().is_success() {
return Err(IskraError::Status { status: resp.status().as_u16(), url: url.to_string() });
}
Ok(resp)
}
pub async fn post(&self, url: &str, body: &str, headers: &[(&str, &str)], queries: &[(&str, &str)]) -> Result<Response, IskraError> {
let mut req = self.client.post(url).body(body.to_owned());
for (k, v) in headers {
req = req.header(*k, *v);
}
if !queries.is_empty() {
req = req.query(queries);
}
let resp = req.send().await.map_err(|e| IskraError::Http { source: e, url: url.to_string() })?;
if !resp.status().is_success() {
return Err(IskraError::Status { status: resp.status().as_u16(), url: url.to_string() });
}
Ok(resp)
}
pub async fn put(&self, url: &str, body: &str, headers: &[(&str, &str)], queries: &[(&str, &str)]) -> Result<Response, IskraError> {
let mut req = self.client.put(url).body(body.to_owned());
for (k, v) in headers {
req = req.header(*k, *v);
}
if !queries.is_empty() {
req = req.query(queries);
}
let resp = req.send().await.map_err(|e| IskraError::Http { source: e, url: url.to_string() })?;
if !resp.status().is_success() {
return Err(IskraError::Status { status: resp.status().as_u16(), url: url.to_string() });
}
Ok(resp)
}
pub async fn delete(&self, url: &str, headers: &[(&str, &str)], queries: &[(&str, &str)]) -> Result<Response, IskraError> {
let mut req = self.client.delete(url);
for (k, v) in headers {
req = req.header(*k, *v);
}
if !queries.is_empty() {
req = req.query(queries);
}
let resp = req.send().await.map_err(|e| IskraError::Http { source: e, url: url.to_string() })?;
if !resp.status().is_success() {
return Err(IskraError::Status { status: resp.status().as_u16(), url: url.to_string() });
}
Ok(resp)
}
}