//! sparsync 0.1.12
//!
//! rsync-style high-performance file synchronization over QUIC and Spargio
use crate::protocol::{InitAction, InitFileRequest, InitFileResponse};
use crate::util::parse_quick_check_token;
use anyhow::{Context, Result};
use rand::Rng;
use serde::{Deserialize, Serialize};
use spargio::{RuntimeHandle, fs};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};

/// Record of a fully transferred file, keyed by relative path in
/// [`ResumeState::complete`] and persisted in `state.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompleteFile {
    /// Content hash — or a quick-check token — as supplied at completion time.
    pub file_hash: String,
    /// File size in bytes.
    pub size: u64,
    /// File mode — presumably Unix permission bits; confirm with callers.
    pub mode: u32,
    /// Modification time in whole seconds (compared against quick-check tokens).
    pub mtime_sec: i64,
    /// Optional extended-attribute signature. `#[serde(default)]` keeps older
    /// state files written before this field existed loadable as `None`.
    #[serde(default)]
    pub xattr_sig: Option<u64>,
    /// Number of chunks the file was transferred in.
    pub total_chunks: usize,
    /// Wall-clock time this record was written (set via `now_sec`).
    pub updated_at_sec: i64,
}

/// Record of an in-progress (resumable) transfer, keyed by relative path in
/// [`ResumeState::partial`] and persisted in `state.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartialFile {
    /// Hash of the incoming file this partial belongs to.
    pub file_hash: String,
    /// Expected total size in bytes.
    pub size: u64,
    /// Chunk size in bytes; a resume is only honored if it matches the request.
    pub chunk_size: usize,
    /// Total number of chunks expected.
    pub total_chunks: usize,
    /// Chunks completed so far — also serves as the next chunk index when
    /// resuming (see `StateStore::initialize_file`).
    pub completed_chunks: usize,
    /// Wall-clock time this record was last touched (set via `now_sec`).
    pub updated_at_sec: i64,
}

/// One entry for [`StateStore::complete_files_batch`]: the data of a
/// [`CompleteFile`] plus the relative path it will be keyed under.
#[derive(Debug, Clone)]
pub struct CompleteFileInput {
    /// Destination-relative path used as the state-map key.
    pub relative_path: String,
    /// Content hash (or quick-check token) of the completed file.
    pub file_hash: String,
    /// File size in bytes.
    pub size: u64,
    /// File mode — presumably Unix permission bits; confirm with callers.
    pub mode: u32,
    /// Modification time in whole seconds.
    pub mtime_sec: i64,
    /// Optional extended-attribute signature.
    pub xattr_sig: Option<u64>,
    /// Number of chunks the file was transferred in.
    pub total_chunks: usize,
}

/// On-disk resume state (serialized to `state.json`); both maps are keyed by
/// the file's destination-relative path.
#[derive(Debug, Default, Clone, Serialize, Deserialize)]
struct ResumeState {
    /// Files fully transferred.
    complete: HashMap<String, CompleteFile>,
    /// Files with an in-progress, resumable transfer.
    partial: HashMap<String, PartialFile>,
}

/// Thread-safe store for sync resume state, persisted as JSON on disk.
///
/// Cheap to `Clone`: clones share the same in-memory state and counter
/// through `Arc`s.
#[derive(Clone)]
pub struct StateStore {
    /// Spargio runtime handle used for all filesystem operations.
    handle: RuntimeHandle,
    /// Path to the serialized state (`<dest>/.sparsync/state.json`).
    state_path: PathBuf,
    /// Directory where partial file data is staged (`<dest>/.sparsync/partials`).
    partial_root: PathBuf,
    /// In-memory state, shared across clones.
    inner: Arc<Mutex<ResumeState>>,
    /// Counts persist attempts; paired with `persist_stride` to throttle writes.
    persist_counter: Arc<AtomicUsize>,
    /// Only every `persist_stride`-th non-forced persist actually hits disk.
    persist_stride: usize,
}

impl StateStore {
    /// Open (or create) the resume state rooted at `destination/.sparsync`.
    ///
    /// Creates the `partials` staging directory, then loads `state.json` if
    /// present. A missing state file yields an empty [`ResumeState`]; any
    /// other read or parse failure is returned with context.
    pub async fn open(handle: RuntimeHandle, destination: &Path) -> Result<Self> {
        let state_root = destination.join(".sparsync");
        let partial_root = state_root.join("partials");
        fs::create_dir_all(&handle, &partial_root)
            .await
            .with_context(|| format!("create state root {}", partial_root.display()))?;

        let state_path = state_root.join("state.json");
        let state = match fs::read(&handle, &state_path).await {
            Ok(bytes) => serde_json::from_slice(&bytes)
                .with_context(|| format!("parse state file {}", state_path.display()))?,
            // First run: no state file yet — start from an empty state.
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => ResumeState::default(),
            Err(err) => {
                return Err(err)
                    .with_context(|| format!("read state file {}", state_path.display()));
            }
        };

        Ok(Self {
            handle,
            state_path,
            partial_root,
            inner: Arc::new(Mutex::new(state)),
            persist_counter: Arc::new(AtomicUsize::new(0)),
            // Write state.json only on every 1024th mutation (unless forced).
            persist_stride: 1024,
        })
    }

