1use std::collections::{HashMap, HashSet};
19use std::fs;
20use std::path::{Path, PathBuf};
21use std::time::{Duration, Instant, SystemTime};
22
23use anyhow::{Context, Result, bail};
24use serde::{Deserialize, Serialize};
25use tracing::{debug, info, warn};
26use uuid::Uuid;
27
28use super::board;
29use super::board_cmd;
30use super::comms::{self, Channel};
31#[cfg(test)]
32use super::config::OrchestratorPosition;
33use super::config::{RoleType, TeamConfig};
34use super::delivery::{FailedDelivery, PendingMessage};
35use super::events::EventSink;
36use super::events::TeamEvent;
37use super::failure_patterns::FailureTracker;
38use super::hierarchy::MemberInstance;
39use super::inbox;
40use super::merge;
41use super::standup::{self, MemberState};
42use super::status;
43use super::task_cmd;
44#[cfg(test)]
45use super::task_loop::next_unclaimed_task;
46use super::task_loop::{
47 branch_is_merged_into, checkout_worktree_branch_from_main, current_worktree_branch,
48 engineer_base_branch_name, is_worktree_safe_to_mutate, setup_engineer_worktree,
49};
50use super::watcher::{SessionTrackerConfig, SessionWatcher, WatcherState};
51use super::{AssignmentDeliveryResult, AssignmentResultStatus, now_unix, store_assignment_result};
52use crate::agent::{self, BackendHealth};
53use crate::tmux;
54use dispatch::DispatchQueueEntry;
55
56#[path = "daemon/automation.rs"]
57mod automation;
58#[path = "dispatch/mod.rs"]
59mod dispatch;
60#[path = "daemon/error_handling.rs"]
61mod error_handling;
62#[path = "daemon/health.rs"]
63mod health;
64#[path = "daemon/interventions/mod.rs"]
65mod interventions;
66#[path = "launcher.rs"]
67mod launcher;
68#[path = "telegram_bridge.rs"]
69mod telegram_bridge;
70#[path = "daemon/telemetry.rs"]
71mod telemetry;
72
73#[cfg(test)]
74use self::dispatch::normalized_assignment_dir;
75pub(crate) use self::interventions::NudgeSchedule;
76use self::interventions::OwnedTaskInterventionState;
77use self::launcher::{
78 duplicate_claude_session_ids, load_launch_state, member_session_tracker_config,
79};
80pub(super) use super::delivery::MessageDelivery;
81
/// Static inputs required to construct a [`TeamDaemon`]: the project to
/// operate on, its parsed team configuration, the tmux session hosting the
/// agents, the resolved member roster, and the member-name → tmux-pane map.
pub struct DaemonConfig {
    /// Root of the project the daemon manages; `.batty/` lives under here.
    pub project_root: PathBuf,
    /// Parsed team configuration (roles, standup cadence, workflow mode, …).
    pub team_config: TeamConfig,
    /// Name of the tmux session the daemon monitors; the run loop exits when
    /// this session disappears.
    pub session: String,
    /// Concrete member instances derived from the role configuration.
    pub members: Vec<MemberInstance>,
    /// Member name → tmux pane id for every spawned agent.
    pub pane_map: HashMap<String, String>,
}
90
// How often the hot-reload monitor re-stats the daemon binary on disk.
const HOT_RELOAD_CHECK_INTERVAL: Duration = Duration::from_secs(30);
// Minimum spacing between successive reload (exec) attempts.
const HOT_RELOAD_MIN_INTERVAL: Duration = Duration::from_secs(30);
93
/// Long-running orchestrator for a team of tmux-hosted agents. Owns all
/// per-member runtime state (watchers, activity, tasks, nudges, standups),
/// the delivery/dispatch queues, and the event/telemetry sinks. Driven by
/// [`TeamDaemon::run`].
pub struct TeamDaemon {
    pub(super) config: DaemonConfig,
    // One pane watcher per member, keyed by member name.
    pub(super) watchers: HashMap<String, SessionWatcher>,
    // Latest known Working/Idle state per member.
    pub(super) states: HashMap<String, MemberState>,
    // When each currently-idle member entered the idle state.
    pub(super) idle_started_at: HashMap<String, Instant>,
    // Engineer name -> task id currently assigned to them.
    pub(super) active_tasks: HashMap<String, u32>,
    // Engineer name -> retry count for the active task.
    pub(super) retry_counts: HashMap<String, u32>,
    // Pending task dispatches; persisted across restarts.
    pub(super) dispatch_queue: Vec<DispatchQueueEntry>,
    pub(super) triage_idle_epochs: HashMap<String, u64>,
    pub(super) triage_interventions: HashMap<String, u64>,
    pub(super) owned_task_interventions: HashMap<String, OwnedTaskInterventionState>,
    pub(super) intervention_cooldowns: HashMap<String, Instant>,
    // Outbound comms channels for `User` roles, keyed by role name.
    pub(super) channels: HashMap<String, Box<dyn Channel>>,
    // Per-member nudge schedules built from role prompt "## Nudge" sections.
    pub(super) nudges: HashMap<String, NudgeSchedule>,
    pub(super) telegram_bot: Option<super::telegram::TelegramBot>,
    pub(super) failure_tracker: FailureTracker,
    // Append-only JSONL event log under `.batty/team_config/events.jsonl`.
    pub(super) event_sink: EventSink,
    pub(super) paused_standups: HashSet<String>,
    pub(super) last_standup: HashMap<String, Instant>,
    pub(super) last_board_rotation: Instant,
    pub(super) last_auto_archive: Instant,
    pub(super) last_auto_dispatch: Instant,
    pub(super) pipeline_starvation_fired: bool,
    pub(super) pipeline_starvation_last_fired: Option<Instant>,
    pub(super) retro_generated: bool,
    pub(super) failed_deliveries: Vec<FailedDelivery>,
    // Task id -> unix time a review was first observed; used for review nudges.
    pub(super) review_first_seen: HashMap<u32, u64>,
    pub(super) review_nudge_sent: HashSet<u32>,
    // Sleep between run-loop iterations (5s by default; see `new`).
    pub(super) poll_interval: Duration,
    // False disables all git/worktree behavior (see `member_uses_worktrees`).
    pub(super) is_git_repo: bool,
    pub(super) subsystem_error_counts: HashMap<String, u32>,
    // In-memory auto-merge overrides; falls back to disk in
    // `auto_merge_override` when a task id is absent here.
    pub(super) auto_merge_overrides: HashMap<u32, bool>,
    pub(super) recent_dispatches: HashMap<(u32, String), Instant>,
    // None when opening the telemetry DB failed (telemetry disabled).
    pub(super) telemetry_db: Option<rusqlite::Connection>,
    pub(super) manual_assign_cooldowns: HashMap<String, Instant>,
    pub(super) backend_health: HashMap<String, BackendHealth>,
    pub(super) last_health_check: Instant,
    pub(super) last_uncommitted_warn: HashMap<String, Instant>,
    pub(super) pending_delivery_queue: HashMap<String, Vec<PendingMessage>>,
}
144
/// A member's git worktree location plus the branch checked out there, if
/// one could be determined.
#[derive(Debug, Clone, PartialEq, Eq)]
struct MemberWorktreeContext {
    path: PathBuf,
    // None when the worktree branch could not be resolved.
    branch: Option<String>,
}
150
/// Serializable snapshot of one member's `NudgeSchedule` timers. `Instant`s
/// cannot be serialized, so idle time is stored as elapsed seconds and
/// rebuilt relative to `Instant::now()` on restore.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct PersistedNudgeState {
    // Seconds the member had been idle at save time; None if not idle.
    idle_elapsed_secs: Option<u64>,
    fired_this_idle: bool,
    paused: bool,
}
157
/// Checkpoint of daemon runtime state, written periodically and on shutdown
/// (see `persist_runtime_state`) and read back on `--resume`
/// (see `restore_runtime_state`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct PersistedDaemonState {
    // True only for the final checkpoint written during orderly shutdown.
    clean_shutdown: bool,
    // Unix timestamp of the checkpoint.
    saved_at: u64,
    states: HashMap<String, MemberState>,
    active_tasks: HashMap<String, u32>,
    retry_counts: HashMap<String, u32>,
    // default so checkpoints written before this field existed still parse.
    #[serde(default)]
    dispatch_queue: Vec<DispatchQueueEntry>,
    paused_standups: HashSet<String>,
    // Member -> seconds since their last standup at save time.
    last_standup_elapsed_secs: HashMap<String, u64>,
    nudge_state: HashMap<String, PersistedNudgeState>,
    pipeline_starvation_fired: bool,
}
172
/// Identity snapshot of a file on disk, used to detect when the daemon
/// binary has been replaced (mtime + size, plus inode on unix).
#[derive(Debug, Clone, PartialEq, Eq)]
struct BinaryFingerprint {
    path: PathBuf,
    modified: SystemTime,
    len: u64,
    // Inode catches in-place replacement where mtime/len happen to match.
    #[cfg(unix)]
    inode: u64,
}
181
182impl BinaryFingerprint {
183 fn capture(path: &Path) -> Result<Self> {
184 let metadata =
185 fs::metadata(path).with_context(|| format!("failed to stat {}", path.display()))?;
186 let modified = metadata
187 .modified()
188 .with_context(|| format!("failed to read mtime for {}", path.display()))?;
189 Ok(Self {
190 path: path.to_path_buf(),
191 modified,
192 len: metadata.len(),
193 #[cfg(unix)]
194 inode: std::os::unix::fs::MetadataExt::ino(&metadata),
195 })
196 }
197
198 fn changed_from(&self, previous: &Self) -> bool {
199 self.modified != previous.modified || self.len != previous.len || {
200 #[cfg(unix)]
201 {
202 self.inode != previous.inode
203 }
204 #[cfg(not(unix))]
205 {
206 false
207 }
208 }
209 }
210}
211
/// Tracks the running daemon binary and rate-limits both filesystem checks
/// and reload attempts (see `maybe_hot_reload_binary`).
#[derive(Debug, Clone)]
struct HotReloadMonitor {
    // Fingerprint of the binary as it was when monitoring started.
    binary: BinaryFingerprint,
    last_checked: Instant,
    // None until the first reload attempt is made.
    last_reload_attempt: Option<Instant>,
}
218
219impl HotReloadMonitor {
220 fn new(binary: BinaryFingerprint) -> Self {
221 Self {
222 binary,
223 last_checked: Instant::now(),
224 last_reload_attempt: None,
225 }
226 }
227
228 fn for_current_exe() -> Result<Self> {
229 let path = std::env::current_exe().context("failed to resolve current executable")?;
230 Ok(Self::new(BinaryFingerprint::capture(&path)?))
231 }
232
233 fn should_check(&self) -> bool {
234 self.last_checked.elapsed() >= HOT_RELOAD_CHECK_INTERVAL
235 }
236
237 fn changed_binary(&mut self) -> Result<Option<BinaryFingerprint>> {
238 self.last_checked = Instant::now();
239 let current = BinaryFingerprint::capture(&self.binary.path)?;
240 Ok(current.changed_from(&self.binary).then_some(current))
241 }
242
243 fn can_attempt_reload(&self) -> bool {
244 self.last_reload_attempt
245 .map(|instant| instant.elapsed() >= HOT_RELOAD_MIN_INTERVAL)
246 .unwrap_or(true)
247 }
248
249 fn mark_reload_attempt(&mut self) {
250 self.last_reload_attempt = Some(Instant::now());
251 }
252}
253
impl TeamDaemon {
    /// Looks up the mutable session watcher for `name`, failing with context
    /// if the member is missing from the registry.
    pub(super) fn watcher_mut(&mut self, name: &str) -> Result<&mut SessionWatcher> {
        self.watchers
            .get_mut(name)
            .with_context(|| format!("watcher registry missing member '{name}'"))
    }

    /// Builds a daemon from `config`: event sink, one `SessionWatcher` per
    /// pane, comms channels for `User` roles, per-member nudge schedules,
    /// the optional telegram bridge, and the (best-effort) telemetry DB.
    pub fn new(config: DaemonConfig) -> Result<Self> {
        let is_git_repo = super::git_cmd::is_git_repo(&config.project_root);
        if !is_git_repo {
            info!("Project is not a git repository \u{2014} git operations disabled");
        }

        let team_config_dir = config.project_root.join(".batty").join("team_config");
        let events_path = team_config_dir.join("events.jsonl");
        let event_sink =
            EventSink::new_with_max_bytes(&events_path, config.team_config.event_log_max_bytes)?;

        // One watcher per tmux pane; a watcher's stale threshold is twice the
        // standup interval.
        let mut watchers = HashMap::new();
        let stale_secs = config.team_config.standup.interval_secs * 2;
        for (name, pane_id) in &config.pane_map {
            let session_tracker = config
                .members
                .iter()
                .find(|member| member.name == *name)
                .and_then(|member| member_session_tracker_config(&config.project_root, member));
            watchers.insert(
                name.clone(),
                SessionWatcher::new(pane_id, name, stale_secs, session_tracker),
            );
        }

        // Only `User` roles get an external comms channel; a bad channel
        // config is logged and skipped rather than failing startup.
        let mut channels: HashMap<String, Box<dyn Channel>> = HashMap::new();
        for role in &config.team_config.roles {
            if role.role_type == RoleType::User {
                if let (Some(ch_type), Some(ch_config)) = (&role.channel, &role.channel_config) {
                    match comms::channel_from_config(ch_type, ch_config) {
                        Ok(ch) => {
                            channels.insert(role.name.clone(), ch);
                        }
                        Err(e) => {
                            warn!(role = %role.name, error = %e, "failed to create channel");
                        }
                    }
                }
            }
        }

        let telegram_bot = telegram_bridge::build_telegram_bot(&config.team_config);

        let states = HashMap::new();

        // A role opts into nudging via `nudge_interval_secs`; the nudge text
        // comes from the "## Nudge" section of the role's prompt file and is
        // registered for every member instance of that role.
        let mut nudges = HashMap::new();
        for role in &config.team_config.roles {
            if let Some(interval_secs) = role.nudge_interval_secs {
                let prompt_path =
                    role_prompt_path(&team_config_dir, role.prompt.as_deref(), role.role_type);
                if let Some(nudge_text) = extract_nudge_section(&prompt_path) {
                    let instance_names: Vec<String> = config
                        .members
                        .iter()
                        .filter(|m| m.role_name == role.name)
                        .map(|m| m.name.clone())
                        .collect();
                    for name in instance_names {
                        info!(member = %name, interval_secs, "registered nudge");
                        nudges.insert(
                            name,
                            NudgeSchedule {
                                text: nudge_text.clone(),
                                interval: Duration::from_secs(interval_secs),
                                // Members start idle from the nudge timer's
                                // point of view.
                                idle_since: Some(Instant::now()),
                                fired_this_idle: false,
                                paused: false,
                            },
                        );
                    }
                }
            }
        }

        // Telemetry is best-effort: a failed open disables it but does not
        // block startup.
        let telemetry_db = match super::telemetry_db::open(&config.project_root) {
            Ok(conn) => {
                info!("telemetry database opened");
                Some(conn)
            }
            Err(error) => {
                warn!(error = %error, "failed to open telemetry database; telemetry disabled");
                None
            }
        };

        Ok(Self {
            config,
            watchers,
            states,
            idle_started_at: HashMap::new(),
            active_tasks: HashMap::new(),
            retry_counts: HashMap::new(),
            dispatch_queue: Vec::new(),
            triage_idle_epochs: HashMap::new(),
            triage_interventions: HashMap::new(),
            owned_task_interventions: HashMap::new(),
            intervention_cooldowns: HashMap::new(),
            channels,
            nudges,
            telegram_bot,
            failure_tracker: FailureTracker::new(20),
            event_sink,
            paused_standups: HashSet::new(),
            last_standup: HashMap::new(),
            last_board_rotation: Instant::now(),
            last_auto_archive: Instant::now(),
            last_auto_dispatch: Instant::now(),
            pipeline_starvation_fired: false,
            pipeline_starvation_last_fired: None,
            retro_generated: false,
            failed_deliveries: Vec::new(),
            review_first_seen: HashMap::new(),
            review_nudge_sent: HashSet::new(),
            poll_interval: Duration::from_secs(5),
            is_git_repo,
            subsystem_error_counts: HashMap::new(),
            auto_merge_overrides: HashMap::new(),
            recent_dispatches: HashMap::new(),
            telemetry_db,
            manual_assign_cooldowns: HashMap::new(),
            backend_health: HashMap::new(),
            // Backdated one hour so the first loop iteration performs a
            // health check immediately instead of waiting a full interval.
            last_health_check: Instant::now() - Duration::from_secs(3600),
            last_uncommitted_warn: HashMap::new(),
            pending_delivery_queue: HashMap::new(),
        })
    }

    /// Returns the "## Nudge" text from this member's role prompt file, if
    /// the file exists and contains a non-empty nudge section.
    pub(super) fn member_nudge_text(&self, member: &MemberInstance) -> Option<String> {
        let prompt_path = role_prompt_path(
            &super::team_config_dir(&self.config.project_root),
            member.prompt.as_deref(),
            member.role_type,
        );
        extract_nudge_section(&prompt_path)
    }

    /// Prefixes `body` with the member's nudge text (separated by a blank
    /// line) when one exists; otherwise returns `body` unchanged.
    pub(super) fn prepend_member_nudge(
        &self,
        member: &MemberInstance,
        body: impl AsRef<str>,
    ) -> String {
        let body = body.as_ref();
        match self.member_nudge_text(member) {
            Some(nudge) => format!("{nudge}\n\n{body}"),
            None => body.to_string(),
        }
    }

