Skip to main content

kg/
graph_lock.rs

1use std::fs::{self, OpenOptions};
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::thread;
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use anyhow::{Context, Result, bail};
8
/// Fallback lock-acquisition timeout, in milliseconds, used by `acquire_for_graph` when the
/// `KG_GRAPH_LOCK_TIMEOUT_MS` environment variable is unset or does not parse as a `u64`.
const DEFAULT_LOCK_TIMEOUT_MS: u64 = 30_000;
/// Pause, in milliseconds, between attempts to create the lock file while it already exists.
const LOCK_RETRY_SLEEP_MS: u64 = 50;
11
/// Guard representing ownership of a graph's write-lock file.
///
/// Obtained from [`acquire_for_graph`]. The `Drop` implementation deletes the lock file, so the
/// lock is held for as long as this value is alive.
#[derive(Debug)]
pub struct GraphWriteLock {
    /// Location of the lock file owned by this guard.
    path: PathBuf,
}
15
16impl Drop for GraphWriteLock {
17    fn drop(&mut self) {
18        let _ = fs::remove_file(&self.path);
19    }
20}
21
22pub fn acquire_for_graph(graph_path: &Path) -> Result<GraphWriteLock> {
23    let timeout_ms = std::env::var("KG_GRAPH_LOCK_TIMEOUT_MS")
24        .ok()
25        .and_then(|raw| raw.parse::<u64>().ok())
26        .unwrap_or(DEFAULT_LOCK_TIMEOUT_MS);
27    acquire_for_graph_with_timeout(graph_path, Duration::from_millis(timeout_ms))
28}
29
30pub(crate) fn acquire_for_graph_with_timeout(
31    graph_path: &Path,
32    timeout: Duration,
33) -> Result<GraphWriteLock> {
34    let lock_path = lock_path_for_graph(graph_path);
35    if let Some(parent) = lock_path.parent() {
36        fs::create_dir_all(parent)
37            .with_context(|| format!("failed to create cache directory: {}", parent.display()))?;
38    }
39
40    let start = SystemTime::now();
41    loop {
42        match OpenOptions::new()
43            .create_new(true)
44            .write(true)
45            .open(&lock_path)
46        {
47            Ok(mut file) => {
48                let pid = std::process::id();
49                let now = SystemTime::now()
50                    .duration_since(UNIX_EPOCH)
51                    .unwrap_or_default()
52                    .as_secs();
53                let _ = writeln!(file, "pid={pid} ts={now}");
54                return Ok(GraphWriteLock { path: lock_path });
55            }
56            Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
57                let elapsed = start.elapsed().unwrap_or_default();
58                if elapsed >= timeout {
59                    bail!(
60                        "graph is locked by another process: {} (waited {} ms)",
61                        graph_path.display(),
62                        timeout.as_millis()
63                    );
64                }
65                thread::sleep(Duration::from_millis(LOCK_RETRY_SLEEP_MS));
66            }
67            Err(err) => {
68                return Err(err)
69                    .with_context(|| format!("failed to acquire lock: {}", lock_path.display()));
70            }
71        }
72    }
73}
74
75fn lock_path_for_graph(graph_path: &Path) -> PathBuf {
76    let stem = graph_path
77        .file_stem()
78        .and_then(|s| s.to_str())
79        .unwrap_or("graph");
80    let ext = graph_path
81        .extension()
82        .and_then(|s| s.to_str())
83        .unwrap_or("json");
84    crate::cache_paths::cache_root_for_graph(graph_path).join(format!("{stem}.{ext}.write.lock"))
85}
86
#[cfg(test)]
mod tests {
    use super::*;

    /// While one guard is alive a second acquisition must time out; once that guard is
    /// dropped the lock must be obtainable again.
    #[test]
    fn graph_lock_blocks_parallel_writer() {
        // Process id plus a nanosecond timestamp keeps concurrent test runs apart.
        let unique = format!(
            "kg-lock-{}-{}",
            std::process::id(),
            SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_nanos()
        );
        let scratch = std::env::temp_dir().join(unique);
        let root = scratch.join(".kg").join("graphs");
        fs::create_dir_all(&root).expect("create graph root");
        let graph_path = root.join("fridge.kg");
        fs::write(&graph_path, "{}").expect("write graph");

        let first = acquire_for_graph_with_timeout(&graph_path, Duration::from_millis(50))
            .expect("first lock");
        let blocked =
            acquire_for_graph_with_timeout(&graph_path, Duration::from_millis(120)).is_err();

        // Dropping the guard deletes the lock file, so a new writer must get through.
        drop(first);
        let reacquired = acquire_for_graph_with_timeout(&graph_path, Duration::from_millis(50))
            .map(drop)
            .is_ok();

        // Remove the scratch tree before asserting so a failing run does not leak it.
        let _ = fs::remove_dir_all(&scratch);

        assert!(
            blocked,
            "second writer acquired the lock while the first guard was alive"
        );
        assert!(
            reacquired,
            "lock could not be re-acquired after the first guard was dropped"
        );
    }
}
114}