1use std::collections::{HashMap, HashSet};
19use std::fs;
20use std::path::{Path, PathBuf};
21use std::process::Command;
22#[cfg(test)]
23use std::time::SystemTime;
24use std::time::{Duration, Instant};
25
26use anyhow::{Context, Result, bail};
27use serde::{Deserialize, Serialize};
28use sha2::{Digest, Sha256};
29use tracing::{debug, info, warn};
30use uuid::Uuid;
31
32use super::board;
33use super::comms::{self, Channel};
34#[cfg(test)]
35use super::config::OrchestratorPosition;
36use super::config::{RoleType, TeamConfig};
37use super::delivery::{FailedDelivery, PendingMessage};
38use super::events::EventSink;
39use super::events::TeamEvent;
40use super::failure_patterns::FailureTracker;
41use super::hierarchy::MemberInstance;
42use super::inbox;
43use super::merge;
44use super::standup::{self, MemberState};
45use super::status;
46use super::task_cmd;
47#[cfg(test)]
48use super::task_loop::next_unclaimed_task;
49use super::task_loop::{
50 branch_is_merged_into, current_worktree_branch, engineer_base_branch_name,
51 preserve_worktree_with_commit, setup_engineer_worktree,
52};
53use super::verification::VerificationState;
54use super::watcher::{SessionWatcher, WatcherState};
55use super::{AssignmentDeliveryResult, AssignmentResultStatus, now_unix, store_assignment_result};
56use crate::agent::{self, BackendHealth};
57use crate::tmux;
58use dispatch::DispatchQueueEntry;
59
// Label for a member that stopped producing output in the middle of a turn.
// NOTE(review): used as a matcher/label elsewhere in the module — usage not visible in this chunk.
const STALLED_MID_TURN_MARKER: &str = "stalled mid-turn";
// Retry backoff schedule (seconds) applied after mid-turn stalls; two retries max.
const STALLED_MID_TURN_RETRY_BACKOFF_SECS: [u64; 2] = [30, 60];
62
63#[path = "daemon/agent_handle.rs"]
64pub(super) mod agent_handle;
65#[path = "daemon/automation.rs"]
66mod automation;
67#[path = "daemon/config_reload.rs"]
68mod config_reload;
69#[path = "discord_bridge.rs"]
70mod discord_bridge;
71#[path = "dispatch/mod.rs"]
72mod dispatch;
73#[path = "daemon/error_handling.rs"]
74mod error_handling;
75#[path = "daemon/health/mod.rs"]
76mod health;
77#[path = "daemon/helpers.rs"]
78mod helpers;
79#[path = "daemon/hot_reload.rs"]
80mod hot_reload;
81#[path = "daemon/interventions/mod.rs"]
82mod interventions;
83#[path = "launcher.rs"]
84mod launcher;
85#[path = "daemon/merge_queue.rs"]
86mod merge_queue;
87#[path = "daemon/poll.rs"]
88mod poll;
89#[path = "daemon/reconcile.rs"]
90mod reconcile;
91#[cfg(any(test, feature = "scenario-test"))]
92#[path = "daemon/scenario_api.rs"]
93pub mod scenario_api;
94#[path = "daemon/shim_spawn.rs"]
95mod shim_spawn;
96#[path = "daemon/shim_state.rs"]
97mod shim_state;
98#[path = "daemon/spec_gen.rs"]
99mod spec_gen;
100#[path = "daemon/state.rs"]
101mod state;
102#[path = "telegram_bridge.rs"]
103mod telegram_bridge;
104#[path = "daemon/telemetry.rs"]
105pub(crate) mod telemetry;
106#[path = "daemon/tick_report.rs"]
107pub mod tick_report;
108#[path = "daemon/verification.rs"]
109pub(crate) mod verification;
110
111pub(crate) use self::discord_bridge::{
112 build_shutdown_snapshot, send_discord_shutdown_notice, send_discord_shutdown_summary,
113};
114#[cfg(test)]
115use self::dispatch::normalized_assignment_dir;
116pub(crate) use self::error_handling::{optional_subsystem_for_step, optional_subsystem_names};
117use self::helpers::{extract_nudge_section, role_prompt_path};
118use self::hot_reload::consume_hot_reload_marker;
119#[cfg(test)]
120use self::hot_reload::{
121 BinaryFingerprint, hot_reload_daemon_args, hot_reload_marker_path, write_hot_reload_marker,
122};
123pub(crate) use self::interventions::NudgeSchedule;
124use self::interventions::OwnedTaskInterventionState;
125use self::launcher::{
126 duplicate_claude_session_ids, load_launch_state, member_session_tracker_config,
127};
128pub(crate) use self::merge_queue::{MergeQueue, MergeRequest};
129pub use self::state::load_dispatch_queue_snapshot;
130#[cfg(test)]
131use self::state::{
132 PersistedDaemonState, PersistedNudgeState, daemon_state_path, load_daemon_state,
133 save_daemon_state,
134};
135pub(super) use super::delivery::MessageDelivery;
136
/// Static configuration handed to the daemon when it is constructed.
pub struct DaemonConfig {
    /// Root directory of the project the team operates on.
    pub project_root: PathBuf,
    /// Parsed team configuration (roles, workflow policy, standup settings, ...).
    pub team_config: TeamConfig,
    // Presumably the tmux session name hosting the member panes — confirm at call sites.
    pub session: String,
    /// Concrete member instances spawned for this team.
    pub members: Vec<MemberInstance>,
    /// Member name -> tmux pane id (each pane gets a `SessionWatcher` in `new`).
    pub pane_map: HashMap<String, String>,
}
145
/// Persisted result of the most recent "main smoke" run (see `maybe_run_main_smoke`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct MainSmokeState {
    /// True when the smoke command last failed on main.
    pub broken: bool,
    /// Whether dispatch should be paused while `broken` is true (from workflow policy).
    pub pause_dispatch: bool,
    /// Unix timestamp of the last smoke run.
    pub last_run_at: u64,
    /// Short commit hash of the last passing run, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_success_commit: Option<String>,
    /// Short commit hash at which the breakage was observed, if broken.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub broken_commit: Option<String>,
    /// Recent commits suspected of causing the breakage.
    #[serde(default)]
    pub suspects: Vec<String>,
    /// Human-readable one-line summary of the last run.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub summary: Option<String>,
}
160
/// Disassembly backend used for clean-room reverse engineering inputs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CleanroomBackend {
    /// SkoolKit toolchain — ZX Spectrum snapshots (.z80/.sna).
    SkoolKit,
    /// Ghidra — other binary formats (.nes/.gb/.gbc/.com/.exe).
    Ghidra,
}
166
167impl CleanroomBackend {
168 fn detect(input_path: &Path) -> Result<Self> {
169 let extension = input_path
170 .extension()
171 .and_then(|ext| ext.to_str())
172 .map(|ext| ext.to_ascii_lowercase());
173 match extension.as_deref() {
174 Some("z80" | "sna") => Ok(Self::SkoolKit),
175 Some("nes" | "gb" | "gbc" | "com" | "exe") => Ok(Self::Ghidra),
176 _ => bail!(
177 "unsupported clean-room input '{}': expected one of .z80, .sna, .nes, .gb, .gbc, .com, or .exe",
178 input_path.display()
179 ),
180 }
181 }
182}
183
/// Long-running orchestrator state: one instance per team session.
/// Most maps are keyed by member name unless noted otherwise.
pub struct TeamDaemon {
    pub(super) config: DaemonConfig,
    /// Per-member tmux pane watchers (built from `config.pane_map` in `new`).
    pub(super) watchers: HashMap<String, SessionWatcher>,
    /// Last known working/idle state per member.
    pub(super) states: HashMap<String, MemberState>,
    /// When each member most recently became idle.
    pub(super) idle_started_at: HashMap<String, Instant>,
    /// Member -> task id currently assigned (see `active_task_id`).
    pub(super) active_tasks: HashMap<String, u32>,
    pub(super) retry_counts: HashMap<String, u32>,
    pub(super) dispatch_queue: Vec<DispatchQueueEntry>,
    pub(super) triage_idle_epochs: HashMap<String, u64>,
    pub(super) triage_interventions: HashMap<String, u64>,
    pub(super) owned_task_interventions: HashMap<String, OwnedTaskInterventionState>,
    pub(super) intervention_cooldowns: HashMap<String, Instant>,
    /// Comms channels for `RoleType::User` roles, keyed by role name.
    pub(super) channels: HashMap<String, Box<dyn Channel>>,
    /// Nudge schedules keyed by member name (built from role prompts in `new`).
    pub(super) nudges: HashMap<String, NudgeSchedule>,
    pub(super) discord_bot: Option<super::discord::DiscordBot>,
    /// Index into the event log of the last event forwarded to Discord.
    pub(super) discord_event_cursor: usize,
    pub(super) telegram_bot: Option<super::telegram::TelegramBot>,
    pub(super) failure_tracker: FailureTracker,
    pub(super) event_sink: EventSink,
    pub(super) paused_standups: HashSet<String>,
    pub(super) last_standup: HashMap<String, Instant>,
    // Timers for periodic maintenance work.
    pub(super) last_board_rotation: Instant,
    pub(super) last_auto_archive: Instant,
    pub(super) last_auto_dispatch: Instant,
    pub(super) last_main_smoke_check: Instant,
    pub(super) pipeline_starvation_fired: bool,
    pub(super) pipeline_starvation_last_fired: Option<Instant>,
    pub(super) planning_cycle_last_fired: Option<Instant>,
    pub(super) planning_cycle_active: bool,
    pub(super) retro_generated: bool,
    pub(super) failed_deliveries: Vec<FailedDelivery>,
    // Review-reminder bookkeeping, keyed by task id.
    pub(super) review_first_seen: HashMap<u32, u64>,
    pub(super) review_nudge_sent: HashSet<u32>,
    pub(super) poll_cycle_count: u64,
    pub(super) poll_interval: Duration,
    /// (subsystem, message) errors accumulated during the current tick.
    pub(super) current_tick_errors: Vec<(String, String)>,
    // Repo topology detected in `new`: single git repo, or a root containing sub-repos.
    pub(super) is_git_repo: bool,
    pub(super) is_multi_repo: bool,
    pub(super) sub_repo_names: Vec<String>,
    pub(super) subsystem_error_counts: HashMap<String, u32>,
    /// In-memory auto-merge overrides by task id; disk overrides are the fallback.
    pub(super) auto_merge_overrides: HashMap<u32, bool>,
    pub(super) recent_dispatches: HashMap<(u32, String), Instant>,
    /// Dedup window for escalations (see `suppress_recent_escalation`).
    pub(super) recent_escalations: HashMap<String, Instant>,
    pub(super) main_smoke_state: Option<MainSmokeState>,
    /// Telemetry sink; `None` when the database failed to open (telemetry disabled).
    pub(super) telemetry_db: Option<rusqlite::Connection>,
    pub(super) manual_assign_cooldowns: HashMap<String, Instant>,
    pub(super) backend_health: HashMap<String, BackendHealth>,
    pub(super) narration_tracker: health::narration::NarrationTracker,
    pub(super) context_pressure_tracker: health::context::ContextPressureTracker,
    pub(super) last_health_check: Instant,
    pub(super) last_uncommitted_warn: HashMap<String, Instant>,
    pub(super) last_shared_target_cleanup: Instant,
    pub(super) last_disk_hygiene_check: Instant,
    pub(super) verification_states: HashMap<String, VerificationState>,
    // Counters keyed by task id.
    pub(super) narration_rejection_counts: HashMap<u32, u32>,
    pub(super) zero_diff_completion_counts: HashMap<u32, u32>,
    pub(super) pending_delivery_queue: HashMap<String, Vec<PendingMessage>>,
    /// Shim-managed members; presence here changes working/idle handling
    /// (see `mark_member_working` / `set_member_idle`).
    pub(super) shim_handles: HashMap<String, agent_handle::AgentHandle>,
    pub(super) last_shim_health_check: Instant,
    pub(super) merge_queue: MergeQueue,
}
276
#[cfg(any(test, feature = "scenario-test"))]
impl TeamDaemon {
    /// Test-only entry point: hands scenario tests mutable hooks into daemon internals.
    pub fn scenario_hooks(&mut self) -> scenario_api::ScenarioHooks<'_> {
        scenario_api::ScenarioHooks::new(self)
    }
}
288
289impl TeamDaemon {
    /// Escalate a failed auto-save of a member's dirty worktree: block the
    /// related task (if any), notify the member and their manager, and record
    /// an orchestrator action. Duplicate reports for the same failure are
    /// suppressed for 10 minutes.
    ///
    /// `context` describes what was about to happen ("restart or shutdown", ...);
    /// `detail` is the underlying error text.
    pub(in crate::team) fn report_preserve_failure(
        &mut self,
        member_name: &str,
        task_id: Option<u32>,
        context: &str,
        detail: &str,
    ) {
        let reason = match task_id {
            Some(task_id) => format!(
                "Task #{task_id} is blocked because Batty could not safely auto-save {member_name}'s dirty worktree before {context}. {detail}"
            ),
            None => format!(
                "Batty could not safely auto-save {member_name}'s dirty worktree before {context}. {detail}"
            ),
        };

        // Hash the (potentially long) detail string so the dedup key stays short
        // but still distinguishes different underlying errors.
        let detail_digest: u64 = {
            use std::collections::hash_map::DefaultHasher;
            use std::hash::{Hash, Hasher};
            let mut hasher = DefaultHasher::new();
            detail.hash(&mut hasher);
            hasher.finish()
        };
        let task_key = task_id
            .map(|id| id.to_string())
            .unwrap_or_else(|| "-".to_string());
        let dedup_key = format!("preserve:{member_name}:{task_key}:{context}:{detail_digest}");
        // Dedup check must happen before any side effects below.
        if self.suppress_recent_escalation(dedup_key, Duration::from_secs(600)) {
            return;
        }

        if let Some(task_id) = task_id {
            if let Err(error) =
                task_cmd::block_task_with_reason(&self.board_dir(), task_id, &reason)
            {
                // Blocking the task is best-effort; still notify people below.
                warn!(
                    member = member_name,
                    task_id,
                    error = %error,
                    "failed to block task after dirty worktree preservation failure"
                );
            }
        }
        let manager = self.assignment_sender(member_name);
        // Message delivery is also best-effort — failures are intentionally ignored.
        let _ = self.queue_daemon_message(member_name, &reason);
        let _ = self.queue_daemon_message(&manager, &reason);
        self.record_orchestrator_action(format!("blocked recovery: {reason}"));
    }
343
344 #[allow(dead_code)]
345 pub(super) fn preserve_member_worktree(
346 &mut self,
347 member_name: &str,
348 commit_message: &str,
349 ) -> bool {
350 let policy = &self.config.team_config.workflow_policy;
351 if !policy.auto_commit_on_restart {
352 return false;
353 }
354
355 let Some(member) = self
356 .config
357 .members
358 .iter()
359 .find(|member| member.name == member_name)
360 else {
361 return false;
362 };
363 if member.role_type != RoleType::Engineer || !member.use_worktrees {
364 return false;
365 }
366
367 let worktree_dir = self
368 .config
369 .project_root
370 .join(".batty")
371 .join("worktrees")
372 .join(member_name);
373 if !worktree_dir.exists() {
374 return false;
375 }
376
377 match preserve_worktree_with_commit(
378 &worktree_dir,
379 commit_message,
380 Duration::from_secs(policy.graceful_shutdown_timeout_secs),
381 ) {
382 Ok(saved) => {
383 if saved {
384 info!(
385 member = member_name,
386 worktree = %worktree_dir.display(),
387 "auto-saved worktree before restart/shutdown"
388 );
389 }
390 saved
391 }
392 Err(error) => {
393 warn!(
394 member = member_name,
395 worktree = %worktree_dir.display(),
396 error = %error,
397 "failed to auto-save worktree before restart/shutdown"
398 );
399 self.report_preserve_failure(
400 member_name,
401 self.active_task_id(member_name),
402 "restart or shutdown",
403 &error.to_string(),
404 );
405 false
406 }
407 }
408 }
409
410 #[allow(dead_code)]
411 pub(super) fn watcher_mut(&mut self, name: &str) -> Result<&mut SessionWatcher> {
412 self.watchers
413 .get_mut(name)
414 .with_context(|| format!("watcher registry missing member '{name}'"))
415 }
416
    /// Build a daemon from its static configuration: detect the repo topology,
    /// open the event sink and telemetry database, create per-member watchers,
    /// user-role comms channels, chat bridges, and nudge schedules.
    ///
    /// Errors only if the event sink cannot be created; other optional
    /// subsystems (telemetry, channels) degrade with a warning.
    pub fn new(config: DaemonConfig) -> Result<Self> {
        // Repo topology: a plain git repo, or a non-git root containing sub-repos.
        let is_git_repo = super::git_cmd::is_git_repo(&config.project_root);
        let (is_multi_repo, sub_repo_names) = if is_git_repo {
            (false, Vec::new())
        } else {
            let subs = super::git_cmd::discover_sub_repos(&config.project_root);
            if subs.is_empty() {
                (false, Vec::new())
            } else {
                let names: Vec<String> = subs
                    .iter()
                    .filter_map(|p| p.file_name().map(|n| n.to_string_lossy().to_string()))
                    .collect();
                info!(
                    sub_repos = ?names,
                    "Detected multi-repo project with {} sub-repos",
                    names.len()
                );
                (true, names)
            }
        };
        if !is_git_repo && !is_multi_repo {
            info!("Project is not a git repository \u{2014} git operations disabled");
        }

        let team_config_dir = config.project_root.join(".batty").join("team_config");
        let events_path = team_config_dir.join("events.jsonl");
        let event_sink =
            EventSink::new_with_max_bytes(&events_path, config.team_config.event_log_max_bytes)?;

        // One watcher per tmux pane; a member is stale after two missed standups.
        let mut watchers = HashMap::new();
        let stale_secs = config.team_config.standup.interval_secs * 2;
        for (name, pane_id) in &config.pane_map {
            let session_tracker = config
                .members
                .iter()
                .find(|member| member.name == *name)
                .and_then(|member| member_session_tracker_config(&config.project_root, member));
            watchers.insert(
                name.clone(),
                SessionWatcher::new(pane_id, name, stale_secs, session_tracker),
            );
        }

        // Comms channels exist only for user roles with channel config; failures
        // are logged and skipped rather than aborting startup.
        let mut channels: HashMap<String, Box<dyn Channel>> = HashMap::new();
        for role in &config.team_config.roles {
            if role.role_type == RoleType::User {
                if let (Some(ch_type), Some(ch_config)) = (&role.channel, &role.channel_config) {
                    match comms::channel_from_config(ch_type, ch_config) {
                        Ok(ch) => {
                            channels.insert(role.name.clone(), ch);
                        }
                        Err(e) => {
                            warn!(role = %role.name, error = %e, "failed to create channel");
                        }
                    }
                }
            }
        }

        let discord_bot = discord_bridge::build_discord_bot(&config.team_config);
        let telegram_bot = telegram_bridge::build_telegram_bot(&config.team_config);
        let narration_detection_enabled = config
            .team_config
            .workflow_policy
            .narration_detection_enabled;
        let narration_threshold_polls =
            config.team_config.workflow_policy.narration_threshold_polls;

        let states = HashMap::new();

        // Register a nudge schedule for every instance of a role that has a
        // nudge interval AND whose prompt file contains a nudge section.
        let mut nudges = HashMap::new();
        for role in &config.team_config.roles {
            if let Some(interval_secs) = role.nudge_interval_secs {
                let prompt_path =
                    role_prompt_path(&team_config_dir, role.prompt.as_deref(), role.role_type);
                if let Some(nudge_text) = extract_nudge_section(&prompt_path) {
                    let instance_names: Vec<String> = config
                        .members
                        .iter()
                        .filter(|m| m.role_name == role.name)
                        .map(|m| m.name.clone())
                        .collect();
                    for name in instance_names {
                        info!(member = %name, interval_secs, "registered nudge");
                        nudges.insert(
                            name,
                            NudgeSchedule {
                                text: nudge_text.clone(),
                                interval: Duration::from_secs(interval_secs),
                                idle_since: Some(Instant::now()),
                                fired_this_idle: false,
                                paused: false,
                            },
                        );
                    }
                }
            }
        }

        // Telemetry is optional: a failed open disables it with a warning.
        let telemetry_db = match super::telemetry_db::open(&config.project_root) {
            Ok(conn) => {
                info!("telemetry database opened");
                Some(conn)
            }
            Err(error) => {
                warn!(error = %error, "failed to open telemetry database; telemetry disabled");
                None
            }
        };

        let context_pressure_threshold = config
            .team_config
            .workflow_policy
            .context_pressure_threshold;
        let context_pressure_threshold_bytes = config
            .team_config
            .workflow_policy
            .context_pressure_threshold_bytes;

        Ok(Self {
            config,
            watchers,
            states,
            idle_started_at: HashMap::new(),
            active_tasks: HashMap::new(),
            retry_counts: HashMap::new(),
            dispatch_queue: Vec::new(),
            triage_idle_epochs: HashMap::new(),
            triage_interventions: HashMap::new(),
            owned_task_interventions: HashMap::new(),
            intervention_cooldowns: HashMap::new(),
            channels,
            nudges,
            discord_bot,
            // Start the Discord cursor at the current end of the event log so
            // pre-existing events are not re-forwarded.
            discord_event_cursor: crate::team::events::read_events(event_sink.path())
                .map(|events| events.len())
                .unwrap_or(0),
            telegram_bot,
            failure_tracker: FailureTracker::new(20),
            event_sink,
            paused_standups: HashSet::new(),
            last_standup: HashMap::new(),
            last_board_rotation: Instant::now(),
            last_auto_archive: Instant::now(),
            last_auto_dispatch: Instant::now(),
            last_main_smoke_check: Instant::now(),
            pipeline_starvation_fired: false,
            pipeline_starvation_last_fired: None,
            planning_cycle_last_fired: None,
            planning_cycle_active: false,
            retro_generated: false,
            failed_deliveries: Vec::new(),
            review_first_seen: HashMap::new(),
            review_nudge_sent: HashSet::new(),
            poll_cycle_count: 0,
            current_tick_errors: Vec::new(),
            poll_interval: Duration::from_secs(5),
            is_git_repo,
            is_multi_repo,
            sub_repo_names,
            subsystem_error_counts: HashMap::new(),
            auto_merge_overrides: HashMap::new(),
            recent_dispatches: HashMap::new(),
            recent_escalations: HashMap::new(),
            main_smoke_state: None,
            telemetry_db,
            manual_assign_cooldowns: HashMap::new(),
            backend_health: HashMap::new(),
            narration_tracker: health::narration::NarrationTracker::new(
                narration_detection_enabled,
                narration_threshold_polls,
            ),
            context_pressure_tracker: health::context::ContextPressureTracker::new(
                context_pressure_threshold,
                context_pressure_threshold_bytes,
            ),
            // Backdate these timers so the first poll runs the checks immediately.
            last_health_check: Instant::now() - Duration::from_secs(3600),
            last_uncommitted_warn: HashMap::new(),
            last_shared_target_cleanup: Instant::now() - Duration::from_secs(3600),
            last_disk_hygiene_check: Instant::now() - Duration::from_secs(3600),
            verification_states: HashMap::new(),
            narration_rejection_counts: HashMap::new(),
            zero_diff_completion_counts: HashMap::new(),
            pending_delivery_queue: HashMap::new(),
            shim_handles: HashMap::new(),
            last_shim_health_check: Instant::now(),
            merge_queue: MergeQueue::default(),
        })
    }
620
621 pub(crate) fn suppress_recent_escalation(
622 &mut self,
623 key: impl Into<String>,
624 window: Duration,
625 ) -> bool {
626 let now = Instant::now();
627 self.recent_escalations
628 .retain(|_, seen_at| now.duration_since(*seen_at) < window);
629
630 let key = key.into();
631 if self.recent_escalations.contains_key(&key) {
632 return true;
633 }
634
635 self.recent_escalations.insert(key, now);
636 false
637 }
638
639 pub(super) fn member_nudge_text(&self, member: &MemberInstance) -> Option<String> {
640 let prompt_path = role_prompt_path(
641 &super::team_config_dir(&self.config.project_root),
642 member.prompt.as_deref(),
643 member.role_type,
644 );
645 extract_nudge_section(&prompt_path)
646 }
647
648 pub(super) fn prepend_member_nudge(
649 &self,
650 member: &MemberInstance,
651 body: impl AsRef<str>,
652 ) -> String {
653 let body = body.as_ref();
654 match self.member_nudge_text(member) {
655 Some(nudge) => format!("{nudge}\n\n{body}"),
656 None => body.to_string(),
657 }
658 }
659
660 pub(super) fn mark_member_working(&mut self, member_name: &str) {
661 if self.shim_handles.contains_key(member_name) {
667 return;
668 }
669 self.states
670 .insert(member_name.to_string(), MemberState::Working);
671 if let Some(watcher) = self.watchers.get_mut(member_name) {
672 watcher.activate();
673 }
674 self.update_automation_timers_for_state(member_name, MemberState::Working);
675 }
676
677 pub(super) fn set_member_idle(&mut self, member_name: &str) {
678 if self.shim_handles.contains_key(member_name) {
683 if self.states.get(member_name) == Some(&MemberState::Idle) {
684 self.update_automation_timers_for_state(member_name, MemberState::Idle);
685 }
686 return;
687 }
688 self.states
689 .insert(member_name.to_string(), MemberState::Idle);
690 if let Some(watcher) = self.watchers.get_mut(member_name) {
691 watcher.deactivate();
692 }
693 self.update_automation_timers_for_state(member_name, MemberState::Idle);
694 }
695
696 pub(super) fn active_task_id(&self, engineer: &str) -> Option<u32> {
697 self.active_tasks.get(engineer).copied()
698 }
699
700 pub(super) fn preserve_worktree_before_restart(
701 &mut self,
702 member_name: &str,
703 worktree_dir: &Path,
704 reason: &str,
705 ) {
706 let Some(member) = self
707 .config
708 .members
709 .iter()
710 .find(|member| member.name == member_name)
711 else {
712 return;
713 };
714 if member.role_type != RoleType::Engineer || !member.use_worktrees {
715 return;
716 }
717 if !self
718 .config
719 .team_config
720 .workflow_policy
721 .auto_commit_on_restart
722 || !worktree_dir.exists()
723 {
724 return;
725 }
726
727 let timeout = Duration::from_secs(
728 self.config
729 .team_config
730 .workflow_policy
731 .graceful_shutdown_timeout_secs,
732 );
733 match preserve_worktree_with_commit(
734 worktree_dir,
735 "wip: auto-save before restart [batty]",
736 timeout,
737 ) {
738 Ok(true) => info!(
739 member = member_name,
740 worktree = %worktree_dir.display(),
741 reason,
742 "auto-saved dirty worktree before restart"
743 ),
744 Ok(false) => {}
745 Err(error) => {
746 warn!(
747 member = member_name,
748 worktree = %worktree_dir.display(),
749 reason,
750 error = %error,
751 "failed to auto-save dirty worktree before restart"
752 );
753 self.report_preserve_failure(
754 member_name,
755 self.active_task_id(member_name),
756 reason,
757 &error.to_string(),
758 );
759 }
760 }
761 }
762
763 pub(super) fn project_root(&self) -> &Path {
764 &self.config.project_root
765 }
766
767 pub(super) fn dispatch_paused_by_main_smoke(&self) -> bool {
768 self.main_smoke_state
769 .as_ref()
770 .is_some_and(|state| state.broken && state.pause_dispatch)
771 }
772
    /// Periodically run the configured "main smoke" command at the repo root
    /// and update `main_smoke_state`. On a pass, clears breakage (emitting a
    /// recovery event if main was broken); on a fail, records suspects and a
    /// summary, emits a broken-main event (deduped on same commit + summary),
    /// and optionally auto-reverts HEAD per policy.
    ///
    /// No-ops when: policy disabled, interval not yet elapsed, not a single
    /// git repo, or the command string is empty.
    pub(super) fn maybe_run_main_smoke(&mut self) -> Result<()> {
        const DEFAULT_MAIN_SMOKE_SUSPECT_COMMITS: usize = 5;

        let policy = self.config.team_config.workflow_policy.main_smoke.clone();
        if !policy.enabled {
            return Ok(());
        }

        let interval = Duration::from_secs(policy.interval_secs);
        if self.last_main_smoke_check.elapsed() < interval {
            return Ok(());
        }
        // Reset the timer even if checks below bail out, so we don't retry every tick.
        self.last_main_smoke_check = Instant::now();

        if !self.is_git_repo || self.is_multi_repo {
            return Ok(());
        }

        let command = policy.command.trim();
        if command.is_empty() {
            warn!("main smoke command is empty; skipping");
            return Ok(());
        }

        let head = Self::short_head_commit(self.project_root())?;
        self.record_orchestrator_action(format!("main smoke: running `{command}` at {head}"));

        let test_run =
            crate::team::task_loop::run_tests_in_worktree(self.project_root(), Some(command))
                .with_context(|| {
                    format!(
                        "failed while running main smoke command `{command}` in {}",
                        self.project_root().display()
                    )
                })?;

        if test_run.passed {
            let was_broken = self
                .main_smoke_state
                .as_ref()
                .is_some_and(|state| state.broken);
            self.main_smoke_state = Some(MainSmokeState {
                broken: false,
                pause_dispatch: policy.pause_dispatch_on_failure,
                last_run_at: now_unix(),
                last_success_commit: Some(head.clone()),
                broken_commit: None,
                suspects: Vec::new(),
                summary: Some(format!("`{command}` passed on {head}")),
            });
            // Only announce recovery on the broken -> passing transition.
            if was_broken {
                self.emit_event(TeamEvent::main_smoke_recovered(&head, command));
                self.record_orchestrator_action(format!(
                    "main smoke: recovered on {head}; dispatch gate cleared"
                ));
            }
            return Ok(());
        }

        let suspects =
            Self::recent_main_suspects(self.project_root(), DEFAULT_MAIN_SMOKE_SUSPECT_COMMITS)?;
        let summary = Self::summarize_smoke_output(&test_run.output);
        // Dedup the broken-main event: emit only when the breakage is new
        // (different commit or different failure summary).
        let should_emit = self.main_smoke_state.as_ref().is_none_or(|state| {
            state.broken_commit.as_deref() != Some(head.as_str())
                || state.summary.as_deref() != Some(summary.as_str())
        });

        // Carry the last known-good commit forward across failures.
        let last_success_commit = self
            .main_smoke_state
            .as_ref()
            .and_then(|state| state.last_success_commit.clone());
        self.main_smoke_state = Some(MainSmokeState {
            broken: true,
            pause_dispatch: policy.pause_dispatch_on_failure,
            last_run_at: now_unix(),
            last_success_commit,
            broken_commit: Some(head.clone()),
            suspects: suspects.clone(),
            summary: Some(summary.clone()),
        });

        if should_emit {
            self.emit_event(TeamEvent::main_broken(&head, &suspects, &summary));
        }
        self.record_orchestrator_action(format!(
            "main smoke: BROKEN at {head}; suspects [{}]; {summary}",
            suspects.join(", ")
        ));

        if policy.auto_revert {
            self.maybe_auto_revert_broken_main(&head)?;
        }
        Ok(())
    }
867
    /// Revert HEAD after a broken main-smoke run. Uses `git revert -m 1` when
    /// HEAD is a merge commit (more than one parent), plain `git revert`
    /// otherwise. A failed revert is logged and recorded but is not an error.
    fn maybe_auto_revert_broken_main(&mut self, broken_commit: &str) -> Result<()> {
        // `rev-list --parents -n 1 HEAD` prints "<sha> <parent1> [<parent2> ...]";
        // subtract 1 for the commit itself to get the parent count.
        let parent_line = super::git_cmd::run_git(
            self.project_root(),
            &["rev-list", "--parents", "-n", "1", "HEAD"],
        )
        .with_context(|| {
            format!("failed to inspect parents for broken main commit {broken_commit}")
        })?
        .stdout;
        let parent_count = parent_line.split_whitespace().count().saturating_sub(1);
        let revert_args = if parent_count > 1 {
            // Merge commit: revert relative to the first parent (mainline).
            vec!["revert", "-m", "1", "--no-edit", "HEAD"]
        } else {
            vec!["revert", "--no-edit", "HEAD"]
        };
        let output = Command::new("git")
            .args(&revert_args)
            .current_dir(self.project_root())
            .output()
            .with_context(|| {
                format!("failed to launch auto-revert for broken main commit {broken_commit}")
            })?;
        if output.status.success() {
            let reverted_to = Self::short_head_commit(self.project_root())?;
            info!(
                broken_commit,
                reverted_to, "main smoke auto-reverted most recent main commit"
            );
            self.record_orchestrator_action(format!(
                "main smoke: auto-reverted broken commit {broken_commit}; main is now {reverted_to}"
            ));
        } else {
            let stderr = String::from_utf8_lossy(&output.stderr);
            warn!(
                broken_commit,
                error = %stderr.trim(),
                "main smoke auto-revert failed"
            );
            self.record_orchestrator_action(format!(
                "main smoke: auto-revert failed for {broken_commit} ({})",
                stderr.trim()
            ));
        }
        Ok(())
    }
913
    /// Test-only: seed an in-memory auto-merge override without touching the
    /// on-disk override store.
    #[cfg(test)]
    pub(super) fn set_auto_merge_override(&mut self, task_id: u32, enabled: bool) {
        self.auto_merge_overrides.insert(task_id, enabled);
    }
918
919 pub(super) fn auto_merge_override(&self, task_id: u32) -> Option<bool> {
920 if let Some(&value) = self.auto_merge_overrides.get(&task_id) {
922 return Some(value);
923 }
924 let disk_overrides = super::auto_merge::load_overrides(&self.config.project_root);
925 disk_overrides.get(&task_id).copied()
926 }
927
928 pub(super) fn worktree_dir(&self, engineer: &str) -> PathBuf {
929 let base = self.config.project_root.join(".batty").join("worktrees");
930 match self.member_barrier_group(engineer) {
931 Some(group) if self.config.team_config.workflow_policy.clean_room_mode => {
932 base.join(group).join(engineer)
933 }
934 _ => base.join(engineer),
935 }
936 }
937
938 pub(super) fn board_dir(&self) -> PathBuf {
939 self.config
940 .project_root
941 .join(".batty")
942 .join("team_config")
943 .join("board")
944 }
945
946 pub(super) fn member_uses_worktrees(&self, engineer: &str) -> bool {
947 if !self.is_git_repo && !self.is_multi_repo {
948 return false;
949 }
950 self.config
951 .members
952 .iter()
953 .find(|member| member.name == engineer)
954 .map(|member| member.use_worktrees)
955 .unwrap_or(false)
956 }
957
958 pub(super) fn handoff_dir(&self) -> PathBuf {
959 self.config.project_root.join(
960 self.config
961 .team_config
962 .workflow_policy
963 .handoff_directory
964 .as_str(),
965 )
966 }
967
968 pub(super) fn member_barrier_group(&self, member_name: &str) -> Option<&str> {
969 let member = self
970 .config
971 .members
972 .iter()
973 .find(|member| member.name == member_name)?;
974 self.config
975 .team_config
976 .role_barrier_group(&member.role_name)
977 }
978
979 fn barrier_worktree_root(&self, barrier_group: &str) -> PathBuf {
980 self.config
981 .project_root
982 .join(".batty")
983 .join("worktrees")
984 .join(barrier_group)
985 }
986
987 #[cfg_attr(not(test), allow(dead_code))]
988 pub(super) fn analysis_dir(&self, member_name: &str) -> Result<PathBuf> {
989 let Some(group) = self.member_barrier_group(member_name) else {
990 bail!(
991 "member '{}' is not assigned to a clean-room barrier group",
992 member_name
993 );
994 };
995 if group != "analysis" {
996 bail!(
997 "member '{}' is in barrier group '{}' and cannot write analysis artifacts",
998 member_name,
999 group
1000 );
1001 }
1002
1003 Ok(self.worktree_dir(member_name).join("analysis"))
1004 }
1005
1006 pub(super) fn validate_member_work_dir(
1007 &self,
1008 member_name: &str,
1009 work_dir: &Path,
1010 ) -> Result<()> {
1011 if !self.config.team_config.workflow_policy.clean_room_mode {
1012 return Ok(());
1013 }
1014
1015 let expected = self.worktree_dir(member_name);
1016 if work_dir == expected {
1017 return Ok(());
1018 }
1019 bail!(
1020 "clean-room barrier violation: member '{}' launch dir '{}' does not match '{}'",
1021 member_name,
1022 work_dir.display(),
1023 expected.display()
1024 );
1025 }
1026
1027 #[cfg_attr(not(test), allow(dead_code))]
1028 pub(super) fn validate_member_barrier_path(
1029 &mut self,
1030 member_name: &str,
1031 path: &Path,
1032 access: &str,
1033 ) -> Result<()> {
1034 if !self.config.team_config.workflow_policy.clean_room_mode {
1035 return Ok(());
1036 }
1037
1038 let Some(group) = self.member_barrier_group(member_name) else {
1039 return Ok(());
1040 };
1041 let member_root = self.worktree_dir(member_name);
1042 let barrier_root = self.barrier_worktree_root(group);
1043 let handoff_root = self.handoff_dir();
1044 if path.starts_with(&member_root)
1045 || path.starts_with(&barrier_root)
1046 || path.starts_with(&handoff_root)
1047 {
1048 return Ok(());
1049 }
1050
1051 self.record_barrier_violation_attempt(
1052 member_name,
1053 &path.display().to_string(),
1054 &format!("{access} outside barrier group '{group}'"),
1055 );
1056 bail!(
1057 "clean-room barrier violation: '{}' cannot {} '{}'",
1058 member_name,
1059 access,
1060 path.display()
1061 );
1062 }
1063
1064 #[cfg_attr(not(test), allow(dead_code))]
1065 pub(super) fn write_handoff_artifact(
1066 &mut self,
1067 author_role: &str,
1068 relative_path: &Path,
1069 content: &[u8],
1070 ) -> Result<PathBuf> {
1071 if relative_path.is_absolute()
1072 || relative_path
1073 .components()
1074 .any(|component| matches!(component, std::path::Component::ParentDir))
1075 {
1076 self.record_barrier_violation_attempt(
1077 author_role,
1078 &relative_path.display().to_string(),
1079 "handoff writes must stay within the shared handoff directory",
1080 );
1081 bail!(
1082 "invalid handoff artifact path '{}': must be relative and stay under handoff/",
1083 relative_path.display()
1084 );
1085 }
1086 let handoff_root = self.handoff_dir();
1087 let artifact_path = handoff_root.join(relative_path);
1088 let Some(parent) = artifact_path.parent() else {
1089 bail!(
1090 "handoff artifact path '{}' has no parent",
1091 artifact_path.display()
1092 );
1093 };
1094 std::fs::create_dir_all(parent)
1095 .with_context(|| format!("failed to create {}", parent.display()))?;
1096 std::fs::write(&artifact_path, content)
1097 .with_context(|| format!("failed to write {}", artifact_path.display()))?;
1098
1099 let content_hash = format!("{:x}", Sha256::digest(content));
1100 self.record_barrier_artifact_created(
1101 author_role,
1102 &artifact_path.display().to_string(),
1103 &content_hash,
1104 );
1105 Ok(artifact_path)
1106 }
1107
1108 #[cfg_attr(not(test), allow(dead_code))]
1109 pub(super) fn write_analysis_artifact(
1110 &mut self,
1111 author_role: &str,
1112 relative_path: &Path,
1113 content: &[u8],
1114 ) -> Result<PathBuf> {
1115 if relative_path.is_absolute()
1116 || relative_path
1117 .components()
1118 .any(|component| matches!(component, std::path::Component::ParentDir))
1119 {
1120 self.record_barrier_violation_attempt(
1121 author_role,
1122 &relative_path.display().to_string(),
1123 "analysis artifact writes must stay within the analysis worktree",
1124 );
1125 bail!(
1126 "invalid analysis artifact path '{}': must be relative and stay under analysis/",
1127 relative_path.display()
1128 );
1129 }
1130
1131 let artifact_path = self.analysis_dir(author_role)?.join(relative_path);
1132 let Some(parent) = artifact_path.parent() else {
1133 bail!(
1134 "analysis artifact path '{}' has no parent",
1135 artifact_path.display()
1136 );
1137 };
1138 std::fs::create_dir_all(parent)
1139 .with_context(|| format!("failed to create {}", parent.display()))?;
1140 std::fs::write(&artifact_path, content)
1141 .with_context(|| format!("failed to write {}", artifact_path.display()))?;
1142
1143 let content_hash = format!("{:x}", Sha256::digest(content));
1144 self.record_barrier_artifact_created(
1145 author_role,
1146 &artifact_path.display().to_string(),
1147 &content_hash,
1148 );
1149 Ok(artifact_path)
1150 }
1151
1152 #[cfg_attr(not(test), allow(dead_code))]
1153 pub(super) fn run_skoolkit_disassembly(
1154 &mut self,
1155 author_role: &str,
1156 snapshot_path: &Path,
1157 output_relative_path: &Path,
1158 ) -> Result<PathBuf> {
1159 let snapshot_extension = snapshot_path
1160 .extension()
1161 .and_then(|ext| ext.to_str())
1162 .map(|ext| ext.to_ascii_lowercase());
1163 if !matches!(snapshot_extension.as_deref(), Some("z80" | "sna")) {
1164 bail!(
1165 "unsupported SkoolKit snapshot '{}': expected .z80 or .sna",
1166 snapshot_path.display()
1167 );
1168 }
1169
1170 let sna2skool =
1171 std::env::var("BATTY_SKOOLKIT_SNA2SKOOL").unwrap_or_else(|_| "sna2skool".to_string());
1172 let output = std::process::Command::new(&sna2skool)
1173 .arg(snapshot_path)
1174 .output()
1175 .with_context(|| {
1176 format!(
1177 "failed to launch '{}' for snapshot '{}'",
1178 sna2skool,
1179 snapshot_path.display()
1180 )
1181 })?;
1182 if !output.status.success() {
1183 bail!(
1184 "SkoolKit disassembly failed for '{}': {}",
1185 snapshot_path.display(),
1186 String::from_utf8_lossy(&output.stderr).trim()
1187 );
1188 }
1189
1190 self.write_analysis_artifact(author_role, output_relative_path, &output.stdout)
1191 }
1192
1193 #[cfg_attr(not(test), allow(dead_code))]
1194 pub(super) fn run_ghidra_disassembly(
1195 &mut self,
1196 author_role: &str,
1197 binary_path: &Path,
1198 output_relative_path: &Path,
1199 ) -> Result<PathBuf> {
1200 let backend = CleanroomBackend::detect(binary_path)?;
1201 if backend != CleanroomBackend::Ghidra {
1202 bail!(
1203 "unsupported Ghidra target '{}': expected .nes, .gb, .gbc, .com, or .exe",
1204 binary_path.display()
1205 );
1206 }
1207
1208 let analyze_headless = std::env::var("BATTY_GHIDRA_HEADLESS")
1209 .unwrap_or_else(|_| "analyzeHeadless".to_string());
1210 let output = std::process::Command::new(&analyze_headless)
1211 .arg(binary_path)
1212 .output()
1213 .with_context(|| {
1214 format!(
1215 "failed to launch '{}' for target '{}'",
1216 analyze_headless,
1217 binary_path.display()
1218 )
1219 })?;
1220 if !output.status.success() {
1221 bail!(
1222 "Ghidra disassembly failed for '{}': {}",
1223 binary_path.display(),
1224 String::from_utf8_lossy(&output.stderr).trim()
1225 );
1226 }
1227
1228 self.write_analysis_artifact(author_role, output_relative_path, &output.stdout)
1229 }
1230
1231 #[cfg_attr(not(test), allow(dead_code))]
1232 pub(super) fn run_cleanroom_disassembly(
1233 &mut self,
1234 author_role: &str,
1235 input_path: &Path,
1236 output_relative_path: &Path,
1237 ) -> Result<PathBuf> {
1238 match CleanroomBackend::detect(input_path)? {
1239 CleanroomBackend::SkoolKit => {
1240 self.run_skoolkit_disassembly(author_role, input_path, output_relative_path)
1241 }
1242 CleanroomBackend::Ghidra => {
1243 self.run_ghidra_disassembly(author_role, input_path, output_relative_path)
1244 }
1245 }
1246 }
1247
1248 #[cfg_attr(not(test), allow(dead_code))]
1249 pub(super) fn read_handoff_artifact(
1250 &mut self,
1251 reader_role: &str,
1252 relative_path: &Path,
1253 ) -> Result<Vec<u8>> {
1254 if relative_path.is_absolute()
1255 || relative_path
1256 .components()
1257 .any(|component| matches!(component, std::path::Component::ParentDir))
1258 {
1259 self.record_barrier_violation_attempt(
1260 reader_role,
1261 &relative_path.display().to_string(),
1262 "handoff reads must stay within the shared handoff directory",
1263 );
1264 bail!(
1265 "invalid handoff artifact path '{}': must be relative and stay under handoff/",
1266 relative_path.display()
1267 );
1268 }
1269 let artifact_path = self.handoff_dir().join(relative_path);
1270 self.validate_member_barrier_path(reader_role, &artifact_path, "read")?;
1271 let content = std::fs::read(&artifact_path)
1272 .with_context(|| format!("failed to read {}", artifact_path.display()))?;
1273 let content_hash = format!("{:x}", Sha256::digest(&content));
1274 self.record_barrier_artifact_read(
1275 reader_role,
1276 &artifact_path.display().to_string(),
1277 &content_hash,
1278 );
1279 Ok(content)
1280 }
1281
1282 pub(super) fn manager_name(&self, engineer: &str) -> Option<String> {
1283 self.config
1284 .members
1285 .iter()
1286 .find(|member| member.name == engineer)
1287 .and_then(|member| member.reports_to.clone())
1288 }
1289
1290 pub(super) fn architect_names(&self) -> Vec<String> {
1291 self.config
1292 .members
1293 .iter()
1294 .filter(|member| member.role_type == RoleType::Architect)
1295 .map(|member| member.name.clone())
1296 .collect()
1297 }
1298
1299 fn short_head_commit(project_root: &Path) -> Result<String> {
1300 Ok(
1301 super::git_cmd::run_git(project_root, &["rev-parse", "--short", "HEAD"])?
1302 .stdout
1303 .trim()
1304 .to_string(),
1305 )
1306 }
1307
1308 fn recent_main_suspects(project_root: &Path, count: usize) -> Result<Vec<String>> {
1309 let limit = count.max(1).to_string();
1310 let output = super::git_cmd::run_git(
1311 project_root,
1312 &["log", "--format=%h %s", "-n", limit.as_str(), "main"],
1313 )?;
1314 Ok(output
1315 .stdout
1316 .lines()
1317 .map(str::trim)
1318 .filter(|line| !line.is_empty())
1319 .map(str::to_string)
1320 .collect())
1321 }
1322
1323 fn summarize_smoke_output(output: &str) -> String {
1324 fn strip_ansi(s: &str) -> String {
1327 let mut out = String::with_capacity(s.len());
1328 let mut chars = s.chars().peekable();
1329 while let Some(c) = chars.next() {
1330 if c == '\u{1b}' && chars.peek() == Some(&'[') {
1331 chars.next();
1332 for esc in chars.by_ref() {
1333 if esc.is_ascii_alphabetic() {
1334 break;
1335 }
1336 }
1337 } else {
1338 out.push(c);
1339 }
1340 }
1341 out
1342 }
1343
1344 let summary = output
1345 .lines()
1346 .map(|line| strip_ansi(line).trim().to_string())
1347 .find(|line| {
1348 !line.is_empty()
1349 && !line.starts_with("Compiling ")
1350 && !line.starts_with("Checking ")
1351 && !line.starts_with("Blocking waiting for file lock")
1352 && !line.starts_with("Finished ")
1353 && !line.starts_with("Running ")
1354 })
1355 .unwrap_or_else(|| "main smoke command failed".to_string());
1356 summary.chars().take(240).collect()
1357 }
1358
1359 #[cfg(test)]
1360 pub(super) fn set_active_task_for_test(&mut self, engineer: &str, task_id: u32) {
1361 self.active_tasks.insert(engineer.to_string(), task_id);
1362 }
1363
1364 #[cfg(test)]
1365 pub(super) fn retry_count_for_test(&self, engineer: &str) -> Option<u32> {
1366 self.retry_counts.get(engineer).copied()
1367 }
1368
1369 #[cfg(test)]
1370 pub(super) fn member_state_for_test(&self, engineer: &str) -> Option<MemberState> {
1371 self.states.get(engineer).copied()
1372 }
1373
1374 #[cfg(test)]
1375 pub(super) fn set_member_state_for_test(&mut self, engineer: &str, state: MemberState) {
1376 self.states.insert(engineer.to_string(), state);
1377 }
1378
    #[cfg(test)]
    /// Test hook: number of merges currently waiting in the merge queue.
    pub(crate) fn queued_merge_count_for_test(&self) -> usize {
        self.merge_queue.queued_len()
    }
1383
    #[cfg(test)]
    /// Test hook: drive one pass of the private merge-queue processor.
    pub(crate) fn process_merge_queue_for_test(&mut self) -> Result<()> {
        self.process_merge_queue()
    }
1388
1389 pub(super) fn increment_retry(&mut self, engineer: &str) -> u32 {
1390 let count = self.retry_counts.entry(engineer.to_string()).or_insert(0);
1391 *count += 1;
1392 *count
1393 }
1394
1395 pub(super) fn response_is_stalled_mid_turn(&self, response: &str) -> bool {
1396 response
1397 .lines()
1398 .next()
1399 .is_some_and(|line| line.contains(STALLED_MID_TURN_MARKER))
1400 }
1401
1402 pub(super) fn handle_stalled_mid_turn_completion(
1403 &mut self,
1404 member_name: &str,
1405 response: &str,
1406 ) -> Result<bool> {
1407 if !self.response_is_stalled_mid_turn(response) {
1408 return Ok(false);
1409 }
1410
1411 let attempt = self.increment_retry(member_name);
1412 if let Some(backoff_secs) = stalled_mid_turn_backoff_secs(attempt) {
1413 warn!(
1414 member = member_name,
1415 attempt,
1416 backoff_secs,
1417 "shim reported stalled mid-turn completion; retrying after backoff"
1418 );
1419 self.record_orchestrator_action(format!(
1420 "stall: shim reported stalled mid-turn for {member_name}; retry {attempt} after {backoff_secs}s"
1421 ));
1422 sleep_stalled_mid_turn_backoff(Duration::from_secs(backoff_secs));
1423
1424 let retry_notice = format!(
1425 "Claude SDK stalled mid-turn and Batty released the stuck turn. Waited {backoff_secs}s before retrying.\n{response}\n\nContinue from the current worktree state. Do not restart or discard prior work unless the task requires it."
1426 );
1427 self.queue_message("daemon", member_name, &retry_notice)?;
1428 self.mark_member_working(member_name);
1429 return Ok(true);
1430 }
1431
1432 warn!(
1433 member = member_name,
1434 attempt, "shim reported stalled mid-turn completion; restarting agent"
1435 );
1436 self.record_orchestrator_action(format!(
1437 "stall: shim reported stalled mid-turn for {member_name}; restarting on attempt {attempt}"
1438 ));
1439 self.restart_member_with_task_context(member_name, "stalled mid-turn")?;
1440 if let Some(task_id) = self.active_task_id(member_name) {
1441 self.record_agent_restarted(
1442 member_name,
1443 task_id.to_string(),
1444 "stalled_mid_turn",
1445 attempt,
1446 );
1447 }
1448 Ok(true)
1449 }
1450
1451 pub(super) fn clear_active_task(&mut self, engineer: &str) {
1452 if let Some(task_id) = self.active_tasks.remove(engineer) {
1453 self.narration_rejection_counts.remove(&task_id);
1454 self.zero_diff_completion_counts.remove(&task_id);
1455 }
1456 self.retry_counts.remove(engineer);
1457 self.verification_states.remove(engineer);
1458 super::checkpoint::remove_checkpoint(&self.config.project_root, engineer);
1460 let work_dir = self
1461 .config
1462 .members
1463 .iter()
1464 .find(|member| member.name == engineer)
1465 .map(|member| self.member_work_dir(member))
1466 .unwrap_or_else(|| self.config.project_root.clone());
1467 super::checkpoint::remove_restart_context(&work_dir);
1468 }
1469
1470 pub(super) fn note_zero_diff_completion(&mut self, task_id: u32) -> u32 {
1471 let count = self.zero_diff_completion_counts.entry(task_id).or_insert(0);
1472 *count += 1;
1473 *count
1474 }
1475
    /// Remove any zero-diff completion count recorded for `task_id`.
    pub(super) fn clear_zero_diff_completion(&mut self, task_id: u32) {
        self.zero_diff_completion_counts.remove(&task_id);
    }
1479
1480 pub(super) fn notify_reports_to(&mut self, from_role: &str, msg: &str) -> Result<()> {
1482 let parent = self
1483 .config
1484 .members
1485 .iter()
1486 .find(|m| m.name == from_role)
1487 .and_then(|m| m.reports_to.clone());
1488 let Some(parent_name) = parent else {
1489 return Ok(());
1490 };
1491 self.queue_message(from_role, &parent_name, msg)?;
1492 self.mark_member_working(&parent_name);
1493 Ok(())
1494 }
1495
1496 pub(super) fn notify_architects(&mut self, msg: &str) -> Result<()> {
1497 for architect in self.architect_names() {
1498 self.queue_message("daemon", &architect, msg)?;
1499 self.mark_member_working(&architect);
1500 }
1501 Ok(())
1502 }
1503
    /// Propagate a member state change (Idle/Working) to the automation
    /// subsystems that key off member activity: the idle clock, the nudge
    /// timer, the standup timer, and the triage-intervention timer.
    pub(super) fn update_automation_timers_for_state(
        &mut self,
        member_name: &str,
        new_state: MemberState,
    ) {
        match new_state {
            MemberState::Idle => {
                // Start (or restart) the idle clock for this member.
                self.idle_started_at
                    .insert(member_name.to_string(), Instant::now());
            }
            MemberState::Working => {
                self.idle_started_at.remove(member_name);
            }
        }
        self.update_nudge_for_state(member_name, new_state);
        standup::update_timer_for_state(
            &self.config.team_config,
            &self.config.members,
            &mut self.paused_standups,
            &mut self.last_standup,
            member_name,
            new_state,
        );
        self.update_triage_intervention_for_state(member_name, new_state);
    }
1530}
1531
1532fn stalled_mid_turn_backoff_secs(attempt: u32) -> Option<u64> {
1533 STALLED_MID_TURN_RETRY_BACKOFF_SECS
1534 .get(attempt.saturating_sub(1) as usize)
1535 .copied()
1536}
1537
#[cfg(not(test))]
/// Block the daemon thread for the stall-retry backoff (production builds).
fn sleep_stalled_mid_turn_backoff(duration: Duration) {
    std::thread::sleep(duration);
}
1542
#[cfg(test)]
/// Test builds skip the real sleep so stall-retry tests run instantly.
fn sleep_stalled_mid_turn_backoff(_duration: Duration) {}
1545
1546#[cfg(test)]
1547#[path = "daemon/tests.rs"]
1548mod tests;
1549
#[cfg(test)]
mod stalled_mid_turn_tests {
    //! Focused tests for stalled mid-turn detection, the retry backoff
    //! schedule, and the first-retry requeue path.
    use super::*;
    use crate::team::inbox;
    use crate::team::test_support::{TestDaemonBuilder, engineer_member, write_owned_task_file};

