//! firecloud-cli 0.2.0
//!
//! Command-line interface for FireCloud P2P messaging and file sharing.
//! Upload command - Upload a file to the network

use anyhow::Result;
use crate::format::{format_bytes, format_speed, format_duration};
use crate::password::{prompt_use_encryption, prompt_password_with_confirmation, derive_kek_with_new_salt};
use firecloud_core::{Bytes, ChunkHash, FileId, FileManifest, FileMetadata};
use firecloud_crypto::{encrypt, generate_dek};
use firecloud_net::{FireCloudNode, NodeConfig, NodeEvent, TransferRequest, TransferResponse};
use firecloud_storage::{compress, ChunkStore, CompressionLevel, FileChunker, ManifestStore};
use indicatif::{ProgressBar, ProgressStyle};
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tracing::{info, warn};

/// Progress metrics for a single upload session.
#[allow(dead_code)]
struct UploadStats {
    /// Moment the upload started; basis for elapsed-time and speed figures.
    start_time: Instant,
    /// Bytes acknowledged as stored by remote peers.
    bytes_uploaded: u64,
    /// Chunks that finished local compress + encrypt + store.
    chunks_processed: usize,
    /// Chunk transfers confirmed by remote peers.
    chunks_distributed: usize,
    /// Total number of chunks the file was split into.
    total_chunks: usize,
    /// Original file size in bytes.
    total_size: u64,
}

impl UploadStats {
    /// Create a fresh tracker for an upload of `total_chunks` pieces
    /// totalling `total_size` bytes; all counters start at zero.
    fn new(total_chunks: usize, total_size: u64) -> Self {
        UploadStats {
            chunks_processed: 0,
            chunks_distributed: 0,
            bytes_uploaded: 0,
            start_time: Instant::now(),
            total_chunks,
            total_size,
        }
    }

    /// Seconds elapsed since this tracker was created.
    fn elapsed_secs(&self) -> f64 {
        Instant::now().duration_since(self.start_time).as_secs_f64()
    }
}

