Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::{Read, Write};
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::compress::caps::DropClass;
16use crate::compress::CompressionResult;
17use crate::context::SharedProgressSender;
18use crate::harness::Harness;
19use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
20
21#[cfg(unix)]
22use std::os::unix::process::CommandExt;
23#[cfg(windows)]
24use std::os::windows::process::CommandExt;
25
26use super::buffer::{combine_streams, BgBuffer, StreamKind, TokenCountInput};
27use super::output::{
28    cap_completion_output, cap_completion_output_with_marker, cap_final_output,
29    cap_final_output_with_marker, json_output_pointer, quote_path, COMPLETION_OUTPUT_PREVIEW_BYTES,
30    COMPRESS_INPUT_CAP_BYTES, COMPRESS_INPUT_HEAD_BYTES, COMPRESS_INPUT_TAIL_BYTES,
31    FINAL_OUTPUT_CAP_BYTES, RAW_PASSTHROUGH_CAP_BYTES, RAW_PASSTHROUGH_HEAD_BYTES,
32    RAW_PASSTHROUGH_TAIL_BYTES, RUNNING_OUTPUT_PREVIEW_BYTES, STRUCTURED_OUTPUT_CAP_BYTES,
33};
34use super::persistence::{
35    create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
36    task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
37    ExitMarker, PersistedTask, TaskPaths,
38};
39use super::process::is_process_alive;
40#[cfg(unix)]
41use super::process::terminate_pgid;
42#[cfg(windows)]
43use super::process::terminate_pid;
44use super::pty_process::spawn_pty_for_command;
45use super::pty_runtime::PtyRuntime;
46use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
47use super::{BgTaskInfo, BgTaskStatus};
48// Note: `resolve_windows_shell` is no longer imported at module scope —
49// production code in `spawn_detached_child` uses `shell_candidates()`
50// with retry instead, and the function remains in `windows_shell.rs`
51// for tests and as a future helper.
52
/// Default timeout for background bash tasks: 30 minutes.
/// Agents can override per-call via the `timeout` parameter (in ms).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// 24 hours.
/// NOTE(review): presumably the age after which a task still recorded as
/// running is considered stale — the consumer is outside this view; confirm.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
/// 24 hours.
/// NOTE(review): presumably the grace period before persisted task bundles
/// become eligible for garbage collection — confirm against the GC code.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
/// 30 days.
/// NOTE(review): presumably the grace period before quarantined bundles are
/// garbage-collected — confirm against the GC code.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// Completion previews are derived from the per-task terminal render cache using
/// a small char-boundary-safe head+tail cap. Keep this bounded: completion
/// reminders may batch multiple tasks into one prompt injection.
const BG_COMPLETION_PREVIEW_BYTES: usize = COMPLETION_OUTPUT_PREVIEW_BYTES;
/// Per-stream tokenization cap: 128 KiB. `BgCompletion` documents that token
/// counts are omitted (and `tokens_skipped` is set) when a stream exceeds it.
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
65
/// Serializable record describing one finished background bash task.
///
/// Instances are queued in `RegistryInner::completions`. Optional fields are
/// skipped during serialization when empty/false/`None`, so consumers must
/// treat their absence as the default value.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the task this completion belongs to.
    pub task_id: String,
    /// Intentionally omitted from serialized completion payloads: push frames
    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Status of the task at completion time.
    pub status: BgTaskStatus,
    /// Process exit code, when one is known.
    pub exit_code: Option<i32>,
    /// The command line the task was started with.
    pub command: String,
    /// Small head+tail preview of the cached terminal render at completion time,
    /// cached so push-frame consumers and `bash_drain_completions` callers see
    /// the same preview without racing against later output rotation. Empty
    /// when not captured (e.g., persisted task seen on startup before buffer
    /// reattachment).
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// True when the captured preview is shorter than the actual output (because
    /// rotation occurred or the output exceeds the preview cap). Plugins use
    /// this to render a `…` prefix and signal that `bash_status` would return
    /// more.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
    /// Token count for raw stdout+stderr before compression. Omitted when any
    /// stream exceeds the 128 KiB tokenization cap.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub original_tokens: Option<u32>,
    /// Token count for the compressed output generated from the same capped
    /// raw payload. Omitted when raw tokenization is skipped.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compressed_tokens: Option<u32>,
    /// True when a stream exceeded the tokenization cap and counts are absent.
    #[serde(default, skip_serializing_if = "is_false")]
    pub tokens_skipped: bool,
}
101
/// Serde `skip_serializing_if` predicate: returns `true` exactly when the
/// flag is `false`, so unset boolean fields are left out of the payload.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
105
/// Serializable point-in-time view of a background task.
///
/// The fields of `info` are flattened into the same serialized object as the
/// fields declared here.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Core task info; serialized inline via `#[serde(flatten)]`.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Process exit code, when one is known.
    pub exit_code: Option<i32>,
    /// PID of the spawned child, when one is recorded.
    pub child_pid: Option<u32>,
    /// Working directory the command was started in.
    pub workdir: String,
    /// Preview of the task output.
    pub output_preview: String,
    /// Whether `output_preview` omits part of the output.
    pub output_truncated: bool,
    /// Path of the captured output file, if any.
    pub output_path: Option<String>,
    /// Path of the captured stderr file, if any.
    pub stderr_path: Option<String>,
    /// PTY row count; omitted from serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_rows: Option<u16>,
    /// PTY column count; omitted from serialization when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_cols: Option<u16>,
}
122
/// Which rendering path produced a `TerminalOutputCache`.
///
/// `Raw` is what `render_terminal_output` records for PTY tasks.
/// NOTE(review): the other variants presumably correspond to the
/// compressed and structured render helpers — confirm at their definitions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TerminalOutputKind {
    Compressed,
    Raw,
    Structured,
}
129
/// Rendered output of a task that reached a terminal status.
///
/// Stored in `BgTaskState::terminal_output_cache` by
/// `ensure_terminal_output_cache` and reused on later calls.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TerminalOutputCache {
    /// Rendered preview text (empty for PTY tasks).
    output_preview: String,
    /// Whether the preview omits part of the output.
    output_truncated: bool,
    /// Rendering path that produced this cache entry.
    kind: TerminalOutputKind,
    /// Display string of the buffer's output path, if any.
    output_path: Option<String>,
    /// Display string of the buffer's stderr path, if any.
    stderr_path: Option<String>,
    /// Optional recovery details; `None` for PTY renders.
    recovery: Option<RecoveryContext>,
}
139
/// Bookkeeping about what was dropped or truncated while rendering output.
///
/// NOTE(review): field meanings below are taken from the names and from
/// `has_visible_drop`; the code that populates them is outside this view.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RecoveryContext {
    /// Per-`DropClass` counts of dropped items.
    dropped_by_class: BTreeMap<DropClass, usize>,
    /// Counts as a visible drop in `has_visible_drop`.
    had_inner_drop: bool,
    // presumably gates whether an offset hint may be shown — TODO confirm
    offset_hint_eligible: bool,
    // presumably the first line an offset hint should point at — TODO confirm
    offset_start_line: Option<usize>,
    /// Counts as a visible drop in `has_visible_drop`.
    byte_truncated: bool,
    /// Path of the captured output file, if any.
    output_path: Option<String>,
    /// Path of the captured stderr file, if any.
    stderr_path: Option<String>,
    // presumably whether `stderr_path` should be mentioned — TODO confirm
    include_stderr_path: bool,
}
151
152impl RecoveryContext {
153    fn has_visible_drop(&self) -> bool {
154        self.byte_truncated || self.had_inner_drop || !self.dropped_by_class.is_empty()
155    }
156}
157
/// Handle to the background bash task registry.
///
/// Cloning copies only the `Arc`, so every clone refers to the same
/// `RegistryInner`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    /// Shared registry state.
    pub(crate) inner: Arc<RegistryInner>,
}
162
/// State shared by every clone of `BgTaskRegistry`.
pub(crate) struct RegistryInner {
    /// Registered tasks keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Queue of completion records.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Progress sender handed to `BgTaskRegistry::new`.
    pub(crate) progress_sender: SharedProgressSender,
    // presumably guards one-time watchdog startup — TODO confirm in `start_watchdog`
    watchdog_started: AtomicBool,
    // presumably signals background threads to stop — TODO confirm
    pub(crate) shutdown: AtomicBool,
    /// Long-running reminder toggle; set by `configure_long_running_reminders`
    /// and initialised to `true`.
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Long-running reminder interval in milliseconds; set by
    /// `configure_long_running_reminders` and initialised to 600_000.
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Swapped to `true` by the first session replay so the persisted-task GC
    /// pass is attempted only once.
    persisted_gc_started: AtomicBool,
    /// Test-only counter. NOTE(review): presumably counts GC passes — confirm.
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Output compression callback. Set by `AppContext` after construction.
    /// Takes (command, raw_output) and returns compressed text. Called from
    /// the watchdog thread when a task reaches a terminal state and from
    /// `bash_status`/`list` snapshot reads. When `None`, output is returned
    /// uncompressed.
    pub(crate) compressor:
        Mutex<Option<Box<dyn Fn(&str, String) -> CompressionResult + Send + Sync>>>,
    /// Optional DB connection used by `dual_write_task`; `None` disables the
    /// DB mirror.
    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
    /// Harness name used when building DB rows; `dual_write_task` skips the
    /// write (with a warning) while this is `None`.
    pub(crate) db_harness: RwLock<Option<String>>,
    /// Sending half of the wake channel; a clone is handed to each PTY spawn.
    pub(crate) wake_tx: crossbeam_channel::Sender<()>,
    /// Receiving half of the wake channel.
    pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
    /// Registry of output watch patterns.
    pub(crate) watch_registry: Mutex<WatchRegistry>,
}
187
/// One background task tracked by the registry.
pub(crate) struct BgTask {
    /// Unique id of this task.
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk locations of the task bundle (metadata and capture files).
    pub(crate) paths: TaskPaths,
    /// Instant recorded when the task was registered by a spawn call.
    pub(crate) started: Instant,
    /// Starts as `None`. NOTE(review): presumably the time of the most recent
    /// long-running reminder — confirm in the watchdog.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// Starts as `None`. NOTE(review): presumably set when the task reaches a
    /// terminal status — confirm.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable per-task state guarded by its own lock.
    pub(crate) state: Mutex<BgTaskState>,
}
197
/// Live process handle for a task, by spawn mode.
///
/// Both variants wrap an `Option`; `write_pty` treats `Pty(None)` as an
/// already-exited task.
pub(crate) enum TaskRuntime {
    /// Task started through `spawn`, holding the child process handle.
    Piped(Option<Child>),
    /// Task started through `spawn_pty`, holding the PTY runtime.
    Pty(Option<PtyRuntime>),
}
202
/// Mutable state of a `BgTask`, guarded by `BgTask::state`.
pub(crate) struct BgTaskState {
    /// Persisted task record (status, mode, command, …).
    pub(crate) metadata: PersistedTask,
    /// Live process handle for the task.
    pub(crate) runtime: TaskRuntime,
    /// Initialised to `false` by the spawn paths.
    /// NOTE(review): presumably set for tasks this process no longer owns a
    /// handle for — confirm where it is written.
    pub(crate) detached: bool,
    /// True once `reap_child` has observed the direct child handle's exit
    /// via `try_wait()`. Used by the two-pass watchdog to skip the racy
    /// `is_process_alive(child_pid)` probe on the second pass — we already
    /// have authoritative evidence that the child is dead, no need to
    /// re-verify via PID liveness which is unreliable on Windows where
    /// PIDs can be recycled within seconds.
    ///
    /// Remains `false` on replay-restored tasks (those have a `child_pid`
    /// but never observed exit via this process's `try_wait()`), so those
    /// continue to fall through to the `is_process_alive` probe path.
    pub(crate) child_exit_observed: bool,
    /// Reader over the task's capture file(s).
    pub(crate) buffer: BgBuffer,
    /// Rendered terminal output, filled lazily by
    /// `ensure_terminal_output_cache` once the task is terminal.
    terminal_output_cache: Option<TerminalOutputCache>,
    /// PTY-only: set for timeout kill intent before signaling the child.
    pub(crate) pending_terminal_override: Option<BgTaskStatus>,
}
223
224impl BgTaskRegistry {
225    pub fn new(progress_sender: SharedProgressSender) -> Self {
226        let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
227        Self {
228            inner: Arc::new(RegistryInner {
229                tasks: Mutex::new(HashMap::new()),
230                completions: Mutex::new(VecDeque::new()),
231                progress_sender,
232                watchdog_started: AtomicBool::new(false),
233                shutdown: AtomicBool::new(false),
234                long_running_reminder_enabled: AtomicBool::new(true),
235                long_running_reminder_interval_ms: AtomicU64::new(600_000),
236                persisted_gc_started: AtomicBool::new(false),
237                #[cfg(test)]
238                persisted_gc_runs: AtomicU64::new(0),
239                compressor: Mutex::new(None),
240                db_pool: RwLock::new(None),
241                db_harness: RwLock::new(None),
242                wake_tx,
243                wake_rx,
244                watch_registry: Mutex::new(WatchRegistry::default()),
245            }),
246        }
247    }
248
249    pub fn set_harness(&self, harness: Harness) {
250        if let Ok(mut slot) = self.inner.db_harness.write() {
251            *slot = Some(harness.as_str().to_string());
252        }
253    }
254
255    pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
256        if let Ok(mut slot) = self.inner.db_pool.write() {
257            *slot = Some(conn);
258        }
259    }
260
261    pub fn clear_db_pool(&self) {
262        if let Ok(mut slot) = self.inner.db_pool.write() {
263            *slot = None;
264        }
265    }
266
267    /// Install the output-compression callback. Called by `main.rs` after
268    /// `AppContext` is constructed so that snapshot/completion paths can
269    /// invoke `compress::compress_with_registry` without holding a context
270    /// reference. When called multiple times, the latest installation wins.
271    pub fn set_compressor<F>(&self, compressor: F)
272    where
273        F: Fn(&str, String) -> CompressionResult + Send + Sync + 'static,
274    {
275        if let Ok(mut slot) = self.inner.compressor.lock() {
276            *slot = Some(Box::new(compressor));
277        }
278    }
279
280    /// Apply the installed compressor (if any) to `output`. Returns `output`
281    /// untouched when no compressor is installed.
282    pub(crate) fn compress_output(&self, command: &str, output: String) -> CompressionResult {
283        let Ok(slot) = self.inner.compressor.lock() else {
284            return CompressionResult::new(output);
285        };
286        match slot.as_ref() {
287            Some(compressor) => compressor(command, output),
288            None => CompressionResult::new(output),
289        }
290    }
291
292    fn ensure_terminal_output_cache(&self, task: &Arc<BgTask>) -> Option<TerminalOutputCache> {
293        let (metadata, buffer) = {
294            let state = task.state.lock().ok()?;
295            if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
296                return None;
297            }
298            if let Some(cache) = state.terminal_output_cache.clone() {
299                return Some(cache);
300            }
301            (state.metadata.clone(), state.buffer.clone())
302        };
303
304        let mut cap_buffer = buffer.clone();
305        cap_buffer.enforce_terminal_cap();
306        let cache = self.render_terminal_output(&metadata, &buffer);
307        let mut state = task.state.lock().ok()?;
308        if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
309            return None;
310        }
311        if let Some(existing) = state.terminal_output_cache.clone() {
312            return Some(existing);
313        }
314        state.terminal_output_cache = Some(cache.clone());
315        Some(cache)
316    }
317
318    fn render_terminal_output(
319        &self,
320        metadata: &PersistedTask,
321        buffer: &BgBuffer,
322    ) -> TerminalOutputCache {
323        if metadata.mode == BgMode::Pty {
324            return TerminalOutputCache {
325                output_preview: String::new(),
326                output_truncated: false,
327                kind: TerminalOutputKind::Raw,
328                output_path: buffer.output_path().map(|path| path.display().to_string()),
329                stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
330                recovery: None,
331            };
332        }
333
334        if let Some(structured) = render_structured_output(&metadata.command, buffer) {
335            return structured;
336        }
337
338        if !metadata.compressed {
339            return render_raw_passthrough(buffer);
340        }
341
342        let raw = buffer.read_combined_head_tail(
343            COMPRESS_INPUT_CAP_BYTES,
344            COMPRESS_INPUT_HEAD_BYTES,
345            COMPRESS_INPUT_TAIL_BYTES,
346        );
347        let compressed = self.compress_output(&metadata.command, raw.text);
348        render_compressed_with_recovery(buffer, compressed, raw.truncated)
349    }
350
351    fn snapshot_with_terminal_cache(
352        &self,
353        task: &Arc<BgTask>,
354        preview_bytes: usize,
355    ) -> BgTaskSnapshot {
356        let mut snapshot = task.snapshot(preview_bytes);
357        self.maybe_compress_snapshot(task, &mut snapshot);
358        snapshot
359    }
360
361    fn post_terminal_transition(&self, task: &Arc<BgTask>, emit_frame: bool) -> Result<(), String> {
362        let (metadata, buffer) = {
363            let state = task
364                .state
365                .lock()
366                .map_err(|_| "background task lock poisoned".to_string())?;
367            if !state.metadata.status.is_terminal() {
368                return Ok(());
369            }
370            (state.metadata.clone(), state.buffer.clone())
371        };
372
373        let mut cap_buffer = buffer.clone();
374        cap_buffer.enforce_terminal_cap();
375        let cache = self.ensure_terminal_output_cache(task);
376        self.enqueue_completion_from_parts(
377            &metadata,
378            Some(&buffer),
379            None,
380            emit_frame,
381            cache.as_ref(),
382        );
383        Ok(())
384    }
385
386    fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
387        write_task(&paths.json, metadata)?;
388        self.dual_write_task(paths, metadata);
389        Ok(())
390    }
391
392    fn update_task_metadata<F>(
393        &self,
394        paths: &TaskPaths,
395        update: F,
396    ) -> std::io::Result<PersistedTask>
397    where
398        F: FnOnce(&mut PersistedTask),
399    {
400        let metadata = update_task(&paths.json, update)?;
401        self.dual_write_task(paths, &metadata);
402        Ok(metadata)
403    }
404
405    fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
406        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
407        let Some(pool) = pool else {
408            return;
409        };
410        let harness = self
411            .inner
412            .db_harness
413            .read()
414            .ok()
415            .and_then(|slot| slot.clone());
416        let Some(harness) = harness else {
417            crate::slog_warn!(
418                "dual-write bash_task to DB skipped for {}: harness not configured",
419                metadata.task_id
420            );
421            return;
422        };
423        let row = match metadata.to_bash_task_row(&harness, paths) {
424            Ok(row) => row,
425            Err(error) => {
426                crate::slog_warn!(
427                    "dual-write bash_task to DB failed for {}: {}",
428                    metadata.task_id,
429                    error
430                );
431                return;
432            }
433        };
434        let conn = match pool.lock() {
435            Ok(conn) => conn,
436            Err(_) => {
437                crate::slog_warn!(
438                    "dual-write bash_task to DB failed for {}: db mutex poisoned",
439                    metadata.task_id
440                );
441                return;
442            }
443        };
444        if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
445            crate::slog_warn!(
446                "dual-write bash_task to DB failed for {}: {}",
447                metadata.task_id,
448                error
449            );
450        }
451    }
452
453    pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
454        self.inner
455            .long_running_reminder_enabled
456            .store(enabled, Ordering::SeqCst);
457        self.inner
458            .long_running_reminder_interval_ms
459            .store(interval_ms, Ordering::SeqCst);
460    }
461
462    #[cfg(unix)]
463    #[allow(clippy::too_many_arguments)]
464    pub fn spawn(
465        &self,
466        command: &str,
467        session_id: String,
468        workdir: PathBuf,
469        env: HashMap<String, String>,
470        timeout: Option<Duration>,
471        storage_dir: PathBuf,
472        max_running: usize,
473        notify_on_completion: bool,
474        compressed: bool,
475        project_root: Option<PathBuf>,
476    ) -> Result<String, String> {
477        self.start_watchdog();
478
479        let running = self.running_count();
480        if running >= max_running {
481            return Err(format!(
482                "background bash task limit exceeded: {running} running (max {max_running})"
483            ));
484        }
485
486        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
487        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
488        let task_id = self.generate_unique_task_id()?;
489        let paths = task_paths(&storage_dir, &session_id, &task_id);
490        fs::create_dir_all(&paths.dir)
491            .map_err(|e| format!("failed to create background task dir: {e}"))?;
492
493        let mut metadata = PersistedTask::starting(
494            task_id.clone(),
495            session_id.clone(),
496            command.to_string(),
497            workdir.clone(),
498            project_root,
499            timeout_ms,
500            notify_on_completion,
501            compressed,
502        );
503        self.persist_task(&paths, &metadata)
504            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
505
506        // Pre-create capture files so the watchdog/buffer can always
507        // open them for reading. The spawn helper opens its own handles
508        // per attempt because each `Command::spawn()` consumes them.
509        create_capture_file(&paths.stdout)
510            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
511        create_capture_file(&paths.stderr)
512            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
513
514        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
515            Ok(child) => child,
516            Err(error) => {
517                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
518                let _ = delete_task_bundle(&paths);
519                return Err(error);
520            }
521        };
522
523        let child_pid = child.id();
524        metadata.mark_running(child_pid, child_pid as i32);
525        self.persist_task(&paths, &metadata)
526            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
527
528        let task = Arc::new(BgTask {
529            task_id: task_id.clone(),
530            session_id,
531            paths: paths.clone(),
532            started: Instant::now(),
533            last_reminder_at: Mutex::new(None),
534            terminal_at: Mutex::new(None),
535            state: Mutex::new(BgTaskState {
536                metadata,
537                runtime: TaskRuntime::Piped(Some(child)),
538                detached: false,
539                child_exit_observed: false,
540                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
541                terminal_output_cache: None,
542                pending_terminal_override: None,
543            }),
544        });
545
546        self.inner
547            .tasks
548            .lock()
549            .map_err(|_| "background task registry lock poisoned".to_string())?
550            .insert(task_id.clone(), task);
551
552        Ok(task_id)
553    }
554
555    #[allow(clippy::too_many_arguments)]
556    pub fn spawn_pty(
557        &self,
558        command: &str,
559        session_id: String,
560        workdir: PathBuf,
561        env: HashMap<String, String>,
562        timeout: Option<Duration>,
563        storage_dir: PathBuf,
564        max_running: usize,
565        notify_on_completion: bool,
566        compressed: bool,
567        project_root: Option<PathBuf>,
568        rows: u16,
569        cols: u16,
570    ) -> Result<String, String> {
571        self.start_watchdog();
572
573        let running = self.running_count();
574        if running >= max_running {
575            return Err(format!(
576                "background bash task limit exceeded: {running} running (max {max_running})"
577            ));
578        }
579
580        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
581        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
582        let task_id = self.generate_unique_task_id()?;
583        let paths = task_paths(&storage_dir, &session_id, &task_id);
584        fs::create_dir_all(&paths.dir)
585            .map_err(|e| format!("failed to create background task dir: {e}"))?;
586
587        let mut metadata = PersistedTask::starting(
588            task_id.clone(),
589            session_id.clone(),
590            command.to_string(),
591            workdir.clone(),
592            project_root,
593            timeout_ms,
594            notify_on_completion,
595            compressed,
596        );
597        metadata.mode = BgMode::Pty;
598        metadata.pty_rows = Some(rows);
599        metadata.pty_cols = Some(cols);
600        self.persist_task(&paths, &metadata)
601            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
602        create_capture_file(&paths.pty)
603            .map_err(|e| format!("failed to create PTY capture file: {e}"))?;
604
605        let runtime = match spawn_pty_for_command(
606            &task_id,
607            &session_id,
608            command,
609            &paths,
610            &workdir,
611            &env,
612            rows,
613            cols,
614            self.inner.wake_tx.clone(),
615        ) {
616            Ok(runtime) => runtime,
617            Err(error) => {
618                crate::slog_warn!(
619                    "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
620                );
621                let _ = delete_task_bundle(&paths);
622                return Err(error);
623            }
624        };
625
626        if let Some(child_pid) = runtime.child_pid {
627            metadata.mark_running(child_pid, child_pid as i32);
628        } else {
629            metadata.status = BgTaskStatus::Running;
630            metadata.pgid = None;
631        }
632        self.persist_task(&paths, &metadata)
633            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
634
635        let task = Arc::new(BgTask {
636            task_id: task_id.clone(),
637            session_id,
638            paths: paths.clone(),
639            started: Instant::now(),
640            last_reminder_at: Mutex::new(None),
641            terminal_at: Mutex::new(None),
642            state: Mutex::new(BgTaskState {
643                metadata,
644                runtime: TaskRuntime::Pty(Some(runtime)),
645                detached: false,
646                child_exit_observed: false,
647                buffer: BgBuffer::pty(paths.pty.clone()),
648                terminal_output_cache: None,
649                pending_terminal_override: None,
650            }),
651        });
652
653        self.inner
654            .tasks
655            .lock()
656            .map_err(|_| "background task registry lock poisoned".to_string())?
657            .insert(task_id.clone(), task);
658
659        Ok(task_id)
660    }
661
662    #[cfg(windows)]
663    #[allow(clippy::too_many_arguments)]
664    pub fn spawn(
665        &self,
666        command: &str,
667        session_id: String,
668        workdir: PathBuf,
669        env: HashMap<String, String>,
670        timeout: Option<Duration>,
671        storage_dir: PathBuf,
672        max_running: usize,
673        notify_on_completion: bool,
674        compressed: bool,
675        project_root: Option<PathBuf>,
676    ) -> Result<String, String> {
677        self.start_watchdog();
678
679        let running = self.running_count();
680        if running >= max_running {
681            return Err(format!(
682                "background bash task limit exceeded: {running} running (max {max_running})"
683            ));
684        }
685
686        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
687        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
688        let task_id = self.generate_unique_task_id()?;
689        let paths = task_paths(&storage_dir, &session_id, &task_id);
690        fs::create_dir_all(&paths.dir)
691            .map_err(|e| format!("failed to create background task dir: {e}"))?;
692
693        let mut metadata = PersistedTask::starting(
694            task_id.clone(),
695            session_id.clone(),
696            command.to_string(),
697            workdir.clone(),
698            project_root,
699            timeout_ms,
700            notify_on_completion,
701            compressed,
702        );
703        self.persist_task(&paths, &metadata)
704            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
705
706        // Capture files are pre-created so the watchdog/buffer can always
707        // open them for reading even if the child hasn't written anything
708        // yet. The spawn helper opens its own handles per attempt because
709        // each `Command::spawn()` consumes them, and on Windows we may
710        // retry across multiple shell candidates if the first one fails.
711        create_capture_file(&paths.stdout)
712            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
713        create_capture_file(&paths.stderr)
714            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
715
716        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
717            Ok(child) => child,
718            Err(error) => {
719                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
720                let _ = delete_task_bundle(&paths);
721                return Err(error);
722            }
723        };
724
725        let child_pid = child.id();
726        metadata.status = BgTaskStatus::Running;
727        metadata.child_pid = Some(child_pid);
728        metadata.pgid = None;
729        self.persist_task(&paths, &metadata)
730            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
731
732        let task = Arc::new(BgTask {
733            task_id: task_id.clone(),
734            session_id,
735            paths: paths.clone(),
736            started: Instant::now(),
737            last_reminder_at: Mutex::new(None),
738            terminal_at: Mutex::new(None),
739            state: Mutex::new(BgTaskState {
740                metadata,
741                runtime: TaskRuntime::Piped(Some(child)),
742                detached: false,
743                child_exit_observed: false,
744                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
745                terminal_output_cache: None,
746                pending_terminal_override: None,
747            }),
748        });
749
750        self.inner
751            .tasks
752            .lock()
753            .map_err(|_| "background task registry lock poisoned".to_string())?
754            .insert(task_id.clone(), task);
755
756        Ok(task_id)
757    }
758
759    pub fn write_pty(
760        &self,
761        task_id: &str,
762        session_id: &str,
763        input: &[u8],
764    ) -> Result<usize, String> {
765        let task = self
766            .task_for_session(task_id, session_id)
767            .ok_or_else(|| "task_not_found".to_string())?;
768
769        let writer = {
770            let state = task
771                .state
772                .lock()
773                .map_err(|_| "background task lock poisoned".to_string())?;
774            if state.metadata.mode != BgMode::Pty {
775                return Err("task_not_pty".to_string());
776            }
777            if state.metadata.status.is_terminal() {
778                return Err("task_exited".to_string());
779            }
780            match &state.runtime {
781                TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
782                TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
783                TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
784            }
785        };
786
787        let mut writer = writer
788            .lock()
789            .map_err(|_| "PTY writer lock poisoned".to_string())?;
790        writer
791            .write_all(input)
792            .map_err(|error| format!("failed to write to PTY: {error}"))?;
793        writer
794            .flush()
795            .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
796        Ok(input.len())
797    }
798
799    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
800        self.replay_session_inner(storage_dir, session_id, None)
801    }
802
803    pub fn replay_session_for_project(
804        &self,
805        storage_dir: &Path,
806        session_id: &str,
807        project_root: &Path,
808    ) -> Result<(), String> {
809        self.replay_session_inner(storage_dir, session_id, Some(project_root))
810    }
811
812    fn replay_session_inner(
813        &self,
814        storage_dir: &Path,
815        session_id: &str,
816        project_root: Option<&Path>,
817    ) -> Result<(), String> {
818        self.start_watchdog();
819        if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
820            if let Err(error) = self.maybe_gc_persisted(storage_dir) {
821                crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
822            }
823        }
824
825        let canonical_project = project_root.map(canonicalized_path);
826        // Replay strategy: DB is the post-v0.27 source of truth. Disk
827        // fallback handles pre-v0.27 tasks that haven't been migrated and
828        // the cold-start `__default__` namespace (configure runs before any
829        // user session exists, so plugin-init triggers a session-less DB
830        // lookup that will be empty until a real session writes a task).
831        //
832        // We deliberately keep the empty-DB / empty-disk path silent — it's
833        // the normal startup case and would otherwise fire on every configure
834        // (see GitHub user report against v0.27.0). INFO-level logs only when
835        // disk actually returned tasks (real migration signal); WARN when the
836        // DB lookup itself errored.
837        let tasks = match self.replay_session_from_db(session_id) {
838            Some(Ok(tasks)) if !tasks.is_empty() => tasks,
839            Some(Ok(_)) => {
840                let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
841                if !disk_tasks.is_empty() {
842                    crate::slog_info!(
843                        "bash task replay: 0 in DB for session {}, {} from disk fallback",
844                        session_id,
845                        disk_tasks.len()
846                    );
847                }
848                disk_tasks
849            }
850            Some(Err(error)) => {
851                crate::slog_warn!(
852                    "bash task replay DB lookup failed for session {}; falling back to disk: {}",
853                    session_id,
854                    error
855                );
856                self.replay_session_from_disk(storage_dir, session_id)?
857            }
858            None => {
859                // DB pool unconfigured — common in tests + before harness is set.
860                self.replay_session_from_disk(storage_dir, session_id)?
861            }
862        };
863
864        for mut metadata in tasks {
865            if metadata.session_id != session_id {
866                continue;
867            }
868            if let Some(canonical_project) = canonical_project.as_deref() {
869                let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
870                if metadata_project.as_deref() != Some(canonical_project) {
871                    continue;
872                }
873            }
874
875            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
876            match metadata.status {
877                BgTaskStatus::Starting => {
878                    let completion_was_delivered = metadata.completion_delivered;
879                    metadata.mark_terminal(
880                        BgTaskStatus::Failed,
881                        None,
882                        Some("spawn aborted".to_string()),
883                    );
884                    metadata.completion_delivered |= completion_was_delivered;
885                    let _ = self.persist_task(&paths, &metadata);
886                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
887                    self.insert_rehydrated_task(metadata, paths, true)?;
888                }
889                BgTaskStatus::Running | BgTaskStatus::Killing => {
890                    if metadata.mode == BgMode::Pty {
891                        if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
892                            let completion_was_delivered = metadata.completion_delivered;
893                            metadata = terminal_metadata_from_marker(metadata, marker, None);
894                            metadata.completion_delivered |= completion_was_delivered;
895                            let _ = self.persist_task(&paths, &metadata);
896                            self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
897                            self.insert_rehydrated_task(metadata, paths, true)?;
898                        } else if metadata.status.is_terminal() {
899                            self.insert_rehydrated_task(metadata, paths, true)?;
900                        } else {
901                            let completion_was_delivered = metadata.completion_delivered;
902                            metadata.mark_terminal(
903                                BgTaskStatus::Killed,
904                                None,
905                                Some("pty_lost_on_bridge_restart".to_string()),
906                            );
907                            metadata.completion_delivered |= completion_was_delivered;
908                            let _ = self.persist_task(&paths, &metadata);
909                            self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
910                            self.insert_rehydrated_task(metadata, paths, true)?;
911                        }
912                    } else if self.running_metadata_is_stale(&metadata) {
913                        let completion_was_delivered = metadata.completion_delivered;
914                        metadata.mark_terminal(
915                            BgTaskStatus::Killed,
916                            None,
917                            Some("orphaned (>24h)".to_string()),
918                        );
919                        metadata.completion_delivered |= completion_was_delivered;
920                        if !paths.exit.exists() {
921                            let _ = write_kill_marker_if_absent(&paths.exit);
922                        }
923                        let _ = self.persist_task(&paths, &metadata);
924                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
925                        self.insert_rehydrated_task(metadata, paths, true)?;
926                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
927                        let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
928                            "recovered from inconsistent killing state on replay".to_string()
929                        });
930                        if reason.is_some() {
931                            crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
932                            metadata.task_id);
933                        }
934                        let completion_was_delivered = metadata.completion_delivered;
935                        metadata = terminal_metadata_from_marker(metadata, marker, reason);
936                        metadata.completion_delivered |= completion_was_delivered;
937                        let _ = self.persist_task(&paths, &metadata);
938                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
939                        self.insert_rehydrated_task(metadata, paths, true)?;
940                    } else if metadata.status == BgTaskStatus::Killing {
941                        if !paths.exit.exists() {
942                            let _ = write_kill_marker_if_absent(&paths.exit);
943                        }
944                        let completion_was_delivered = metadata.completion_delivered;
945                        metadata.mark_terminal(
946                            BgTaskStatus::Killed,
947                            None,
948                            Some("recovered from inconsistent killing state on replay".to_string()),
949                        );
950                        metadata.completion_delivered |= completion_was_delivered;
951                        let _ = self.persist_task(&paths, &metadata);
952                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
953                        self.insert_rehydrated_task(metadata, paths, true)?;
954                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
955                        let completion_was_delivered = metadata.completion_delivered;
956                        metadata.mark_terminal(
957                            BgTaskStatus::Failed,
958                            None,
959                            Some("process exited without exit marker".to_string()),
960                        );
961                        metadata.completion_delivered |= completion_was_delivered;
962                        let _ = self.persist_task(&paths, &metadata);
963                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
964                        self.insert_rehydrated_task(metadata, paths, true)?;
965                    } else {
966                        self.insert_rehydrated_task(metadata, paths, true)?;
967                    }
968                }
969                _ if metadata.status.is_terminal() => {
970                    // Borrow `paths` for the completion enqueue BEFORE
971                    // `insert_rehydrated_task` consumes it. The completion
972                    // helper only reads from `paths` (stdout/stderr/exit) to
973                    // reconstruct a tail preview, so it must see the same
974                    // paths the rehydrated task will own.
975                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
976                    self.insert_rehydrated_task(metadata, paths, true)?;
977                }
978                _ => {}
979            }
980        }
981
982        Ok(())
983    }
984
985    fn replay_session_from_db(
986        &self,
987        session_id: &str,
988    ) -> Option<Result<Vec<PersistedTask>, String>> {
989        let pool = self
990            .inner
991            .db_pool
992            .read()
993            .ok()
994            .and_then(|slot| slot.clone())?;
995        let harness = self
996            .inner
997            .db_harness
998            .read()
999            .ok()
1000            .and_then(|slot| slot.clone())?;
1001        let conn = match pool.lock() {
1002            Ok(conn) => conn,
1003            Err(_) => return Some(Err("db mutex poisoned".to_string())),
1004        };
1005        Some(
1006            crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
1007                .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
1008                .map_err(|error| error.to_string()),
1009        )
1010    }
1011
1012    fn replay_session_from_disk(
1013        &self,
1014        storage_dir: &Path,
1015        session_id: &str,
1016    ) -> Result<Vec<PersistedTask>, String> {
1017        let dir = session_tasks_dir(storage_dir, session_id);
1018        if !dir.exists() {
1019            return Ok(Vec::new());
1020        }
1021
1022        let entries = fs::read_dir(&dir)
1023            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
1024        let mut tasks = Vec::new();
1025        for entry in entries.flatten() {
1026            let path = entry.path();
1027            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
1028                continue;
1029            }
1030            match read_task(&path) {
1031                Ok(metadata) => tasks.push(metadata),
1032                Err(error) => {
1033                    crate::slog_warn!(
1034                        "quarantining invalid background task metadata {} during replay: {error}",
1035                        path.display()
1036                    );
1037                    if let Err(quarantine_error) =
1038                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1039                    {
1040                        crate::slog_warn!(
1041                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1042                            path.display()
1043                        );
1044                    }
1045                }
1046            }
1047        }
1048        Ok(tasks)
1049    }
1050
1051    pub fn register_watch(
1052        &self,
1053        task_id: String,
1054        pattern: WatchPattern,
1055        once: bool,
1056    ) -> Result<String, &'static str> {
1057        let task = self.task(&task_id).ok_or("task_not_found")?;
1058        let (mode, terminal_at_registration, stdout, stderr, pty) = task
1059            .state
1060            .lock()
1061            .map(|state| {
1062                (
1063                    state.metadata.mode.clone(),
1064                    state.metadata.status.is_terminal(),
1065                    task.paths.stdout.clone(),
1066                    task.paths.stderr.clone(),
1067                    task.paths.pty.clone(),
1068                )
1069            })
1070            .map_err(|_| "background_task_lock_poisoned")?;
1071
1072        let mut terminal_matches = Vec::new();
1073        let scanned_terminal = terminal_at_registration;
1074        let watch_id = {
1075            let mut registry = self
1076                .inner
1077                .watch_registry
1078                .lock()
1079                .map_err(|_| "watch_registry_poisoned")?;
1080            let watch_id = registry.register(task_id.clone(), pattern, once)?;
1081            match &mode {
1082                BgMode::Pipes => {
1083                    let stdout_key = format!("{task_id}:stdout");
1084                    let stderr_key = format!("{task_id}:stderr");
1085                    if terminal_at_registration {
1086                        registry.set_file_cursor(&stdout_key, 0);
1087                        registry.set_file_cursor(&stderr_key, 0);
1088                        terminal_matches.extend(registry.scan_file_new_bytes(
1089                            &stdout_key,
1090                            &task_id,
1091                            &stdout,
1092                        ));
1093                        terminal_matches.extend(registry.scan_file_new_bytes(
1094                            &stderr_key,
1095                            &task_id,
1096                            &stderr,
1097                        ));
1098                    } else {
1099                        registry.prime_file_cursor(&stdout_key, &stdout);
1100                        registry.prime_file_cursor(&stderr_key, &stderr);
1101                    }
1102                }
1103                BgMode::Pty => {
1104                    let pty_key = format!("{task_id}:pty");
1105                    if terminal_at_registration {
1106                        registry.set_file_cursor(&pty_key, 0);
1107                        terminal_matches
1108                            .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
1109                    } else {
1110                        registry.prime_file_cursor(&pty_key, &pty);
1111                    }
1112                }
1113            }
1114            watch_id
1115        };
1116
1117        if task.is_terminal() {
1118            if !scanned_terminal {
1119                terminal_matches = {
1120                    let mut registry = self
1121                        .inner
1122                        .watch_registry
1123                        .lock()
1124                        .map_err(|_| "watch_registry_poisoned")?;
1125                    match &mode {
1126                        BgMode::Pipes => {
1127                            let stdout_key = format!("{task_id}:stdout");
1128                            let stderr_key = format!("{task_id}:stderr");
1129                            registry.set_file_cursor(&stdout_key, 0);
1130                            registry.set_file_cursor(&stderr_key, 0);
1131                            let mut matches =
1132                                registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
1133                            matches.extend(registry.scan_file_new_bytes(
1134                                &stderr_key,
1135                                &task_id,
1136                                &stderr,
1137                            ));
1138                            matches
1139                        }
1140                        BgMode::Pty => {
1141                            let pty_key = format!("{task_id}:pty");
1142                            registry.set_file_cursor(&pty_key, 0);
1143                            registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
1144                        }
1145                    }
1146                };
1147            }
1148
1149            let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
1150            if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
1151                if watch_matched {
1152                    let _ = task.set_completion_delivered(true, self);
1153                    self.clear_task_watch_state(&task_id);
1154                }
1155                return Ok(watch_id);
1156            }
1157
1158            let completion = self
1159                .remove_pending_completion(&task_id)
1160                .or_else(|| self.completion_snapshot_for_task(&task, BG_COMPLETION_PREVIEW_BYTES));
1161            if terminal_matches.is_empty() {
1162                if let Some(completion) = completion.as_ref() {
1163                    self.emit_bash_watch_exit(completion);
1164                }
1165            } else {
1166                for pattern_match in terminal_matches {
1167                    self.emit_bash_pattern_match(&task.session_id, pattern_match);
1168                }
1169            }
1170            let _ = task.set_completion_delivered(true, self);
1171            self.clear_task_watch_state(&task_id);
1172        }
1173
1174        Ok(watch_id)
1175    }
1176
1177    pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
1178        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1179            registry.unregister(task_id, watch_id);
1180        }
1181    }
1182
1183    pub fn active_watch_count(&self, task_id: &str) -> usize {
1184        self.inner
1185            .watch_registry
1186            .lock()
1187            .map(|registry| registry.active_count(task_id))
1188            .unwrap_or(0)
1189    }
1190
1191    fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1192        self.inner
1193            .watch_registry
1194            .lock()
1195            .map(|registry| {
1196                (
1197                    registry.has_controlled_task(task_id),
1198                    registry.has_matched_task(task_id),
1199                )
1200            })
1201            .unwrap_or((false, false))
1202    }
1203
1204    fn task_has_watch_control(&self, task_id: &str) -> bool {
1205        self.inner
1206            .watch_registry
1207            .lock()
1208            .map(|registry| registry.has_controlled_task(task_id))
1209            .unwrap_or(false)
1210    }
1211
1212    fn clear_task_watch_state(&self, task_id: &str) {
1213        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1214            registry.clear_task(task_id);
1215        }
1216    }
1217
1218    pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1219        let (mode, stdout, stderr, pty) = match task.state.lock() {
1220            Ok(state) => (
1221                state.metadata.mode.clone(),
1222                task.paths.stdout.clone(),
1223                task.paths.stderr.clone(),
1224                task.paths.pty.clone(),
1225            ),
1226            Err(_) => return,
1227        };
1228        let mut matches = Vec::new();
1229        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1230            match mode {
1231                BgMode::Pipes => {
1232                    let stdout_key = format!("{}:stdout", task.task_id);
1233                    let stderr_key = format!("{}:stderr", task.task_id);
1234                    matches.extend(registry.scan_file_new_bytes(
1235                        &stdout_key,
1236                        &task.task_id,
1237                        &stdout,
1238                    ));
1239                    matches.extend(registry.scan_file_new_bytes(
1240                        &stderr_key,
1241                        &task.task_id,
1242                        &stderr,
1243                    ));
1244                }
1245                BgMode::Pty => {
1246                    let pty_key = format!("{}:pty", task.task_id);
1247                    matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1248                }
1249            }
1250        }
1251        for pattern_match in matches {
1252            self.emit_bash_pattern_match(&task.session_id, pattern_match);
1253        }
1254    }
1255
1256    pub fn status(
1257        &self,
1258        task_id: &str,
1259        session_id: &str,
1260        project_root: Option<&Path>,
1261        storage_dir: Option<&Path>,
1262        preview_bytes: usize,
1263    ) -> Option<BgTaskSnapshot> {
1264        let mut task = self.task_for_session(task_id, session_id);
1265        if task.is_none() {
1266            if let Some(storage_dir) = storage_dir {
1267                let _ = self.replay_session(storage_dir, session_id);
1268                task = self.task_for_session(task_id, session_id);
1269            }
1270        }
1271        let Some(task) = task else {
1272            return self.status_relaxed(
1273                task_id,
1274                session_id,
1275                project_root?,
1276                storage_dir?,
1277                preview_bytes,
1278            );
1279        };
1280        let _ = self.poll_task(&task);
1281        Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1282    }
1283
1284    fn status_relaxed_task(
1285        &self,
1286        task_id: &str,
1287        project_root: &Path,
1288        storage_dir: &Path,
1289    ) -> Option<Arc<BgTask>> {
1290        let canonical_project = canonicalized_path(project_root);
1291        match self.lookup_relaxed_task_from_db(task_id, project_root) {
1292            Some(Ok(Some(metadata))) => {
1293                if let Some(task) = self.task(task_id) {
1294                    let matches_project = task
1295                        .state
1296                        .lock()
1297                        .map(|state| {
1298                            state
1299                                .metadata
1300                                .project_root
1301                                .as_deref()
1302                                .map(canonicalized_path)
1303                                .as_deref()
1304                                == Some(canonical_project.as_path())
1305                        })
1306                        .unwrap_or(false);
1307                    return matches_project.then_some(task);
1308                }
1309                let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1310                if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1311                    return None;
1312                }
1313                return self.task(task_id);
1314            }
1315            Some(Ok(None)) => {
1316                crate::slog_info!(
1317                    "bash task relaxed DB miss for {}; falling back to disk",
1318                    task_id
1319                );
1320            }
1321            Some(Err(error)) => {
1322                crate::slog_warn!(
1323                    "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1324                    task_id,
1325                    error
1326                );
1327            }
1328            None => {
1329                crate::slog_info!(
1330                    "bash task relaxed DB unavailable for {}; falling back to disk",
1331                    task_id
1332                );
1333            }
1334        }
1335        let root = storage_dir.join("bash-tasks");
1336        let entries = fs::read_dir(&root).ok()?;
1337        for entry in entries.flatten() {
1338            let dir = entry.path();
1339            if !dir.is_dir() {
1340                continue;
1341            }
1342            let path = dir.join(format!("{task_id}.json"));
1343            if !path.exists() {
1344                continue;
1345            }
1346            let metadata = match read_task(&path) {
1347                Ok(metadata) => metadata,
1348                Err(error) => {
1349                    crate::slog_warn!(
1350                        "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1351                        path.display()
1352                    );
1353                    if let Err(quarantine_error) =
1354                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1355                    {
1356                        crate::slog_warn!(
1357                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1358                            path.display()
1359                        );
1360                    }
1361                    continue;
1362                }
1363            };
1364            let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1365            if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1366                continue;
1367            }
1368            if let Some(task) = self.task(task_id) {
1369                let matches_project = task
1370                    .state
1371                    .lock()
1372                    .map(|state| {
1373                        state
1374                            .metadata
1375                            .project_root
1376                            .as_deref()
1377                            .map(canonicalized_path)
1378                            .as_deref()
1379                            == Some(canonical_project.as_path())
1380                    })
1381                    .unwrap_or(false);
1382                return matches_project.then_some(task);
1383            }
1384            let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1385            if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1386                return None;
1387            }
1388            return self.task(task_id);
1389        }
1390        None
1391    }
1392
1393    fn lookup_relaxed_task_from_db(
1394        &self,
1395        task_id: &str,
1396        project_root: &Path,
1397    ) -> Option<Result<Option<PersistedTask>, String>> {
1398        let pool = self
1399            .inner
1400            .db_pool
1401            .read()
1402            .ok()
1403            .and_then(|slot| slot.clone())?;
1404        let harness = self
1405            .inner
1406            .db_harness
1407            .read()
1408            .ok()
1409            .and_then(|slot| slot.clone())?;
1410        let conn = match pool.lock() {
1411            Ok(conn) => conn,
1412            Err(_) => return Some(Err("db mutex poisoned".to_string())),
1413        };
1414        let project_key = crate::search_index::project_cache_key(project_root);
1415        Some(
1416            crate::db::bash_tasks::find_bash_task_for_project(
1417                &conn,
1418                &harness,
1419                &project_key,
1420                task_id,
1421            )
1422            .map(|row| row.map(PersistedTask::from))
1423            .map_err(|error| error.to_string()),
1424        )
1425    }
1426
1427    pub(super) fn status_relaxed(
1428        &self,
1429        task_id: &str,
1430        _session_id: &str,
1431        project_root: &Path,
1432        storage_dir: &Path,
1433        preview_bytes: usize,
1434    ) -> Option<BgTaskSnapshot> {
1435        let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1436        let _ = self.poll_task(&task);
1437        Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1438    }
1439
1440    pub fn kill_relaxed(
1441        &self,
1442        task_id: &str,
1443        project_root: &Path,
1444        storage_dir: &Path,
1445    ) -> Result<BgTaskSnapshot, String> {
1446        let task = self
1447            .status_relaxed_task(task_id, project_root, storage_dir)
1448            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1449        self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1450    }
1451
1452    pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1453        #[cfg(test)]
1454        self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1455
1456        let mut deleted = 0usize;
1457
1458        let root = storage_dir.join("bash-tasks");
1459        if root.exists() {
1460            let session_dirs = fs::read_dir(&root).map_err(|e| {
1461                format!(
1462                    "failed to read background task root {}: {e}",
1463                    root.display()
1464                )
1465            })?;
1466            for session_entry in session_dirs.flatten() {
1467                let session_dir = session_entry.path();
1468                if !session_dir.is_dir() {
1469                    continue;
1470                }
1471                let task_entries = match fs::read_dir(&session_dir) {
1472                    Ok(entries) => entries,
1473                    Err(error) => {
1474                        crate::slog_warn!(
1475                            "failed to read background task session dir {}: {error}",
1476                            session_dir.display()
1477                        );
1478                        continue;
1479                    }
1480                };
1481                for task_entry in task_entries.flatten() {
1482                    let json_path = task_entry.path();
1483                    if json_path
1484                        .extension()
1485                        .and_then(|extension| extension.to_str())
1486                        != Some("json")
1487                    {
1488                        continue;
1489                    }
1490                    if modified_within(&json_path, PERSISTED_GC_GRACE) {
1491                        continue;
1492                    }
1493                    let metadata = match read_task(&json_path) {
1494                        Ok(metadata) => metadata,
1495                        Err(error) => {
1496                            crate::slog_warn!(
1497                                "quarantining corrupt background task metadata {}: {error}",
1498                                json_path.display()
1499                            );
1500                            quarantine_task_json(
1501                                storage_dir,
1502                                &session_dir,
1503                                &json_path,
1504                                QuarantineKind::Corrupt,
1505                            )?;
1506                            continue;
1507                        }
1508                    };
1509                    if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1510                        continue;
1511                    }
1512                    let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1513                    match delete_task_bundle(&paths) {
1514                        Ok(()) => {
1515                            deleted += 1;
1516                            log::debug!(
1517                                "deleted persisted background task bundle {}",
1518                                metadata.task_id
1519                            );
1520                        }
1521                        Err(error) => {
1522                            crate::slog_warn!(
1523                                "failed to delete background task bundle {}: {error}",
1524                                metadata.task_id
1525                            );
1526                            continue;
1527                        }
1528                    }
1529                }
1530            }
1531        }
1532        gc_quarantine(storage_dir);
1533        Ok(deleted)
1534    }
1535
1536    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1537        let tasks = self
1538            .inner
1539            .tasks
1540            .lock()
1541            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1542            .unwrap_or_default();
1543        tasks
1544            .into_iter()
1545            .map(|task| {
1546                let _ = self.poll_task(&task);
1547                self.snapshot_with_terminal_cache(&task, preview_bytes)
1548            })
1549            .collect()
1550    }
1551
1552    /// Replace terminal pipe snapshots with the task's cached rendered output.
1553    /// Running tasks stay raw (tail-only) so agents debugging a live process see
1554    /// exactly what it emitted. PTY tasks are explicitly excluded: their raw
1555    /// terminal bytes are rendered by the plugin's PTY path, not the line
1556    /// compressor.
1557    fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1558        if !snapshot.info.status.is_terminal() || snapshot.info.mode == BgMode::Pty {
1559            return;
1560        }
1561        if let Some(cache) = self.ensure_terminal_output_cache(task) {
1562            snapshot.output_preview = cache.output_preview;
1563            snapshot.output_truncated = cache.output_truncated;
1564        }
1565    }
1566
1567    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1568        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1569    }
1570
1571    pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1572        let task = self
1573            .task_for_session(task_id, session_id)
1574            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1575        let terminal_after_promote = {
1576            let mut state = task
1577                .state
1578                .lock()
1579                .map_err(|_| "background task lock poisoned".to_string())?;
1580            let updated = self
1581                .update_task_metadata(&task.paths, |metadata| {
1582                    metadata.notify_on_completion = true;
1583                    metadata.completion_delivered = false;
1584                })
1585                .map_err(|e| format!("failed to promote background task: {e}"))?;
1586            state.metadata = updated;
1587            state.metadata.status.is_terminal()
1588        };
1589        if terminal_after_promote {
1590            self.post_terminal_transition(&task, true)?;
1591        }
1592        Ok(true)
1593    }
1594
1595    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1596        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1597            .map(|_| ())
1598    }
1599
1600    pub fn cleanup_finished(&self, older_than: Duration) {
1601        let cutoff = Instant::now().checked_sub(older_than);
1602        let removable_paths: Vec<(String, TaskPaths)> =
1603            if let Ok(mut tasks) = self.inner.tasks.lock() {
1604                let removable = tasks
1605                    .iter()
1606                    .filter_map(|(task_id, task)| {
1607                        let delivered_terminal = task
1608                            .state
1609                            .lock()
1610                            .map(|state| {
1611                                state.metadata.status.is_terminal()
1612                                    && state.metadata.completion_delivered
1613                            })
1614                            .unwrap_or(false);
1615                        if !delivered_terminal {
1616                            return None;
1617                        }
1618
1619                        let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1620                        let expired = match (terminal_at, cutoff) {
1621                            (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1622                            (Some(_), None) => true,
1623                            (None, _) => false,
1624                        };
1625                        expired.then(|| task_id.clone())
1626                    })
1627                    .collect::<Vec<_>>();
1628
1629                removable
1630                    .into_iter()
1631                    .filter_map(|task_id| {
1632                        tasks
1633                            .remove(&task_id)
1634                            .map(|task| (task_id, task.paths.clone()))
1635                    })
1636                    .collect()
1637            } else {
1638                Vec::new()
1639            };
1640
1641        for (task_id, paths) in removable_paths {
1642            match delete_task_bundle(&paths) {
1643                Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1644                Err(error) => crate::slog_warn!(
1645                    "failed to delete persisted background task bundle {task_id}: {error}"
1646                ),
1647            }
1648        }
1649    }
1650
1651    pub fn drain_completions(&self) -> Vec<BgCompletion> {
1652        self.drain_completions_for_session(None)
1653    }
1654
1655    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1656        let completions = match self.inner.completions.lock() {
1657            Ok(completions) => completions,
1658            Err(_) => return Vec::new(),
1659        };
1660
1661        completions
1662            .iter()
1663            .filter(|completion| {
1664                session_id
1665                    .map(|session_id| completion.session_id == session_id)
1666                    .unwrap_or(true)
1667            })
1668            .cloned()
1669            .collect()
1670    }
1671
1672    pub fn ack_completions_for_session(
1673        &self,
1674        session_id: Option<&str>,
1675        task_ids: &[String],
1676    ) -> Vec<String> {
1677        if task_ids.is_empty() {
1678            return Vec::new();
1679        }
1680        let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1681        let mut completion_sessions = HashMap::new();
1682        if let Ok(mut completions) = self.inner.completions.lock() {
1683            completions.retain(|completion| {
1684                let session_matches = session_id
1685                    .map(|session_id| completion.session_id == session_id)
1686                    .unwrap_or(true);
1687                if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1688                    completion_sessions
1689                        .insert(completion.task_id.clone(), completion.session_id.clone());
1690                    false
1691                } else {
1692                    true
1693                }
1694            });
1695        }
1696
1697        let mut delivered = Vec::new();
1698        for task_id in task_ids {
1699            let task = if let Some(session_id) = session_id {
1700                self.task_for_session(task_id, session_id)
1701            } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1702                self.task_for_session(task_id, completion_session_id)
1703            } else {
1704                self.task(task_id)
1705            };
1706            if let Some(task) = task {
1707                if task.set_completion_delivered(true, self).is_ok() {
1708                    delivered.push(task_id.clone());
1709                }
1710            }
1711        }
1712
1713        delivered
1714    }
1715
1716    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1717        self.inner
1718            .completions
1719            .lock()
1720            .map(|completions| {
1721                completions
1722                    .iter()
1723                    .filter(|completion| completion.session_id == session_id)
1724                    .cloned()
1725                    .collect()
1726            })
1727            .unwrap_or_default()
1728    }
1729
1730    fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1731        let mut completions = self.inner.completions.lock().ok()?;
1732        let idx = completions
1733            .iter()
1734            .position(|completion| completion.task_id == task_id)?;
1735        completions.remove(idx)
1736    }
1737
1738    fn completion_snapshot_for_task(
1739        &self,
1740        task: &Arc<BgTask>,
1741        _preview_bytes: usize,
1742    ) -> Option<BgCompletion> {
1743        let snapshot = self.snapshot_with_terminal_cache(task, RUNNING_OUTPUT_PREVIEW_BYTES);
1744        if !snapshot.info.status.is_terminal() {
1745            return None;
1746        }
1747        let (output_preview, output_truncated) = if snapshot.info.mode == BgMode::Pty {
1748            (String::new(), false)
1749        } else {
1750            self.ensure_terminal_output_cache(task)
1751                .map(|cache| completion_preview_for_cache(&cache))
1752                .unwrap_or_else(|| (String::new(), false))
1753        };
1754        Some(BgCompletion {
1755            task_id: snapshot.info.task_id,
1756            session_id: task.session_id.clone(),
1757            status: snapshot.info.status,
1758            exit_code: snapshot.exit_code,
1759            command: snapshot.info.command,
1760            output_preview,
1761            output_truncated,
1762            original_tokens: None,
1763            compressed_tokens: None,
1764            tokens_skipped: false,
1765        })
1766    }
1767
1768    pub fn detach(&self) {
1769        self.inner.shutdown.store(true, Ordering::SeqCst);
1770        if let Ok(mut tasks) = self.inner.tasks.lock() {
1771            for task in tasks.values() {
1772                if let Ok(mut state) = task.state.lock() {
1773                    match &mut state.runtime {
1774                        TaskRuntime::Piped(child) => *child = None,
1775                        TaskRuntime::Pty(runtime) => *runtime = None,
1776                    }
1777                    state.detached = true;
1778                }
1779            }
1780            tasks.clear();
1781        }
1782    }
1783
1784    pub fn shutdown(&self) {
1785        let tasks = self
1786            .inner
1787            .tasks
1788            .lock()
1789            .map(|tasks| {
1790                tasks
1791                    .values()
1792                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
1793                    .collect::<Vec<_>>()
1794            })
1795            .unwrap_or_default();
1796        for (task_id, session_id) in tasks {
1797            let _ = self.kill(&task_id, &session_id);
1798        }
1799    }
1800
1801    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1802        if let Ok(state) = task.state.lock() {
1803            if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1804                // On Windows ConPTY, the reader may not observe EOF while the
1805                // master handle is still held in `PtyRuntime`. The waiter writes
1806                // the authoritative exit marker before setting `exit_observed`,
1807                // so once exit is observed we can finalize from that marker and
1808                // drop the runtime, which lets the reader finish. Waiting for
1809                // `reader_done && exit_observed` wedges completed PTY tasks on
1810                // Windows.
1811                if !pty.exit_observed.load(Ordering::SeqCst) {
1812                    return Ok(());
1813                }
1814            }
1815        }
1816        let marker = match read_exit_marker(&task.paths.exit) {
1817            Ok(Some(marker)) => marker,
1818            Ok(None) => return Ok(()),
1819            Err(error) => return Err(format!("failed to read exit marker: {error}")),
1820        };
1821        self.finalize_from_marker(task, marker, None)
1822    }
1823
1824    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1825        let mut needs_completion = false;
1826        {
1827            let Ok(mut state) = task.state.lock() else {
1828                return;
1829            };
1830            match &mut state.runtime {
1831                TaskRuntime::Piped(child_slot) => {
1832                    if let Some(child) = child_slot.as_mut() {
1833                        if matches!(child.try_wait(), Ok(Some(_))) {
1834                            *child_slot = None;
1835                            state.detached = true;
1836                            state.child_exit_observed = true;
1837                        }
1838                    } else if state.detached {
1839                        let child_known_dead = state.child_exit_observed
1840                            || state
1841                                .metadata
1842                                .child_pid
1843                                .is_some_and(|pid| !is_process_alive(pid));
1844                        if child_known_dead {
1845                            needs_completion =
1846                                self.fail_without_exit_marker_if_needed(task, &mut state);
1847                        }
1848                    }
1849                }
1850                TaskRuntime::Pty(Some(pty)) => {
1851                    if pty.exit_observed.load(Ordering::SeqCst) {
1852                        drop(state);
1853                        let _ = self.poll_task(task);
1854                        return;
1855                    }
1856                }
1857                TaskRuntime::Pty(None) => {}
1858            }
1859        }
1860        if needs_completion {
1861            let _ = self.post_terminal_transition(task, true);
1862        }
1863    }
1864
1865    fn fail_without_exit_marker_if_needed(
1866        &self,
1867        task: &Arc<BgTask>,
1868        state: &mut BgTaskState,
1869    ) -> bool {
1870        if state.metadata.status.is_terminal() {
1871            return false;
1872        }
1873        if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1874            return false;
1875        }
1876        let watch_controlled = self.task_has_watch_control(&task.task_id);
1877        let updated = self.update_task_metadata(&task.paths, |metadata| {
1878            metadata.mark_terminal(
1879                BgTaskStatus::Failed,
1880                None,
1881                Some("process exited without exit marker".to_string()),
1882            );
1883            if watch_controlled {
1884                metadata.completion_delivered = true;
1885            }
1886        });
1887        if let Ok(metadata) = updated {
1888            state.pending_terminal_override = None;
1889            state.metadata = metadata;
1890            task.mark_terminal_now();
1891            return true;
1892        }
1893        false
1894    }
1895
1896    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1897        self.inner
1898            .tasks
1899            .lock()
1900            .map(|tasks| {
1901                tasks
1902                    .values()
1903                    .filter(|task| task.is_running())
1904                    .cloned()
1905                    .collect()
1906            })
1907            .unwrap_or_default()
1908    }
1909
1910    fn insert_rehydrated_task(
1911        &self,
1912        metadata: PersistedTask,
1913        paths: TaskPaths,
1914        detached: bool,
1915    ) -> Result<(), String> {
1916        let task_id = metadata.task_id.clone();
1917        let session_id = metadata.session_id.clone();
1918        let started = started_instant_from_unix_millis(metadata.started_at);
1919        let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1920        let mode = metadata.mode.clone();
1921        let task = Arc::new(BgTask {
1922            task_id: task_id.clone(),
1923            session_id,
1924            paths: paths.clone(),
1925            started,
1926            last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1927            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1928            state: Mutex::new(BgTaskState {
1929                metadata,
1930                runtime: if mode == BgMode::Pty {
1931                    TaskRuntime::Pty(None)
1932                } else {
1933                    TaskRuntime::Piped(None)
1934                },
1935                detached,
1936                // Replay path: we never observed the child handle's exit
1937                // in this process (the previous AFT process did, but its
1938                // observation didn't survive restart). Leave this false so
1939                // the second-pass reap falls through to the
1940                // `is_process_alive(child_pid)` probe rather than declaring
1941                // failure based on stale evidence.
1942                child_exit_observed: false,
1943                buffer: if mode == BgMode::Pty {
1944                    BgBuffer::pty(paths.pty.clone())
1945                } else {
1946                    BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
1947                },
1948                terminal_output_cache: None,
1949                pending_terminal_override: None,
1950            }),
1951        });
1952        self.inner
1953            .tasks
1954            .lock()
1955            .map_err(|_| "background task registry lock poisoned".to_string())?
1956            .insert(task_id, task);
1957        Ok(())
1958    }
1959
1960    fn kill_with_status(
1961        &self,
1962        task_id: &str,
1963        session_id: &str,
1964        terminal_status: BgTaskStatus,
1965    ) -> Result<BgTaskSnapshot, String> {
1966        let task = self
1967            .task_for_session(task_id, session_id)
1968            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1969        let mut terminalized = false;
1970
1971        {
1972            let mut state = task
1973                .state
1974                .lock()
1975                .map_err(|_| "background task lock poisoned".to_string())?;
1976            if state.metadata.status.is_terminal() {
1977                state.pending_terminal_override = None;
1978            } else if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
1979                state.metadata =
1980                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
1981                if self.task_has_watch_control(&task.task_id) {
1982                    state.metadata.completion_delivered = true;
1983                }
1984                state.pending_terminal_override = None;
1985                task.mark_terminal_now();
1986                match &mut state.runtime {
1987                    // Exit marker already present: the child finished on its
1988                    // own before this kill observed it. Reap it rather than
1989                    // dropping the handle so it doesn't become a zombie
1990                    // (issue #91). The active-kill branch below already
1991                    // `wait()`s after signaling, so this is the only kill
1992                    // path that needed the explicit reap.
1993                    TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
1994                    TaskRuntime::Pty(runtime) => *runtime = None,
1995                }
1996                state.detached = true;
1997                self.persist_task(&task.paths, &state.metadata)
1998                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1999                terminalized = true;
2000            } else {
2001                let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
2002                if !was_already_killing {
2003                    state.metadata.status = BgTaskStatus::Killing;
2004                    self.persist_task(&task.paths, &state.metadata)
2005                        .map_err(|e| format!("failed to persist killing state: {e}"))?;
2006                }
2007
2008                #[cfg(unix)]
2009                let pgid = state.metadata.pgid;
2010                #[cfg(windows)]
2011                let child_pid = state.metadata.child_pid;
2012                if !was_already_killing
2013                    && state.metadata.mode == BgMode::Pty
2014                    && terminal_status == BgTaskStatus::TimedOut
2015                {
2016                    state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
2017                }
2018
2019                #[cfg(windows)]
2020                let mut pty_forced_terminal_status: Option<BgTaskStatus> = None;
2021
2022                match &mut state.runtime {
2023                    TaskRuntime::Piped(child_slot) => {
2024                        #[cfg(unix)]
2025                        if let Some(pgid) = pgid {
2026                            terminate_pgid(pgid, child_slot.as_mut());
2027                        }
2028                        #[cfg(windows)]
2029                        if let Some(child) = child_slot.as_mut() {
2030                            super::process::terminate_process(child);
2031                        } else if let Some(pid) = child_pid {
2032                            terminate_pid(pid);
2033                        }
2034                        if let Some(child) = child_slot.as_mut() {
2035                            let _ = child.wait();
2036                        }
2037                        *child_slot = None;
2038                        state.detached = true;
2039
2040                        if !task.paths.exit.exists() {
2041                            write_kill_marker_if_absent(&task.paths.exit)
2042                                .map_err(|e| format!("failed to write kill marker: {e}"))?;
2043                        }
2044
2045                        let exit_code = if terminal_status == BgTaskStatus::TimedOut {
2046                            Some(124)
2047                        } else {
2048                            None
2049                        };
2050                        state
2051                            .metadata
2052                            .mark_terminal(terminal_status, exit_code, None);
2053                        if self.task_has_watch_control(&task.task_id) {
2054                            state.metadata.completion_delivered = true;
2055                        }
2056                        state.pending_terminal_override = None;
2057                        task.mark_terminal_now();
2058                        self.persist_task(&task.paths, &state.metadata)
2059                            .map_err(|e| format!("failed to persist killed state: {e}"))?;
2060                        terminalized = true;
2061                    }
2062                    TaskRuntime::Pty(Some(pty)) => {
2063                        pty.was_killed.store(true, Ordering::SeqCst);
2064                        if let Err(error) = pty.killer.kill() {
2065                            crate::slog_warn!(
2066                                "[pty-kill] {task_id} ChildKiller::kill failed: {error}"
2067                            );
2068                        }
2069                        if let Some(pid) = pty.child_pid {
2070                            #[cfg(unix)]
2071                            terminate_pgid(pid as i32, None);
2072                            #[cfg(windows)]
2073                            terminate_pid(pid);
2074                        }
2075                        drop(pty.master.take());
2076
2077                        #[cfg(windows)]
2078                        {
2079                            let default_status = if terminal_status == BgTaskStatus::TimedOut {
2080                                BgTaskStatus::TimedOut
2081                            } else {
2082                                BgTaskStatus::Killed
2083                            };
2084                            pty_forced_terminal_status = Some(
2085                                state
2086                                    .pending_terminal_override
2087                                    .take()
2088                                    .unwrap_or(default_status),
2089                            );
2090                        }
2091                    }
2092                    TaskRuntime::Pty(None) => {}
2093                }
2094
2095                #[cfg(windows)]
2096                if let Some(target_status) = pty_forced_terminal_status {
2097                    if !task.paths.exit.exists() {
2098                        write_kill_marker_if_absent(&task.paths.exit)
2099                            .map_err(|e| format!("failed to write kill marker: {e}"))?;
2100                    }
2101
2102                    let exit_code = if target_status == BgTaskStatus::TimedOut {
2103                        Some(124)
2104                    } else {
2105                        None
2106                    };
2107                    state.metadata.mark_terminal(target_status, exit_code, None);
2108                    if self.task_has_watch_control(&task.task_id) {
2109                        state.metadata.completion_delivered = true;
2110                    }
2111                    state.pending_terminal_override = None;
2112                    task.mark_terminal_now();
2113                    if let TaskRuntime::Pty(runtime) = &mut state.runtime {
2114                        *runtime = None;
2115                    }
2116                    state.detached = true;
2117                    self.persist_task(&task.paths, &state.metadata)
2118                        .map_err(|e| format!("failed to persist killed PTY state: {e}"))?;
2119                    terminalized = true;
2120                }
2121            }
2122        }
2123
2124        if terminalized {
2125            self.post_terminal_transition(&task, true)?;
2126        }
2127        Ok(self.snapshot_with_terminal_cache(&task, RUNNING_OUTPUT_PREVIEW_BYTES))
2128    }
2129
2130    fn finalize_from_marker(
2131        &self,
2132        task: &Arc<BgTask>,
2133        marker: ExitMarker,
2134        reason: Option<String>,
2135    ) -> Result<(), String> {
2136        let watch_controlled = self.task_has_watch_control(&task.task_id);
2137        let mut pty_reader_done = None;
2138        {
2139            let mut state = task
2140                .state
2141                .lock()
2142                .map_err(|_| "background task lock poisoned".to_string())?;
2143            if state.metadata.status.is_terminal() {
2144                state.pending_terminal_override = None;
2145                return Ok(());
2146            }
2147
2148            let pending_override = state.pending_terminal_override.take();
2149            let is_pty = state.metadata.mode == BgMode::Pty;
2150            let updated = self
2151                .update_task_metadata(&task.paths, |metadata| {
2152                    let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
2153                        let mut metadata = metadata.clone();
2154                        let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
2155                        let exit_code = if target_status == BgTaskStatus::TimedOut {
2156                            Some(124)
2157                        } else {
2158                            None
2159                        };
2160                        metadata.mark_terminal(target_status, exit_code, reason);
2161                        metadata
2162                    } else {
2163                        terminal_metadata_from_marker(metadata.clone(), marker, reason)
2164                    };
2165                    if watch_controlled {
2166                        new_metadata.completion_delivered = true;
2167                    }
2168                    *metadata = new_metadata;
2169                })
2170                .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2171            state.metadata = updated;
2172            task.mark_terminal_now();
2173            match &mut state.runtime {
2174                // Reap the exited direct child instead of dropping it, so it
2175                // does not linger as a `<defunct>` zombie (issue #91). The
2176                // wrapper writes the exit marker as its final act, so the
2177                // child is already exiting and `wait()` returns immediately.
2178                TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2179                TaskRuntime::Pty(runtime) => {
2180                    pty_reader_done = runtime
2181                        .as_ref()
2182                        .map(|runtime| Arc::clone(&runtime.reader_done));
2183                    *runtime = None;
2184                }
2185            }
2186            state.detached = true;
2187        }
2188
2189        if let Some(reader_done) = pty_reader_done {
2190            let deadline = Instant::now() + Duration::from_millis(200);
2191            while !reader_done.load(Ordering::SeqCst) && Instant::now() < deadline {
2192                std::thread::sleep(Duration::from_millis(10));
2193            }
2194        }
2195
2196        // One final scan runs before terminal notification routing so bytes
2197        // printed immediately before exit can win over the exit safety net.
2198        self.scan_task_watch_output(task);
2199
2200        self.post_terminal_transition(task, true)
2201    }
2202
2203    fn enqueue_completion_if_needed(
2204        &self,
2205        metadata: &PersistedTask,
2206        paths: Option<&TaskPaths>,
2207        emit_frame: bool,
2208    ) {
2209        if metadata.status.is_terminal() && !metadata.completion_delivered {
2210            let cache =
2211                paths.and_then(|paths| self.render_terminal_output_from_paths(metadata, paths));
2212            self.enqueue_completion_from_parts(metadata, None, paths, emit_frame, cache.as_ref());
2213        }
2214    }
2215
2216    fn render_terminal_output_from_paths(
2217        &self,
2218        metadata: &PersistedTask,
2219        paths: &TaskPaths,
2220    ) -> Option<TerminalOutputCache> {
2221        if metadata.mode == BgMode::Pty {
2222            return None;
2223        }
2224        let buffer = BgBuffer::new(paths.stdout.clone(), paths.stderr.clone());
2225        Some(self.render_terminal_output(metadata, &buffer))
2226    }
2227
2228    fn enqueue_completion_from_parts(
2229        &self,
2230        metadata: &PersistedTask,
2231        buffer: Option<&BgBuffer>,
2232        paths: Option<&TaskPaths>,
2233        emit_frame: bool,
2234        terminal_render: Option<&TerminalOutputCache>,
2235    ) {
2236        // Only the terminal-state guard prevents double-recording here. The
2237        // `completion_delivered` flag is NOT used to gate compression-event
2238        // recording, because `mark_terminal` flips `completion_delivered=true`
2239        // immediately for tasks with `notify_on_completion=false` (foreground
2240        // bash polled via `bash_status`, which is the common case). Pre-emptive
2241        // delivery flagging is correct for the push-frame queue (suppresses
2242        // duplicate user-visible notifications) but would silently skip the
2243        // database insert below. Compression event recording is idempotent at
2244        // the DB layer (unique on harness+session+task_id), so re-entry is
2245        // safe; the dedupe-by-queue check stays for the push frame side.
2246        if !metadata.status.is_terminal() {
2247            return;
2248        }
2249
2250        let owned_buffer = if buffer.is_none() && metadata.mode != BgMode::Pty {
2251            paths.map(|paths| BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()))
2252        } else {
2253            None
2254        };
2255        let render_buffer = buffer.or(owned_buffer.as_ref());
2256        let owned_render = if terminal_render.is_none() {
2257            render_buffer.map(|buffer| self.render_terminal_output(metadata, buffer))
2258        } else {
2259            None
2260        };
2261        let render = terminal_render.or(owned_render.as_ref());
2262
2263        // Completion reminders use the already-rendered terminal output and a
2264        // smaller head+tail cap. They never invoke the compressor themselves.
2265        let (output_preview, output_truncated) = render
2266            .map(completion_preview_for_cache)
2267            .unwrap_or_else(|| (String::new(), false));
2268
2269        let token_counts = self.completion_token_counts(
2270            metadata,
2271            buffer,
2272            paths,
2273            render.map(|render| render.output_preview.as_str()),
2274        );
2275        let completion = BgCompletion {
2276            task_id: metadata.task_id.clone(),
2277            session_id: metadata.session_id.clone(),
2278            status: metadata.status.clone(),
2279            exit_code: metadata.exit_code,
2280            command: metadata.command.clone(),
2281            output_preview,
2282            output_truncated,
2283            original_tokens: token_counts.original_tokens,
2284            compressed_tokens: token_counts.compressed_tokens,
2285            tokens_skipped: token_counts.tokens_skipped,
2286        };
2287
2288        // Record the compression event BEFORE the push-frame dedupe. Event
2289        // recording has its own idempotency at the DB layer (unique key on
2290        // harness+session+task_id), so it's safe to attempt for every
2291        // terminal-state finalize. Critically, this path runs even when
2292        // `completion_delivered=true` was pre-set by `mark_terminal` for
2293        // foreground bash (`notify_on_completion=false`) — which is the common
2294        // case for OpenCode/Pi `bash` tool calls. Previously this code lived
2295        // after the dedupe guard and never fired for foreground tasks, which
2296        // meant compression accounting was effectively dead for >99% of
2297        // real-world bash usage.
2298        self.record_compression_event_if_applicable(metadata, &token_counts);
2299
2300        let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
2301        if watch_controlled {
2302            if emit_frame && !watch_matched {
2303                self.emit_bash_watch_exit(&completion);
2304            }
2305            self.clear_task_watch_state(&metadata.task_id);
2306            return;
2307        }
2308
2309        // Push-frame queue is gated on `completion_delivered` so foreground
2310        // bash with `notify_on_completion=false` does not leak a user-visible
2311        // completion notification. `mark_terminal` pre-sets
2312        // `completion_delivered=true` for those tasks; honoring it here keeps
2313        // the suppression invariant the test
2314        // `no_notify_foreground_poll_completion_does_not_enqueue_completion`
2315        // asserts. The compression-event recording above intentionally runs
2316        // before this gate so foreground bash still contributes to the
2317        // session/project aggregates.
2318        if metadata.completion_delivered {
2319            return;
2320        }
2321
2322        // Push-frame queue dedupe stays per-task to prevent duplicate
2323        // user-visible completion notifications.
2324        let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
2325            if completions
2326                .iter()
2327                .any(|existing| existing.task_id == metadata.task_id)
2328            {
2329                false
2330            } else {
2331                completions.push_back(completion.clone());
2332                true
2333            }
2334        } else {
2335            false
2336        };
2337
2338        if pushed && emit_frame {
2339            self.emit_bash_completed(completion);
2340        }
2341    }
2342
2343    fn record_compression_event_if_applicable(
2344        &self,
2345        metadata: &PersistedTask,
2346        token_counts: &CompletionTokenCounts,
2347    ) {
2348        if metadata.mode == BgMode::Pty {
2349            return;
2350        }
2351
2352        let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2353            token_counts.original_tokens,
2354            token_counts.compressed_tokens,
2355            token_counts.original_bytes,
2356            token_counts.compressed_bytes,
2357        ) {
2358            (
2359                Some(original_tokens),
2360                Some(compressed_tokens),
2361                Some(original_bytes),
2362                Some(compressed_bytes),
2363            ) => (
2364                original_tokens,
2365                compressed_tokens,
2366                original_bytes,
2367                compressed_bytes,
2368            ),
2369            _ => {
2370                crate::slog_warn!(
2371                    "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2372                    metadata.task_id
2373                );
2374                return;
2375            }
2376        };
2377
2378        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2379        let Some(pool) = pool else {
2380            crate::slog_warn!(
2381                "compression event skipped for {}: db_pool not initialized — was configure run?",
2382                metadata.task_id
2383            );
2384            return;
2385        };
2386        let harness = self
2387            .inner
2388            .db_harness
2389            .read()
2390            .ok()
2391            .and_then(|slot| slot.clone());
2392        let Some(harness) = harness else {
2393            crate::slog_warn!(
2394                "compression event insert skipped for {}: harness not configured",
2395                metadata.task_id
2396            );
2397            return;
2398        };
2399
2400        let project_root = metadata
2401            .project_root
2402            .as_deref()
2403            .unwrap_or(&metadata.workdir);
2404        let project_key = crate::search_index::project_cache_key(project_root);
2405        let row = crate::db::compression_events::CompressionEventRow {
2406            harness: &harness,
2407            session_id: Some(&metadata.session_id),
2408            project_key: &project_key,
2409            tool: "bash",
2410            task_id: Some(&metadata.task_id),
2411            command: Some(&metadata.command),
2412            compressor: if metadata.compressed {
2413                "registry"
2414            } else {
2415                "none"
2416            },
2417            original_bytes,
2418            compressed_bytes,
2419            original_tokens,
2420            compressed_tokens,
2421            created_at: unix_millis() as i64,
2422        };
2423
2424        let conn = match pool.lock() {
2425            Ok(conn) => conn,
2426            Err(_) => {
2427                crate::slog_warn!(
2428                    "compression event insert failed for {}: db mutex poisoned",
2429                    metadata.task_id
2430                );
2431                return;
2432            }
2433        };
2434        match crate::db::compression_events::insert_compression_event(&conn, &row) {
2435            Ok(_) => {
2436                // DEBUG-level: each foreground bash call records one of these,
2437                // which clutters info-level logs without adding diagnostic value.
2438                // Aggregate totals are visible via the status RPC / TUI sidebar.
2439                crate::slog_debug!(
2440                    "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2441                    metadata.task_id,
2442                    project_key,
2443                    metadata.session_id,
2444                    original_tokens,
2445                    compressed_tokens
2446                );
2447            }
2448            Err(error) => {
2449                crate::slog_warn!(
2450                    "compression event insert failed for {}: {}",
2451                    metadata.task_id,
2452                    error
2453                );
2454            }
2455        }
2456    }
2457
2458    fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2459        let Ok(progress_sender) = self
2460            .inner
2461            .progress_sender
2462            .lock()
2463            .map(|sender| sender.clone())
2464        else {
2465            return;
2466        };
2467        if let Some(sender) = progress_sender.as_ref() {
2468            sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2469                pattern_match.task_id,
2470                session_id.to_string(),
2471                pattern_match.watch_id,
2472                pattern_match.match_text,
2473                pattern_match.match_offset,
2474                pattern_match.context,
2475                pattern_match.once,
2476            )));
2477        }
2478    }
2479
2480    fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2481        let Ok(progress_sender) = self
2482            .inner
2483            .progress_sender
2484            .lock()
2485            .map(|sender| sender.clone())
2486        else {
2487            return;
2488        };
2489        let Some(sender) = progress_sender.as_ref() else {
2490            return;
2491        };
2492        let status = completion_status_text(&completion.status, completion.exit_code);
2493        let preview = completion.output_preview.trim_end();
2494        let context = if preview.is_empty() {
2495            format!("task {} exited ({status})", completion.task_id)
2496        } else {
2497            format!(
2498                "task {} exited ({status})
2499{preview}",
2500                completion.task_id
2501            )
2502        };
2503        sender(PushFrame::BashPatternMatch(
2504            BashPatternMatchFrame::task_exit(
2505                completion.task_id.clone(),
2506                completion.session_id.clone(),
2507                format!("exited ({status})"),
2508                context,
2509            ),
2510        ));
2511    }
2512
2513    fn emit_bash_completed(&self, completion: BgCompletion) {
2514        let Ok(progress_sender) = self
2515            .inner
2516            .progress_sender
2517            .lock()
2518            .map(|sender| sender.clone())
2519        else {
2520            return;
2521        };
2522        let Some(sender) = progress_sender.as_ref() else {
2523            return;
2524        };
2525        // Clone the callback out of the registry mutex before writing to stdout;
2526        // otherwise a blocked push-frame write could pin the mutex and starve
2527        // unrelated progress-sender updates.
2528        // Bg task transitions are discovered by the watchdog thread, so the
2529        // sender is shared behind a Mutex. It still uses the same stdout writer
2530        // closure as foreground progress frames, preserving the existing lock/
2531        // flush behavior in main.rs.
2532        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2533            completion.task_id,
2534            completion.session_id,
2535            completion.status,
2536            completion.exit_code,
2537            completion.command,
2538            completion.output_preview,
2539            completion.output_truncated,
2540            completion.original_tokens,
2541            completion.compressed_tokens,
2542            completion.tokens_skipped,
2543        )));
2544    }
2545
2546    fn completion_token_counts(
2547        &self,
2548        metadata: &PersistedTask,
2549        buffer: Option<&BgBuffer>,
2550        paths: Option<&TaskPaths>,
2551        rendered_output: Option<&str>,
2552    ) -> CompletionTokenCounts {
2553        if metadata.mode == BgMode::Pty {
2554            return CompletionTokenCounts::skipped();
2555        }
2556
2557        let raw = match buffer {
2558            Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2559            None => paths
2560                .map(|paths| {
2561                    read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2562                })
2563                .unwrap_or(TokenCountInput::Skipped),
2564        };
2565
2566        let TokenCountInput::Text(raw_output) = raw else {
2567            return CompletionTokenCounts::skipped();
2568        };
2569
2570        let original_tokens = token_count_u32(&raw_output);
2571        let original_bytes = raw_output.len() as i64;
2572        let compressed_output = rendered_output.unwrap_or(&raw_output);
2573        let compressed_tokens = token_count_u32(compressed_output);
2574        let compressed_bytes = compressed_output.len() as i64;
2575        CompletionTokenCounts {
2576            original_tokens: Some(original_tokens),
2577            compressed_tokens: Some(compressed_tokens),
2578            original_bytes: Some(original_bytes),
2579            compressed_bytes: Some(compressed_bytes),
2580            tokens_skipped: false,
2581        }
2582    }
2583
2584    pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2585        if !self
2586            .inner
2587            .long_running_reminder_enabled
2588            .load(Ordering::SeqCst)
2589        {
2590            return;
2591        }
2592        let interval_ms = self
2593            .inner
2594            .long_running_reminder_interval_ms
2595            .load(Ordering::SeqCst);
2596        if interval_ms == 0 {
2597            return;
2598        }
2599        let interval = Duration::from_millis(interval_ms);
2600        let now = Instant::now();
2601        let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2602            return;
2603        };
2604        let since = last_reminder_at.unwrap_or(task.started);
2605        if now.duration_since(since) < interval {
2606            return;
2607        }
2608        let command = task
2609            .state
2610            .lock()
2611            .map(|state| state.metadata.command.clone())
2612            .unwrap_or_default();
2613        *last_reminder_at = Some(now);
2614        self.emit_bash_long_running(BashLongRunningFrame::new(
2615            task.task_id.clone(),
2616            task.session_id.clone(),
2617            command,
2618            task.started.elapsed().as_millis() as u64,
2619        ));
2620    }
2621
2622    fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2623        let Ok(progress_sender) = self
2624            .inner
2625            .progress_sender
2626            .lock()
2627            .map(|sender| sender.clone())
2628        else {
2629            return;
2630        };
2631        if let Some(sender) = progress_sender.as_ref() {
2632            sender(PushFrame::BashLongRunning(frame));
2633        }
2634    }
2635
2636    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2637        self.inner
2638            .tasks
2639            .lock()
2640            .ok()
2641            .and_then(|tasks| tasks.get(task_id).cloned())
2642    }
2643
2644    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2645        self.task(task_id)
2646            .filter(|task| task.session_id == session_id)
2647    }
2648
2649    fn running_count(&self) -> usize {
2650        self.inner
2651            .tasks
2652            .lock()
2653            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2654            .unwrap_or(0)
2655    }
2656
2657    fn start_watchdog(&self) {
2658        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2659            super::watchdog::start(self.clone());
2660        }
2661    }
2662
2663    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2664        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2665    }
2666
2667    #[cfg(test)]
2668    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2669        self.task_for_session(task_id, session_id)
2670            .map(|task| task.paths.json.clone())
2671    }
2672
2673    #[cfg(test)]
2674    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2675        self.task_for_session(task_id, session_id)
2676            .map(|task| task.paths.exit.clone())
2677    }
2678
2679    /// Generate a `bash-{16hex}` slug that is unique against live tasks and queued completions.
2680    fn generate_unique_task_id(&self) -> Result<String, String> {
2681        for _ in 0..32 {
2682            let candidate = random_slug();
2683            let tasks = self
2684                .inner
2685                .tasks
2686                .lock()
2687                .map_err(|_| "background task registry lock poisoned".to_string())?;
2688            if tasks.contains_key(&candidate) {
2689                continue;
2690            }
2691            let completions = self
2692                .inner
2693                .completions
2694                .lock()
2695                .map_err(|_| "background completions lock poisoned".to_string())?;
2696            if completions
2697                .iter()
2698                .any(|completion| completion.task_id == candidate)
2699            {
2700                continue;
2701            }
2702            return Ok(candidate);
2703        }
2704        Err("failed to allocate unique background task id after 32 attempts".to_string())
2705    }
2706}
2707
2708fn render_compressed_with_recovery(
2709    buffer: &BgBuffer,
2710    mut compressed: CompressionResult,
2711    input_truncated: bool,
2712) -> TerminalOutputCache {
2713    // Preserve a single canonical trailing newline. A bare `.trim_end()` strips
2714    // the legitimate final newline that `echo` and most commands emit, so
2715    // agent-facing output diverged from native bash ("hello" vs "hello\n") and
2716    // broke the no-JSON-envelope contract. Collapse excess trailing blank lines
2717    // to one, but keep that one when the content had a trailing newline. NOTE:
2718    // the check must read the ORIGINAL text — strip_plain_truncation_marker_lines
2719    // rebuilds via `.lines().join("\n")`, which itself drops the trailing newline.
2720    let had_trailing_newline = compressed.text.ends_with('\n');
2721    let mut text = strip_plain_truncation_marker_lines(&compressed.text)
2722        .trim_end()
2723        .to_string();
2724    if had_trailing_newline && !text.is_empty() {
2725        text.push('\n');
2726    }
2727    compressed.text = text;
2728
2729    let output_path = buffer.output_path().map(|path| path.display().to_string());
2730    let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
2731    let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
2732    let mut recovery = RecoveryContext {
2733        dropped_by_class: compressed.dropped_by_class,
2734        had_inner_drop: compressed.had_inner_drop,
2735        offset_hint_eligible: compressed.offset_hint_eligible,
2736        offset_start_line: compressed.offset_start_line,
2737        byte_truncated: input_truncated,
2738        output_path: output_path.clone(),
2739        stderr_path: stderr_path.clone(),
2740        include_stderr_path,
2741    };
2742
2743    let (output_preview, output_truncated) =
2744        render_body_with_recovery_marker(&compressed.text, &mut recovery);
2745    TerminalOutputCache {
2746        output_preview,
2747        output_truncated,
2748        kind: TerminalOutputKind::Compressed,
2749        output_path,
2750        stderr_path,
2751        recovery: Some(recovery),
2752    }
2753}
2754
2755fn render_body_with_recovery_marker(body: &str, recovery: &mut RecoveryContext) -> (String, bool) {
2756    render_body_with_recovery_marker_at_cap(
2757        body,
2758        recovery,
2759        FINAL_OUTPUT_CAP_BYTES,
2760        cap_final_output,
2761        cap_final_output_with_marker,
2762    )
2763}
2764
2765fn render_raw_body_with_recovery_marker(
2766    body: &str,
2767    recovery: &mut RecoveryContext,
2768) -> (String, bool) {
2769    render_body_with_recovery_marker_at_cap(
2770        body,
2771        recovery,
2772        RAW_PASSTHROUGH_CAP_BYTES,
2773        |input| {
2774            super::output::cap_head_tail(
2775                input,
2776                RAW_PASSTHROUGH_CAP_BYTES,
2777                RAW_PASSTHROUGH_HEAD_BYTES,
2778                RAW_PASSTHROUGH_TAIL_BYTES,
2779            )
2780        },
2781        |input, marker| {
2782            super::output::cap_head_tail_with_marker(
2783                input,
2784                RAW_PASSTHROUGH_CAP_BYTES,
2785                RAW_PASSTHROUGH_HEAD_BYTES,
2786                RAW_PASSTHROUGH_TAIL_BYTES,
2787                marker,
2788            )
2789        },
2790    )
2791}
2792
2793fn render_body_with_recovery_marker_at_cap<F, G>(
2794    body: &str,
2795    recovery: &mut RecoveryContext,
2796    cap_bytes: usize,
2797    cap_plain: F,
2798    cap_with_marker: G,
2799) -> (String, bool)
2800where
2801    F: Fn(&str) -> super::output::CappedText,
2802    G: Fn(&str, &str) -> super::output::CappedText,
2803{
2804    let needs_marker = recovery.has_visible_drop();
2805    if body.len() > cap_bytes {
2806        recovery.byte_truncated = true;
2807        if let Some(marker) = recovery_marker(recovery) {
2808            let capped = cap_with_marker(body, &marker);
2809            return (capped.text, true);
2810        }
2811        let capped = cap_plain(body);
2812        return (capped.text, capped.truncated || needs_marker);
2813    }
2814
2815    if !needs_marker {
2816        return (body.to_string(), false);
2817    }
2818
2819    let Some(marker) = recovery_marker(recovery) else {
2820        return (body.to_string(), true);
2821    };
2822    let with_marker = append_recovery_marker(body, &marker);
2823    if with_marker.len() <= cap_bytes {
2824        return (with_marker, true);
2825    }
2826
2827    recovery.byte_truncated = true;
2828    let marker = recovery_marker(recovery).unwrap_or(marker);
2829    let capped = cap_with_marker(body, &marker);
2830    (capped.text, true)
2831}
2832
/// Appends `marker` on its own line after `body`.
///
/// Trailing whitespace on `body` is removed first so exactly one newline
/// separates the body from the marker. When `body` is empty or consists only
/// of whitespace, the marker alone is returned, so the result never starts
/// with a blank line.
fn append_recovery_marker(body: &str, marker: &str) -> String {
    let trimmed = body.trim_end();
    if trimmed.is_empty() {
        return marker.to_string();
    }
    let mut output = String::with_capacity(trimmed.len() + 1 + marker.len());
    output.push_str(trimmed);
    output.push('\n');
    output.push_str(marker);
    output
}
2842
2843fn recovery_marker(recovery: &RecoveryContext) -> Option<String> {
2844    let mut parts = Vec::new();
2845    for (class, count) in &recovery.dropped_by_class {
2846        let label = if *count == 1 {
2847            class.singular()
2848        } else {
2849            class.plural()
2850        };
2851        parts.push(format!("+{count} more {label}"));
2852    }
2853    if recovery.byte_truncated {
2854        parts.push("truncated output".to_string());
2855    } else if recovery.had_inner_drop && parts.is_empty() {
2856        parts.push("omitted output".to_string());
2857    }
2858
2859    if parts.is_empty() {
2860        return None;
2861    }
2862
2863    let hint = recovery_hint(recovery);
2864    Some(format!("[{}; {hint}]", parts.join(", ")))
2865}
2866
2867fn recovery_hint(recovery: &RecoveryContext) -> String {
2868    // AFT stores stdout/stderr separately and combines them in memory. Class caps,
2869    // middle truncation, and mixed stdout/stderr renders are not line-offset
2870    // portable. Only a single-file contiguous-prefix drop may use `tail -n +N`.
2871    if recovery.offset_hint_eligible
2872        && !recovery.byte_truncated
2873        && recovery.dropped_by_class.is_empty()
2874        && !recovery.include_stderr_path
2875    {
2876        if let (Some(path), Some(line)) =
2877            (recovery.output_path.as_deref(), recovery.offset_start_line)
2878        {
2879            return format!("see remaining: tail -n +{line} {}", quote_path(path));
2880        }
2881    }
2882
2883    let mut paths = Vec::new();
2884    if let Some(path) = recovery.output_path.as_deref() {
2885        paths.push(path);
2886    }
2887    if recovery.include_stderr_path {
2888        if let Some(path) = recovery.stderr_path.as_deref() {
2889            if !paths.contains(&path) {
2890                paths.push(path);
2891            }
2892        }
2893    }
2894
2895    if paths.is_empty() {
2896        return "full output unavailable".to_string();
2897    }
2898
2899    let reads = paths
2900        .into_iter()
2901        .map(|path| format!("read {}", quote_path(path)))
2902        .collect::<Vec<_>>()
2903        .join(" and ");
2904    format!("full output: {reads}")
2905}
2906
2907fn strip_plain_truncation_marker_lines(input: &str) -> String {
2908    input
2909        .lines()
2910        .filter(|line| !is_plain_truncation_marker(line.trim()))
2911        .collect::<Vec<_>>()
2912        .join("\n")
2913}
2914
2915fn strip_recovery_marker_lines(input: &str) -> String {
2916    input
2917        .lines()
2918        .filter(|line| !is_recovery_marker(line.trim()))
2919        .collect::<Vec<_>>()
2920        .join("\n")
2921}
2922
/// Reports whether `line` is exactly `...<truncated N bytes>...`, where `N`
/// is one or more ASCII digits.
fn is_plain_truncation_marker(line: &str) -> bool {
    line.strip_prefix("...<truncated ")
        .and_then(|rest| rest.strip_suffix(" bytes>..."))
        .is_some_and(|digits| !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit()))
}
2932
/// Reports whether `line` is a bracketed recovery marker: it starts with `[`,
/// ends with `]`, and contains one of the known recovery hint phrases.
fn is_recovery_marker(line: &str) -> bool {
    const HINT_FRAGMENTS: [&str; 3] = [
        "full output: read ",
        "see remaining: tail -n +",
        "full output unavailable",
    ];

    let bracketed = line.starts_with('[') && line.ends_with(']');
    bracketed
        && HINT_FRAGMENTS
            .iter()
            .any(|fragment| line.contains(*fragment))
}
2940
2941fn render_structured_output(command: &str, buffer: &BgBuffer) -> Option<TerminalOutputCache> {
2942    if !is_gh_structured_command(command) {
2943        return None;
2944    }
2945
2946    let output_path = buffer
2947        .output_path()
2948        .map(|path| path.display().to_string())?;
2949    let stdout_bytes = buffer.stream_len(StreamKind::Stdout);
2950    if stdout_bytes == 0 {
2951        return None;
2952    }
2953
2954    if stdout_bytes > STRUCTURED_OUTPUT_CAP_BYTES as u64 {
2955        if !stream_starts_like_json(buffer, StreamKind::Stdout) {
2956            return None;
2957        }
2958        return Some(TerminalOutputCache {
2959            output_preview: json_output_pointer(stdout_bytes, &output_path),
2960            output_truncated: true,
2961            kind: TerminalOutputKind::Structured,
2962            output_path: Some(output_path),
2963            stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
2964            recovery: None,
2965        });
2966    }
2967
2968    let stdout = buffer.read_stream_bounded(StreamKind::Stdout, STRUCTURED_OUTPUT_CAP_BYTES);
2969    if stdout.truncated || !is_structured_body(&stdout.text) {
2970        return None;
2971    }
2972
2973    Some(TerminalOutputCache {
2974        output_preview: stdout.text,
2975        output_truncated: false,
2976        kind: TerminalOutputKind::Structured,
2977        output_path: Some(output_path),
2978        stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
2979        recovery: None,
2980    })
2981}
2982
2983fn render_raw_passthrough(buffer: &BgBuffer) -> TerminalOutputCache {
2984    let raw = buffer.read_combined_head_tail(
2985        RAW_PASSTHROUGH_CAP_BYTES,
2986        RAW_PASSTHROUGH_HEAD_BYTES,
2987        RAW_PASSTHROUGH_TAIL_BYTES,
2988    );
2989    let output_path = buffer.output_path().map(|path| path.display().to_string());
2990    let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
2991    if !raw.truncated {
2992        return TerminalOutputCache {
2993            output_preview: raw.text,
2994            output_truncated: false,
2995            kind: TerminalOutputKind::Raw,
2996            output_path,
2997            stderr_path,
2998            recovery: None,
2999        };
3000    }
3001
3002    let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
3003    let mut recovery = RecoveryContext {
3004        dropped_by_class: BTreeMap::new(),
3005        had_inner_drop: false,
3006        offset_hint_eligible: false,
3007        offset_start_line: None,
3008        byte_truncated: true,
3009        output_path: output_path.clone(),
3010        stderr_path: stderr_path.clone(),
3011        include_stderr_path,
3012    };
3013    let (output_preview, output_truncated) =
3014        render_raw_body_with_recovery_marker(&raw.text, &mut recovery);
3015    TerminalOutputCache {
3016        output_preview,
3017        output_truncated,
3018        kind: TerminalOutputKind::Raw,
3019        output_path,
3020        stderr_path,
3021        recovery: Some(recovery),
3022    }
3023}
3024
3025fn completion_preview_for_cache(cache: &TerminalOutputCache) -> (String, bool) {
3026    if cache.kind == TerminalOutputKind::Structured
3027        && cache.output_preview.len() > BG_COMPLETION_PREVIEW_BYTES
3028    {
3029        if let Some(path) = cache.output_path.as_deref() {
3030            return (
3031                json_output_pointer(cache.output_preview.len() as u64, path),
3032                true,
3033            );
3034        }
3035        return (cache.output_preview.clone(), cache.output_truncated);
3036    }
3037
3038    if let Some(recovery) = cache.recovery.as_ref() {
3039        if cache.output_preview.len() <= BG_COMPLETION_PREVIEW_BYTES {
3040            return (cache.output_preview.clone(), cache.output_truncated);
3041        }
3042        let body = strip_recovery_marker_lines(&cache.output_preview);
3043        let mut completion_recovery = recovery.clone();
3044        completion_recovery.byte_truncated = true;
3045        if let Some(marker) = recovery_marker(&completion_recovery) {
3046            let capped = cap_completion_output_with_marker(&body, &marker);
3047            return (capped.text, true);
3048        }
3049    }
3050
3051    let capped = cap_completion_output(&cache.output_preview);
3052    (capped.text, cache.output_truncated || capped.truncated)
3053}
3054
3055fn is_gh_structured_command(command: &str) -> bool {
3056    let normalized = crate::compress::normalize_command_for_dispatch(command)
3057        .unwrap_or_else(|| command.trim_start().to_string());
3058    let tokens = shell_words_for_flags(&normalized);
3059    let Some(head) = tokens.first() else {
3060        return false;
3061    };
3062    let head_name = Path::new(head)
3063        .file_name()
3064        .and_then(|name| name.to_str())
3065        .unwrap_or(head);
3066    if !(head_name == "gh" || head_name.eq_ignore_ascii_case("gh.exe")) {
3067        return false;
3068    }
3069    tokens.iter().any(|token| {
3070        matches!(token.as_str(), "--json" | "--jq" | "--template")
3071            || token.starts_with("--json=")
3072            || token.starts_with("--jq=")
3073            || token.starts_with("--template=")
3074    })
3075}
3076
/// Split `command` into words using a deliberately small subset of shell
/// quoting rules, good enough for spotting flags.
///
/// * A backslash outside single quotes makes the next character literal.
/// * Single and double quotes group characters; the quote characters
///   themselves are dropped.
/// * Unquoted whitespace and the operators `;`, `&`, `|` end the current
///   word. The operators are discarded, not emitted as words.
/// * Empty words are never produced.
fn shell_words_for_flags(command: &str) -> Vec<String> {
    let mut words: Vec<String> = Vec::new();
    let mut current = String::new();
    let mut in_single = false;
    let mut in_double = false;
    let mut escaped = false;

    for ch in command.chars() {
        if escaped {
            current.push(ch);
            escaped = false;
            continue;
        }
        let quoted = in_single || in_double;
        match ch {
            '\\' if !in_single => escaped = true,
            '\'' if !in_double => in_single = !in_single,
            '"' if !in_single => in_double = !in_double,
            c if !quoted && (c.is_whitespace() || matches!(c, ';' | '&' | '|')) => {
                if !current.is_empty() {
                    words.push(std::mem::take(&mut current));
                }
            }
            c => current.push(c),
        }
    }

    if !current.is_empty() {
        words.push(current);
    }
    words
}
3121
3122fn is_structured_body(body: &str) -> bool {
3123    let trimmed = body.trim();
3124    if trimmed.is_empty() {
3125        return false;
3126    }
3127    if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
3128        return true;
3129    }
3130
3131    let mut saw_line = false;
3132    for line in trimmed
3133        .lines()
3134        .map(str::trim)
3135        .filter(|line| !line.is_empty())
3136    {
3137        saw_line = true;
3138        if serde_json::from_str::<serde_json::Value>(line).is_err() {
3139            return false;
3140        }
3141    }
3142    saw_line
3143}
3144
3145fn stream_starts_like_json(buffer: &BgBuffer, stream: StreamKind) -> bool {
3146    let path = match (buffer, stream) {
3147        (BgBuffer::Pipes { stdout_path, .. }, StreamKind::Stdout) => Some(stdout_path),
3148        (BgBuffer::Pipes { stderr_path, .. }, StreamKind::Stderr) => Some(stderr_path),
3149        (BgBuffer::Pty { combined_path }, _) => Some(combined_path),
3150    };
3151    let Some(path) = path else {
3152        return false;
3153    };
3154    let Ok(file) = std::fs::File::open(path) else {
3155        return false;
3156    };
3157    let mut limited = file.take(512);
3158    let mut bytes = Vec::new();
3159    if limited.read_to_end(&mut bytes).is_err() {
3160        return false;
3161    }
3162    String::from_utf8_lossy(&bytes)
3163        .chars()
3164        .find(|ch| !ch.is_whitespace())
3165        .is_some_and(|ch| matches!(ch, '{' | '[' | '"' | '-' | '0'..='9' | 't' | 'f' | 'n'))
3166}
3167
/// Token and byte accounting attached to a task completion.
///
/// NOTE(review): the `original_*` / `compressed_*` pairs presumably describe
/// the output before and after compression — confirm against the code that
/// fills them in.
struct CompletionTokenCounts {
    original_tokens: Option<u32>,
    compressed_tokens: Option<u32>,
    original_bytes: Option<i64>,
    compressed_bytes: Option<i64>,
    /// Set when no counts were recorded (see [`CompletionTokenCounts::skipped`]).
    tokens_skipped: bool,
}

