Skip to main content

batty_cli/team/
daemon.rs

1//! Core team daemon: poll loop, lifecycle coordination, and routing.
2//!
3//! `TeamDaemon` owns the long-running control loop for a Batty team session.
4//! It starts and resumes member agents, polls tmux-backed watchers, routes
5//! messages across panes, inboxes, and external channels, persists runtime
6//! state, and runs periodic automation such as standups and board rotation.
7//!
8//! Focused subsystems that were extracted from this file stay close to the
9//! daemon boundary:
10//! - `merge` handles engineer completion, test gating, and merge/escalation
11//!   flow once a task is reported done.
12//! - `interventions` handles idle nudges and manager/architect intervention
13//!   automation without changing the daemon's main control flow.
14//!
15//! This module remains the integration layer that sequences those subsystems
16//! inside each poll iteration.
17
18use std::collections::{HashMap, HashSet};
19use std::fs;
20use std::path::{Path, PathBuf};
21use std::time::{Duration, Instant, SystemTime};
22
23use anyhow::{Context, Result, bail};
24use serde::{Deserialize, Serialize};
25use tracing::{debug, info, warn};
26use uuid::Uuid;
27
28use super::board;
29use super::board_cmd;
30use super::comms::{self, Channel};
31#[cfg(test)]
32use super::config::OrchestratorPosition;
33use super::config::{RoleType, TeamConfig};
34use super::delivery::{FailedDelivery, PendingMessage};
35use super::events::EventSink;
36use super::events::TeamEvent;
37use super::failure_patterns::FailureTracker;
38use super::hierarchy::MemberInstance;
39use super::inbox;
40use super::merge;
41use super::standup::{self, MemberState};
42use super::status;
43use super::task_cmd;
44#[cfg(test)]
45use super::task_loop::next_unclaimed_task;
46use super::task_loop::{
47    branch_is_merged_into, checkout_worktree_branch_from_main, current_worktree_branch,
48    engineer_base_branch_name, is_worktree_safe_to_mutate, setup_engineer_worktree,
49};
50use super::watcher::{SessionTrackerConfig, SessionWatcher, WatcherState};
51use super::{AssignmentDeliveryResult, AssignmentResultStatus, now_unix, store_assignment_result};
52use crate::agent::{self, BackendHealth};
53use crate::tmux;
54use dispatch::DispatchQueueEntry;
55
56#[path = "daemon/automation.rs"]
57mod automation;
58#[path = "dispatch/mod.rs"]
59mod dispatch;
60#[path = "daemon/error_handling.rs"]
61mod error_handling;
62#[path = "daemon/health.rs"]
63mod health;
64#[path = "daemon/interventions/mod.rs"]
65mod interventions;
66#[path = "launcher.rs"]
67mod launcher;
68#[path = "telegram_bridge.rs"]
69mod telegram_bridge;
70#[path = "daemon/telemetry.rs"]
71mod telemetry;
72
73#[cfg(test)]
74use self::dispatch::normalized_assignment_dir;
75pub(crate) use self::interventions::NudgeSchedule;
76use self::interventions::OwnedTaskInterventionState;
77use self::launcher::{
78    duplicate_claude_session_ids, load_launch_state, member_session_tracker_config,
79};
80pub(super) use super::delivery::MessageDelivery;
81
/// Daemon configuration derived from TeamConfig.
pub struct DaemonConfig {
    /// Root directory of the project the team operates on.
    pub project_root: PathBuf,
    /// Parsed team configuration (roles, workflow mode, intervals).
    pub team_config: TeamConfig,
    /// Name of the tmux session hosting the team panes.
    pub session: String,
    /// Resolved member instances participating in this session.
    pub members: Vec<MemberInstance>,
    /// Member name -> tmux pane id mapping.
    pub pane_map: HashMap<String, String>,
}
90
/// How often the hot-reload monitor re-stats the daemon binary.
const HOT_RELOAD_CHECK_INTERVAL: Duration = Duration::from_secs(30);
/// Minimum spacing between successive reload attempts (rate limit).
const HOT_RELOAD_MIN_INTERVAL: Duration = Duration::from_secs(30);
93
/// The running team daemon.
///
/// Owns all mutable runtime state for one team session: per-member watchers
/// and states, dispatch/delivery queues, intervention and automation timers,
/// external channel handles, and the telemetry connection.
pub struct TeamDaemon {
    /// Resolved configuration (project root, team config, pane layout).
    pub(super) config: DaemonConfig,
    /// Per-member tmux pane watchers, keyed by member name.
    pub(super) watchers: HashMap<String, SessionWatcher>,
    /// Last known `MemberState` per member.
    pub(super) states: HashMap<String, MemberState>,
    /// When each member last transitioned to idle (removed while working).
    pub(super) idle_started_at: HashMap<String, Instant>,
    /// Task id currently assigned, keyed by engineer name.
    pub(super) active_tasks: HashMap<String, u32>,
    /// Retry counter per engineer for the current task.
    pub(super) retry_counts: HashMap<String, u32>,
    /// Queued dispatches awaiting delivery.
    pub(super) dispatch_queue: Vec<DispatchQueueEntry>,
    /// Per-member idle-epoch counters consulted by triage interventions.
    pub(super) triage_idle_epochs: HashMap<String, u64>,
    /// Count of triage interventions fired per member.
    pub(super) triage_interventions: HashMap<String, u64>,
    /// Intervention bookkeeping for tasks a member currently owns.
    pub(super) owned_task_interventions: HashMap<String, OwnedTaskInterventionState>,
    /// Cooldown timestamps preventing back-to-back interventions per member.
    pub(super) intervention_cooldowns: HashMap<String, Instant>,
    /// Outbound channels for user roles, keyed by role name.
    pub(super) channels: HashMap<String, Box<dyn Channel>>,
    /// Idle-nudge schedules per member.
    pub(super) nudges: HashMap<String, NudgeSchedule>,
    /// Telegram bot for inbound polling (None when not configured).
    pub(super) telegram_bot: Option<super::telegram::TelegramBot>,
    /// Sliding window of recent failures for pattern detection.
    pub(super) failure_tracker: FailureTracker,
    /// Append-only team event log sink (events.jsonl).
    pub(super) event_sink: EventSink,
    /// Members whose standups are currently paused.
    pub(super) paused_standups: HashSet<String>,
    /// When each member last received a standup.
    pub(super) last_standup: HashMap<String, Instant>,
    /// When the board was last rotated.
    pub(super) last_board_rotation: Instant,
    /// When done tasks were last auto-archived.
    pub(super) last_auto_archive: Instant,
    /// When auto-dispatch last ran.
    pub(super) last_auto_dispatch: Instant,
    /// Whether a pipeline-starvation alert has fired for the current episode.
    pub(super) pipeline_starvation_fired: bool,
    /// When a pipeline-starvation alert last fired, if ever.
    pub(super) pipeline_starvation_last_fired: Option<Instant>,
    /// Whether the retrospective has already been generated.
    pub(super) retro_generated: bool,
    /// Deliveries that failed and are awaiting retry.
    pub(super) failed_deliveries: Vec<FailedDelivery>,
    /// Unix timestamp when each review was first seen, keyed by task id.
    pub(super) review_first_seen: HashMap<u32, u64>,
    /// Task ids for which a review nudge has already been sent.
    pub(super) review_nudge_sent: HashSet<u32>,
    /// Sleep duration between main-loop iterations.
    pub(super) poll_interval: Duration,
    /// Whether the project root is a git repository (gates git operations).
    pub(super) is_git_repo: bool,
    /// Consecutive error counts per recoverable subsystem name.
    pub(super) subsystem_error_counts: HashMap<String, u32>,
    /// In-memory auto-merge overrides per task id (take priority over disk).
    pub(super) auto_merge_overrides: HashMap<u32, bool>,
    /// Tracks recent (task_id, engineer) dispatch pairs for deduplication.
    pub(super) recent_dispatches: HashMap<(u32, String), Instant>,
    /// SQLite telemetry database connection (None if open failed).
    pub(super) telemetry_db: Option<rusqlite::Connection>,
    /// Timestamp of the last manual assignment per engineer (for cooldown).
    pub(super) manual_assign_cooldowns: HashMap<String, Instant>,
    /// Per-member agent backend health state.
    pub(super) backend_health: HashMap<String, BackendHealth>,
    /// When the last periodic health check was run.
    pub(super) last_health_check: Instant,
    /// Rate-limiting: last time each engineer received an uncommitted-work warning.
    pub(super) last_uncommitted_warn: HashMap<String, Instant>,
    /// Messages deferred because the target agent was still starting.
    /// Drained automatically when the agent transitions to ready.
    pub(super) pending_delivery_queue: HashMap<String, Vec<PendingMessage>>,
}
144
/// A member's worktree location and the branch checked out there, if known.
#[derive(Debug, Clone, PartialEq, Eq)]
struct MemberWorktreeContext {
    /// Filesystem path of the worktree.
    path: PathBuf,
    /// Branch currently checked out in the worktree, when determinable.
    branch: Option<String>,
}
150
/// Serializable snapshot of one member's nudge schedule, saved in the daemon
/// checkpoint and restored on resume.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct PersistedNudgeState {
    /// Seconds already elapsed in the idle countdown at save time
    /// (None when no idle countdown was running).
    idle_elapsed_secs: Option<u64>,
    /// Whether the nudge already fired during the current idle period.
    fired_this_idle: bool,
    /// Whether the nudge schedule was paused.
    paused: bool,
}
157
/// On-disk daemon checkpoint restored when the daemon is resumed.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct PersistedDaemonState {
    /// True when the daemon exited its loop normally before saving.
    clean_shutdown: bool,
    /// Unix timestamp of the save.
    saved_at: u64,
    /// Member state map at save time.
    states: HashMap<String, MemberState>,
    /// Active task ids per engineer.
    active_tasks: HashMap<String, u32>,
    /// Retry counters per engineer.
    retry_counts: HashMap<String, u32>,
    /// Queued dispatches (missing in older checkpoints; defaults to empty).
    #[serde(default)]
    dispatch_queue: Vec<DispatchQueueEntry>,
    /// Members whose standups were paused.
    paused_standups: HashSet<String>,
    /// Seconds elapsed since each member's last standup at save time.
    last_standup_elapsed_secs: HashMap<String, u64>,
    /// Nudge schedule snapshots per member.
    nudge_state: HashMap<String, PersistedNudgeState>,
    /// Whether the pipeline-starvation alert had fired.
    pipeline_starvation_fired: bool,
}
172
/// Identity snapshot of the daemon binary on disk, used to detect when the
/// executable has been replaced (e.g. by a rebuild) for hot reload.
#[derive(Debug, Clone, PartialEq, Eq)]
struct BinaryFingerprint {
    /// Path to the executable that was fingerprinted.
    path: PathBuf,
    /// Last-modified timestamp at capture time.
    modified: SystemTime,
    /// File size in bytes at capture time.
    len: u64,
    /// Inode number (unix only) — also catches in-place replacement.
    #[cfg(unix)]
    inode: u64,
}
181
182impl BinaryFingerprint {
183    fn capture(path: &Path) -> Result<Self> {
184        let metadata =
185            fs::metadata(path).with_context(|| format!("failed to stat {}", path.display()))?;
186        let modified = metadata
187            .modified()
188            .with_context(|| format!("failed to read mtime for {}", path.display()))?;
189        Ok(Self {
190            path: path.to_path_buf(),
191            modified,
192            len: metadata.len(),
193            #[cfg(unix)]
194            inode: std::os::unix::fs::MetadataExt::ino(&metadata),
195        })
196    }
197
198    fn changed_from(&self, previous: &Self) -> bool {
199        self.modified != previous.modified || self.len != previous.len || {
200            #[cfg(unix)]
201            {
202                self.inode != previous.inode
203            }
204            #[cfg(not(unix))]
205            {
206                false
207            }
208        }
209    }
210}
211
/// Watches the daemon's own binary for changes and rate-limits reload attempts.
#[derive(Debug, Clone)]
struct HotReloadMonitor {
    /// Fingerprint of the binary as last observed.
    binary: BinaryFingerprint,
    /// When the binary was last re-stat'ed.
    last_checked: Instant,
    /// When a reload was last attempted (None if never).
    last_reload_attempt: Option<Instant>,
}
218
219impl HotReloadMonitor {
220    fn new(binary: BinaryFingerprint) -> Self {
221        Self {
222            binary,
223            last_checked: Instant::now(),
224            last_reload_attempt: None,
225        }
226    }
227
228    fn for_current_exe() -> Result<Self> {
229        let path = std::env::current_exe().context("failed to resolve current executable")?;
230        Ok(Self::new(BinaryFingerprint::capture(&path)?))
231    }
232
233    fn should_check(&self) -> bool {
234        self.last_checked.elapsed() >= HOT_RELOAD_CHECK_INTERVAL
235    }
236
237    fn changed_binary(&mut self) -> Result<Option<BinaryFingerprint>> {
238        self.last_checked = Instant::now();
239        let current = BinaryFingerprint::capture(&self.binary.path)?;
240        Ok(current.changed_from(&self.binary).then_some(current))
241    }
242
243    fn can_attempt_reload(&self) -> bool {
244        self.last_reload_attempt
245            .map(|instant| instant.elapsed() >= HOT_RELOAD_MIN_INTERVAL)
246            .unwrap_or(true)
247    }
248
249    fn mark_reload_attempt(&mut self) {
250        self.last_reload_attempt = Some(Instant::now());
251    }
252}
253
254impl TeamDaemon {
    /// Mutable access to the watcher registered for member `name`.
    ///
    /// # Errors
    /// Returns an error when no watcher exists under that member name.
    pub(super) fn watcher_mut(&mut self, name: &str) -> Result<&mut SessionWatcher> {
        self.watchers
            .get_mut(name)
            .with_context(|| format!("watcher registry missing member '{name}'"))
    }
260
    /// Create a new daemon from resolved config and layout.
    ///
    /// Sets up all subsystems the poll loop depends on:
    /// - an event sink appending to `.batty/team_config/events.jsonl`,
    /// - one `SessionWatcher` per pane member,
    /// - outbound channels for user roles,
    /// - an optional Telegram bot for inbound polling,
    /// - nudge schedules extracted from role prompt files,
    /// - a best-effort telemetry database connection.
    ///
    /// Runtime maps start empty and timers start "now"; the first health
    /// check is forced immediately (see field init below).
    ///
    /// # Errors
    /// Returns an error only when the event sink cannot be created; channel
    /// and telemetry failures are logged and degrade gracefully.
    pub fn new(config: DaemonConfig) -> Result<Self> {
        let is_git_repo = super::git_cmd::is_git_repo(&config.project_root);
        if !is_git_repo {
            info!("Project is not a git repository \u{2014} git operations disabled");
        }

        let team_config_dir = config.project_root.join(".batty").join("team_config");
        let events_path = team_config_dir.join("events.jsonl");
        let event_sink =
            EventSink::new_with_max_bytes(&events_path, config.team_config.event_log_max_bytes)?;

        // Create watchers for each pane member
        // (stale threshold is twice the standup interval).
        let mut watchers = HashMap::new();
        let stale_secs = config.team_config.standup.interval_secs * 2;
        for (name, pane_id) in &config.pane_map {
            let session_tracker = config
                .members
                .iter()
                .find(|member| member.name == *name)
                .and_then(|member| member_session_tracker_config(&config.project_root, member));
            watchers.insert(
                name.clone(),
                SessionWatcher::new(pane_id, name, stale_secs, session_tracker),
            );
        }

        // Create channels for user roles
        let mut channels: HashMap<String, Box<dyn Channel>> = HashMap::new();
        for role in &config.team_config.roles {
            if role.role_type == RoleType::User {
                if let (Some(ch_type), Some(ch_config)) = (&role.channel, &role.channel_config) {
                    match comms::channel_from_config(ch_type, ch_config) {
                        Ok(ch) => {
                            channels.insert(role.name.clone(), ch);
                        }
                        Err(e) => {
                            // Best-effort: a misconfigured channel disables
                            // that role's comms but not the daemon.
                            warn!(role = %role.name, error = %e, "failed to create channel");
                        }
                    }
                }
            }
        }

        // Create Telegram bot for inbound polling (if configured)
        let telegram_bot = telegram_bridge::build_telegram_bot(&config.team_config);

        let states = HashMap::new();

        // Build nudge schedules from role configs + prompt files
        let mut nudges = HashMap::new();
        for role in &config.team_config.roles {
            if let Some(interval_secs) = role.nudge_interval_secs {
                let prompt_path =
                    role_prompt_path(&team_config_dir, role.prompt.as_deref(), role.role_type);
                if let Some(nudge_text) = extract_nudge_section(&prompt_path) {
                    // Apply nudge to all instances of this role
                    let instance_names: Vec<String> = config
                        .members
                        .iter()
                        .filter(|m| m.role_name == role.name)
                        .map(|m| m.name.clone())
                        .collect();
                    for name in instance_names {
                        info!(member = %name, interval_secs, "registered nudge");
                        nudges.insert(
                            name,
                            NudgeSchedule {
                                text: nudge_text.clone(),
                                interval: Duration::from_secs(interval_secs),
                                // All roles start idle, so begin the countdown
                                idle_since: Some(Instant::now()),
                                fired_this_idle: false,
                                paused: false,
                            },
                        );
                    }
                }
            }
        }

        // Open telemetry database (best-effort — log and continue if it fails).
        let telemetry_db = match super::telemetry_db::open(&config.project_root) {
            Ok(conn) => {
                info!("telemetry database opened");
                Some(conn)
            }
            Err(error) => {
                warn!(error = %error, "failed to open telemetry database; telemetry disabled");
                None
            }
        };

        Ok(Self {
            config,
            watchers,
            states,
            idle_started_at: HashMap::new(),
            active_tasks: HashMap::new(),
            retry_counts: HashMap::new(),
            dispatch_queue: Vec::new(),
            triage_idle_epochs: HashMap::new(),
            triage_interventions: HashMap::new(),
            owned_task_interventions: HashMap::new(),
            intervention_cooldowns: HashMap::new(),
            channels,
            nudges,
            telegram_bot,
            failure_tracker: FailureTracker::new(20),
            event_sink,
            paused_standups: HashSet::new(),
            last_standup: HashMap::new(),
            last_board_rotation: Instant::now(),
            last_auto_archive: Instant::now(),
            last_auto_dispatch: Instant::now(),
            pipeline_starvation_fired: false,
            pipeline_starvation_last_fired: None,
            retro_generated: false,
            failed_deliveries: Vec::new(),
            review_first_seen: HashMap::new(),
            review_nudge_sent: HashSet::new(),
            poll_interval: Duration::from_secs(5),
            is_git_repo,
            subsystem_error_counts: HashMap::new(),
            auto_merge_overrides: HashMap::new(),
            recent_dispatches: HashMap::new(),
            telemetry_db,
            manual_assign_cooldowns: HashMap::new(),
            backend_health: HashMap::new(),
            // Start far enough in the past to trigger an immediate check.
            last_health_check: Instant::now() - Duration::from_secs(3600),
            last_uncommitted_warn: HashMap::new(),
            pending_delivery_queue: HashMap::new(),
        })
    }
396
397    pub(super) fn member_nudge_text(&self, member: &MemberInstance) -> Option<String> {
398        let prompt_path = role_prompt_path(
399            &super::team_config_dir(&self.config.project_root),
400            member.prompt.as_deref(),
401            member.role_type,
402        );
403        extract_nudge_section(&prompt_path)
404    }
405
406    pub(super) fn prepend_member_nudge(
407        &self,
408        member: &MemberInstance,
409        body: impl AsRef<str>,
410    ) -> String {
411        let body = body.as_ref();
412        match self.member_nudge_text(member) {
413            Some(nudge) => format!("{nudge}\n\n{body}"),
414            None => body.to_string(),
415        }
416    }
417
    /// Run the daemon loop. Blocks until the session is killed or an error occurs.
    ///
    /// If `resume` is true, agents are launched with session-resume flags
    /// (`claude --resume <session-id>` / `codex resume --last`) instead of fresh starts.
    ///
    /// Startup: records start events, installs a Ctrl-C handler, runs the
    /// preflight check, spawns all agents, and (on resume) restores persisted
    /// runtime state. Each poll iteration then sequences the subsystems in a
    /// fixed order: watcher polling and member restarts, message delivery,
    /// interventions, task reconciliation/dispatch, automation (standups,
    /// board rotation, archiving, retrospective), health checks, hot-reload,
    /// and pane status labels. A heartbeat + checkpoint is persisted every
    /// 5 minutes, and a final checkpoint is written on shutdown.
    ///
    /// # Errors
    /// Propagates failures from preflight, agent spawn, and the initial
    /// runtime-state persist; in-loop step errors are logged, not propagated.
    pub fn run(&mut self, resume: bool) -> Result<()> {
        self.record_daemon_started();
        self.acknowledge_hot_reload_marker();
        info!(session = %self.config.session, resume, "daemon started");
        self.record_orchestrator_action(format!(
            "runtime: orchestrator started (mode={}, resume={resume})",
            self.config.team_config.workflow_mode.as_str()
        ));

        // Install signal handler so we log clean shutdowns
        let shutdown_flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let flag_clone = shutdown_flag.clone();
        if let Err(e) = ctrlc::set_handler(move || {
            flag_clone.store(true, std::sync::atomic::Ordering::SeqCst);
        }) {
            warn!(error = %e, "failed to install signal handler");
        }

        self.run_startup_preflight()?;

        // Spawn agents in all panes
        self.spawn_all_agents(resume)?;
        if resume {
            self.restore_runtime_state();
        }
        self.persist_runtime_state(false)?;

        let started_at = Instant::now();
        let heartbeat_interval = Duration::from_secs(300); // 5 minutes
        let mut last_heartbeat = Instant::now();
        // Hot-reload is best-effort: if the monitor can't be built, the
        // daemon simply never reloads itself.
        let mut hot_reload = match HotReloadMonitor::for_current_exe() {
            Ok(monitor) => Some(monitor),
            Err(error) => {
                warn!(error = %error, "failed to initialize daemon hot-reload monitor");
                None
            }
        };

        // Main polling loop
        let shutdown_reason;
        loop {
            // Check for signal-based shutdown
            if shutdown_flag.load(std::sync::atomic::Ordering::SeqCst) {
                shutdown_reason = "signal";
                info!("received shutdown signal");
                break;
            }

            if !tmux::session_exists(&self.config.session) {
                shutdown_reason = "session_gone";
                info!("tmux session gone, shutting down");
                break;
            }

            // -- Recoverable subsystems: log-and-skip with consecutive-failure tracking --
            self.run_recoverable_step("poll_watchers", |daemon| daemon.poll_watchers());
            self.run_recoverable_step("restart_dead_members", |daemon| {
                daemon.restart_dead_members()
            });
            self.run_recoverable_step("sync_launch_state_session_ids", |daemon| {
                daemon.sync_launch_state_session_ids()
            });
            self.run_recoverable_step("drain_legacy_command_queue", |daemon| {
                daemon.drain_legacy_command_queue()
            });

            // -- Critical subsystems: errors logged but no consecutive-failure tracking --
            self.run_loop_step("deliver_inbox_messages", |daemon| {
                daemon.deliver_inbox_messages()
            });
            self.run_loop_step("retry_failed_deliveries", |daemon| {
                daemon.retry_failed_deliveries()
            });

            // -- Recoverable subsystems --
            self.run_recoverable_step("maybe_intervene_triage_backlog", |daemon| {
                daemon.maybe_intervene_triage_backlog()
            });
            self.run_recoverable_step("maybe_intervene_owned_tasks", |daemon| {
                daemon.maybe_intervene_owned_tasks()
            });
            self.run_recoverable_step("maybe_intervene_review_backlog", |daemon| {
                daemon.maybe_intervene_review_backlog()
            });
            self.run_recoverable_step("maybe_escalate_stale_reviews", |daemon| {
                daemon.maybe_escalate_stale_reviews()
            });
            self.run_recoverable_step("maybe_auto_unblock_blocked_tasks", |daemon| {
                daemon.maybe_auto_unblock_blocked_tasks()
            });

            // -- Critical subsystems --
            self.run_loop_step("reconcile_active_tasks", |daemon| {
                daemon.reconcile_active_tasks()
            });
            self.run_loop_step("maybe_auto_dispatch", |daemon| daemon.maybe_auto_dispatch());
            self.run_recoverable_step("maybe_recycle_cron_tasks", |daemon| {
                daemon.maybe_recycle_cron_tasks()
            });

            // -- Recoverable subsystems --
            self.run_recoverable_step("maybe_intervene_manager_dispatch_gap", |daemon| {
                daemon.maybe_intervene_manager_dispatch_gap()
            });
            self.run_recoverable_step("maybe_intervene_architect_utilization", |daemon| {
                daemon.maybe_intervene_architect_utilization()
            });
            self.run_recoverable_step("maybe_intervene_board_replenishment", |daemon| {
                daemon.maybe_intervene_board_replenishment()
            });
            self.run_recoverable_step("maybe_detect_pipeline_starvation", |daemon| {
                daemon.maybe_detect_pipeline_starvation()
            });

            // -- Recoverable with catch_unwind (panic-safe) --
            self.run_recoverable_step_with_catch_unwind("process_telegram_queue", |daemon| {
                daemon.process_telegram_queue()
            });
            self.run_recoverable_step("maybe_fire_nudges", |daemon| daemon.maybe_fire_nudges());
            self.run_recoverable_step("check_backend_health", |daemon| {
                daemon.check_backend_health()
            });
            self.run_recoverable_step("maybe_reconcile_stale_worktrees", |daemon| {
                daemon.maybe_reconcile_stale_worktrees()
            });
            self.run_recoverable_step("check_worktree_staleness", |daemon| {
                daemon.check_worktree_staleness()
            });
            self.run_recoverable_step("maybe_warn_uncommitted_work", |daemon| {
                daemon.maybe_warn_uncommitted_work()
            });
            self.run_recoverable_step_with_catch_unwind("maybe_generate_standup", |daemon| {
                let generated =
                    standup::maybe_generate_standup(standup::StandupGenerationContext {
                        project_root: &daemon.config.project_root,
                        team_config: &daemon.config.team_config,
                        members: &daemon.config.members,
                        watchers: &daemon.watchers,
                        states: &daemon.states,
                        pane_map: &daemon.config.pane_map,
                        telegram_bot: daemon.telegram_bot.as_ref(),
                        paused_standups: &daemon.paused_standups,
                        last_standup: &mut daemon.last_standup,
                        backend_health: &daemon.backend_health,
                    })?;
                for recipient in generated {
                    daemon.record_standup_generated(&recipient);
                }
                Ok(())
            });
            self.run_recoverable_step("maybe_rotate_board", |daemon| daemon.maybe_rotate_board());
            self.run_recoverable_step("maybe_auto_archive", |daemon| daemon.maybe_auto_archive());
            self.run_recoverable_step_with_catch_unwind("maybe_generate_retrospective", |daemon| {
                daemon.maybe_generate_retrospective()
            });
            self.run_recoverable_step("maybe_notify_failure_patterns", |daemon| {
                daemon.maybe_notify_failure_patterns()
            });
            self.run_recoverable_step("maybe_reload_binary", |daemon| {
                daemon.maybe_hot_reload_binary(hot_reload.as_mut())
            });
            // Refresh pane status labels last so they reflect this iteration.
            status::update_pane_status_labels(status::PaneStatusLabelUpdateContext {
                project_root: &self.config.project_root,
                members: &self.config.members,
                pane_map: &self.config.pane_map,
                states: &self.states,
                nudges: &self.nudges,
                last_standup: &self.last_standup,
                paused_standups: &self.paused_standups,
                standup_interval_for_member: |member_name| {
                    standup::standup_interval_for_member_name(
                        &self.config.team_config,
                        &self.config.members,
                        member_name,
                    )
                },
            });

            // Periodic heartbeat
            if last_heartbeat.elapsed() >= heartbeat_interval {
                let uptime = started_at.elapsed().as_secs();
                self.record_daemon_heartbeat(uptime);
                if let Err(error) = self.persist_runtime_state(false) {
                    warn!(error = %error, "failed to persist daemon checkpoint");
                }
                debug!(uptime_secs = uptime, "daemon heartbeat");
                last_heartbeat = Instant::now();
            }

            std::thread::sleep(self.poll_interval);
        }

        // Final checkpoint is marked as a clean shutdown.
        let uptime = started_at.elapsed().as_secs();
        if let Err(error) = self.persist_runtime_state(true) {
            warn!(error = %error, "failed to persist final daemon checkpoint");
        }
        self.record_daemon_stopped(shutdown_reason, uptime);
        Ok(())
    }
621
    /// Check for a replaced daemon binary and, when safe, exec into it.
    ///
    /// Early-returns when: no monitor is available, the check interval has
    /// not elapsed, the fingerprint is unchanged, a reload attempt is
    /// rate-limited, or the binary is not yet safe to reload. Otherwise it
    /// persists runtime state, records reload events, writes the hot-reload
    /// marker, and execs the new binary. If the exec fails, the marker is
    /// cleared and the current process keeps running.
    ///
    /// # Errors
    /// Propagates fingerprinting, state-persist, and marker-write failures;
    /// an exec failure itself is logged and recorded, not returned.
    fn maybe_hot_reload_binary(&mut self, monitor: Option<&mut HotReloadMonitor>) -> Result<()> {
        let Some(monitor) = monitor else {
            return Ok(());
        };
        if !monitor.should_check() {
            return Ok(());
        }

        let Some(updated_binary) = monitor.changed_binary()? else {
            return Ok(());
        };

        if !monitor.can_attempt_reload() {
            warn!(
                path = %updated_binary.path.display(),
                "binary changed again but reload attempt is rate-limited"
            );
            return Ok(());
        }

        if !binary_is_reloadable(&updated_binary.path) {
            warn!(
                path = %updated_binary.path.display(),
                "binary changed but is not safe to hot-reload yet"
            );
            return Ok(());
        }

        // Persist state before exec so the new process can resume from it.
        monitor.mark_reload_attempt();
        self.persist_runtime_state(false)?;
        self.record_daemon_reloading();
        self.record_orchestrator_action(format!(
            "runtime: daemon reloading after binary change ({})",
            updated_binary.path.display()
        ));
        write_hot_reload_marker(&self.config.project_root)?;

        // On success exec never returns; reaching this branch means failure.
        if let Err(error) = exec_reloaded_daemon(&updated_binary.path, &self.config.project_root) {
            let _ = clear_hot_reload_marker(&self.config.project_root);
            warn!(
                path = %updated_binary.path.display(),
                error = %error,
                "failed to exec updated daemon binary; continuing on existing process"
            );
            self.record_orchestrator_action(format!("runtime: daemon reload failed ({error})"));
        }

        Ok(())
    }
671
672    pub(super) fn mark_member_working(&mut self, member_name: &str) {
673        self.states
674            .insert(member_name.to_string(), MemberState::Working);
675        if let Some(watcher) = self.watchers.get_mut(member_name) {
676            watcher.activate();
677        }
678        self.update_automation_timers_for_state(member_name, MemberState::Working);
679    }
680
681    pub(super) fn set_member_idle(&mut self, member_name: &str) {
682        self.states
683            .insert(member_name.to_string(), MemberState::Idle);
684        if let Some(watcher) = self.watchers.get_mut(member_name) {
685            watcher.deactivate();
686        }
687        self.update_automation_timers_for_state(member_name, MemberState::Idle);
688    }
689
690    pub(super) fn active_task_id(&self, engineer: &str) -> Option<u32> {
691        self.active_tasks.get(engineer).copied()
692    }
693
694    pub(super) fn project_root(&self) -> &Path {
695        &self.config.project_root
696    }
697
698    #[cfg(test)]
699    pub(super) fn set_auto_merge_override(&mut self, task_id: u32, enabled: bool) {
700        self.auto_merge_overrides.insert(task_id, enabled);
701    }
702
703    pub(super) fn auto_merge_override(&self, task_id: u32) -> Option<bool> {
704        // In-memory overrides take priority, then check disk
705        if let Some(&value) = self.auto_merge_overrides.get(&task_id) {
706            return Some(value);
707        }
708        let disk_overrides = super::auto_merge::load_overrides(&self.config.project_root);
709        disk_overrides.get(&task_id).copied()
710    }
711
712    pub(super) fn worktree_dir(&self, engineer: &str) -> PathBuf {
713        self.config
714            .project_root
715            .join(".batty")
716            .join("worktrees")
717            .join(engineer)
718    }
719
720    pub(super) fn board_dir(&self) -> PathBuf {
721        self.config
722            .project_root
723            .join(".batty")
724            .join("team_config")
725            .join("board")
726    }
727
728    pub(super) fn member_uses_worktrees(&self, engineer: &str) -> bool {
729        if !self.is_git_repo {
730            return false;
731        }
732        self.config
733            .members
734            .iter()
735            .find(|member| member.name == engineer)
736            .map(|member| member.use_worktrees)
737            .unwrap_or(false)
738    }
739
740    pub(super) fn manager_name(&self, engineer: &str) -> Option<String> {
741        self.config
742            .members
743            .iter()
744            .find(|member| member.name == engineer)
745            .and_then(|member| member.reports_to.clone())
746    }
747
748    #[cfg(test)]
749    pub(super) fn set_active_task_for_test(&mut self, engineer: &str, task_id: u32) {
750        self.active_tasks.insert(engineer.to_string(), task_id);
751    }
752
753    #[cfg(test)]
754    pub(super) fn retry_count_for_test(&self, engineer: &str) -> Option<u32> {
755        self.retry_counts.get(engineer).copied()
756    }
757
758    #[cfg(test)]
759    pub(super) fn member_state_for_test(&self, engineer: &str) -> Option<MemberState> {
760        self.states.get(engineer).copied()
761    }
762
763    #[cfg(test)]
764    pub(super) fn set_member_state_for_test(&mut self, engineer: &str, state: MemberState) {
765        self.states.insert(engineer.to_string(), state);
766    }
767
768    pub(super) fn increment_retry(&mut self, engineer: &str) -> u32 {
769        let count = self.retry_counts.entry(engineer.to_string()).or_insert(0);
770        *count += 1;
771        *count
772    }
773
774    pub(super) fn clear_active_task(&mut self, engineer: &str) {
775        self.active_tasks.remove(engineer);
776        self.retry_counts.remove(engineer);
777        // Clean up any progress checkpoint left from a prior restart.
778        super::checkpoint::remove_checkpoint(&self.config.project_root, engineer);
779    }
780
    /// Queue `msg` from `from_role` to the member it reports to, if any.
    ///
    /// Silently succeeds when `from_role` has no `reports_to` parent (or is
    /// unknown). The parent is marked as working after the message is queued.
    ///
    /// NOTE(review): the previous doc comment here described pruning stale
    /// `active_task` entries — it appears to have been left behind when that
    /// function was extracted elsewhere.
    pub(super) fn notify_reports_to(&mut self, from_role: &str, msg: &str) -> Result<()> {
        let parent = self
            .config
            .members
            .iter()
            .find(|m| m.name == from_role)
            .and_then(|m| m.reports_to.clone());
        let Some(parent_name) = parent else {
            return Ok(());
        };
        self.queue_message(from_role, &parent_name, msg)?;
        self.mark_member_working(&parent_name);
        Ok(())
    }
796
797    /// Update automation countdowns when a member's state changes.
798    pub(super) fn update_automation_timers_for_state(
799        &mut self,
800        member_name: &str,
801        new_state: MemberState,
802    ) {
803        match new_state {
804            MemberState::Idle => {
805                self.idle_started_at
806                    .insert(member_name.to_string(), Instant::now());
807            }
808            MemberState::Working => {
809                self.idle_started_at.remove(member_name);
810            }
811        }
812        self.update_nudge_for_state(member_name, new_state);
813        standup::update_timer_for_state(
814            &self.config.team_config,
815            &self.config.members,
816            &mut self.paused_standups,
817            &mut self.last_standup,
818            member_name,
819            new_state,
820        );
821        self.update_triage_intervention_for_state(member_name, new_state);
822    }
823
    /// Rehydrate daemon runtime fields from the persisted state file.
    ///
    /// No-op when no state file exists or it fails to parse, leaving the
    /// freshly-initialized defaults in place.
    fn restore_runtime_state(&mut self) {
        let Some(state) = load_daemon_state(&self.config.project_root) else {
            return;
        };

        self.states = state.states;
        // Idle timestamps are not persisted as instants; restart the idle
        // clock at "now" for every member that was idle when state was saved.
        self.idle_started_at = self
            .states
            .iter()
            .filter(|(_, state)| matches!(state, MemberState::Idle))
            .map(|(member, _)| (member.clone(), Instant::now()))
            .collect();
        self.active_tasks = state.active_tasks;
        self.retry_counts = state.retry_counts;
        self.dispatch_queue = state.dispatch_queue;
        self.paused_standups = state.paused_standups;
        self.last_standup = standup::restore_timer_state(state.last_standup_elapsed_secs);

        for (member_name, persisted) in state.nudge_state {
            // Only restore nudge state for members that still have a
            // registered schedule in this daemon instance.
            let Some(schedule) = self.nudges.get_mut(&member_name) else {
                continue;
            };
            // Backdate idle_since from the persisted elapsed seconds,
            // clamping to "now" if the subtraction would underflow.
            schedule.idle_since = persisted.idle_elapsed_secs.map(|elapsed_secs| {
                Instant::now()
                    .checked_sub(Duration::from_secs(elapsed_secs))
                    .unwrap_or_else(Instant::now)
            });
            schedule.fired_this_idle = persisted.fired_this_idle;
            schedule.paused = persisted.paused;
        }
        self.pipeline_starvation_fired = state.pipeline_starvation_fired;
    }
856
    /// Snapshot runtime fields into the persisted daemon state file.
    ///
    /// `clean_shutdown` records whether the daemon is exiting deliberately,
    /// so a later restore can distinguish a crash from a normal stop.
    fn persist_runtime_state(&self, clean_shutdown: bool) -> Result<()> {
        let state = PersistedDaemonState {
            clean_shutdown,
            saved_at: now_unix(),
            states: self.states.clone(),
            active_tasks: self.active_tasks.clone(),
            retry_counts: self.retry_counts.clone(),
            dispatch_queue: self.dispatch_queue.clone(),
            paused_standups: self.paused_standups.clone(),
            last_standup_elapsed_secs: standup::snapshot_timer_state(&self.last_standup),
            // Instants cannot be serialized; store elapsed seconds instead
            // and backdate on restore.
            nudge_state: self
                .nudges
                .iter()
                .map(|(member, schedule)| {
                    (
                        member.clone(),
                        PersistedNudgeState {
                            idle_elapsed_secs: schedule.idle_since.map(|t| t.elapsed().as_secs()),
                            fired_this_idle: schedule.fired_this_idle,
                            paused: schedule.paused,
                        },
                    )
                })
                .collect(),
            pipeline_starvation_fired: self.pipeline_starvation_fired,
        };
        save_daemon_state(&self.config.project_root, &state)
    }
885}
886
/// Render a one-line failure summary for an external command.
///
/// Detail text prefers trimmed stderr, then trimmed stdout, then the raw
/// exit status when both streams are empty.
fn describe_command_failure(command: &str, args: &[&str], output: &std::process::Output) -> String {
    let trimmed = |bytes: &[u8]| String::from_utf8_lossy(bytes).trim().to_string();
    let stderr = trimmed(&output.stderr);
    let stdout = trimmed(&output.stdout);

    let details = [stderr, stdout]
        .into_iter()
        .find(|text| !text.is_empty())
        .unwrap_or_else(|| format!("process exited with status {}", output.status));

    format!("`{command} {}` failed: {details}", args.join(" "))
}
900
901fn default_prompt_file_for_role(role_type: RoleType) -> &'static str {
902    match role_type {
903        RoleType::Architect => "architect.md",
904        RoleType::Manager => "manager.md",
905        RoleType::Engineer => "engineer.md",
906        RoleType::User => "architect.md",
907    }
908}
909
910fn role_prompt_path(
911    team_config_dir: &Path,
912    prompt_override: Option<&str>,
913    role_type: RoleType,
914) -> PathBuf {
915    team_config_dir.join(prompt_override.unwrap_or(default_prompt_file_for_role(role_type)))
916}
917
/// Extract the `## Nudge` section from a prompt .md file.
///
/// Returns the text after a `## Nudge` heading up to the next `## ` heading
/// or EOF, trimmed. Returns `None` when the file is unreadable, the heading
/// is absent, or the section body is empty.
fn extract_nudge_section(prompt_path: &Path) -> Option<String> {
    let content = std::fs::read_to_string(prompt_path).ok()?;

    let mut collected: Vec<&str> = Vec::new();
    let mut inside = false;
    for line in content.lines() {
        // A `## Nudge` heading (re)enters the section; the heading line
        // itself is never collected.
        if line.starts_with("## Nudge") {
            inside = true;
            continue;
        }
        if !inside {
            continue;
        }
        if line.starts_with("## ") {
            break; // the next heading ends the section
        }
        collected.push(line);
    }

    let text = collected.join("\n").trim().to_string();
    if text.is_empty() { None } else { Some(text) }
}
948
/// Format a stuck-duration in whole seconds as a compact human-readable
/// string: `"2h 5m"` for an hour or more, `"3m 12s"` for a minute or more,
/// otherwise `"45s"`.
///
/// (The doc comment previously attached here described stripping the
/// `## Nudge` section from prompt text — it belonged to a helper that no
/// longer lives in this file and misdescribed this function.)
fn format_stuck_duration(stuck_age_secs: u64) -> String {
    if stuck_age_secs >= 3600 {
        let hours = stuck_age_secs / 3600;
        let mins = (stuck_age_secs % 3600) / 60;
        format!("{hours}h {mins}m")
    } else if stuck_age_secs >= 60 {
        let mins = stuck_age_secs / 60;
        let secs = stuck_age_secs % 60;
        format!("{mins}m {secs}s")
    } else {
        format!("{stuck_age_secs}s")
    }
}
966
967fn ensure_tmux_session_ready(session: &str) -> Result<()> {
968    if tmux::session_exists(session) {
969        Ok(())
970    } else {
971        bail!("daemon startup pre-flight failed: tmux session '{session}' is missing")
972    }
973}
974
975fn ensure_kanban_available() -> Result<()> {
976    let output = std::process::Command::new("kanban-md")
977        .arg("--help")
978        .output()
979        .context(
980            "daemon startup pre-flight failed while verifying board tooling: could not execute `kanban-md --help`",
981        )?;
982    if output.status.success() {
983        return Ok(());
984    }
985
986    let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
987    let detail = if stderr.is_empty() {
988        "unknown error".to_string()
989    } else {
990        stderr
991    };
992    bail!("daemon startup pre-flight failed: `kanban-md --help` failed: {detail}");
993}
994
/// Board directory for a project: `<project_root>/.batty/team_config/board`.
fn board_dir(project_root: &Path) -> PathBuf {
    [".batty", "team_config", "board"]
        .iter()
        .fold(project_root.to_path_buf(), |dir, part| dir.join(part))
}
1001
1002fn ensure_board_initialized(project_root: &Path) -> Result<bool> {
1003    let board_dir = board_dir(project_root);
1004    if board_dir.join("tasks").is_dir() {
1005        return Ok(false);
1006    }
1007
1008    board_cmd::init(&board_dir).map_err(|error| {
1009        anyhow::anyhow!(
1010            "daemon startup pre-flight failed: unable to initialize board at '{}': {error}",
1011            board_dir.display()
1012        )
1013    })?;
1014    Ok(true)
1015}
1016
/// Location of the hot-reload marker file: `<project_root>/.batty/reload`.
fn hot_reload_marker_path(project_root: &Path) -> PathBuf {
    let mut path = project_root.to_path_buf();
    path.push(".batty");
    path.push("reload");
    path
}
1020
1021fn write_hot_reload_marker(project_root: &Path) -> Result<()> {
1022    let path = hot_reload_marker_path(project_root);
1023    if let Some(parent) = path.parent() {
1024        fs::create_dir_all(parent)
1025            .with_context(|| format!("failed to create {}", parent.display()))?;
1026    }
1027    fs::write(&path, now_unix().to_string())
1028        .with_context(|| format!("failed to write {}", path.display()))?;
1029    Ok(())
1030}
1031
1032fn clear_hot_reload_marker(project_root: &Path) -> Result<()> {
1033    let path = hot_reload_marker_path(project_root);
1034    if !path.exists() {
1035        return Ok(());
1036    }
1037    fs::remove_file(&path).with_context(|| format!("failed to remove {}", path.display()))?;
1038    Ok(())
1039}
1040
1041fn consume_hot_reload_marker(project_root: &Path) -> bool {
1042    let path = hot_reload_marker_path(project_root);
1043    if !path.exists() {
1044        return false;
1045    }
1046    clear_hot_reload_marker(project_root).is_ok()
1047}
1048
/// Argument vector for re-exec'ing the daemon after a hot reload.
///
/// The project root is canonicalized when possible so the restarted daemon
/// resolves the same directory regardless of the working directory.
fn hot_reload_daemon_args(project_root: &Path) -> Vec<String> {
    let root = project_root
        .canonicalize()
        .unwrap_or_else(|_| project_root.to_path_buf());
    let mut args: Vec<String> = ["-v", "daemon", "--project-root"]
        .iter()
        .map(|arg| arg.to_string())
        .collect();
    args.push(root.to_string_lossy().into_owned());
    args.push("--resume".to_string());
    args
}
1063
/// Whether `path` looks like an executable the daemon can exec into.
///
/// Requires a regular file; on unix at least one execute bit must be set,
/// and on macOS `codesign --verify` must succeed on the binary.
fn binary_is_reloadable(path: &Path) -> bool {
    let Ok(metadata) = fs::metadata(path) else {
        return false;
    };
    if !metadata.is_file() {
        return false;
    }

    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        // 0o111 covers the user/group/other execute bits.
        if metadata.permissions().mode() & 0o111 == 0 {
            return false;
        }
    }

    #[cfg(target_os = "macos")]
    {
        // NOTE(review): presumably guards against exec'ing a binary whose
        // signature macOS would reject — confirm intent.
        let Ok(status) = std::process::Command::new("codesign")
            .args(["--verify", path.to_string_lossy().as_ref()])
            .status()
        else {
            return false;
        };
        if !status.success() {
            return false;
        }
    }

    true
}
1095
/// Replace the current process with `executable` running the daemon (unix).
///
/// Uses `exec`, so on success this never returns; the `Err` path is reached
/// only when the exec call itself fails.
#[cfg(unix)]
fn exec_reloaded_daemon(executable: &Path, project_root: &Path) -> Result<()> {
    use std::os::unix::process::CommandExt;

    let error = std::process::Command::new(executable)
        .args(hot_reload_daemon_args(project_root))
        .exec();
    Err(anyhow::Error::new(error).context(format!("failed to exec {}", executable.display())))
}
1105
/// Fallback for non-unix targets, where process replacement via `exec` is
/// unavailable: always errors.
#[cfg(not(unix))]
fn exec_reloaded_daemon(_executable: &Path, _project_root: &Path) -> Result<()> {
    bail!("daemon hot reload via exec is only supported on unix")
}
1110
/// Path of the persisted daemon state file; delegates to the module-level
/// helper so all callers agree on the location.
fn daemon_state_path(project_root: &Path) -> PathBuf {
    super::daemon_state_path(project_root)
}
1114
1115fn load_daemon_state(project_root: &Path) -> Option<PersistedDaemonState> {
1116    let path = daemon_state_path(project_root);
1117    let Ok(content) = fs::read_to_string(&path) else {
1118        return None;
1119    };
1120
1121    match serde_json::from_str(&content) {
1122        Ok(state) => Some(state),
1123        Err(error) => {
1124            warn!(path = %path.display(), error = %error, "failed to parse daemon state, ignoring");
1125            None
1126        }
1127    }
1128}
1129
1130pub fn load_dispatch_queue_snapshot(project_root: &Path) -> Vec<DispatchQueueEntry> {
1131    load_daemon_state(project_root)
1132        .map(|state| state.dispatch_queue)
1133        .unwrap_or_default()
1134}
1135
1136fn save_daemon_state(project_root: &Path, state: &PersistedDaemonState) -> Result<()> {
1137    let path = daemon_state_path(project_root);
1138    if let Some(parent) = path.parent() {
1139        fs::create_dir_all(parent)
1140            .with_context(|| format!("failed to create {}", parent.display()))?;
1141    }
1142    let content =
1143        serde_json::to_string_pretty(state).context("failed to serialize daemon state")?;
1144    fs::write(&path, content).with_context(|| format!("failed to write {}", path.display()))?;
1145    Ok(())
1146}
1147
1148#[cfg(test)]
1149mod tests {
1150    use super::*;
1151    use crate::team::config::AutomationConfig;
1152    use crate::team::config::{BoardConfig, RoleDef, StandupConfig, WorkflowMode, WorkflowPolicy};
1153    use crate::team::events::EventSink;
1154    use crate::team::test_helpers::make_test_daemon;
1155    use crate::team::test_support::{
1156        TestDaemonBuilder, architect_member, backdate_idle_grace, engineer_member, init_git_repo,
1157        manager_member, write_board_task_file, write_open_task_file, write_owned_task_file,
1158    };
1159    use std::time::UNIX_EPOCH;
1160
1161    use serial_test::serial;
1162    use std::collections::HashMap;
1163    use std::fs::OpenOptions;
1164    use std::io::Write;
1165    use std::path::{Path, PathBuf};
1166    fn production_unwrap_expect_count(path: &Path) -> usize {
1167        let content = std::fs::read_to_string(path).unwrap();
1168        let test_split = content.split("\n#[cfg(test)]").next().unwrap_or(&content);
1169        test_split
1170            .lines()
1171            .filter(|line| line.contains(".unwrap(") || line.contains(".expect("))
1172            .count()
1173    }
1174    fn setup_fake_codex(
1175        project_root: &Path,
1176        log_root: &Path,
1177        member_name: &str,
1178    ) -> (PathBuf, PathBuf) {
1179        let project_slug = project_root
1180            .file_name()
1181            .map(|name| name.to_string_lossy().into_owned())
1182            .unwrap_or_else(|| "default".to_string());
1183        let fake_bin = std::env::temp_dir().join(format!("batty-bin-{project_slug}-{member_name}"));
1184        let _ = std::fs::remove_dir_all(&fake_bin);
1185        std::fs::create_dir_all(&fake_bin).unwrap();
1186
1187        let fake_log = log_root.join(format!("{member_name}-fake-codex.log"));
1188        let fake_codex = fake_bin.join("codex");
1189        std::fs::write(
1190            &fake_codex,
1191            format!(
1192                "#!/bin/bash\nprintf 'PWD:%s\\nARGS:%s\\n' \"$PWD\" \"$*\" >> '{}'\nsleep 1\n",
1193                fake_log.display()
1194            ),
1195        )
1196        .unwrap();
1197        #[cfg(unix)]
1198        {
1199            use std::os::unix::fs::PermissionsExt;
1200            std::fs::set_permissions(&fake_codex, std::fs::Permissions::from_mode(0o755)).unwrap();
1201        }
1202
1203        (fake_bin, fake_log)
1204    }
1205
1206    fn write_codex_session_meta(cwd: &Path) -> PathBuf {
1207        let home = PathBuf::from(std::env::var("HOME").expect("HOME must be set for tests"));
1208        let session_dir = home
1209            .join(".codex")
1210            .join("sessions")
1211            .join("2099")
1212            .join("12")
1213            .join("31");
1214        std::fs::create_dir_all(&session_dir).unwrap();
1215
1216        let unique = format!(
1217            "batty-daemon-lifecycle-{}-{}.jsonl",
1218            std::process::id(),
1219            SystemTime::now()
1220                .duration_since(std::time::UNIX_EPOCH)
1221                .unwrap()
1222                .as_nanos()
1223        );
1224        let session_file = session_dir.join(unique);
1225        std::fs::write(
1226            &session_file,
1227            format!(
1228                "{{\"type\":\"session_meta\",\"payload\":{{\"cwd\":\"{}\"}}}}\n",
1229                cwd.display()
1230            ),
1231        )
1232        .unwrap();
1233        session_file
1234    }
1235
1236    fn append_codex_task_complete(session_file: &Path) {
1237        let mut handle = OpenOptions::new().append(true).open(session_file).unwrap();
1238        writeln!(
1239            handle,
1240            "{{\"type\":\"event_msg\",\"payload\":{{\"type\":\"task_complete\"}}}}"
1241        )
1242        .unwrap();
1243        handle.flush().unwrap();
1244    }
1245
1246    fn wait_for_log_contains(log_path: &Path, needle: &str) -> String {
1247        (0..300)
1248            .find_map(|_| {
1249                let content = match std::fs::read_to_string(log_path) {
1250                    Ok(content) => content,
1251                    Err(_) => {
1252                        std::thread::sleep(Duration::from_millis(100));
1253                        return None;
1254                    }
1255                };
1256                if content.contains(needle) {
1257                    Some(content)
1258                } else {
1259                    std::thread::sleep(Duration::from_millis(100));
1260                    None
1261                }
1262            })
1263            .unwrap_or_else(|| panic!("log {} never contained `{needle}`", log_path.display()))
1264    }
1265
1266    fn starvation_test_daemon(tmp: &tempfile::TempDir, threshold: Option<usize>) -> TeamDaemon {
1267        let mut daemon = TestDaemonBuilder::new(tmp.path())
1268            .members(vec![
1269                architect_member("architect"),
1270                engineer_member("eng-1", Some("architect"), false),
1271                engineer_member("eng-2", Some("architect"), false),
1272            ])
1273            .workflow_policy(WorkflowPolicy {
1274                pipeline_starvation_threshold: threshold,
1275                ..WorkflowPolicy::default()
1276            })
1277            .build();
1278        daemon.states = HashMap::from([
1279            ("eng-1".to_string(), MemberState::Idle),
1280            ("eng-2".to_string(), MemberState::Idle),
1281        ]);
1282        daemon
1283    }
1284
1285    #[test]
1286    fn extract_nudge_from_file() {
1287        let tmp = tempfile::NamedTempFile::new().unwrap();
1288        std::fs::write(
1289            tmp.path(),
1290            "# Architect\n\n## Nudge\n\nCheck work.\nUpdate roadmap.\n\n## Other\n\nstuff\n",
1291        )
1292        .unwrap();
1293        let nudge = extract_nudge_section(tmp.path()).unwrap();
1294        assert!(nudge.contains("Check work."));
1295        assert!(nudge.contains("Update roadmap."));
1296        assert!(!nudge.contains("## Other"));
1297    }
1298
1299    #[test]
1300    fn extract_nudge_returns_none_when_absent() {
1301        let tmp = tempfile::NamedTempFile::new().unwrap();
1302        std::fs::write(tmp.path(), "# Engineer\n\n## Workflow\n\n- code\n").unwrap();
1303        assert!(extract_nudge_section(tmp.path()).is_none());
1304    }
1305
1306    #[test]
1307    fn extract_nudge_returns_none_when_malformed() {
1308        let tmp = tempfile::NamedTempFile::new().unwrap();
1309        std::fs::write(
1310            tmp.path(),
1311            "# Engineer\n\n## Nudge\n\n## Workflow\n\n- code\n",
1312        )
1313        .unwrap();
1314        assert!(extract_nudge_section(tmp.path()).is_none());
1315    }
1316
    /// End-to-end check that `TeamDaemon::new` reads each role's prompt
    /// file, extracts its `## Nudge` section, and registers a nudge schedule
    /// per member using the role's `nudge_interval_secs`.
    #[test]
    fn daemon_registers_per_role_nudge_intervals_from_prompt_sections() {
        let tmp = tempfile::tempdir().unwrap();
        // Prompt files with distinct nudge text per role.
        let team_config_dir = tmp.path().join(".batty").join("team_config");
        std::fs::create_dir_all(&team_config_dir).unwrap();
        std::fs::write(
            team_config_dir.join("manager.md"),
            "# Manager\n\n## Nudge\n\nManager follow-up.\n",
        )
        .unwrap();
        std::fs::write(
            team_config_dir.join("engineer.md"),
            "# Engineer\n\n## Nudge\n\nEngineer follow-up.\n",
        )
        .unwrap();

        // Two roles with different nudge intervals (120s vs 300s).
        let daemon = TeamDaemon::new(DaemonConfig {
            project_root: tmp.path().to_path_buf(),
            team_config: TeamConfig {
                name: "test".to_string(),
                agent: None,
                workflow_mode: WorkflowMode::Legacy,
                board: BoardConfig::default(),
                standup: StandupConfig::default(),
                automation: AutomationConfig::default(),
                automation_sender: None,
                external_senders: Vec::new(),
                orchestrator_pane: true,
                orchestrator_position: OrchestratorPosition::Bottom,
                layout: None,
                workflow_policy: WorkflowPolicy::default(),
                cost: Default::default(),
                event_log_max_bytes: crate::team::DEFAULT_EVENT_LOG_MAX_BYTES,
                retro_min_duration_secs: 60,
                roles: vec![
                    RoleDef {
                        name: "manager".to_string(),
                        role_type: RoleType::Manager,
                        agent: Some("claude".to_string()),
                        instances: 1,
                        prompt: None,
                        talks_to: vec![],
                        channel: None,
                        channel_config: None,
                        nudge_interval_secs: Some(120),
                        receives_standup: None,
                        standup_interval_secs: None,
                        owns: Vec::new(),
                        use_worktrees: false,
                    },
                    RoleDef {
                        name: "engineer".to_string(),
                        role_type: RoleType::Engineer,
                        agent: Some("codex".to_string()),
                        instances: 1,
                        prompt: None,
                        talks_to: vec![],
                        channel: None,
                        channel_config: None,
                        nudge_interval_secs: Some(300),
                        receives_standup: None,
                        standup_interval_secs: None,
                        owns: Vec::new(),
                        use_worktrees: false,
                    },
                ],
            },
            session: "test".to_string(),
            members: vec![
                MemberInstance {
                    name: "lead".to_string(),
                    role_name: "manager".to_string(),
                    role_type: RoleType::Manager,
                    agent: Some("claude".to_string()),
                    prompt: None,
                    reports_to: None,
                    use_worktrees: false,
                },
                MemberInstance {
                    name: "eng-1".to_string(),
                    role_name: "engineer".to_string(),
                    role_type: RoleType::Engineer,
                    agent: Some("codex".to_string()),
                    prompt: None,
                    reports_to: Some("lead".to_string()),
                    use_worktrees: true,
                },
            ],
            pane_map: HashMap::new(),
        })
        .unwrap();

        // Each member's schedule carries its role's nudge text and interval.
        assert_eq!(
            daemon
                .nudges
                .get("lead")
                .map(|schedule| schedule.text.as_str()),
            Some("Manager follow-up.")
        );
        assert_eq!(
            daemon.nudges.get("lead").map(|schedule| schedule.interval),
            Some(Duration::from_secs(120))
        );
        assert_eq!(
            daemon
                .nudges
                .get("eng-1")
                .map(|schedule| schedule.text.as_str()),
            Some("Engineer follow-up.")
        );
        assert_eq!(
            daemon.nudges.get("eng-1").map(|schedule| schedule.interval),
            Some(Duration::from_secs(300))
        );
    }
1432
1433    #[test]
1434    fn format_nudge_status_marks_sent_after_fire() {
1435        let schedule = NudgeSchedule {
1436            text: "check in".to_string(),
1437            interval: Duration::from_secs(600),
1438            idle_since: Some(Instant::now() - Duration::from_secs(601)),
1439            fired_this_idle: true,
1440            paused: false,
1441        };
1442
1443        assert_eq!(
1444            status::format_nudge_status(Some(&schedule)),
1445            " #[fg=magenta]nudge sent#[default]"
1446        );
1447    }
1448
    /// Save a fully-populated `PersistedDaemonState` and confirm a reload
    /// reproduces it exactly (every runtime field survives the JSON trip).
    #[test]
    fn daemon_state_round_trip_preserves_runtime_fields() {
        let tmp = tempfile::tempdir().unwrap();
        let state = PersistedDaemonState {
            clean_shutdown: false,
            saved_at: 123,
            states: HashMap::from([("eng-1".to_string(), MemberState::Working)]),
            active_tasks: HashMap::from([("eng-1".to_string(), 42)]),
            retry_counts: HashMap::from([("eng-1".to_string(), 2)]),
            dispatch_queue: vec![DispatchQueueEntry {
                engineer: "eng-1".to_string(),
                task_id: 77,
                task_title: "queued".to_string(),
                queued_at: 999,
                validation_failures: 1,
                last_failure: Some("waiting for stabilization".to_string()),
            }],
            paused_standups: HashSet::from(["manager".to_string()]),
            last_standup_elapsed_secs: HashMap::from([("architect".to_string(), 55)]),
            nudge_state: HashMap::from([(
                "eng-1".to_string(),
                PersistedNudgeState {
                    idle_elapsed_secs: Some(88),
                    fired_this_idle: true,
                    paused: false,
                },
            )]),
            pipeline_starvation_fired: true,
        };

        save_daemon_state(tmp.path(), &state).unwrap();

        let loaded = load_daemon_state(tmp.path()).unwrap();
        assert_eq!(loaded, state);
    }
1484
1485    #[test]
1486    fn watcher_mut_returns_context_for_unknown_member() {
1487        let tmp = tempfile::tempdir().unwrap();
1488        std::fs::create_dir_all(tmp.path().join(".batty").join("team_config")).unwrap();
1489        let mut daemon = make_test_daemon(tmp.path(), vec![manager_member("manager", None)]);
1490
1491        let error = match daemon.watcher_mut("missing") {
1492            Ok(_) => panic!("expected missing watcher to return an error"),
1493            Err(error) => error,
1494        };
1495
1496        assert!(
1497            error
1498                .to_string()
1499                .contains("watcher registry missing member 'missing'")
1500        );
1501    }
1502
1503    #[test]
1504    fn test_auto_dispatch_filters_idle_engineers_only() {
1505        let tmp = tempfile::tempdir().unwrap();
1506        let roles = vec![
1507            RoleDef {
1508                name: "architect".to_string(),
1509                role_type: RoleType::Architect,
1510                agent: Some("claude".to_string()),
1511                instances: 1,
1512                prompt: None,
1513                talks_to: vec![],
1514                channel: None,
1515                channel_config: None,
1516                nudge_interval_secs: None,
1517                receives_standup: None,
1518                standup_interval_secs: None,
1519                owns: Vec::new(),
1520                use_worktrees: false,
1521            },
1522            RoleDef {
1523                name: "manager".to_string(),
1524                role_type: RoleType::Manager,
1525                agent: Some("claude".to_string()),
1526                instances: 1,
1527                prompt: None,
1528                talks_to: vec![],
1529                channel: None,
1530                channel_config: None,
1531                nudge_interval_secs: None,
1532                receives_standup: None,
1533                standup_interval_secs: None,
1534                owns: Vec::new(),
1535                use_worktrees: false,
1536            },
1537            RoleDef {
1538                name: "eng-1".to_string(),
1539                role_type: RoleType::Engineer,
1540                agent: Some("claude".to_string()),
1541                instances: 1,
1542                prompt: None,
1543                talks_to: vec![],
1544                channel: None,
1545                channel_config: None,
1546                nudge_interval_secs: None,
1547                receives_standup: None,
1548                standup_interval_secs: None,
1549                owns: Vec::new(),
1550                use_worktrees: false,
1551            },
1552            RoleDef {
1553                name: "eng-2".to_string(),
1554                role_type: RoleType::Engineer,
1555                agent: Some("claude".to_string()),
1556                instances: 1,
1557                prompt: None,
1558                talks_to: vec![],
1559                channel: None,
1560                channel_config: None,
1561                nudge_interval_secs: None,
1562                receives_standup: None,
1563                standup_interval_secs: None,
1564                owns: Vec::new(),
1565                use_worktrees: false,
1566            },
1567        ];
1568        let members = vec![
1569            MemberInstance {
1570                name: "architect".to_string(),
1571                role_name: "architect".to_string(),
1572                role_type: RoleType::Architect,
1573                agent: Some("claude".to_string()),
1574                prompt: None,
1575                reports_to: None,
1576                use_worktrees: false,
1577            },
1578            MemberInstance {
1579                name: "manager".to_string(),
1580                role_name: "manager".to_string(),
1581                role_type: RoleType::Manager,
1582                agent: Some("claude".to_string()),
1583                prompt: None,
1584                reports_to: Some("architect".to_string()),
1585                use_worktrees: false,
1586            },
1587            MemberInstance {
1588                name: "eng-1".to_string(),
1589                role_name: "eng-1".to_string(),
1590                role_type: RoleType::Engineer,
1591                agent: Some("claude".to_string()),
1592                prompt: None,
1593                reports_to: Some("manager".to_string()),
1594                use_worktrees: false,
1595            },
1596            MemberInstance {
1597                name: "eng-2".to_string(),
1598                role_name: "eng-2".to_string(),
1599                role_type: RoleType::Engineer,
1600                agent: Some("claude".to_string()),
1601                prompt: None,
1602                reports_to: Some("manager".to_string()),
1603                use_worktrees: false,
1604            },
1605        ];
1606
1607        let mut daemon = TeamDaemon::new(DaemonConfig {
1608            project_root: tmp.path().to_path_buf(),
1609            team_config: TeamConfig {
1610                name: "test".to_string(),
1611                agent: None,
1612                workflow_mode: WorkflowMode::Legacy,
1613                workflow_policy: WorkflowPolicy::default(),
1614                board: BoardConfig::default(),
1615                standup: StandupConfig::default(),
1616                automation: AutomationConfig::default(),
1617                automation_sender: None,
1618                external_senders: Vec::new(),
1619                orchestrator_pane: true,
1620                orchestrator_position: OrchestratorPosition::Bottom,
1621                layout: None,
1622                cost: Default::default(),
1623                event_log_max_bytes: crate::team::DEFAULT_EVENT_LOG_MAX_BYTES,
1624                retro_min_duration_secs: 60,
1625                roles,
1626            },
1627            session: "test".to_string(),
1628            members,
1629            pane_map: HashMap::new(),
1630        })
1631        .unwrap();
1632
1633        daemon
1634            .states
1635            .insert("architect".to_string(), MemberState::Idle);
1636        daemon
1637            .states
1638            .insert("manager".to_string(), MemberState::Idle);
1639        daemon.states.insert("eng-1".to_string(), MemberState::Idle);
1640        daemon
1641            .states
1642            .insert("eng-2".to_string(), MemberState::Working);
1643
1644        let board_dir = tmp.path().join(".batty").join("team_config").join("board");
1645        let tasks_dir = board_dir.join("tasks");
1646        std::fs::create_dir_all(&tasks_dir).unwrap();
1647        std::fs::write(
1648            tasks_dir.join("001-auto-task.md"),
1649            "---\nid: 1\ntitle: auto-task\nstatus: todo\npriority: high\nclass: standard\n---\n\nTask description.\n",
1650        )
1651        .unwrap();
1652
1653        assert_eq!(daemon.idle_engineer_names(), vec!["eng-1".to_string()]);
1654        let task = next_unclaimed_task(&board_dir).unwrap().unwrap();
1655        assert_eq!(task.id, 1);
1656    }
1657
1658    #[test]
1659    fn test_maybe_auto_dispatch_respects_rate_limit() {
1660        let tmp = tempfile::tempdir().unwrap();
1661        let mut daemon = TestDaemonBuilder::new(tmp.path()).build();
1662
1663        let before = daemon.last_auto_dispatch;
1664        daemon.maybe_auto_dispatch().unwrap();
1665        assert_eq!(daemon.last_auto_dispatch, before);
1666    }
1667
1668    #[test]
1669    fn test_maybe_auto_dispatch_skips_when_disabled() {
1670        let tmp = tempfile::tempdir().unwrap();
1671        let mut daemon = TestDaemonBuilder::new(tmp.path())
1672            .board(BoardConfig {
1673                auto_dispatch: false,
1674                ..BoardConfig::default()
1675            })
1676            .build();
1677        daemon.last_auto_dispatch = Instant::now() - Duration::from_secs(30);
1678
1679        let before = daemon.last_auto_dispatch;
1680        daemon.maybe_auto_dispatch().unwrap();
1681        assert_eq!(daemon.last_auto_dispatch, before);
1682    }
1683
    #[test]
    #[serial]
    #[cfg_attr(not(feature = "integration"), ignore)]
    fn daemon_lifecycle_happy_path_exercises_decomposed_modules() {
        // End-to-end smoke test over a real tmux session: spawn a worktree-enabled
        // engineer, assign a task, simulate agent completion via a fake codex
        // session file, and verify task/worktree/event/launch-state bookkeeping.
        let session = format!("batty-test-daemon-lifecycle-{}", std::process::id());
        // Best-effort cleanup of any session left over from a previous run.
        let _ = crate::tmux::kill_session(&session);

        let tmp = tempfile::tempdir().unwrap();
        let repo = init_git_repo(&tmp, "batty-daemon-lifecycle");
        write_open_task_file(&repo, 42, "lifecycle-task", "todo");

        let member_name = "eng-lifecycle";
        let (fake_bin, fake_log) = setup_fake_codex(&repo, tmp.path(), member_name);

        // A dedicated "keeper" window (`sleep 30`) keeps the tmux session alive
        // even if the member pane exits early.
        crate::tmux::create_session(&session, "bash", &[], repo.to_string_lossy().as_ref())
            .unwrap();
        crate::tmux::create_window(
            &session,
            "keeper",
            "sleep",
            &["30".to_string()],
            repo.to_string_lossy().as_ref(),
        )
        .unwrap();
        let pane_id = crate::tmux::pane_id(&session).unwrap();

        // One engineer, mapped onto the pane created above, with worktrees on.
        let engineer = MemberInstance {
            name: member_name.to_string(),
            role_name: "engineer".to_string(),
            role_type: RoleType::Engineer,
            agent: Some("codex".to_string()),
            prompt: None,
            reports_to: None,
            use_worktrees: true,
        };
        let mut daemon = TeamDaemon::new(DaemonConfig {
            project_root: repo.clone(),
            team_config: TeamConfig {
                name: "test".to_string(),
                agent: None,
                workflow_mode: WorkflowMode::Legacy,
                workflow_policy: WorkflowPolicy::default(),
                board: BoardConfig::default(),
                standup: StandupConfig::default(),
                automation: AutomationConfig::default(),
                automation_sender: None,
                external_senders: Vec::new(),
                orchestrator_pane: true,
                orchestrator_position: OrchestratorPosition::Bottom,
                layout: None,
                cost: Default::default(),
                event_log_max_bytes: crate::team::DEFAULT_EVENT_LOG_MAX_BYTES,
                retro_min_duration_secs: 60,
                roles: Vec::new(),
            },
            session: session.clone(),
            members: vec![engineer],
            pane_map: HashMap::from([(member_name.to_string(), pane_id)]),
        })
        .unwrap();
        daemon.spawn_all_agents(false).unwrap();
        // The fake codex writes "PWD:" to its log on startup; wait until the
        // agent process is actually live before assigning work.
        let spawn_log = wait_for_log_contains(&fake_log, "PWD:");
        assert!(spawn_log.contains("PWD:"));
        std::thread::sleep(Duration::from_millis(1200));

        let assignment = "Task #42: lifecycle-task\n\nTask description.";
        daemon
            .assign_task_with_task_id(member_name, assignment, Some(42))
            .unwrap();
        daemon.active_tasks.insert(member_name.to_string(), 42);
        assert_eq!(daemon.active_task_id(member_name), Some(42));
        assert_eq!(daemon.states.get(member_name), Some(&MemberState::Working));

        // Assignment must have created the engineer's worktree checked out on a
        // "<member>/<task-id>" branch.
        let worktree_dir = repo.join(".batty").join("worktrees").join(member_name);
        assert!(worktree_dir.exists());
        assert_eq!(
            crate::team::test_support::git_stdout(&worktree_dir, &["branch", "--show-current"]),
            format!("{member_name}/42")
        );

        let codex_cwd = worktree_dir
            .join(".batty")
            .join("codex-context")
            .join(member_name);
        let session_file = write_codex_session_meta(&codex_cwd);

        // First poll: watchers observe the running agent; the sync step records
        // the codex session id into launch state.
        daemon.run_loop_step("poll_watchers", |daemon| daemon.poll_watchers());
        daemon.run_loop_step("sync_launch_state_session_ids", |daemon| {
            daemon.sync_launch_state_session_ids()
        });

        // Simulate the agent finishing: commit work inside the worktree and
        // append a task-complete marker to the codex session file.
        std::fs::write(worktree_dir.join("note.txt"), "done\n").unwrap();
        crate::team::test_support::git_ok(&worktree_dir, &["add", "note.txt"]);
        crate::team::test_support::git_ok(&worktree_dir, &["commit", "-m", "finish task"]);
        append_codex_task_complete(&session_file);

        // Second poll observes completion and clears the active task.
        daemon.run_loop_step("poll_watchers", |daemon| daemon.poll_watchers());

        assert_eq!(daemon.active_task_id(member_name), None);
        assert_eq!(daemon.states.get(member_name), Some(&MemberState::Idle));
        // The committed file must have made it back into the main repo tree.
        assert_eq!(
            std::fs::read_to_string(repo.join("note.txt")).unwrap(),
            "done\n"
        );

        // Both lifecycle events should be recorded for this member.
        let events = crate::team::events::read_events(
            &repo.join(".batty").join("team_config").join("events.jsonl"),
        )
        .unwrap();
        assert!(events.iter().any(|event| {
            event.event == "task_assigned"
                && event.role.as_deref() == Some(member_name)
                && event
                    .task
                    .as_deref()
                    .is_some_and(|task| task.contains("Task #42: lifecycle-task"))
        }));
        assert!(events.iter().any(|event| {
            event.event == "task_completed" && event.role.as_deref() == Some(member_name)
        }));

        // Launch state tracks the codex session id derived from the session
        // file's stem, under the "codex-cli" agent identity.
        let launch_state = load_launch_state(&repo);
        let identity = launch_state.get(member_name).expect("missing launch state");
        assert_eq!(identity.agent, "codex-cli");
        assert_eq!(
            identity.session_id.as_deref(),
            session_file.file_stem().and_then(|stem| stem.to_str())
        );

        crate::tmux::kill_session(&session).unwrap();
        let _ = std::fs::remove_file(&session_file);
        let _ = std::fs::remove_dir_all(&fake_bin);
    }
    #[test]
    #[serial]
    #[cfg_attr(not(feature = "integration"), ignore)]
    fn maybe_fire_nudges_marks_member_working_after_live_delivery() {
        // Verifies that a nudge delivered live into a real tmux pane flips the
        // member to Working and pauses/resets the nudge schedule.
        let session = "batty-test-nudge-live-delivery";
        let mut delivered_live = false;

        // A freshly created tmux pane can occasionally reject the first live
        // injection under heavy suite load. Retry the full setup a few times so
        // this test only fails on a real regression in the live-delivery path.
        for _attempt in 0..5 {
            let _ = crate::tmux::kill_session(session);

            // `cat` keeps the pane alive and accepts injected input.
            crate::tmux::create_session(session, "cat", &[], "/tmp").unwrap();
            let pane_id = crate::tmux::pane_id(session).unwrap();
            std::thread::sleep(Duration::from_millis(300));

            let tmp = tempfile::tempdir().unwrap();
            let mut watchers = HashMap::new();
            let mut scientist_watcher = SessionWatcher::new(&pane_id, "scientist", 300, None);
            // Mark the watcher ready so delivery is not blocked on startup.
            scientist_watcher.confirm_ready();
            watchers.insert("scientist".to_string(), scientist_watcher);
            // The schedule is armed: idle for 5s against a 1s interval, not yet
            // fired this idle period, and not paused.
            let mut daemon = TestDaemonBuilder::new(tmp.path())
                .session(session)
                .members(vec![architect_member("scientist")])
                .pane_map(HashMap::from([("scientist".to_string(), pane_id.clone())]))
                .watchers(watchers)
                .states(HashMap::from([(
                    "scientist".to_string(),
                    MemberState::Idle,
                )]))
                .nudges(HashMap::from([(
                    "scientist".to_string(),
                    NudgeSchedule {
                        text: "Please make progress.".to_string(),
                        interval: Duration::from_secs(1),
                        idle_since: Some(Instant::now() - Duration::from_secs(5)),
                        fired_this_idle: false,
                        paused: false,
                    },
                )]))
                .build();

            backdate_idle_grace(&mut daemon, "scientist");
            daemon.maybe_fire_nudges().unwrap();

            if daemon.states.get("scientist") == Some(&MemberState::Working) {
                // Live delivery succeeded: the schedule pauses and its idle
                // tracking resets.
                let schedule = daemon.nudges.get("scientist").unwrap();
                assert!(schedule.paused);
                assert!(schedule.idle_since.is_none());
                assert!(!schedule.fired_this_idle);
                delivered_live = true;
                crate::tmux::kill_session(session).unwrap();
                break;
            }

            // Delivery did not land this attempt; clean up and retry.
            crate::tmux::kill_session(session).unwrap();
            std::thread::sleep(Duration::from_millis(100));
        }

        assert!(
            delivered_live,
            "expected at least one successful live nudge delivery"
        );
    }
1882
    #[test]
    #[serial]
    #[cfg_attr(not(feature = "integration"), ignore)]
    fn maybe_intervene_triage_backlog_marks_member_working_after_live_delivery() {
        // Triage-backlog intervention against a real tmux pane: either the
        // guidance lands live in the pane (member becomes Working) or it falls
        // back to the member's inbox. Both outcomes are asserted below.
        let session = format!("batty-test-triage-live-delivery-{}", std::process::id());
        let _ = crate::tmux::kill_session(&session);

        crate::tmux::create_session(&session, "cat", &[], "/tmp").unwrap();
        let pane_id = crate::tmux::pane_id(&session).unwrap();
        std::thread::sleep(Duration::from_millis(300));

        let tmp = tempfile::tempdir().unwrap();
        let mut watchers = HashMap::new();
        let mut lead_watcher = SessionWatcher::new(&pane_id, "lead", 300, None);
        lead_watcher.confirm_ready();
        watchers.insert("lead".to_string(), lead_watcher);
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .session(&session)
            .members(vec![
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
            ])
            .pane_map(HashMap::from([("lead".to_string(), pane_id.clone())]))
            .watchers(watchers)
            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
            .build();

        // Seed a delivered-but-unhandled completion report from eng-1 so the
        // manager has a triage backlog to act on.
        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "lead").unwrap();
        inbox::init_inbox(&root, "eng-1").unwrap();
        let mut result = inbox::InboxMessage::new_send("eng-1", "lead", "Task complete.");
        result.timestamp = super::now_unix();
        let id = inbox::deliver_to_inbox(&root, &result).unwrap();
        inbox::mark_delivered(&root, "lead", &id).unwrap();

        // A Working -> Idle transition plus an expired idle grace arms the check.
        daemon.update_automation_timers_for_state("lead", MemberState::Working);
        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_triage_backlog().unwrap();

        assert_eq!(daemon.triage_interventions.get("lead"), Some(&1));
        if daemon.states.get("lead") == Some(&MemberState::Working) {
            // Live path: poll the pane (up to 50 x 200ms) until the injected
            // guidance text shows up, then assert its key command hints.
            let pane = (0..50)
                .find_map(|_| {
                    let pane = tmux::capture_pane(&pane_id).unwrap_or_default();
                    if pane.contains("batty inbox lead") {
                        Some(pane)
                    } else {
                        std::thread::sleep(Duration::from_millis(200));
                        None
                    }
                })
                .unwrap_or_else(|| tmux::capture_pane(&pane_id).unwrap_or_default());
            assert!(pane.contains("batty inbox lead"));
            assert!(pane.contains("batty read lead <ref>"));
            assert!(pane.contains("batty send eng-1"));
            assert!(pane.contains("batty assign eng-1"));
            assert!(pane.contains("batty send architect"));
            assert!(pane.contains("next time you become idle"));
        } else {
            // Fallback path: the same guidance is queued as an inbox message.
            let pending = inbox::pending_messages(&root, "lead").unwrap();
            assert_eq!(pending.len(), 1);
            assert!(pending[0].body.contains("batty inbox lead"));
        }

        crate::tmux::kill_session(&session).unwrap();
    }
