1use crate::constants::limits::MAX_RETRIES;
26use crate::constants::timeouts::DELAYS_MS;
27use crate::fsutil::sync_dir_best_effort;
28use crate::timeutil;
29use anyhow::{Context, Result, anyhow};
30use std::fs;
31use std::io::Write;
32use std::path::{Path, PathBuf};
33use std::sync::atomic::{AtomicUsize, Ordering};
34use std::thread;
35use std::time::Duration;
36
/// Filename prefix shared by every per-task ("sidecar") owner file written
/// inside a lock directory; see `is_task_owner_file`.
pub(crate) const TASK_OWNER_PREFIX: &str = "owner_task_";

/// Process-wide counter appended to task owner filenames so that several task
/// locks taken by the same process get distinct sidecar files.
static TASK_OWNER_COUNTER: AtomicUsize = AtomicUsize::new(0);
43
/// Tri-state answer to "is this PID alive?".
///
/// The third state exists so callers can tell "the process is gone" apart
/// from "the check itself could not decide".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PidLiveness {
    /// The process was positively identified as running.
    Running,
    /// The process was positively identified as not running.
    NotRunning,
    /// The liveness check could not reach a conclusion.
    Indeterminate,
}

impl PidLiveness {
    /// Returns `true` only for [`PidLiveness::NotRunning`].
    ///
    /// An indeterminate result is deliberately *not* treated as "not running".
    pub fn is_definitely_not_running(self) -> bool {
        match self {
            Self::NotRunning => true,
            Self::Running | Self::Indeterminate => false,
        }
    }

