1use std::fs::{self, OpenOptions};
2use std::io::Write;
3use std::path::{Path, PathBuf};
4use std::thread;
5use std::time::{Duration, SystemTime, UNIX_EPOCH};
6
7use anyhow::{Context, Result, bail};
8
/// Default lock wait, in milliseconds, used when `KG_GRAPH_LOCK_TIMEOUT_MS` is
/// unset or does not parse as a `u64`.
const DEFAULT_LOCK_TIMEOUT_MS: u64 = 30_000;
/// Pause, in milliseconds, between attempts to create the lock file while it
/// is held by someone else.
const LOCK_RETRY_SLEEP_MS: u64 = 50;
11
/// Guard representing a held write lock on a graph.
///
/// The lock is a file on disk; the guard remembers its location and the
/// `Drop` implementation deletes that file, so the lock is held for exactly
/// as long as this value is alive.
#[derive(Debug)]
#[must_use = "the lock is released as soon as the guard is dropped; bind it to a variable"]
pub struct GraphWriteLock {
    /// Location of the lock file that is removed when the guard is dropped.
    path: PathBuf,
}
15
16impl Drop for GraphWriteLock {
17 fn drop(&mut self) {
18 let _ = fs::remove_file(&self.path);
19 }
20}
21
22pub fn acquire_for_graph(graph_path: &Path) -> Result<GraphWriteLock> {
23 let timeout_ms = std::env::var("KG_GRAPH_LOCK_TIMEOUT_MS")
24 .ok()
25 .and_then(|raw| raw.parse::<u64>().ok())
26 .unwrap_or(DEFAULT_LOCK_TIMEOUT_MS);
27 acquire_for_graph_with_timeout(graph_path, Duration::from_millis(timeout_ms))
28}
29
30pub(crate) fn acquire_for_graph_with_timeout(
31 graph_path: &Path,
32 timeout: Duration,
33) -> Result<GraphWriteLock> {
34 let lock_path = lock_path_for_graph(graph_path);
35 if let Some(parent) = lock_path.parent() {
36 fs::create_dir_all(parent)
37 .with_context(|| format!("failed to create cache directory: {}", parent.display()))?;
38 }
39
40 let start = SystemTime::now();
41 loop {
42 match OpenOptions::new()
43 .create_new(true)
44 .write(true)
45 .open(&lock_path)
46 {
47 Ok(mut file) => {
48 let pid = std::process::id();
49 let now = SystemTime::now()
50 .duration_since(UNIX_EPOCH)
51 .unwrap_or_default()
52 .as_secs();
53 let _ = writeln!(file, "pid={pid} ts={now}");
54 return Ok(GraphWriteLock { path: lock_path });
55 }
56 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
57 let elapsed = start.elapsed().unwrap_or_default();
58 if elapsed >= timeout {
59 bail!(
60 "graph is locked by another process: {} (waited {} ms)",
61 graph_path.display(),
62 timeout.as_millis()
63 );
64 }
65 thread::sleep(Duration::from_millis(LOCK_RETRY_SLEEP_MS));
66 }
67 Err(err) => {
68 return Err(err)
69 .with_context(|| format!("failed to acquire lock: {}", lock_path.display()));
70 }
71 }
72 }
73}
74
75fn lock_path_for_graph(graph_path: &Path) -> PathBuf {
76 let stem = graph_path
77 .file_stem()
78 .and_then(|s| s.to_str())
79 .unwrap_or("graph");
80 let ext = graph_path
81 .extension()
82 .and_then(|s| s.to_str())
83 .unwrap_or("json");
84 crate::cache_paths::cache_root_for_graph(graph_path).join(format!("{stem}.{ext}.write.lock"))
85}
86
#[cfg(test)]
mod tests {
    use super::*;

    /// While one guard is alive, a second attempt to lock the same graph must
    /// fail once its timeout expires.
    #[test]
    fn graph_lock_blocks_parallel_writer() {
        // Per-run scratch directory so concurrent test runs do not collide.
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_nanos();
        let dir_name = format!("kg-lock-{}-{}", std::process::id(), nanos);
        let scratch = std::env::temp_dir().join(dir_name);
        let root = scratch.join(".kg").join("graphs");
        fs::create_dir_all(&root).expect("create graph root");

        let graph_path = root.join("fridge.kg");
        fs::write(&graph_path, "{}").expect("write graph");

        let first_timeout = Duration::from_millis(50);
        let second_timeout = Duration::from_millis(120);
        let _first =
            acquire_for_graph_with_timeout(&graph_path, first_timeout).expect("first lock");
        let second = acquire_for_graph_with_timeout(&graph_path, second_timeout);
        assert!(second.is_err());

        // Best-effort cleanup of the whole scratch tree.
        let _ = fs::remove_dir_all(&scratch);
    }
}