batty_cli/team/daemon/poll.rs

//! Main daemon poll loop — signal handling, subsystem sequencing, heartbeat.

use std::time::{Duration, Instant};

use anyhow::Result;
use tracing::{debug, info, warn};

use super::hot_reload::HotReloadMonitor;
use super::{TeamDaemon, standup, status};
use crate::tmux;

impl TeamDaemon {
    /// Run the daemon loop. Blocks until the session is killed or an error occurs.
    ///
    /// If `resume` is true, agents are launched with session-resume flags
    /// (`claude --resume <session-id>` / `codex resume --last`) instead of fresh starts.
    pub fn run(&mut self, resume: bool) -> Result<()> {
        self.record_daemon_started();
        self.acknowledge_hot_reload_marker();
        info!(session = %self.config.session, resume, "daemon started");
        self.record_orchestrator_action(format!(
            "runtime: orchestrator started (mode={}, resume={resume})",
            self.config.team_config.workflow_mode.as_str()
        ));

        // Install signal handler so we log clean shutdowns
        let shutdown_flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let flag_clone = shutdown_flag.clone();
        if let Err(e) = ctrlc::set_handler(move || {
            flag_clone.store(true, std::sync::atomic::Ordering::SeqCst);
        }) {
            warn!(error = %e, "failed to install signal handler");
        }

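        // Fail fast: preflight checks run before any agents spawn, so a broken
        // environment aborts startup instead of surfacing mid-session.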
        self.run_startup_preflight()?;

        // Spawn agents in all panes
        self.spawn_all_agents(resume)?;
        if resume {
            self.restore_runtime_state();
        }
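        // Initial checkpoint. The `false` flag appears to mark a non-final save;
        // the shutdown path below passes `true` for the last checkpoint.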
        self.persist_runtime_state(false)?;

        let started_at = Instant::now();
        let heartbeat_interval = Duration::from_secs(300); // 5 minutes
        let mut last_heartbeat = Instant::now();
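        // Hot reload is best-effort: if the monitor fails to initialize, the
        // daemon keeps running without it.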
        let mut hot_reload = match HotReloadMonitor::for_current_exe() {
            Ok(monitor) => Some(monitor),
            Err(error) => {
                warn!(error = %error, "failed to initialize daemon hot-reload monitor");
                None
            }
        };

        // Main polling loop
        let shutdown_reason;
        loop {
            // Check for signal-based shutdown
            if shutdown_flag.load(std::sync::atomic::Ordering::SeqCst) {
                shutdown_reason = "signal";
                info!("received shutdown signal");
                break;
            }

            if !tmux::session_exists(&self.config.session) {
                shutdown_reason = "session_gone";
                info!("tmux session gone, shutting down");
                break;
            }

            // -- Recoverable subsystems: log-and-skip with consecutive-failure tracking --
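            // (A failing recoverable step is logged and skipped for this tick;
            // presumably the consecutive-failure count feeds later escalation.)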
            self.run_recoverable_step("poll_shim_handles", |daemon| daemon.poll_shim_handles());
            self.run_recoverable_step("shim_health_check", |daemon| daemon.shim_health_check());
            self.run_recoverable_step("sync_launch_state_session_ids", |daemon| {
                daemon.sync_launch_state_session_ids()
            });
            self.run_recoverable_step("drain_legacy_command_queue", |daemon| {
                daemon.drain_legacy_command_queue()
            });

            // -- Critical subsystems: errors logged but no consecutive-failure tracking --
            self.run_loop_step("deliver_inbox_messages", |daemon| {
                daemon.deliver_inbox_messages()
            });
            self.run_loop_step("retry_failed_deliveries", |daemon| {
                daemon.retry_failed_deliveries()
            });

            // -- Recoverable subsystems --
            self.run_recoverable_step("maybe_intervene_triage_backlog", |daemon| {
                daemon.maybe_intervene_triage_backlog()
            });
            self.run_recoverable_step("maybe_intervene_owned_tasks", |daemon| {
                daemon.maybe_intervene_owned_tasks()
            });
            self.run_recoverable_step("maybe_intervene_review_backlog", |daemon| {
                daemon.maybe_intervene_review_backlog()
            });
            self.run_recoverable_step("maybe_escalate_stale_reviews", |daemon| {
                daemon.maybe_escalate_stale_reviews()
            });
            self.run_recoverable_step("maybe_auto_unblock_blocked_tasks", |daemon| {
                daemon.maybe_auto_unblock_blocked_tasks()
            });

            // -- Critical subsystems --
            self.run_loop_step("reconcile_active_tasks", |daemon| {
                daemon.reconcile_active_tasks()
            });
            self.run_loop_step("maybe_auto_dispatch", |daemon| daemon.maybe_auto_dispatch());
            self.run_recoverable_step("maybe_recycle_cron_tasks", |daemon| {
                daemon.maybe_recycle_cron_tasks()
            });

            // -- Recoverable subsystems --
            self.run_recoverable_step("maybe_intervene_manager_dispatch_gap", |daemon| {
                daemon.maybe_intervene_manager_dispatch_gap()
            });
            self.run_recoverable_step("maybe_intervene_architect_utilization", |daemon| {
                daemon.maybe_intervene_architect_utilization()
            });
            self.run_recoverable_step("maybe_intervene_board_replenishment", |daemon| {
                daemon.maybe_intervene_board_replenishment()
            });
            self.run_recoverable_step("maybe_detect_pipeline_starvation", |daemon| {
                daemon.maybe_detect_pipeline_starvation()
            });

            // -- Recoverable with catch_unwind (panic-safe) --
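            // catch_unwind turns a panic inside these steps into a logged failure
            // rather than unwinding out of the poll loop.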
            self.run_recoverable_step_with_catch_unwind("process_telegram_queue", |daemon| {
                daemon.process_telegram_queue()
            });
            self.run_recoverable_step("maybe_fire_nudges", |daemon| daemon.maybe_fire_nudges());
            self.run_recoverable_step("check_backend_health", |daemon| {
                daemon.check_backend_health()
            });
            self.run_recoverable_step("maybe_reconcile_stale_worktrees", |daemon| {
                daemon.maybe_reconcile_stale_worktrees()
            });
            self.run_recoverable_step("check_worktree_staleness", |daemon| {
                daemon.check_worktree_staleness()
            });
            self.run_recoverable_step("maybe_warn_uncommitted_work", |daemon| {
                daemon.maybe_warn_uncommitted_work()
            });
            self.run_recoverable_step_with_catch_unwind("maybe_generate_standup", |daemon| {
                let generated =
                    standup::maybe_generate_standup(standup::StandupGenerationContext {
                        project_root: &daemon.config.project_root,
                        team_config: &daemon.config.team_config,
                        members: &daemon.config.members,
                        watchers: &daemon.watchers,
                        states: &daemon.states,
                        pane_map: &daemon.config.pane_map,
                        telegram_bot: daemon.telegram_bot.as_ref(),
                        paused_standups: &daemon.paused_standups,
                        last_standup: &mut daemon.last_standup,
                        backend_health: &daemon.backend_health,
                    })?;
                for recipient in generated {
                    daemon.record_standup_generated(&recipient);
                }
                Ok(())
            });
            self.run_recoverable_step("maybe_rotate_board", |daemon| daemon.maybe_rotate_board());
            self.run_recoverable_step("maybe_auto_archive", |daemon| daemon.maybe_auto_archive());
            self.run_recoverable_step_with_catch_unwind("maybe_generate_retrospective", |daemon| {
                daemon.maybe_generate_retrospective()
            });
            self.run_recoverable_step("maybe_notify_failure_patterns", |daemon| {
                daemon.maybe_notify_failure_patterns()
            });
            self.run_recoverable_step("maybe_reload_binary", |daemon| {
                daemon.maybe_hot_reload_binary(hot_reload.as_mut())
            });
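            // Refresh the per-pane status labels shown in tmux so each member's
            // nudge/standup state stays visible between ticks.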
            status::update_pane_status_labels(status::PaneStatusLabelUpdateContext {
                project_root: &self.config.project_root,
                members: &self.config.members,
                pane_map: &self.config.pane_map,
                states: &self.states,
                nudges: &self.nudges,
                last_standup: &self.last_standup,
                paused_standups: &self.paused_standups,
                standup_interval_for_member: |member_name| {
                    standup::standup_interval_for_member_name(
                        &self.config.team_config,
                        &self.config.members,
                        member_name,
                    )
                },
            });

            // Periodic heartbeat
            if last_heartbeat.elapsed() >= heartbeat_interval {
                let uptime = started_at.elapsed().as_secs();
                self.record_daemon_heartbeat(uptime);
                if let Err(error) = self.persist_runtime_state(false) {
                    warn!(error = %error, "failed to persist daemon checkpoint");
                }
                debug!(uptime_secs = uptime, "daemon heartbeat");
                last_heartbeat = Instant::now();
            }

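            // One full pass of every subsystem per tick; poll_interval sets the cadence.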
            std::thread::sleep(self.poll_interval);
        }

        // Graceful shutdown of all shim subprocesses
        self.shutdown_all_shims();

        // Save shim state for session resume
        if let Err(error) = self.save_shim_state() {
            warn!(error = %error, "failed to save shim state for resume");
        }

        let uptime = started_at.elapsed().as_secs();
        if let Err(error) = self.persist_runtime_state(true) {
            warn!(error = %error, "failed to persist final daemon checkpoint");
        }
        self.record_daemon_stopped(shutdown_reason, uptime);
        Ok(())
    }

    /// Send Shutdown to all active shim handles, wait for exit, fall back to Kill.
    fn shutdown_all_shims(&mut self) {
        if self.shim_handles.is_empty() {
            return;
        }

        let timeout_secs = self.config.team_config.shim_shutdown_timeout_secs;
        info!(
            count = self.shim_handles.len(),
            timeout_secs, "sending graceful shutdown to shim subprocesses"
        );

        // Phase 1: Send Shutdown command to all handles
        let names: Vec<String> = self.shim_handles.keys().cloned().collect();
        for name in &names {
            if let Some(handle) = self.shim_handles.get_mut(name) {
                if handle.is_terminal() {
                    continue;
                }
                if let Err(error) = handle.send_shutdown(timeout_secs) {
                    warn!(
                        member = name.as_str(),
                        error = %error,
                        "failed to send shim shutdown, sending kill"
                    );
                    let _ = handle.send_kill();
                }
            }
        }

        // Phase 2: Wait for child processes to exit within the timeout
        let deadline = Instant::now() + Duration::from_secs(timeout_secs as u64);
        let mut pids: Vec<(String, u32)> = names
            .iter()
            .filter_map(|name| {
                self.shim_handles
                    .get(name)
                    .filter(|h| !h.is_terminal())
                    .map(|h| (name.clone(), h.child_pid))
            })
            .collect();

        while !pids.is_empty() && Instant::now() < deadline {
            pids.retain(|(name, pid)| {
                // Check if process still alive via kill(0)
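                // (Note: kill(pid, 0) also reports zombies as alive, so this
                // assumes exited children are reaped promptly elsewhere.)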
                let alive = unsafe { libc::kill(*pid as i32, 0) } == 0;
                if !alive {
                    debug!(member = name.as_str(), pid, "shim process exited cleanly");
                }
                alive
            });
            if !pids.is_empty() {
                std::thread::sleep(Duration::from_millis(100));
            }
        }

        // Phase 3: Force-kill any survivors
        for (name, pid) in &pids {
            warn!(
                member = name.as_str(),
                pid, "shim did not exit within timeout, sending Kill"
            );
            if let Some(handle) = self.shim_handles.get_mut(name) {
                let _ = handle.send_kill();
            }
            // Also send SIGKILL directly
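            // in case the handle's control channel is already gone.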
            unsafe {
                libc::kill(*pid as i32, libc::SIGKILL);
            }
        }
    }
}
294}