use std::fs::{self, File};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::Duration;
use reqwest::blocking::Client;
use reqwest::header::{CONTENT_LENGTH, ETAG, IF_NONE_MATCH, RANGE};
use reqwest::StatusCode;
use sha2::{Digest, Sha256};
use super::error::BootstrapError;
/// Timeout handed to the HTTP client used for the ETag check and for each ranged chunk request.
const CHUNK_TIMEOUT: Duration = Duration::from_secs(30);
/// Timeout handed to the HTTP client used for the `HEAD` metadata probe.
const HEAD_TIMEOUT: Duration = Duration::from_secs(30);
/// Number of extra attempts per chunk after the first one fails
/// (the retry loop runs over `0..=RETRY_PER_CHUNK`).
const RETRY_PER_CHUNK: u32 = 1;
/// What a successful call to `download_with_chunks` produced.
#[derive(Debug)]
pub struct DownloadOutcome {
    /// Location of the file inside the target directory.
    pub final_path: PathBuf,
    /// `true` when the file was fetched during this call; `false` when an existing
    /// local copy was confirmed current through the ETag check and reused.
    pub downloaded: bool,
}
/// Downloads `file_name` into `target_dir`, trying every mirror in `urls` in order.
///
/// For each mirror the sequence is:
/// 1. if the file already exists locally and an ETag was stored for this mirror, send a
///    conditional request; a `304 Not Modified` answer ends the call with
///    `downloaded: false`;
/// 2. probe the mirror with `HEAD` and reject it when it reports a non-zero
///    `Content-Length` different from `expected_size`;
/// 3. fetch the file as `chunk_count` ranged chunks, assemble them, and persist the
///    mirror's ETag (when it sent one) for the next run.
///
/// The first mirror that completes the sequence wins.
///
/// # Errors
///
/// Returns [`BootstrapError::NetworkUnreachable`] when `urls` is empty or when every mirror
/// failed. `tried_urls` lists the mirrors attempted and `last_error` carries the message of
/// the most recent failure.
pub fn download_with_chunks(
    urls: &[String],
    target_dir: &Path,
    file_name: &str,
    expected_size: u64,
    chunk_count: u32,
) -> Result<DownloadOutcome, BootstrapError> {
    if urls.is_empty() {
        return Err(BootstrapError::NetworkUnreachable {
            tried_urls: Vec::new(),
            last_error: "no urls provided".to_string(),
        });
    }

    let final_path = target_dir.join(file_name);
    let mut attempted: Vec<String> = Vec::with_capacity(urls.len());
    let mut latest_failure = String::new();

    for url in urls {
        attempted.push(url.clone());
        let attempt = try_single_mirror(
            url,
            target_dir,
            file_name,
            &final_path,
            expected_size,
            chunk_count,
        );
        match attempt {
            Ok(downloaded) => {
                return Ok(DownloadOutcome {
                    final_path,
                    downloaded,
                });
            }
            Err(reason) => latest_failure = reason,
        }
    }

    Err(BootstrapError::NetworkUnreachable {
        tried_urls: attempted,
        last_error: latest_failure,
    })
}

