vflight 0.9.2

Share files over the Veilid distributed network with content-addressable storage
Documentation
use anyhow::{Context, Result};
use base64::Engine;
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use tracing::{debug, info, instrument, trace, warn};
use veilid_core::{DHTSchema, Sequencing, Stability, VeilidAPI, VeilidUpdate, CRYPTO_KIND_VLD0};

use crate::chunker::{chunk_data, Chunk};
use crate::compression;
use crate::encryption::{generate_salt, EncryptionContext};
use crate::metrics::{global_metrics, MetricCategory};
use crate::node::{start_node, stop_node, wait_for_attach};
use crate::protocol::{encode_response, FileMetadata, Request, Response};
use crate::throttle::Throttler;

#[instrument(
    level = "info",
    skip(path, password, throttle),
    fields(
        file_name = %path.file_name().and_then(|n| n.to_str()).unwrap_or("unnamed"),
        file_path = %path.display(),
        encrypted = password.is_some(),
        compressed = compress
    )
)]
pub async fn seed_file(
    path: &Path,
    insecure_local_fallback: bool,
    password: Option<&str>,
    compress: bool,
    throttle: Option<Arc<Throttler>>,
) -> Result<()> {
    if insecure_local_fallback {
        use std::fs;
        use std::io::Write;
        let file_name = path
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("unnamed");

        let file_data =
            fs::read(path).with_context(|| format!("Failed to read file: {}", path.display()))?;
        let original_size = file_data.len();
        let data_to_chunk = if compress {
            let compressed = compression::compress(&file_data)?;
            info!(
                "Compressed {} -> {} bytes ({:.1}%)",
                original_size,
                compressed.len(),
                compressed.len() as f64 / original_size as f64 * 100.0
            );
            compressed
        } else {
            file_data
        };
        let chunks = chunk_data(&data_to_chunk);

        let out_dir = std::env::temp_dir().join("vflight-local").join(file_name);
        fs::create_dir_all(&out_dir)?;

        // Set up encryption if password provided
        let encryption_ctx = if let Some(pwd) = password {
            let salt = generate_salt();
            let ctx = EncryptionContext::new(pwd, &salt)?;
            let meta_path = out_dir.join("encryption.json");
            let meta = serde_json::json!({
                "salt": base64::engine::general_purpose::STANDARD.encode(salt),
                "nonce": base64::engine::general_purpose::STANDARD.encode(ctx.session_nonce()),
            });
            fs::write(&meta_path, serde_json::to_string_pretty(&meta)?)?;
            Some(ctx)
        } else {
            None
        };

        // Save compression flag
        let compression_meta_path = out_dir.join("compression.json");
        fs::write(
            &compression_meta_path,
            serde_json::to_string_pretty(&serde_json::json!({ "compressed": compress }))?,
        )?;

        for chunk in &chunks {
            let chunk_path = out_dir.join(format!("chunk_{:06}.bin", chunk.index));
            let mut f = fs::File::create(&chunk_path)?;
            let data = if let Some(ref ctx) = encryption_ctx {
                ctx.encrypt_chunk(chunk.index, &chunk.data)?
            } else {
                chunk.data.clone()
            };
            f.write_all(&data)?;
        }
        info!(
            "\n[INSECURE LOCAL MODE] Seeded {} chunks to {}",
            chunks.len(),
            out_dir.display()
        );
        if encryption_ctx.is_some() {
            info!("Chunks are encrypted. Use the same password to fetch.");
        }
        if compress {
            info!("File is compressed. Fetcher will decompress automatically.");
        }
        info!("Anyone with access to this directory can fetch the file chunks.\n");
        return Ok(());
    }
    let file_name = path
        .file_name()
        .and_then(|n| n.to_str())
        .unwrap_or("unnamed")
        .to_string();

    let file_data =
        std::fs::read(path).with_context(|| format!("Failed to read file: {}", path.display()))?;
    let original_size = file_data.len() as u64;

    let data_to_chunk = if compress {
        let compressed = compression::compress(&file_data)?;
        info!(
            "Compressed {} -> {} bytes ({:.1}%)",
            original_size,
            compressed.len(),
            compressed.len() as f64 / original_size as f64 * 100.0
        );
        compressed
    } else {
        file_data
    };

    info!(file = %path.display(), "Chunking file");
    let chunks = chunk_data(&data_to_chunk);
    let total_chunks = chunks.len() as u64;
    let chunk_hashes: Vec<String> = chunks.iter().map(|c| c.hash.clone()).collect();

    info!(
        file = %file_name,
        size = original_size,
        chunks = total_chunks,
        compressed = compress,
        "File chunked successfully"
    );

    info!("Starting Veilid node");
    let (api, mut rx) = start_node("seeder").await?;

    debug!("Waiting for network attachment");
    wait_for_attach(&mut rx).await?;
    info!("Attached to network");

    let routing_context = api
        .routing_context()
        .context("Failed to get routing context")?;

    // Create private route for receiving chunk requests
    debug!("Creating private route");
    let route_blob = api
        .new_custom_private_route(
            &[CRYPTO_KIND_VLD0],
            Stability::Reliable,
            Sequencing::PreferOrdered,
        )
        .await
        .context("Failed to create private route")?;

    let route_id = route_blob.route_id;
    let route_blob_b64 = base64::engine::general_purpose::STANDARD.encode(&route_blob.blob);
    debug!("Private route created");

    // Create DHT record with DFLT schema (2 subkeys: metadata + status)
    debug!("Creating DHT record");
    let dht_create_start = Instant::now();
    let dht_record = routing_context
        .create_dht_record(CRYPTO_KIND_VLD0, DHTSchema::dflt(2)?, None)
        .await
        .context("Failed to create DHT record")?;
    global_metrics().record(MetricCategory::DhtOperation, dht_create_start.elapsed(), 0);

    let dht_key = dht_record.key();
    info!(dht_key = %dht_key, "DHT record created");

    // Set up encryption if password provided
    let (encryption_ctx, encryption_salt, encryption_nonce) = if let Some(pwd) = password {
        let salt = generate_salt();
        let ctx = EncryptionContext::new(pwd, &salt)?;
        let salt_b64 = base64::engine::general_purpose::STANDARD.encode(salt);
        let nonce_b64 = base64::engine::general_purpose::STANDARD.encode(ctx.session_nonce());
        info!("Encryption enabled - chunks will be encrypted with provided password");
        (Some(ctx), Some(salt_b64), Some(nonce_b64))
    } else {
        (None, None, None)
    };

    // Build and write metadata to subkey 0
    let metadata = FileMetadata {
        name: file_name.clone(),
        size: original_size,
        total_chunks,
        chunk_hashes: chunk_hashes.clone(),
        route_blob: route_blob_b64,
        encryption_salt,
        encryption_nonce,
        compressed: compress,
    };

    let metadata_json = serde_json::to_vec(&metadata)?;
    let dht_write_start = Instant::now();
    routing_context
        .set_dht_value(dht_key.clone(), 0, metadata_json.clone(), None)
        .await
        .context("Failed to write metadata to DHT")?;
    global_metrics().record(
        MetricCategory::DhtOperation,
        dht_write_start.elapsed(),
        metadata_json.len() as u64,
    );

    println!("\n========================================");
    println!("Seeding: {}", file_name);
    println!("Size: {} bytes", original_size);
    println!("Chunks: {}", total_chunks);
    println!("DHT Key: {}", dht_key);
    if compress {
        let compressed_size: u64 = chunks.iter().map(|c| c.data.len() as u64).sum();
        println!(
            "Compression: ENABLED ({:.1}%)",
            compressed_size as f64 / original_size as f64 * 100.0
        );
    }
    if encryption_ctx.is_some() {
        println!("Encryption: ENABLED");
    }
    if let Some(ref t) = throttle {
        println!("Throttle: {} KB/s", t.rate_kb_s());
    }
    println!("========================================");
    println!("Share this DHT key with others to allow them to fetch the file.");
    if encryption_ctx.is_some() {
        println!("NOTE: Recipients must use the same password to decrypt.");
    }
    println!("Press Ctrl+C to stop seeding.\n");

    // Wrap chunks and encryption context in Arc for sharing with handler
    let chunks = Arc::new(chunks);
    let encryption_ctx = encryption_ctx.map(Arc::new);

    // Handle incoming AppCall requests
    handle_requests(
        api.clone(),
        &mut rx,
        chunks,
        &metadata,
        encryption_ctx,
        throttle,
    )
    .await?;

    // Cleanup
    info!("Shutting down seeder");
    api.release_private_route(route_id).ok();
    routing_context.close_dht_record(dht_key).await.ok();
    stop_node(api).await?;

    info!("Seeder stopped");
    Ok(())
}

