ckg-storage 1.1.0

CozoDB-backed storage layer for ckg (per-repo + registry DBs).
Documentation
//! Per-repo advisory flock — used to serialise destructive operations
//! (`ckg index` write phases vs `ckg remove`'s `remove_dir_all`) on the
//! same per-repo Cozo DB.
//!
//! ## Why this exists
//!
//! RocksDB takes a process-wide exclusive lock on its DB directory at
//! `DbInstance::new`, which serialises two concurrent `ckg index` runs
//! against the same repo. But that lock does NOT prevent
//! `std::fs::remove_dir_all` from another process — `rm -rf` doesn't
//! consult RocksDB's lock file. Wave-7 mitigated the manifest-side
//! consequence (a `CURRENT`-file gate skips re-inserting a stale
//! entry) but the *index process* still writes to a directory that's
//! being deleted underneath it, producing orphaned RocksDB files /
//! inode leaks.
//!
//! `with_repo_lock(repo_id, base, op)` takes an exclusive flock on a
//! sentinel file at `<base>/workspace_folders/<repo_id>.lock` for the
//! lifetime of `op`. The lock file lives ALONGSIDE the DB directory
//! (not inside it), so `remove_dir_all` of `<base>/workspace_folders/
//! <repo_id>/` doesn't destroy the lock state.
//!
//! Both `ckg index`'s `index_one` and `ckg remove`'s deletion path
//! must wrap their work in this guard — otherwise the lock provides
//! no benefit.
//!
//! ## Filesystem caveats
//!
//! Inherits all the limitations of `manifest::with_lock`: NFS without
//! `lockd`, tmpfs, and similar degrade `flock(2)` to a no-op. CK is
//! single-user-per-machine in practice; cross-host contention is out
//! of scope. See `manifest::with_lock` for the full caveat list.

use std::fs;
use std::path::Path;
use std::time::{Duration, Instant};

use ckg_core::{Error, Result};
use fs2::FileExt;

/// H5: Pick the next poll sleep, jittered around `LOCK_POLL_BASE_MS`.
///
/// The seed mixes the process ID with the sub-second nanoseconds of the
/// current wall-clock time (`SystemTime`, not a monotonic clock), so no
/// PRNG crate is needed. In integer arithmetic the result is
/// `base - (base / 2) / 2 + [0, base / 2]` milliseconds — roughly ±25%
/// of the base — which is enough to desynchronize two competing
/// processes that entered the poll loop at the same wall-clock time.
fn jittered_poll_interval() -> Duration {
    use std::time::{SystemTime, UNIX_EPOCH};

    // Multiplier that spreads the PID's low bits across the whole word.
    const PID_MULTIPLIER: u64 = 6364136223846793005;

    // A clock reading before the epoch yields `Err`; fall back to 0 so the
    // seed then depends on the PID alone.
    let subsec_nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| u64::from(elapsed.subsec_nanos()));
    let seed = u64::from(std::process::id())
        .wrapping_mul(PID_MULTIPLIER)
        .wrapping_add(subsec_nanos);

    // The jitter window is half the base wide and centred on the base:
    // start a quarter below it, then add an offset in [0, jitter_span].
    let jitter_span = LOCK_POLL_BASE_MS / 2;
    let floor_ms = LOCK_POLL_BASE_MS.wrapping_sub(jitter_span / 2);
    let offset_ms = seed % (jitter_span + 1);
    Duration::from_millis(floor_ms.saturating_add(offset_ms))
}

/// M2: Owns the locked sentinel file and unlocks it when dropped.
///
/// Holding the `fs::File` inside the guard ties the advisory lock to the
/// guard's scope: whenever the guard goes out of scope — normal return,
/// early return, or unwinding — `Drop` runs and the lock is released.
struct LockGuard(fs::File);

impl Drop for LockGuard {
    /// Best-effort unlock. A failure is deliberately discarded because
    /// `drop` has no way to report it to the caller.
    fn drop(&mut self) {
        // Fully-qualified call so the `fs2::FileExt` trait method is
        // selected explicitly rather than by method-resolution order.
        FileExt::unlock(&self.0).ok();
    }
}