impl CompletionTokenCounts {
    /// Build the "nothing was counted" value: every count is `None` and
    /// `tokens_skipped` is `true`.
    fn skipped() -> Self {
        Self {
            tokens_skipped: true,
            original_bytes: None,
            compressed_bytes: None,
            original_tokens: None,
            compressed_tokens: None,
        }
    }
}
3187
3188fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
3189    match status {
3190        BgTaskStatus::TimedOut => "timed out".to_string(),
3191        BgTaskStatus::Killed => "killed".to_string(),
3192        _ => exit_code
3193            .map(|code| format!("exit {code}"))
3194            .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
3195    }
3196}
3197
3198fn token_count_u32(text: &str) -> u32 {
3199    aft_tokenizer::count_tokens(text)
3200        .try_into()
3201        .unwrap_or(u32::MAX)
3202}
3203
3204impl Default for BgTaskRegistry {
3205    fn default() -> Self {
3206        Self::new(Arc::new(Mutex::new(None)))
3207    }
3208}
3209
/// Report whether `path` was last modified strictly less than `grace` ago.
///
/// Returns `false` when the metadata or modification time cannot be read, or
/// when the modification time lies in the future relative to the system clock.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(metadata) = fs::metadata(path) else {
        return false;
    };
    let Ok(modified) = metadata.modified() else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        Err(_) => false,
    }
}
3218
/// Canonicalize `path`, falling back to an owned copy of the input when
/// canonicalization fails (for example because the path does not exist).
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
3222
/// Translate a wall-clock start time (milliseconds since the Unix epoch)
/// into a monotonic [`Instant`] lying the same distance in the past.
///
/// A start time in the future, an unreadable system clock, or an offset the
/// platform cannot represent all collapse to "now".
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        // Clock is before the epoch: pretend no time has elapsed.
        Err(_) => started_at,
    };
    let age = Duration::from_millis(now_ms.saturating_sub(started_at));
    let now = Instant::now();
    now.checked_sub(age).unwrap_or_else(Instant::now)
}
3234
3235fn gc_quarantine(storage_dir: &Path) {
3236    let quarantine_root = storage_dir.join("bash-tasks-quarantine");
3237    let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
3238        return;
3239    };
3240    for session_entry in session_dirs.flatten() {
3241        let session_quarantine_dir = session_entry.path();
3242        if !session_quarantine_dir.is_dir() {
3243            continue;
3244        }
3245        let entries = match fs::read_dir(&session_quarantine_dir) {
3246            Ok(entries) => entries,
3247            Err(error) => {
3248                crate::slog_warn!(
3249                    "failed to read background task quarantine dir {}: {error}",
3250                    session_quarantine_dir.display()
3251                );
3252                continue;
3253            }
3254        };
3255        for entry in entries.flatten() {
3256            let path = entry.path();
3257            if modified_within(&path, QUARANTINE_GC_GRACE) {
3258                continue;
3259            }
3260            let result = if path.is_dir() {
3261                fs::remove_dir_all(&path)
3262            } else {
3263                fs::remove_file(&path)
3264            };
3265            match result {
3266                Ok(()) => log::debug!(
3267                    "deleted old background task quarantine entry {}",
3268                    path.display()
3269                ),
3270                Err(error) => crate::slog_warn!(
3271                    "failed to delete old background task quarantine entry {}: {error}",
3272                    path.display()
3273                ),
3274            }
3275        }
3276        let _ = fs::remove_dir(&session_quarantine_dir);
3277    }
3278    let _ = fs::remove_dir(&quarantine_root);
3279}
3280
/// Why a task's files are being moved into quarantine. The kind selects the
/// naming scheme `quarantine_name` applies to the moved files.
enum QuarantineKind {
    /// Quarantined files are renamed to `<file_name>.corrupt-<unix_ts>`.
    Corrupt,
    /// Quarantined files are renamed to `<stem>.invalid.<unix_ts>.<extension>`
    /// (or `<file_name>.invalid.<unix_ts>` when there is no stem/extension pair).
    Invalid,
}
3285
3286fn quarantine_task_json(
3287    storage_dir: &Path,
3288    session_dir: &Path,
3289    json_path: &Path,
3290    kind: QuarantineKind,
3291) -> Result<(), String> {
3292    let session_hash = session_dir
3293        .file_name()
3294        .and_then(|name| name.to_str())
3295        .ok_or_else(|| {
3296            format!(
3297                "invalid background task session dir: {}",
3298                session_dir.display()
3299            )
3300        })?;
3301    let task_name = json_path
3302        .file_name()
3303        .and_then(|name| name.to_str())
3304        .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
3305    let unix_ts = SystemTime::now()
3306        .duration_since(UNIX_EPOCH)
3307        .map(|duration| duration.as_secs())
3308        .unwrap_or(0);
3309    let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
3310    fs::create_dir_all(&quarantine_dir).map_err(|e| {
3311        format!(
3312            "failed to create background task quarantine dir {}: {e}",
3313            quarantine_dir.display()
3314        )
3315    })?;
3316    let target_name = quarantine_name(task_name, unix_ts, &kind);
3317    let target = quarantine_dir.join(target_name);
3318    fs::rename(json_path, &target).map_err(|e| {
3319        format!(
3320            "failed to quarantine background task metadata {} to {}: {e}",
3321            json_path.display(),
3322            target.display()
3323        )
3324    })?;
3325
3326    for sibling in task_sibling_paths(json_path) {
3327        if !sibling.exists() {
3328            continue;
3329        }
3330        let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
3331            crate::slog_warn!(
3332                "skipping background task sibling with invalid name during quarantine: {}",
3333                sibling.display()
3334            );
3335            continue;
3336        };
3337        let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
3338        if let Err(error) = fs::rename(&sibling, &sibling_target) {
3339            crate::slog_warn!(
3340                "failed to quarantine background task sibling {} to {}: {error}",
3341                sibling.display(),
3342                sibling_target.display()
3343            );
3344        }
3345    }
3346
3347    let _ = fs::remove_dir(session_dir);
3348    Ok(())
3349}
3350
3351fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
3352    match kind {
3353        QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
3354        QuarantineKind::Invalid => {
3355            let path = Path::new(file_name);
3356            let stem = path.file_stem().and_then(|stem| stem.to_str());
3357            let extension = path.extension().and_then(|extension| extension.to_str());
3358            match (stem, extension) {
3359                (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
3360                _ => format!("{file_name}.invalid.{unix_ts}"),
3361            }
3362        }
3363    }
3364}
3365
/// List the paths of the files that may sit next to a task's JSON metadata:
/// the same stem in the same directory with each of the extensions `stdout`,
/// `stderr`, `exit`, `pty`, `ps1`, `bat` and `sh`.
///
/// The paths are returned whether or not the files exist. An empty list is
/// returned when `json_path` has no parent or no UTF-8 file stem.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 7] = ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"];

    let (Some(parent), Some(stem)) = (
        json_path.parent(),
        json_path.file_stem().and_then(|stem| stem.to_str()),
    ) else {
        return Vec::new();
    };

    let mut siblings = Vec::with_capacity(SIBLING_EXTENSIONS.len());
    for extension in SIBLING_EXTENSIONS {
        siblings.push(parent.join(format!("{stem}.{extension}")));
    }
    siblings
}
3378
3379fn read_for_token_count_from_disk(
3380    metadata: &PersistedTask,
3381    paths: &TaskPaths,
3382    max_bytes_per_stream: usize,
3383) -> TokenCountInput {
3384    if metadata.mode == BgMode::Pty {
3385        return TokenCountInput::Skipped;
3386    }
3387    // Read up to `max_bytes_per_stream` bytes per stream rather than
3388    // refusing to tokenize anything when the file exceeds the cap.
3389    // Mirror the in-memory `BgBuffer::read_for_token_count` policy
3390    // (see comment there) — large outputs are exactly the tasks that
3391    // benefit most from compression accounting, so silent-skipping
3392    // them defeats the purpose of token tracking.
3393    let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
3394    let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
3395    match (stdout, stderr) {
3396        (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3397            String::from_utf8_lossy(&stdout).as_ref(),
3398            String::from_utf8_lossy(&stderr).as_ref(),
3399        )),
3400        (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
3401            String::from_utf8_lossy(&stdout).as_ref(),
3402            "",
3403        )),
3404        (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3405            "",
3406            String::from_utf8_lossy(&stderr).as_ref(),
3407        )),
3408        (Err(_), Err(_)) => TokenCountInput::Skipped,
3409    }
3410}
3411
/// Read at most `max_bytes` bytes from the END of `path`. Used for
/// tokenization where the most recent output is more representative than
/// an arbitrarily-capped beginning.
///
/// The cap is a hard limit: `max_bytes == 0` yields an empty buffer, and a
/// file that is still being appended to while it is read never produces more
/// than `max_bytes` bytes.
///
/// # Errors
///
/// Returns `Err` if the file cannot be opened (genuinely missing or
/// permissions error), or if querying its length, seeking, or reading fails.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};
    let mut file = std::fs::File::open(path)?;
    let len = file.metadata()?.len();
    let cap = u64::try_from(max_bytes).unwrap_or(u64::MAX);
    let read_len = len.min(cap);
    // Seek whenever the file is longer than the cap, including the zero-cap
    // case; skipping the seek there would read the whole file from the start.
    if len > read_len {
        file.seek(SeekFrom::Start(len - read_len))?;
    }
    // `read_len <= max_bytes`, so the conversion cannot actually fail.
    let mut bytes = Vec::with_capacity(usize::try_from(read_len).unwrap_or(max_bytes));
    // `take` bounds the read so concurrent appends cannot push us past the cap.
    file.take(read_len).read_to_end(&mut bytes)?;
    Ok(bytes)
}
3428
3429impl BgTask {
3430    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
3431        let state = self
3432            .state
3433            .lock()
3434            .unwrap_or_else(|poison| poison.into_inner());
3435        self.snapshot_locked(&state, preview_bytes)
3436    }
3437
3438    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
3439        let metadata = &state.metadata;
3440        let duration_ms = metadata.duration_ms.or_else(|| {
3441            metadata
3442                .status
3443                .is_terminal()
3444                .then(|| self.started.elapsed().as_millis() as u64)
3445        });
3446        let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
3447            (String::new(), false)
3448        } else if metadata.status.is_terminal() {
3449            state
3450                .terminal_output_cache
3451                .as_ref()
3452                .map(|cache| (cache.output_preview.clone(), cache.output_truncated))
3453                .unwrap_or_else(|| (String::new(), false))
3454        } else {
3455            state.buffer.read_tail(preview_bytes)
3456        };
3457        BgTaskSnapshot {
3458            info: BgTaskInfo {
3459                task_id: self.task_id.clone(),
3460                status: metadata.status.clone(),
3461                command: metadata.command.clone(),
3462                mode: metadata.mode.clone(),
3463                started_at: metadata.started_at,
3464                duration_ms,
3465            },
3466            exit_code: metadata.exit_code,
3467            child_pid: metadata.child_pid,
3468            workdir: metadata.workdir.display().to_string(),
3469            output_preview,
3470            output_truncated,
3471            output_path: state
3472                .buffer
3473                .output_path()
3474                .map(|path| path.display().to_string()),
3475            stderr_path: state
3476                .buffer
3477                .stderr_path()
3478                .map(|path| path.display().to_string()),
3479            pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
3480            pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
3481        }
3482    }
3483
3484    pub(crate) fn is_running(&self) -> bool {
3485        self.state
3486            .lock()
3487            .map(|state| {
3488                state.metadata.status == BgTaskStatus::Running
3489                    || (state.metadata.mode == BgMode::Pty
3490                        && state.metadata.status == BgTaskStatus::Killing)
3491            })
3492            .unwrap_or(false)
3493    }
3494
3495    fn is_terminal(&self) -> bool {
3496        self.state
3497            .lock()
3498            .map(|state| state.metadata.status.is_terminal())
3499            .unwrap_or(false)
3500    }
3501
3502    fn mark_terminal_now(&self) {
3503        if let Ok(mut terminal_at) = self.terminal_at.lock() {
3504            if terminal_at.is_none() {
3505                *terminal_at = Some(Instant::now());
3506            }
3507        }
3508    }
3509
3510    fn set_completion_delivered(
3511        &self,
3512        delivered: bool,
3513        registry: &BgTaskRegistry,
3514    ) -> Result<(), String> {
3515        let mut state = self
3516            .state
3517            .lock()
3518            .map_err(|_| "background task lock poisoned".to_string())?;
3519        let updated = registry
3520            .update_task_metadata(&self.paths, |metadata| {
3521                metadata.completion_delivered = delivered;
3522            })
3523            .map_err(|e| format!("failed to update completion delivery: {e}"))?;
3524        state.metadata = updated;
3525        Ok(())
3526    }
3527}
3528
/// Take the direct child handle out of `child_slot` and reap it.
///
/// Dropping a [`std::process::Child`] does NOT `wait()` on the underlying OS
/// process, so on Unix a finished-but-unreaped child lingers as a
/// `<defunct>` zombie until the AFT process itself exits (issue #91:
/// `[mv] <defunct>`). Terminal-transition paths that learn of completion from
/// the exit-marker file — rather than from [`BgTaskRegistry::reap_child`]'s
/// `try_wait()` — therefore have to reap the handle explicitly instead of
/// just clearing the slot.
///
/// A non-blocking `try_wait()` is attempted first. Only when it reports the
/// child as still running does this fall back to a blocking `wait()`. The
/// exit marker is written by the wrapper's final statement (an atomic `mv`
/// rename), so by the time the marker is observed the child is already on its
/// way out and the wait is expected to return almost immediately.
///
/// Callers hold the task state mutex, which serializes this against
/// `reap_child`: whichever path takes the lock first reaps and empties the
/// slot, and the other one finds `None`.
#[cfg(unix)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
    let Some(mut child) = child_slot.take() else {
        return;
    };
    match child.try_wait() {
        // Still running: block until the exit can be collected.
        Ok(None) => {
            let _ = child.wait();
        }
        // Already reaped by `try_wait`, or the status could not be queried.
        Ok(Some(_)) | Err(_) => {}
    }
}
3557
/// Windows counterpart of the Unix reaper: there is no zombie / `<defunct>`
/// state to clean up, and dropping the [`Child`] closes the process handle,
/// which is the correct release. The slot is simply emptied, preserving the
/// historical behavior so the documented Windows PID-recycle handling in
/// `reap_child` is unaffected.
#[cfg(windows)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
    drop(child_slot.take());
}
3566
3567fn terminal_metadata_from_marker(
3568    mut metadata: PersistedTask,
3569    marker: ExitMarker,
3570    reason: Option<String>,
3571) -> PersistedTask {
3572    match marker {
3573        ExitMarker::Code(code) => {
3574            let status = if code == 0 {
3575                BgTaskStatus::Completed
3576            } else {
3577                BgTaskStatus::Failed
3578            };
3579            metadata.mark_terminal(status, Some(code), reason);
3580        }
3581        ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
3582    }
3583    metadata
3584}
3585
3586#[cfg(unix)]
3587fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
3588    let shell = resolve_posix_shell();
3589    let mut cmd = Command::new(&shell);
3590    cmd.arg("-c")
3591        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
3592        .arg(&shell)
3593        .arg(command)
3594        .arg(exit_path);
3595    unsafe {
3596        cmd.pre_exec(|| {
3597            if libc::setsid() == -1 {
3598                return Err(std::io::Error::last_os_error());
3599            }
3600            Ok(())
3601        });
3602    }
3603    cmd
3604}
3605
3606#[cfg(unix)]
3607fn resolve_posix_shell() -> PathBuf {
3608    static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
3609    POSIX_SHELL
3610        .get_or_init(|| {
3611            std::env::var_os("BASH")
3612                .filter(|value| !value.is_empty())
3613                .map(PathBuf::from)
3614                .filter(|path| path.exists())
3615                .or_else(|| which::which("bash").ok())
3616                .or_else(|| which::which("zsh").ok())
3617                .unwrap_or_else(|| PathBuf::from("/bin/sh"))
3618        })
3619        .clone()
3620}
3621
/// Build the Windows command that runs `command` detached under `shell`.
///
/// The wrapper script produced by `shell.wrapper_script_bytes` is written to
/// a file next to the other task files (same stem as the task JSON, or
/// `wrapper` when the stem is unavailable), and the shell is invoked with that
/// file path as a single argument. Going through a file sidesteps Windows
/// command-line quoting entirely: the wrapper's internal quotes are read
/// from disk under file-syntax rules and never pass through the Rust std
/// quoting, the `cmd /C` parser, or the PowerShell `-Command` parser. The
/// path itself needs no escaping because task IDs contain no spaces
/// (`bash-XXXXXXXX`).
///
/// `creation_flags` are the Win32 process creation flags. The caller decides
/// whether `CREATE_BREAKAWAY_FROM_JOB` is included (see the callers for the
/// breakaway-fallback strategy).
///
/// # Errors
///
/// Returns a message when the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    // `-File` runs the script with no quoting issues; the other switches
    // configure the PowerShell host before the file runs.
    const POWERSHELL_ARGS: &[&str] = &[
        "-NoLogo",
        "-NoProfile",
        "-NonInteractive",
        "-ExecutionPolicy",
        "Bypass",
        "-File",
    ];
    // `cmd /D /C "<bat-file-path>"`: the batch processor reads the file line
    // by line instead of the /C parser re-interpreting a compound command,
    // which avoids "filename syntax incorrect" errors.
    const CMD_ARGS: &[&str] = &["/D", "/C"];
    // git-bash and other POSIX shells just take `<binary> <wrapper-path>`.
    const POSIX_ARGS: &[&str] = &[];

    let wrapper_body = shell.wrapper_script_bytes(command, exit_path);

    // For POSIX shells the extension is cosmetic; `.sh` is simply what an
    // operator grepping the spill directory would expect.
    let (wrapper_ext, leading_args) = match &shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => ("ps1", POWERSHELL_ARGS),
        WindowsShell::Cmd => ("bat", CMD_ARGS),
        WindowsShell::Posix(_) => ("sh", POSIX_ARGS),
    };

    let stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{}.{}", stem, wrapper_ext));
    if let Err(e) = fs::write(&wrapper_path, wrapper_body) {
        return Err(format!("failed to write background bash wrapper script: {e}"));
    }

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(leading_args);
    cmd.arg(&wrapper_path);
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
3704
3705/// Spawn a detached background bash child process.
3706///
/// On Unix this is a single spawn of the wrapper built by
/// `detached_shell_command`, run under the shell picked by
/// `resolve_posix_shell` (`$BASH`, then `bash`, then `zsh`, then
/// `/bin/sh`). On Windows it walks `windows_shell::shell_candidates()`
/// (pwsh → powershell → git-bash → cmd) and retries with the next
/// candidate when the previous one fails to spawn with `NotFound` — the
/// same runtime safety net the foreground bash path has, so issue #27
/// callers landing on the cmd.exe fallback can also use background bash.
/// Each candidate is tried with `CREATE_BREAKAWAY_FROM_JOB` first and once
/// more without it when that attempt is rejected with Access Denied. The
/// wrapper script is regenerated per attempt because PowerShell wrappers
/// embed the shell binary by name; the stdout/stderr capture handles are
/// also reopened per attempt because `Command::spawn()` consumes them.
3716///
3717/// Errors other than `NotFound` (PermissionDenied, OutOfMemory, etc.)
3718/// return immediately without retry — they indicate a problem with the
3719/// resolved shell that retrying with a different shell won't fix.
3720fn spawn_detached_child(
3721    command: &str,
3722    paths: &TaskPaths,
3723    workdir: &Path,
3724    env: &HashMap<String, String>,
3725) -> Result<std::process::Child, String> {
3726    #[cfg(not(windows))]
3727    {
3728        let stdout = create_capture_file(&paths.stdout)
3729            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3730        let stderr = create_capture_file(&paths.stderr)
3731            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3732        detached_shell_command(command, &paths.exit)
3733            .current_dir(workdir)
3734            .envs(env)
3735            .stdin(Stdio::null())
3736            .stdout(Stdio::from(stdout))
3737            .stderr(Stdio::from(stderr))
3738            .spawn()
3739            .map_err(|e| format!("failed to spawn background bash command: {e}"))
3740    }
3741    #[cfg(windows)]
3742    {
3743        use crate::windows_shell::shell_candidates;
3744        // Spawn priority: pwsh → powershell → git-bash → cmd. Same as the
3745        // legacy foreground bash spawn path. v0.20 routes ALL bash through
3746        // this background spawn helper, including foreground tool calls
3747        // where the model writes PowerShell-syntax (`$var = ...`,
3748        // `Start-Sleep`, `Add-Content`) — those fail outright under cmd.
3749        // The earlier v0.18-era cmd-first override worked around a
3750        // PowerShell detached-output bug; that bug is fixed at the
3751        // process-flag layer (CREATE_NO_WINDOW instead of DETACHED_PROCESS,
3752        // see flag block below), so we no longer need to misroute PS
3753        // commands through cmd.
3754        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
3755        // Win32 process creation flags. We try with CREATE_BREAKAWAY_FROM_JOB
3756        // first (so the bg child outlives the AFT process when AFT is killed),
3757        // then fall back without it for environments where the parent is in a
3758        // Job Object that doesn't grant `JOB_OBJECT_LIMIT_BREAKAWAY_OK`. CI
3759        // runners (GitHub Actions windows-2022) and some MDM-managed corp
3760        // environments hit this — `CreateProcess` returns Access Denied (5).
3761        // Without breakaway, the child still runs detached but will be torn
3762        // down with the parent if the parent process group is signaled.
3763        //
3764        // We use CREATE_NO_WINDOW (no visible console window, but the
3765        // child still has a hidden console) rather than DETACHED_PROCESS
3766        // (no console at all). PowerShell-based wrappers that perform
3767        // file I/O via [System.IO.File] need a console handle to flush
3768        // stdout/stderr correctly even when redirected — under
3769        // DETACHED_PROCESS, pwsh sometimes silently exits before
3770        // executing later script statements (the Move-Item that writes
3771        // the exit marker never runs), leaving the bg task forever
3772        // marked Failed: process exited without exit marker. cmd.exe
3773        // wrappers tolerate DETACHED_PROCESS, but switching to
3774        // CREATE_NO_WINDOW costs nothing for cmd and unblocks pwsh.
3775        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
3776        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
3777        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
3778        let with_breakaway =
3779            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
3780        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
3781        let mut last_error: Option<String> = None;
3782        for (idx, shell) in candidates.iter().enumerate() {
3783            // Per-shell, try with breakaway first. If the process is in a
3784            // restrictive job, the breakaway flag triggers Access Denied
3785            // (os error 5). Retry once without breakaway.
3786            for &flags in &[with_breakaway, without_breakaway] {
3787                // Re-open capture handles per attempt; spawn() consumes them.
3788                let stdout = create_capture_file(&paths.stdout)
3789                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3790                let stderr = create_capture_file(&paths.stderr)
3791                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3792                let mut cmd =
3793                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
3794                cmd.current_dir(workdir)
3795                    .envs(env)
3796                    .stdin(Stdio::null())
3797                    .stdout(Stdio::from(stdout))
3798                    .stderr(Stdio::from(stderr));
3799                match cmd.spawn() {
3800                    Ok(child) => {
3801                        if idx > 0 {
3802                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
3803                             the cached PATH probe disagreed with runtime spawn — likely PATH \
3804                             inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
3805                            shell.binary(),
3806                            idx);
3807                        }
3808                        if flags == without_breakaway {
3809                            crate::slog_warn!(
3810                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
3811                             (likely a restrictive Job Object — CI sandbox or MDM policy). \
3812                             Spawned without breakaway; the bg task will be torn down if the \
3813                             AFT process group is killed."
3814                            );
3815                        }
3816                        return Ok(child);
3817                    }
3818                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
3819                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
3820                        shell.binary());
3821                        last_error = Some(format!("{}: {e}", shell.binary()));
3822                        // Skip the without-breakaway retry for NotFound — the
3823                        // binary itself is missing, breakaway flag is irrelevant.
3824                        break;
3825                    }
3826                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
3827                        // Access Denied during breakaway — retry without it.
3828                        crate::slog_warn!(
3829                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
3830                         Access Denied — retrying {} without breakaway",
3831                            shell.binary()
3832                        );
3833                        last_error = Some(format!("{}: {e}", shell.binary()));
3834                        continue;
3835                    }
3836                    Err(e) => {
3837                        return Err(format!(
3838                            "failed to spawn background bash command via {}: {e}",
3839                            shell.binary()
3840                        ));
3841                    }
3842                }
3843            }
3844        }
3845        Err(format!(
3846            "failed to spawn background bash command: no Windows shell could be spawned. \
3847             Last error: {}. PATH-probed candidates: {:?}",
3848            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
3849            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
3850        ))
3851    }
3852}
3853
3854fn random_slug() -> String {
3855    let mut bytes = [0u8; 4];
3856    // getrandom is a transitive dependency; use it directly for OS entropy.
3857    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3858        // Extremely unlikely fallback: time + pid mix.
3859        let t = SystemTime::now()
3860            .duration_since(UNIX_EPOCH)
3861            .map(|d| d.subsec_nanos())
3862            .unwrap_or(0);
3863        let p = std::process::id();
3864        bytes.copy_from_slice(&(t ^ p).to_le_bytes());
3865    });
3866    // `bash-` + 8 lowercase hex chars — compact, OS-entropy backed.
3867    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3868    format!("bash-{hex}")
3869}
3870
3871#[cfg(test)]
3872mod tests {
3873    use std::collections::HashMap;
3874    use std::fs;
3875    use std::sync::atomic::{AtomicBool, AtomicUsize};
3876    use std::sync::{Arc, Mutex};
3877    use std::time::Duration;
3878    #[cfg(windows)]
3879    use std::time::Instant;
3880
3881    use super::*;
3882
    // Command line used by tests that need a task to spawn; on Unix this is
    // the `true` utility.
    #[cfg(unix)]
    const QUICK_SUCCESS_COMMAND: &str = "true";
    // Windows counterpart: `cmd` is asked to exit with code 0.
    #[cfg(windows)]
    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";

    // Command line used by tests that need a task which is still running when
    // the test acts on it; on Unix this sleeps for five seconds.
    #[cfg(unix)]
    const LONG_RUNNING_COMMAND: &str = "sleep 5";
    // Windows counterpart: a five-second `timeout` with its output sent to `nul`.
    #[cfg(windows)]
    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3892
3893    fn insert_terminal_piped_task(
3894        registry: &BgTaskRegistry,
3895        dir: &tempfile::TempDir,
3896        command: &str,
3897        stdout: &str,
3898        stderr: &str,
3899        compressed: bool,
3900    ) -> (String, Arc<BgTask>) {
3901        let task_id = format!("bash-test-{}", random_slug());
3902        let paths = task_paths(dir.path(), "session", &task_id);
3903        fs::create_dir_all(&paths.dir).unwrap();
3904        fs::write(&paths.stdout, stdout).unwrap();
3905        fs::write(&paths.stderr, stderr).unwrap();
3906        let mut metadata = PersistedTask::starting(
3907            task_id.clone(),
3908            "session".to_string(),
3909            command.to_string(),
3910            dir.path().to_path_buf(),
3911            Some(dir.path().to_path_buf()),
3912            Some(30_000),
3913            true,
3914            compressed,
3915        );
3916        metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
3917        write_task(&paths.json, &metadata).unwrap();
3918        registry
3919            .insert_rehydrated_task(metadata, paths, true)
3920            .expect("insert terminal task");
3921        let task = registry.task_for_session(&task_id, "session").unwrap();
3922        (task_id, task)
3923    }
3924
3925    fn insert_terminal_pty_task(
3926        registry: &BgTaskRegistry,
3927        dir: &tempfile::TempDir,
3928        pty_output: &str,
3929    ) -> (String, Arc<BgTask>) {
3930        let task_id = format!("bash-test-{}", random_slug());
3931        let paths = task_paths(dir.path(), "session", &task_id);
3932        fs::create_dir_all(&paths.dir).unwrap();
3933        fs::write(&paths.pty, pty_output).unwrap();
3934        let mut metadata = PersistedTask::starting(
3935            task_id.clone(),
3936            "session".to_string(),
3937            "python".to_string(),
3938            dir.path().to_path_buf(),
3939            Some(dir.path().to_path_buf()),
3940            Some(30_000),
3941            true,
3942            true,
3943        );
3944        metadata.mode = BgMode::Pty;
3945        metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
3946        write_task(&paths.json, &metadata).unwrap();
3947        registry
3948            .insert_rehydrated_task(metadata, paths, true)
3949            .expect("insert terminal pty task");
3950        let task = registry.task_for_session(&task_id, "session").unwrap();
3951        (task_id, task)
3952    }
3953
3954    #[test]
3955    fn recognizes_all_recovery_marker_forms() {
3956        assert!(is_recovery_marker(
3957            "[truncated output; full output: read \"/tmp/out\"]"
3958        ));
3959        assert!(is_recovery_marker(
3960            "[omitted output; see remaining: tail -n +42 \"/tmp/out\"]"
3961        ));
3962        assert!(is_recovery_marker(
3963            "[truncated output; full output unavailable]"
3964        ));
3965    }
3966
3967    #[test]
3968    fn terminal_status_polls_use_cached_render_once_and_off_lock() {
3969        let registry = BgTaskRegistry::default();
3970        let dir = tempfile::tempdir().unwrap();
3971        let (_task_id, task) = insert_terminal_piped_task(
3972            &registry,
3973            &dir,
3974            "custom-tool --verbose",
3975            &"stdout line\n".repeat(200_000),
3976            "",
3977            true,
3978        );
3979        let calls = Arc::new(AtomicUsize::new(0));
3980        let saw_unlocked_state = Arc::new(AtomicBool::new(false));
3981        let task_holder = Arc::new(Mutex::new(Some(Arc::clone(&task))));
3982        let calls_for_closure = Arc::clone(&calls);
3983        let unlocked_for_closure = Arc::clone(&saw_unlocked_state);
3984        let task_for_closure = Arc::clone(&task_holder);
3985        registry.set_compressor(move |_command, output| {
3986            calls_for_closure.fetch_add(1, Ordering::SeqCst);
3987            if let Some(task) = task_for_closure.lock().unwrap().as_ref() {
3988                if task.state.try_lock().is_ok() {
3989                    unlocked_for_closure.store(true, Ordering::SeqCst);
3990                }
3991            }
3992            CompressionResult::new(format!("compressed {} bytes", output.len()))
3993        });
3994
3995        let first = registry
3996            .status(
3997                &task.task_id,
3998                "session",
3999                None,
4000                Some(dir.path()),
4001                RUNNING_OUTPUT_PREVIEW_BYTES,
4002            )
4003            .unwrap();
4004        let second = registry
4005            .status(
4006                &task.task_id,
4007                "session",
4008                None,
4009                Some(dir.path()),
4010                RUNNING_OUTPUT_PREVIEW_BYTES,
4011            )
4012            .unwrap();
4013        let listed = registry.list(RUNNING_OUTPUT_PREVIEW_BYTES);
4014
4015        assert_eq!(
4016            calls.load(Ordering::SeqCst),
4017            1,
4018            "terminal render must be cached"
4019        );
4020        assert!(
4021            saw_unlocked_state.load(Ordering::SeqCst),
4022            "compressor must run after releasing the task state lock"
4023        );
4024        assert!(first.output_preview.starts_with("compressed "));
4025        assert_eq!(second.output_preview, first.output_preview);
4026        assert_eq!(listed[0].output_preview, first.output_preview);
4027    }
4028
4029    #[test]
4030    fn completion_preview_uses_head_and_tail_not_blind_tail() {
4031        let registry = BgTaskRegistry::default();
4032        let dir = tempfile::tempdir().unwrap();
4033        let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4034        let (_task_id, task) =
4035            insert_terminal_piped_task(&registry, &dir, "cat big.log", &output, "", false);
4036
4037        registry.post_terminal_transition(&task, true).unwrap();
4038        let completions = registry.drain_completions_for_session(Some("session"));
4039        assert_eq!(completions.len(), 1);
4040        let preview = &completions[0].output_preview;
4041        assert!(preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4042        assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4043        assert!(
4044            preview.contains("...<truncated "),
4045            "preview was {preview:?}"
4046        );
4047    }
4048
4049    #[test]
4050    fn structured_gh_json_survives_intact_and_ignores_stderr() {
4051        let registry = BgTaskRegistry::default();
4052        let dir = tempfile::tempdir().unwrap();
4053        let calls = Arc::new(AtomicUsize::new(0));
4054        let calls_for_closure = Arc::clone(&calls);
4055        registry.set_compressor(move |_command, output| {
4056            calls_for_closure.fetch_add(1, Ordering::SeqCst);
4057            CompressionResult::new(output)
4058        });
4059        let (task_id, _task) = insert_terminal_piped_task(
4060            &registry,
4061            &dir,
4062            "gh pr view 123 --json body",
4063            "{\"body\":\"hello\"}",
4064            "warning: stderr must not join json",
4065            true,
4066        );
4067
4068        let snapshot = registry
4069            .status(
4070                &task_id,
4071                "session",
4072                None,
4073                Some(dir.path()),
4074                RUNNING_OUTPUT_PREVIEW_BYTES,
4075            )
4076            .unwrap();
4077
4078        assert_eq!(snapshot.output_preview, "{\"body\":\"hello\"}");
4079        assert!(!snapshot.output_preview.contains("warning"));
4080        assert!(!snapshot.output_truncated);
4081        assert_eq!(
4082            calls.load(Ordering::SeqCst),
4083            0,
4084            "structured JSON bypasses compression"
4085        );
4086    }
4087
4088    #[test]
4089    fn registry_emits_single_recovery_marker_for_class_drops() {
4090        let registry = BgTaskRegistry::default();
4091        let dir = tempfile::tempdir().unwrap();
4092        registry.set_compressor(move |_command, _output| {
4093            let mut dropped = BTreeMap::new();
4094            dropped.insert(DropClass::Error, 18);
4095            dropped.insert(DropClass::Warning, 6);
4096            CompressionResult::with_class_drops("kept diagnostic", dropped)
4097        });
4098        let (task_id, task) =
4099            insert_terminal_piped_task(&registry, &dir, "custom-tool", "raw", "", true);
4100
4101        let snapshot = registry
4102            .status(
4103                &task_id,
4104                "session",
4105                None,
4106                Some(dir.path()),
4107                RUNNING_OUTPUT_PREVIEW_BYTES,
4108            )
4109            .unwrap();
4110
4111        assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4112        assert!(snapshot.output_preview.contains("+18 more errors"));
4113        assert!(snapshot.output_preview.contains("+6 more warnings"));
4114        assert!(snapshot
4115            .output_preview
4116            .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4117        assert!(!snapshot.output_preview.contains("tail -n +"));
4118        assert!(snapshot.output_truncated);
4119    }
4120
4121    #[test]
4122    fn registry_marker_reports_semantic_and_byte_drops_once() {
4123        let registry = BgTaskRegistry::default();
4124        let dir = tempfile::tempdir().unwrap();
4125        registry.set_compressor(move |_command, _output| {
4126            let mut dropped = BTreeMap::new();
4127            dropped.insert(DropClass::Error, 1);
4128            CompressionResult::with_class_drops(
4129                format!("HEAD-SIGNAL\n{}TAIL-SIGNAL", "middle\n".repeat(8_000)),
4130                dropped,
4131            )
4132        });
4133        let (task_id, _task) =
4134            insert_terminal_piped_task(&registry, &dir, "custom-tool", "raw", "", true);
4135
4136        let snapshot = registry
4137            .status(
4138                &task_id,
4139                "session",
4140                None,
4141                Some(dir.path()),
4142                RUNNING_OUTPUT_PREVIEW_BYTES,
4143            )
4144            .unwrap();
4145
4146        assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4147        assert!(snapshot.output_preview.contains("+1 more error"));
4148        assert!(snapshot.output_preview.contains("truncated output"));
4149        assert!(snapshot.output_preview.contains("HEAD-SIGNAL"));
4150        assert!(snapshot.output_preview.contains("TAIL-SIGNAL"));
4151        assert!(!snapshot.output_preview.contains("...<truncated"));
4152        assert!(snapshot.output_truncated);
4153    }
4154
4155    #[test]
4156    fn cargo_stderr_class_drops_name_both_capture_paths() {
4157        let registry = BgTaskRegistry::default();
4158        let dir = tempfile::tempdir().unwrap();
4159        let filter_registry = crate::compress::toml_filter::FilterRegistry::default();
4160        registry.set_compressor(move |command, output| {
4161            crate::compress::compress_with_registry(command, &output, &filter_registry)
4162        });
4163        let stderr = (0..22)
4164            .map(|index| {
4165                format!(
4166                    "error: cargo failure {index}\n  --> src/lib.rs:{}:1\n   |\n{} | boom\n",
4167                    index + 1,
4168                    index + 1
4169                )
4170            })
4171            .collect::<Vec<_>>()
4172            .join("\n");
4173        let (task_id, task) = insert_terminal_piped_task(
4174            &registry,
4175            &dir,
4176            "cargo check",
4177            "Finished dev [unoptimized] target(s) in 0.01s\n",
4178            &stderr,
4179            true,
4180        );
4181
4182        let snapshot = registry
4183            .status(
4184                &task_id,
4185                "session",
4186                None,
4187                Some(dir.path()),
4188                RUNNING_OUTPUT_PREVIEW_BYTES,
4189            )
4190            .unwrap();
4191
4192        assert!(snapshot.output_preview.contains("+2 more errors"));
4193        assert!(snapshot
4194            .output_preview
4195            .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4196        assert!(snapshot
4197            .output_preview
4198            .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4199        assert!(!snapshot.output_preview.contains("tail -n +"));
4200    }
4201
4202    #[test]
4203    fn over_ceiling_structured_json_uses_pointer_not_partial_json() {
4204        let registry = BgTaskRegistry::default();
4205        let dir = tempfile::tempdir().unwrap();
4206        let body = format!("{{\"body\":\"{}\"}}", "x".repeat(60 * 1024));
4207        let (task_id, task) = insert_terminal_piped_task(
4208            &registry,
4209            &dir,
4210            "cd /repo && gh pr view 123 --json body",
4211            &body,
4212            "",
4213            true,
4214        );
4215
4216        let snapshot = registry
4217            .status(
4218                &task_id,
4219                "session",
4220                None,
4221                Some(dir.path()),
4222                RUNNING_OUTPUT_PREVIEW_BYTES,
4223            )
4224            .unwrap();
4225
4226        assert!(snapshot.output_preview.starts_with("[JSON output "));
4227        assert!(snapshot
4228            .output_preview
4229            .contains(&task.paths.stdout.display().to_string()));
4230        assert!(!snapshot.output_preview.contains(&"x".repeat(1024)));
4231        assert!(snapshot.output_truncated);
4232    }
4233
4234    #[test]
4235    fn toml_strip_tail_cap_uses_full_output_hint_not_offset_hint() {
4236        let registry = BgTaskRegistry::default();
4237        let dir = tempfile::tempdir().unwrap();
4238        let filter_registry = crate::compress::toml_filter::build_registry(
4239            crate::compress::builtin_filters::ALL,
4240            None,
4241            None,
4242        );
4243        registry.set_compressor(move |command, output| {
4244            crate::compress::compress_with_registry(command, &output, &filter_registry)
4245        });
4246        let stdout = format!(
4247            "make[1]: Entering directory `/tmp`\n{}",
4248            (0..100)
4249                .map(|index| format!("compile line {index}"))
4250                .collect::<Vec<_>>()
4251                .join("\n")
4252        );
4253        let (task_id, task) =
4254            insert_terminal_piped_task(&registry, &dir, "make all", &stdout, "", true);
4255
4256        let snapshot = registry
4257            .status(
4258                &task_id,
4259                "session",
4260                None,
4261                Some(dir.path()),
4262                RUNNING_OUTPUT_PREVIEW_BYTES,
4263            )
4264            .unwrap();
4265
4266        assert!(snapshot.output_preview.contains("compile line 99"));
4267        assert!(snapshot.output_preview.contains(&format!(
4268            "full output: read \"{}\"",
4269            task.paths.stdout.display()
4270        )));
4271        assert!(!snapshot
4272            .output_preview
4273            .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4274        assert!(!snapshot.output_preview.contains("tail -n +"));
4275    }
4276
4277    #[test]
4278    fn compressed_false_raw_passthrough_uses_wider_head_tail_cap() {
4279        let registry = BgTaskRegistry::default();
4280        let dir = tempfile::tempdir().unwrap();
4281        let output = format!("RAW-HEAD\n{}RAW-TAIL\n", "raw-middle\n".repeat(8_000));
4282        let (task_id, task) =
4283            insert_terminal_piped_task(&registry, &dir, "cat raw.log", &output, "RAW-ERR\n", false);
4284
4285        let snapshot = registry
4286            .status(
4287                &task_id,
4288                "session",
4289                None,
4290                Some(dir.path()),
4291                RUNNING_OUTPUT_PREVIEW_BYTES,
4292            )
4293            .unwrap();
4294
4295        assert!(snapshot.output_preview.contains("RAW-HEAD"));
4296        assert!(snapshot.output_preview.contains("RAW-TAIL"));
4297        assert!(snapshot.output_preview.contains("truncated output"));
4298        assert!(snapshot
4299            .output_preview
4300            .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4301        assert!(snapshot
4302            .output_preview
4303            .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4304        assert!(!snapshot.output_preview.contains("tail -n +"));
4305        assert!(snapshot.output_preview.len() > 16 * 1024);
4306        assert!(snapshot.output_truncated);
4307    }
4308
4309    #[test]
4310    fn pty_terminal_snapshot_bypasses_line_compression() {
4311        let registry = BgTaskRegistry::default();
4312        let dir = tempfile::tempdir().unwrap();
4313        let calls = Arc::new(AtomicUsize::new(0));
4314        let calls_for_closure = Arc::clone(&calls);
4315        registry.set_compressor(move |_command, output| {
4316            calls_for_closure.fetch_add(1, Ordering::SeqCst);
4317            CompressionResult::new(output)
4318        });
4319        let (task_id, _task) = insert_terminal_pty_task(&registry, &dir, "raw\u{1b}[31m pty bytes");
4320
4321        let snapshot = registry
4322            .status(
4323                &task_id,
4324                "session",
4325                None,
4326                Some(dir.path()),
4327                RUNNING_OUTPUT_PREVIEW_BYTES,
4328            )
4329            .unwrap();
4330
4331        assert_eq!(snapshot.info.mode, BgMode::Pty);
4332        assert_eq!(snapshot.output_preview, "");
4333        assert_eq!(calls.load(Ordering::SeqCst), 0);
4334    }
4335
4336    #[test]
4337    fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
4338        let registry = BgTaskRegistry::default();
4339        let dir = tempfile::tempdir().unwrap();
4340        let task_id = registry
4341            .spawn_pty(
4342                QUICK_SUCCESS_COMMAND,
4343                "session".to_string(),
4344                dir.path().to_path_buf(),
4345                HashMap::new(),
4346                Some(Duration::from_secs(30)),
4347                dir.path().to_path_buf(),
4348                10,
4349                true,
4350                false,
4351                Some(dir.path().to_path_buf()),
4352                50,
4353                120,
4354            )
4355            .unwrap();
4356
4357        let paths = task_paths(dir.path(), "session", &task_id);
4358        let metadata = read_task(&paths.json).unwrap();
4359        assert_eq!(
4360            metadata.schema_version,
4361            crate::bash_background::persistence::SCHEMA_VERSION
4362        );
4363        assert_eq!(metadata.mode, BgMode::Pty);
4364        assert_eq!(metadata.pty_rows, Some(50));
4365        assert_eq!(metadata.pty_cols, Some(120));
4366
4367        let snapshot = registry
4368            .status(&task_id, "session", None, Some(dir.path()), 1024)
4369            .unwrap();
4370        assert_eq!(snapshot.pty_rows, Some(50));
4371        assert_eq!(snapshot.pty_cols, Some(120));
4372    }
4373
4374    /// Spawn a child process that exits immediately and return it after
4375    /// it has terminated. Used by reap_child tests to simulate the
4376    /// "child exists and is dead" state when the watchdog has already
4377    /// nulled out the original child handle.
4378    fn spawn_dead_child() -> std::process::Child {
4379        #[cfg(unix)]
4380        let mut cmd = std::process::Command::new("true");
4381        #[cfg(windows)]
4382        let mut cmd = {
4383            let mut c = std::process::Command::new("cmd");
4384            c.args(["/c", "exit", "0"]);
4385            c
4386        };
4387        cmd.stdin(std::process::Stdio::null());
4388        cmd.stdout(std::process::Stdio::null());
4389        cmd.stderr(std::process::Stdio::null());
4390        let mut child = cmd.spawn().expect("spawn replacement child for reap test");
4391        // Poll try_wait() until the child actually exits, instead of calling
4392        // wait() which closes the OS handle. On Windows, after wait()
4393        // closes the handle, subsequent try_wait() calls (which reap_child
4394        // depends on) return Err — the test was inadvertently giving
4395        // reap_child an unusable child handle. Polling try_wait() keeps the
4396        // handle open and observes natural exit, matching the production
4397        // shape where the watchdog discovers an exited child for the first
4398        // time.
4399        let started = Instant::now();
4400        loop {
4401            match child.try_wait() {
4402                Ok(Some(_)) => break,
4403                Ok(None) => {
4404                    if started.elapsed() > Duration::from_secs(5) {
4405                        panic!("dead-child stand-in did not exit within 5s");
4406                    }
4407                    std::thread::sleep(Duration::from_millis(10));
4408                }
4409                Err(error) => panic!("dead-child try_wait failed: {error}"),
4410            }
4411        }
4412        child
4413    }
4414
4415    #[test]
4416    fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
4417        let registry = BgTaskRegistry::default();
4418        let dir = tempfile::tempdir().unwrap();
4419        let task_id = registry
4420            .spawn(
4421                LONG_RUNNING_COMMAND,
4422                "session".to_string(),
4423                dir.path().to_path_buf(),
4424                HashMap::new(),
4425                Some(Duration::from_secs(30)),
4426                dir.path().to_path_buf(),
4427                10,
4428                true,
4429                false,
4430                Some(dir.path().to_path_buf()),
4431            )
4432            .unwrap();
4433        registry
4434            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4435            .unwrap();
4436        assert_eq!(
4437            registry
4438                .drain_completions_for_session(Some("session"))
4439                .len(),
4440            1
4441        );
4442
4443        // Simulate the plugin consuming a sync bash_watch({ exit:true }) result
4444        // locally before the Rust completion queue is drained/acked.
4445        registry.inner.completions.lock().unwrap().clear();
4446
4447        assert_eq!(
4448            registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
4449            vec![task_id.clone()]
4450        );
4451        assert!(registry
4452            .drain_completions_for_session(Some("session"))
4453            .is_empty());
4454
4455        let paths = task_paths(dir.path(), "session", &task_id);
4456        let metadata = read_task(&paths.json).unwrap();
4457        assert!(metadata.completion_delivered);
4458
4459        let replayed = BgTaskRegistry::default();
4460        replayed
4461            .replay_session_inner(dir.path(), "session", None)
4462            .unwrap();
4463        assert!(replayed
4464            .drain_completions_for_session(Some("session"))
4465            .is_empty());
4466    }
4467
4468    #[test]
4469    fn register_watch_rejects_unknown_task() {
4470        let registry = BgTaskRegistry::default();
4471
4472        let result = registry.register_watch(
4473            "missing-task".to_string(),
4474            WatchPattern::Substring("READY".into()),
4475            true,
4476        );
4477
4478        assert_eq!(result, Err("task_not_found"));
4479    }
4480
4481    #[test]
4482    fn register_watch_on_terminal_task_scans_existing_output() {
4483        let frames = Arc::new(Mutex::new(Vec::new()));
4484        let captured = Arc::clone(&frames);
4485        let sender: crate::context::ProgressSender = Arc::new(Box::new(move |frame| {
4486            captured.lock().unwrap().push(frame);
4487        })
4488            as Box<dyn Fn(PushFrame) + Send + Sync>);
4489        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));
4490        let dir = tempfile::tempdir().unwrap();
4491        let task_id = registry
4492            .spawn(
4493                LONG_RUNNING_COMMAND,
4494                "session".to_string(),
4495                dir.path().to_path_buf(),
4496                HashMap::new(),
4497                Some(Duration::from_secs(30)),
4498                dir.path().to_path_buf(),
4499                10,
4500                true,
4501                false,
4502                Some(dir.path().to_path_buf()),
4503            )
4504            .unwrap();
4505        registry
4506            .inner
4507            .shutdown
4508            .store(true, std::sync::atomic::Ordering::SeqCst);
4509        let task = registry.task_for_session(&task_id, "session").unwrap();
4510        std::fs::write(&task.paths.stdout, "READY\n").unwrap();
4511        registry
4512            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4513            .unwrap();
4514        frames.lock().unwrap().clear();
4515        registry.inner.completions.lock().unwrap().clear();
4516
4517        registry
4518            .register_watch(
4519                task_id.clone(),
4520                WatchPattern::Substring("READY".into()),
4521                true,
4522            )
4523            .unwrap();
4524
4525        let frames = frames.lock().unwrap();
4526        let frame = frames
4527            .iter()
4528            .find_map(|frame| match frame {
4529                PushFrame::BashPatternMatch(frame) => Some(frame),
4530                _ => None,
4531            })
4532            .expect("terminal watch registration should emit pattern frame");
4533        assert_eq!(frame.reason, "pattern_match");
4534        assert_eq!(frame.task_id, task_id);
4535        assert_eq!(frame.session_id, "session");
4536        assert_eq!(frame.match_text, "READY");
4537        assert_eq!(frame.match_offset, 0);
4538        assert_eq!(registry.active_watch_count(&frame.task_id), 0);
4539        let metadata = read_task(&task.paths.json).unwrap();
4540        assert!(metadata.completion_delivered);
4541    }
4542
4543    #[test]
4544    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
4545        let registry = BgTaskRegistry::default();
4546        let dir = tempfile::tempdir().unwrap();
4547        let task_id = registry
4548            .spawn(
4549                QUICK_SUCCESS_COMMAND,
4550                "session".to_string(),
4551                dir.path().to_path_buf(),
4552                HashMap::new(),
4553                Some(Duration::from_secs(30)),
4554                dir.path().to_path_buf(),
4555                10,
4556                true,
4557                false,
4558                Some(dir.path().to_path_buf()),
4559            )
4560            .unwrap();
4561        registry
4562            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4563            .unwrap();
4564        let completions = registry.drain_completions_for_session(Some("session"));
4565        assert_eq!(completions.len(), 1);
4566        assert_eq!(
4567            registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
4568            vec![task_id.clone()]
4569        );
4570
4571        registry.cleanup_finished(Duration::ZERO);
4572
4573        assert!(registry.inner.tasks.lock().unwrap().is_empty());
4574    }
4575
4576    #[test]
4577    fn cleanup_finished_retains_undelivered_terminals() {
4578        let registry = BgTaskRegistry::default();
4579        let dir = tempfile::tempdir().unwrap();
4580        let task_id = registry
4581            .spawn(
4582                QUICK_SUCCESS_COMMAND,
4583                "session".to_string(),
4584                dir.path().to_path_buf(),
4585                HashMap::new(),
4586                Some(Duration::from_secs(30)),
4587                dir.path().to_path_buf(),
4588                10,
4589                true,
4590                false,
4591                Some(dir.path().to_path_buf()),
4592            )
4593            .unwrap();
4594        registry
4595            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4596            .unwrap();
4597
4598        registry.cleanup_finished(Duration::ZERO);
4599
4600        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
4601    }
4602
    /// Verify that the live watchdog path (reap_child) gives an exited
    /// child one watchdog pass for its exit marker to land, then marks the
    /// task Failed if the next pass still sees no marker.
    ///
    /// Cross-platform: spawns a quick-exiting command, then removes the exit
    /// marker file after the child has exited to simulate a wrapper that
    /// crashed before persisting the exit code.
    ///
    /// The test drives `reap_child` by hand twice, so it first stops the
    /// watchdog and rebuilds the "Running task with a dead child" shape in
    /// memory and on disk.
    #[test]
    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait for the child to actually exit and the wrapper to either
        // write the marker or fail. Then nuke the marker to simulate
        // wrapper crash before write. Poll up to 5s; this is plenty for a
        // `true`/`cmd /c exit 0` invocation.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                match &mut state.runtime {
                    TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
                    // No live piped child handle left to poll: treat the
                    // child as already exited.
                    _ => true,
                }
            };
            if exited {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Stop the watchdog so it doesn't race with our manual reap_child.
        // On fast Windows runners the watchdog ticks (every 500ms) can
        // observe the child exit and reap it before this test's assertion
        // fires, leaving us with `TaskRuntime::Piped(None)` and an
        // already-terminal status. We specifically want to test
        // reap_child's logic when invoked manually on a
        // Running-but-actually-dead task, so we need exclusive control over
        // the reap path here.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        // Give the watchdog at most one tick (500ms) to notice shutdown
        // before we touch task state. Without this, an in-flight watchdog
        // iteration could still race with our state setup below.
        std::thread::sleep(Duration::from_millis(550));

        // Wrapper likely wrote the marker by now; remove it to simulate
        // a wrapper crash that exited before persisting the exit code.
        // The result is ignored because the marker may legitimately be
        // absent already.
        let _ = std::fs::remove_file(&task.paths.exit);

        // The watchdog may have already reaped the child handle and
        // marked the task terminal before we got here. Reset both so
        // reap_child has the "Running task whose child just exited"
        // shape it's designed to handle. If the original child handle is
        // gone, install a quick-exited stand-in so the first reap exercises
        // the same try_wait path as production.
        //
        // CRITICAL on Windows: the watchdog ticks fast enough that the
        // JSON on disk may already say `Completed`. `update_task` (called
        // by `reap_child`) reads from disk, applies the closure, but
        // ROLLS BACK if the original on-disk state was already terminal
        // (see persistence.rs::update_task). So we must reset BOTH
        // in-memory metadata AND the JSON on disk to a Running state to
        // give reap_child the fresh shape it expects to operate on.
        {
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            state.metadata.exit_code = None;
            state.metadata.finished_at = None;
            state.metadata.duration_ms = None;
            // Persist the reset state to disk so update_task's terminal
            // rollback guard sees a non-terminal starting point.
            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
                .expect("persist reset Running metadata for reap_child test");
            // If the watchdog already cleared the child handle
            // (`TaskRuntime::Piped(None)`), we need to simulate "child
            // exists and is dead" so reap_child's try_wait path runs.
            // Spawn a quick-exit child as a stand-in.
            if matches!(state.runtime, TaskRuntime::Piped(None)) {
                state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
            }
        }
        // Clear the terminal_at marker too so mark_terminal_now() can fire
        // again inside reap_child.
        *task.terminal_at.lock().unwrap() = None;

        // Sanity: task is still Running per metadata (replay/poll hasn't
        // observed the missing marker yet).
        assert!(
            task.is_running(),
            "precondition: metadata.status == Running"
        );
        assert!(
            !task.paths.exit.exists(),
            "precondition: exit marker absent"
        );

        // First watchdog observation is intentionally insufficient to
        // declare failure. A missing marker may just mean the wrapper is
        // still completing its tmp-file-to-marker rename, so reap_child only
        // drops the child handle and switches to detached PID monitoring.
        registry.reap_child(&task);

        // Scoped so the state lock is released before the second reap.
        {
            let state = task.state.lock().unwrap();
            assert_eq!(
                state.metadata.status,
                BgTaskStatus::Running,
                "first reap must leave status Running while waiting one pass for marker"
            );
            assert_eq!(
                state.metadata.status_reason, None,
                "first reap must not record a failure reason"
            );
            assert!(
                matches!(state.runtime, TaskRuntime::Piped(None)),
                "child handle must be released after first reap"
            );
            assert!(
                state.detached,
                "task must be marked detached after first reap"
            );
        }

        // Second watchdog observation sees the detached PID is dead and the
        // marker is still absent. That is strong enough evidence that the
        // wrapper exited without persisting an exit code.
        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.metadata.status.is_terminal(),
            "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Failed,
            "must specifically be Failed (not Killed): status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status_reason.as_deref(),
            Some("process exited without exit marker"),
            "reason must match replay path's wording: {:?}",
            state.metadata.status_reason
        );
        assert!(
            matches!(state.runtime, TaskRuntime::Piped(None)),
            "child handle must stay released after second reap"
        );
        assert!(
            state.detached,
            "task must remain detached after second reap"
        );
    }