1950
1951    #[test]
1952    fn maybe_intervene_triage_backlog_queues_when_live_delivery_falls_back_to_inbox() {
1953        let tmp = tempfile::tempdir().unwrap();
1954        let mut daemon = TestDaemonBuilder::new(tmp.path())
1955            .members(vec![
1956                manager_member("lead", Some("architect")),
1957                engineer_member("eng-1", Some("lead"), false),
1958            ])
1959            .pane_map(HashMap::from([(
1960                "lead".to_string(),
1961                "%9999999".to_string(),
1962            )]))
1963            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
1964            .build();
1965
1966        let root = inbox::inboxes_root(tmp.path());
1967        inbox::init_inbox(&root, "lead").unwrap();
1968        inbox::init_inbox(&root, "eng-1").unwrap();
1969        let mut result = inbox::InboxMessage::new_send("eng-1", "lead", "Task complete.");
1970        result.timestamp = super::now_unix();
1971        let id = inbox::deliver_to_inbox(&root, &result).unwrap();
1972        inbox::mark_delivered(&root, "lead", &id).unwrap();
1973
1974        daemon.update_automation_timers_for_state("lead", MemberState::Working);
1975        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
1976        backdate_idle_grace(&mut daemon, "lead");
1977        daemon.maybe_intervene_triage_backlog().unwrap();
1978
1979        assert_eq!(daemon.states.get("lead"), Some(&MemberState::Idle));
1980        assert_eq!(daemon.triage_interventions.get("lead"), Some(&1));
1981        let pending = inbox::pending_messages(&root, "lead").unwrap();
1982        assert_eq!(pending.len(), 1);
1983        assert_eq!(pending[0].from, "architect");
1984        assert!(pending[0].body.contains("Triage backlog detected"));
1985        assert!(pending[0].body.contains("batty inbox lead"));
1986        assert!(pending[0].body.contains("batty read lead <ref>"));
1987        assert!(pending[0].body.contains("batty send eng-1"));
1988        assert!(pending[0].body.contains("batty assign eng-1"));
1989        assert!(pending[0].body.contains("batty send architect"));
1990        assert!(pending[0].body.contains("next time you become idle"));
1991    }
1992
1993    #[test]
1994    fn maybe_intervene_triage_backlog_does_not_fire_on_startup_idle() {
1995        let tmp = tempfile::tempdir().unwrap();
1996        let mut daemon = TestDaemonBuilder::new(tmp.path())
1997            .members(vec![
1998                manager_member("lead", Some("architect")),
1999                engineer_member("eng-1", Some("lead"), false),
2000            ])
2001            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
2002            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
2003            .build();
2004
2005        let root = inbox::inboxes_root(tmp.path());
2006        inbox::init_inbox(&root, "lead").unwrap();
2007        inbox::init_inbox(&root, "eng-1").unwrap();
2008        let mut result = inbox::InboxMessage::new_send("eng-1", "lead", "Task complete.");
2009        result.timestamp = super::now_unix();
2010        let id = inbox::deliver_to_inbox(&root, &result).unwrap();
2011        inbox::mark_delivered(&root, "lead", &id).unwrap();
2012
2013        daemon.maybe_intervene_triage_backlog().unwrap();
2014
2015        assert!(!daemon.triage_interventions.contains_key("lead"));
2016        assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());
2017        assert_eq!(daemon.states.get("lead"), Some(&MemberState::Idle));
2018    }
2019
2020    #[test]
2021    fn maybe_intervene_owned_tasks_queues_when_idle_member_owns_unfinished_task() {
2022        let tmp = tempfile::tempdir().unwrap();
2023        let mut daemon = TestDaemonBuilder::new(tmp.path())
2024            .members(vec![
2025                manager_member("lead", Some("architect")),
2026                engineer_member("eng-1", Some("lead"), false),
2027            ])
2028            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
2029            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
2030            .build();
2031
2032        let root = inbox::inboxes_root(tmp.path());
2033        inbox::init_inbox(&root, "lead").unwrap();
2034        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "lead");
2035
2036        daemon.update_automation_timers_for_state("lead", MemberState::Working);
2037        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
2038        backdate_idle_grace(&mut daemon, "lead");
2039        daemon.maybe_intervene_owned_tasks().unwrap();
2040
2041        assert_eq!(daemon.states.get("lead"), Some(&MemberState::Idle));
2042        assert_eq!(
2043            daemon
2044                .owned_task_interventions
2045                .get("lead")
2046                .map(|state| state.idle_epoch),
2047            Some(1)
2048        );
2049        let pending = inbox::pending_messages(&root, "lead").unwrap();
2050        assert_eq!(pending.len(), 1);
2051        assert_eq!(pending[0].from, "architect");
2052        assert!(pending[0].body.contains("Task #191"));
2053        assert!(
2054            pending[0]
2055                .body
2056                .contains("Owned active task backlog detected")
2057        );
2058        assert!(pending[0].body.contains("kanban-md list --dir"));
2059        assert!(pending[0].body.contains("kanban-md show --dir"));
2060        assert!(pending[0].body.contains("191"));
2061        assert!(pending[0].body.contains("sed -n '1,220p'"));
2062        assert!(pending[0].body.contains("batty assign eng-1"));
2063        assert!(pending[0].body.contains("batty send architect"));
2064        assert!(pending[0].body.contains("kanban-md move --dir"));
2065        assert!(pending[0].body.contains("next time you become idle"));
2066    }
2067
2068    #[test]
2069    fn maybe_intervene_owned_tasks_engineer_message_captures_initial_state() {
2070        let tmp = tempfile::tempdir().unwrap();
2071        let mut daemon = TestDaemonBuilder::new(tmp.path())
2072            .members(vec![
2073                manager_member("lead", Some("architect")),
2074                engineer_member("eng-1", Some("lead"), false),
2075            ])
2076            .pane_map(HashMap::from([("eng-1".to_string(), "%999".to_string())]))
2077            .states(HashMap::from([("eng-1".to_string(), MemberState::Idle)]))
2078            .build();
2079
2080        let root = inbox::inboxes_root(tmp.path());
2081        inbox::init_inbox(&root, "eng-1").unwrap();
2082        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "eng-1");
2083
2084        daemon.update_automation_timers_for_state("eng-1", MemberState::Working);
2085        daemon.update_automation_timers_for_state("eng-1", MemberState::Idle);
2086        backdate_idle_grace(&mut daemon, "eng-1");
2087        daemon.maybe_intervene_owned_tasks().unwrap();
2088
2089        let pending = inbox::pending_messages(&root, "eng-1").unwrap();
2090        assert_eq!(pending.len(), 1);
2091        assert_eq!(pending[0].from, "lead");
2092        assert!(
2093            pending[0]
2094                .body
2095                .contains("Owned active task backlog detected")
2096        );
2097        assert!(pending[0].body.contains("Task #191"));
2098        assert!(pending[0].body.contains("batty send lead"));
2099
2100        let state = daemon.owned_task_interventions.get("eng-1").unwrap();
2101        assert_eq!(state.idle_epoch, 1);
2102        assert_eq!(state.signature, "191:in-progress");
2103        assert!(!state.escalation_sent);
2104    }
2105
2106    #[test]
2107    fn maybe_intervene_owned_tasks_fires_for_persistent_startup_idle_state() {
2108        let tmp = tempfile::tempdir().unwrap();
2109        let mut daemon = TestDaemonBuilder::new(tmp.path())
2110            .members(vec![
2111                manager_member("lead", Some("architect")),
2112                engineer_member("eng-1", Some("lead"), false),
2113            ])
2114            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
2115            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
2116            .build();
2117
2118        let root = inbox::inboxes_root(tmp.path());
2119        inbox::init_inbox(&root, "lead").unwrap();
2120        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "lead");
2121
2122        backdate_idle_grace(&mut daemon, "lead");
2123        daemon.maybe_intervene_owned_tasks().unwrap();
2124
2125        let pending = inbox::pending_messages(&root, "lead").unwrap();
2126        assert_eq!(pending.len(), 1);
2127        assert_eq!(pending[0].from, "architect");
2128        assert!(pending[0].body.contains("Task #191"));
2129        assert_eq!(
2130            daemon
2131                .owned_task_interventions
2132                .get("lead")
2133                .map(|state| state.idle_epoch),
2134            Some(0)
2135        );
2136        assert_eq!(daemon.states.get("lead"), Some(&MemberState::Idle));
2137    }
2138
2139    #[test]
2140    fn maybe_intervene_owned_tasks_waits_for_idle_grace() {
2141        let tmp = tempfile::tempdir().unwrap();
2142        let mut daemon = TestDaemonBuilder::new(tmp.path())
2143            .members(vec![manager_member("lead", Some("architect"))])
2144            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
2145            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
2146            .build();
2147
2148        let root = inbox::inboxes_root(tmp.path());
2149        inbox::init_inbox(&root, "lead").unwrap();
2150        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "lead");
2151
2152        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
2153        daemon.maybe_intervene_owned_tasks().unwrap();
2154        assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());
2155
2156        backdate_idle_grace(&mut daemon, "lead");
2157        daemon.maybe_intervene_owned_tasks().unwrap();
2158        assert_eq!(inbox::pending_messages(&root, "lead").unwrap().len(), 1);
2159    }
2160
2161    #[test]
2162    fn maybe_intervene_owned_tasks_skips_when_pending_inbox_exists() {
2163        let tmp = tempfile::tempdir().unwrap();
2164        let mut daemon = TestDaemonBuilder::new(tmp.path())
2165            .members(vec![manager_member("lead", Some("architect"))])
2166            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
2167            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
2168            .build();
2169
2170        let root = inbox::inboxes_root(tmp.path());
2171        inbox::init_inbox(&root, "lead").unwrap();
2172        let message = inbox::InboxMessage::new_send("architect", "lead", "Check this first.");
2173        inbox::deliver_to_inbox(&root, &message).unwrap();
2174        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "lead");
2175
2176        backdate_idle_grace(&mut daemon, "lead");
2177        daemon.maybe_intervene_owned_tasks().unwrap();
2178
2179        let pending = inbox::pending_messages(&root, "lead").unwrap();
2180        assert_eq!(pending.len(), 1);
2181        assert_eq!(pending[0].from, "architect");
2182        assert!(
2183            !daemon.owned_task_interventions.contains_key("lead"),
2184            "pending inbox should block new interventions"
2185        );
2186    }
2187
2188    #[test]
2189    fn maybe_intervene_owned_tasks_ignores_review_only_claims() {
2190        let tmp = tempfile::tempdir().unwrap();
2191        let mut daemon = TestDaemonBuilder::new(tmp.path())
2192            .members(vec![manager_member("lead", Some("architect"))])
2193            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
2194            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
2195            .build();
2196
2197        let root = inbox::inboxes_root(tmp.path());
2198        inbox::init_inbox(&root, "lead").unwrap();
2199        write_owned_task_file(tmp.path(), 191, "review-task", "review", "lead");
2200
2201        daemon.update_automation_timers_for_state("lead", MemberState::Working);
2202        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
2203        daemon.maybe_intervene_owned_tasks().unwrap();
2204
2205        assert!(!daemon.owned_task_interventions.contains_key("lead"));
2206        assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());
2207    }
2208
2209    #[test]
2210    fn maybe_intervene_owned_tasks_dedupes_same_active_signature_across_idle_epochs() {
2211        let tmp = tempfile::tempdir().unwrap();
2212        let mut daemon = TestDaemonBuilder::new(tmp.path())
2213            .members(vec![manager_member("lead", Some("architect"))])
2214            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
2215            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
2216            .build();
2217
2218        let root = inbox::inboxes_root(tmp.path());
2219        inbox::init_inbox(&root, "lead").unwrap();
2220        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "lead");
2221
2222        daemon.update_automation_timers_for_state("lead", MemberState::Working);
2223        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
2224        backdate_idle_grace(&mut daemon, "lead");
2225        daemon.maybe_intervene_owned_tasks().unwrap();
2226
2227        daemon
2228            .states
2229            .insert("lead".to_string(), MemberState::Working);
2230        daemon.update_automation_timers_for_state("lead", MemberState::Working);
2231        daemon.states.insert("lead".to_string(), MemberState::Idle);
2232        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
2233        backdate_idle_grace(&mut daemon, "lead");
2234        daemon.maybe_intervene_owned_tasks().unwrap();
2235
2236        let pending = inbox::pending_messages(&root, "lead").unwrap();
2237        assert_eq!(pending.len(), 1);
2238        assert_eq!(
2239            daemon
2240                .owned_task_interventions
2241                .get("lead")
2242                .map(|state| state.idle_epoch),
2243            Some(2)
2244        );
2245    }
2246
2247    fn backdate_intervention_cooldown(daemon: &mut TeamDaemon, key: &str) {
2248        let cooldown = Duration::from_secs(
2249            daemon
2250                .config
2251                .team_config
2252                .automation
2253                .intervention_cooldown_secs,
2254        ) + Duration::from_secs(1);
2255        daemon
2256            .intervention_cooldowns
2257            .insert(key.to_string(), Instant::now() - cooldown);
2258    }
2259
    #[test]
    fn owned_task_intervention_updates_signature_when_board_state_changes() {
        // An owned-task intervention records a "signature" of the member's
        // active tasks. When the board changes (a second owned task appears),
        // the next firing should refresh the signature instead of treating
        // the new state as the same stuck one.
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
            ])
            .pane_map(HashMap::from([("eng-1".to_string(), "%999".to_string())]))
            .states(HashMap::from([("eng-1".to_string(), MemberState::Idle)]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "eng-1").unwrap();
        write_owned_task_file(tmp.path(), 191, "first-task", "in-progress", "eng-1");

        // Arm the check: Working -> Idle transition plus an expired idle grace.
        daemon.update_automation_timers_for_state("eng-1", MemberState::Working);
        daemon.update_automation_timers_for_state("eng-1", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "eng-1");
        daemon.maybe_intervene_owned_tasks().unwrap();

        let initial = daemon.owned_task_interventions.get("eng-1").unwrap();
        assert_eq!(initial.signature, "191:in-progress");

        // Drain the first intervention so only the refire is observed below.
        for message in inbox::pending_messages(&root, "eng-1").unwrap() {
            inbox::mark_delivered(&root, "eng-1", &message.id).unwrap();
        }

        // Add a second owned task and expire the cooldown; the refire should
        // mention both tasks and store the combined signature.
        write_owned_task_file(tmp.path(), 192, "second-task", "in-progress", "eng-1");
        backdate_intervention_cooldown(&mut daemon, "eng-1");
        daemon.maybe_intervene_owned_tasks().unwrap();

        let pending = inbox::pending_messages(&root, "eng-1").unwrap();
        assert_eq!(pending.len(), 1);
        assert!(pending[0].body.contains("Task #191"));
        assert!(pending[0].body.contains("#192 (in-progress) second-task"));

        let updated = daemon.owned_task_interventions.get("eng-1").unwrap();
        assert_eq!(updated.signature, "191:in-progress|192:in-progress");
        // A signature refresh must not count as an escalation.
        assert!(!updated.escalation_sent);
    }
