Skip to main content

rvf_runtime/
locking.rs

1//! Writer lock management for single-writer / multi-reader concurrency.
2//!
3//! Implements the advisory lock file protocol from spec 09:
4//! - Lock file at `{path}.lock` with PID, hostname, timestamp, UUID
5//! - Stale lock detection via PID liveness and age threshold
6//! - Atomic creation via O_CREAT | O_EXCL
7
8use std::fs;
9use std::io::{self, Read, Write};
10use std::path::{Path, PathBuf};
11use std::time::{SystemTime, UNIX_EPOCH};
12
/// The lock file magic: the bytes of "RVLF" in ASCII when the value is read
/// as a big-endian `u32` (0x52 'R', 0x56 'V', 0x4C 'L', 0x46 'F').
///
/// Note that `build_lock_content` serialises it with `to_le_bytes` and
/// `try_break_stale_lock` parses it with `from_le_bytes`, so the byte order
/// on disk is the reverse of the ASCII spelling.
const LOCK_MAGIC: u32 = 0x52564C46;

/// Lock protocol version, stored as a little-endian `u32` at offset 0x60.
const LOCK_VERSION: u32 = 1;

/// Lock file total size in bytes (0x68).
///
/// Layout as written by `build_lock_content`: magic (0x00), PID (0x04),
/// hostname (0x08..0x48), timestamp (0x48), writer id (0x50), version (0x60),
/// checksum (0x64). Shorter files are treated as invalid lock files.
const LOCK_FILE_SIZE: usize = 104;

