ckg-storage 1.0.4

CozoDB-backed storage layer for ckg (per-repo + registry DBs).
Documentation
//! `manifest.json` at `~/.codebase-kg/manifest.json` — index of all known repos.
//!
//! ## Concurrency
//!
//! Two `ckg scan` processes can race on `read → mutate → write_atomic`.
//! Without locking, the losing writer's update silently disappears.
//! `with_lock` takes an exclusive advisory lock on `<manifest>.lock` for
//! the duration of the read-modify-write window so concurrent scans
//! serialize cleanly.
//!
//! ## Durability
//!
//! `write_atomic` does tmp + fsync(file) + rename + fsync(parent). The
//! parent-dir fsync is required on ext4/xfs to make the rename survive a
//! power loss; without it, the file is durable but the directory entry
//! pointing at it can be lost.

use std::collections::BTreeMap;
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};

use ckg_core::{Error, Result};
use fs2::FileExt;
use serde::{Deserialize, Serialize};

/// M2: Owns the locked sentinel file and releases its advisory lock in
/// `Drop`, so an early return or a panic inside the guarded operation can
/// never leave the lock held.
struct LockGuard(fs::File);

impl Drop for LockGuard {
    fn drop(&mut self) {
        let locked: &fs::File = &self.0;
        // Unlock failures are deliberately ignored: there is nothing useful
        // to do with them while dropping, and the descriptor is closed right
        // after this anyway.
        FileExt::unlock(locked).ok();
    }
}

/// M3: Maximum time to wait for an exclusive lock before giving up.
const LOCK_TIMEOUT: Duration = Duration::from_secs(60);
/// H5: Base poll interval. Actual sleep adds ±25% jitter. See `jittered_poll_interval`.
const LOCK_POLL_BASE_MS: u64 = 50;

/// H5: Returns a jittered sleep duration in [base*0.75, base*1.25].
/// Uses PID + subsecond nanos as a cheap seed — no new dependency needed.
fn jittered_poll_interval() -> Duration {
    let pid = std::process::id() as u64;
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.subsec_nanos() as u64)
        .unwrap_or(0);
    let seed = pid.wrapping_mul(6364136223846793005).wrapping_add(nanos);
    let half_base = LOCK_POLL_BASE_MS / 2;
    let jitter = seed % (half_base + 1);
    let ms = LOCK_POLL_BASE_MS.wrapping_sub(half_base / 2).saturating_add(jitter);
    Duration::from_millis(ms)
}

/// One repo entry in the manifest.
///
/// All fields are plain strings; no format validation happens in this type.
/// Only `status` has a serde default — the other three fields are required
/// when deserializing.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ManifestEntry {
    /// Repository identifier. NOTE(review): the tests use a 24-hex-char
    /// value, but no format is enforced here — confirm against the producer.
    pub repo_id: String,
    /// Presumably the commit SHA of HEAD at index time — TODO confirm.
    pub head_sha: String,
    /// Timestamp of the last index run. The tests use an RFC 3339 UTC
    /// string; nothing here parses or validates it.
    pub last_indexed_at: String,
    /// Free-form status string (the tests use `"ok"`). Deserializes to `""`
    /// when the key is absent from the JSON.
    #[serde(default)]
    pub status: String,
}

/// Top-level manifest document: a format version plus the known repo
/// entries, kept in a `BTreeMap` so keys serialize in sorted order.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Manifest {
    /// Document format version; `Default` sets it to 1.
    pub version: u32,
    /// Repo entries keyed by string (the tests key by repository path).
    /// Deserializes to an empty map when the key is absent.
    #[serde(default)]
    pub repos: BTreeMap<String, ManifestEntry>,
}

impl Default for Manifest {
    /// An empty manifest at format version 1.
    fn default() -> Self {
        let repos = BTreeMap::default();
        Manifest { version: 1, repos }
    }
}

