Skip to main content

batty_cli/team/
daemon.rs

//! Core team daemon: poll loop, lifecycle coordination, and routing.
//!
//! `TeamDaemon` owns the long-running control loop for a Batty team session.
//! It starts and resumes member agents, polls tmux-backed watchers, routes
//! messages across panes, inboxes, and external channels, persists runtime
//! state, and runs periodic automation such as standups and board rotation.
//!
//! Focused subsystems that were extracted from this file stay close to the
//! daemon boundary:
//! - `merge` handles engineer completion, test gating, and merge/escalation
//!   flow once a task is reported done.
//! - `interventions` handles idle nudges and manager/architect intervention
//!   automation without changing the daemon's main control flow.
//!
//! This module remains the integration layer that sequences those subsystems
//! inside each poll iteration.

18use std::collections::{HashMap, HashSet};
19use std::fs;
20use std::path::{Path, PathBuf};
21#[cfg(test)]
22use std::time::SystemTime;
23use std::time::{Duration, Instant};
24
25use anyhow::{Context, Result, bail};
26use serde::{Deserialize, Serialize};
27use tracing::{debug, info, warn};
28use uuid::Uuid;
29
30use super::board;
31use super::comms::{self, Channel};
32#[cfg(test)]
33use super::config::OrchestratorPosition;
34use super::config::{RoleType, TeamConfig};
35use super::delivery::{FailedDelivery, PendingMessage};
36use super::events::EventSink;
37use super::events::TeamEvent;
38use super::failure_patterns::FailureTracker;
39use super::hierarchy::MemberInstance;
40use super::inbox;
41use super::merge;
42use super::standup::{self, MemberState};
43use super::status;
44use super::task_cmd;
45#[cfg(test)]
46use super::task_loop::next_unclaimed_task;
47use super::task_loop::{
48    branch_is_merged_into, checkout_worktree_branch_from_main, current_worktree_branch,
49    engineer_base_branch_name, is_worktree_safe_to_mutate, setup_engineer_worktree,
50};
51use super::watcher::{SessionWatcher, WatcherState};
52use super::{AssignmentDeliveryResult, AssignmentResultStatus, now_unix, store_assignment_result};
53use crate::agent::{self, BackendHealth};
54use crate::tmux;
55use dispatch::DispatchQueueEntry;
56
57#[path = "daemon/agent_handle.rs"]
58pub(super) mod agent_handle;
59#[path = "daemon/automation.rs"]
60mod automation;
61#[path = "daemon/config_reload.rs"]
62mod config_reload;
63#[path = "dispatch/mod.rs"]
64mod dispatch;
65#[path = "daemon/error_handling.rs"]
66mod error_handling;
67#[path = "daemon/health/mod.rs"]
68mod health;
69#[path = "daemon/helpers.rs"]
70mod helpers;
71#[path = "daemon/hot_reload.rs"]
72mod hot_reload;
73#[path = "daemon/interventions/mod.rs"]
74mod interventions;
75#[path = "launcher.rs"]
76mod launcher;
77#[path = "daemon/poll.rs"]
78mod poll;
79#[path = "daemon/reconcile.rs"]
80mod reconcile;
81#[path = "daemon/shim_spawn.rs"]
82mod shim_spawn;
83#[path = "daemon/shim_state.rs"]
84mod shim_state;
85#[path = "daemon/state.rs"]
86mod state;
87#[path = "telegram_bridge.rs"]
88mod telegram_bridge;
89#[path = "daemon/telemetry.rs"]
90mod telemetry;
91
92#[cfg(test)]
93use self::dispatch::normalized_assignment_dir;
94use self::helpers::{extract_nudge_section, role_prompt_path};
95use self::hot_reload::consume_hot_reload_marker;
96#[cfg(test)]
97use self::hot_reload::{
98    BinaryFingerprint, hot_reload_daemon_args, hot_reload_marker_path, write_hot_reload_marker,
99};
100pub(crate) use self::interventions::NudgeSchedule;
101use self::interventions::OwnedTaskInterventionState;
102use self::launcher::{
103    duplicate_claude_session_ids, load_launch_state, member_session_tracker_config,
104};
105pub use self::state::load_dispatch_queue_snapshot;
106#[cfg(test)]
107use self::state::{
108    PersistedDaemonState, PersistedNudgeState, daemon_state_path, load_daemon_state,
109    save_daemon_state,
110};
111pub(super) use super::delivery::MessageDelivery;
112
/// Daemon configuration derived from TeamConfig.
pub struct DaemonConfig {
    /// Root directory of the project the team operates on.
    pub project_root: PathBuf,
    /// Parsed team configuration (roles, workflow policy, standup settings, …).
    pub team_config: TeamConfig,
    /// Name of the tmux session hosting the team panes.
    pub session: String,
    /// Resolved member instances participating in this session.
    pub members: Vec<MemberInstance>,
    /// Member name -> tmux pane id for each pane-backed member.
    pub pane_map: HashMap<String, String>,
}
121
/// The running team daemon.
pub struct TeamDaemon {
    /// Resolved configuration and pane layout for this session.
    pub(super) config: DaemonConfig,
    /// Per-member tmux session watchers, keyed by member name.
    pub(super) watchers: HashMap<String, SessionWatcher>,
    /// Last known working/idle state per member.
    pub(super) states: HashMap<String, MemberState>,
    /// When each member most recently transitioned to idle
    /// (cleared again on the transition back to working).
    pub(super) idle_started_at: HashMap<String, Instant>,
    /// Engineer name -> board task id currently assigned to them.
    pub(super) active_tasks: HashMap<String, u32>,
    /// Consecutive retry count per engineer; cleared with the active task.
    pub(super) retry_counts: HashMap<String, u32>,
    /// Queued task dispatches awaiting delivery (see the `dispatch` module).
    pub(super) dispatch_queue: Vec<DispatchQueueEntry>,
    /// Idle-epoch counters consumed by the triage intervention subsystem.
    pub(super) triage_idle_epochs: HashMap<String, u64>,
    /// Per-member counters tracked by the triage intervention subsystem.
    pub(super) triage_interventions: HashMap<String, u64>,
    /// Per-member state for owned-task intervention automation.
    pub(super) owned_task_interventions: HashMap<String, OwnedTaskInterventionState>,
    /// Per-member rate limiting for interventions.
    pub(super) intervention_cooldowns: HashMap<String, Instant>,
    /// External communication channels for user roles, keyed by role name.
    pub(super) channels: HashMap<String, Box<dyn Channel>>,
    /// Periodic nudge schedules per member, built from role prompt files.
    pub(super) nudges: HashMap<String, NudgeSchedule>,
    /// Telegram bot for inbound polling (None when not configured).
    pub(super) telegram_bot: Option<super::telegram::TelegramBot>,
    /// Bounded history of recent failures used for pattern detection.
    pub(super) failure_tracker: FailureTracker,
    /// Append-only JSONL event log for the session.
    pub(super) event_sink: EventSink,
    /// Members whose periodic standups are currently paused.
    pub(super) paused_standups: HashSet<String>,
    /// When each member last had its standup timer fire/reset.
    pub(super) last_standup: HashMap<String, Instant>,
    /// When the periodic board rotation last ran.
    pub(super) last_board_rotation: Instant,
    /// When auto-archive last ran.
    pub(super) last_auto_archive: Instant,
    /// When auto-dispatch last ran.
    pub(super) last_auto_dispatch: Instant,
    /// Whether the pipeline-starvation alert has fired for the current episode.
    pub(super) pipeline_starvation_fired: bool,
    /// When the pipeline-starvation alert last fired, if ever.
    pub(super) pipeline_starvation_last_fired: Option<Instant>,
    /// When a planning cycle was last triggered, if ever.
    pub(super) planning_cycle_last_fired: Option<Instant>,
    /// Whether a planning cycle is currently in progress.
    pub(super) planning_cycle_active: bool,
    /// Whether the retrospective has been generated for this session.
    pub(super) retro_generated: bool,
    /// Deliveries that failed and are retained for retry/reporting.
    pub(super) failed_deliveries: Vec<FailedDelivery>,
    /// Task id -> timestamp the task was first seen in review
    /// (presumably unix seconds via `now_unix` — TODO confirm).
    pub(super) review_first_seen: HashMap<u32, u64>,
    /// Task ids for which a review-backlog nudge was already sent.
    pub(super) review_nudge_sent: HashSet<u32>,
    /// Delay between poll-loop iterations (defaults to 5 seconds).
    pub(super) poll_interval: Duration,
    /// True when the project root itself is a git repository.
    pub(super) is_git_repo: bool,
    /// True when the project root is not a git repo but contains git sub-repos.
    pub(super) is_multi_repo: bool,
    /// Cached list of sub-repo directory names (relative to project root) for multi-repo projects.
    pub(super) sub_repo_names: Vec<String>,
    /// Consecutive error counts per recoverable subsystem name.
    pub(super) subsystem_error_counts: HashMap<String, u32>,
    /// In-memory auto-merge overrides per task id; takes priority over
    /// the on-disk overrides (see `auto_merge_override`).
    pub(super) auto_merge_overrides: HashMap<u32, bool>,
    /// Tracks recent (task_id, engineer) dispatch pairs for deduplication.
    pub(super) recent_dispatches: HashMap<(u32, String), Instant>,
    /// SQLite telemetry database connection (None if open failed).
    pub(super) telemetry_db: Option<rusqlite::Connection>,
    /// Timestamp of the last manual assignment per engineer (for cooldown).
    pub(super) manual_assign_cooldowns: HashMap<String, Instant>,
    /// Per-member agent backend health state.
    pub(super) backend_health: HashMap<String, BackendHealth>,
    /// Rolling capture history used to detect narration loops.
    pub(super) narration_tracker: health::narration::NarrationTracker,
    /// Per-session output tracking used for proactive context-pressure handling.
    pub(super) context_pressure_tracker: health::context::ContextPressureTracker,
    /// When the last periodic health check was run.
    pub(super) last_health_check: Instant,
    /// Rate-limiting: last time each engineer received an uncommitted-work warning.
    pub(super) last_uncommitted_warn: HashMap<String, Instant>,
    /// Tracks consecutive "no commits ahead of main" rejections per engineer.
    /// Used to detect and auto-recover from branches that never diverged.
    pub(super) completion_rejection_counts: HashMap<String, u32>,
    /// Messages deferred because the target agent was still starting.
    /// Drained automatically when the agent transitions to ready.
    pub(super) pending_delivery_queue: HashMap<String, Vec<PendingMessage>>,
    /// Per-agent shim handles (only populated when `use_shim` is true).
    pub(super) shim_handles: HashMap<String, agent_handle::AgentHandle>,
    /// When the last shim health check (Ping) was sent.
    pub(super) last_shim_health_check: Instant,
}
189
impl TeamDaemon {
    /// Borrow the watcher registered for `name`, with a descriptive error
    /// when the member is missing from the watcher registry.
    #[allow(dead_code)]
    pub(super) fn watcher_mut(&mut self, name: &str) -> Result<&mut SessionWatcher> {
        self.watchers
            .get_mut(name)
            .with_context(|| format!("watcher registry missing member '{name}'"))
    }