/// Stale lock age threshold for same-host (30 seconds in nanoseconds).
///
/// Locks written from a different host use a separate, longer threshold that
/// is hard-coded inside `try_break_stale_lock`.
const STALE_AGE_NS: u64 = 30_000_000_000;
24
/// Represents an acquired writer lock.
///
/// A value of this type exists only after the lock file has been created
/// successfully; dropping it removes the lock file again on a best-effort
/// basis (see the `Drop` implementation).
pub(crate) struct WriterLock {
    /// Path of the on-disk lock file (`{rvf_path}.lock`).
    lock_path: PathBuf,
    /// 16-byte identifier written into the lock file at offset 0x50; it is
    /// compared against the file contents before the file is removed.
    writer_id: [u8; 16],
}
30
31impl WriterLock {
32    /// Attempt to acquire the writer lock for the given RVF file path.
33    ///
34    /// Returns `Ok(WriterLock)` on success, or an `io::Error` if the lock
35    /// is held by another active writer.
36    pub(crate) fn acquire(rvf_path: &Path) -> io::Result<Self> {
37        let lock_path = lock_path_for(rvf_path);
38        let pid = std::process::id();
39        let hostname = get_hostname();
40        let timestamp_ns = now_ns();
41        let writer_id = random_uuid();
42
43        // Build lock file content.
44        let content = build_lock_content(pid, &hostname, timestamp_ns, &writer_id);
45
46        // Attempt atomic creation.
47        match atomic_create_file(&lock_path, &content) {
48            Ok(()) => Ok(WriterLock { lock_path, writer_id }),
49            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
50                // Check for stale lock.
51                if try_break_stale_lock(&lock_path)? {
52                    // Retry after breaking stale lock.
53                    atomic_create_file(&lock_path, &content)?;
54                    Ok(WriterLock { lock_path, writer_id })
55                } else {
56                    Err(io::Error::new(
57                        io::ErrorKind::WouldBlock,
58                        "another writer holds the lock",
59                    ))
60                }
61            }
62            Err(e) => Err(e),
63        }
64    }
65
66    /// Release the writer lock.
67    ///
68    /// Verifies that the lock file still contains our writer_id before
69    /// removing it, preventing deletion of a lock legitimately taken over.
70    pub(crate) fn release(self) -> io::Result<()> {
71        // Verify our writer_id is still in the lock.
72        if let Ok(content) = fs::read(&self.lock_path) {
73            if content.len() >= LOCK_FILE_SIZE {
74                let stored_id = &content[0x50..0x60];
75                if stored_id == self.writer_id {
76                    let _ = fs::remove_file(&self.lock_path);
77                }
78            }
79        }
80        Ok(())
81    }
82
83    /// Check if the lock is still held by us.
84    #[allow(dead_code)]
85    pub(crate) fn is_valid(&self) -> bool {
86        if let Ok(content) = fs::read(&self.lock_path) {
87            if content.len() >= LOCK_FILE_SIZE {
88                let stored_id = &content[0x50..0x60];
89                return stored_id == self.writer_id;
90            }
91        }
92        false
93    }
94}
95
96impl Drop for WriterLock {
97    fn drop(&mut self) {
98        // Best-effort release on drop.
99        if let Ok(content) = fs::read(&self.lock_path) {
100            if content.len() >= LOCK_FILE_SIZE {
101                let stored_id = &content[0x50..0x60];
102                if stored_id == self.writer_id {
103                    let _ = fs::remove_file(&self.lock_path);
104                }
105            }
106        }
107    }
108}
109
/// Compute the lock file path for a given RVF file.
///
/// The suffix `.lock` is appended to the complete path (it does not replace
/// the existing extension), e.g. `data.rvf` becomes `data.rvf.lock`.
pub(crate) fn lock_path_for(rvf_path: &Path) -> PathBuf {
    let mut raw = rvf_path.to_path_buf().into_os_string();
    raw.push(".lock");
    raw.into()
}
116
117/// Try to break a stale lock. Returns `true` if the lock was broken.
118fn try_break_stale_lock(lock_path: &Path) -> io::Result<bool> {
119    let content = match fs::read(lock_path) {
120        Ok(c) => c,
121        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(true),
122        Err(e) => return Err(e),
123    };
124
125    if content.len() < LOCK_FILE_SIZE {
126        // Invalid lock file — delete it.
127        let _ = fs::remove_file(lock_path);
128        return Ok(true);
129    }
130
131    // Validate magic.
132    let magic = u32::from_le_bytes([content[0], content[1], content[2], content[3]]);
133    if magic != LOCK_MAGIC {
134        let _ = fs::remove_file(lock_path);
135        return Ok(true);
136    }
137
138    // Read PID and timestamp.
139    let lock_pid = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
140    let lock_timestamp = u64::from_le_bytes([
141        content[0x48], content[0x49], content[0x4A], content[0x4B],
142        content[0x4C], content[0x4D], content[0x4E], content[0x4F],
143    ]);
144
145    let current_time = now_ns();
146    let age = current_time.saturating_sub(lock_timestamp);
147
148    // Read hostname.
149    let lock_hostname = read_hostname_from_lock(&content[0x08..0x48]);
150    let current_hostname = get_hostname();
151    let same_host = lock_hostname == current_hostname;
152
153    // Check if PID is alive (same host only).
154    let pid_alive = if same_host {
155        is_pid_alive(lock_pid)
156    } else {
157        // Cannot check remote PID; rely on age only.
158        true
159    };
160
161    // Stale conditions:
162    // - PID is dead AND age > threshold (same host)
163    // - Age > extended threshold (cross-host)
164    let threshold = if same_host { STALE_AGE_NS } else { 300_000_000_000 };
165
166    if !pid_alive && age > threshold {
167        let _ = fs::remove_file(lock_path);
168        return Ok(true);
169    }
170
171    if !same_host && age > threshold {
172        let _ = fs::remove_file(lock_path);
173        return Ok(true);
174    }
175
176    Ok(false)
177}
178
179fn build_lock_content(pid: u32, hostname: &str, timestamp_ns: u64, writer_id: &[u8; 16]) -> Vec<u8> {
180    let mut buf = vec![0u8; LOCK_FILE_SIZE];
181
182    // Magic (0x00).
183    buf[0..4].copy_from_slice(&LOCK_MAGIC.to_le_bytes());
184    // PID (0x04).
185    buf[4..8].copy_from_slice(&pid.to_le_bytes());
186    // Hostname (0x08, max 64 bytes, null-terminated).
187    let host_bytes = hostname.as_bytes();
188    let copy_len = host_bytes.len().min(62); // Reserve byte for null terminator
189    buf[0x08..0x08 + copy_len].copy_from_slice(&host_bytes[..copy_len]);
190    buf[0x08 + copy_len] = 0; // Explicit null terminator
191    // Timestamp (0x48).
192    buf[0x48..0x50].copy_from_slice(&timestamp_ns.to_le_bytes());
193    // Writer ID (0x50).
194    buf[0x50..0x60].copy_from_slice(writer_id);
195    // Lock version (0x60).
196    buf[0x60..0x64].copy_from_slice(&LOCK_VERSION.to_le_bytes());
197    // CRC32 (0x64) — simplified: we use a basic checksum.
198    let crc = simple_crc32(&buf[0..0x64]);
199    buf[0x64..0x68].copy_from_slice(&crc.to_le_bytes());
200
201    buf
202}
203
/// Create `path` exclusively and write `content` to it.
///
/// Uses `create_new`, so the call fails with `AlreadyExists` when the file is
/// already present. The data is flushed to disk with `sync_all` before
/// returning.
fn atomic_create_file(path: &Path, content: &[u8]) -> io::Result<()> {
    let mut options = fs::OpenOptions::new();
    options.write(true).create_new(true);

    let mut handle = options.open(path)?;
    handle.write_all(content)?;
    handle.sync_all()
}
214
/// Decode a NUL-terminated hostname field.
///
/// Everything up to (not including) the first zero byte is used; if there is
/// no zero byte the whole slice is used. Invalid UTF-8 is replaced lossily.
fn read_hostname_from_lock(buf: &[u8]) -> String {
    let name_bytes = buf.split(|&byte| byte == 0).next().unwrap_or(buf);
    String::from_utf8_lossy(name_bytes).into_owned()
}
219
/// Best-effort name of the current host.
///
/// Sources, in order: the `HOSTNAME` environment variable, the contents of
/// `/etc/hostname`, and finally the literal `"unknown"`. Each candidate is
/// trimmed, and a candidate that is empty after trimming is skipped so that
/// an exported-but-empty `HOSTNAME` or a blank `/etc/hostname` cannot produce
/// an empty host identity.
fn get_hostname() -> String {
    /// Trim a candidate and discard it when nothing is left.
    fn usable(raw: String) -> Option<String> {
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_string())
        }
    }

    std::env::var("HOSTNAME")
        .ok()
        .and_then(usable)
        .or_else(|| fs::read_to_string("/etc/hostname").ok().and_then(usable))
        .unwrap_or_else(|| "unknown".to_string())
}
228
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch. The
/// nanosecond count is narrowed to `u64` with an `as` cast.
fn now_ns() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    }
}
235
236fn random_uuid() -> [u8; 16] {
237    // Simple random UUID generation using /dev/urandom or time-based fallback.
238    let mut buf = [0u8; 16];
239    if let Ok(mut f) = fs::File::open("/dev/urandom") {
240        let _ = f.read_exact(&mut buf);
241    } else {
242        // Fallback: use timestamp + PID.
243        let ts = now_ns();
244        buf[0..8].copy_from_slice(&ts.to_le_bytes());
245        buf[8..12].copy_from_slice(&std::process::id().to_le_bytes());
246    }
247    buf
248}
249
/// Report whether a process with the given PID exists on this host.
///
/// On Unix this probes with `kill(pid, 0)`, which checks for existence
/// without delivering a signal:
/// * return value 0: the process exists and we may signal it;
/// * `EPERM`: the process exists but belongs to another user, so it is
///   still alive;
/// * any other error (e.g. `ESRCH`): treated as dead.
///
/// PID 0 and values above `i32::MAX` never name a single process. Passed to
/// `kill` they would address a process group (0 or a negative pid), so they
/// are reported as dead without making the call.
///
/// On non-Unix platforms liveness cannot be determined and `true` is
/// returned.
fn is_pid_alive(pid: u32) -> bool {
    #[cfg(unix)]
    {
        if pid == 0 || pid > i32::MAX as u32 {
            return false;
        }
        // The range check above makes this cast lossless and positive.
        if libc_kill(pid as i32, 0) == 0 {
            return true;
        }
        // `last_os_error` reads the calling thread's errno portably, without
        // depending on a platform-specific symbol such as `__errno_location`.
        io::Error::last_os_error().raw_os_error() == Some(EPERM)
    }
    #[cfg(not(unix))]
    {
        // Conservatively assume alive so that a lock which might still be
        // held is never broken. NOTE(review): callers that require a dead
        // PID before breaking a same-host lock will therefore never break
        // one on these platforms; only age-only (cross-host) rules apply.
        let _ = pid;
        true
    }
}

