1use std::time::{Duration, Instant};
4
5use anyhow::Result;
6use tracing::{debug, info, warn};
7
8use super::config_reload::ConfigReloadMonitor;
9use super::hot_reload::HotReloadMonitor;
10use super::{TeamDaemon, standup, status};
11use crate::team;
12use crate::tmux;
13
impl TeamDaemon {
    /// Runs the daemon main loop until a shutdown signal is received or the
    /// backing tmux session disappears.
    ///
    /// Sequence: record startup, install a signal handler, run preflight,
    /// spawn all agents (optionally restoring persisted runtime state when
    /// `resume` is true), then poll the orchestration steps on every tick of
    /// `self.poll_interval`. On exit it gracefully shuts down shim
    /// subprocesses and persists a final checkpoint.
    ///
    /// # Errors
    ///
    /// Returns an error only from the startup phase (preflight, agent spawn,
    /// initial state persist). Failures inside the loop are contained by the
    /// `run_recoverable_step` / `run_loop_step` wrappers and do not abort the
    /// daemon.
    pub fn run(&mut self, resume: bool) -> Result<()> {
        // Record lifecycle markers first so the audit trail always shows the
        // start attempt, even if a later startup step fails.
        self.record_daemon_started();
        self.acknowledge_hot_reload_marker();
        info!(session = %self.config.session, resume, "daemon started");
        self.record_orchestrator_action(format!(
            "runtime: orchestrator started (mode={}, resume={resume})",
            self.config.team_config.workflow_mode.as_str()
        ));

        // Cooperative shutdown flag flipped by the Ctrl-C handler; checked at
        // the top of every loop iteration.
        let shutdown_flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let flag_clone = shutdown_flag.clone();
        if let Err(e) = ctrlc::set_handler(move || {
            flag_clone.store(true, std::sync::atomic::Ordering::SeqCst);
        }) {
            // Non-fatal: without the handler the daemon still exits via the
            // "tmux session gone" check below.
            warn!(error = %e, "failed to install signal handler");
        }

        self.run_startup_preflight()?;

        self.spawn_all_agents(resume)?;
        if resume {
            self.restore_runtime_state();
        }
        self.persist_runtime_state(false)?;

        let started_at = Instant::now();
        // Heartbeat every 5 minutes: logs uptime and persists a checkpoint.
        let heartbeat_interval = Duration::from_secs(300);
        let mut last_heartbeat = Instant::now();
        // Both reload monitors are optional: a failed init degrades to "no
        // hot reload" rather than aborting the daemon.
        let mut hot_reload = match HotReloadMonitor::for_current_exe() {
            Ok(monitor) => Some(monitor),
            Err(error) => {
                warn!(error = %error, "failed to initialize daemon hot-reload monitor");
                None
            }
        };
        let config_path = team::team_config_path(&self.config.project_root);
        let mut config_reload = match ConfigReloadMonitor::new(&config_path) {
            Ok(monitor) => Some(monitor),
            Err(error) => {
                warn!(error = %error, "failed to initialize config reload monitor");
                None
            }
        };

        // Assigned exactly once by whichever break path exits the loop.
        let shutdown_reason;
        loop {
            if shutdown_flag.load(std::sync::atomic::Ordering::SeqCst) {
                shutdown_reason = "signal";
                info!("received shutdown signal");
                break;
            }

            if !tmux::session_exists(&self.config.session) {
                shutdown_reason = "session_gone";
                info!("tmux session gone, shutting down");
                break;
            }

            // --- Phase 1: shim process health and state bookkeeping ---
            // NOTE: step order is intentional; later phases consume state
            // updated by earlier ones.
            self.run_recoverable_step("poll_shim_handles", |daemon| daemon.poll_shim_handles());
            self.run_recoverable_step("shim_health_check", |daemon| daemon.shim_health_check());
            self.run_recoverable_step("check_working_state_timeouts", |daemon| {
                daemon.check_working_state_timeouts()
            });
            self.run_recoverable_step("check_narration_loops", |daemon| {
                daemon.check_narration_loops()
            });
            self.run_recoverable_step("sync_launch_state_session_ids", |daemon| {
                daemon.sync_launch_state_session_ids()
            });
            self.run_recoverable_step("drain_legacy_command_queue", |daemon| {
                daemon.drain_legacy_command_queue()
            });

            // --- Phase 2: message delivery ---
            self.run_loop_step("deliver_inbox_messages", |daemon| {
                daemon.deliver_inbox_messages()
            });
            self.run_loop_step("retry_failed_deliveries", |daemon| {
                daemon.retry_failed_deliveries()
            });
            self.run_recoverable_step("expire_stale_pending_messages", |daemon| {
                daemon.expire_stale_pending_messages()
            });

            // --- Phase 3: backlog / review interventions ---
            self.run_recoverable_step("maybe_intervene_triage_backlog", |daemon| {
                daemon.maybe_intervene_triage_backlog()
            });
            self.run_recoverable_step("maybe_intervene_owned_tasks", |daemon| {
                daemon.maybe_intervene_owned_tasks()
            });
            self.run_recoverable_step("maybe_intervene_review_backlog", |daemon| {
                daemon.maybe_intervene_review_backlog()
            });
            self.run_recoverable_step("maybe_escalate_stale_reviews", |daemon| {
                daemon.maybe_escalate_stale_reviews()
            });
            self.run_recoverable_step("maybe_auto_unblock_blocked_tasks", |daemon| {
                daemon.maybe_auto_unblock_blocked_tasks()
            });

            // --- Phase 4: task lifecycle and dispatch ---
            self.run_loop_step("reconcile_active_tasks", |daemon| {
                daemon.reconcile_active_tasks()
            });
            self.run_loop_step("maybe_auto_dispatch", |daemon| daemon.maybe_auto_dispatch());
            self.run_recoverable_step("maybe_recycle_cron_tasks", |daemon| {
                daemon.maybe_recycle_cron_tasks()
            });

            // --- Phase 5: pipeline-health interventions ---
            self.run_recoverable_step("maybe_intervene_manager_dispatch_gap", |daemon| {
                daemon.maybe_intervene_manager_dispatch_gap()
            });
            self.run_recoverable_step("maybe_intervene_architect_utilization", |daemon| {
                daemon.maybe_intervene_architect_utilization()
            });
            self.run_recoverable_step("maybe_intervene_board_replenishment", |daemon| {
                daemon.maybe_intervene_board_replenishment()
            });
            self.run_recoverable_step("maybe_detect_pipeline_starvation", |daemon| {
                daemon.maybe_detect_pipeline_starvation()
            });
            self.run_recoverable_step("maybe_trigger_planning_cycle", |daemon| {
                daemon.maybe_trigger_planning_cycle()
            });

            // --- Phase 6: periphery (telegram, nudges, backend, worktrees,
            // standups, board housekeeping, reloads) ---
            // catch_unwind variant: a panic here must not take down the loop.
            self.run_recoverable_step_with_catch_unwind("process_telegram_queue", |daemon| {
                daemon.process_telegram_queue()
            });
            self.run_recoverable_step("maybe_fire_nudges", |daemon| daemon.maybe_fire_nudges());
            self.run_recoverable_step("check_backend_health", |daemon| {
                daemon.check_backend_health()
            });
            self.run_recoverable_step("maybe_reconcile_stale_worktrees", |daemon| {
                daemon.maybe_reconcile_stale_worktrees()
            });
            self.run_recoverable_step("check_worktree_staleness", |daemon| {
                daemon.check_worktree_staleness()
            });
            self.run_recoverable_step("maybe_warn_uncommitted_work", |daemon| {
                daemon.maybe_warn_uncommitted_work()
            });
            self.run_recoverable_step_with_catch_unwind("maybe_generate_standup", |daemon| {
                // Standup generation borrows a wide slice of daemon state;
                // `last_standup` is the only field it mutates.
                let generated =
                    standup::maybe_generate_standup(standup::StandupGenerationContext {
                        project_root: &daemon.config.project_root,
                        team_config: &daemon.config.team_config,
                        members: &daemon.config.members,
                        watchers: &daemon.watchers,
                        states: &daemon.states,
                        pane_map: &daemon.config.pane_map,
                        telegram_bot: daemon.telegram_bot.as_ref(),
                        paused_standups: &daemon.paused_standups,
                        last_standup: &mut daemon.last_standup,
                        backend_health: &daemon.backend_health,
                    })?;
                for recipient in generated {
                    daemon.record_standup_generated(&recipient);
                }
                Ok(())
            });
            self.run_recoverable_step("maybe_rotate_board", |daemon| daemon.maybe_rotate_board());
            self.run_recoverable_step("maybe_auto_archive", |daemon| daemon.maybe_auto_archive());
            self.run_recoverable_step_with_catch_unwind("maybe_generate_retrospective", |daemon| {
                daemon.maybe_generate_retrospective()
            });
            self.run_recoverable_step("maybe_notify_failure_patterns", |daemon| {
                daemon.maybe_notify_failure_patterns()
            });
            self.run_recoverable_step("maybe_reload_binary", |daemon| {
                daemon.maybe_hot_reload_binary(hot_reload.as_mut())
            });
            self.run_recoverable_step("maybe_reload_config", |daemon| {
                daemon.maybe_hot_reload_config(config_reload.as_mut())
            });
            // Refresh tmux pane status labels from the state gathered above.
            status::update_pane_status_labels(status::PaneStatusLabelUpdateContext {
                project_root: &self.config.project_root,
                members: &self.config.members,
                pane_map: &self.config.pane_map,
                states: &self.states,
                nudges: &self.nudges,
                last_standup: &self.last_standup,
                paused_standups: &self.paused_standups,
                standup_interval_for_member: |member_name| {
                    standup::standup_interval_for_member_name(
                        &self.config.team_config,
                        &self.config.members,
                        member_name,
                    )
                },
            });

            // Periodic heartbeat: record uptime and checkpoint runtime state.
            if last_heartbeat.elapsed() >= heartbeat_interval {
                let uptime = started_at.elapsed().as_secs();
                self.record_daemon_heartbeat(uptime);
                if let Err(error) = self.persist_runtime_state(false) {
                    warn!(error = %error, "failed to persist daemon checkpoint");
                }
                debug!(uptime_secs = uptime, "daemon heartbeat");
                last_heartbeat = Instant::now();
            }

            std::thread::sleep(self.poll_interval);
        }

        // Shutdown path: stop shims first, then persist state. Failures here
        // are logged but never turned into an error return.
        self.shutdown_all_shims();

        if let Err(error) = self.save_shim_state() {
            warn!(error = %error, "failed to save shim state for resume");
        }

        let uptime = started_at.elapsed().as_secs();
        if let Err(error) = self.persist_runtime_state(true) {
            warn!(error = %error, "failed to persist final daemon checkpoint");
        }
        self.record_daemon_stopped(shutdown_reason, uptime);
        Ok(())
    }