    /// Create a new daemon from resolved config and layout.
    ///
    /// Detects the repository topology (single git repo, multi-repo, or no
    /// git at all), opens the JSONL event sink, builds per-pane watchers,
    /// user-role channels, nudge schedules, and the telemetry database, and
    /// initializes all runtime bookkeeping to its starting state.
    ///
    /// # Errors
    /// Fails only if the event sink cannot be created; channel and telemetry
    /// setup are best-effort (logged and skipped on error).
    pub fn new(config: DaemonConfig) -> Result<Self> {
        let is_git_repo = super::git_cmd::is_git_repo(&config.project_root);
        // Multi-repo detection only applies when the root itself is not a repo.
        let (is_multi_repo, sub_repo_names) = if is_git_repo {
            (false, Vec::new())
        } else {
            let subs = super::git_cmd::discover_sub_repos(&config.project_root);
            if subs.is_empty() {
                (false, Vec::new())
            } else {
                let names: Vec<String> = subs
                    .iter()
                    .filter_map(|p| p.file_name().map(|n| n.to_string_lossy().to_string()))
                    .collect();
                info!(
                    sub_repos = ?names,
                    "Detected multi-repo project with {} sub-repos",
                    names.len()
                );
                (true, names)
            }
        };
        if !is_git_repo && !is_multi_repo {
            info!("Project is not a git repository \u{2014} git operations disabled");
        }

        let team_config_dir = config.project_root.join(".batty").join("team_config");
        let events_path = team_config_dir.join("events.jsonl");
        let event_sink =
            EventSink::new_with_max_bytes(&events_path, config.team_config.event_log_max_bytes)?;

        // Create watchers for each pane member
        let stale_secs = config.team_config.standup.interval_secs * 2;
        let mut watchers = HashMap::new();
        for (name, pane_id) in &config.pane_map {
            let session_tracker = config
                .members
                .iter()
                .find(|member| member.name == *name)
                .and_then(|member| member_session_tracker_config(&config.project_root, member));
            watchers.insert(
                name.clone(),
                SessionWatcher::new(pane_id, name, stale_secs, session_tracker),
            );
        }

        // Create channels for user roles
        let mut channels: HashMap<String, Box<dyn Channel>> = HashMap::new();
        for role in &config.team_config.roles {
            if role.role_type == RoleType::User {
                if let (Some(ch_type), Some(ch_config)) = (&role.channel, &role.channel_config) {
                    match comms::channel_from_config(ch_type, ch_config) {
                        Ok(ch) => {
                            channels.insert(role.name.clone(), ch);
                        }
                        Err(e) => {
                            // Best-effort: a misconfigured channel should not
                            // prevent the daemon from starting.
                            warn!(role = %role.name, error = %e, "failed to create channel");
                        }
                    }
                }
            }
        }

        // Create Telegram bot for inbound polling (if configured)
        let telegram_bot = telegram_bridge::build_telegram_bot(&config.team_config);
        let narration_detection_threshold = config
            .team_config
            .workflow_policy
            .narration_detection_threshold;

        let states = HashMap::new();

        // Build nudge schedules from role configs + prompt files
        let mut nudges = HashMap::new();
        for role in &config.team_config.roles {
            if let Some(interval_secs) = role.nudge_interval_secs {
                let prompt_path =
                    role_prompt_path(&team_config_dir, role.prompt.as_deref(), role.role_type);
                if let Some(nudge_text) = extract_nudge_section(&prompt_path) {
                    // Apply nudge to all instances of this role
                    let instance_names: Vec<String> = config
                        .members
                        .iter()
                        .filter(|m| m.role_name == role.name)
                        .map(|m| m.name.clone())
                        .collect();
                    for name in instance_names {
                        info!(member = %name, interval_secs, "registered nudge");
                        nudges.insert(
                            name,
                            NudgeSchedule {
                                text: nudge_text.clone(),
                                interval: Duration::from_secs(interval_secs),
                                // All roles start idle, so begin the countdown
                                idle_since: Some(Instant::now()),
                                fired_this_idle: false,
                                paused: false,
                            },
                        );
                    }
                }
            }
        }

        // Open telemetry database (best-effort — log and continue if it fails).
        let telemetry_db = match super::telemetry_db::open(&config.project_root) {
            Ok(conn) => {
                info!("telemetry database opened");
                Some(conn)
            }
            Err(error) => {
                warn!(error = %error, "failed to open telemetry database; telemetry disabled");
                None
            }
        };

        let context_pressure_threshold = config
            .team_config
            .workflow_policy
            .context_pressure_threshold_bytes;
        let context_pressure_delay = config
            .team_config
            .workflow_policy
            .context_pressure_restart_delay_secs;

        Ok(Self {
            config,
            watchers,
            states,
            idle_started_at: HashMap::new(),
            active_tasks: HashMap::new(),
            retry_counts: HashMap::new(),
            dispatch_queue: Vec::new(),
            triage_idle_epochs: HashMap::new(),
            triage_interventions: HashMap::new(),
            owned_task_interventions: HashMap::new(),
            intervention_cooldowns: HashMap::new(),
            channels,
            nudges,
            telegram_bot,
            failure_tracker: FailureTracker::new(20),
            event_sink,
            paused_standups: HashSet::new(),
            last_standup: HashMap::new(),
            last_board_rotation: Instant::now(),
            last_auto_archive: Instant::now(),
            last_auto_dispatch: Instant::now(),
            pipeline_starvation_fired: false,
            pipeline_starvation_last_fired: None,
            planning_cycle_last_fired: None,
            planning_cycle_active: false,
            retro_generated: false,
            failed_deliveries: Vec::new(),
            review_first_seen: HashMap::new(),
            review_nudge_sent: HashSet::new(),
            poll_interval: Duration::from_secs(5),
            is_git_repo,
            is_multi_repo,
            sub_repo_names,
            subsystem_error_counts: HashMap::new(),
            auto_merge_overrides: HashMap::new(),
            recent_dispatches: HashMap::new(),
            telemetry_db,
            manual_assign_cooldowns: HashMap::new(),
            backend_health: HashMap::new(),
            narration_tracker: health::narration::NarrationTracker::new(
                12,
                narration_detection_threshold,
            ),
            context_pressure_tracker: health::context::ContextPressureTracker::new(
                context_pressure_threshold,
                context_pressure_delay,
            ),
            // Start far enough in the past to trigger an immediate check.
            last_health_check: Instant::now() - Duration::from_secs(3600),
            last_uncommitted_warn: HashMap::new(),
            completion_rejection_counts: HashMap::new(),
            pending_delivery_queue: HashMap::new(),
            shim_handles: HashMap::new(),
            last_shim_health_check: Instant::now(),
        })
    }

