use anyhow::Result;
use crate::format::{format_bytes, format_speed, format_duration};
use crate::password::{prompt_use_encryption, prompt_password_with_confirmation, derive_kek_with_new_salt};
use firecloud_core::{Bytes, ChunkHash, FileId, FileManifest, FileMetadata};
use firecloud_crypto::{encrypt, generate_dek};
use firecloud_net::{FireCloudNode, NodeConfig, NodeEvent, TransferRequest, TransferResponse};
use firecloud_storage::{compress, ChunkStore, CompressionLevel, FileChunker, ManifestStore};
use indicatif::{ProgressBar, ProgressStyle};
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tracing::{info, warn};
#[allow(dead_code)]
struct UploadStats {
start_time: Instant,
bytes_uploaded: u64,
chunks_processed: usize,
chunks_distributed: usize,
total_chunks: usize,
total_size: u64,
}
impl UploadStats {
fn new(total_chunks: usize, total_size: u64) -> Self {
Self {
start_time: Instant::now(),
bytes_uploaded: 0,
chunks_processed: 0,
chunks_distributed: 0,
total_chunks,
total_size,
}
}
fn elapsed_secs(&self) -> f64 {
self.start_time.elapsed().as_secs_f64()
}
}
pub async fn run(data_dir: PathBuf, file_path: PathBuf) -> Result<()> {
info!("Uploading file: {}", file_path.display());
let chunk_store_path = data_dir.join("chunks");
let chunk_store = ChunkStore::open(&chunk_store_path)?;
let manifest_store_path = data_dir.join("manifests");
let manifest_store = ManifestStore::open(&manifest_store_path)?;
let mut file = File::open(&file_path)?;
let file_size = file.metadata()?.len();
let file_name = file_path
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| "unknown".to_string());
let mut file_content = Vec::with_capacity(file_size as usize);
file.read_to_end(&mut file_content)?;
let content_hash = ChunkHash::hash(&file_content);
info!("File: {} ({} bytes)", file_name, file_size);
info!("Content hash: {}", content_hash.to_hex());
let chunker = FileChunker::new();
let chunks = chunker.chunk_bytes(&file_content)?;
let num_chunks = chunks.len();
println!("📦 Chunking file into {} pieces...", num_chunks);
let dek = generate_dek();
let file_id = FileId::new();
println!("📝 File ID: {}", file_id);
let mut stats = UploadStats::new(num_chunks, file_size);
let process_progress = ProgressBar::new(num_chunks as u64);
process_progress.set_style(
ProgressStyle::default_bar()
.template("{msg}\n{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} chunks")
.unwrap()
.progress_chars("█▓░"),
);
process_progress.set_message("🔐 Processing chunks (compress + encrypt)...");
let mut chunk_hashes: Vec<ChunkHash> = Vec::new();
let mut chunk_data: Vec<(String, Vec<u8>, u64)> = Vec::new();
for (_i, mut chunk) in chunks.into_iter().enumerate() {
let compressed = compress(&chunk.data, CompressionLevel::Balanced)?;
let encrypted = encrypt(&dek, &compressed)?;
let original_size = chunk.metadata.original_size;
chunk.data = Bytes::from(encrypted.clone());
chunk.metadata.encrypted = true;
chunk.metadata.size = chunk.data.len() as u64;
let chunk_hash = chunk.hash().clone();
let hash_str = chunk_hash.to_hex();
chunk_store.put(&chunk)?;
chunk_hashes.push(chunk_hash);
chunk_data.push((hash_str, encrypted, original_size));
stats.chunks_processed += 1;
process_progress.inc(1);
}
process_progress.finish_with_message("✅ Chunks processed and stored locally");
let now = chrono::Utc::now().timestamp_millis();
let metadata = FileMetadata {
id: file_id,
name: file_name.clone(),
mime_type: mime_guess::from_path(&file_path)
.first()
.map(|m| m.to_string()),
size: file_size,
created_at: now,
modified_at: now,
content_hash,
};
let (encrypted_dek, salt) = if prompt_use_encryption()? {
println!("\n🔐 Setting up encryption...");
let password = prompt_password_with_confirmation("Enter encryption password: ")?;
println!("⏳ Deriving encryption key (this may take a few seconds)...");
let (kek, salt) = derive_kek_with_new_salt(&password)?;
let encrypted = kek.encrypt_dek(&dek)?;
println!("✅ File encryption key secured with password\n");
(Some(encrypted), Some(salt.to_vec()))
} else {
println!("⚠️ Warning: File will be stored without password protection\n");
(None, None)
};
let mut manifest = FileManifest::new(metadata, chunk_hashes);
manifest.encrypted_dek = encrypted_dek;
manifest.salt = salt;
manifest_store.put(&manifest)?;
info!("Manifest stored locally");
info!("🔍 Discovering peers for distribution...");
let config = NodeConfig {
port: 0, enable_mdns: true,
bootstrap_peers: Vec::new(),
bootstrap_relays: vec![],
};
let mut node = FireCloudNode::new(config).await?;
info!("Local peer ID: {}", node.local_peer_id());
let mut all_peers = Vec::new();
let mut peers_found = false; let discovery_timeout = tokio::time::Instant::now() + Duration::from_secs(5);
while tokio::time::Instant::now() < discovery_timeout {
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(100)) => {}
event = node.poll_event() => {
if let Some(event) = event {
match event {
NodeEvent::PeerDiscovered(peer_id) => {
if peer_id != node.local_peer_id() {
info!("📡 Found peer: {}", peer_id);
all_peers.push(peer_id);
}
}
NodeEvent::Listening(addr) => {
info!("👂 Listening on {}", addr);
}
_ => {}
}
}
}
}
}
if all_peers.is_empty() {
peers_found = false;
println!();
println!("🔍 Peer Discovery Result:");
println!(" ❌ No peers detected on network");
println!();
println!("⚠️ File will be stored LOCALLY ONLY (not distributed)");
println!();
println!("💡 To enable network distribution:");
println!(" 1. Start a node: firecloud node --port 4001");
println!(" 2. Or join existing network (same WiFi auto-discovers)");
println!();
} else {
peers_found = true;
println!();
println!("🔍 Peer Discovery Result:");
println!(" ✅ Detected {} peer(s) on network", all_peers.len());
println!();
println!("✅ Querying for storage providers...");
let storage_providers = match node.find_storage_providers(10).await {
Ok(providers) if !providers.is_empty() => {
println!("🎯 Found {} storage provider(s) - using intelligent distribution", providers.len());
providers.iter().map(|p| p.peer_id).collect::<Vec<_>>()
}
_ => {
println!("📌 No dedicated storage providers found - distributing to all {} peers", all_peers.len());
all_peers.clone()
}
};
let peers = storage_providers;
let total_transfers = chunk_data.len() * peers.len();
let dist_progress = ProgressBar::new(total_transfers as u64);
dist_progress.set_style(
ProgressStyle::default_bar()
.template("{msg}\n{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} transfers ({per_sec})")
.unwrap()
.progress_chars("█▓░"),
);
dist_progress.set_message(format!(
"📤 Distributing {} chunks to {} peers...",
chunk_data.len(),
peers.len(),
));
let mut pending: HashMap<_, (String, usize, u64)> = HashMap::new();
for (hash, data, original_size) in &chunk_data {
let data_size = data.len() as u64;
for (idx, peer) in peers.iter().enumerate() {
let request_id = node.send_transfer_request(
peer,
TransferRequest::StoreChunk {
hash: hash.clone(),
data: data.clone(),
original_size: *original_size,
},
);
pending.insert(request_id, (hash.clone(), idx, data_size));
}
}
let transfer_timeout = tokio::time::Instant::now() + Duration::from_secs(30);
let mut success_count = 0;
let mut fail_count = 0;
while !pending.is_empty() && tokio::time::Instant::now() < transfer_timeout {
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(100)) => {}
event = node.poll_event() => {
if let Some(event) = event {
match event {
NodeEvent::TransferResponse { request_id, response, peer } => {
if let Some((_hash, _, data_size)) = pending.remove(&request_id) {
match response {
TransferResponse::Stored { success: true, .. } => {
success_count += 1;
stats.bytes_uploaded += data_size;
stats.chunks_distributed += 1;
dist_progress.inc(1);
}
TransferResponse::Stored { success: false, message, .. } => {
warn!("Chunk rejected by {}: {:?}", peer, message);
fail_count += 1;
dist_progress.inc(1);
}
_other => {
fail_count += 1;
dist_progress.inc(1);
}
}
}
}
NodeEvent::TransferFailed { request_id, error, peer } => {
if let Some((_hash, _, _)) = pending.remove(&request_id) {
warn!("Failed to send to {}: {}", peer, error);
fail_count += 1;
dist_progress.inc(1);
}
}
_ => {}
}
}
}
}
}
if !pending.is_empty() {
warn!("⚠️ {} transfers timed out", pending.len());
fail_count += pending.len();
}
dist_progress.finish_with_message(format!(
"✅ Distribution complete: {} succeeded, {} failed",
success_count, fail_count
));
println!("📢 Announcing file in DHT...");
let file_id_str = file_id.to_string();
node.announce_file(&file_id_str);
let announce_timeout = tokio::time::Instant::now() + Duration::from_secs(3);
while tokio::time::Instant::now() < announce_timeout {
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(100)) => {}
event = node.poll_event() => {
if let Some(event) = event {
match event {
NodeEvent::ProvideStarted { key } => {
println!(" ✅ File announced in DHT: {}", &key[..8.min(key.len())]);
break;
}
_ => {}
}
}
}
}
}
}
println!();
println!("📊 Upload Statistics:");
println!(" Chunks processed: {}", stats.chunks_processed);
println!(" Data uploaded: {}", format_bytes(stats.bytes_uploaded));
println!(" Time elapsed: {}", format_duration(stats.elapsed_secs()));
println!(" Transfer speed: {}", format_speed(stats.bytes_uploaded as f64 / stats.elapsed_secs()));
println!();
if !peers_found || stats.bytes_uploaded == 0 {
println!("✅ File encrypted and stored locally");
println!("⚠️ NOT distributed to network (no peers detected)");
println!();
println!("💡 File is safe on your device but not backed up to network.");
println!(" Start nodes on other devices to enable distribution.");
} else {
println!("✅ Upload complete - File distributed to network!");
println!(" {} chunks sent to network peers", stats.chunks_distributed);
}
println!();
println!("📁 File ID: {}", file_id);
println!("📄 File: {}", file_name);
println!("📦 Chunks: {}", chunk_data.len());
println!("🔐 Content hash: {}", content_hash.to_hex());
println!();
println!("To download this file, use:");
println!(" firecloud download --file {}", file_id);
Ok(())
}