1use std::collections::{HashMap, HashSet};
19use std::fs;
20use std::path::{Path, PathBuf};
21#[cfg(test)]
22use std::time::SystemTime;
23use std::time::{Duration, Instant};
24
25use anyhow::{Context, Result, bail};
26use serde::{Deserialize, Serialize};
27use sha2::{Digest, Sha256};
28use tracing::{debug, info, warn};
29use uuid::Uuid;
30
31use super::board;
32use super::comms::{self, Channel};
33#[cfg(test)]
34use super::config::OrchestratorPosition;
35use super::config::{RoleType, TeamConfig};
36use super::delivery::{FailedDelivery, PendingMessage};
37use super::events::EventSink;
38use super::events::TeamEvent;
39use super::failure_patterns::FailureTracker;
40use super::hierarchy::MemberInstance;
41use super::inbox;
42use super::merge;
43use super::standup::{self, MemberState};
44use super::status;
45use super::task_cmd;
46#[cfg(test)]
47use super::task_loop::next_unclaimed_task;
48use super::task_loop::{
49 branch_is_merged_into, current_worktree_branch, engineer_base_branch_name,
50 preserve_worktree_with_commit, setup_engineer_worktree,
51};
52use super::verification::VerificationState;
53use super::watcher::{SessionWatcher, WatcherState};
54use super::{AssignmentDeliveryResult, AssignmentResultStatus, now_unix, store_assignment_result};
55use crate::agent::{self, BackendHealth};
56use crate::tmux;
57use dispatch::DispatchQueueEntry;
58
/// Marker text a shim places on the first line of a response to signal the
/// agent stalled mid-turn (checked by `response_is_stalled_mid_turn`).
const STALLED_MID_TURN_MARKER: &str = "stalled mid-turn";
/// Backoff delays (seconds) for retrying a stalled-mid-turn completion —
/// presumably indexed by retry attempt, giving up after two retries; the
/// lookup helper lives outside this section. TODO confirm.
const STALLED_MID_TURN_RETRY_BACKOFF_SECS: [u64; 2] = [30, 60];
61
62#[path = "daemon/agent_handle.rs"]
63pub(super) mod agent_handle;
64#[path = "daemon/automation.rs"]
65mod automation;
66#[path = "daemon/config_reload.rs"]
67mod config_reload;
68#[path = "discord_bridge.rs"]
69mod discord_bridge;
70#[path = "dispatch/mod.rs"]
71mod dispatch;
72#[path = "daemon/error_handling.rs"]
73mod error_handling;
74#[path = "daemon/health/mod.rs"]
75mod health;
76#[path = "daemon/helpers.rs"]
77mod helpers;
78#[path = "daemon/hot_reload.rs"]
79mod hot_reload;
80#[path = "daemon/interventions/mod.rs"]
81mod interventions;
82#[path = "launcher.rs"]
83mod launcher;
84#[path = "daemon/merge_queue.rs"]
85mod merge_queue;
86#[path = "daemon/poll.rs"]
87mod poll;
88#[path = "daemon/reconcile.rs"]
89mod reconcile;
90#[cfg(any(test, feature = "scenario-test"))]
91#[path = "daemon/scenario_api.rs"]
92pub mod scenario_api;
93#[path = "daemon/shim_spawn.rs"]
94mod shim_spawn;
95#[path = "daemon/shim_state.rs"]
96mod shim_state;
97#[path = "daemon/spec_gen.rs"]
98mod spec_gen;
99#[path = "daemon/state.rs"]
100mod state;
101#[path = "telegram_bridge.rs"]
102mod telegram_bridge;
103#[path = "daemon/telemetry.rs"]
104pub(crate) mod telemetry;
105#[path = "daemon/tick_report.rs"]
106pub mod tick_report;
107#[path = "daemon/verification.rs"]
108pub(crate) mod verification;
109
110pub(crate) use self::discord_bridge::{
111 build_shutdown_snapshot, send_discord_shutdown_notice, send_discord_shutdown_summary,
112};
113#[cfg(test)]
114use self::dispatch::normalized_assignment_dir;
115pub(crate) use self::error_handling::{optional_subsystem_for_step, optional_subsystem_names};
116use self::helpers::{extract_nudge_section, role_prompt_path};
117use self::hot_reload::consume_hot_reload_marker;
118#[cfg(test)]
119use self::hot_reload::{
120 BinaryFingerprint, hot_reload_daemon_args, hot_reload_marker_path, write_hot_reload_marker,
121};
122pub(crate) use self::interventions::NudgeSchedule;
123use self::interventions::OwnedTaskInterventionState;
124use self::launcher::{
125 duplicate_claude_session_ids, load_launch_state, member_session_tracker_config,
126};
127pub(crate) use self::merge_queue::{MergeQueue, MergeRequest};
128pub use self::state::load_dispatch_queue_snapshot;
129#[cfg(test)]
130use self::state::{
131 PersistedDaemonState, PersistedNudgeState, daemon_state_path, load_daemon_state,
132 save_daemon_state,
133};
134pub(super) use super::delivery::MessageDelivery;
135
/// Static configuration handed to [`TeamDaemon::new`] when the daemon starts.
pub struct DaemonConfig {
    /// Root directory of the orchestrated project (contains `.batty/`).
    pub project_root: PathBuf,
    /// Parsed team configuration: roles, workflow policy, standup settings, etc.
    pub team_config: TeamConfig,
    // Session identifier — presumably the tmux session name; confirm at caller.
    pub session: String,
    /// Concrete member instances spawned for the configured roles.
    pub members: Vec<MemberInstance>,
    /// Member name -> pane identifier (iterated as `(name, pane_id)` in `new`).
    pub pane_map: HashMap<String, String>,
}
144
/// Disassembly backend selected for a clean-room input, keyed off the input
/// file's extension (see [`CleanroomBackend::detect`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CleanroomBackend {
    /// SkoolKit toolchain — ZX Spectrum snapshots (`.z80`, `.sna`).
    SkoolKit,
    /// Ghidra headless analyzer — `.nes`, `.gb`, `.gbc`, `.com`, `.exe` inputs.
    Ghidra,
}
150
151impl CleanroomBackend {
152 fn detect(input_path: &Path) -> Result<Self> {
153 let extension = input_path
154 .extension()
155 .and_then(|ext| ext.to_str())
156 .map(|ext| ext.to_ascii_lowercase());
157 match extension.as_deref() {
158 Some("z80" | "sna") => Ok(Self::SkoolKit),
159 Some("nes" | "gb" | "gbc" | "com" | "exe") => Ok(Self::Ghidra),
160 _ => bail!(
161 "unsupported clean-room input '{}': expected one of .z80, .sna, .nes, .gb, .gbc, .com, or .exe",
162 input_path.display()
163 ),
164 }
165 }
166}
167
/// Orchestrator daemon state for one team session.
///
/// Owns everything the daemon loop mutates: per-member watchers and states,
/// dispatch/merge queues, comms bridges, escalation/intervention bookkeeping,
/// and health trackers. Constructed by [`TeamDaemon::new`]; most behavior
/// lives in the `daemon/*` submodules declared above.
pub struct TeamDaemon {
    /// Static launch configuration (project root, team config, members, panes).
    pub(super) config: DaemonConfig,
    /// Per-member session watchers, keyed by member name (built in `new`).
    pub(super) watchers: HashMap<String, SessionWatcher>,
    /// Last known state (Working/Idle/…) per member name.
    pub(super) states: HashMap<String, MemberState>,
    // When each member was first observed idle — presumably drives
    // idle-duration automation in the poll submodule.
    pub(super) idle_started_at: HashMap<String, Instant>,
    /// Engineer name -> id of the task currently assigned to them.
    pub(super) active_tasks: HashMap<String, u32>,
    /// Consecutive retry counts per member (bumped by `increment_retry`).
    pub(super) retry_counts: HashMap<String, u32>,
    /// Pending assignment dispatches awaiting delivery.
    pub(super) dispatch_queue: Vec<DispatchQueueEntry>,
    // Triage/intervention bookkeeping — maintained by the `interventions`
    // submodule; semantics not visible in this section.
    pub(super) triage_idle_epochs: HashMap<String, u64>,
    pub(super) triage_interventions: HashMap<String, u64>,
    pub(super) owned_task_interventions: HashMap<String, OwnedTaskInterventionState>,
    pub(super) intervention_cooldowns: HashMap<String, Instant>,
    /// Outbound comms channels for User roles, keyed by role name.
    pub(super) channels: HashMap<String, Box<dyn Channel>>,
    /// Per-member nudge schedules (registered in `new` from role prompts).
    pub(super) nudges: HashMap<String, NudgeSchedule>,
    /// Optional Discord bridge; `None` when not configured.
    pub(super) discord_bot: Option<super::discord::DiscordBot>,
    /// Index into the event log of the next event to announce on Discord.
    pub(super) discord_event_cursor: usize,
    /// Optional Telegram bridge; `None` when not configured.
    pub(super) telegram_bot: Option<super::telegram::TelegramBot>,
    /// Rolling tracker of recent failures (capacity 20, see `new`).
    pub(super) failure_tracker: FailureTracker,
    /// Append-only team event log (`events.jsonl`).
    pub(super) event_sink: EventSink,
    // Standup / scheduling bookkeeping — driven by the poll submodule.
    pub(super) paused_standups: HashSet<String>,
    pub(super) last_standup: HashMap<String, Instant>,
    pub(super) last_board_rotation: Instant,
    pub(super) last_auto_archive: Instant,
    pub(super) last_auto_dispatch: Instant,
    // Pipeline / planning-cycle state flags — maintained outside this section.
    pub(super) pipeline_starvation_fired: bool,
    pub(super) pipeline_starvation_last_fired: Option<Instant>,
    pub(super) planning_cycle_last_fired: Option<Instant>,
    pub(super) planning_cycle_active: bool,
    pub(super) retro_generated: bool,
    /// Message deliveries that failed and may need retry/reporting.
    pub(super) failed_deliveries: Vec<FailedDelivery>,
    // Review-nudge bookkeeping keyed by task id (unix timestamps per
    // `review_first_seen`'s `u64` value — TODO confirm).
    pub(super) review_first_seen: HashMap<u32, u64>,
    pub(super) review_nudge_sent: HashSet<u32>,
    /// Number of poll cycles completed since startup.
    pub(super) poll_cycle_count: u64,
    /// Delay between poll cycles (5s default, see `new`).
    pub(super) poll_interval: Duration,
    /// (subsystem, message) errors collected during the current tick.
    pub(super) current_tick_errors: Vec<(String, String)>,
    /// True when the project root itself is a git repository.
    pub(super) is_git_repo: bool,
    /// True when the root is not a repo but contains sub-repos.
    pub(super) is_multi_repo: bool,
    /// Directory names of discovered sub-repos (multi-repo mode only).
    pub(super) sub_repo_names: Vec<String>,
    /// Error counts per optional subsystem name.
    pub(super) subsystem_error_counts: HashMap<String, u32>,
    /// In-memory auto-merge overrides per task id (see `auto_merge_override`).
    pub(super) auto_merge_overrides: HashMap<u32, bool>,
    /// (task id, member) -> when it was last dispatched (dedup window).
    pub(super) recent_dispatches: HashMap<(u32, String), Instant>,
    /// Escalation dedup keys -> first-seen time (see `suppress_recent_escalation`).
    pub(super) recent_escalations: HashMap<String, Instant>,
    /// Telemetry store; `None` when the database failed to open.
    pub(super) telemetry_db: Option<rusqlite::Connection>,
    /// Cooldowns for manual task assignment per member.
    pub(super) manual_assign_cooldowns: HashMap<String, Instant>,
    /// Agent backend health per member.
    pub(super) backend_health: HashMap<String, BackendHealth>,
    /// Detects narration-style (non-working) agent output.
    pub(super) narration_tracker: health::narration::NarrationTracker,
    /// Tracks context-window pressure per member.
    pub(super) context_pressure_tracker: health::context::ContextPressureTracker,
    // Periodic-maintenance timestamps; several are backdated 1h in `new` so
    // the first poll runs them immediately.
    pub(super) last_health_check: Instant,
    pub(super) last_uncommitted_warn: HashMap<String, Instant>,
    pub(super) last_shared_target_cleanup: Instant,
    pub(super) last_disk_hygiene_check: Instant,
    /// Per-member verification workflow state.
    pub(super) verification_states: HashMap<String, VerificationState>,
    /// Narration-rejection counts keyed by task id.
    pub(super) narration_rejection_counts: HashMap<u32, u32>,
    /// Messages queued per member, awaiting delivery.
    pub(super) pending_delivery_queue: HashMap<String, Vec<PendingMessage>>,
    /// Shim-managed agents by member name; members present here bypass the
    /// watcher-driven state transitions (see `mark_member_working`).
    pub(super) shim_handles: HashMap<String, agent_handle::AgentHandle>,
    /// When shim health was last checked.
    pub(super) last_shim_health_check: Instant,
    /// Queue of pending merge requests.
    pub(super) merge_queue: MergeQueue,
}
254
#[cfg(any(test, feature = "scenario-test"))]
impl TeamDaemon {
    /// Scenario-test entry point: hands out hooks into daemon internals (see
    /// `scenario_api`). Compiled only for tests and `scenario-test` builds.
    pub fn scenario_hooks(&mut self) -> scenario_api::ScenarioHooks<'_> {
        scenario_api::ScenarioHooks::new(self)
    }
}
266
267impl TeamDaemon {
    /// Escalates a failed attempt to auto-save a member's dirty worktree:
    /// blocks the affected task (if any), notifies the member and their
    /// assignment sender, and records an orchestrator action.
    ///
    /// Escalations are deduplicated for 10 minutes on
    /// (member, task, context, hash-of-detail), so repeated failures with the
    /// same cause do not spam the board or chat.
    pub(in crate::team) fn report_preserve_failure(
        &mut self,
        member_name: &str,
        task_id: Option<u32>,
        context: &str,
        detail: &str,
    ) {
        // Human-readable reason, including the task number when known.
        let reason = match task_id {
            Some(task_id) => format!(
                "Task #{task_id} is blocked because Batty could not safely auto-save {member_name}'s dirty worktree before {context}. {detail}"
            ),
            None => format!(
                "Batty could not safely auto-save {member_name}'s dirty worktree before {context}. {detail}"
            ),
        };

        // Hash the free-form detail so the dedup key stays bounded in size
        // while still distinguishing different underlying errors.
        let detail_digest: u64 = {
            use std::collections::hash_map::DefaultHasher;
            use std::hash::{Hash, Hasher};
            let mut hasher = DefaultHasher::new();
            detail.hash(&mut hasher);
            hasher.finish()
        };
        let task_key = task_id
            .map(|id| id.to_string())
            .unwrap_or_else(|| "-".to_string());
        let dedup_key = format!("preserve:{member_name}:{task_key}:{context}:{detail_digest}");
        if self.suppress_recent_escalation(dedup_key, Duration::from_secs(600)) {
            return;
        }

        // Best effort: mark the task blocked on the board; a failure here is
        // only logged so the notifications below still go out.
        if let Some(task_id) = task_id {
            if let Err(error) =
                task_cmd::block_task_with_reason(&self.board_dir(), task_id, &reason)
            {
                warn!(
                    member = member_name,
                    task_id,
                    error = %error,
                    "failed to block task after dirty worktree preservation failure"
                );
            }
        }
        let manager = self.assignment_sender(member_name);
        // Delivery failures are deliberately ignored — best-effort notify.
        let _ = self.queue_daemon_message(member_name, &reason);
        let _ = self.queue_daemon_message(&manager, &reason);
        self.record_orchestrator_action(format!("blocked recovery: {reason}"));
    }
321
    /// Best-effort auto-commit of `member_name`'s dirty worktree (e.g. before
    /// restart/shutdown). Returns `true` only when something was actually
    /// committed.
    ///
    /// Returns `false` without acting when the `auto_commit_on_restart` policy
    /// is off, the member is not a worktree-using engineer, or the worktree
    /// directory does not exist. Failures are escalated via
    /// [`Self::report_preserve_failure`].
    #[allow(dead_code)]
    pub(super) fn preserve_member_worktree(
        &mut self,
        member_name: &str,
        commit_message: &str,
    ) -> bool {
        let policy = &self.config.team_config.workflow_policy;
        if !policy.auto_commit_on_restart {
            return false;
        }

        let Some(member) = self
            .config
            .members
            .iter()
            .find(|member| member.name == member_name)
        else {
            return false;
        };
        if member.role_type != RoleType::Engineer || !member.use_worktrees {
            return false;
        }

        // NOTE(review): this builds the flat `.batty/worktrees/<member>` path
        // directly instead of calling `worktree_dir`, so it ignores the
        // clean-room barrier-group nesting — confirm that is intended.
        let worktree_dir = self
            .config
            .project_root
            .join(".batty")
            .join("worktrees")
            .join(member_name);
        if !worktree_dir.exists() {
            return false;
        }

        match preserve_worktree_with_commit(
            &worktree_dir,
            commit_message,
            Duration::from_secs(policy.graceful_shutdown_timeout_secs),
        ) {
            Ok(saved) => {
                if saved {
                    info!(
                        member = member_name,
                        worktree = %worktree_dir.display(),
                        "auto-saved worktree before restart/shutdown"
                    );
                }
                saved
            }
            Err(error) => {
                warn!(
                    member = member_name,
                    worktree = %worktree_dir.display(),
                    error = %error,
                    "failed to auto-save worktree before restart/shutdown"
                );
                self.report_preserve_failure(
                    member_name,
                    self.active_task_id(member_name),
                    "restart or shutdown",
                    &error.to_string(),
                );
                false
            }
        }
    }
387
388 #[allow(dead_code)]
389 pub(super) fn watcher_mut(&mut self, name: &str) -> Result<&mut SessionWatcher> {
390 self.watchers
391 .get_mut(name)
392 .with_context(|| format!("watcher registry missing member '{name}'"))
393 }
394
    /// Builds a daemon from static `config`: probes the project's git layout,
    /// opens the event log and telemetry DB, and wires up watchers, comms
    /// channels, and nudge schedules. Fails only if the event sink cannot be
    /// created; telemetry and channel failures merely log a warning.
    pub fn new(config: DaemonConfig) -> Result<Self> {
        // Git layout: a plain repo, a parent directory of sub-repos
        // ("multi-repo"), or no git at all (git operations disabled).
        let is_git_repo = super::git_cmd::is_git_repo(&config.project_root);
        let (is_multi_repo, sub_repo_names) = if is_git_repo {
            (false, Vec::new())
        } else {
            let subs = super::git_cmd::discover_sub_repos(&config.project_root);
            if subs.is_empty() {
                (false, Vec::new())
            } else {
                let names: Vec<String> = subs
                    .iter()
                    .filter_map(|p| p.file_name().map(|n| n.to_string_lossy().to_string()))
                    .collect();
                info!(
                    sub_repos = ?names,
                    "Detected multi-repo project with {} sub-repos",
                    names.len()
                );
                (true, names)
            }
        };
        if !is_git_repo && !is_multi_repo {
            info!("Project is not a git repository \u{2014} git operations disabled");
        }

        let team_config_dir = config.project_root.join(".batty").join("team_config");
        let events_path = team_config_dir.join("events.jsonl");
        let event_sink =
            EventSink::new_with_max_bytes(&events_path, config.team_config.event_log_max_bytes)?;

        // One SessionWatcher per pane; the staleness threshold is twice the
        // standup interval.
        let mut watchers = HashMap::new();
        let stale_secs = config.team_config.standup.interval_secs * 2;
        for (name, pane_id) in &config.pane_map {
            let session_tracker = config
                .members
                .iter()
                .find(|member| member.name == *name)
                .and_then(|member| member_session_tracker_config(&config.project_root, member));
            watchers.insert(
                name.clone(),
                SessionWatcher::new(pane_id, name, stale_secs, session_tracker),
            );
        }

        // Outbound comms channels exist only for User roles that configured
        // one; a bad channel config is logged and skipped, not fatal.
        let mut channels: HashMap<String, Box<dyn Channel>> = HashMap::new();
        for role in &config.team_config.roles {
            if role.role_type == RoleType::User {
                if let (Some(ch_type), Some(ch_config)) = (&role.channel, &role.channel_config) {
                    match comms::channel_from_config(ch_type, ch_config) {
                        Ok(ch) => {
                            channels.insert(role.name.clone(), ch);
                        }
                        Err(e) => {
                            warn!(role = %role.name, error = %e, "failed to create channel");
                        }
                    }
                }
            }
        }

        let discord_bot = discord_bridge::build_discord_bot(&config.team_config);
        let telegram_bot = telegram_bridge::build_telegram_bot(&config.team_config);
        let narration_detection_enabled = config
            .team_config
            .workflow_policy
            .narration_detection_enabled;
        let narration_threshold_polls =
            config.team_config.workflow_policy.narration_threshold_polls;

        let states = HashMap::new();

        // A nudge schedule is registered per member instance whose role both
        // sets a nudge interval and has a nudge section in its prompt file.
        let mut nudges = HashMap::new();
        for role in &config.team_config.roles {
            if let Some(interval_secs) = role.nudge_interval_secs {
                let prompt_path =
                    role_prompt_path(&team_config_dir, role.prompt.as_deref(), role.role_type);
                if let Some(nudge_text) = extract_nudge_section(&prompt_path) {
                    let instance_names: Vec<String> = config
                        .members
                        .iter()
                        .filter(|m| m.role_name == role.name)
                        .map(|m| m.name.clone())
                        .collect();
                    for name in instance_names {
                        info!(member = %name, interval_secs, "registered nudge");
                        nudges.insert(
                            name,
                            NudgeSchedule {
                                text: nudge_text.clone(),
                                interval: Duration::from_secs(interval_secs),
                                idle_since: Some(Instant::now()),
                                fired_this_idle: false,
                                paused: false,
                            },
                        );
                    }
                }
            }
        }

        // Telemetry is optional: an open failure disables it with a warning.
        let telemetry_db = match super::telemetry_db::open(&config.project_root) {
            Ok(conn) => {
                info!("telemetry database opened");
                Some(conn)
            }
            Err(error) => {
                warn!(error = %error, "failed to open telemetry database; telemetry disabled");
                None
            }
        };

        let context_pressure_threshold = config
            .team_config
            .workflow_policy
            .context_pressure_threshold;
        let context_pressure_threshold_bytes = config
            .team_config
            .workflow_policy
            .context_pressure_threshold_bytes;

        Ok(Self {
            config,
            watchers,
            states,
            idle_started_at: HashMap::new(),
            active_tasks: HashMap::new(),
            retry_counts: HashMap::new(),
            dispatch_queue: Vec::new(),
            triage_idle_epochs: HashMap::new(),
            triage_interventions: HashMap::new(),
            owned_task_interventions: HashMap::new(),
            intervention_cooldowns: HashMap::new(),
            channels,
            nudges,
            discord_bot,
            // Start the Discord cursor at the current end of the event log so
            // pre-existing events are not re-announced on startup.
            discord_event_cursor: crate::team::events::read_events(event_sink.path())
                .map(|events| events.len())
                .unwrap_or(0),
            telegram_bot,
            failure_tracker: FailureTracker::new(20),
            event_sink,
            paused_standups: HashSet::new(),
            last_standup: HashMap::new(),
            last_board_rotation: Instant::now(),
            last_auto_archive: Instant::now(),
            last_auto_dispatch: Instant::now(),
            pipeline_starvation_fired: false,
            pipeline_starvation_last_fired: None,
            planning_cycle_last_fired: None,
            planning_cycle_active: false,
            retro_generated: false,
            failed_deliveries: Vec::new(),
            review_first_seen: HashMap::new(),
            review_nudge_sent: HashSet::new(),
            poll_cycle_count: 0,
            current_tick_errors: Vec::new(),
            poll_interval: Duration::from_secs(5),
            is_git_repo,
            is_multi_repo,
            sub_repo_names,
            subsystem_error_counts: HashMap::new(),
            auto_merge_overrides: HashMap::new(),
            recent_dispatches: HashMap::new(),
            recent_escalations: HashMap::new(),
            telemetry_db,
            manual_assign_cooldowns: HashMap::new(),
            backend_health: HashMap::new(),
            narration_tracker: health::narration::NarrationTracker::new(
                narration_detection_enabled,
                narration_threshold_polls,
            ),
            context_pressure_tracker: health::context::ContextPressureTracker::new(
                context_pressure_threshold,
                context_pressure_threshold_bytes,
            ),
            // Backdated one hour so the first poll runs these checks
            // immediately. NOTE(review): `Instant` subtraction panics if the
            // monotonic clock's epoch is less than 1h in the past (possible
            // shortly after boot on some platforms) — consider `checked_sub`.
            last_health_check: Instant::now() - Duration::from_secs(3600),
            last_uncommitted_warn: HashMap::new(),
            last_shared_target_cleanup: Instant::now() - Duration::from_secs(3600),
            last_disk_hygiene_check: Instant::now() - Duration::from_secs(3600),
            verification_states: HashMap::new(),
            narration_rejection_counts: HashMap::new(),
            pending_delivery_queue: HashMap::new(),
            shim_handles: HashMap::new(),
            last_shim_health_check: Instant::now(),
            merge_queue: MergeQueue::default(),
        })
    }
595
596 pub(crate) fn suppress_recent_escalation(
597 &mut self,
598 key: impl Into<String>,
599 window: Duration,
600 ) -> bool {
601 let now = Instant::now();
602 self.recent_escalations
603 .retain(|_, seen_at| now.duration_since(*seen_at) < window);
604
605 let key = key.into();
606 if self.recent_escalations.contains_key(&key) {
607 return true;
608 }
609
610 self.recent_escalations.insert(key, now);
611 false
612 }
613
614 pub(super) fn member_nudge_text(&self, member: &MemberInstance) -> Option<String> {
615 let prompt_path = role_prompt_path(
616 &super::team_config_dir(&self.config.project_root),
617 member.prompt.as_deref(),
618 member.role_type,
619 );
620 extract_nudge_section(&prompt_path)
621 }
622
623 pub(super) fn prepend_member_nudge(
624 &self,
625 member: &MemberInstance,
626 body: impl AsRef<str>,
627 ) -> String {
628 let body = body.as_ref();
629 match self.member_nudge_text(member) {
630 Some(nudge) => format!("{nudge}\n\n{body}"),
631 None => body.to_string(),
632 }
633 }
634
    /// Transitions `member_name` to `Working`, activates its session watcher,
    /// and refreshes state-based automation timers.
    ///
    /// Shim-managed members (present in `shim_handles`) are left untouched —
    /// their state is presumably maintained through the shim handle path
    /// instead; confirm against `shim_state`/`shim_spawn`.
    pub(super) fn mark_member_working(&mut self, member_name: &str) {
        if self.shim_handles.contains_key(member_name) {
            return;
        }
        self.states
            .insert(member_name.to_string(), MemberState::Working);
        if let Some(watcher) = self.watchers.get_mut(member_name) {
            watcher.activate();
        }
        self.update_automation_timers_for_state(member_name, MemberState::Working);
    }
651
    /// Transitions `member_name` to `Idle`, deactivates its watcher, and
    /// refreshes idle-based automation timers.
    ///
    /// Shim-managed members keep whatever state was recorded for them; when
    /// that state is already `Idle` the automation timers are still refreshed
    /// so idle-driven automation keeps ticking for shim members too.
    pub(super) fn set_member_idle(&mut self, member_name: &str) {
        if self.shim_handles.contains_key(member_name) {
            // Shim members: do not overwrite state, but keep the idle timers
            // fresh when the recorded state is already Idle.
            if self.states.get(member_name) == Some(&MemberState::Idle) {
                self.update_automation_timers_for_state(member_name, MemberState::Idle);
            }
            return;
        }
        self.states
            .insert(member_name.to_string(), MemberState::Idle);
        if let Some(watcher) = self.watchers.get_mut(member_name) {
            watcher.deactivate();
        }
        self.update_automation_timers_for_state(member_name, MemberState::Idle);
    }
670
    /// Id of the task currently tracked as active for `engineer`, if any.
    pub(super) fn active_task_id(&self, engineer: &str) -> Option<u32> {
        self.active_tasks.get(engineer).copied()
    }
674
    /// Auto-commits a dirty worktree before a restart triggered by `reason`,
    /// like [`Self::preserve_member_worktree`] but for a caller-supplied
    /// worktree path and a fixed commit message.
    ///
    /// No-op unless the member is a worktree-using engineer, the
    /// `auto_commit_on_restart` policy is enabled, and `worktree_dir` exists.
    /// Failures are escalated via [`Self::report_preserve_failure`].
    pub(super) fn preserve_worktree_before_restart(
        &mut self,
        member_name: &str,
        worktree_dir: &Path,
        reason: &str,
    ) {
        let Some(member) = self
            .config
            .members
            .iter()
            .find(|member| member.name == member_name)
        else {
            return;
        };
        if member.role_type != RoleType::Engineer || !member.use_worktrees {
            return;
        }
        if !self
            .config
            .team_config
            .workflow_policy
            .auto_commit_on_restart
            || !worktree_dir.exists()
        {
            return;
        }

        let timeout = Duration::from_secs(
            self.config
                .team_config
                .workflow_policy
                .graceful_shutdown_timeout_secs,
        );
        match preserve_worktree_with_commit(
            worktree_dir,
            "wip: auto-save before restart [batty]",
            timeout,
        ) {
            // Ok(false) means the worktree was clean — nothing to report.
            Ok(true) => info!(
                member = member_name,
                worktree = %worktree_dir.display(),
                reason,
                "auto-saved dirty worktree before restart"
            ),
            Ok(false) => {}
            Err(error) => {
                warn!(
                    member = member_name,
                    worktree = %worktree_dir.display(),
                    reason,
                    error = %error,
                    "failed to auto-save dirty worktree before restart"
                );
                self.report_preserve_failure(
                    member_name,
                    self.active_task_id(member_name),
                    reason,
                    &error.to_string(),
                );
            }
        }
    }
737
    /// Root directory of the orchestrated project.
    pub(super) fn project_root(&self) -> &Path {
        &self.config.project_root
    }
741
    /// Test hook: sets an in-memory auto-merge override for `task_id`.
    #[cfg(test)]
    pub(super) fn set_auto_merge_override(&mut self, task_id: u32, enabled: bool) {
        self.auto_merge_overrides.insert(task_id, enabled);
    }
746
747 pub(super) fn auto_merge_override(&self, task_id: u32) -> Option<bool> {
748 if let Some(&value) = self.auto_merge_overrides.get(&task_id) {
750 return Some(value);
751 }
752 let disk_overrides = super::auto_merge::load_overrides(&self.config.project_root);
753 disk_overrides.get(&task_id).copied()
754 }
755
756 pub(super) fn worktree_dir(&self, engineer: &str) -> PathBuf {
757 let base = self.config.project_root.join(".batty").join("worktrees");
758 match self.member_barrier_group(engineer) {
759 Some(group) if self.config.team_config.workflow_policy.clean_room_mode => {
760 base.join(group).join(engineer)
761 }
762 _ => base.join(engineer),
763 }
764 }
765
766 pub(super) fn board_dir(&self) -> PathBuf {
767 self.config
768 .project_root
769 .join(".batty")
770 .join("team_config")
771 .join("board")
772 }
773
774 pub(super) fn member_uses_worktrees(&self, engineer: &str) -> bool {
775 if !self.is_git_repo && !self.is_multi_repo {
776 return false;
777 }
778 self.config
779 .members
780 .iter()
781 .find(|member| member.name == engineer)
782 .map(|member| member.use_worktrees)
783 .unwrap_or(false)
784 }
785
786 pub(super) fn handoff_dir(&self) -> PathBuf {
787 self.config.project_root.join(
788 self.config
789 .team_config
790 .workflow_policy
791 .handoff_directory
792 .as_str(),
793 )
794 }
795
796 pub(super) fn member_barrier_group(&self, member_name: &str) -> Option<&str> {
797 let member = self
798 .config
799 .members
800 .iter()
801 .find(|member| member.name == member_name)?;
802 self.config
803 .team_config
804 .role_barrier_group(&member.role_name)
805 }
806
807 fn barrier_worktree_root(&self, barrier_group: &str) -> PathBuf {
808 self.config
809 .project_root
810 .join(".batty")
811 .join("worktrees")
812 .join(barrier_group)
813 }
814
815 #[cfg_attr(not(test), allow(dead_code))]
816 pub(super) fn analysis_dir(&self, member_name: &str) -> Result<PathBuf> {
817 let Some(group) = self.member_barrier_group(member_name) else {
818 bail!(
819 "member '{}' is not assigned to a clean-room barrier group",
820 member_name
821 );
822 };
823 if group != "analysis" {
824 bail!(
825 "member '{}' is in barrier group '{}' and cannot write analysis artifacts",
826 member_name,
827 group
828 );
829 }
830
831 Ok(self.worktree_dir(member_name).join("analysis"))
832 }
833
834 pub(super) fn validate_member_work_dir(
835 &self,
836 member_name: &str,
837 work_dir: &Path,
838 ) -> Result<()> {
839 if !self.config.team_config.workflow_policy.clean_room_mode {
840 return Ok(());
841 }
842
843 let expected = self.worktree_dir(member_name);
844 if work_dir == expected {
845 return Ok(());
846 }
847 bail!(
848 "clean-room barrier violation: member '{}' launch dir '{}' does not match '{}'",
849 member_name,
850 work_dir.display(),
851 expected.display()
852 );
853 }
854
    /// Enforces clean-room path isolation: `member_name` may only touch paths
    /// under its own worktree, its barrier group's worktree root, or the
    /// shared handoff directory. `access` is a human-readable verb used in
    /// diagnostics (e.g. "read"). Violations are recorded and returned as
    /// errors; a no-op when clean-room mode is off or the member has no
    /// barrier group.
    ///
    /// NOTE(review): `Path::starts_with` is purely lexical — `..` components
    /// and symlinks are not resolved here; relative-path hygiene appears to be
    /// handled separately in the artifact read/write helpers. Confirm.
    #[cfg_attr(not(test), allow(dead_code))]
    pub(super) fn validate_member_barrier_path(
        &mut self,
        member_name: &str,
        path: &Path,
        access: &str,
    ) -> Result<()> {
        if !self.config.team_config.workflow_policy.clean_room_mode {
            return Ok(());
        }

        let Some(group) = self.member_barrier_group(member_name) else {
            return Ok(());
        };
        let member_root = self.worktree_dir(member_name);
        let barrier_root = self.barrier_worktree_root(group);
        let handoff_root = self.handoff_dir();
        if path.starts_with(&member_root)
            || path.starts_with(&barrier_root)
            || path.starts_with(&handoff_root)
        {
            return Ok(());
        }

        self.record_barrier_violation_attempt(
            member_name,
            &path.display().to_string(),
            &format!("{access} outside barrier group '{group}'"),
        );
        bail!(
            "clean-room barrier violation: '{}' cannot {} '{}'",
            member_name,
            access,
            path.display()
        );
    }
891
    /// Writes a handoff artifact at `relative_path` under the shared handoff
    /// directory on behalf of `author_role`, creating parent directories and
    /// recording a creation event tagged with the content's SHA-256.
    ///
    /// Absolute paths and `..` components are rejected (and recorded as
    /// barrier-violation attempts) so a write cannot escape the handoff root.
    /// Returns the absolute path of the written artifact.
    #[cfg_attr(not(test), allow(dead_code))]
    pub(super) fn write_handoff_artifact(
        &mut self,
        author_role: &str,
        relative_path: &Path,
        content: &[u8],
    ) -> Result<PathBuf> {
        // Path hygiene: must be relative and must not climb out via `..`.
        if relative_path.is_absolute()
            || relative_path
                .components()
                .any(|component| matches!(component, std::path::Component::ParentDir))
        {
            self.record_barrier_violation_attempt(
                author_role,
                &relative_path.display().to_string(),
                "handoff writes must stay within the shared handoff directory",
            );
            bail!(
                "invalid handoff artifact path '{}': must be relative and stay under handoff/",
                relative_path.display()
            );
        }
        let handoff_root = self.handoff_dir();
        let artifact_path = handoff_root.join(relative_path);
        let Some(parent) = artifact_path.parent() else {
            bail!(
                "handoff artifact path '{}' has no parent",
                artifact_path.display()
            );
        };
        std::fs::create_dir_all(parent)
            .with_context(|| format!("failed to create {}", parent.display()))?;
        std::fs::write(&artifact_path, content)
            .with_context(|| format!("failed to write {}", artifact_path.display()))?;

        // Audit trail: record who created what, with a content digest.
        let content_hash = format!("{:x}", Sha256::digest(content));
        self.record_barrier_artifact_created(
            author_role,
            &artifact_path.display().to_string(),
            &content_hash,
        );
        Ok(artifact_path)
    }
935
    /// Writes an analysis artifact at `relative_path` under `author_role`'s
    /// analysis directory (which requires membership in the "analysis" barrier
    /// group — see [`Self::analysis_dir`]), creating parent directories and
    /// recording a creation event tagged with the content's SHA-256.
    ///
    /// Absolute paths and `..` components are rejected (and recorded as
    /// barrier-violation attempts). Returns the absolute artifact path.
    #[cfg_attr(not(test), allow(dead_code))]
    pub(super) fn write_analysis_artifact(
        &mut self,
        author_role: &str,
        relative_path: &Path,
        content: &[u8],
    ) -> Result<PathBuf> {
        // Path hygiene: must be relative and must not climb out via `..`.
        if relative_path.is_absolute()
            || relative_path
                .components()
                .any(|component| matches!(component, std::path::Component::ParentDir))
        {
            self.record_barrier_violation_attempt(
                author_role,
                &relative_path.display().to_string(),
                "analysis artifact writes must stay within the analysis worktree",
            );
            bail!(
                "invalid analysis artifact path '{}': must be relative and stay under analysis/",
                relative_path.display()
            );
        }

        let artifact_path = self.analysis_dir(author_role)?.join(relative_path);
        let Some(parent) = artifact_path.parent() else {
            bail!(
                "analysis artifact path '{}' has no parent",
                artifact_path.display()
            );
        };
        std::fs::create_dir_all(parent)
            .with_context(|| format!("failed to create {}", parent.display()))?;
        std::fs::write(&artifact_path, content)
            .with_context(|| format!("failed to write {}", artifact_path.display()))?;

        // Audit trail: record who created what, with a content digest.
        let content_hash = format!("{:x}", Sha256::digest(content));
        self.record_barrier_artifact_created(
            author_role,
            &artifact_path.display().to_string(),
            &content_hash,
        );
        Ok(artifact_path)
    }
979
    /// Disassembles a ZX Spectrum snapshot (`.z80`/`.sna`) with SkoolKit's
    /// `sna2skool` and stores the tool's stdout as an analysis artifact for
    /// `author_role` at `output_relative_path`.
    ///
    /// The tool binary defaults to `sna2skool` on PATH and can be overridden
    /// with the `BATTY_SKOOLKIT_SNA2SKOOL` environment variable. A non-zero
    /// exit is reported with the tool's (trimmed) stderr.
    #[cfg_attr(not(test), allow(dead_code))]
    pub(super) fn run_skoolkit_disassembly(
        &mut self,
        author_role: &str,
        snapshot_path: &Path,
        output_relative_path: &Path,
    ) -> Result<PathBuf> {
        // Validate the extension up front so we fail with a clear message
        // instead of launching the tool on an unsupported input.
        let snapshot_extension = snapshot_path
            .extension()
            .and_then(|ext| ext.to_str())
            .map(|ext| ext.to_ascii_lowercase());
        if !matches!(snapshot_extension.as_deref(), Some("z80" | "sna")) {
            bail!(
                "unsupported SkoolKit snapshot '{}': expected .z80 or .sna",
                snapshot_path.display()
            );
        }

        let sna2skool =
            std::env::var("BATTY_SKOOLKIT_SNA2SKOOL").unwrap_or_else(|_| "sna2skool".to_string());
        let output = std::process::Command::new(&sna2skool)
            .arg(snapshot_path)
            .output()
            .with_context(|| {
                format!(
                    "failed to launch '{}' for snapshot '{}'",
                    sna2skool,
                    snapshot_path.display()
                )
            })?;
        if !output.status.success() {
            bail!(
                "SkoolKit disassembly failed for '{}': {}",
                snapshot_path.display(),
                String::from_utf8_lossy(&output.stderr).trim()
            );
        }

        // The disassembly listing is whatever the tool wrote to stdout.
        self.write_analysis_artifact(author_role, output_relative_path, &output.stdout)
    }
1020
    /// Disassembles a binary (`.nes`/`.gb`/`.gbc`/`.com`/`.exe`) with Ghidra's
    /// headless analyzer and stores the tool's stdout as an analysis artifact
    /// for `author_role` at `output_relative_path`.
    ///
    /// The analyzer binary defaults to `analyzeHeadless` on PATH and can be
    /// overridden with the `BATTY_GHIDRA_HEADLESS` environment variable. A
    /// non-zero exit is reported with the tool's (trimmed) stderr.
    #[cfg_attr(not(test), allow(dead_code))]
    pub(super) fn run_ghidra_disassembly(
        &mut self,
        author_role: &str,
        binary_path: &Path,
        output_relative_path: &Path,
    ) -> Result<PathBuf> {
        // Reuse backend detection so the supported-extension set stays in one
        // place; only Ghidra-class inputs are accepted here.
        let backend = CleanroomBackend::detect(binary_path)?;
        if backend != CleanroomBackend::Ghidra {
            bail!(
                "unsupported Ghidra target '{}': expected .nes, .gb, .gbc, .com, or .exe",
                binary_path.display()
            );
        }

        let analyze_headless = std::env::var("BATTY_GHIDRA_HEADLESS")
            .unwrap_or_else(|_| "analyzeHeadless".to_string());
        let output = std::process::Command::new(&analyze_headless)
            .arg(binary_path)
            .output()
            .with_context(|| {
                format!(
                    "failed to launch '{}' for target '{}'",
                    analyze_headless,
                    binary_path.display()
                )
            })?;
        if !output.status.success() {
            bail!(
                "Ghidra disassembly failed for '{}': {}",
                binary_path.display(),
                String::from_utf8_lossy(&output.stderr).trim()
            );
        }

        // The disassembly listing is whatever the tool wrote to stdout.
        self.write_analysis_artifact(author_role, output_relative_path, &output.stdout)
    }
1058
1059 #[cfg_attr(not(test), allow(dead_code))]
1060 pub(super) fn run_cleanroom_disassembly(
1061 &mut self,
1062 author_role: &str,
1063 input_path: &Path,
1064 output_relative_path: &Path,
1065 ) -> Result<PathBuf> {
1066 match CleanroomBackend::detect(input_path)? {
1067 CleanroomBackend::SkoolKit => {
1068 self.run_skoolkit_disassembly(author_role, input_path, output_relative_path)
1069 }
1070 CleanroomBackend::Ghidra => {
1071 self.run_ghidra_disassembly(author_role, input_path, output_relative_path)
1072 }
1073 }
1074 }
1075
    /// Reads a handoff artifact at `relative_path` on behalf of `reader_role`,
    /// enforcing relative-path hygiene (no absolute paths or `..`) and the
    /// reader's barrier rules, then recording a read event tagged with the
    /// content's SHA-256. Returns the raw bytes.
    #[cfg_attr(not(test), allow(dead_code))]
    pub(super) fn read_handoff_artifact(
        &mut self,
        reader_role: &str,
        relative_path: &Path,
    ) -> Result<Vec<u8>> {
        // Path hygiene: must be relative and must not climb out via `..`.
        if relative_path.is_absolute()
            || relative_path
                .components()
                .any(|component| matches!(component, std::path::Component::ParentDir))
        {
            self.record_barrier_violation_attempt(
                reader_role,
                &relative_path.display().to_string(),
                "handoff reads must stay within the shared handoff directory",
            );
            bail!(
                "invalid handoff artifact path '{}': must be relative and stay under handoff/",
                relative_path.display()
            );
        }
        let artifact_path = self.handoff_dir().join(relative_path);
        // Second gate: the resolved path must also pass the reader's
        // clean-room barrier rules.
        self.validate_member_barrier_path(reader_role, &artifact_path, "read")?;
        let content = std::fs::read(&artifact_path)
            .with_context(|| format!("failed to read {}", artifact_path.display()))?;
        // Audit trail: record who read what, with a content digest.
        let content_hash = format!("{:x}", Sha256::digest(&content));
        self.record_barrier_artifact_read(
            reader_role,
            &artifact_path.display().to_string(),
            &content_hash,
        );
        Ok(content)
    }
1109
1110 pub(super) fn manager_name(&self, engineer: &str) -> Option<String> {
1111 self.config
1112 .members
1113 .iter()
1114 .find(|member| member.name == engineer)
1115 .and_then(|member| member.reports_to.clone())
1116 }
1117
1118 pub(super) fn architect_names(&self) -> Vec<String> {
1119 self.config
1120 .members
1121 .iter()
1122 .filter(|member| member.role_type == RoleType::Architect)
1123 .map(|member| member.name.clone())
1124 .collect()
1125 }
1126
1127 #[cfg(test)]
1128 pub(super) fn set_active_task_for_test(&mut self, engineer: &str, task_id: u32) {
1129 self.active_tasks.insert(engineer.to_string(), task_id);
1130 }
1131
1132 #[cfg(test)]
1133 pub(super) fn retry_count_for_test(&self, engineer: &str) -> Option<u32> {
1134 self.retry_counts.get(engineer).copied()
1135 }
1136
1137 #[cfg(test)]
1138 pub(super) fn member_state_for_test(&self, engineer: &str) -> Option<MemberState> {
1139 self.states.get(engineer).copied()
1140 }
1141
1142 #[cfg(test)]
1143 pub(super) fn set_member_state_for_test(&mut self, engineer: &str, state: MemberState) {
1144 self.states.insert(engineer.to_string(), state);
1145 }
1146
    /// Test hook: number of merges currently queued in the merge queue.
    #[cfg(test)]
    pub(crate) fn queued_merge_count_for_test(&self) -> usize {
        self.merge_queue.queued_len()
    }
1151
    /// Test hook: drives one pass of merge-queue processing synchronously.
    #[cfg(test)]
    pub(crate) fn process_merge_queue_for_test(&mut self) -> Result<()> {
        self.process_merge_queue()
    }
1156
1157 pub(super) fn increment_retry(&mut self, engineer: &str) -> u32 {
1158 let count = self.retry_counts.entry(engineer.to_string()).or_insert(0);
1159 *count += 1;
1160 *count
1161 }
1162
1163 pub(super) fn response_is_stalled_mid_turn(&self, response: &str) -> bool {
1164 response
1165 .lines()
1166 .next()
1167 .is_some_and(|line| line.contains(STALLED_MID_TURN_MARKER))
1168 }
1169
    /// Absorbs a shim-reported "stalled mid-turn" completion for `member_name`.
    ///
    /// Returns `Ok(false)` when `response` is a normal completion so the
    /// caller handles it as usual. Otherwise returns `Ok(true)` after either:
    /// - waiting out the backoff schedule and re-queuing a retry notice to the
    ///   member (while the schedule still has entries), or
    /// - restarting the member's agent once the schedule is exhausted.
    pub(super) fn handle_stalled_mid_turn_completion(
        &mut self,
        member_name: &str,
        response: &str,
    ) -> Result<bool> {
        if !self.response_is_stalled_mid_turn(response) {
            return Ok(false);
        }

        // Attempts are 1-based; they index into the 30s/60s backoff schedule.
        let attempt = self.increment_retry(member_name);
        if let Some(backoff_secs) = stalled_mid_turn_backoff_secs(attempt) {
            warn!(
                member = member_name,
                attempt,
                backoff_secs,
                "shim reported stalled mid-turn completion; retrying after backoff"
            );
            self.record_orchestrator_action(format!(
                "stall: shim reported stalled mid-turn for {member_name}; retry {attempt} after {backoff_secs}s"
            ));
            // Blocking sleep in production; stubbed to a no-op under cfg(test).
            sleep_stalled_mid_turn_backoff(Duration::from_secs(backoff_secs));

            // Include the original stall report so the member has context and
            // explicitly tell it to resume rather than start over.
            let retry_notice = format!(
                "Claude SDK stalled mid-turn and Batty released the stuck turn. Waited {backoff_secs}s before retrying.\n{response}\n\nContinue from the current worktree state. Do not restart or discard prior work unless the task requires it."
            );
            self.queue_message("daemon", member_name, &retry_notice)?;
            self.mark_member_working(member_name);
            return Ok(true);
        }

        // Backoff schedule exhausted: restart the agent with task context.
        warn!(
            member = member_name,
            attempt, "shim reported stalled mid-turn completion; restarting agent"
        );
        self.record_orchestrator_action(format!(
            "stall: shim reported stalled mid-turn for {member_name}; restarting on attempt {attempt}"
        ));
        self.restart_member_with_task_context(member_name, "stalled mid-turn")?;
        // Only emit the restart event when a task is actually in flight.
        if let Some(task_id) = self.active_task_id(member_name) {
            self.record_agent_restarted(
                member_name,
                task_id.to_string(),
                "stalled_mid_turn",
                attempt,
            );
        }
        Ok(true)
    }
1218
1219 pub(super) fn clear_active_task(&mut self, engineer: &str) {
1220 if let Some(task_id) = self.active_tasks.remove(engineer) {
1221 self.narration_rejection_counts.remove(&task_id);
1222 }
1223 self.retry_counts.remove(engineer);
1224 self.verification_states.remove(engineer);
1225 super::checkpoint::remove_checkpoint(&self.config.project_root, engineer);
1227 let work_dir = self
1228 .config
1229 .members
1230 .iter()
1231 .find(|member| member.name == engineer)
1232 .map(|member| self.member_work_dir(member))
1233 .unwrap_or_else(|| self.config.project_root.clone());
1234 super::checkpoint::remove_restart_context(&work_dir);
1235 }
1236
1237 pub(super) fn notify_reports_to(&mut self, from_role: &str, msg: &str) -> Result<()> {
1239 let parent = self
1240 .config
1241 .members
1242 .iter()
1243 .find(|m| m.name == from_role)
1244 .and_then(|m| m.reports_to.clone());
1245 let Some(parent_name) = parent else {
1246 return Ok(());
1247 };
1248 self.queue_message(from_role, &parent_name, msg)?;
1249 self.mark_member_working(&parent_name);
1250 Ok(())
1251 }
1252
1253 pub(super) fn notify_architects(&mut self, msg: &str) -> Result<()> {
1254 for architect in self.architect_names() {
1255 self.queue_message("daemon", &architect, msg)?;
1256 self.mark_member_working(&architect);
1257 }
1258 Ok(())
1259 }
1260
1261 pub(super) fn update_automation_timers_for_state(
1263 &mut self,
1264 member_name: &str,
1265 new_state: MemberState,
1266 ) {
1267 match new_state {
1268 MemberState::Idle => {
1269 self.idle_started_at
1270 .insert(member_name.to_string(), Instant::now());
1271 }
1272 MemberState::Working => {
1273 self.idle_started_at.remove(member_name);
1274 }
1275 }
1276 self.update_nudge_for_state(member_name, new_state);
1277 standup::update_timer_for_state(
1278 &self.config.team_config,
1279 &self.config.members,
1280 &mut self.paused_standups,
1281 &mut self.last_standup,
1282 member_name,
1283 new_state,
1284 );
1285 self.update_triage_intervention_for_state(member_name, new_state);
1286 }
1287}
1288
1289fn stalled_mid_turn_backoff_secs(attempt: u32) -> Option<u64> {
1290 STALLED_MID_TURN_RETRY_BACKOFF_SECS
1291 .get(attempt.saturating_sub(1) as usize)
1292 .copied()
1293}
1294
/// Production sleep: blocks the current thread for the stalled-mid-turn
/// backoff window before the retry is queued.
#[cfg(not(test))]
fn sleep_stalled_mid_turn_backoff(duration: Duration) {
    std::thread::sleep(duration);
}
1299
/// Test stand-in: skips the real backoff sleep so unit tests run instantly.
#[cfg(test)]
fn sleep_stalled_mid_turn_backoff(_duration: Duration) {}
1302
1303#[cfg(test)]
1304#[path = "daemon/tests.rs"]
1305mod tests;
1306
#[cfg(test)]
mod stalled_mid_turn_tests {
    use super::*;
    use crate::team::inbox;
    use crate::team::test_support::{TestDaemonBuilder, engineer_member, write_owned_task_file};