    /// Main daemon loop. Spawns agents, then repeatedly runs every automation
    /// step until a shutdown signal arrives or the tmux session disappears.
    /// Steps are wrapped in `run_recoverable_step` / `run_loop_step` helpers
    /// (defined elsewhere in this module) so one failing step does not
    /// presumably tear down the loop — confirm against error_handling.rs.
    pub fn run(&mut self, resume: bool) -> Result<()> {
        self.record_daemon_started();
        self.acknowledge_hot_reload_marker();
        info!(session = %self.config.session, resume, "daemon started");
        self.record_orchestrator_action(format!(
            "runtime: orchestrator started (mode={}, resume={resume})",
            self.config.team_config.workflow_mode.as_str()
        ));

        // Ctrl-C flips the shared flag; the loop polls it each iteration.
        // Handler-install failure is logged but not fatal.
        let shutdown_flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let flag_clone = shutdown_flag.clone();
        if let Err(e) = ctrlc::set_handler(move || {
            flag_clone.store(true, std::sync::atomic::Ordering::SeqCst);
        }) {
            warn!(error = %e, "failed to install signal handler");
        }

        self.run_startup_preflight()?;

        self.spawn_all_agents(resume)?;
        if resume {
            // Restore state only after agents exist so watcher/state keys line up.
            self.restore_runtime_state();
        }
        self.persist_runtime_state(false)?;

        let started_at = Instant::now();
        // Heartbeat + checkpoint every 5 minutes.
        let heartbeat_interval = Duration::from_secs(300);
        let mut last_heartbeat = Instant::now();
        let mut hot_reload = match HotReloadMonitor::for_current_exe() {
            Ok(monitor) => Some(monitor),
            Err(error) => {
                warn!(error = %error, "failed to initialize daemon hot-reload monitor");
                None
            }
        };

        // Assigned by exactly one of the `break` paths below; the compiler
        // guarantees it is set before use after the loop.
        let shutdown_reason;
        loop {
            if shutdown_flag.load(std::sync::atomic::Ordering::SeqCst) {
                shutdown_reason = "signal";
                info!("received shutdown signal");
                break;
            }

            if !tmux::session_exists(&self.config.session) {
                shutdown_reason = "session_gone";
                info!("tmux session gone, shutting down");
                break;
            }

            // -- member liveness & session bookkeeping --
            self.run_recoverable_step("poll_watchers", |daemon| daemon.poll_watchers());
            self.run_recoverable_step("restart_dead_members", |daemon| {
                daemon.restart_dead_members()
            });
            self.run_recoverable_step("sync_launch_state_session_ids", |daemon| {
                daemon.sync_launch_state_session_ids()
            });
            self.run_recoverable_step("drain_legacy_command_queue", |daemon| {
                daemon.drain_legacy_command_queue()
            });

            // -- message delivery --
            self.run_loop_step("deliver_inbox_messages", |daemon| {
                daemon.deliver_inbox_messages()
            });
            self.run_loop_step("retry_failed_deliveries", |daemon| {
                daemon.retry_failed_deliveries()
            });

            // -- backlog / review / blocked-task interventions --
            self.run_recoverable_step("maybe_intervene_triage_backlog", |daemon| {
                daemon.maybe_intervene_triage_backlog()
            });
            self.run_recoverable_step("maybe_intervene_owned_tasks", |daemon| {
                daemon.maybe_intervene_owned_tasks()
            });
            self.run_recoverable_step("maybe_intervene_review_backlog", |daemon| {
                daemon.maybe_intervene_review_backlog()
            });
            self.run_recoverable_step("maybe_escalate_stale_reviews", |daemon| {
                daemon.maybe_escalate_stale_reviews()
            });
            self.run_recoverable_step("maybe_auto_unblock_blocked_tasks", |daemon| {
                daemon.maybe_auto_unblock_blocked_tasks()
            });

            // -- task lifecycle & dispatch --
            self.run_loop_step("reconcile_active_tasks", |daemon| {
                daemon.reconcile_active_tasks()
            });
            self.run_loop_step("maybe_auto_dispatch", |daemon| daemon.maybe_auto_dispatch());
            self.run_recoverable_step("maybe_recycle_cron_tasks", |daemon| {
                daemon.maybe_recycle_cron_tasks()
            });

            // -- pipeline health interventions --
            self.run_recoverable_step("maybe_intervene_manager_dispatch_gap", |daemon| {
                daemon.maybe_intervene_manager_dispatch_gap()
            });
            self.run_recoverable_step("maybe_intervene_architect_utilization", |daemon| {
                daemon.maybe_intervene_architect_utilization()
            });
            self.run_recoverable_step("maybe_intervene_board_replenishment", |daemon| {
                daemon.maybe_intervene_board_replenishment()
            });
            self.run_recoverable_step("maybe_detect_pipeline_starvation", |daemon| {
                daemon.maybe_detect_pipeline_starvation()
            });

            // -- external integrations, nudges, health, worktrees --
            // catch_unwind variant: telegram handling crosses into code that
            // may panic.
            self.run_recoverable_step_with_catch_unwind("process_telegram_queue", |daemon| {
                daemon.process_telegram_queue()
            });
            self.run_recoverable_step("maybe_fire_nudges", |daemon| daemon.maybe_fire_nudges());
            self.run_recoverable_step("check_backend_health", |daemon| {
                daemon.check_backend_health()
            });
            self.run_recoverable_step("maybe_reconcile_stale_worktrees", |daemon| {
                daemon.maybe_reconcile_stale_worktrees()
            });
            self.run_recoverable_step("check_worktree_staleness", |daemon| {
                daemon.check_worktree_staleness()
            });
            self.run_recoverable_step("maybe_warn_uncommitted_work", |daemon| {
                daemon.maybe_warn_uncommitted_work()
            });

            // -- standups, board housekeeping, retrospective --
            self.run_recoverable_step_with_catch_unwind("maybe_generate_standup", |daemon| {
                let generated =
                    standup::maybe_generate_standup(standup::StandupGenerationContext {
                        project_root: &daemon.config.project_root,
                        team_config: &daemon.config.team_config,
                        members: &daemon.config.members,
                        watchers: &daemon.watchers,
                        states: &daemon.states,
                        pane_map: &daemon.config.pane_map,
                        telegram_bot: daemon.telegram_bot.as_ref(),
                        paused_standups: &daemon.paused_standups,
                        last_standup: &mut daemon.last_standup,
                        backend_health: &daemon.backend_health,
                    })?;
                for recipient in generated {
                    daemon.record_standup_generated(&recipient);
                }
                Ok(())
            });
            self.run_recoverable_step("maybe_rotate_board", |daemon| daemon.maybe_rotate_board());
            self.run_recoverable_step("maybe_auto_archive", |daemon| daemon.maybe_auto_archive());
            self.run_recoverable_step_with_catch_unwind("maybe_generate_retrospective", |daemon| {
                daemon.maybe_generate_retrospective()
            });
            self.run_recoverable_step("maybe_notify_failure_patterns", |daemon| {
                daemon.maybe_notify_failure_patterns()
            });
            self.run_recoverable_step("maybe_reload_binary", |daemon| {
                daemon.maybe_hot_reload_binary(hot_reload.as_mut())
            });
            // Pane labels are refreshed unconditionally every iteration.
            status::update_pane_status_labels(status::PaneStatusLabelUpdateContext {
                project_root: &self.config.project_root,
                members: &self.config.members,
                pane_map: &self.config.pane_map,
                states: &self.states,
                nudges: &self.nudges,
                last_standup: &self.last_standup,
                paused_standups: &self.paused_standups,
                standup_interval_for_member: |member_name| {
                    standup::standup_interval_for_member_name(
                        &self.config.team_config,
                        &self.config.members,
                        member_name,
                    )
                },
            });

            // Periodic heartbeat + runtime-state checkpoint.
            if last_heartbeat.elapsed() >= heartbeat_interval {
                let uptime = started_at.elapsed().as_secs();
                self.record_daemon_heartbeat(uptime);
                if let Err(error) = self.persist_runtime_state(false) {
                    warn!(error = %error, "failed to persist daemon checkpoint");
                }
                debug!(uptime_secs = uptime, "daemon heartbeat");
                last_heartbeat = Instant::now();
            }

            std::thread::sleep(self.poll_interval);
        }

        // Final checkpoint is marked as a clean shutdown.
        let uptime = started_at.elapsed().as_secs();
        if let Err(error) = self.persist_runtime_state(true) {
            warn!(error = %error, "failed to persist final daemon checkpoint");
        }
        self.record_daemon_stopped(shutdown_reason, uptime);
        Ok(())
    }

    /// Execs into an updated daemon binary when one appears on disk. Checks
    /// are throttled by the monitor; state is checkpointed and a reload
    /// marker written before exec so the successor can acknowledge it. On
    /// exec failure the marker is cleared and this process keeps running.
    fn maybe_hot_reload_binary(&mut self, monitor: Option<&mut HotReloadMonitor>) -> Result<()> {
        // No monitor means hot reload was disabled at startup.
        let Some(monitor) = monitor else {
            return Ok(());
        };
        if !monitor.should_check() {
            return Ok(());
        }

        let Some(updated_binary) = monitor.changed_binary()? else {
            return Ok(());
        };

        if !monitor.can_attempt_reload() {
            warn!(
                path = %updated_binary.path.display(),
                "binary changed again but reload attempt is rate-limited"
            );
            return Ok(());
        }

        if !binary_is_reloadable(&updated_binary.path) {
            warn!(
                path = %updated_binary.path.display(),
                "binary changed but is not safe to hot-reload yet"
            );
            return Ok(());
        }

        monitor.mark_reload_attempt();
        self.persist_runtime_state(false)?;
        self.record_daemon_reloading();
        self.record_orchestrator_action(format!(
            "runtime: daemon reloading after binary change ({})",
            updated_binary.path.display()
        ));
        write_hot_reload_marker(&self.config.project_root)?;

        // On success exec_reloaded_daemon never returns; an Err means we are
        // still the running process.
        if let Err(error) = exec_reloaded_daemon(&updated_binary.path, &self.config.project_root) {
            let _ = clear_hot_reload_marker(&self.config.project_root);
            warn!(
                path = %updated_binary.path.display(),
                error = %error,
                "failed to exec updated daemon binary; continuing on existing process"
            );
            self.record_orchestrator_action(format!("runtime: daemon reload failed ({error})"));
        }

        Ok(())
    }

    /// Marks a member as Working, activates their watcher, and resets the
    /// idle/nudge/standup timers for the new state.
    pub(super) fn mark_member_working(&mut self, member_name: &str) {
        self.states
            .insert(member_name.to_string(), MemberState::Working);
        if let Some(watcher) = self.watchers.get_mut(member_name) {
            watcher.activate();
        }
        self.update_automation_timers_for_state(member_name, MemberState::Working);
    }

    /// Marks a member as Idle, deactivates their watcher, and resets the
    /// idle/nudge/standup timers for the new state.
    pub(super) fn set_member_idle(&mut self, member_name: &str) {
        self.states
            .insert(member_name.to_string(), MemberState::Idle);
        if let Some(watcher) = self.watchers.get_mut(member_name) {
            watcher.deactivate();
        }
        self.update_automation_timers_for_state(member_name, MemberState::Idle);
    }

    /// The task id currently assigned to `engineer`, if any.
    pub(super) fn active_task_id(&self, engineer: &str) -> Option<u32> {
        self.active_tasks.get(engineer).copied()
    }

    /// Root of the project this daemon manages.
    pub(super) fn project_root(&self) -> &Path {
        &self.config.project_root
    }

    // Test seam: force an in-memory auto-merge override.
    #[cfg(test)]
    pub(super) fn set_auto_merge_override(&mut self, task_id: u32, enabled: bool) {
        self.auto_merge_overrides.insert(task_id, enabled);
    }

    /// Auto-merge override for a task: the in-memory map wins; otherwise the
    /// overrides file on disk is consulted (re-read on every call).
    pub(super) fn auto_merge_override(&self, task_id: u32) -> Option<bool> {
        if let Some(&value) = self.auto_merge_overrides.get(&task_id) {
            return Some(value);
        }
        let disk_overrides = super::auto_merge::load_overrides(&self.config.project_root);
        disk_overrides.get(&task_id).copied()
    }

    /// Path of an engineer's git worktree: `<root>/.batty/worktrees/<name>`.
    pub(super) fn worktree_dir(&self, engineer: &str) -> PathBuf {
        self.config
            .project_root
            .join(".batty")
            .join("worktrees")
            .join(engineer)
    }

    /// Path of the kanban board: `<root>/.batty/team_config/board`.
    pub(super) fn board_dir(&self) -> PathBuf {
        self.config
            .project_root
            .join(".batty")
            .join("team_config")
            .join("board")
    }

    /// Whether `engineer` works in a dedicated git worktree. Always false
    /// when the project is not a git repo or the member is unknown.
    pub(super) fn member_uses_worktrees(&self, engineer: &str) -> bool {
        if !self.is_git_repo {
            return false;
        }
        self.config
            .members
            .iter()
            .find(|member| member.name == engineer)
            .map(|member| member.use_worktrees)
            .unwrap_or(false)
    }

    /// Name of the member `engineer` reports to, if configured.
    pub(super) fn manager_name(&self, engineer: &str) -> Option<String> {
        self.config
            .members
            .iter()
            .find(|member| member.name == engineer)
            .and_then(|member| member.reports_to.clone())
    }

    // Test seam: assign a task without going through dispatch.
    #[cfg(test)]
    pub(super) fn set_active_task_for_test(&mut self, engineer: &str, task_id: u32) {
        self.active_tasks.insert(engineer.to_string(), task_id);
    }

    // Test seam: read the retry counter directly.
    #[cfg(test)]
    pub(super) fn retry_count_for_test(&self, engineer: &str) -> Option<u32> {
        self.retry_counts.get(engineer).copied()
    }

    // Test seam: read the member state directly.
    #[cfg(test)]
    pub(super) fn member_state_for_test(&self, engineer: &str) -> Option<MemberState> {
        self.states.get(engineer).copied()
    }

    // Test seam: set the member state without side effects (no watcher or
    // timer updates, unlike mark_member_working/set_member_idle).
    #[cfg(test)]
    pub(super) fn set_member_state_for_test(&mut self, engineer: &str, state: MemberState) {
        self.states.insert(engineer.to_string(), state);
    }

    /// Increments and returns the retry counter for `engineer`'s active task
    /// (first call yields 1).
    pub(super) fn increment_retry(&mut self, engineer: &str) -> u32 {
        let count = self.retry_counts.entry(engineer.to_string()).or_insert(0);
        *count += 1;
        *count
    }

    /// Clears an engineer's active task, retry counter, and on-disk task
    /// checkpoint.
    pub(super) fn clear_active_task(&mut self, engineer: &str) {
        self.active_tasks.remove(engineer);
        self.retry_counts.remove(engineer);
        super::checkpoint::remove_checkpoint(&self.config.project_root, engineer);
    }

    /// Queues `msg` to the member that `from_role` reports to (no-op when
    /// there is no parent) and marks the parent as Working so the message is
    /// noticed promptly.
    pub(super) fn notify_reports_to(&mut self, from_role: &str, msg: &str) -> Result<()> {
        let parent = self
            .config
            .members
            .iter()
            .find(|m| m.name == from_role)
            .and_then(|m| m.reports_to.clone());
        let Some(parent_name) = parent else {
            return Ok(());
        };
        self.queue_message(from_role, &parent_name, msg)?;
        self.mark_member_working(&parent_name);
        Ok(())
    }

    /// Fans a member state change out to every timer-driven subsystem:
    /// idle-start tracking, nudges, standups, and triage interventions.
    pub(super) fn update_automation_timers_for_state(
        &mut self,
        member_name: &str,
        new_state: MemberState,
    ) {
        match new_state {
            MemberState::Idle => {
                self.idle_started_at
                    .insert(member_name.to_string(), Instant::now());
            }
            MemberState::Working => {
                self.idle_started_at.remove(member_name);
            }
        }
        self.update_nudge_for_state(member_name, new_state);
        standup::update_timer_for_state(
            &self.config.team_config,
            &self.config.members,
            &mut self.paused_standups,
            &mut self.last_standup,
            member_name,
            new_state,
        );
        self.update_triage_intervention_for_state(member_name, new_state);
    }

    /// Restores member/task/nudge/standup state from the last checkpoint,
    /// rebuilding `Instant`-based timers from persisted elapsed seconds.
    /// NOTE(review): the checkpoint's `clean_shutdown` and `saved_at` fields
    /// are not consulted here — confirm staleness handling happens elsewhere.
    fn restore_runtime_state(&mut self) {
        let Some(state) = load_daemon_state(&self.config.project_root) else {
            return;
        };

        self.states = state.states;
        // Idle members restart their idle clock at "now" rather than the
        // original idle start, which was not persisted.
        self.idle_started_at = self
            .states
            .iter()
            .filter(|(_, state)| matches!(state, MemberState::Idle))
            .map(|(member, _)| (member.clone(), Instant::now()))
            .collect();
        self.active_tasks = state.active_tasks;
        self.retry_counts = state.retry_counts;
        self.dispatch_queue = state.dispatch_queue;
        self.paused_standups = state.paused_standups;
        self.last_standup = standup::restore_timer_state(state.last_standup_elapsed_secs);

        // Only members that still have a schedule (registered in `new`) get
        // their nudge state restored.
        for (member_name, persisted) in state.nudge_state {
            let Some(schedule) = self.nudges.get_mut(&member_name) else {
                continue;
            };
            schedule.idle_since = persisted.idle_elapsed_secs.map(|elapsed_secs| {
                // Saturate at "now" if the elapsed span predates this
                // process's clock.
                Instant::now()
                    .checked_sub(Duration::from_secs(elapsed_secs))
                    .unwrap_or_else(Instant::now)
            });
            schedule.fired_this_idle = persisted.fired_this_idle;
            schedule.paused = persisted.paused;
        }
        self.pipeline_starvation_fired = state.pipeline_starvation_fired;
    }

