1use std::fs;
12use std::io::Write;
13use std::path::{Path, PathBuf};
14
15use anyhow::{Context, Result};
16use serde::{Deserialize, Serialize};
17use sha2::{Digest, Sha256};
18
/// Contents of a single lock file, stored on disk as JSON.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LockInfo {
    /// Identifier of the unit that holds the lock.
    pub unit_id: String,
    /// PID recorded by the holder; it is probed to decide whether the lock is stale.
    pub pid: u32,
    /// The locked file's path, exactly as it was passed to `acquire`.
    pub file_path: String,
    /// Unix timestamp (seconds, UTC) taken when the lock record was built.
    pub locked_at: i64,
}
27
/// A lock found on disk whose recorded process was reported alive when it was listed.
#[derive(Debug)]
pub struct ActiveLock {
    /// Parsed contents of the lock file.
    pub info: LockInfo,
    /// Location of the lock file itself (not of the file it protects).
    pub lock_path: PathBuf,
}
34
35pub fn lock_dir(mana_dir: &Path) -> Result<PathBuf> {
41 let dir = mana_dir.join("locks");
42 fs::create_dir_all(&dir)
43 .with_context(|| format!("Failed to create locks directory: {}", dir.display()))?;
44 Ok(dir)
45}
46
47fn lock_filename(file_path: &str) -> String {
49 let mut hasher = Sha256::new();
50 hasher.update(file_path.as_bytes());
51 let hash = hasher.finalize();
52 format!("{:x}.lock", hash)
53}
54
55fn lock_file_path(mana_dir: &Path, file_path: &str) -> Result<PathBuf> {
57 let dir = lock_dir(mana_dir)?;
58 Ok(dir.join(lock_filename(file_path)))
59}
60
61pub fn acquire(mana_dir: &Path, unit_id: &str, pid: u32, file_path: &str) -> Result<bool> {
73 let lock_path = lock_file_path(mana_dir, file_path)?;
74
75 let info = LockInfo {
76 unit_id: unit_id.to_string(),
77 pid,
78 file_path: file_path.to_string(),
79 locked_at: chrono::Utc::now().timestamp(),
80 };
81
82 let content = serde_json::to_string_pretty(&info).context("Failed to serialize lock info")?;
83
84 for _ in 0..2 {
86 match fs::OpenOptions::new()
87 .write(true)
88 .create_new(true)
89 .open(&lock_path)
90 {
91 Ok(mut file) => {
92 file.write_all(content.as_bytes()).with_context(|| {
93 format!("Failed to write lock file: {}", lock_path.display())
94 })?;
95 return Ok(true);
96 }
97 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
98 match read_lock(&lock_path) {
99 Some(existing) if existing.unit_id == unit_id && existing.pid == pid => {
100 return Ok(true);
102 }
103 Some(existing) if is_process_alive(existing.pid) => {
104 return Ok(false);
106 }
107 _ => {
108 let _ = fs::remove_file(&lock_path);
110 }
111 }
112 }
113 Err(e) => {
114 return Err(e).with_context(|| {
115 format!("Failed to create lock file: {}", lock_path.display())
116 });
117 }
118 }
119 }
120
121 Ok(false)
123}
124
125pub fn release_all_for_unit(mana_dir: &Path, unit_id: &str) -> Result<u32> {
127 let mut released = 0;
128 for lock in list_locks(mana_dir)? {
129 if lock.info.unit_id == unit_id {
130 let _ = fs::remove_file(&lock.lock_path);
131 released += 1;
132 }
133 }
134 Ok(released)
135}
136
137pub fn clear_all(mana_dir: &Path) -> Result<u32> {
139 let mut cleared = 0;
140 for lock in list_locks(mana_dir)? {
141 let _ = fs::remove_file(&lock.lock_path);
142 cleared += 1;
143 }
144 Ok(cleared)
145}
146
147pub fn list_locks(mana_dir: &Path) -> Result<Vec<ActiveLock>> {
149 let dir = lock_dir(mana_dir)?;
150 let mut locks = Vec::new();
151
152 let entries = match fs::read_dir(&dir) {
153 Ok(entries) => entries,
154 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(locks),
155 Err(e) => return Err(e).context("Failed to read locks directory"),
156 };
157
158 for entry in entries {
159 let entry = entry?;
160 let path = entry.path();
161
162 if path.extension().and_then(|e| e.to_str()) != Some("lock") {
163 continue;
164 }
165
166 match read_lock(&path) {
167 Some(info) if is_process_alive(info.pid) => {
168 locks.push(ActiveLock {
169 info,
170 lock_path: path,
171 });
172 }
173 _ => {
174 let _ = fs::remove_file(&path);
176 }
177 }
178 }
179
180 Ok(locks)
181}
182
183pub fn check_lock(mana_dir: &Path, file_path: &str) -> Result<Option<LockInfo>> {
188 let lock_path = lock_file_path(mana_dir, file_path)?;
189
190 if !lock_path.exists() {
191 return Ok(None);
192 }
193
194 match read_lock(&lock_path) {
195 Some(info) => {
196 if is_process_alive(info.pid) {
197 Ok(Some(info))
198 } else {
199 let _ = fs::remove_file(&lock_path);
201 Ok(None)
202 }
203 }
204 None => {
205 let _ = fs::remove_file(&lock_path);
207 Ok(None)
208 }
209 }
210}
211
212fn read_lock(path: &Path) -> Option<LockInfo> {
217 let content = fs::read_to_string(path).ok()?;
218 serde_json::from_str(&content).ok()
219}
220
221fn is_process_alive(pid: u32) -> bool {
222 let ret = unsafe { libc::kill(pid as i32, 0) };
227 if ret == 0 {
228 return true;
229 }
230 std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
231}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// PID written into planted locks; the tests rely on no live process having it.
    const DEAD_PID: u32 = 999_999_999;

    /// Creates a scratch `.mana` directory. The returned guard must stay alive
    /// for as long as the directory is needed.
    fn temp_mana_dir() -> (tempfile::TempDir, PathBuf) {
        let root = tempfile::tempdir().unwrap();
        let mana_dir = root.path().join(".mana");
        fs::create_dir_all(&mana_dir).unwrap();
        (root, mana_dir)
    }

    /// Writes a lock file for `file_path` owned by `DEAD_PID` and returns its path.
    fn plant_stale_lock(mana_dir: &Path, unit_id: &str, file_path: &str) -> PathBuf {
        let lock_path = lock_file_path(mana_dir, file_path).unwrap();
        let record = LockInfo {
            unit_id: unit_id.to_string(),
            pid: DEAD_PID,
            file_path: file_path.to_string(),
            locked_at: 0,
        };
        fs::write(&lock_path, serde_json::to_string(&record).unwrap()).unwrap();
        lock_path
    }

    #[test]
    fn acquire_and_release_via_unit() {
        let (_guard, mana_dir) = temp_mana_dir();
        let me = std::process::id();

        assert!(acquire(&mana_dir, "1.1", me, "/tmp/test.rs").unwrap());

        let holder = check_lock(&mana_dir, "/tmp/test.rs").unwrap();
        assert_eq!(holder.map(|lock| lock.unit_id).as_deref(), Some("1.1"));

        release_all_for_unit(&mana_dir, "1.1").unwrap();

        assert!(check_lock(&mana_dir, "/tmp/test.rs").unwrap().is_none());
    }

    #[test]
    fn release_all_for_unit_works() {
        let (_guard, mana_dir) = temp_mana_dir();
        let me = std::process::id();

        for (unit, file) in [("2.1", "/tmp/a.rs"), ("2.1", "/tmp/b.rs"), ("2.2", "/tmp/c.rs")] {
            acquire(&mana_dir, unit, me, file).unwrap();
        }

        assert_eq!(release_all_for_unit(&mana_dir, "2.1").unwrap(), 2);

        // The other unit's lock survives; the released unit's lock is gone.
        assert!(check_lock(&mana_dir, "/tmp/c.rs").unwrap().is_some());
        assert!(check_lock(&mana_dir, "/tmp/a.rs").unwrap().is_none());
    }

    #[test]
    fn list_locks_returns_all() {
        let (_guard, mana_dir) = temp_mana_dir();
        let me = std::process::id();

        acquire(&mana_dir, "3.1", me, "/tmp/x.rs").unwrap();
        acquire(&mana_dir, "3.2", me, "/tmp/y.rs").unwrap();

        assert_eq!(list_locks(&mana_dir).unwrap().len(), 2);
    }

    #[test]
    fn clear_all_removes_everything() {
        let (_guard, mana_dir) = temp_mana_dir();
        let me = std::process::id();

        acquire(&mana_dir, "4.1", me, "/tmp/p.rs").unwrap();
        acquire(&mana_dir, "4.2", me, "/tmp/q.rs").unwrap();

        assert_eq!(clear_all(&mana_dir).unwrap(), 2);
        assert!(list_locks(&mana_dir).unwrap().is_empty());
    }

    #[test]
    fn stale_lock_is_cleaned() {
        let (_guard, mana_dir) = temp_mana_dir();
        let lock_path = plant_stale_lock(&mana_dir, "5.1", "/tmp/stale.rs");

        // Checking a lock owned by a dead process reports nothing and deletes the file.
        assert!(check_lock(&mana_dir, "/tmp/stale.rs").unwrap().is_none());
        assert!(!lock_path.exists());
    }

    #[test]
    fn acquire_cleans_stale_and_succeeds() {
        let (_guard, mana_dir) = temp_mana_dir();
        plant_stale_lock(&mana_dir, "6.1", "/tmp/stale2.rs");

        let acquired = acquire(&mana_dir, "6.2", std::process::id(), "/tmp/stale2.rs").unwrap();
        assert!(acquired);
    }

    #[test]
    fn same_owner_reacquire_is_idempotent() {
        let (_guard, mana_dir) = temp_mana_dir();
        let me = std::process::id();

        assert!(acquire(&mana_dir, "7.1", me, "/tmp/idem.rs").unwrap());
        assert!(acquire(&mana_dir, "7.1", me, "/tmp/idem.rs").unwrap());

        let holder = check_lock(&mana_dir, "/tmp/idem.rs").unwrap();
        assert_eq!(holder.map(|lock| lock.unit_id).as_deref(), Some("7.1"));
    }

    #[test]
    fn different_owner_blocked_by_live_lock() {
        let (_guard, mana_dir) = temp_mana_dir();
        let me = std::process::id();

        assert!(acquire(&mana_dir, "8.1", me, "/tmp/contested.rs").unwrap());

        // A different unit/pid pair must be refused while the holder is alive.
        let contender = acquire(&mana_dir, "8.2", me + 1, "/tmp/contested.rs").unwrap();
        assert!(!contender);
    }

    #[test]
    fn list_locks_filters_stale() {
        let (_guard, mana_dir) = temp_mana_dir();

        acquire(&mana_dir, "9.1", std::process::id(), "/tmp/live.rs").unwrap();
        let stale_path = plant_stale_lock(&mana_dir, "9.2", "/tmp/ghost.rs");

        let locks = list_locks(&mana_dir).unwrap();
        assert_eq!(locks.len(), 1);
        assert_eq!(locks[0].info.unit_id, "9.1");
        assert!(!stale_path.exists());
    }
}