    /// Returns `true` for [`PidLiveness::Running`] and
    /// [`PidLiveness::Indeterminate`], i.e. whenever the process cannot be
    /// proven dead.
    pub fn is_running_or_indeterminate(self) -> bool {
        match self {
            Self::Running | Self::Indeterminate => true,
            Self::NotRunning => false,
        }
    }
}
79
80pub fn pid_liveness(pid: u32) -> PidLiveness {
85 match pid_is_running(pid) {
86 Some(true) => PidLiveness::Running,
87 Some(false) => PidLiveness::NotRunning,
88 None => PidLiveness::Indeterminate,
89 }
90}
91
92#[derive(Debug)]
93pub struct DirLock {
94 lock_dir: PathBuf,
95 owner_path: PathBuf,
96}
97
98impl Drop for DirLock {
99 fn drop(&mut self) {
100 if let Err(e) = cleanup_lock_dir(&self.lock_dir, &self.owner_path, false) {
104 log::warn!("Failed to clean up lock directory after retries: {}", e);
105 }
106 }
107}
108
109pub(crate) fn is_task_owner_file(name: &str) -> bool {
114 name.starts_with(TASK_OWNER_PREFIX)
115}
116
117fn is_task_sidecar_owner(owner_path: &Path) -> bool {
119 owner_path
120 .file_name()
121 .and_then(|n| n.to_str())
122 .map(is_task_owner_file)
123 .unwrap_or(false)
124}
125
126fn has_other_owner_files(lock_dir: &Path, removed_owner_path: &Path) -> Result<bool> {
130 if !lock_dir.exists() {
131 return Ok(false);
132 }
133
134 for entry in fs::read_dir(lock_dir)? {
135 let entry = entry?;
136 let path = entry.path();
137
138 if !path.is_file() {
140 continue;
141 }
142
143 if path == removed_owner_path {
145 continue;
146 }
147
148 let file_name = entry.file_name();
149 let name = file_name.to_str().unwrap_or("");
150
151 if name == "owner" || is_task_owner_file(name) {
153 return Ok(true);
154 }
155 }
156
157 Ok(false)
158}
159
160fn cleanup_lock_dir(lock_dir: &Path, owner_path: &Path, force: bool) -> Result<()> {
179 let is_task_sidecar = is_task_sidecar_owner(owner_path);
181
182 if let Err(e) = fs::remove_file(owner_path) {
184 if e.kind() != std::io::ErrorKind::NotFound {
186 log::debug!(
187 "Failed to remove owner file {}: {}",
188 owner_path.display(),
189 e
190 );
191 }
192 }
193
194 if is_task_sidecar {
198 match has_other_owner_files(lock_dir, owner_path) {
199 Ok(true) => {
200 log::debug!(
201 "Skipping directory cleanup for task lock {} - other owners remain",
202 lock_dir.display()
203 );
204 return Ok(());
205 }
206 Ok(false) => {
207 }
209 Err(e) => {
210 log::debug!(
211 "Failed to check for other owner files in {}: {}. Proceeding with cleanup...",
212 lock_dir.display(),
213 e
214 );
215 }
216 }
217 }
218
219 for attempt in 0..MAX_RETRIES {
221 match fs::remove_dir(lock_dir) {
223 Ok(()) => return Ok(()),
224 Err(e) => {
225 if e.kind() == std::io::ErrorKind::NotFound {
227 return Ok(());
228 }
229
230 if attempt == MAX_RETRIES - 1 && force {
232 log::debug!(
233 "Attempting force cleanup of lock directory {}",
234 lock_dir.display()
235 );
236 match fs::remove_dir_all(lock_dir) {
237 Ok(()) => return Ok(()),
238 Err(force_err) => {
239 return Err(anyhow::anyhow!(
240 "Failed to force remove lock directory {}: {} (original error: {})",
241 lock_dir.display(),
242 force_err,
243 e
244 ));
245 }
246 }
247 }
248
249 log::warn!(
251 "Lock directory cleanup attempt {}/{} failed for {}: {}. Retrying...",
252 attempt + 1,
253 MAX_RETRIES,
254 lock_dir.display(),
255 e
256 );
257
258 if attempt < MAX_RETRIES - 1 {
259 thread::sleep(Duration::from_millis(DELAYS_MS[attempt as usize]));
260 }
261 }
262 }
263 }
264
265 Err(anyhow::anyhow!(
266 "Failed to remove lock directory {} after {} attempts",
267 lock_dir.display(),
268 MAX_RETRIES
269 ))
270}
271
/// Metadata describing a process that holds (or shares) a directory lock.
///
/// It is stored in an owner file as `key: value` lines, one field per line.
#[derive(Debug, Clone)]
pub struct LockOwner {
    /// Process ID of the holder.
    pub pid: u32,
    /// When the lock was taken, kept as an opaque string.
    pub started_at: String,
    /// Command line of the holding process.
    pub command: String,
    /// Label naming the operation that took the lock.
    pub label: String,
}

impl LockOwner {
    /// Serializes the owner as four `key: value` lines, each terminated by `\n`.
    ///
    /// The format is line-oriented, so line breaks inside a field are replaced
    /// with spaces. Without this, a multi-line command-line argument could
    /// inject extra `key: value` lines and change the pid or label that is
    /// read back.
    fn render(&self) -> String {
        format!(
            "pid: {}\nstarted_at: {}\ncommand: {}\nlabel: {}\n",
            self.pid,
            Self::single_line(&self.started_at),
            Self::single_line(&self.command),
            Self::single_line(&self.label)
        )
    }

