use anyhow::{bail, Context, Result};
use base64::Engine;
use futures::stream::{self, StreamExt};
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument, trace, warn};
use veilid_core::{RecordKey, RouteId, RoutingContext, Target};
use crate::chunker::reassemble_chunks;
use crate::compression;
use crate::encryption::{EncryptionContext, SESSION_NONCE_LEN};
use crate::metrics::{global_metrics, MetricCategory};
use crate::node::{start_node, stop_node, wait_for_attach};
use crate::protocol::{decode_response, FileMetadata, Request, Response};
use crate::resume::ResumeManager;
use crate::throttle::Throttler;
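/// Outcome of a single chunk fetch: `Ok((index, plaintext))` on success, or
/// `Err((index, error))` so failures stay attributable to a specific chunk.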
type ChunkFetchResult = Result<(u64, Vec<u8>), (u64, anyhow::Error)>;
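/// Upper bound on one chunk round trip over the seeder's private route.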
const CHUNK_FETCH_TIMEOUT: Duration = Duration::from_secs(30);
#[allow(clippy::too_many_arguments)]
#[instrument(
level = "info",
skip(output_dir, password, throttle),
fields(dht_key = %dht_key_str, output_dir = %output_dir.display(), parallel = parallel_downloads)
)]
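/// Fetch the file published at `dht_key_str` into `output_dir`.
///
/// Resolves the file metadata from the DHT, imports the seeder's private
/// route, downloads any pending chunks in parallel (resuming from the local
/// chunk cache unless `no_resume` is set), then reassembles the file and
/// decrypts/decompresses it as the metadata dictates.
///
/// # Example (illustrative; the key and paths below are assumptions)
/// ```ignore
/// // Hypothetical call site; a real DHT key comes from the seeder.
/// fetch_file("VLD0:example-key", Path::new("./downloads"), false, None, 8, false, false, None).await?;
/// ```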
pub async fn fetch_file(
dht_key_str: &str,
output_dir: &Path,
insecure_local_fallback: bool,
password: Option<&str>,
parallel_downloads: usize,
no_resume: bool,
trust_cache: bool,
throttle: Option<Arc<Throttler>>,
) -> Result<()> {
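// Insecure local mode: bypass the network entirely and reassemble chunks
// that a local seeder wrote under the system temp directory; used by tests
// and offline debugging.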
if insecure_local_fallback {
use crate::chunker::Chunk;
use std::fs;
use std::io::Read;
let file_name = dht_key_str;
let in_dir = std::env::temp_dir().join("vflight-local").join(file_name);
if !in_dir.exists() {
anyhow::bail!("No local chunks found for {}", file_name);
}
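// An encryption.json sidecar, if present, carries the base64 salt and
// session nonce needed to rebuild the decryption context.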
let encryption_ctx = {
let meta_path = in_dir.join("encryption.json");
if meta_path.exists() {
let meta_str = fs::read_to_string(&meta_path)?;
let meta: serde_json::Value = serde_json::from_str(&meta_str)?;
let salt_b64 = meta["salt"]
.as_str()
.context("Missing salt in encryption metadata")?;
let nonce_b64 = meta["nonce"]
.as_str()
.context("Missing nonce in encryption metadata")?;
let salt = base64::engine::general_purpose::STANDARD.decode(salt_b64)?;
let nonce_bytes = base64::engine::general_purpose::STANDARD.decode(nonce_b64)?;
if let Some(pwd) = password {
// Guard the length before copy_from_slice, which panics on a mismatch
// (the networked path below performs the same check).
if nonce_bytes.len() != SESSION_NONCE_LEN {
bail!("Invalid encryption nonce length");
}
let mut nonce = [0u8; SESSION_NONCE_LEN];
nonce.copy_from_slice(&nonce_bytes);
Some(EncryptionContext::with_session_nonce(pwd, &salt, nonce)?)
} else {
bail!("File is encrypted. Please provide password with --password");
}
} else {
None
}
};
let mut chunks = vec![];
for entry in fs::read_dir(&in_dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) == Some("bin") {
let mut data = vec![];
fs::File::open(&path)?.read_to_end(&mut data)?;
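// Derive the chunk index from the chunk_NNNNNN.bin file name; names that
// do not parse fall back to index 0.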
let fname = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
let index: u64 = fname
.trim_start_matches("chunk_")
.trim_end_matches(".bin")
.parse()
.unwrap_or(0);
let decrypted_data = if let Some(ref ctx) = encryption_ctx {
ctx.decrypt_chunk(index, &data)?
} else {
data
};
chunks.push(Chunk {
index,
data: decrypted_data,
hash: String::new(),
});
}
}
chunks.sort_by_key(|c| c.index);
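// A compression.json sidecar marks whether the payload was compressed
// before chunking and must be decompressed after reassembly.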
let is_compressed = {
let comp_path = in_dir.join("compression.json");
if comp_path.exists() {
let comp_str = fs::read_to_string(&comp_path)?;
let comp_meta: serde_json::Value = serde_json::from_str(&comp_str)?;
comp_meta["compressed"].as_bool().unwrap_or(false)
} else {
false
}
};
let out_path = output_dir.join(file_name);
reassemble_chunks(&chunks, &out_path)?;
if is_compressed {
let compressed_data = std::fs::read(&out_path)?;
let decompressed = compression::decompress(&compressed_data)?;
std::fs::write(&out_path, &decompressed)?;
info!(
"Decompressed {} -> {} bytes",
compressed_data.len(),
decompressed.len()
);
}
info!(
"\n[INSECURE LOCAL MODE] Reassembled file to {}",
out_path.display()
);
return Ok(());
}
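// Networked path: resolve the metadata record via the Veilid DHT.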
let dht_key: RecordKey = dht_key_str.parse().context("Invalid DHT key format")?;
info!("Starting Veilid node");
let (api, mut rx) = start_node("fetcher").await?;
debug!("Waiting for network attachment");
wait_for_attach(&mut rx).await?;
info!("Attached to network");
let routing_context = api
.routing_context()
.context("Failed to get routing context")?;
debug!(dht_key = %dht_key, "Opening DHT record");
let dht_open_start = Instant::now();
let _dht_record = routing_context
.open_dht_record(dht_key.clone(), None)
.await
.context("Failed to open DHT record")?;
global_metrics().record(MetricCategory::DhtOperation, dht_open_start.elapsed(), 0);
debug!("Reading file metadata from DHT");
let dht_read_start = Instant::now();
let metadata_value = routing_context
.get_dht_value(dht_key.clone(), 0, true)
.await
.context("Failed to read DHT value")?
.context("No metadata found in DHT record")?;
global_metrics().record(
MetricCategory::DhtOperation,
dht_read_start.elapsed(),
metadata_value.data().len() as u64,
);
let metadata: FileMetadata =
serde_json::from_slice(metadata_value.data()).context("Failed to parse metadata")?;
let is_encrypted = metadata.encryption_salt.is_some() && metadata.encryption_nonce.is_some();
info!(
name = %metadata.name,
size = metadata.size,
chunks = metadata.total_chunks,
encrypted = is_encrypted,
"File metadata retrieved"
);
std::fs::create_dir_all(output_dir).context("Failed to create output directory")?;
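// The resume manager tracks which chunks are already cached on disk;
// `no_resume` starts from a clean slate instead.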
let mut resume_manager = if no_resume {
ResumeManager::new_fresh(output_dir, dht_key_str, &metadata)?
} else {
ResumeManager::init(output_dir, dht_key_str, &metadata)?
};
if !trust_cache && !no_resume && resume_manager.completed_count() > 0 {
info!(
"Verifying {} cached chunks...",
resume_manager.completed_count()
);
let invalidated = resume_manager.verify_and_invalidate_corrupted()?;
if !invalidated.is_empty() {
warn!(
"Found {} corrupted cached chunks, will re-download",
invalidated.len()
);
}
}
let pending_chunks = resume_manager.pending_chunks();
let completed_count = resume_manager.completed_count();
let total_chunks = metadata.total_chunks;
println!("\n========================================");
println!("File: {}", metadata.name);
println!("Size: {} bytes", metadata.size);
println!("Chunks: {}", total_chunks);
if metadata.compressed {
println!("Compression: ENABLED");
}
if is_encrypted {
println!("Encryption: ENABLED");
}
if completed_count > 0 {
println!(
"Resuming: {}/{} chunks cached",
completed_count, total_chunks
);
}
if let Some(ref t) = throttle {
println!("Throttle: {} KB/s", t.rate_kb_s());
}
println!("========================================\n");
let encryption_ctx = if is_encrypted && !pending_chunks.is_empty() {
let salt_b64 = metadata.encryption_salt.as_ref().unwrap();
let nonce_b64 = metadata.encryption_nonce.as_ref().unwrap();
let salt = base64::engine::general_purpose::STANDARD
.decode(salt_b64)
.context("Failed to decode encryption salt")?;
let nonce_bytes = base64::engine::general_purpose::STANDARD
.decode(nonce_b64)
.context("Failed to decode encryption nonce")?;
if let Some(pwd) = password {
let mut nonce = [0u8; SESSION_NONCE_LEN];
if nonce_bytes.len() != SESSION_NONCE_LEN {
bail!("Invalid encryption nonce length");
}
nonce.copy_from_slice(&nonce_bytes);
Some(EncryptionContext::with_session_nonce(pwd, &salt, nonce)?)
} else {
bail!("File is encrypted. Please provide password with --password");
}
} else {
None
};
if pending_chunks.is_empty() {
info!("All chunks already cached, reassembling...");
}
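// Import the seeder's private route from the blob published in the metadata;
// all chunk requests travel through this route.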
let route_blob = base64::engine::general_purpose::STANDARD
.decode(&metadata.route_blob)
.context("Failed to decode route blob")?;
let seeder_route = api
.import_remote_private_route(route_blob)
.context("Failed to import seeder's route")?;
let routing_context = Arc::new(routing_context);
let seeder_route = Arc::new(seeder_route);
let encryption_ctx = encryption_ctx.map(Arc::new);
let chunk_hashes = Arc::new(metadata.chunk_hashes.clone());
let resume_manager = Arc::new(Mutex::new(resume_manager));
if !pending_chunks.is_empty() {
info!(
parallel = parallel_downloads.max(1),
pending = pending_chunks.len(),
total = total_chunks,
"Starting parallel chunk downloads"
);
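// Fan out chunk requests with bounded concurrency; buffer_unordered keeps
// at most `parallel_downloads` (minimum 1) requests in flight at once.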
let results: Vec<ChunkFetchResult> = stream::iter(pending_chunks)
.map(|i| {
let rc = Arc::clone(&routing_context);
let sr = Arc::clone(&seeder_route);
let ec = encryption_ctx.clone();
let ch = Arc::clone(&chunk_hashes);
let rm = Arc::clone(&resume_manager);
let th = throttle.clone();
async move {
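// Respect the optional bandwidth throttle before issuing the request.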
if let Some(ref t) = th {
t.acquire(crate::protocol::CHUNK_SIZE).await;
}
let result = fetch_single_chunk(i, &rc, &sr, ec.as_deref(), &ch).await;
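// Persist each verified chunk immediately so an interrupted run can resume.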
if let Ok((idx, ref data)) = result {
let mut mgr = rm.lock().await;
if let Err(e) = mgr.save_and_mark_complete(idx, data) {
warn!(chunk = idx, error = %e, "Failed to save chunk to cache");
}
}
result
}
})
.buffer_unordered(parallel_downloads.max(1))
.collect()
.await;
let failed_chunks: Vec<u64> = results
.iter()
.filter_map(|r| match r {
Err((idx, _)) => Some(*idx),
Ok(_) => None,
})
.collect();
if !failed_chunks.is_empty() {
bail!(
"Failed to fetch {} chunks: {:?}",
failed_chunks.len(),
failed_chunks
);
}
}
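// Every chunk is now in the on-disk cache; rebuild the output file from it.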
let output_path = output_dir.join(&metadata.name);
info!(output = %output_path.display(), "Reassembling file from cache");
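// All download tasks have completed, so this is the only remaining Arc handle.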
let resume_manager = Arc::try_unwrap(resume_manager)
.map_err(|_| anyhow::anyhow!("Failed to unwrap resume manager"))?
.into_inner();
let mut chunk_structs = Vec::with_capacity(total_chunks as usize);
for i in 0..total_chunks {
let data = resume_manager.load_chunk(i)?;
chunk_structs.push(crate::chunker::Chunk {
index: i,
data,
hash: metadata
.chunk_hashes
.get(i as usize)
.cloned()
.unwrap_or_default(),
});
}
reassemble_chunks(&chunk_structs, &output_path)?;
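// If the file was compressed before chunking, decompress the reassembled
// output in place.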
if metadata.compressed {
let compressed_data = std::fs::read(&output_path)?;
let decompressed = compression::decompress(&compressed_data)?;
std::fs::write(&output_path, &decompressed)?;
info!(
"Decompressed {} -> {} bytes",
compressed_data.len(),
decompressed.len()
);
}
resume_manager.cleanup()?;
println!("\n========================================");
println!("Download complete!");
println!("Output: {}", output_path.display());
println!("========================================");
debug!("Cleaning up resources");
api.release_private_route((*seeder_route).clone()).ok();
routing_context.close_dht_record(dht_key).await.ok();
stop_node(api).await?;
info!("Fetch complete");
Ok(())
}
#[instrument(
level = "debug",
skip(routing_context, seeder_route, encryption_ctx, chunk_hashes)
)]
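/// Request one chunk from the seeder over its private route via `app_call`.
///
/// Decrypts the payload when an encryption context is supplied, then verifies
/// it against the metadata hash list and, for unencrypted transfers, against
/// the hash carried in the response. Errors keep the chunk index attached so
/// the caller can report exactly which chunks failed.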
async fn fetch_single_chunk(
index: u64,
routing_context: &RoutingContext,
seeder_route: &RouteId,
encryption_ctx: Option<&EncryptionContext>,
chunk_hashes: &[String],
) -> Result<(u64, Vec<u8>), (u64, anyhow::Error)> {
debug!(chunk_index = index, "Fetching chunk");
let request = Request::GetChunk { index };
let request_bytes = serde_json::to_vec(&request).map_err(|e| (index, e.into()))?;
let transfer_start = Instant::now();
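// Bound the round trip with a timeout so a dead or stale route fails fast
// instead of stalling the whole download.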
let response_bytes = tokio::time::timeout(
CHUNK_FETCH_TIMEOUT,
routing_context.app_call(Target::RouteId(seeder_route.clone()), request_bytes),
)
.await
.map_err(|_| {
error!(chunk_index = index, "Chunk request timed out");
(
index,
anyhow::anyhow!(
"Chunk {} request timed out after {}s",
index,
CHUNK_FETCH_TIMEOUT.as_secs()
),
)
})?
.map_err(|e| {
error!(chunk_index = index, error = %e, "Chunk request failed");
(index, anyhow::anyhow!("Chunk request failed: {}", e))
})?;
let transfer_elapsed = transfer_start.elapsed();
trace!(
chunk_index = index,
response_len = response_bytes.len(),
"Received response"
);
match decode_response(&response_bytes) {
Ok(Response::ChunkData {
index: resp_index,
data: received_data,
hash,
}) => {
global_metrics().record(
MetricCategory::ChunkTransfer,
transfer_elapsed,
received_data.len() as u64,
);
let chunk_data = if let Some(ctx) = encryption_ctx {
ctx.decrypt_chunk(resp_index, &received_data).map_err(|e| {
error!(
chunk_index = index,
error = %e,
"Decryption failed (wrong password?)"
);
(index, e)
})?
} else {
received_data
};
let hash_start = Instant::now();
let computed_hash = blake3::hash(&chunk_data).to_hex().to_string();
global_metrics().record(
MetricCategory::HashCompute,
hash_start.elapsed(),
chunk_data.len() as u64,
);
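// The hash list from the DHT metadata is the authoritative check and covers
// the decrypted chunk bytes.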
if let Some(expected_hash) = chunk_hashes.get(resp_index as usize) {
if &computed_hash != expected_hash {
error!(
chunk_index = index,
expected_hash = %expected_hash,
computed_hash = %computed_hash,
"Chunk hash mismatch - metadata hash"
);
return Err((index, anyhow::anyhow!("Hash mismatch for chunk {}", index)));
}
}
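// For unencrypted transfers, also cross-check the hash the seeder sent in
// the response; with encryption, the metadata check above already verified
// the plaintext.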
if encryption_ctx.is_none() && computed_hash != hash {
error!(
chunk_index = index,
expected_hash = %hash,
computed_hash = %computed_hash,
"Chunk hash mismatch - response hash"
);
return Err((index, anyhow::anyhow!("Hash mismatch for chunk {}", index)));
}
trace!(chunk_index = resp_index, "Chunk verified successfully");
Ok((resp_index, chunk_data))
}
Ok(Response::Error { message }) => {
warn!(chunk_index = index, error = %message, "Seeder returned error");
Err((index, anyhow::anyhow!("Seeder error: {}", message)))
}
Ok(_) => {
warn!(chunk_index = index, "Received unexpected response type");
Err((index, anyhow::anyhow!("Unexpected response type")))
}
Err(e) => {
warn!(chunk_index = index, error = %e, "Failed to parse response");
Err((index, anyhow::anyhow!("Parse error: {}", e)))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parallel_minimum_one() {
assert_eq!(0_usize.max(1), 1);
assert_eq!(1_usize.max(1), 1);
assert_eq!(8_usize.max(1), 8);
}
#[test]
fn test_parallel_results_correct_indexing() {
let results: Vec<ChunkFetchResult> = vec![
Ok((2, vec![30, 31, 32])),
Ok((0, vec![10, 11, 12])),
Ok((1, vec![20, 21, 22])),
];
let total_chunks = 3;
let mut chunks: Vec<Vec<u8>> = vec![Vec::new(); total_chunks];
let mut failed_chunks = Vec::new();
for result in results {
match result {
Ok((index, data)) => {
chunks[index as usize] = data;
}
Err((index, _)) => {
failed_chunks.push(index);
}
}
}
assert_eq!(chunks[0], vec![10, 11, 12]);
assert_eq!(chunks[1], vec![20, 21, 22]);
assert_eq!(chunks[2], vec![30, 31, 32]);
assert!(failed_chunks.is_empty());
}
#[test]
fn test_parallel_collects_all_failures() {
let results: Vec<ChunkFetchResult> = vec![
Ok((0, vec![1, 2, 3])),
Err((1, anyhow::anyhow!("Network error"))),
Ok((2, vec![4, 5, 6])),
Err((3, anyhow::anyhow!("Hash mismatch"))),
Err((4, anyhow::anyhow!("Decryption failed"))),
];
let total_chunks = 5;
let mut chunks: Vec<Vec<u8>> = vec![Vec::new(); total_chunks];
let mut failed_chunks = Vec::new();
for result in results {
match result {
Ok((index, data)) => {
chunks[index as usize] = data;
}
Err((index, _)) => {
failed_chunks.push(index);
}
}
}
assert_eq!(chunks[0], vec![1, 2, 3]);
assert_eq!(chunks[2], vec![4, 5, 6]);
assert_eq!(failed_chunks.len(), 3);
assert!(failed_chunks.contains(&1));
assert!(failed_chunks.contains(&3));
assert!(failed_chunks.contains(&4));
}
#[test]
fn test_hash_verification() {
let data = b"test chunk data";
let computed_hash = blake3::hash(data).to_hex().to_string();
let expected_hash = blake3::hash(data).to_hex().to_string();
assert_eq!(computed_hash, expected_hash);
let different_data = b"different data";
let different_hash = blake3::hash(different_data).to_hex().to_string();
assert_ne!(computed_hash, different_hash);
}
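// End-to-end round trip through the insecure local fallback: stage encrypted
// chunks plus the encryption.json sidecar on disk, then fetch and compare.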
#[tokio::test]
async fn test_insecure_local_fetch_encrypted() {
use crate::encryption::{generate_salt, EncryptionContext};
use std::fs;
use std::io::Write;
use tempfile::tempdir;
let temp_dir = tempdir().unwrap();
let file_name = "test_encrypted_file.txt";
let local_dir = std::env::temp_dir().join("vflight-local").join(file_name);
fs::create_dir_all(&local_dir).unwrap();
let password = "test_password";
let salt = generate_salt();
let ctx = EncryptionContext::new(password, &salt).unwrap();
let meta_path = local_dir.join("encryption.json");
let meta = serde_json::json!({
"salt": base64::engine::general_purpose::STANDARD.encode(&salt),
"nonce": base64::engine::general_purpose::STANDARD.encode(ctx.session_nonce()),
});
fs::write(&meta_path, serde_json::to_string_pretty(&meta).unwrap()).unwrap();
let original_data = b"Hello, encrypted world!";
let encrypted_data = ctx.encrypt_chunk(0, original_data).unwrap();
let chunk_path = local_dir.join("chunk_000000.bin");
let mut f = fs::File::create(&chunk_path).unwrap();
f.write_all(&encrypted_data).unwrap();
let output_dir = temp_dir.path();
let result = fetch_file(
file_name,
output_dir,
true, // insecure_local_fallback
Some(password),
8, // parallel_downloads
true, // no_resume
false, // trust_cache
None, // throttle
)
.await;
assert!(result.is_ok());
let output_path = output_dir.join(file_name);
let output_data = fs::read(&output_path).unwrap();
assert_eq!(output_data, original_data);
fs::remove_dir_all(&local_dir).ok();
}
#[tokio::test]
async fn test_insecure_local_fetch_unencrypted() {
use std::fs;
use std::io::Write;
use tempfile::tempdir;
let temp_dir = tempdir().unwrap();
let file_name = "test_unencrypted_file.txt";
let local_dir = std::env::temp_dir().join("vflight-local").join(file_name);
fs::create_dir_all(&local_dir).unwrap();
let chunk0_data = b"chunk zero data";
let chunk1_data = b"chunk one data";
let chunk0_path = local_dir.join("chunk_000000.bin");
let chunk1_path = local_dir.join("chunk_000001.bin");
fs::File::create(&chunk0_path)
.unwrap()
.write_all(chunk0_data)
.unwrap();
fs::File::create(&chunk1_path)
.unwrap()
.write_all(chunk1_data)
.unwrap();
let output_dir = temp_dir.path();
let result = fetch_file(
file_name,
output_dir,
true, // insecure_local_fallback
None, // password
8, // parallel_downloads
true, // no_resume
false, // trust_cache
None, // throttle
)
.await;
assert!(result.is_ok());
let output_path = output_dir.join(file_name);
let output_data = fs::read(&output_path).unwrap();
let mut expected = Vec::new();
expected.extend_from_slice(chunk0_data);
expected.extend_from_slice(chunk1_data);
assert_eq!(output_data, expected);
fs::remove_dir_all(&local_dir).ok();
}
}