use std::time::{Duration, Instant};

use anyhow::Result;
use tracing::{debug, info, warn};

use super::config_reload::ConfigReloadMonitor;
use super::hot_reload::HotReloadMonitor;
use super::{TeamDaemon, standup, status};
use crate::team;
use crate::tmux;
impl TeamDaemon {
    pub fn run(&mut self, resume: bool) -> Result<()> {
        self.record_daemon_started();
        let is_hot_reload = self.acknowledge_hot_reload_marker();
        info!(session = %self.config.session, resume, "daemon started");
        self.record_orchestrator_action(format!(
            "runtime: orchestrator started (mode={}, resume={resume})",
            self.config.team_config.workflow_mode.as_str()
        ));

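        // Flip a shared atomic flag on Ctrl-C; the main loop polls it at the
        // top of every iteration, so shutdown stays cooperative.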
        let shutdown_flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let flag_clone = shutdown_flag.clone();
        if let Err(e) = ctrlc::set_handler(move || {
            flag_clone.store(true, std::sync::atomic::Ordering::SeqCst);
        }) {
            warn!(error = %e, "failed to install signal handler");
        }

        self.run_startup_preflight()?;

        self.spawn_all_agents(resume)?;
        if resume {
            self.restore_runtime_state();
        }
        if is_hot_reload {
            info!(
                cleared = self.active_tasks.len(),
                "hot-reload: clearing active_tasks to rely on board state after restart"
            );
            self.active_tasks.clear();
        }
        self.persist_runtime_state(false)?;

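        // Heartbeat bookkeeping: liveness is recorded and runtime state is
        // checkpointed every five minutes (see the heartbeat check at the
        // bottom of the loop).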
        let started_at = Instant::now();
        let heartbeat_interval = Duration::from_secs(300);
        let mut last_heartbeat = Instant::now();
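        // Both reload monitors are best-effort: if one fails to initialize,
        // the daemon logs a warning and keeps running without that capability.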
        let mut hot_reload = match HotReloadMonitor::for_current_exe() {
            Ok(monitor) => Some(monitor),
            Err(error) => {
                warn!(error = %error, "failed to initialize daemon hot-reload monitor");
                None
            }
        };
        let config_path = team::team_config_path(&self.config.project_root);
        let mut config_reload = match ConfigReloadMonitor::new(&config_path) {
            Ok(monitor) => Some(monitor),
            Err(error) => {
                warn!(error = %error, "failed to initialize config reload monitor");
                None
            }
        };

        let shutdown_reason;
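
        // Main supervision loop: each pass runs a fixed sequence of
        // best-effort maintenance steps, then sleeps for one poll interval.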
        loop {
            if shutdown_flag.load(std::sync::atomic::Ordering::SeqCst) {
                shutdown_reason = "signal";
                info!("received shutdown signal");
                break;
            }

            if !tmux::session_exists(&self.config.session) {
                shutdown_reason = "session_gone";
                info!("tmux session gone, shutting down");
                break;
            }

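            // Shim supervision: poll subprocess handles, check shim health,
            // enforce working-state timeouts, detect narration loops, and
            // drain the legacy command queue.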
            self.run_recoverable_step("poll_shim_handles", |daemon| daemon.poll_shim_handles());
            self.run_recoverable_step("shim_health_check", |daemon| daemon.shim_health_check());
            self.run_recoverable_step("check_working_state_timeouts", |daemon| {
                daemon.check_working_state_timeouts()
            });
            self.run_recoverable_step("check_narration_loops", |daemon| {
                daemon.check_narration_loops()
            });
            self.run_recoverable_step("sync_launch_state_session_ids", |daemon| {
                daemon.sync_launch_state_session_ids()
            });
            self.run_recoverable_step("drain_legacy_command_queue", |daemon| {
                daemon.drain_legacy_command_queue()
            });

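            // Messaging: deliver inbox messages, retry failed deliveries, and
            // expire messages that have been pending too long.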
            self.run_loop_step("deliver_inbox_messages", |daemon| {
                daemon.deliver_inbox_messages()
            });
            self.run_loop_step("retry_failed_deliveries", |daemon| {
                daemon.retry_failed_deliveries()
            });
            self.run_recoverable_step("expire_stale_pending_messages", |daemon| {
                daemon.expire_stale_pending_messages()
            });

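            // Backlog interventions: triage, owned-task, and review backlogs,
            // stale-review escalation, and auto-unblocking of blocked tasks.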
            self.run_recoverable_step("maybe_intervene_triage_backlog", |daemon| {
                daemon.maybe_intervene_triage_backlog()
            });
            self.run_recoverable_step("maybe_intervene_owned_tasks", |daemon| {
                daemon.maybe_intervene_owned_tasks()
            });
            self.run_recoverable_step("maybe_intervene_review_backlog", |daemon| {
                daemon.maybe_intervene_review_backlog()
            });
            self.run_recoverable_step("maybe_escalate_stale_reviews", |daemon| {
                daemon.maybe_escalate_stale_reviews()
            });
            self.run_recoverable_step("maybe_auto_unblock_blocked_tasks", |daemon| {
                daemon.maybe_auto_unblock_blocked_tasks()
            });

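            // Task flow: reconcile active tasks with the board, auto-dispatch
            // ready work, and recycle recurring cron tasks.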
            self.run_loop_step("reconcile_active_tasks", |daemon| {
                daemon.reconcile_active_tasks()
            });
            self.run_loop_step("maybe_auto_dispatch", |daemon| daemon.maybe_auto_dispatch());
            self.run_recoverable_step("maybe_recycle_cron_tasks", |daemon| {
                daemon.maybe_recycle_cron_tasks()
            });

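            // Pipeline health: manager dispatch gaps, architect utilization,
            // board replenishment, starvation detection, and planning-cycle
            // triggers.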
            self.run_recoverable_step("maybe_intervene_manager_dispatch_gap", |daemon| {
                daemon.maybe_intervene_manager_dispatch_gap()
            });
            self.run_recoverable_step("maybe_intervene_architect_utilization", |daemon| {
                daemon.maybe_intervene_architect_utilization()
            });
            self.run_recoverable_step("maybe_intervene_board_replenishment", |daemon| {
                daemon.maybe_intervene_board_replenishment()
            });
            self.run_recoverable_step("maybe_detect_pipeline_starvation", |daemon| {
                daemon.maybe_detect_pipeline_starvation()
            });
            self.run_recoverable_step("maybe_trigger_planning_cycle", |daemon| {
                daemon.maybe_trigger_planning_cycle()
            });

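            // Housekeeping: Telegram queue (run under catch_unwind, so a
            // panic in bot code does not take the loop down), nudges, backend
            // health, and worktree/cargo-target hygiene.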
            self.run_recoverable_step_with_catch_unwind("process_telegram_queue", |daemon| {
                daemon.process_telegram_queue()
            });
            self.run_recoverable_step("maybe_fire_nudges", |daemon| daemon.maybe_fire_nudges());
            self.run_recoverable_step("check_backend_health", |daemon| {
                daemon.check_backend_health()
            });
            self.run_recoverable_step("maybe_reconcile_stale_worktrees", |daemon| {
                daemon.maybe_reconcile_stale_worktrees()
            });
            self.run_recoverable_step("check_worktree_staleness", |daemon| {
                daemon.check_worktree_staleness()
            });
            self.run_recoverable_step("maybe_warn_uncommitted_work", |daemon| {
                daemon.maybe_warn_uncommitted_work()
            });
            self.run_recoverable_step("maybe_cleanup_shared_cargo_target", |daemon| {
                daemon.maybe_cleanup_shared_cargo_target()
            });
            self.run_recoverable_step("record_parity_snapshot", |daemon| {
                if daemon.config.team_config.automation.clean_room_mode {
                    daemon.sync_cleanroom_specs()?;
                    if let Ok(report) =
                        crate::team::parity::ParityReport::load(&daemon.config.project_root)
                    {
                        daemon.record_parity_updated(&report.summary());
                    }
                    crate::team::parity::sync_gap_tasks(&daemon.config.project_root)?;
                }
                Ok(())
            });
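            // Standup generation (also panic-isolated); record an event for
            // every recipient a standup was actually produced for.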
            self.run_recoverable_step_with_catch_unwind("maybe_generate_standup", |daemon| {
                let generated =
                    standup::maybe_generate_standup(standup::StandupGenerationContext {
                        project_root: &daemon.config.project_root,
                        team_config: &daemon.config.team_config,
                        members: &daemon.config.members,
                        watchers: &daemon.watchers,
                        states: &daemon.states,
                        pane_map: &daemon.config.pane_map,
                        telegram_bot: daemon.telegram_bot.as_ref(),
                        paused_standups: &daemon.paused_standups,
                        last_standup: &mut daemon.last_standup,
                        backend_health: &daemon.backend_health,
                    })?;
                for recipient in generated {
                    daemon.record_standup_generated(&recipient);
                }
                Ok(())
            });
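            // Periodic maintenance: board rotation, auto-archival,
            // retrospectives, failure-pattern notifications, and hot reload
            // of both the daemon binary and the team config.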
            self.run_recoverable_step("maybe_rotate_board", |daemon| daemon.maybe_rotate_board());
            self.run_recoverable_step("maybe_auto_archive", |daemon| daemon.maybe_auto_archive());
            self.run_recoverable_step_with_catch_unwind("maybe_generate_retrospective", |daemon| {
                daemon.maybe_generate_retrospective()
            });
            self.run_recoverable_step("maybe_notify_failure_patterns", |daemon| {
                daemon.maybe_notify_failure_patterns()
            });
            self.run_recoverable_step("maybe_reload_binary", |daemon| {
                daemon.maybe_hot_reload_binary(hot_reload.as_mut())
            });
            self.run_recoverable_step("maybe_reload_config", |daemon| {
                daemon.maybe_hot_reload_config(config_reload.as_mut())
            });
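            // Reflect current member state in the tmux pane status labels.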
            status::update_pane_status_labels(status::PaneStatusLabelUpdateContext {
                project_root: &self.config.project_root,
                members: &self.config.members,
                pane_map: &self.config.pane_map,
                states: &self.states,
                nudges: &self.nudges,
                last_standup: &self.last_standup,
                paused_standups: &self.paused_standups,
                standup_interval_for_member: |member_name| {
                    standup::standup_interval_for_member_name(
                        &self.config.team_config,
                        &self.config.members,
                        member_name,
                    )
                },
            });

            if last_heartbeat.elapsed() >= heartbeat_interval {
                let uptime = started_at.elapsed().as_secs();
                self.record_daemon_heartbeat(uptime);
                if let Err(error) = self.persist_runtime_state(false) {
                    warn!(error = %error, "failed to persist daemon checkpoint");
                }
                debug!(uptime_secs = uptime, "daemon heartbeat");
                last_heartbeat = Instant::now();
            }

            std::thread::sleep(self.poll_interval);
        }

        self.shutdown_all_shims();

        if let Err(error) = self.save_shim_state() {
            warn!(error = %error, "failed to save shim state for resume");
        }

        let uptime = started_at.elapsed().as_secs();
        if let Err(error) = self.persist_runtime_state(true) {
            warn!(error = %error, "failed to persist final daemon checkpoint");
        }
        self.record_daemon_stopped(shutdown_reason, uptime);
        Ok(())
    }

    fn shutdown_all_shims(&mut self) {
        if self.shim_handles.is_empty() {
            return;
        }

        self.preserve_work_before_shutdown();

        let timeout_secs = self.config.team_config.shim_shutdown_timeout_secs;
        info!(
            count = self.shim_handles.len(),
            timeout_secs, "sending graceful shutdown to shim subprocesses"
        );

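        // First pass: ask each live shim to shut down gracefully, falling
        // back to an immediate kill if the shutdown request cannot be sent.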
        let names: Vec<String> = self.shim_handles.keys().cloned().collect();
        for name in &names {
            if let Some(handle) = self.shim_handles.get_mut(name) {
                if handle.is_terminal() {
                    continue;
                }
                if let Err(error) = handle.send_shutdown(timeout_secs) {
                    warn!(
                        member = name.as_str(),
                        error = %error,
                        "failed to send shim shutdown, sending kill"
                    );
                    let _ = handle.send_kill();
                }
            }
        }

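        // Second pass: poll the remaining PIDs with signal 0 (an existence
        // probe) until they exit or the shutdown deadline passes.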
        let deadline = Instant::now() + Duration::from_secs(timeout_secs as u64);
        let mut pids: Vec<(String, u32)> = names
            .iter()
            .filter_map(|name| {
                self.shim_handles
                    .get(name)
                    .filter(|h| !h.is_terminal())
                    .map(|h| (name.clone(), h.child_pid))
            })
            .collect();

        while !pids.is_empty() && Instant::now() < deadline {
            pids.retain(|(name, pid)| {
                let alive = unsafe { libc::kill(*pid as i32, 0) } == 0;
                if !alive {
                    debug!(member = name.as_str(), pid, "shim process exited cleanly");
                }
                alive
            });
            if !pids.is_empty() {
                std::thread::sleep(Duration::from_millis(100));
            }
        }

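        // Final pass: anything still alive past the deadline gets SIGKILL,
        // both via its handle and directly by PID.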
        for (name, pid) in &pids {
            warn!(
                member = name.as_str(),
                pid, "shim did not exit within timeout, sending SIGKILL"
            );
            if let Some(handle) = self.shim_handles.get_mut(name) {
                let _ = handle.send_kill();
            }
            unsafe {
                libc::kill(*pid as i32, libc::SIGKILL);
            }
        }
    }

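    // Preserve uncommitted work in each worktree-backed member's worktree so
    // it survives the shim shutdown and a later resume.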
    fn preserve_work_before_shutdown(&self) {
        let names: Vec<String> = self
            .config
            .members
            .iter()
            .filter(|member| member.use_worktrees)
            .map(|member| member.name.clone())
            .collect();
        for member_name in names {
            let worktree = self.worktree_dir(&member_name);
            self.preserve_worktree_before_restart(&member_name, &worktree, "daemon shutdown");
        }
    }
}