    /// Returns `value` with every carriage return and line feed replaced by a space.
    fn single_line(value: &str) -> String {
        value.replace(|c: char| c == '\r' || c == '\n', " ")
    }
}
293
/// Returns the queue lock directory for a repository: `<repo_root>/.ralph/lock`.
pub fn queue_lock_dir(repo_root: &Path) -> PathBuf {
    let mut dir = repo_root.to_path_buf();
    dir.push(".ralph");
    dir.push("lock");
    dir
}
297
/// Returns `true` when `label` is exactly `"run one"` or `"run loop"`, the
/// labels this module treats as supervising.
fn is_supervising_label(label: &str) -> bool {
    ["run one", "run loop"].contains(&label)
}
301
302pub fn is_supervising_process(lock_dir: &Path) -> Result<bool> {
306 let owner_path = lock_dir.join("owner");
307
308 let raw = match fs::read_to_string(&owner_path) {
309 Ok(raw) => raw,
310 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(false),
311 Err(err) => {
312 return Err(anyhow!(err))
313 .with_context(|| format!("read lock owner {}", owner_path.display()));
314 }
315 };
316
317 let owner = match parse_lock_owner(&raw) {
318 Some(owner) => owner,
319 None => return Ok(false),
320 };
321
322 Ok(is_supervising_label(&owner.label))
323}
324
325pub fn acquire_dir_lock(lock_dir: &Path, label: &str, force: bool) -> Result<DirLock> {
326 log::debug!(
327 "acquiring dir lock: {} (label: {})",
328 lock_dir.display(),
329 label
330 );
331 if let Some(parent) = lock_dir.parent() {
332 fs::create_dir_all(parent)
333 .with_context(|| format!("create lock parent {}", parent.display()))?;
334 }
335
336 let trimmed_label = label.trim();
337 let is_task_label = trimmed_label == "task";
338
339 match fs::create_dir(lock_dir) {
340 Ok(()) => {}
341 Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {
342 let mut owner_unreadable = false;
343 let owner = match read_lock_owner(lock_dir) {
344 Ok(owner) => owner,
345 Err(_) => {
346 owner_unreadable = true;
347 None
348 }
349 };
350
351 let is_stale = owner
352 .as_ref()
353 .is_some_and(|o| pid_liveness(o.pid).is_definitely_not_running());
354
355 if force && is_stale {
356 if let Err(e) = fs::remove_dir_all(lock_dir) {
357 log::debug!(
358 "Failed to remove stale lock directory {}: {}",
359 lock_dir.display(),
360 e
361 );
362 }
363 return acquire_dir_lock(lock_dir, label, false);
365 }
366
367 if is_task_label
369 && owner
370 .as_ref()
371 .is_some_and(|o| is_supervising_label(&o.label))
372 {
373 } else {
375 let msg = format_lock_error(lock_dir, owner.as_ref(), is_stale, owner_unreadable);
376 return Err(anyhow!(msg));
377 }
378 }
379 Err(err) => {
380 return Err(anyhow!(err))
381 .with_context(|| format!("create lock dir {}", lock_dir.display()));
382 }
383 }
384
385 let effective_label = if trimmed_label.is_empty() {
386 "unspecified"
387 } else {
388 trimmed_label
389 };
390 let owner = LockOwner {
391 pid: std::process::id(),
392 started_at: timeutil::now_utc_rfc3339()?,
393 command: command_line(),
394 label: effective_label.to_string(),
395 };
396
397 let owner_path = if is_task_label && lock_dir.exists() {
401 let counter = TASK_OWNER_COUNTER.fetch_add(1, Ordering::SeqCst);
402 lock_dir.join(format!("owner_task_{}_{}", std::process::id(), counter))
403 } else {
404 lock_dir.join("owner")
405 };
406
407 if let Err(err) = write_lock_owner(&owner_path, &owner) {
408 if let Err(e) = fs::remove_file(&owner_path) {
409 log::debug!(
410 "Failed to remove owner file {}: {}",
411 owner_path.display(),
412 e
413 );
414 }
415
416 if let Err(e) = fs::remove_dir(lock_dir) {
419 log::debug!(
420 "Failed to remove lock directory {}: {}",
421 lock_dir.display(),
422 e
423 );
424 }
425
426 return Err(err);
427 }
428
429 Ok(DirLock {
430 lock_dir: lock_dir.to_path_buf(),
431 owner_path,
432 })
433}
434
435fn format_lock_error(
436 lock_dir: &Path,
437 owner: Option<&LockOwner>,
438 is_stale: bool,
439 owner_unreadable: bool,
440) -> String {
441 let mut msg = format!("Queue lock already held at: {}", lock_dir.display());
442 if is_stale {
443 msg.push_str(" (STALE PID)");
444 }
445 if owner_unreadable {
446 msg.push_str(" (owner metadata unreadable)");
447 }
448
449 msg.push_str("\n\nLock Holder:");
450 if let Some(owner) = owner {
451 msg.push_str(&format!(
452 "\n PID: {}\n Label: {}\n Started At: {}\n Command: {}",
453 owner.pid, owner.label, owner.started_at, owner.command
454 ));
455 } else {
456 msg.push_str("\n (owner metadata missing)");
457 }
458
459 msg.push_str("\n\nSuggested Action:");
460 if is_stale {
461 msg.push_str(&format!(
462 "\n The process that held this lock is no longer running.\n Use --force to automatically clear it, or use the built-in unlock command (unsafe if another ralph is running):\n ralph queue unlock\n Or remove the directory manually:\n rm -rf {}",
463 lock_dir.display()
464 ));
465 } else {
466 msg.push_str(&format!(
467 "\n If you are sure no other ralph process is running, use the built-in unlock command:\n ralph queue unlock\n Or remove the lock directory manually:\n rm -rf {}",
468 lock_dir.display()
469 ));
470 }
471 msg
472}
473
474fn write_lock_owner(owner_path: &Path, owner: &LockOwner) -> Result<()> {
475 log::debug!("writing lock owner: {}", owner_path.display());
476 let mut file = fs::File::create(owner_path)
477 .with_context(|| format!("create lock owner {}", owner_path.display()))?;
478 file.write_all(owner.render().as_bytes())
479 .context("write lock owner")?;
480 file.flush().context("flush lock owner")?;
481 file.sync_all().context("sync lock owner")?;
482 if let Some(parent) = owner_path.parent() {
483 sync_dir_best_effort(parent);
484 }
485 Ok(())
486}
487
488pub fn read_lock_owner(lock_dir: &Path) -> Result<Option<LockOwner>> {
493 let owner_path = lock_dir.join("owner");
494 let raw = match fs::read_to_string(&owner_path) {
495 Ok(raw) => raw,
496 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
497 Err(err) => {
498 return Err(anyhow!(err))
499 .with_context(|| format!("read lock owner {}", owner_path.display()));
500 }
501 };
502 Ok(parse_lock_owner(&raw))
503}
504
505fn parse_lock_owner(raw: &str) -> Option<LockOwner> {
506 let mut pid = None;
507 let mut started_at = None;
508 let mut command = None;
509 let mut label = None;
510
511 for line in raw.lines() {
512 let trimmed = line.trim();
513 if trimmed.is_empty() {
514 continue;
515 }
516 if let Some((key, value)) = trimmed.split_once(':') {
517 let value = value.trim().to_string();
518 match key.trim() {
519 "pid" => {
520 pid = value
521 .parse::<u32>()
522 .inspect_err(|e| {
523 log::debug!("Lock file has invalid pid '{}': {}", value, e)
524 })
525 .ok()
526 }
527 "started_at" => started_at = Some(value),
528 "command" => command = Some(value),
529 "label" => label = Some(value),
530 _ => {}
531 }
532 }
533 }
534
535 let pid = pid?;
536 Some(LockOwner {
537 pid,
538 started_at: started_at.unwrap_or_else(|| "unknown".to_string()),
539 command: command.unwrap_or_else(|| "unknown".to_string()),
540 label: label.unwrap_or_else(|| "unknown".to_string()),
541 })
542}
543
/// Checks whether `pid` appears in a ToolHelp snapshot of the system's processes.
///
/// Returns `Some(true)` when an entry with a matching process ID is found and
/// `Some(false)` when enumeration finishes without a match. Returns `None`
/// when the snapshot cannot be created or the first entry cannot be read.
#[cfg(windows)]
fn pid_exists_via_toolhelp(pid: u32) -> Option<bool> {
    use windows_sys::Win32::Foundation::{CloseHandle, INVALID_HANDLE_VALUE};
    use windows_sys::Win32::System::Diagnostics::ToolHelp::{
        CreateToolhelp32Snapshot, PROCESSENTRY32, Process32First, Process32Next, TH32CS_SNAPPROCESS,
    };

    // SAFETY: the snapshot handle is checked against INVALID_HANDLE_VALUE
    // before use and closed exactly once below; `entry` is a locally owned,
    // zero-initialized struct with `dwSize` set before it is passed to the
    // enumeration calls.
    unsafe {
        let snapshot = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0);
        if snapshot == INVALID_HANDLE_VALUE {
            log::debug!(
                "CreateToolhelp32Snapshot failed for PID existence check, error: {}",
                windows_sys::Win32::Foundation::GetLastError()
            );
            return None;
        }

        // Compute the result in its own block so every path below reaches the
        // single `CloseHandle` call.
        let result = {
            // NOTE(review): assumes an all-zero PROCESSENTRY32 is a valid
            // value (plain C struct) - confirm against the windows-sys
            // definition.
            let mut entry: PROCESSENTRY32 = std::mem::zeroed();
            entry.dwSize = std::mem::size_of::<PROCESSENTRY32>() as u32;

            if Process32First(snapshot, &mut entry) == 0 {
                log::debug!(
                    "Process32First failed, error: {}",
                    windows_sys::Win32::Foundation::GetLastError()
                );
                None
            } else {
                let mut found = false;
                loop {
                    if entry.th32ProcessID == pid {
                        found = true;
                        break;
                    }
                    // A zero return ends the enumeration.
                    if Process32Next(snapshot, &mut entry) == 0 {
                        break;
                    }
                }
                Some(found)
            }
        };

        CloseHandle(snapshot);
        result
    }
}
601
602pub fn pid_is_running(pid: u32) -> Option<bool> {
609 #[cfg(unix)]
610 {
611 let result = unsafe { libc::kill(pid as i32, 0) };
616 if result == 0 {
617 return Some(true);
618 }
619 let err = std::io::Error::last_os_error();
620 if err.raw_os_error() == Some(libc::ESRCH) {
621 return Some(false);
622 }
623 None
624 }
625
626 #[cfg(windows)]
627 {
628 use windows_sys::Win32::Foundation::{
629 CloseHandle, ERROR_ACCESS_DENIED, ERROR_INVALID_PARAMETER,
630 };
631 use windows_sys::Win32::System::Threading::{OpenProcess, PROCESS_QUERY_INFORMATION};
632
633 unsafe {
639 let handle = OpenProcess(PROCESS_QUERY_INFORMATION, 0, pid);
640 if handle != 0 {
641 CloseHandle(handle);
643 Some(true)
644 } else {
645 let err = windows_sys::Win32::Foundation::GetLastError();
647 if err == ERROR_INVALID_PARAMETER {
648 Some(false)
650 } else if err == ERROR_ACCESS_DENIED {
651 log::debug!(
654 "OpenProcess({}) failed with ERROR_ACCESS_DENIED, falling back to ToolHelp enumeration",
655 pid
656 );
657 pid_exists_via_toolhelp(pid)
658 } else {
659 log::debug!("OpenProcess({}) failed with unexpected error: {}", pid, err);
661 None
662 }
663 }
664 }
665 }
666
667 #[cfg(not(any(unix, windows)))]
668 {
669 let _ = pid;
672 None
673 }
674}
675
/// Returns this process's command line as one space-joined string, or
/// `"unknown"` when it is empty.
///
/// Arguments are read with `args_os` and converted lossily, so an argument
/// that is not valid Unicode is recorded with replacement characters.
/// `std::env::args` would panic on such an argument, which is not acceptable
/// for lock metadata.
fn command_line() -> String {
    let joined = std::env::args_os()
        .map(|arg| arg.to_string_lossy().into_owned())
        .collect::<Vec<_>>()
        .join(" ");

    match joined.trim() {
        "" => "unknown".to_string(),
        trimmed => trimmed.to_string(),
    }
}
686
#[cfg(test)]
mod tests {
    use super::*;