2301
2302    #[test]
2303    fn owned_task_intervention_respects_cooldown() {
2304        let tmp = tempfile::tempdir().unwrap();
2305        let mut daemon = TestDaemonBuilder::new(tmp.path())
2306            .members(vec![manager_member("lead", Some("architect"))])
2307            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
2308            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
2309            .build();
2310
2311        let root = inbox::inboxes_root(tmp.path());
2312        inbox::init_inbox(&root, "lead").unwrap();
2313        write_owned_task_file(tmp.path(), 191, "first-task", "in-progress", "lead");
2314
2315        // First fire: should deliver intervention.
2316        backdate_idle_grace(&mut daemon, "lead");
2317        daemon.maybe_intervene_owned_tasks().unwrap();
2318        let pending = inbox::pending_messages(&root, "lead").unwrap();
2319        assert_eq!(pending.len(), 1, "first intervention should fire");
2320
2321        // Acknowledge the message so inbox is clear for next check.
2322        for msg in pending {
2323            inbox::mark_delivered(&root, "lead", &msg.id).unwrap();
2324        }
2325
2326        // Change signature (add another task) — should still be blocked by cooldown.
2327        write_owned_task_file(tmp.path(), 192, "second-task", "in-progress", "lead");
2328        daemon.maybe_intervene_owned_tasks().unwrap();
2329        let pending = inbox::pending_messages(&root, "lead").unwrap();
2330        assert_eq!(pending.len(), 0, "cooldown should prevent refire");
2331
2332        // Expire the cooldown — should fire again.
2333        backdate_intervention_cooldown(&mut daemon, "lead");
2334        daemon.maybe_intervene_owned_tasks().unwrap();
2335        let pending = inbox::pending_messages(&root, "lead").unwrap();
2336        assert_eq!(pending.len(), 1, "should fire after cooldown expires");
2337    }
2338
    #[test]
    fn triage_intervention_respects_cooldown() {
        // Even when a new idle epoch would normally allow a triage
        // intervention to refire, the cooldown keyed "triage::<member>" must
        // gate it until expired.
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
            ])
            .pane_map(HashMap::from([
                ("lead".to_string(), "%999".to_string()),
                ("eng-1".to_string(), "%998".to_string()),
            ]))
            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
            .build();
        // Pretend "lead" already completed one Working -> Idle cycle.
        daemon.triage_idle_epochs = HashMap::from([("lead".to_string(), 1)]);

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "lead").unwrap();
        inbox::init_inbox(&root, "eng-1").unwrap();

        // Deliver a message from eng-1 to lead's inbox so triage finds something.
        let msg = inbox::InboxMessage::new_send("eng-1", "lead", "done with task 42");
        let msg_id = inbox::deliver_to_inbox(&root, &msg).unwrap();
        inbox::mark_delivered(&root, "lead", &msg_id).unwrap();

        // First fire: should work.
        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_triage_backlog().unwrap();
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 1, "first triage intervention should fire");

        // Acknowledge so inbox is clear.
        for p in pending {
            inbox::mark_delivered(&root, "lead", &p.id).unwrap();
        }

        // Advance epoch (Working → Idle transition).
        daemon.update_automation_timers_for_state("lead", MemberState::Working);
        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "lead");

        // New epoch should normally allow refire, but cooldown blocks it.
        daemon.maybe_intervene_triage_backlog().unwrap();
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 0, "cooldown should prevent triage refire");

        // Expire cooldown — should fire.
        backdate_intervention_cooldown(&mut daemon, "triage::lead");
        daemon.maybe_intervene_triage_backlog().unwrap();
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(
            pending.len(),
            1,
            "triage should fire after cooldown expires"
        );
    }