/// M3: Maximum time to wait for an exclusive lock before giving up.
/// `with_repo_lock` computes its polling deadline from this value and
/// reports it, in whole seconds, in the error returned on timeout.
const LOCK_TIMEOUT: Duration = Duration::from_secs(60);
/// H5: Base poll interval, in milliseconds. `jittered_poll_interval`
/// turns it into the actual sleep by starting a quarter below the base
/// and adding an offset of up to half the base (roughly ±25% jitter),
/// seeded from the PID and the current sub-second nanos, to
/// desynchronize competing processes without introducing a new
/// dependency.
const LOCK_POLL_BASE_MS: u64 = 50;

/// Run `op` while holding an exclusive advisory lock on this repo's
/// sentinel file. Used to serialise destructive operations on the
/// per-repo DB across processes.
///
/// `repo_id` is the 24-hex repo identifier (`RepoId::as_str()`).
/// `base` is the `$CKG_HOME` root.
pub fn with_repo_lock<F, T>(repo_id: &str, base: &Path, op: F) -> Result<T>
where
    F: FnOnce() -> Result<T>,
{
    let parent = base.join("workspace_folders");
    fs::create_dir_all(&parent)?;
    // Sentinel lives ALONGSIDE the per-repo dir so `remove_dir_all` of
    // the dir doesn't take the lock with it.
    let lock_path = parent.join(format!("{repo_id}.lock"));
    let lock_file = fs::OpenOptions::new()
        .create(true)
        .truncate(false)
        .write(true)
        .open(&lock_path)
        .map_err(|e| {
            Error::Storage(format!(
                "could not open per-repo lock at {}: {e}",
                lock_path.display()
            ))
        })?;
    // M3: poll with try_lock_exclusive up to LOCK_TIMEOUT before giving up.
    // Surfaces contention as an explicit error rather than blocking forever.
    let deadline = Instant::now() + LOCK_TIMEOUT;
    loop {
        match lock_file.try_lock_exclusive() {
            Ok(()) => break,
            Err(e) if Instant::now() < deadline => {
                let _ = e;
                std::thread::sleep(jittered_poll_interval());
            }
            Err(e) => {
                return Err(Error::Storage(format!(
                    "could not acquire per-repo lock at {} after {}s: {e}",
                    lock_path.display(),
                    LOCK_TIMEOUT.as_secs()
                )));
            }
        }
    }
    // M2: RAII guard releases the lock on Drop — even on panic or early return.
    let _guard = LockGuard(lock_file);
    op()
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Smoke test: within one process, back-to-back acquisitions of the
    /// same repo lock both succeed, so the first one must have released.
    #[test]
    fn lock_acquired_and_released() {
        let home = tempdir().unwrap();
        let repo_id = "abcdef0123456789abcd1234";
        for _ in 0..2 {
            with_repo_lock(repo_id, home.path(), || Ok(())).unwrap();
        }
    }

    /// M2: the guard must release the lock even when the closure fails.
    #[test]
    fn lock_guard_releases_on_error_return() {
        let home = tempdir().unwrap();
        let repo_id = "abcdef0123456789abcd9999";
        // The outcome of the failing call is irrelevant; only its effect
        // on the lock state matters.
        let failed: ckg_core::Result<()> = with_repo_lock(repo_id, home.path(), || {
            Err(ckg_core::Error::Storage("simulated failure".into()))
        });
        drop(failed);
        // A follow-up acquisition only succeeds if Drop freed the lock.
        with_repo_lock(repo_id, home.path(), || Ok(()))
            .expect("lock must be released after error return");
    }

    /// The sentinel sits next to the per-repo dir, not inside it, so
    /// deleting that dir while the lock is held leaves the sentinel alone.
    #[test]
    fn lock_file_persists_after_inner_dir_removal() {
        let home = tempdir().unwrap();
        let repo_id = "0123456789abcdef01234567";
        let workspace = home.path().join("workspace_folders");
        let repo_dir = workspace.join(repo_id);
        fs::create_dir_all(&repo_dir).unwrap();

        // Stand-in for `ckg remove` wiping the per-repo dir under the lock.
        with_repo_lock(repo_id, home.path(), || {
            fs::remove_dir_all(&repo_dir)?;
            Ok(())
        })
        .unwrap();

        // <workspace>/<id>.lock must have survived the removal.
        let sentinel = workspace.join(format!("{repo_id}.lock"));
        assert!(sentinel.is_file());
    }
}