4780
    /// Companion to the above: when the exit marker DOES exist on disk
    /// at reap_child time, reap_child must NOT mark the task Failed.
    /// Instead it leaves status=Running and lets the next poll_task()
    /// cycle finalize via the marker.
    ///
    /// The test stops the watchdog, restores the "Running, child exited,
    /// marker present" shape by hand, and then calls `reap_child` once.
    #[test]
    fn reap_child_preserves_running_when_exit_marker_exists() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait for child to exit AND for the marker to land. Both happen
        // shortly after the wrapper finishes — but we want both observed.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                match &mut state.runtime {
                    TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
                    // No live piped child handle left to poll: treat the
                    // child as already exited.
                    _ => true,
                }
            };
            if exited && task.paths.exit.exists() {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit and write marker quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Stop the watchdog so it doesn't race with our manual reap_child.
        // On fast Windows runners the watchdog can call poll_task (which
        // finalizes via marker) before this test asserts the
        // "marker exists, status still Running" invariant. We want
        // exclusive control over the reap path.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        // Sleep slightly longer than one 500ms watchdog tick so an in-flight
        // iteration can finish before task state is touched.
        std::thread::sleep(Duration::from_millis(550));

        // If the watchdog already finalized the task before we stopped it,
        // restore the test setup: reset status to Running and ensure the
        // marker file is still on disk. We're testing reap_child's
        // behavior when called manually with both child-exited AND
        // marker-present, regardless of whether the watchdog beat us.
        //
        // NOTE(review): unlike the no-marker companion test, only the
        // in-memory status/status_reason are reset here and the on-disk
        // JSON is not rewritten — confirm reap_child never consults the
        // persisted state on the marker-present path.
        {
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            // If the child handle is already gone, install a quick-exited
            // stand-in so reap_child still has a child to observe.
            if matches!(state.runtime, TaskRuntime::Piped(None)) {
                state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
            }
        }
        *task.terminal_at.lock().unwrap() = None;
        // Make sure the marker is still on disk (poll_task removes it on
        // finalization). Recreate it if needed.
        if !task.paths.exit.exists() {
            std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
        }

        // reap_child sees: child exited, marker exists. It should:
        //  - release the child handle (`TaskRuntime::Piped(None)`) and set
        //    state.detached = true
        //  - NOT change status (poll_task will finalize via marker next tick)
        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            matches!(state.runtime, TaskRuntime::Piped(None)),
            "child handle still released even when marker exists"
        );
        assert!(
            state.detached,
            "task still marked detached even when marker exists"
        );
        // Status remains Running because reap_child defers to poll_task
        // when a marker exists. It would be wrong for reap to record the
        // marker outcome (poll_task does that with proper exit-code
        // parsing).
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Running,
            "reap_child must defer to poll_task when marker exists"
        );
    }
