use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::fs;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{debug, info, warn};
use crate::protocol::FileMetadata;
/// Version stamp written into `resume.json`; bump on incompatible format
/// changes so stale on-disk state is discarded instead of misread.
pub const RESUME_VERSION: u32 = 1;
/// Hidden directory (under the output dir) that holds all resume data.
const RESUME_DIR_NAME: &str = ".vflight-resume";
/// Download-progress snapshot for a single file, persisted as JSON so an
/// interrupted transfer can be resumed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResumeState {
/// Format version of this state file; compared against [`RESUME_VERSION`] on load.
pub version: u32,
/// DHT key the download was started from; must match on resume.
pub dht_key: String,
/// Subset of the file metadata, used to detect changes between attempts.
pub metadata: ResumeMetadata,
/// Indices of chunks that have been fully downloaded and saved.
pub completed_chunks: HashSet<u64>,
/// Unix timestamp (seconds) of the last state mutation.
pub last_updated: u64,
}
/// The identity-relevant subset of `FileMetadata`, stored so a resumed
/// download can verify the remote file has not changed.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ResumeMetadata {
/// Original file name.
pub name: String,
/// Total file size in bytes.
pub size: u64,
/// Number of chunks the file is split into.
pub total_chunks: u64,
/// Per-chunk BLAKE3 hashes (hex), indexed by chunk number.
pub chunk_hashes: Vec<String>,
/// True when the file was published with an encryption salt.
pub encrypted: bool,
}
impl ResumeState {
    /// Builds an empty progress record for a new download of `metadata`
    /// under `dht_key`, stamped with the current format version and time.
    pub fn new(dht_key: &str, metadata: &FileMetadata) -> Self {
        Self {
            version: RESUME_VERSION,
            dht_key: dht_key.to_string(),
            metadata: ResumeMetadata::from(metadata),
            completed_chunks: HashSet::new(),
            last_updated: current_timestamp(),
        }
    }

    /// Chunk indices not yet marked complete, in ascending order.
    pub fn pending_chunks(&self) -> Vec<u64> {
        let mut pending = Vec::new();
        for index in 0..self.metadata.total_chunks {
            if !self.completed_chunks.contains(&index) {
                pending.push(index);
            }
        }
        pending
    }

    /// True once as many chunks are recorded complete as the file has.
    pub fn is_complete(&self) -> bool {
        self.metadata.total_chunks == self.completed_chunks.len() as u64
    }

    /// Number of chunks recorded as complete.
    pub fn completed_count(&self) -> u64 {
        self.completed_chunks.len() as u64
    }
}
impl From<&FileMetadata> for ResumeMetadata {
    /// Captures just the fields of `FileMetadata` needed to validate a
    /// later resume attempt. The `encrypted` flag is derived from the
    /// presence of an encryption salt.
    fn from(metadata: &FileMetadata) -> Self {
        let encrypted = metadata.encryption_salt.is_some();
        Self {
            encrypted,
            name: metadata.name.clone(),
            chunk_hashes: metadata.chunk_hashes.clone(),
            size: metadata.size,
            total_chunks: metadata.total_chunks,
        }
    }
}
impl ResumeMetadata {
    /// Whether `metadata` still describes the same file this resume state
    /// was created for. Compares name, size, chunk count, and every chunk
    /// hash; the `encrypted` flag is deliberately not part of the check.
    pub fn matches(&self, metadata: &FileMetadata) -> bool {
        if self.name != metadata.name {
            return false;
        }
        if self.size != metadata.size {
            return false;
        }
        if self.total_chunks != metadata.total_chunks {
            return false;
        }
        self.chunk_hashes == metadata.chunk_hashes
    }
}
/// Owns a download's on-disk resume data: a `resume.json` progress file
/// plus a `chunks/` directory of verified chunk files.
pub struct ResumeManager {
/// Root directory for this download's resume data.
resume_dir: PathBuf,
/// Path to the serialized [`ResumeState`] JSON.
state_path: PathBuf,
/// Directory holding the individual cached chunk files.
chunks_dir: PathBuf,
/// In-memory copy of the persisted state.
state: ResumeState,
}
impl ResumeManager {
    /// Opens a download for (re)start: loads and validates any existing
    /// resume state, falling back to a fresh one when none exists or the
    /// stored state is unusable (corrupt JSON, wrong version, different
    /// DHT key, or changed file metadata).
    pub fn init(output_dir: &Path, dht_key: &str, metadata: &FileMetadata) -> Result<Self> {
        let resume_dir = Self::resume_dir_path(output_dir, dht_key);
        let state_path = resume_dir.join("resume.json");
        let chunks_dir = resume_dir.join("chunks");
        if state_path.exists() {
            match Self::load_and_validate(&state_path, dht_key, metadata) {
                Ok(state) => {
                    info!(
                        completed = state.completed_count(),
                        total = state.metadata.total_chunks,
                        "Loaded existing resume state"
                    );
                    return Ok(Self {
                        resume_dir,
                        state_path,
                        chunks_dir,
                        state,
                    });
                }
                Err(e) => {
                    warn!(error = %e, "Invalid resume state, starting fresh");
                    // Best-effort removal of the stale tree; new_fresh()
                    // clears and recreates it anyway.
                    if resume_dir.exists() {
                        fs::remove_dir_all(&resume_dir).ok();
                    }
                }
            }
        }
        Self::new_fresh(output_dir, dht_key, metadata)
    }

