1use std::fs::File;
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::time::Duration;
11
12use anyhow::{Context, Result, bail};
13use serde::{Deserialize, Serialize};
14use tracing::{error, info, warn};
15
16use super::{config, daemon, events, hierarchy, inbox, layout, team_config_path};
17use crate::tmux;
18
// Rotate the shared background-process log once it exceeds this size (5 MiB).
pub(crate) const LOG_ROTATION_BYTES: u64 = 5 * 1024 * 1024;
// Number of rotated generations kept around (daemon.log.1 .. daemon.log.3).
const LOG_ROTATION_KEEP: usize = 3;
// How long to wait for the daemon to exit cleanly before escalating.
pub(super) const DAEMON_SHUTDOWN_GRACE_PERIOD: Duration = Duration::from_secs(5);
// Polling cadence while waiting for a daemon shutdown to complete.
const DAEMON_SHUTDOWN_POLL_INTERVAL: Duration = Duration::from_millis(100);
// Polling cadence the watchdog uses to check on its daemon child.
const WATCHDOG_POLL_INTERVAL: Duration = Duration::from_millis(200);
// Restart backoff doubles from the initial value up to the max (1s .. 30s).
const WATCHDOG_INITIAL_BACKOFF_SECS: u64 = 1;
const WATCHDOG_MAX_BACKOFF_SECS: u64 = 30;
// Stop restarting the daemon after this many crashes inside the window below.
const WATCHDOG_CIRCUIT_BREAKER_THRESHOLD: usize = 5;
const WATCHDOG_CIRCUIT_BREAKER_WINDOW_SECS: u64 = 60;
// File name (under .batty/) holding the PID of the watchdog's daemon child.
const DAEMON_CHILD_PID_FILE: &str = "daemon-child.pid";

