1use std::fs;
9use std::io::{self, Read, Write};
10use std::path::{Path, PathBuf};
11use std::time::{SystemTime, UNIX_EPOCH};
12
/// Magic number stored (little-endian) in the first four bytes of a lock file.
/// Read as a big-endian word the hex digits spell ASCII `RVLF`.
const LOCK_MAGIC: u32 = 0x5256_4C46;

/// Lock-file format version, written at offset 0x60 of the lock file.
const LOCK_VERSION: u32 = 1;

/// Size in bytes of a complete lock file; shorter files are treated as invalid.
const LOCK_FILE_SIZE: usize = 104;

/// Minimum age, in nanoseconds (30 s), before a lock written on this host may
/// be considered stale.
const STALE_AGE_NS: u64 = 30 * 1_000_000_000;
24
/// Handle to an advisory writer lock backed by a lock file on disk.
///
/// The handle remembers which file it created and the random identifier it
/// wrote into that file, so that the file is only ever removed by the handle
/// that still owns it.
///
/// `Debug` is derived so that results carrying a `WriterLock` can be used with
/// `unwrap_err`/`expect_err` and printed in diagnostics.
#[derive(Debug)]
pub(crate) struct WriterLock {
    /// Path of the lock file this handle created.
    lock_path: PathBuf,
    /// 16-byte identifier stored at offset 0x50 of the lock file; used to
    /// verify ownership before the file is removed.
    writer_id: [u8; 16],
}
30
31impl WriterLock {
32 pub(crate) fn acquire(rvf_path: &Path) -> io::Result<Self> {
37 let lock_path = lock_path_for(rvf_path);
38 let pid = std::process::id();
39 let hostname = get_hostname();
40 let timestamp_ns = now_ns();
41 let writer_id = random_uuid();
42
43 let content = build_lock_content(pid, &hostname, timestamp_ns, &writer_id);
45
46 match atomic_create_file(&lock_path, &content) {
48 Ok(()) => Ok(WriterLock { lock_path, writer_id }),
49 Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
50 if try_break_stale_lock(&lock_path)? {
52 atomic_create_file(&lock_path, &content)?;
54 Ok(WriterLock { lock_path, writer_id })
55 } else {
56 Err(io::Error::new(
57 io::ErrorKind::WouldBlock,
58 "another writer holds the lock",
59 ))
60 }
61 }
62 Err(e) => Err(e),
63 }
64 }
65
66 pub(crate) fn release(self) -> io::Result<()> {
71 if let Ok(content) = fs::read(&self.lock_path) {
73 if content.len() >= LOCK_FILE_SIZE {
74 let stored_id = &content[0x50..0x60];
75 if stored_id == self.writer_id {
76 let _ = fs::remove_file(&self.lock_path);
77 }
78 }
79 }
80 Ok(())
81 }
82
83 #[allow(dead_code)]
85 pub(crate) fn is_valid(&self) -> bool {
86 if let Ok(content) = fs::read(&self.lock_path) {
87 if content.len() >= LOCK_FILE_SIZE {
88 let stored_id = &content[0x50..0x60];
89 return stored_id == self.writer_id;
90 }
91 }
92 false
93 }
94}
95
96impl Drop for WriterLock {
97 fn drop(&mut self) {
98 if let Ok(content) = fs::read(&self.lock_path) {
100 if content.len() >= LOCK_FILE_SIZE {
101 let stored_id = &content[0x50..0x60];
102 if stored_id == self.writer_id {
103 let _ = fs::remove_file(&self.lock_path);
104 }
105 }
106 }
107 }
108}
109
/// Derives the lock-file path for `rvf_path` by appending the literal suffix
/// `.lock` to the full path (the existing extension is kept, not replaced).
pub(crate) fn lock_path_for(rvf_path: &Path) -> PathBuf {
    let mut raw = rvf_path.to_path_buf().into_os_string();
    raw.push(".lock");
    raw.into()
}
116
117fn try_break_stale_lock(lock_path: &Path) -> io::Result<bool> {
119 let content = match fs::read(lock_path) {
120 Ok(c) => c,
121 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(true),
122 Err(e) => return Err(e),
123 };
124
125 if content.len() < LOCK_FILE_SIZE {
126 let _ = fs::remove_file(lock_path);
128 return Ok(true);
129 }
130
131 let magic = u32::from_le_bytes([content[0], content[1], content[2], content[3]]);
133 if magic != LOCK_MAGIC {
134 let _ = fs::remove_file(lock_path);
135 return Ok(true);
136 }
137
138 let lock_pid = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
140 let lock_timestamp = u64::from_le_bytes([
141 content[0x48], content[0x49], content[0x4A], content[0x4B],
142 content[0x4C], content[0x4D], content[0x4E], content[0x4F],
143 ]);
144
145 let current_time = now_ns();
146 let age = current_time.saturating_sub(lock_timestamp);
147
148 let lock_hostname = read_hostname_from_lock(&content[0x08..0x48]);
150 let current_hostname = get_hostname();
151 let same_host = lock_hostname == current_hostname;
152
153 let pid_alive = if same_host {
155 is_pid_alive(lock_pid)
156 } else {
157 true
159 };
160
161 let threshold = if same_host { STALE_AGE_NS } else { 300_000_000_000 };
165
166 if !pid_alive && age > threshold {
167 let _ = fs::remove_file(lock_path);
168 return Ok(true);
169 }
170
171 if !same_host && age > threshold {
172 let _ = fs::remove_file(lock_path);
173 return Ok(true);
174 }
175
176 Ok(false)
177}
178
179fn build_lock_content(pid: u32, hostname: &str, timestamp_ns: u64, writer_id: &[u8; 16]) -> Vec<u8> {
180 let mut buf = vec![0u8; LOCK_FILE_SIZE];
181
182 buf[0..4].copy_from_slice(&LOCK_MAGIC.to_le_bytes());
184 buf[4..8].copy_from_slice(&pid.to_le_bytes());
186 let host_bytes = hostname.as_bytes();
188 let copy_len = host_bytes.len().min(62); buf[0x08..0x08 + copy_len].copy_from_slice(&host_bytes[..copy_len]);
190 buf[0x08 + copy_len] = 0; buf[0x48..0x50].copy_from_slice(×tamp_ns.to_le_bytes());
193 buf[0x50..0x60].copy_from_slice(writer_id);
195 buf[0x60..0x64].copy_from_slice(&LOCK_VERSION.to_le_bytes());
197 let crc = simple_crc32(&buf[0..0x64]);
199 buf[0x64..0x68].copy_from_slice(&crc.to_le_bytes());
200
201 buf
202}
203
/// Creates `path` exclusively and writes `content` to it, syncing the file
/// before returning.
///
/// Fails with `io::ErrorKind::AlreadyExists` when the file is already present;
/// write and sync errors are returned unchanged.
fn atomic_create_file(path: &Path, content: &[u8]) -> io::Result<()> {
    let mut options = fs::OpenOptions::new();
    // `create_new` makes the open fail instead of reusing an existing file.
    options.write(true).create_new(true);

    let mut handle = options.open(path)?;
    handle.write_all(content)?;
    handle.sync_all()
}
214
/// Decodes a NUL-terminated hostname field: everything before the first zero
/// byte (or the whole slice when there is none) is converted to a `String`,
/// with invalid UTF-8 replaced lossily.
fn read_hostname_from_lock(buf: &[u8]) -> String {
    let name_bytes = buf.split(|&byte| byte == 0).next().unwrap_or(buf);
    String::from_utf8_lossy(name_bytes).into_owned()
}
219
/// Returns a name identifying the current host.
///
/// Sources are tried in order and the first one that yields a non-empty value
/// after trimming surrounding whitespace wins:
/// 1. the `HOSTNAME` environment variable,
/// 2. the contents of `/etc/hostname`,
/// 3. the literal `"unknown"`.
///
/// Both sources are trimmed the same way so that the result never depends on
/// stray whitespace, and an empty value is never returned.
fn get_hostname() -> String {
    /// Trims `raw` and rejects values that are empty afterwards.
    fn non_empty(raw: String) -> Option<String> {
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_owned())
        }
    }

    std::env::var("HOSTNAME")
        .ok()
        .and_then(non_empty)
        .or_else(|| fs::read_to_string("/etc/hostname").ok().and_then(non_empty))
        .unwrap_or_else(|| "unknown".to_owned())
}
228
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Yields 0 when the system clock reports a time before the epoch. The
/// 128-bit nanosecond count is narrowed to `u64` with a plain cast.
fn now_ns() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos() as u64,
        Err(_) => 0,
    }
}
235
236fn random_uuid() -> [u8; 16] {
237 let mut buf = [0u8; 16];
239 if let Ok(mut f) = fs::File::open("/dev/urandom") {
240 let _ = f.read_exact(&mut buf);
241 } else {
242 let ts = now_ns();
244 buf[0..8].copy_from_slice(&ts.to_le_bytes());
245 buf[8..12].copy_from_slice(&std::process::id().to_le_bytes());
246 }
247 buf
248}
249
/// Reports whether a process with the given id currently exists on this host.
///
/// On Unix the process is probed with `kill(pid, 0)`, which performs the
/// existence and permission checks without delivering a signal. A failure
/// with `EPERM` still means the process exists (it merely belongs to someone
/// else). PID 0 and values that do not fit a positive `i32` are reported as
/// not alive: passed to `kill` they would address a process group or every
/// process instead of a single one.
///
/// On other platforms liveness cannot be determined here, so `true` is
/// returned and the lock is treated as held.
fn is_pid_alive(pid: u32) -> bool {
    #[cfg(unix)]
    {
        if pid == 0 || pid > i32::MAX as u32 {
            return false;
        }
        // The range check above makes this cast lossless.
        if libc_kill(pid as i32, 0) == 0 {
            return true;
        }
        // Read errno through std right after the failed call; this works on
        // every Unix target, not only those exposing `__errno_location`.
        io::Error::last_os_error().raw_os_error() == Some(EPERM)
    }
    #[cfg(not(unix))]
    {
        let _ = pid;
        true
    }
}