    /// Discards any existing resume data for this key and creates an empty
    /// state on disk.
    pub fn new_fresh(output_dir: &Path, dht_key: &str, metadata: &FileMetadata) -> Result<Self> {
        let resume_dir = Self::resume_dir_path(output_dir, dht_key);
        let state_path = resume_dir.join("resume.json");
        let chunks_dir = resume_dir.join("chunks");
        if resume_dir.exists() {
            fs::remove_dir_all(&resume_dir)
                .context("Failed to remove existing resume directory")?;
        }
        fs::create_dir_all(&chunks_dir).context("Failed to create resume chunks directory")?;
        let state = ResumeState::new(dht_key, metadata);
        let manager = Self {
            resume_dir,
            state_path,
            chunks_dir,
            state,
        };
        manager.save_state()?;
        debug!("Created fresh resume state");
        Ok(manager)
    }

    /// Chunk indices still to download, in ascending order.
    pub fn pending_chunks(&self) -> Vec<u64> {
        self.state.pending_chunks()
    }

    /// Number of chunks already downloaded and recorded.
    pub fn completed_count(&self) -> u64 {
        self.state.completed_count()
    }

    /// Total number of chunks in the file.
    pub fn total_chunks(&self) -> u64 {
        self.state.metadata.total_chunks
    }

    /// Whether the file was published encrypted (had an encryption salt).
    pub fn is_encrypted(&self) -> bool {
        self.state.metadata.encrypted
    }

    /// Writes a chunk to the cache atomically (temp file + rename) so a
    /// crash mid-write never leaves a truncated chunk at the final path.
    pub fn save_chunk(&self, index: u64, data: &[u8]) -> Result<()> {
        let chunk_path = self.chunk_path(index);
        let temp_path = chunk_path.with_extension("tmp");
        fs::write(&temp_path, data).context("Failed to write chunk file")?;
        fs::rename(&temp_path, &chunk_path).context("Failed to rename chunk file")?;
        debug!(chunk = index, "Saved chunk to disk");
        Ok(())
    }

    /// Records chunk `index` as complete and persists the updated state.
    pub fn mark_chunk_complete(&mut self, index: u64) -> Result<()> {
        self.state.completed_chunks.insert(index);
        self.state.last_updated = current_timestamp();
        self.save_state()?;
        Ok(())
    }

    /// Convenience wrapper: [`Self::save_chunk`] then
    /// [`Self::mark_chunk_complete`].
    pub fn save_and_mark_complete(&mut self, index: u64, data: &[u8]) -> Result<()> {
        self.save_chunk(index, data)?;
        self.mark_chunk_complete(index)?;
        Ok(())
    }

    /// Reads a cached chunk back from disk.
    pub fn load_chunk(&self, index: u64) -> Result<Vec<u8>> {
        let chunk_path = self.chunk_path(index);
        fs::read(&chunk_path).with_context(|| format!("Failed to read chunk {} from cache", index))
    }

    /// Returns `Ok(true)` when the cached chunk exists and its BLAKE3 hash
    /// (lowercase hex) equals `expected_hash`; a missing chunk yields
    /// `Ok(false)`. An `Err` means the chunk existed but could not be read.
    pub fn verify_chunk(&self, index: u64, expected_hash: &str) -> Result<bool> {
        let chunk_path = self.chunk_path(index);
        if !chunk_path.exists() {
            return Ok(false);
        }
        let data = fs::read(&chunk_path)?;
        let computed_hash = blake3::hash(&data).to_hex().to_string();
        Ok(computed_hash == expected_hash)
    }