2395
    #[test]
    fn maybe_intervene_owned_tasks_escalates_stuck_signature_to_parent() {
        // Once an owned-task signature stays unchanged past the workflow
        // policy's escalation threshold, the daemon should message the
        // member's parent ("lead") and emit a task_escalated event, in
        // addition to re-nudging the engineer.
        let tmp = tempfile::tempdir().unwrap();
        let events_path = tmp.path().join("events.jsonl");
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
            ])
            .pane_map(HashMap::from([("eng-1".to_string(), "%999".to_string())]))
            .states(HashMap::from([("eng-1".to_string(), MemberState::Idle)]))
            .workflow_policy(WorkflowPolicy {
                escalation_threshold_secs: 120,
                ..WorkflowPolicy::default()
            })
            .build();
        daemon.event_sink = EventSink::new(&events_path).unwrap();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "eng-1").unwrap();
        inbox::init_inbox(&root, "lead").unwrap();
        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "eng-1");

        // Arm the intervention: Working -> Idle transition + expired grace.
        daemon.update_automation_timers_for_state("eng-1", MemberState::Working);
        daemon.update_automation_timers_for_state("eng-1", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "eng-1");
        daemon.maybe_intervene_owned_tasks().unwrap();

        // Backdate detection past the 120s threshold so the next poll escalates.
        let state = daemon.owned_task_interventions.get_mut("eng-1").unwrap();
        state.detected_at = Instant::now() - Duration::from_secs(121);

        daemon.maybe_intervene_owned_tasks().unwrap();

        // The engineer still gets a nudge; the parent gets an escalation
        // message that includes actionable commands.
        let engineer_pending = inbox::pending_messages(&root, "eng-1").unwrap();
        assert_eq!(engineer_pending.len(), 1);
        let lead_pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(lead_pending.len(), 1);
        assert_eq!(lead_pending[0].from, "daemon");
        assert!(lead_pending[0].body.contains("Stuck task escalation"));
        assert!(lead_pending[0].body.contains("eng-1"));
        assert!(lead_pending[0].body.contains("Task #191"));
        assert!(lead_pending[0].body.contains("kanban-md edit --dir"));
        assert!(lead_pending[0].body.contains("batty assign eng-1"));
        assert!(
            daemon
                .owned_task_interventions
                .get("eng-1")
                .is_some_and(|state| state.escalation_sent)
        );

        // The escalation must also be recorded in the event log.
        let events = super::super::events::read_events(&events_path).unwrap();
        assert!(
            events.iter().any(|event| {
                event.event == "task_escalated"
                    && event.role.as_deref() == Some("eng-1")
                    && event.task.as_deref() == Some("191")
            }),
            "expected task_escalated event for stuck owned task"
        );
    }
