1use std::time::{Duration, Instant};
4
5use anyhow::Result;
6use tracing::{debug, info, warn};
7
8use super::hot_reload::HotReloadMonitor;
9use super::{TeamDaemon, standup, status};
10use crate::tmux;
11
12impl TeamDaemon {
13 pub fn run(&mut self, resume: bool) -> Result<()> {
18 self.record_daemon_started();
19 self.acknowledge_hot_reload_marker();
20 info!(session = %self.config.session, resume, "daemon started");
21 self.record_orchestrator_action(format!(
22 "runtime: orchestrator started (mode={}, resume={resume})",
23 self.config.team_config.workflow_mode.as_str()
24 ));
25
26 let shutdown_flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
28 let flag_clone = shutdown_flag.clone();
29 if let Err(e) = ctrlc::set_handler(move || {
30 flag_clone.store(true, std::sync::atomic::Ordering::SeqCst);
31 }) {
32 warn!(error = %e, "failed to install signal handler");
33 }
34
35 self.run_startup_preflight()?;
36
37 self.spawn_all_agents(resume)?;
39 if resume {
40 self.restore_runtime_state();
41 }
42 self.persist_runtime_state(false)?;
43
44 let started_at = Instant::now();
45 let heartbeat_interval = Duration::from_secs(300); let mut last_heartbeat = Instant::now();
47 let mut hot_reload = match HotReloadMonitor::for_current_exe() {
48 Ok(monitor) => Some(monitor),
49 Err(error) => {
50 warn!(error = %error, "failed to initialize daemon hot-reload monitor");
51 None
52 }
53 };
54
55 let shutdown_reason;
57 loop {
58 if shutdown_flag.load(std::sync::atomic::Ordering::SeqCst) {
60 shutdown_reason = "signal";
61 info!("received shutdown signal");
62 break;
63 }
64
65 if !tmux::session_exists(&self.config.session) {
66 shutdown_reason = "session_gone";
67 info!("tmux session gone, shutting down");
68 break;
69 }
70
71 self.run_recoverable_step("poll_shim_handles", |daemon| daemon.poll_shim_handles());
73 self.run_recoverable_step("shim_health_check", |daemon| daemon.shim_health_check());
74 self.run_recoverable_step("sync_launch_state_session_ids", |daemon| {
75 daemon.sync_launch_state_session_ids()
76 });
77 self.run_recoverable_step("drain_legacy_command_queue", |daemon| {
78 daemon.drain_legacy_command_queue()
79 });
80
81 self.run_loop_step("deliver_inbox_messages", |daemon| {
83 daemon.deliver_inbox_messages()
84 });
85 self.run_loop_step("retry_failed_deliveries", |daemon| {
86 daemon.retry_failed_deliveries()
87 });
88
89 self.run_recoverable_step("maybe_intervene_triage_backlog", |daemon| {
91 daemon.maybe_intervene_triage_backlog()
92 });
93 self.run_recoverable_step("maybe_intervene_owned_tasks", |daemon| {
94 daemon.maybe_intervene_owned_tasks()
95 });
96 self.run_recoverable_step("maybe_intervene_review_backlog", |daemon| {
97 daemon.maybe_intervene_review_backlog()
98 });
99 self.run_recoverable_step("maybe_escalate_stale_reviews", |daemon| {
100 daemon.maybe_escalate_stale_reviews()
101 });
102 self.run_recoverable_step("maybe_auto_unblock_blocked_tasks", |daemon| {
103 daemon.maybe_auto_unblock_blocked_tasks()
104 });
105
106 self.run_loop_step("reconcile_active_tasks", |daemon| {
108 daemon.reconcile_active_tasks()
109 });
110 self.run_loop_step("maybe_auto_dispatch", |daemon| daemon.maybe_auto_dispatch());
111 self.run_recoverable_step("maybe_recycle_cron_tasks", |daemon| {
112 daemon.maybe_recycle_cron_tasks()
113 });
114
115 self.run_recoverable_step("maybe_intervene_manager_dispatch_gap", |daemon| {
117 daemon.maybe_intervene_manager_dispatch_gap()
118 });
119 self.run_recoverable_step("maybe_intervene_architect_utilization", |daemon| {
120 daemon.maybe_intervene_architect_utilization()
121 });
122 self.run_recoverable_step("maybe_intervene_board_replenishment", |daemon| {
123 daemon.maybe_intervene_board_replenishment()
124 });
125 self.run_recoverable_step("maybe_detect_pipeline_starvation", |daemon| {
126 daemon.maybe_detect_pipeline_starvation()
127 });
128
129 self.run_recoverable_step_with_catch_unwind("process_telegram_queue", |daemon| {
131 daemon.process_telegram_queue()
132 });
133 self.run_recoverable_step("maybe_fire_nudges", |daemon| daemon.maybe_fire_nudges());
134 self.run_recoverable_step("check_backend_health", |daemon| {
135 daemon.check_backend_health()
136 });
137 self.run_recoverable_step("maybe_reconcile_stale_worktrees", |daemon| {
138 daemon.maybe_reconcile_stale_worktrees()
139 });
140 self.run_recoverable_step("check_worktree_staleness", |daemon| {
141 daemon.check_worktree_staleness()
142 });
143 self.run_recoverable_step("maybe_warn_uncommitted_work", |daemon| {
144 daemon.maybe_warn_uncommitted_work()
145 });
146 self.run_recoverable_step_with_catch_unwind("maybe_generate_standup", |daemon| {
147 let generated =
148 standup::maybe_generate_standup(standup::StandupGenerationContext {
149 project_root: &daemon.config.project_root,
150 team_config: &daemon.config.team_config,
151 members: &daemon.config.members,
152 watchers: &daemon.watchers,
153 states: &daemon.states,
154 pane_map: &daemon.config.pane_map,
155 telegram_bot: daemon.telegram_bot.as_ref(),
156 paused_standups: &daemon.paused_standups,
157 last_standup: &mut daemon.last_standup,
158 backend_health: &daemon.backend_health,
159 })?;
160 for recipient in generated {
161 daemon.record_standup_generated(&recipient);
162 }
163 Ok(())
164 });
165 self.run_recoverable_step("maybe_rotate_board", |daemon| daemon.maybe_rotate_board());
166 self.run_recoverable_step("maybe_auto_archive", |daemon| daemon.maybe_auto_archive());
167 self.run_recoverable_step_with_catch_unwind("maybe_generate_retrospective", |daemon| {
168 daemon.maybe_generate_retrospective()
169 });
170 self.run_recoverable_step("maybe_notify_failure_patterns", |daemon| {
171 daemon.maybe_notify_failure_patterns()
172 });
173 self.run_recoverable_step("maybe_reload_binary", |daemon| {
174 daemon.maybe_hot_reload_binary(hot_reload.as_mut())
175 });
176 status::update_pane_status_labels(status::PaneStatusLabelUpdateContext {
177 project_root: &self.config.project_root,
178 members: &self.config.members,
179 pane_map: &self.config.pane_map,
180 states: &self.states,
181 nudges: &self.nudges,
182 last_standup: &self.last_standup,
183 paused_standups: &self.paused_standups,
184 standup_interval_for_member: |member_name| {
185 standup::standup_interval_for_member_name(
186 &self.config.team_config,
187 &self.config.members,
188 member_name,
189 )
190 },
191 });
192
193 if last_heartbeat.elapsed() >= heartbeat_interval {
195 let uptime = started_at.elapsed().as_secs();
196 self.record_daemon_heartbeat(uptime);
197 if let Err(error) = self.persist_runtime_state(false) {
198 warn!(error = %error, "failed to persist daemon checkpoint");
199 }
200 debug!(uptime_secs = uptime, "daemon heartbeat");
201 last_heartbeat = Instant::now();
202 }
203
204 std::thread::sleep(self.poll_interval);
205 }
206
207 self.shutdown_all_shims();
209
210 if let Err(error) = self.save_shim_state() {
212 warn!(error = %error, "failed to save shim state for resume");
213 }
214
215 let uptime = started_at.elapsed().as_secs();
216 if let Err(error) = self.persist_runtime_state(true) {
217 warn!(error = %error, "failed to persist final daemon checkpoint");
218 }
219 self.record_daemon_stopped(shutdown_reason, uptime);
220 Ok(())
221 }
222
223 fn shutdown_all_shims(&mut self) {
225 if self.shim_handles.is_empty() {
226 return;
227 }
228
229 let timeout_secs = self.config.team_config.shim_shutdown_timeout_secs;
230 info!(
231 count = self.shim_handles.len(),
232 timeout_secs, "sending graceful shutdown to shim subprocesses"
233 );
234
235 let names: Vec<String> = self.shim_handles.keys().cloned().collect();
237 for name in &names {
238 if let Some(handle) = self.shim_handles.get_mut(name) {
239 if handle.is_terminal() {
240 continue;
241 }
242 if let Err(error) = handle.send_shutdown(timeout_secs) {
243 warn!(
244 member = name.as_str(),
245 error = %error,
246 "failed to send shim shutdown, sending kill"
247 );
248 let _ = handle.send_kill();
249 }
250 }
251 }
252
253 let deadline = Instant::now() + Duration::from_secs(timeout_secs as u64);
255 let mut pids: Vec<(String, u32)> = names
256 .iter()
257 .filter_map(|name| {
258 self.shim_handles
259 .get(name)
260 .filter(|h| !h.is_terminal())
261 .map(|h| (name.clone(), h.child_pid))
262 })
263 .collect();
264
265 while !pids.is_empty() && Instant::now() < deadline {
266 pids.retain(|(name, pid)| {
267 let alive = unsafe { libc::kill(*pid as i32, 0) } == 0;
269 if !alive {
270 debug!(member = name.as_str(), pid, "shim process exited cleanly");
271 }
272 alive
273 });
274 if !pids.is_empty() {
275 std::thread::sleep(Duration::from_millis(100));
276 }
277 }
278
279 for (name, pid) in &pids {
281 warn!(
282 member = name.as_str(),
283 pid, "shim did not exit within timeout, sending Kill"
284 );
285 if let Some(handle) = self.shim_handles.get_mut(name) {
286 let _ = handle.send_kill();
287 }
288 unsafe {
290 libc::kill(*pid as i32, libc::SIGKILL);
291 }
292 }
293 }
294}