Skip to main content

libgrite_ipc/
lock.rs

1//! Daemon lock management
2//!
3//! The daemon lock prevents multiple processes from owning the same
4//! actor data directory. It uses a lease-based approach with heartbeats.
5
6use std::fs;
7use std::path::{Path, PathBuf};
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use serde::{Deserialize, Serialize};
11
12use crate::error::IpcError;
13use crate::DEFAULT_LEASE_MS;
14
15/// Daemon lock stored at `.git/grite/actors/<actor_id>/daemon.lock`
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct DaemonLock {
18    /// Process ID of the lock holder
19    pub pid: u32,
20    /// When the daemon started (Unix timestamp in ms)
21    pub started_ts: u64,
22    /// Repository root path
23    pub repo_root: String,
24    /// Actor ID (hex-encoded)
25    pub actor_id: String,
26    /// Stable host identifier
27    pub host_id: String,
28    /// IPC endpoint (e.g., "ipc:///tmp/grite-daemon.sock")
29    pub ipc_endpoint: String,
30    /// Lease duration in milliseconds
31    pub lease_ms: u64,
32    /// Last heartbeat timestamp (Unix timestamp in ms)
33    pub last_heartbeat_ts: u64,
34    /// When the lock expires (Unix timestamp in ms)
35    pub expires_ts: u64,
36}
37
38impl DaemonLock {
39    /// Create a new daemon lock
40    pub fn new(
41        pid: u32,
42        repo_root: String,
43        actor_id: String,
44        host_id: String,
45        ipc_endpoint: String,
46    ) -> Self {
47        let now = current_time_ms();
48        Self {
49            pid,
50            started_ts: now,
51            repo_root,
52            actor_id,
53            host_id,
54            ipc_endpoint,
55            lease_ms: DEFAULT_LEASE_MS,
56            last_heartbeat_ts: now,
57            expires_ts: now + DEFAULT_LEASE_MS,
58        }
59    }
60
61    /// Create a lock with a custom lease duration
62    pub fn with_lease(mut self, lease_ms: u64) -> Self {
63        let now = current_time_ms();
64        self.lease_ms = lease_ms;
65        self.expires_ts = now + lease_ms;
66        self
67    }
68
69    /// Check if the lock has expired
70    pub fn is_expired(&self) -> bool {
71        current_time_ms() > self.expires_ts
72    }
73
74    /// Check if the lock is held by this process
75    pub fn is_owned_by_current_process(&self) -> bool {
76        self.pid == std::process::id()
77    }
78
79    /// Remaining time until expiration in milliseconds
80    pub fn time_remaining_ms(&self) -> u64 {
81        let now = current_time_ms();
82        if now >= self.expires_ts {
83            0
84        } else {
85            self.expires_ts - now
86        }
87    }
88
89    /// Refresh the heartbeat and extend the lease
90    pub fn refresh(&mut self) {
91        let now = current_time_ms();
92        self.last_heartbeat_ts = now;
93        self.expires_ts = now + self.lease_ms;
94    }
95
96    /// Get the lock file path for an actor data directory
97    pub fn lock_path(data_dir: &Path) -> PathBuf {
98        data_dir.join("daemon.lock")
99    }
100
101    /// Read a lock from the filesystem
102    pub fn read(data_dir: &Path) -> Result<Option<Self>, IpcError> {
103        let path = Self::lock_path(data_dir);
104        if !path.exists() {
105            return Ok(None);
106        }
107
108        let contents = fs::read_to_string(&path)?;
109        let lock: DaemonLock = serde_json::from_str(&contents)?;
110        Ok(Some(lock))
111    }
112
113    /// Write the lock to the filesystem
114    pub fn write(&self, data_dir: &Path) -> Result<(), IpcError> {
115        let path = Self::lock_path(data_dir);
116        let contents = serde_json::to_string_pretty(self)?;
117        fs::write(&path, contents)?;
118        Ok(())
119    }
120
121    /// Remove the lock file
122    pub fn remove(data_dir: &Path) -> Result<(), IpcError> {
123        let path = Self::lock_path(data_dir);
124        if path.exists() {
125            fs::remove_file(&path)?;
126        }
127        Ok(())
128    }
129
130    /// Try to acquire the lock
131    ///
132    /// Returns Ok(lock) if acquired, Err if held by another non-expired process
133    pub fn acquire(
134        data_dir: &Path,
135        repo_root: String,
136        actor_id: String,
137        host_id: String,
138        ipc_endpoint: String,
139    ) -> Result<Self, IpcError> {
140        // Check for existing lock
141        if let Some(existing) = Self::read(data_dir)? {
142            if !existing.is_expired() {
143                return Err(IpcError::LockHeld {
144                    pid: existing.pid,
145                    expires_in_ms: existing.time_remaining_ms(),
146                });
147            }
148            // Lock is expired, we can take it
149        }
150
151        let lock = DaemonLock::new(
152            std::process::id(),
153            repo_root,
154            actor_id,
155            host_id,
156            ipc_endpoint,
157        );
158        lock.write(data_dir)?;
159        Ok(lock)
160    }
161
162    /// Release the lock (only if owned by current process)
163    pub fn release(data_dir: &Path) -> Result<(), IpcError> {
164        if let Some(lock) = Self::read(data_dir)? {
165            if lock.is_owned_by_current_process() {
166                Self::remove(data_dir)?;
167            }
168        }
169        Ok(())
170    }
171}
172
/// Get current time in milliseconds since Unix epoch.
///
/// Never panics: a system clock set before the epoch yields 0, and a value too
/// large for `u64` is clamped to `u64::MAX` rather than silently truncated.
fn current_time_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `duration_since` fails when the clock reads earlier than the epoch.
        // Clamp before the cast so the `u128` -> `u64` conversion is lossless.
        .map_or(0, |elapsed| elapsed.as_millis().min(u128::from(u64::MAX)) as u64)
}
180
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a lock rooted at "/repo" with the given identity fields.
    fn make_lock(pid: u32, actor_id: &str, host_id: &str, endpoint: &str) -> DaemonLock {
        DaemonLock::new(
            pid,
            "/repo".to_string(),
            actor_id.to_string(),
            host_id.to_string(),
            endpoint.to_string(),
        )
    }

    /// Acquire the lock in `dir` as actor "actor" on host "host", panicking on failure.
    fn acquire_in(dir: &TempDir, endpoint: &str) -> DaemonLock {
        DaemonLock::acquire(
            dir.path(),
            "/repo".to_string(),
            "actor".to_string(),
            "host".to_string(),
            endpoint.to_string(),
        )
        .unwrap()
    }

    #[test]
    fn test_lock_creation() {
        let lock = make_lock(1234, "actor123", "host456", "ipc:///tmp/test.sock");

        assert_eq!(lock.pid, 1234);
        assert_eq!(lock.repo_root, "/repo");
        assert_eq!(lock.actor_id, "actor123");
        assert!(!lock.is_expired());
    }

    #[test]
    fn test_lock_expiration() {
        let mut lock = make_lock(1234, "actor", "host", "ipc:///tmp/test.sock");

        // Force the expiry into the distant past.
        lock.expires_ts = 0;
        assert!(lock.is_expired());

        // A refresh must push the expiry back into the future.
        lock.refresh();
        assert!(!lock.is_expired());
    }

    #[test]
    fn test_lock_read_write() {
        let temp = TempDir::new().unwrap();
        let data_dir = temp.path();

        let written = make_lock(
            std::process::id(),
            "actor123",
            "host456",
            "ipc:///tmp/test.sock",
        );
        written.write(data_dir).unwrap();

        // What comes back from disk must match what went in.
        let loaded = DaemonLock::read(data_dir).unwrap().unwrap();
        assert_eq!(loaded.pid, written.pid);
        assert_eq!(loaded.actor_id, written.actor_id);
    }

    #[test]
    fn test_lock_acquire_release() {
        let temp = TempDir::new().unwrap();

        let held = acquire_in(&temp, "ipc:///tmp/test.sock");
        assert!(held.is_owned_by_current_process());

        DaemonLock::release(temp.path()).unwrap();

        // After release no lock file should remain.
        assert!(DaemonLock::read(temp.path()).unwrap().is_none());
    }

    #[test]
    fn test_lock_acquire_expired() {
        let temp = TempDir::new().unwrap();

        // Leave behind an already-expired lock recorded under a different PID.
        let mut stale = make_lock(9999, "actor", "host", "ipc:///tmp/old.sock");
        stale.expires_ts = 0;
        stale.write(temp.path()).unwrap();

        // Acquisition must succeed over the stale lock.
        let fresh = acquire_in(&temp, "ipc:///tmp/new.sock");
        assert!(fresh.is_owned_by_current_process());
    }

    #[test]
    fn test_custom_lease() {
        // 60 seconds.
        let lock = make_lock(1234, "actor", "host", "ipc:///tmp/test.sock").with_lease(60_000);

        assert_eq!(lock.lease_ms, 60_000);
    }
}
305        .with_lease(60_000); // 60 seconds
306
307        assert_eq!(lock.lease_ms, 60_000);
308    }
309}