Skip to main content

libgrite_ipc/
lock.rs

1//! Daemon lock management
2//!
3//! The daemon lock prevents multiple processes from owning the same
4//! actor data directory. It uses a lease-based approach with heartbeats.
5
6use std::fs::{self, OpenOptions};
7use std::io::Write;
8use std::path::{Path, PathBuf};
9use std::time::{SystemTime, UNIX_EPOCH};
10
11use serde::{Deserialize, Serialize};
12
13use crate::error::IpcError;
14use crate::DEFAULT_LEASE_MS;
15
16/// Daemon lock stored at `.git/grite/actors/<actor_id>/daemon.lock`
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct DaemonLock {
19    /// Process ID of the lock holder
20    pub pid: u32,
21    /// When the daemon started (Unix timestamp in ms)
22    pub started_ts: u64,
23    /// Repository root path
24    pub repo_root: String,
25    /// Actor ID (hex-encoded)
26    pub actor_id: String,
27    /// Stable host identifier
28    pub host_id: String,
29    /// Unix socket path (e.g., "/tmp/grite-daemon.sock")
30    pub ipc_endpoint: String,
31    /// Lease duration in milliseconds
32    pub lease_ms: u64,
33    /// Last heartbeat timestamp (Unix timestamp in ms)
34    pub last_heartbeat_ts: u64,
35    /// When the lock expires (Unix timestamp in ms)
36    pub expires_ts: u64,
37}
38
39impl DaemonLock {
40    /// Create a new daemon lock
41    pub fn new(
42        pid: u32,
43        repo_root: String,
44        actor_id: String,
45        host_id: String,
46        ipc_endpoint: String,
47    ) -> Self {
48        let now = current_time_ms();
49        Self {
50            pid,
51            started_ts: now,
52            repo_root,
53            actor_id,
54            host_id,
55            ipc_endpoint,
56            lease_ms: DEFAULT_LEASE_MS,
57            last_heartbeat_ts: now,
58            expires_ts: now + DEFAULT_LEASE_MS,
59        }
60    }
61
62    /// Create a lock with a custom lease duration
63    pub fn with_lease(mut self, lease_ms: u64) -> Self {
64        let now = current_time_ms();
65        self.lease_ms = lease_ms;
66        self.expires_ts = now + lease_ms;
67        self
68    }
69
70    /// Check if the lock has expired
71    pub fn is_expired(&self) -> bool {
72        current_time_ms() > self.expires_ts
73    }
74
75    /// Check if the lock is held by this process
76    pub fn is_owned_by_current_process(&self) -> bool {
77        self.pid == std::process::id()
78    }
79
80    /// Remaining time until expiration in milliseconds
81    pub fn time_remaining_ms(&self) -> u64 {
82        self.expires_ts.saturating_sub(current_time_ms())
83    }
84
85    /// Refresh the heartbeat and extend the lease
86    pub fn refresh(&mut self) {
87        let now = current_time_ms();
88        self.last_heartbeat_ts = now;
89        self.expires_ts = now + self.lease_ms;
90    }
91
92    /// Get the lock file path for an actor data directory
93    pub fn lock_path(data_dir: &Path) -> PathBuf {
94        data_dir.join("daemon.lock")
95    }
96
97    /// Read a lock from the filesystem
98    pub fn read(data_dir: &Path) -> Result<Option<Self>, IpcError> {
99        let path = Self::lock_path(data_dir);
100        match fs::read_to_string(&path) {
101            Ok(contents) => {
102                let lock: DaemonLock = serde_json::from_str(&contents)?;
103                Ok(Some(lock))
104            }
105            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
106            Err(e) => Err(e.into()),
107        }
108    }
109
110    /// Write the lock to the filesystem
111    pub fn write(&self, data_dir: &Path) -> Result<(), IpcError> {
112        let path = Self::lock_path(data_dir);
113        let contents = serde_json::to_string_pretty(self)?;
114        fs::write(&path, contents)?;
115        Ok(())
116    }
117
118    /// Remove the lock file
119    pub fn remove(data_dir: &Path) -> Result<(), IpcError> {
120        let path = Self::lock_path(data_dir);
121        match fs::remove_file(&path) {
122            Ok(()) => Ok(()),
123            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
124            Err(e) => Err(e.into()),
125        }
126    }
127
128    /// Try to acquire the lock
129    ///
130    /// Returns Ok(lock) if acquired, Err if held by another non-expired process.
131    /// Uses atomic file creation to prevent two processes from acquiring
132    /// the lock simultaneously when an expired lock is being replaced.
133    pub fn acquire(
134        data_dir: &Path,
135        repo_root: String,
136        actor_id: String,
137        host_id: String,
138        ipc_endpoint: String,
139    ) -> Result<Self, IpcError> {
140        let path = Self::lock_path(data_dir);
141
142        // Check for existing lock
143        if let Some(existing) = Self::read(data_dir)? {
144            if !existing.is_expired() {
145                return Err(IpcError::LockHeld {
146                    pid: existing.pid,
147                    expires_in_ms: existing.time_remaining_ms(),
148                });
149            }
150            // Lock is expired — remove it so we can create exclusively
151            let _ = std::fs::remove_file(&path);
152        }
153
154        let lock = DaemonLock::new(
155            std::process::id(),
156            repo_root,
157            actor_id,
158            host_id,
159            ipc_endpoint,
160        );
161
162        let contents = serde_json::to_string_pretty(&lock)?;
163
164        // O_CREAT | O_EXCL: fails if another process created the file
165        // between our remove and this create.
166        match OpenOptions::new().write(true).create_new(true).open(&path) {
167            Ok(mut f) => {
168                f.write_all(contents.as_bytes())?;
169                Ok(lock)
170            }
171            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
172                // Another process beat us to it between our remove and create
173                Err(IpcError::LockRace)
174            }
175            Err(e) => Err(e.into()),
176        }
177    }
178
179    /// Release the lock (only if owned by current process)
180    pub fn release(data_dir: &Path) -> Result<(), IpcError> {
181        if let Some(lock) = Self::read(data_dir)? {
182            if lock.is_owned_by_current_process() {
183                Self::remove(data_dir)?;
184            }
185        }
186        Ok(())
187    }
188}
189
/// Milliseconds elapsed since the Unix epoch according to the system clock.
///
/// If the clock reads earlier than 1970, this returns 0 rather than an error.
fn current_time_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        // Clock set before the epoch: report the epoch itself.
        Err(_) => 0,
    }
}
197
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// A PID guaranteed to differ from this test process's PID.
    fn foreign_pid() -> u32 {
        std::process::id().wrapping_add(1)
    }

    #[test]
    fn test_lock_creation() {
        let lock = DaemonLock::new(
            1234,
            "/repo".to_string(),
            "actor123".to_string(),
            "host456".to_string(),
            "/tmp/test.sock".to_string(),
        );

        assert_eq!(lock.pid, 1234);
        assert_eq!(lock.repo_root, "/repo");
        assert_eq!(lock.actor_id, "actor123");
        assert!(!lock.is_expired());
    }

    #[test]
    fn test_lock_expiration() {
        let mut lock = DaemonLock::new(
            1234,
            "/repo".to_string(),
            "actor".to_string(),
            "host".to_string(),
            "/tmp/test.sock".to_string(),
        );

        // Set expiration to the past
        lock.expires_ts = 0;
        assert!(lock.is_expired());
        assert_eq!(lock.time_remaining_ms(), 0);

        // Refresh should extend the lease
        lock.refresh();
        assert!(!lock.is_expired());
    }

    #[test]
    fn test_lock_read_write() {
        let temp = TempDir::new().unwrap();
        let data_dir = temp.path();

        let lock = DaemonLock::new(
            std::process::id(),
            "/repo".to_string(),
            "actor123".to_string(),
            "host456".to_string(),
            "/tmp/test.sock".to_string(),
        );

        // Write
        lock.write(data_dir).unwrap();

        // Read back
        let read_lock = DaemonLock::read(data_dir).unwrap().unwrap();
        assert_eq!(read_lock.pid, lock.pid);
        assert_eq!(read_lock.actor_id, lock.actor_id);
    }

    #[test]
    fn test_lock_read_missing() {
        let temp = TempDir::new().unwrap();
        assert!(DaemonLock::read(temp.path()).unwrap().is_none());
    }

    #[test]
    fn test_lock_acquire_release() {
        let temp = TempDir::new().unwrap();
        let data_dir = temp.path();

        // Acquire lock
        let lock = DaemonLock::acquire(
            data_dir,
            "/repo".to_string(),
            "actor".to_string(),
            "host".to_string(),
            "/tmp/test.sock".to_string(),
        )
        .unwrap();

        assert!(lock.is_owned_by_current_process());

        // Release lock
        DaemonLock::release(data_dir).unwrap();

        // Lock should be gone
        assert!(DaemonLock::read(data_dir).unwrap().is_none());
    }

    #[test]
    fn test_lock_acquire_expired() {
        let temp = TempDir::new().unwrap();
        let data_dir = temp.path();

        // Create an expired lock owned by another PID
        let mut old_lock = DaemonLock::new(
            foreign_pid(),
            "/repo".to_string(),
            "actor".to_string(),
            "host".to_string(),
            "/tmp/old.sock".to_string(),
        );
        old_lock.expires_ts = 0; // Expired
        old_lock.write(data_dir).unwrap();

        // Should be able to acquire over expired lock
        let new_lock = DaemonLock::acquire(
            data_dir,
            "/repo".to_string(),
            "actor".to_string(),
            "host".to_string(),
            "/tmp/new.sock".to_string(),
        )
        .unwrap();

        assert!(new_lock.is_owned_by_current_process());

        // The file on disk must now describe the new holder, not the stale one
        let on_disk = DaemonLock::read(data_dir).unwrap().unwrap();
        assert_eq!(on_disk.pid, std::process::id());
        assert_eq!(on_disk.ipc_endpoint, "/tmp/new.sock");
    }

    #[test]
    fn test_lock_acquire_held() {
        let temp = TempDir::new().unwrap();
        let data_dir = temp.path();

        // A live (non-expired) lock owned by another PID
        let other = foreign_pid();
        let live = DaemonLock::new(
            other,
            "/repo".to_string(),
            "actor".to_string(),
            "host".to_string(),
            "/tmp/live.sock".to_string(),
        );
        live.write(data_dir).unwrap();

        let err = DaemonLock::acquire(
            data_dir,
            "/repo".to_string(),
            "actor".to_string(),
            "host".to_string(),
            "/tmp/new.sock".to_string(),
        )
        .unwrap_err();
        assert!(matches!(err, IpcError::LockHeld { pid, .. } if pid == other));

        // The live lock must be left untouched
        let on_disk = DaemonLock::read(data_dir).unwrap().unwrap();
        assert_eq!(on_disk.pid, other);
        assert_eq!(on_disk.ipc_endpoint, "/tmp/live.sock");
    }

    #[test]
    fn test_release_leaves_foreign_lock() {
        let temp = TempDir::new().unwrap();
        let data_dir = temp.path();

        let other = foreign_pid();
        let lock = DaemonLock::new(
            other,
            "/repo".to_string(),
            "actor".to_string(),
            "host".to_string(),
            "/tmp/test.sock".to_string(),
        );
        lock.write(data_dir).unwrap();

        // Releasing from a non-owner must be a no-op
        DaemonLock::release(data_dir).unwrap();
        let on_disk = DaemonLock::read(data_dir).unwrap().unwrap();
        assert_eq!(on_disk.pid, other);
    }

    #[test]
    fn test_release_without_lock_is_ok() {
        let temp = TempDir::new().unwrap();
        DaemonLock::release(temp.path()).unwrap();
    }

    #[test]
    fn test_custom_lease() {
        let lock = DaemonLock::new(
            1234,
            "/repo".to_string(),
            "actor".to_string(),
            "host".to_string(),
            "/tmp/test.sock".to_string(),
        )
        .with_lease(60_000); // 60 seconds

        assert_eq!(lock.lease_ms, 60_000);
        // The expiry is recomputed from the time `with_lease` ran
        assert!(lock.expires_ts >= lock.last_heartbeat_ts + 60_000);
        assert!(lock.time_remaining_ms() <= 60_000);
        assert!(!lock.is_expired());
    }
}