impl Manifest {
    /// Load manifest from `path`; returns `Manifest::default()` if the file
    /// does not exist.
    ///
    /// "Does not exist" is decided by the `NotFound` error of the read
    /// itself rather than a separate `Path::exists` probe. This closes the
    /// race where the file disappears between check and read. It also stops
    /// other failures (e.g. permission denied, which `exists()` reports as
    /// `false`) from silently producing an empty manifest that a later
    /// `write_atomic` would persist over the real one.
    ///
    /// # Errors
    ///
    /// - I/O errors other than `NotFound` are propagated.
    /// - On parse failure the error is `Error::Json(serde_json::Error)`, which
    ///   includes line/column position. CLI display via `Display` produces e.g.
    ///   `"expected value at line 3 column 7"` — actionable without additional
    ///   wrapping (L6 verification).
    pub fn read(path: &Path) -> Result<Self> {
        let data = match fs::read(path) {
            Ok(data) => data,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Self::default()),
            Err(e) => return Err(e.into()),
        };
        let m: Manifest = serde_json::from_slice(&data)?;
        Ok(m)
    }

    /// Atomic durable write: serialize → tmp file → fsync → rename →
    /// fsync(parent). The parent-dir fsync is required so the rename is
    /// durable across power loss on ext4/xfs.
    ///
    /// Serialization happens before any file is created. If writing or
    /// renaming the temp file fails, the temp file is removed (best-effort) so
    /// a failed write does not leave a stray `*.json.tmp` behind.
    ///
    /// # Errors
    ///
    /// Propagates directory-creation, serialization, write, fsync and rename
    /// failures. The parent-directory fsync is best-effort: its failures are
    /// logged, not returned.
    pub fn write_atomic(&self, path: &Path) -> Result<()> {
        // `Path::parent` yields `Some("")` for a bare file name such as
        // "manifest.json"; treat that as the current directory so the
        // parent-dir fsync below opens "." instead of silently failing on "".
        let parent = path
            .parent()
            .filter(|p| !p.as_os_str().is_empty())
            .map(Path::to_path_buf)
            .unwrap_or_else(|| PathBuf::from("."));
        fs::create_dir_all(&parent)?;
        let data = serde_json::to_vec_pretty(self)?;
        // Tempfile sibling so the rename is intra-directory (atomic on POSIX).
        let tmp = path.with_extension("json.tmp");
        if let Err(e) = Self::write_and_rename(&tmp, path, &data) {
            // Best-effort cleanup; the original error is what callers need.
            let _ = fs::remove_file(&tmp);
            return Err(e.into());
        }
        // Persist the rename: open parent dir and fsync it. On macOS this
        // is best-effort (HFS/APFS don't strictly require it but tolerate
        // the call); on ext4/xfs it's load-bearing.
        match fs::File::open(&parent) {
            Ok(dir) => {
                // M1: surface fsync failures so operators can investigate.
                if let Err(e) = dir.sync_all() {
                    tracing::warn!(
                        path = %parent.display(),
                        err = %e,
                        "parent-dir fsync failed; rename may not be durable on crash"
                    );
                }
            }
            // Opening a directory with `File::open` is not supported on every
            // platform (notably Windows); the data is already renamed into
            // place, so this remains best-effort.
            Err(e) => {
                tracing::debug!(
                    path = %parent.display(),
                    err = %e,
                    "could not open parent dir for fsync"
                );
            }
        }
        Ok(())
    }

    /// Writes `data` to `tmp`, fsyncs and closes it, then renames it over `dest`.
    fn write_and_rename(tmp: &Path, dest: &Path, data: &[u8]) -> std::io::Result<()> {
        {
            let mut f = fs::File::create(tmp)?;
            f.write_all(data)?;
            f.sync_all()?;
        } // handle closed here, before the rename
        fs::rename(tmp, dest)
    }
}

