// batty_cli/team/daemon/poll.rs

1//! Main daemon poll loop — signal handling, subsystem sequencing, heartbeat.
2
3use std::time::{Duration, Instant};
4
5use anyhow::Result;
6use tracing::{debug, info, warn};
7
8use super::config_reload::ConfigReloadMonitor;
9use super::hot_reload::HotReloadMonitor;
10use super::{TeamDaemon, standup, status};
11use crate::team;
12use crate::tmux;
13
14impl TeamDaemon {
    /// Run the daemon loop. Blocks until the session is killed or an error occurs.
    ///
    /// If `resume` is true, agents are launched with session-resume flags
    /// (`claude --resume <session-id>` / `codex resume --last`) instead of fresh starts.
    ///
    /// # Errors
    ///
    /// Propagates failures from startup preflight, agent spawning, and the
    /// initial runtime-state persist. Once the poll loop is entered, subsystem
    /// errors are absorbed and logged by the `run_loop_step` /
    /// `run_recoverable_step` wrappers instead of being returned.
    pub fn run(&mut self, resume: bool) -> Result<()> {
        // Startup bookkeeping: record the start event and acknowledge any
        // hot-reload marker left over from a previous binary swap.
        self.record_daemon_started();
        self.acknowledge_hot_reload_marker();
        info!(session = %self.config.session, resume, "daemon started");
        self.record_orchestrator_action(format!(
            "runtime: orchestrator started (mode={}, resume={resume})",
            self.config.team_config.workflow_mode.as_str()
        ));

        // Install signal handler so we log clean shutdowns. The handler only
        // flips an atomic flag that the loop below polls once per iteration.
        let shutdown_flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let flag_clone = shutdown_flag.clone();
        if let Err(e) = ctrlc::set_handler(move || {
            flag_clone.store(true, std::sync::atomic::Ordering::SeqCst);
        }) {
            // Non-fatal: without a handler we lose the "signal" shutdown
            // reason, but the session-gone check still terminates the loop.
            warn!(error = %e, "failed to install signal handler");
        }

        self.run_startup_preflight()?;

        // Spawn agents in all panes
        self.spawn_all_agents(resume)?;
        if resume {
            self.restore_runtime_state();
        }
        // Persist an initial checkpoint so a crash right after startup still
        // leaves state on disk for a later resume.
        self.persist_runtime_state(false)?;

        let started_at = Instant::now();
        let heartbeat_interval = Duration::from_secs(300); // 5 minutes
        let mut last_heartbeat = Instant::now();
        // Both reload monitors are best-effort: failing to construct one
        // disables that hot-reload feature but never aborts the daemon.
        let mut hot_reload = match HotReloadMonitor::for_current_exe() {
            Ok(monitor) => Some(monitor),
            Err(error) => {
                warn!(error = %error, "failed to initialize daemon hot-reload monitor");
                None
            }
        };
        let config_path = team::team_config_path(&self.config.project_root);
        let mut config_reload = match ConfigReloadMonitor::new(&config_path) {
            Ok(monitor) => Some(monitor),
            Err(error) => {
                warn!(error = %error, "failed to initialize config reload monitor");
                None
            }
        };

        // Main polling loop. `shutdown_reason` is assigned exactly once, by
        // whichever `break` exits the loop, and is recorded on shutdown.
        // NOTE(review): the step ordering below appears deliberate (message
        // delivery before dispatch, interventions before the status-label
        // refresh) — preserve it when adding or moving steps.
        let shutdown_reason;
        loop {
            // Check for signal-based shutdown
            if shutdown_flag.load(std::sync::atomic::Ordering::SeqCst) {
                shutdown_reason = "signal";
                info!("received shutdown signal");
                break;
            }

            // The tmux session is the daemon's reason to exist; if the user
            // killed it there is nothing left to orchestrate.
            if !tmux::session_exists(&self.config.session) {
                shutdown_reason = "session_gone";
                info!("tmux session gone, shutting down");
                break;
            }

            // -- Recoverable subsystems: log-and-skip with consecutive-failure tracking --
            self.run_recoverable_step("poll_shim_handles", |daemon| daemon.poll_shim_handles());
            self.run_recoverable_step("shim_health_check", |daemon| daemon.shim_health_check());
            self.run_recoverable_step("check_working_state_timeouts", |daemon| {
                daemon.check_working_state_timeouts()
            });
            self.run_recoverable_step("check_narration_loops", |daemon| {
                daemon.check_narration_loops()
            });
            self.run_recoverable_step("sync_launch_state_session_ids", |daemon| {
                daemon.sync_launch_state_session_ids()
            });
            self.run_recoverable_step("drain_legacy_command_queue", |daemon| {
                daemon.drain_legacy_command_queue()
            });

            // -- Critical subsystems: errors logged but no consecutive-failure tracking --
            self.run_loop_step("deliver_inbox_messages", |daemon| {
                daemon.deliver_inbox_messages()
            });
            self.run_loop_step("retry_failed_deliveries", |daemon| {
                daemon.retry_failed_deliveries()
            });
            self.run_recoverable_step("expire_stale_pending_messages", |daemon| {
                daemon.expire_stale_pending_messages()
            });

            // -- Recoverable subsystems --
            self.run_recoverable_step("maybe_intervene_triage_backlog", |daemon| {
                daemon.maybe_intervene_triage_backlog()
            });
            self.run_recoverable_step("maybe_intervene_owned_tasks", |daemon| {
                daemon.maybe_intervene_owned_tasks()
            });
            self.run_recoverable_step("maybe_intervene_review_backlog", |daemon| {
                daemon.maybe_intervene_review_backlog()
            });
            self.run_recoverable_step("maybe_escalate_stale_reviews", |daemon| {
                daemon.maybe_escalate_stale_reviews()
            });
            self.run_recoverable_step("maybe_auto_unblock_blocked_tasks", |daemon| {
                daemon.maybe_auto_unblock_blocked_tasks()
            });

            // -- Critical subsystems --
            self.run_loop_step("reconcile_active_tasks", |daemon| {
                daemon.reconcile_active_tasks()
            });
            self.run_loop_step("maybe_auto_dispatch", |daemon| daemon.maybe_auto_dispatch());
            self.run_recoverable_step("maybe_recycle_cron_tasks", |daemon| {
                daemon.maybe_recycle_cron_tasks()
            });

            // -- Recoverable subsystems --
            self.run_recoverable_step("maybe_intervene_manager_dispatch_gap", |daemon| {
                daemon.maybe_intervene_manager_dispatch_gap()
            });
            self.run_recoverable_step("maybe_intervene_architect_utilization", |daemon| {
                daemon.maybe_intervene_architect_utilization()
            });
            self.run_recoverable_step("maybe_intervene_board_replenishment", |daemon| {
                daemon.maybe_intervene_board_replenishment()
            });
            self.run_recoverable_step("maybe_detect_pipeline_starvation", |daemon| {
                daemon.maybe_detect_pipeline_starvation()
            });
            self.run_recoverable_step("maybe_trigger_planning_cycle", |daemon| {
                daemon.maybe_trigger_planning_cycle()
            });

            // -- Recoverable with catch_unwind (panic-safe) --
            self.run_recoverable_step_with_catch_unwind("process_telegram_queue", |daemon| {
                daemon.process_telegram_queue()
            });
            self.run_recoverable_step("maybe_fire_nudges", |daemon| daemon.maybe_fire_nudges());
            self.run_recoverable_step("check_backend_health", |daemon| {
                daemon.check_backend_health()
            });
            self.run_recoverable_step("maybe_reconcile_stale_worktrees", |daemon| {
                daemon.maybe_reconcile_stale_worktrees()
            });
            self.run_recoverable_step("check_worktree_staleness", |daemon| {
                daemon.check_worktree_staleness()
            });
            self.run_recoverable_step("maybe_warn_uncommitted_work", |daemon| {
                daemon.maybe_warn_uncommitted_work()
            });
            // Standup generation borrows a broad slice of daemon state
            // (including `last_standup` mutably) and runs under the
            // panic-safe wrapper; each recipient it returns is recorded.
            self.run_recoverable_step_with_catch_unwind("maybe_generate_standup", |daemon| {
                let generated =
                    standup::maybe_generate_standup(standup::StandupGenerationContext {
                        project_root: &daemon.config.project_root,
                        team_config: &daemon.config.team_config,
                        members: &daemon.config.members,
                        watchers: &daemon.watchers,
                        states: &daemon.states,
                        pane_map: &daemon.config.pane_map,
                        telegram_bot: daemon.telegram_bot.as_ref(),
                        paused_standups: &daemon.paused_standups,
                        last_standup: &mut daemon.last_standup,
                        backend_health: &daemon.backend_health,
                    })?;
                for recipient in generated {
                    daemon.record_standup_generated(&recipient);
                }
                Ok(())
            });
            self.run_recoverable_step("maybe_rotate_board", |daemon| daemon.maybe_rotate_board());
            self.run_recoverable_step("maybe_auto_archive", |daemon| daemon.maybe_auto_archive());
            self.run_recoverable_step_with_catch_unwind("maybe_generate_retrospective", |daemon| {
                daemon.maybe_generate_retrospective()
            });
            self.run_recoverable_step("maybe_notify_failure_patterns", |daemon| {
                daemon.maybe_notify_failure_patterns()
            });
            self.run_recoverable_step("maybe_reload_binary", |daemon| {
                daemon.maybe_hot_reload_binary(hot_reload.as_mut())
            });
            self.run_recoverable_step("maybe_reload_config", |daemon| {
                daemon.maybe_hot_reload_config(config_reload.as_mut())
            });
            // Refresh pane status labels last so they reflect whatever the
            // steps above just changed.
            status::update_pane_status_labels(status::PaneStatusLabelUpdateContext {
                project_root: &self.config.project_root,
                members: &self.config.members,
                pane_map: &self.config.pane_map,
                states: &self.states,
                nudges: &self.nudges,
                last_standup: &self.last_standup,
                paused_standups: &self.paused_standups,
                standup_interval_for_member: |member_name| {
                    standup::standup_interval_for_member_name(
                        &self.config.team_config,
                        &self.config.members,
                        member_name,
                    )
                },
            });

            // Periodic heartbeat
            if last_heartbeat.elapsed() >= heartbeat_interval {
                let uptime = started_at.elapsed().as_secs();
                self.record_daemon_heartbeat(uptime);
                // Checkpoint alongside the heartbeat; failure is logged, not fatal.
                if let Err(error) = self.persist_runtime_state(false) {
                    warn!(error = %error, "failed to persist daemon checkpoint");
                }
                debug!(uptime_secs = uptime, "daemon heartbeat");
                last_heartbeat = Instant::now();
            }

            std::thread::sleep(self.poll_interval);
        }

        // Graceful shutdown of all shim subprocesses
        self.shutdown_all_shims();

        // Save shim state for session resume
        if let Err(error) = self.save_shim_state() {
            warn!(error = %error, "failed to save shim state for resume");
        }

        let uptime = started_at.elapsed().as_secs();
        // Final checkpoint; the `true` flag presumably marks this as the
        // clean-shutdown snapshot — confirm against persist_runtime_state.
        if let Err(error) = self.persist_runtime_state(true) {
            warn!(error = %error, "failed to persist final daemon checkpoint");
        }
        self.record_daemon_stopped(shutdown_reason, uptime);
        Ok(())
    }
247
248    /// Send Shutdown to all active shim handles, wait for exit, fall back to Kill.
249    fn shutdown_all_shims(&mut self) {
250        if self.shim_handles.is_empty() {
251            return;
252        }
253
254        let timeout_secs = self.config.team_config.shim_shutdown_timeout_secs;
255        info!(
256            count = self.shim_handles.len(),
257            timeout_secs, "sending graceful shutdown to shim subprocesses"
258        );
259
260        // Phase 1: Send Shutdown command to all handles
261        let names: Vec<String> = self.shim_handles.keys().cloned().collect();
262        for name in &names {
263            if let Some(handle) = self.shim_handles.get_mut(name) {
264                if handle.is_terminal() {
265                    continue;
266                }
267                if let Err(error) = handle.send_shutdown(timeout_secs) {
268                    warn!(
269                        member = name.as_str(),
270                        error = %error,
271                        "failed to send shim shutdown, sending kill"
272                    );
273                    let _ = handle.send_kill();
274                }
275            }
276        }
277
278        // Phase 2: Wait for child processes to exit within the timeout
279        let deadline = Instant::now() + Duration::from_secs(timeout_secs as u64);
280        let mut pids: Vec<(String, u32)> = names
281            .iter()
282            .filter_map(|name| {
283                self.shim_handles
284                    .get(name)
285                    .filter(|h| !h.is_terminal())
286                    .map(|h| (name.clone(), h.child_pid))
287            })
288            .collect();
289
290        while !pids.is_empty() && Instant::now() < deadline {
291            pids.retain(|(name, pid)| {
292                // Check if process still alive via kill(0)
293                let alive = unsafe { libc::kill(*pid as i32, 0) } == 0;
294                if !alive {
295                    debug!(member = name.as_str(), pid, "shim process exited cleanly");
296                }
297                alive
298            });
299            if !pids.is_empty() {
300                std::thread::sleep(Duration::from_millis(100));
301            }
302        }
303
304        // Phase 3: Force-kill any survivors
305        for (name, pid) in &pids {
306            warn!(
307                member = name.as_str(),
308                pid, "shim did not exit within timeout, sending Kill"
309            );
310            if let Some(handle) = self.shim_handles.get_mut(name) {
311                let _ = handle.send_kill();
312            }
313            // Also send SIGKILL directly
314            unsafe {
315                libc::kill(*pid as i32, libc::SIGKILL);
316            }
317        }
318    }
319}