Skip to main content

mana_core/
locks.rs

1//! File locking for concurrent agents.
2//!
3//! When `file_locking` is enabled in config, agents lock files they work on
4//! to prevent concurrent writes. Locks are stored as JSON files in `.mana/locks/`.
5//!
6//! Lock lifecycle:
7//! - Pre-emptive: `mana run` locks files listed in the unit's `paths` field on spawn.
8//! - On-write: The pi extension locks files on first write (safety net).
9//! - Release: Locks are released when the agent finishes or is killed.
10
11use std::fs;
12use std::io::Write;
13use std::path::{Path, PathBuf};
14
15use anyhow::{Context, Result};
16use serde::{Deserialize, Serialize};
17use sha2::{Digest, Sha256};
18
19/// Information stored in each lock file.
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct LockInfo {
22    pub unit_id: String,
23    pub pid: u32,
24    pub file_path: String,
25    pub locked_at: i64,
26}
27
28/// A lock with its file system path.
29#[derive(Debug)]
30pub struct ActiveLock {
31    pub info: LockInfo,
32    pub lock_path: PathBuf,
33}
34
35// ---------------------------------------------------------------------------
36// Lock directory
37// ---------------------------------------------------------------------------
38
39/// Return the locks directory, creating it if needed.
40pub fn lock_dir(mana_dir: &Path) -> Result<PathBuf> {
41    let dir = mana_dir.join("locks");
42    fs::create_dir_all(&dir)
43        .with_context(|| format!("Failed to create locks directory: {}", dir.display()))?;
44    Ok(dir)
45}
46
47/// Hash a file path to a lock filename.
48fn lock_filename(file_path: &str) -> String {
49    let mut hasher = Sha256::new();
50    hasher.update(file_path.as_bytes());
51    let hash = hasher.finalize();
52    format!("{:x}.lock", hash)
53}
54
55/// Full path to the lock file for a given file path.
56fn lock_file_path(mana_dir: &Path, file_path: &str) -> Result<PathBuf> {
57    let dir = lock_dir(mana_dir)?;
58    Ok(dir.join(lock_filename(file_path)))
59}
60
61// ---------------------------------------------------------------------------
62// Lock operations
63// ---------------------------------------------------------------------------
64
65/// Acquire a lock on a file for a unit agent.
66///
67/// Returns `Ok(true)` if the lock was acquired, `Ok(false)` if already locked
68/// by another live process. Stale locks (dead PID) are automatically cleaned.
69///
70/// Uses atomic file creation (`O_CREAT | O_EXCL`) to prevent TOCTOU races
71/// when multiple agents attempt to lock the same file concurrently.
72pub fn acquire(mana_dir: &Path, unit_id: &str, pid: u32, file_path: &str) -> Result<bool> {
73    let lock_path = lock_file_path(mana_dir, file_path)?;
74
75    let info = LockInfo {
76        unit_id: unit_id.to_string(),
77        pid,
78        file_path: file_path.to_string(),
79        locked_at: chrono::Utc::now().timestamp(),
80    };
81
82    let content = serde_json::to_string_pretty(&info).context("Failed to serialize lock info")?;
83
84    // Attempt atomic creation — retries once after cleaning a stale lock.
85    for _ in 0..2 {
86        match fs::OpenOptions::new()
87            .write(true)
88            .create_new(true)
89            .open(&lock_path)
90        {
91            Ok(mut file) => {
92                file.write_all(content.as_bytes()).with_context(|| {
93                    format!("Failed to write lock file: {}", lock_path.display())
94                })?;
95                return Ok(true);
96            }
97            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
98                match read_lock(&lock_path) {
99                    Some(existing) if existing.unit_id == unit_id && existing.pid == pid => {
100                        // Same owner re-acquiring — idempotent success
101                        return Ok(true);
102                    }
103                    Some(existing) if is_process_alive(existing.pid) => {
104                        // Held by a live process
105                        return Ok(false);
106                    }
107                    _ => {
108                        // Stale or corrupt — remove and retry
109                        let _ = fs::remove_file(&lock_path);
110                    }
111                }
112            }
113            Err(e) => {
114                return Err(e).with_context(|| {
115                    format!("Failed to create lock file: {}", lock_path.display())
116                });
117            }
118        }
119    }
120
121    // Both attempts failed — another agent won the race
122    Ok(false)
123}
124
125/// Release all locks held by a specific unit.
126pub fn release_all_for_unit(mana_dir: &Path, unit_id: &str) -> Result<u32> {
127    let mut released = 0;
128    for lock in list_locks(mana_dir)? {
129        if lock.info.unit_id == unit_id {
130            let _ = fs::remove_file(&lock.lock_path);
131            released += 1;
132        }
133    }
134    Ok(released)
135}
136
137/// Force-clear all locks.
138pub fn clear_all(mana_dir: &Path) -> Result<u32> {
139    let mut cleared = 0;
140    for lock in list_locks(mana_dir)? {
141        let _ = fs::remove_file(&lock.lock_path);
142        cleared += 1;
143    }
144    Ok(cleared)
145}
146
147/// List all active locks, cleaning stale ones (dead PIDs) along the way.
148pub fn list_locks(mana_dir: &Path) -> Result<Vec<ActiveLock>> {
149    let dir = lock_dir(mana_dir)?;
150    let mut locks = Vec::new();
151
152    let entries = match fs::read_dir(&dir) {
153        Ok(entries) => entries,
154        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(locks),
155        Err(e) => return Err(e).context("Failed to read locks directory"),
156    };
157
158    for entry in entries {
159        let entry = entry?;
160        let path = entry.path();
161
162        if path.extension().and_then(|e| e.to_str()) != Some("lock") {
163            continue;
164        }
165
166        match read_lock(&path) {
167            Some(info) if is_process_alive(info.pid) => {
168                locks.push(ActiveLock {
169                    info,
170                    lock_path: path,
171                });
172            }
173            _ => {
174                // Stale or corrupt — clean up
175                let _ = fs::remove_file(&path);
176            }
177        }
178    }
179
180    Ok(locks)
181}
182
183/// Check if a file is currently locked.
184///
185/// Returns the lock info if locked by a live process, None otherwise.
186/// Automatically cleans stale locks.
187pub fn check_lock(mana_dir: &Path, file_path: &str) -> Result<Option<LockInfo>> {
188    let lock_path = lock_file_path(mana_dir, file_path)?;
189
190    if !lock_path.exists() {
191        return Ok(None);
192    }
193
194    match read_lock(&lock_path) {
195        Some(info) => {
196            if is_process_alive(info.pid) {
197                Ok(Some(info))
198            } else {
199                // Stale — clean it up
200                let _ = fs::remove_file(&lock_path);
201                Ok(None)
202            }
203        }
204        None => {
205            // Corrupt — clean it up
206            let _ = fs::remove_file(&lock_path);
207            Ok(None)
208        }
209    }
210}
211
212// ---------------------------------------------------------------------------
213// Helpers
214// ---------------------------------------------------------------------------
215
216fn read_lock(path: &Path) -> Option<LockInfo> {
217    let content = fs::read_to_string(path).ok()?;
218    serde_json::from_str(&content).ok()
219}
220
221fn is_process_alive(pid: u32) -> bool {
222    // signal 0 checks existence without actually signaling.
223    // Returns 0 if alive and we have permission to signal.
224    // Returns -1 with EPERM if alive but owned by another user.
225    // Returns -1 with ESRCH if the process does not exist.
226    let ret = unsafe { libc::kill(pid as i32, 0) };
227    if ret == 0 {
228        return true;
229    }
230    std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
231}
232
233// ---------------------------------------------------------------------------
234// Tests
235// ---------------------------------------------------------------------------
236
#[cfg(test)]
mod tests {
    use super::*;