/// Run `op` while holding an exclusive advisory lock on `<path>.lock`.
/// Concurrent `ckg scan` processes serialize cleanly through this lock,
/// so a read-modify-write sequence on the manifest cannot drop updates.
///
/// The lock file persists across runs (it's just a sentinel — its
/// contents are unused), and the lock is released when `op` returns or
/// unwinds.
///
/// ## Lock file persistence (M12)
///
/// The `.json.lock` sentinel file persists across runs by design — it is
/// never deleted. On Unix `flock(2)` releases automatically when the file
/// descriptor closes (normal exit, panic unwind, SIGKILL). Leftover lock
/// files are harmless: `flock` operates on the inode, not the file
/// contents. Deleting the lock file is safe but unnecessary.
///
/// ## Filesystem caveats (NI3)
///
/// `fs2::FileExt::lock_exclusive` uses `flock(2)` on Unix.
/// **Cross-host** semantics depend on the filesystem:
/// - **NFS**: the server must run `lockd`/`nlm`. With `mount -o nolock`
///   the call is silently a no-op — multiple writers will race.
/// - **Docker bind-mounts** inherit host filesystem semantics
///   (typically ext4 → fine).
/// - **`tmpfs`**: doesn't honor flock; lock degrades to advisory no-op.
///
/// CK is single-user-per-machine in practice, so cross-host contention
/// is out of scope. If `$CKG_HOME` lives on a flock-incapable FS, the
/// manifest read-modify-write window is unprotected against concurrent
/// processes.
pub fn with_lock<F, T>(manifest_path: &Path, op: F) -> Result<T>
where
    F: FnOnce() -> Result<T>,
{
    let parent = manifest_path
        .parent()
        .map(|p| p.to_path_buf())
        .unwrap_or_else(|| PathBuf::from("."));
    fs::create_dir_all(&parent)?;
    let lock_path = manifest_path.with_extension("json.lock");
    // OpenOptions::create+write produces a sentinel; we don't actually
    // care about contents.
    let lock_file = fs::OpenOptions::new()
        .create(true)
        .truncate(false)
        .write(true)
        .open(&lock_path)
        .map_err(|e| {
            Error::Storage(format!(
                "could not open manifest lock at {}: {e}",
                lock_path.display()
            ))
        })?;
    // M3: poll with try_lock_exclusive up to LOCK_TIMEOUT before giving up.
    // This surfaces contention as an explicit error rather than blocking
    // forever — a stuck long-running `ckg scan` in another terminal doesn't
    // freeze the UI silently.
    let deadline = Instant::now() + LOCK_TIMEOUT;
    loop {
        match lock_file.try_lock_exclusive() {
            Ok(()) => break,
            Err(e) if Instant::now() < deadline => {
                // Contended — sleep briefly with jitter, then retry.
                let _ = e; // suppress unused warning
                std::thread::sleep(jittered_poll_interval());
            }
            Err(e) => {
                return Err(Error::Storage(format!(
                    "could not acquire manifest lock at {} after {}s: {e}",
                    lock_path.display(),
                    LOCK_TIMEOUT.as_secs()
                )));
            }
        }
    }
    // M2: RAII guard releases the lock on Drop — even on panic or early return.
    let _guard = LockGuard(lock_file);
    op()
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// The single entry used by the round-trip test.
    fn sample_entry() -> ManifestEntry {
        ManifestEntry {
            repo_id: "abcd1234abcd1234abcd1234".into(),
            head_sha: "deadbeef".into(),
            last_indexed_at: "2026-05-07T00:00:00Z".into(),
            status: "ok".into(),
        }
    }

    #[test]
    fn read_missing_returns_default() {
        let tmp = tempdir().unwrap();
        let missing = tmp.path().join("nope.json");
        let manifest = Manifest::read(&missing).unwrap();
        assert!(manifest.repos.is_empty());
        assert_eq!(manifest.version, 1);
    }

    #[test]
    fn round_trip() {
        let tmp = tempdir().unwrap();
        let manifest_path = tmp.path().join("manifest.json");

        let mut original = Manifest::default();
        original.repos.insert("/some/path".into(), sample_entry());
        original.write_atomic(&manifest_path).unwrap();

        let reloaded = Manifest::read(&manifest_path).unwrap();
        assert_eq!(reloaded.repos.len(), 1);
        assert_eq!(reloaded.repos["/some/path"].repo_id, "abcd1234abcd1234abcd1234");
    }

    #[test]
    fn lock_serializes_writers() {
        // Smoke test only: acquire, release, then acquire again in sequence.
        // Real contention needs several processes, which a single cargo-test
        // binary cannot express conveniently.
        let tmp = tempdir().unwrap();
        let manifest_path = tmp.path().join("m.json");
        with_lock(&manifest_path, || Manifest::default().write_atomic(&manifest_path)).unwrap();
        with_lock(&manifest_path, || Manifest::read(&manifest_path).map(|_| ())).unwrap();
    }

    /// M2: when the guarded closure fails, `LockGuard`'s `Drop` must still
    /// release the lock. A second, sequential `with_lock` on the same path
    /// proves it (in-process only — cross-process flock behaviour is not
    /// exercised here).
    #[test]
    fn lock_guard_releases_on_error_return() {
        let tmp = tempdir().unwrap();
        let manifest_path = tmp.path().join("raii_test.json");

        let _ = with_lock(&manifest_path, || -> Result<()> {
            Err(Error::Storage("simulated failure".into()))
        });

        with_lock(&manifest_path, || Ok(())).expect("lock must be released after error return");
    }
}