#![cfg(feature = "google-books")]
use std::path::Path;
use super::super::reader::ReaderError;
use super::super::task_manager::RetryAfter;
use super::ImportError;
#[cfg(feature = "google-books")]
pub(super) async fn download_to_cache(
url: &str,
cache_path: &Path,
client: &reqwest::Client,
) -> Result<(), ImportError> {
use tokio::io::AsyncWriteExt;
if cache_path.exists() {
log::debug!("Cache hit: {}", cache_path.display());
return Ok(());
}
if let Some(parent) = cache_path.parent() {
tokio::fs::create_dir_all(parent).await.map_err(|e| {
ImportError::Io(std::io::Error::new(
e.kind(),
format!(
"Failed to create cache directory {}: {}",
parent.display(),
e
),
))
})?;
}
let downloading_path = cache_path.with_extension("gz.downloading");
let existing_len = tokio::fs::metadata(&downloading_path)
.await
.map(|m| m.len())
.unwrap_or(0);
let mut request = client.get(url);
if existing_len > 0 {
request = request.header(reqwest::header::RANGE, format!("bytes={}-", existing_len));
log::debug!("Requesting resume from byte {} for {}", existing_len, url);
}
let response = request
.send()
.await
.map_err(|e| ImportError::Reader(ReaderError::Http(format!("GET {}: {}", url, e))))?;
let status = response.status();
if status == reqwest::StatusCode::TOO_MANY_REQUESTS {
let retry_after = response
.headers()
.get(reqwest::header::RETRY_AFTER)
.and_then(|v| v.to_str().ok())
.map(|s| {
if let Ok(secs) = s.parse::<u64>() {
RetryAfter::Seconds(secs)
} else {
RetryAfter::Seconds(60) }
});
return Err(ImportError::Reader(ReaderError::RateLimited {
url: url.to_string(),
retry_after,
}));
}
let (response, status, existing_len) =
if status == reqwest::StatusCode::RANGE_NOT_SATISFIABLE && existing_len > 0 {
log::warn!(
"Range not satisfiable (partial file {} bytes) for {} — restarting from scratch",
existing_len,
url
);
let _ = tokio::fs::remove_file(&downloading_path).await;
let retry_resp = client.get(url).send().await.map_err(|e| {
ImportError::Reader(ReaderError::Http(format!("GET {} (retry): {}", url, e)))
})?;
let retry_status = retry_resp.status();
(retry_resp, retry_status, 0u64)
} else {
(response, status, existing_len)
};
if !status.is_success() && status != reqwest::StatusCode::PARTIAL_CONTENT {
return Err(ImportError::Reader(ReaderError::Http(format!(
"HTTP {} for {}",
status, url
))));
}
use tokio::io::BufWriter;
use tokio_stream::StreamExt;
use tokio_util::io::StreamReader;
let file = if status == reqwest::StatusCode::PARTIAL_CONTENT && existing_len > 0 {
log::info!("Resuming download from byte {} for {}", existing_len, url);
tokio::fs::OpenOptions::new()
.append(true)
.open(&downloading_path)
.await
.map_err(|e| {
ImportError::Io(std::io::Error::new(
e.kind(),
format!(
"Failed to open partial cache file {}: {}",
downloading_path.display(),
e
),
))
})?
} else {
if existing_len > 0 {
log::debug!(
"Server returned 200 (not 206) despite Range request — restarting download for {}",
url
);
}
tokio::fs::File::create(&downloading_path)
.await
.map_err(|e| {
ImportError::Io(std::io::Error::new(
e.kind(),
format!(
"Failed to create cache file {}: {}",
downloading_path.display(),
e
),
))
})?
};
let mut writer = BufWriter::with_capacity(256 * 1024, file);
let byte_stream = response.bytes_stream();
let url_for_errors = url.to_string();
let mapped_stream = byte_stream.map(move |result| {
result.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("HTTP stream error for {}: {}", url_for_errors, e),
)
})
});
let mut reader = StreamReader::new(mapped_stream);
tokio::io::copy(&mut reader, &mut writer)
.await
.map_err(|e| {
ImportError::Io(std::io::Error::new(
e.kind(),
format!(
"Failed to download {} to {}: {}",
url,
downloading_path.display(),
e
),
))
})?;
writer.flush().await.map_err(|e| {
ImportError::Io(std::io::Error::new(
e.kind(),
format!(
"Failed to flush cache file {}: {}",
downloading_path.display(),
e
),
))
})?;
drop(writer);
tokio::fs::rename(&downloading_path, cache_path)
.await
.map_err(|e| {
ImportError::Io(std::io::Error::new(
e.kind(),
format!(
"Failed to rename {} → {}: {}",
downloading_path.display(),
cache_path.display(),
e
),
))
})?;
log::debug!("Cached {} → {}", url, cache_path.display());
Ok(())
}
/// Removes the cached file at `cache_path` and any partial-download remnant beside it.
///
/// The remnant path is `cache_path.with_extension("gz.downloading")`. Removal is
/// best-effort: a file that is already absent is skipped silently, and any other failure
/// is logged as a warning instead of being returned.
#[cfg(feature = "google-books")]
pub(super) async fn cleanup_cache_file(cache_path: &Path) {
    let downloading_path = cache_path.with_extension("gz.downloading");
    let targets = [
        (cache_path, "cached file"),
        (downloading_path.as_path(), "downloading remnant"),
    ];
    for (path, what) in targets {
        // Remove directly rather than checking `Path::exists` first. That check is a
        // blocking syscall inside an async fn, and it races with concurrent deletion.
        match tokio::fs::remove_file(path).await {
            Ok(()) => {}
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
            Err(e) => log::warn!("Failed to remove {} {}: {}", what, path.display(), e),
        }
    }
}