/// Runs the complete download sequence against one mirror.
///
/// Returns `Ok(false)` when the existing local file was confirmed current through the ETag
/// check, `Ok(true)` when the file was freshly downloaded and assembled, and `Err(message)`
/// with a human-readable reason when this mirror could not be used.
fn try_single_mirror(
    url: &str,
    target_dir: &Path,
    file_name: &str,
    final_path: &Path,
    expected_size: u64,
    chunk_count: u32,
) -> Result<bool, String> {
    let client = build_client(CHUNK_TIMEOUT).map_err(|e| e.to_string())?;
    let etag_path = etag_path_for(target_dir, url);

    if final_path.exists() {
        if let Some(stored_etag) = read_existing_etag(&etag_path) {
            // Best effort: a failed or non-304 conditional request is not an error,
            // it just means the regular download path below has to run.
            if matches!(
                try_etag_short_circuit(&client, url, &stored_etag),
                Ok(true)
            ) {
                return Ok(false);
            }
        }
    }

    let head_client = build_client(HEAD_TIMEOUT).map_err(|e| e.to_string())?;
    let (server_size, etag_opt) =
        head_probe(&head_client, url).map_err(|e| format!("HEAD {url}: {e}"))?;

    // A reported size of 0 means "unknown" and is not compared against the manifest.
    if server_size != 0 && server_size != expected_size {
        return Err(format!(
            "HEAD {url}: Content-Length={server_size} 与 manifest size={expected_size} 不符"
        ));
    }

    if let Err(e) = download_all_chunks(url, target_dir, expected_size, chunk_count) {
        let reason = format!("download chunks from {url}: {e}");
        let _ = super::verify::cleanup_partials(target_dir);
        return Err(reason);
    }

    if let Err(e) = super::verify::assemble_chunks(target_dir, chunk_count, file_name) {
        let reason = format!("assemble chunks: {e}");
        let _ = super::verify::cleanup_partials(target_dir);
        return Err(reason);
    }

    // Failing to persist the ETag only costs a re-download next time, so just warn.
    if let Some(etag) = etag_opt {
        if let Err(e) = fs::write(&etag_path, etag.as_bytes()) {
            eprintln!(
                "[vigil-bootstrap] warn: persist etag failed: {} ({})",
                etag_path.display(),
                e
            );
        }
    }

    Ok(true)
}
/// Creates a blocking HTTP client with the given `timeout` and a fixed 10-second
/// connect timeout.
///
/// # Errors
///
/// Returns the `reqwest::Error` reported by the client builder.
fn build_client(timeout: Duration) -> Result<Client, reqwest::Error> {
    let connect_timeout = Duration::from_secs(10);
    let builder = Client::builder()
        .timeout(timeout)
        .connect_timeout(connect_timeout);
    builder.build()
}
/// Sends a `HEAD` request to `url` and extracts the advertised size and ETag.
///
/// Returns `(content_length, etag)`:
/// * `content_length` is the parsed `Content-Length` header, or `0` when the header is
///   missing or not a valid `u64` (the caller treats `0` as "unknown");
/// * `etag` is the `ETag` header value when present and valid visible ASCII.
///
/// A response that is neither successful nor a 4xx/5xx error (for example a 3xx that was
/// not followed) carries no trustworthy metadata and is reported as `(0, None)`.
///
/// # Errors
///
/// Returns the `reqwest::Error` for transport failures and for 4xx/5xx responses.
fn head_probe(client: &Client, url: &str) -> Result<(u64, Option<String>), reqwest::Error> {
    // `error_for_status` only yields `Err` for 4xx/5xx. The previous
    // `error_for_status().unwrap_err()` therefore panicked on every other non-2xx status.
    let resp = client.head(url).send()?.error_for_status()?;
    if !resp.status().is_success() {
        return Ok((0, None));
    }

    let headers = resp.headers();
    let len = headers
        .get(CONTENT_LENGTH)
        .and_then(|v| v.to_str().ok())
        .and_then(|s| s.parse::<u64>().ok())
        .unwrap_or(0);
    let etag = headers
        .get(ETAG)
        .and_then(|v| v.to_str().ok())
        .map(str::to_string);
    Ok((len, etag))
}
/// Asks the server whether the resource still matches `stored_etag`.
///
/// Sends a `GET` carrying `If-None-Match: <stored_etag>` together with `Range: bytes=0-0`
/// and returns `Ok(true)` only when the reply is `304 Not Modified`. Any other status
/// yields `Ok(false)`.
///
/// # Errors
///
/// Returns the `reqwest::Error` when the request could not be sent.
fn try_etag_short_circuit(
    client: &Client,
    url: &str,
    stored_etag: &str,
) -> Result<bool, reqwest::Error> {
    let request = client
        .get(url)
        .header(IF_NONE_MATCH, stored_etag)
        .header(RANGE, "bytes=0-0");
    let status = request.send()?.status();
    Ok(status == StatusCode::NOT_MODIFIED)
}
/// Downloads `total` bytes from `url` as `chunk_count` byte ranges, one scoped thread per
/// chunk, each writing `.partial.<idx>` inside `target_dir`.
///
/// The chunk size is `ceil(total / chunk_count)`, but at least 1. Chunk indices whose start
/// offset lies at or beyond `total` get an empty partial file instead of a request.
///
/// # Errors
///
/// * `BootstrapError::DiskFull` (wrapping an `InvalidInput` I/O error) when `chunk_count`
///   is 0.
/// * Otherwise the error of the lowest-indexed chunk that failed. A panicked worker is
///   reported as `BootstrapError::NetworkUnreachable`.
fn download_all_chunks(
    url: &str,
    target_dir: &Path,
    total: u64,
    chunk_count: u32,
) -> Result<(), BootstrapError> {
    if chunk_count == 0 {
        let source = std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "chunk_count must be > 0",
        );
        return Err(BootstrapError::DiskFull {
            path: target_dir.to_path_buf(),
            source,
        });
    }

    let chunk_size = total.div_ceil(u64::from(chunk_count)).max(1);

    let outcomes: Vec<Result<(), BootstrapError>> = std::thread::scope(|scope| {
        // Spawn every worker before joining any of them so the chunks run concurrently.
        let workers: Vec<_> = (0..chunk_count)
            .map(|idx| {
                let start = u64::from(idx) * chunk_size;
                if start >= total {
                    scope.spawn(move || write_empty_partial(target_dir, idx))
                } else {
                    let end_excl = ((u64::from(idx) + 1) * chunk_size).min(total);
                    let end_incl = end_excl - 1;
                    scope.spawn(move || {
                        fetch_chunk_with_retry(url, target_dir, idx, start, end_incl)
                    })
                }
            })
            .collect();

        // Join all workers; a panic inside one is turned into an error value.
        workers
            .into_iter()
            .map(|worker| {
                worker.join().unwrap_or_else(|_| {
                    Err(BootstrapError::NetworkUnreachable {
                        tried_urls: vec![url.to_string()],
                        last_error: "worker thread panicked".to_string(),
                    })
                })
            })
            .collect()
    });

    // Surfaces the first error in chunk order, or `Ok(())` when every chunk succeeded.
    outcomes.into_iter().collect()
}
/// Fetches one chunk, retrying immediately on failure.
///
/// `fetch_chunk_once` is called up to `RETRY_PER_CHUNK + 1` times; the first success
/// returns `Ok(())`.
///
/// # Errors
///
/// Returns the error of the final attempt when every attempt failed.
fn fetch_chunk_with_retry(
    url: &str,
    target_dir: &Path,
    idx: u32,
    start: u64,
    end_incl: u64,
) -> Result<(), BootstrapError> {
    let mut failure: Option<BootstrapError> = None;

    for _ in 0..=RETRY_PER_CHUNK {
        let Err(err) = fetch_chunk_once(url, target_dir, idx, start, end_incl) else {
            return Ok(());
        };
        failure = Some(err);
    }

    match failure {
        Some(err) => Err(err),
        // Defensive fallback; the loop above always runs at least once.
        None => Err(BootstrapError::NetworkUnreachable {
            tried_urls: vec![url.to_string()],
            last_error: "all retries exhausted (no error captured)".to_string(),
        }),
    }
}
fn fetch_chunk_once(
url: &str,
target_dir: &Path,
idx: u32,
start: u64,
end_incl: u64,
) -> Result<(), BootstrapError> {
let client = build_client(CHUNK_TIMEOUT).map_err(|e| BootstrapError::DownloadFailed {
url: url.to_string(),
status: 0,
source: e,
})?;
let range_val = format!("bytes={start}-{end_incl}");
let resp = client
.get(url)
.header(RANGE, &range_val)
.send()
.map_err(|e| BootstrapError::DownloadFailed {
url: url.to_string(),
status: 0,
source: e,
})?;
let status = resp.status();
if !status.is_success() && status != StatusCode::PARTIAL_CONTENT {
let status_code = status.as_u16();
return Err(BootstrapError::DownloadFailed {
url: url.to_string(),
status: status_code,
source: resp.error_for_status().unwrap_err(),
});
}
let bytes = resp.bytes().map_err(|e| BootstrapError::DownloadFailed {
url: url.to_string(),
status: status.as_u16(),
source: e,
})?;
let partial_path = target_dir.join(format!(".partial.{idx}"));
let mut f = File::create(&partial_path).map_err(|e| BootstrapError::DiskFull {
path: partial_path.clone(),
source: e,
})?;
f.write_all(&bytes).map_err(|e| BootstrapError::DiskFull {
path: partial_path.clone(),
source: e,
})?;
f.flush().map_err(|e| BootstrapError::DiskFull {
path: partial_path,
source: e,
})?;
Ok(())
}
/// Creates (or truncates) an empty `<target_dir>/.partial.<idx>` file for a chunk index
/// that has no bytes to download.
///
/// # Errors
///
/// Returns `BootstrapError::DiskFull` carrying the path and the I/O error when the file
/// cannot be created.
fn write_empty_partial(target_dir: &Path, idx: u32) -> Result<(), BootstrapError> {
    let partial_path = target_dir.join(format!(".partial.{idx}"));
    match File::create(&partial_path) {
        Ok(_file) => Ok(()),
        Err(source) => Err(BootstrapError::DiskFull {
            path: partial_path,
            source,
        }),
    }
}
/// Builds the path of the ETag cache file for `url`:
/// `<target_dir>/.etag.<first 8 hex characters of SHA-256(url)>`.
fn etag_path_for(target_dir: &Path, url: &str) -> PathBuf {
    let url_hash = hex::encode(Sha256::digest(url.as_bytes()));
    let mut etag_file_name = String::from(".etag.");
    etag_file_name.push_str(&url_hash[..8]);
    target_dir.join(etag_file_name)
}
/// Reads a previously stored ETag from `etag_path`.
///
/// Returns the file content with surrounding whitespace trimmed, or `None` when the file
/// cannot be read as UTF-8 text or contains only whitespace.
fn read_existing_etag(etag_path: &Path) -> Option<String> {
    let contents = fs::read_to_string(etag_path).ok()?;
    let etag = contents.trim();
    (!etag.is_empty()).then(|| etag.to_string())
}
/// Pins the chunk-size arithmetic (`div_ceil` plus a floor of 1) on small examples.
#[cfg(test)]
mod chunk_math_tests {
    /// 100 bytes over 16 workers: chunks of 7 bytes, the 15th worker (index 14) gets the
    /// 2-byte tail and the 16th starts past the end.
    #[test]
    fn chunk_size_total_100_n_16_last_chunk_has_remainder() {
        let (total, workers): (u64, u64) = (100, 16);
        let chunk = total.div_ceil(workers);
        assert_eq!(chunk, 7);

        let idx: u64 = 14;
        let start = idx * chunk;
        let end_incl = ((idx + 1) * chunk).min(total) - 1;
        assert_eq!(start, 98);
        assert_eq!(end_incl, 99);
        assert!(15 * chunk >= total);
    }

    /// 15 bytes over 16 workers: 1-byte chunks, so exactly 15 workers have data.
    #[test]
    fn chunk_size_total_15_n_16_only_15_active_workers() {
        let (total, workers): (u64, u64) = (15, 16);
        let chunk = total.div_ceil(workers).max(1);
        assert_eq!(chunk, 1);
        assert_eq!(15u64 * chunk, total);
    }

    /// 1 byte over 16 workers: the chunk size is still 1.
    #[test]
    fn chunk_size_total_1_n_16_only_first_active() {
        let (total, workers): (u64, u64) = (1, 16);
        let chunk = total.div_ceil(workers).max(1);
        assert_eq!(chunk, 1);
    }
}