    #[test]
    fn stalled_mid_turn_backoff_schedule_matches_task_requirements() {
        // Attempts are 1-based: 30s, then 60s, then give up (restart path).
        assert_eq!(stalled_mid_turn_backoff_secs(1), Some(30));
        assert_eq!(stalled_mid_turn_backoff_secs(2), Some(60));
        assert_eq!(stalled_mid_turn_backoff_secs(3), None);
    }

    #[test]
    fn stalled_mid_turn_detection_matches_marker_prefix() {
        let tmp = tempfile::tempdir().unwrap();
        let daemon = TestDaemonBuilder::new(tmp.path()).build();
        // Only the first line of the response is inspected for the marker.
        assert!(daemon.response_is_stalled_mid_turn(
            "stalled mid-turn: no stdout from Claude SDK for 120s while working."
        ));
        assert!(!daemon.response_is_stalled_mid_turn("normal completion"));
    }

    #[test]
    fn stalled_mid_turn_first_retry_requeues_message_after_backoff() {
        let tmp = tempfile::tempdir().unwrap();
        let member_name = "eng-1";
        write_owned_task_file(tmp.path(), 42, "sdk-stall", "in-progress", member_name);
        let inbox_root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&inbox_root, member_name).unwrap();

        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![engineer_member(member_name, Some("manager"), true)])
            .build();
        daemon.active_tasks.insert(member_name.to_string(), 42);

        let handled = daemon
            .handle_stalled_mid_turn_completion(
                member_name,
                "stalled mid-turn: no stdout from Claude SDK for 120s while working.\nlast_sent_message_from: manager",
            )
            .unwrap();

        // First attempt retries rather than restarts: the retry counter is
        // bumped, the member is marked working, and a retry notice lands in
        // the inbox.
        assert!(handled);
        assert_eq!(daemon.retry_count_for_test(member_name), Some(1));
        assert_eq!(
            daemon.member_state_for_test(member_name),
            Some(MemberState::Working)
        );

        let inbox_entries = inbox::pending_messages(&inbox_root, member_name).unwrap();
        assert_eq!(inbox_entries.len(), 1);
        assert!(inbox_entries[0].body.contains("Waited 30s before retrying"));
        assert!(inbox_entries[0].body.contains("stalled mid-turn"));
    }
}
1605}