kg-cli 0.2.14

A knowledge graph CLI tool for managing structured information
Documentation
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::{Context, Result, bail};

/// Fallback lock wait, in milliseconds, used by `acquire_for_graph` when
/// `KG_GRAPH_LOCK_TIMEOUT_MS` is missing or does not parse as a `u64`.
const DEFAULT_LOCK_TIMEOUT_MS: u64 = 30_000;
/// Interval, in milliseconds, to sleep between attempts to create the lock file.
const LOCK_RETRY_SLEEP_MS: u64 = 50;

/// Guard representing an acquired write lock on a graph.
///
/// The lock is backed by a lock file on disk; the guard remembers where that
/// file lives so it can be deleted when the guard goes out of scope. Bind the
/// guard to a named variable for as long as the graph is being written:
/// discarding it immediately releases the lock immediately.
#[derive(Debug)]
#[must_use = "the graph lock is released as soon as the guard is dropped"]
pub struct GraphWriteLock {
    /// Path of the lock file owned by this guard.
    path: PathBuf,
}

impl Drop for GraphWriteLock {
    /// Releases the lock by deleting the lock file this guard points at.
    fn drop(&mut self) {
        // `drop` has no way to report failure, so removal is best effort:
        // any I/O error (e.g. the file is already gone) is discarded.
        fs::remove_file(self.path.as_path()).ok();
    }
}

/// Acquires the write lock for `graph_path` using the configured timeout.
///
/// The timeout is taken from the `KG_GRAPH_LOCK_TIMEOUT_MS` environment
/// variable, interpreted as a number of milliseconds. When the variable cannot
/// be read or is not a valid `u64`, `DEFAULT_LOCK_TIMEOUT_MS` is used instead.
///
/// # Errors
///
/// Propagates any error from `acquire_for_graph_with_timeout`.
pub fn acquire_for_graph(graph_path: &Path) -> Result<GraphWriteLock> {
    let configured_ms = match std::env::var("KG_GRAPH_LOCK_TIMEOUT_MS") {
        Ok(raw) => raw.parse::<u64>().ok(),
        // Unset (or unreadable) variable: fall through to the default below.
        Err(_) => None,
    };
    let wait = Duration::from_millis(configured_ms.unwrap_or(DEFAULT_LOCK_TIMEOUT_MS));
    acquire_for_graph_with_timeout(graph_path, wait)
}

/// Acquires the write lock for `graph_path`, waiting at most `timeout`.
///
/// The lock is taken by exclusively creating a lock file (`create_new`), so
/// only one caller can hold it at a time. While the file already exists the
/// call retries every `LOCK_RETRY_SLEEP_MS` milliseconds until `timeout` has
/// elapsed. The returned guard deletes the lock file when dropped.
///
/// # Errors
///
/// Returns an error when:
/// - the directory that should contain the lock file cannot be created;
/// - the lock file still exists once `timeout` has elapsed;
/// - opening the lock file fails for any reason other than "already exists".
pub(crate) fn acquire_for_graph_with_timeout(
    graph_path: &Path,
    timeout: Duration,
) -> Result<GraphWriteLock> {
    let lock_path = lock_path_for_graph(graph_path);
    if let Some(parent) = lock_path.parent() {
        fs::create_dir_all(parent)
            .with_context(|| format!("failed to create cache directory: {}", parent.display()))?;
    }

    // Measure the wait with a monotonic clock: wall-clock adjustments must not
    // shorten or stretch the timeout.
    let start = std::time::Instant::now();
    let retry_interval = Duration::from_millis(LOCK_RETRY_SLEEP_MS);
    loop {
        match OpenOptions::new()
            .create_new(true)
            .write(true)
            .open(&lock_path)
        {
            Ok(mut file) => {
                let pid = std::process::id();
                let now = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs();
                // Owner details are written for diagnostics only; the lock is
                // the file's existence, so a failed write is ignored.
                let _ = writeln!(file, "pid={pid} ts={now}");
                return Ok(GraphWriteLock { path: lock_path });
            }
            Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
                let elapsed = start.elapsed();
                if elapsed >= timeout {
                    bail!(
                        "graph is locked by another process: {} (waited {} ms)",
                        graph_path.display(),
                        timeout.as_millis()
                    );
                }
                // Never sleep past the deadline: the final attempt happens at
                // the timeout rather than up to one retry interval after it.
                let remaining = timeout.saturating_sub(elapsed);
                thread::sleep(retry_interval.min(remaining));
            }
            Err(err) => {
                return Err(err)
                    .with_context(|| format!("failed to acquire lock: {}", lock_path.display()));
            }
        }
    }
}

/// Builds the path of the lock file that guards `graph_path`.
///
/// The file is named `<stem>.<ext>.write.lock` and placed in the directory
/// returned by `crate::cache_paths::cache_root_for_graph`. A missing or
/// non-UTF-8 stem falls back to `graph`, and a missing or non-UTF-8 extension
/// falls back to `json`.
fn lock_path_for_graph(graph_path: &Path) -> PathBuf {
    let stem = graph_path.file_stem().and_then(|name| name.to_str());
    let ext = graph_path.extension().and_then(|name| name.to_str());

    let cache_root = crate::cache_paths::cache_root_for_graph(graph_path);
    let lock_file_name = format!(
        "{}.{}.write.lock",
        stem.unwrap_or("graph"),
        ext.unwrap_or("json")
    );
    cache_root.join(lock_file_name)
}

#[cfg(test)]
mod tests {
    use super::*;

    /// While one guard is alive a second acquisition for the same graph must
    /// time out; once the guard is dropped the lock must be obtainable again.
    #[test]
    fn graph_lock_blocks_parallel_writer() {
        // Process id plus a nanosecond timestamp keeps concurrent test runs
        // from sharing a directory.
        let unique = format!(
            "kg-lock-{}-{}",
            std::process::id(),
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_nanos()
        );
        let root = std::env::temp_dir().join(unique).join(".kg").join("graphs");
        fs::create_dir_all(&root).expect("create graph root");
        let graph_path = root.join("fridge.kg");
        fs::write(&graph_path, "{}").expect("write graph");

        let first = acquire_for_graph_with_timeout(&graph_path, Duration::from_millis(50))
            .expect("first lock");
        let second = acquire_for_graph_with_timeout(&graph_path, Duration::from_millis(120));
        assert!(second.is_err());

        // Dropping the guard must free the lock for the next writer.
        drop(first);
        let third = acquire_for_graph_with_timeout(&graph_path, Duration::from_millis(50));
        let reacquired = third.is_ok();
        // Release every guard before deleting the temporary tree.
        drop(third);

        let parent = root.parent().and_then(|p| p.parent()).expect("temp parent");
        let _ = fs::remove_dir_all(parent);

        assert!(reacquired, "lock should be free after the guard is dropped");
    }
}