    /// The test process itself must always be reported as running.
    #[test]
    fn test_pid_is_running_current_process() {
        let own_pid = std::process::id();
        assert_eq!(
            pid_is_running(own_pid),
            Some(true),
            "Current process should be detected as running"
        );
    }

    /// A PID that is assumed not to belong to any live process must never be
    /// reported as running (either `Some(false)` or `None` is acceptable).
    #[test]
    fn test_pid_is_running_nonexistent() {
        let probe = pid_is_running(0xFFFFFFFE);
        assert_ne!(
            probe,
            Some(true),
            "Non-existent PID should not return Some(true)"
        );
    }

    /// PID 0 must never be classified as definitely not running.
    #[test]
    fn test_pid_is_running_system_idle() {
        let probe = pid_is_running(0);
        assert!(
            probe != Some(false),
            "PID 0 should not be reported as not running"
        );
    }

    /// Only names starting with the full `owner_task_` prefix are sidecar files.
    #[test]
    fn test_is_task_owner_file() {
        for accepted in ["owner_task_1234", "owner_task_1234_0", "owner_task_1234_42"] {
            assert!(is_task_owner_file(accepted), "{:?} should match", accepted);
        }
        for rejected in ["owner", "owner_other", "owner_task", "", "task_owner_1234"] {
            assert!(!is_task_owner_file(rejected), "{:?} should not match", rejected);
        }
    }