    /// Writes a checkpoint of the daemon's runtime state to disk, converting
    /// `Instant`-based timers into elapsed seconds for serialization.
    fn persist_runtime_state(&self, clean_shutdown: bool) -> Result<()> {
        let state = PersistedDaemonState {
            clean_shutdown,
            saved_at: now_unix(),
            states: self.states.clone(),
            active_tasks: self.active_tasks.clone(),
            retry_counts: self.retry_counts.clone(),
            dispatch_queue: self.dispatch_queue.clone(),
            paused_standups: self.paused_standups.clone(),
            last_standup_elapsed_secs: standup::snapshot_timer_state(&self.last_standup),
            nudge_state: self
                .nudges
                .iter()
                .map(|(member, schedule)| {
                    (
                        member.clone(),
                        PersistedNudgeState {
                            idle_elapsed_secs: schedule.idle_since.map(|t| t.elapsed().as_secs()),
                            fired_this_idle: schedule.fired_this_idle,
                            paused: schedule.paused,
                        },
                    )
                })
                .collect(),
            pipeline_starvation_fired: self.pipeline_starvation_fired,
        };
        save_daemon_state(&self.config.project_root, &state)
    }
}
886
/// Renders a one-line failure description for an external command, preferring
/// trimmed stderr, then trimmed stdout, then the raw exit status.
fn describe_command_failure(command: &str, args: &[&str], output: &std::process::Output) -> String {
    let capture = |bytes: &[u8]| String::from_utf8_lossy(bytes).trim().to_string();
    let stderr = capture(&output.stderr);
    let stdout = capture(&output.stdout);

    let details = match (stderr.is_empty(), stdout.is_empty()) {
        (false, _) => stderr,
        (true, false) => stdout,
        (true, true) => format!("process exited with status {}", output.status),
    };

    format!("`{command} {}` failed: {details}", args.join(" "))
}
900
901fn default_prompt_file_for_role(role_type: RoleType) -> &'static str {
902 match role_type {
903 RoleType::Architect => "architect.md",
904 RoleType::Manager => "manager.md",
905 RoleType::Engineer => "engineer.md",
906 RoleType::User => "architect.md",
907 }
908}
909
910fn role_prompt_path(
911 team_config_dir: &Path,
912 prompt_override: Option<&str>,
913 role_type: RoleType,
914) -> PathBuf {
915 team_config_dir.join(prompt_override.unwrap_or(default_prompt_file_for_role(role_type)))
916}
917
/// Extracts the body of the "## Nudge" section from a prompt file: the lines
/// after a `## Nudge` heading up to (not including) the next `## ` heading.
/// Returns None when the file is unreadable, has no such section, or the
/// section trims down to nothing.
fn extract_nudge_section(prompt_path: &Path) -> Option<String> {
    let content = std::fs::read_to_string(prompt_path).ok()?;
    let mut capturing = false;
    let mut collected: Vec<&str> = Vec::new();

    for line in content.lines() {
        if line.starts_with("## Nudge") {
            capturing = true;
        } else if capturing {
            if line.starts_with("## ") {
                // Next section begins; the nudge body is complete.
                break;
            }
            collected.push(line);
        }
    }

    if collected.is_empty() {
        return None;
    }

    let text = collected.join("\n").trim().to_string();
    if text.is_empty() { None } else { Some(text) }
}
948
/// Formats a duration in seconds for human display: "Xh Ym" above an hour,
/// "Xm Ys" above a minute, otherwise "Xs".
fn format_stuck_duration(stuck_age_secs: u64) -> String {
    match stuck_age_secs {
        secs if secs >= 3600 => format!("{}h {}m", secs / 3600, (secs % 3600) / 60),
        secs if secs >= 60 => format!("{}m {}s", secs / 60, secs % 60),
        secs => format!("{secs}s"),
    }
}
966
967fn ensure_tmux_session_ready(session: &str) -> Result<()> {
968 if tmux::session_exists(session) {
969 Ok(())
970 } else {
971 bail!("daemon startup pre-flight failed: tmux session '{session}' is missing")
972 }
973}
974
975fn ensure_kanban_available() -> Result<()> {
976 let output = std::process::Command::new("kanban-md")
977 .arg("--help")
978 .output()
979 .context(
980 "daemon startup pre-flight failed while verifying board tooling: could not execute `kanban-md --help`",
981 )?;
982 if output.status.success() {
983 return Ok(());
984 }
985
986 let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string();
987 let detail = if stderr.is_empty() {
988 "unknown error".to_string()
989 } else {
990 stderr
991 };
992 bail!("daemon startup pre-flight failed: `kanban-md --help` failed: {detail}");
993}
994
/// Kanban board directory for a project: `<root>/.batty/team_config/board`.
fn board_dir(project_root: &Path) -> PathBuf {
    [".batty", "team_config", "board"]
        .iter()
        .fold(project_root.to_path_buf(), |path, part| path.join(part))
}
1001
1002fn ensure_board_initialized(project_root: &Path) -> Result<bool> {
1003 let board_dir = board_dir(project_root);
1004 if board_dir.join("tasks").is_dir() {
1005 return Ok(false);
1006 }
1007
1008 board_cmd::init(&board_dir).map_err(|error| {
1009 anyhow::anyhow!(
1010 "daemon startup pre-flight failed: unable to initialize board at '{}': {error}",
1011 board_dir.display()
1012 )
1013 })?;
1014 Ok(true)
1015}
1016
/// Location of the hot-reload marker file: `<root>/.batty/reload`.
fn hot_reload_marker_path(project_root: &Path) -> PathBuf {
    let mut path = project_root.to_path_buf();
    path.push(".batty");
    path.push("reload");
    path
}
1020
1021fn write_hot_reload_marker(project_root: &Path) -> Result<()> {
1022 let path = hot_reload_marker_path(project_root);
1023 if let Some(parent) = path.parent() {
1024 fs::create_dir_all(parent)
1025 .with_context(|| format!("failed to create {}", parent.display()))?;
1026 }
1027 fs::write(&path, now_unix().to_string())
1028 .with_context(|| format!("failed to write {}", path.display()))?;
1029 Ok(())
1030}
1031
1032fn clear_hot_reload_marker(project_root: &Path) -> Result<()> {
1033 let path = hot_reload_marker_path(project_root);
1034 if !path.exists() {
1035 return Ok(());
1036 }
1037 fs::remove_file(&path).with_context(|| format!("failed to remove {}", path.display()))?;
1038 Ok(())
1039}
1040
1041fn consume_hot_reload_marker(project_root: &Path) -> bool {
1042 let path = hot_reload_marker_path(project_root);
1043 if !path.exists() {
1044 return false;
1045 }
1046 clear_hot_reload_marker(project_root).is_ok()
1047}
1048
/// Argument vector for re-exec'ing the daemon after a hot reload: verbose
/// daemon mode, resuming state, against the canonicalized project root
/// (falling back to the given path when canonicalization fails).
fn hot_reload_daemon_args(project_root: &Path) -> Vec<String> {
    let canonical_root = project_root
        .canonicalize()
        .unwrap_or_else(|_| project_root.to_path_buf());
    let root = canonical_root.to_string_lossy().into_owned();
    vec![
        "-v".into(),
        "daemon".into(),
        "--project-root".into(),
        root,
        "--resume".into(),
    ]
}
1063
/// Heuristic safety check before exec'ing a replacement binary: it must be a
/// regular file, be executable (unix), and pass `codesign --verify` on
/// macOS (a half-written or unsigned binary would fail to exec or be killed).
fn binary_is_reloadable(path: &Path) -> bool {
    let metadata = match fs::metadata(path) {
        Ok(metadata) => metadata,
        Err(_) => return false,
    };
    if !metadata.is_file() {
        return false;
    }

    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        // Reject files with no execute bit set for anyone.
        if metadata.permissions().mode() & 0o111 == 0 {
            return false;
        }
    }

    #[cfg(target_os = "macos")]
    {
        let verify = std::process::Command::new("codesign")
            .args(["--verify", path.to_string_lossy().as_ref()])
            .status();
        match verify {
            Ok(status) if status.success() => {}
            _ => return false,
        }
    }

    true
}
1095
1096#[cfg(unix)]
1097fn exec_reloaded_daemon(executable: &Path, project_root: &Path) -> Result<()> {
1098 use std::os::unix::process::CommandExt;
1099
1100 let error = std::process::Command::new(executable)
1101 .args(hot_reload_daemon_args(project_root))
1102 .exec();
1103 Err(anyhow::Error::new(error).context(format!("failed to exec {}", executable.display())))
1104}
1105
/// Hot reload is unsupported off unix; always fail with an explanatory error.
#[cfg(not(unix))]
fn exec_reloaded_daemon(_executable: &Path, _project_root: &Path) -> Result<()> {
    Err(anyhow::anyhow!(
        "daemon hot reload via exec is only supported on unix"
    ))
}
1110
/// Filesystem location of the persisted daemon state; delegates to the
/// shared definition in the parent module so all callers agree on the path.
fn daemon_state_path(project_root: &Path) -> PathBuf {
    super::daemon_state_path(project_root)
}
1114
1115fn load_daemon_state(project_root: &Path) -> Option<PersistedDaemonState> {
1116 let path = daemon_state_path(project_root);
1117 let Ok(content) = fs::read_to_string(&path) else {
1118 return None;
1119 };
1120
1121 match serde_json::from_str(&content) {
1122 Ok(state) => Some(state),
1123 Err(error) => {
1124 warn!(path = %path.display(), error = %error, "failed to parse daemon state, ignoring");
1125 None
1126 }
1127 }
1128}
1129
1130pub fn load_dispatch_queue_snapshot(project_root: &Path) -> Vec<DispatchQueueEntry> {
1131 load_daemon_state(project_root)
1132 .map(|state| state.dispatch_queue)
1133 .unwrap_or_default()
1134}
1135
1136fn save_daemon_state(project_root: &Path, state: &PersistedDaemonState) -> Result<()> {
1137 let path = daemon_state_path(project_root);
1138 if let Some(parent) = path.parent() {
1139 fs::create_dir_all(parent)
1140 .with_context(|| format!("failed to create {}", parent.display()))?;
1141 }
1142 let content =
1143 serde_json::to_string_pretty(state).context("failed to serialize daemon state")?;
1144 fs::write(&path, content).with_context(|| format!("failed to write {}", path.display()))?;
1145 Ok(())
1146}
1147
1148#[cfg(test)]
1149mod tests {
1150 use super::*;
1151 use crate::team::config::AutomationConfig;
1152 use crate::team::config::{BoardConfig, RoleDef, StandupConfig, WorkflowMode, WorkflowPolicy};
1153 use crate::team::events::EventSink;
1154 use crate::team::test_helpers::make_test_daemon;
1155 use crate::team::test_support::{
1156 TestDaemonBuilder, architect_member, backdate_idle_grace, engineer_member, init_git_repo,
1157 manager_member, write_board_task_file, write_open_task_file, write_owned_task_file,
1158 };
1159 use std::time::UNIX_EPOCH;
1160
1161 use serial_test::serial;
1162 use std::collections::HashMap;
1163 use std::fs::OpenOptions;
1164 use std::io::Write;
1165 use std::path::{Path, PathBuf};
1166 fn production_unwrap_expect_count(path: &Path) -> usize {
1167 let content = std::fs::read_to_string(path).unwrap();
1168 let test_split = content.split("\n#[cfg(test)]").next().unwrap_or(&content);
1169 test_split
1170 .lines()
1171 .filter(|line| line.contains(".unwrap(") || line.contains(".expect("))
1172 .count()
1173 }
1174 fn setup_fake_codex(
1175 project_root: &Path,
1176 log_root: &Path,
1177 member_name: &str,
1178 ) -> (PathBuf, PathBuf) {
1179 let project_slug = project_root
1180 .file_name()
1181 .map(|name| name.to_string_lossy().into_owned())
1182 .unwrap_or_else(|| "default".to_string());
1183 let fake_bin = std::env::temp_dir().join(format!("batty-bin-{project_slug}-{member_name}"));
1184 let _ = std::fs::remove_dir_all(&fake_bin);
1185 std::fs::create_dir_all(&fake_bin).unwrap();
1186
1187 let fake_log = log_root.join(format!("{member_name}-fake-codex.log"));
1188 let fake_codex = fake_bin.join("codex");
1189 std::fs::write(
1190 &fake_codex,
1191 format!(
1192 "#!/bin/bash\nprintf 'PWD:%s\\nARGS:%s\\n' \"$PWD\" \"$*\" >> '{}'\nsleep 1\n",
1193 fake_log.display()
1194 ),
1195 )
1196 .unwrap();
1197 #[cfg(unix)]
1198 {
1199 use std::os::unix::fs::PermissionsExt;
1200 std::fs::set_permissions(&fake_codex, std::fs::Permissions::from_mode(0o755)).unwrap();
1201 }
1202
1203 (fake_bin, fake_log)
1204 }
1205
1206 fn write_codex_session_meta(cwd: &Path) -> PathBuf {
1207 let home = PathBuf::from(std::env::var("HOME").expect("HOME must be set for tests"));
1208 let session_dir = home
1209 .join(".codex")
1210 .join("sessions")
1211 .join("2099")
1212 .join("12")
1213 .join("31");
1214 std::fs::create_dir_all(&session_dir).unwrap();
1215
1216 let unique = format!(
1217 "batty-daemon-lifecycle-{}-{}.jsonl",
1218 std::process::id(),
1219 SystemTime::now()
1220 .duration_since(std::time::UNIX_EPOCH)
1221 .unwrap()
1222 .as_nanos()
1223 );
1224 let session_file = session_dir.join(unique);
1225 std::fs::write(
1226 &session_file,
1227 format!(
1228 "{{\"type\":\"session_meta\",\"payload\":{{\"cwd\":\"{}\"}}}}\n",
1229 cwd.display()
1230 ),
1231 )
1232 .unwrap();
1233 session_file
1234 }
1235
1236 fn append_codex_task_complete(session_file: &Path) {
1237 let mut handle = OpenOptions::new().append(true).open(session_file).unwrap();
1238 writeln!(
1239 handle,
1240 "{{\"type\":\"event_msg\",\"payload\":{{\"type\":\"task_complete\"}}}}"
1241 )
1242 .unwrap();
1243 handle.flush().unwrap();
1244 }
1245
1246 fn wait_for_log_contains(log_path: &Path, needle: &str) -> String {
1247 (0..300)
1248 .find_map(|_| {
1249 let content = match std::fs::read_to_string(log_path) {
1250 Ok(content) => content,
1251 Err(_) => {
1252 std::thread::sleep(Duration::from_millis(100));
1253 return None;
1254 }
1255 };
1256 if content.contains(needle) {
1257 Some(content)
1258 } else {
1259 std::thread::sleep(Duration::from_millis(100));
1260 None
1261 }
1262 })
1263 .unwrap_or_else(|| panic!("log {} never contained `{needle}`", log_path.display()))
1264 }
1265
1266 fn starvation_test_daemon(tmp: &tempfile::TempDir, threshold: Option<usize>) -> TeamDaemon {
1267 let mut daemon = TestDaemonBuilder::new(tmp.path())
1268 .members(vec![
1269 architect_member("architect"),
1270 engineer_member("eng-1", Some("architect"), false),
1271 engineer_member("eng-2", Some("architect"), false),
1272 ])
1273 .workflow_policy(WorkflowPolicy {
1274 pipeline_starvation_threshold: threshold,
1275 ..WorkflowPolicy::default()
1276 })
1277 .build();
1278 daemon.states = HashMap::from([
1279 ("eng-1".to_string(), MemberState::Idle),
1280 ("eng-2".to_string(), MemberState::Idle),
1281 ]);
1282 daemon
1283 }
1284
1285 #[test]
1286 fn extract_nudge_from_file() {
1287 let tmp = tempfile::NamedTempFile::new().unwrap();
1288 std::fs::write(
1289 tmp.path(),
1290 "# Architect\n\n## Nudge\n\nCheck work.\nUpdate roadmap.\n\n## Other\n\nstuff\n",
1291 )
1292 .unwrap();
1293 let nudge = extract_nudge_section(tmp.path()).unwrap();
1294 assert!(nudge.contains("Check work."));
1295 assert!(nudge.contains("Update roadmap."));
1296 assert!(!nudge.contains("## Other"));
1297 }
1298
1299 #[test]
1300 fn extract_nudge_returns_none_when_absent() {
1301 let tmp = tempfile::NamedTempFile::new().unwrap();
1302 std::fs::write(tmp.path(), "# Engineer\n\n## Workflow\n\n- code\n").unwrap();
1303 assert!(extract_nudge_section(tmp.path()).is_none());
1304 }
1305
1306 #[test]
1307 fn extract_nudge_returns_none_when_malformed() {
1308 let tmp = tempfile::NamedTempFile::new().unwrap();
1309 std::fs::write(
1310 tmp.path(),
1311 "# Engineer\n\n## Nudge\n\n## Workflow\n\n- code\n",
1312 )
1313 .unwrap();
1314 assert!(extract_nudge_section(tmp.path()).is_none());
1315 }
1316
    /// Per-role prompt files with `## Nudge` sections should produce nudge
    /// schedules keyed by member name, each using its role's configured
    /// interval (`nudge_interval_secs`).
    #[test]
    fn daemon_registers_per_role_nudge_intervals_from_prompt_sections() {
        let tmp = tempfile::tempdir().unwrap();
        // Prompt files are read from .batty/team_config/<role>.md.
        let team_config_dir = tmp.path().join(".batty").join("team_config");
        std::fs::create_dir_all(&team_config_dir).unwrap();
        std::fs::write(
            team_config_dir.join("manager.md"),
            "# Manager\n\n## Nudge\n\nManager follow-up.\n",
        )
        .unwrap();
        std::fs::write(
            team_config_dir.join("engineer.md"),
            "# Engineer\n\n## Nudge\n\nEngineer follow-up.\n",
        )
        .unwrap();

        // Two roles with different nudge intervals; members "lead" and "eng-1"
        // are instances of those roles.
        let daemon = TeamDaemon::new(DaemonConfig {
            project_root: tmp.path().to_path_buf(),
            team_config: TeamConfig {
                name: "test".to_string(),
                agent: None,
                workflow_mode: WorkflowMode::Legacy,
                board: BoardConfig::default(),
                standup: StandupConfig::default(),
                automation: AutomationConfig::default(),
                automation_sender: None,
                external_senders: Vec::new(),
                orchestrator_pane: true,
                orchestrator_position: OrchestratorPosition::Bottom,
                layout: None,
                workflow_policy: WorkflowPolicy::default(),
                cost: Default::default(),
                event_log_max_bytes: crate::team::DEFAULT_EVENT_LOG_MAX_BYTES,
                retro_min_duration_secs: 60,
                roles: vec![
                    RoleDef {
                        name: "manager".to_string(),
                        role_type: RoleType::Manager,
                        agent: Some("claude".to_string()),
                        instances: 1,
                        prompt: None,
                        talks_to: vec![],
                        channel: None,
                        channel_config: None,
                        nudge_interval_secs: Some(120),
                        receives_standup: None,
                        standup_interval_secs: None,
                        owns: Vec::new(),
                        use_worktrees: false,
                    },
                    RoleDef {
                        name: "engineer".to_string(),
                        role_type: RoleType::Engineer,
                        agent: Some("codex".to_string()),
                        instances: 1,
                        prompt: None,
                        talks_to: vec![],
                        channel: None,
                        channel_config: None,
                        nudge_interval_secs: Some(300),
                        receives_standup: None,
                        standup_interval_secs: None,
                        owns: Vec::new(),
                        use_worktrees: false,
                    },
                ],
            },
            session: "test".to_string(),
            members: vec![
                MemberInstance {
                    name: "lead".to_string(),
                    role_name: "manager".to_string(),
                    role_type: RoleType::Manager,
                    agent: Some("claude".to_string()),
                    prompt: None,
                    reports_to: None,
                    use_worktrees: false,
                },
                MemberInstance {
                    name: "eng-1".to_string(),
                    role_name: "engineer".to_string(),
                    role_type: RoleType::Engineer,
                    agent: Some("codex".to_string()),
                    prompt: None,
                    reports_to: Some("lead".to_string()),
                    use_worktrees: true,
                },
            ],
            pane_map: HashMap::new(),
        })
        .unwrap();

        // Each member's schedule carries its role's nudge text and interval.
        assert_eq!(
            daemon
                .nudges
                .get("lead")
                .map(|schedule| schedule.text.as_str()),
            Some("Manager follow-up.")
        );
        assert_eq!(
            daemon.nudges.get("lead").map(|schedule| schedule.interval),
            Some(Duration::from_secs(120))
        );
        assert_eq!(
            daemon
                .nudges
                .get("eng-1")
                .map(|schedule| schedule.text.as_str()),
            Some("Engineer follow-up.")
        );
        assert_eq!(
            daemon.nudges.get("eng-1").map(|schedule| schedule.interval),
            Some(Duration::from_secs(300))
        );
    }
1432
1433 #[test]
1434 fn format_nudge_status_marks_sent_after_fire() {
1435 let schedule = NudgeSchedule {
1436 text: "check in".to_string(),
1437 interval: Duration::from_secs(600),
1438 idle_since: Some(Instant::now() - Duration::from_secs(601)),
1439 fired_this_idle: true,
1440 paused: false,
1441 };
1442
1443 assert_eq!(
1444 status::format_nudge_status(Some(&schedule)),
1445 " #[fg=magenta]nudge sent#[default]"
1446 );
1447 }
1448
    /// `save_daemon_state` followed by `load_daemon_state` must reproduce
    /// every runtime field exactly — this guards the serde derive against
    /// accidentally skipped or renamed fields.
    #[test]
    fn daemon_state_round_trip_preserves_runtime_fields() {
        let tmp = tempfile::tempdir().unwrap();
        // Populate every field with a distinctive non-default value so a
        // lossy round trip cannot go unnoticed.
        let state = PersistedDaemonState {
            clean_shutdown: false,
            saved_at: 123,
            states: HashMap::from([("eng-1".to_string(), MemberState::Working)]),
            active_tasks: HashMap::from([("eng-1".to_string(), 42)]),
            retry_counts: HashMap::from([("eng-1".to_string(), 2)]),
            dispatch_queue: vec![DispatchQueueEntry {
                engineer: "eng-1".to_string(),
                task_id: 77,
                task_title: "queued".to_string(),
                queued_at: 999,
                validation_failures: 1,
                last_failure: Some("waiting for stabilization".to_string()),
            }],
            paused_standups: HashSet::from(["manager".to_string()]),
            last_standup_elapsed_secs: HashMap::from([("architect".to_string(), 55)]),
            nudge_state: HashMap::from([(
                "eng-1".to_string(),
                PersistedNudgeState {
                    idle_elapsed_secs: Some(88),
                    fired_this_idle: true,
                    paused: false,
                },
            )]),
            pipeline_starvation_fired: true,
        };

        save_daemon_state(tmp.path(), &state).unwrap();

        let loaded = load_daemon_state(tmp.path()).unwrap();
        assert_eq!(loaded, state);
    }
1484
1485 #[test]
1486 fn watcher_mut_returns_context_for_unknown_member() {
1487 let tmp = tempfile::tempdir().unwrap();
1488 std::fs::create_dir_all(tmp.path().join(".batty").join("team_config")).unwrap();
1489 let mut daemon = make_test_daemon(tmp.path(), vec![manager_member("manager", None)]);
1490
1491 let error = match daemon.watcher_mut("missing") {
1492 Ok(_) => panic!("expected missing watcher to return an error"),
1493 Err(error) => error,
1494 };
1495
1496 assert!(
1497 error
1498 .to_string()
1499 .contains("watcher registry missing member 'missing'")
1500 );
1501 }
1502
    /// Only engineers in the `Idle` state should be auto-dispatch candidates:
    /// idle non-engineers (architect, manager) and a working engineer are all
    /// filtered out, leaving just `eng-1`.
    #[test]
    fn test_auto_dispatch_filters_idle_engineers_only() {
        let tmp = tempfile::tempdir().unwrap();
        // Four roles: architect + manager (never dispatched) and two engineers.
        let roles = vec![
            RoleDef {
                name: "architect".to_string(),
                role_type: RoleType::Architect,
                agent: Some("claude".to_string()),
                instances: 1,
                prompt: None,
                talks_to: vec![],
                channel: None,
                channel_config: None,
                nudge_interval_secs: None,
                receives_standup: None,
                standup_interval_secs: None,
                owns: Vec::new(),
                use_worktrees: false,
            },
            RoleDef {
                name: "manager".to_string(),
                role_type: RoleType::Manager,
                agent: Some("claude".to_string()),
                instances: 1,
                prompt: None,
                talks_to: vec![],
                channel: None,
                channel_config: None,
                nudge_interval_secs: None,
                receives_standup: None,
                standup_interval_secs: None,
                owns: Vec::new(),
                use_worktrees: false,
            },
            RoleDef {
                name: "eng-1".to_string(),
                role_type: RoleType::Engineer,
                agent: Some("claude".to_string()),
                instances: 1,
                prompt: None,
                talks_to: vec![],
                channel: None,
                channel_config: None,
                nudge_interval_secs: None,
                receives_standup: None,
                standup_interval_secs: None,
                owns: Vec::new(),
                use_worktrees: false,
            },
            RoleDef {
                name: "eng-2".to_string(),
                role_type: RoleType::Engineer,
                agent: Some("claude".to_string()),
                instances: 1,
                prompt: None,
                talks_to: vec![],
                channel: None,
                channel_config: None,
                nudge_interval_secs: None,
                receives_standup: None,
                standup_interval_secs: None,
                owns: Vec::new(),
                use_worktrees: false,
            },
        ];
        // One member instance per role, wired into the reporting hierarchy.
        let members = vec![
            MemberInstance {
                name: "architect".to_string(),
                role_name: "architect".to_string(),
                role_type: RoleType::Architect,
                agent: Some("claude".to_string()),
                prompt: None,
                reports_to: None,
                use_worktrees: false,
            },
            MemberInstance {
                name: "manager".to_string(),
                role_name: "manager".to_string(),
                role_type: RoleType::Manager,
                agent: Some("claude".to_string()),
                prompt: None,
                reports_to: Some("architect".to_string()),
                use_worktrees: false,
            },
            MemberInstance {
                name: "eng-1".to_string(),
                role_name: "eng-1".to_string(),
                role_type: RoleType::Engineer,
                agent: Some("claude".to_string()),
                prompt: None,
                reports_to: Some("manager".to_string()),
                use_worktrees: false,
            },
            MemberInstance {
                name: "eng-2".to_string(),
                role_name: "eng-2".to_string(),
                role_type: RoleType::Engineer,
                agent: Some("claude".to_string()),
                prompt: None,
                reports_to: Some("manager".to_string()),
                use_worktrees: false,
            },
        ];

        let mut daemon = TeamDaemon::new(DaemonConfig {
            project_root: tmp.path().to_path_buf(),
            team_config: TeamConfig {
                name: "test".to_string(),
                agent: None,
                workflow_mode: WorkflowMode::Legacy,
                workflow_policy: WorkflowPolicy::default(),
                board: BoardConfig::default(),
                standup: StandupConfig::default(),
                automation: AutomationConfig::default(),
                automation_sender: None,
                external_senders: Vec::new(),
                orchestrator_pane: true,
                orchestrator_position: OrchestratorPosition::Bottom,
                layout: None,
                cost: Default::default(),
                event_log_max_bytes: crate::team::DEFAULT_EVENT_LOG_MAX_BYTES,
                retro_min_duration_secs: 60,
                roles,
            },
            session: "test".to_string(),
            members,
            pane_map: HashMap::new(),
        })
        .unwrap();

        // Everyone idle except eng-2, which is busy.
        daemon
            .states
            .insert("architect".to_string(), MemberState::Idle);
        daemon
            .states
            .insert("manager".to_string(), MemberState::Idle);
        daemon.states.insert("eng-1".to_string(), MemberState::Idle);
        daemon
            .states
            .insert("eng-2".to_string(), MemberState::Working);

        // Seed the board with a single unclaimed todo task.
        let board_dir = tmp.path().join(".batty").join("team_config").join("board");
        let tasks_dir = board_dir.join("tasks");
        std::fs::create_dir_all(&tasks_dir).unwrap();
        std::fs::write(
            tasks_dir.join("001-auto-task.md"),
            "---\nid: 1\ntitle: auto-task\nstatus: todo\npriority: high\nclass: standard\n---\n\nTask description.\n",
        )
        .unwrap();

        // Only the idle engineer qualifies, and the seeded task is claimable.
        assert_eq!(daemon.idle_engineer_names(), vec!["eng-1".to_string()]);
        let task = next_unclaimed_task(&board_dir).unwrap().unwrap();
        assert_eq!(task.id, 1);
    }
1657
1658 #[test]
1659 fn test_maybe_auto_dispatch_respects_rate_limit() {
1660 let tmp = tempfile::tempdir().unwrap();
1661 let mut daemon = TestDaemonBuilder::new(tmp.path()).build();
1662
1663 let before = daemon.last_auto_dispatch;
1664 daemon.maybe_auto_dispatch().unwrap();
1665 assert_eq!(daemon.last_auto_dispatch, before);
1666 }
1667
1668 #[test]
1669 fn test_maybe_auto_dispatch_skips_when_disabled() {
1670 let tmp = tempfile::tempdir().unwrap();
1671 let mut daemon = TestDaemonBuilder::new(tmp.path())
1672 .board(BoardConfig {
1673 auto_dispatch: false,
1674 ..BoardConfig::default()
1675 })
1676 .build();
1677 daemon.last_auto_dispatch = Instant::now() - Duration::from_secs(30);
1678
1679 let before = daemon.last_auto_dispatch;
1680 daemon.maybe_auto_dispatch().unwrap();
1681 assert_eq!(daemon.last_auto_dispatch, before);
1682 }
1683
    /// End-to-end happy path through the decomposed daemon modules: spawn an
    /// engineer running a fake `codex` inside a real tmux session, assign a
    /// task (creating a worktree branch), simulate Codex session completion,
    /// and verify the work is merged back, events logged, and launch state
    /// updated. Requires tmux; only runs with the `integration` feature.
    #[test]
    #[serial]
    #[cfg_attr(not(feature = "integration"), ignore)]
    fn daemon_lifecycle_happy_path_exercises_decomposed_modules() {
        let session = format!("batty-test-daemon-lifecycle-{}", std::process::id());
        let _ = crate::tmux::kill_session(&session);

        let tmp = tempfile::tempdir().unwrap();
        let repo = init_git_repo(&tmp, "batty-daemon-lifecycle");
        write_open_task_file(&repo, 42, "lifecycle-task", "todo");

        let member_name = "eng-lifecycle";
        let (fake_bin, fake_log) = setup_fake_codex(&repo, tmp.path(), member_name);

        // "keeper" window prevents the session from exiting while the test runs.
        crate::tmux::create_session(&session, "bash", &[], repo.to_string_lossy().as_ref())
            .unwrap();
        crate::tmux::create_window(
            &session,
            "keeper",
            "sleep",
            &["30".to_string()],
            repo.to_string_lossy().as_ref(),
        )
        .unwrap();
        let pane_id = crate::tmux::pane_id(&session).unwrap();

        let engineer = MemberInstance {
            name: member_name.to_string(),
            role_name: "engineer".to_string(),
            role_type: RoleType::Engineer,
            agent: Some("codex".to_string()),
            prompt: None,
            reports_to: None,
            use_worktrees: true,
        };
        let mut daemon = TeamDaemon::new(DaemonConfig {
            project_root: repo.clone(),
            team_config: TeamConfig {
                name: "test".to_string(),
                agent: None,
                workflow_mode: WorkflowMode::Legacy,
                workflow_policy: WorkflowPolicy::default(),
                board: BoardConfig::default(),
                standup: StandupConfig::default(),
                automation: AutomationConfig::default(),
                automation_sender: None,
                external_senders: Vec::new(),
                orchestrator_pane: true,
                orchestrator_position: OrchestratorPosition::Bottom,
                layout: None,
                cost: Default::default(),
                event_log_max_bytes: crate::team::DEFAULT_EVENT_LOG_MAX_BYTES,
                retro_min_duration_secs: 60,
                roles: Vec::new(),
            },
            session: session.clone(),
            members: vec![engineer],
            pane_map: HashMap::from([(member_name.to_string(), pane_id)]),
        })
        .unwrap();
        // Spawning should launch the fake codex, which logs its PWD on start.
        daemon.spawn_all_agents(false).unwrap();
        let spawn_log = wait_for_log_contains(&fake_log, "PWD:");
        assert!(spawn_log.contains("PWD:"));
        std::thread::sleep(Duration::from_millis(1200));

        // Assigning the task marks the member Working and checks out a
        // per-task worktree branch named <member>/<task-id>.
        let assignment = "Task #42: lifecycle-task\n\nTask description.";
        daemon
            .assign_task_with_task_id(member_name, assignment, Some(42))
            .unwrap();
        daemon.active_tasks.insert(member_name.to_string(), 42);
        assert_eq!(daemon.active_task_id(member_name), Some(42));
        assert_eq!(daemon.states.get(member_name), Some(&MemberState::Working));

        let worktree_dir = repo.join(".batty").join("worktrees").join(member_name);
        assert!(worktree_dir.exists());
        assert_eq!(
            crate::team::test_support::git_stdout(&worktree_dir, &["branch", "--show-current"]),
            format!("{member_name}/42")
        );

        // Fake Codex session metadata lets the watcher attribute completion
        // events to this member.
        let codex_cwd = worktree_dir
            .join(".batty")
            .join("codex-context")
            .join(member_name);
        let session_file = write_codex_session_meta(&codex_cwd);

        daemon.run_loop_step("poll_watchers", |daemon| daemon.poll_watchers());
        daemon.run_loop_step("sync_launch_state_session_ids", |daemon| {
            daemon.sync_launch_state_session_ids()
        });

        // Commit work in the worktree, then signal task completion via the
        // session file; the next poll should merge and flip the member to Idle.
        std::fs::write(worktree_dir.join("note.txt"), "done\n").unwrap();
        crate::team::test_support::git_ok(&worktree_dir, &["add", "note.txt"]);
        crate::team::test_support::git_ok(&worktree_dir, &["commit", "-m", "finish task"]);
        append_codex_task_complete(&session_file);

        daemon.run_loop_step("poll_watchers", |daemon| daemon.poll_watchers());

        assert_eq!(daemon.active_task_id(member_name), None);
        assert_eq!(daemon.states.get(member_name), Some(&MemberState::Idle));
        // The committed file made it back to the main checkout.
        assert_eq!(
            std::fs::read_to_string(repo.join("note.txt")).unwrap(),
            "done\n"
        );

        // Lifecycle events were recorded for assignment and completion.
        let events = crate::team::events::read_events(
            &repo.join(".batty").join("team_config").join("events.jsonl"),
        )
        .unwrap();
        assert!(events.iter().any(|event| {
            event.event == "task_assigned"
                && event.role.as_deref() == Some(member_name)
                && event
                    .task
                    .as_deref()
                    .is_some_and(|task| task.contains("Task #42: lifecycle-task"))
        }));
        assert!(events.iter().any(|event| {
            event.event == "task_completed" && event.role.as_deref() == Some(member_name)
        }));

        // Launch state picked up the codex session id from the fake session file.
        let launch_state = load_launch_state(&repo);
        let identity = launch_state.get(member_name).expect("missing launch state");
        assert_eq!(identity.agent, "codex-cli");
        assert_eq!(
            identity.session_id.as_deref(),
            session_file.file_stem().and_then(|stem| stem.to_str())
        );

        crate::tmux::kill_session(&session).unwrap();
        let _ = std::fs::remove_file(&session_file);
        let _ = std::fs::remove_dir_all(&fake_bin);
    }
    /// A nudge delivered live into a real tmux pane should flip the member to
    /// Working and pause/reset the schedule. tmux delivery is timing-sensitive,
    /// so the whole scenario is retried up to five times; at least one attempt
    /// must deliver live. Requires tmux; only runs with `integration`.
    #[test]
    #[serial]
    #[cfg_attr(not(feature = "integration"), ignore)]
    fn maybe_fire_nudges_marks_member_working_after_live_delivery() {
        let session = "batty-test-nudge-live-delivery";
        let mut delivered_live = false;

        for _attempt in 0..5 {
            let _ = crate::tmux::kill_session(session);

            // `cat` keeps the pane alive and accepting input.
            crate::tmux::create_session(session, "cat", &[], "/tmp").unwrap();
            let pane_id = crate::tmux::pane_id(session).unwrap();
            std::thread::sleep(Duration::from_millis(300));

            let tmp = tempfile::tempdir().unwrap();
            let mut watchers = HashMap::new();
            let mut scientist_watcher = SessionWatcher::new(&pane_id, "scientist", 300, None);
            scientist_watcher.confirm_ready();
            watchers.insert("scientist".to_string(), scientist_watcher);
            // Member is idle longer than the 1s interval, so the nudge is due.
            let mut daemon = TestDaemonBuilder::new(tmp.path())
                .session(session)
                .members(vec![architect_member("scientist")])
                .pane_map(HashMap::from([("scientist".to_string(), pane_id.clone())]))
                .watchers(watchers)
                .states(HashMap::from([(
                    "scientist".to_string(),
                    MemberState::Idle,
                )]))
                .nudges(HashMap::from([(
                    "scientist".to_string(),
                    NudgeSchedule {
                        text: "Please make progress.".to_string(),
                        interval: Duration::from_secs(1),
                        idle_since: Some(Instant::now() - Duration::from_secs(5)),
                        fired_this_idle: false,
                        paused: false,
                    },
                )]))
                .build();

            backdate_idle_grace(&mut daemon, "scientist");
            daemon.maybe_fire_nudges().unwrap();

            // Live delivery marks the member Working and resets the schedule
            // (paused, idle timer cleared, fired flag reset).
            if daemon.states.get("scientist") == Some(&MemberState::Working) {
                let schedule = daemon.nudges.get("scientist").unwrap();
                assert!(schedule.paused);
                assert!(schedule.idle_since.is_none());
                assert!(!schedule.fired_this_idle);
                delivered_live = true;
                crate::tmux::kill_session(session).unwrap();
                break;
            }

            crate::tmux::kill_session(session).unwrap();
            std::thread::sleep(Duration::from_millis(100));
        }

        assert!(
            delivered_live,
            "expected at least one successful live nudge delivery"
        );
    }
1882
    /// The triage-backlog intervention, delivered into a live tmux pane,
    /// should instruct the manager how to work its inbox. Live delivery is
    /// timing-dependent; if it happens the pane must show the instructions,
    /// otherwise the message must land in the inbox instead. Requires tmux;
    /// only runs with `integration`.
    #[test]
    #[serial]
    #[cfg_attr(not(feature = "integration"), ignore)]
    fn maybe_intervene_triage_backlog_marks_member_working_after_live_delivery() {
        let session = format!("batty-test-triage-live-delivery-{}", std::process::id());
        let _ = crate::tmux::kill_session(&session);

        crate::tmux::create_session(&session, "cat", &[], "/tmp").unwrap();
        let pane_id = crate::tmux::pane_id(&session).unwrap();
        std::thread::sleep(Duration::from_millis(300));

        let tmp = tempfile::tempdir().unwrap();
        let mut watchers = HashMap::new();
        let mut lead_watcher = SessionWatcher::new(&pane_id, "lead", 300, None);
        lead_watcher.confirm_ready();
        watchers.insert("lead".to_string(), lead_watcher);
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .session(&session)
            .members(vec![
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
            ])
            .pane_map(HashMap::from([("lead".to_string(), pane_id.clone())]))
            .watchers(watchers)
            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
            .build();

        // A delivered-but-unread message from eng-1 constitutes the backlog.
        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "lead").unwrap();
        inbox::init_inbox(&root, "eng-1").unwrap();
        let mut result = inbox::InboxMessage::new_send("eng-1", "lead", "Task complete.");
        result.timestamp = super::now_unix();
        let id = inbox::deliver_to_inbox(&root, &result).unwrap();
        inbox::mark_delivered(&root, "lead", &id).unwrap();

        // Cycle Working -> Idle so the idle grace window applies, then backdate it.
        daemon.update_automation_timers_for_state("lead", MemberState::Working);
        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_triage_backlog().unwrap();

        assert_eq!(daemon.triage_interventions.get("lead"), Some(&1));
        if daemon.states.get("lead") == Some(&MemberState::Working) {
            // Live path: poll the pane until the instructions appear.
            let pane = (0..50)
                .find_map(|_| {
                    let pane = tmux::capture_pane(&pane_id).unwrap_or_default();
                    if pane.contains("batty inbox lead") {
                        Some(pane)
                    } else {
                        std::thread::sleep(Duration::from_millis(200));
                        None
                    }
                })
                .unwrap_or_else(|| tmux::capture_pane(&pane_id).unwrap_or_default());
            assert!(pane.contains("batty inbox lead"));
            assert!(pane.contains("batty read lead <ref>"));
            assert!(pane.contains("batty send eng-1"));
            assert!(pane.contains("batty assign eng-1"));
            assert!(pane.contains("batty send architect"));
            assert!(pane.contains("next time you become idle"));
        } else {
            // Fallback path: the intervention must be queued in the inbox.
            let pending = inbox::pending_messages(&root, "lead").unwrap();
            assert_eq!(pending.len(), 1);
            assert!(pending[0].body.contains("batty inbox lead"));
        }

        crate::tmux::kill_session(&session).unwrap();
    }
