batty_cli/team/daemon/poll.rs

//! Main daemon poll loop — signal handling, subsystem sequencing, heartbeat.

use std::time::{Duration, Instant};

use anyhow::Result;
use tracing::{debug, info, warn};

use super::config_reload::ConfigReloadMonitor;
use super::hot_reload::HotReloadMonitor;
use super::tick_report::TickReport;
use super::{TeamDaemon, standup, status};
use crate::team;
use crate::team::config::RoleType;
use crate::tmux;

impl TeamDaemon {
    /// Run the daemon loop. Blocks until the session is killed or an error occurs.
    ///
    /// If `resume` is true, agents are launched with session-resume flags
    /// (`claude --resume <session-id>` / `codex resume --last`) instead of fresh starts.
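    ///
    /// A minimal invocation sketch (assumes an already-constructed `TeamDaemon`):
    ///
    /// ```ignore
    /// daemon.run(false)?; // fresh start; pass `true` to resume prior sessions
    /// ```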
    pub fn run(&mut self, resume: bool) -> Result<()> {
        self.record_daemon_started();
        let is_hot_reload = self.acknowledge_hot_reload_marker();
        info!(session = %self.config.session, resume, "daemon started");
        self.record_orchestrator_action(format!(
            "runtime: orchestrator started (mode={}, resume={resume})",
            self.config.team_config.workflow_mode.as_str()
        ));

        // Install signal handler so we log clean shutdowns
        let shutdown_flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let flag_clone = shutdown_flag.clone();
        if let Err(e) = ctrlc::set_handler(move || {
            flag_clone.store(true, std::sync::atomic::Ordering::SeqCst);
        }) {
            warn!(error = %e, "failed to install signal handler");
        }

        self.run_startup_preflight()?;

        // Spawn agents in all panes
        self.spawn_all_agents(resume)?;
        if resume {
            self.restore_runtime_state();
        }
        // After a hot-reload, agents are freshly spawned and have no memory of
        // their prior tasks. Clear active_tasks so the board becomes the source
        // of truth again; reconcile will not reconstruct in-progress ownership
        // from stale worktree branches.
        if is_hot_reload {
            info!(
                cleared = self.active_tasks.len(),
                "hot-reload: clearing active_tasks to rely on board state after restart"
            );
            self.active_tasks.clear();
        }
        self.persist_runtime_state(false)?;

        let started_at = Instant::now();
        let heartbeat_interval = Duration::from_secs(300); // 5 minutes
        let mut last_heartbeat = Instant::now();
        let mut hot_reload = match HotReloadMonitor::for_current_exe() {
            Ok(monitor) => Some(monitor),
            Err(error) => {
                warn!(error = %error, "failed to initialize daemon hot-reload monitor");
                None
            }
        };
        let config_path = team::team_config_path(&self.config.project_root);
        let mut config_reload = match ConfigReloadMonitor::new(&config_path) {
            Ok(monitor) => Some(monitor),
            Err(error) => {
                warn!(error = %error, "failed to initialize config reload monitor");
                None
            }
        };

        // Main polling loop
        let shutdown_reason;
        loop {
            // Check for signal-based shutdown
            if shutdown_flag.load(std::sync::atomic::Ordering::SeqCst) {
                shutdown_reason = "signal";
                info!("received shutdown signal");
                break;
            }

            if !tmux::session_exists(&self.config.session) {
                shutdown_reason = "session_gone";
                info!("tmux session gone, shutting down");
                break;
            }

            // Run one productive iteration of the daemon's work. Hot-reload
            // and heartbeat persistence stay outside of `tick()` because
            // they need run-loop state (the HotReloadMonitor / ConfigReloadMonitor
            // owned by `run()`).
            let _tick_report = self.tick();

            // Hot-reload checks stay in `run()` because it owns the
            // monitor handles. They reuse the same recoverable-step error
            // handling, so a failure is logged rather than fatal.
            self.run_recoverable_step("maybe_reload_binary", |daemon| {
                daemon.maybe_hot_reload_binary(hot_reload.as_mut())
            });
            self.run_recoverable_step("maybe_reload_config", |daemon| {
                daemon.maybe_hot_reload_config(config_reload.as_mut())
            });

            // Periodic heartbeat
            if last_heartbeat.elapsed() >= heartbeat_interval {
                let uptime = started_at.elapsed().as_secs();
                self.record_daemon_heartbeat(uptime);
                if let Err(error) = self.persist_runtime_state(false) {
                    warn!(error = %error, "failed to persist daemon checkpoint");
                }
                debug!(uptime_secs = uptime, "daemon heartbeat");
                last_heartbeat = Instant::now();
            }

            std::thread::sleep(self.poll_interval);
        }

        // Graceful shutdown of all shim subprocesses
        self.shutdown_all_shims();

        // Save shim state for session resume
        if let Err(error) = self.save_shim_state() {
            warn!(error = %error, "failed to save shim state for resume");
        }

        let uptime = started_at.elapsed().as_secs();
        if let Err(error) = self.persist_runtime_state(true) {
            warn!(error = %error, "failed to persist final daemon checkpoint");
        }
        self.record_daemon_stopped(shutdown_reason, uptime);
        Ok(())
    }

    /// Run one iteration of the daemon's productive work without sleeping
    /// or touching hot-reload state. This is the inner body of `run()`'s
    /// loop, factored out so tests (and a future `batty debug tick`
    /// subcommand) can drive the daemon a single step at a time.
    ///
    /// Returns a [`TickReport`] capturing observable side effects of the
    /// tick. Phase 1 populates `cycle` and `subsystem_errors`; the other
    /// fields are placeholders that the scenario framework fills in by
    /// snapshotting state around the call.
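    ///
    /// Driving a single step from a test (sketch; mirrors the unit test at
    /// the bottom of this file):
    ///
    /// ```ignore
    /// let mut daemon = TestDaemonBuilder::new(tmp.path()).build();
    /// let report = daemon.tick();
    /// assert!(report.ok());
    /// ```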
    pub fn tick(&mut self) -> TickReport {
        // Reset per-tick error capture so the returned report only reflects
        // failures that happened during this call.
        self.current_tick_errors.clear();

        self.poll_cycle_count = self.poll_cycle_count.saturating_add(1);

        // -- Recoverable subsystems: log-and-skip with consecutive-failure tracking --
        self.run_recoverable_step("poll_shim_handles", |daemon| daemon.poll_shim_handles());
        self.run_recoverable_step("shim_health_check", |daemon| daemon.shim_health_check());
        self.run_recoverable_step("check_working_state_timeouts", |daemon| {
            daemon.check_working_state_timeouts()
        });
        self.run_recoverable_step("check_narration_loops", |daemon| {
            daemon.check_narration_loops()
        });
        self.run_recoverable_step("sync_launch_state_session_ids", |daemon| {
            daemon.sync_launch_state_session_ids()
        });
        self.run_recoverable_step("drain_legacy_command_queue", |daemon| {
            daemon.drain_legacy_command_queue()
        });

        // -- Critical subsystems: errors logged but no consecutive-failure tracking --
        self.run_loop_step("deliver_inbox_messages", |daemon| {
            daemon.deliver_inbox_messages()
        });
        self.run_loop_step("retry_failed_deliveries", |daemon| {
            daemon.retry_failed_deliveries()
        });
        self.run_recoverable_step("expire_stale_pending_messages", |daemon| {
            daemon.expire_stale_pending_messages()
        });

        // -- Recoverable subsystems --
        self.run_recoverable_step("maybe_intervene_triage_backlog", |daemon| {
            daemon.maybe_intervene_triage_backlog()
        });
        self.run_recoverable_step("maybe_intervene_owned_tasks", |daemon| {
            daemon.maybe_intervene_owned_tasks()
        });
        self.run_recoverable_step("maybe_intervene_review_backlog", |daemon| {
            daemon.maybe_intervene_review_backlog()
        });
        self.run_recoverable_step("maybe_escalate_stale_reviews", |daemon| {
            daemon.maybe_escalate_stale_reviews()
        });
        self.run_recoverable_step("maybe_emit_task_aging_alerts", |daemon| {
            daemon.maybe_emit_task_aging_alerts()
        });
        self.run_recoverable_step("maybe_auto_unblock_blocked_tasks", |daemon| {
            daemon.maybe_auto_unblock_blocked_tasks()
        });
        self.run_recoverable_step("process_merge_queue", |daemon| daemon.process_merge_queue());

        // -- Critical subsystems --
        self.run_loop_step("reconcile_active_tasks", |daemon| {
            daemon.reconcile_active_tasks()
        });
        self.run_loop_step("maybe_manage_task_claim_ttls", |daemon| {
            daemon.maybe_manage_task_claim_ttls()
        });
        self.run_loop_step("maybe_auto_dispatch", |daemon| daemon.maybe_auto_dispatch());
        self.run_recoverable_step("maybe_recycle_cron_tasks", |daemon| {
            daemon.maybe_recycle_cron_tasks()
        });

        // -- Recoverable subsystems --
        self.run_recoverable_step("maybe_intervene_manager_dispatch_gap", |daemon| {
            daemon.maybe_intervene_manager_dispatch_gap()
        });
        self.run_recoverable_step("maybe_intervene_architect_utilization", |daemon| {
            daemon.maybe_intervene_architect_utilization()
        });
        self.run_recoverable_step("maybe_intervene_board_replenishment", |daemon| {
            daemon.maybe_intervene_board_replenishment()
        });
        self.run_recoverable_step("maybe_detect_pipeline_starvation", |daemon| {
            daemon.maybe_detect_pipeline_starvation()
        });
        self.run_recoverable_step("tact_check", |daemon| daemon.tact_check());

        // -- Recoverable with catch_unwind (panic-safe) --
        self.run_optional_subsystem_step("process_discord_queue", "discord", |daemon| {
            daemon.process_discord_queue()
        });
        self.run_optional_subsystem_step("process_telegram_queue", "telegram", |daemon| {
            daemon.process_telegram_queue()
        });
        self.run_recoverable_step("maybe_fire_nudges", |daemon| daemon.maybe_fire_nudges());
        self.run_recoverable_step("check_backend_health", |daemon| {
            daemon.check_backend_health()
        });
        self.run_recoverable_step("maybe_reconcile_stale_worktrees", |daemon| {
            daemon.maybe_reconcile_stale_worktrees()
        });
        self.run_recoverable_step("check_worktree_staleness", |daemon| {
            daemon.check_worktree_staleness()
        });
        self.run_recoverable_step("maybe_warn_uncommitted_work", |daemon| {
            daemon.maybe_warn_uncommitted_work()
        });
        self.run_recoverable_step("maybe_cleanup_shared_cargo_target", |daemon| {
            daemon.maybe_cleanup_shared_cargo_target()
        });
        self.run_recoverable_step("maybe_run_disk_hygiene", |daemon| {
            daemon.maybe_run_disk_hygiene()
        });
        self.run_recoverable_step("record_parity_snapshot", |daemon| {
            if daemon.config.team_config.automation.clean_room_mode {
                daemon.sync_cleanroom_specs()?;
                if let Ok(report) =
                    crate::team::parity::ParityReport::load(&daemon.config.project_root)
                {
                    daemon.record_parity_updated(&report.summary());
                }
                crate::team::parity::sync_gap_tasks(&daemon.config.project_root)?;
            }
            Ok(())
        });
        self.run_optional_subsystem_step("maybe_generate_standup", "standup", |daemon| {
            let generated = standup::maybe_generate_standup(standup::StandupGenerationContext {
                project_root: &daemon.config.project_root,
                team_config: &daemon.config.team_config,
                members: &daemon.config.members,
                watchers: &daemon.watchers,
                states: &daemon.states,
                pane_map: &daemon.config.pane_map,
                telegram_bot: daemon.telegram_bot.as_ref(),
                paused_standups: &daemon.paused_standups,
                last_standup: &mut daemon.last_standup,
                backend_health: &daemon.backend_health,
            })?;
            for recipient in generated {
                daemon.record_standup_generated(&recipient);
            }
            Ok(())
        });
        self.run_recoverable_step("maybe_rotate_board", |daemon| daemon.maybe_rotate_board());
        self.run_recoverable_step("maybe_auto_archive", |daemon| daemon.maybe_auto_archive());
        self.run_recoverable_step("run_auto_doctor", |daemon| {
            daemon.run_auto_doctor().map(|_| ())
        });
        self.run_recoverable_step_with_catch_unwind("maybe_generate_retrospective", |daemon| {
            daemon.maybe_generate_retrospective()
        });
        self.run_recoverable_step("maybe_notify_failure_patterns", |daemon| {
            daemon.maybe_notify_failure_patterns()
        });
        status::update_pane_status_labels(status::PaneStatusLabelUpdateContext {
            project_root: &self.config.project_root,
            members: &self.config.members,
            pane_map: &self.config.pane_map,
            states: &self.states,
            nudges: &self.nudges,
            last_standup: &self.last_standup,
            paused_standups: &self.paused_standups,
            standup_interval_for_member: |member_name| {
                standup::standup_interval_for_member_name(
                    &self.config.team_config,
                    &self.config.members,
                    member_name,
                )
            },
        });

        // Drain the per-tick error buffer into a fresh report.
        let mut report = TickReport::new(self.poll_cycle_count);
        report.subsystem_errors = std::mem::take(&mut self.current_tick_errors);
        report
    }

    /// Send Shutdown to all active shim handles, wait for exit, fall back to Kill.
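    ///
    /// Shutdown proceeds in three phases: send `Shutdown` to each live
    /// handle, poll the child PIDs until they exit or the timeout elapses,
    /// then force-kill any survivors with SIGKILL.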
    fn shutdown_all_shims(&mut self) {
        self.warn_members_about_shutdown();

        if self.shim_handles.is_empty() {
            return;
        }

        let warning_secs = self
            .config
            .team_config
            .workflow_policy
            .graceful_shutdown_timeout_secs;
        self.warn_agents_of_shutdown(warning_secs);
        self.preserve_work_before_shutdown();

        let timeout_secs = self.config.team_config.shim_shutdown_timeout_secs;
        info!(
            count = self.shim_handles.len(),
            timeout_secs, "sending graceful shutdown to shim subprocesses"
        );

        // Phase 1: Send Shutdown command to all handles
        let names: Vec<String> = self.shim_handles.keys().cloned().collect();
        for name in &names {
            if let Some(handle) = self.shim_handles.get_mut(name) {
                if handle.is_terminal() {
                    continue;
                }
                if let Err(error) = handle.send_shutdown(timeout_secs) {
                    warn!(
                        member = name.as_str(),
                        error = %error,
                        "failed to send shim shutdown, sending kill"
                    );
                    let _ = handle.send_kill();
                }
            }
        }

        // Phase 2: Wait for child processes to exit within the timeout
        let deadline = Instant::now() + Duration::from_secs(timeout_secs as u64);
        let mut pids: Vec<(String, u32)> = names
            .iter()
            .filter_map(|name| {
                self.shim_handles
                    .get(name)
                    .filter(|h| !h.is_terminal())
                    .map(|h| (name.clone(), h.child_pid))
            })
            .collect();

        while !pids.is_empty() && Instant::now() < deadline {
            pids.retain(|(name, pid)| {
                // Probe liveness with `kill(pid, 0)`: signal 0 delivers
                // nothing, but returns 0 only if the process still exists.
                let alive = unsafe { libc::kill(*pid as i32, 0) } == 0;
                if !alive {
                    debug!(member = name.as_str(), pid, "shim process exited cleanly");
                }
                alive
            });
            if !pids.is_empty() {
                std::thread::sleep(Duration::from_millis(100));
            }
        }

        // Phase 3: Force-kill any survivors
        for (name, pid) in &pids {
            warn!(
                member = name.as_str(),
                pid, "shim did not exit within timeout, sending Kill"
            );
            if let Some(handle) = self.shim_handles.get_mut(name) {
                let _ = handle.send_kill();
            }
            // Also send SIGKILL directly
            unsafe {
                libc::kill(*pid as i32, libc::SIGKILL);
            }
        }
    }

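    /// Deliver a live shutdown warning to every ready agent, then sleep
    /// for the warning window so agents have time to commit their work.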
    fn warn_agents_of_shutdown(&mut self, warning_secs: u64) {
        let body = format!("Shutting down in {warning_secs}s — commit your work now");
        let mut delivered = 0usize;

        for (member_name, handle) in self.shim_handles.iter_mut() {
            if handle.is_terminal() || !handle.is_ready() {
                debug!(
                    member = member_name.as_str(),
                    state = %handle.state,
                    "skipping shutdown warning because agent is not ready for live delivery"
                );
                continue;
            }

            match handle.send_message("daemon", &body) {
                Ok(()) => {
                    delivered += 1;
                    let _ = crate::team::append_shim_event_log(
                        &self.config.project_root,
                        member_name,
                        &format!("-> daemon: {body}"),
                    );
                }
                Err(error) => {
                    warn!(
                        member = member_name.as_str(),
                        error = %error,
                        "failed to send live shutdown warning"
                    );
                }
            }
        }

        info!(
            warning_secs,
            delivered, "sent live shutdown warning to ready agents"
        );
        if warning_secs > 0 {
            std::thread::sleep(Duration::from_secs(warning_secs));
        }
    }

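    /// Snapshot each worktree-backed member's worktree so in-flight work
    /// survives the daemon shutdown.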
    fn preserve_work_before_shutdown(&mut self) {
        let names: Vec<String> = self
            .config
            .members
            .iter()
            .filter(|member| member.use_worktrees)
            .map(|member| member.name.clone())
            .collect();
        for member_name in names {
            let worktree = self.worktree_dir(&member_name);
            self.preserve_worktree_before_restart(&member_name, &worktree, "daemon shutdown");
        }
    }

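    /// Warn every non-user member about the impending shutdown, via the
    /// live shim handle when one exists or the message queue otherwise,
    /// then wait out the shim shutdown timeout.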
    fn warn_members_about_shutdown(&mut self) {
        let timeout_secs = self.config.team_config.shim_shutdown_timeout_secs;
        let recipients: Vec<String> = self
            .config
            .members
            .iter()
            .filter(|member| member.role_type != RoleType::User)
            .map(|member| member.name.clone())
            .collect();
        if recipients.is_empty() {
            return;
        }

        let warning = format!("Shutting down in {timeout_secs}s - commit your work now");
        info!(
            recipients = recipients.len(),
            timeout_secs, "warning members before shutdown"
        );
        for recipient in recipients {
            let delivery_result = if let Some(handle) = self.shim_handles.get_mut(&recipient) {
                if handle.is_terminal() {
                    Ok(())
                } else {
                    handle.send_message("daemon", &warning)
                }
            } else {
                self.queue_message("daemon", &recipient, &warning)
            };

            if let Err(error) = delivery_result {
                warn!(
                    member = recipient.as_str(),
                    error = %error,
                    "failed to send shutdown warning"
                );
            }
        }

        if timeout_secs > 0 {
            std::thread::sleep(Duration::from_secs(timeout_secs as u64));
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::team::test_support::TestDaemonBuilder;

    /// Ticket #636 acceptance test: invoking `tick()` on an empty daemon
    /// produces a `Default`-shaped report (cycle advances, no errors, all
    /// observability vecs empty). This pins the `TickReport` contract for
    /// later phases of the scenario framework.
    #[test]
    fn tick_on_empty_daemon_returns_default_shaped_report() {
        let tmp = tempfile::tempdir().unwrap();
        // Bootstrap the board tasks directory so subsystems that read it
        // (owned-tasks intervention, review backlog, auto-unblock, cron
        // recycling, manager dispatch gap, architect utilization, board
        // replenishment, pipeline starvation) see an empty-but-valid
        // board instead of logging a read-directory failure.
        let tasks_dir = tmp.path().join(".batty/team_config/board/tasks");
        std::fs::create_dir_all(&tasks_dir).unwrap();

        let mut daemon = TestDaemonBuilder::new(tmp.path()).build();

        let report = daemon.tick();

        assert_eq!(report.cycle, 1, "first tick should bump cycle to 1");
        assert!(
            report.subsystem_errors.is_empty(),
            "empty daemon should record no subsystem errors, got {:?}",
            report.subsystem_errors
        );
        assert!(report.events_emitted.is_empty());
        assert!(report.state_transitions.is_empty());
        assert!(report.main_advanced_to.is_none());
        assert!(report.inbox_delivered.is_empty());
        assert!(report.tasks_transitioned.is_empty());
        assert!(report.ok(), "report.ok() should be true with no errors");

        // Ticking again advances the cycle counter monotonically.
        let second = daemon.tick();
        assert_eq!(second.cycle, 2, "second tick should bump cycle to 2");
        assert!(second.ok());
    }
}