1use std::fs::File;
7use std::path::Path;
8use std::path::PathBuf;
9use std::time::Duration;
10
11use anyhow::{Context, Result, bail};
12use serde::Deserialize;
13use tracing::{info, warn};
14
15use super::{config, daemon, events, hierarchy, inbox, layout, team_config_path};
16use crate::tmux;
17
/// Size threshold in bytes (5 MiB); a log file strictly larger than this is rotated.
pub(crate) const LOG_ROTATION_BYTES: u64 = 5 * 1024 * 1024;
/// Highest generation suffix kept by rotation (`<log>.1` through `<log>.3`).
const LOG_ROTATION_KEEP: usize = 3;
/// Five-second shutdown grace period. Not referenced in this part of the file;
/// presumably passed as the `timeout` of a graceful shutdown request — verify against callers.
pub(super) const DAEMON_SHUTDOWN_GRACE_PERIOD: Duration = Duration::from_secs(5);
/// Sleep between successive checks while waiting for the daemon to shut down.
const DAEMON_SHUTDOWN_POLL_INTERVAL: Duration = Duration::from_millis(100);
22
/// Returns the location of the daemon PID file: `<project_root>/.batty/daemon.pid`.
fn daemon_pid_path(project_root: &Path) -> PathBuf {
    let mut pid_file = project_root.join(".batty");
    pid_file.push("daemon.pid");
    pid_file
}
27
/// Returns the location of the daemon log file: `<project_root>/.batty/daemon.log`.
pub(crate) fn daemon_log_path(project_root: &Path) -> PathBuf {
    [".batty", "daemon.log"]
        .iter()
        .fold(project_root.to_path_buf(), |path, part| path.join(part))
}
32
/// Builds the path of a rotated log generation by appending `.<generation>` to `path`.
///
/// The suffix is appended to the raw OS string rather than to the lossy `display()`
/// rendering, so a path that is not valid UTF-8 keeps its original bytes and the
/// rotated file stays next to the log it was rotated from.
fn rotated_log_path(path: &Path, generation: usize) -> PathBuf {
    let mut rotated = path.as_os_str().to_os_string();
    rotated.push(format!(".{generation}"));
    PathBuf::from(rotated)
}
36
37pub(crate) fn rotate_log_if_needed(path: &Path) -> Result<()> {
38 let len = match std::fs::metadata(path) {
39 Ok(metadata) => metadata.len(),
40 Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(()),
41 Err(error) => {
42 return Err(error).with_context(|| format!("failed to stat {}", path.display()));
43 }
44 };
45
46 if len <= LOG_ROTATION_BYTES {
47 return Ok(());
48 }
49
50 let oldest = rotated_log_path(path, LOG_ROTATION_KEEP);
51 if oldest.exists() {
52 std::fs::remove_file(&oldest)
53 .with_context(|| format!("failed to remove {}", oldest.display()))?;
54 }
55
56 for generation in (1..LOG_ROTATION_KEEP).rev() {
57 let source = rotated_log_path(path, generation);
58 if !source.exists() {
59 continue;
60 }
61 let destination = rotated_log_path(path, generation + 1);
62 std::fs::rename(&source, &destination).with_context(|| {
63 format!(
64 "failed to rotate {} to {}",
65 source.display(),
66 destination.display()
67 )
68 })?;
69 }
70
71 let rotated = rotated_log_path(path, 1);
72 std::fs::rename(path, &rotated).with_context(|| {
73 format!(
74 "failed to rotate {} to {}",
75 path.display(),
76 rotated.display()
77 )
78 })?;
79 Ok(())
80}
81
82pub(crate) fn open_log_for_append(path: &Path) -> Result<File> {
83 if let Some(parent) = path.parent() {
84 std::fs::create_dir_all(parent)?;
85 }
86 rotate_log_if_needed(path)?;
87 File::options()
88 .append(true)
89 .create(true)
90 .open(path)
91 .with_context(|| format!("failed to open log file: {}", path.display()))
92}
93
/// Builds the command-line arguments used to re-invoke this executable as the daemon.
///
/// The result is always `-v daemon --project-root <root_str>`, followed by
/// `--resume` when `resume` is set.
fn daemon_spawn_args(root_str: &str, resume: bool) -> Vec<String> {
    let fixed = ["-v", "daemon", "--project-root", root_str];
    fixed
        .iter()
        .copied()
        .chain(resume.then_some("--resume"))
        .map(String::from)
        .collect()
}
106
/// Returns the location of the persisted daemon state:
/// `<project_root>/.batty/daemon-state.json`.
pub(crate) fn daemon_state_path(project_root: &Path) -> PathBuf {
    let batty_dir = project_root.join(".batty");
    batty_dir.join("daemon-state.json")
}
110
111fn spawn_daemon(project_root: &Path, resume: bool) -> Result<u32> {
116 use std::process::{Command, Stdio};
117
118 let log_path = daemon_log_path(project_root);
119 let pid_path = daemon_pid_path(project_root);
120
121 if let Some(parent) = log_path.parent() {
123 std::fs::create_dir_all(parent)?;
124 }
125
126 let log_file = open_log_for_append(&log_path)?;
127 let log_err = log_file
128 .try_clone()
129 .context("failed to clone log file handle")?;
130
131 let exe = std::env::current_exe().context("failed to resolve current executable")?;
132 let root_str = project_root
133 .canonicalize()
134 .unwrap_or_else(|_| project_root.to_path_buf())
135 .to_string_lossy()
136 .to_string();
137
138 let mut cmd = Command::new(exe);
139 let args = daemon_spawn_args(&root_str, resume);
140 cmd.args(&args)
141 .stdin(Stdio::null())
142 .stdout(log_file)
143 .stderr(log_err);
144
145 #[cfg(unix)]
147 {
148 use std::os::unix::process::CommandExt;
149 cmd.process_group(0);
150 }
151
152 let mut child = cmd.spawn().context("failed to spawn daemon process")?;
153 let pid = child.id();
154
155 std::thread::sleep(std::time::Duration::from_millis(500));
158 match child.try_wait() {
159 Ok(Some(status)) => {
160 let _ = std::fs::remove_file(&pid_path);
161 let tail = std::fs::read_to_string(&log_path).ok().and_then(|s| {
163 let lines: Vec<&str> = s.lines().collect();
164 let start = lines.len().saturating_sub(5);
165 let tail = lines[start..].join("\n");
166 if tail.trim().is_empty() {
167 None
168 } else {
169 Some(tail)
170 }
171 });
172 match tail {
173 Some(detail) => bail!(
174 "daemon process exited immediately with {status}\n\n\
175 {detail}\n\n\
176 see full log: {log}",
177 log = log_path.display(),
178 ),
179 None => bail!(
180 "daemon process exited immediately with {status}; \
181 see {log} for details",
182 log = log_path.display(),
183 ),
184 }
185 }
186 Ok(None) => {} Err(e) => {
188 warn!(pid, error = %e, "failed to check daemon process status");
189 }
190 }
191
192 std::fs::write(&pid_path, pid.to_string())
193 .with_context(|| format!("failed to write PID file: {}", pid_path.display()))?;
194
195 info!(pid, log = %log_path.display(), "daemon spawned");
196 Ok(pid)
197}
198
199fn read_daemon_pid(project_root: &Path) -> Option<u32> {
201 let pid_path = daemon_pid_path(project_root);
202 let pid_str = std::fs::read_to_string(pid_path).ok()?;
203 pid_str.trim().parse::<u32>().ok()
204}
205
206#[cfg(unix)]
207fn send_unix_signal(pid: u32, signal: libc::c_int) -> bool {
208 let status = unsafe { libc::kill(pid as libc::pid_t, signal) };
209 if status == 0 {
210 true
211 } else {
212 let error = std::io::Error::last_os_error();
213 warn!(pid, signal, error = %error, "failed to signal daemon");
214 false
215 }
216}
217
218#[cfg(not(unix))]
219fn send_unix_signal(_pid: u32, _signal: i32) -> bool {
220 false
221}
222
223#[cfg(unix)]
224fn daemon_process_exists(pid: u32) -> bool {
225 let status = unsafe { libc::kill(pid as libc::pid_t, 0) };
226 if status == 0 {
227 true
228 } else {
229 !matches!(
230 std::io::Error::last_os_error().raw_os_error(),
231 Some(libc::ESRCH)
232 )
233 }
234}
235
236#[cfg(not(unix))]
237fn daemon_process_exists(_pid: u32) -> bool {
238 false
239}
240
241fn wait_for_graceful_daemon_shutdown(
242 project_root: &Path,
243 pid: u32,
244 previous_saved_at: Option<u64>,
245 timeout: Duration,
246) -> bool {
247 let deadline = std::time::Instant::now() + timeout;
248 loop {
249 let clean_snapshot = daemon_state_indicates_clean_shutdown(project_root, previous_saved_at);
250 if clean_snapshot {
251 let _ = std::fs::remove_file(daemon_pid_path(project_root));
252 return true;
253 }
254 let running = daemon_process_exists(pid);
255 if !running {
256 let _ = std::fs::remove_file(daemon_pid_path(project_root));
257 return false;
258 }
259 if std::time::Instant::now() >= deadline {
260 return false;
261 }
262 std::thread::sleep(DAEMON_SHUTDOWN_POLL_INTERVAL);
263 }
264}
265
266pub(super) fn request_graceful_daemon_shutdown(project_root: &Path, timeout: Duration) -> bool {
267 let Some(pid) = read_daemon_pid(project_root) else {
268 return true;
269 };
270
271 let previous_saved_at = read_daemon_state_probe(project_root).and_then(|state| state.saved_at);
272 #[cfg(unix)]
273 {
274 if !send_unix_signal(pid, libc::SIGTERM) {
275 return false;
276 }
277 info!(pid, "sent SIGTERM to daemon");
278 }
279 #[cfg(not(unix))]
280 {
281 warn!(
282 pid,
283 "graceful daemon shutdown is not supported on this platform"
284 );
285 return false;
286 }
287
288 wait_for_graceful_daemon_shutdown(project_root, pid, previous_saved_at, timeout)
289}
290
291pub(super) fn force_kill_daemon(project_root: &Path) {
292 let Some(pid) = read_daemon_pid(project_root) else {
293 return;
294 };
295
296 #[cfg(unix)]
297 {
298 if send_unix_signal(pid, libc::SIGKILL) {
299 info!(pid, "sent SIGKILL to daemon");
300 }
301 }
302 #[cfg(not(unix))]
303 {
304 warn!(pid, "cannot force-kill daemon on this platform");
305 }
306
307 let _ = std::fs::remove_file(daemon_pid_path(project_root));
308}
309
310pub fn start_team(project_root: &Path, attach: bool) -> Result<String> {
315 let config_path = team_config_path(project_root);
316 if !config_path.exists() {
317 bail!(
318 "no team config found at {}; run `batty init` first",
319 config_path.display()
320 );
321 }
322
323 let team_config = config::TeamConfig::load(&config_path)?;
324 team_config.validate()?;
325
326 let members = hierarchy::resolve_hierarchy(&team_config)?;
327 let session = format!("batty-{}", team_config.name);
328
329 if tmux::session_exists(&session) {
330 bail!("session '{session}' already exists; use `batty attach` or `batty stop` first");
331 }
332
333 layout::build_layout(
334 &session,
335 &members,
336 &team_config.layout,
337 project_root,
338 team_config.workflow_mode,
339 team_config.orchestrator_enabled(),
340 team_config.orchestrator_position,
341 )?;
342
343 let inboxes = inbox::inboxes_root(project_root);
345 for member in &members {
346 inbox::init_inbox(&inboxes, &member.name)?;
347 }
348
349 let marker = resume_marker_path(project_root);
351 let resume = marker.exists() || should_resume_from_daemon_state(project_root);
352 if resume {
353 if marker.exists() {
354 std::fs::remove_file(&marker).ok();
356 }
357 info!("resuming agent sessions from previous run");
358 }
359
360 info!(session = %session, members = members.len(), resume, "team session started");
361
362 let pid = spawn_daemon(project_root, resume)?;
364 info!(pid, "daemon process launched");
365
366 std::thread::sleep(std::time::Duration::from_secs(2));
368
369 if attach {
370 tmux::attach(&session)?;
371 }
372
373 Ok(session)
374}
375
376pub fn run_daemon(project_root: &Path, resume: bool) -> Result<()> {
380 let config_path = team_config_path(project_root);
381 if !config_path.exists() {
382 bail!(
383 "no team config found at {}; run `batty init` first",
384 config_path.display()
385 );
386 }
387
388 let team_config = config::TeamConfig::load(&config_path)?;
389 let members = hierarchy::resolve_hierarchy(&team_config)?;
390 let session = format!("batty-{}", team_config.name);
391
392 for _ in 0..30 {
394 if tmux::session_exists(&session) {
395 break;
396 }
397 std::thread::sleep(std::time::Duration::from_millis(200));
398 }
399
400 if !tmux::session_exists(&session) {
401 bail!("tmux session '{session}' not found — did `batty start` create it?");
402 }
403
404 let mut pane_map = std::collections::HashMap::new();
406 for member in &members {
407 if let Some(pane_id) = find_pane_for_member(&session, &member.name) {
409 pane_map.insert(member.name.clone(), pane_id);
410 }
411 }
412
413 let daemon_config = daemon::DaemonConfig {
414 project_root: project_root.to_path_buf(),
415 team_config,
416 session,
417 members,
418 pane_map,
419 };
420
421 let events_path = project_root
422 .join(".batty")
423 .join("team_config")
424 .join("events.jsonl");
425
426 let mut d = daemon::TeamDaemon::new(daemon_config)?;
427
428 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| d.run(resume)));
430
431 match result {
432 Ok(Ok(())) => Ok(()),
433 Ok(Err(e)) => {
434 eprintln!("daemon exited with error: {e:#}");
435 if let Ok(mut sink) = events::EventSink::new(&events_path) {
437 let _ = sink.emit(events::TeamEvent::daemon_stopped_with_reason(
438 &format!("error: {e:#}"),
439 0,
440 ));
441 }
442 Err(e)
443 }
444 Err(panic_payload) => {
445 let reason = match panic_payload.downcast_ref::<&str>() {
446 Some(s) => s.to_string(),
447 None => match panic_payload.downcast_ref::<String>() {
448 Some(s) => s.clone(),
449 None => "unknown panic".to_string(),
450 },
451 };
452 eprintln!("daemon panicked: {reason}");
453 if let Ok(mut sink) = events::EventSink::new(&events_path) {
455 let _ = sink.emit(events::TeamEvent::daemon_panic(&reason));
456 }
457 std::panic::resume_unwind(panic_payload);
458 }
459 }
460}
461
/// Looks up the tmux pane in `session` whose `@batty_role` option equals `member_name`.
///
/// Runs `tmux list-panes` with the format `#{pane_id} #{@batty_role}` and returns
/// the pane id from the first line whose text after the first space matches
/// exactly. Returns `None` when tmux cannot be run, exits unsuccessfully, or no
/// line matches.
fn find_pane_for_member(session: &str, member_name: &str) -> Option<String> {
    let listing = std::process::Command::new("tmux")
        .arg("list-panes")
        .arg("-t")
        .arg(session)
        .arg("-F")
        .arg("#{pane_id} #{@batty_role}")
        .output()
        .ok()?;

    if !listing.status.success() {
        return None;
    }

    let stdout = String::from_utf8_lossy(&listing.stdout);
    stdout
        .lines()
        // Lines without a space carry no role and are skipped.
        .filter_map(|line| line.split_once(' '))
        .find(|&(_, role)| role == member_name)
        .map(|(pane_id, _)| pane_id.to_string())
}
488
489pub(super) fn resume_marker_path(project_root: &Path) -> PathBuf {
491 project_root.join(".batty").join("resume")
492}
493
/// Minimal view of the persisted daemon state file, deserialized from JSON.
///
/// Only the fields needed to decide about resuming and to detect a fresh clean
/// shutdown are declared here.
#[derive(Debug, Deserialize)]
struct DaemonStateResumeProbe {
    /// Whether the state records a clean shutdown; `false` when the field is absent.
    #[serde(default)]
    clean_shutdown: bool,
    /// Save marker that is compared with `>` against an earlier reading; `None` when absent.
    /// NOTE(review): presumably a timestamp — the unit is not visible in this file.
    #[serde(default)]
    saved_at: Option<u64>,
}
501
502fn read_daemon_state_probe(project_root: &Path) -> Option<DaemonStateResumeProbe> {
503 let path = daemon_state_path(project_root);
504 let content = std::fs::read_to_string(&path).ok()?;
505
506 match serde_json::from_str::<DaemonStateResumeProbe>(&content) {
507 Ok(state) => Some(state),
508 Err(error) => {
509 warn!(
510 path = %path.display(),
511 error = %error,
512 "failed to parse daemon state while probing for resume"
513 );
514 None
515 }
516 }
517}
518
519fn daemon_state_indicates_clean_shutdown(
520 project_root: &Path,
521 previous_saved_at: Option<u64>,
522) -> bool {
523 let Some(state) = read_daemon_state_probe(project_root) else {
524 return false;
525 };
526
527 state.clean_shutdown
528 && match (state.saved_at, previous_saved_at) {
529 (Some(saved_at), Some(previous_saved_at)) => saved_at > previous_saved_at,
530 (Some(_), None) => true,
531 (None, Some(_)) => false,
532 (None, None) => true,
533 }
534}
535
536fn should_resume_from_daemon_state(project_root: &Path) -> bool {
537 read_daemon_state_probe(project_root)
538 .map(|state| !state.clean_shutdown)
539 .unwrap_or(false)
540}
541
/// Unit tests for resume probing, graceful shutdown, log rotation and spawn arguments.
#[cfg(test)]
mod tests {
    use super::*;
    use serial_test::serial;

