
batty_cli/team/daemon/poll.rs

//! Main daemon poll loop — signal handling, subsystem sequencing, heartbeat.

use std::time::{Duration, Instant};

use anyhow::Result;
use tracing::{debug, info, warn};

use super::config_reload::ConfigReloadMonitor;
use super::hot_reload::HotReloadMonitor;
use super::{TeamDaemon, standup, status};
use crate::team;
use crate::tmux;

impl TeamDaemon {
    /// Run the daemon loop. Blocks until the session is killed or an error occurs.
    ///
    /// If `resume` is true, agents are launched with session-resume flags
    /// (`claude --resume <session-id>` / `codex resume --last`) instead of fresh starts.
    pub fn run(&mut self, resume: bool) -> Result<()> {
        self.record_daemon_started();
        let is_hot_reload = self.acknowledge_hot_reload_marker();
        info!(session = %self.config.session, resume, "daemon started");
        self.record_orchestrator_action(format!(
            "runtime: orchestrator started (mode={}, resume={resume})",
            self.config.team_config.workflow_mode.as_str()
        ));

        // Install signal handler so we log clean shutdowns
        let shutdown_flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let flag_clone = shutdown_flag.clone();
        if let Err(e) = ctrlc::set_handler(move || {
            flag_clone.store(true, std::sync::atomic::Ordering::SeqCst);
        }) {
            warn!(error = %e, "failed to install signal handler");
        }

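        // Preflight checks run before any agents spawn; a failure here aborts the daemon.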
        self.run_startup_preflight()?;

        // Spawn agents in all panes
        self.spawn_all_agents(resume)?;
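        // On resume, rehydrate in-memory state that a previous daemon run persisted.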
        if resume {
            self.restore_runtime_state();
        }
        // After a hot-reload, agents are freshly spawned and have no memory of
        // their prior tasks. Clear active_tasks so replay_owned_tasks_for_idle_engineers
        // can immediately re-dispatch tasks on the next poll cycle, instead of
        // waiting for the 20-minute intervention grace period.
        if is_hot_reload {
            info!(
                cleared = self.active_tasks.len(),
                "hot-reload: clearing active_tasks to allow immediate task replay"
            );
            self.active_tasks.clear();
        }
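        // Checkpoint runtime state now that startup (and any hot-reload cleanup) is complete.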
        self.persist_runtime_state(false)?;

        let started_at = Instant::now();
        let heartbeat_interval = Duration::from_secs(300); // 5 minutes
        let mut last_heartbeat = Instant::now();
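        // Best-effort monitor on the daemon's own executable so the maybe_reload_binary
        // step below can pick up a replaced binary; on init failure we log and run without it.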
        let mut hot_reload = match HotReloadMonitor::for_current_exe() {
            Ok(monitor) => Some(monitor),
            Err(error) => {
                warn!(error = %error, "failed to initialize daemon hot-reload monitor");
                None
            }
        };
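        // Same best-effort treatment for the team config file, consumed by maybe_reload_config.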
        let config_path = team::team_config_path(&self.config.project_root);
        let mut config_reload = match ConfigReloadMonitor::new(&config_path) {
            Ok(monitor) => Some(monitor),
            Err(error) => {
                warn!(error = %error, "failed to initialize config reload monitor");
                None
            }
        };

        // Main polling loop
        let shutdown_reason;
        loop {
            // Check for signal-based shutdown
            if shutdown_flag.load(std::sync::atomic::Ordering::SeqCst) {
                shutdown_reason = "signal";
                info!("received shutdown signal");
                break;
            }

            if !tmux::session_exists(&self.config.session) {
                shutdown_reason = "session_gone";
                info!("tmux session gone, shutting down");
                break;
            }

            // -- Recoverable subsystems: log-and-skip with consecutive-failure tracking --
            self.run_recoverable_step("poll_shim_handles", |daemon| daemon.poll_shim_handles());
            self.run_recoverable_step("shim_health_check", |daemon| daemon.shim_health_check());
            self.run_recoverable_step("check_working_state_timeouts", |daemon| {
                daemon.check_working_state_timeouts()
            });
            self.run_recoverable_step("check_narration_loops", |daemon| {
                daemon.check_narration_loops()
            });
            self.run_recoverable_step("sync_launch_state_session_ids", |daemon| {
                daemon.sync_launch_state_session_ids()
            });
            self.run_recoverable_step("drain_legacy_command_queue", |daemon| {
                daemon.drain_legacy_command_queue()
            });

            // -- Critical subsystems: errors logged but no consecutive-failure tracking --
            self.run_loop_step("deliver_inbox_messages", |daemon| {
                daemon.deliver_inbox_messages()
            });
            self.run_loop_step("retry_failed_deliveries", |daemon| {
                daemon.retry_failed_deliveries()
            });
            self.run_recoverable_step("expire_stale_pending_messages", |daemon| {
                daemon.expire_stale_pending_messages()
            });

            // -- Recoverable subsystems --
            self.run_recoverable_step("maybe_intervene_triage_backlog", |daemon| {
                daemon.maybe_intervene_triage_backlog()
            });
            self.run_recoverable_step("maybe_intervene_owned_tasks", |daemon| {
                daemon.maybe_intervene_owned_tasks()
            });
            self.run_recoverable_step("maybe_intervene_review_backlog", |daemon| {
                daemon.maybe_intervene_review_backlog()
            });
            self.run_recoverable_step("maybe_escalate_stale_reviews", |daemon| {
                daemon.maybe_escalate_stale_reviews()
            });
            self.run_recoverable_step("maybe_auto_unblock_blocked_tasks", |daemon| {
                daemon.maybe_auto_unblock_blocked_tasks()
            });

            // -- Critical subsystems --
            self.run_loop_step("reconcile_active_tasks", |daemon| {
                daemon.reconcile_active_tasks()
            });
            self.run_loop_step("maybe_auto_dispatch", |daemon| daemon.maybe_auto_dispatch());
            self.run_recoverable_step("maybe_recycle_cron_tasks", |daemon| {
                daemon.maybe_recycle_cron_tasks()
            });

            // -- Recoverable subsystems --
            self.run_recoverable_step("maybe_intervene_manager_dispatch_gap", |daemon| {
                daemon.maybe_intervene_manager_dispatch_gap()
            });
            self.run_recoverable_step("maybe_intervene_architect_utilization", |daemon| {
                daemon.maybe_intervene_architect_utilization()
            });
            self.run_recoverable_step("maybe_intervene_board_replenishment", |daemon| {
                daemon.maybe_intervene_board_replenishment()
            });
            self.run_recoverable_step("maybe_detect_pipeline_starvation", |daemon| {
                daemon.maybe_detect_pipeline_starvation()
            });
            self.run_recoverable_step("maybe_trigger_planning_cycle", |daemon| {
                daemon.maybe_trigger_planning_cycle()
            });

            // -- Recoverable with catch_unwind (panic-safe) --
            self.run_recoverable_step_with_catch_unwind("process_telegram_queue", |daemon| {
                daemon.process_telegram_queue()
            });
            self.run_recoverable_step("maybe_fire_nudges", |daemon| daemon.maybe_fire_nudges());
            self.run_recoverable_step("check_backend_health", |daemon| {
                daemon.check_backend_health()
            });
            self.run_recoverable_step("maybe_reconcile_stale_worktrees", |daemon| {
                daemon.maybe_reconcile_stale_worktrees()
            });
            self.run_recoverable_step("check_worktree_staleness", |daemon| {
                daemon.check_worktree_staleness()
            });
            self.run_recoverable_step("maybe_warn_uncommitted_work", |daemon| {
                daemon.maybe_warn_uncommitted_work()
            });
            self.run_recoverable_step_with_catch_unwind("maybe_generate_standup", |daemon| {
                let generated =
                    standup::maybe_generate_standup(standup::StandupGenerationContext {
                        project_root: &daemon.config.project_root,
                        team_config: &daemon.config.team_config,
                        members: &daemon.config.members,
                        watchers: &daemon.watchers,
                        states: &daemon.states,
                        pane_map: &daemon.config.pane_map,
                        telegram_bot: daemon.telegram_bot.as_ref(),
                        paused_standups: &daemon.paused_standups,
                        last_standup: &mut daemon.last_standup,
                        backend_health: &daemon.backend_health,
                    })?;
                for recipient in generated {
                    daemon.record_standup_generated(&recipient);
                }
                Ok(())
            });
            self.run_recoverable_step("maybe_rotate_board", |daemon| daemon.maybe_rotate_board());
            self.run_recoverable_step("maybe_auto_archive", |daemon| daemon.maybe_auto_archive());
            self.run_recoverable_step_with_catch_unwind("maybe_generate_retrospective", |daemon| {
                daemon.maybe_generate_retrospective()
            });
            self.run_recoverable_step("maybe_notify_failure_patterns", |daemon| {
                daemon.maybe_notify_failure_patterns()
            });
            self.run_recoverable_step("maybe_reload_binary", |daemon| {
                daemon.maybe_hot_reload_binary(hot_reload.as_mut())
            });
            self.run_recoverable_step("maybe_reload_config", |daemon| {
                daemon.maybe_hot_reload_config(config_reload.as_mut())
            });
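            // Refresh the tmux pane status labels from current member, nudge, and standup state.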
            status::update_pane_status_labels(status::PaneStatusLabelUpdateContext {
                project_root: &self.config.project_root,
                members: &self.config.members,
                pane_map: &self.config.pane_map,
                states: &self.states,
                nudges: &self.nudges,
                last_standup: &self.last_standup,
                paused_standups: &self.paused_standups,
                standup_interval_for_member: |member_name| {
                    standup::standup_interval_for_member_name(
                        &self.config.team_config,
                        &self.config.members,
                        member_name,
                    )
                },
            });

            // Periodic heartbeat
            if last_heartbeat.elapsed() >= heartbeat_interval {
                let uptime = started_at.elapsed().as_secs();
                self.record_daemon_heartbeat(uptime);
                if let Err(error) = self.persist_runtime_state(false) {
                    warn!(error = %error, "failed to persist daemon checkpoint");
                }
                debug!(uptime_secs = uptime, "daemon heartbeat");
                last_heartbeat = Instant::now();
            }

            std::thread::sleep(self.poll_interval);
        }

        // Graceful shutdown of all shim subprocesses
        self.shutdown_all_shims();

        // Save shim state for session resume
        if let Err(error) = self.save_shim_state() {
            warn!(error = %error, "failed to save shim state for resume");
        }

        let uptime = started_at.elapsed().as_secs();
        if let Err(error) = self.persist_runtime_state(true) {
            warn!(error = %error, "failed to persist final daemon checkpoint");
        }
        self.record_daemon_stopped(shutdown_reason, uptime);
        Ok(())
    }

    /// Send Shutdown to all active shim handles, wait for exit, fall back to Kill.
    fn shutdown_all_shims(&mut self) {
        if self.shim_handles.is_empty() {
            return;
        }

        let timeout_secs = self.config.team_config.shim_shutdown_timeout_secs;
        info!(
            count = self.shim_handles.len(),
            timeout_secs, "sending graceful shutdown to shim subprocesses"
        );

        // Phase 1: Send Shutdown command to all handles
        let names: Vec<String> = self.shim_handles.keys().cloned().collect();
        for name in &names {
            if let Some(handle) = self.shim_handles.get_mut(name) {
                if handle.is_terminal() {
                    continue;
                }
                if let Err(error) = handle.send_shutdown(timeout_secs) {
                    warn!(
                        member = name.as_str(),
                        error = %error,
                        "failed to send shim shutdown, sending kill"
                    );
                    let _ = handle.send_kill();
                }
            }
        }

        // Phase 2: Wait for child processes to exit within the timeout
        let deadline = Instant::now() + Duration::from_secs(timeout_secs as u64);
        let mut pids: Vec<(String, u32)> = names
            .iter()
            .filter_map(|name| {
                self.shim_handles
                    .get(name)
                    .filter(|h| !h.is_terminal())
                    .map(|h| (name.clone(), h.child_pid))
            })
            .collect();

        while !pids.is_empty() && Instant::now() < deadline {
            pids.retain(|(name, pid)| {
                // Check if process still alive via kill(0)
                let alive = unsafe { libc::kill(*pid as i32, 0) } == 0;
                if !alive {
                    debug!(member = name.as_str(), pid, "shim process exited cleanly");
                }
                alive
            });
            if !pids.is_empty() {
                std::thread::sleep(Duration::from_millis(100));
            }
        }

        // Phase 3: Force-kill any survivors
        for (name, pid) in &pids {
            warn!(
                member = name.as_str(),
                pid, "shim did not exit within timeout, sending Kill"
            );
            if let Some(handle) = self.shim_handles.get_mut(name) {
                let _ = handle.send_kill();
            }
            // Also send SIGKILL directly
            unsafe {
                libc::kill(*pid as i32, libc::SIGKILL);
            }
        }
    }
}