    /// Re-verifies every completed chunk against its expected hash and
    /// drops any that are corrupted, unreadable, or missing so they get
    /// re-downloaded. Returns the invalidated indices in ascending order.
    pub fn verify_and_invalidate_corrupted(&mut self) -> Result<Vec<u64>> {
        let mut invalidated = Vec::new();
        // Snapshot and sort the completed set so verification order and the
        // returned list are deterministic (HashSet iteration order is not).
        let mut completed: Vec<u64> = self.state.completed_chunks.iter().copied().collect();
        completed.sort_unstable();
        for index in completed {
            // An index with no known hash compares against "" and is thus
            // treated as corrupted.
            let expected_hash = self
                .state
                .metadata
                .chunk_hashes
                .get(index as usize)
                .map(|s| s.as_str())
                .unwrap_or("");
            match self.verify_chunk(index, expected_hash) {
                Ok(true) => {}
                Ok(false) | Err(_) => {
                    warn!(
                        chunk = index,
                        "Cached chunk corrupted or missing, will re-download"
                    );
                    self.state.completed_chunks.remove(&index);
                    let chunk_path = self.chunk_path(index);
                    fs::remove_file(&chunk_path).ok();
                    invalidated.push(index);
                }
            }
        }
        if !invalidated.is_empty() {
            self.state.last_updated = current_timestamp();
            self.save_state()?;
        }
        Ok(invalidated)
    }

    /// Deletes all resume data for this download; call once the transfer
    /// has completed successfully.
    pub fn cleanup(self) -> Result<()> {
        if self.resume_dir.exists() {
            fs::remove_dir_all(&self.resume_dir).context("Failed to remove resume directory")?;
            debug!("Cleaned up resume state");
        }
        Ok(())
    }

    /// Cache-file path for chunk `index` (zero-padded so listings sort).
    fn chunk_path(&self, index: u64) -> PathBuf {
        self.chunks_dir.join(format!("chunk_{:06}.bin", index))
    }

    /// Per-download directory: `<output_dir>/.vflight-resume/<hash-prefix>`.
    /// The DHT key is hashed so the path is filesystem-safe regardless of
    /// what characters the key contains.
    fn resume_dir_path(output_dir: &Path, dht_key: &str) -> PathBuf {
        let hash = blake3::hash(dht_key.as_bytes()).to_hex();
        let prefix = &hash[..16];
        output_dir.join(RESUME_DIR_NAME).join(prefix)
    }

    /// Persists the current state atomically (temp file + rename).
    fn save_state(&self) -> Result<()> {
        let json = serde_json::to_string_pretty(&self.state)
            .context("Failed to serialize resume state")?;
        let temp_path = self.state_path.with_extension("tmp");
        fs::write(&temp_path, &json).context("Failed to write resume state")?;
        fs::rename(&temp_path, &self.state_path).context("Failed to rename resume state")?;
        Ok(())
    }