    /// A state file with `clean_shutdown: false` must request a resume.
    #[test]
    fn daemon_state_probe_requests_resume_after_unclean_shutdown() {
        let tmp = tempfile::tempdir().unwrap();
        let path = daemon_state_path(tmp.path());
        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
        std::fs::write(&path, r#"{"clean_shutdown":false}"#).unwrap();

        assert!(should_resume_from_daemon_state(tmp.path()));
    }

    /// A state file with `clean_shutdown: true` must not request a resume.
    #[test]
    fn daemon_state_probe_ignores_clean_shutdown() {
        let tmp = tempfile::tempdir().unwrap();
        let path = daemon_state_path(tmp.path());
        std::fs::create_dir_all(path.parent().unwrap()).unwrap();
        std::fs::write(&path, r#"{"clean_shutdown":true}"#).unwrap();

        assert!(!should_resume_from_daemon_state(tmp.path()));
    }

    /// Writes `body` to `script_path` and marks the file executable (mode 0755).
    #[cfg(unix)]
    fn write_daemon_script(script_path: &Path, body: &str) {
        std::fs::write(script_path, body).unwrap();
        use std::os::unix::fs::PermissionsExt;
        std::fs::set_permissions(script_path, std::fs::Permissions::from_mode(0o755)).unwrap();
    }

    /// The wait must return `true` once a newer clean snapshot appears while the
    /// watched process (this test process) is still alive.
    #[cfg(unix)]
    #[test]
    #[serial]
    fn graceful_daemon_shutdown_waits_for_clean_snapshot() {
        let tmp = tempfile::tempdir().unwrap();
        let state_path = daemon_state_path(tmp.path());
        let state_dir = state_path.parent().unwrap();
        std::fs::create_dir_all(state_dir).unwrap();
        std::fs::write(&state_path, r#"{"clean_shutdown":false,"saved_at":1}"#).unwrap();

        let state_path_for_thread = state_path.clone();
        let state_dir_for_thread = state_dir.to_path_buf();
        // Simulates the daemon persisting a clean snapshot shortly after the wait begins.
        let writer = std::thread::spawn(move || {
            std::thread::sleep(Duration::from_millis(200));
            std::fs::create_dir_all(&state_dir_for_thread).unwrap();
            std::fs::write(
                &state_path_for_thread,
                r#"{"clean_shutdown":true,"saved_at":2}"#,
            )
            .unwrap();
        });

        // The current process id is used so the "process exists" check stays true.
        assert!(wait_for_graceful_daemon_shutdown(
            tmp.path(),
            std::process::id(),
            Some(1),
            Duration::from_secs(2)
        ));

        writer.join().unwrap();
        assert!(daemon_state_indicates_clean_shutdown(tmp.path(), Some(1)));
    }

    /// A process that ignores SIGTERM must make the graceful request time out and
    /// remain alive until `force_kill_daemon` is used, which also removes the PID file.
    #[cfg(unix)]
    #[test]
    #[serial]
    fn graceful_daemon_shutdown_times_out_before_force_kill_fallback() {
        let tmp = tempfile::tempdir().unwrap();
        let script_path = tmp.path().join("stubborn-daemon.sh");
        // Shell script that ignores TERM and loops forever.
        write_daemon_script(
            &script_path,
            "#!/bin/sh\ntrap '' TERM\nwhile :; do :; done\n",
        );

        let mut child = std::process::Command::new(&script_path).spawn().unwrap();
        std::fs::create_dir_all(tmp.path().join(".batty")).unwrap();
        std::fs::write(daemon_pid_path(tmp.path()), child.id().to_string()).unwrap();
        // Give the script time to start and install its trap before signalling.
        std::thread::sleep(Duration::from_millis(200));

        assert!(!request_graceful_daemon_shutdown(
            tmp.path(),
            Duration::from_millis(300)
        ));
        assert!(daemon_process_exists(child.id()));

        force_kill_daemon(tmp.path());
        // Reap the child so it does not linger as a zombie.
        let _ = child.wait().unwrap();
        assert!(!daemon_pid_path(tmp.path()).exists());
    }

    /// Rotating an oversized log moves it to `.1` and shifts `.1` and `.2` up by one.
    #[test]
    fn test_rotate_log_shifts_files() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = daemon_log_path(tmp.path());
        std::fs::create_dir_all(log_path.parent().unwrap()).unwrap();
        std::fs::write(&log_path, b"current").unwrap();
        std::fs::write(rotated_log_path(&log_path, 1), b"older-1").unwrap();
        std::fs::write(rotated_log_path(&log_path, 2), b"older-2").unwrap();
        // Extend the file just past the threshold without writing that much data.
        std::fs::OpenOptions::new()
            .write(true)
            .open(&log_path)
            .unwrap()
            .set_len(LOG_ROTATION_BYTES + 1)
            .unwrap();

        rotate_log_if_needed(&log_path).unwrap();

        assert!(!log_path.exists());
        assert_eq!(
            std::fs::read(rotated_log_path(&log_path, 1)).unwrap().len() as u64,
            LOG_ROTATION_BYTES + 1
        );
        assert_eq!(
            std::fs::read_to_string(rotated_log_path(&log_path, 2)).unwrap(),
            "older-1"
        );
        assert_eq!(
            std::fs::read_to_string(rotated_log_path(&log_path, 3)).unwrap(),
            "older-2"
        );
    }

    /// With three generations already present, rotation drops the oldest and never
    /// creates a fourth.
    #[test]
    fn test_rotate_log_keeps_max_3() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = crate::team::orchestrator_log_path(tmp.path());
        std::fs::create_dir_all(log_path.parent().unwrap()).unwrap();
        std::fs::write(&log_path, b"current").unwrap();
        std::fs::write(rotated_log_path(&log_path, 1), b"older-1").unwrap();
        std::fs::write(rotated_log_path(&log_path, 2), b"older-2").unwrap();
        std::fs::write(rotated_log_path(&log_path, 3), b"older-3").unwrap();
        // Extend the file just past the threshold without writing that much data.
        std::fs::OpenOptions::new()
            .write(true)
            .open(&log_path)
            .unwrap()
            .set_len(LOG_ROTATION_BYTES + 1)
            .unwrap();

        rotate_log_if_needed(&log_path).unwrap();

        assert_eq!(
            std::fs::read(rotated_log_path(&log_path, 1)).unwrap().len() as u64,
            LOG_ROTATION_BYTES + 1
        );
        assert_eq!(
            std::fs::read_to_string(rotated_log_path(&log_path, 2)).unwrap(),
            "older-1"
        );
        assert_eq!(
            std::fs::read_to_string(rotated_log_path(&log_path, 3)).unwrap(),
            "older-2"
        );
        assert!(!rotated_log_path(&log_path, 4).exists());
    }

    /// A log below the size threshold is left untouched and no rotated file appears.
    #[test]
    fn test_rotate_log_noop_under_threshold() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = daemon_log_path(tmp.path());
        std::fs::create_dir_all(log_path.parent().unwrap()).unwrap();
        std::fs::write(&log_path, b"small-log").unwrap();

        rotate_log_if_needed(&log_path).unwrap();

        assert_eq!(std::fs::read_to_string(&log_path).unwrap(), "small-log");
        assert!(!rotated_log_path(&log_path, 1).exists());
    }

