// batty_cli/team/daemon/poll.rs
//! Main daemon poll loop — signal handling, subsystem sequencing, heartbeat.

use std::time::{Duration, Instant};

use anyhow::Result;
use tracing::{debug, info, warn};

use super::config_reload::ConfigReloadMonitor;
use super::hot_reload::HotReloadMonitor;
use super::{TeamDaemon, standup, status};
use crate::team;
use crate::tmux;
impl TeamDaemon {
    /// Run the daemon loop. Blocks until the session is killed or an error occurs.
    ///
    /// If `resume` is true, agents are launched with session-resume flags
    /// (`claude --resume <session-id>` / `codex resume --last`) instead of fresh starts.
    ///
    /// High-level sequence:
    /// 1. Record startup, install a Ctrl-C handler, run preflight checks.
    /// 2. Spawn all agents (restoring runtime state when resuming).
    /// 3. Poll every `self.poll_interval`: run each subsystem step in a fixed
    ///    order, emit a heartbeat every 5 minutes, and watch for shutdown.
    /// 4. On exit, gracefully stop shim subprocesses and persist final state.
    ///
    /// # Errors
    /// Returns an error if startup preflight, agent spawning, or the initial
    /// runtime-state persist fails. Errors inside the poll loop itself are
    /// logged and skipped via `run_recoverable_step` / `run_loop_step`.
    pub fn run(&mut self, resume: bool) -> Result<()> {
        self.record_daemon_started();
        // Consume the hot-reload marker (if any) so we know whether this start
        // is a restart of a prior daemon instance; used below to reset tasks.
        let is_hot_reload = self.acknowledge_hot_reload_marker();
        info!(session = %self.config.session, resume, "daemon started");
        self.record_orchestrator_action(format!(
            "runtime: orchestrator started (mode={}, resume={resume})",
            self.config.team_config.workflow_mode.as_str()
        ));

        // Install signal handler so we log clean shutdowns. The handler only
        // flips an atomic flag; the poll loop observes it and breaks out.
        let shutdown_flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let flag_clone = shutdown_flag.clone();
        if let Err(e) = ctrlc::set_handler(move || {
            flag_clone.store(true, std::sync::atomic::Ordering::SeqCst);
        }) {
            // Non-fatal: without the handler we lose the "signal" shutdown
            // reason, but the daemon still runs and can exit via session loss.
            warn!(error = %e, "failed to install signal handler");
        }

        self.run_startup_preflight()?;

        // Spawn agents in all panes
        self.spawn_all_agents(resume)?;
        if resume {
            self.restore_runtime_state();
        }
        // After a hot-reload, agents are freshly spawned and have no memory of
        // their prior tasks. Clear active_tasks so the board becomes the source
        // of truth again; reconcile will not reconstruct in-progress ownership
        // from stale worktree branches.
        if is_hot_reload {
            info!(
                cleared = self.active_tasks.len(),
                "hot-reload: clearing active_tasks to rely on board state after restart"
            );
            self.active_tasks.clear();
        }
        self.persist_runtime_state(false)?;

        let started_at = Instant::now();
        let heartbeat_interval = Duration::from_secs(300); // 5 minutes
        let mut last_heartbeat = Instant::now();
        // Both reload monitors are best-effort: failure to initialize disables
        // that feature for this run (the Option stays None) but does not abort.
        let mut hot_reload = match HotReloadMonitor::for_current_exe() {
            Ok(monitor) => Some(monitor),
            Err(error) => {
                warn!(error = %error, "failed to initialize daemon hot-reload monitor");
                None
            }
        };
        let config_path = team::team_config_path(&self.config.project_root);
        let mut config_reload = match ConfigReloadMonitor::new(&config_path) {
            Ok(monitor) => Some(monitor),
            Err(error) => {
                warn!(error = %error, "failed to initialize config reload monitor");
                None
            }
        };

        // Main polling loop. `shutdown_reason` is deliberately a deferred,
        // non-mut binding: each `break` path assigns it exactly once, and the
        // compiler proves it is initialized before use after the loop.
        let shutdown_reason;
        loop {
            // Check for signal-based shutdown
            if shutdown_flag.load(std::sync::atomic::Ordering::SeqCst) {
                shutdown_reason = "signal";
                info!("received shutdown signal");
                break;
            }

            // The tmux session is the daemon's reason to live: if it vanished
            // (user killed it), shut down rather than respawn anything.
            if !tmux::session_exists(&self.config.session) {
                shutdown_reason = "session_gone";
                info!("tmux session gone, shutting down");
                break;
            }

            // -- Recoverable subsystems: log-and-skip with consecutive-failure tracking --
            self.run_recoverable_step("poll_shim_handles", |daemon| daemon.poll_shim_handles());
            self.run_recoverable_step("shim_health_check", |daemon| daemon.shim_health_check());
            self.run_recoverable_step("check_working_state_timeouts", |daemon| {
                daemon.check_working_state_timeouts()
            });
            self.run_recoverable_step("check_narration_loops", |daemon| {
                daemon.check_narration_loops()
            });
            self.run_recoverable_step("sync_launch_state_session_ids", |daemon| {
                daemon.sync_launch_state_session_ids()
            });
            self.run_recoverable_step("drain_legacy_command_queue", |daemon| {
                daemon.drain_legacy_command_queue()
            });

            // -- Critical subsystems: errors logged but no consecutive-failure tracking --
            self.run_loop_step("deliver_inbox_messages", |daemon| {
                daemon.deliver_inbox_messages()
            });
            self.run_loop_step("retry_failed_deliveries", |daemon| {
                daemon.retry_failed_deliveries()
            });
            self.run_recoverable_step("expire_stale_pending_messages", |daemon| {
                daemon.expire_stale_pending_messages()
            });

            // -- Recoverable subsystems --
            self.run_recoverable_step("maybe_intervene_triage_backlog", |daemon| {
                daemon.maybe_intervene_triage_backlog()
            });
            self.run_recoverable_step("maybe_intervene_owned_tasks", |daemon| {
                daemon.maybe_intervene_owned_tasks()
            });
            self.run_recoverable_step("maybe_intervene_review_backlog", |daemon| {
                daemon.maybe_intervene_review_backlog()
            });
            self.run_recoverable_step("maybe_escalate_stale_reviews", |daemon| {
                daemon.maybe_escalate_stale_reviews()
            });
            self.run_recoverable_step("maybe_auto_unblock_blocked_tasks", |daemon| {
                daemon.maybe_auto_unblock_blocked_tasks()
            });

            // -- Critical subsystems --
            // NOTE: reconcile runs before auto-dispatch so dispatch decisions
            // see an up-to-date view of active tasks.
            self.run_loop_step("reconcile_active_tasks", |daemon| {
                daemon.reconcile_active_tasks()
            });
            self.run_loop_step("maybe_auto_dispatch", |daemon| daemon.maybe_auto_dispatch());
            self.run_recoverable_step("maybe_recycle_cron_tasks", |daemon| {
                daemon.maybe_recycle_cron_tasks()
            });

            // -- Recoverable subsystems --
            self.run_recoverable_step("maybe_intervene_manager_dispatch_gap", |daemon| {
                daemon.maybe_intervene_manager_dispatch_gap()
            });
            self.run_recoverable_step("maybe_intervene_architect_utilization", |daemon| {
                daemon.maybe_intervene_architect_utilization()
            });
            self.run_recoverable_step("maybe_intervene_board_replenishment", |daemon| {
                daemon.maybe_intervene_board_replenishment()
            });
            self.run_recoverable_step("maybe_detect_pipeline_starvation", |daemon| {
                daemon.maybe_detect_pipeline_starvation()
            });
            self.run_recoverable_step("maybe_trigger_planning_cycle", |daemon| {
                daemon.maybe_trigger_planning_cycle()
            });

            // -- Recoverable with catch_unwind (panic-safe) --
            self.run_recoverable_step_with_catch_unwind("process_telegram_queue", |daemon| {
                daemon.process_telegram_queue()
            });
            self.run_recoverable_step("maybe_fire_nudges", |daemon| daemon.maybe_fire_nudges());
            self.run_recoverable_step("check_backend_health", |daemon| {
                daemon.check_backend_health()
            });
            self.run_recoverable_step("maybe_reconcile_stale_worktrees", |daemon| {
                daemon.maybe_reconcile_stale_worktrees()
            });
            self.run_recoverable_step("check_worktree_staleness", |daemon| {
                daemon.check_worktree_staleness()
            });
            self.run_recoverable_step("maybe_warn_uncommitted_work", |daemon| {
                daemon.maybe_warn_uncommitted_work()
            });
            self.run_recoverable_step("maybe_cleanup_shared_cargo_target", |daemon| {
                daemon.maybe_cleanup_shared_cargo_target()
            });
            // Clean-room parity tracking only runs when the mode is enabled;
            // `?` inside the closure surfaces errors to the recoverable-step
            // wrapper rather than aborting the loop.
            self.run_recoverable_step("record_parity_snapshot", |daemon| {
                if daemon.config.team_config.automation.clean_room_mode {
                    daemon.sync_cleanroom_specs()?;
                    if let Ok(report) =
                        crate::team::parity::ParityReport::load(&daemon.config.project_root)
                    {
                        daemon.record_parity_updated(&report.summary());
                    }
                    crate::team::parity::sync_gap_tasks(&daemon.config.project_root)?;
                }
                Ok(())
            });
            self.run_recoverable_step_with_catch_unwind("maybe_generate_standup", |daemon| {
                let generated =
                    standup::maybe_generate_standup(standup::StandupGenerationContext {
                        project_root: &daemon.config.project_root,
                        team_config: &daemon.config.team_config,
                        members: &daemon.config.members,
                        watchers: &daemon.watchers,
                        states: &daemon.states,
                        pane_map: &daemon.config.pane_map,
                        telegram_bot: daemon.telegram_bot.as_ref(),
                        paused_standups: &daemon.paused_standups,
                        last_standup: &mut daemon.last_standup,
                        backend_health: &daemon.backend_health,
                    })?;
                for recipient in generated {
                    daemon.record_standup_generated(&recipient);
                }
                Ok(())
            });
            self.run_recoverable_step("maybe_rotate_board", |daemon| daemon.maybe_rotate_board());
            self.run_recoverable_step("maybe_auto_archive", |daemon| daemon.maybe_auto_archive());
            self.run_recoverable_step_with_catch_unwind("maybe_generate_retrospective", |daemon| {
                daemon.maybe_generate_retrospective()
            });
            self.run_recoverable_step("maybe_notify_failure_patterns", |daemon| {
                daemon.maybe_notify_failure_patterns()
            });
            // Reload checks come last in the cycle so a reload never preempts
            // the other subsystem steps scheduled for this iteration.
            self.run_recoverable_step("maybe_reload_binary", |daemon| {
                daemon.maybe_hot_reload_binary(hot_reload.as_mut())
            });
            self.run_recoverable_step("maybe_reload_config", |daemon| {
                daemon.maybe_hot_reload_config(config_reload.as_mut())
            });
            // Purely cosmetic (tmux pane labels); no Result to handle.
            status::update_pane_status_labels(status::PaneStatusLabelUpdateContext {
                project_root: &self.config.project_root,
                members: &self.config.members,
                pane_map: &self.config.pane_map,
                states: &self.states,
                nudges: &self.nudges,
                last_standup: &self.last_standup,
                paused_standups: &self.paused_standups,
                standup_interval_for_member: |member_name| {
                    standup::standup_interval_for_member_name(
                        &self.config.team_config,
                        &self.config.members,
                        member_name,
                    )
                },
            });

            // Periodic heartbeat: record uptime and checkpoint runtime state
            // at most once per `heartbeat_interval`.
            if last_heartbeat.elapsed() >= heartbeat_interval {
                let uptime = started_at.elapsed().as_secs();
                self.record_daemon_heartbeat(uptime);
                if let Err(error) = self.persist_runtime_state(false) {
                    warn!(error = %error, "failed to persist daemon checkpoint");
                }
                debug!(uptime_secs = uptime, "daemon heartbeat");
                last_heartbeat = Instant::now();
            }

            std::thread::sleep(self.poll_interval);
        }

        // Graceful shutdown of all shim subprocesses
        self.shutdown_all_shims();

        // Save shim state for session resume
        if let Err(error) = self.save_shim_state() {
            warn!(error = %error, "failed to save shim state for resume");
        }

        // Final checkpoint is best-effort too: log but still record the stop.
        let uptime = started_at.elapsed().as_secs();
        if let Err(error) = self.persist_runtime_state(true) {
            warn!(error = %error, "failed to persist final daemon checkpoint");
        }
        self.record_daemon_stopped(shutdown_reason, uptime);
        Ok(())
    }