4882
4883    /// Read a process's `ps` state string ("Z", "S", "R", etc). Returns
4884    /// `None` once the PID has been fully reaped (no row), which is the
4885    /// post-reap state we want.
4886    #[cfg(unix)]
4887    fn pid_stat(pid: u32) -> Option<String> {
4888        let output = std::process::Command::new("ps")
4889            .args(["-o", "stat=", "-p", &pid.to_string()])
4890            .output()
4891            .ok()?;
4892        if !output.status.success() {
4893            return None;
4894        }
4895        let stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
4896        if stat.is_empty() {
4897            None
4898        } else {
4899            Some(stat)
4900        }
4901    }
4902
4903    /// A `<defunct>` zombie carries `ps` state starting with 'Z'.
4904    #[cfg(unix)]
4905    fn is_zombie(pid: u32) -> bool {
4906        pid_stat(pid).is_some_and(|stat| stat.starts_with('Z'))
4907    }
4908
4909    /// Spawn a child that exits immediately and wait — via `ps`, NOT
4910    /// `try_wait()`/`wait()` — until it is observably a `<defunct>` zombie,
4911    /// then return the still-unreaped handle. This reproduces the exact
4912    /// state issue #91 leaves behind: an exited OS child whose parent has
4913    /// not reaped it.
4914    #[cfg(unix)]
4915    fn spawn_unreaped_zombie() -> std::process::Child {
4916        let child = std::process::Command::new("true")
4917            .stdin(std::process::Stdio::null())
4918            .stdout(std::process::Stdio::null())
4919            .stderr(std::process::Stdio::null())
4920            .spawn()
4921            .expect("spawn zombie stand-in");
4922        let pid = child.id();
4923        let started = Instant::now();
4924        while !is_zombie(pid) {
4925            assert!(
4926                started.elapsed() < Duration::from_secs(5),
4927                "stand-in child should become a zombie within 5s"
4928            );
4929            std::thread::sleep(Duration::from_millis(10));
4930        }
4931        // Return WITHOUT reaping — the handle still owns an unwaited zombie.
4932        child
4933    }
4934
4935    /// Regression test for issue #91: the exit-marker terminal path
4936    /// (`poll_task` -> `finalize_from_marker`) must REAP the direct child
4937    /// handle, not merely drop it. Dropping a `std::process::Child` does not
4938    /// `wait()` on Unix, so the exited child lingers as a `[mv] <defunct>`
4939    /// zombie until AFT exits.
4940    ///
4941    /// We install a known-unreaped zombie into the task's child slot and
4942    /// drive the marker finalize path, then assert the child is gone (reaped)
4943    /// rather than still `<defunct>`.
4944    #[cfg(unix)]
4945    #[test]
4946    fn finalize_from_marker_reaps_child_no_zombie() {
4947        use std::sync::atomic::Ordering;
4948
4949        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4950        let dir = tempfile::tempdir().unwrap();
4951        let task_id = registry
4952            .spawn(
4953                QUICK_SUCCESS_COMMAND,
4954                "session".to_string(),
4955                dir.path().to_path_buf(),
4956                HashMap::new(),
4957                Some(Duration::from_secs(30)),
4958                dir.path().to_path_buf(),
4959                10,
4960                true,
4961                false,
4962                Some(dir.path().to_path_buf()),
4963            )
4964            .unwrap();
4965
4966        // Stop the watchdog so the ONLY terminal-transition path under test
4967        // is the exit-marker finalize (not reap_child's try_wait, which would
4968        // reap the child for us and mask the bug).
4969        registry.inner.shutdown.store(true, Ordering::SeqCst);
4970        std::thread::sleep(Duration::from_millis(550));
4971
4972        let task = registry.task_for_session(&task_id, "session").unwrap();
4973
4974        // Wait for the wrapper's exit marker to land. We deliberately do NOT
4975        // call try_wait()/wait() on the real child here — doing so would reap
4976        // it and defeat the test.
4977        let started = Instant::now();
4978        while !task.paths.exit.exists() {
4979            assert!(
4980                started.elapsed() < Duration::from_secs(5),
4981                "exit marker should land quickly for `true`"
4982            );
4983            std::thread::sleep(Duration::from_millis(20));
4984        }
4985
4986        // Reset to a fresh Running shape and install a guaranteed-unreaped
4987        // zombie as the child handle, so the finalize path's reap behavior is
4988        // exercised deterministically regardless of how the real child was
4989        // handled. Persist Running so update_task's terminal-rollback guard
4990        // sees a non-terminal starting point.
4991        let zombie_pid;
4992        {
4993            let mut state = task.state.lock().unwrap();
4994            state.metadata.status = BgTaskStatus::Running;
4995            state.metadata.status_reason = None;
4996            state.metadata.exit_code = None;
4997            state.metadata.finished_at = None;
4998            state.metadata.duration_ms = None;
4999            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5000                .expect("persist reset Running metadata");
5001            let zombie = spawn_unreaped_zombie();
5002            zombie_pid = zombie.id();
5003            state.runtime = TaskRuntime::Piped(Some(zombie));
5004        }
5005        *task.terminal_at.lock().unwrap() = None;
5006
5007        // Precondition: the installed child is genuinely a `<defunct>` zombie.
5008        assert!(
5009            is_zombie(zombie_pid),
5010            "precondition: stand-in child {zombie_pid} must be a zombie before finalize"
5011        );
5012
5013        // Drive the exit-marker terminal path. Before the fix this nulled the
5014        // Child handle without wait(), leaving the zombie behind.
5015        registry.poll_task(&task).unwrap();
5016
5017        {
5018            let state = task.state.lock().unwrap();
5019            assert!(
5020                matches!(state.runtime, TaskRuntime::Piped(None)),
5021                "child handle must be released after marker finalize"
5022            );
5023            assert!(
5024                state.metadata.status.is_terminal(),
5025                "task must be terminal after marker finalize: {:?}",
5026                state.metadata.status
5027            );
5028        }
5029
5030        // The core assertion: the child must have been REAPED, not just
5031        // dropped. A reaped PID has no `ps` row (or at minimum is not 'Z').
5032        assert!(
5033            !is_zombie(zombie_pid),
5034            "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
5035             after the exit-marker terminal transition"
5036        );
5037    }
5038
5039    /// Companion to the above for the kill path: when a kill observes an
5040    /// already-present exit marker (the child finished on its own first), it
5041    /// must reap the child handle rather than dropping it.
5042    #[cfg(unix)]
5043    #[test]
5044    fn kill_with_existing_marker_reaps_child_no_zombie() {
5045        use std::sync::atomic::Ordering;
5046
5047        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5048        let dir = tempfile::tempdir().unwrap();
5049        let task_id = registry
5050            .spawn(
5051                QUICK_SUCCESS_COMMAND,
5052                "session".to_string(),
5053                dir.path().to_path_buf(),
5054                HashMap::new(),
5055                Some(Duration::from_secs(30)),
5056                dir.path().to_path_buf(),
5057                10,
5058                true,
5059                false,
5060                Some(dir.path().to_path_buf()),
5061            )
5062            .unwrap();
5063
5064        registry.inner.shutdown.store(true, Ordering::SeqCst);
5065        std::thread::sleep(Duration::from_millis(550));
5066
5067        let task = registry.task_for_session(&task_id, "session").unwrap();
5068
5069        let started = Instant::now();
5070        while !task.paths.exit.exists() {
5071            assert!(
5072                started.elapsed() < Duration::from_secs(5),
5073                "exit marker should land quickly for `true`"
5074            );
5075            std::thread::sleep(Duration::from_millis(20));
5076        }
5077
5078        let zombie_pid;
5079        {
5080            let mut state = task.state.lock().unwrap();
5081            state.metadata.status = BgTaskStatus::Running;
5082            state.metadata.status_reason = None;
5083            state.metadata.exit_code = None;
5084            state.metadata.finished_at = None;
5085            state.metadata.duration_ms = None;
5086            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5087                .expect("persist reset Running metadata");
5088            let zombie = spawn_unreaped_zombie();
5089            zombie_pid = zombie.id();
5090            state.runtime = TaskRuntime::Piped(Some(zombie));
5091        }
5092        *task.terminal_at.lock().unwrap() = None;
5093
5094        assert!(
5095            is_zombie(zombie_pid),
5096            "precondition: stand-in child {zombie_pid} must be a zombie before kill"
5097        );
5098
5099        // Kill observes the existing marker and finalizes from it.
5100        registry
5101            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
5102            .expect("kill should succeed");
5103
5104        {
5105            let state = task.state.lock().unwrap();
5106            assert!(
5107                matches!(state.runtime, TaskRuntime::Piped(None)),
5108                "child handle must be released after marker-aware kill"
5109            );
5110            assert!(state.metadata.status.is_terminal());
5111        }
5112
5113        assert!(
5114            !is_zombie(zombie_pid),
5115            "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
5116             after a marker-aware kill"
5117        );
5118    }
5119
5120    #[test]
5121    fn cleanup_finished_keeps_running_tasks() {
5122        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5123        let dir = tempfile::tempdir().unwrap();
5124        let task_id = registry
5125            .spawn(
5126                LONG_RUNNING_COMMAND,
5127                "session".to_string(),
5128                dir.path().to_path_buf(),
5129                HashMap::new(),
5130                Some(Duration::from_secs(30)),
5131                dir.path().to_path_buf(),
5132                10,
5133                true,
5134                false,
5135                Some(dir.path().to_path_buf()),
5136            )
5137            .unwrap();
5138
5139        registry.cleanup_finished(Duration::ZERO);
5140
5141        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
5142        let _ = registry.kill(&task_id, "session");
5143    }
5144
5145    #[cfg(windows)]
5146    fn wait_for_file(path: &Path) -> String {
5147        let started = Instant::now();
5148        loop {
5149            if path.exists() {
5150                return fs::read_to_string(path).expect("read file");
5151            }
5152            assert!(
5153                started.elapsed() < Duration::from_secs(30),
5154                "timed out waiting for {}",
5155                path.display()
5156            );
5157            std::thread::sleep(Duration::from_millis(100));
5158        }
5159    }
5160
5161    #[cfg(windows)]
5162    fn spawn_windows_registry_command(
5163        command: &str,
5164    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
5165        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5166        let dir = tempfile::tempdir().unwrap();
5167        let task_id = registry
5168            .spawn(
5169                command,
5170                "session".to_string(),
5171                dir.path().to_path_buf(),
5172                HashMap::new(),
5173                Some(Duration::from_secs(30)),
5174                dir.path().to_path_buf(),
5175                10,
5176                false,
5177                false,
5178                Some(dir.path().to_path_buf()),
5179            )
5180            .unwrap();
5181        (registry, dir, task_id)
5182    }
5183
5184    #[cfg(windows)]
5185    #[test]
5186    fn windows_spawn_writes_exit_marker_for_zero_exit() {
5187        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
5188        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5189
5190        let content = wait_for_file(&exit_path);
5191
5192        assert_eq!(content.trim(), "0");
5193    }
5194
5195    #[cfg(windows)]
5196    #[test]
5197    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
5198        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
5199        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5200
5201        let content = wait_for_file(&exit_path);
5202
5203        assert_eq!(content.trim(), "42");
5204    }
5205
5206    #[cfg(windows)]
5207    #[test]
5208    fn windows_spawn_captures_stdout_to_disk() {
5209        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
5210        let task = registry.task_for_session(&task_id, "session").unwrap();
5211        let stdout_path = task.paths.stdout.clone();
5212        let exit_path = task.paths.exit.clone();
5213
5214        let _ = wait_for_file(&exit_path);
5215        let stdout = fs::read_to_string(stdout_path).expect("read stdout");
5216
5217        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
5218    }
5219
5220    #[cfg(windows)]
5221    #[test]
5222    fn windows_spawn_uses_pwsh_when_available() {
5223        // Without $SHELL set, $SHELL probe yields None and pwsh wins.
5224        // (We intentionally pass None for shell_env to keep this test
5225        // independent of the runner's actual env.)
5226        let candidates = crate::windows_shell::shell_candidates_with(
5227            |binary| match binary {
5228                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
5229                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
5230                _ => None,
5231            },
5232            || None,
5233        );
5234        let shell = candidates.first().expect("at least one candidate").clone();
5235        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
5236        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
5237    }
5238
5239    /// Issue #27 Oracle review P1, updated: cmd wrapper writes a `.bat` file
5240    /// that batch-evaluates `%ERRORLEVEL%` on its own line (line-by-line
5241    /// evaluation is the default for batch files; parse-time expansion only
5242    /// applies to compound `&`-chained inline commands). Capturing
5243    /// `%ERRORLEVEL%` into `set CODE=%ERRORLEVEL%` immediately after the user
5244    /// command runs records the real run-time exit code.
5245    #[cfg(windows)]
5246    #[test]
5247    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
5248        let exit_path = Path::new(r"C:\Temp\bash-test.exit");
5249        let script =
5250            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
5251
5252        // Batch wrapper: capture exit code into CODE on the line after the
5253        // user command, then write CODE to a temp marker file before
5254        // atomic-renaming it into place.
5255        assert!(
5256            script.contains("set CODE=%ERRORLEVEL%"),
5257            "wrapper must capture exit code into CODE: {script}"
5258        );
5259        assert!(
5260            script.contains("echo %CODE% >"),
5261            "wrapper must echo CODE to a temp marker file: {script}"
5262        );
5263        assert!(
5264            script.contains("move /Y"),
5265            "wrapper must use atomic move to write the marker: {script}"
5266        );
5267        // move output must be redirected to nul to avoid polluting the
5268        // user's captured stdout with "1 file(s) moved." lines.
5269        assert!(
5270            script.contains("> nul"),
5271            "wrapper must redirect move output to nul: {script}"
5272        );
5273        // exit /B %CODE% propagates the real exit code so wait() sees it.
5274        assert!(
5275            script.contains("exit /B %CODE%"),
5276            "wrapper must propagate the captured exit code: {script}"
5277        );
5278        assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
5279        assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
5280    }
5281
5282    /// `bg_command()` for Cmd no longer needs `/V:ON` — the wrapper is now
5283    /// written to a `.bat` file where batch-line evaluation captures
5284    /// `%ERRORLEVEL%` correctly without delayed expansion. We still need
5285    /// `/D` (skip AutoRun) and `/S` (simple quote-stripping for paths with
5286    /// internal `"`-quoting from `cmd_quote`).
5287    #[cfg(windows)]
5288    #[test]
5289    fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
5290        use crate::windows_shell::WindowsShell;
5291        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
5292        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5293        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5294        assert_eq!(
5295            args_strs,
5296            vec!["/D", "/S", "/C", "echo wrapped"],
5297            "Cmd::bg_command must prepend /D /S /C"
5298        );
5299    }
5300
5301    /// PowerShell variants don't need `/V:ON`-style flags; their args
5302    /// are the same for foreground (`command()`) and background
5303    /// (`bg_command()`).
5304    #[cfg(windows)]
5305    #[test]
5306    fn windows_shell_pwsh_bg_command_uses_standard_args() {
5307        use crate::windows_shell::WindowsShell;
5308        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
5309        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5310        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5311        assert!(
5312            args_strs.contains(&"-Command"),
5313            "Pwsh::bg_command must use -Command: {args_strs:?}"
5314        );
5315        assert!(
5316            args_strs.contains(&"Get-Date"),
5317            "Pwsh::bg_command must include the user command body"
5318        );
5319    }
5320
5321    /// Issue #27 Oracle review P1 + P2 test gap: end-to-end proof that the
5322    /// **cmd.exe-specific** wrapper path captures the user command's
5323    /// run-time exit code correctly. The existing
5324    /// `windows_spawn_writes_exit_marker_for_nonzero_exit` test would also
5325    /// pass with the buggy `%ERRORLEVEL%` wrapper if the Windows machine
5326    /// had pwsh.exe or powershell.exe on PATH (which is typical) — the
5327    /// outer wrapper would be PowerShell, not cmd, and PowerShell's
5328    /// `$LASTEXITCODE` captures the inner `cmd /c exit 42` correctly.
5329    ///
5330    /// This test directly spawns via `WindowsShell::Cmd.bg_command()` to
5331    /// force the cmd-wrapper code path, then writes the exit marker and
5332    /// asserts it contains "42" not "0". With the pre-fix `%ERRORLEVEL%`
5333    /// wrapper, this test would fail because `%ERRORLEVEL%` parse-time
5334    /// expansion would record cmd's startup ERRORLEVEL (typically 0)
5335    /// regardless of what the user command returned.
5336    /// **Disabled.** This test exercises `WindowsShell::Cmd.bg_command()` —
5337    /// the inline command-line wrapper helper that production code does
5338    /// NOT use anymore. v0.19.4 switched bg-bash to a file-based wrapper
5339    /// (`<task>.bat` / `<task>.ps1`) because the inline cmd-line quoting
5340    /// produced silent failures on Windows 11 (move /Y could not parse
5341    /// path arguments through cmd's /C parser). The `bg_command` helper
5342    /// is kept only for parity with `WindowsShell::Cmd.command()` shape;
5343    /// the production spawn path goes through `detached_shell_command_for`
5344    /// which writes the wrapper to disk and invokes `cmd /V:ON /D /C
5345    /// <bat-path>`.
5346    ///
5347    /// The `!ERRORLEVEL!` correctness this test was meant to verify is
5348    /// covered live by the Windows e2e harness scenario 2d
5349    /// (`bg bash records non-zero exit code (cmd /c exit 42)`), which
5350    /// exercises the real file-based wrapper end-to-end via the protocol.
5351    #[allow(dead_code)]
5352    #[cfg(any())] // disabled on all targets
5353    fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
5354}