    /// Opening the log twice appends to it rather than truncating; the parent
    /// directory is created on demand.
    #[test]
    fn test_daemon_log_append_mode() {
        let tmp = tempfile::tempdir().unwrap();
        let log_path = daemon_log_path(tmp.path());

        {
            let mut file = open_log_for_append(&log_path).unwrap();
            use std::io::Write;
            writeln!(file, "first").unwrap();
        }

        {
            let mut file = open_log_for_append(&log_path).unwrap();
            use std::io::Write;
            writeln!(file, "second").unwrap();
        }

        assert_eq!(
            std::fs::read_to_string(&log_path).unwrap(),
            "first\nsecond\n"
        );
    }

    /// The spawn arguments always start with `-v daemon --project-root <root>` and
    /// gain `--resume` only when requested.
    #[test]
    fn daemon_spawn_args_include_verbose_and_resume() {
        assert_eq!(
            daemon_spawn_args("/tmp/project", false),
            vec![
                "-v".to_string(),
                "daemon".to_string(),
                "--project-root".to_string(),
                "/tmp/project".to_string()
            ]
        );
        assert_eq!(
            daemon_spawn_args("/tmp/project", true),
            vec![
                "-v".to_string(),
                "daemon".to_string(),
                "--project-root".to_string(),
                "/tmp/project".to_string(),
                "--resume".to_string()
            ]
        );
    }
}