    /// Send Shutdown to all active shim handles, wait for exit, fall back to Kill.
    ///
    /// Three phases:
    /// 1. Ask every non-terminal handle to shut down gracefully; if the
    ///    request itself cannot be sent, immediately fall back to Kill.
    /// 2. Poll the remaining child PIDs until they exit or the configured
    ///    timeout (`shim_shutdown_timeout_secs`) lapses.
    /// 3. Force-kill survivors via both the handle's Kill command and a raw
    ///    SIGKILL to the PID.
    fn shutdown_all_shims(&mut self) {
        if self.shim_handles.is_empty() {
            return;
        }

        // Give members a chance to preserve worktree changes first.
        self.preserve_work_before_shutdown();

        let timeout_secs = self.config.team_config.shim_shutdown_timeout_secs;
        info!(
            count = self.shim_handles.len(),
            timeout_secs, "sending graceful shutdown to shim subprocesses"
        );

        // Phase 1: Send Shutdown command to all handles
        // Collect names first so we can take `get_mut` borrows one at a time.
        let names: Vec<String> = self.shim_handles.keys().cloned().collect();
        for name in &names {
            if let Some(handle) = self.shim_handles.get_mut(name) {
                if handle.is_terminal() {
                    continue;
                }
                if let Err(error) = handle.send_shutdown(timeout_secs) {
                    warn!(
                        member = name.as_str(),
                        error = %error,
                        "failed to send shim shutdown, sending kill"
                    );
                    // Best-effort escalation; result deliberately ignored.
                    let _ = handle.send_kill();
                }
            }
        }

        // Phase 2: Wait for child processes to exit within the timeout
        let deadline = Instant::now() + Duration::from_secs(timeout_secs as u64);
        let mut pids: Vec<(String, u32)> = names
            .iter()
            .filter_map(|name| {
                self.shim_handles
                    .get(name)
                    .filter(|h| !h.is_terminal())
                    .map(|h| (name.clone(), h.child_pid))
            })
            .collect();

        while !pids.is_empty() && Instant::now() < deadline {
            pids.retain(|(name, pid)| {
                // Check if process still alive via kill(0)
                // SAFETY: kill with signal 0 performs no action; per POSIX it
                // only validates the PID, so this cannot harm any process.
                // NOTE(review): `pid` is u32 cast to i32 — fine for normal
                // Linux PID ranges, but worth confirming the source of
                // `child_pid` can never exceed i32::MAX.
                let alive = unsafe { libc::kill(*pid as i32, 0) } == 0;
                if !alive {
                    debug!(member = name.as_str(), pid, "shim process exited cleanly");
                }
                alive
            });
            if !pids.is_empty() {
                std::thread::sleep(Duration::from_millis(100));
            }
        }

        // Phase 3: Force-kill any survivors
        // NOTE(review): there is a small PID-reuse window between the probe in
        // phase 2 and the SIGKILL here; presumed acceptable for shutdown.
        for (name, pid) in &pids {
            warn!(
                member = name.as_str(),
                pid, "shim did not exit within timeout, sending Kill"
            );
            if let Some(handle) = self.shim_handles.get_mut(name) {
                let _ = handle.send_kill();
            }
            // Also send SIGKILL directly
            // SAFETY: libc::kill is safe to call with any PID/signal pair; the
            // only effect is signal delivery (or an error we ignore).
            unsafe {
                libc::kill(*pid as i32, libc::SIGKILL);
            }
        }
    }

    /// For every member configured to use worktrees, delegate to
    /// `preserve_worktree_before_restart` with reason "daemon shutdown" so
    /// in-flight work in the worktree is preserved before processes die.
    fn preserve_work_before_shutdown(&self) {
        // Collect names up front so the iteration does not hold a borrow of
        // `self.config` while calling `&self` helpers below.
        let names: Vec<String> = self
            .config
            .members
            .iter()
            .filter(|member| member.use_worktrees)
            .map(|member| member.name.clone())
            .collect();
        for member_name in names {
            let worktree = self.worktree_dir(&member_name);
            self.preserve_worktree_before_restart(&member_name, &worktree, "daemon shutdown");
        }
    }
}