    /// Extract the nudge section from `member`'s role prompt file, if any.
    pub(super) fn member_nudge_text(&self, member: &MemberInstance) -> Option<String> {
        let prompt_path = role_prompt_path(
            &super::team_config_dir(&self.config.project_root),
            member.prompt.as_deref(),
            member.role_type,
        );
        extract_nudge_section(&prompt_path)
    }

    /// Prefix `body` with the member's nudge text when one exists;
    /// otherwise return `body` unchanged.
    pub(super) fn prepend_member_nudge(
        &self,
        member: &MemberInstance,
        body: impl AsRef<str>,
    ) -> String {
        let body = body.as_ref();
        match self.member_nudge_text(member) {
            Some(nudge) => format!("{nudge}\n\n{body}"),
            None => body.to_string(),
        }
    }

    /// Record that `member_name` is working: update state, activate the
    /// watcher, and reset the automation timers accordingly.
    pub(super) fn mark_member_working(&mut self, member_name: &str) {
        // When shim mode is active, the shim is the single source of truth for
        // agent state. Speculative mark_member_working calls from delivery,
        // completion, and interventions must not override the shim's verdict —
        // doing so causes the daemon to permanently think the agent is "working"
        // when the shim classifier sees it as idle.
        if self.shim_handles.contains_key(member_name) {
            return;
        }
        self.states
            .insert(member_name.to_string(), MemberState::Working);
        if let Some(watcher) = self.watchers.get_mut(member_name) {
            watcher.activate();
        }
        self.update_automation_timers_for_state(member_name, MemberState::Working);
    }

