1use std::collections::{HashMap, HashSet};
19use std::fs;
20use std::path::{Path, PathBuf};
21#[cfg(test)]
22use std::time::SystemTime;
23use std::time::{Duration, Instant};
24
25use anyhow::{Context, Result, bail};
26use serde::{Deserialize, Serialize};
27use tracing::{debug, info, warn};
28use uuid::Uuid;
29
30use super::board;
31use super::comms::{self, Channel};
32#[cfg(test)]
33use super::config::OrchestratorPosition;
34use super::config::{RoleType, TeamConfig};
35use super::delivery::{FailedDelivery, PendingMessage};
36use super::events::EventSink;
37use super::events::TeamEvent;
38use super::failure_patterns::FailureTracker;
39use super::hierarchy::MemberInstance;
40use super::inbox;
41use super::merge;
42use super::standup::{self, MemberState};
43use super::status;
44use super::task_cmd;
45#[cfg(test)]
46use super::task_loop::next_unclaimed_task;
47use super::task_loop::{
48 branch_is_merged_into, checkout_worktree_branch_from_main, current_worktree_branch,
49 engineer_base_branch_name, is_worktree_safe_to_mutate, setup_engineer_worktree,
50};
51use super::watcher::{SessionTrackerConfig, SessionWatcher, WatcherState};
52use super::{AssignmentDeliveryResult, AssignmentResultStatus, now_unix, store_assignment_result};
53use crate::agent::{self, BackendHealth};
54use crate::tmux;
55use dispatch::DispatchQueueEntry;
56
57#[path = "daemon/agent_handle.rs"]
58pub(super) mod agent_handle;
59#[path = "daemon/automation.rs"]
60mod automation;
61#[path = "dispatch/mod.rs"]
62mod dispatch;
63#[path = "daemon/error_handling.rs"]
64mod error_handling;
65#[path = "daemon/health/mod.rs"]
66mod health;
67#[path = "daemon/helpers.rs"]
68mod helpers;
69#[path = "daemon/hot_reload.rs"]
70mod hot_reload;
71#[path = "daemon/interventions/mod.rs"]
72mod interventions;
73#[path = "launcher.rs"]
74mod launcher;
75#[path = "daemon/poll.rs"]
76mod poll;
77#[path = "daemon/shim_spawn.rs"]
78mod shim_spawn;
79#[path = "daemon/shim_state.rs"]
80mod shim_state;
81#[path = "daemon/state.rs"]
82mod state;
83#[path = "telegram_bridge.rs"]
84mod telegram_bridge;
85#[path = "daemon/telemetry.rs"]
86mod telemetry;
87
88#[cfg(test)]
89use self::dispatch::normalized_assignment_dir;
90use self::helpers::{extract_nudge_section, role_prompt_path};
91use self::hot_reload::consume_hot_reload_marker;
92#[cfg(test)]
93use self::hot_reload::{
94 BinaryFingerprint, hot_reload_daemon_args, hot_reload_marker_path, write_hot_reload_marker,
95};
96pub(crate) use self::interventions::NudgeSchedule;
97use self::interventions::OwnedTaskInterventionState;
98use self::launcher::{
99 duplicate_claude_session_ids, load_launch_state, member_session_tracker_config,
100};
101pub use self::state::load_dispatch_queue_snapshot;
102#[cfg(test)]
103use self::state::{
104 PersistedDaemonState, PersistedNudgeState, daemon_state_path, load_daemon_state,
105 save_daemon_state,
106};
107pub(super) use super::delivery::MessageDelivery;
108
109pub struct DaemonConfig {
111 pub project_root: PathBuf,
112 pub team_config: TeamConfig,
113 pub session: String,
114 pub members: Vec<MemberInstance>,
115 pub pane_map: HashMap<String, String>,
116}
117
118pub struct TeamDaemon {
120 pub(super) config: DaemonConfig,
121 pub(super) watchers: HashMap<String, SessionWatcher>,
122 pub(super) states: HashMap<String, MemberState>,
123 pub(super) idle_started_at: HashMap<String, Instant>,
124 pub(super) active_tasks: HashMap<String, u32>,
125 pub(super) retry_counts: HashMap<String, u32>,
126 pub(super) dispatch_queue: Vec<DispatchQueueEntry>,
127 pub(super) triage_idle_epochs: HashMap<String, u64>,
128 pub(super) triage_interventions: HashMap<String, u64>,
129 pub(super) owned_task_interventions: HashMap<String, OwnedTaskInterventionState>,
130 pub(super) intervention_cooldowns: HashMap<String, Instant>,
131 pub(super) channels: HashMap<String, Box<dyn Channel>>,
132 pub(super) nudges: HashMap<String, NudgeSchedule>,
133 pub(super) telegram_bot: Option<super::telegram::TelegramBot>,
134 pub(super) failure_tracker: FailureTracker,
135 pub(super) event_sink: EventSink,
136 pub(super) paused_standups: HashSet<String>,
137 pub(super) last_standup: HashMap<String, Instant>,
138 pub(super) last_board_rotation: Instant,
139 pub(super) last_auto_archive: Instant,
140 pub(super) last_auto_dispatch: Instant,
141 pub(super) pipeline_starvation_fired: bool,
142 pub(super) pipeline_starvation_last_fired: Option<Instant>,
143 pub(super) retro_generated: bool,
144 pub(super) failed_deliveries: Vec<FailedDelivery>,
145 pub(super) review_first_seen: HashMap<u32, u64>,
146 pub(super) review_nudge_sent: HashSet<u32>,
147 pub(super) poll_interval: Duration,
148 pub(super) is_git_repo: bool,
149 pub(super) is_multi_repo: bool,
151 pub(super) sub_repo_names: Vec<String>,
153 pub(super) subsystem_error_counts: HashMap<String, u32>,
155 pub(super) auto_merge_overrides: HashMap<u32, bool>,
156 pub(super) recent_dispatches: HashMap<(u32, String), Instant>,
158 pub(super) telemetry_db: Option<rusqlite::Connection>,
160 pub(super) manual_assign_cooldowns: HashMap<String, Instant>,
162 pub(super) backend_health: HashMap<String, BackendHealth>,
164 pub(super) last_health_check: Instant,
166 pub(super) last_uncommitted_warn: HashMap<String, Instant>,
168 pub(super) pending_delivery_queue: HashMap<String, Vec<PendingMessage>>,
171 pub(super) shim_handles: HashMap<String, agent_handle::AgentHandle>,
173 pub(super) last_shim_health_check: Instant,
175}
176
177impl TeamDaemon {
178 #[allow(dead_code)]
179 pub(super) fn watcher_mut(&mut self, name: &str) -> Result<&mut SessionWatcher> {
180 self.watchers
181 .get_mut(name)
182 .with_context(|| format!("watcher registry missing member '{name}'"))
183 }
184
185 pub fn new(config: DaemonConfig) -> Result<Self> {
187 let is_git_repo = super::git_cmd::is_git_repo(&config.project_root);
188 let (is_multi_repo, sub_repo_names) = if is_git_repo {
189 (false, Vec::new())
190 } else {
191 let subs = super::git_cmd::discover_sub_repos(&config.project_root);
192 if subs.is_empty() {
193 (false, Vec::new())
194 } else {
195 let names: Vec<String> = subs
196 .iter()
197 .filter_map(|p| p.file_name().map(|n| n.to_string_lossy().to_string()))
198 .collect();
199 info!(
200 sub_repos = ?names,
201 "Detected multi-repo project with {} sub-repos",
202 names.len()
203 );
204 (true, names)
205 }
206 };
207 if !is_git_repo && !is_multi_repo {
208 info!("Project is not a git repository \u{2014} git operations disabled");
209 }
210
211 let team_config_dir = config.project_root.join(".batty").join("team_config");
212 let events_path = team_config_dir.join("events.jsonl");
213 let event_sink =
214 EventSink::new_with_max_bytes(&events_path, config.team_config.event_log_max_bytes)?;
215
216 let mut watchers = HashMap::new();
218 let stale_secs = config.team_config.standup.interval_secs * 2;
219 for (name, pane_id) in &config.pane_map {
220 let session_tracker = config
221 .members
222 .iter()
223 .find(|member| member.name == *name)
224 .and_then(|member| member_session_tracker_config(&config.project_root, member));
225 watchers.insert(
226 name.clone(),
227 SessionWatcher::new(pane_id, name, stale_secs, session_tracker),
228 );
229 }
230
231 let mut channels: HashMap<String, Box<dyn Channel>> = HashMap::new();
233 for role in &config.team_config.roles {
234 if role.role_type == RoleType::User {
235 if let (Some(ch_type), Some(ch_config)) = (&role.channel, &role.channel_config) {
236 match comms::channel_from_config(ch_type, ch_config) {
237 Ok(ch) => {
238 channels.insert(role.name.clone(), ch);
239 }
240 Err(e) => {
241 warn!(role = %role.name, error = %e, "failed to create channel");
242 }
243 }
244 }
245 }
246 }
247
248 let telegram_bot = telegram_bridge::build_telegram_bot(&config.team_config);
250
251 let states = HashMap::new();
252
253 let mut nudges = HashMap::new();
255 for role in &config.team_config.roles {
256 if let Some(interval_secs) = role.nudge_interval_secs {
257 let prompt_path =
258 role_prompt_path(&team_config_dir, role.prompt.as_deref(), role.role_type);
259 if let Some(nudge_text) = extract_nudge_section(&prompt_path) {
260 let instance_names: Vec<String> = config
262 .members
263 .iter()
264 .filter(|m| m.role_name == role.name)
265 .map(|m| m.name.clone())
266 .collect();
267 for name in instance_names {
268 info!(member = %name, interval_secs, "registered nudge");
269 nudges.insert(
270 name,
271 NudgeSchedule {
272 text: nudge_text.clone(),
273 interval: Duration::from_secs(interval_secs),
274 idle_since: Some(Instant::now()),
276 fired_this_idle: false,
277 paused: false,
278 },
279 );
280 }
281 }
282 }
283 }
284
285 let telemetry_db = match super::telemetry_db::open(&config.project_root) {
287 Ok(conn) => {
288 info!("telemetry database opened");
289 Some(conn)
290 }
291 Err(error) => {
292 warn!(error = %error, "failed to open telemetry database; telemetry disabled");
293 None
294 }
295 };
296
297 Ok(Self {
298 config,
299 watchers,
300 states,
301 idle_started_at: HashMap::new(),
302 active_tasks: HashMap::new(),
303 retry_counts: HashMap::new(),
304 dispatch_queue: Vec::new(),
305 triage_idle_epochs: HashMap::new(),
306 triage_interventions: HashMap::new(),
307 owned_task_interventions: HashMap::new(),
308 intervention_cooldowns: HashMap::new(),
309 channels,
310 nudges,
311 telegram_bot,
312 failure_tracker: FailureTracker::new(20),
313 event_sink,
314 paused_standups: HashSet::new(),
315 last_standup: HashMap::new(),
316 last_board_rotation: Instant::now(),
317 last_auto_archive: Instant::now(),
318 last_auto_dispatch: Instant::now(),
319 pipeline_starvation_fired: false,
320 pipeline_starvation_last_fired: None,
321 retro_generated: false,
322 failed_deliveries: Vec::new(),
323 review_first_seen: HashMap::new(),
324 review_nudge_sent: HashSet::new(),
325 poll_interval: Duration::from_secs(5),
326 is_git_repo,
327 is_multi_repo,
328 sub_repo_names,
329 subsystem_error_counts: HashMap::new(),
330 auto_merge_overrides: HashMap::new(),
331 recent_dispatches: HashMap::new(),
332 telemetry_db,
333 manual_assign_cooldowns: HashMap::new(),
334 backend_health: HashMap::new(),
335 last_health_check: Instant::now() - Duration::from_secs(3600),
337 last_uncommitted_warn: HashMap::new(),
338 pending_delivery_queue: HashMap::new(),
339 shim_handles: HashMap::new(),
340 last_shim_health_check: Instant::now(),
341 })
342 }
343
344 pub(super) fn member_nudge_text(&self, member: &MemberInstance) -> Option<String> {
345 let prompt_path = role_prompt_path(
346 &super::team_config_dir(&self.config.project_root),
347 member.prompt.as_deref(),
348 member.role_type,
349 );
350 extract_nudge_section(&prompt_path)
351 }
352
353 pub(super) fn prepend_member_nudge(
354 &self,
355 member: &MemberInstance,
356 body: impl AsRef<str>,
357 ) -> String {
358 let body = body.as_ref();
359 match self.member_nudge_text(member) {
360 Some(nudge) => format!("{nudge}\n\n{body}"),
361 None => body.to_string(),
362 }
363 }
364
365 pub(super) fn mark_member_working(&mut self, member_name: &str) {
366 self.states
367 .insert(member_name.to_string(), MemberState::Working);
368 if let Some(watcher) = self.watchers.get_mut(member_name) {
369 watcher.activate();
370 }
371 self.update_automation_timers_for_state(member_name, MemberState::Working);
372 }
373
374 pub(super) fn set_member_idle(&mut self, member_name: &str) {
375 self.states
376 .insert(member_name.to_string(), MemberState::Idle);
377 if let Some(watcher) = self.watchers.get_mut(member_name) {
378 watcher.deactivate();
379 }
380 self.update_automation_timers_for_state(member_name, MemberState::Idle);
381 }
382
383 pub(super) fn active_task_id(&self, engineer: &str) -> Option<u32> {
384 self.active_tasks.get(engineer).copied()
385 }
386
387 pub(super) fn project_root(&self) -> &Path {
388 &self.config.project_root
389 }
390
391 #[cfg(test)]
392 pub(super) fn set_auto_merge_override(&mut self, task_id: u32, enabled: bool) {
393 self.auto_merge_overrides.insert(task_id, enabled);
394 }
395
396 pub(super) fn auto_merge_override(&self, task_id: u32) -> Option<bool> {
397 if let Some(&value) = self.auto_merge_overrides.get(&task_id) {
399 return Some(value);
400 }
401 let disk_overrides = super::auto_merge::load_overrides(&self.config.project_root);
402 disk_overrides.get(&task_id).copied()
403 }
404
405 pub(super) fn worktree_dir(&self, engineer: &str) -> PathBuf {
406 self.config
407 .project_root
408 .join(".batty")
409 .join("worktrees")
410 .join(engineer)
411 }
412
413 pub(super) fn board_dir(&self) -> PathBuf {
414 self.config
415 .project_root
416 .join(".batty")
417 .join("team_config")
418 .join("board")
419 }
420
421 pub(super) fn member_uses_worktrees(&self, engineer: &str) -> bool {
422 if !self.is_git_repo && !self.is_multi_repo {
423 return false;
424 }
425 self.config
426 .members
427 .iter()
428 .find(|member| member.name == engineer)
429 .map(|member| member.use_worktrees)
430 .unwrap_or(false)
431 }
432
433 pub(super) fn manager_name(&self, engineer: &str) -> Option<String> {
434 self.config
435 .members
436 .iter()
437 .find(|member| member.name == engineer)
438 .and_then(|member| member.reports_to.clone())
439 }
440
441 #[cfg(test)]
442 pub(super) fn set_active_task_for_test(&mut self, engineer: &str, task_id: u32) {
443 self.active_tasks.insert(engineer.to_string(), task_id);
444 }
445
446 #[cfg(test)]
447 pub(super) fn retry_count_for_test(&self, engineer: &str) -> Option<u32> {
448 self.retry_counts.get(engineer).copied()
449 }
450
451 #[cfg(test)]
452 pub(super) fn member_state_for_test(&self, engineer: &str) -> Option<MemberState> {
453 self.states.get(engineer).copied()
454 }
455
456 #[cfg(test)]
457 pub(super) fn set_member_state_for_test(&mut self, engineer: &str, state: MemberState) {
458 self.states.insert(engineer.to_string(), state);
459 }
460
461 pub(super) fn increment_retry(&mut self, engineer: &str) -> u32 {
462 let count = self.retry_counts.entry(engineer.to_string()).or_insert(0);
463 *count += 1;
464 *count
465 }
466
467 pub(super) fn clear_active_task(&mut self, engineer: &str) {
468 self.active_tasks.remove(engineer);
469 self.retry_counts.remove(engineer);
470 super::checkpoint::remove_checkpoint(&self.config.project_root, engineer);
472 }
473
474 pub(super) fn notify_reports_to(&mut self, from_role: &str, msg: &str) -> Result<()> {
476 let parent = self
477 .config
478 .members
479 .iter()
480 .find(|m| m.name == from_role)
481 .and_then(|m| m.reports_to.clone());
482 let Some(parent_name) = parent else {
483 return Ok(());
484 };
485 self.queue_message(from_role, &parent_name, msg)?;
486 self.mark_member_working(&parent_name);
487 Ok(())
488 }
489
490 pub(super) fn update_automation_timers_for_state(
492 &mut self,
493 member_name: &str,
494 new_state: MemberState,
495 ) {
496 match new_state {
497 MemberState::Idle => {
498 self.idle_started_at
499 .insert(member_name.to_string(), Instant::now());
500 }
501 MemberState::Working => {
502 self.idle_started_at.remove(member_name);
503 }
504 }
505 self.update_nudge_for_state(member_name, new_state);
506 standup::update_timer_for_state(
507 &self.config.team_config,
508 &self.config.members,
509 &mut self.paused_standups,
510 &mut self.last_standup,
511 member_name,
512 new_state,
513 );
514 self.update_triage_intervention_for_state(member_name, new_state);
515 }
516}
517
518#[cfg(test)]
519#[path = "daemon/tests.rs"]
520mod tests;