use anyhow::{Context, Result};
use base64::Engine;
use std::path::Path;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use tracing::{debug, info, instrument, trace, warn};
use veilid_core::{DHTSchema, Sequencing, Stability, VeilidAPI, VeilidUpdate, CRYPTO_KIND_VLD0};
use crate::chunker::{chunk_data, Chunk};
use crate::compression;
use crate::encryption::{generate_salt, EncryptionContext};
use crate::metrics::{global_metrics, MetricCategory};
use crate::node::{start_node, stop_node, wait_for_attach};
use crate::protocol::{encode_response, FileMetadata, Request, Response};
use crate::throttle::Throttler;
#[instrument(
level = "info",
skip(path, password, throttle),
fields(
file_name = %path.file_name().and_then(|n| n.to_str()).unwrap_or("unnamed"),
file_path = %path.display(),
encrypted = password.is_some(),
compressed = compress
)
)]
pub async fn seed_file(
path: &Path,
insecure_local_fallback: bool,
password: Option<&str>,
compress: bool,
throttle: Option<Arc<Throttler>>,
) -> Result<()> {
if insecure_local_fallback {
use std::fs;
use std::io::Write;
let file_name = path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unnamed");
let file_data =
fs::read(path).with_context(|| format!("Failed to read file: {}", path.display()))?;
let original_size = file_data.len();
let data_to_chunk = if compress {
let compressed = compression::compress(&file_data)?;
info!(
"Compressed {} -> {} bytes ({:.1}%)",
original_size,
compressed.len(),
compressed.len() as f64 / original_size as f64 * 100.0
);
compressed
} else {
file_data
};
let chunks = chunk_data(&data_to_chunk);
let out_dir = std::env::temp_dir().join("vflight-local").join(file_name);
fs::create_dir_all(&out_dir)?;
let encryption_ctx = if let Some(pwd) = password {
let salt = generate_salt();
let ctx = EncryptionContext::new(pwd, &salt)?;
let meta_path = out_dir.join("encryption.json");
let meta = serde_json::json!({
"salt": base64::engine::general_purpose::STANDARD.encode(salt),
"nonce": base64::engine::general_purpose::STANDARD.encode(ctx.session_nonce()),
});
fs::write(&meta_path, serde_json::to_string_pretty(&meta)?)?;
Some(ctx)
} else {
None
};
let compression_meta_path = out_dir.join("compression.json");
fs::write(
&compression_meta_path,
serde_json::to_string_pretty(&serde_json::json!({ "compressed": compress }))?,
)?;
for chunk in &chunks {
let chunk_path = out_dir.join(format!("chunk_{:06}.bin", chunk.index));
let mut f = fs::File::create(&chunk_path)?;
let data = if let Some(ref ctx) = encryption_ctx {
ctx.encrypt_chunk(chunk.index, &chunk.data)?
} else {
chunk.data.clone()
};
f.write_all(&data)?;
}
info!(
"\n[INSECURE LOCAL MODE] Seeded {} chunks to {}",
chunks.len(),
out_dir.display()
);
if encryption_ctx.is_some() {
info!("Chunks are encrypted. Use the same password to fetch.");
}
if compress {
info!("File is compressed. Fetcher will decompress automatically.");
}
info!("Anyone with access to this directory can fetch the file chunks.\n");
return Ok(());
}
let file_name = path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("unnamed")
.to_string();
let file_data =
std::fs::read(path).with_context(|| format!("Failed to read file: {}", path.display()))?;
let original_size = file_data.len() as u64;
let data_to_chunk = if compress {
let compressed = compression::compress(&file_data)?;
info!(
"Compressed {} -> {} bytes ({:.1}%)",
original_size,
compressed.len(),
compressed.len() as f64 / original_size as f64 * 100.0
);
compressed
} else {
file_data
};
info!(file = %path.display(), "Chunking file");
let chunks = chunk_data(&data_to_chunk);
let total_chunks = chunks.len() as u64;
let chunk_hashes: Vec<String> = chunks.iter().map(|c| c.hash.clone()).collect();
info!(
file = %file_name,
size = original_size,
chunks = total_chunks,
compressed = compress,
"File chunked successfully"
);
info!("Starting Veilid node");
let (api, mut rx) = start_node("seeder").await?;
debug!("Waiting for network attachment");
wait_for_attach(&mut rx).await?;
info!("Attached to network");
let routing_context = api
.routing_context()
.context("Failed to get routing context")?;
debug!("Creating private route");
let route_blob = api
.new_custom_private_route(
&[CRYPTO_KIND_VLD0],
Stability::Reliable,
Sequencing::PreferOrdered,
)
.await
.context("Failed to create private route")?;
let route_id = route_blob.route_id;
let route_blob_b64 = base64::engine::general_purpose::STANDARD.encode(&route_blob.blob);
debug!("Private route created");
debug!("Creating DHT record");
let dht_create_start = Instant::now();
let dht_record = routing_context
.create_dht_record(CRYPTO_KIND_VLD0, DHTSchema::dflt(2)?, None)
.await
.context("Failed to create DHT record")?;
global_metrics().record(MetricCategory::DhtOperation, dht_create_start.elapsed(), 0);
let dht_key = dht_record.key();
info!(dht_key = %dht_key, "DHT record created");
let (encryption_ctx, encryption_salt, encryption_nonce) = if let Some(pwd) = password {
let salt = generate_salt();
let ctx = EncryptionContext::new(pwd, &salt)?;
let salt_b64 = base64::engine::general_purpose::STANDARD.encode(salt);
let nonce_b64 = base64::engine::general_purpose::STANDARD.encode(ctx.session_nonce());
info!("Encryption enabled - chunks will be encrypted with provided password");
(Some(ctx), Some(salt_b64), Some(nonce_b64))
} else {
(None, None, None)
};
let metadata = FileMetadata {
name: file_name.clone(),
size: original_size,
total_chunks,
chunk_hashes: chunk_hashes.clone(),
route_blob: route_blob_b64,
encryption_salt,
encryption_nonce,
compressed: compress,
};
let metadata_json = serde_json::to_vec(&metadata)?;
let dht_write_start = Instant::now();
routing_context
.set_dht_value(dht_key.clone(), 0, metadata_json.clone(), None)
.await
.context("Failed to write metadata to DHT")?;
global_metrics().record(
MetricCategory::DhtOperation,
dht_write_start.elapsed(),
metadata_json.len() as u64,
);
println!("\n========================================");
println!("Seeding: {}", file_name);
println!("Size: {} bytes", original_size);
println!("Chunks: {}", total_chunks);
println!("DHT Key: {}", dht_key);
if compress {
let compressed_size: u64 = chunks.iter().map(|c| c.data.len() as u64).sum();
println!(
"Compression: ENABLED ({:.1}%)",
compressed_size as f64 / original_size as f64 * 100.0
);
}
if encryption_ctx.is_some() {
println!("Encryption: ENABLED");
}
if let Some(ref t) = throttle {
println!("Throttle: {} KB/s", t.rate_kb_s());
}
println!("========================================");
println!("Share this DHT key with others to allow them to fetch the file.");
if encryption_ctx.is_some() {
println!("NOTE: Recipients must use the same password to decrypt.");
}
println!("Press Ctrl+C to stop seeding.\n");
let chunks = Arc::new(chunks);
let encryption_ctx = encryption_ctx.map(Arc::new);
handle_requests(
api.clone(),
&mut rx,
chunks,
&metadata,
encryption_ctx,
throttle,
)
.await?;
info!("Shutting down seeder");
api.release_private_route(route_id).ok();
routing_context.close_dht_record(dht_key).await.ok();
stop_node(api).await?;
info!("Seeder stopped");
Ok(())
}
/// Serves `AppCall` requests from `rx` until Ctrl+C is received or the update
/// channel is closed.
///
/// Each `AppCall` payload is parsed as a JSON [`Request`]:
///
/// * `GetChunk { index }` is answered with the chunk's data (encrypted when
///   `encryption_ctx` is present) and its hash, or with `Response::Error` if
///   the index is out of range or encryption fails. When `throttle` is present
///   it is awaited with the payload size before the response is built.
/// * `GetMetadata` is answered with the name, size, chunk count, chunk hashes
///   and compression flag from `metadata`.
/// * Anything that fails to parse is answered with `Response::Error`.
///
/// All other `VeilidUpdate` variants are ignored.
///
/// # Errors
///
/// Returns an error if a response cannot be encoded. A failure to deliver a
/// reply is logged and does not stop the loop.
#[instrument(
    level = "debug",
    skip(api, rx, chunks, metadata, encryption_ctx, throttle),
    fields(file_name = %metadata.name, total_chunks = metadata.total_chunks, encrypted = encryption_ctx.is_some())
)]
async fn handle_requests(
    api: VeilidAPI,
    rx: &mut mpsc::UnboundedReceiver<VeilidUpdate>,
    chunks: Arc<Vec<Chunk>>,
    metadata: &FileMetadata,
    encryption_ctx: Option<Arc<EncryptionContext>>,
    throttle: Option<Arc<Throttler>>,
) -> Result<()> {
    // Create the Ctrl+C future once and keep polling the same instance.
    // Building a fresh one per loop iteration leaves no listener alive while a
    // request is being served, so a signal arriving then would be missed.
    let shutdown = tokio::signal::ctrl_c();
    tokio::pin!(shutdown);

    loop {
        tokio::select! {
            signal = &mut shutdown => {
                match signal {
                    Ok(()) => info!("Received shutdown signal (Ctrl+C)"),
                    Err(e) => warn!(error = %e, "Failed to listen for Ctrl+C; shutting down"),
                }
                break;
            }
            update = rx.recv() => {
                match update {
                    Some(VeilidUpdate::AppCall(app_call)) => {
                        let call_id = app_call.id();
                        let message = app_call.message();
                        trace!(
                            call_id = ?call_id,
                            message_len = message.len(),
                            "Received AppCall"
                        );

                        let response = match serde_json::from_slice::<Request>(message) {
                            Ok(Request::GetChunk { index }) => {
                                // Checked conversion: an index that does not fit in
                                // `usize` is reported as "not found" instead of being
                                // truncated and aliasing another chunk.
                                let requested =
                                    usize::try_from(index).ok().and_then(|i| chunks.get(i));
                                match requested {
                                    Some(chunk) => {
                                        debug!(
                                            chunk_index = index,
                                            total_chunks = chunks.len(),
                                            encrypted = encryption_ctx.is_some(),
                                            "Serving chunk"
                                        );
                                        let payload = match encryption_ctx {
                                            Some(ref ctx) => ctx.encrypt_chunk(index, &chunk.data),
                                            None => Ok(chunk.data.clone()),
                                        };
                                        match payload {
                                            Ok(chunk_data) => {
                                                if let Some(ref t) = throttle {
                                                    t.acquire(chunk_data.len()).await;
                                                }
                                                Response::ChunkData {
                                                    index,
                                                    data: chunk_data,
                                                    hash: chunk.hash.clone(),
                                                }
                                            }
                                            Err(e) => {
                                                warn!(error = %e, chunk_index = index, "Failed to encrypt chunk");
                                                Response::Error {
                                                    message: format!(
                                                        "Encryption failed for chunk {}",
                                                        index
                                                    ),
                                                }
                                            }
                                        }
                                    }
                                    None => {
                                        warn!(chunk_index = index, "Chunk not found");
                                        Response::Error {
                                            message: format!("Chunk {} not found", index),
                                        }
                                    }
                                }
                            }
                            Ok(Request::GetMetadata) => {
                                debug!("Serving metadata request");
                                Response::Metadata {
                                    name: metadata.name.clone(),
                                    size: metadata.size,
                                    total_chunks: metadata.total_chunks,
                                    chunk_hashes: metadata.chunk_hashes.clone(),
                                    compressed: metadata.compressed,
                                }
                            }
                            Err(e) => {
                                warn!(error = %e, "Received invalid request");
                                Response::Error {
                                    message: format!("Invalid request: {}", e),
                                }
                            }
                        };

                        trace!(response = ?response, "Sending response");
                        let response_bytes = encode_response(&response)?;
                        // Delivery is best-effort, but a failure should be visible
                        // in the logs rather than silently dropped.
                        if let Err(e) = api.app_call_reply(call_id, response_bytes).await {
                            warn!(error = %e, "Failed to send AppCall reply");
                        }
                    }
                    // Updates other than AppCall are not relevant to serving chunks.
                    Some(_) => {}
                    // The update channel was closed: nothing more can arrive.
                    None => break,
                }
            }
        }
    }
    Ok(())
}