    /// Gracefully stops all shim subprocesses.
    ///
    /// Three stages: (1) send each non-terminal handle a graceful shutdown
    /// (falling back to an immediate kill if the send itself fails), (2) poll
    /// the surviving PIDs with `kill(pid, 0)` until they exit or the
    /// configured timeout elapses, (3) SIGKILL any stragglers.
    fn shutdown_all_shims(&mut self) {
        if self.shim_handles.is_empty() {
            return;
        }

        let timeout_secs = self.config.team_config.shim_shutdown_timeout_secs;
        info!(
            count = self.shim_handles.len(),
            timeout_secs, "sending graceful shutdown to shim subprocesses"
        );

        // Collect names up front so we can mutate handles while iterating.
        let names: Vec<String> = self.shim_handles.keys().cloned().collect();
        for name in &names {
            if let Some(handle) = self.shim_handles.get_mut(name) {
                if handle.is_terminal() {
                    // Already exited; nothing to signal.
                    continue;
                }
                if let Err(error) = handle.send_shutdown(timeout_secs) {
                    warn!(
                        member = name.as_str(),
                        error = %error,
                        "failed to send shim shutdown, sending kill"
                    );
                    let _ = handle.send_kill();
                }
            }
        }

        let deadline = Instant::now() + Duration::from_secs(timeout_secs as u64);
        // PIDs still alive after the shutdown request; terminal handles are
        // excluded from the wait.
        let mut pids: Vec<(String, u32)> = names
            .iter()
            .filter_map(|name| {
                self.shim_handles
                    .get(name)
                    .filter(|h| !h.is_terminal())
                    .map(|h| (name.clone(), h.child_pid))
            })
            .collect();

        // Poll each PID every 100ms until it exits or the deadline passes.
        while !pids.is_empty() && Instant::now() < deadline {
            pids.retain(|(name, pid)| {
                // SAFETY: kill with signal 0 performs no action; it only
                // checks whether the process exists.
                // NOTE(review): a -1/EPERM result (process alive but not
                // signalable) would be read as "exited" here — presumably the
                // shims run as the same user, so this shouldn't occur; confirm.
                let alive = unsafe { libc::kill(*pid as i32, 0) } == 0;
                if !alive {
                    debug!(member = name.as_str(), pid, "shim process exited cleanly");
                }
                alive
            });
            if !pids.is_empty() {
                std::thread::sleep(Duration::from_millis(100));
            }
        }

        // Anything left past the deadline gets forcibly killed.
        for (name, pid) in &pids {
            warn!(
                member = name.as_str(),
                pid, "shim did not exit within timeout, sending Kill"
            );
            if let Some(handle) = self.shim_handles.get_mut(name) {
                let _ = handle.send_kill();
            }
            // SAFETY: pid was recorded from a child this daemon spawned;
            // SIGKILL is the intended last-resort termination.
            unsafe {
                libc::kill(*pid as i32, libc::SIGKILL);
            }
        }
    }
}