1use std::fs::{self, OpenOptions};
7use std::io::Write;
8use std::path::{Path, PathBuf};
9use std::time::{SystemTime, UNIX_EPOCH};
10
11use serde::{Deserialize, Serialize};
12
13use crate::error::IpcError;
14use crate::DEFAULT_LEASE_MS;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct DaemonLock {
19 pub pid: u32,
21 pub started_ts: u64,
23 pub repo_root: String,
25 pub actor_id: String,
27 pub host_id: String,
29 pub ipc_endpoint: String,
31 pub lease_ms: u64,
33 pub last_heartbeat_ts: u64,
35 pub expires_ts: u64,
37}
38
39impl DaemonLock {
40 pub fn new(
42 pid: u32,
43 repo_root: String,
44 actor_id: String,
45 host_id: String,
46 ipc_endpoint: String,
47 ) -> Self {
48 let now = current_time_ms();
49 Self {
50 pid,
51 started_ts: now,
52 repo_root,
53 actor_id,
54 host_id,
55 ipc_endpoint,
56 lease_ms: DEFAULT_LEASE_MS,
57 last_heartbeat_ts: now,
58 expires_ts: now + DEFAULT_LEASE_MS,
59 }
60 }
61
62 pub fn with_lease(mut self, lease_ms: u64) -> Self {
64 let now = current_time_ms();
65 self.lease_ms = lease_ms;
66 self.expires_ts = now + lease_ms;
67 self
68 }
69
70 pub fn is_expired(&self) -> bool {
72 current_time_ms() > self.expires_ts
73 }
74
75 pub fn is_owned_by_current_process(&self) -> bool {
77 self.pid == std::process::id()
78 }
79
80 pub fn time_remaining_ms(&self) -> u64 {
82 self.expires_ts.saturating_sub(current_time_ms())
83 }
84
85 pub fn refresh(&mut self) {
87 let now = current_time_ms();
88 self.last_heartbeat_ts = now;
89 self.expires_ts = now + self.lease_ms;
90 }
91
92 pub fn lock_path(data_dir: &Path) -> PathBuf {
94 data_dir.join("daemon.lock")
95 }
96
97 pub fn read(data_dir: &Path) -> Result<Option<Self>, IpcError> {
99 let path = Self::lock_path(data_dir);
100 match fs::read_to_string(&path) {
101 Ok(contents) => {
102 let lock: DaemonLock = serde_json::from_str(&contents)?;
103 Ok(Some(lock))
104 }
105 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
106 Err(e) => Err(e.into()),
107 }
108 }
109
110 pub fn write(&self, data_dir: &Path) -> Result<(), IpcError> {
112 let path = Self::lock_path(data_dir);
113 let contents = serde_json::to_string_pretty(self)?;
114 fs::write(&path, contents)?;
115 Ok(())
116 }
117
118 pub fn remove(data_dir: &Path) -> Result<(), IpcError> {
120 let path = Self::lock_path(data_dir);
121 match fs::remove_file(&path) {
122 Ok(()) => Ok(()),
123 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
124 Err(e) => Err(e.into()),
125 }
126 }
127
128 pub fn acquire(
134 data_dir: &Path,
135 repo_root: String,
136 actor_id: String,
137 host_id: String,
138 ipc_endpoint: String,
139 ) -> Result<Self, IpcError> {
140 let path = Self::lock_path(data_dir);
141
142 if let Some(existing) = Self::read(data_dir)? {
144 if !existing.is_expired() {
145 return Err(IpcError::LockHeld {
146 pid: existing.pid,
147 expires_in_ms: existing.time_remaining_ms(),
148 });
149 }
150 let _ = std::fs::remove_file(&path);
152 }
153
154 let lock = DaemonLock::new(
155 std::process::id(),
156 repo_root,
157 actor_id,
158 host_id,
159 ipc_endpoint,
160 );
161
162 let contents = serde_json::to_string_pretty(&lock)?;
163
164 match OpenOptions::new().write(true).create_new(true).open(&path) {
167 Ok(mut f) => {
168 f.write_all(contents.as_bytes())?;
169 Ok(lock)
170 }
171 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
172 Err(IpcError::LockRace)
174 }
175 Err(e) => Err(e.into()),
176 }
177 }
178
179 pub fn release(data_dir: &Path) -> Result<(), IpcError> {
181 if let Some(lock) = Self::read(data_dir)? {
182 if lock.is_owned_by_current_process() {
183 Self::remove(data_dir)?;
184 }
185 }
186 Ok(())
187 }
188}
189
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// A system clock set before the epoch yields `0`. A value that does not fit
/// in `u64` is clamped to `u64::MAX` rather than silently truncated.
fn current_time_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // Clamp in `u128` first so the narrowing cast below is always lossless.
    since_epoch.as_millis().min(u128::from(u64::MAX)) as u64
}
197
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a lock for `pid` with fixed placeholder metadata.
    fn sample_lock(pid: u32, ipc_endpoint: &str) -> DaemonLock {
        DaemonLock::new(
            pid,
            "/repo".to_string(),
            "actor".to_string(),
            "host".to_string(),
            ipc_endpoint.to_string(),
        )
    }

    /// Calls `acquire` with the same placeholder metadata as `sample_lock`.
    fn acquire_sample(data_dir: &Path, ipc_endpoint: &str) -> Result<DaemonLock, IpcError> {
        DaemonLock::acquire(
            data_dir,
            "/repo".to_string(),
            "actor".to_string(),
            "host".to_string(),
            ipc_endpoint.to_string(),
        )
    }

    #[test]
    fn test_lock_creation() {
        let lock = DaemonLock::new(
            1234,
            "/repo".to_string(),
            "actor123".to_string(),
            "host456".to_string(),
            "/tmp/test.sock".to_string(),
        );

        assert_eq!(lock.pid, 1234);
        assert_eq!(lock.repo_root, "/repo");
        assert_eq!(lock.actor_id, "actor123");
        assert_eq!(lock.host_id, "host456");
        assert_eq!(lock.ipc_endpoint, "/tmp/test.sock");
        assert_eq!(lock.lease_ms, DEFAULT_LEASE_MS);
        assert_eq!(lock.last_heartbeat_ts, lock.started_ts);
        assert!(!lock.is_expired());
    }

    #[test]
    fn test_lock_expiration() {
        let mut lock = sample_lock(1234, "/tmp/test.sock");

        // An expiry at the epoch is always in the past.
        lock.expires_ts = 0;
        assert!(lock.is_expired());
        assert_eq!(lock.time_remaining_ms(), 0);

        // Refreshing pushes the expiry forward again.
        lock.refresh();
        assert!(!lock.is_expired());
    }

    #[test]
    fn test_lock_read_write() {
        let temp = TempDir::new().unwrap();
        let data_dir = temp.path();

        // Nothing has been written yet.
        assert!(DaemonLock::read(data_dir).unwrap().is_none());

        let lock = DaemonLock::new(
            std::process::id(),
            "/repo".to_string(),
            "actor123".to_string(),
            "host456".to_string(),
            "/tmp/test.sock".to_string(),
        );
        lock.write(data_dir).unwrap();

        let read_lock = DaemonLock::read(data_dir).unwrap().unwrap();
        assert_eq!(read_lock.pid, lock.pid);
        assert_eq!(read_lock.actor_id, lock.actor_id);
        assert_eq!(read_lock.host_id, lock.host_id);
        assert_eq!(read_lock.expires_ts, lock.expires_ts);
    }

    #[test]
    fn test_lock_acquire_release() {
        let temp = TempDir::new().unwrap();
        let data_dir = temp.path();

        let lock = acquire_sample(data_dir, "/tmp/test.sock").unwrap();
        assert!(lock.is_owned_by_current_process());

        // The record must actually be on disk, not just returned.
        let on_disk = DaemonLock::read(data_dir).unwrap().unwrap();
        assert_eq!(on_disk.pid, std::process::id());

        DaemonLock::release(data_dir).unwrap();
        assert!(DaemonLock::read(data_dir).unwrap().is_none());
    }

    #[test]
    fn test_lock_acquire_expired() {
        let temp = TempDir::new().unwrap();
        let data_dir = temp.path();

        let mut old_lock = sample_lock(9999, "/tmp/old.sock");
        old_lock.expires_ts = 0;
        old_lock.write(data_dir).unwrap();

        let new_lock = acquire_sample(data_dir, "/tmp/new.sock").unwrap();
        assert!(new_lock.is_owned_by_current_process());

        // The stale record must have been replaced on disk.
        let on_disk = DaemonLock::read(data_dir).unwrap().unwrap();
        assert_eq!(on_disk.pid, std::process::id());
        assert_eq!(on_disk.ipc_endpoint, "/tmp/new.sock");
    }

    #[test]
    fn test_lock_acquire_held() {
        let temp = TempDir::new().unwrap();
        let data_dir = temp.path();

        // An explicit lease keeps this independent of the default lease length.
        sample_lock(4242, "/tmp/held.sock")
            .with_lease(60_000)
            .write(data_dir)
            .unwrap();

        let err = acquire_sample(data_dir, "/tmp/new.sock").unwrap_err();
        assert!(matches!(err, IpcError::LockHeld { pid: 4242, .. }));

        // The live lock must be left untouched.
        let on_disk = DaemonLock::read(data_dir).unwrap().unwrap();
        assert_eq!(on_disk.pid, 4242);
        assert_eq!(on_disk.ipc_endpoint, "/tmp/held.sock");
    }

    #[test]
    fn test_release_leaves_foreign_lock() {
        let temp = TempDir::new().unwrap();
        let data_dir = temp.path();

        let foreign_pid = std::process::id().wrapping_add(1);
        sample_lock(foreign_pid, "/tmp/other.sock")
            .write(data_dir)
            .unwrap();

        DaemonLock::release(data_dir).unwrap();

        let on_disk = DaemonLock::read(data_dir).unwrap().unwrap();
        assert_eq!(on_disk.pid, foreign_pid);
    }

    #[test]
    fn test_remove_missing_is_ok() {
        let temp = TempDir::new().unwrap();

        DaemonLock::remove(temp.path()).unwrap();
        DaemonLock::release(temp.path()).unwrap();
    }

    #[test]
    fn test_custom_lease() {
        let lock = sample_lock(1234, "/tmp/test.sock").with_lease(60_000);

        assert_eq!(lock.lease_ms, 60_000);
        assert!(!lock.is_expired());
        assert!(lock.time_remaining_ms() <= 60_000);
    }
}