1950
1951 #[test]
1952 fn maybe_intervene_triage_backlog_queues_when_live_delivery_falls_back_to_inbox() {
1953 let tmp = tempfile::tempdir().unwrap();
1954 let mut daemon = TestDaemonBuilder::new(tmp.path())
1955 .members(vec![
1956 manager_member("lead", Some("architect")),
1957 engineer_member("eng-1", Some("lead"), false),
1958 ])
1959 .pane_map(HashMap::from([(
1960 "lead".to_string(),
1961 "%9999999".to_string(),
1962 )]))
1963 .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
1964 .build();
1965
1966 let root = inbox::inboxes_root(tmp.path());
1967 inbox::init_inbox(&root, "lead").unwrap();
1968 inbox::init_inbox(&root, "eng-1").unwrap();
1969 let mut result = inbox::InboxMessage::new_send("eng-1", "lead", "Task complete.");
1970 result.timestamp = super::now_unix();
1971 let id = inbox::deliver_to_inbox(&root, &result).unwrap();
1972 inbox::mark_delivered(&root, "lead", &id).unwrap();
1973
1974 daemon.update_automation_timers_for_state("lead", MemberState::Working);
1975 daemon.update_automation_timers_for_state("lead", MemberState::Idle);
1976 backdate_idle_grace(&mut daemon, "lead");
1977 daemon.maybe_intervene_triage_backlog().unwrap();
1978
1979 assert_eq!(daemon.states.get("lead"), Some(&MemberState::Idle));
1980 assert_eq!(daemon.triage_interventions.get("lead"), Some(&1));
1981 let pending = inbox::pending_messages(&root, "lead").unwrap();
1982 assert_eq!(pending.len(), 1);
1983 assert_eq!(pending[0].from, "architect");
1984 assert!(pending[0].body.contains("Triage backlog detected"));
1985 assert!(pending[0].body.contains("batty inbox lead"));
1986 assert!(pending[0].body.contains("batty read lead <ref>"));
1987 assert!(pending[0].body.contains("batty send eng-1"));
1988 assert!(pending[0].body.contains("batty assign eng-1"));
1989 assert!(pending[0].body.contains("batty send architect"));
1990 assert!(pending[0].body.contains("next time you become idle"));
1991 }
1992
1993 #[test]
1994 fn maybe_intervene_triage_backlog_does_not_fire_on_startup_idle() {
1995 let tmp = tempfile::tempdir().unwrap();
1996 let mut daemon = TestDaemonBuilder::new(tmp.path())
1997 .members(vec![
1998 manager_member("lead", Some("architect")),
1999 engineer_member("eng-1", Some("lead"), false),
2000 ])
2001 .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
2002 .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
2003 .build();
2004
2005 let root = inbox::inboxes_root(tmp.path());
2006 inbox::init_inbox(&root, "lead").unwrap();
2007 inbox::init_inbox(&root, "eng-1").unwrap();
2008 let mut result = inbox::InboxMessage::new_send("eng-1", "lead", "Task complete.");
2009 result.timestamp = super::now_unix();
2010 let id = inbox::deliver_to_inbox(&root, &result).unwrap();
2011 inbox::mark_delivered(&root, "lead", &id).unwrap();
2012
2013 daemon.maybe_intervene_triage_backlog().unwrap();
2014
2015 assert!(!daemon.triage_interventions.contains_key("lead"));
2016 assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());
2017 assert_eq!(daemon.states.get("lead"), Some(&MemberState::Idle));
2018 }
2019
    /// An idle member who still owns an in-progress board task should get an
    /// owned-task intervention queued to their inbox (the fake pane id makes
    /// live delivery impossible), with kanban/batty instructions in the body.
    #[test]
    fn maybe_intervene_owned_tasks_queues_when_idle_member_owns_unfinished_task() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
            ])
            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
            .build();

        // Task #191 is in-progress and owned by the idle "lead".
        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "lead").unwrap();
        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "lead");

        // Cycle Working -> Idle so the idle grace window applies, then backdate it.
        daemon.update_automation_timers_for_state("lead", MemberState::Working);
        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_owned_tasks().unwrap();

        // Member stays Idle (no live delivery); the intervention is recorded
        // against the first idle epoch.
        assert_eq!(daemon.states.get("lead"), Some(&MemberState::Idle));
        assert_eq!(
            daemon
                .owned_task_interventions
                .get("lead")
                .map(|state| state.idle_epoch),
            Some(1)
        );
        // The queued message comes "from" the architect and spells out how to
        // inspect, delegate, or move the stalled task.
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].from, "architect");
        assert!(pending[0].body.contains("Task #191"));
        assert!(
            pending[0]
                .body
                .contains("Owned active task backlog detected")
        );
        assert!(pending[0].body.contains("kanban-md list --dir"));
        assert!(pending[0].body.contains("kanban-md show --dir"));
        assert!(pending[0].body.contains("191"));
        assert!(pending[0].body.contains("sed -n '1,220p'"));
        assert!(pending[0].body.contains("batty assign eng-1"));
        assert!(pending[0].body.contains("batty send architect"));
        assert!(pending[0].body.contains("kanban-md move --dir"));
        assert!(pending[0].body.contains("next time you become idle"));
    }