#[instrument(
    level = "debug",
    skip(api, rx, chunks, metadata, encryption_ctx, throttle),
    fields(file_name = %metadata.name, total_chunks = metadata.total_chunks, encrypted = encryption_ctx.is_some())
)]
async fn handle_requests(
    api: VeilidAPI,
    rx: &mut mpsc::UnboundedReceiver<VeilidUpdate>,
    chunks: Arc<Vec<Chunk>>,
    metadata: &FileMetadata,
    encryption_ctx: Option<Arc<EncryptionContext>>,
    throttle: Option<Arc<Throttler>>,
) -> Result<()> {
    loop {
        tokio::select! {
            _ = tokio::signal::ctrl_c() => {
                info!("Received shutdown signal (Ctrl+C)");
                break;
            }
            update = rx.recv() => {
                match update {
                    Some(VeilidUpdate::AppCall(app_call)) => {
                        let call_id = app_call.id();
                        let message = app_call.message();

                        trace!(
                            call_id = ?call_id,
                            message_len = message.len(),
                            "Received AppCall"
                        );

                        let response = match serde_json::from_slice::<Request>(message) {
                            Ok(Request::GetChunk { index }) => {
                                if let Some(chunk) = chunks.get(index as usize) {
                                    debug!(
                                        chunk_index = index,
                                        total_chunks = chunks.len(),
                                        encrypted = encryption_ctx.is_some(),
                                        "Serving chunk"
                                    );
                                    let encryption_result = if let Some(ref ctx) = encryption_ctx {
                                        ctx.encrypt_chunk(index, &chunk.data)
                                    } else {
                                        Ok(chunk.data.clone())
                                    };
                                    match encryption_result {
                                        Ok(chunk_data) => {
                                            if let Some(ref t) = throttle {
                                                t.acquire(chunk_data.len()).await;
                                            }
                                            Response::ChunkData {
                                                index,
                                                data: chunk_data,
                                                hash: chunk.hash.clone(),
                                            }
                                        }
                                        Err(e) => {
                                            warn!(error = %e, chunk_index = index, "Failed to encrypt chunk");
                                            Response::Error {
                                                message: format!(
                                                    "Encryption failed for chunk {}",
                                                    index
                                                ),
                                            }
                                        }
                                    }
                                } else {
                                    warn!(chunk_index = index, "Chunk not found");
                                    Response::Error {
                                        message: format!("Chunk {} not found", index),
                                    }
                                }
                            }
                            Ok(Request::GetMetadata) => {
                                debug!("Serving metadata request");
                                Response::Metadata {
                                    name: metadata.name.clone(),
                                    size: metadata.size,
                                    total_chunks: metadata.total_chunks,
                                    chunk_hashes: metadata.chunk_hashes.clone(),
                                    compressed: metadata.compressed,
                                }
                            }
                            Err(e) => {
                                warn!(error = %e, "Received invalid request");
                                Response::Error {
                                    message: format!("Invalid request: {}", e),
                                }
                            }
                        };

                        trace!(response = ?response, "Sending response");
                        let response_bytes = encode_response(&response)?;
                        api.app_call_reply(call_id, response_bytes).await.ok();
                    }
                    Some(_) => {}
                    None => break,
                }
            }
        }
    }
    Ok(())
}