    /// PID far above any real `pid_max`, so it always reads as a dead process.
    const DEAD_PID: u32 = 999_999_999;

    /// Create a fresh `.mana` directory. Keep the returned `TempDir` alive for
    /// the whole test; dropping it deletes the directory.
    fn temp_mana_dir() -> (tempfile::TempDir, PathBuf) {
        let root = tempfile::tempdir().unwrap();
        let mana_dir = root.path().join(".mana");
        fs::create_dir_all(&mana_dir).unwrap();
        (root, mana_dir)
    }

    /// Write a lock file for `file_path` directly, bypassing `acquire`, and
    /// return its path.
    fn plant_lock(mana_dir: &Path, unit_id: &str, pid: u32, file_path: &str) -> PathBuf {
        let lock_path = lock_file_path(mana_dir, file_path).unwrap();
        let info = LockInfo {
            unit_id: unit_id.to_string(),
            pid,
            file_path: file_path.to_string(),
            locked_at: 0,
        };
        fs::write(&lock_path, serde_json::to_string(&info).unwrap()).unwrap();
        lock_path
    }

    /// Write unparseable bytes where the lock file for `file_path` would live.
    fn plant_corrupt_lock(mana_dir: &Path, file_path: &str) -> PathBuf {
        let lock_path = lock_file_path(mana_dir, file_path).unwrap();
        fs::write(&lock_path, "not json").unwrap();
        lock_path
    }