    /// Directory where in-progress (partial) file data is staged.
    pub fn partial_root(&self) -> &Path {
        &self.partial_root
    }

    /// Whether `relative_path` is already recorded as complete with matching
    /// hash and size.
    ///
    /// If `file_hash` parses as a quick-check token (encoding size and mtime,
    /// per `parse_quick_check_token`), the match compares size and mtime
    /// instead of the stored content hash. Mirrors the complete-match logic
    /// in `initialize_file`.
    pub fn is_complete_match(
        &self,
        relative_path: &str,
        file_hash: &str,
        size: u64,
    ) -> Result<bool> {
        let guard = self
            .inner
            .lock()
            .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
        if let Some((token_size, token_mtime)) = parse_quick_check_token(file_hash) {
            return Ok(guard.complete.get(relative_path).is_some_and(|done| {
                done.size == size && token_size == size && done.mtime_sec == token_mtime
            }));
        }
        Ok(guard
            .complete
            .get(relative_path)
            .is_some_and(|done| done.file_hash == file_hash && done.size == size))
    }

    /// Decide how to handle an incoming file: `Skip` if it is already
    /// recorded as complete and matching, otherwise `Upload`, possibly
    /// resuming at a previously recorded chunk index.
    ///
    /// `existing_partial_chunks` is the chunk count the caller observed on
    /// disk; when resuming, the larger of it and the recorded progress wins.
    /// The mutex guard is confined to the inner block so it is never held
    /// across the `persist_if_needed(...).await` below.
    pub async fn initialize_file(
        &self,
        req: &InitFileRequest,
        existing_partial_chunks: Option<usize>,
    ) -> Result<InitFileResponse> {
        let now = now_sec();
        let response = {
            let mut guard = self
                .inner
                .lock()
                .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;

            if let Some(done) = guard.complete.get(&req.relative_path) {
                // Same match rules as `is_complete_match`: quick-check tokens
                // compare size + mtime; real hashes compare hash + size.
                let complete_match = if let Some((token_size, token_mtime)) =
                    parse_quick_check_token(&req.file_hash)
                {
                    done.size == req.size && token_size == req.size && done.mtime_sec == token_mtime
                } else {
                    done.file_hash == req.file_hash && done.size == req.size
                };
                if complete_match {
                    let resp = InitFileResponse {
                        action: InitAction::Skip,
                        next_chunk: req.total_chunks,
                        // Content matches but mode/mtime/xattrs may still differ.
                        metadata_sync_required: done.mode != req.mode
                            || done.mtime_sec != req.mtime_sec
                            || done.xattr_sig != req.xattr_sig,
                        message: "already up to date".to_string(),
                    };
                    // Early return: a skip mutates nothing, so no persist needed.
                    return Ok(resp);
                }
            }

            let mut next_chunk = 0usize;
            if req.resume {
                // Resume only if the recorded partial matches this request
                // exactly (hash, size, chunk size); otherwise start over.
                if let Some(partial) = guard.partial.get(&req.relative_path) {
                    if partial.file_hash == req.file_hash
                        && partial.size == req.size
                        && partial.chunk_size == req.chunk_size
                    {
                        next_chunk = partial.completed_chunks.min(req.total_chunks);
                    }
                }
                // Trust whichever source (state vs. on-disk) shows more progress.
                if let Some(existing) = existing_partial_chunks {
                    next_chunk = next_chunk.max(existing.min(req.total_chunks));
                }
            }

            // (Re)register the partial entry at the chosen resume point.
            guard.partial.insert(
                req.relative_path.clone(),
                PartialFile {
                    file_hash: req.file_hash.clone(),
                    size: req.size,
                    chunk_size: req.chunk_size,
                    total_chunks: req.total_chunks,
                    completed_chunks: next_chunk,
                    updated_at_sec: now,
                },
            );

            InitFileResponse {
                action: InitAction::Upload,
                next_chunk,
                metadata_sync_required: false,
                message: if next_chunk > 0 {
                    format!("resuming at chunk {next_chunk}")
                } else {
                    "starting upload".to_string()
                },
            }
        };

        self.persist_if_needed(false).await?;
        Ok(response)
    }

    /// Recorded chunk progress for `key`, or `None` if no partial entry
    /// exists.
    ///
    /// NOTE(review): contains no awaits — `async` is presumably kept for API
    /// uniformity with the other methods; confirm before changing.
    pub async fn current_chunk(&self, key: &str) -> Result<Option<usize>> {
        let guard = self
            .inner
            .lock()
            .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
        Ok(guard.partial.get(key).map(|p| p.completed_chunks))
    }

