// batty_cli/team/daemon.rs
//! Core team daemon: poll loop, lifecycle coordination, and routing.
//!
//! `TeamDaemon` owns the long-running control loop for a Batty team session.
//! It starts and resumes member agents, polls tmux-backed watchers, routes
//! messages across panes, inboxes, and external channels, persists runtime
//! state, and runs periodic automation such as standups and board rotation.
//!
//! Focused subsystems that were extracted from this file stay close to the
//! daemon boundary:
//! - `merge` handles engineer completion, test gating, and merge/escalation
//!   flow once a task is reported done.
//! - `interventions` handles idle nudges and manager/architect intervention
//!   automation without changing the daemon's main control flow.
//!
//! This module remains the integration layer that sequences those subsystems
//! inside each poll iteration.
17
18use std::collections::{HashMap, HashSet};
19use std::fs;
20use std::path::{Path, PathBuf};
21#[cfg(test)]
22use std::time::SystemTime;
23use std::time::{Duration, Instant};
24
25use anyhow::{Context, Result, bail};
26use serde::{Deserialize, Serialize};
27use sha2::{Digest, Sha256};
28use tracing::{debug, info, warn};
29use uuid::Uuid;
30
31use super::board;
32use super::comms::{self, Channel};
33#[cfg(test)]
34use super::config::OrchestratorPosition;
35use super::config::{RoleType, TeamConfig};
36use super::delivery::{FailedDelivery, PendingMessage};
37use super::events::EventSink;
38use super::events::TeamEvent;
39use super::failure_patterns::FailureTracker;
40use super::hierarchy::MemberInstance;
41use super::inbox;
42use super::merge;
43use super::standup::{self, MemberState};
44use super::status;
45use super::task_cmd;
46#[cfg(test)]
47use super::task_loop::next_unclaimed_task;
48use super::task_loop::{
49    branch_is_merged_into, current_worktree_branch, engineer_base_branch_name,
50    preserve_worktree_with_commit, setup_engineer_worktree,
51};
52use super::verification::VerificationState;
53use super::watcher::{SessionWatcher, WatcherState};
54use super::{AssignmentDeliveryResult, AssignmentResultStatus, now_unix, store_assignment_result};
55use crate::agent::{self, BackendHealth};
56use crate::tmux;
57use dispatch::DispatchQueueEntry;
58
/// Marker text identifying the "stalled mid-turn" condition.
// NOTE(review): matched/consumed by poll/health code outside this chunk —
// confirm exact matching semantics there.
const STALLED_MID_TURN_MARKER: &str = "stalled mid-turn";
/// Backoff schedule (seconds) between retries after a stalled-mid-turn
/// detection; two attempts, then presumably escalation — TODO confirm.
const STALLED_MID_TURN_RETRY_BACKOFF_SECS: [u64; 2] = [30, 60];
61
62#[path = "daemon/agent_handle.rs"]
63pub(super) mod agent_handle;
64#[path = "daemon/automation.rs"]
65mod automation;
66#[path = "daemon/config_reload.rs"]
67mod config_reload;
68#[path = "discord_bridge.rs"]
69mod discord_bridge;
70#[path = "dispatch/mod.rs"]
71mod dispatch;
72#[path = "daemon/error_handling.rs"]
73mod error_handling;
74#[path = "daemon/health/mod.rs"]
75mod health;
76#[path = "daemon/helpers.rs"]
77mod helpers;
78#[path = "daemon/hot_reload.rs"]
79mod hot_reload;
80#[path = "daemon/interventions/mod.rs"]
81mod interventions;
82#[path = "launcher.rs"]
83mod launcher;
84#[path = "daemon/merge_queue.rs"]
85mod merge_queue;
86#[path = "daemon/poll.rs"]
87mod poll;
88#[path = "daemon/reconcile.rs"]
89mod reconcile;
90#[cfg(any(test, feature = "scenario-test"))]
91#[path = "daemon/scenario_api.rs"]
92pub mod scenario_api;
93#[path = "daemon/shim_spawn.rs"]
94mod shim_spawn;
95#[path = "daemon/shim_state.rs"]
96mod shim_state;
97#[path = "daemon/spec_gen.rs"]
98mod spec_gen;
99#[path = "daemon/state.rs"]
100mod state;
101#[path = "telegram_bridge.rs"]
102mod telegram_bridge;
103#[path = "daemon/telemetry.rs"]
104pub(crate) mod telemetry;
105#[path = "daemon/tick_report.rs"]
106pub mod tick_report;
107#[path = "daemon/verification.rs"]
108pub(crate) mod verification;
109
110pub(crate) use self::discord_bridge::{
111    build_shutdown_snapshot, send_discord_shutdown_notice, send_discord_shutdown_summary,
112};
113#[cfg(test)]
114use self::dispatch::normalized_assignment_dir;
115pub(crate) use self::error_handling::{optional_subsystem_for_step, optional_subsystem_names};
116use self::helpers::{extract_nudge_section, role_prompt_path};
117use self::hot_reload::consume_hot_reload_marker;
118#[cfg(test)]
119use self::hot_reload::{
120    BinaryFingerprint, hot_reload_daemon_args, hot_reload_marker_path, write_hot_reload_marker,
121};
122pub(crate) use self::interventions::NudgeSchedule;
123use self::interventions::OwnedTaskInterventionState;
124use self::launcher::{
125    duplicate_claude_session_ids, load_launch_state, member_session_tracker_config,
126};
127pub(crate) use self::merge_queue::{MergeQueue, MergeRequest};
128pub use self::state::load_dispatch_queue_snapshot;
129#[cfg(test)]
130use self::state::{
131    PersistedDaemonState, PersistedNudgeState, daemon_state_path, load_daemon_state,
132    save_daemon_state,
133};
134pub(super) use super::delivery::MessageDelivery;
135
/// Daemon configuration derived from TeamConfig.
pub struct DaemonConfig {
    /// Root directory of the project the team works in.
    pub project_root: PathBuf,
    /// Full resolved team configuration.
    pub team_config: TeamConfig,
    /// tmux session name hosting the team panes.
    pub session: String,
    /// Resolved member instances for this session.
    pub members: Vec<MemberInstance>,
    /// Member name -> tmux pane id mapping.
    pub pane_map: HashMap<String, String>,
}
144
145#[derive(Debug, Clone, Copy, PartialEq, Eq)]
146pub(super) enum CleanroomBackend {
147    SkoolKit,
148    Ghidra,
149}
150
151impl CleanroomBackend {
152    fn detect(input_path: &Path) -> Result<Self> {
153        let extension = input_path
154            .extension()
155            .and_then(|ext| ext.to_str())
156            .map(|ext| ext.to_ascii_lowercase());
157        match extension.as_deref() {
158            Some("z80" | "sna") => Ok(Self::SkoolKit),
159            Some("nes" | "gb" | "gbc" | "com" | "exe") => Ok(Self::Ghidra),
160            _ => bail!(
161                "unsupported clean-room input '{}': expected one of .z80, .sna, .nes, .gb, .gbc, .com, or .exe",
162                input_path.display()
163            ),
164        }
165    }
166}
167
/// The running team daemon.
pub struct TeamDaemon {
    /// Resolved configuration: project root, team config, session, pane map.
    pub(super) config: DaemonConfig,
    /// Per-member tmux session watchers, keyed by member name.
    pub(super) watchers: HashMap<String, SessionWatcher>,
    /// Last known member state (e.g. working/idle), keyed by member name.
    pub(super) states: HashMap<String, MemberState>,
    /// When each member was last seen transitioning to idle.
    pub(super) idle_started_at: HashMap<String, Instant>,
    /// Task id currently owned by each engineer.
    pub(super) active_tasks: HashMap<String, u32>,
    /// Per-member retry counters — consumer lives outside this chunk.
    pub(super) retry_counts: HashMap<String, u32>,
    /// Queued dispatch entries awaiting processing (see `dispatch` module).
    pub(super) dispatch_queue: Vec<DispatchQueueEntry>,
    /// Triage bookkeeping per member — presumably idle-epoch counters for the
    /// `interventions` subsystem; confirm against that module.
    pub(super) triage_idle_epochs: HashMap<String, u64>,
    /// Count of triage interventions fired per member.
    pub(super) triage_interventions: HashMap<String, u64>,
    /// Per-member state for owned-task intervention automation.
    pub(super) owned_task_interventions: HashMap<String, OwnedTaskInterventionState>,
    /// Last intervention time per member, used for cooldown enforcement.
    pub(super) intervention_cooldowns: HashMap<String, Instant>,
    /// Outbound channels for user roles, keyed by role name.
    pub(super) channels: HashMap<String, Box<dyn Channel>>,
    /// Nudge schedules keyed by member name (built from role prompt files).
    pub(super) nudges: HashMap<String, NudgeSchedule>,
    /// Discord bot for inbound polling and event mirroring (if configured).
    pub(super) discord_bot: Option<super::discord::DiscordBot>,
    /// Index of the next event-log entry to mirror to Discord; initialized
    /// past the backlog so only post-boot events are sent.
    pub(super) discord_event_cursor: usize,
    /// Telegram bot for inbound polling (if configured).
    pub(super) telegram_bot: Option<super::telegram::TelegramBot>,
    /// Rolling window of recent failures (sized 20 at construction).
    pub(super) failure_tracker: FailureTracker,
    /// Append-only JSONL team event log.
    pub(super) event_sink: EventSink,
    /// Members whose standups are currently paused.
    pub(super) paused_standups: HashSet<String>,
    /// Last standup time per member.
    pub(super) last_standup: HashMap<String, Instant>,
    /// Periodic-automation timers (board rotation, archiving, dispatch).
    pub(super) last_board_rotation: Instant,
    pub(super) last_auto_archive: Instant,
    pub(super) last_auto_dispatch: Instant,
    /// Latch + last fire time for the pipeline-starvation alert.
    pub(super) pipeline_starvation_fired: bool,
    pub(super) pipeline_starvation_last_fired: Option<Instant>,
    /// Planning-cycle automation bookkeeping.
    pub(super) planning_cycle_last_fired: Option<Instant>,
    pub(super) planning_cycle_active: bool,
    /// Whether the retrospective has been generated — consumer outside view.
    pub(super) retro_generated: bool,
    /// Deliveries that failed; retained for retry/reporting elsewhere.
    pub(super) failed_deliveries: Vec<FailedDelivery>,
    /// First-seen unix timestamp per review task id — TODO confirm units.
    pub(super) review_first_seen: HashMap<u32, u64>,
    /// Review task ids for which a nudge was already sent.
    pub(super) review_nudge_sent: HashSet<u32>,
    /// Number of completed poll cycles since startup.
    pub(super) poll_cycle_count: u64,
    /// Delay between poll cycles (defaults to 5s in `new`).
    pub(super) poll_interval: Duration,
    /// Errors recorded during the current `tick()` call. Cleared at the
    /// start of each tick and drained into the returned `TickReport`.
    /// Always populated (cheap: empty `Vec` cost), so non-test builds can
    /// also surface tick-level diagnostics from the future
    /// `batty debug tick` subcommand.
    pub(super) current_tick_errors: Vec<(String, String)>,
    /// Whether the project root itself is a git repository.
    pub(super) is_git_repo: bool,
    /// True when the project root is not a git repo but contains git sub-repos.
    pub(super) is_multi_repo: bool,
    /// Cached list of sub-repo directory names (relative to project root) for multi-repo projects.
    pub(super) sub_repo_names: Vec<String>,
    /// Consecutive error counts per recoverable subsystem name.
    pub(super) subsystem_error_counts: HashMap<String, u32>,
    /// In-memory auto-merge overrides by task id (take priority over disk).
    pub(super) auto_merge_overrides: HashMap<u32, bool>,
    /// Tracks recent (task_id, engineer) dispatch pairs for deduplication.
    pub(super) recent_dispatches: HashMap<(u32, String), Instant>,
    /// Tracks recent escalation keys to suppress repeated alerts.
    pub(super) recent_escalations: HashMap<String, Instant>,
    /// SQLite telemetry database connection (None if open failed).
    pub(super) telemetry_db: Option<rusqlite::Connection>,
    /// Timestamp of the last manual assignment per engineer (for cooldown).
    pub(super) manual_assign_cooldowns: HashMap<String, Instant>,
    /// Per-member agent backend health state.
    pub(super) backend_health: HashMap<String, BackendHealth>,
    /// Rolling capture history used to detect narration loops.
    pub(super) narration_tracker: health::narration::NarrationTracker,
    /// Per-session output tracking used for proactive context-pressure handling.
    pub(super) context_pressure_tracker: health::context::ContextPressureTracker,
    /// When the last periodic health check was run.
    pub(super) last_health_check: Instant,
    /// Rate-limiting: last time each engineer received an uncommitted-work warning.
    pub(super) last_uncommitted_warn: HashMap<String, Instant>,
    /// Last time the daemon checked for stale per-worktree cargo targets to prune.
    pub(super) last_shared_target_cleanup: Instant,
    /// Last time the daemon ran a full disk hygiene pass.
    pub(super) last_disk_hygiene_check: Instant,
    /// Per-engineer completion verification loop state.
    pub(super) verification_states: HashMap<String, VerificationState>,
    /// Tracks consecutive narration-only rejections per task (commits exist
    /// but the branch still has no file diff). After threshold, escalates.
    pub(super) narration_rejection_counts: HashMap<u32, u32>,
    /// Messages deferred because the target agent was still starting.
    /// Drained automatically when the agent transitions to ready.
    pub(super) pending_delivery_queue: HashMap<String, Vec<PendingMessage>>,
    /// Per-agent shim handles (only populated when `use_shim` is true).
    pub(super) shim_handles: HashMap<String, agent_handle::AgentHandle>,
    /// When the last shim health check (Ping) was sent.
    pub(super) last_shim_health_check: Instant,
    /// Serial daemon-owned merge queue for auto-merge execution.
    pub(super) merge_queue: MergeQueue,
}
254
#[cfg(any(test, feature = "scenario-test"))]
impl TeamDaemon {
    /// Acquire the scenario framework's test-API hooks. Gated by
    /// `#[cfg(any(test, feature = "scenario-test"))]` so it does not
    /// exist in release builds. Borrows the daemon mutably for the
    /// lifetime of the returned hooks. See
    /// [`scenario_api::ScenarioHooks`](crate::team::daemon::scenario_api::ScenarioHooks)
    /// for the list of supported operations.
    pub fn scenario_hooks(&mut self) -> scenario_api::ScenarioHooks<'_> {
        scenario_api::ScenarioHooks::new(self)
    }
}
266
267impl TeamDaemon {
    /// Escalate a failed worktree auto-save: block the affected task (when
    /// known), notify the member and their assignment sender, and record the
    /// action in the orchestrator log.
    ///
    /// `context` names the operation that needed the save (e.g. restart);
    /// `detail` carries the underlying error text.
    pub(in crate::team) fn report_preserve_failure(
        &mut self,
        member_name: &str,
        task_id: Option<u32>,
        context: &str,
        detail: &str,
    ) {
        let reason = match task_id {
            Some(task_id) => format!(
                "Task #{task_id} is blocked because Batty could not safely auto-save {member_name}'s dirty worktree before {context}. {detail}"
            ),
            None => format!(
                "Batty could not safely auto-save {member_name}'s dirty worktree before {context}. {detail}"
            ),
        };

        // Deduplicate repeated preserve-failure alerts for the same
        // (member, task, context, detail). Without this, the daemon fires the
        // same alert to engineer + manager on every reconciliation cycle as
        // long as the stale branch condition persists, creating tight
        // acknowledgement loops that flood the inbox without forward progress.
        let detail_digest: u64 = {
            use std::collections::hash_map::DefaultHasher;
            use std::hash::{Hash, Hasher};
            let mut hasher = DefaultHasher::new();
            detail.hash(&mut hasher);
            hasher.finish()
        };
        let task_key = task_id
            .map(|id| id.to_string())
            .unwrap_or_else(|| "-".to_string());
        let dedup_key = format!("preserve:{member_name}:{task_key}:{context}:{detail_digest}");
        // Suppress duplicates within a 10-minute window.
        if self.suppress_recent_escalation(dedup_key, Duration::from_secs(600)) {
            return;
        }

        if let Some(task_id) = task_id {
            if let Err(error) =
                task_cmd::block_task_with_reason(&self.board_dir(), task_id, &reason)
            {
                warn!(
                    member = member_name,
                    task_id,
                    error = %error,
                    "failed to block task after dirty worktree preservation failure"
                );
            }
        }
        // Notifications are best-effort: delivery errors are deliberately
        // ignored so the escalation path itself never fails.
        let manager = self.assignment_sender(member_name);
        let _ = self.queue_daemon_message(member_name, &reason);
        let _ = self.queue_daemon_message(&manager, &reason);
        self.record_orchestrator_action(format!("blocked recovery: {reason}"));
    }
321
322    #[allow(dead_code)]
323    pub(super) fn preserve_member_worktree(
324        &mut self,
325        member_name: &str,
326        commit_message: &str,
327    ) -> bool {
328        let policy = &self.config.team_config.workflow_policy;
329        if !policy.auto_commit_on_restart {
330            return false;
331        }
332
333        let Some(member) = self
334            .config
335            .members
336            .iter()
337            .find(|member| member.name == member_name)
338        else {
339            return false;
340        };
341        if member.role_type != RoleType::Engineer || !member.use_worktrees {
342            return false;
343        }
344
345        let worktree_dir = self
346            .config
347            .project_root
348            .join(".batty")
349            .join("worktrees")
350            .join(member_name);
351        if !worktree_dir.exists() {
352            return false;
353        }
354
355        match preserve_worktree_with_commit(
356            &worktree_dir,
357            commit_message,
358            Duration::from_secs(policy.graceful_shutdown_timeout_secs),
359        ) {
360            Ok(saved) => {
361                if saved {
362                    info!(
363                        member = member_name,
364                        worktree = %worktree_dir.display(),
365                        "auto-saved worktree before restart/shutdown"
366                    );
367                }
368                saved
369            }
370            Err(error) => {
371                warn!(
372                    member = member_name,
373                    worktree = %worktree_dir.display(),
374                    error = %error,
375                    "failed to auto-save worktree before restart/shutdown"
376                );
377                self.report_preserve_failure(
378                    member_name,
379                    self.active_task_id(member_name),
380                    "restart or shutdown",
381                    &error.to_string(),
382                );
383                false
384            }
385        }
386    }
387
388    #[allow(dead_code)]
389    pub(super) fn watcher_mut(&mut self, name: &str) -> Result<&mut SessionWatcher> {
390        self.watchers
391            .get_mut(name)
392            .with_context(|| format!("watcher registry missing member '{name}'"))
393    }
394
    /// Create a new daemon from resolved config and layout.
    ///
    /// Detects the project's git topology (single repo, multi-repo of git
    /// sub-directories, or no git at all), opens the event sink, then builds
    /// per-member watchers, user-role channels, chat bridges, nudge
    /// schedules, and the telemetry database. Most subsystems are
    /// best-effort: failures are logged via `warn!` and the daemon still
    /// starts without them.
    ///
    /// # Errors
    /// Fails only when the event sink cannot be created.
    pub fn new(config: DaemonConfig) -> Result<Self> {
        let is_git_repo = super::git_cmd::is_git_repo(&config.project_root);
        let (is_multi_repo, sub_repo_names) = if is_git_repo {
            (false, Vec::new())
        } else {
            // Not a git repo itself — look for git sub-repos one level down.
            let subs = super::git_cmd::discover_sub_repos(&config.project_root);
            if subs.is_empty() {
                (false, Vec::new())
            } else {
                let names: Vec<String> = subs
                    .iter()
                    .filter_map(|p| p.file_name().map(|n| n.to_string_lossy().to_string()))
                    .collect();
                info!(
                    sub_repos = ?names,
                    "Detected multi-repo project with {} sub-repos",
                    names.len()
                );
                (true, names)
            }
        };
        if !is_git_repo && !is_multi_repo {
            info!("Project is not a git repository \u{2014} git operations disabled");
        }

        let team_config_dir = config.project_root.join(".batty").join("team_config");
        let events_path = team_config_dir.join("events.jsonl");
        let event_sink =
            EventSink::new_with_max_bytes(&events_path, config.team_config.event_log_max_bytes)?;

        // Create watchers for each pane member
        let mut watchers = HashMap::new();
        // A member is considered stale after two standup intervals of silence.
        let stale_secs = config.team_config.standup.interval_secs * 2;
        for (name, pane_id) in &config.pane_map {
            let session_tracker = config
                .members
                .iter()
                .find(|member| member.name == *name)
                .and_then(|member| member_session_tracker_config(&config.project_root, member));
            watchers.insert(
                name.clone(),
                SessionWatcher::new(pane_id, name, stale_secs, session_tracker),
            );
        }

        // Create channels for user roles
        let mut channels: HashMap<String, Box<dyn Channel>> = HashMap::new();
        for role in &config.team_config.roles {
            if role.role_type == RoleType::User {
                if let (Some(ch_type), Some(ch_config)) = (&role.channel, &role.channel_config) {
                    match comms::channel_from_config(ch_type, ch_config) {
                        Ok(ch) => {
                            channels.insert(role.name.clone(), ch);
                        }
                        Err(e) => {
                            // Best-effort: a broken channel config should not
                            // prevent the daemon from starting.
                            warn!(role = %role.name, error = %e, "failed to create channel");
                        }
                    }
                }
            }
        }

        // Create Discord bot for inbound polling and event mirroring (if configured)
        let discord_bot = discord_bridge::build_discord_bot(&config.team_config);
        // Create Telegram bot for inbound polling (if configured)
        let telegram_bot = telegram_bridge::build_telegram_bot(&config.team_config);
        let narration_detection_enabled = config
            .team_config
            .workflow_policy
            .narration_detection_enabled;
        let narration_threshold_polls =
            config.team_config.workflow_policy.narration_threshold_polls;

        let states = HashMap::new();

        // Build nudge schedules from role configs + prompt files
        let mut nudges = HashMap::new();
        for role in &config.team_config.roles {
            if let Some(interval_secs) = role.nudge_interval_secs {
                let prompt_path =
                    role_prompt_path(&team_config_dir, role.prompt.as_deref(), role.role_type);
                if let Some(nudge_text) = extract_nudge_section(&prompt_path) {
                    // Apply nudge to all instances of this role
                    let instance_names: Vec<String> = config
                        .members
                        .iter()
                        .filter(|m| m.role_name == role.name)
                        .map(|m| m.name.clone())
                        .collect();
                    for name in instance_names {
                        info!(member = %name, interval_secs, "registered nudge");
                        nudges.insert(
                            name,
                            NudgeSchedule {
                                text: nudge_text.clone(),
                                interval: Duration::from_secs(interval_secs),
                                // All roles start idle, so begin the countdown
                                idle_since: Some(Instant::now()),
                                fired_this_idle: false,
                                paused: false,
                            },
                        );
                    }
                }
            }
        }

        // Open telemetry database (best-effort — log and continue if it fails).
        let telemetry_db = match super::telemetry_db::open(&config.project_root) {
            Ok(conn) => {
                info!("telemetry database opened");
                Some(conn)
            }
            Err(error) => {
                warn!(error = %error, "failed to open telemetry database; telemetry disabled");
                None
            }
        };

        let context_pressure_threshold = config
            .team_config
            .workflow_policy
            .context_pressure_threshold;
        let context_pressure_threshold_bytes = config
            .team_config
            .workflow_policy
            .context_pressure_threshold_bytes;

        Ok(Self {
            config,
            watchers,
            states,
            idle_started_at: HashMap::new(),
            active_tasks: HashMap::new(),
            retry_counts: HashMap::new(),
            dispatch_queue: Vec::new(),
            triage_idle_epochs: HashMap::new(),
            triage_interventions: HashMap::new(),
            owned_task_interventions: HashMap::new(),
            intervention_cooldowns: HashMap::new(),
            channels,
            nudges,
            discord_bot,
            // Skip backlog on startup — only send events that happen after boot.
            // Reading the entire history on first start hammers Discord API with
            // hundreds of events and triggers rate limits (429).
            discord_event_cursor: crate::team::events::read_events(event_sink.path())
                .map(|events| events.len())
                .unwrap_or(0),
            telegram_bot,
            failure_tracker: FailureTracker::new(20),
            event_sink,
            paused_standups: HashSet::new(),
            last_standup: HashMap::new(),
            last_board_rotation: Instant::now(),
            last_auto_archive: Instant::now(),
            last_auto_dispatch: Instant::now(),
            pipeline_starvation_fired: false,
            pipeline_starvation_last_fired: None,
            planning_cycle_last_fired: None,
            planning_cycle_active: false,
            retro_generated: false,
            failed_deliveries: Vec::new(),
            review_first_seen: HashMap::new(),
            review_nudge_sent: HashSet::new(),
            poll_cycle_count: 0,
            current_tick_errors: Vec::new(),
            poll_interval: Duration::from_secs(5),
            is_git_repo,
            is_multi_repo,
            sub_repo_names,
            subsystem_error_counts: HashMap::new(),
            auto_merge_overrides: HashMap::new(),
            recent_dispatches: HashMap::new(),
            recent_escalations: HashMap::new(),
            telemetry_db,
            manual_assign_cooldowns: HashMap::new(),
            backend_health: HashMap::new(),
            narration_tracker: health::narration::NarrationTracker::new(
                narration_detection_enabled,
                narration_threshold_polls,
            ),
            context_pressure_tracker: health::context::ContextPressureTracker::new(
                context_pressure_threshold,
                context_pressure_threshold_bytes,
            ),
            // Start far enough in the past to trigger an immediate check.
            last_health_check: Instant::now() - Duration::from_secs(3600),
            last_uncommitted_warn: HashMap::new(),
            last_shared_target_cleanup: Instant::now() - Duration::from_secs(3600),
            last_disk_hygiene_check: Instant::now() - Duration::from_secs(3600),
            verification_states: HashMap::new(),
            narration_rejection_counts: HashMap::new(),
            pending_delivery_queue: HashMap::new(),
            shim_handles: HashMap::new(),
            last_shim_health_check: Instant::now(),
            merge_queue: MergeQueue::default(),
        })
    }
595
596    pub(crate) fn suppress_recent_escalation(
597        &mut self,
598        key: impl Into<String>,
599        window: Duration,
600    ) -> bool {
601        let now = Instant::now();
602        self.recent_escalations
603            .retain(|_, seen_at| now.duration_since(*seen_at) < window);
604
605        let key = key.into();
606        if self.recent_escalations.contains_key(&key) {
607            return true;
608        }
609
610        self.recent_escalations.insert(key, now);
611        false
612    }
613
614    pub(super) fn member_nudge_text(&self, member: &MemberInstance) -> Option<String> {
615        let prompt_path = role_prompt_path(
616            &super::team_config_dir(&self.config.project_root),
617            member.prompt.as_deref(),
618            member.role_type,
619        );
620        extract_nudge_section(&prompt_path)
621    }
622
623    pub(super) fn prepend_member_nudge(
624        &self,
625        member: &MemberInstance,
626        body: impl AsRef<str>,
627    ) -> String {
628        let body = body.as_ref();
629        match self.member_nudge_text(member) {
630            Some(nudge) => format!("{nudge}\n\n{body}"),
631            None => body.to_string(),
632        }
633    }
634
    /// Record that `member_name` is actively working: set state, activate
    /// the watcher, and refresh automation timers. No-op for shim agents,
    /// whose state is owned by the shim (see below).
    pub(super) fn mark_member_working(&mut self, member_name: &str) {
        // When shim mode is active, the shim is the single source of truth for
        // agent state. Speculative mark_member_working calls from delivery,
        // completion, and interventions must not override the shim's verdict —
        // doing so causes the daemon to permanently think the agent is "working"
        // when the shim classifier sees it as idle.
        if self.shim_handles.contains_key(member_name) {
            return;
        }
        self.states
            .insert(member_name.to_string(), MemberState::Working);
        if let Some(watcher) = self.watchers.get_mut(member_name) {
            watcher.activate();
        }
        self.update_automation_timers_for_state(member_name, MemberState::Working);
    }
651
    /// Record that `member_name` is idle: set state, deactivate the watcher,
    /// and refresh automation timers. For shim agents the state itself is
    /// never overridden (see below), only the timers are refreshed — and
    /// only once the shim already reports the agent as idle.
    pub(super) fn set_member_idle(&mut self, member_name: &str) {
        // For shim agents: don't override the state (shim is source of truth),
        // but DO update automation timers so idle_started_at gets populated.
        // Without this, interventions like review_backlog never fire because
        // automation_idle_grace_elapsed returns false.
        if self.shim_handles.contains_key(member_name) {
            if self.states.get(member_name) == Some(&MemberState::Idle) {
                self.update_automation_timers_for_state(member_name, MemberState::Idle);
            }
            return;
        }
        self.states
            .insert(member_name.to_string(), MemberState::Idle);
        if let Some(watcher) = self.watchers.get_mut(member_name) {
            watcher.deactivate();
        }
        self.update_automation_timers_for_state(member_name, MemberState::Idle);
    }
670
671    pub(super) fn active_task_id(&self, engineer: &str) -> Option<u32> {
672        self.active_tasks.get(engineer).copied()
673    }
674
    /// Best-effort auto-save of a dirty worktree at `worktree_dir` before a
    /// restart, gated by the `auto_commit_on_restart` policy. Only applies
    /// to engineers that use worktrees. Commit failures are escalated via
    /// `report_preserve_failure`; a clean worktree is a silent no-op.
    ///
    /// `reason` is only used for logging/escalation context.
    pub(super) fn preserve_worktree_before_restart(
        &mut self,
        member_name: &str,
        worktree_dir: &Path,
        reason: &str,
    ) {
        let Some(member) = self
            .config
            .members
            .iter()
            .find(|member| member.name == member_name)
        else {
            return;
        };
        if member.role_type != RoleType::Engineer || !member.use_worktrees {
            return;
        }
        if !self
            .config
            .team_config
            .workflow_policy
            .auto_commit_on_restart
            || !worktree_dir.exists()
        {
            return;
        }

        let timeout = Duration::from_secs(
            self.config
                .team_config
                .workflow_policy
                .graceful_shutdown_timeout_secs,
        );
        match preserve_worktree_with_commit(
            worktree_dir,
            "wip: auto-save before restart [batty]",
            timeout,
        ) {
            Ok(true) => info!(
                member = member_name,
                worktree = %worktree_dir.display(),
                reason,
                "auto-saved dirty worktree before restart"
            ),
            // Ok(false): nothing to save — stay quiet.
            Ok(false) => {}
            Err(error) => {
                warn!(
                    member = member_name,
                    worktree = %worktree_dir.display(),
                    reason,
                    error = %error,
                    "failed to auto-save dirty worktree before restart"
                );
                self.report_preserve_failure(
                    member_name,
                    self.active_task_id(member_name),
                    reason,
                    &error.to_string(),
                );
            }
        }
    }
737
738    pub(super) fn project_root(&self) -> &Path {
739        &self.config.project_root
740    }
741
742    #[cfg(test)]
743    pub(super) fn set_auto_merge_override(&mut self, task_id: u32, enabled: bool) {
744        self.auto_merge_overrides.insert(task_id, enabled);
745    }
746
747    pub(super) fn auto_merge_override(&self, task_id: u32) -> Option<bool> {
748        // In-memory overrides take priority, then check disk
749        if let Some(&value) = self.auto_merge_overrides.get(&task_id) {
750            return Some(value);
751        }
752        let disk_overrides = super::auto_merge::load_overrides(&self.config.project_root);
753        disk_overrides.get(&task_id).copied()
754    }
755
756    pub(super) fn worktree_dir(&self, engineer: &str) -> PathBuf {
757        let base = self.config.project_root.join(".batty").join("worktrees");
758        match self.member_barrier_group(engineer) {
759            Some(group) if self.config.team_config.workflow_policy.clean_room_mode => {
760                base.join(group).join(engineer)
761            }
762            _ => base.join(engineer),
763        }
764    }
765
766    pub(super) fn board_dir(&self) -> PathBuf {
767        self.config
768            .project_root
769            .join(".batty")
770            .join("team_config")
771            .join("board")
772    }
773
774    pub(super) fn member_uses_worktrees(&self, engineer: &str) -> bool {
775        if !self.is_git_repo && !self.is_multi_repo {
776            return false;
777        }
778        self.config
779            .members
780            .iter()
781            .find(|member| member.name == engineer)
782            .map(|member| member.use_worktrees)
783            .unwrap_or(false)
784    }
785
786    pub(super) fn handoff_dir(&self) -> PathBuf {
787        self.config.project_root.join(
788            self.config
789                .team_config
790                .workflow_policy
791                .handoff_directory
792                .as_str(),
793        )
794    }
795
796    pub(super) fn member_barrier_group(&self, member_name: &str) -> Option<&str> {
797        let member = self
798            .config
799            .members
800            .iter()
801            .find(|member| member.name == member_name)?;
802        self.config
803            .team_config
804            .role_barrier_group(&member.role_name)
805    }
806
807    fn barrier_worktree_root(&self, barrier_group: &str) -> PathBuf {
808        self.config
809            .project_root
810            .join(".batty")
811            .join("worktrees")
812            .join(barrier_group)
813    }
814
815    #[cfg_attr(not(test), allow(dead_code))]
816    pub(super) fn analysis_dir(&self, member_name: &str) -> Result<PathBuf> {
817        let Some(group) = self.member_barrier_group(member_name) else {
818            bail!(
819                "member '{}' is not assigned to a clean-room barrier group",
820                member_name
821            );
822        };
823        if group != "analysis" {
824            bail!(
825                "member '{}' is in barrier group '{}' and cannot write analysis artifacts",
826                member_name,
827                group
828            );
829        }
830
831        Ok(self.worktree_dir(member_name).join("analysis"))
832    }
833
834    pub(super) fn validate_member_work_dir(
835        &self,
836        member_name: &str,
837        work_dir: &Path,
838    ) -> Result<()> {
839        if !self.config.team_config.workflow_policy.clean_room_mode {
840            return Ok(());
841        }
842
843        let expected = self.worktree_dir(member_name);
844        if work_dir == expected {
845            return Ok(());
846        }
847        bail!(
848            "clean-room barrier violation: member '{}' launch dir '{}' does not match '{}'",
849            member_name,
850            work_dir.display(),
851            expected.display()
852        );
853    }
854
855    #[cfg_attr(not(test), allow(dead_code))]
856    pub(super) fn validate_member_barrier_path(
857        &mut self,
858        member_name: &str,
859        path: &Path,
860        access: &str,
861    ) -> Result<()> {
862        if !self.config.team_config.workflow_policy.clean_room_mode {
863            return Ok(());
864        }
865
866        let Some(group) = self.member_barrier_group(member_name) else {
867            return Ok(());
868        };
869        let member_root = self.worktree_dir(member_name);
870        let barrier_root = self.barrier_worktree_root(group);
871        let handoff_root = self.handoff_dir();
872        if path.starts_with(&member_root)
873            || path.starts_with(&barrier_root)
874            || path.starts_with(&handoff_root)
875        {
876            return Ok(());
877        }
878
879        self.record_barrier_violation_attempt(
880            member_name,
881            &path.display().to_string(),
882            &format!("{access} outside barrier group '{group}'"),
883        );
884        bail!(
885            "clean-room barrier violation: '{}' cannot {} '{}'",
886            member_name,
887            access,
888            path.display()
889        );
890    }
891
892    #[cfg_attr(not(test), allow(dead_code))]
893    pub(super) fn write_handoff_artifact(
894        &mut self,
895        author_role: &str,
896        relative_path: &Path,
897        content: &[u8],
898    ) -> Result<PathBuf> {
899        if relative_path.is_absolute()
900            || relative_path
901                .components()
902                .any(|component| matches!(component, std::path::Component::ParentDir))
903        {
904            self.record_barrier_violation_attempt(
905                author_role,
906                &relative_path.display().to_string(),
907                "handoff writes must stay within the shared handoff directory",
908            );
909            bail!(
910                "invalid handoff artifact path '{}': must be relative and stay under handoff/",
911                relative_path.display()
912            );
913        }
914        let handoff_root = self.handoff_dir();
915        let artifact_path = handoff_root.join(relative_path);
916        let Some(parent) = artifact_path.parent() else {
917            bail!(
918                "handoff artifact path '{}' has no parent",
919                artifact_path.display()
920            );
921        };
922        std::fs::create_dir_all(parent)
923            .with_context(|| format!("failed to create {}", parent.display()))?;
924        std::fs::write(&artifact_path, content)
925            .with_context(|| format!("failed to write {}", artifact_path.display()))?;
926
927        let content_hash = format!("{:x}", Sha256::digest(content));
928        self.record_barrier_artifact_created(
929            author_role,
930            &artifact_path.display().to_string(),
931            &content_hash,
932        );
933        Ok(artifact_path)
934    }
935
936    #[cfg_attr(not(test), allow(dead_code))]
937    pub(super) fn write_analysis_artifact(
938        &mut self,
939        author_role: &str,
940        relative_path: &Path,
941        content: &[u8],
942    ) -> Result<PathBuf> {
943        if relative_path.is_absolute()
944            || relative_path
945                .components()
946                .any(|component| matches!(component, std::path::Component::ParentDir))
947        {
948            self.record_barrier_violation_attempt(
949                author_role,
950                &relative_path.display().to_string(),
951                "analysis artifact writes must stay within the analysis worktree",
952            );
953            bail!(
954                "invalid analysis artifact path '{}': must be relative and stay under analysis/",
955                relative_path.display()
956            );
957        }
958
959        let artifact_path = self.analysis_dir(author_role)?.join(relative_path);
960        let Some(parent) = artifact_path.parent() else {
961            bail!(
962                "analysis artifact path '{}' has no parent",
963                artifact_path.display()
964            );
965        };
966        std::fs::create_dir_all(parent)
967            .with_context(|| format!("failed to create {}", parent.display()))?;
968        std::fs::write(&artifact_path, content)
969            .with_context(|| format!("failed to write {}", artifact_path.display()))?;
970
971        let content_hash = format!("{:x}", Sha256::digest(content));
972        self.record_barrier_artifact_created(
973            author_role,
974            &artifact_path.display().to_string(),
975            &content_hash,
976        );
977        Ok(artifact_path)
978    }
979
980    #[cfg_attr(not(test), allow(dead_code))]
981    pub(super) fn run_skoolkit_disassembly(
982        &mut self,
983        author_role: &str,
984        snapshot_path: &Path,
985        output_relative_path: &Path,
986    ) -> Result<PathBuf> {
987        let snapshot_extension = snapshot_path
988            .extension()
989            .and_then(|ext| ext.to_str())
990            .map(|ext| ext.to_ascii_lowercase());
991        if !matches!(snapshot_extension.as_deref(), Some("z80" | "sna")) {
992            bail!(
993                "unsupported SkoolKit snapshot '{}': expected .z80 or .sna",
994                snapshot_path.display()
995            );
996        }
997
998        let sna2skool =
999            std::env::var("BATTY_SKOOLKIT_SNA2SKOOL").unwrap_or_else(|_| "sna2skool".to_string());
1000        let output = std::process::Command::new(&sna2skool)
1001            .arg(snapshot_path)
1002            .output()
1003            .with_context(|| {
1004                format!(
1005                    "failed to launch '{}' for snapshot '{}'",
1006                    sna2skool,
1007                    snapshot_path.display()
1008                )
1009            })?;
1010        if !output.status.success() {
1011            bail!(
1012                "SkoolKit disassembly failed for '{}': {}",
1013                snapshot_path.display(),
1014                String::from_utf8_lossy(&output.stderr).trim()
1015            );
1016        }
1017
1018        self.write_analysis_artifact(author_role, output_relative_path, &output.stdout)
1019    }
1020
1021    #[cfg_attr(not(test), allow(dead_code))]
1022    pub(super) fn run_ghidra_disassembly(
1023        &mut self,
1024        author_role: &str,
1025        binary_path: &Path,
1026        output_relative_path: &Path,
1027    ) -> Result<PathBuf> {
1028        let backend = CleanroomBackend::detect(binary_path)?;
1029        if backend != CleanroomBackend::Ghidra {
1030            bail!(
1031                "unsupported Ghidra target '{}': expected .nes, .gb, .gbc, .com, or .exe",
1032                binary_path.display()
1033            );
1034        }
1035
1036        let analyze_headless = std::env::var("BATTY_GHIDRA_HEADLESS")
1037            .unwrap_or_else(|_| "analyzeHeadless".to_string());
1038        let output = std::process::Command::new(&analyze_headless)
1039            .arg(binary_path)
1040            .output()
1041            .with_context(|| {
1042                format!(
1043                    "failed to launch '{}' for target '{}'",
1044                    analyze_headless,
1045                    binary_path.display()
1046                )
1047            })?;
1048        if !output.status.success() {
1049            bail!(
1050                "Ghidra disassembly failed for '{}': {}",
1051                binary_path.display(),
1052                String::from_utf8_lossy(&output.stderr).trim()
1053            );
1054        }
1055
1056        self.write_analysis_artifact(author_role, output_relative_path, &output.stdout)
1057    }
1058
1059    #[cfg_attr(not(test), allow(dead_code))]
1060    pub(super) fn run_cleanroom_disassembly(
1061        &mut self,
1062        author_role: &str,
1063        input_path: &Path,
1064        output_relative_path: &Path,
1065    ) -> Result<PathBuf> {
1066        match CleanroomBackend::detect(input_path)? {
1067            CleanroomBackend::SkoolKit => {
1068                self.run_skoolkit_disassembly(author_role, input_path, output_relative_path)
1069            }
1070            CleanroomBackend::Ghidra => {
1071                self.run_ghidra_disassembly(author_role, input_path, output_relative_path)
1072            }
1073        }
1074    }
1075
1076    #[cfg_attr(not(test), allow(dead_code))]
1077    pub(super) fn read_handoff_artifact(
1078        &mut self,
1079        reader_role: &str,
1080        relative_path: &Path,
1081    ) -> Result<Vec<u8>> {
1082        if relative_path.is_absolute()
1083            || relative_path
1084                .components()
1085                .any(|component| matches!(component, std::path::Component::ParentDir))
1086        {
1087            self.record_barrier_violation_attempt(
1088                reader_role,
1089                &relative_path.display().to_string(),
1090                "handoff reads must stay within the shared handoff directory",
1091            );
1092            bail!(
1093                "invalid handoff artifact path '{}': must be relative and stay under handoff/",
1094                relative_path.display()
1095            );
1096        }
1097        let artifact_path = self.handoff_dir().join(relative_path);
1098        self.validate_member_barrier_path(reader_role, &artifact_path, "read")?;
1099        let content = std::fs::read(&artifact_path)
1100            .with_context(|| format!("failed to read {}", artifact_path.display()))?;
1101        let content_hash = format!("{:x}", Sha256::digest(&content));
1102        self.record_barrier_artifact_read(
1103            reader_role,
1104            &artifact_path.display().to_string(),
1105            &content_hash,
1106        );
1107        Ok(content)
1108    }
1109
1110    pub(super) fn manager_name(&self, engineer: &str) -> Option<String> {
1111        self.config
1112            .members
1113            .iter()
1114            .find(|member| member.name == engineer)
1115            .and_then(|member| member.reports_to.clone())
1116    }
1117
1118    pub(super) fn architect_names(&self) -> Vec<String> {
1119        self.config
1120            .members
1121            .iter()
1122            .filter(|member| member.role_type == RoleType::Architect)
1123            .map(|member| member.name.clone())
1124            .collect()
1125    }
1126
    // --- Test-only hooks -------------------------------------------------
    // These accessors/mutators expose daemon internals to unit tests
    // without widening the production API surface.

    /// Test hook: record `engineer` as actively working on `task_id`.
    #[cfg(test)]
    pub(super) fn set_active_task_for_test(&mut self, engineer: &str, task_id: u32) {
        self.active_tasks.insert(engineer.to_string(), task_id);
    }

    /// Test hook: current retry counter for `engineer`, if any.
    #[cfg(test)]
    pub(super) fn retry_count_for_test(&self, engineer: &str) -> Option<u32> {
        self.retry_counts.get(engineer).copied()
    }

    /// Test hook: current tracked state for `engineer`, if any.
    #[cfg(test)]
    pub(super) fn member_state_for_test(&self, engineer: &str) -> Option<MemberState> {
        self.states.get(engineer).copied()
    }

    /// Test hook: overwrite the tracked state for `engineer`.
    #[cfg(test)]
    pub(super) fn set_member_state_for_test(&mut self, engineer: &str, state: MemberState) {
        self.states.insert(engineer.to_string(), state);
    }

    /// Test hook: number of merges currently waiting in the queue.
    #[cfg(test)]
    pub(crate) fn queued_merge_count_for_test(&self) -> usize {
        self.merge_queue.queued_len()
    }

    /// Test hook: drive one pass of merge-queue processing.
    #[cfg(test)]
    pub(crate) fn process_merge_queue_for_test(&mut self) -> Result<()> {
        self.process_merge_queue()
    }
1156
1157    pub(super) fn increment_retry(&mut self, engineer: &str) -> u32 {
1158        let count = self.retry_counts.entry(engineer.to_string()).or_insert(0);
1159        *count += 1;
1160        *count
1161    }
1162
1163    pub(super) fn response_is_stalled_mid_turn(&self, response: &str) -> bool {
1164        response
1165            .lines()
1166            .next()
1167            .is_some_and(|line| line.contains(STALLED_MID_TURN_MARKER))
1168    }
1169
    /// Handle a completion that the shim flagged as stalled mid-turn.
    ///
    /// Returns `Ok(false)` when `response` is not a stall report so the
    /// caller can treat it as a normal completion. For a stall: while the
    /// backoff schedule still has entries, wait out the backoff, re-queue
    /// a retry notice into the member's inbox, and mark the member working
    /// again; once the schedule is exhausted, restart the agent with its
    /// task context instead. Returns `Ok(true)` in both stall paths.
    pub(super) fn handle_stalled_mid_turn_completion(
        &mut self,
        member_name: &str,
        response: &str,
    ) -> Result<bool> {
        if !self.response_is_stalled_mid_turn(response) {
            return Ok(false);
        }

        // Attempts are 1-based; the counter is shared with other retry flows
        // and cleared when the active task is cleared.
        let attempt = self.increment_retry(member_name);
        if let Some(backoff_secs) = stalled_mid_turn_backoff_secs(attempt) {
            warn!(
                member = member_name,
                attempt,
                backoff_secs,
                "shim reported stalled mid-turn completion; retrying after backoff"
            );
            self.record_orchestrator_action(format!(
                "stall: shim reported stalled mid-turn for {member_name}; retry {attempt} after {backoff_secs}s"
            ));
            // No-op in test builds; blocks the poll loop in production.
            sleep_stalled_mid_turn_backoff(Duration::from_secs(backoff_secs));

            // Include the original stall report so the agent keeps its context.
            let retry_notice = format!(
                "Claude SDK stalled mid-turn and Batty released the stuck turn. Waited {backoff_secs}s before retrying.\n{response}\n\nContinue from the current worktree state. Do not restart or discard prior work unless the task requires it."
            );
            self.queue_message("daemon", member_name, &retry_notice)?;
            self.mark_member_working(member_name);
            return Ok(true);
        }

        // Backoff schedule exhausted: escalate to a full agent restart.
        warn!(
            member = member_name,
            attempt, "shim reported stalled mid-turn completion; restarting agent"
        );
        self.record_orchestrator_action(format!(
            "stall: shim reported stalled mid-turn for {member_name}; restarting on attempt {attempt}"
        ));
        self.restart_member_with_task_context(member_name, "stalled mid-turn")?;
        if let Some(task_id) = self.active_task_id(member_name) {
            self.record_agent_restarted(
                member_name,
                task_id.to_string(),
                "stalled_mid_turn",
                attempt,
            );
        }
        Ok(true)
    }
1218
1219    pub(super) fn clear_active_task(&mut self, engineer: &str) {
1220        if let Some(task_id) = self.active_tasks.remove(engineer) {
1221            self.narration_rejection_counts.remove(&task_id);
1222        }
1223        self.retry_counts.remove(engineer);
1224        self.verification_states.remove(engineer);
1225        // Clean up any progress checkpoint left from a prior restart.
1226        super::checkpoint::remove_checkpoint(&self.config.project_root, engineer);
1227        let work_dir = self
1228            .config
1229            .members
1230            .iter()
1231            .find(|member| member.name == engineer)
1232            .map(|member| self.member_work_dir(member))
1233            .unwrap_or_else(|| self.config.project_root.clone());
1234        super::checkpoint::remove_restart_context(&work_dir);
1235    }
1236
    /// Forward `msg` from `from_role` to the member it reports to, if any.
    ///
    /// Looks up the sender's `reports_to` parent in the member config,
    /// queues the message into the parent's inbox, and marks the parent as
    /// working so automation timers treat it as active. A member with no
    /// parent (or an unknown sender) is a silent no-op.
    //
    // NOTE(review): the previous doc comment here described pruning stale
    // active_task entries, which is unrelated to this function's behavior.
    pub(super) fn notify_reports_to(&mut self, from_role: &str, msg: &str) -> Result<()> {
        let parent = self
            .config
            .members
            .iter()
            .find(|m| m.name == from_role)
            .and_then(|m| m.reports_to.clone());
        let Some(parent_name) = parent else {
            return Ok(());
        };
        self.queue_message(from_role, &parent_name, msg)?;
        self.mark_member_working(&parent_name);
        Ok(())
    }
1252
1253    pub(super) fn notify_architects(&mut self, msg: &str) -> Result<()> {
1254        for architect in self.architect_names() {
1255            self.queue_message("daemon", &architect, msg)?;
1256            self.mark_member_working(&architect);
1257        }
1258        Ok(())
1259    }
1260
    /// Update automation countdowns when a member's state changes.
    ///
    /// Keeps the idle-start clock in sync with the new state, then fans the
    /// transition out to the nudge, standup, and triage-intervention
    /// subsystems so each can adjust its own timers.
    pub(super) fn update_automation_timers_for_state(
        &mut self,
        member_name: &str,
        new_state: MemberState,
    ) {
        match new_state {
            MemberState::Idle => {
                // Record when idleness began; idle-duration automation
                // (e.g. nudges) measures from this instant.
                self.idle_started_at
                    .insert(member_name.to_string(), Instant::now());
            }
            MemberState::Working => {
                self.idle_started_at.remove(member_name);
            }
        }
        self.update_nudge_for_state(member_name, new_state);
        standup::update_timer_for_state(
            &self.config.team_config,
            &self.config.members,
            &mut self.paused_standups,
            &mut self.last_standup,
            member_name,
            new_state,
        );
        self.update_triage_intervention_for_state(member_name, new_state);
    }
1287}
1288
1289fn stalled_mid_turn_backoff_secs(attempt: u32) -> Option<u64> {
1290    STALLED_MID_TURN_RETRY_BACKOFF_SECS
1291        .get(attempt.saturating_sub(1) as usize)
1292        .copied()
1293}
1294
/// Production builds really block the current thread for the stall-retry
/// backoff; see the `#[cfg(test)]` twin below for the no-op test stub.
#[cfg(not(test))]
fn sleep_stalled_mid_turn_backoff(duration: Duration) {
    std::thread::sleep(duration);
}
1299
/// Test-build stub: skip the real sleep so stall-retry tests stay fast.
#[cfg(test)]
fn sleep_stalled_mid_turn_backoff(_duration: Duration) {}
1302
1303#[cfg(test)]
1304#[path = "daemon/tests.rs"]
1305mod tests;
1306
#[cfg(test)]
mod stalled_mid_turn_tests {
    use super::*;
    use crate::team::inbox;
    use crate::team::test_support::{TestDaemonBuilder, engineer_member, write_owned_task_file};