2456
2457    #[test]
2458    fn maybe_intervene_owned_tasks_only_escalates_stuck_signature_once() {
2459        let tmp = tempfile::tempdir().unwrap();
2460        let events_path = tmp.path().join("events.jsonl");
2461        let mut daemon = TestDaemonBuilder::new(tmp.path())
2462            .members(vec![
2463                manager_member("lead", Some("architect")),
2464                engineer_member("eng-1", Some("lead"), false),
2465            ])
2466            .pane_map(HashMap::from([("eng-1".to_string(), "%999".to_string())]))
2467            .states(HashMap::from([("eng-1".to_string(), MemberState::Idle)]))
2468            .workflow_policy(WorkflowPolicy {
2469                escalation_threshold_secs: 120,
2470                ..WorkflowPolicy::default()
2471            })
2472            .build();
2473        daemon.event_sink = EventSink::new(&events_path).unwrap();
2474
2475        let root = inbox::inboxes_root(tmp.path());
2476        inbox::init_inbox(&root, "eng-1").unwrap();
2477        inbox::init_inbox(&root, "lead").unwrap();
2478        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "eng-1");
2479
2480        daemon.update_automation_timers_for_state("eng-1", MemberState::Working);
2481        daemon.update_automation_timers_for_state("eng-1", MemberState::Idle);
2482        backdate_idle_grace(&mut daemon, "eng-1");
2483        daemon.maybe_intervene_owned_tasks().unwrap();
2484
2485        let state = daemon.owned_task_interventions.get_mut("eng-1").unwrap();
2486        state.detected_at = Instant::now() - Duration::from_secs(121);
2487
2488        daemon.maybe_intervene_owned_tasks().unwrap();
2489        daemon.maybe_intervene_owned_tasks().unwrap();
2490
2491        let lead_pending = inbox::pending_messages(&root, "lead").unwrap();
2492        assert_eq!(lead_pending.len(), 1);
2493        assert!(lead_pending[0].body.contains("Stuck task escalation"));
2494        assert!(
2495            daemon
2496                .owned_task_interventions
2497                .get("eng-1")
2498                .is_some_and(|state| state.escalation_sent)
2499        );
2500
2501        let events = super::super::events::read_events(&events_path).unwrap();
2502        assert_eq!(
2503            events
2504                .iter()
2505                .filter(|event| {
2506                    event.event == "task_escalated"
2507                        && event.role.as_deref() == Some("eng-1")
2508                        && event.task.as_deref() == Some("191")
2509                })
2510                .count(),
2511            1
2512        );
2513    }
2514
2515    #[test]
2516    fn maybe_intervene_owned_tasks_waits_for_escalation_threshold() {
2517        let tmp = tempfile::tempdir().unwrap();
2518        let mut daemon = TestDaemonBuilder::new(tmp.path())
2519            .members(vec![
2520                manager_member("lead", Some("architect")),
2521                engineer_member("eng-1", Some("lead"), false),
2522            ])
2523            .pane_map(HashMap::from([("eng-1".to_string(), "%999".to_string())]))
2524            .states(HashMap::from([("eng-1".to_string(), MemberState::Idle)]))
2525            .workflow_policy(WorkflowPolicy {
2526                escalation_threshold_secs: 120,
2527                ..WorkflowPolicy::default()
2528            })
2529            .build();
2530
2531        let root = inbox::inboxes_root(tmp.path());
2532        inbox::init_inbox(&root, "eng-1").unwrap();
2533        inbox::init_inbox(&root, "lead").unwrap();
2534        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "eng-1");
2535
2536        daemon.update_automation_timers_for_state("eng-1", MemberState::Working);
2537        daemon.update_automation_timers_for_state("eng-1", MemberState::Idle);
2538        backdate_idle_grace(&mut daemon, "eng-1");
2539        daemon.maybe_intervene_owned_tasks().unwrap();
2540
2541        let state = daemon.owned_task_interventions.get_mut("eng-1").unwrap();
2542        state.detected_at = Instant::now() - Duration::from_secs(119);
2543
2544        daemon.maybe_intervene_owned_tasks().unwrap();
2545
2546        assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());
2547        assert!(
2548            daemon
2549                .owned_task_interventions
2550                .get("eng-1")
2551                .is_some_and(|state| !state.escalation_sent)
2552        );
2553    }
2554
    #[test]
    fn maybe_intervene_review_backlog_queues_for_idle_manager_with_branch_and_worktree_context() {
        // A real git repo plus an engineer worktree is set up so the
        // intervention body can include the review branch and worktree path.
        let tmp = tempfile::tempdir().unwrap();
        let repo = init_git_repo(&tmp, "batty-daemon-test");
        let team_config_dir = repo.join(".batty").join("team_config");
        let worktree_dir = repo.join(".batty").join("worktrees").join("eng-1");
        setup_engineer_worktree(&repo, &worktree_dir, "eng-1", &team_config_dir).unwrap();
        write_owned_task_file(&repo, 191, "review-task", "review", "eng-1");

        let mut daemon = TestDaemonBuilder::new(&repo)
            .members(vec![
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), true),
            ])
            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
            .build();

        let root = inbox::inboxes_root(&repo);
        inbox::init_inbox(&root, "lead").unwrap();

        // Arm the review-backlog check for the idle manager.
        daemon.update_automation_timers_for_state("lead", MemberState::Working);
        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_review_backlog().unwrap();

        // The queued message should carry the full review playbook: inbox
        // commands, merge/move options, reassignment, escalation, plus the
        // engineer's worktree path and branch name.
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].from, "architect");
        assert!(pending[0].body.contains("Review backlog detected"));
        assert!(pending[0].body.contains("#191 by eng-1"));
        assert!(pending[0].body.contains("batty inbox lead"));
        assert!(pending[0].body.contains("batty read lead <ref>"));
        assert!(pending[0].body.contains("batty merge eng-1"));
        assert!(pending[0].body.contains("kanban-md move --dir"));
        assert!(pending[0].body.contains("191 done"));
        assert!(pending[0].body.contains("191 archived"));
        assert!(pending[0].body.contains("191 in-progress"));
        assert!(pending[0].body.contains("batty assign eng-1"));
        assert!(pending[0].body.contains("batty send architect"));
        assert!(
            pending[0]
                .body
                .contains(worktree_dir.to_string_lossy().as_ref())
        );
        assert!(pending[0].body.contains("branch: eng-1"));
        // Intervention state is tracked under the "review::<manager>" key.
        assert_eq!(
            daemon
                .owned_task_interventions
                .get("review::lead")
                .map(|state| state.idle_epoch),
            Some(1)
        );
    }