    #[test]
    fn acquire_and_release_via_unit() {
        let (_root, mana_dir) = temp_mana_dir();
        let me = std::process::id();

        assert!(acquire(&mana_dir, "1.1", me, "/tmp/test.rs").unwrap());

        let held = check_lock(&mana_dir, "/tmp/test.rs").unwrap();
        assert_eq!(held.expect("lock should be held").unit_id, "1.1");

        release_all_for_unit(&mana_dir, "1.1").unwrap();
        assert!(check_lock(&mana_dir, "/tmp/test.rs").unwrap().is_none());
    }

    #[test]
    fn release_all_for_unit_works() {
        let (_root, mana_dir) = temp_mana_dir();
        let me = std::process::id();

        acquire(&mana_dir, "2.1", me, "/tmp/a.rs").unwrap();
        acquire(&mana_dir, "2.1", me, "/tmp/b.rs").unwrap();
        acquire(&mana_dir, "2.2", me, "/tmp/c.rs").unwrap();

        assert_eq!(release_all_for_unit(&mana_dir, "2.1").unwrap(), 2);

        // c.rs belongs to another unit and must survive.
        assert!(check_lock(&mana_dir, "/tmp/c.rs").unwrap().is_some());
        assert!(check_lock(&mana_dir, "/tmp/a.rs").unwrap().is_none());
    }

    #[test]
    fn list_locks_returns_all() {
        let (_root, mana_dir) = temp_mana_dir();
        let me = std::process::id();

        acquire(&mana_dir, "3.1", me, "/tmp/x.rs").unwrap();
        acquire(&mana_dir, "3.2", me, "/tmp/y.rs").unwrap();

        assert_eq!(list_locks(&mana_dir).unwrap().len(), 2);
    }

    #[test]
    fn clear_all_removes_everything() {
        let (_root, mana_dir) = temp_mana_dir();
        let me = std::process::id();

        acquire(&mana_dir, "4.1", me, "/tmp/p.rs").unwrap();
        acquire(&mana_dir, "4.2", me, "/tmp/q.rs").unwrap();

        assert_eq!(clear_all(&mana_dir).unwrap(), 2);
        assert!(list_locks(&mana_dir).unwrap().is_empty());
    }

    #[test]
    fn stale_lock_is_cleaned() {
        let (_root, mana_dir) = temp_mana_dir();
        let lock_path = plant_lock(&mana_dir, "5.1", DEAD_PID, "/tmp/stale.rs");

        assert!(check_lock(&mana_dir, "/tmp/stale.rs").unwrap().is_none());
        assert!(!lock_path.exists());
    }

    #[test]
    fn acquire_cleans_stale_and_succeeds() {
        let (_root, mana_dir) = temp_mana_dir();
        plant_lock(&mana_dir, "6.1", DEAD_PID, "/tmp/stale2.rs");

        assert!(acquire(&mana_dir, "6.2", std::process::id(), "/tmp/stale2.rs").unwrap());

        let held = check_lock(&mana_dir, "/tmp/stale2.rs").unwrap();
        assert_eq!(held.expect("new lock should be held").unit_id, "6.2");
    }