2067
2068 #[test]
2069 fn maybe_intervene_owned_tasks_engineer_message_captures_initial_state() {
2070 let tmp = tempfile::tempdir().unwrap();
2071 let mut daemon = TestDaemonBuilder::new(tmp.path())
2072 .members(vec![
2073 manager_member("lead", Some("architect")),
2074 engineer_member("eng-1", Some("lead"), false),
2075 ])
2076 .pane_map(HashMap::from([("eng-1".to_string(), "%999".to_string())]))
2077 .states(HashMap::from([("eng-1".to_string(), MemberState::Idle)]))
2078 .build();
2079
2080 let root = inbox::inboxes_root(tmp.path());
2081 inbox::init_inbox(&root, "eng-1").unwrap();
2082 write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "eng-1");
2083
2084 daemon.update_automation_timers_for_state("eng-1", MemberState::Working);
2085 daemon.update_automation_timers_for_state("eng-1", MemberState::Idle);
2086 backdate_idle_grace(&mut daemon, "eng-1");
2087 daemon.maybe_intervene_owned_tasks().unwrap();
2088
2089 let pending = inbox::pending_messages(&root, "eng-1").unwrap();
2090 assert_eq!(pending.len(), 1);
2091 assert_eq!(pending[0].from, "lead");
2092 assert!(
2093 pending[0]
2094 .body
2095 .contains("Owned active task backlog detected")
2096 );
2097 assert!(pending[0].body.contains("Task #191"));
2098 assert!(pending[0].body.contains("batty send lead"));
2099
2100 let state = daemon.owned_task_interventions.get("eng-1").unwrap();
2101 assert_eq!(state.idle_epoch, 1);
2102 assert_eq!(state.signature, "191:in-progress");
2103 assert!(!state.escalation_sent);
2104 }
2105
2106 #[test]
2107 fn maybe_intervene_owned_tasks_fires_for_persistent_startup_idle_state() {
2108 let tmp = tempfile::tempdir().unwrap();
2109 let mut daemon = TestDaemonBuilder::new(tmp.path())
2110 .members(vec![
2111 manager_member("lead", Some("architect")),
2112 engineer_member("eng-1", Some("lead"), false),
2113 ])
2114 .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
2115 .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
2116 .build();
2117
2118 let root = inbox::inboxes_root(tmp.path());
2119 inbox::init_inbox(&root, "lead").unwrap();
2120 write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "lead");
2121
2122 backdate_idle_grace(&mut daemon, "lead");
2123 daemon.maybe_intervene_owned_tasks().unwrap();
2124
2125 let pending = inbox::pending_messages(&root, "lead").unwrap();
2126 assert_eq!(pending.len(), 1);
2127 assert_eq!(pending[0].from, "architect");
2128 assert!(pending[0].body.contains("Task #191"));
2129 assert_eq!(
2130 daemon
2131 .owned_task_interventions
2132 .get("lead")
2133 .map(|state| state.idle_epoch),
2134 Some(0)
2135 );
2136 assert_eq!(daemon.states.get("lead"), Some(&MemberState::Idle));
2137 }
2138
2139 #[test]
2140 fn maybe_intervene_owned_tasks_waits_for_idle_grace() {
2141 let tmp = tempfile::tempdir().unwrap();
2142 let mut daemon = TestDaemonBuilder::new(tmp.path())
2143 .members(vec![manager_member("lead", Some("architect"))])
2144 .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
2145 .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
2146 .build();
2147
2148 let root = inbox::inboxes_root(tmp.path());
2149 inbox::init_inbox(&root, "lead").unwrap();
2150 write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "lead");
2151
2152 daemon.update_automation_timers_for_state("lead", MemberState::Idle);
2153 daemon.maybe_intervene_owned_tasks().unwrap();
2154 assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());
2155
2156 backdate_idle_grace(&mut daemon, "lead");
2157 daemon.maybe_intervene_owned_tasks().unwrap();
2158 assert_eq!(inbox::pending_messages(&root, "lead").unwrap().len(), 1);
2159 }
2160
2161 #[test]
2162 fn maybe_intervene_owned_tasks_skips_when_pending_inbox_exists() {
2163 let tmp = tempfile::tempdir().unwrap();
2164 let mut daemon = TestDaemonBuilder::new(tmp.path())
2165 .members(vec![manager_member("lead", Some("architect"))])
2166 .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
2167 .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
2168 .build();
2169
2170 let root = inbox::inboxes_root(tmp.path());
2171 inbox::init_inbox(&root, "lead").unwrap();
2172 let message = inbox::InboxMessage::new_send("architect", "lead", "Check this first.");
2173 inbox::deliver_to_inbox(&root, &message).unwrap();
2174 write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "lead");
2175
2176 backdate_idle_grace(&mut daemon, "lead");
2177 daemon.maybe_intervene_owned_tasks().unwrap();
2178
2179 let pending = inbox::pending_messages(&root, "lead").unwrap();
2180 assert_eq!(pending.len(), 1);
2181 assert_eq!(pending[0].from, "architect");
2182 assert!(
2183 !daemon.owned_task_interventions.contains_key("lead"),
2184 "pending inbox should block new interventions"
2185 );
2186 }
2187
2188 #[test]
2189 fn maybe_intervene_owned_tasks_ignores_review_only_claims() {
2190 let tmp = tempfile::tempdir().unwrap();
2191 let mut daemon = TestDaemonBuilder::new(tmp.path())
2192 .members(vec![manager_member("lead", Some("architect"))])
2193 .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
2194 .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
2195 .build();
2196
2197 let root = inbox::inboxes_root(tmp.path());
2198 inbox::init_inbox(&root, "lead").unwrap();
2199 write_owned_task_file(tmp.path(), 191, "review-task", "review", "lead");
2200
2201 daemon.update_automation_timers_for_state("lead", MemberState::Working);
2202 daemon.update_automation_timers_for_state("lead", MemberState::Idle);
2203 daemon.maybe_intervene_owned_tasks().unwrap();
2204
2205 assert!(!daemon.owned_task_interventions.contains_key("lead"));
2206 assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());
2207 }
2208
    /// If the owned-task signature is unchanged across two separate idle
    /// epochs, the second pass must not queue a duplicate intervention even
    /// though the epoch counter advances.
    #[test]
    fn maybe_intervene_owned_tasks_dedupes_same_active_signature_across_idle_epochs() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![manager_member("lead", Some("architect"))])
            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "lead").unwrap();
        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "lead");

        // First idle epoch: fires the initial intervention.
        daemon.update_automation_timers_for_state("lead", MemberState::Working);
        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_owned_tasks().unwrap();

        // Simulate a second full Working -> Idle cycle (second idle epoch)
        // with the board unchanged. Note the explicit `states` insert mirrors
        // what the watcher would report alongside the timer update.
        daemon
            .states
            .insert("lead".to_string(), MemberState::Working);
        daemon.update_automation_timers_for_state("lead", MemberState::Working);
        daemon.states.insert("lead".to_string(), MemberState::Idle);
        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_owned_tasks().unwrap();

        // Still only the first message; the tracked epoch advanced to 2.
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(
            daemon
                .owned_task_interventions
                .get("lead")
                .map(|state| state.idle_epoch),
            Some(2)
        );
    }
2246
2247 fn backdate_intervention_cooldown(daemon: &mut TeamDaemon, key: &str) {
2248 let cooldown = Duration::from_secs(
2249 daemon
2250 .config
2251 .team_config
2252 .automation
2253 .intervention_cooldown_secs,
2254 ) + Duration::from_secs(1);
2255 daemon
2256 .intervention_cooldowns
2257 .insert(key.to_string(), Instant::now() - cooldown);
2258 }
2259
    /// When the owned-task set changes between checks (a new task appears),
    /// the intervention refires with the new task mentioned and the stored
    /// signature is refreshed to cover both tasks.
    #[test]
    fn owned_task_intervention_updates_signature_when_board_state_changes() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
            ])
            .pane_map(HashMap::from([("eng-1".to_string(), "%999".to_string())]))
            .states(HashMap::from([("eng-1".to_string(), MemberState::Idle)]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "eng-1").unwrap();
        write_owned_task_file(tmp.path(), 191, "first-task", "in-progress", "eng-1");

        // First pass: Working -> Idle, grace elapsed, intervention fires.
        daemon.update_automation_timers_for_state("eng-1", MemberState::Working);
        daemon.update_automation_timers_for_state("eng-1", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "eng-1");
        daemon.maybe_intervene_owned_tasks().unwrap();

        let initial = daemon.owned_task_interventions.get("eng-1").unwrap();
        assert_eq!(initial.signature, "191:in-progress");

        // Drain the inbox so the pending-mail guard does not block the refire.
        for message in inbox::pending_messages(&root, "eng-1").unwrap() {
            inbox::mark_delivered(&root, "eng-1", &message.id).unwrap();
        }

        // Board change: a second owned task appears; cooldown is aged so the
        // check may act again.
        write_owned_task_file(tmp.path(), 192, "second-task", "in-progress", "eng-1");
        backdate_intervention_cooldown(&mut daemon, "eng-1");
        daemon.maybe_intervene_owned_tasks().unwrap();

        // The refired message mentions both tasks.
        let pending = inbox::pending_messages(&root, "eng-1").unwrap();
        assert_eq!(pending.len(), 1);
        assert!(pending[0].body.contains("Task #191"));
        assert!(pending[0].body.contains("#192 (in-progress) second-task"));

        // Signature now covers both tasks; escalation state is reset/unset.
        let updated = daemon.owned_task_interventions.get("eng-1").unwrap();
        assert_eq!(updated.signature, "191:in-progress|192:in-progress");
        assert!(!updated.escalation_sent);
    }
2301
    /// Even with a changed board signature, a refire is blocked until the
    /// per-member intervention cooldown has expired.
    #[test]
    fn owned_task_intervention_respects_cooldown() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![manager_member("lead", Some("architect"))])
            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "lead").unwrap();
        write_owned_task_file(tmp.path(), 191, "first-task", "in-progress", "lead");

        // Startup-idle path: only the grace timer is aged.
        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_owned_tasks().unwrap();
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 1, "first intervention should fire");

        // Drain the inbox so the pending-mail guard cannot mask the cooldown.
        for msg in pending {
            inbox::mark_delivered(&root, "lead", &msg.id).unwrap();
        }

        // New task changes the signature, but the cooldown is still active.
        write_owned_task_file(tmp.path(), 192, "second-task", "in-progress", "lead");
        daemon.maybe_intervene_owned_tasks().unwrap();
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 0, "cooldown should prevent refire");

        // Aging the cooldown unblocks the refire.
        backdate_intervention_cooldown(&mut daemon, "lead");
        daemon.maybe_intervene_owned_tasks().unwrap();
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 1, "should fire after cooldown expires");
    }