2609
2610    #[test]
2611    fn maybe_intervene_review_backlog_does_not_fire_on_startup_idle() {
2612        let tmp = tempfile::tempdir().unwrap();
2613        write_owned_task_file(tmp.path(), 191, "review-task", "review", "eng-1");
2614
2615        let mut daemon = TestDaemonBuilder::new(tmp.path())
2616            .members(vec![
2617                manager_member("lead", Some("architect")),
2618                engineer_member("eng-1", Some("lead"), false),
2619            ])
2620            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
2621            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
2622            .build();
2623
2624        let root = inbox::inboxes_root(tmp.path());
2625        inbox::init_inbox(&root, "lead").unwrap();
2626
2627        daemon.maybe_intervene_review_backlog().unwrap();
2628
2629        assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());
2630        assert!(!daemon.owned_task_interventions.contains_key("review::lead"));
2631    }
2632
    #[test]
    fn maybe_intervene_manager_dispatch_gap_queues_for_idle_lead_with_idle_reports() {
        // Idle manager + idle engineers + an unassigned todo task on the
        // board is a dispatch gap: the daemon should coach the manager (with
        // the message attributed to "architect") to get work moving again.
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                architect_member("architect"),
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
                engineer_member("eng-2", Some("lead"), false),
            ])
            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
            .states(HashMap::from([
                ("lead".to_string(), MemberState::Idle),
                ("eng-1".to_string(), MemberState::Idle),
                ("eng-2".to_string(), MemberState::Idle),
            ]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "lead").unwrap();
        inbox::init_inbox(&root, "eng-1").unwrap();
        inbox::init_inbox(&root, "eng-2").unwrap();
        // eng-1 already owns #191; #192 is written straight onto the board
        // as an unassigned high-priority todo task.
        write_owned_task_file(tmp.path(), 191, "active-task", "in-progress", "eng-1");
        let tasks_dir = tmp
            .path()
            .join(".batty")
            .join("team_config")
            .join("board")
            .join("tasks");
        std::fs::write(
            tasks_dir.join("192-open-task.md"),
            "---\nid: 192\ntitle: open-task\nstatus: todo\npriority: high\nclass: standard\n---\n\nTask description.\n",
        )
        .unwrap();

        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_manager_dispatch_gap().unwrap();

        // The recovery message names busy and idle engineers plus the exact
        // commands to check status, nudge, assign, and escalate.
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].from, "architect");
        assert!(pending[0].body.contains("Dispatch recovery needed"));
        assert!(pending[0].body.contains("eng-1 on #191"));
        assert!(pending[0].body.contains("eng-2"));
        assert!(pending[0].body.contains("batty status"));
        assert!(pending[0].body.contains("batty send eng-1"));
        assert!(pending[0].body.contains("batty assign eng-2"));
        assert!(pending[0].body.contains("batty send architect"));
        assert!(
            daemon
                .owned_task_interventions
                .contains_key("dispatch::lead")
        );
    }
2687
    #[test]
    fn maybe_intervene_architect_utilization_queues_for_underloaded_idle_architect() {
        // When the whole team is idle and a backlog task exists, the daemon
        // should prompt the idle architect to raise utilization via the lead.
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                architect_member("architect"),
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
                engineer_member("eng-2", Some("lead"), false),
            ])
            .pane_map(HashMap::from([(
                "architect".to_string(),
                "%999".to_string(),
            )]))
            .states(HashMap::from([
                ("architect".to_string(), MemberState::Idle),
                ("lead".to_string(), MemberState::Idle),
                ("eng-1".to_string(), MemberState::Idle),
                ("eng-2".to_string(), MemberState::Idle),
            ]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "architect").unwrap();
        // eng-1 already owns #191; #192 sits in backlog, unassigned.
        write_owned_task_file(tmp.path(), 191, "active-task", "in-progress", "eng-1");
        let tasks_dir = tmp
            .path()
            .join(".batty")
            .join("team_config")
            .join("board")
            .join("tasks");
        std::fs::write(
            tasks_dir.join("192-open-task.md"),
            "---\nid: 192\ntitle: open-task\nstatus: backlog\npriority: high\nclass: standard\n---\n\nTask description.\n",
        )
        .unwrap();

        backdate_idle_grace(&mut daemon, "architect");
        daemon.maybe_intervene_architect_utilization().unwrap();

        // The recovery message is attributed to the daemon itself and names
        // the busy engineer, the idle one, and the concrete next step.
        let pending = inbox::pending_messages(&root, "architect").unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].from, "daemon");
        assert!(pending[0].body.contains("Utilization recovery needed"));
        assert!(pending[0].body.contains("eng-1 on #191"));
        assert!(pending[0].body.contains("eng-2"));
        assert!(pending[0].body.contains("batty status"));
        assert!(pending[0].body.contains("batty send lead"));
        assert!(pending[0].body.contains("Start Task #192 on eng-2"));
        assert!(
            daemon
                .owned_task_interventions
                .contains_key("utilization::architect")
        );
    }
2743
2744    #[test]
2745    fn zero_engineers_topology_skips_executor_interventions() {
2746        let tmp = tempfile::tempdir().unwrap();
2747        let mut daemon = TestDaemonBuilder::new(tmp.path())
2748            .members(vec![
2749                architect_member("architect"),
2750                manager_member("lead", Some("architect")),
2751            ])
2752            .pane_map(HashMap::from([
2753                ("architect".to_string(), "%998".to_string()),
2754                ("lead".to_string(), "%999".to_string()),
2755            ]))
2756            .states(HashMap::from([
2757                ("architect".to_string(), MemberState::Idle),
2758                ("lead".to_string(), MemberState::Idle),
2759            ]))
2760            .build();
2761
2762        let root = inbox::inboxes_root(tmp.path());
2763        inbox::init_inbox(&root, "architect").unwrap();
2764        inbox::init_inbox(&root, "lead").unwrap();
2765        write_open_task_file(tmp.path(), 191, "queued-task", "todo");
2766
2767        backdate_idle_grace(&mut daemon, "architect");
2768        backdate_idle_grace(&mut daemon, "lead");
2769        daemon.maybe_intervene_manager_dispatch_gap().unwrap();
2770        daemon.maybe_intervene_architect_utilization().unwrap();
2771
2772        assert!(
2773            inbox::pending_messages(&root, "architect")
2774                .unwrap()
2775                .is_empty()
2776        );
2777        assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());
2778        assert!(
2779            !daemon
2780                .owned_task_interventions
2781                .contains_key("dispatch::lead")
2782        );
2783        assert!(
2784            !daemon
2785                .owned_task_interventions
2786                .contains_key("utilization::architect")
2787        );
2788    }
2789
    #[test]
    fn single_role_topology_nudges_idle_member() {
        // A one-member team still gets idle nudges: the configured schedule
        // fires once the member has been idle longer than its interval.
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![architect_member("solo")])
            .pane_map(HashMap::from([("solo".to_string(), "%999".to_string())]))
            .states(HashMap::from([("solo".to_string(), MemberState::Idle)]))
            .nudges(HashMap::from([(
                "solo".to_string(),
                NudgeSchedule {
                    text: "Solo mode should keep moving.".to_string(),
                    interval: Duration::from_secs(1),
                    // Already idle for 5s, well past the 1s interval.
                    idle_since: Some(Instant::now() - Duration::from_secs(5)),
                    fired_this_idle: false,
                    paused: false,
                },
            )]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "solo").unwrap();

        backdate_idle_grace(&mut daemon, "solo");
        daemon.maybe_fire_nudges().unwrap();

        // Exactly one daemon-authored nudge lands; the member stays Idle and
        // the schedule records that it fired for this idle period.
        let pending = inbox::pending_messages(&root, "solo").unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].from, "daemon");
        assert!(pending[0].body.contains("Solo mode should keep moving."));
        assert!(pending[0].body.contains("Idle nudge:"));
        assert_eq!(daemon.states.get("solo"), Some(&MemberState::Idle));
        assert!(
            daemon
                .nudges
                .get("solo")
                .is_some_and(|schedule| schedule.fired_this_idle)
        );
    }