#[cfg(unix)]
extern "C" {
    /// POSIX `kill(2)`; `pid_t` is assumed to be a 32-bit signed integer.
    fn kill(pid: i32, sig: i32) -> i32;
}

/// Permission denied errno -- process exists but belongs to another user.
#[cfg(unix)]
const EPERM: i32 = 1;

/// Safe wrapper around `kill(2)`.
#[cfg(unix)]
fn libc_kill(pid: i32, sig: i32) -> i32 {
    // SAFETY: `kill` takes two plain integers, touches no memory owned by
    // this program, and reports failure through its return value and errno.
    unsafe { kill(pid, sig) }
}
296
/// Simple CRC32 (not CRC32C) for lock file checksumming.
///
/// Bitwise, table-free implementation using the reflected polynomial
/// 0xEDB88320, an all-ones initial register, and a final bit inversion.
fn simple_crc32(data: &[u8]) -> u32 {
    const POLY: u32 = 0xEDB88320;

    let register = data.iter().fold(u32::MAX, |acc, &byte| {
        (0..8).fold(acc ^ u32::from(byte), |reg, _| {
            // All ones when the low bit is set, zero otherwise.
            let mask = (reg & 1).wrapping_neg();
            (reg >> 1) ^ (POLY & mask)
        })
    });

    !register
}
312
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// The lock path is the full RVF path with `.lock` appended.
    #[test]
    fn lock_path_computation() {
        let p = Path::new("/tmp/data.rvf");
        assert_eq!(lock_path_for(p), PathBuf::from("/tmp/data.rvf.lock"));
    }

    /// A held lock blocks a second writer until it is released.
    #[test]
    fn acquire_and_release() {
        let dir = TempDir::new().unwrap();
        let rvf_path = dir.path().join("test.rvf");
        fs::write(&rvf_path, b"").unwrap();

        let lock = WriterLock::acquire(&rvf_path).unwrap();
        assert!(lock.is_valid());

        // Second acquisition must be rejected as contention, not as some
        // unrelated I/O failure.
        let result = WriterLock::acquire(&rvf_path);
        assert_eq!(
            result.err().map(|e| e.kind()),
            Some(io::ErrorKind::WouldBlock)
        );

        lock.release().unwrap();
        assert!(!lock_path_for(&rvf_path).exists());

        // Now acquisition should succeed again.
        let lock2 = WriterLock::acquire(&rvf_path).unwrap();
        assert!(lock2.is_valid());
    }

    /// A lock left behind by a dead local process is broken once it is old
    /// enough.
    #[test]
    fn stale_lock_detection() {
        let dir = TempDir::new().unwrap();
        let rvf_path = dir.path().join("test2.rvf");
        fs::write(&rvf_path, b"").unwrap();
        let lock_path = lock_path_for(&rvf_path);

        // Write a lock with PID 999999999 (almost certainly dead) and old timestamp.
        let fake_pid = 999999999u32;
        let old_ts = now_ns().saturating_sub(60_000_000_000); // 60s ago
        let fake_id = [0xABu8; 16];
        let content = build_lock_content(fake_pid, &get_hostname(), old_ts, &fake_id);
        fs::write(&lock_path, &content).unwrap();

        // Should be able to acquire despite existing lock (stale).
        let lock = WriterLock::acquire(&rvf_path).unwrap();
        assert!(lock.is_valid());
    }

    /// Releasing must not delete a lock file that now belongs to a different
    /// writer id.
    #[test]
    fn release_leaves_foreign_lock_in_place() {
        let dir = TempDir::new().unwrap();
        let rvf_path = dir.path().join("test3.rvf");
        fs::write(&rvf_path, b"").unwrap();
        let lock_path = lock_path_for(&rvf_path);

        let lock = WriterLock::acquire(&rvf_path).unwrap();

        // Simulate a takeover: same file, different writer id.
        let foreign_id = [0xCDu8; 16];
        let content =
            build_lock_content(std::process::id(), &get_hostname(), now_ns(), &foreign_id);
        fs::write(&lock_path, &content).unwrap();

        assert!(!lock.is_valid());
        lock.release().unwrap();
        assert!(lock_path.exists());
    }

    /// The checksum matches the standard CRC-32 check value.
    #[test]
    fn simple_crc32_works() {
        let data = b"hello";
        let crc = simple_crc32(data);
        assert_ne!(crc, 0);
        // Same input produces same output.
        assert_eq!(crc, simple_crc32(data));
        // Known-answer vectors for CRC-32 (reflected, poly 0xEDB88320).
        assert_eq!(simple_crc32(b""), 0);
        assert_eq!(simple_crc32(b"123456789"), 0xCBF43926);
    }
}