    /// Loads state from disk and rejects it unless the format version, DHT
    /// key, and file metadata all match the current download attempt.
    fn load_and_validate(
        state_path: &Path,
        dht_key: &str,
        metadata: &FileMetadata,
    ) -> Result<ResumeState> {
        let json = fs::read_to_string(state_path).context("Failed to read resume state")?;
        let state: ResumeState =
            serde_json::from_str(&json).context("Failed to parse resume state")?;
        if state.version != RESUME_VERSION {
            anyhow::bail!(
                "Resume state version mismatch: expected {}, got {}",
                RESUME_VERSION,
                state.version
            );
        }
        if state.dht_key != dht_key {
            anyhow::bail!("Resume state DHT key mismatch");
        }
        if !state.metadata.matches(metadata) {
            anyhow::bail!("File metadata has changed since last download attempt");
        }
        Ok(state)
    }
}
/// Seconds since the Unix epoch, or 0 if the system clock reads earlier
/// than the epoch (never panics on a skewed clock).
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
// Builds a FileMetadata with `chunks` chunks, each hash being the BLAKE3
// of "chunk{i}", unencrypted and uncompressed.
fn create_test_metadata(chunks: u64) -> FileMetadata {
let chunk_hashes: Vec<String> = (0..chunks)
.map(|i| {
blake3::hash(format!("chunk{}", i).as_bytes())
.to_hex()
.to_string()
})
.collect();
FileMetadata {
name: "test_file.bin".to_string(),
size: chunks * 30000,
total_chunks: chunks,
chunk_hashes,
route_blob: String::new(),
encryption_salt: None,
encryption_nonce: None,
compressed: false,
}
}
// A fresh ResumeState starts empty, incomplete, and carries the current version.
#[test]
fn test_resume_state_new() {
let metadata = create_test_metadata(10);
let state = ResumeState::new("VLD0:test_key", &metadata);
assert_eq!(state.version, RESUME_VERSION);
assert_eq!(state.dht_key, "VLD0:test_key");
assert_eq!(state.metadata.total_chunks, 10);
assert!(state.completed_chunks.is_empty());
assert!(!state.is_complete());
}
// pending_chunks() returns exactly the not-yet-completed indices, ascending,
// and is_complete() flips once all chunks are marked.
#[test]
fn test_resume_state_pending_chunks() {
let metadata = create_test_metadata(5);
let mut state = ResumeState::new("VLD0:test", &metadata);
assert_eq!(state.pending_chunks(), vec![0, 1, 2, 3, 4]);
state.completed_chunks.insert(1);
state.completed_chunks.insert(3);
assert_eq!(state.pending_chunks(), vec![0, 2, 4]);
assert_eq!(state.completed_count(), 2);
assert!(!state.is_complete());
state.completed_chunks.insert(0);
state.completed_chunks.insert(2);
state.completed_chunks.insert(4);
assert!(state.pending_chunks().is_empty());
assert!(state.is_complete());
}
// ResumeState round-trips through serde_json without losing fields.
#[test]
fn test_resume_state_serialization() {
let metadata = create_test_metadata(3);
let mut state = ResumeState::new("VLD0:test", &metadata);
state.completed_chunks.insert(0);
state.completed_chunks.insert(2);
let json = serde_json::to_string(&state).unwrap();
let restored: ResumeState = serde_json::from_str(&json).unwrap();
assert_eq!(restored.version, state.version);
assert_eq!(restored.dht_key, state.dht_key);
assert_eq!(restored.metadata, state.metadata);
assert_eq!(restored.completed_chunks, state.completed_chunks);
}
// matches() rejects metadata whose size or chunk count differs.
#[test]
fn test_resume_metadata_matches() {
let metadata = create_test_metadata(5);
let resume_meta = ResumeMetadata::from(&metadata);
assert!(resume_meta.matches(&metadata));
let mut different = metadata.clone();
different.size = 999;
assert!(!resume_meta.matches(&different));
let mut different = metadata.clone();
different.total_chunks = 10;
assert!(!resume_meta.matches(&different));
}
// init() on an empty directory creates fresh state with all chunks pending.
#[test]
fn test_resume_manager_init_fresh() {
let temp = tempdir().unwrap();
let metadata = create_test_metadata(5);
let manager = ResumeManager::init(temp.path(), "VLD0:test", &metadata).unwrap();
assert_eq!(manager.pending_chunks().len(), 5);
assert_eq!(manager.completed_count(), 0);
assert!(manager.state_path.exists());
}
// A second init() with the same key and metadata reloads prior progress.
#[test]
fn test_resume_manager_init_existing() {
let temp = tempdir().unwrap();
let metadata = create_test_metadata(5);
{
let mut manager = ResumeManager::init(temp.path(), "VLD0:test", &metadata).unwrap();
manager.save_chunk(0, b"chunk0data").unwrap();
manager.mark_chunk_complete(0).unwrap();
manager.save_chunk(2, b"chunk2data").unwrap();
manager.mark_chunk_complete(2).unwrap();
}
let manager = ResumeManager::init(temp.path(), "VLD0:test", &metadata).unwrap();
assert_eq!(manager.completed_count(), 2);
assert_eq!(manager.pending_chunks(), vec![1, 3, 4]);
}
// An unparseable resume.json is discarded and init() starts fresh.
// (The resume dir path is reconstructed here the same way resume_dir_path does.)
#[test]
fn test_resume_manager_init_corrupted() {
let temp = tempdir().unwrap();
let metadata = create_test_metadata(5);
let hash = blake3::hash("VLD0:test".as_bytes()).to_hex();
let resume_dir = temp.path().join(RESUME_DIR_NAME).join(&hash[..16]);
fs::create_dir_all(&resume_dir).unwrap();
fs::write(resume_dir.join("resume.json"), "invalid json").unwrap();
let manager = ResumeManager::init(temp.path(), "VLD0:test", &metadata).unwrap();
assert_eq!(manager.completed_count(), 0);
}
// save_chunk() then load_chunk() returns the identical bytes.
#[test]
fn test_resume_manager_save_and_load_chunk() {
let temp = tempdir().unwrap();
let metadata = create_test_metadata(3);
let manager = ResumeManager::init(temp.path(), "VLD0:test", &metadata).unwrap();
let test_data = b"test chunk data for verification";
manager.save_chunk(1, test_data).unwrap();
let loaded = manager.load_chunk(1).unwrap();
assert_eq!(loaded, test_data);
}
// verify_chunk() accepts a chunk whose BLAKE3 hash matches the expected hex.
#[test]
fn test_resume_manager_verify_chunk_valid() {
let temp = tempdir().unwrap();
let test_data = b"test chunk data";
let hash = blake3::hash(test_data).to_hex().to_string();
let metadata = FileMetadata {
name: "test.bin".to_string(),
size: test_data.len() as u64,
total_chunks: 1,
chunk_hashes: vec![hash.clone()],
route_blob: String::new(),
encryption_salt: None,
encryption_nonce: None,
compressed: false,
};
let manager = ResumeManager::init(temp.path(), "VLD0:test", &metadata).unwrap();
manager.save_chunk(0, test_data).unwrap();
assert!(manager.verify_chunk(0, &hash).unwrap());
}
// verify_chunk() rejects a chunk whose on-disk bytes hash differently.
#[test]
fn test_resume_manager_verify_chunk_corrupted() {
let temp = tempdir().unwrap();
let metadata = create_test_metadata(1);
let manager = ResumeManager::init(temp.path(), "VLD0:test", &metadata).unwrap();
manager.save_chunk(0, b"wrong data").unwrap();
let expected_hash = &metadata.chunk_hashes[0];
assert!(!manager.verify_chunk(0, expected_hash).unwrap());
}
// verify_and_invalidate_corrupted() keeps the good chunk, evicts the bad one,
// and reports the evicted index.
#[test]
fn test_resume_manager_verify_and_invalidate() {
let temp = tempdir().unwrap();
let good_data = b"good chunk data";
let good_hash = blake3::hash(good_data).to_hex().to_string();
let metadata = FileMetadata {
name: "test.bin".to_string(),
size: 60000,
total_chunks: 2,
chunk_hashes: vec![good_hash.clone(), "badhash".to_string()],
route_blob: String::new(),
encryption_salt: None,
encryption_nonce: None,
compressed: false,
};
let mut manager = ResumeManager::init(temp.path(), "VLD0:test", &metadata).unwrap();
manager.save_chunk(0, good_data).unwrap();
manager.mark_chunk_complete(0).unwrap();
manager.save_chunk(1, b"bad data").unwrap();
manager.mark_chunk_complete(1).unwrap();
assert_eq!(manager.completed_count(), 2);
let invalidated = manager.verify_and_invalidate_corrupted().unwrap();
assert_eq!(invalidated, vec![1]);
assert_eq!(manager.completed_count(), 1);
assert!(manager.state.completed_chunks.contains(&0));
assert!(!manager.state.completed_chunks.contains(&1));
}
// cleanup() removes the entire resume directory tree.
#[test]
fn test_resume_manager_cleanup() {
let temp = tempdir().unwrap();
let metadata = create_test_metadata(3);
let manager = ResumeManager::init(temp.path(), "VLD0:test", &metadata).unwrap();
manager.save_chunk(0, b"data").unwrap();
let resume_dir = manager.resume_dir.clone();
assert!(resume_dir.exists());
manager.cleanup().unwrap();
assert!(!resume_dir.exists());
}
// After every mark_chunk_complete(), the on-disk JSON already reflects the
// new count (state writes are atomic temp+rename).
#[test]
fn test_resume_manager_atomic_state_update() {
let temp = tempdir().unwrap();
let metadata = create_test_metadata(5);
let mut manager = ResumeManager::init(temp.path(), "VLD0:test", &metadata).unwrap();
for i in 0..5 {
manager
.save_chunk(i, format!("chunk{}", i).as_bytes())
.unwrap();
manager.mark_chunk_complete(i).unwrap();
let json = fs::read_to_string(&manager.state_path).unwrap();
let state: ResumeState = serde_json::from_str(&json).unwrap();
assert_eq!(state.completed_count(), i + 1);
}
}
// The `encrypted` flag tracks the presence of an encryption salt.
#[test]
fn test_resume_metadata_encrypted() {
let mut metadata = create_test_metadata(3);
metadata.encryption_salt = Some("salt".to_string());
metadata.encryption_nonce = Some("nonce".to_string());
let resume_meta = ResumeMetadata::from(&metadata);
assert!(resume_meta.encrypted);
let metadata = create_test_metadata(3);
let resume_meta = ResumeMetadata::from(&metadata);
assert!(!resume_meta.encrypted);
}
}