2338
    /// Triage interventions share the cooldown mechanism under the
    /// "triage::<member>" key: a second idle epoch alone must not refire,
    /// but an expired cooldown must.
    #[test]
    fn triage_intervention_respects_cooldown() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
            ])
            .pane_map(HashMap::from([
                ("lead".to_string(), "%999".to_string()),
                ("eng-1".to_string(), "%998".to_string()),
            ]))
            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
            .build();
        // Pretend the lead already completed one Working -> Idle cycle so the
        // startup-idle guard (see the companion test above) does not apply.
        daemon.triage_idle_epochs = HashMap::from([("lead".to_string(), 1)]);

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "lead").unwrap();
        inbox::init_inbox(&root, "eng-1").unwrap();

        // A delivered report constitutes the triage backlog.
        let msg = inbox::InboxMessage::new_send("eng-1", "lead", "done with task 42");
        let msg_id = inbox::deliver_to_inbox(&root, &msg).unwrap();
        inbox::mark_delivered(&root, "lead", &msg_id).unwrap();

        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_triage_backlog().unwrap();
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 1, "first triage intervention should fire");

        // Drain so the pending-mail guard cannot mask the cooldown check.
        for p in pending {
            inbox::mark_delivered(&root, "lead", &p.id).unwrap();
        }

        // A fresh Working -> Idle cycle with the grace aged again...
        daemon.update_automation_timers_for_state("lead", MemberState::Working);
        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "lead");

        // ...still must not refire while the cooldown is active.
        daemon.maybe_intervene_triage_backlog().unwrap();
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 0, "cooldown should prevent triage refire");

        // Triage cooldowns are keyed as "triage::<member>".
        backdate_intervention_cooldown(&mut daemon, "triage::lead");
        daemon.maybe_intervene_triage_backlog().unwrap();
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(
            pending.len(),
            1,
            "triage should fire after cooldown expires"
        );
    }
2395
    /// When an owned-task intervention has been outstanding longer than the
    /// workflow policy's escalation threshold, the daemon escalates to the
    /// member's parent ("lead") and emits a `task_escalated` event.
    #[test]
    fn maybe_intervene_owned_tasks_escalates_stuck_signature_to_parent() {
        let tmp = tempfile::tempdir().unwrap();
        let events_path = tmp.path().join("events.jsonl");
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
            ])
            .pane_map(HashMap::from([("eng-1".to_string(), "%999".to_string())]))
            .states(HashMap::from([("eng-1".to_string(), MemberState::Idle)]))
            .workflow_policy(WorkflowPolicy {
                escalation_threshold_secs: 120,
                ..WorkflowPolicy::default()
            })
            .build();
        // Capture events to a real file so they can be read back below.
        daemon.event_sink = EventSink::new(&events_path).unwrap();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "eng-1").unwrap();
        inbox::init_inbox(&root, "lead").unwrap();
        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "eng-1");

        // First pass records the intervention for eng-1.
        daemon.update_automation_timers_for_state("eng-1", MemberState::Working);
        daemon.update_automation_timers_for_state("eng-1", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "eng-1");
        daemon.maybe_intervene_owned_tasks().unwrap();

        // Backdate detection past the 120s threshold (121 > 120).
        let state = daemon.owned_task_interventions.get_mut("eng-1").unwrap();
        state.detected_at = Instant::now() - Duration::from_secs(121);

        daemon.maybe_intervene_owned_tasks().unwrap();

        // eng-1 keeps its single original message; the escalation lands in
        // the parent's inbox, authored by the daemon itself.
        let engineer_pending = inbox::pending_messages(&root, "eng-1").unwrap();
        assert_eq!(engineer_pending.len(), 1);
        let lead_pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(lead_pending.len(), 1);
        assert_eq!(lead_pending[0].from, "daemon");
        assert!(lead_pending[0].body.contains("Stuck task escalation"));
        assert!(lead_pending[0].body.contains("eng-1"));
        assert!(lead_pending[0].body.contains("Task #191"));
        assert!(lead_pending[0].body.contains("kanban-md edit --dir"));
        assert!(lead_pending[0].body.contains("batty assign eng-1"));
        assert!(
            daemon
                .owned_task_interventions
                .get("eng-1")
                .is_some_and(|state| state.escalation_sent)
        );

        // The escalation is also recorded as a structured event.
        let events = super::super::events::read_events(&events_path).unwrap();
        assert!(
            events.iter().any(|event| {
                event.event == "task_escalated"
                    && event.role.as_deref() == Some("eng-1")
                    && event.task.as_deref() == Some("191")
            }),
            "expected task_escalated event for stuck owned task"
        );
    }
2456
    /// The escalation for a given stuck signature is sent at most once:
    /// repeated checks after `escalation_sent` is set must not duplicate the
    /// parent message or the `task_escalated` event.
    #[test]
    fn maybe_intervene_owned_tasks_only_escalates_stuck_signature_once() {
        let tmp = tempfile::tempdir().unwrap();
        let events_path = tmp.path().join("events.jsonl");
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
            ])
            .pane_map(HashMap::from([("eng-1".to_string(), "%999".to_string())]))
            .states(HashMap::from([("eng-1".to_string(), MemberState::Idle)]))
            .workflow_policy(WorkflowPolicy {
                escalation_threshold_secs: 120,
                ..WorkflowPolicy::default()
            })
            .build();
        daemon.event_sink = EventSink::new(&events_path).unwrap();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "eng-1").unwrap();
        inbox::init_inbox(&root, "lead").unwrap();
        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "eng-1");

        daemon.update_automation_timers_for_state("eng-1", MemberState::Working);
        daemon.update_automation_timers_for_state("eng-1", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "eng-1");
        daemon.maybe_intervene_owned_tasks().unwrap();

        // Push detection past the threshold, then check twice in a row.
        let state = daemon.owned_task_interventions.get_mut("eng-1").unwrap();
        state.detected_at = Instant::now() - Duration::from_secs(121);

        daemon.maybe_intervene_owned_tasks().unwrap();
        daemon.maybe_intervene_owned_tasks().unwrap();

        // Exactly one escalation message and one event despite two passes.
        let lead_pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(lead_pending.len(), 1);
        assert!(lead_pending[0].body.contains("Stuck task escalation"));
        assert!(
            daemon
                .owned_task_interventions
                .get("eng-1")
                .is_some_and(|state| state.escalation_sent)
        );

        let events = super::super::events::read_events(&events_path).unwrap();
        assert_eq!(
            events
                .iter()
                .filter(|event| {
                    event.event == "task_escalated"
                        && event.role.as_deref() == Some("eng-1")
                        && event.task.as_deref() == Some("191")
                })
                .count(),
            1
        );
    }
2514
    /// No escalation is sent while the intervention is younger than the
    /// configured threshold (119s elapsed vs a 120s threshold).
    #[test]
    fn maybe_intervene_owned_tasks_waits_for_escalation_threshold() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
            ])
            .pane_map(HashMap::from([("eng-1".to_string(), "%999".to_string())]))
            .states(HashMap::from([("eng-1".to_string(), MemberState::Idle)]))
            .workflow_policy(WorkflowPolicy {
                escalation_threshold_secs: 120,
                ..WorkflowPolicy::default()
            })
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "eng-1").unwrap();
        inbox::init_inbox(&root, "lead").unwrap();
        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "eng-1");

        daemon.update_automation_timers_for_state("eng-1", MemberState::Working);
        daemon.update_automation_timers_for_state("eng-1", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "eng-1");
        daemon.maybe_intervene_owned_tasks().unwrap();

        // One second short of the 120s threshold: not yet stuck.
        let state = daemon.owned_task_interventions.get_mut("eng-1").unwrap();
        state.detected_at = Instant::now() - Duration::from_secs(119);

        daemon.maybe_intervene_owned_tasks().unwrap();

        // Parent inbox stays empty and the escalation flag remains unset.
        assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());
        assert!(
            daemon
                .owned_task_interventions
                .get("eng-1")
                .is_some_and(|state| !state.escalation_sent)
        );
    }
2554
    /// Review-backlog interventions for an idle manager include the review
    /// playbook plus the engineer's concrete worktree path and branch name,
    /// sourced from a real git repository with an engineer worktree set up.
    #[test]
    fn maybe_intervene_review_backlog_queues_for_idle_manager_with_branch_and_worktree_context() {
        let tmp = tempfile::tempdir().unwrap();
        // Real repo + worktree so the intervention can resolve branch context.
        let repo = init_git_repo(&tmp, "batty-daemon-test");
        let team_config_dir = repo.join(".batty").join("team_config");
        let worktree_dir = repo.join(".batty").join("worktrees").join("eng-1");
        setup_engineer_worktree(&repo, &worktree_dir, "eng-1", &team_config_dir).unwrap();
        write_owned_task_file(&repo, 191, "review-task", "review", "eng-1");

        let mut daemon = TestDaemonBuilder::new(&repo)
            .members(vec![
                manager_member("lead", Some("architect")),
                // `true` marks eng-1 as a worktree-backed engineer.
                engineer_member("eng-1", Some("lead"), true),
            ])
            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
            .build();

        let root = inbox::inboxes_root(&repo);
        inbox::init_inbox(&root, "lead").unwrap();

        // Working -> Idle transition plus aged grace makes the lead eligible.
        daemon.update_automation_timers_for_state("lead", MemberState::Working);
        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_review_backlog().unwrap();

        // The message names the task/author, lists inbox/merge/board commands,
        // and embeds the worktree path and branch for the review.
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].from, "architect");
        assert!(pending[0].body.contains("Review backlog detected"));
        assert!(pending[0].body.contains("#191 by eng-1"));
        assert!(pending[0].body.contains("batty inbox lead"));
        assert!(pending[0].body.contains("batty read lead <ref>"));
        assert!(pending[0].body.contains("batty merge eng-1"));
        assert!(pending[0].body.contains("kanban-md move --dir"));
        assert!(pending[0].body.contains("191 done"));
        assert!(pending[0].body.contains("191 archived"));
        assert!(pending[0].body.contains("191 in-progress"));
        assert!(pending[0].body.contains("batty assign eng-1"));
        assert!(pending[0].body.contains("batty send architect"));
        assert!(
            pending[0]
                .body
                .contains(worktree_dir.to_string_lossy().as_ref())
        );
        assert!(pending[0].body.contains("branch: eng-1"));
        // Review interventions are tracked under the "review::<member>" key.
        assert_eq!(
            daemon
                .owned_task_interventions
                .get("review::lead")
                .map(|state| state.idle_epoch),
            Some(1)
        );
    }
2609
2610 #[test]
2611 fn maybe_intervene_review_backlog_does_not_fire_on_startup_idle() {
2612 let tmp = tempfile::tempdir().unwrap();
2613 write_owned_task_file(tmp.path(), 191, "review-task", "review", "eng-1");
2614
2615 let mut daemon = TestDaemonBuilder::new(tmp.path())
2616 .members(vec![
2617 manager_member("lead", Some("architect")),
2618 engineer_member("eng-1", Some("lead"), false),
2619 ])
2620 .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
2621 .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
2622 .build();
2623
2624 let root = inbox::inboxes_root(tmp.path());
2625 inbox::init_inbox(&root, "lead").unwrap();
2626
2627 daemon.maybe_intervene_review_backlog().unwrap();
2628
2629 assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());
2630 assert!(!daemon.owned_task_interventions.contains_key("review::lead"));
2631 }
2632
    /// When the lead is idle, at least one engineer is idle, and dispatchable
    /// board work exists, a dispatch-recovery intervention is queued for the
    /// lead from their parent ("architect").
    #[test]
    fn maybe_intervene_manager_dispatch_gap_queues_for_idle_lead_with_idle_reports() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                architect_member("architect"),
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
                engineer_member("eng-2", Some("lead"), false),
            ])
            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
            .states(HashMap::from([
                ("lead".to_string(), MemberState::Idle),
                ("eng-1".to_string(), MemberState::Idle),
                ("eng-2".to_string(), MemberState::Idle),
            ]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "lead").unwrap();
        inbox::init_inbox(&root, "eng-1").unwrap();
        inbox::init_inbox(&root, "eng-2").unwrap();
        // Board: eng-1 nominally holds #191, while #192 sits unassigned in
        // "todo" — the dispatch gap the intervention should surface.
        write_owned_task_file(tmp.path(), 191, "active-task", "in-progress", "eng-1");
        let tasks_dir = tmp
            .path()
            .join(".batty")
            .join("team_config")
            .join("board")
            .join("tasks");
        std::fs::write(
            tasks_dir.join("192-open-task.md"),
            "---\nid: 192\ntitle: open-task\nstatus: todo\npriority: high\nclass: standard\n---\n\nTask description.\n",
        )
        .unwrap();

        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_manager_dispatch_gap().unwrap();

        // The recovery message covers both engineers and the command playbook.
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].from, "architect");
        assert!(pending[0].body.contains("Dispatch recovery needed"));
        assert!(pending[0].body.contains("eng-1 on #191"));
        assert!(pending[0].body.contains("eng-2"));
        assert!(pending[0].body.contains("batty status"));
        assert!(pending[0].body.contains("batty send eng-1"));
        assert!(pending[0].body.contains("batty assign eng-2"));
        assert!(pending[0].body.contains("batty send architect"));
        // Dispatch interventions are keyed as "dispatch::<member>".
        assert!(
            daemon
                .owned_task_interventions
                .contains_key("dispatch::lead")
        );
    }
2687
    /// With the whole team idle and unstarted backlog work on the board, an
    /// idle architect receives a utilization-recovery message from the daemon
    /// naming a concrete task/engineer pairing.
    #[test]
    fn maybe_intervene_architect_utilization_queues_for_underloaded_idle_architect() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                architect_member("architect"),
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
                engineer_member("eng-2", Some("lead"), false),
            ])
            .pane_map(HashMap::from([(
                "architect".to_string(),
                "%999".to_string(),
            )]))
            .states(HashMap::from([
                ("architect".to_string(), MemberState::Idle),
                ("lead".to_string(), MemberState::Idle),
                ("eng-1".to_string(), MemberState::Idle),
                ("eng-2".to_string(), MemberState::Idle),
            ]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "architect").unwrap();
        // Board: eng-1 holds #191 while #192 is unassigned in "backlog".
        write_owned_task_file(tmp.path(), 191, "active-task", "in-progress", "eng-1");
        let tasks_dir = tmp
            .path()
            .join(".batty")
            .join("team_config")
            .join("board")
            .join("tasks");
        std::fs::write(
            tasks_dir.join("192-open-task.md"),
            "---\nid: 192\ntitle: open-task\nstatus: backlog\npriority: high\nclass: standard\n---\n\nTask description.\n",
        )
        .unwrap();

        backdate_idle_grace(&mut daemon, "architect");
        daemon.maybe_intervene_architect_utilization().unwrap();

        // The daemon (not a parent member) authors this message and suggests
        // routing the open task to the free engineer.
        let pending = inbox::pending_messages(&root, "architect").unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].from, "daemon");
        assert!(pending[0].body.contains("Utilization recovery needed"));
        assert!(pending[0].body.contains("eng-1 on #191"));
        assert!(pending[0].body.contains("eng-2"));
        assert!(pending[0].body.contains("batty status"));
        assert!(pending[0].body.contains("batty send lead"));
        assert!(pending[0].body.contains("Start Task #192 on eng-2"));
        // Utilization interventions are keyed as "utilization::<member>".
        assert!(
            daemon
                .owned_task_interventions
                .contains_key("utilization::architect")
        );
    }
2743
    /// A team with no engineers cannot dispatch executor work, so both the
    /// dispatch-gap and utilization interventions stay silent even with an
    /// open task and fully eligible idle members.
    #[test]
    fn zero_engineers_topology_skips_executor_interventions() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                architect_member("architect"),
                manager_member("lead", Some("architect")),
            ])
            .pane_map(HashMap::from([
                ("architect".to_string(), "%998".to_string()),
                ("lead".to_string(), "%999".to_string()),
            ]))
            .states(HashMap::from([
                ("architect".to_string(), MemberState::Idle),
                ("lead".to_string(), MemberState::Idle),
            ]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "architect").unwrap();
        inbox::init_inbox(&root, "lead").unwrap();
        // Open work exists, but there is nobody to dispatch it to.
        write_open_task_file(tmp.path(), 191, "queued-task", "todo");

        backdate_idle_grace(&mut daemon, "architect");
        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_manager_dispatch_gap().unwrap();
        daemon.maybe_intervene_architect_utilization().unwrap();

        assert!(
            inbox::pending_messages(&root, "architect")
                .unwrap()
                .is_empty()
        );
        assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());
        assert!(
            !daemon
                .owned_task_interventions
                .contains_key("dispatch::lead")
        );
        assert!(
            !daemon
                .owned_task_interventions
                .contains_key("utilization::architect")
        );
    }
2789
2790 #[test]
2791 fn single_role_topology_nudges_idle_member() {
2792 let tmp = tempfile::tempdir().unwrap();
2793 let mut daemon = TestDaemonBuilder::new(tmp.path())
2794 .members(vec![architect_member("solo")])
2795 .pane_map(HashMap::from([("solo".to_string(), "%999".to_string())]))
2796 .states(HashMap::from([("solo".to_string(), MemberState::Idle)]))
2797 .nudges(HashMap::from([(
2798 "solo".to_string(),
2799 NudgeSchedule {
2800 text: "Solo mode should keep moving.".to_string(),
2801 interval: Duration::from_secs(1),
2802 idle_since: Some(Instant::now() - Duration::from_secs(5)),
2803 fired_this_idle: false,
2804 paused: false,
2805 },
2806 )]))
2807 .build();
2808
2809 let root = inbox::inboxes_root(tmp.path());
2810 inbox::init_inbox(&root, "solo").unwrap();
2811
2812 backdate_idle_grace(&mut daemon, "solo");
2813 daemon.maybe_fire_nudges().unwrap();
2814
2815 let pending = inbox::pending_messages(&root, "solo").unwrap();
2816 assert_eq!(pending.len(), 1);
2817 assert_eq!(pending[0].from, "daemon");
2818 assert!(pending[0].body.contains("Solo mode should keep moving."));
2819 assert!(pending[0].body.contains("Idle nudge:"));
2820 assert_eq!(daemon.states.get("solo"), Some(&MemberState::Idle));
2821 assert!(
2822 daemon
2823 .nudges
2824 .get("solo")
2825 .is_some_and(|schedule| schedule.fired_this_idle)
2826 );
2827 }
2828
    /// Sanity check across every intervention kind: when every member is in
    /// the Working state, none of the checks queue anything — regardless of
    /// triage backlog, owned/review tasks, or open board work.
    #[test]
    fn all_members_working_suppresses_interventions() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                architect_member("architect"),
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
                engineer_member("eng-2", Some("lead"), false),
            ])
            .pane_map(HashMap::from([
                ("architect".to_string(), "%997".to_string()),
                ("lead".to_string(), "%998".to_string()),
                ("eng-1".to_string(), "%999".to_string()),
                ("eng-2".to_string(), "%996".to_string()),
            ]))
            .states(HashMap::from([
                ("architect".to_string(), MemberState::Working),
                ("lead".to_string(), MemberState::Working),
                ("eng-1".to_string(), MemberState::Working),
                ("eng-2".to_string(), MemberState::Working),
            ]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "architect").unwrap();
        inbox::init_inbox(&root, "lead").unwrap();
        inbox::init_inbox(&root, "eng-1").unwrap();
        inbox::init_inbox(&root, "eng-2").unwrap();

        // Triage bait: a delivered report sitting in the lead's inbox.
        let mut triage_message = inbox::InboxMessage::new_send("eng-1", "lead", "Task complete.");
        triage_message.timestamp = super::now_unix();
        let triage_id = inbox::deliver_to_inbox(&root, &triage_message).unwrap();
        inbox::mark_delivered(&root, "lead", &triage_id).unwrap();

        // Board bait for every remaining intervention kind.
        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "eng-1");
        write_owned_task_file(tmp.path(), 192, "review-task", "review", "eng-2");
        write_open_task_file(tmp.path(), 193, "open-task", "todo");

        daemon.maybe_intervene_triage_backlog().unwrap();
        daemon.maybe_intervene_owned_tasks().unwrap();
        daemon.maybe_intervene_review_backlog().unwrap();
        daemon.maybe_intervene_manager_dispatch_gap().unwrap();
        daemon.maybe_intervene_architect_utilization().unwrap();

        // Nothing queued anywhere; no intervention state recorded at all.
        assert!(
            inbox::pending_messages(&root, "architect")
                .unwrap()
                .is_empty()
        );
        assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());
        assert!(inbox::pending_messages(&root, "eng-1").unwrap().is_empty());
        assert!(inbox::pending_messages(&root, "eng-2").unwrap().is_empty());
        assert!(daemon.triage_interventions.is_empty());
        assert!(daemon.owned_task_interventions.is_empty());
    }