2828
    #[test]
    fn all_members_working_suppresses_interventions() {
        // While every member is Working, none of the intervention checks may
        // queue messages or record state — even though triage, owned, review,
        // and open tasks all exist at once.
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                architect_member("architect"),
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
                engineer_member("eng-2", Some("lead"), false),
            ])
            .pane_map(HashMap::from([
                ("architect".to_string(), "%997".to_string()),
                ("lead".to_string(), "%998".to_string()),
                ("eng-1".to_string(), "%999".to_string()),
                ("eng-2".to_string(), "%996".to_string()),
            ]))
            .states(HashMap::from([
                ("architect".to_string(), MemberState::Working),
                ("lead".to_string(), MemberState::Working),
                ("eng-1".to_string(), MemberState::Working),
                ("eng-2".to_string(), MemberState::Working),
            ]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "architect").unwrap();
        inbox::init_inbox(&root, "lead").unwrap();
        inbox::init_inbox(&root, "eng-1").unwrap();
        inbox::init_inbox(&root, "eng-2").unwrap();

        // A fresh, already-delivered message gives the triage check backlog
        // material to find.
        let mut triage_message = inbox::InboxMessage::new_send("eng-1", "lead", "Task complete.");
        triage_message.timestamp = super::now_unix();
        let triage_id = inbox::deliver_to_inbox(&root, &triage_message).unwrap();
        inbox::mark_delivered(&root, "lead", &triage_id).unwrap();

        // Board material for the owned/review/dispatch/utilization checks.
        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "eng-1");
        write_owned_task_file(tmp.path(), 192, "review-task", "review", "eng-2");
        write_open_task_file(tmp.path(), 193, "open-task", "todo");

        // Run every intervention check back to back.
        daemon.maybe_intervene_triage_backlog().unwrap();
        daemon.maybe_intervene_owned_tasks().unwrap();
        daemon.maybe_intervene_review_backlog().unwrap();
        daemon.maybe_intervene_manager_dispatch_gap().unwrap();
        daemon.maybe_intervene_architect_utilization().unwrap();

        // Nothing queued anywhere, and no intervention state was recorded.
        assert!(
            inbox::pending_messages(&root, "architect")
                .unwrap()
                .is_empty()
        );
        assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());
        assert!(inbox::pending_messages(&root, "eng-1").unwrap().is_empty());
        assert!(inbox::pending_messages(&root, "eng-2").unwrap().is_empty());
        assert!(daemon.triage_interventions.is_empty());
        assert!(daemon.owned_task_interventions.is_empty());
    }
2885
2886    #[test]
2887    fn manager_dispatch_gap_skips_when_pending_inbox_exists() {
2888        let tmp = tempfile::tempdir().unwrap();
2889        let mut daemon = TestDaemonBuilder::new(tmp.path())
2890            .members(vec![
2891                architect_member("architect"),
2892                manager_member("lead", Some("architect")),
2893                engineer_member("eng-1", Some("lead"), false),
2894                engineer_member("eng-2", Some("lead"), false),
2895            ])
2896            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
2897            .states(HashMap::from([
2898                ("lead".to_string(), MemberState::Idle),
2899                ("eng-1".to_string(), MemberState::Idle),
2900                ("eng-2".to_string(), MemberState::Idle),
2901            ]))
2902            .build();
2903
2904        let root = inbox::inboxes_root(tmp.path());
2905        inbox::init_inbox(&root, "lead").unwrap();
2906        inbox::init_inbox(&root, "eng-1").unwrap();
2907        inbox::init_inbox(&root, "eng-2").unwrap();
2908        let message = inbox::InboxMessage::new_send("architect", "lead", "Handle this first.");
2909        inbox::deliver_to_inbox(&root, &message).unwrap();
2910
2911        write_owned_task_file(tmp.path(), 191, "active-task", "in-progress", "eng-1");
2912        write_open_task_file(tmp.path(), 192, "open-task", "todo");
2913
2914        backdate_idle_grace(&mut daemon, "lead");
2915        daemon.maybe_intervene_manager_dispatch_gap().unwrap();
2916
2917        let pending = inbox::pending_messages(&root, "lead").unwrap();
2918        assert_eq!(pending.len(), 1);
2919        assert_eq!(pending[0].from, "architect");
2920        assert!(
2921            !daemon
2922                .owned_task_interventions
2923                .contains_key("dispatch::lead")
2924        );
2925    }
2926
    #[test]
    fn owned_task_intervention_refires_at_exact_cooldown_boundary() {
        // Pins the boundary semantics of the cooldown comparison: one second
        // short of the configured cooldown nothing fires; at exactly the
        // cooldown the intervention refires.
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![manager_member("lead", Some("architect"))])
            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "lead").unwrap();
        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "lead");

        let cooldown = Duration::from_secs(
            daemon
                .config
                .team_config
                .automation
                .intervention_cooldown_secs,
        );

        // Stamp the last fire one second inside the cooldown: must not fire.
        backdate_idle_grace(&mut daemon, "lead");
        daemon.intervention_cooldowns.insert(
            "lead".to_string(),
            Instant::now() - (cooldown - Duration::from_secs(1)),
        );
        daemon.maybe_intervene_owned_tasks().unwrap();
        assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());

        // Stamp exactly the cooldown ago: must fire.
        daemon
            .intervention_cooldowns
            .insert("lead".to_string(), Instant::now() - cooldown);
        daemon.maybe_intervene_owned_tasks().unwrap();

        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 1);
        assert!(
            pending[0]
                .body
                .contains("Owned active task backlog detected")
        );
        assert!(daemon.owned_task_interventions.contains_key("lead"));
    }
2970
    #[test]
    fn empty_board_skips_interventions() {
        // With a board directory that exists but holds zero tasks, every
        // intervention pass must be a no-op: no messages delivered and no
        // intervention counters recorded, even though all members are idle
        // and past their idle grace period.
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                architect_member("architect"),
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
            ])
            .pane_map(HashMap::from([
                ("architect".to_string(), "%997".to_string()),
                ("lead".to_string(), "%998".to_string()),
                ("eng-1".to_string(), "%999".to_string()),
            ]))
            .states(HashMap::from([
                ("architect".to_string(), MemberState::Idle),
                ("lead".to_string(), MemberState::Idle),
                ("eng-1".to_string(), MemberState::Idle),
            ]))
            .build();

        // Create the tasks directory empty so the board reads as present
        // with no tasks at all.
        std::fs::create_dir_all(
            tmp.path()
                .join(".batty")
                .join("team_config")
                .join("board")
                .join("tasks"),
        )
        .unwrap();
        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "architect").unwrap();
        inbox::init_inbox(&root, "lead").unwrap();
        inbox::init_inbox(&root, "eng-1").unwrap();

        backdate_idle_grace(&mut daemon, "architect");
        backdate_idle_grace(&mut daemon, "lead");
        backdate_idle_grace(&mut daemon, "eng-1");

        // Exercise every intervention pass against the empty board.
        daemon.maybe_intervene_triage_backlog().unwrap();
        daemon.maybe_intervene_owned_tasks().unwrap();
        daemon.maybe_intervene_review_backlog().unwrap();
        daemon.maybe_intervene_manager_dispatch_gap().unwrap();
        daemon.maybe_intervene_architect_utilization().unwrap();

        assert!(
            inbox::pending_messages(&root, "architect")
                .unwrap()
                .is_empty()
        );
        assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());
        assert!(inbox::pending_messages(&root, "eng-1").unwrap().is_empty());
        assert!(daemon.triage_interventions.is_empty());
        assert!(daemon.owned_task_interventions.is_empty());
    }
3025
    #[test]
    fn test_starvation_detected() {
        // With threshold 1 and only one todo task, the starvation detector
        // alerts the architect exactly once with a message naming the idle
        // engineer count (2, per the asserted body) and todo task count.
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = starvation_test_daemon(&tmp, Some(1));
        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "architect").unwrap();
        write_open_task_file(tmp.path(), 101, "queued-task", "todo");

        daemon.maybe_detect_pipeline_starvation().unwrap();

        let pending = inbox::pending_messages(&root, "architect").unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].from, "daemon");
        assert_eq!(
            pending[0].body,
            "Pipeline running dry: 2 idle engineers, 1 todo tasks."
        );
        assert!(daemon.pipeline_starvation_fired);
    }
3045
3046    #[test]
3047    fn test_debounce() {
3048        let tmp = tempfile::tempdir().unwrap();
3049        let mut daemon = starvation_test_daemon(&tmp, Some(1));
3050        let root = inbox::inboxes_root(tmp.path());
3051        inbox::init_inbox(&root, "architect").unwrap();
3052        write_open_task_file(tmp.path(), 101, "queued-task", "todo");
3053
3054        daemon.maybe_detect_pipeline_starvation().unwrap();
3055        daemon.maybe_detect_pipeline_starvation().unwrap();
3056
3057        let pending = inbox::pending_messages(&root, "architect").unwrap();
3058        assert_eq!(pending.len(), 1);
3059        assert!(daemon.pipeline_starvation_fired);
3060    }
3061
3062    #[test]
3063    fn test_threshold_config() {
3064        let tmp = tempfile::tempdir().unwrap();
3065        let mut daemon = starvation_test_daemon(&tmp, Some(2));
3066        let root = inbox::inboxes_root(tmp.path());
3067        inbox::init_inbox(&root, "architect").unwrap();
3068        write_open_task_file(tmp.path(), 101, "queued-task", "todo");
3069
3070        daemon.maybe_detect_pipeline_starvation().unwrap();
3071        assert!(
3072            inbox::pending_messages(&root, "architect")
3073                .unwrap()
3074                .is_empty()
3075        );
3076        assert!(!daemon.pipeline_starvation_fired);
3077
3078        let disabled_tmp = tempfile::tempdir().unwrap();
3079        let mut disabled_daemon = starvation_test_daemon(&disabled_tmp, None);
3080        let disabled_root = inbox::inboxes_root(disabled_tmp.path());
3081        inbox::init_inbox(&disabled_root, "architect").unwrap();
3082        write_open_task_file(disabled_tmp.path(), 101, "queued-task", "todo");
3083
3084        disabled_daemon.maybe_detect_pipeline_starvation().unwrap();
3085        assert!(
3086            inbox::pending_messages(&disabled_root, "architect")
3087                .unwrap()
3088                .is_empty()
3089        );
3090        assert!(!disabled_daemon.pipeline_starvation_fired);
3091    }
3092
    #[test]
    fn test_reset_when_work_added() {
        // The starvation flag clears only when open tasks strictly exceed
        // the idle engineer count (surplus, not parity), and the alert
        // re-fires once the deficit returns after the cooldown.
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = starvation_test_daemon(&tmp, Some(1));
        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "architect").unwrap();
        write_open_task_file(tmp.path(), 101, "queued-task", "todo");

        daemon.maybe_detect_pipeline_starvation().unwrap();
        assert!(daemon.pipeline_starvation_fired);

        // Parity (2 tasks == 2 engineers) does NOT clear — need surplus to reset
        write_open_task_file(tmp.path(), 102, "queued-task-2", "backlog");
        daemon.maybe_detect_pipeline_starvation().unwrap();
        assert!(daemon.pipeline_starvation_fired);

        // Surplus (3 tasks > 2 engineers) clears the flag
        write_open_task_file(tmp.path(), 103, "queued-task-3", "backlog");
        daemon.maybe_detect_pipeline_starvation().unwrap();
        assert!(!daemon.pipeline_starvation_fired);

        // Remove surplus — back to 1 task for 2 engineers, starvation re-fires after cooldown
        std::fs::remove_file(
            tmp.path()
                .join(".batty")
                .join("team_config")
                .join("board")
                .join("tasks")
                .join("102-queued-task-2.md"),
        )
        .unwrap();
        std::fs::remove_file(
            tmp.path()
                .join(".batty")
                .join("team_config")
                .join("board")
                .join("tasks")
                .join("103-queued-task-3.md"),
        )
        .unwrap();
        // Backdate the last alert past the re-fire cooldown (presumably
        // 300s given the 301s backdate — confirm against the daemon's
        // starvation cooldown constant).
        daemon.pipeline_starvation_last_fired = Some(Instant::now() - Duration::from_secs(301));
        daemon.maybe_detect_pipeline_starvation().unwrap();

        // Two alerts total: the original plus the re-fire.
        let pending = inbox::pending_messages(&root, "architect").unwrap();
        assert_eq!(pending.len(), 2);
        assert!(daemon.pipeline_starvation_fired);
    }
3140
    #[test]
    fn starvation_suppressed_when_engineer_has_active_board_item() {
        // An engineer whose claimed task is in review counts as busy, so the
        // idle pool shrinks and no deficit — hence no alert — is detected.
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = starvation_test_daemon(&tmp, Some(1));
        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "architect").unwrap();

        // Create one unclaimed todo task and one in-review task claimed by eng-1
        write_open_task_file(tmp.path(), 101, "queued-task", "todo");
        write_board_task_file(
            tmp.path(),
            102,
            "review-task",
            "review",
            Some("eng-1"),
            &[],
            None,
        );

        daemon.maybe_detect_pipeline_starvation().unwrap();

        // eng-1 has an active board item (review), so only eng-2 is truly idle.
        // 1 idle engineer, 1 unclaimed todo task => no deficit => no alert
        let pending = inbox::pending_messages(&root, "architect").unwrap();
        assert!(pending.is_empty());
        assert!(!daemon.pipeline_starvation_fired);
    }
3168
3169    #[test]
3170    fn starvation_suppressed_when_manager_working() {
3171        let tmp = tempfile::tempdir().unwrap();
3172        let mut daemon = TestDaemonBuilder::new(tmp.path())
3173            .members(vec![
3174                architect_member("architect"),
3175                manager_member("lead", Some("architect")),
3176                engineer_member("eng-1", Some("lead"), false),
3177                engineer_member("eng-2", Some("lead"), false),
3178            ])
3179            .workflow_policy(WorkflowPolicy {
3180                pipeline_starvation_threshold: Some(1),
3181                ..WorkflowPolicy::default()
3182            })
3183            .build();
3184        daemon.states = HashMap::from([
3185            ("lead".to_string(), MemberState::Working),
3186            ("eng-1".to_string(), MemberState::Idle),
3187            ("eng-2".to_string(), MemberState::Idle),
3188        ]);
3189        let root = inbox::inboxes_root(tmp.path());
3190        inbox::init_inbox(&root, "architect").unwrap();
3191        write_open_task_file(tmp.path(), 101, "queued-task", "todo");
3192
3193        daemon.maybe_detect_pipeline_starvation().unwrap();
3194
3195        // Manager is working, so starvation alert should be suppressed
3196        let pending = inbox::pending_messages(&root, "architect").unwrap();
3197        assert!(pending.is_empty());
3198    }
3199
    #[test]
    fn maybe_intervene_triage_backlog_does_not_refire_while_prior_intervention_remains_pending() {
        // Once a triage intervention has been written to the manager's
        // inbox, a subsequent Working -> Idle cycle must not fire a second
        // one while the first message is still pending.
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
            ])
            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "lead").unwrap();
        inbox::init_inbox(&root, "eng-1").unwrap();
        // A completion report from eng-1 that was already delivered to
        // "lead" constitutes the triage backlog.
        let mut result = inbox::InboxMessage::new_send("eng-1", "lead", "Task complete.");
        result.timestamp = super::now_unix();
        let id = inbox::deliver_to_inbox(&root, &result).unwrap();
        inbox::mark_delivered(&root, "lead", &id).unwrap();

        // First idle cycle: the intervention fires.
        daemon.update_automation_timers_for_state("lead", MemberState::Working);
        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_triage_backlog().unwrap();

        // Second Working -> Idle transition while the intervention message
        // remains pending: must not refire.
        daemon
            .states
            .insert("lead".to_string(), MemberState::Working);
        daemon.update_automation_timers_for_state("lead", MemberState::Working);
        daemon.states.insert("lead".to_string(), MemberState::Idle);
        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_triage_backlog().unwrap();

        // Exactly one intervention recorded, and the single pending message
        // was sent on behalf of the architect.
        assert_eq!(daemon.triage_interventions.get("lead"), Some(&1));
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 1);
        assert!(pending.iter().all(|message| message.from == "architect"));
    }
3239
    #[test]
    fn maybe_fire_nudges_keeps_member_idle_when_delivery_falls_back_to_inbox() {
        // Pane "%999" does not exist in the test environment, so the nudge
        // falls back to inbox delivery. The member must stay Idle and the
        // schedule must mark the nudge as fired for this idle period.
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![architect_member("scientist")])
            .pane_map(HashMap::from([(
                "scientist".to_string(),
                "%999".to_string(),
            )]))
            .states(HashMap::from([(
                "scientist".to_string(),
                MemberState::Idle,
            )]))
            .nudges(HashMap::from([(
                "scientist".to_string(),
                NudgeSchedule {
                    text: "Please make progress.".to_string(),
                    interval: Duration::from_secs(1),
                    // Idle well past the 1s interval, so the nudge is due.
                    idle_since: Some(Instant::now() - Duration::from_secs(5)),
                    fired_this_idle: false,
                    paused: false,
                },
            )]))
            .build();

        backdate_idle_grace(&mut daemon, "scientist");
        daemon.maybe_fire_nudges().unwrap();

        assert_eq!(daemon.states.get("scientist"), Some(&MemberState::Idle));
        let schedule = daemon.nudges.get("scientist").unwrap();
        assert!(!schedule.paused);
        assert!(schedule.fired_this_idle);

        // The fallback message lands in the inbox with the nudge prefix.
        let messages =
            inbox::pending_messages(&inbox::inboxes_root(tmp.path()), "scientist").unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].from, "daemon");
        assert!(messages[0].body.contains("Please make progress."));
        assert!(messages[0].body.contains("Idle nudge:"));
    }
3280
    #[test]
    fn maybe_fire_nudges_skips_when_pending_inbox_exists() {
        // A member with an unread inbox message should not be nudged: the
        // pending message is the thing to act on, so the schedule stays
        // unfired and the only inbox entry remains the seeded message.
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![architect_member("scientist")])
            .pane_map(HashMap::from([(
                "scientist".to_string(),
                "%999".to_string(),
            )]))
            .states(HashMap::from([(
                "scientist".to_string(),
                MemberState::Idle,
            )]))
            .nudges(HashMap::from([(
                "scientist".to_string(),
                NudgeSchedule {
                    text: "Please make progress.".to_string(),
                    interval: Duration::from_secs(1),
                    idle_since: Some(Instant::now()),
                    fired_this_idle: false,
                    paused: false,
                },
            )]))
            .build();

        // Seed a pending (undelivered) message before firing nudges.
        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "scientist").unwrap();
        let message =
            inbox::InboxMessage::new_send("architect", "scientist", "Process this first.");
        inbox::deliver_to_inbox(&root, &message).unwrap();

        backdate_idle_grace(&mut daemon, "scientist");
        daemon.maybe_fire_nudges().unwrap();

        // Only the original message is pending; no nudge was added.
        let messages = inbox::pending_messages(&root, "scientist").unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].from, "architect");
        let schedule = daemon.nudges.get("scientist").unwrap();
        assert!(!schedule.fired_this_idle);
        assert_eq!(daemon.states.get("scientist"), Some(&MemberState::Idle));
    }