    /// Record that `member_name` is idle: update state, deactivate the
    /// watcher, and start the idle automation countdowns.
    pub(super) fn set_member_idle(&mut self, member_name: &str) {
        // For shim agents: don't override the state (shim is source of truth),
        // but DO update automation timers so idle_started_at gets populated.
        // Without this, interventions like review_backlog never fire because
        // automation_idle_grace_elapsed returns false.
        if self.shim_handles.contains_key(member_name) {
            // NOTE(review): the guard below means timers only refresh once the
            // shim has already reported Idle — confirm the initial
            // Working->Idle transition is recorded by the shim poll path.
            if self.states.get(member_name) == Some(&MemberState::Idle) {
                self.update_automation_timers_for_state(member_name, MemberState::Idle);
            }
            return;
        }
        self.states
            .insert(member_name.to_string(), MemberState::Idle);
        if let Some(watcher) = self.watchers.get_mut(member_name) {
            watcher.deactivate();
        }
        self.update_automation_timers_for_state(member_name, MemberState::Idle);
    }

    /// The board task id currently assigned to `engineer`, if any.
    pub(super) fn active_task_id(&self, engineer: &str) -> Option<u32> {
        self.active_tasks.get(engineer).copied()
    }

    /// Root directory of the project this daemon manages.
    pub(super) fn project_root(&self) -> &Path {
        &self.config.project_root
    }

