Skip to main content

talon_core/sync/
lock.rs

1//! Advisory file lock with stale-PID detection.
2//!
3//! Ports `services/talon/sync/sync-lock.ts`. The lock is implemented as a
4//! `JSON` file containing the holding process's `pid` and start timestamp.
5//! Acquisition opens the file with `O_CREAT | O_EXCL`; if creation fails
6//! because the file exists, the holder's `pid` is checked with
7//! [`rustix::process::test_kill_process`]. If the holder is gone the stale
8//! lock is removed and a single retry is attempted.
9
10use std::path::{Path, PathBuf};
11use std::time::{SystemTime, UNIX_EPOCH};
12
13use fs_err as fs;
14#[cfg(not(windows))]
15use rustix::process::{Pid, test_kill_process};
16use serde::{Deserialize, Serialize};
17#[cfg(windows)]
18use windows_sys::Win32::Foundation::{CloseHandle, STILL_ACTIVE};
19#[cfg(windows)]
20use windows_sys::Win32::System::Threading::{
21    GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION,
22};
23
24/// On-disk representation of the lock file.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct LockMetadata {
27    /// Operating-system process id of the holder.
28    pub pid: u32,
29    /// Wall-clock acquisition time, milliseconds since `UNIX_EPOCH`.
30    pub started_at: u64,
31}
32
33/// Errors returned by [`acquire_sync_lock`].
34#[derive(Debug, thiserror::Error)]
35#[non_exhaustive]
36pub enum SyncLockError {
37    /// Another live process holds the lock.
38    #[error("another talon sync is in progress")]
39    Busy,
40    /// Filesystem operation failed for a reason other than contention.
41    #[error("lock io error: {0}")]
42    Io(#[from] std::io::Error),
43}
44
/// Guard representing a held sync lock; obtain one via [`acquire_sync_lock`].
///
/// Dropping the guard deletes the lock file, but only when the file still
/// records this guard's `pid` as the holder.
#[must_use = "the lock is released as soon as the guard is dropped"]
#[derive(Debug)]
pub struct SyncLock {
    // Location of the lock file on disk.
    path: PathBuf,
    // Process id recorded when the guard was created; compared against the
    // file contents on drop.
    pid: u32,
}
53
54impl SyncLock {
55    /// Returns the path of the lock file backing this guard.
56    #[must_use]
57    pub fn path(&self) -> &Path {
58        &self.path
59    }
60}
61
62impl Drop for SyncLock {
63    fn drop(&mut self) {
64        if let Some(holder) = read_pid(&self.path)
65            && holder == self.pid
66        {
67            let _ = fs::remove_file(&self.path);
68        }
69    }
70}
71
72/// Acquires the sync lock at `path`, creating any missing parent directories.
73///
74/// Performs one stale-lock recovery attempt: if the file exists but its
75/// recorded `pid` is no longer alive, it is removed and re-acquired. If the
76/// recorded `pid` is alive (or the file cannot be parsed), [`SyncLockError::Busy`]
77/// is returned.
78///
79/// # Errors
80///
81/// Returns [`SyncLockError::Busy`] under contention and [`SyncLockError::Io`]
82/// for any other filesystem failure.
83pub fn acquire_sync_lock(path: &Path) -> Result<SyncLock, SyncLockError> {
84    if let Some(parent) = path.parent()
85        && !parent.as_os_str().is_empty()
86    {
87        fs::create_dir_all(parent)?;
88    }
89    if try_create_lock(path)? {
90        return Ok(SyncLock {
91            path: path.to_path_buf(),
92            pid: std::process::id(),
93        });
94    }
95
96    if try_remove_stale_lock(path)? && try_create_lock(path)? {
97        return Ok(SyncLock {
98            path: path.to_path_buf(),
99            pid: std::process::id(),
100        });
101    }
102    Err(SyncLockError::Busy)
103}
104
105/// Returns `true` if the lock file at `path` exists and names a live process.
106#[must_use]
107pub fn is_sync_lock_held_by_live_process(path: &Path) -> bool {
108    let Some(pid) = read_pid(path) else {
109        return false;
110    };
111    is_process_alive(pid)
112}
113
114fn try_create_lock(path: &Path) -> Result<bool, SyncLockError> {
115    use std::io::Write as _;
116    match fs::OpenOptions::new()
117        .write(true)
118        .create_new(true)
119        .open(path)
120    {
121        Ok(mut file) => {
122            let metadata = LockMetadata {
123                pid: std::process::id(),
124                started_at: SystemTime::now()
125                    .duration_since(UNIX_EPOCH)
126                    .map_or(0, |d| u64::try_from(d.as_millis()).unwrap_or(u64::MAX)),
127            };
128            let payload = serde_json::to_string(&metadata).unwrap_or_else(|_| String::from("{}"));
129            file.write_all(payload.as_bytes())?;
130            Ok(true)
131        }
132        Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => Ok(false),
133        Err(err) => Err(SyncLockError::Io(err)),
134    }
135}
136
137fn try_remove_stale_lock(path: &Path) -> Result<bool, SyncLockError> {
138    let Some(pid) = read_pid(path) else {
139        // Unparseable lock — treat as stale and try removal.
140        match fs::remove_file(path) {
141            Ok(()) => return Ok(true),
142            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(true),
143            Err(err) => return Err(SyncLockError::Io(err)),
144        }
145    };
146    if is_process_alive(pid) {
147        return Ok(false);
148    }
149    match fs::remove_file(path) {
150        Ok(()) => Ok(true),
151        Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(true),
152        Err(err) => Err(SyncLockError::Io(err)),
153    }
154}
155
156fn read_pid(path: &Path) -> Option<u32> {
157    let raw = fs::read_to_string(path).ok()?;
158    let metadata: LockMetadata = serde_json::from_str(&raw).ok()?;
159    Some(metadata.pid)
160}
161
162#[cfg(not(windows))]
163fn is_process_alive(pid: u32) -> bool {
164    let Ok(raw_pid) = i32::try_from(pid) else {
165        return false;
166    };
167    let Some(typed_pid) = Pid::from_raw(raw_pid) else {
168        return false;
169    };
170    test_kill_process(typed_pid).is_ok()
171}
172
/// Reports whether a process with id `pid` currently exists (Windows).
///
/// Opens the process with `PROCESS_QUERY_LIMITED_INFORMATION` and checks that
/// its exit code is still `STILL_ACTIVE`. A process that exists but cannot be
/// opened because access is denied is reported as alive, so its lock is never
/// treated as stale. Any other open failure, or a failed exit-code query, is
/// reported as not alive.
#[cfg(windows)]
#[expect(
    unsafe_code,
    reason = "checking Windows process liveness requires Win32 handle APIs"
)]
fn is_process_alive(pid: u32) -> bool {
    // SAFETY: `OpenProcess` takes only plain integer arguments and reports
    // failure through a null handle, which is checked below.
    let handle = unsafe { OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid) };
    if handle.is_null() {
        // Read the thread's last-error value immediately, before any other
        // call can overwrite it. "Access denied" means the process exists.
        let open_error = std::io::Error::last_os_error();
        return open_error.kind() == std::io::ErrorKind::PermissionDenied;
    }

    let mut exit_code = 0;
    // SAFETY: `handle` is the non-null handle returned by `OpenProcess` above
    // and `exit_code` is a live, writable local for the duration of the call.
    let is_alive = unsafe { GetExitCodeProcess(handle, &mut exit_code) } != 0
        && exit_code == u32::try_from(STILL_ACTIVE).unwrap_or(259);
    // SAFETY: `handle` was opened by this function and is closed exactly once.
    unsafe {
        CloseHandle(handle);
    }
    is_alive
}
192
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use std::env::temp_dir;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Builds a lock path inside a per-test directory under the system temp
    /// dir. The directory name combines `label`, this process's pid and a
    /// process-wide counter so tests do not share a path. Nothing is created
    /// on disk here.
    fn unique_lock_path(label: &str) -> PathBuf {
        static COUNTER: AtomicU64 = AtomicU64::new(0);
        let n = COUNTER.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        temp_dir()
            .join(format!("talon-lock-test-{label}-{pid}-{n}"))
            .join("sync.lock")
    }

    /// Best-effort removal of the per-test directory that contains `path`.
    fn cleanup(path: &Path) {
        if let Some(parent) = path.parent() {
            let _ = fs::remove_dir_all(parent);
        }
    }

    #[test]
    fn lock_is_acquired_then_released_on_drop() {
        let path = unique_lock_path("rt");
        {
            let lock = acquire_sync_lock(&path).unwrap();
            assert_eq!(lock.path(), path.as_path());
            assert!(path.exists());
        }
        // After drop, the file should be gone.
        assert!(!path.exists());
        cleanup(&path);
    }

    #[test]
    fn second_acquisition_while_held_returns_busy() {
        let path = unique_lock_path("busy");
        let held = acquire_sync_lock(&path).unwrap();

        let result = acquire_sync_lock(&path);
        assert!(matches!(result, Err(SyncLockError::Busy)));
        // The losing attempt must not have disturbed the holder's lock file.
        assert_eq!(read_pid(&path), Some(std::process::id()));

        // Release before cleanup so the guard's drop path is actually exercised.
        drop(held);
        assert!(!path.exists());
        cleanup(&path);
    }

    #[test]
    fn stale_lock_with_dead_pid_is_recovered() {
        let path = unique_lock_path("stale");
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent).unwrap();
        }
        // Pid 1 is typically a live system process, so it cannot stand in for
        // a dead holder. Use a value near `u32::MAX` instead, which on every
        // common OS is far above any real PID.
        let stale = LockMetadata {
            pid: u32::MAX - 1,
            started_at: 0,
        };
        fs::write(&path, serde_json::to_string(&stale).unwrap()).unwrap();

        let lock = acquire_sync_lock(&path).unwrap();
        assert!(path.exists());
        // The recovered lock must now name this process, not the stale holder.
        assert_eq!(read_pid(&path), Some(std::process::id()));

        drop(lock);
        assert!(!path.exists());
        cleanup(&path);
    }

    #[test]
    fn unparseable_lock_is_recovered() {
        let path = unique_lock_path("garbage");
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent).unwrap();
        }
        fs::write(&path, "not json at all").unwrap();

        let lock = acquire_sync_lock(&path).unwrap();
        // The garbage contents must have been replaced by valid metadata.
        assert_eq!(read_pid(&path), Some(std::process::id()));

        drop(lock);
        assert!(!path.exists());
        cleanup(&path);
    }

    #[test]
    fn is_held_returns_false_when_no_lock_file() {
        let path = unique_lock_path("missing");
        assert!(!is_sync_lock_held_by_live_process(&path));
    }

    #[test]
    fn is_held_tracks_guard_lifetime() {
        let path = unique_lock_path("held");
        let lock = acquire_sync_lock(&path).unwrap();
        // The lock names this (running) test process.
        assert!(is_sync_lock_held_by_live_process(&path));

        drop(lock);
        assert!(!is_sync_lock_held_by_live_process(&path));
        cleanup(&path);
    }
}