// Set from the signal handler when SIGTERM/SIGINT/SIGHUP asks the watchdog
// to shut down; polled by the watchdog loop.
#[cfg(unix)]
static WATCHDOG_SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
32
/// Location of the watchdog/daemon PID file: `<root>/.batty/daemon.pid`.
fn daemon_pid_path(project_root: &Path) -> PathBuf {
    let mut path = project_root.to_path_buf();
    path.push(".batty");
    path.push("daemon.pid");
    path
}
37
/// Location of the shared background-process log: `<root>/.batty/daemon.log`.
pub(crate) fn daemon_log_path(project_root: &Path) -> PathBuf {
    let mut path = project_root.to_path_buf();
    path.push(".batty");
    path.push("daemon.log");
    path
}
42
/// Path of the `generation`-th rotated copy of `path` (e.g. `daemon.log.2`).
fn rotated_log_path(path: &Path, generation: usize) -> PathBuf {
    let mut name = path.display().to_string();
    name.push('.');
    name.push_str(&generation.to_string());
    PathBuf::from(name)
}
46
47pub(crate) fn rotate_log_if_needed(path: &Path) -> Result<()> {
48 let len = match std::fs::metadata(path) {
49 Ok(metadata) => metadata.len(),
50 Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(()),
51 Err(error) => {
52 return Err(error).with_context(|| format!("failed to stat {}", path.display()));
53 }
54 };
55
56 if len <= LOG_ROTATION_BYTES {
57 return Ok(());
58 }
59
60 let oldest = rotated_log_path(path, LOG_ROTATION_KEEP);
61 if oldest.exists() {
62 std::fs::remove_file(&oldest)
63 .with_context(|| format!("failed to remove {}", oldest.display()))?;
64 }
65
66 for generation in (1..LOG_ROTATION_KEEP).rev() {
67 let source = rotated_log_path(path, generation);
68 if !source.exists() {
69 continue;
70 }
71 let destination = rotated_log_path(path, generation + 1);
72 std::fs::rename(&source, &destination).with_context(|| {
73 format!(
74 "failed to rotate {} to {}",
75 source.display(),
76 destination.display()
77 )
78 })?;
79 }
80
81 let rotated = rotated_log_path(path, 1);
82 std::fs::rename(path, &rotated).with_context(|| {
83 format!(
84 "failed to rotate {} to {}",
85 path.display(),
86 rotated.display()
87 )
88 })?;
89 Ok(())
90}
91
92pub(crate) fn open_log_for_append(path: &Path) -> Result<File> {
93 if let Some(parent) = path.parent() {
94 std::fs::create_dir_all(parent)?;
95 }
96 rotate_log_if_needed(path)?;
97 File::options()
98 .append(true)
99 .create(true)
100 .open(path)
101 .with_context(|| format!("failed to open log file: {}", path.display()))
102}
103
/// Build the CLI arguments used to launch the daemon subcommand, appending
/// `--resume` when requested.
fn daemon_spawn_args(root_str: &str, resume: bool) -> Vec<String> {
    let mut args: Vec<String> = ["-v", "daemon", "--project-root", root_str]
        .iter()
        .map(|s| s.to_string())
        .collect();
    if resume {
        args.push("--resume".into());
    }
    args
}
116
/// Build the CLI arguments used to launch the watchdog subcommand, appending
/// `--resume` when requested.
fn watchdog_spawn_args(root_str: &str, resume: bool) -> Vec<String> {
    let mut args: Vec<String> = ["-v", "watchdog", "--project-root", root_str]
        .iter()
        .map(|s| s.to_string())
        .collect();
    if resume {
        args.push("--resume".into());
    }
    args
}
129
/// Location of the daemon's persisted state snapshot.
pub(crate) fn daemon_state_path(project_root: &Path) -> PathBuf {
    let mut path = project_root.to_path_buf();
    path.push(".batty");
    path.push("daemon-state.json");
    path
}
133
/// Location of the watchdog's persisted crash/backoff state.
pub(crate) fn watchdog_state_path(project_root: &Path) -> PathBuf {
    let mut path = project_root.to_path_buf();
    path.push(".batty");
    path.push("watchdog-state.json");
    path
}
137
138fn daemon_child_pid_path(project_root: &Path) -> PathBuf {
139 project_root.join(".batty").join(DAEMON_CHILD_PID_FILE)
140}
141
/// Watchdog bookkeeping persisted to `.batty/watchdog-state.json` so crash
/// history and restart behavior survive watchdog restarts. All fields default
/// so older/partial state files still deserialize.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct PersistedWatchdogState {
    /// Total number of daemon restarts this watchdog has performed.
    #[serde(default)]
    pub restart_count: u32,
    /// Unix timestamps of recent daemon crashes (pruned to the breaker window).
    #[serde(default)]
    pub crash_timestamps: Vec<u64>,
    /// True once the crash-rate circuit breaker has stopped restarts.
    #[serde(default)]
    pub circuit_breaker_tripped: bool,
    /// PID of the currently running daemon child, if any.
    #[serde(default)]
    pub child_pid: Option<u32>,
    /// Backoff that will be applied before the next restart, if scheduled.
    #[serde(default)]
    pub current_backoff_secs: Option<u64>,
    /// Human-readable reason recorded for the most recent daemon exit.
    #[serde(default)]
    pub last_exit_reason: Option<String>,
}
157
158fn load_watchdog_state(project_root: &Path) -> Result<PersistedWatchdogState> {
159 let path = watchdog_state_path(project_root);
160 if !path.exists() {
161 return Ok(PersistedWatchdogState::default());
162 }
163 let content = std::fs::read_to_string(&path)
164 .with_context(|| format!("failed to read {}", path.display()))?;
165 serde_json::from_str(&content).with_context(|| format!("failed to parse {}", path.display()))
166}
167
168fn save_watchdog_state(project_root: &Path, state: &PersistedWatchdogState) -> Result<()> {
169 let path = watchdog_state_path(project_root);
170 if let Some(parent) = path.parent() {
171 std::fs::create_dir_all(parent)
172 .with_context(|| format!("failed to create {}", parent.display()))?;
173 }
174 let content =
175 serde_json::to_string_pretty(state).context("failed to serialize watchdog state")?;
176 std::fs::write(&path, content).with_context(|| format!("failed to write {}", path.display()))
177}
178
179fn spawn_detached_process(
184 project_root: &Path,
185 args: &[String],
186 pid_path: &Path,
187 process_name: &str,
188) -> Result<u32> {
189 use std::process::{Command, Stdio};
190
191 let log_path = daemon_log_path(project_root);
192
193 if let Some(parent) = log_path.parent() {
195 std::fs::create_dir_all(parent)?;
196 }
197
198 let log_file = open_log_for_append(&log_path)?;
199 let log_err = log_file
200 .try_clone()
201 .context("failed to clone log file handle")?;
202
203 let exe = std::env::current_exe().context("failed to resolve current executable")?;
204
205 let mut cmd = Command::new(exe);
206 cmd.args(args)
207 .stdin(Stdio::null())
208 .stdout(log_file)
209 .stderr(log_err);
210
211 #[cfg(unix)]
213 {
214 use std::os::unix::process::CommandExt;
215 cmd.process_group(0);
216 }
217
218 let mut child = cmd.spawn().context("failed to spawn daemon process")?;
219 let pid = child.id();
220
221 std::thread::sleep(std::time::Duration::from_millis(500));
224 match child.try_wait() {
225 Ok(Some(status)) => {
226 let _ = std::fs::remove_file(pid_path);
227 let tail = std::fs::read_to_string(&log_path).ok().and_then(|s| {
229 let lines: Vec<&str> = s.lines().collect();
230 let start = lines.len().saturating_sub(5);
231 let tail = lines[start..].join("\n");
232 if tail.trim().is_empty() {
233 None
234 } else {
235 Some(tail)
236 }
237 });
238 match tail {
239 Some(detail) => bail!(
240 "{process_name} process exited immediately with {status}\n\n\
241 {detail}\n\n\
242 see full log: {log}",
243 log = log_path.display(),
244 ),
245 None => bail!(
246 "{process_name} process exited immediately with {status}; \
247 see {log} for details",
248 log = log_path.display(),
249 ),
250 }
251 }
252 Ok(None) => {} Err(e) => {
254 warn!(pid, error = %e, "failed to check daemon process status");
255 }
256 }
257
258 std::fs::write(pid_path, pid.to_string())
259 .with_context(|| format!("failed to write PID file: {}", pid_path.display()))?;
260
261 info!(pid, log = %log_path.display(), process = process_name, "background process spawned");
262 Ok(pid)
263}
264
265fn spawn_watchdog(project_root: &Path, resume: bool) -> Result<u32> {
266 let root_str = project_root
267 .canonicalize()
268 .unwrap_or_else(|_| project_root.to_path_buf())
269 .to_string_lossy()
270 .to_string();
271 let args = watchdog_spawn_args(&root_str, resume);
272 spawn_detached_process(
273 project_root,
274 &args,
275 &daemon_pid_path(project_root),
276 "watchdog",
277 )
278}
279
280fn spawn_daemon_child(project_root: &Path, resume: bool) -> Result<std::process::Child> {
281 use std::process::Command;
282
283 let exe = std::env::current_exe().context("failed to resolve current executable")?;
284 let root_str = project_root
285 .canonicalize()
286 .unwrap_or_else(|_| project_root.to_path_buf())
287 .to_string_lossy()
288 .to_string();
289
290 let mut cmd = Command::new(exe);
291 cmd.args(daemon_spawn_args(&root_str, resume));
292 cmd.spawn().context("failed to spawn daemon child")
293}
294
/// Signal handler for the watchdog: flips the shutdown flag, which the main
/// loop polls via `watchdog_shutdown_requested`. Kept to a single atomic
/// store so it stays async-signal-safe.
#[cfg(unix)]
extern "C" fn handle_watchdog_shutdown_signal(_signal: libc::c_int) {
    WATCHDOG_SHUTDOWN_REQUESTED.store(true, Ordering::SeqCst);
}
299
300#[cfg(unix)]
301fn install_watchdog_signal_handlers() -> Result<()> {
302 unsafe {
303 libc::signal(
304 libc::SIGTERM,
305 handle_watchdog_shutdown_signal as *const () as libc::sighandler_t,
306 );
307 libc::signal(
308 libc::SIGINT,
309 handle_watchdog_shutdown_signal as *const () as libc::sighandler_t,
310 );
311 libc::signal(
312 libc::SIGHUP,
313 handle_watchdog_shutdown_signal as *const () as libc::sighandler_t,
314 );
315 }
316 WATCHDOG_SHUTDOWN_REQUESTED.store(false, Ordering::SeqCst);
317 Ok(())
318}
319
/// No signal handling off Unix; the watchdog never observes a shutdown
/// request on these platforms.
#[cfg(not(unix))]
fn install_watchdog_signal_handlers() -> Result<()> {
    Ok(())
}
324
/// True once a shutdown signal (SIGTERM/SIGINT/SIGHUP) has been received.
#[cfg(unix)]
fn watchdog_shutdown_requested() -> bool {
    WATCHDOG_SHUTDOWN_REQUESTED.load(Ordering::SeqCst)
}
329
/// Off Unix there are no signal handlers, so shutdown is never requested.
#[cfg(not(unix))]
fn watchdog_shutdown_requested() -> bool {
    false
}
334
335fn record_watchdog_crash(
336 project_root: &Path,
337 state: &mut PersistedWatchdogState,
338 reason: String,
339) -> Result<Option<u64>> {
340 let now = super::now_unix();
341 state.restart_count += 1;
342 state.last_exit_reason = Some(reason);
343 state.child_pid = None;
344 state
345 .crash_timestamps
346 .retain(|ts| now.saturating_sub(*ts) < WATCHDOG_CIRCUIT_BREAKER_WINDOW_SECS);
347 state.crash_timestamps.push(now);
348
349 if state.crash_timestamps.len() >= WATCHDOG_CIRCUIT_BREAKER_THRESHOLD {
350 state.circuit_breaker_tripped = true;
351 state.current_backoff_secs = None;
352 save_watchdog_state(project_root, state)?;
353 return Ok(None);
354 }
355
356 let exponent = state.crash_timestamps.len().saturating_sub(1) as u32;
357 let backoff_secs = (WATCHDOG_INITIAL_BACKOFF_SECS
358 .saturating_mul(2u64.saturating_pow(exponent)))
359 .min(WATCHDOG_MAX_BACKOFF_SECS);
360 state.current_backoff_secs = Some(backoff_secs);
361 save_watchdog_state(project_root, state)?;
362 Ok(Some(backoff_secs))
363}
364
365fn clear_watchdog_child_pid(project_root: &Path, state: &mut PersistedWatchdogState) -> Result<()> {
366 state.child_pid = None;
367 let _ = std::fs::remove_file(daemon_child_pid_path(project_root));
368 save_watchdog_state(project_root, state)
369}
370
/// Ask the daemon child to exit (SIGTERM on Unix), wait up to the shutdown
/// grace period, then escalate to SIGKILL and reap it.
fn terminate_daemon_child(child: &mut std::process::Child) {
    #[cfg(unix)]
    {
        let _ = send_unix_signal(child.id(), libc::SIGTERM);
    }

    let deadline = std::time::Instant::now() + DAEMON_SHUTDOWN_GRACE_PERIOD;
    loop {
        match child.try_wait() {
            // Child already exited — nothing left to do.
            Ok(Some(_)) => return,
            Ok(None) if std::time::Instant::now() < deadline => {
                std::thread::sleep(DAEMON_SHUTDOWN_POLL_INTERVAL);
            }
            // Grace period elapsed or polling failed: force-kill and reap.
            // NOTE(review): off Unix no signal is ever sent, so this `wait`
            // blocks until the child exits on its own — confirm intended.
            Ok(None) | Err(_) => {
                #[cfg(unix)]
                {
                    let _ = send_unix_signal(child.id(), libc::SIGKILL);
                }
                let _ = child.wait();
                return;
            }
        }
    }
}
395
396fn read_daemon_pid(project_root: &Path) -> Option<u32> {
398 let pid_path = daemon_pid_path(project_root);
399 let pid_str = std::fs::read_to_string(pid_path).ok()?;
400 pid_str.trim().parse::<u32>().ok()
401}
402
/// Send `signal` to `pid` via `kill(2)`, logging and returning `false` on
/// failure.
#[cfg(unix)]
fn send_unix_signal(pid: u32, signal: libc::c_int) -> bool {
    // SAFETY: kill(2) is safe to call with arbitrary pid/signal values.
    let status = unsafe { libc::kill(pid as libc::pid_t, signal) };
    if status == 0 {
        true
    } else {
        let error = std::io::Error::last_os_error();
        warn!(pid, signal, error = %error, "failed to signal daemon");
        false
    }
}
414
/// Signals are unsupported off Unix; always reports failure.
#[cfg(not(unix))]
fn send_unix_signal(_pid: u32, _signal: i32) -> bool {
    false
}
419
420#[cfg(unix)]
421fn daemon_process_exists(pid: u32) -> bool {
422 let status = unsafe { libc::kill(pid as libc::pid_t, 0) };
423 if status == 0 {
424 true
425 } else {
426 !matches!(
427 std::io::Error::last_os_error().raw_os_error(),
428 Some(libc::ESRCH)
429 )
430 }
431}
432
/// Process liveness cannot be probed off Unix; assume the process is gone.
#[cfg(not(unix))]
fn daemon_process_exists(_pid: u32) -> bool {
    false
}
437
438fn wait_for_graceful_daemon_shutdown(
439 project_root: &Path,
440 pid: u32,
441 previous_saved_at: Option<u64>,
442 timeout: Duration,
443) -> bool {
444 let deadline = std::time::Instant::now() + timeout;
445 loop {
446 let clean_snapshot = daemon_state_indicates_clean_shutdown(project_root, previous_saved_at);
447 if clean_snapshot {
448 let _ = std::fs::remove_file(daemon_pid_path(project_root));
449 return true;
450 }
451 let running = daemon_process_exists(pid);
452 if !running {
453 let _ = std::fs::remove_file(daemon_pid_path(project_root));
454 return false;
455 }
456 if std::time::Instant::now() >= deadline {
457 return false;
458 }
459 std::thread::sleep(DAEMON_SHUTDOWN_POLL_INTERVAL);
460 }
461}
462
/// Ask the daemon (looked up via the PID file) to shut down gracefully and
/// wait up to `timeout` for a clean-shutdown snapshot. Returns `true` when
/// the shutdown is confirmed clean; `false` tells the caller to fall back to
/// `force_kill_daemon`.
pub(super) fn request_graceful_daemon_shutdown(project_root: &Path, timeout: Duration) -> bool {
    // No PID file means there is nothing to shut down.
    let Some(pid) = read_daemon_pid(project_root) else {
        return true;
    };

    // Remember the prior snapshot timestamp so only a snapshot written after
    // this request counts as a clean shutdown.
    let previous_saved_at = read_daemon_state_probe(project_root).and_then(|state| state.saved_at);
    #[cfg(unix)]
    {
        if !send_unix_signal(pid, libc::SIGTERM) {
            return false;
        }
        info!(pid, "sent SIGTERM to daemon");
    }
    #[cfg(not(unix))]
    {
        warn!(
            pid,
            "graceful daemon shutdown is not supported on this platform"
        );
        return false;
    }

    wait_for_graceful_daemon_shutdown(project_root, pid, previous_saved_at, timeout)
}
487
/// Immediately kill the daemon recorded in the PID file (SIGKILL on Unix)
/// and remove the PID file. Best effort: failures are logged, not returned.
pub(super) fn force_kill_daemon(project_root: &Path) {
    let Some(pid) = read_daemon_pid(project_root) else {
        return;
    };

    #[cfg(unix)]
    {
        if send_unix_signal(pid, libc::SIGKILL) {
            info!(pid, "sent SIGKILL to daemon");
        }
    }
    #[cfg(not(unix))]
    {
        warn!(pid, "cannot force-kill daemon on this platform");
    }

    // Always clear the PID file so later starts do not see a stale entry.
    let _ = std::fs::remove_file(daemon_pid_path(project_root));
}
506
/// Start a new team session: validate config, build the tmux layout,
/// initialize member inboxes, and launch the watchdog (which in turn runs
/// the daemon). Returns the tmux session name.
///
/// Fails if no team config exists or the session is already running. When
/// `attach` is true, attaches the caller's terminal to the new session.
pub fn start_team(project_root: &Path, attach: bool) -> Result<String> {
    let config_path = team_config_path(project_root);
    if !config_path.exists() {
        bail!(
            "no team config found at {}; run `batty init` first",
            config_path.display()
        );
    }

    let team_config = config::TeamConfig::load(&config_path)?;
    team_config.validate()?;

    let members = hierarchy::resolve_hierarchy(&team_config)?;
    let session = format!("batty-{}", team_config.name);

    if tmux::session_exists(&session) {
        bail!("session '{session}' already exists; use `batty attach` or `batty stop` first");
    }

    layout::build_layout(
        &session,
        &members,
        &team_config.layout,
        project_root,
        team_config.workflow_mode,
        team_config.orchestrator_enabled(),
        team_config.orchestrator_position,
    )?;

    // Ensure every member has an inbox directory before agents start.
    let inboxes = inbox::inboxes_root(project_root);
    for member in &members {
        inbox::init_inbox(&inboxes, &member.name)?;
    }

    // Resume when an explicit marker was left behind, or the last daemon
    // snapshot indicates an unclean shutdown. The marker is one-shot.
    let marker = resume_marker_path(project_root);
    let resume = marker.exists() || should_resume_from_daemon_state(project_root);
    if resume {
        if marker.exists() {
            std::fs::remove_file(&marker).ok();
        }
        info!("resuming agent sessions from previous run");
    }

    info!(session = %session, members = members.len(), resume, "team session started");

    let pid = spawn_watchdog(project_root, resume)?;
    info!(pid, "watchdog process launched");

    // Give the watchdog/daemon a moment to come up before attaching.
    std::thread::sleep(std::time::Duration::from_secs(2));

    if attach {
        tmux::attach(&session)?;
    }

    Ok(session)
}
573
/// Entry point of the `batty daemon` subcommand: locate the tmux session
/// created by `batty start`, map members to their panes, and run the team
/// daemon — converting daemon errors and panics into emitted team events.
pub fn run_daemon(project_root: &Path, resume: bool) -> Result<()> {
    let config_path = team_config_path(project_root);
    if !config_path.exists() {
        bail!(
            "no team config found at {}; run `batty init` first",
            config_path.display()
        );
    }

    let team_config = config::TeamConfig::load(&config_path)?;
    let members = hierarchy::resolve_hierarchy(&team_config)?;
    let session = format!("batty-{}", team_config.name);

    // The session is created by `batty start`; wait up to ~6s for it to
    // appear in case we were launched concurrently.
    for _ in 0..30 {
        if tmux::session_exists(&session) {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(200));
    }

    if !tmux::session_exists(&session) {
        bail!("tmux session '{session}' not found — did `batty start` create it?");
    }

    // Map member name -> tmux pane id; members without a tagged pane are
    // simply absent from the map.
    let mut pane_map = std::collections::HashMap::new();
    for member in &members {
        if let Some(pane_id) = find_pane_for_member(&session, &member.name) {
            pane_map.insert(member.name.clone(), pane_id);
        }
    }

    let daemon_config = daemon::DaemonConfig {
        project_root: project_root.to_path_buf(),
        team_config,
        session,
        members,
        pane_map,
    };

    let events_path = project_root
        .join(".batty")
        .join("team_config")
        .join("events.jsonl");

    let mut d = daemon::TeamDaemon::new(daemon_config)?;

    // Catch panics so we can emit a daemon_panic event before re-raising.
    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| d.run(resume)));

    match result {
        Ok(Ok(())) => Ok(()),
        Ok(Err(e)) => {
            error!(error = %e, "daemon exited with error");
            eprintln!("daemon exited with error: {e:#}");
            // Best-effort event emission; failures here are swallowed.
            if let Ok(mut sink) = events::EventSink::new(&events_path) {
                let _ = sink.emit(events::TeamEvent::daemon_stopped_with_reason(
                    &format!("error: {e:#}"),
                    0,
                ));
            }
            Err(e)
        }
        Err(panic_payload) => {
            // Panic payloads are usually &str or String; fall back otherwise.
            let reason = match panic_payload.downcast_ref::<&str>() {
                Some(s) => s.to_string(),
                None => match panic_payload.downcast_ref::<String>() {
                    Some(s) => s.clone(),
                    None => "unknown panic".to_string(),
                },
            };
            error!(reason = %reason, "daemon panicked");
            eprintln!("daemon panicked: {reason}");
            if let Ok(mut sink) = events::EventSink::new(&events_path) {
                let _ = sink.emit(events::TeamEvent::daemon_panic(&reason));
            }
            // Re-raise so the watchdog observes an abnormal exit.
            std::panic::resume_unwind(panic_payload);
        }
    }
}
661
/// Watchdog main loop: keep a daemon child running for `project_root`,
/// restarting it with exponential backoff after crashes, until either a
/// shutdown signal arrives or the crash-rate circuit breaker trips.
pub fn run_watchdog(project_root: &Path, resume: bool) -> Result<()> {
    install_watchdog_signal_handlers()?;

    // Start from persisted state but reset all per-run fields.
    let mut state = load_watchdog_state(project_root).unwrap_or_default();
    state.circuit_breaker_tripped = false;
    state.current_backoff_secs = None;
    state.last_exit_reason = None;
    state.child_pid = None;
    save_watchdog_state(project_root, &state)?;

    // Only the very first launch honors the caller's `resume` flag; every
    // relaunch after a crash always resumes.
    let mut resume_on_launch = resume;

    loop {
        // Honor a shutdown request arriving between daemon launches.
        if watchdog_shutdown_requested() {
            let _ = std::fs::remove_file(daemon_pid_path(project_root));
            let _ = std::fs::remove_file(daemon_child_pid_path(project_root));
            state.child_pid = None;
            state.current_backoff_secs = None;
            save_watchdog_state(project_root, &state)?;
            return Ok(());
        }

        let mut child = spawn_daemon_child(project_root, resume_on_launch)?;
        resume_on_launch = true;

        // Record the child PID in both the state file and its own PID file.
        state.child_pid = Some(child.id());
        state.current_backoff_secs = None;
        save_watchdog_state(project_root, &state)?;
        std::fs::write(daemon_child_pid_path(project_root), child.id().to_string()).with_context(
            || {
                format!(
                    "failed to write child PID file: {}",
                    daemon_child_pid_path(project_root).display()
                )
            },
        )?;

        // Inner loop: poll the child until it exits or shutdown is requested.
        loop {
            if watchdog_shutdown_requested() {
                terminate_daemon_child(&mut child);
                let _ = std::fs::remove_file(daemon_pid_path(project_root));
                clear_watchdog_child_pid(project_root, &mut state)?;
                return Ok(());
            }

            match child.try_wait() {
                Ok(Some(exit_status)) => {
                    clear_watchdog_child_pid(project_root, &mut state)?;
                    let reason = if let Some(code) = exit_status.code() {
                        format!("daemon exited with status {code}")
                    } else {
                        "daemon exited from signal".to_string()
                    };
                    // Some(backoff) => sleep then relaunch; None => breaker
                    // tripped and the watchdog must stop restarting.
                    if let Some(backoff_secs) =
                        record_watchdog_crash(project_root, &mut state, reason.clone())?
                    {
                        warn!(backoff_secs, reason = %reason, "daemon crashed; watchdog restarting with backoff");
                        std::thread::sleep(Duration::from_secs(backoff_secs));
                        break;
                    }

                    warn!(
                        reason = %reason,
                        threshold = WATCHDOG_CIRCUIT_BREAKER_THRESHOLD,
                        window_secs = WATCHDOG_CIRCUIT_BREAKER_WINDOW_SECS,
                        "watchdog circuit breaker tripped; daemon will not be restarted"
                    );
                    let _ = std::fs::remove_file(daemon_pid_path(project_root));
                    return Ok(());
                }
                Ok(None) => std::thread::sleep(WATCHDOG_POLL_INTERVAL),
                Err(error) => {
                    // Polling failed: treated like a crash for backoff and
                    // circuit-breaker accounting.
                    clear_watchdog_child_pid(project_root, &mut state)?;
                    let reason = format!("failed to poll daemon child: {error}");
                    if let Some(backoff_secs) =
                        record_watchdog_crash(project_root, &mut state, reason.clone())?
                    {
                        warn!(backoff_secs, reason = %reason, "watchdog poll failed; retrying daemon launch");
                        std::thread::sleep(Duration::from_secs(backoff_secs));
                        break;
                    }
                    warn!(
                        reason = %reason,
                        threshold = WATCHDOG_CIRCUIT_BREAKER_THRESHOLD,
                        window_secs = WATCHDOG_CIRCUIT_BREAKER_WINDOW_SECS,
                        "watchdog circuit breaker tripped after daemon poll failures"
                    );
                    let _ = std::fs::remove_file(daemon_pid_path(project_root));
                    return Ok(());
                }
            }
        }
    }
}
756
/// Find the tmux pane whose `@batty_role` option equals `member_name` within
/// `session`. Returns `None` when tmux fails or no pane matches.
fn find_pane_for_member(session: &str, member_name: &str) -> Option<String> {
    let output = std::process::Command::new("tmux")
        .args([
            "list-panes",
            "-t",
            session,
            "-F",
            "#{pane_id} #{@batty_role}",
        ])
        .output()
        .ok()?;

    if !output.status.success() {
        return None;
    }

    // Each output line is "<pane_id> <role>"; match on the role column.
    String::from_utf8_lossy(&output.stdout)
        .lines()
        .filter_map(|line| line.split_once(' '))
        .find(|(_, role)| *role == member_name)
        .map(|(pane_id, _)| pane_id.to_string())
}
783
784pub(super) fn resume_marker_path(project_root: &Path) -> PathBuf {
786 project_root.join(".batty").join("resume")
787}
788
/// Minimal view of the daemon state file used to decide resume/shutdown
/// behavior without deserializing the full daemon state.
#[derive(Debug, Deserialize)]
struct DaemonStateResumeProbe {
    /// True when the daemon persisted its state during an orderly shutdown.
    #[serde(default)]
    clean_shutdown: bool,
    /// Unix timestamp of when the snapshot was written, if recorded.
    #[serde(default)]
    saved_at: Option<u64>,
}
796
797fn read_daemon_state_probe(project_root: &Path) -> Option<DaemonStateResumeProbe> {
798 let path = daemon_state_path(project_root);
799 let content = std::fs::read_to_string(&path).ok()?;
800
801 match serde_json::from_str::<DaemonStateResumeProbe>(&content) {
802 Ok(state) => Some(state),
803 Err(error) => {
804 warn!(
805 path = %path.display(),
806 error = %error,
807 "failed to parse daemon state while probing for resume"
808 );
809 None
810 }
811 }
812}
813
814fn daemon_state_indicates_clean_shutdown(
815 project_root: &Path,
816 previous_saved_at: Option<u64>,
817) -> bool {
818 let Some(state) = read_daemon_state_probe(project_root) else {
819 return false;
820 };
821
822 state.clean_shutdown
823 && match (state.saved_at, previous_saved_at) {
824 (Some(saved_at), Some(previous_saved_at)) => saved_at > previous_saved_at,
825 (Some(_), None) => true,
826 (None, Some(_)) => false,
827 (None, None) => true,
828 }
829}
830
831fn should_resume_from_daemon_state(project_root: &Path) -> bool {
832 read_daemon_state_probe(project_root)
833 .map(|state| !state.clean_shutdown)
834 .unwrap_or(false)
835}
836
// Unit tests for resume probing, graceful-shutdown waiting, log rotation,
// and spawn-argument construction. Process-signalling tests are `#[serial]`
// because they manipulate real child processes and shared PID files.
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;

    #[test]
    fn daemon_state_probe_requests_resume_after_unclean_shutdown() {
        let tmp = tempfile::tempdir().unwrap();
        let path = daemon_state_path(tmp.path());
        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
        std::fs::write(&path, r#"{"clean_shutdown":false}"#).unwrap();

        assert!(should_resume_from_daemon_state(tmp.path()));
    }

    #[test]
    fn daemon_state_probe_ignores_clean_shutdown() {
        let tmp = tempfile::tempdir().unwrap();
        let path = daemon_state_path(tmp.path());
        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
        std::fs::write(&path, r#"{"clean_shutdown":true}"#).unwrap();

        assert!(!should_resume_from_daemon_state(tmp.path()));
    }

    // Helper: write an executable shell script to stand in for the daemon.
    #[cfg(unix)]
    fn write_daemon_script(script_path: &Path, body: &str) {
        std::fs::write(script_path, body).unwrap();
        use std::os::unix::fs::PermissionsExt;
        std::fs::set_permissions(script_path, std::fs::Permissions::from_mode(0o755)).unwrap();
    }

    #[cfg(unix)]
    #[test]
    #[serial]
    fn graceful_daemon_shutdown_waits_for_clean_snapshot() {
        let tmp = tempfile::tempdir().unwrap();
        let state_path = daemon_state_path(tmp.path());
        let state_dir = state_path.parent().unwrap();
        std::fs::create_dir_all(state_dir).unwrap();
        std::fs::write(&state_path, r#"{"clean_shutdown":false,"saved_at":1}"#).unwrap();

        // A background writer simulates the daemon persisting a clean
        // snapshot shortly after the shutdown request.
        let state_path_for_thread = state_path.clone();
        let state_dir_for_thread = state_dir.to_path_buf();
        let writer = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(200));
            std::fs::create_dir_all(&state_dir_for_thread).unwrap();
            std::fs::write(
                &state_path_for_thread,
                r#"{"clean_shutdown":true,"saved_at":2}"#,
            )
            .unwrap();
        });

        // Our own PID is used so the "process exists" check stays true.
        assert!(wait_for_graceful_daemon_shutdown(
            tmp.path(),
            std::process::id(),
            Some(1),
            Duration::from_secs(2)
        ));

        writer.join().unwrap();
        assert!(daemon_state_indicates_clean_shutdown(tmp.path(), Some(1)));
    }

    #[cfg(unix)]
    #[test]
    #[serial]
    fn graceful_daemon_shutdown_times_out_before_force_kill_fallback() {
        let tmp = tempfile::tempdir().unwrap();
        let script_path = tmp.path().join("stubborn-daemon.sh");
        // A child that ignores SIGTERM and spins forever.
        write_daemon_script(
            &script_path,
            "#!/bin/sh\ntrap '' TERM\nwhile :; do :; done\n",
        );

        let mut child = std::process::Command::new(&script_path).spawn().unwrap();
        std::fs::create_dir_all(tmp.path().join(".batty")).unwrap();
        std::fs::write(daemon_pid_path(tmp.path()), child.id().to_string()).unwrap();
        std::thread::sleep(Duration::from_millis(200));

        assert!(!request_graceful_daemon_shutdown(
            tmp.path(),
            Duration::from_millis(300)
        ));
        assert!(daemon_process_exists(child.id()));

        force_kill_daemon(tmp.path());
        let _ = child.wait().unwrap();
        assert!(!daemon_pid_path(tmp.path()).exists());
    }

    #[test]
    fn test_rotate_log_shifts_files() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = daemon_log_path(tmp.path());
        std::fs::create_dir_all(log_path.parent().unwrap()).unwrap();
        std::fs::write(&log_path, b"current").unwrap();
        std::fs::write(rotated_log_path(&log_path, 1), b"older-1").unwrap();
        std::fs::write(rotated_log_path(&log_path, 2), b"older-2").unwrap();
        // Inflate the live log past the rotation threshold.
        std::fs::OpenOptions::new()
            .write(true)
            .open(&log_path)
            .unwrap()
            .set_len(LOG_ROTATION_BYTES + 1)
            .unwrap();

        rotate_log_if_needed(&log_path).unwrap();

        assert!(!log_path.exists());
        assert_eq!(
            std::fs::read(rotated_log_path(&log_path, 1)).unwrap().len() as u64,
            LOG_ROTATION_BYTES + 1
        );
        assert_eq!(
            std::fs::read_to_string(rotated_log_path(&log_path, 2)).unwrap(),
            "older-1"
        );
        assert_eq!(
            std::fs::read_to_string(rotated_log_path(&log_path, 3)).unwrap(),
            "older-2"
        );
    }

    #[test]
    fn test_rotate_log_keeps_max_3() {
        let tmp = tempfile::tempdir().unwrap();
        // NOTE(review): this test rotates the orchestrator log (defined
        // elsewhere in the crate) rather than daemon_log_path — presumably
        // to cover rotation of a second log file; confirm intent.
        let log_path = crate::team::orchestrator_log_path(tmp.path());
        std::fs::create_dir_all(log_path.parent().unwrap()).unwrap();
        std::fs::write(&log_path, b"current").unwrap();
        std::fs::write(rotated_log_path(&log_path, 1), b"older-1").unwrap();
        std::fs::write(rotated_log_path(&log_path, 2), b"older-2").unwrap();
        std::fs::write(rotated_log_path(&log_path, 3), b"older-3").unwrap();
        std::fs::OpenOptions::new()
            .write(true)
            .open(&log_path)
            .unwrap()
            .set_len(LOG_ROTATION_BYTES + 1)
            .unwrap();

        rotate_log_if_needed(&log_path).unwrap();

        assert_eq!(
            std::fs::read(rotated_log_path(&log_path, 1)).unwrap().len() as u64,
            LOG_ROTATION_BYTES + 1
        );
        assert_eq!(
            std::fs::read_to_string(rotated_log_path(&log_path, 2)).unwrap(),
            "older-1"
        );
        assert_eq!(
            std::fs::read_to_string(rotated_log_path(&log_path, 3)).unwrap(),
            "older-2"
        );
        // The former .3 ("older-3") is dropped; no .4 is ever created.
        assert!(!rotated_log_path(&log_path, 4).exists());
    }

    #[test]
    fn test_rotate_log_noop_under_threshold() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = daemon_log_path(tmp.path());
        std::fs::create_dir_all(log_path.parent().unwrap()).unwrap();
        std::fs::write(&log_path, b"small-log").unwrap();

        rotate_log_if_needed(&log_path).unwrap();

        assert_eq!(std::fs::read_to_string(&log_path).unwrap(), "small-log");
        assert!(!rotated_log_path(&log_path, 1).exists());
    }

    #[test]
    fn test_daemon_log_append_mode() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = daemon_log_path(tmp.path());

        {
            let mut file = open_log_for_append(&log_path).unwrap();
            use std::io::Write;
            writeln!(file, "first").unwrap();
        }

        {
            let mut file = open_log_for_append(&log_path).unwrap();
            use std::io::Write;
            writeln!(file, "second").unwrap();
        }

        // Reopening must append, not truncate.
        assert_eq!(
            std::fs::read_to_string(&log_path).unwrap(),
            "first\nsecond\n"
        );
    }

    #[test]
    fn daemon_spawn_args_include_verbose_and_resume() {
        assert_eq!(
            daemon_spawn_args("/tmp/project", false),
            vec![
                "-v".to_string(),
                "daemon".to_string(),
                "--project-root".to_string(),
                "/tmp/project".to_string()
            ]
        );
        assert_eq!(
            daemon_spawn_args("/tmp/project", true),
            vec![
                "-v".to_string(),
                "daemon".to_string(),
                "--project-root".to_string(),
                "/tmp/project".to_string(),
                "--resume".to_string()
            ]
        );
    }

    #[test]
    fn watchdog_spawn_args_include_verbose_and_resume() {
        assert_eq!(
            watchdog_spawn_args("/tmp/project", false),
            vec![
                "-v".to_string(),
                "watchdog".to_string(),
                "--project-root".to_string(),
                "/tmp/project".to_string()
            ]
        );
        assert_eq!(
            watchdog_spawn_args("/tmp/project", true),
            vec![
                "-v".to_string(),
                "watchdog".to_string(),
                "--project-root".to_string(),
                "/tmp/project".to_string(),
                "--resume".to_string()
            ]
        );
    }

    #[test]
    fn record_watchdog_crash_applies_exponential_backoff_until_circuit_breaker() {
        let tmp = tempfile::tempdir().unwrap();
        let mut state = PersistedWatchdogState::default();

        // Backoff doubles 1 → 2 → 4 → 8; the fifth crash trips the breaker.
        assert_eq!(
            record_watchdog_crash(tmp.path(), &mut state, "boom-1".to_string()).unwrap(),
            Some(1)
        );
        assert_eq!(
            record_watchdog_crash(tmp.path(), &mut state, "boom-2".to_string()).unwrap(),
            Some(2)
        );
        assert_eq!(
            record_watchdog_crash(tmp.path(), &mut state, "boom-3".to_string()).unwrap(),
            Some(4)
        );
        assert_eq!(
            record_watchdog_crash(tmp.path(), &mut state, "boom-4".to_string()).unwrap(),
            Some(8)
        );
        assert_eq!(
            record_watchdog_crash(tmp.path(), &mut state, "boom-5".to_string()).unwrap(),
            None
        );
        assert!(state.circuit_breaker_tripped);
        assert_eq!(state.restart_count, 5);
    }
}
1104}