    #[cfg(test)]
    pub(super) fn set_auto_merge_override(&mut self, task_id: u32, enabled: bool) {
        self.auto_merge_overrides.insert(task_id, enabled);
    }

    /// Effective auto-merge override for `task_id`, or `None` when no
    /// override exists in memory or on disk.
    pub(super) fn auto_merge_override(&self, task_id: u32) -> Option<bool> {
        // In-memory overrides take priority, then check disk
        if let Some(&value) = self.auto_merge_overrides.get(&task_id) {
            return Some(value);
        }
        let disk_overrides = super::auto_merge::load_overrides(&self.config.project_root);
        disk_overrides.get(&task_id).copied()
    }

    /// `<project_root>/.batty/worktrees/<engineer>` — the engineer's worktree.
    pub(super) fn worktree_dir(&self, engineer: &str) -> PathBuf {
        self.config
            .project_root
            .join(".batty")
            .join("worktrees")
            .join(engineer)
    }

    /// `<project_root>/.batty/team_config/board` — the task board directory.
    pub(super) fn board_dir(&self) -> PathBuf {
        self.config
            .project_root
            .join(".batty")
            .join("team_config")
            .join("board")
    }

    /// Whether `engineer` is configured to use git worktrees.
    /// Always false when no git repository (single or multi) was detected,
    /// or when the member is unknown.
    pub(super) fn member_uses_worktrees(&self, engineer: &str) -> bool {
        if !self.is_git_repo && !self.is_multi_repo {
            return false;
        }
        self.config
            .members
            .iter()
            .find(|member| member.name == engineer)
            .map(|member| member.use_worktrees)
            .unwrap_or(false)
    }