/// Upload a file to the FireCloud network.
///
/// Pipeline:
/// 1. Chunk the file; compress, encrypt, and store each chunk locally.
/// 2. Write a manifest, optionally password-protecting the file key (DEK).
/// 3. Discover peers (mDNS) and distribute chunks to storage providers,
///    or to all peers when no dedicated providers are found.
/// 4. Announce the file in the DHT so other peers can locate it.
///
/// If no peers are discovered, the file is still stored locally and the
/// user is told it was not distributed.
///
/// # Errors
///
/// Returns an error if the local stores cannot be opened, the file cannot
/// be read, chunk processing (compress/encrypt) fails, password prompting
/// or key derivation fails, or the network node cannot be started.
pub async fn run(data_dir: PathBuf, file_path: PathBuf) -> Result<()> {
    info!("Uploading file: {}", file_path.display());

    // Open the local content-addressed chunk store and manifest store.
    let chunk_store_path = data_dir.join("chunks");
    let chunk_store = ChunkStore::open(&chunk_store_path)?;

    let manifest_store_path = data_dir.join("manifests");
    let manifest_store = ManifestStore::open(&manifest_store_path)?;

    // Gather file size and display name.
    let mut file = File::open(&file_path)?;
    let file_size = file.metadata()?.len();
    let file_name = file_path
        .file_name()
        .map(|n| n.to_string_lossy().to_string())
        .unwrap_or_else(|| "unknown".to_string());

    // Read the entire file to compute the content hash.
    // NOTE(review): this buffers the whole file in memory; very large files
    // may need a streaming hash instead — confirm expected file sizes.
    let mut file_content = Vec::with_capacity(file_size as usize);
    file.read_to_end(&mut file_content)?;
    let content_hash = ChunkHash::hash(&file_content);

    info!("File: {} ({} bytes)", file_name, file_size);
    info!("Content hash: {}", content_hash.to_hex());

    // Split the plaintext into chunks.
    let chunker = FileChunker::new();
    let chunks = chunker.chunk_bytes(&file_content)?;
    let num_chunks = chunks.len();

    println!("📦 Chunking file into {} pieces...", num_chunks);

    // Generate a fresh data-encryption key (DEK) for this file.
    let dek = generate_dek();

    // Generate file ID
    let file_id = FileId::new();
    println!("📝 File ID: {}", file_id);

    // Initialize stats
    let mut stats = UploadStats::new(num_chunks, file_size);

    // Create progress bar for processing phase
    let process_progress = ProgressBar::new(num_chunks as u64);
    process_progress.set_style(
        ProgressStyle::default_bar()
            .template("{msg}\n{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} chunks")
            .unwrap()
            .progress_chars("█▓░"),
    );
    process_progress.set_message("🔐 Processing chunks (compress + encrypt)...");

    // Process and store each chunk locally: compress, then encrypt, then
    // store under the hash of the *encrypted* bytes (content-addressed).
    let mut chunk_hashes: Vec<ChunkHash> = Vec::new();
    let mut chunk_data: Vec<(String, Vec<u8>, u64)> = Vec::new(); // (hash, data, original_size)

    for mut chunk in chunks {
        // Compress before encrypting (ciphertext does not compress).
        let compressed = compress(&chunk.data, CompressionLevel::Balanced)?;

        // Encrypt with the per-file DEK.
        let encrypted = encrypt(&dek, &compressed)?;

        // Update chunk metadata to reflect the encrypted payload.
        let original_size = chunk.metadata.original_size;
        chunk.data = Bytes::from(encrypted.clone());
        chunk.metadata.encrypted = true;
        chunk.metadata.size = chunk.data.len() as u64;

        // Compute hash of encrypted data (content-addressed)
        let chunk_hash = chunk.hash().clone();

        // Store locally
        let hash_str = chunk_hash.to_hex();
        chunk_store.put(&chunk)?;

        chunk_hashes.push(chunk_hash);
        chunk_data.push((hash_str, encrypted, original_size));

        stats.chunks_processed += 1;
        process_progress.inc(1);
    }

    process_progress.finish_with_message("✅ Chunks processed and stored locally");

    // Create file manifest
    let now = chrono::Utc::now().timestamp_millis();
    let metadata = FileMetadata {
        id: file_id,
        name: file_name.clone(),
        mime_type: mime_guess::from_path(&file_path)
            .first()
            .map(|m| m.to_string()),
        size: file_size,
        created_at: now,
        modified_at: now,
        content_hash,
    };

    // Optionally wrap the DEK with a password-derived key (KEK) so the
    // manifest never stores the DEK in the clear.
    let (encrypted_dek, salt) = if prompt_use_encryption()? {
        println!("\n🔐 Setting up encryption...");
        let password = prompt_password_with_confirmation("Enter encryption password: ")?;

        println!("⏳ Deriving encryption key (this may take a few seconds)...");
        let (kek, salt) = derive_kek_with_new_salt(&password)?;

        // Encrypt the DEK with the KEK
        let encrypted = kek.encrypt_dek(&dek)?;
        println!("✅ File encryption key secured with password\n");

        (Some(encrypted), Some(salt.to_vec()))
    } else {
        println!("⚠️  Warning: File will be stored without password protection\n");
        (None, None)
    };

    let mut manifest = FileManifest::new(metadata, chunk_hashes);
    manifest.encrypted_dek = encrypted_dek;
    manifest.salt = salt;

    // Store manifest locally
    manifest_store.put(&manifest)?;
    info!("Manifest stored locally");

    // Now distribute to peers
    info!("🔍 Discovering peers for distribution...");

    let config = NodeConfig {
        port: 0, // Random port for upload client
        enable_mdns: true,
        bootstrap_peers: Vec::new(),
        bootstrap_relays: vec![],
    };

    let mut node = FireCloudNode::new(config).await?;
    info!("Local peer ID: {}", node.local_peer_id());

    // Collect peers for a few seconds.
    let mut all_peers = Vec::new();
    let discovery_timeout = tokio::time::Instant::now() + Duration::from_secs(5);

    while tokio::time::Instant::now() < discovery_timeout {
        tokio::select! {
            _ = tokio::time::sleep(Duration::from_millis(100)) => {}
            event = node.poll_event() => {
                if let Some(event) = event {
                    match event {
                        NodeEvent::PeerDiscovered(peer_id) => {
                            // Discovery may report the same peer more than
                            // once; dedupe so we don't later send duplicate
                            // transfers to the same peer.
                            if peer_id != node.local_peer_id() && !all_peers.contains(&peer_id) {
                                info!("📡 Found peer: {}", peer_id);
                                all_peers.push(peer_id);
                            }
                        }
                        NodeEvent::Listening(addr) => {
                            info!("👂 Listening on {}", addr);
                        }
                        _ => {}
                    }
                }
            }
        }
    }

    // Derived once after discovery, instead of a mutable flag threaded
    // through both branches below.
    let peers_found = !all_peers.is_empty();

    if !peers_found {
        println!();
        println!("🔍 Peer Discovery Result:");
        println!("   ❌ No peers detected on network");
        println!();
        println!("⚠️  File will be stored LOCALLY ONLY (not distributed)");
        println!();
        println!("💡 To enable network distribution:");
        println!("   1. Start a node: firecloud node --port 4001");
        println!("   2. Or join existing network (same WiFi auto-discovers)");
        println!();
    } else {
        println!();
        println!("🔍 Peer Discovery Result:");
        println!("   ✅ Detected {} peer(s) on network", all_peers.len());
        println!();
        println!("✅ Querying for storage providers...");

        // Prefer dedicated storage providers from the DHT; fall back to
        // broadcasting to every discovered peer.
        let storage_providers = match node.find_storage_providers(10).await {
            Ok(providers) if !providers.is_empty() => {
                println!("🎯 Found {} storage provider(s) - using intelligent distribution", providers.len());
                // Extract peer IDs from providers
                providers.iter().map(|p| p.peer_id).collect::<Vec<_>>()
            }
            _ => {
                println!("📌 No dedicated storage providers found - distributing to all {} peers", all_peers.len());
                all_peers.clone()
            }
        };

        let peers = storage_providers;

        // Calculate total transfers (chunks × peers)
        let total_transfers = chunk_data.len() * peers.len();

        // Create progress bar for distribution
        let dist_progress = ProgressBar::new(total_transfers as u64);
        dist_progress.set_style(
            ProgressStyle::default_bar()
                .template("{msg}\n{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} transfers ({per_sec})")
                .unwrap()
                .progress_chars("█▓░"),
        );
        dist_progress.set_message(format!(
            "📤 Distributing {} chunks to {} peers...",
            chunk_data.len(),
            peers.len(),
        ));

        // Track pending requests
        let mut pending: HashMap<_, (String, usize, u64)> = HashMap::new();  // request_id -> (hash, peer_index, size)

        // Send each chunk to each peer (full replication to the chosen set).
        for (hash, data, original_size) in &chunk_data {
            let data_size = data.len() as u64;
            for (idx, peer) in peers.iter().enumerate() {
                let request_id = node.send_transfer_request(
                    peer,
                    TransferRequest::StoreChunk {
                        hash: hash.clone(),
                        data: data.clone(),
                        original_size: *original_size,
                    },
                );
                pending.insert(request_id, (hash.clone(), idx, data_size));
            }
        }

        // Wait for responses with timeout
        let transfer_timeout = tokio::time::Instant::now() + Duration::from_secs(30);
        let mut success_count = 0;
        let mut fail_count = 0;

        while !pending.is_empty() && tokio::time::Instant::now() < transfer_timeout {
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_millis(100)) => {}
                event = node.poll_event() => {
                    if let Some(event) = event {
                        match event {
                            NodeEvent::TransferResponse { request_id, response, peer } => {
                                // Only count responses for requests we issued.
                                if let Some((_hash, _, data_size)) = pending.remove(&request_id) {
                                    match response {
                                        TransferResponse::Stored { success: true, .. } => {
                                            success_count += 1;
                                            stats.bytes_uploaded += data_size;
                                            stats.chunks_distributed += 1;
                                            dist_progress.inc(1);
                                        }
                                        TransferResponse::Stored { success: false, message, .. } => {
                                            warn!("Chunk rejected by {}: {:?}", peer, message);
                                            fail_count += 1;
                                            dist_progress.inc(1);
                                        }
                                        _other => {
                                            fail_count += 1;
                                            dist_progress.inc(1);
                                        }
                                    }
                                }
                            }
                            NodeEvent::TransferFailed { request_id, error, peer } => {
                                if let Some((_hash, _, _)) = pending.remove(&request_id) {
                                    warn!("Failed to send to {}: {}", peer, error);
                                    fail_count += 1;
                                    dist_progress.inc(1);
                                }
                            }
                            _ => {}
                        }
                    }
                }
            }
        }

        // Anything still pending after the deadline counts as failed.
        if !pending.is_empty() {
            warn!("⚠️  {} transfers timed out", pending.len());
            fail_count += pending.len();
        }

        dist_progress.finish_with_message(format!(
            "✅ Distribution complete: {} succeeded, {} failed",
            success_count, fail_count
        ));

        // Announce file in DHT so other peers can find us
        println!("📢 Announcing file in DHT...");
        let file_id_str = file_id.to_string();
        node.announce_file(&file_id_str);

        // Wait briefly for the announcement to propagate; `break` exits
        // this while loop as soon as the ProvideStarted event arrives.
        let announce_timeout = tokio::time::Instant::now() + Duration::from_secs(3);
        while tokio::time::Instant::now() < announce_timeout {
            tokio::select! {
                _ = tokio::time::sleep(Duration::from_millis(100)) => {}
                event = node.poll_event() => {
                    if let Some(event) = event {
                        match event {
                            NodeEvent::ProvideStarted { key } => {
                                // NOTE(review): byte-slicing assumes `key` is
                                // ASCII (hex) — confirm; non-ASCII would panic.
                                println!("  ✅ File announced in DHT: {}", &key[..8.min(key.len())]);
                                break;
                            }
                            _ => {}
                        }
                    }
                }
            }
        }
    }

    // Print final statistics. Sample elapsed time once so duration and
    // speed are computed from the same instant.
    let elapsed = stats.elapsed_secs();
    println!();
    println!("📊 Upload Statistics:");
    println!("   Chunks processed:    {}", stats.chunks_processed);
    println!("   Data uploaded:       {}", format_bytes(stats.bytes_uploaded));
    println!("   Time elapsed:        {}", format_duration(elapsed));
    println!("   Transfer speed:      {}", format_speed(stats.bytes_uploaded as f64 / elapsed));
    println!();

    // Different message based on whether distribution happened
    if !peers_found || stats.bytes_uploaded == 0 {
        println!("✅ File encrypted and stored locally");
        println!("⚠️  NOT distributed to network (no peers detected)");
        println!();
        println!("💡 File is safe on your device but not backed up to network.");
        println!("   Start nodes on other devices to enable distribution.");
    } else {
        println!("✅ Upload complete - File distributed to network!");
        println!("   {} chunks sent to network peers", stats.chunks_distributed);
    }

    println!();
    println!("📁 File ID: {}", file_id);
    println!("📄 File: {}", file_name);
    println!("📦 Chunks: {}", chunk_data.len());
    println!("🔐 Content hash: {}", content_hash.to_hex());
    println!();
    println!("To download this file, use:");
    println!("  firecloud download --file {}", file_id);

    Ok(())
}