3322
3323    #[test]
3324    fn automation_sender_prefers_direct_manager_and_config_fallback() {
3325        let tmp = tempfile::tempdir().unwrap();
3326        let mut daemon = TestDaemonBuilder::new(tmp.path())
3327            .members(vec![
3328                architect_member("architect"),
3329                manager_member("lead", Some("architect")),
3330                engineer_member("eng-1", Some("lead"), false),
3331            ])
3332            .build();
3333        daemon.config.team_config.automation_sender = Some("human".to_string());
3334
3335        assert_eq!(daemon.automation_sender_for("eng-1"), "lead");
3336        assert_eq!(daemon.automation_sender_for("lead"), "architect");
3337        assert_eq!(daemon.automation_sender_for("architect"), "human");
3338
3339        daemon.config.team_config.automation_sender = None;
3340        assert_eq!(daemon.automation_sender_for("architect"), "daemon");
3341    }
3342
3343    #[test]
3344    fn hot_reload_marker_round_trip() {
3345        let tmp = tempfile::tempdir().unwrap();
3346        let marker = hot_reload_marker_path(tmp.path());
3347
3348        write_hot_reload_marker(tmp.path()).unwrap();
3349        assert!(marker.exists());
3350        assert!(consume_hot_reload_marker(tmp.path()));
3351        assert!(!marker.exists());
3352        assert!(!consume_hot_reload_marker(tmp.path()));
3353    }
3354
3355    #[test]
3356    fn hot_reload_resume_args_include_resume_flag() {
3357        let tmp = tempfile::tempdir().unwrap();
3358        let args = hot_reload_daemon_args(tmp.path());
3359        let canonical_root = tmp.path().canonicalize().unwrap();
3360        assert_eq!(
3361            args,
3362            vec![
3363                "-v".to_string(),
3364                "daemon".to_string(),
3365                "--project-root".to_string(),
3366                canonical_root.to_string_lossy().to_string(),
3367                "--resume".to_string(),
3368            ]
3369        );
3370    }
3371
3372    #[test]
3373    fn hot_reload_fingerprint_detects_binary_change() {
3374        let tmp = tempfile::tempdir().unwrap();
3375        let binary = tmp.path().join("batty");
3376        fs::write(&binary, "old-binary").unwrap();
3377        let before = BinaryFingerprint::capture(&binary).unwrap();
3378
3379        std::thread::sleep(Duration::from_millis(1100));
3380        fs::write(&binary, "new-binary-build").unwrap();
3381        let after = BinaryFingerprint::capture(&binary).unwrap();
3382
3383        assert!(after.changed_from(&before));
3384    }
3385
    #[test]
    fn resume_decision_logged_to_orchestrator() {
        // spawn_all_agents(false) runs with resume disabled; the per-member
        // resume decision must be recorded in .batty/orchestrator.log.
        let tmp = tempfile::tempdir().unwrap();
        let member = MemberInstance {
            name: "architect".to_string(),
            role_name: "architect".to_string(),
            role_type: RoleType::Architect,
            agent: Some("claude".to_string()),
            prompt: None,
            reports_to: None,
            use_worktrees: false,
        };
        // Hand-built minimal config (rather than TestDaemonBuilder) so the
        // spawn path is exercised against explicit, default-valued settings.
        let mut daemon = TeamDaemon::new(DaemonConfig {
            project_root: tmp.path().to_path_buf(),
            team_config: TeamConfig {
                name: "test".to_string(),
                agent: None,
                workflow_mode: WorkflowMode::Hybrid,
                workflow_policy: WorkflowPolicy::default(),
                board: BoardConfig::default(),
                standup: StandupConfig::default(),
                automation: AutomationConfig::default(),
                automation_sender: None,
                external_senders: Vec::new(),
                orchestrator_pane: true,
                orchestrator_position: OrchestratorPosition::Bottom,
                layout: None,
                cost: Default::default(),
                event_log_max_bytes: crate::team::DEFAULT_EVENT_LOG_MAX_BYTES,
                retro_min_duration_secs: 60,
                roles: Vec::new(),
            },
            session: "test".to_string(),
            members: vec![member],
            pane_map: HashMap::from([("architect".to_string(), "%999".to_string())]),
        })
        .unwrap();

        // `false` => do not attempt to resume any member.
        daemon.spawn_all_agents(false).unwrap();

        let content =
            fs::read_to_string(tmp.path().join(".batty").join("orchestrator.log")).unwrap();
        assert!(content.contains("resume: architect=no (resume disabled)"));
    }
3430
3431    #[test]
3432    fn reconcile_clears_done_task() {
3433        let tmp = tempfile::tempdir().unwrap();
3434        std::fs::create_dir_all(tmp.path().join(".batty").join("team_config")).unwrap();
3435        write_owned_task_file(tmp.path(), 42, "finished-work", "done", "eng-1");
3436        let mut daemon = make_test_daemon(
3437            tmp.path(),
3438            vec![engineer_member("eng-1", Some("manager"), false)],
3439        );
3440        daemon.active_tasks.insert("eng-1".to_string(), 42);
3441
3442        daemon.reconcile_active_tasks().unwrap();
3443
3444        assert_eq!(daemon.active_task_id("eng-1"), None);
3445    }
3446
3447    #[test]
3448    fn reconcile_keeps_in_progress_task() {
3449        let tmp = tempfile::tempdir().unwrap();
3450        std::fs::create_dir_all(tmp.path().join(".batty").join("team_config")).unwrap();
3451        write_owned_task_file(tmp.path(), 42, "active-work", "in-progress", "eng-1");
3452        let mut daemon = make_test_daemon(
3453            tmp.path(),
3454            vec![engineer_member("eng-1", Some("manager"), false)],
3455        );
3456        daemon.active_tasks.insert("eng-1".to_string(), 42);
3457
3458        daemon.reconcile_active_tasks().unwrap();
3459
3460        assert_eq!(daemon.active_task_id("eng-1"), Some(42));
3461    }
3462
3463    #[test]
3464    fn reconcile_clears_missing_task() {
3465        let tmp = tempfile::tempdir().unwrap();
3466        let tasks_dir = tmp
3467            .path()
3468            .join(".batty")
3469            .join("team_config")
3470            .join("board")
3471            .join("tasks");
3472        std::fs::create_dir_all(&tasks_dir).unwrap();
3473        // No task file for ID 99 — it doesn't exist on the board
3474        let mut daemon = make_test_daemon(
3475            tmp.path(),
3476            vec![engineer_member("eng-1", Some("manager"), false)],
3477        );
3478        daemon.active_tasks.insert("eng-1".to_string(), 99);
3479
3480        daemon.reconcile_active_tasks().unwrap();
3481
3482        assert_eq!(daemon.active_task_id("eng-1"), None);
3483    }
3484
3485    #[test]
3486    fn production_daemon_has_no_unwrap_or_expect_calls() {
3487        let count = production_unwrap_expect_count(Path::new(file!()));
3488        assert_eq!(count, 0, "production daemon.rs should avoid unwrap/expect");
3489    }
3490
3491    #[test]
3492    fn non_git_repo_disables_worktrees() {
3493        use crate::team::harness::TestHarness;
3494        use crate::team::test_support::engineer_member;
3495
3496        let harness = TestHarness::new()
3497            .with_member(engineer_member("eng-1", Some("manager"), true))
3498            .with_member_state("eng-1", MemberState::Idle);
3499        let daemon = harness.build_daemon().unwrap();
3500
3501        // Test harness temp dir is not a git repo
3502        assert!(!daemon.is_git_repo);
3503        // member_uses_worktrees should return false even though the member config has use_worktrees=true
3504        assert!(
3505            !daemon.member_uses_worktrees("eng-1"),
3506            "worktrees should be disabled when project is not a git repo"
3507        );
3508    }
3509
3510    #[test]
3511    fn git_repo_enables_worktrees() {
3512        use crate::team::harness::TestHarness;
3513        use crate::team::test_support::engineer_member;
3514
3515        let harness = TestHarness::new()
3516            .with_member(engineer_member("eng-1", Some("manager"), true))
3517            .with_member_state("eng-1", MemberState::Idle);
3518        let mut daemon = harness.build_daemon().unwrap();
3519
3520        // Simulate being in a git repo
3521        daemon.is_git_repo = true;
3522
3523        assert!(
3524            daemon.member_uses_worktrees("eng-1"),
3525            "worktrees should be enabled when project is a git repo and member has use_worktrees=true"
3526        );
3527    }
3528
3529    // --- Stale review escalation tests ---
3530
    /// Writes a high-priority board task in "review" status owned by
    /// `review_owner`; thin wrapper over `write_review_task_with_priority`.
    fn write_review_task(project_root: &Path, id: u32, review_owner: &str) {
        write_review_task_with_priority(project_root, id, review_owner, "high");
    }
3534
3535    fn write_review_task_with_priority(
3536        project_root: &Path,
3537        id: u32,
3538        review_owner: &str,
3539        priority: &str,
3540    ) {
3541        let tasks_dir = project_root
3542            .join(".batty")
3543            .join("team_config")
3544            .join("board")
3545            .join("tasks");
3546        std::fs::create_dir_all(&tasks_dir).unwrap();
3547        std::fs::write(
3548            tasks_dir.join(format!("{id:03}-review-task-{id}.md")),
3549            format!(
3550                "---\nid: {id}\ntitle: review-task-{id}\nstatus: review\npriority: {priority}\nclass: standard\nclaimed_by: eng-1\nreview_owner: {review_owner}\n---\n\nTask description.\n"
3551            ),
3552        )
3553        .unwrap();
3554    }
3555
3556    fn stale_review_daemon(tmp: &tempfile::TempDir) -> TeamDaemon {
3557        TestDaemonBuilder::new(tmp.path())
3558            .members(vec![
3559                architect_member("architect"),
3560                manager_member("manager", Some("architect")),
3561                engineer_member("eng-1", Some("manager"), false),
3562            ])
3563            .workflow_policy(WorkflowPolicy {
3564                review_nudge_threshold_secs: 1800,
3565                review_timeout_secs: 7200,
3566                ..WorkflowPolicy::default()
3567            })
3568            .build()
3569    }
3570
    #[test]
    fn stale_review_sends_nudge_at_threshold() {
        // A review first seen 1801s ago crosses the 1800s nudge threshold:
        // the nudge is recorded and a review_nudge_sent event is emitted.
        let tmp = tempfile::tempdir().unwrap();
        write_review_task(tmp.path(), 42, "manager");
        let mut daemon = stale_review_daemon(&tmp);

        // Seed the first_seen time to 1801 seconds ago
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        daemon.review_first_seen.insert(42, now - 1801);

        daemon.maybe_escalate_stale_reviews().unwrap();

        // Nudge should have been sent
        assert!(daemon.review_nudge_sent.contains(&42));

        // Event should be emitted (check event sink wrote something)
        let events_path = tmp
            .path()
            .join(".batty")
            .join("team_config")
            .join("events.jsonl");
        let events = std::fs::read_to_string(&events_path).unwrap_or_default();
        assert!(events.contains("review_nudge_sent"));
    }
3598
    #[test]
    fn stale_review_escalates_at_timeout() {
        // A review first seen 7201s ago exceeds the 7200s timeout: tracking
        // state is cleared, the task is moved to blocked with an escalation
        // reason, and a review_escalated event is emitted.
        let tmp = tempfile::tempdir().unwrap();
        write_review_task(tmp.path(), 42, "manager");
        let mut daemon = stale_review_daemon(&tmp);

        // Seed the first_seen time to 7201 seconds ago
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        daemon.review_first_seen.insert(42, now - 7201);

        daemon.maybe_escalate_stale_reviews().unwrap();

        // Task should no longer be tracked (it was escalated)
        assert!(!daemon.review_first_seen.contains_key(&42));
        assert!(!daemon.review_nudge_sent.contains(&42));

        // Task should be transitioned to blocked
        let tasks_dir = tmp
            .path()
            .join(".batty")
            .join("team_config")
            .join("board")
            .join("tasks");
        let tasks = crate::task::load_tasks_from_dir(&tasks_dir).unwrap();
        let task = tasks.iter().find(|t| t.id == 42).unwrap();
        assert_eq!(task.status, "blocked");
        assert_eq!(
            task.blocked_on.as_deref(),
            Some("review timeout escalated to architect")
        );

        // Event should be emitted
        let events_path = tmp
            .path()
            .join(".batty")
            .join("team_config")
            .join("events.jsonl");
        let events = std::fs::read_to_string(&events_path).unwrap_or_default();
        assert!(events.contains("review_escalated"));
    }
3642
    #[test]
    fn nudge_only_sent_once() {
        // The review nudge is one-shot per task: a second pass over the same
        // stale review must not emit another review_nudge_sent event.
        let tmp = tempfile::tempdir().unwrap();
        write_review_task(tmp.path(), 42, "manager");
        let mut daemon = stale_review_daemon(&tmp);

        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        daemon.review_first_seen.insert(42, now - 1801);

        // First call: nudge sent
        daemon.maybe_escalate_stale_reviews().unwrap();
        assert!(daemon.review_nudge_sent.contains(&42));

        // Count events
        let events_path = tmp
            .path()
            .join(".batty")
            .join("team_config")
            .join("events.jsonl");
        let events_before = std::fs::read_to_string(&events_path)
            .unwrap_or_default()
            .matches("review_nudge_sent")
            .count();

        // Second call: nudge should NOT fire again
        daemon.maybe_escalate_stale_reviews().unwrap();
        let events_after = std::fs::read_to_string(&events_path)
            .unwrap_or_default()
            .matches("review_nudge_sent")
            .count();

        assert_eq!(events_before, events_after, "nudge should not fire twice");
    }
3679
3680    #[test]
3681    fn config_nudge_threshold_defaults() {
3682        let policy = WorkflowPolicy::default();
3683        assert_eq!(policy.review_nudge_threshold_secs, 1800);
3684        assert_eq!(policy.review_timeout_secs, 7200);
3685    }
3686
3687    // --- Per-priority review timeout override tests ---
3688
3689    fn stale_review_daemon_with_overrides(tmp: &tempfile::TempDir) -> TeamDaemon {
3690        use crate::team::config::ReviewTimeoutOverride;
3691        let mut overrides = std::collections::HashMap::new();
3692        overrides.insert(
3693            "critical".to_string(),
3694            ReviewTimeoutOverride {
3695                review_nudge_threshold_secs: Some(300),
3696                review_timeout_secs: Some(600),
3697            },
3698        );
3699        overrides.insert(
3700            "high".to_string(),
3701            ReviewTimeoutOverride {
3702                review_nudge_threshold_secs: Some(900),
3703                review_timeout_secs: Some(3600),
3704            },
3705        );
3706        TestDaemonBuilder::new(tmp.path())
3707            .members(vec![
3708                architect_member("architect"),
3709                manager_member("manager", Some("architect")),
3710                engineer_member("eng-1", Some("manager"), false),
3711            ])
3712            .workflow_policy(WorkflowPolicy {
3713                review_nudge_threshold_secs: 1800,
3714                review_timeout_secs: 7200,
3715                review_timeout_overrides: overrides,
3716                ..WorkflowPolicy::default()
3717            })
3718            .build()
3719    }
3720
3721    #[test]
3722    fn critical_task_nudges_at_priority_override_threshold() {
3723        let tmp = tempfile::tempdir().unwrap();
3724        write_review_task_with_priority(tmp.path(), 50, "manager", "critical");
3725        let mut daemon = stale_review_daemon_with_overrides(&tmp);
3726
3727        let now = SystemTime::now()
3728            .duration_since(UNIX_EPOCH)
3729            .unwrap_or_default()
3730            .as_secs();
3731        // 301s > critical nudge threshold of 300s
3732        daemon.review_first_seen.insert(50, now - 301);
3733
3734        daemon.maybe_escalate_stale_reviews().unwrap();
3735
3736        assert!(
3737            daemon.review_nudge_sent.contains(&50),
3738            "critical task should be nudged at 300s override"
3739        );
3740    }
3741
3742    #[test]
3743    fn critical_task_not_nudged_below_override_threshold() {
3744        let tmp = tempfile::tempdir().unwrap();
3745        write_review_task_with_priority(tmp.path(), 50, "manager", "critical");
3746        let mut daemon = stale_review_daemon_with_overrides(&tmp);
3747
3748        let now = SystemTime::now()
3749            .duration_since(UNIX_EPOCH)
3750            .unwrap_or_default()
3751            .as_secs();
3752        // 200s < critical nudge threshold of 300s
3753        daemon.review_first_seen.insert(50, now - 200);
3754
3755        daemon.maybe_escalate_stale_reviews().unwrap();
3756
3757        assert!(
3758            !daemon.review_nudge_sent.contains(&50),
3759            "critical task should not be nudged before 300s"
3760        );
3761    }
3762
3763    #[test]
3764    fn critical_task_escalates_at_priority_override_threshold() {
3765        let tmp = tempfile::tempdir().unwrap();
3766        write_review_task_with_priority(tmp.path(), 50, "manager", "critical");
3767        let mut daemon = stale_review_daemon_with_overrides(&tmp);
3768
3769        let now = SystemTime::now()
3770            .duration_since(UNIX_EPOCH)
3771            .unwrap_or_default()
3772            .as_secs();
3773        // 601s > critical escalation threshold of 600s
3774        daemon.review_first_seen.insert(50, now - 601);
3775
3776        daemon.maybe_escalate_stale_reviews().unwrap();
3777
3778        // Task escalated — removed from tracking
3779        assert!(!daemon.review_first_seen.contains_key(&50));
3780
3781        // Task should be blocked
3782        let tasks_dir = tmp
3783            .path()
3784            .join(".batty")
3785            .join("team_config")
3786            .join("board")
3787            .join("tasks");
3788        let tasks = crate::task::load_tasks_from_dir(&tasks_dir).unwrap();
3789        let task = tasks.iter().find(|t| t.id == 50).unwrap();
3790        assert_eq!(task.status, "blocked");
3791    }
3792
3793    #[test]
3794    fn medium_task_uses_global_thresholds_when_no_override() {
3795        let tmp = tempfile::tempdir().unwrap();
3796        write_review_task_with_priority(tmp.path(), 51, "manager", "medium");
3797        let mut daemon = stale_review_daemon_with_overrides(&tmp);
3798
3799        let now = SystemTime::now()
3800            .duration_since(UNIX_EPOCH)
3801            .unwrap_or_default()
3802            .as_secs();
3803        // 1000s > critical override (300s) but < global nudge (1800s)
3804        daemon.review_first_seen.insert(51, now - 1000);
3805
3806        daemon.maybe_escalate_stale_reviews().unwrap();
3807
3808        assert!(
3809            !daemon.review_nudge_sent.contains(&51),
3810            "medium task should use global 1800s threshold, not critical 300s"
3811        );
3812    }
3813
3814    #[test]
3815    fn mixed_priority_tasks_get_different_thresholds() {
3816        let tmp = tempfile::tempdir().unwrap();
3817        write_review_task_with_priority(tmp.path(), 60, "manager", "critical");
3818        write_review_task_with_priority(tmp.path(), 61, "manager", "medium");
3819        let mut daemon = stale_review_daemon_with_overrides(&tmp);
3820
3821        let now = SystemTime::now()
3822            .duration_since(UNIX_EPOCH)
3823            .unwrap_or_default()
3824            .as_secs();
3825        // Both at 400s age: exceeds critical nudge (300s) but not medium nudge (1800s)
3826        daemon.review_first_seen.insert(60, now - 400);
3827        daemon.review_first_seen.insert(61, now - 400);
3828
3829        daemon.maybe_escalate_stale_reviews().unwrap();
3830
3831        assert!(
3832            daemon.review_nudge_sent.contains(&60),
3833            "critical task should be nudged at 400s (threshold 300s)"
3834        );
3835        assert!(
3836            !daemon.review_nudge_sent.contains(&61),
3837            "medium task should NOT be nudged at 400s (threshold 1800s)"
3838        );
3839    }
3840}