1use std::time::{Duration, Instant};
4
5use anyhow::Result;
6use tracing::{debug, info, warn};
7
8use super::config_reload::ConfigReloadMonitor;
9use super::hot_reload::HotReloadMonitor;
10use super::tick_report::TickReport;
11use super::{TeamDaemon, standup, status};
12use crate::team;
13use crate::team::config::RoleType;
14use crate::tmux;
15
16impl TeamDaemon {
17 pub fn run(&mut self, resume: bool) -> Result<()> {
22 self.record_daemon_started();
23 let is_hot_reload = self.acknowledge_hot_reload_marker();
24 info!(session = %self.config.session, resume, "daemon started");
25 self.record_orchestrator_action(format!(
26 "runtime: orchestrator started (mode={}, resume={resume})",
27 self.config.team_config.workflow_mode.as_str()
28 ));
29
30 let shutdown_flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
32 let flag_clone = shutdown_flag.clone();
33 if let Err(e) = ctrlc::set_handler(move || {
34 flag_clone.store(true, std::sync::atomic::Ordering::SeqCst);
35 }) {
36 warn!(error = %e, "failed to install signal handler");
37 }
38
39 self.run_startup_preflight()?;
40
41 self.spawn_all_agents(resume)?;
43 if resume {
44 self.restore_runtime_state();
45 }
46 if is_hot_reload {
51 info!(
52 cleared = self.active_tasks.len(),
53 "hot-reload: clearing active_tasks to rely on board state after restart"
54 );
55 self.active_tasks.clear();
56 }
57 self.persist_runtime_state(false)?;
58
59 let started_at = Instant::now();
60 let heartbeat_interval = Duration::from_secs(300); let mut last_heartbeat = Instant::now();
62 let mut hot_reload = match HotReloadMonitor::for_current_exe() {
63 Ok(monitor) => Some(monitor),
64 Err(error) => {
65 warn!(error = %error, "failed to initialize daemon hot-reload monitor");
66 None
67 }
68 };
69 let config_path = team::team_config_path(&self.config.project_root);
70 let mut config_reload = match ConfigReloadMonitor::new(&config_path) {
71 Ok(monitor) => Some(monitor),
72 Err(error) => {
73 warn!(error = %error, "failed to initialize config reload monitor");
74 None
75 }
76 };
77
78 let shutdown_reason;
80 loop {
81 if shutdown_flag.load(std::sync::atomic::Ordering::SeqCst) {
83 shutdown_reason = "signal";
84 info!("received shutdown signal");
85 break;
86 }
87
88 if !tmux::session_exists(&self.config.session) {
89 shutdown_reason = "session_gone";
90 info!("tmux session gone, shutting down");
91 break;
92 }
93
94 let _tick_report = self.tick();
99
100 self.run_recoverable_step("maybe_reload_binary", |daemon| {
104 daemon.maybe_hot_reload_binary(hot_reload.as_mut())
105 });
106 self.run_recoverable_step("maybe_reload_config", |daemon| {
107 daemon.maybe_hot_reload_config(config_reload.as_mut())
108 });
109
110 if last_heartbeat.elapsed() >= heartbeat_interval {
112 let uptime = started_at.elapsed().as_secs();
113 self.record_daemon_heartbeat(uptime);
114 if let Err(error) = self.persist_runtime_state(false) {
115 warn!(error = %error, "failed to persist daemon checkpoint");
116 }
117 debug!(uptime_secs = uptime, "daemon heartbeat");
118 last_heartbeat = Instant::now();
119 }
120
121 std::thread::sleep(self.poll_interval);
122 }
123
124 self.shutdown_all_shims();
126
127 if let Err(error) = self.save_shim_state() {
129 warn!(error = %error, "failed to save shim state for resume");
130 }
131
132 let uptime = started_at.elapsed().as_secs();
133 if let Err(error) = self.persist_runtime_state(true) {
134 warn!(error = %error, "failed to persist final daemon checkpoint");
135 }
136 self.record_daemon_stopped(shutdown_reason, uptime);
137 Ok(())
138 }
139
    /// Runs one poll cycle: every subsystem step in a fixed order, each
    /// wrapped so that a single failing step is recorded rather than aborting
    /// the cycle. Returns a [`TickReport`] carrying the cycle number and any
    /// subsystem errors collected during this tick.
    ///
    /// Step ordering is deliberate (shim health before delivery, delivery
    /// before board reconciliation, reconciliation before dispatch) — do not
    /// reorder without reviewing the downstream steps' assumptions.
    pub fn tick(&mut self) -> TickReport {
        // Errors accumulate per-tick; drained into the report at the end.
        self.current_tick_errors.clear();

        self.poll_cycle_count = self.poll_cycle_count.saturating_add(1);

        // --- Phase 1: shim/process health and queue housekeeping ---
        self.run_recoverable_step("poll_shim_handles", |daemon| daemon.poll_shim_handles());
        self.run_recoverable_step("shim_health_check", |daemon| daemon.shim_health_check());
        self.run_recoverable_step("check_working_state_timeouts", |daemon| {
            daemon.check_working_state_timeouts()
        });
        self.run_recoverable_step("check_narration_loops", |daemon| {
            daemon.check_narration_loops()
        });
        self.run_recoverable_step("sync_launch_state_session_ids", |daemon| {
            daemon.sync_launch_state_session_ids()
        });
        self.run_recoverable_step("drain_legacy_command_queue", |daemon| {
            daemon.drain_legacy_command_queue()
        });

        // --- Phase 2: message delivery ---
        self.run_loop_step("deliver_inbox_messages", |daemon| {
            daemon.deliver_inbox_messages()
        });
        self.run_loop_step("retry_failed_deliveries", |daemon| {
            daemon.retry_failed_deliveries()
        });
        self.run_recoverable_step("expire_stale_pending_messages", |daemon| {
            daemon.expire_stale_pending_messages()
        });

        // --- Phase 3: board interventions and escalations ---
        self.run_recoverable_step("maybe_intervene_triage_backlog", |daemon| {
            daemon.maybe_intervene_triage_backlog()
        });
        self.run_recoverable_step("maybe_intervene_owned_tasks", |daemon| {
            daemon.maybe_intervene_owned_tasks()
        });
        self.run_recoverable_step("maybe_intervene_review_backlog", |daemon| {
            daemon.maybe_intervene_review_backlog()
        });
        self.run_recoverable_step("maybe_escalate_stale_reviews", |daemon| {
            daemon.maybe_escalate_stale_reviews()
        });
        self.run_recoverable_step("maybe_emit_task_aging_alerts", |daemon| {
            daemon.maybe_emit_task_aging_alerts()
        });
        self.run_recoverable_step("maybe_auto_unblock_blocked_tasks", |daemon| {
            daemon.maybe_auto_unblock_blocked_tasks()
        });
        self.run_recoverable_step("process_merge_queue", |daemon| daemon.process_merge_queue());

        // --- Phase 4: task reconciliation and dispatch ---
        self.run_loop_step("reconcile_active_tasks", |daemon| {
            daemon.reconcile_active_tasks()
        });
        self.run_loop_step("maybe_manage_task_claim_ttls", |daemon| {
            daemon.maybe_manage_task_claim_ttls()
        });
        self.run_loop_step("maybe_auto_dispatch", |daemon| daemon.maybe_auto_dispatch());
        self.run_recoverable_step("maybe_recycle_cron_tasks", |daemon| {
            daemon.maybe_recycle_cron_tasks()
        });

        // --- Phase 5: pipeline-level health interventions ---
        self.run_recoverable_step("maybe_intervene_manager_dispatch_gap", |daemon| {
            daemon.maybe_intervene_manager_dispatch_gap()
        });
        self.run_recoverable_step("maybe_intervene_architect_utilization", |daemon| {
            daemon.maybe_intervene_architect_utilization()
        });
        self.run_recoverable_step("maybe_intervene_board_replenishment", |daemon| {
            daemon.maybe_intervene_board_replenishment()
        });
        self.run_recoverable_step("maybe_detect_pipeline_starvation", |daemon| {
            daemon.maybe_detect_pipeline_starvation()
        });
        self.run_recoverable_step("tact_check", |daemon| daemon.tact_check());

        // --- Phase 6: optional integrations and environment hygiene ---
        // Optional subsystem steps no-op when the subsystem is not configured.
        self.run_optional_subsystem_step("process_discord_queue", "discord", |daemon| {
            daemon.process_discord_queue()
        });
        self.run_optional_subsystem_step("process_telegram_queue", "telegram", |daemon| {
            daemon.process_telegram_queue()
        });
        self.run_recoverable_step("maybe_fire_nudges", |daemon| daemon.maybe_fire_nudges());
        self.run_recoverable_step("check_backend_health", |daemon| {
            daemon.check_backend_health()
        });
        self.run_recoverable_step("maybe_reconcile_stale_worktrees", |daemon| {
            daemon.maybe_reconcile_stale_worktrees()
        });
        self.run_recoverable_step("check_worktree_staleness", |daemon| {
            daemon.check_worktree_staleness()
        });
        self.run_recoverable_step("maybe_warn_uncommitted_work", |daemon| {
            daemon.maybe_warn_uncommitted_work()
        });
        self.run_recoverable_step("maybe_cleanup_shared_cargo_target", |daemon| {
            daemon.maybe_cleanup_shared_cargo_target()
        });
        self.run_recoverable_step("maybe_run_disk_hygiene", |daemon| {
            daemon.maybe_run_disk_hygiene()
        });
        // Clean-room parity tracking only runs when the mode is enabled.
        self.run_recoverable_step("record_parity_snapshot", |daemon| {
            if daemon.config.team_config.automation.clean_room_mode {
                daemon.sync_cleanroom_specs()?;
                // A missing/unreadable parity report is silently skipped.
                if let Ok(report) =
                    crate::team::parity::ParityReport::load(&daemon.config.project_root)
                {
                    daemon.record_parity_updated(&report.summary());
                }
                crate::team::parity::sync_gap_tasks(&daemon.config.project_root)?;
            }
            Ok(())
        });
        // --- Phase 7: reporting, standups, and periodic maintenance ---
        self.run_optional_subsystem_step("maybe_generate_standup", "standup", |daemon| {
            let generated = standup::maybe_generate_standup(standup::StandupGenerationContext {
                project_root: &daemon.config.project_root,
                team_config: &daemon.config.team_config,
                members: &daemon.config.members,
                watchers: &daemon.watchers,
                states: &daemon.states,
                pane_map: &daemon.config.pane_map,
                telegram_bot: daemon.telegram_bot.as_ref(),
                paused_standups: &daemon.paused_standups,
                last_standup: &mut daemon.last_standup,
                backend_health: &daemon.backend_health,
            })?;
            for recipient in generated {
                daemon.record_standup_generated(&recipient);
            }
            Ok(())
        });
        self.run_recoverable_step("maybe_rotate_board", |daemon| daemon.maybe_rotate_board());
        self.run_recoverable_step("maybe_auto_archive", |daemon| daemon.maybe_auto_archive());
        self.run_recoverable_step("run_auto_doctor", |daemon| {
            daemon.run_auto_doctor().map(|_| ())
        });
        // Retrospective generation is additionally panic-isolated.
        self.run_recoverable_step_with_catch_unwind("maybe_generate_retrospective", |daemon| {
            daemon.maybe_generate_retrospective()
        });
        self.run_recoverable_step("maybe_notify_failure_patterns", |daemon| {
            daemon.maybe_notify_failure_patterns()
        });
        // Refresh tmux pane status labels from the state gathered this tick.
        status::update_pane_status_labels(status::PaneStatusLabelUpdateContext {
            project_root: &self.config.project_root,
            members: &self.config.members,
            pane_map: &self.config.pane_map,
            states: &self.states,
            nudges: &self.nudges,
            last_standup: &self.last_standup,
            paused_standups: &self.paused_standups,
            standup_interval_for_member: |member_name| {
                standup::standup_interval_for_member_name(
                    &self.config.team_config,
                    &self.config.members,
                    member_name,
                )
            },
        });

        // Hand the accumulated errors to the report, resetting the buffer.
        let mut report = TickReport::new(self.poll_cycle_count);
        report.subsystem_errors = std::mem::take(&mut self.current_tick_errors);
        report
    }
320
    /// Gracefully shuts down every shim subprocess: warns members, preserves
    /// worktree state, asks each shim to exit, then force-kills any process
    /// that outlives the configured timeout.
    fn shutdown_all_shims(&mut self) {
        // Queue/live-deliver a shutdown warning to all non-user members
        // (this call also sleeps for the configured timeout).
        self.warn_members_about_shutdown();

        if self.shim_handles.is_empty() {
            return;
        }

        // Second, live-only warning window so ready agents can commit work.
        let warning_secs = self
            .config
            .team_config
            .workflow_policy
            .graceful_shutdown_timeout_secs;
        self.warn_agents_of_shutdown(warning_secs);
        self.preserve_work_before_shutdown();

        let timeout_secs = self.config.team_config.shim_shutdown_timeout_secs;
        info!(
            count = self.shim_handles.len(),
            timeout_secs, "sending graceful shutdown to shim subprocesses"
        );

        // Snapshot the names so we can take `get_mut` borrows per member.
        let names: Vec<String> = self.shim_handles.keys().cloned().collect();
        for name in &names {
            if let Some(handle) = self.shim_handles.get_mut(name) {
                if handle.is_terminal() {
                    continue;
                }
                // Graceful request first; escalate to kill only on failure.
                if let Err(error) = handle.send_shutdown(timeout_secs) {
                    warn!(
                        member = name.as_str(),
                        error = %error,
                        "failed to send shim shutdown, sending kill"
                    );
                    let _ = handle.send_kill();
                }
            }
        }

        // Poll the remaining live PIDs until they exit or the deadline hits.
        let deadline = Instant::now() + Duration::from_secs(timeout_secs as u64);
        let mut pids: Vec<(String, u32)> = names
            .iter()
            .filter_map(|name| {
                self.shim_handles
                    .get(name)
                    .filter(|h| !h.is_terminal())
                    .map(|h| (name.clone(), h.child_pid))
            })
            .collect();

        while !pids.is_empty() && Instant::now() < deadline {
            pids.retain(|(name, pid)| {
                // SAFETY: kill with signal 0 performs no action; it only
                // probes whether the process exists (0 == still alive).
                let alive = unsafe { libc::kill(*pid as i32, 0) } == 0;
                if !alive {
                    debug!(member = name.as_str(), pid, "shim process exited cleanly");
                }
                alive
            });
            if !pids.is_empty() {
                std::thread::sleep(Duration::from_millis(100));
            }
        }

        // Anything still alive past the deadline is force-killed.
        for (name, pid) in &pids {
            warn!(
                member = name.as_str(),
                pid, "shim did not exit within timeout, sending Kill"
            );
            if let Some(handle) = self.shim_handles.get_mut(name) {
                let _ = handle.send_kill();
            }
            // SAFETY: SIGKILL to a PID we spawned; if the process already
            // exited the call fails harmlessly with ESRCH.
            unsafe {
                libc::kill(*pid as i32, libc::SIGKILL);
            }
        }
    }
402
403 fn warn_agents_of_shutdown(&mut self, warning_secs: u64) {
404 let body = format!("Shutting down in {warning_secs}s — commit your work now");
405 let mut delivered = 0usize;
406
407 for (member_name, handle) in self.shim_handles.iter_mut() {
408 if handle.is_terminal() || !handle.is_ready() {
409 debug!(
410 member = member_name.as_str(),
411 state = %handle.state,
412 "skipping shutdown warning because agent is not ready for live delivery"
413 );
414 continue;
415 }
416
417 match handle.send_message("daemon", &body) {
418 Ok(()) => {
419 delivered += 1;
420 let _ = crate::team::append_shim_event_log(
421 &self.config.project_root,
422 member_name,
423 &format!("-> daemon: {body}"),
424 );
425 }
426 Err(error) => {
427 warn!(
428 member = member_name.as_str(),
429 error = %error,
430 "failed to send live shutdown warning"
431 );
432 }
433 }
434 }
435
436 info!(
437 warning_secs,
438 delivered, "sent live shutdown warning to ready agents"
439 );
440 if warning_secs > 0 {
441 std::thread::sleep(Duration::from_secs(warning_secs));
442 }
443 }
444
445 fn preserve_work_before_shutdown(&mut self) {
446 let names: Vec<String> = self
447 .config
448 .members
449 .iter()
450 .filter(|member| member.use_worktrees)
451 .map(|member| member.name.clone())
452 .collect();
453 for member_name in names {
454 let worktree = self.worktree_dir(&member_name);
455 self.preserve_worktree_before_restart(&member_name, &worktree, "daemon shutdown");
456 }
457 }
458
459 fn warn_members_about_shutdown(&mut self) {
460 let timeout_secs = self.config.team_config.shim_shutdown_timeout_secs;
461 let recipients: Vec<String> = self
462 .config
463 .members
464 .iter()
465 .filter(|member| member.role_type != RoleType::User)
466 .map(|member| member.name.clone())
467 .collect();
468 if recipients.is_empty() {
469 return;
470 }
471
472 let warning = format!("Shutting down in {timeout_secs}s - commit your work now");
473 info!(
474 recipients = recipients.len(),
475 timeout_secs, "warning members before shutdown"
476 );
477 for recipient in recipients {
478 let delivery_result = if let Some(handle) = self.shim_handles.get_mut(&recipient) {
479 if handle.is_terminal() {
480 Ok(())
481 } else {
482 handle.send_message("daemon", &warning)
483 }
484 } else {
485 self.queue_message("daemon", &recipient, &warning)
486 };
487
488 if let Err(error) = delivery_result {
489 warn!(
490 member = recipient.as_str(),
491 error = %error,
492 "failed to send shutdown warning"
493 );
494 }
495 }
496
497 if timeout_secs > 0 {
498 std::thread::sleep(Duration::from_secs(timeout_secs as u64));
499 }
500 }
501}
502
#[cfg(test)]
mod tests {
    use crate::team::test_support::TestDaemonBuilder;