    /// The retry schedule is fixed: 30s, then 60s, then exhausted.
    #[test]
    fn stalled_mid_turn_backoff_schedule_matches_task_requirements() {
        assert_eq!(stalled_mid_turn_backoff_secs(1), Some(30));
        assert_eq!(stalled_mid_turn_backoff_secs(2), Some(60));
        assert_eq!(stalled_mid_turn_backoff_secs(3), None);
    }

    /// Only a response whose first line carries the marker counts as a stall.
    #[test]
    fn stalled_mid_turn_detection_matches_marker_prefix() {
        let workdir = tempfile::tempdir().unwrap();
        let daemon = TestDaemonBuilder::new(workdir.path()).build();
        let stalled = "stalled mid-turn: no stdout from Claude SDK for 120s while working.";
        assert!(daemon.response_is_stalled_mid_turn(stalled));
        assert!(!daemon.response_is_stalled_mid_turn("normal completion"));
    }

    /// First stall: retry count becomes 1, the member is marked working, and
    /// a single retry notice (mentioning the 30s wait) lands in the inbox.
    #[test]
    fn stalled_mid_turn_first_retry_requeues_message_after_backoff() {
        let workdir = tempfile::tempdir().unwrap();
        let engineer = "eng-1";
        write_owned_task_file(workdir.path(), 42, "sdk-stall", "in-progress", engineer);
        let inbox_root = inbox::inboxes_root(workdir.path());
        inbox::init_inbox(&inbox_root, engineer).unwrap();

        let members = vec![engineer_member(engineer, Some("manager"), true)];
        let mut daemon = TestDaemonBuilder::new(workdir.path()).members(members).build();
        daemon.active_tasks.insert(engineer.to_string(), 42);

        let response = "stalled mid-turn: no stdout from Claude SDK for 120s while working.\nlast_sent_message_from: manager";
        let handled = daemon
            .handle_stalled_mid_turn_completion(engineer, response)
            .unwrap();

        assert!(handled);
        assert_eq!(daemon.retry_count_for_test(engineer), Some(1));
        assert_eq!(
            daemon.member_state_for_test(engineer),
            Some(MemberState::Working)
        );

        let queued = inbox::pending_messages(&inbox_root, engineer).unwrap();
        assert_eq!(queued.len(), 1);
        assert!(queued[0].body.contains("Waited 30s before retrying"));
        assert!(queued[0].body.contains("stalled mid-turn"));
    }
}