Skip to main content

batty_cli/team/
daemon_mgmt.rs

1//! Daemon management: spawning, PID tracking, signal handling, log rotation,
2//! graceful shutdown, and team start / daemon entry point.
3//!
4//! Extracted from `lifecycle.rs` — pure refactor, zero logic changes.
5
6use std::fs::File;
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::time::Duration;
11
12use anyhow::{Context, Result, bail};
13use serde::{Deserialize, Serialize};
14use tracing::{error, info, warn};
15
16use super::{config, daemon, events, hierarchy, inbox, layout, team_config_path};
17use crate::tmux;
18
/// Size threshold (5 MiB): a log file strictly larger than this is rotated.
pub(crate) const LOG_ROTATION_BYTES: u64 = 5 * 1024 * 1024;
/// Highest rotated generation kept on disk (`<log>.1` through `<log>.3`).
const LOG_ROTATION_KEEP: usize = 3;
/// How long a daemon child is given to exit after SIGTERM before it is killed.
pub(super) const DAEMON_SHUTDOWN_GRACE_PERIOD: Duration = Duration::from_secs(5);
/// Sleep between successive checks while waiting for a daemon to shut down.
const DAEMON_SHUTDOWN_POLL_INTERVAL: Duration = Duration::from_millis(100);
/// Sleep between watchdog polls of the daemon child's exit status.
const WATCHDOG_POLL_INTERVAL: Duration = Duration::from_millis(200);
/// Restart backoff for the first crash in the window; doubled per further crash.
const WATCHDOG_INITIAL_BACKOFF_SECS: u64 = 1;
/// Upper bound applied to the computed restart backoff.
const WATCHDOG_MAX_BACKOFF_SECS: u64 = 30;
/// Number of crashes inside the window that trips the circuit breaker.
const WATCHDOG_CIRCUIT_BREAKER_THRESHOLD: usize = 5;
/// Width of the sliding window over which crashes are counted.
const WATCHDOG_CIRCUIT_BREAKER_WINDOW_SECS: u64 = 60;
/// File name (under `.batty/`) holding the PID of the watchdog's daemon child.
const DAEMON_CHILD_PID_FILE: &str = "daemon-child.pid";