    /// A tick on a freshly built daemon over an empty board should yield a
    /// clean report: cycle counter advanced, no errors, no recorded activity.
    #[test]
    fn tick_on_empty_daemon_returns_default_shaped_report() {
        // Lay out the minimal on-disk board structure the daemon expects.
        let temp_dir = tempfile::tempdir().unwrap();
        let board_tasks_dir = temp_dir.path().join(".batty/team_config/board/tasks");
        std::fs::create_dir_all(&board_tasks_dir).unwrap();

        let mut daemon = TestDaemonBuilder::new(temp_dir.path()).build();

        let first = daemon.tick();

        assert_eq!(first.cycle, 1, "first tick should bump cycle to 1");
        assert!(
            first.subsystem_errors.is_empty(),
            "empty daemon should record no subsystem errors, got {:?}",
            first.subsystem_errors
        );
        assert!(first.events_emitted.is_empty());
        assert!(first.state_transitions.is_empty());
        assert!(first.main_advanced_to.is_none());
        assert!(first.inbox_delivered.is_empty());
        assert!(first.tasks_transitioned.is_empty());
        assert!(first.ok(), "report.ok() should be true with no errors");

        // The counter must keep advancing across ticks.
        let second = daemon.tick();
        assert_eq!(second.cycle, 2, "second tick should bump cycle to 2");
        assert!(second.ok());
    }
}