2885
    /// A still-pending (undelivered) inbox message for the manager must
    /// suppress the dispatch-gap intervention: the manager should process its
    /// mail before being nudged about unassigned work.
    #[test]
    fn manager_dispatch_gap_skips_when_pending_inbox_exists() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                architect_member("architect"),
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
                engineer_member("eng-2", Some("lead"), false),
            ])
            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
            .states(HashMap::from([
                ("lead".to_string(), MemberState::Idle),
                ("eng-1".to_string(), MemberState::Idle),
                ("eng-2".to_string(), MemberState::Idle),
            ]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "lead").unwrap();
        inbox::init_inbox(&root, "eng-1").unwrap();
        inbox::init_inbox(&root, "eng-2").unwrap();
        // Delivered to the inbox but never marked delivered: stays "pending".
        let message = inbox::InboxMessage::new_send("architect", "lead", "Handle this first.");
        inbox::deliver_to_inbox(&root, &message).unwrap();

        // Without the pending message this board state (open todo + idle
        // engineer capacity) would qualify as a dispatch gap.
        write_owned_task_file(tmp.path(), 191, "active-task", "in-progress", "eng-1");
        write_open_task_file(tmp.path(), 192, "open-task", "todo");

        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_manager_dispatch_gap().unwrap();

        // Only the architect's original message remains; no daemon-generated
        // dispatch intervention was queued or recorded.
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].from, "architect");
        assert!(
            !daemon
                .owned_task_interventions
                .contains_key("dispatch::lead")
        );
    }
2926
    /// The per-member intervention cooldown is inclusive: one second short of
    /// the configured cooldown suppresses the owned-task intervention, while
    /// exactly the cooldown elapsed allows it to fire again.
    #[test]
    fn owned_task_intervention_refires_at_exact_cooldown_boundary() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![manager_member("lead", Some("architect"))])
            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "lead").unwrap();
        write_owned_task_file(tmp.path(), 191, "owned-task", "in-progress", "lead");

        // Use the configured cooldown so the test tracks config changes.
        let cooldown = Duration::from_secs(
            daemon
                .config
                .team_config
                .automation
                .intervention_cooldown_secs,
        );

        backdate_idle_grace(&mut daemon, "lead");
        // Last intervention recorded one second *inside* the window: no refire.
        daemon.intervention_cooldowns.insert(
            "lead".to_string(),
            Instant::now() - (cooldown - Duration::from_secs(1)),
        );
        daemon.maybe_intervene_owned_tasks().unwrap();
        assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());

        // Exactly at the boundary: the intervention fires again.
        daemon
            .intervention_cooldowns
            .insert("lead".to_string(), Instant::now() - cooldown);
        daemon.maybe_intervene_owned_tasks().unwrap();

        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 1);
        assert!(
            pending[0]
                .body
                .contains("Owned active task backlog detected")
        );
        assert!(daemon.owned_task_interventions.contains_key("lead"));
    }
2970
    /// With a board directory present but containing no task files, every
    /// intervention check is a no-op even though all members are idle and
    /// past their idle grace period.
    #[test]
    fn empty_board_skips_interventions() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                architect_member("architect"),
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
            ])
            .pane_map(HashMap::from([
                ("architect".to_string(), "%997".to_string()),
                ("lead".to_string(), "%998".to_string()),
                ("eng-1".to_string(), "%999".to_string()),
            ]))
            .states(HashMap::from([
                ("architect".to_string(), MemberState::Idle),
                ("lead".to_string(), MemberState::Idle),
                ("eng-1".to_string(), MemberState::Idle),
            ]))
            .build();

        // Create the tasks directory but leave it empty: an empty board, not
        // a missing one.
        std::fs::create_dir_all(
            tmp.path()
                .join(".batty")
                .join("team_config")
                .join("board")
                .join("tasks"),
        )
        .unwrap();
        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "architect").unwrap();
        inbox::init_inbox(&root, "lead").unwrap();
        inbox::init_inbox(&root, "eng-1").unwrap();

        backdate_idle_grace(&mut daemon, "architect");
        backdate_idle_grace(&mut daemon, "lead");
        backdate_idle_grace(&mut daemon, "eng-1");

        daemon.maybe_intervene_triage_backlog().unwrap();
        daemon.maybe_intervene_owned_tasks().unwrap();
        daemon.maybe_intervene_review_backlog().unwrap();
        daemon.maybe_intervene_manager_dispatch_gap().unwrap();
        daemon.maybe_intervene_architect_utilization().unwrap();

        // Nothing to act on: no messages and no recorded intervention state.
        assert!(
            inbox::pending_messages(&root, "architect")
                .unwrap()
                .is_empty()
        );
        assert!(inbox::pending_messages(&root, "lead").unwrap().is_empty());
        assert!(inbox::pending_messages(&root, "eng-1").unwrap().is_empty());
        assert!(daemon.triage_interventions.is_empty());
        assert!(daemon.owned_task_interventions.is_empty());
    }
3025
    /// Idle engineers plus an under-threshold todo backlog trigger a single
    /// pipeline-starvation alert addressed to the architect, with the exact
    /// daemon-generated message body.
    #[test]
    fn test_starvation_detected() {
        let tmp = tempfile::tempdir().unwrap();
        // Threshold of 1: a single todo task counts as "running dry".
        let mut daemon = starvation_test_daemon(&tmp, Some(1));
        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "architect").unwrap();
        write_open_task_file(tmp.path(), 101, "queued-task", "todo");

        daemon.maybe_detect_pipeline_starvation().unwrap();

        let pending = inbox::pending_messages(&root, "architect").unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].from, "daemon");
        // Exact wording is asserted so message changes are caught in review.
        assert_eq!(
            pending[0].body,
            "Pipeline running dry: 2 idle engineers, 1 todo tasks."
        );
        assert!(daemon.pipeline_starvation_fired);
    }
3045
3046 #[test]
3047 fn test_debounce() {
3048 let tmp = tempfile::tempdir().unwrap();
3049 let mut daemon = starvation_test_daemon(&tmp, Some(1));
3050 let root = inbox::inboxes_root(tmp.path());
3051 inbox::init_inbox(&root, "architect").unwrap();
3052 write_open_task_file(tmp.path(), 101, "queued-task", "todo");
3053
3054 daemon.maybe_detect_pipeline_starvation().unwrap();
3055 daemon.maybe_detect_pipeline_starvation().unwrap();
3056
3057 let pending = inbox::pending_messages(&root, "architect").unwrap();
3058 assert_eq!(pending.len(), 1);
3059 assert!(daemon.pipeline_starvation_fired);
3060 }
3061
    /// Threshold configuration semantics: a threshold of 2 is not met by a
    /// single todo task, and a `None` threshold disables starvation detection
    /// entirely.
    #[test]
    fn test_threshold_config() {
        let tmp = tempfile::tempdir().unwrap();
        // Threshold 2, but only one todo task exists: must not fire.
        let mut daemon = starvation_test_daemon(&tmp, Some(2));
        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "architect").unwrap();
        write_open_task_file(tmp.path(), 101, "queued-task", "todo");

        daemon.maybe_detect_pipeline_starvation().unwrap();
        assert!(
            inbox::pending_messages(&root, "architect")
                .unwrap()
                .is_empty()
        );
        assert!(!daemon.pipeline_starvation_fired);

        // Threshold `None`: feature disabled, must never fire.
        let disabled_tmp = tempfile::tempdir().unwrap();
        let mut disabled_daemon = starvation_test_daemon(&disabled_tmp, None);
        let disabled_root = inbox::inboxes_root(disabled_tmp.path());
        inbox::init_inbox(&disabled_root, "architect").unwrap();
        write_open_task_file(disabled_tmp.path(), 101, "queued-task", "todo");

        disabled_daemon.maybe_detect_pipeline_starvation().unwrap();
        assert!(
            inbox::pending_messages(&disabled_root, "architect")
                .unwrap()
                .is_empty()
        );
        assert!(!disabled_daemon.pipeline_starvation_fired);
    }
3092
    /// Full fire/reset/refire cycle: the starvation flag stays latched while
    /// the backlog is at or below threshold, clears when enough work is
    /// added, and — after the debounce window — fires a second alert once the
    /// backlog drops again.
    #[test]
    fn test_reset_when_work_added() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = starvation_test_daemon(&tmp, Some(1));
        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "architect").unwrap();
        write_open_task_file(tmp.path(), 101, "queued-task", "todo");

        // First pass fires and latches the flag.
        daemon.maybe_detect_pipeline_starvation().unwrap();
        assert!(daemon.pipeline_starvation_fired);

        // Two open tasks is still at the threshold of 1+1: flag stays set.
        write_open_task_file(tmp.path(), 102, "queued-task-2", "backlog");
        daemon.maybe_detect_pipeline_starvation().unwrap();
        assert!(daemon.pipeline_starvation_fired);

        // Three open tasks exceeds the threshold: the latch resets.
        write_open_task_file(tmp.path(), 103, "queued-task-3", "backlog");
        daemon.maybe_detect_pipeline_starvation().unwrap();
        assert!(!daemon.pipeline_starvation_fired);

        // Drain the backlog back down by deleting the two extra task files.
        std::fs::remove_file(
            tmp.path()
                .join(".batty")
                .join("team_config")
                .join("board")
                .join("tasks")
                .join("102-queued-task-2.md"),
        )
        .unwrap();
        std::fs::remove_file(
            tmp.path()
                .join(".batty")
                .join("team_config")
                .join("board")
                .join("tasks")
                .join("103-queued-task-3.md"),
        )
        .unwrap();
        // Backdate the last-fired instant past the 300s debounce so the
        // second alert is allowed.
        daemon.pipeline_starvation_last_fired = Some(Instant::now() - Duration::from_secs(301));
        daemon.maybe_detect_pipeline_starvation().unwrap();

        // Two alerts total: the initial one plus the post-reset refire.
        let pending = inbox::pending_messages(&root, "architect").unwrap();
        assert_eq!(pending.len(), 2);
        assert!(daemon.pipeline_starvation_fired);
    }
3140
    /// An engineer with an active board item (here: a task in review claimed
    /// by eng-1) does not count as starved, so no alert is sent even with a
    /// small todo backlog.
    #[test]
    fn starvation_suppressed_when_engineer_has_active_board_item() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = starvation_test_daemon(&tmp, Some(1));
        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "architect").unwrap();

        write_open_task_file(tmp.path(), 101, "queued-task", "todo");
        // Review task claimed by eng-1: this member still has work in flight.
        write_board_task_file(
            tmp.path(),
            102,
            "review-task",
            "review",
            Some("eng-1"),
            &[],
            None,
        );

        daemon.maybe_detect_pipeline_starvation().unwrap();

        let pending = inbox::pending_messages(&root, "architect").unwrap();
        assert!(pending.is_empty());
        assert!(!daemon.pipeline_starvation_fired);
    }
3168
    /// A manager in the Working state suppresses the starvation alert: the
    /// lead may be about to dispatch, so the pipeline is not considered dry.
    #[test]
    fn starvation_suppressed_when_manager_working() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                architect_member("architect"),
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
                engineer_member("eng-2", Some("lead"), false),
            ])
            .workflow_policy(WorkflowPolicy {
                pipeline_starvation_threshold: Some(1),
                ..WorkflowPolicy::default()
            })
            .build();
        // States are set after build: lead Working, both engineers Idle.
        daemon.states = HashMap::from([
            ("lead".to_string(), MemberState::Working),
            ("eng-1".to_string(), MemberState::Idle),
            ("eng-2".to_string(), MemberState::Idle),
        ]);
        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "architect").unwrap();
        write_open_task_file(tmp.path(), 101, "queued-task", "todo");

        daemon.maybe_detect_pipeline_starvation().unwrap();

        let pending = inbox::pending_messages(&root, "architect").unwrap();
        assert!(pending.is_empty());
    }
3199
    /// A triage intervention must not be re-sent while the previous one is
    /// still sitting unread in the manager's inbox, even across a fresh
    /// Working -> Idle transition that would otherwise re-arm it.
    #[test]
    fn maybe_intervene_triage_backlog_does_not_refire_while_prior_intervention_remains_pending() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![
                manager_member("lead", Some("architect")),
                engineer_member("eng-1", Some("lead"), false),
            ])
            .pane_map(HashMap::from([("lead".to_string(), "%999".to_string())]))
            .states(HashMap::from([("lead".to_string(), MemberState::Idle)]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "lead").unwrap();
        inbox::init_inbox(&root, "eng-1").unwrap();
        // Delivered completion report: the triage-backlog trigger condition.
        let mut result = inbox::InboxMessage::new_send("eng-1", "lead", "Task complete.");
        result.timestamp = super::now_unix();
        let id = inbox::deliver_to_inbox(&root, &result).unwrap();
        inbox::mark_delivered(&root, "lead", &id).unwrap();

        // First Working -> Idle cycle: the intervention fires once.
        daemon.update_automation_timers_for_state("lead", MemberState::Working);
        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_triage_backlog().unwrap();

        // Second Working -> Idle cycle while the first intervention is still
        // pending: must not fire again.
        daemon
            .states
            .insert("lead".to_string(), MemberState::Working);
        daemon.update_automation_timers_for_state("lead", MemberState::Working);
        daemon.states.insert("lead".to_string(), MemberState::Idle);
        daemon.update_automation_timers_for_state("lead", MemberState::Idle);
        backdate_idle_grace(&mut daemon, "lead");
        daemon.maybe_intervene_triage_backlog().unwrap();

        // Exactly one intervention recorded and one pending message (sent on
        // behalf of the architect, the lead's automation sender).
        assert_eq!(daemon.triage_interventions.get("lead"), Some(&1));
        let pending = inbox::pending_messages(&root, "lead").unwrap();
        assert_eq!(pending.len(), 1);
        assert!(pending.iter().all(|message| message.from == "architect"));
    }
3239
    /// When a nudge cannot be delivered to the pane (pane %999 does not
    /// exist) and falls back to the inbox, the member must remain Idle and
    /// the schedule must still be marked as fired for this idle period.
    #[test]
    fn maybe_fire_nudges_keeps_member_idle_when_delivery_falls_back_to_inbox() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![architect_member("scientist")])
            .pane_map(HashMap::from([(
                "scientist".to_string(),
                "%999".to_string(),
            )]))
            .states(HashMap::from([(
                "scientist".to_string(),
                MemberState::Idle,
            )]))
            .nudges(HashMap::from([(
                "scientist".to_string(),
                NudgeSchedule {
                    text: "Please make progress.".to_string(),
                    interval: Duration::from_secs(1),
                    // Idle for 5s against a 1s interval: nudge is due.
                    idle_since: Some(Instant::now() - Duration::from_secs(5)),
                    fired_this_idle: false,
                    paused: false,
                },
            )]))
            .build();

        backdate_idle_grace(&mut daemon, "scientist");
        daemon.maybe_fire_nudges().unwrap();

        // Fallback delivery must not flip the member's state or pause the
        // schedule, but the fired flag latches for this idle period.
        assert_eq!(daemon.states.get("scientist"), Some(&MemberState::Idle));
        let schedule = daemon.nudges.get("scientist").unwrap();
        assert!(!schedule.paused);
        assert!(schedule.fired_this_idle);

        // The nudge landed in the inbox, attributed to the daemon.
        let messages =
            inbox::pending_messages(&inbox::inboxes_root(tmp.path()), "scientist").unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].from, "daemon");
        assert!(messages[0].body.contains("Please make progress."));
        assert!(messages[0].body.contains("Idle nudge:"));
    }
3280
    /// A member with a pending inbox message gets no idle nudge: they already
    /// have work to process, so the schedule must not fire.
    #[test]
    fn maybe_fire_nudges_skips_when_pending_inbox_exists() {
        let tmp = tempfile::tempdir().unwrap();
        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![architect_member("scientist")])
            .pane_map(HashMap::from([(
                "scientist".to_string(),
                "%999".to_string(),
            )]))
            .states(HashMap::from([(
                "scientist".to_string(),
                MemberState::Idle,
            )]))
            .nudges(HashMap::from([(
                "scientist".to_string(),
                NudgeSchedule {
                    text: "Please make progress.".to_string(),
                    interval: Duration::from_secs(1),
                    idle_since: Some(Instant::now()),
                    fired_this_idle: false,
                    paused: false,
                },
            )]))
            .build();

        let root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&root, "scientist").unwrap();
        // Pending (undelivered) message: this is what suppresses the nudge.
        let message =
            inbox::InboxMessage::new_send("architect", "scientist", "Process this first.");
        inbox::deliver_to_inbox(&root, &message).unwrap();

        backdate_idle_grace(&mut daemon, "scientist");
        daemon.maybe_fire_nudges().unwrap();

        // Only the original message remains; the schedule did not fire and
        // the member is still Idle.
        let messages = inbox::pending_messages(&root, "scientist").unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].from, "architect");
        let schedule = daemon.nudges.get("scientist").unwrap();
        assert!(!schedule.fired_this_idle);
        assert_eq!(daemon.states.get("scientist"), Some(&MemberState::Idle));
    }