    /// Name of the member that `engineer` reports to, if configured.
    pub(super) fn manager_name(&self, engineer: &str) -> Option<String> {
        self.config
            .members
            .iter()
            .find(|member| member.name == engineer)
            .and_then(|member| member.reports_to.clone())
    }

    #[cfg(test)]
    pub(super) fn set_active_task_for_test(&mut self, engineer: &str, task_id: u32) {
        self.active_tasks.insert(engineer.to_string(), task_id);
    }

    #[cfg(test)]
    pub(super) fn retry_count_for_test(&self, engineer: &str) -> Option<u32> {
        self.retry_counts.get(engineer).copied()
    }

    #[cfg(test)]
    pub(super) fn member_state_for_test(&self, engineer: &str) -> Option<MemberState> {
        self.states.get(engineer).copied()
    }

    #[cfg(test)]
    pub(super) fn set_member_state_for_test(&mut self, engineer: &str, state: MemberState) {
        self.states.insert(engineer.to_string(), state);
    }

    /// Increment `engineer`'s retry count (starting from 0) and return the
    /// new value.
    pub(super) fn increment_retry(&mut self, engineer: &str) -> u32 {
        let count = self.retry_counts.entry(engineer.to_string()).or_insert(0);
        *count += 1;
        *count
    }

    /// Forget `engineer`'s active task and retry count, and remove any
    /// progress checkpoint left from a prior restart.
    pub(super) fn clear_active_task(&mut self, engineer: &str) {
        self.active_tasks.remove(engineer);
        self.retry_counts.remove(engineer);
        // Clean up any progress checkpoint left from a prior restart.
        super::checkpoint::remove_checkpoint(&self.config.project_root, engineer);
    }

    /// Forward `msg` from `from_role` to the member it reports to, then mark
    /// the recipient as working so the queued message is processed promptly.
    ///
    /// A no-op (returning `Ok`) when the member has no `reports_to` parent.
    pub(super) fn notify_reports_to(&mut self, from_role: &str, msg: &str) -> Result<()> {
        let parent = self
            .config
            .members
            .iter()
            .find(|m| m.name == from_role)
            .and_then(|m| m.reports_to.clone());
        let Some(parent_name) = parent else {
            return Ok(());
        };
        self.queue_message(from_role, &parent_name, msg)?;
        self.mark_member_working(&parent_name);
        Ok(())
    }

    /// Update automation countdowns when a member's state changes.
    ///
    /// Resets or clears `idle_started_at`, then propagates the new state to
    /// the nudge schedule, the standup timers, and the triage intervention
    /// bookkeeping.
    pub(super) fn update_automation_timers_for_state(
        &mut self,
        member_name: &str,
        new_state: MemberState,
    ) {
        match new_state {
            MemberState::Idle => {
                self.idle_started_at
                    .insert(member_name.to_string(), Instant::now());
            }
            MemberState::Working => {
                self.idle_started_at.remove(member_name);
            }
        }
        self.update_nudge_for_state(member_name, new_state);
        standup::update_timer_for_state(
            &self.config.team_config,
            &self.config.members,
            &mut self.paused_standups,
            &mut self.last_standup,
            member_name,
            new_state,
        );
        self.update_triage_intervention_for_state(member_name, new_state);
    }
}
572
573#[cfg(test)]
574#[path = "daemon/tests.rs"]
575mod tests;