    #[test]
    fn same_owner_reacquire_is_idempotent() {
        let (_root, mana_dir) = temp_mana_dir();
        let me = std::process::id();

        assert!(acquire(&mana_dir, "7.1", me, "/tmp/idem.rs").unwrap());
        // Same unit + PID re-acquiring must succeed, not block itself.
        assert!(acquire(&mana_dir, "7.1", me, "/tmp/idem.rs").unwrap());

        let held = check_lock(&mana_dir, "/tmp/idem.rs").unwrap();
        assert_eq!(held.expect("lock should still be held").unit_id, "7.1");
    }

    #[test]
    fn different_owner_blocked_by_live_lock() {
        let (_root, mana_dir) = temp_mana_dir();
        let me = std::process::id();

        assert!(acquire(&mana_dir, "8.1", me, "/tmp/contested.rs").unwrap());
        // Another unit must be refused while the holder is alive.
        assert!(!acquire(&mana_dir, "8.2", me + 1, "/tmp/contested.rs").unwrap());
    }

    #[test]
    fn list_locks_filters_stale() {
        let (_root, mana_dir) = temp_mana_dir();

        acquire(&mana_dir, "9.1", std::process::id(), "/tmp/live.rs").unwrap();
        let stale_path = plant_lock(&mana_dir, "9.2", DEAD_PID, "/tmp/ghost.rs");

        let locks = list_locks(&mana_dir).unwrap();
        assert_eq!(locks.len(), 1);
        assert_eq!(locks[0].info.unit_id, "9.1");
        assert!(!stale_path.exists());
    }

    #[test]
    fn acquire_records_lock_info() {
        let (_root, mana_dir) = temp_mana_dir();
        let me = std::process::id();

        assert!(acquire(&mana_dir, "10.1", me, "/tmp/info.rs").unwrap());

        let info = check_lock(&mana_dir, "/tmp/info.rs")
            .unwrap()
            .expect("lock should be held");
        assert_eq!(info.unit_id, "10.1");
        assert_eq!(info.pid, me);
        assert_eq!(info.file_path, "/tmp/info.rs");
        assert!(info.locked_at > 0);
    }

    #[test]
    fn acquire_leaves_only_the_lock_file() {
        let (_root, mana_dir) = temp_mana_dir();

        acquire(&mana_dir, "11.1", std::process::id(), "/tmp/only.rs").unwrap();

        let entries: Vec<PathBuf> = fs::read_dir(lock_dir(&mana_dir).unwrap())
            .unwrap()
            .map(|entry| entry.unwrap().path())
            .collect();
        assert_eq!(entries, vec![lock_file_path(&mana_dir, "/tmp/only.rs").unwrap()]);
    }

    #[test]
    fn corrupt_lock_is_cleaned_by_check_lock() {
        let (_root, mana_dir) = temp_mana_dir();
        let lock_path = plant_corrupt_lock(&mana_dir, "/tmp/corrupt.rs");

        assert!(check_lock(&mana_dir, "/tmp/corrupt.rs").unwrap().is_none());
        assert!(!lock_path.exists());
    }

    #[test]
    fn acquire_replaces_corrupt_lock() {
        let (_root, mana_dir) = temp_mana_dir();
        plant_corrupt_lock(&mana_dir, "/tmp/corrupt2.rs");

        assert!(acquire(&mana_dir, "12.1", std::process::id(), "/tmp/corrupt2.rs").unwrap());

        let held = check_lock(&mana_dir, "/tmp/corrupt2.rs").unwrap();
        assert_eq!(held.expect("lock should be held").unit_id, "12.1");
    }

    #[test]
    fn list_locks_ignores_non_lock_files() {
        let (_root, mana_dir) = temp_mana_dir();
        let stray = lock_dir(&mana_dir).unwrap().join("notes.txt");
        fs::write(&stray, "keep me").unwrap();

        assert!(list_locks(&mana_dir).unwrap().is_empty());
        assert!(stray.exists());
    }
}