/// Set by the watchdog's signal handler to request shutdown; polled by the
/// watchdog loop.
#[cfg(unix)]
static WATCHDOG_SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);
32
/// Returns `<project_root>/.batty/daemon.pid`, the daemon PID file.
fn daemon_pid_path(project_root: &Path) -> PathBuf {
    let mut pid_file = project_root.join(".batty");
    pid_file.push("daemon.pid");
    pid_file
}
37
/// Returns `<project_root>/.batty/daemon.log`, the daemon log file.
pub(crate) fn daemon_log_path(project_root: &Path) -> PathBuf {
    let mut log_file = project_root.join(".batty");
    log_file.push("daemon.log");
    log_file
}
42
/// Builds the path of a rotated log generation by appending `.<generation>`
/// to the full path of `path` (e.g. `daemon.log` -> `daemon.log.2`).
///
/// The suffix is appended to the raw `OsString` rather than to a `Display`
/// rendering, so non-UTF-8 bytes in `path` are preserved exactly instead of
/// being replaced with U+FFFD (which would point at a different file).
fn rotated_log_path(path: &Path, generation: usize) -> PathBuf {
    let mut rotated = path.as_os_str().to_os_string();
    rotated.push(format!(".{generation}"));
    PathBuf::from(rotated)
}
46
47pub(crate) fn rotate_log_if_needed(path: &Path) -> Result<()> {
48    let len = match std::fs::metadata(path) {
49        Ok(metadata) => metadata.len(),
50        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(()),
51        Err(error) => {
52            return Err(error).with_context(|| format!("failed to stat {}", path.display()));
53        }
54    };
55
56    if len <= LOG_ROTATION_BYTES {
57        return Ok(());
58    }
59
60    let oldest = rotated_log_path(path, LOG_ROTATION_KEEP);
61    if oldest.exists() {
62        std::fs::remove_file(&oldest)
63            .with_context(|| format!("failed to remove {}", oldest.display()))?;
64    }
65
66    for generation in (1..LOG_ROTATION_KEEP).rev() {
67        let source = rotated_log_path(path, generation);
68        if !source.exists() {
69            continue;
70        }
71        let destination = rotated_log_path(path, generation + 1);
72        std::fs::rename(&source, &destination).with_context(|| {
73            format!(
74                "failed to rotate {} to {}",
75                source.display(),
76                destination.display()
77            )
78        })?;
79    }
80
81    let rotated = rotated_log_path(path, 1);
82    std::fs::rename(path, &rotated).with_context(|| {
83        format!(
84            "failed to rotate {} to {}",
85            path.display(),
86            rotated.display()
87        )
88    })?;
89    Ok(())
90}
91
92pub(crate) fn open_log_for_append(path: &Path) -> Result<File> {
93    if let Some(parent) = path.parent() {
94        std::fs::create_dir_all(parent)?;
95    }
96    rotate_log_if_needed(path)?;
97    File::options()
98        .append(true)
99        .create(true)
100        .open(path)
101        .with_context(|| format!("failed to open log file: {}", path.display()))
102}
103
/// Command-line arguments for launching the `daemon` subcommand:
/// `-v daemon --project-root <root_str>`, followed by `--resume` when
/// `resume` is set.
fn daemon_spawn_args(root_str: &str, resume: bool) -> Vec<String> {
    ["-v", "daemon", "--project-root", root_str]
        .iter()
        .copied()
        .chain(resume.then_some("--resume"))
        .map(String::from)
        .collect()
}
116
/// Command-line arguments for launching the `watchdog` subcommand:
/// `-v watchdog --project-root <root_str>`, followed by `--resume` when
/// `resume` is set.
fn watchdog_spawn_args(root_str: &str, resume: bool) -> Vec<String> {
    ["-v", "watchdog", "--project-root", root_str]
        .iter()
        .copied()
        .chain(resume.then_some("--resume"))
        .map(String::from)
        .collect()
}
129
/// Returns `<project_root>/.batty/daemon-state.json`.
pub(crate) fn daemon_state_path(project_root: &Path) -> PathBuf {
    let mut state_file = project_root.join(".batty");
    state_file.push("daemon-state.json");
    state_file
}
133
/// Returns `<project_root>/.batty/watchdog-state.json`.
pub(crate) fn watchdog_state_path(project_root: &Path) -> PathBuf {
    let mut state_file = project_root.join(".batty");
    state_file.push("watchdog-state.json");
    state_file
}
137
138fn daemon_child_pid_path(project_root: &Path) -> PathBuf {
139    project_root.join(".batty").join(DAEMON_CHILD_PID_FILE)
140}
141
/// Watchdog bookkeeping persisted as JSON at [`watchdog_state_path`].
///
/// Every field is `#[serde(default)]`, so a state file that lacks some of
/// them still deserializes.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct PersistedWatchdogState {
    /// Incremented once for every crash recorded by the watchdog.
    #[serde(default)]
    pub restart_count: u32,
    /// Timestamps (as returned by `now_unix`) of the crashes that still fall
    /// inside the circuit-breaker window.
    #[serde(default)]
    pub crash_timestamps: Vec<u64>,
    /// Set once the number of recent crashes reaches the breaker threshold;
    /// cleared again when the watchdog starts.
    #[serde(default)]
    pub circuit_breaker_tripped: bool,
    /// PID of the daemon child currently supervised, if any.
    #[serde(default)]
    pub child_pid: Option<u32>,
    /// Backoff (seconds) chosen for the pending restart, if one is pending.
    #[serde(default)]
    pub current_backoff_secs: Option<u64>,
    /// Human-readable description of the most recent daemon exit.
    #[serde(default)]
    pub last_exit_reason: Option<String>,
}
157
158fn load_watchdog_state(project_root: &Path) -> Result<PersistedWatchdogState> {
159    let path = watchdog_state_path(project_root);
160    if !path.exists() {
161        return Ok(PersistedWatchdogState::default());
162    }
163    let content = std::fs::read_to_string(&path)
164        .with_context(|| format!("failed to read {}", path.display()))?;
165    serde_json::from_str(&content).with_context(|| format!("failed to parse {}", path.display()))
166}
167
168fn save_watchdog_state(project_root: &Path, state: &PersistedWatchdogState) -> Result<()> {
169    let path = watchdog_state_path(project_root);
170    if let Some(parent) = path.parent() {
171        std::fs::create_dir_all(parent)
172            .with_context(|| format!("failed to create {}", parent.display()))?;
173    }
174    let content =
175        serde_json::to_string_pretty(state).context("failed to serialize watchdog state")?;
176    std::fs::write(&path, content).with_context(|| format!("failed to write {}", path.display()))
177}
178
179/// Spawn the daemon as a detached background process.
180///
181/// The daemon runs in its own process group with stdio redirected to a log
182/// file, so it survives terminal closure. PID is saved to `.batty/daemon.pid`.
183fn spawn_detached_process(
184    project_root: &Path,
185    args: &[String],
186    pid_path: &Path,
187    process_name: &str,
188) -> Result<u32> {
189    use std::process::{Command, Stdio};
190
191    let log_path = daemon_log_path(project_root);
192
193    // Ensure .batty/ exists
194    if let Some(parent) = log_path.parent() {
195        std::fs::create_dir_all(parent)?;
196    }
197
198    let log_file = open_log_for_append(&log_path)?;
199    let log_err = log_file
200        .try_clone()
201        .context("failed to clone log file handle")?;
202
203    let exe = std::env::current_exe().context("failed to resolve current executable")?;
204
205    let mut cmd = Command::new(exe);
206    cmd.args(args)
207        .stdin(Stdio::null())
208        .stdout(log_file)
209        .stderr(log_err);
210
211    // Detach into a new process group so it survives terminal closure
212    #[cfg(unix)]
213    {
214        use std::os::unix::process::CommandExt;
215        cmd.process_group(0);
216    }
217
218    let mut child = cmd.spawn().context("failed to spawn daemon process")?;
219    let pid = child.id();
220
221    // Give the child a moment to start up and verify it didn't exit immediately
222    // (e.g. due to an unrecognized subcommand in an outdated binary).
223    std::thread::sleep(std::time::Duration::from_millis(500));
224    match child.try_wait() {
225        Ok(Some(status)) => {
226            let _ = std::fs::remove_file(pid_path);
227            // Read the last few lines of the daemon log for the actual error
228            let tail = std::fs::read_to_string(&log_path).ok().and_then(|s| {
229                let lines: Vec<&str> = s.lines().collect();
230                let start = lines.len().saturating_sub(5);
231                let tail = lines[start..].join("\n");
232                if tail.trim().is_empty() {
233                    None
234                } else {
235                    Some(tail)
236                }
237            });
238            match tail {
239                Some(detail) => bail!(
240                    "{process_name} process exited immediately with {status}\n\n\
241                     {detail}\n\n\
242                     see full log: {log}",
243                    log = log_path.display(),
244                ),
245                None => bail!(
246                    "{process_name} process exited immediately with {status}; \
247                     see {log} for details",
248                    log = log_path.display(),
249                ),
250            }
251        }
252        Ok(None) => {} // still running — good
253        Err(e) => {
254            warn!(pid, error = %e, "failed to check daemon process status");
255        }
256    }
257
258    std::fs::write(pid_path, pid.to_string())
259        .with_context(|| format!("failed to write PID file: {}", pid_path.display()))?;
260
261    info!(pid, log = %log_path.display(), process = process_name, "background process spawned");
262    Ok(pid)
263}
264
265fn spawn_watchdog(project_root: &Path, resume: bool) -> Result<u32> {
266    let root_str = project_root
267        .canonicalize()
268        .unwrap_or_else(|_| project_root.to_path_buf())
269        .to_string_lossy()
270        .to_string();
271    let args = watchdog_spawn_args(&root_str, resume);
272    spawn_detached_process(
273        project_root,
274        &args,
275        &daemon_pid_path(project_root),
276        "watchdog",
277    )
278}
279
280fn spawn_daemon_child(project_root: &Path, resume: bool) -> Result<std::process::Child> {
281    use std::process::Command;
282
283    let exe = std::env::current_exe().context("failed to resolve current executable")?;
284    let root_str = project_root
285        .canonicalize()
286        .unwrap_or_else(|_| project_root.to_path_buf())
287        .to_string_lossy()
288        .to_string();
289
290    let mut cmd = Command::new(exe);
291    cmd.args(daemon_spawn_args(&root_str, resume));
292    cmd.spawn().context("failed to spawn daemon child")
293}
294
/// Signal handler installed by the watchdog: records that shutdown was
/// requested.
///
/// The body is a single atomic store; the signal number is ignored.
#[cfg(unix)]
extern "C" fn handle_watchdog_shutdown_signal(_signal: libc::c_int) {
    WATCHDOG_SHUTDOWN_REQUESTED.store(true, Ordering::SeqCst);
}
299
300#[cfg(unix)]
301fn install_watchdog_signal_handlers() -> Result<()> {
302    unsafe {
303        libc::signal(
304            libc::SIGTERM,
305            handle_watchdog_shutdown_signal as *const () as libc::sighandler_t,
306        );
307        libc::signal(
308            libc::SIGINT,
309            handle_watchdog_shutdown_signal as *const () as libc::sighandler_t,
310        );
311        libc::signal(
312            libc::SIGHUP,
313            handle_watchdog_shutdown_signal as *const () as libc::sighandler_t,
314        );
315    }
316    WATCHDOG_SHUTDOWN_REQUESTED.store(false, Ordering::SeqCst);
317    Ok(())
318}
319
/// Non-Unix stand-in: installs nothing and always succeeds.
#[cfg(not(unix))]
fn install_watchdog_signal_handlers() -> Result<()> {
    Ok(())
}
324
/// Returns `true` once the watchdog's signal handler has recorded a shutdown
/// request.
#[cfg(unix)]
fn watchdog_shutdown_requested() -> bool {
    WATCHDOG_SHUTDOWN_REQUESTED.load(Ordering::SeqCst)
}
329
/// Non-Unix stand-in: no signal handlers are installed on these targets, so a
/// shutdown request is never reported.
#[cfg(not(unix))]
fn watchdog_shutdown_requested() -> bool {
    false
}
334
335fn record_watchdog_crash(
336    project_root: &Path,
337    state: &mut PersistedWatchdogState,
338    reason: String,
339) -> Result<Option<u64>> {
340    let now = super::now_unix();
341    state.restart_count += 1;
342    state.last_exit_reason = Some(reason);
343    state.child_pid = None;
344    state
345        .crash_timestamps
346        .retain(|ts| now.saturating_sub(*ts) < WATCHDOG_CIRCUIT_BREAKER_WINDOW_SECS);
347    state.crash_timestamps.push(now);
348
349    if state.crash_timestamps.len() >= WATCHDOG_CIRCUIT_BREAKER_THRESHOLD {
350        state.circuit_breaker_tripped = true;
351        state.current_backoff_secs = None;
352        save_watchdog_state(project_root, state)?;
353        return Ok(None);
354    }
355
356    let exponent = state.crash_timestamps.len().saturating_sub(1) as u32;
357    let backoff_secs = (WATCHDOG_INITIAL_BACKOFF_SECS
358        .saturating_mul(2u64.saturating_pow(exponent)))
359    .min(WATCHDOG_MAX_BACKOFF_SECS);
360    state.current_backoff_secs = Some(backoff_secs);
361    save_watchdog_state(project_root, state)?;
362    Ok(Some(backoff_secs))
363}
364
365fn clear_watchdog_child_pid(project_root: &Path, state: &mut PersistedWatchdogState) -> Result<()> {
366    state.child_pid = None;
367    let _ = std::fs::remove_file(daemon_child_pid_path(project_root));
368    save_watchdog_state(project_root, state)
369}
370
371fn terminate_daemon_child(child: &mut std::process::Child) {
372    #[cfg(unix)]
373    {
374        let _ = send_unix_signal(child.id(), libc::SIGTERM);
375    }
376
377    let deadline = std::time::Instant::now() + DAEMON_SHUTDOWN_GRACE_PERIOD;
378    loop {
379        match child.try_wait() {
380            Ok(Some(_)) => return,
381            Ok(None) if std::time::Instant::now() < deadline => {
382                std::thread::sleep(DAEMON_SHUTDOWN_POLL_INTERVAL);
383            }
384            Ok(None) | Err(_) => {
385                #[cfg(unix)]
386                {
387                    let _ = send_unix_signal(child.id(), libc::SIGKILL);
388                }
389                let _ = child.wait();
390                return;
391            }
392        }
393    }
394}
395
396/// Kill the daemon process if it's running.
397fn read_daemon_pid(project_root: &Path) -> Option<u32> {
398    let pid_path = daemon_pid_path(project_root);
399    let pid_str = std::fs::read_to_string(pid_path).ok()?;
400    pid_str.trim().parse::<u32>().ok()
401}
402
403#[cfg(unix)]
404fn send_unix_signal(pid: u32, signal: libc::c_int) -> bool {
405    let status = unsafe { libc::kill(pid as libc::pid_t, signal) };
406    if status == 0 {
407        true
408    } else {
409        let error = std::io::Error::last_os_error();
410        warn!(pid, signal, error = %error, "failed to signal daemon");
411        false
412    }
413}
414
/// Non-Unix stand-in: no signal is sent and `false` is always returned.
#[cfg(not(unix))]
fn send_unix_signal(_pid: u32, _signal: i32) -> bool {
    false
}
419
420#[cfg(unix)]
421fn daemon_process_exists(pid: u32) -> bool {
422    let status = unsafe { libc::kill(pid as libc::pid_t, 0) };
423    if status == 0 {
424        true
425    } else {
426        !matches!(
427            std::io::Error::last_os_error().raw_os_error(),
428            Some(libc::ESRCH)
429        )
430    }
431}
432
/// Non-Unix stand-in: process probing is not implemented, so every PID is
/// reported as not existing.
#[cfg(not(unix))]
fn daemon_process_exists(_pid: u32) -> bool {
    false
}
437
438fn wait_for_graceful_daemon_shutdown(
439    project_root: &Path,
440    pid: u32,
441    previous_saved_at: Option<u64>,
442    timeout: Duration,
443) -> bool {
444    let deadline = std::time::Instant::now() + timeout;
445    loop {
446        let clean_snapshot = daemon_state_indicates_clean_shutdown(project_root, previous_saved_at);
447        if clean_snapshot {
448            let _ = std::fs::remove_file(daemon_pid_path(project_root));
449            return true;
450        }
451        let running = daemon_process_exists(pid);
452        if !running {
453            let _ = std::fs::remove_file(daemon_pid_path(project_root));
454            return false;
455        }
456        if std::time::Instant::now() >= deadline {
457            return false;
458        }
459        std::thread::sleep(DAEMON_SHUTDOWN_POLL_INTERVAL);
460    }
461}
462
463pub(super) fn request_graceful_daemon_shutdown(project_root: &Path, timeout: Duration) -> bool {
464    let Some(pid) = read_daemon_pid(project_root) else {
465        return true;
466    };
467
468    let previous_saved_at = read_daemon_state_probe(project_root).and_then(|state| state.saved_at);
469    #[cfg(unix)]
470    {
471        if !send_unix_signal(pid, libc::SIGTERM) {
472            return false;
473        }
474        info!(pid, "sent SIGTERM to daemon");
475    }
476    #[cfg(not(unix))]
477    {
478        warn!(
479            pid,
480            "graceful daemon shutdown is not supported on this platform"
481        );
482        return false;
483    }
484
485    wait_for_graceful_daemon_shutdown(project_root, pid, previous_saved_at, timeout)
486}
487
488pub(super) fn force_kill_daemon(project_root: &Path) {
489    let Some(pid) = read_daemon_pid(project_root) else {
490        return;
491    };
492
493    #[cfg(unix)]
494    {
495        if send_unix_signal(pid, libc::SIGKILL) {
496            info!(pid, "sent SIGKILL to daemon");
497        }
498    }
499    #[cfg(not(unix))]
500    {
501        warn!(pid, "cannot force-kill daemon on this platform");
502    }
503
504    let _ = std::fs::remove_file(daemon_pid_path(project_root));
505}
506
/// Start a team session: load config, resolve hierarchy, create tmux layout,
/// spawn the watchdog as a background process, and optionally attach.
///
/// Returns the tmux session name (`batty-<team name>`).
///
/// # Errors
/// Fails when the team config is missing, a tmux session with the same name
/// already exists, or any of the load / validate / layout / inbox / spawn /
/// attach steps returns an error.
pub fn start_team(project_root: &Path, attach: bool) -> Result<String> {
    let config_path = team_config_path(project_root);
    if !config_path.exists() {
        bail!(
            "no team config found at {}; run `batty init` first",
            config_path.display()
        );
    }

    let team_config = config::TeamConfig::load(&config_path)?;
    team_config.validate()?;

    let members = hierarchy::resolve_hierarchy(&team_config)?;
    let session = format!("batty-{}", team_config.name);

    // Refuse to clobber a session that is already running.
    if tmux::session_exists(&session) {
        bail!("session '{session}' already exists; use `batty attach` or `batty stop` first");
    }

    layout::build_layout(
        &session,
        &members,
        &team_config.layout,
        project_root,
        team_config.workflow_mode,
        team_config.orchestrator_enabled(),
        team_config.orchestrator_position,
    )?;

    // Initialize Maildir inboxes for all members
    let inboxes = inbox::inboxes_root(project_root);
    for member in &members {
        inbox::init_inbox(&inboxes, &member.name)?;
    }

    // Check for resume marker (left by a prior `batty stop`); a daemon state
    // file that does not report a clean shutdown also triggers a resume.
    let marker = resume_marker_path(project_root);
    let resume = marker.exists() || should_resume_from_daemon_state(project_root);
    if resume {
        if marker.exists() {
            // Consume the marker — it's a one-shot flag
            std::fs::remove_file(&marker).ok();
        }
        info!("resuming agent sessions from previous run");
    }

    info!(session = %session, members = members.len(), resume, "team session started");

    // Spawn watchdog as a detached background process. It supervises the daemon
    // child and handles crash backoff/restart policy.
    // NOTE(review): if this fails, nothing in this function tears down what
    // `build_layout` created above — confirm whether callers clean up.
    let pid = spawn_watchdog(project_root, resume)?;
    info!(pid, "watchdog process launched");

    // Give daemon a moment to start spawning agents
    std::thread::sleep(std::time::Duration::from_secs(2));

    if attach {
        tmux::attach(&session)?;
    }

    Ok(session)
}
573
/// Run the daemon loop directly (called by the hidden `batty daemon` subcommand).
///
/// This is the entry point for the daemonized background process. It loads
/// the team config, waits for the tmux session to appear, maps members to
/// their tmux panes, and then runs `TeamDaemon::run`.
///
/// # Errors
/// Fails when the team config is missing or cannot be loaded, when the tmux
/// session does not appear within the wait loop, when the daemon cannot be
/// constructed, or when the daemon run itself returns an error.
///
/// # Panics
/// A panic inside the daemon run is caught, logged, emitted as an event, and
/// then re-raised with `resume_unwind`.
pub fn run_daemon(project_root: &Path, resume: bool) -> Result<()> {
    let config_path = team_config_path(project_root);
    if !config_path.exists() {
        bail!(
            "no team config found at {}; run `batty init` first",
            config_path.display()
        );
    }

    let team_config = config::TeamConfig::load(&config_path)?;
    let members = hierarchy::resolve_hierarchy(&team_config)?;
    let session = format!("batty-{}", team_config.name);

    // Wait for tmux session to be ready (start_team creates it before spawning us)
    // Up to 30 probes, 200 ms apart.
    for _ in 0..30 {
        if tmux::session_exists(&session) {
            break;
        }
        std::thread::sleep(std::time::Duration::from_millis(200));
    }

    // Final probe: distinguishes "found" from "gave up waiting".
    if !tmux::session_exists(&session) {
        bail!("tmux session '{session}' not found — did `batty start` create it?");
    }

    // Reconstruct pane_map from tmux pane options
    // Members without a matching pane are simply left out of the map.
    let mut pane_map = std::collections::HashMap::new();
    for member in &members {
        // Query tmux for the pane ID tagged with this member's role
        if let Some(pane_id) = find_pane_for_member(&session, &member.name) {
            pane_map.insert(member.name.clone(), pane_id);
        }
    }

    let daemon_config = daemon::DaemonConfig {
        project_root: project_root.to_path_buf(),
        team_config,
        session,
        members,
        pane_map,
    };

    // Event log that receives the exit/panic event emitted below.
    let events_path = project_root
        .join(".batty")
        .join("team_config")
        .join("events.jsonl");

    let mut d = daemon::TeamDaemon::new(daemon_config)?;

    // Wrap in catch_unwind so panics are logged to events before exit
    let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| d.run(resume)));

    match result {
        Ok(Ok(())) => Ok(()),
        Ok(Err(e)) => {
            error!(error = %e, "daemon exited with error");
            eprintln!("daemon exited with error: {e:#}");
            // Try to log the error event (best effort; failures are ignored)
            if let Ok(mut sink) = events::EventSink::new(&events_path) {
                let _ = sink.emit(events::TeamEvent::daemon_stopped_with_reason(
                    &format!("error: {e:#}"),
                    0,
                ));
            }
            Err(e)
        }
        Err(panic_payload) => {
            // Panic payloads are normally `&str` or `String`; anything else
            // is reported as "unknown panic".
            let reason = match panic_payload.downcast_ref::<&str>() {
                Some(s) => s.to_string(),
                None => match panic_payload.downcast_ref::<String>() {
                    Some(s) => s.clone(),
                    None => "unknown panic".to_string(),
                },
            };
            error!(reason = %reason, "daemon panicked");
            eprintln!("daemon panicked: {reason}");
            // Log panic event (best effort; failures are ignored)
            if let Ok(mut sink) = events::EventSink::new(&events_path) {
                let _ = sink.emit(events::TeamEvent::daemon_panic(&reason));
            }
            // Re-raise so the process still terminates as a panic.
            std::panic::resume_unwind(panic_payload);
        }
    }
}
661
662pub fn run_watchdog(project_root: &Path, resume: bool) -> Result<()> {
663    install_watchdog_signal_handlers()?;
664
665    let mut state = load_watchdog_state(project_root).unwrap_or_default();
666    state.circuit_breaker_tripped = false;
667    state.current_backoff_secs = None;
668    state.last_exit_reason = None;
669    state.child_pid = None;
670    save_watchdog_state(project_root, &state)?;
671
672    let mut resume_on_launch = resume;
673
674    loop {
675        if watchdog_shutdown_requested() {
676            let _ = std::fs::remove_file(daemon_pid_path(project_root));
677            let _ = std::fs::remove_file(daemon_child_pid_path(project_root));
678            state.child_pid = None;
679            state.current_backoff_secs = None;
680            save_watchdog_state(project_root, &state)?;
681            return Ok(());
682        }
683
684        let mut child = spawn_daemon_child(project_root, resume_on_launch)?;
685        resume_on_launch = true;
686
687        state.child_pid = Some(child.id());
688        state.current_backoff_secs = None;
689        save_watchdog_state(project_root, &state)?;
690        std::fs::write(daemon_child_pid_path(project_root), child.id().to_string()).with_context(
691            || {
692                format!(
693                    "failed to write child PID file: {}",
694                    daemon_child_pid_path(project_root).display()
695                )
696            },
697        )?;
698
699        loop {
700            if watchdog_shutdown_requested() {
701                terminate_daemon_child(&mut child);
702                let _ = std::fs::remove_file(daemon_pid_path(project_root));
703                clear_watchdog_child_pid(project_root, &mut state)?;
704                return Ok(());
705            }
706
707            match child.try_wait() {
708                Ok(Some(exit_status)) => {
709                    clear_watchdog_child_pid(project_root, &mut state)?;
710                    let reason = if let Some(code) = exit_status.code() {
711                        format!("daemon exited with status {code}")
712                    } else {
713                        "daemon exited from signal".to_string()
714                    };
715                    if let Some(backoff_secs) =
716                        record_watchdog_crash(project_root, &mut state, reason.clone())?
717                    {
718                        warn!(backoff_secs, reason = %reason, "daemon crashed; watchdog restarting with backoff");
719                        std::thread::sleep(Duration::from_secs(backoff_secs));
720                        break;
721                    }
722
723                    warn!(
724                        reason = %reason,
725                        threshold = WATCHDOG_CIRCUIT_BREAKER_THRESHOLD,
726                        window_secs = WATCHDOG_CIRCUIT_BREAKER_WINDOW_SECS,
727                        "watchdog circuit breaker tripped; daemon will not be restarted"
728                    );
729                    let _ = std::fs::remove_file(daemon_pid_path(project_root));
730                    return Ok(());
731                }
732                Ok(None) => std::thread::sleep(WATCHDOG_POLL_INTERVAL),
733                Err(error) => {
734                    clear_watchdog_child_pid(project_root, &mut state)?;
735                    let reason = format!("failed to poll daemon child: {error}");
736                    if let Some(backoff_secs) =
737                        record_watchdog_crash(project_root, &mut state, reason.clone())?
738                    {
739                        warn!(backoff_secs, reason = %reason, "watchdog poll failed; retrying daemon launch");
740                        std::thread::sleep(Duration::from_secs(backoff_secs));
741                        break;
742                    }
743                    warn!(
744                        reason = %reason,
745                        threshold = WATCHDOG_CIRCUIT_BREAKER_THRESHOLD,
746                        window_secs = WATCHDOG_CIRCUIT_BREAKER_WINDOW_SECS,
747                        "watchdog circuit breaker tripped after daemon poll failures"
748                    );
749                    let _ = std::fs::remove_file(daemon_pid_path(project_root));
750                    return Ok(());
751                }
752            }
753        }
754    }
755}
756
/// Looks up the tmux pane in `session` whose `@batty_role` option equals
/// `member_name` and returns its pane ID.
///
/// Returns `None` when `tmux` cannot be run, `list-panes` exits
/// unsuccessfully, or no pane carries that role.
fn find_pane_for_member(session: &str, member_name: &str) -> Option<String> {
    let mut list_panes = std::process::Command::new("tmux");
    list_panes
        .arg("list-panes")
        .arg("-t")
        .arg(session)
        .arg("-F")
        .arg("#{pane_id} #{@batty_role}");

    let output = list_panes.output().ok()?;
    if !output.status.success() {
        return None;
    }

    // Each line is "<pane_id> <role>"; lines without a space are skipped.
    let listing = String::from_utf8_lossy(&output.stdout);
    let matched = listing
        .lines()
        .filter_map(|line| line.split_once(' '))
        .find(|(_, role)| *role == member_name)
        .map(|(pane_id, _)| pane_id.to_string());
    matched
}
783
784/// Path to the resume marker file. Presence indicates agents have prior sessions.
785pub(super) fn resume_marker_path(project_root: &Path) -> PathBuf {
786    project_root.join(".batty").join("resume")
787}
788
/// Minimal view of the daemon state file: only the fields this module reads
/// when deciding whether to resume or whether a shutdown completed.
///
/// Both fields are `#[serde(default)]`, so they may be absent from the file.
#[derive(Debug, Deserialize)]
struct DaemonStateResumeProbe {
    /// Whether the state file reports a clean shutdown; `false` when absent.
    #[serde(default)]
    clean_shutdown: bool,
    /// Save timestamp recorded in the state file, if present; compared
    /// against an earlier reading to detect a newer snapshot.
    #[serde(default)]
    saved_at: Option<u64>,
}
796
797fn read_daemon_state_probe(project_root: &Path) -> Option<DaemonStateResumeProbe> {
798    let path = daemon_state_path(project_root);
799    let content = std::fs::read_to_string(&path).ok()?;
800
801    match serde_json::from_str::<DaemonStateResumeProbe>(&content) {
802        Ok(state) => Some(state),
803        Err(error) => {
804            warn!(
805                path = %path.display(),
806                error = %error,
807                "failed to parse daemon state while probing for resume"
808            );
809            None
810        }
811    }
812}
813
814fn daemon_state_indicates_clean_shutdown(
815    project_root: &Path,
816    previous_saved_at: Option<u64>,
817) -> bool {
818    let Some(state) = read_daemon_state_probe(project_root) else {
819        return false;
820    };
821
822    state.clean_shutdown
823        && match (state.saved_at, previous_saved_at) {
824            (Some(saved_at), Some(previous_saved_at)) => saved_at > previous_saved_at,
825            (Some(_), None) => true,
826            (None, Some(_)) => false,
827            (None, None) => true,
828        }
829}
830
831fn should_resume_from_daemon_state(project_root: &Path) -> bool {
832    read_daemon_state_probe(project_root)
833        .map(|state| !state.clean_shutdown)
834        .unwrap_or(false)
835}
836
837#[cfg(test)]
838mod tests {
839    use super::*;
840    use serial_test::serial;
841
842    #[test]
843    fn daemon_state_probe_requests_resume_after_unclean_shutdown() {
844        let tmp = tempfile::tempdir().unwrap();
845        let path = daemon_state_path(tmp.path());
846        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
847        std::fs::write(&path, r#"{"clean_shutdown":false}"#).unwrap();
848
849        assert!(should_resume_from_daemon_state(tmp.path()));
850    }
851
852    #[test]
853    fn daemon_state_probe_ignores_clean_shutdown() {
854        let tmp = tempfile::tempdir().unwrap();
855        let path = daemon_state_path(tmp.path());
856        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
857        std::fs::write(&path, r#"{"clean_shutdown":true}"#).unwrap();
858
859        assert!(!should_resume_from_daemon_state(tmp.path()));
860    }
861
862    #[cfg(unix)]
863    fn write_daemon_script(script_path: &Path, body: &str) {
864        std::fs::write(script_path, body).unwrap();
865        use std::os::unix::fs::PermissionsExt;
866        std::fs::set_permissions(script_path, std::fs::Permissions::from_mode(0o755)).unwrap();
867    }
868
869    #[cfg(unix)]
870    #[test]
871    #[serial]
872    fn graceful_daemon_shutdown_waits_for_clean_snapshot() {
873        let tmp = tempfile::tempdir().unwrap();
874        let state_path = daemon_state_path(tmp.path());
875        let state_dir = state_path.parent().unwrap();
876        std::fs::create_dir_all(state_dir).unwrap();
877        std::fs::write(&state_path, r#"{"clean_shutdown":false,"saved_at":1}"#).unwrap();
878
879        let state_path_for_thread = state_path.clone();
880        let state_dir_for_thread = state_dir.to_path_buf();
881        let writer = std::thread::spawn(move || {
882            std::thread::sleep(Duration::from_millis(200));
883            std::fs::create_dir_all(&state_dir_for_thread).unwrap();
884            std::fs::write(
885                &state_path_for_thread,
886                r#"{"clean_shutdown":true,"saved_at":2}"#,
887            )
888            .unwrap();
889        });
890
891        assert!(wait_for_graceful_daemon_shutdown(
892            tmp.path(),
893            std::process::id(),
894            Some(1),
895            Duration::from_secs(2)
896        ));
897
898        writer.join().unwrap();
899        assert!(daemon_state_indicates_clean_shutdown(tmp.path(), Some(1)));
900    }
901
902    #[cfg(unix)]
903    #[test]
904    #[serial]
905    fn graceful_daemon_shutdown_times_out_before_force_kill_fallback() {
906        let tmp = tempfile::tempdir().unwrap();
907        let script_path = tmp.path().join("stubborn-daemon.sh");
908        write_daemon_script(
909            &script_path,
910            "#!/bin/sh\ntrap '' TERM\nwhile :; do :; done\n",
911        );
912
913        let mut child = std::process::Command::new(&script_path).spawn().unwrap();
914        std::fs::create_dir_all(tmp.path().join(".batty")).unwrap();
915        std::fs::write(daemon_pid_path(tmp.path()), child.id().to_string()).unwrap();
916        std::thread::sleep(Duration::from_millis(200));
917
918        assert!(!request_graceful_daemon_shutdown(
919            tmp.path(),
920            Duration::from_millis(300)
921        ));
922        assert!(daemon_process_exists(child.id()));
923
924        force_kill_daemon(tmp.path());
925        let _ = child.wait().unwrap();
926        assert!(!daemon_pid_path(tmp.path()).exists());
927    }
928
929    #[test]
930    fn test_rotate_log_shifts_files() {
931        let tmp = tempfile::tempdir().unwrap();
932        let log_path = daemon_log_path(tmp.path());
933        std::fs::create_dir_all(log_path.parent().unwrap()).unwrap();
934        std::fs::write(&log_path, b"current").unwrap();
935        std::fs::write(rotated_log_path(&log_path, 1), b"older-1").unwrap();
936        std::fs::write(rotated_log_path(&log_path, 2), b"older-2").unwrap();
937        std::fs::OpenOptions::new()
938            .write(true)
939            .open(&log_path)
940            .unwrap()
941            .set_len(LOG_ROTATION_BYTES + 1)
942            .unwrap();
943
944        rotate_log_if_needed(&log_path).unwrap();
945
946        assert!(!log_path.exists());
947        assert_eq!(
948            std::fs::read(rotated_log_path(&log_path, 1)).unwrap().len() as u64,
949            LOG_ROTATION_BYTES + 1
950        );
951        assert_eq!(
952            std::fs::read_to_string(rotated_log_path(&log_path, 2)).unwrap(),
953            "older-1"
954        );
955        assert_eq!(
956            std::fs::read_to_string(rotated_log_path(&log_path, 3)).unwrap(),
957            "older-2"
958        );
959    }
960
961    #[test]
962    fn test_rotate_log_keeps_max_3() {
963        let tmp = tempfile::tempdir().unwrap();
964        let log_path = crate::team::orchestrator_log_path(tmp.path());
965        std::fs::create_dir_all(log_path.parent().unwrap()).unwrap();
966        std::fs::write(&log_path, b"current").unwrap();
967        std::fs::write(rotated_log_path(&log_path, 1), b"older-1").unwrap();
968        std::fs::write(rotated_log_path(&log_path, 2), b"older-2").unwrap();
969        std::fs::write(rotated_log_path(&log_path, 3), b"older-3").unwrap();
970        std::fs::OpenOptions::new()
971            .write(true)
972            .open(&log_path)
973            .unwrap()
974            .set_len(LOG_ROTATION_BYTES + 1)
975            .unwrap();
976
977        rotate_log_if_needed(&log_path).unwrap();
978
979        assert_eq!(
980            std::fs::read(rotated_log_path(&log_path, 1)).unwrap().len() as u64,
981            LOG_ROTATION_BYTES + 1
982        );
983        assert_eq!(
984            std::fs::read_to_string(rotated_log_path(&log_path, 2)).unwrap(),
985            "older-1"
986        );
987        assert_eq!(
988            std::fs::read_to_string(rotated_log_path(&log_path, 3)).unwrap(),
989            "older-2"
990        );
991        assert!(!rotated_log_path(&log_path, 4).exists());
992    }
993
994    #[test]
995    fn test_rotate_log_noop_under_threshold() {
996        let tmp = tempfile::tempdir().unwrap();
997        let log_path = daemon_log_path(tmp.path());
998        std::fs::create_dir_all(log_path.parent().unwrap()).unwrap();
999        std::fs::write(&log_path, b"small-log").unwrap();
1000
1001        rotate_log_if_needed(&log_path).unwrap();
1002
1003        assert_eq!(std::fs::read_to_string(&log_path).unwrap(), "small-log");
1004        assert!(!rotated_log_path(&log_path, 1).exists());
1005    }
1006
1007    #[test]
1008    fn test_daemon_log_append_mode() {
1009        let tmp = tempfile::tempdir().unwrap();
1010        let log_path = daemon_log_path(tmp.path());
1011
1012        {
1013            let mut file = open_log_for_append(&log_path).unwrap();
1014            use std::io::Write;
1015            writeln!(file, "first").unwrap();
1016        }
1017
1018        {
1019            let mut file = open_log_for_append(&log_path).unwrap();
1020            use std::io::Write;
1021            writeln!(file, "second").unwrap();
1022        }
1023
1024        assert_eq!(
1025            std::fs::read_to_string(&log_path).unwrap(),
1026            "first\nsecond\n"
1027        );
1028    }
1029
1030    #[test]
1031    fn daemon_spawn_args_include_verbose_and_resume() {
1032        assert_eq!(
1033            daemon_spawn_args("/tmp/project", false),
1034            vec![
1035                "-v".to_string(),
1036                "daemon".to_string(),
1037                "--project-root".to_string(),
1038                "/tmp/project".to_string()
1039            ]
1040        );
1041        assert_eq!(
1042            daemon_spawn_args("/tmp/project", true),
1043            vec![
1044                "-v".to_string(),
1045                "daemon".to_string(),
1046                "--project-root".to_string(),
1047                "/tmp/project".to_string(),
1048                "--resume".to_string()
1049            ]
1050        );
1051    }
1052
1053    #[test]
1054    fn watchdog_spawn_args_include_verbose_and_resume() {
1055        assert_eq!(
1056            watchdog_spawn_args("/tmp/project", false),
1057            vec![
1058                "-v".to_string(),
1059                "watchdog".to_string(),
1060                "--project-root".to_string(),
1061                "/tmp/project".to_string()
1062            ]
1063        );
1064        assert_eq!(
1065            watchdog_spawn_args("/tmp/project", true),
1066            vec![
1067                "-v".to_string(),
1068                "watchdog".to_string(),
1069                "--project-root".to_string(),
1070                "/tmp/project".to_string(),
1071                "--resume".to_string()
1072            ]
1073        );
1074    }
1075
1076    #[test]
1077    fn record_watchdog_crash_applies_exponential_backoff_until_circuit_breaker() {
1078        let tmp = tempfile::tempdir().unwrap();
1079        let mut state = PersistedWatchdogState::default();
1080
1081        assert_eq!(
1082            record_watchdog_crash(tmp.path(), &mut state, "boom-1".to_string()).unwrap(),
1083            Some(1)
1084        );
1085        assert_eq!(
1086            record_watchdog_crash(tmp.path(), &mut state, "boom-2".to_string()).unwrap(),
1087            Some(2)
1088        );
1089        assert_eq!(
1090            record_watchdog_crash(tmp.path(), &mut state, "boom-3".to_string()).unwrap(),
1091            Some(4)
1092        );
1093        assert_eq!(
1094            record_watchdog_crash(tmp.path(), &mut state, "boom-4".to_string()).unwrap(),
1095            Some(8)
1096        );
1097        assert_eq!(
1098            record_watchdog_crash(tmp.path(), &mut state, "boom-5".to_string()).unwrap(),
1099            None
1100        );
1101        assert!(state.circuit_breaker_tripped);
1102        assert_eq!(state.restart_count, 5);
1103    }
1104}