    // Pin the backoff schedule: 30s, then 60s, then give up (restart).
    #[test]
    fn stalled_mid_turn_backoff_schedule_matches_task_requirements() {
        assert_eq!(stalled_mid_turn_backoff_secs(1), Some(30));
        assert_eq!(stalled_mid_turn_backoff_secs(2), Some(60));
        assert_eq!(stalled_mid_turn_backoff_secs(3), None);
    }

    // Only the first line of the response is checked for the marker.
    #[test]
    fn stalled_mid_turn_detection_matches_marker_prefix() {
        let tmp = tempfile::tempdir().unwrap();
        let daemon = TestDaemonBuilder::new(tmp.path()).build();
        assert!(daemon.response_is_stalled_mid_turn(
            "stalled mid-turn: no stdout from Claude SDK for 120s while working."
        ));
        assert!(!daemon.response_is_stalled_mid_turn("normal completion"));
    }

    // First stall: retry counter becomes 1, member goes back to Working,
    // and a single retry notice lands in the member's inbox.
    #[test]
    fn stalled_mid_turn_first_retry_requeues_message_after_backoff() {
        let tmp = tempfile::tempdir().unwrap();
        let member_name = "eng-1";
        write_owned_task_file(tmp.path(), 42, "sdk-stall", "in-progress", member_name);
        let inbox_root = inbox::inboxes_root(tmp.path());
        inbox::init_inbox(&inbox_root, member_name).unwrap();

        let mut daemon = TestDaemonBuilder::new(tmp.path())
            .members(vec![engineer_member(member_name, Some("manager"), true)])
            .build();
        daemon.active_tasks.insert(member_name.to_string(), 42);

        let handled = daemon
            .handle_stalled_mid_turn_completion(
                member_name,
                "stalled mid-turn: no stdout from Claude SDK for 120s while working.\nlast_sent_message_from: manager",
            )
            .unwrap();

        assert!(handled);
        assert_eq!(daemon.retry_count_for_test(member_name), Some(1));
        assert_eq!(
            daemon.member_state_for_test(member_name),
            Some(MemberState::Working)
        );

        let inbox_entries = inbox::pending_messages(&inbox_root, member_name).unwrap();
        assert_eq!(inbox_entries.len(), 1);
        assert!(inbox_entries[0].body.contains("Waited 30s before retrying"));
        assert!(inbox_entries[0].body.contains("stalled mid-turn"));
    }
}
1362}