batty_cli/team/daemon.rs

//! Core team daemon: poll loop, lifecycle coordination, and routing.
//!
//! `TeamDaemon` owns the long-running control loop for a Batty team session.
//! It starts and resumes member agents, polls tmux-backed watchers, routes
//! messages across panes, inboxes, and external channels, persists runtime
//! state, and runs periodic automation such as standups and board rotation.
//!
//! Focused subsystems that were extracted from this file stay close to the
//! daemon boundary:
//! - `merge` handles engineer completion, test gating, and merge/escalation
//!   flow once a task is reported done.
//! - `interventions` handles idle nudges and manager/architect intervention
//!   automation without changing the daemon's main control flow.
//!
//! This module remains the integration layer that sequences those subsystems
//! inside each poll iteration.
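//!
//! A minimal driving sketch (illustrative, not the real entry point: a
//! `tick()` method returning a `TickReport` is assumed from the
//! `current_tick_errors` field docs below; the actual loop lives in the
//! `poll` submodule):
//!
//! ```ignore
//! let mut daemon = TeamDaemon::new(daemon_config)?;
//! loop {
//!     // One poll iteration: watchers, routing, persistence, automation.
//!     let _report = daemon.tick()?;
//!     std::thread::sleep(std::time::Duration::from_secs(5));
//! }
//! ```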

use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
#[cfg(test)]
use std::time::SystemTime;
use std::time::{Duration, Instant};

use anyhow::{Context, Result, bail};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use tracing::{debug, info, warn};
use uuid::Uuid;

use super::board;
use super::comms::{self, Channel};
#[cfg(test)]
use super::config::OrchestratorPosition;
use super::config::{RoleType, TeamConfig};
use super::delivery::{FailedDelivery, PendingMessage};
use super::events::EventSink;
use super::events::TeamEvent;
use super::failure_patterns::FailureTracker;
use super::hierarchy::MemberInstance;
use super::inbox;
use super::merge;
use super::standup::{self, MemberState};
use super::status;
use super::task_cmd;
#[cfg(test)]
use super::task_loop::next_unclaimed_task;
use super::task_loop::{
    branch_is_merged_into, current_worktree_branch, engineer_base_branch_name,
    preserve_worktree_with_commit, setup_engineer_worktree,
};
use super::verification::VerificationState;
use super::watcher::{SessionWatcher, WatcherState};
use super::{AssignmentDeliveryResult, AssignmentResultStatus, now_unix, store_assignment_result};
use crate::agent::{self, BackendHealth};
use crate::tmux;
use dispatch::DispatchQueueEntry;

const STALLED_MID_TURN_MARKER: &str = "stalled mid-turn";
const STALLED_MID_TURN_RETRY_BACKOFF_SECS: [u64; 2] = [30, 60];

#[path = "daemon/agent_handle.rs"]
pub(super) mod agent_handle;
#[path = "daemon/automation.rs"]
mod automation;
#[path = "daemon/config_reload.rs"]
mod config_reload;
#[path = "discord_bridge.rs"]
mod discord_bridge;
#[path = "dispatch/mod.rs"]
mod dispatch;
#[path = "daemon/error_handling.rs"]
mod error_handling;
#[path = "daemon/health/mod.rs"]
mod health;
#[path = "daemon/helpers.rs"]
mod helpers;
#[path = "daemon/hot_reload.rs"]
mod hot_reload;
#[path = "daemon/interventions/mod.rs"]
mod interventions;
#[path = "launcher.rs"]
mod launcher;
#[path = "daemon/merge_queue.rs"]
mod merge_queue;
#[path = "daemon/poll.rs"]
mod poll;
#[path = "daemon/reconcile.rs"]
mod reconcile;
#[cfg(any(test, feature = "scenario-test"))]
#[path = "daemon/scenario_api.rs"]
pub mod scenario_api;
#[path = "daemon/shim_spawn.rs"]
mod shim_spawn;
#[path = "daemon/shim_state.rs"]
mod shim_state;
#[path = "daemon/spec_gen.rs"]
mod spec_gen;
#[path = "daemon/state.rs"]
mod state;
#[path = "telegram_bridge.rs"]
mod telegram_bridge;
#[path = "daemon/telemetry.rs"]
pub(crate) mod telemetry;
#[path = "daemon/tick_report.rs"]
pub mod tick_report;
#[path = "daemon/verification.rs"]
pub(crate) mod verification;

pub(crate) use self::discord_bridge::{
    build_shutdown_snapshot, send_discord_shutdown_notice, send_discord_shutdown_summary,
};
#[cfg(test)]
use self::dispatch::normalized_assignment_dir;
pub(crate) use self::error_handling::{optional_subsystem_for_step, optional_subsystem_names};
use self::helpers::{extract_nudge_section, role_prompt_path};
use self::hot_reload::consume_hot_reload_marker;
#[cfg(test)]
use self::hot_reload::{
    BinaryFingerprint, hot_reload_daemon_args, hot_reload_marker_path, write_hot_reload_marker,
};
pub(crate) use self::interventions::NudgeSchedule;
use self::interventions::OwnedTaskInterventionState;
use self::launcher::{
    duplicate_claude_session_ids, load_launch_state, member_session_tracker_config,
};
pub(crate) use self::merge_queue::{MergeQueue, MergeRequest};
pub use self::state::load_dispatch_queue_snapshot;
#[cfg(test)]
use self::state::{
    PersistedDaemonState, PersistedNudgeState, daemon_state_path, load_daemon_state,
    save_daemon_state,
};
pub(super) use super::delivery::MessageDelivery;

/// Daemon configuration derived from TeamConfig.
pub struct DaemonConfig {
    pub project_root: PathBuf,
    pub team_config: TeamConfig,
    pub session: String,
    pub members: Vec<MemberInstance>,
    pub pane_map: HashMap<String, String>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct MainSmokeState {
    pub broken: bool,
    pub pause_dispatch: bool,
    pub last_run_at: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_success_commit: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub broken_commit: Option<String>,
    #[serde(default)]
    pub suspects: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub summary: Option<String>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CleanroomBackend {
    SkoolKit,
    Ghidra,
}

impl CleanroomBackend {
    fn detect(input_path: &Path) -> Result<Self> {
        let extension = input_path
            .extension()
            .and_then(|ext| ext.to_str())
            .map(|ext| ext.to_ascii_lowercase());
        match extension.as_deref() {
            Some("z80" | "sna") => Ok(Self::SkoolKit),
            Some("nes" | "gb" | "gbc" | "com" | "exe") => Ok(Self::Ghidra),
            _ => bail!(
                "unsupported clean-room input '{}': expected one of .z80, .sna, .nes, .gb, .gbc, .com, or .exe",
                input_path.display()
            ),
        }
    }
}
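
// Illustrative routing through `CleanroomBackend::detect` (hypothetical
// inputs; mirrors the match arms above):
//   detect("game.sna") -> Ok(CleanroomBackend::SkoolKit)
//   detect("game.gbc") -> Ok(CleanroomBackend::Ghidra)
//   detect("game.tap") -> Err("unsupported clean-room input ...")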

/// The running team daemon.
pub struct TeamDaemon {
    pub(super) config: DaemonConfig,
    pub(super) watchers: HashMap<String, SessionWatcher>,
    pub(super) states: HashMap<String, MemberState>,
    pub(super) idle_started_at: HashMap<String, Instant>,
    pub(super) active_tasks: HashMap<String, u32>,
    pub(super) retry_counts: HashMap<String, u32>,
    pub(super) dispatch_queue: Vec<DispatchQueueEntry>,
    pub(super) triage_idle_epochs: HashMap<String, u64>,
    pub(super) triage_interventions: HashMap<String, u64>,
    pub(super) owned_task_interventions: HashMap<String, OwnedTaskInterventionState>,
    pub(super) intervention_cooldowns: HashMap<String, Instant>,
    pub(super) channels: HashMap<String, Box<dyn Channel>>,
    pub(super) nudges: HashMap<String, NudgeSchedule>,
    pub(super) discord_bot: Option<super::discord::DiscordBot>,
    pub(super) discord_event_cursor: usize,
    pub(super) telegram_bot: Option<super::telegram::TelegramBot>,
    pub(super) failure_tracker: FailureTracker,
    pub(super) event_sink: EventSink,
    pub(super) paused_standups: HashSet<String>,
    pub(super) last_standup: HashMap<String, Instant>,
    pub(super) last_board_rotation: Instant,
    pub(super) last_auto_archive: Instant,
    pub(super) last_auto_dispatch: Instant,
    pub(super) last_main_smoke_check: Instant,
    pub(super) pipeline_starvation_fired: bool,
    pub(super) pipeline_starvation_last_fired: Option<Instant>,
    pub(super) planning_cycle_last_fired: Option<Instant>,
    pub(super) planning_cycle_active: bool,
    pub(super) retro_generated: bool,
    pub(super) failed_deliveries: Vec<FailedDelivery>,
    pub(super) review_first_seen: HashMap<u32, u64>,
    pub(super) review_nudge_sent: HashSet<u32>,
    pub(super) poll_cycle_count: u64,
    pub(super) poll_interval: Duration,
    /// Errors recorded during the current `tick()` call. Cleared at the
    /// start of each tick and drained into the returned `TickReport`.
    /// Always present (an empty `Vec` is cheap), so non-test builds can
    /// also surface tick-level diagnostics from the future
    /// `batty debug tick` subcommand.
    pub(super) current_tick_errors: Vec<(String, String)>,
    pub(super) is_git_repo: bool,
    /// True when the project root is not a git repo but contains git sub-repos.
    pub(super) is_multi_repo: bool,
    /// Cached list of sub-repo directory names (relative to project root) for multi-repo projects.
    pub(super) sub_repo_names: Vec<String>,
    /// Consecutive error counts per recoverable subsystem name.
    pub(super) subsystem_error_counts: HashMap<String, u32>,
    pub(super) auto_merge_overrides: HashMap<u32, bool>,
    /// Tracks recent (task_id, engineer) dispatch pairs for deduplication.
    pub(super) recent_dispatches: HashMap<(u32, String), Instant>,
    /// Tracks recent escalation keys to suppress repeated alerts.
    pub(super) recent_escalations: HashMap<String, Instant>,
    /// Latest periodic main smoke-test outcome.
    pub(super) main_smoke_state: Option<MainSmokeState>,
    /// SQLite telemetry database connection (None if open failed).
    pub(super) telemetry_db: Option<rusqlite::Connection>,
    /// Timestamp of the last manual assignment per engineer (for cooldown).
    pub(super) manual_assign_cooldowns: HashMap<String, Instant>,
    /// Per-member agent backend health state.
    pub(super) backend_health: HashMap<String, BackendHealth>,
    /// Rolling capture history used to detect narration loops.
    pub(super) narration_tracker: health::narration::NarrationTracker,
    /// Per-session output tracking used for proactive context-pressure handling.
    pub(super) context_pressure_tracker: health::context::ContextPressureTracker,
    /// When the last periodic health check was run.
    pub(super) last_health_check: Instant,
    /// Rate-limiting: last time each engineer received an uncommitted-work warning.
    pub(super) last_uncommitted_warn: HashMap<String, Instant>,
    /// Last time the daemon checked for stale per-worktree cargo targets to prune.
    pub(super) last_shared_target_cleanup: Instant,
    /// Last time the daemon ran a full disk hygiene pass.
    pub(super) last_disk_hygiene_check: Instant,
    /// Per-engineer completion verification loop state.
    pub(super) verification_states: HashMap<String, VerificationState>,
    /// Tracks consecutive narration-only rejections per task (commits exist
    /// but the branch still has no file diff). After threshold, escalates.
    pub(super) narration_rejection_counts: HashMap<u32, u32>,
    /// Tracks consecutive shim completions whose worktree still has zero diff
    /// against main, so the daemon can break completion loops proactively.
    pub(super) zero_diff_completion_counts: HashMap<u32, u32>,
    /// Messages deferred because the target agent was still starting.
    /// Drained automatically when the agent transitions to ready.
    pub(super) pending_delivery_queue: HashMap<String, Vec<PendingMessage>>,
    /// Per-agent shim handles (only populated when `use_shim` is true).
    pub(super) shim_handles: HashMap<String, agent_handle::AgentHandle>,
    /// When the last shim health check (Ping) was sent.
    pub(super) last_shim_health_check: Instant,
    /// Serial daemon-owned merge queue for auto-merge execution.
    pub(super) merge_queue: MergeQueue,
}

#[cfg(any(test, feature = "scenario-test"))]
impl TeamDaemon {
    /// Acquire the scenario framework's test-API hooks. Gated by
    /// `#[cfg(any(test, feature = "scenario-test"))]` so it does not
    /// exist in release builds. See
    /// [`scenario_api::ScenarioHooks`](crate::team::daemon::scenario_api::ScenarioHooks)
    /// for the list of supported operations.
    pub fn scenario_hooks(&mut self) -> scenario_api::ScenarioHooks<'_> {
        scenario_api::ScenarioHooks::new(self)
    }
}

impl TeamDaemon {
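    /// Escalate a failed worktree auto-save: block the affected task (when
    /// known) and alert both the member and their manager, deduplicated so a
    /// persistent failure does not flood inboxes on every cycle.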
    pub(in crate::team) fn report_preserve_failure(
        &mut self,
        member_name: &str,
        task_id: Option<u32>,
        context: &str,
        detail: &str,
    ) {
        let reason = match task_id {
            Some(task_id) => format!(
                "Task #{task_id} is blocked because Batty could not safely auto-save {member_name}'s dirty worktree before {context}. {detail}"
            ),
            None => format!(
                "Batty could not safely auto-save {member_name}'s dirty worktree before {context}. {detail}"
            ),
        };

        // Deduplicate repeated preserve-failure alerts for the same
        // (member, task, context, detail). Without this, the daemon fires the
        // same alert to engineer + manager on every reconciliation cycle as
        // long as the stale branch condition persists, creating tight
        // acknowledgement loops that flood the inbox without forward progress.
        let detail_digest: u64 = {
            use std::collections::hash_map::DefaultHasher;
            use std::hash::{Hash, Hasher};
            let mut hasher = DefaultHasher::new();
            detail.hash(&mut hasher);
            hasher.finish()
        };
        let task_key = task_id
            .map(|id| id.to_string())
            .unwrap_or_else(|| "-".to_string());
        let dedup_key = format!("preserve:{member_name}:{task_key}:{context}:{detail_digest}");
        if self.suppress_recent_escalation(dedup_key, Duration::from_secs(600)) {
            return;
        }

        if let Some(task_id) = task_id {
            if let Err(error) =
                task_cmd::block_task_with_reason(&self.board_dir(), task_id, &reason)
            {
                warn!(
                    member = member_name,
                    task_id,
                    error = %error,
                    "failed to block task after dirty worktree preservation failure"
                );
            }
        }
        let manager = self.assignment_sender(member_name);
        let _ = self.queue_daemon_message(member_name, &reason);
        let _ = self.queue_daemon_message(&manager, &reason);
        self.record_orchestrator_action(format!("blocked recovery: {reason}"));
    }

    #[allow(dead_code)]
    pub(super) fn preserve_member_worktree(
        &mut self,
        member_name: &str,
        commit_message: &str,
    ) -> bool {
        let policy = &self.config.team_config.workflow_policy;
        if !policy.auto_commit_on_restart {
            return false;
        }

        let Some(member) = self
            .config
            .members
            .iter()
            .find(|member| member.name == member_name)
        else {
            return false;
        };
        if member.role_type != RoleType::Engineer || !member.use_worktrees {
            return false;
        }

        let worktree_dir = self
            .config
            .project_root
            .join(".batty")
            .join("worktrees")
            .join(member_name);
        if !worktree_dir.exists() {
            return false;
        }

        match preserve_worktree_with_commit(
            &worktree_dir,
            commit_message,
            Duration::from_secs(policy.graceful_shutdown_timeout_secs),
        ) {
            Ok(saved) => {
                if saved {
                    info!(
                        member = member_name,
                        worktree = %worktree_dir.display(),
                        "auto-saved worktree before restart/shutdown"
                    );
                }
                saved
            }
            Err(error) => {
                warn!(
                    member = member_name,
                    worktree = %worktree_dir.display(),
                    error = %error,
                    "failed to auto-save worktree before restart/shutdown"
                );
                self.report_preserve_failure(
                    member_name,
                    self.active_task_id(member_name),
                    "restart or shutdown",
                    &error.to_string(),
                );
                false
            }
        }
    }

    #[allow(dead_code)]
    pub(super) fn watcher_mut(&mut self, name: &str) -> Result<&mut SessionWatcher> {
        self.watchers
            .get_mut(name)
            .with_context(|| format!("watcher registry missing member '{name}'"))
    }

    /// Create a new daemon from resolved config and layout.
    pub fn new(config: DaemonConfig) -> Result<Self> {
        let is_git_repo = super::git_cmd::is_git_repo(&config.project_root);
        let (is_multi_repo, sub_repo_names) = if is_git_repo {
            (false, Vec::new())
        } else {
            let subs = super::git_cmd::discover_sub_repos(&config.project_root);
            if subs.is_empty() {
                (false, Vec::new())
            } else {
                let names: Vec<String> = subs
                    .iter()
                    .filter_map(|p| p.file_name().map(|n| n.to_string_lossy().to_string()))
                    .collect();
                info!(
                    sub_repos = ?names,
                    "Detected multi-repo project with {} sub-repos",
                    names.len()
                );
                (true, names)
            }
        };
        if !is_git_repo && !is_multi_repo {
            info!("Project is not a git repository \u{2014} git operations disabled");
        }

        let team_config_dir = config.project_root.join(".batty").join("team_config");
        let events_path = team_config_dir.join("events.jsonl");
        let event_sink =
            EventSink::new_with_max_bytes(&events_path, config.team_config.event_log_max_bytes)?;

        // Create watchers for each pane member
        let mut watchers = HashMap::new();
        let stale_secs = config.team_config.standup.interval_secs * 2;
        for (name, pane_id) in &config.pane_map {
            let session_tracker = config
                .members
                .iter()
                .find(|member| member.name == *name)
                .and_then(|member| member_session_tracker_config(&config.project_root, member));
            watchers.insert(
                name.clone(),
                SessionWatcher::new(pane_id, name, stale_secs, session_tracker),
            );
        }

        // Create channels for user roles
        let mut channels: HashMap<String, Box<dyn Channel>> = HashMap::new();
        for role in &config.team_config.roles {
            if role.role_type == RoleType::User {
                if let (Some(ch_type), Some(ch_config)) = (&role.channel, &role.channel_config) {
                    match comms::channel_from_config(ch_type, ch_config) {
                        Ok(ch) => {
                            channels.insert(role.name.clone(), ch);
                        }
                        Err(e) => {
                            warn!(role = %role.name, error = %e, "failed to create channel");
                        }
                    }
                }
            }
        }

        // Create Discord bot for inbound polling and event mirroring (if configured)
        let discord_bot = discord_bridge::build_discord_bot(&config.team_config);
        // Create Telegram bot for inbound polling (if configured)
        let telegram_bot = telegram_bridge::build_telegram_bot(&config.team_config);
        let narration_detection_enabled = config
            .team_config
            .workflow_policy
            .narration_detection_enabled;
        let narration_threshold_polls =
            config.team_config.workflow_policy.narration_threshold_polls;

        let states = HashMap::new();

        // Build nudge schedules from role configs + prompt files
        let mut nudges = HashMap::new();
        for role in &config.team_config.roles {
            if let Some(interval_secs) = role.nudge_interval_secs {
                let prompt_path =
                    role_prompt_path(&team_config_dir, role.prompt.as_deref(), role.role_type);
                if let Some(nudge_text) = extract_nudge_section(&prompt_path) {
                    // Apply nudge to all instances of this role
                    let instance_names: Vec<String> = config
                        .members
                        .iter()
                        .filter(|m| m.role_name == role.name)
                        .map(|m| m.name.clone())
                        .collect();
                    for name in instance_names {
                        info!(member = %name, interval_secs, "registered nudge");
                        nudges.insert(
                            name,
                            NudgeSchedule {
                                text: nudge_text.clone(),
                                interval: Duration::from_secs(interval_secs),
                                // All roles start idle, so begin the countdown
                                idle_since: Some(Instant::now()),
                                fired_this_idle: false,
                                paused: false,
                            },
                        );
                    }
                }
            }
        }

        // Open telemetry database (best-effort — log and continue if it fails).
        let telemetry_db = match super::telemetry_db::open(&config.project_root) {
            Ok(conn) => {
                info!("telemetry database opened");
                Some(conn)
            }
            Err(error) => {
                warn!(error = %error, "failed to open telemetry database; telemetry disabled");
                None
            }
        };

        let context_pressure_threshold = config
            .team_config
            .workflow_policy
            .context_pressure_threshold;
        let context_pressure_threshold_bytes = config
            .team_config
            .workflow_policy
            .context_pressure_threshold_bytes;

        Ok(Self {
            config,
            watchers,
            states,
            idle_started_at: HashMap::new(),
            active_tasks: HashMap::new(),
            retry_counts: HashMap::new(),
            dispatch_queue: Vec::new(),
            triage_idle_epochs: HashMap::new(),
            triage_interventions: HashMap::new(),
            owned_task_interventions: HashMap::new(),
            intervention_cooldowns: HashMap::new(),
            channels,
            nudges,
            discord_bot,
            // Skip backlog on startup — only send events that happen after boot.
            // Reading the entire history on first start hammers the Discord API
            // with hundreds of events and triggers rate limits (429).
            discord_event_cursor: crate::team::events::read_events(event_sink.path())
                .map(|events| events.len())
                .unwrap_or(0),
            telegram_bot,
            failure_tracker: FailureTracker::new(20),
            event_sink,
            paused_standups: HashSet::new(),
            last_standup: HashMap::new(),
            last_board_rotation: Instant::now(),
            last_auto_archive: Instant::now(),
            last_auto_dispatch: Instant::now(),
            last_main_smoke_check: Instant::now(),
            pipeline_starvation_fired: false,
            pipeline_starvation_last_fired: None,
            planning_cycle_last_fired: None,
            planning_cycle_active: false,
            retro_generated: false,
            failed_deliveries: Vec::new(),
            review_first_seen: HashMap::new(),
            review_nudge_sent: HashSet::new(),
            poll_cycle_count: 0,
            current_tick_errors: Vec::new(),
            poll_interval: Duration::from_secs(5),
            is_git_repo,
            is_multi_repo,
            sub_repo_names,
            subsystem_error_counts: HashMap::new(),
            auto_merge_overrides: HashMap::new(),
            recent_dispatches: HashMap::new(),
            recent_escalations: HashMap::new(),
            main_smoke_state: None,
            telemetry_db,
            manual_assign_cooldowns: HashMap::new(),
            backend_health: HashMap::new(),
            narration_tracker: health::narration::NarrationTracker::new(
                narration_detection_enabled,
                narration_threshold_polls,
            ),
            context_pressure_tracker: health::context::ContextPressureTracker::new(
                context_pressure_threshold,
                context_pressure_threshold_bytes,
            ),
            // Start far enough in the past to trigger an immediate check.
            last_health_check: Instant::now() - Duration::from_secs(3600),
            last_uncommitted_warn: HashMap::new(),
            last_shared_target_cleanup: Instant::now() - Duration::from_secs(3600),
            last_disk_hygiene_check: Instant::now() - Duration::from_secs(3600),
            verification_states: HashMap::new(),
            narration_rejection_counts: HashMap::new(),
            zero_diff_completion_counts: HashMap::new(),
            pending_delivery_queue: HashMap::new(),
            shim_handles: HashMap::new(),
            last_shim_health_check: Instant::now(),
            merge_queue: MergeQueue::default(),
        })
    }

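    /// Sliding-window alert dedup: prunes entries older than `window`, then
    /// returns `true` (suppress) if `key` already fired within the window,
    /// otherwise records `key` now and returns `false`.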
    pub(crate) fn suppress_recent_escalation(
        &mut self,
        key: impl Into<String>,
        window: Duration,
    ) -> bool {
        let now = Instant::now();
        self.recent_escalations
            .retain(|_, seen_at| now.duration_since(*seen_at) < window);

        let key = key.into();
        if self.recent_escalations.contains_key(&key) {
            return true;
        }

        self.recent_escalations.insert(key, now);
        false
    }

    pub(super) fn member_nudge_text(&self, member: &MemberInstance) -> Option<String> {
        let prompt_path = role_prompt_path(
            &super::team_config_dir(&self.config.project_root),
            member.prompt.as_deref(),
            member.role_type,
        );
        extract_nudge_section(&prompt_path)
    }

    pub(super) fn prepend_member_nudge(
        &self,
        member: &MemberInstance,
        body: impl AsRef<str>,
    ) -> String {
        let body = body.as_ref();
        match self.member_nudge_text(member) {
            Some(nudge) => format!("{nudge}\n\n{body}"),
            None => body.to_string(),
        }
    }

    pub(super) fn mark_member_working(&mut self, member_name: &str) {
        // When shim mode is active, the shim is the single source of truth for
        // agent state. Speculative mark_member_working calls from delivery,
        // completion, and interventions must not override the shim's verdict —
        // doing so causes the daemon to permanently think the agent is "working"
        // when the shim classifier sees it as idle.
        if self.shim_handles.contains_key(member_name) {
            return;
        }
        self.states
            .insert(member_name.to_string(), MemberState::Working);
        if let Some(watcher) = self.watchers.get_mut(member_name) {
            watcher.activate();
        }
        self.update_automation_timers_for_state(member_name, MemberState::Working);
    }

    pub(super) fn set_member_idle(&mut self, member_name: &str) {
        // For shim agents: don't override the state (shim is source of truth),
        // but DO update automation timers so idle_started_at gets populated.
        // Without this, interventions like review_backlog never fire because
        // automation_idle_grace_elapsed returns false.
        if self.shim_handles.contains_key(member_name) {
            if self.states.get(member_name) == Some(&MemberState::Idle) {
                self.update_automation_timers_for_state(member_name, MemberState::Idle);
            }
            return;
        }
        self.states
            .insert(member_name.to_string(), MemberState::Idle);
        if let Some(watcher) = self.watchers.get_mut(member_name) {
            watcher.deactivate();
        }
        self.update_automation_timers_for_state(member_name, MemberState::Idle);
    }

    pub(super) fn active_task_id(&self, engineer: &str) -> Option<u32> {
        self.active_tasks.get(engineer).copied()
    }

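    /// Best-effort auto-commit of a dirty engineer worktree before a restart,
    /// gated by the `auto_commit_on_restart` policy. Failures are escalated
    /// through `report_preserve_failure`.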
    pub(super) fn preserve_worktree_before_restart(
        &mut self,
        member_name: &str,
        worktree_dir: &Path,
        reason: &str,
    ) {
        let Some(member) = self
            .config
            .members
            .iter()
            .find(|member| member.name == member_name)
        else {
            return;
        };
        if member.role_type != RoleType::Engineer || !member.use_worktrees {
            return;
        }
        if !self
            .config
            .team_config
            .workflow_policy
            .auto_commit_on_restart
            || !worktree_dir.exists()
        {
            return;
        }

        let timeout = Duration::from_secs(
            self.config
                .team_config
                .workflow_policy
                .graceful_shutdown_timeout_secs,
        );
        match preserve_worktree_with_commit(
            worktree_dir,
            "wip: auto-save before restart [batty]",
            timeout,
        ) {
            Ok(true) => info!(
                member = member_name,
                worktree = %worktree_dir.display(),
                reason,
                "auto-saved dirty worktree before restart"
            ),
            Ok(false) => {}
            Err(error) => {
                warn!(
                    member = member_name,
                    worktree = %worktree_dir.display(),
                    reason,
                    error = %error,
                    "failed to auto-save dirty worktree before restart"
                );
                self.report_preserve_failure(
                    member_name,
                    self.active_task_id(member_name),
                    reason,
                    &error.to_string(),
                );
            }
        }
    }

    pub(super) fn project_root(&self) -> &Path {
        &self.config.project_root
    }

    pub(super) fn dispatch_paused_by_main_smoke(&self) -> bool {
        self.main_smoke_state
            .as_ref()
            .is_some_and(|state| state.broken && state.pause_dispatch)
    }

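    /// Periodically run the configured smoke command on main. On failure this
    /// records suspect commits and a one-line summary, optionally pauses
    /// dispatch, and (when `auto_revert` is set) reverts the newest main
    /// commit; on success it clears any previous broken state.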
    pub(super) fn maybe_run_main_smoke(&mut self) -> Result<()> {
        const DEFAULT_MAIN_SMOKE_SUSPECT_COMMITS: usize = 5;

        let policy = self.config.team_config.workflow_policy.main_smoke.clone();
        if !policy.enabled {
            return Ok(());
        }

        let interval = Duration::from_secs(policy.interval_secs);
        if self.last_main_smoke_check.elapsed() < interval {
            return Ok(());
        }
        self.last_main_smoke_check = Instant::now();

        if !self.is_git_repo || self.is_multi_repo {
            return Ok(());
        }

        let command = policy.command.trim();
        if command.is_empty() {
            warn!("main smoke command is empty; skipping");
            return Ok(());
        }

        let head = Self::short_head_commit(self.project_root())?;
        self.record_orchestrator_action(format!("main smoke: running `{command}` at {head}"));

        let test_run =
            crate::team::task_loop::run_tests_in_worktree(self.project_root(), Some(command))
                .with_context(|| {
                    format!(
                        "failed while running main smoke command `{command}` in {}",
                        self.project_root().display()
                    )
                })?;

        if test_run.passed {
            let was_broken = self
                .main_smoke_state
                .as_ref()
                .is_some_and(|state| state.broken);
            self.main_smoke_state = Some(MainSmokeState {
                broken: false,
                pause_dispatch: policy.pause_dispatch_on_failure,
                last_run_at: now_unix(),
                last_success_commit: Some(head.clone()),
                broken_commit: None,
                suspects: Vec::new(),
                summary: Some(format!("`{command}` passed on {head}")),
            });
            if was_broken {
                self.emit_event(TeamEvent::main_smoke_recovered(&head, command));
                self.record_orchestrator_action(format!(
                    "main smoke: recovered on {head}; dispatch gate cleared"
                ));
            }
            return Ok(());
        }

        let suspects =
            Self::recent_main_suspects(self.project_root(), DEFAULT_MAIN_SMOKE_SUSPECT_COMMITS)?;
        let summary = Self::summarize_smoke_output(&test_run.output);
        let should_emit = self.main_smoke_state.as_ref().is_none_or(|state| {
            state.broken_commit.as_deref() != Some(head.as_str())
                || state.summary.as_deref() != Some(summary.as_str())
        });

        let last_success_commit = self
            .main_smoke_state
            .as_ref()
            .and_then(|state| state.last_success_commit.clone());
        self.main_smoke_state = Some(MainSmokeState {
            broken: true,
            pause_dispatch: policy.pause_dispatch_on_failure,
            last_run_at: now_unix(),
            last_success_commit,
            broken_commit: Some(head.clone()),
            suspects: suspects.clone(),
            summary: Some(summary.clone()),
        });

        if should_emit {
            self.emit_event(TeamEvent::main_broken(&head, &suspects, &summary));
        }
        self.record_orchestrator_action(format!(
            "main smoke: BROKEN at {head}; suspects [{}]; {summary}",
            suspects.join(", ")
        ));

        if policy.auto_revert {
            self.maybe_auto_revert_broken_main(&head)?;
        }
        Ok(())
    }

    fn maybe_auto_revert_broken_main(&mut self, broken_commit: &str) -> Result<()> {
        let parent_line = super::git_cmd::run_git(
            self.project_root(),
            &["rev-list", "--parents", "-n", "1", "HEAD"],
        )
        .with_context(|| {
            format!("failed to inspect parents for broken main commit {broken_commit}")
        })?
        .stdout;
        let parent_count = parent_line.split_whitespace().count().saturating_sub(1);
        let revert_args = if parent_count > 1 {
            vec!["revert", "-m", "1", "--no-edit", "HEAD"]
        } else {
            vec!["revert", "--no-edit", "HEAD"]
        };
        let output = Command::new("git")
            .args(&revert_args)
            .current_dir(self.project_root())
            .output()
            .with_context(|| {
                format!("failed to launch auto-revert for broken main commit {broken_commit}")
            })?;
        if output.status.success() {
            let reverted_to = Self::short_head_commit(self.project_root())?;
            info!(
                broken_commit,
                reverted_to, "main smoke auto-reverted most recent main commit"
            );
            self.record_orchestrator_action(format!(
                "main smoke: auto-reverted broken commit {broken_commit}; main is now {reverted_to}"
            ));
        } else {
            let stderr = String::from_utf8_lossy(&output.stderr);
            warn!(
                broken_commit,
                error = %stderr.trim(),
                "main smoke auto-revert failed"
            );
            self.record_orchestrator_action(format!(
                "main smoke: auto-revert failed for {broken_commit} ({})",
                stderr.trim()
            ));
        }
        Ok(())
    }

    #[cfg(test)]
    pub(super) fn set_auto_merge_override(&mut self, task_id: u32, enabled: bool) {
        self.auto_merge_overrides.insert(task_id, enabled);
    }

    pub(super) fn auto_merge_override(&self, task_id: u32) -> Option<bool> {
        // In-memory overrides take priority, then check disk
        if let Some(&value) = self.auto_merge_overrides.get(&task_id) {
            return Some(value);
        }
        let disk_overrides = super::auto_merge::load_overrides(&self.config.project_root);
        disk_overrides.get(&task_id).copied()
    }

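    /// Engineer worktrees live under `.batty/worktrees/<engineer>`, or under
    /// `.batty/worktrees/<barrier-group>/<engineer>` when clean-room mode is
    /// enabled and the member belongs to a barrier group.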
    pub(super) fn worktree_dir(&self, engineer: &str) -> PathBuf {
        let base = self.config.project_root.join(".batty").join("worktrees");
        match self.member_barrier_group(engineer) {
            Some(group) if self.config.team_config.workflow_policy.clean_room_mode => {
                base.join(group).join(engineer)
            }
            _ => base.join(engineer),
        }
    }

    pub(super) fn board_dir(&self) -> PathBuf {
        self.config
            .project_root
            .join(".batty")
            .join("team_config")
            .join("board")
    }

    pub(super) fn member_uses_worktrees(&self, engineer: &str) -> bool {
        if !self.is_git_repo && !self.is_multi_repo {
            return false;
        }
        self.config
            .members
            .iter()
            .find(|member| member.name == engineer)
            .map(|member| member.use_worktrees)
            .unwrap_or(false)
    }

    pub(super) fn handoff_dir(&self) -> PathBuf {
        self.config.project_root.join(
            self.config
                .team_config
                .workflow_policy
                .handoff_directory
                .as_str(),
        )
    }

    pub(super) fn member_barrier_group(&self, member_name: &str) -> Option<&str> {
        let member = self
            .config
            .members
            .iter()
            .find(|member| member.name == member_name)?;
        self.config
            .team_config
            .role_barrier_group(&member.role_name)
    }

    fn barrier_worktree_root(&self, barrier_group: &str) -> PathBuf {
        self.config
            .project_root
            .join(".batty")
            .join("worktrees")
            .join(barrier_group)
    }

    #[cfg_attr(not(test), allow(dead_code))]
    pub(super) fn analysis_dir(&self, member_name: &str) -> Result<PathBuf> {
        let Some(group) = self.member_barrier_group(member_name) else {
            bail!(
                "member '{}' is not assigned to a clean-room barrier group",
                member_name
            );
        };
        if group != "analysis" {
            bail!(
                "member '{}' is in barrier group '{}' and cannot write analysis artifacts",
                member_name,
                group
            );
        }

        Ok(self.worktree_dir(member_name).join("analysis"))
    }

    pub(super) fn validate_member_work_dir(
        &self,
        member_name: &str,
        work_dir: &Path,
    ) -> Result<()> {
        if !self.config.team_config.workflow_policy.clean_room_mode {
            return Ok(());
        }

        let expected = self.worktree_dir(member_name);
        if work_dir == expected {
            return Ok(());
        }
        bail!(
            "clean-room barrier violation: member '{}' launch dir '{}' does not match '{}'",
            member_name,
            work_dir.display(),
            expected.display()
        );
    }

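    /// Enforce the clean-room barrier on an arbitrary path: a member may only
    /// touch their own worktree, their barrier group's worktree root, or the
    /// shared handoff directory. Violations are recorded and returned as errors.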
    #[cfg_attr(not(test), allow(dead_code))]
    pub(super) fn validate_member_barrier_path(
        &mut self,
        member_name: &str,
        path: &Path,
        access: &str,
    ) -> Result<()> {
        if !self.config.team_config.workflow_policy.clean_room_mode {
            return Ok(());
        }

        let Some(group) = self.member_barrier_group(member_name) else {
            return Ok(());
        };
        let member_root = self.worktree_dir(member_name);
        let barrier_root = self.barrier_worktree_root(group);
        let handoff_root = self.handoff_dir();
        if path.starts_with(&member_root)
            || path.starts_with(&barrier_root)
            || path.starts_with(&handoff_root)
        {
            return Ok(());
        }

        self.record_barrier_violation_attempt(
            member_name,
            &path.display().to_string(),
            &format!("{access} outside barrier group '{group}'"),
        );
        bail!(
            "clean-room barrier violation: '{}' cannot {} '{}'",
            member_name,
            access,
            path.display()
        );
    }

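    /// Write an artifact into the shared handoff directory, rejecting absolute
    /// or parent-traversing paths, and record its SHA-256 digest for the
    /// barrier audit trail.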
    #[cfg_attr(not(test), allow(dead_code))]
    pub(super) fn write_handoff_artifact(
        &mut self,
        author_role: &str,
        relative_path: &Path,
        content: &[u8],
    ) -> Result<PathBuf> {
        if relative_path.is_absolute()
            || relative_path
                .components()
                .any(|component| matches!(component, std::path::Component::ParentDir))
        {
            self.record_barrier_violation_attempt(
                author_role,
                &relative_path.display().to_string(),
                "handoff writes must stay within the shared handoff directory",
            );
            bail!(
                "invalid handoff artifact path '{}': must be relative and stay under handoff/",
                relative_path.display()
            );
        }
        let handoff_root = self.handoff_dir();
        let artifact_path = handoff_root.join(relative_path);
        let Some(parent) = artifact_path.parent() else {
            bail!(
                "handoff artifact path '{}' has no parent",
                artifact_path.display()
            );
        };
        std::fs::create_dir_all(parent)
            .with_context(|| format!("failed to create {}", parent.display()))?;
        std::fs::write(&artifact_path, content)
            .with_context(|| format!("failed to write {}", artifact_path.display()))?;

        let content_hash = format!("{:x}", Sha256::digest(content));
        self.record_barrier_artifact_created(
            author_role,
            &artifact_path.display().to_string(),
            &content_hash,
        );
        Ok(artifact_path)
    }

    #[cfg_attr(not(test), allow(dead_code))]
    pub(super) fn write_analysis_artifact(
        &mut self,
        author_role: &str,
        relative_path: &Path,
        content: &[u8],
    ) -> Result<PathBuf> {
        if relative_path.is_absolute()
            || relative_path
                .components()
                .any(|component| matches!(component, std::path::Component::ParentDir))
        {
            self.record_barrier_violation_attempt(
                author_role,
                &relative_path.display().to_string(),
                "analysis artifact writes must stay within the analysis worktree",
            );
            bail!(
                "invalid analysis artifact path '{}': must be relative and stay under analysis/",
                relative_path.display()
            );
        }

        let artifact_path = self.analysis_dir(author_role)?.join(relative_path);
        let Some(parent) = artifact_path.parent() else {
            bail!(
                "analysis artifact path '{}' has no parent",
                artifact_path.display()
            );
        };
        std::fs::create_dir_all(parent)
            .with_context(|| format!("failed to create {}", parent.display()))?;
        std::fs::write(&artifact_path, content)
            .with_context(|| format!("failed to write {}", artifact_path.display()))?;

        let content_hash = format!("{:x}", Sha256::digest(content));
        self.record_barrier_artifact_created(
            author_role,
            &artifact_path.display().to_string(),
            &content_hash,
        );
        Ok(artifact_path)
    }

    #[cfg_attr(not(test), allow(dead_code))]
    pub(super) fn run_skoolkit_disassembly(
        &mut self,
        author_role: &str,
        snapshot_path: &Path,
        output_relative_path: &Path,
    ) -> Result<PathBuf> {
        let snapshot_extension = snapshot_path
            .extension()
            .and_then(|ext| ext.to_str())
            .map(|ext| ext.to_ascii_lowercase());
        if !matches!(snapshot_extension.as_deref(), Some("z80" | "sna")) {
            bail!(
                "unsupported SkoolKit snapshot '{}': expected .z80 or .sna",
                snapshot_path.display()
            );
        }

        let sna2skool =
            std::env::var("BATTY_SKOOLKIT_SNA2SKOOL").unwrap_or_else(|_| "sna2skool".to_string());
        let output = std::process::Command::new(&sna2skool)
            .arg(snapshot_path)
            .output()
            .with_context(|| {
                format!(
                    "failed to launch '{}' for snapshot '{}'",
                    sna2skool,
                    snapshot_path.display()
                )
            })?;
        if !output.status.success() {
            bail!(
                "SkoolKit disassembly failed for '{}': {}",
                snapshot_path.display(),
                String::from_utf8_lossy(&output.stderr).trim()
            );
        }

        self.write_analysis_artifact(author_role, output_relative_path, &output.stdout)
    }

    #[cfg_attr(not(test), allow(dead_code))]
    pub(super) fn run_ghidra_disassembly(
        &mut self,
        author_role: &str,
        binary_path: &Path,
        output_relative_path: &Path,
    ) -> Result<PathBuf> {
        let backend = CleanroomBackend::detect(binary_path)?;
        if backend != CleanroomBackend::Ghidra {
            bail!(
                "unsupported Ghidra target '{}': expected .nes, .gb, .gbc, .com, or .exe",
                binary_path.display()
            );
        }

        let analyze_headless = std::env::var("BATTY_GHIDRA_HEADLESS")
            .unwrap_or_else(|_| "analyzeHeadless".to_string());
        let output = std::process::Command::new(&analyze_headless)
            .arg(binary_path)
            .output()
            .with_context(|| {
                format!(
                    "failed to launch '{}' for target '{}'",
                    analyze_headless,
                    binary_path.display()
                )
            })?;
        if !output.status.success() {
            bail!(
                "Ghidra disassembly failed for '{}': {}",
                binary_path.display(),
                String::from_utf8_lossy(&output.stderr).trim()
            );
        }

        self.write_analysis_artifact(author_role, output_relative_path, &output.stdout)
    }

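    /// Route a clean-room disassembly by input extension: SkoolKit for
    /// `.z80`/`.sna` snapshots, Ghidra for the other formats that
    /// `CleanroomBackend::detect` accepts.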
    #[cfg_attr(not(test), allow(dead_code))]
    pub(super) fn run_cleanroom_disassembly(
        &mut self,
        author_role: &str,
        input_path: &Path,
        output_relative_path: &Path,
    ) -> Result<PathBuf> {
        match CleanroomBackend::detect(input_path)? {
            CleanroomBackend::SkoolKit => {
                self.run_skoolkit_disassembly(author_role, input_path, output_relative_path)
            }
            CleanroomBackend::Ghidra => {
                self.run_ghidra_disassembly(author_role, input_path, output_relative_path)
            }
        }
    }

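    /// Read an artifact from the shared handoff directory after barrier
    /// validation, recording its SHA-256 digest for the audit trail.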
    #[cfg_attr(not(test), allow(dead_code))]
    pub(super) fn read_handoff_artifact(
        &mut self,
        reader_role: &str,
        relative_path: &Path,
    ) -> Result<Vec<u8>> {
        if relative_path.is_absolute()
            || relative_path
                .components()
                .any(|component| matches!(component, std::path::Component::ParentDir))
        {
            self.record_barrier_violation_attempt(
                reader_role,
                &relative_path.display().to_string(),
                "handoff reads must stay within the shared handoff directory",
            );
            bail!(
                "invalid handoff artifact path '{}': must be relative and stay under handoff/",
                relative_path.display()
            );
        }
        let artifact_path = self.handoff_dir().join(relative_path);
        self.validate_member_barrier_path(reader_role, &artifact_path, "read")?;
        let content = std::fs::read(&artifact_path)
            .with_context(|| format!("failed to read {}", artifact_path.display()))?;
        let content_hash = format!("{:x}", Sha256::digest(&content));
        self.record_barrier_artifact_read(
            reader_role,
            &artifact_path.display().to_string(),
            &content_hash,
        );
        Ok(content)
    }

    pub(super) fn manager_name(&self, engineer: &str) -> Option<String> {
        self.config
            .members
            .iter()
            .find(|member| member.name == engineer)
            .and_then(|member| member.reports_to.clone())
    }

    pub(super) fn architect_names(&self) -> Vec<String> {
        self.config
            .members
            .iter()
            .filter(|member| member.role_type == RoleType::Architect)
            .map(|member| member.name.clone())
            .collect()
    }

    fn short_head_commit(project_root: &Path) -> Result<String> {
        Ok(
            super::git_cmd::run_git(project_root, &["rev-parse", "--short", "HEAD"])?
                .stdout
                .trim()
                .to_string(),
        )
    }

    fn recent_main_suspects(project_root: &Path, count: usize) -> Result<Vec<String>> {
        let limit = count.max(1).to_string();
        let output = super::git_cmd::run_git(
            project_root,
            &["log", "--format=%h %s", "-n", limit.as_str(), "main"],
        )?;
        Ok(output
            .stdout
            .lines()
            .map(str::trim)
            .filter(|line| !line.is_empty())
            .map(str::to_string)
            .collect())
    }

    fn summarize_smoke_output(output: &str) -> String {
        // Cargo emits ANSI color codes when CARGO_TERM_COLOR=always (set by CI),
        // which would bypass the starts_with checks below. Strip them first.
        fn strip_ansi(s: &str) -> String {
            let mut out = String::with_capacity(s.len());
            let mut chars = s.chars().peekable();
            while let Some(c) = chars.next() {
                if c == '\u{1b}' && chars.peek() == Some(&'[') {
                    chars.next();
                    for esc in chars.by_ref() {
                        if esc.is_ascii_alphabetic() {
                            break;
                        }
                    }
                } else {
                    out.push(c);
                }
            }
            out
        }

        let summary = output
            .lines()
            .map(|line| strip_ansi(line).trim().to_string())
            .find(|line| {
                !line.is_empty()
                    && !line.starts_with("Compiling ")
                    && !line.starts_with("Checking ")
                    && !line.starts_with("Blocking waiting for file lock")
                    && !line.starts_with("Finished ")
                    && !line.starts_with("Running ")
            })
            .unwrap_or_else(|| "main smoke command failed".to_string());
        summary.chars().take(240).collect()
    }

    #[cfg(test)]
    pub(super) fn set_active_task_for_test(&mut self, engineer: &str, task_id: u32) {
        self.active_tasks.insert(engineer.to_string(), task_id);
    }

    #[cfg(test)]
    pub(super) fn retry_count_for_test(&self, engineer: &str) -> Option<u32> {
        self.retry_counts.get(engineer).copied()
    }

    #[cfg(test)]
    pub(super) fn member_state_for_test(&self, engineer: &str) -> Option<MemberState> {
        self.states.get(engineer).copied()
    }

    #[cfg(test)]
    pub(super) fn set_member_state_for_test(&mut self, engineer: &str, state: MemberState) {
        self.states.insert(engineer.to_string(), state);
    }

    #[cfg(test)]
    pub(crate) fn queued_merge_count_for_test(&self) -> usize {
        self.merge_queue.queued_len()
    }

    #[cfg(test)]
    pub(crate) fn process_merge_queue_for_test(&mut self) -> Result<()> {
        self.process_merge_queue()
    }

    pub(super) fn increment_retry(&mut self, engineer: &str) -> u32 {
        let count = self.retry_counts.entry(engineer.to_string()).or_insert(0);
        *count += 1;
        *count
    }

    pub(super) fn response_is_stalled_mid_turn(&self, response: &str) -> bool {
        response
            .lines()
            .next()
            .is_some_and(|line| line.contains(STALLED_MID_TURN_MARKER))
    }

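    /// Handle a shim completion whose first line carries the stalled-mid-turn
    /// marker: the first attempts retry after the configured backoff, later
    /// attempts restart the agent with task context. Returns `Ok(true)` when
    /// the response was consumed here.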
    pub(super) fn handle_stalled_mid_turn_completion(
        &mut self,
        member_name: &str,
        response: &str,
    ) -> Result<bool> {
        if !self.response_is_stalled_mid_turn(response) {
            return Ok(false);
        }

        let attempt = self.increment_retry(member_name);
        if let Some(backoff_secs) = stalled_mid_turn_backoff_secs(attempt) {
            warn!(
                member = member_name,
                attempt,
                backoff_secs,
                "shim reported stalled mid-turn completion; retrying after backoff"
            );
            self.record_orchestrator_action(format!(
                "stall: shim reported stalled mid-turn for {member_name}; retry {attempt} after {backoff_secs}s"
            ));
            sleep_stalled_mid_turn_backoff(Duration::from_secs(backoff_secs));

            let retry_notice = format!(
                "Claude SDK stalled mid-turn and Batty released the stuck turn. Waited {backoff_secs}s before retrying.\n{response}\n\nContinue from the current worktree state. Do not restart or discard prior work unless the task requires it."
            );
            self.queue_message("daemon", member_name, &retry_notice)?;
            self.mark_member_working(member_name);
            return Ok(true);
        }

        warn!(
            member = member_name,
            attempt, "shim reported stalled mid-turn completion; restarting agent"
        );
        self.record_orchestrator_action(format!(
            "stall: shim reported stalled mid-turn for {member_name}; restarting on attempt {attempt}"
        ));
        self.restart_member_with_task_context(member_name, "stalled mid-turn")?;
        if let Some(task_id) = self.active_task_id(member_name) {
            self.record_agent_restarted(
                member_name,
                task_id.to_string(),
                "stalled_mid_turn",
                attempt,
            );
        }
        Ok(true)
    }

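    /// Drop all per-task bookkeeping for `engineer`: the active task and its
    /// narration/zero-diff counters, retry and verification state, and any
    /// checkpoint or restart-context files left on disk.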
    pub(super) fn clear_active_task(&mut self, engineer: &str) {
        if let Some(task_id) = self.active_tasks.remove(engineer) {
            self.narration_rejection_counts.remove(&task_id);
            self.zero_diff_completion_counts.remove(&task_id);
        }
        self.retry_counts.remove(engineer);
        self.verification_states.remove(engineer);
        // Clean up any progress checkpoint left from a prior restart.
        super::checkpoint::remove_checkpoint(&self.config.project_root, engineer);
        let work_dir = self
            .config
            .members
            .iter()
            .find(|member| member.name == engineer)
            .map(|member| self.member_work_dir(member))
            .unwrap_or_else(|| self.config.project_root.clone());
        super::checkpoint::remove_restart_context(&work_dir);
    }

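    /// Record another completion that produced an empty diff for `task_id`
    /// and return the running count.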
    pub(super) fn note_zero_diff_completion(&mut self, task_id: u32) -> u32 {
        let count = self.zero_diff_completion_counts.entry(task_id).or_insert(0);
        *count += 1;
        *count
    }

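    /// Forget the zero-diff completion count for `task_id`.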
    pub(super) fn clear_zero_diff_completion(&mut self, task_id: u32) {
        self.zero_diff_completion_counts.remove(&task_id);
    }

    /// Forward `msg` from `from_role` to the member it reports to, if any,
    /// and mark that parent as working.
    pub(super) fn notify_reports_to(&mut self, from_role: &str, msg: &str) -> Result<()> {
        let parent = self
            .config
            .members
            .iter()
            .find(|m| m.name == from_role)
            .and_then(|m| m.reports_to.clone());
        let Some(parent_name) = parent else {
            return Ok(());
        };
        self.queue_message(from_role, &parent_name, msg)?;
        self.mark_member_working(&parent_name);
        Ok(())
    }

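    /// Queue `msg` from the daemon to every architect and mark each one
    /// as working.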
    pub(super) fn notify_architects(&mut self, msg: &str) -> Result<()> {
        for architect in self.architect_names() {
            self.queue_message("daemon", &architect, msg)?;
            self.mark_member_working(&architect);
        }
        Ok(())
    }

    /// Update automation countdowns when a member's state changes.
    pub(super) fn update_automation_timers_for_state(
        &mut self,
        member_name: &str,
        new_state: MemberState,
    ) {
        match new_state {
            MemberState::Idle => {
                self.idle_started_at
                    .insert(member_name.to_string(), Instant::now());
            }
            MemberState::Working => {
                self.idle_started_at.remove(member_name);
            }
        }
        self.update_nudge_for_state(member_name, new_state);
        standup::update_timer_for_state(
            &self.config.team_config,
            &self.config.members,
            &mut self.paused_standups,
            &mut self.last_standup,
            member_name,
            new_state,
        );
        self.update_triage_intervention_for_state(member_name, new_state);
    }
}

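/// Backoff before retrying a stalled mid-turn completion: 30s on the first
/// attempt, 60s on the second, `None` once the schedule is exhausted.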
fn stalled_mid_turn_backoff_secs(attempt: u32) -> Option<u64> {
    STALLED_MID_TURN_RETRY_BACKOFF_SECS
        .get(attempt.saturating_sub(1) as usize)
        .copied()
}

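// The real backoff sleep blocks the daemon thread; under `cfg(test)` it is a
// no-op so unit tests covering the retry path don't stall.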
#[cfg(not(test))]
fn sleep_stalled_mid_turn_backoff(duration: Duration) {
    std::thread::sleep(duration);
}

#[cfg(test)]
fn sleep_stalled_mid_turn_backoff(_duration: Duration) {}

#[cfg(test)]
#[path = "daemon/tests.rs"]
mod tests;

#[cfg(test)]
mod stalled_mid_turn_tests {
    use super::*;
    use crate::team::inbox;
    use crate::team::test_support::{TestDaemonBuilder, engineer_member, write_owned_task_file};

    #[test]
    fn stalled_mid_turn_backoff_schedule_matches_task_requirements() {
        assert_eq!(stalled_mid_turn_backoff_secs(1), Some(30));
        assert_eq!(stalled_mid_turn_backoff_secs(2), Some(60));
        assert_eq!(stalled_mid_turn_backoff_secs(3), None);
    }

    #[test]
    fn stalled_mid_turn_detection_matches_marker_prefix() {
        let tmp = tempfile::tempdir().unwrap();
        let daemon = TestDaemonBuilder::new(tmp.path()).build();
        assert!(daemon.response_is_stalled_mid_turn(
            "stalled mid-turn: no stdout from Claude SDK for 120s while working."
        ));
        assert!(!daemon.response_is_stalled_mid_turn("normal completion"));
    }

    #[test]
    fn stalled_mid_turn_first_retry_requeues_message_after_backoff() {
        let tmp = tempfile::tempdir().unwrap();
        let member_name = "eng-1";
        write_owned_task_file(tmp.path(), 42, "sdk-stall", "in-progress", member_name);
        let inbox_root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&inbox_root, member_name).unwrap();

        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![engineer_member(member_name, Some("manager"), true)])
            .build();
        daemon.active_tasks.insert(member_name.to_string(), 42);

        let handled = daemon
            .handle_stalled_mid_turn_completion(
                member_name,
                "stalled mid-turn: no stdout from Claude SDK for 120s while working.\nlast_sent_message_from: manager",
            )
            .unwrap();

        assert!(handled);
        assert_eq!(daemon.retry_count_for_test(member_name), Some(1));
        assert_eq!(
            daemon.member_state_for_test(member_name),
            Some(MemberState::Working)
        );

        let inbox_entries = inbox::pending_messages(&inbox_root, member_name).unwrap();
        assert_eq!(inbox_entries.len(), 1);
        assert!(inbox_entries[0].body.contains("Waited 30s before retrying"));
        assert!(inbox_entries[0].body.contains("stalled mid-turn"));
    }
}
1605}