use anyhow::{anyhow, Result};
use crate::format::{format_bytes, format_speed, format_duration};
use crate::password::{prompt_password, derive_kek};
use firecloud_core::ChunkHash;
use firecloud_crypto::decrypt;
use firecloud_net::{
FireCloudNode, NodeConfig, NodeEvent, OutboundRequestId, PeerId, TransferRequest,
TransferResponse,
};
use firecloud_storage::{decompress, ChunkStore, ManifestStore};
use indicatif::{ProgressBar, ProgressStyle};
use std::collections::HashMap;
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
/// Maximum number of chunk transfer requests kept in flight at once by
/// `fetch_chunks_parallel`'s sliding-window pipeline.
const MAX_PARALLEL_DOWNLOADS: usize = 8;
/// Running counters for one download session, used to print a summary of
/// how much data came from the network versus the local chunk cache.
struct DownloadStats {
    // Session start; basis for elapsed time and transfer-speed figures.
    start_time: Instant,
    // Total bytes received over the network (sum of fetched chunk sizes).
    bytes_downloaded: u64,
    // Number of chunks fetched from remote peers.
    chunks_downloaded: usize,
    // Number of chunks served from the local chunk store.
    chunks_local: usize,
    // Stored at construction but not read anywhere in this file yet.
    #[allow(dead_code)]
    total_chunks: usize,
    #[allow(dead_code)]
    total_size: u64,
}
impl DownloadStats {
fn new(total_chunks: usize, total_size: u64) -> Self {
Self {
start_time: Instant::now(),
bytes_downloaded: 0,
chunks_downloaded: 0,
chunks_local: 0,
total_chunks,
total_size,
}
}
fn add_downloaded(&mut self, bytes: u64) {
self.bytes_downloaded += bytes;
self.chunks_downloaded += 1;
}
fn add_local(&mut self) {
self.chunks_local += 1;
}
fn elapsed_secs(&self) -> f64 {
self.start_time.elapsed().as_secs_f64()
}
fn display_summary(&self) {
println!();
println!("📊 Transfer Statistics:");
println!(" Chunks from network: {}", self.chunks_downloaded);
println!(" Chunks from cache: {}", self.chunks_local);
println!(" Data transferred: {}", format_bytes(self.bytes_downloaded));
println!(" Time elapsed: {}", format_duration(self.elapsed_secs()));
if self.bytes_downloaded > 0 {
println!(" Transfer speed: {}", format_speed(self.bytes_downloaded as f64 / self.elapsed_secs()));
}
}
}
/// CLI entry point for the `download` command.
///
/// Exactly one of `hash` (fetch a single chunk) or `file_id` (fetch and
/// reassemble a whole file) must be supplied; `output` optionally
/// overrides the destination path. Anything else is a usage error.
pub async fn run(
    data_dir: PathBuf,
    hash: Option<String>,
    file_id: Option<String>,
    output: Option<PathBuf>,
) -> Result<()> {
    match (hash, file_id) {
        (Some(_), Some(_)) => Err(anyhow!("Cannot specify both --hash and --file")),
        (None, None) => Err(anyhow!("Either --hash or --file must be specified")),
        (Some(chunk_hash), None) => download_chunk(data_dir, chunk_hash, output).await,
        (None, Some(id)) => download_file(data_dir, id, output).await,
    }
}
/// Download a single chunk identified by its hex `hash`.
///
/// The local chunk store is checked first; on a miss, peers are
/// discovered and the chunk is fetched from the network, then cached
/// locally for future requests. If `output` is given the raw chunk
/// bytes (as stored — still encrypted) are written there, otherwise
/// only the size is reported.
async fn download_chunk(data_dir: PathBuf, hash: String, output: Option<PathBuf>) -> Result<()> {
    // Deliver the chunk bytes: write to `output` if a path was given,
    // otherwise just report the size. Shared by the local-hit path and
    // the network path (previously duplicated verbatim).
    fn emit(data: &[u8], output: Option<PathBuf>) -> Result<()> {
        if let Some(output_path) = output {
            let mut file = File::create(&output_path)?;
            file.write_all(data)?;
            info!("Written to: {}", output_path.display());
        } else {
            info!("Chunk data: {} bytes (use --output to save)", data.len());
        }
        Ok(())
    }

    info!("Downloading chunk: {}", hash);
    let store_path = data_dir.join("chunks");
    let store = ChunkStore::open(&store_path)?;
    let chunk_hash =
        ChunkHash::from_hex(&hash).map_err(|e| anyhow!("Invalid chunk hash: {}", e))?;

    // Fast path: chunk already cached locally.
    if let Ok(Some(chunk)) = store.get(&chunk_hash) {
        info!("✅ Chunk found locally ({} bytes)", chunk.data.len());
        return emit(&chunk.data, output);
    }

    info!("Chunk not found locally, searching peers...");
    let (chunk_data, original_size) = fetch_chunk_from_network(&hash).await?;
    emit(&chunk_data, output)?;

    // Cache the fetched chunk so subsequent downloads hit the fast path.
    // NOTE(review): the received bytes are stored under the requested
    // hash without verification — confirm whether ChunkHash covers the
    // stored (encrypted) payload so an integrity check could be added.
    let metadata = firecloud_core::ChunkMetadata {
        hash: chunk_hash,
        size: chunk_data.len() as u64,
        original_size,
        compression: firecloud_core::CompressionType::None,
        encrypted: true,
    };
    let chunk = firecloud_core::Chunk {
        metadata,
        data: chunk_data.into(),
    };
    store.put(&chunk)?;
    info!("Cached chunk locally");
    Ok(())
}
/// Download a complete file by its string id and reassemble it on disk.
///
/// Pipeline: load the manifest (local store, falling back to a DHT
/// provider lookup), unwrap the 32-byte data-encryption key (DEK) via a
/// password prompt, gather every chunk listed in the manifest (local
/// cache first, then the network in parallel), and finally decrypt,
/// decompress and concatenate the chunks in manifest order. The result
/// is verified against the manifest's content hash.
///
/// `output` overrides the destination path; by default the file's
/// original name from the manifest is used in the current directory.
async fn download_file(
    data_dir: PathBuf,
    file_id_str: String,
    output: Option<PathBuf>,
) -> Result<()> {
    info!("Downloading file: {}", file_id_str);
    let chunk_store_path = data_dir.join("chunks");
    let chunk_store = ChunkStore::open(&chunk_store_path)?;
    let manifest_store_path = data_dir.join("manifests");
    let manifest_store = ManifestStore::open(&manifest_store_path)?;
    // Prefer a locally cached manifest; otherwise query the DHT for
    // providers and fetch (and cache) it from a peer.
    let manifest = match manifest_store.get_by_id_str(&file_id_str)? {
        Some(m) => {
            info!("✅ Manifest found locally");
            m
        }
        None => {
            info!("Manifest not found locally, searching DHT for providers...");
            fetch_manifest_from_dht(&file_id_str, &manifest_store).await?
        }
    };
    println!();
    println!("📁 File: {} ({} bytes)", manifest.metadata.name, manifest.metadata.size);
    println!("📦 Chunks: {}", manifest.chunks.len());
    // Recover the DEK. Normally it is stored encrypted under a
    // password-derived key-encryption key (KEK); manifests without a
    // salt carry the legacy plain-DEK format.
    let dek: [u8; 32] = if let Some(encrypted_dek_bytes) = &manifest.encrypted_dek {
        if let Some(salt_bytes) = &manifest.salt {
            println!();
            let password = prompt_password("🔐 Enter password to decrypt file: ")?;
            println!("⏳ Deriving decryption key...");
            // A wrong password surfaces here as a DEK decryption error.
            let kek = derive_kek(&password, salt_bytes)?;
            let dek_vec = kek.decrypt_dek(encrypted_dek_bytes)
                .map_err(|e| anyhow!("Failed to decrypt DEK (wrong password?): {}", e))?;
            println!("✅ Password verified\n");
            dek_vec.try_into()
                .map_err(|_| anyhow!("Invalid DEK size after decryption"))?
        } else {
            // No salt ⇒ legacy format: the "encrypted" DEK bytes are the
            // raw key.
            println!("⚠️ Warning: File uses legacy unencrypted DEK format");
            encrypted_dek_bytes.clone().try_into()
                .map_err(|_| anyhow!("Invalid DEK size in manifest"))?
        }
    } else {
        return Err(anyhow!("No DEK in manifest - file cannot be decrypted"));
    };
    let output_path = output.unwrap_or_else(|| PathBuf::from(&manifest.metadata.name));
    // Partition the manifest's chunks into locally available data and
    // hex hashes that still need to be fetched from peers.
    let mut local_chunks: HashMap<ChunkHash, Vec<u8>> = HashMap::new();
    let mut missing_hashes: Vec<String> = Vec::new();
    let mut stats = DownloadStats::new(manifest.chunks.len(), manifest.metadata.size);
    println!("🔍 Checking for local chunks...");
    for chunk_hash in &manifest.chunks {
        match chunk_store.get(chunk_hash)? {
            Some(chunk) => {
                debug!(" ✅ {} (local)", &chunk_hash.to_hex()[..16]);
                local_chunks.insert(chunk_hash.clone(), chunk.data.to_vec());
                stats.add_local();
            }
            None => {
                debug!(" ❌ {} (need to fetch)", &chunk_hash.to_hex()[..16]);
                missing_hashes.push(chunk_hash.to_hex());
            }
        }
    }
    println!(" Found {} chunks locally, need to fetch {}", local_chunks.len(), missing_hashes.len());
    if !missing_hashes.is_empty() {
        // Fetch the remainder from the network (this also writes them
        // through to the chunk store) and merge into the in-memory map.
        let fetched = fetch_chunks_parallel(&chunk_store, &missing_hashes, &mut stats).await?;
        for (hash_str, data) in fetched {
            let chunk_hash = ChunkHash::from_hex(&hash_str)?;
            local_chunks.insert(chunk_hash, data);
        }
    }
    println!();
    println!("📝 Reassembling file...");
    let reassemble_bar = ProgressBar::new(manifest.chunks.len() as u64);
    reassemble_bar.set_style(
        ProgressStyle::default_bar()
            .template("{spinner:.green} [{bar:40.cyan/blue}] {pos}/{len} chunks")
            .unwrap()
            .progress_chars("█▓░"),
    );
    let mut output_file = File::create(&output_path)?;
    let mut total_written = 0u64;
    // Chunks must be written in manifest order to reproduce the file;
    // each stored chunk is decrypted first, then decompressed.
    for chunk_hash in &manifest.chunks {
        let encrypted_data = local_chunks
            .get(chunk_hash)
            .ok_or_else(|| anyhow!("Missing chunk: {}", chunk_hash.to_hex()))?;
        let compressed = decrypt(&dek, encrypted_data)
            .map_err(|e| anyhow!("Failed to decrypt chunk {}: {}", chunk_hash, e))?;
        let original = decompress(&compressed, false)
            .map_err(|e| anyhow!("Failed to decompress chunk {}: {}", chunk_hash, e))?;
        output_file.write_all(&original)?;
        total_written += original.len() as u64;
        reassemble_bar.inc(1);
    }
    reassemble_bar.finish_with_message("Done!");
    output_file.flush()?;
    stats.display_summary();
    println!();
    println!("✅ Download complete!");
    println!("📁 File: {}", output_path.display());
    println!("📏 Size: {} bytes", total_written);
    // Integrity check: hash the assembled file and compare with the
    // manifest. A mismatch is reported but the file is kept.
    // NOTE(review): this re-reads the entire file into memory; for very
    // large files a streaming hash would be preferable.
    let file_content = std::fs::read(&output_path)?;
    let content_hash = ChunkHash::hash(&file_content);
    if content_hash == manifest.metadata.content_hash {
        println!("🔐 Content hash verified ✓");
    } else {
        println!("⚠️ Content hash mismatch! File may be corrupted.");
    }
    Ok(())
}
/// Fetch a file manifest from the network when it is not cached locally.
///
/// Spins up an ephemeral node, queries the DHT for providers of
/// `file_id_str` (falling back to mDNS-discovered peers when the DHT
/// query yields nothing), then asks each candidate in turn for the
/// manifest. The first successfully deserialized manifest is cached in
/// `manifest_store` and returned; if every provider fails, an error is
/// returned.
async fn fetch_manifest_from_dht(
    file_id_str: &str,
    manifest_store: &ManifestStore,
) -> Result<firecloud_core::FileManifest> {
    use firecloud_core::FileManifest;
    // Ephemeral node: OS-assigned port, mDNS on, no bootstrap peers.
    let config = NodeConfig {
        port: 0,
        enable_mdns: true,
        bootstrap_peers: Vec::new(),
        bootstrap_relays: vec![],
    };
    let mut node = FireCloudNode::new(config).await?;
    info!("Local peer ID: {}", node.local_peer_id());
    // Brief discovery warm-up so the DHT query has peers to talk to.
    let _peers = discover_peers(&mut node, Duration::from_secs(3)).await?;
    info!("🔍 Querying DHT for file providers...");
    let _query_id = node.find_file_providers(file_id_str);
    // Poll node events for up to 10 s waiting for a ProvidersFound event
    // whose key matches our file id; `break` exits this wait loop.
    let timeout = tokio::time::Instant::now() + Duration::from_secs(10);
    let mut found_providers: Vec<PeerId> = Vec::new();
    while tokio::time::Instant::now() < timeout {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_millis(100)) => {}
            event = node.poll_event() => {
                if let Some(event) = event {
                    match event {
                        NodeEvent::ProvidersFound { key, providers, .. } if key.contains(file_id_str) => {
                            info!("📡 Found {} providers for file", providers.len());
                            found_providers = providers;
                            break;
                        }
                        NodeEvent::PeerDiscovered(peer) => {
                            debug!("Discovered peer: {}", peer);
                        }
                        _ => {}
                    }
                }
            }
        }
    }
    // DHT came up empty — fall back to any peers mDNS has found so far;
    // with no peers at all there is nothing left to try.
    if found_providers.is_empty() {
        let discovered = node.known_peers().clone();
        if !discovered.is_empty() {
            info!("No DHT providers found, trying {} mDNS peers...", discovered.len());
            found_providers = discovered.into_iter().collect();
        } else {
            return Err(anyhow!(
                "No providers found for file {}.\nMake sure the file exists on another node that is running.",
                file_id_str
            ));
        }
    }
    // Move the node's preferred provider to the front so it is tried
    // first (selection criteria live in choose_best_peer).
    if let Some(best) = node.choose_best_peer(&found_providers) {
        found_providers.retain(|p| *p != best);
        found_providers.insert(0, best);
    }
    // Try each provider with its own 10 s response window. NotFound,
    // Error, or a failed transfer `break`s to the next provider; a
    // usable manifest returns immediately.
    for provider in &found_providers {
        let is_local = node.is_local_peer(provider);
        info!("📥 Requesting manifest from {}{}...", provider,
            if is_local { " (local)" } else { "" });
        let request_id = node.send_transfer_request(
            provider,
            TransferRequest::GetManifest { file_id: file_id_str.to_string() },
        );
        let response_timeout = tokio::time::Instant::now() + Duration::from_secs(10);
        while tokio::time::Instant::now() < response_timeout {
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_millis(100)) => {}
                event = node.poll_event() => {
                    if let Some(event) = event {
                        match event {
                            // Only act on responses matching our request id;
                            // unrelated traffic is ignored.
                            NodeEvent::TransferResponse { request_id: rid, response, peer } => {
                                if rid == request_id {
                                    match response {
                                        TransferResponse::Manifest { manifest_data, .. } => {
                                            info!("✅ Received manifest from {}", peer);
                                            // Manifests travel CBOR-encoded on the wire.
                                            let file_manifest: FileManifest = ciborium::from_reader(&manifest_data[..])
                                                .map_err(|e| anyhow!("Failed to deserialize manifest: {}", e))?;
                                            manifest_store.put(&file_manifest)?;
                                            info!("📦 Manifest cached locally");
                                            return Ok(file_manifest);
                                        }
                                        TransferResponse::NotFound { .. } => {
                                            info!(" ❌ Manifest not found on {}", peer);
                                            break;
                                        }
                                        TransferResponse::Error { message } => {
                                            warn!(" ❌ Error from {}: {}", peer, message);
                                            break;
                                        }
                                        _ => {}
                                    }
                                }
                            }
                            NodeEvent::TransferFailed { request_id: rid, error, peer } => {
                                if rid == request_id {
                                    warn!(" ❌ Transfer failed from {}: {}", peer, error);
                                    break;
                                }
                            }
                            _ => {}
                        }
                    }
                }
            }
        }
    }
    Err(anyhow!("Failed to fetch manifest from any provider for file {}", file_id_str))
}
/// Fetch a single chunk (by hex `hash`) from the network.
///
/// Creates an ephemeral node, discovers peers, asks each which of them
/// holds the chunk, then requests it from the node's preferred holder.
/// Returns the chunk bytes plus the original (pre-processing) size
/// reported by the peer.
///
/// NOTE(review): only the single selected peer is tried — a failed or
/// errored transfer returns immediately without falling back to the
/// other known holders.
async fn fetch_chunk_from_network(hash: &str) -> Result<(Vec<u8>, u64)> {
    // Ephemeral node: OS-assigned port, mDNS on, no bootstrap peers.
    let config = NodeConfig {
        port: 0,
        enable_mdns: true,
        bootstrap_peers: Vec::new(),
        bootstrap_relays: vec![],
    };
    let mut node = FireCloudNode::new(config).await?;
    info!("Local peer ID: {}", node.local_peer_id());
    let peers = discover_peers(&mut node, Duration::from_secs(5)).await?;
    if peers.is_empty() {
        return Err(anyhow!("No peers found. Run `firecloud node` on another device first."));
    }
    // Narrow the peer list to those that actually hold this chunk.
    let peers_with_chunk = find_peers_with_chunk(&mut node, &peers, hash).await?;
    if peers_with_chunk.is_empty() {
        return Err(anyhow!("Chunk not found on any peer. Hash: {}", hash));
    }
    // Prefer the node's best-rated holder; fall back to the first one
    // (safe: the list was just checked to be non-empty).
    let target_peer = node.choose_best_peer(&peers_with_chunk).unwrap_or(peers_with_chunk[0]);
    info!("📥 Requesting chunk from {}{}...", target_peer,
        if node.is_local_peer(&target_peer) { " (local)" } else { "" });
    let request_id = node.send_transfer_request(
        &target_peer,
        TransferRequest::GetChunk { hash: hash.to_string() },
    );
    // Poll for the matching response for up to 30 s; unrelated events
    // are ignored.
    let timeout = tokio::time::Instant::now() + Duration::from_secs(30);
    while tokio::time::Instant::now() < timeout {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_millis(100)) => {}
            event = node.poll_event() => {
                if let Some(event) = event {
                    match event {
                        NodeEvent::TransferResponse { request_id: rid, response, peer } => {
                            if rid == request_id {
                                match response {
                                    TransferResponse::Chunk { data, original_size, .. } => {
                                        info!("✅ Received chunk ({} bytes)", data.len());
                                        return Ok((data, original_size));
                                    }
                                    TransferResponse::NotFound { .. } => {
                                        return Err(anyhow!("Chunk not found on peer {}", peer));
                                    }
                                    TransferResponse::Error { message } => {
                                        return Err(anyhow!("Error from peer {}: {}", peer, message));
                                    }
                                    _ => {}
                                }
                            }
                        }
                        NodeEvent::TransferFailed { request_id: rid, error, peer } => {
                            if rid == request_id {
                                return Err(anyhow!("Transfer failed from {}: {}", peer, error));
                            }
                        }
                        _ => {}
                    }
                }
            }
        }
    }
    Err(anyhow!("Download timed out"))
}
/// Fetch many chunks from the network with a bounded number of in-flight
/// requests (`MAX_PARALLEL_DOWNLOADS`).
///
/// Every fetched chunk is written through to `chunk_store` and its byte
/// count is accumulated in `stats`. Returns a map from hex hash to chunk
/// bytes; errors if any chunk could not be retrieved within the overall
/// 120 s window.
async fn fetch_chunks_parallel(
    chunk_store: &ChunkStore,
    hash_strs: &[String],
    stats: &mut DownloadStats,
) -> Result<HashMap<String, Vec<u8>>> {
    if hash_strs.is_empty() {
        return Ok(HashMap::new());
    }
    // Ephemeral node: OS-assigned port, mDNS on, no bootstrap peers.
    let config = NodeConfig {
        port: 0,
        enable_mdns: true,
        bootstrap_peers: Vec::new(),
        bootstrap_relays: vec![],
    };
    let mut node = FireCloudNode::new(config).await?;
    let peers = discover_peers(&mut node, Duration::from_secs(5)).await?;
    if peers.is_empty() {
        return Err(anyhow!("No peers found. Run `firecloud node` on another device first."));
    }
    let progress = ProgressBar::new(hash_strs.len() as u64);
    progress.set_style(
        ProgressStyle::default_bar()
            .template("{msg}\n{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} chunks")
            .unwrap()
            .progress_chars("█▓░"),
    );
    progress.set_message(format!(
        "📥 Downloading {} chunks from {} peers...",
        hash_strs.len(),
        peers.len(),
    ));
    // Resolve a source peer for every chunk up front.
    // NOTE(review): this probes peers once per chunk, sequentially, each
    // with its own 10 s window — O(n) round-trips before any data moves.
    let mut chunk_to_peer: HashMap<String, PeerId> = HashMap::new();
    println!("🔍 Locating chunks on peers...");
    for hash_str in hash_strs {
        let peers_with = find_peers_with_chunk(&mut node, &peers, hash_str).await?;
        if peers_with.is_empty() {
            return Err(anyhow!("Chunk {} not found on any peer", &hash_str[..16]));
        }
        let best_peer = node.choose_best_peer(&peers_with).unwrap_or(peers_with[0]);
        chunk_to_peer.insert(hash_str.clone(), best_peer);
    }
    let mut fetched: HashMap<String, Vec<u8>> = HashMap::new();
    let mut pending_requests: HashMap<OutboundRequestId, String> = HashMap::new();
    let total_chunks = hash_strs.len();
    let mut sent_count = 0;
    // Prime the pipeline: send the first MAX_PARALLEL_DOWNLOADS requests;
    // each completed response dispatches the next queued chunk below.
    for hash_str in hash_strs.iter().take(MAX_PARALLEL_DOWNLOADS) {
        let peer = chunk_to_peer.get(hash_str).unwrap();
        let request_id = node.send_transfer_request(
            peer,
            TransferRequest::GetChunk { hash: hash_str.clone() },
        );
        pending_requests.insert(request_id, hash_str.clone());
        sent_count += 1;
    }
    // Collect responses until everything arrived or 120 s elapse.
    let timeout = tokio::time::Instant::now() + Duration::from_secs(120);
    while fetched.len() < total_chunks && tokio::time::Instant::now() < timeout {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_millis(50)) => {}
            event = node.poll_event() => {
                if let Some(event) = event {
                    match event {
                        NodeEvent::TransferResponse { request_id, response, peer } => {
                            if let Some(_hash_str) = pending_requests.remove(&request_id) {
                                match response {
                                    TransferResponse::Chunk { hash, data, original_size } => {
                                        // Write-through cache so the next download of
                                        // this chunk is served locally.
                                        let chunk_hash = ChunkHash::from_hex(&hash)?;
                                        let metadata = firecloud_core::ChunkMetadata {
                                            hash: chunk_hash,
                                            size: data.len() as u64,
                                            original_size,
                                            compression: firecloud_core::CompressionType::None,
                                            encrypted: true,
                                        };
                                        let chunk = firecloud_core::Chunk {
                                            metadata,
                                            data: data.clone().into(),
                                        };
                                        chunk_store.put(&chunk)?;
                                        let bytes = data.len() as u64;
                                        stats.add_downloaded(bytes);
                                        fetched.insert(hash, data);
                                        progress.inc(1);
                                        // Keep the window full: dispatch the next
                                        // queued chunk, if any remain.
                                        if sent_count < total_chunks {
                                            let next_hash = &hash_strs[sent_count];
                                            let next_peer = chunk_to_peer.get(next_hash).unwrap();
                                            let next_req_id = node.send_transfer_request(
                                                next_peer,
                                                TransferRequest::GetChunk { hash: next_hash.clone() },
                                            );
                                            pending_requests.insert(next_req_id, next_hash.clone());
                                            sent_count += 1;
                                        }
                                    }
                                    // NOTE(review): the failure arms below drop the
                                    // request without retrying on another peer or
                                    // dispatching a replacement, so one failed chunk
                                    // stalls this loop until the 120 s timeout before
                                    // the shortfall error below fires.
                                    TransferResponse::NotFound { hash } => {
                                        warn!(" ❌ Chunk {} not found on {}", &hash[..16], peer);
                                    }
                                    TransferResponse::Error { message } => {
                                        warn!(" ❌ Error from {}: {}", peer, message);
                                    }
                                    _ => {}
                                }
                            }
                        }
                        NodeEvent::TransferFailed { request_id, error, peer } => {
                            if let Some(hash_str) = pending_requests.remove(&request_id) {
                                warn!(" ❌ Transfer failed for {} from {}: {}", &hash_str[..16], peer, error);
                            }
                        }
                        _ => {}
                    }
                }
            }
        }
    }
    progress.finish_with_message("Downloads complete!");
    // Any shortfall (timeouts, NotFound, failures) is surfaced here.
    if fetched.len() < total_chunks {
        return Err(anyhow!(
            "Failed to fetch all chunks: got {}/{}",
            fetched.len(),
            total_chunks
        ));
    }
    Ok(fetched)
}
/// Listen for peer-discovery events for the full `duration` and return
/// every distinct remote peer observed (the local peer is excluded).
///
/// The whole window is always waited out so multiple peers get a chance
/// to announce themselves; an empty result means no peers were seen.
async fn discover_peers(node: &mut FireCloudNode, duration: Duration) -> Result<Vec<PeerId>> {
    let mut discovered: Vec<PeerId> = Vec::new();
    let stop_at = tokio::time::Instant::now() + duration;
    info!("🔍 Discovering peers...");
    while tokio::time::Instant::now() < stop_at {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_millis(100)) => {}
            maybe_event = node.poll_event() => {
                match maybe_event {
                    Some(NodeEvent::PeerDiscovered(peer_id)) => {
                        // Skip our own node and peers already recorded.
                        if peer_id != node.local_peer_id() && !discovered.contains(&peer_id) {
                            info!("📡 Found peer: {}", peer_id);
                            discovered.push(peer_id);
                        }
                    }
                    Some(NodeEvent::Listening(addr)) => {
                        info!("👂 Listening on {}", addr);
                    }
                    _ => {}
                }
            }
        }
    }
    info!("Found {} peer(s)", discovered.len());
    Ok(discovered)
}
/// Ask every peer in `peers` whether it holds the chunk `hash` and
/// return the subset that answered yes.
///
/// All `HasChunk` probes are fired up front; replies are then collected
/// until every probe is answered or a 10-second window expires. Failed
/// or timed-out probes are simply treated as "doesn't have it".
async fn find_peers_with_chunk(
    node: &mut FireCloudNode,
    peers: &[PeerId],
    hash: &str,
) -> Result<Vec<PeerId>> {
    let mut holders = Vec::new();

    // Send one probe per peer, remembering which outbound request id
    // belongs to which peer.
    let mut awaiting: HashMap<_, _> = HashMap::new();
    for peer in peers {
        let id = node.send_transfer_request(
            peer,
            TransferRequest::HasChunk { hash: hash.to_string() },
        );
        awaiting.insert(id, *peer);
    }

    let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
    while !awaiting.is_empty() && tokio::time::Instant::now() < deadline {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_millis(100)) => {}
            maybe_event = node.poll_event() => {
                match maybe_event {
                    Some(NodeEvent::TransferResponse { request_id, response, peer }) => {
                        // Only count replies for probes we actually sent.
                        if awaiting.remove(&request_id).is_some() {
                            if let TransferResponse::HasChunk { has_it, .. } = response {
                                if has_it {
                                    debug!(" ✅ Peer {} has chunk", peer);
                                    holders.push(peer);
                                } else {
                                    debug!(" ❌ Peer {} doesn't have chunk", peer);
                                }
                            }
                        }
                    }
                    Some(NodeEvent::TransferFailed { request_id, .. }) => {
                        // A failed probe just means that peer can't answer.
                        awaiting.remove(&request_id);
                    }
                    _ => {}
                }
            }
        }
    }
    Ok(holders)
}