    /// Checks both predicates against all three liveness states.
    #[test]
    fn test_pid_liveness_helpers() {
        let cases = [
            (PidLiveness::NotRunning, true, false),
            (PidLiveness::Running, false, true),
            (PidLiveness::Indeterminate, false, true),
        ];
        for (state, definitely_dead, maybe_alive) in cases {
            assert_eq!(state.is_definitely_not_running(), definitely_dead);
            assert_eq!(state.is_running_or_indeterminate(), maybe_alive);
        }
    }

    /// `pid_liveness` maps the raw probe result onto `PidLiveness`.
    #[test]
    fn test_pid_liveness_wrapper() {
        assert_eq!(pid_liveness(std::process::id()), PidLiveness::Running);

        let unlikely = pid_liveness(0xFFFFFFFE);
        assert!(matches!(
            unlikely,
            PidLiveness::NotRunning | PidLiveness::Indeterminate
        ));
    }

    /// Stale detection must only trigger on a definite "not running" answer.
    #[test]
    fn test_stale_lock_detection_is_conservative() {
        for not_stale in [PidLiveness::Running, PidLiveness::Indeterminate] {
            assert!(!not_stale.is_definitely_not_running());
        }
        assert!(PidLiveness::NotRunning.is_definitely_not_running());
    }

    #[cfg(windows)]
    mod windows_tests {
        use super::*;

        /// The ToolHelp enumeration must see the test process itself.
        #[test]
        fn test_toolhelp_finds_current_process() {
            let own_pid = std::process::id();
            assert_eq!(
                pid_exists_via_toolhelp(own_pid),
                Some(true),
                "ToolHelp should find current process PID {}",
                own_pid
            );
        }

        /// A PID assumed to be unused must be reported as absent.
        #[test]
        fn test_toolhelp_nonexistent_pid() {
            assert_eq!(
                pid_exists_via_toolhelp(0xFFFFFFFE),
                Some(false),
                "ToolHelp should not find non-existent PID"
            );
        }

        /// PID 4 (the System process, per the assertion message) is expected
        /// to be reported as running through the ToolHelp fallback.
        #[test]
        fn test_access_denied_fallback() {
            let probe = pid_is_running(4);
            assert_eq!(
                probe,
                Some(true),
                "System process (PID 4) should be detected as running via ToolHelp fallback"
            );
        }
    }
}