#[cfg(unix)]
extern "C" {
    /// `kill(2)` from the platform C library.
    fn kill(pid: i32, sig: i32) -> i32;
}

/// `errno` value for "operation not permitted".
#[cfg(unix)]
const EPERM: i32 = 1;

/// Safe wrapper around `kill(2)`; returns the raw return value of the call.
#[cfg(unix)]
fn libc_kill(pid: i32, sig: i32) -> i32 {
    // SAFETY: `kill` takes two plain integers and has no memory-safety
    // preconditions.
    unsafe { kill(pid, sig) }
}
296
/// Bitwise CRC-32 over `data`: register initialised to all ones, reflected
/// polynomial `0xEDB88320`, one bit processed per inner step, final value
/// inverted.
fn simple_crc32(data: &[u8]) -> u32 {
    const POLY: u32 = 0xEDB8_8320;

    let register = data.iter().fold(u32::MAX, |acc, &byte| {
        (0..8).fold(acc ^ u32::from(byte), |bits, _| {
            let shifted = bits >> 1;
            if bits & 1 == 0 {
                shifted
            } else {
                shifted ^ POLY
            }
        })
    });

    !register
}
312
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// The lock path is the data path with `.lock` appended.
    #[test]
    fn lock_path_computation() {
        let p = Path::new("/tmp/data.rvf");
        assert_eq!(lock_path_for(p), PathBuf::from("/tmp/data.rvf.lock"));
    }

    /// A held lock blocks a second writer and can be re-acquired once released.
    #[test]
    fn acquire_and_release() {
        let dir = TempDir::new().unwrap();
        let rvf_path = dir.path().join("test.rvf");
        fs::write(&rvf_path, b"").unwrap();

        let lock = WriterLock::acquire(&rvf_path).unwrap();
        assert!(lock.is_valid());

        // The lock is fresh and owned by this live process, so a second
        // attempt must be rejected as contention.
        let err = match WriterLock::acquire(&rvf_path) {
            Ok(_) => panic!("second acquire unexpectedly succeeded"),
            Err(e) => e,
        };
        assert_eq!(err.kind(), io::ErrorKind::WouldBlock);

        lock.release().unwrap();
        assert!(!lock_path_for(&rvf_path).exists());

        let lock2 = WriterLock::acquire(&rvf_path).unwrap();
        assert!(lock2.is_valid());
    }

    /// A 60-second-old lock from a non-existent PID on this host is replaced.
    #[test]
    fn stale_lock_detection() {
        let dir = TempDir::new().unwrap();
        let rvf_path = dir.path().join("test2.rvf");
        fs::write(&rvf_path, b"").unwrap();
        let lock_path = lock_path_for(&rvf_path);

        let fake_pid = 999999999u32;
        let old_ts = now_ns().saturating_sub(60_000_000_000);
        let fake_id = [0xABu8; 16];
        let content = build_lock_content(fake_pid, &get_hostname(), old_ts, &fake_id);
        fs::write(&lock_path, &content).unwrap();

        let lock = WriterLock::acquire(&rvf_path).unwrap();
        assert!(lock.is_valid());

        // The file on disk now carries the new writer's id, not the fake one.
        let on_disk = fs::read(&lock_path).unwrap();
        assert_ne!(&on_disk[0x50..0x60], &fake_id[..]);
    }

    /// The checksum is deterministic and matches known CRC-32 values.
    #[test]
    fn simple_crc32_works() {
        let data = b"hello";
        let crc = simple_crc32(data);
        assert_ne!(crc, 0);
        assert_eq!(crc, simple_crc32(data));

        assert_eq!(simple_crc32(b""), 0);
        assert_eq!(simple_crc32(b"123456789"), 0xCBF4_3926);
    }
}