3322
3323 #[test]
3324 fn automation_sender_prefers_direct_manager_and_config_fallback() {
3325 let tmp = tempfile::tempdir().unwrap();
3326 let mut daemon = TestDaemonBuilder::new(tmp.path())
3327 .members(vec![
3328 architect_member("architect"),
3329 manager_member("lead", Some("architect")),
3330 engineer_member("eng-1", Some("lead"), false),
3331 ])
3332 .build();
3333 daemon.config.team_config.automation_sender = Some("human".to_string());
3334
3335 assert_eq!(daemon.automation_sender_for("eng-1"), "lead");
3336 assert_eq!(daemon.automation_sender_for("lead"), "architect");
3337 assert_eq!(daemon.automation_sender_for("architect"), "human");
3338
3339 daemon.config.team_config.automation_sender = None;
3340 assert_eq!(daemon.automation_sender_for("architect"), "daemon");
3341 }
3342
3343 #[test]
3344 fn hot_reload_marker_round_trip() {
3345 let tmp = tempfile::tempdir().unwrap();
3346 let marker = hot_reload_marker_path(tmp.path());
3347
3348 write_hot_reload_marker(tmp.path()).unwrap();
3349 assert!(marker.exists());
3350 assert!(consume_hot_reload_marker(tmp.path()));
3351 assert!(!marker.exists());
3352 assert!(!consume_hot_reload_marker(tmp.path()));
3353 }
3354
3355 #[test]
3356 fn hot_reload_resume_args_include_resume_flag() {
3357 let tmp = tempfile::tempdir().unwrap();
3358 let args = hot_reload_daemon_args(tmp.path());
3359 let canonical_root = tmp.path().canonicalize().unwrap();
3360 assert_eq!(
3361 args,
3362 vec![
3363 "-v".to_string(),
3364 "daemon".to_string(),
3365 "--project-root".to_string(),
3366 canonical_root.to_string_lossy().to_string(),
3367 "--resume".to_string(),
3368 ]
3369 );
3370 }
3371
    /// Rewriting the binary on disk must produce a different fingerprint.
    #[test]
    fn hot_reload_fingerprint_detects_binary_change() {
        let tmp = tempfile::tempdir().unwrap();
        let binary = tmp.path().join("batty");
        fs::write(&binary, "old-binary").unwrap();
        let before = BinaryFingerprint::capture(&binary).unwrap();

        // Sleep past one second so the file's mtime differs even on
        // filesystems with coarse (1s) timestamp granularity.
        std::thread::sleep(Duration::from_millis(1100));
        fs::write(&binary, "new-binary-build").unwrap();
        let after = BinaryFingerprint::capture(&binary).unwrap();

        assert!(after.changed_from(&before));
    }
3385
    /// Spawning agents with resume disabled must record the per-member resume
    /// decision in the orchestrator log.
    #[test]
    fn resume_decision_logged_to_orchestrator() {
        let tmp = tempfile::tempdir().unwrap();
        // Single architect member with no agent-specific prompt or manager.
        let member = MemberInstance {
            name: "architect".to_string(),
            role_name: "architect".to_string(),
            role_type: RoleType::Architect,
            agent: Some("claude".to_string()),
            prompt: None,
            reports_to: None,
            use_worktrees: false,
        };
        // Fully explicit team config (no builder) so the test exercises the
        // real TeamDaemon::new constructor path.
        let mut daemon = TeamDaemon::new(DaemonConfig {
            project_root: tmp.path().to_path_buf(),
            team_config: TeamConfig {
                name: "test".to_string(),
                agent: None,
                workflow_mode: WorkflowMode::Hybrid,
                workflow_policy: WorkflowPolicy::default(),
                board: BoardConfig::default(),
                standup: StandupConfig::default(),
                automation: AutomationConfig::default(),
                automation_sender: None,
                external_senders: Vec::new(),
                orchestrator_pane: true,
                orchestrator_position: OrchestratorPosition::Bottom,
                layout: None,
                cost: Default::default(),
                event_log_max_bytes: crate::team::DEFAULT_EVENT_LOG_MAX_BYTES,
                retro_min_duration_secs: 60,
                roles: Vec::new(),
            },
            session: "test".to_string(),
            members: vec![member],
            pane_map: HashMap::from([("architect".to_string(), "%999".to_string())]),
        })
        .unwrap();

        // `false` = resume disabled; the decision should still be logged.
        daemon.spawn_all_agents(false).unwrap();

        let content =
            fs::read_to_string(tmp.path().join(".batty").join("orchestrator.log")).unwrap();
        assert!(content.contains("resume: architect=no (resume disabled)"));
    }
3430
3431 #[test]
3432 fn reconcile_clears_done_task() {
3433 let tmp = tempfile::tempdir().unwrap();
3434 std::fs::create_dir_all(tmp.path().join(".batty").join("team_config")).unwrap();
3435 write_owned_task_file(tmp.path(), 42, "finished-work", "done", "eng-1");
3436 let mut daemon = make_test_daemon(
3437 tmp.path(),
3438 vec![engineer_member("eng-1", Some("manager"), false)],
3439 );
3440 daemon.active_tasks.insert("eng-1".to_string(), 42);
3441
3442 daemon.reconcile_active_tasks().unwrap();
3443
3444 assert_eq!(daemon.active_task_id("eng-1"), None);
3445 }
3446
3447 #[test]
3448 fn reconcile_keeps_in_progress_task() {
3449 let tmp = tempfile::tempdir().unwrap();
3450 std::fs::create_dir_all(tmp.path().join(".batty").join("team_config")).unwrap();
3451 write_owned_task_file(tmp.path(), 42, "active-work", "in-progress", "eng-1");
3452 let mut daemon = make_test_daemon(
3453 tmp.path(),
3454 vec![engineer_member("eng-1", Some("manager"), false)],
3455 );
3456 daemon.active_tasks.insert("eng-1".to_string(), 42);
3457
3458 daemon.reconcile_active_tasks().unwrap();
3459
3460 assert_eq!(daemon.active_task_id("eng-1"), Some(42));
3461 }
3462
3463 #[test]
3464 fn reconcile_clears_missing_task() {
3465 let tmp = tempfile::tempdir().unwrap();
3466 let tasks_dir = tmp
3467 .path()
3468 .join(".batty")
3469 .join("team_config")
3470 .join("board")
3471 .join("tasks");
3472 std::fs::create_dir_all(&tasks_dir).unwrap();
3473 let mut daemon = make_test_daemon(
3475 tmp.path(),
3476 vec![engineer_member("eng-1", Some("manager"), false)],
3477 );
3478 daemon.active_tasks.insert("eng-1".to_string(), 99);
3479
3480 daemon.reconcile_active_tasks().unwrap();
3481
3482 assert_eq!(daemon.active_task_id("eng-1"), None);
3483 }
3484
3485 #[test]
3486 fn production_daemon_has_no_unwrap_or_expect_calls() {
3487 let count = production_unwrap_expect_count(Path::new(file!()));
3488 assert_eq!(count, 0, "production daemon.rs should avoid unwrap/expect");
3489 }
3490
3491 #[test]
3492 fn non_git_repo_disables_worktrees() {
3493 use crate::team::harness::TestHarness;
3494 use crate::team::test_support::engineer_member;
3495
3496 let harness = TestHarness::new()
3497 .with_member(engineer_member("eng-1", Some("manager"), true))
3498 .with_member_state("eng-1", MemberState::Idle);
3499 let daemon = harness.build_daemon().unwrap();
3500
3501 assert!(!daemon.is_git_repo);
3503 assert!(
3505 !daemon.member_uses_worktrees("eng-1"),
3506 "worktrees should be disabled when project is not a git repo"
3507 );
3508 }
3509
3510 #[test]
3511 fn git_repo_enables_worktrees() {
3512 use crate::team::harness::TestHarness;
3513 use crate::team::test_support::engineer_member;
3514
3515 let harness = TestHarness::new()
3516 .with_member(engineer_member("eng-1", Some("manager"), true))
3517 .with_member_state("eng-1", MemberState::Idle);
3518 let mut daemon = harness.build_daemon().unwrap();
3519
3520 daemon.is_git_repo = true;
3522
3523 assert!(
3524 daemon.member_uses_worktrees("eng-1"),
3525 "worktrees should be enabled when project is a git repo and member has use_worktrees=true"
3526 );
3527 }
3528
    /// Convenience wrapper: writes a review-status board task with "high"
    /// priority.
    fn write_review_task(project_root: &Path, id: u32, review_owner: &str) {
        write_review_task_with_priority(project_root, id, review_owner, "high");
    }
3534
3535 fn write_review_task_with_priority(
3536 project_root: &Path,
3537 id: u32,
3538 review_owner: &str,
3539 priority: &str,
3540 ) {
3541 let tasks_dir = project_root
3542 .join(".batty")
3543 .join("team_config")
3544 .join("board")
3545 .join("tasks");
3546 std::fs::create_dir_all(&tasks_dir).unwrap();
3547 std::fs::write(
3548 tasks_dir.join(format!("{id:03}-review-task-{id}.md")),
3549 format!(
3550 "---\nid: {id}\ntitle: review-task-{id}\nstatus: review\npriority: {priority}\nclass: standard\nclaimed_by: eng-1\nreview_owner: {review_owner}\n---\n\nTask description.\n"
3551 ),
3552 )
3553 .unwrap();
3554 }
3555
3556 fn stale_review_daemon(tmp: &tempfile::TempDir) -> TeamDaemon {
3557 TestDaemonBuilder::new(tmp.path())
3558 .members(vec![
3559 architect_member("architect"),
3560 manager_member("manager", Some("architect")),
3561 engineer_member("eng-1", Some("manager"), false),
3562 ])
3563 .workflow_policy(WorkflowPolicy {
3564 review_nudge_threshold_secs: 1800,
3565 review_timeout_secs: 7200,
3566 ..WorkflowPolicy::default()
3567 })
3568 .build()
3569 }
3570
    /// A review older than the nudge threshold (1800s) gets a nudge and emits
    /// a `review_nudge_sent` event, without escalating.
    #[test]
    fn stale_review_sends_nudge_at_threshold() {
        let tmp = tempfile::tempdir().unwrap();
        write_review_task(tmp.path(), 42, "manager");
        let mut daemon = stale_review_daemon(&tmp);

        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        // One second past the 1800s nudge threshold.
        daemon.review_first_seen.insert(42, now - 1801);

        daemon.maybe_escalate_stale_reviews().unwrap();

        assert!(daemon.review_nudge_sent.contains(&42));

        let events_path = tmp
            .path()
            .join(".batty")
            .join("team_config")
            .join("events.jsonl");
        let events = std::fs::read_to_string(&events_path).unwrap_or_default();
        assert!(events.contains("review_nudge_sent"));
    }
3598
    /// A review older than the timeout (7200s) is escalated: tracking state
    /// is cleared, the task is blocked with an escalation reason, and a
    /// `review_escalated` event is emitted.
    #[test]
    fn stale_review_escalates_at_timeout() {
        let tmp = tempfile::tempdir().unwrap();
        write_review_task(tmp.path(), 42, "manager");
        let mut daemon = stale_review_daemon(&tmp);

        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        // One second past the 7200s escalation timeout.
        daemon.review_first_seen.insert(42, now - 7201);

        daemon.maybe_escalate_stale_reviews().unwrap();

        // Escalation clears both tracking sets for the task.
        assert!(!daemon.review_first_seen.contains_key(&42));
        assert!(!daemon.review_nudge_sent.contains(&42));

        // The task file itself is moved to blocked with the escalation note.
        let tasks_dir = tmp
            .path()
            .join(".batty")
            .join("team_config")
            .join("board")
            .join("tasks");
        let tasks = crate::task::load_tasks_from_dir(&tasks_dir).unwrap();
        let task = tasks.iter().find(|t| t.id == 42).unwrap();
        assert_eq!(task.status, "blocked");
        assert_eq!(
            task.blocked_on.as_deref(),
            Some("review timeout escalated to architect")
        );

        let events_path = tmp
            .path()
            .join(".batty")
            .join("team_config")
            .join("events.jsonl");
        let events = std::fs::read_to_string(&events_path).unwrap_or_default();
        assert!(events.contains("review_escalated"));
    }
3642
    /// A stale-review nudge for a given task fires at most once: a second
    /// escalation pass must not append another `review_nudge_sent` event.
    #[test]
    fn nudge_only_sent_once() {
        let tmp = tempfile::tempdir().unwrap();
        write_review_task(tmp.path(), 42, "manager");
        let mut daemon = stale_review_daemon(&tmp);

        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        daemon.review_first_seen.insert(42, now - 1801);

        // First pass sends the nudge.
        daemon.maybe_escalate_stale_reviews().unwrap();
        assert!(daemon.review_nudge_sent.contains(&42));

        // Count nudge events before and after a second pass; they must match.
        let events_path = tmp
            .path()
            .join(".batty")
            .join("team_config")
            .join("events.jsonl");
        let events_before = std::fs::read_to_string(&events_path)
            .unwrap_or_default()
            .matches("review_nudge_sent")
            .count();

        daemon.maybe_escalate_stale_reviews().unwrap();
        let events_after = std::fs::read_to_string(&events_path)
            .unwrap_or_default()
            .matches("review_nudge_sent")
            .count();

        assert_eq!(events_before, events_after, "nudge should not fire twice");
    }
3679
3680 #[test]
3681 fn config_nudge_threshold_defaults() {
3682 let policy = WorkflowPolicy::default();
3683 assert_eq!(policy.review_nudge_threshold_secs, 1800);
3684 assert_eq!(policy.review_timeout_secs, 7200);
3685 }
3686
3687 fn stale_review_daemon_with_overrides(tmp: &tempfile::TempDir) -> TeamDaemon {
3690 use crate::team::config::ReviewTimeoutOverride;
3691 let mut overrides = std::collections::HashMap::new();
3692 overrides.insert(
3693 "critical".to_string(),
3694 ReviewTimeoutOverride {
3695 review_nudge_threshold_secs: Some(300),
3696 review_timeout_secs: Some(600),
3697 },
3698 );
3699 overrides.insert(
3700 "high".to_string(),
3701 ReviewTimeoutOverride {
3702 review_nudge_threshold_secs: Some(900),
3703 review_timeout_secs: Some(3600),
3704 },
3705 );
3706 TestDaemonBuilder::new(tmp.path())
3707 .members(vec![
3708 architect_member("architect"),
3709 manager_member("manager", Some("architect")),
3710 engineer_member("eng-1", Some("manager"), false),
3711 ])
3712 .workflow_policy(WorkflowPolicy {
3713 review_nudge_threshold_secs: 1800,
3714 review_timeout_secs: 7200,
3715 review_timeout_overrides: overrides,
3716 ..WorkflowPolicy::default()
3717 })
3718 .build()
3719 }
3720
    /// A critical-priority review uses its 300s override, so one second past
    /// that threshold triggers the nudge (global default would be 1800s).
    #[test]
    fn critical_task_nudges_at_priority_override_threshold() {
        let tmp = tempfile::tempdir().unwrap();
        write_review_task_with_priority(tmp.path(), 50, "manager", "critical");
        let mut daemon = stale_review_daemon_with_overrides(&tmp);

        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        // One second past the 300s critical-priority nudge override.
        daemon.review_first_seen.insert(50, now - 301);

        daemon.maybe_escalate_stale_reviews().unwrap();

        assert!(
            daemon.review_nudge_sent.contains(&50),
            "critical task should be nudged at 300s override"
        );
    }
3741
    /// A critical-priority review younger than its 300s override threshold
    /// must not be nudged yet.
    #[test]
    fn critical_task_not_nudged_below_override_threshold() {
        let tmp = tempfile::tempdir().unwrap();
        write_review_task_with_priority(tmp.path(), 50, "manager", "critical");
        let mut daemon = stale_review_daemon_with_overrides(&tmp);

        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        // 200s old: still under the 300s critical nudge override.
        daemon.review_first_seen.insert(50, now - 200);

        daemon.maybe_escalate_stale_reviews().unwrap();

        assert!(
            !daemon.review_nudge_sent.contains(&50),
            "critical task should not be nudged before 300s"
        );
    }
3762
    /// A critical-priority review past its 600s timeout override escalates:
    /// tracking clears and the task file becomes blocked.
    #[test]
    fn critical_task_escalates_at_priority_override_threshold() {
        let tmp = tempfile::tempdir().unwrap();
        write_review_task_with_priority(tmp.path(), 50, "manager", "critical");
        let mut daemon = stale_review_daemon_with_overrides(&tmp);

        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        // One second past the 600s critical-priority timeout override.
        daemon.review_first_seen.insert(50, now - 601);

        daemon.maybe_escalate_stale_reviews().unwrap();

        assert!(!daemon.review_first_seen.contains_key(&50));

        let tasks_dir = tmp
            .path()
            .join(".batty")
            .join("team_config")
            .join("board")
            .join("tasks");
        let tasks = crate::task::load_tasks_from_dir(&tasks_dir).unwrap();
        let task = tasks.iter().find(|t| t.id == 50).unwrap();
        assert_eq!(task.status, "blocked");
    }
3792
    /// A medium-priority review has no override entry, so it uses the global
    /// 1800s threshold: at 1000s old it must not be nudged.
    #[test]
    fn medium_task_uses_global_thresholds_when_no_override() {
        let tmp = tempfile::tempdir().unwrap();
        write_review_task_with_priority(tmp.path(), 51, "manager", "medium");
        let mut daemon = stale_review_daemon_with_overrides(&tmp);

        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        // 1000s: over the critical override (300s) but under the global 1800s.
        daemon.review_first_seen.insert(51, now - 1000);

        daemon.maybe_escalate_stale_reviews().unwrap();

        assert!(
            !daemon.review_nudge_sent.contains(&51),
            "medium task should use global 1800s threshold, not critical 300s"
        );
    }
3813
    /// Two same-age reviews with different priorities resolve to different
    /// thresholds in a single escalation pass: at 400s old, critical (300s
    /// override) is nudged while medium (global 1800s) is not.
    #[test]
    fn mixed_priority_tasks_get_different_thresholds() {
        let tmp = tempfile::tempdir().unwrap();
        write_review_task_with_priority(tmp.path(), 60, "manager", "critical");
        write_review_task_with_priority(tmp.path(), 61, "manager", "medium");
        let mut daemon = stale_review_daemon_with_overrides(&tmp);

        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        // Both first seen 400s ago.
        daemon.review_first_seen.insert(60, now - 400);
        daemon.review_first_seen.insert(61, now - 400);

        daemon.maybe_escalate_stale_reviews().unwrap();

        assert!(
            daemon.review_nudge_sent.contains(&60),
            "critical task should be nudged at 400s (threshold 300s)"
        );
        assert!(
            !daemon.review_nudge_sent.contains(&61),
            "medium task should NOT be nudged at 400s (threshold 1800s)"
        );
    }
3840}