    /// Record chunk progress for `key`, returning the resulting completed
    /// count.
    ///
    /// Progress is monotonic — a smaller `completed_chunks` never rewinds the
    /// stored value — and is clamped to `total_chunks`. Errors if no partial
    /// entry exists for `key`. When `persist` is set, a throttled persist is
    /// attempted after the lock is released.
    pub async fn update_partial_progress(
        &self,
        key: &str,
        completed_chunks: usize,
        persist: bool,
    ) -> Result<usize> {
        let now = now_sec();
        let next_chunk = {
            let mut guard = self
                .inner
                .lock()
                .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
            let partial = guard
                .partial
                .get_mut(key)
                .ok_or_else(|| anyhow::anyhow!("no partial state for {key}"))?;

            if completed_chunks > partial.completed_chunks {
                partial.completed_chunks = completed_chunks.min(partial.total_chunks);
            }
            partial.updated_at_sec = now;

            partial.completed_chunks
        };

        if persist {
            self.persist_if_needed(false).await?;
        }
        Ok(next_chunk)
    }

    /// Mark a single file complete: drop its partial entry, record a
    /// [`CompleteFile`], then attempt a throttled persist.
    pub async fn complete_file(
        &self,
        relative_path: &str,
        file_hash: &str,
        size: u64,
        mode: u32,
        mtime_sec: i64,
        xattr_sig: Option<u64>,
        total_chunks: usize,
    ) -> Result<()> {
        let now = now_sec();
        {
            let mut guard = self
                .inner
                .lock()
                .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;

            guard.partial.remove(relative_path);
            guard.complete.insert(
                relative_path.to_string(),
                CompleteFile {
                    file_hash: file_hash.to_string(),
                    size,
                    mode,
                    mtime_sec,
                    xattr_sig,
                    total_chunks,
                    updated_at_sec: now,
                },
            );
        }

        self.persist_if_needed(false).await
    }

    /// Mark many files complete under a single lock acquisition; no-op for an
    /// empty slice. Same per-entry effect as [`Self::complete_file`].
    pub async fn complete_files_batch(&self, entries: &[CompleteFileInput]) -> Result<()> {
        if entries.is_empty() {
            return Ok(());
        }

        let now = now_sec();
        {
            let mut guard = self
                .inner
                .lock()
                .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;

            for entry in entries {
                guard.partial.remove(&entry.relative_path);
                guard.complete.insert(
                    entry.relative_path.clone(),
                    CompleteFile {
                        file_hash: entry.file_hash.clone(),
                        size: entry.size,
                        mode: entry.mode,
                        mtime_sec: entry.mtime_sec,
                        xattr_sig: entry.xattr_sig,
                        total_chunks: entry.total_chunks,
                        updated_at_sec: now,
                    },
                );
            }
        }

        self.persist_if_needed(false).await
    }

    /// Force an immediate write of the current state to disk, bypassing the
    /// stride throttle.
    pub async fn flush(&self) -> Result<()> {
        self.persist_if_needed(true).await
    }

    /// Write `bytes` to a uniquely-named temp file next to `state.json`, then
    /// rename it into place — the random suffix presumably avoids collisions
    /// between concurrent persists sharing the same directory.
    async fn persist(&self, bytes: Vec<u8>) -> Result<()> {
        let tmp = self
            .state_path
            .with_extension(format!("json.tmp.{}", rand::rng().random::<u64>()));
        fs::write(&self.handle, &tmp, bytes)
            .await
            .with_context(|| format!("write state temp {}", tmp.display()))?;
        fs::rename(&self.handle, &tmp, &self.state_path)
            .await
            .with_context(|| {
                format!(
                    "replace state file {} from {}",
                    self.state_path.display(),
                    tmp.display()
                )
            })?;
        Ok(())
    }

    /// Persist the state if forced, or on every `persist_stride`-th call.
    ///
    /// The snapshot is serialized while holding the lock, but the guard is
    /// dropped before the write — a `std::sync::Mutex` guard must never be
    /// held across an `.await`.
    ///
    /// NOTE(review): two concurrent persists could serialize snapshots in one
    /// order and rename them in the other, so an older snapshot may win the
    /// final rename — confirm this is acceptable for crash-resume purposes.
    async fn persist_if_needed(&self, force: bool) -> Result<()> {
        let persist_now = if force {
            true
        } else {
            let count = self.persist_counter.fetch_add(1, Ordering::Relaxed) + 1;
            count % self.persist_stride == 0
        };

        if !persist_now {
            return Ok(());
        }

        let snapshot = {
            let guard = self
                .inner
                .lock()
                .map_err(|_| anyhow::anyhow!("state mutex poisoned"))?;
            serde_json::to_vec(&*guard).context("serialize state")?
        };
        self.persist(snapshot).await
    }
}

/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Best-effort: if the system clock reads earlier than the epoch (so
/// `duration_since` fails), this returns 0 rather than erroring.
fn now_sec() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}