Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::{Read, Write};
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::compress::caps::DropClass;
16use crate::compress::CompressionResult;
17use crate::context::SharedProgressSender;
18use crate::harness::Harness;
19use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
20
21#[cfg(unix)]
22use std::os::unix::process::CommandExt;
23#[cfg(windows)]
24use std::os::windows::process::CommandExt;
25
26use super::buffer::{combine_streams, BgBuffer, StreamKind, TokenCountInput};
27use super::output::{
28    cap_completion_output, cap_completion_output_with_marker, cap_final_output,
29    cap_final_output_with_marker, completion_preview_threshold, json_output_pointer, quote_path,
30    COMPRESS_INPUT_CAP_BYTES, COMPRESS_INPUT_HEAD_BYTES, COMPRESS_INPUT_TAIL_BYTES,
31    FINAL_OUTPUT_CAP_BYTES, RAW_PASSTHROUGH_CAP_BYTES, RAW_PASSTHROUGH_HEAD_BYTES,
32    RAW_PASSTHROUGH_TAIL_BYTES, RUNNING_OUTPUT_PREVIEW_BYTES, STRUCTURED_OUTPUT_CAP_BYTES,
33};
34use super::persistence::{
35    create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
36    task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
37    ExitMarker, PersistedTask, TaskPaths,
38};
39use super::process::is_process_alive;
40#[cfg(unix)]
41use super::process::terminate_pgid;
42#[cfg(windows)]
43use super::process::terminate_pid;
44use super::pty_process::spawn_pty_for_command;
45use super::pty_runtime::PtyRuntime;
46use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
47use super::{BgTaskInfo, BgTaskStatus};
48// Note: `resolve_windows_shell` is no longer imported at module scope —
49// production code in `spawn_detached_child` uses `shell_candidates()`
50// with retry instead, and the function remains in `windows_shell.rs`
51// for tests and as a future helper.
52
/// Default wall-clock timeout for background bash tasks: 1 800 s (30 minutes).
/// Agents can override per-call via the `timeout` parameter (in ms).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(1_800);
/// Age (86 400 s = 24 hours) used as the staleness threshold for tasks still marked running.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(86_400);
/// Grace period (86 400 s = 24 hours) applied by persisted-task GC.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(86_400);
/// Grace period (2 592 000 s = 30 days) applied to quarantined bundles during GC.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(2_592_000);

/// Per-stream byte cap (131 072 bytes = 128 KiB) above which token counting is skipped.
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 131_072;
61
/// Completion record for a background task that reached a terminal state.
///
/// Instances are queued in `RegistryInner::completions` and serialized into
/// completion payloads; see the per-field notes for what serialization omits.
/// Field declaration order determines serialized key order.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the completed task.
    pub task_id: String,
    /// Intentionally omitted from serialized completion payloads: push frames
    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Task status recorded for this completion.
    pub status: BgTaskStatus,
    /// Exit code of the task's process, when one was recorded.
    pub exit_code: Option<i32>,
    /// The shell command the task ran.
    pub command: String,
    /// Small head+tail preview of the cached terminal render at completion time,
    /// cached so push-frame consumers and `bash_drain_completions` callers see
    /// the same preview without racing against later output rotation. Empty
    /// when not captured (e.g., persisted task seen on startup before buffer
    /// reattachment).
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// True when the captured tail is shorter than the actual output (because
    /// rotation occurred or the output exceeds the preview cap). Plugins use
    /// this to render a `…` prefix and signal that `bash_status` would return
    /// more.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
    /// Token count for raw stdout+stderr before compression. Omitted when any
    /// stream exceeds the 128 KiB tokenization cap.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub original_tokens: Option<u32>,
    /// Token count for the compressed output generated from the same capped
    /// raw payload. Omitted when raw tokenization is skipped.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compressed_tokens: Option<u32>,
    /// True when a stream exceeded the tokenization cap and counts are absent.
    #[serde(default, skip_serializing_if = "is_false")]
    pub tokens_skipped: bool,
}
97
/// Serde `skip_serializing_if` predicate: skip a `bool` field when it is `false`.
fn is_false(v: &bool) -> bool {
    !v
}
101
/// Point-in-time view of one background task returned by status/list reads.
///
/// `info` is flattened, so its fields are serialized at the top level of the
/// object alongside the fields declared below.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Common task info, flattened into the serialized object.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code of the task's process, if known.
    pub exit_code: Option<i32>,
    /// PID of the spawned child process, if known.
    pub child_pid: Option<u32>,
    /// Working directory of the command, as a string.
    pub workdir: String,
    /// Output preview text.
    pub output_preview: String,
    /// True when `output_preview` does not contain the full output.
    pub output_truncated: bool,
    /// Path of the primary output capture file, if any.
    pub output_path: Option<String>,
    /// Path of the stderr capture file, if any.
    pub stderr_path: Option<String>,
    /// PTY row count; omitted from the serialized object when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_rows: Option<u16>,
    /// PTY column count; omitted from the serialized object when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_cols: Option<u16>,
}
118
/// Which rendering path produced a task's terminal output.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TerminalOutputKind {
    /// Output that went through the installed compressor (presumably set by
    /// `render_compressed_with_recovery` — confirm).
    Compressed,
    /// Uncompressed output; the PTY branch of `render_terminal_output` also
    /// reports this kind.
    Raw,
    /// Output rendered as structured data (presumably by
    /// `render_structured_output` — confirm).
    Structured,
}
125
/// Rendered output for a task in a terminal state, computed once and memoized
/// in `BgTaskState::terminal_output_cache` by `ensure_terminal_output_cache`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TerminalOutputCache {
    /// Rendered preview text (empty for PTY renders).
    output_preview: String,
    /// True when the preview does not contain the full output.
    output_truncated: bool,
    /// Which rendering path produced this cache.
    kind: TerminalOutputKind,
    /// Display string of the buffer's output path, if any.
    output_path: Option<String>,
    /// Display string of the buffer's stderr path, if any.
    stderr_path: Option<String>,
    /// Details about dropped/truncated output; `None` for PTY renders.
    recovery: Option<RecoveryContext>,
}
135
/// Describes what was dropped or truncated while rendering terminal output,
/// so callers can point the reader at the full capture files.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RecoveryContext {
    /// Per-class drop tallies (presumably counts of dropped items — confirm).
    dropped_by_class: BTreeMap<DropClass, usize>,
    /// True when some drop occurred that is not tallied per class.
    had_inner_drop: bool,
    /// Whether a line-offset hint may be offered for recovery.
    offset_hint_eligible: bool,
    /// Starting line for such a hint, when known.
    offset_start_line: Option<usize>,
    /// True when the input was byte-truncated before rendering.
    byte_truncated: bool,
    /// Display string of the output capture path, if any.
    output_path: Option<String>,
    /// Display string of the stderr capture path, if any.
    stderr_path: Option<String>,
    /// Whether `stderr_path` should be mentioned to the reader.
    include_stderr_path: bool,
}
147
148impl RecoveryContext {
149    fn has_visible_drop(&self) -> bool {
150        self.byte_truncated || self.had_inner_drop || !self.dropped_by_class.is_empty()
151    }
152}
153
/// Registry of background bash tasks.
///
/// Cloning is cheap: every clone shares the same `RegistryInner` through an
/// `Arc`, so all clones observe the same tasks and completions.
#[derive(Clone)]
pub struct BgTaskRegistry {
    /// Shared registry state.
    pub(crate) inner: Arc<RegistryInner>,
}
158
/// Shared state behind every clone of `BgTaskRegistry`.
pub(crate) struct RegistryInner {
    /// Tasks known to this process, keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Completion records waiting to be drained.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Sender for progress/push frames (type is opaque here).
    pub(crate) progress_sender: SharedProgressSender,
    /// Presumably set once the watchdog thread is running, so `start_watchdog`
    /// does not start it twice — confirm.
    watchdog_started: AtomicBool,
    /// Shutdown flag (presumably polled by the watchdog — confirm).
    pub(crate) shutdown: AtomicBool,
    /// Whether long-running reminders are enabled; `new` sets `true`.
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Long-running reminder interval in milliseconds; `new` sets 600_000.
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Swapped to `true` by the first replay so persisted-task GC runs at most
    /// once per registry.
    persisted_gc_started: AtomicBool,
    /// Test-only counter (presumably of GC runs — confirm).
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Output compression callback. Set by `AppContext` after construction.
    /// Takes (command, raw_output, exit_code) and returns compressed text. Called from
    /// the watchdog thread when a task reaches a terminal state and from
    /// `bash_status`/`list` snapshot reads. When `None`, output is returned
    /// uncompressed.
    pub(crate) compressor:
        Mutex<Option<Box<dyn Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync>>>,
    /// Optional DB connection used to dual-write task metadata; `None`
    /// disables DB writes.
    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
    /// Harness name used when building DB rows; dual-writes are skipped with
    /// a warning while this is `None`.
    pub(crate) db_harness: RwLock<Option<String>>,
    /// Sending half of the capacity-1 wake channel; `spawn_pty` hands a clone
    /// to each PTY runtime.
    pub(crate) wake_tx: crossbeam_channel::Sender<()>,
    /// Receiving half of the wake channel.
    pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
    /// Output watch registrations (see `super::watches`).
    pub(crate) watch_registry: Mutex<WatchRegistry>,
}
183
/// One background task tracked by the registry.
pub(crate) struct BgTask {
    /// Unique task identifier.
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk bundle locations (directory, metadata JSON, capture files).
    pub(crate) paths: TaskPaths,
    /// `Instant` taken when this process registered the task.
    pub(crate) started: Instant,
    /// Presumably when the last long-running reminder was sent — confirm
    /// against the watchdog.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// Presumably when the task was observed entering a terminal state —
    /// confirm against the watchdog.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state (metadata, process handle, output buffer).
    pub(crate) state: Mutex<BgTaskState>,
}
193
/// Process handle for a task, by spawn mode.
pub(crate) enum TaskRuntime {
    /// Non-PTY child created by `spawn`. `None` once the handle is no longer
    /// held (presumably after reaping — confirm).
    Piped(Option<Child>),
    /// PTY-backed child created by `spawn_pty`. `write_pty` treats `None` as
    /// an exited task.
    Pty(Option<PtyRuntime>),
}
198
/// Mutable per-task state guarded by `BgTask::state`.
pub(crate) struct BgTaskState {
    /// In-memory copy of the task metadata that `persist_task` writes to the
    /// bundle's JSON file.
    pub(crate) metadata: PersistedTask,
    /// Handle to the running process.
    pub(crate) runtime: TaskRuntime,
    /// Initialized to `false` by every spawn path in this file.
    /// NOTE(review): semantics not visible here — document once confirmed.
    pub(crate) detached: bool,
    /// True once `reap_child` has observed the direct child handle's exit
    /// via `try_wait()`. Used by the two-pass watchdog to skip the racy
    /// `is_process_alive(child_pid)` probe on the second pass — we already
    /// have authoritative evidence that the child is dead, no need to
    /// re-verify via PID liveness which is unreliable on Windows where
    /// PIDs can be recycled within seconds.
    ///
    /// Remains `false` on replay-restored tasks (those have a `child_pid`
    /// but never observed exit via this process's `try_wait()`), so those
    /// continue to fall through to the `is_process_alive` probe path.
    pub(crate) child_exit_observed: bool,
    /// Reader over the task's capture file(s): stdout+stderr for piped tasks,
    /// a single PTY capture for PTY tasks.
    pub(crate) buffer: BgBuffer,
    /// Memoized terminal render, populated by `ensure_terminal_output_cache`.
    terminal_output_cache: Option<TerminalOutputCache>,
    /// PTY-only: set for timeout kill intent before signaling the child.
    pub(crate) pending_terminal_override: Option<BgTaskStatus>,
}
219
220fn completion_matches_session(completion: &BgCompletion, session_id: Option<&str>) -> bool {
221    session_id
222        .map(|session_id| completion.session_id == session_id)
223        .unwrap_or(true)
224}
225
226impl BgTaskRegistry {
227    pub fn new(progress_sender: SharedProgressSender) -> Self {
228        let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
229        Self {
230            inner: Arc::new(RegistryInner {
231                tasks: Mutex::new(HashMap::new()),
232                completions: Mutex::new(VecDeque::new()),
233                progress_sender,
234                watchdog_started: AtomicBool::new(false),
235                shutdown: AtomicBool::new(false),
236                long_running_reminder_enabled: AtomicBool::new(true),
237                long_running_reminder_interval_ms: AtomicU64::new(600_000),
238                persisted_gc_started: AtomicBool::new(false),
239                #[cfg(test)]
240                persisted_gc_runs: AtomicU64::new(0),
241                compressor: Mutex::new(None),
242                db_pool: RwLock::new(None),
243                db_harness: RwLock::new(None),
244                wake_tx,
245                wake_rx,
246                watch_registry: Mutex::new(WatchRegistry::default()),
247            }),
248        }
249    }
250
251    pub fn set_harness(&self, harness: Harness) {
252        if let Ok(mut slot) = self.inner.db_harness.write() {
253            *slot = Some(harness.as_str().to_string());
254        }
255    }
256
257    pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
258        if let Ok(mut slot) = self.inner.db_pool.write() {
259            *slot = Some(conn);
260        }
261    }
262
263    pub fn clear_db_pool(&self) {
264        if let Ok(mut slot) = self.inner.db_pool.write() {
265            *slot = None;
266        }
267    }
268
269    /// Install the output-compression callback. Called by `main.rs` after
270    /// `AppContext` is constructed so that snapshot/completion paths can
271    /// invoke `compress::compress_with_registry` without holding a context
272    /// reference. When called multiple times, the latest installation wins.
273    pub fn set_compressor<F>(&self, compressor: F)
274    where
275        F: Fn(&str, String) -> CompressionResult + Send + Sync + 'static,
276    {
277        self.set_compressor_with_exit_code(move |command, output, _exit_code| {
278            compressor(command, output)
279        });
280    }
281
282    pub fn set_compressor_with_exit_code<F>(&self, compressor: F)
283    where
284        F: Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync + 'static,
285    {
286        if let Ok(mut slot) = self.inner.compressor.lock() {
287            *slot = Some(Box::new(compressor));
288        }
289    }
290
291    /// Apply the installed compressor (if any) to `output`. Returns `output`
292    /// untouched when no compressor is installed.
293    pub(crate) fn compress_output(
294        &self,
295        command: &str,
296        output: String,
297        exit_code: Option<i32>,
298    ) -> CompressionResult {
299        let Ok(slot) = self.inner.compressor.lock() else {
300            return CompressionResult::new(output);
301        };
302        match slot.as_ref() {
303            Some(compressor) => compressor(command, output, exit_code),
304            None => CompressionResult::new(output),
305        }
306    }
307
    /// Return the memoized terminal-output render for `task`, computing and
    /// storing it on first use.
    ///
    /// Returns `None` when the task is not in a terminal state, when it runs
    /// in PTY mode, or when the task state lock is poisoned.
    ///
    /// The state lock is released while rendering (rendering may call the
    /// installed compressor via `render_terminal_output`), then re-acquired.
    /// The terminal/PTY checks and the cache lookup are repeated after
    /// re-locking, so a render stored by another caller in the meantime is
    /// returned instead of this one.
    fn ensure_terminal_output_cache(&self, task: &Arc<BgTask>) -> Option<TerminalOutputCache> {
        let (metadata, buffer) = {
            let state = task.state.lock().ok()?;
            if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
                return None;
            }
            // Fast path: already rendered.
            if let Some(cache) = state.terminal_output_cache.clone() {
                return Some(cache);
            }
            // Snapshot what the render needs; the lock drops at end of scope.
            (state.metadata.clone(), state.buffer.clone())
        };

        // NOTE(review): `cap_buffer` is used only for the side effect of
        // `enforce_terminal_cap()`; the render below reads `buffer`, not
        // `cap_buffer`. That is only equivalent if the cap acts on state both
        // clones share (e.g. the capture files on disk) — confirm against
        // `BgBuffer`.
        let mut cap_buffer = buffer.clone();
        cap_buffer.enforce_terminal_cap();
        let cache = self.render_terminal_output(&metadata, &buffer);
        let mut state = task.state.lock().ok()?;
        // Re-check: the state may have changed while it was unlocked.
        if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
            return None;
        }
        if let Some(existing) = state.terminal_output_cache.clone() {
            return Some(existing);
        }
        state.terminal_output_cache = Some(cache.clone());
        Some(cache)
    }
333
334    fn render_terminal_output(
335        &self,
336        metadata: &PersistedTask,
337        buffer: &BgBuffer,
338    ) -> TerminalOutputCache {
339        if metadata.mode == BgMode::Pty {
340            return TerminalOutputCache {
341                output_preview: String::new(),
342                output_truncated: false,
343                kind: TerminalOutputKind::Raw,
344                output_path: buffer.output_path().map(|path| path.display().to_string()),
345                stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
346                recovery: None,
347            };
348        }
349
350        if let Some(structured) = render_structured_output(&metadata.command, buffer) {
351            return structured;
352        }
353
354        if !metadata.compressed {
355            return render_raw_passthrough(buffer);
356        }
357
358        let raw = buffer.read_combined_head_tail(
359            COMPRESS_INPUT_CAP_BYTES,
360            COMPRESS_INPUT_HEAD_BYTES,
361            COMPRESS_INPUT_TAIL_BYTES,
362        );
363        let compressed = self.compress_output(&metadata.command, raw.text, metadata.exit_code);
364        render_compressed_with_recovery(buffer, compressed, raw.truncated)
365    }
366
367    fn snapshot_with_terminal_cache(
368        &self,
369        task: &Arc<BgTask>,
370        preview_bytes: usize,
371    ) -> BgTaskSnapshot {
372        let mut snapshot = task.snapshot(preview_bytes);
373        self.maybe_compress_snapshot(task, &mut snapshot);
374        snapshot
375    }
376
    /// Follow-up work after a task has entered a terminal state.
    ///
    /// Applies the terminal output cap, populates the terminal-output cache
    /// (non-PTY tasks only), and enqueues a completion record built from a
    /// snapshot of the metadata and buffer. `emit_frame` is forwarded to
    /// `enqueue_completion_from_parts`.
    ///
    /// Does nothing and returns `Ok(())` if the task is not terminal.
    ///
    /// # Errors
    /// Returns an error only when the task state lock is poisoned.
    fn post_terminal_transition(&self, task: &Arc<BgTask>, emit_frame: bool) -> Result<(), String> {
        let (metadata, buffer) = {
            let state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if !state.metadata.status.is_terminal() {
                return Ok(());
            }
            // Clone what is needed so the lock is released before the work below.
            (state.metadata.clone(), state.buffer.clone())
        };

        // Cap first so the cache render and the completion see capped output.
        // This also covers PTY tasks, for which the cache call below returns
        // `None` without capping.
        let mut cap_buffer = buffer.clone();
        cap_buffer.enforce_terminal_cap();
        let cache = self.ensure_terminal_output_cache(task);
        self.enqueue_completion_from_parts(
            &metadata,
            Some(&buffer),
            None,
            emit_frame,
            cache.as_ref(),
        );
        Ok(())
    }
401
402    fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
403        write_task(&paths.json, metadata)?;
404        self.dual_write_task(paths, metadata);
405        Ok(())
406    }
407
408    fn update_task_metadata<F>(
409        &self,
410        paths: &TaskPaths,
411        update: F,
412    ) -> std::io::Result<PersistedTask>
413    where
414        F: FnOnce(&mut PersistedTask),
415    {
416        let metadata = update_task(&paths.json, update)?;
417        self.dual_write_task(paths, &metadata);
418        Ok(metadata)
419    }
420
421    fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
422        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
423        let Some(pool) = pool else {
424            return;
425        };
426        let harness = self
427            .inner
428            .db_harness
429            .read()
430            .ok()
431            .and_then(|slot| slot.clone());
432        let Some(harness) = harness else {
433            crate::slog_warn!(
434                "dual-write bash_task to DB skipped for {}: harness not configured",
435                metadata.task_id
436            );
437            return;
438        };
439        let row = match metadata.to_bash_task_row(&harness, paths) {
440            Ok(row) => row,
441            Err(error) => {
442                crate::slog_warn!(
443                    "dual-write bash_task to DB failed for {}: {}",
444                    metadata.task_id,
445                    error
446                );
447                return;
448            }
449        };
450        let conn = match pool.lock() {
451            Ok(conn) => conn,
452            Err(_) => {
453                crate::slog_warn!(
454                    "dual-write bash_task to DB failed for {}: db mutex poisoned",
455                    metadata.task_id
456                );
457                return;
458            }
459        };
460        if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
461            crate::slog_warn!(
462                "dual-write bash_task to DB failed for {}: {}",
463                metadata.task_id,
464                error
465            );
466        }
467    }
468
469    pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
470        self.inner
471            .long_running_reminder_enabled
472            .store(enabled, Ordering::SeqCst);
473        self.inner
474            .long_running_reminder_interval_ms
475            .store(interval_ms, Ordering::SeqCst);
476    }
477
478    #[cfg(unix)]
479    #[allow(clippy::too_many_arguments)]
480    pub fn spawn(
481        &self,
482        command: &str,
483        session_id: String,
484        workdir: PathBuf,
485        env: HashMap<String, String>,
486        timeout: Option<Duration>,
487        storage_dir: PathBuf,
488        max_running: usize,
489        notify_on_completion: bool,
490        compressed: bool,
491        project_root: Option<PathBuf>,
492    ) -> Result<String, String> {
493        self.start_watchdog();
494
495        let running = self.running_count();
496        if running >= max_running {
497            return Err(format!(
498                "background bash task limit exceeded: {running} running (max {max_running})"
499            ));
500        }
501
502        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
503        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
504        let task_id = self.generate_unique_task_id()?;
505        let paths = task_paths(&storage_dir, &session_id, &task_id);
506        fs::create_dir_all(&paths.dir)
507            .map_err(|e| format!("failed to create background task dir: {e}"))?;
508
509        let mut metadata = PersistedTask::starting(
510            task_id.clone(),
511            session_id.clone(),
512            command.to_string(),
513            workdir.clone(),
514            project_root,
515            timeout_ms,
516            notify_on_completion,
517            compressed,
518        );
519        self.persist_task(&paths, &metadata)
520            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
521
522        // Pre-create capture files so the watchdog/buffer can always
523        // open them for reading. The spawn helper opens its own handles
524        // per attempt because each `Command::spawn()` consumes them.
525        create_capture_file(&paths.stdout)
526            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
527        create_capture_file(&paths.stderr)
528            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
529
530        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
531            Ok(child) => child,
532            Err(error) => {
533                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
534                let _ = delete_task_bundle(&paths);
535                return Err(error);
536            }
537        };
538
539        let child_pid = child.id();
540        metadata.mark_running(child_pid, child_pid as i32);
541        self.persist_task(&paths, &metadata)
542            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
543
544        let task = Arc::new(BgTask {
545            task_id: task_id.clone(),
546            session_id,
547            paths: paths.clone(),
548            started: Instant::now(),
549            last_reminder_at: Mutex::new(None),
550            terminal_at: Mutex::new(None),
551            state: Mutex::new(BgTaskState {
552                metadata,
553                runtime: TaskRuntime::Piped(Some(child)),
554                detached: false,
555                child_exit_observed: false,
556                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
557                terminal_output_cache: None,
558                pending_terminal_override: None,
559            }),
560        });
561
562        self.inner
563            .tasks
564            .lock()
565            .map_err(|_| "background task registry lock poisoned".to_string())?
566            .insert(task_id.clone(), task);
567
568        Ok(task_id)
569    }
570
    /// Spawn `command` as a background task attached to a pseudo-terminal.
    ///
    /// Follows the same sequence as `spawn`, but records `BgMode::Pty` and
    /// the requested `rows`/`cols` in the persisted metadata, and pre-creates
    /// a single PTY capture file instead of separate stdout/stderr files. It
    /// launches the command via `spawn_pty_for_command`, handing it a clone of
    /// the registry's wake sender. `timeout` falls back to
    /// `DEFAULT_BG_TIMEOUT` when `None`.
    ///
    /// Returns the new task id.
    ///
    /// # Errors
    /// Returns a message when the running-task limit is reached or when any
    /// filesystem, spawn, persistence, or lock step fails. If the PTY spawn
    /// itself fails, the partial task bundle is deleted.
    ///
    /// NOTE(review): if persisting `running` metadata or taking the registry
    /// lock fails after the PTY child has started, this returns `Err` without
    /// stopping the child, leaving it unsupervised. `PtyRuntime`'s shutdown
    /// API is not visible here, so this is flagged rather than fixed.
    #[allow(clippy::too_many_arguments)]
    pub fn spawn_pty(
        &self,
        command: &str,
        session_id: String,
        workdir: PathBuf,
        env: HashMap<String, String>,
        timeout: Option<Duration>,
        storage_dir: PathBuf,
        max_running: usize,
        notify_on_completion: bool,
        compressed: bool,
        project_root: Option<PathBuf>,
        rows: u16,
        cols: u16,
    ) -> Result<String, String> {
        self.start_watchdog();

        let running = self.running_count();
        if running >= max_running {
            return Err(format!(
                "background bash task limit exceeded: {running} running (max {max_running})"
            ));
        }

        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
        let task_id = self.generate_unique_task_id()?;
        let paths = task_paths(&storage_dir, &session_id, &task_id);
        fs::create_dir_all(&paths.dir)
            .map_err(|e| format!("failed to create background task dir: {e}"))?;

        let mut metadata = PersistedTask::starting(
            task_id.clone(),
            session_id.clone(),
            command.to_string(),
            workdir.clone(),
            project_root,
            timeout_ms,
            notify_on_completion,
            compressed,
        );
        // PTY-specific metadata is recorded before the first persist.
        metadata.mode = BgMode::Pty;
        metadata.pty_rows = Some(rows);
        metadata.pty_cols = Some(cols);
        self.persist_task(&paths, &metadata)
            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
        create_capture_file(&paths.pty)
            .map_err(|e| format!("failed to create PTY capture file: {e}"))?;

        let runtime = match spawn_pty_for_command(
            &task_id,
            &session_id,
            command,
            &paths,
            &workdir,
            &env,
            rows,
            cols,
            self.inner.wake_tx.clone(),
        ) {
            Ok(runtime) => runtime,
            Err(error) => {
                crate::slog_warn!(
                    "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
                );
                let _ = delete_task_bundle(&paths);
                return Err(error);
            }
        };

        // Without a known child pid the task is still marked running, but no
        // process group is recorded.
        if let Some(child_pid) = runtime.child_pid {
            metadata.mark_running(child_pid, child_pid as i32);
        } else {
            metadata.status = BgTaskStatus::Running;
            metadata.pgid = None;
        }
        self.persist_task(&paths, &metadata)
            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;

        let task = Arc::new(BgTask {
            task_id: task_id.clone(),
            session_id,
            paths: paths.clone(),
            started: Instant::now(),
            last_reminder_at: Mutex::new(None),
            terminal_at: Mutex::new(None),
            state: Mutex::new(BgTaskState {
                metadata,
                runtime: TaskRuntime::Pty(Some(runtime)),
                detached: false,
                child_exit_observed: false,
                buffer: BgBuffer::pty(paths.pty.clone()),
                terminal_output_cache: None,
                pending_terminal_override: None,
            }),
        });

        self.inner
            .tasks
            .lock()
            .map_err(|_| "background task registry lock poisoned".to_string())?
            .insert(task_id.clone(), task);

        Ok(task_id)
    }
677
678    #[cfg(windows)]
679    #[allow(clippy::too_many_arguments)]
680    pub fn spawn(
681        &self,
682        command: &str,
683        session_id: String,
684        workdir: PathBuf,
685        env: HashMap<String, String>,
686        timeout: Option<Duration>,
687        storage_dir: PathBuf,
688        max_running: usize,
689        notify_on_completion: bool,
690        compressed: bool,
691        project_root: Option<PathBuf>,
692    ) -> Result<String, String> {
693        self.start_watchdog();
694
695        let running = self.running_count();
696        if running >= max_running {
697            return Err(format!(
698                "background bash task limit exceeded: {running} running (max {max_running})"
699            ));
700        }
701
702        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
703        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
704        let task_id = self.generate_unique_task_id()?;
705        let paths = task_paths(&storage_dir, &session_id, &task_id);
706        fs::create_dir_all(&paths.dir)
707            .map_err(|e| format!("failed to create background task dir: {e}"))?;
708
709        let mut metadata = PersistedTask::starting(
710            task_id.clone(),
711            session_id.clone(),
712            command.to_string(),
713            workdir.clone(),
714            project_root,
715            timeout_ms,
716            notify_on_completion,
717            compressed,
718        );
719        self.persist_task(&paths, &metadata)
720            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
721
722        // Capture files are pre-created so the watchdog/buffer can always
723        // open them for reading even if the child hasn't written anything
724        // yet. The spawn helper opens its own handles per attempt because
725        // each `Command::spawn()` consumes them, and on Windows we may
726        // retry across multiple shell candidates if the first one fails.
727        create_capture_file(&paths.stdout)
728            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
729        create_capture_file(&paths.stderr)
730            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
731
732        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
733            Ok(child) => child,
734            Err(error) => {
735                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
736                let _ = delete_task_bundle(&paths);
737                return Err(error);
738            }
739        };
740
741        let child_pid = child.id();
742        metadata.status = BgTaskStatus::Running;
743        metadata.child_pid = Some(child_pid);
744        metadata.pgid = None;
745        self.persist_task(&paths, &metadata)
746            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
747
748        let task = Arc::new(BgTask {
749            task_id: task_id.clone(),
750            session_id,
751            paths: paths.clone(),
752            started: Instant::now(),
753            last_reminder_at: Mutex::new(None),
754            terminal_at: Mutex::new(None),
755            state: Mutex::new(BgTaskState {
756                metadata,
757                runtime: TaskRuntime::Piped(Some(child)),
758                detached: false,
759                child_exit_observed: false,
760                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
761                terminal_output_cache: None,
762                pending_terminal_override: None,
763            }),
764        });
765
766        self.inner
767            .tasks
768            .lock()
769            .map_err(|_| "background task registry lock poisoned".to_string())?
770            .insert(task_id.clone(), task);
771
772        Ok(task_id)
773    }
774
775    pub fn write_pty(
776        &self,
777        task_id: &str,
778        session_id: &str,
779        input: &[u8],
780    ) -> Result<usize, String> {
781        let task = self
782            .task_for_session(task_id, session_id)
783            .ok_or_else(|| "task_not_found".to_string())?;
784
785        let writer = {
786            let state = task
787                .state
788                .lock()
789                .map_err(|_| "background task lock poisoned".to_string())?;
790            if state.metadata.mode != BgMode::Pty {
791                return Err("task_not_pty".to_string());
792            }
793            if state.metadata.status.is_terminal() {
794                return Err("task_exited".to_string());
795            }
796            match &state.runtime {
797                TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
798                TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
799                TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
800            }
801        };
802
803        let mut writer = writer
804            .lock()
805            .map_err(|_| "PTY writer lock poisoned".to_string())?;
806        writer
807            .write_all(input)
808            .map_err(|error| format!("failed to write to PTY: {error}"))?;
809        writer
810            .flush()
811            .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
812        Ok(input.len())
813    }
814
815    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
816        self.replay_session_inner(storage_dir, session_id, None)
817    }
818
819    pub fn replay_session_for_project(
820        &self,
821        storage_dir: &Path,
822        session_id: &str,
823        project_root: &Path,
824    ) -> Result<(), String> {
825        self.replay_session_inner(storage_dir, session_id, Some(project_root))
826    }
827
828    fn replay_session_inner(
829        &self,
830        storage_dir: &Path,
831        session_id: &str,
832        project_root: Option<&Path>,
833    ) -> Result<(), String> {
834        self.start_watchdog();
835        if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
836            if let Err(error) = self.maybe_gc_persisted(storage_dir) {
837                crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
838            }
839        }
840
841        let canonical_project = project_root.map(canonicalized_path);
842        // Replay strategy: DB is the post-v0.27 source of truth. Disk
843        // fallback handles pre-v0.27 tasks that haven't been migrated and
844        // the cold-start `__default__` namespace (configure runs before any
845        // user session exists, so plugin-init triggers a session-less DB
846        // lookup that will be empty until a real session writes a task).
847        //
848        // We deliberately keep the empty-DB / empty-disk path silent — it's
849        // the normal startup case and would otherwise fire on every configure
850        // (see GitHub user report against v0.27.0). INFO-level logs only when
851        // disk actually returned tasks (real migration signal); WARN when the
852        // DB lookup itself errored.
853        let tasks = match self.replay_session_from_db(session_id) {
854            Some(Ok(tasks)) if !tasks.is_empty() => tasks,
855            Some(Ok(_)) => {
856                let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
857                if !disk_tasks.is_empty() {
858                    crate::slog_info!(
859                        "bash task replay: 0 in DB for session {}, {} from disk fallback",
860                        session_id,
861                        disk_tasks.len()
862                    );
863                }
864                disk_tasks
865            }
866            Some(Err(error)) => {
867                crate::slog_warn!(
868                    "bash task replay DB lookup failed for session {}; falling back to disk: {}",
869                    session_id,
870                    error
871                );
872                self.replay_session_from_disk(storage_dir, session_id)?
873            }
874            None => {
875                // DB pool unconfigured — common in tests + before harness is set.
876                self.replay_session_from_disk(storage_dir, session_id)?
877            }
878        };
879
880        for mut metadata in tasks {
881            if metadata.session_id != session_id {
882                continue;
883            }
884            if let Some(canonical_project) = canonical_project.as_deref() {
885                let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
886                if metadata_project.as_deref() != Some(canonical_project) {
887                    continue;
888                }
889            }
890
891            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
892            match metadata.status {
893                BgTaskStatus::Starting => {
894                    let completion_was_delivered = metadata.completion_delivered;
895                    metadata.mark_terminal(
896                        BgTaskStatus::Failed,
897                        None,
898                        Some("spawn aborted".to_string()),
899                    );
900                    metadata.completion_delivered |= completion_was_delivered;
901                    let _ = self.persist_task(&paths, &metadata);
902                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
903                    self.insert_rehydrated_task(metadata, paths, true)?;
904                }
905                BgTaskStatus::Running | BgTaskStatus::Killing => {
906                    if metadata.mode == BgMode::Pty {
907                        if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
908                            let completion_was_delivered = metadata.completion_delivered;
909                            metadata = terminal_metadata_from_marker(metadata, marker, None);
910                            metadata.completion_delivered |= completion_was_delivered;
911                            let _ = self.persist_task(&paths, &metadata);
912                            self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
913                            self.insert_rehydrated_task(metadata, paths, true)?;
914                        } else if metadata.status.is_terminal() {
915                            self.insert_rehydrated_task(metadata, paths, true)?;
916                        } else {
917                            let completion_was_delivered = metadata.completion_delivered;
918                            metadata.mark_terminal(
919                                BgTaskStatus::Killed,
920                                None,
921                                Some("pty_lost_on_bridge_restart".to_string()),
922                            );
923                            metadata.completion_delivered |= completion_was_delivered;
924                            let _ = self.persist_task(&paths, &metadata);
925                            self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
926                            self.insert_rehydrated_task(metadata, paths, true)?;
927                        }
928                    } else if self.running_metadata_is_stale(&metadata) {
929                        let completion_was_delivered = metadata.completion_delivered;
930                        metadata.mark_terminal(
931                            BgTaskStatus::Killed,
932                            None,
933                            Some("orphaned (>24h)".to_string()),
934                        );
935                        metadata.completion_delivered |= completion_was_delivered;
936                        if !paths.exit.exists() {
937                            let _ = write_kill_marker_if_absent(&paths.exit);
938                        }
939                        let _ = self.persist_task(&paths, &metadata);
940                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
941                        self.insert_rehydrated_task(metadata, paths, true)?;
942                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
943                        let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
944                            "recovered from inconsistent killing state on replay".to_string()
945                        });
946                        if reason.is_some() {
947                            crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
948                            metadata.task_id);
949                        }
950                        let completion_was_delivered = metadata.completion_delivered;
951                        metadata = terminal_metadata_from_marker(metadata, marker, reason);
952                        metadata.completion_delivered |= completion_was_delivered;
953                        let _ = self.persist_task(&paths, &metadata);
954                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
955                        self.insert_rehydrated_task(metadata, paths, true)?;
956                    } else if metadata.status == BgTaskStatus::Killing {
957                        if !paths.exit.exists() {
958                            let _ = write_kill_marker_if_absent(&paths.exit);
959                        }
960                        let completion_was_delivered = metadata.completion_delivered;
961                        metadata.mark_terminal(
962                            BgTaskStatus::Killed,
963                            None,
964                            Some("recovered from inconsistent killing state on replay".to_string()),
965                        );
966                        metadata.completion_delivered |= completion_was_delivered;
967                        let _ = self.persist_task(&paths, &metadata);
968                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
969                        self.insert_rehydrated_task(metadata, paths, true)?;
970                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
971                        let completion_was_delivered = metadata.completion_delivered;
972                        metadata.mark_terminal(
973                            BgTaskStatus::Failed,
974                            None,
975                            Some("process exited without exit marker".to_string()),
976                        );
977                        metadata.completion_delivered |= completion_was_delivered;
978                        let _ = self.persist_task(&paths, &metadata);
979                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
980                        self.insert_rehydrated_task(metadata, paths, true)?;
981                    } else {
982                        self.insert_rehydrated_task(metadata, paths, true)?;
983                    }
984                }
985                _ if metadata.status.is_terminal() => {
986                    // Borrow `paths` for the completion enqueue BEFORE
987                    // `insert_rehydrated_task` consumes it. The completion
988                    // helper only reads from `paths` (stdout/stderr/exit) to
989                    // reconstruct a tail preview, so it must see the same
990                    // paths the rehydrated task will own.
991                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
992                    self.insert_rehydrated_task(metadata, paths, true)?;
993                }
994                _ => {}
995            }
996        }
997
998        Ok(())
999    }
1000
1001    fn replay_session_from_db(
1002        &self,
1003        session_id: &str,
1004    ) -> Option<Result<Vec<PersistedTask>, String>> {
1005        let pool = self
1006            .inner
1007            .db_pool
1008            .read()
1009            .ok()
1010            .and_then(|slot| slot.clone())?;
1011        let harness = self
1012            .inner
1013            .db_harness
1014            .read()
1015            .ok()
1016            .and_then(|slot| slot.clone())?;
1017        let conn = match pool.lock() {
1018            Ok(conn) => conn,
1019            Err(_) => return Some(Err("db mutex poisoned".to_string())),
1020        };
1021        Some(
1022            crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
1023                .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
1024                .map_err(|error| error.to_string()),
1025        )
1026    }
1027
1028    fn replay_session_from_disk(
1029        &self,
1030        storage_dir: &Path,
1031        session_id: &str,
1032    ) -> Result<Vec<PersistedTask>, String> {
1033        let dir = session_tasks_dir(storage_dir, session_id);
1034        if !dir.exists() {
1035            return Ok(Vec::new());
1036        }
1037
1038        let entries = fs::read_dir(&dir)
1039            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
1040        let mut tasks = Vec::new();
1041        for entry in entries.flatten() {
1042            let path = entry.path();
1043            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
1044                continue;
1045            }
1046            match read_task(&path) {
1047                Ok(metadata) => tasks.push(metadata),
1048                Err(error) => {
1049                    crate::slog_warn!(
1050                        "quarantining invalid background task metadata {} during replay: {error}",
1051                        path.display()
1052                    );
1053                    if let Err(quarantine_error) =
1054                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1055                    {
1056                        crate::slog_warn!(
1057                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1058                            path.display()
1059                        );
1060                    }
1061                }
1062            }
1063        }
1064        Ok(tasks)
1065    }
1066
1067    pub fn register_watch(
1068        &self,
1069        task_id: String,
1070        pattern: WatchPattern,
1071        once: bool,
1072    ) -> Result<String, &'static str> {
1073        let task = self.task(&task_id).ok_or("task_not_found")?;
1074        let (mode, terminal_at_registration, stdout, stderr, pty) = task
1075            .state
1076            .lock()
1077            .map(|state| {
1078                (
1079                    state.metadata.mode.clone(),
1080                    state.metadata.status.is_terminal(),
1081                    task.paths.stdout.clone(),
1082                    task.paths.stderr.clone(),
1083                    task.paths.pty.clone(),
1084                )
1085            })
1086            .map_err(|_| "background_task_lock_poisoned")?;
1087
1088        let mut terminal_matches = Vec::new();
1089        let scanned_terminal = terminal_at_registration;
1090        let watch_id = {
1091            let mut registry = self
1092                .inner
1093                .watch_registry
1094                .lock()
1095                .map_err(|_| "watch_registry_poisoned")?;
1096            let watch_id = registry.register(task_id.clone(), pattern, once)?;
1097            match &mode {
1098                BgMode::Pipes => {
1099                    let stdout_key = format!("{task_id}:stdout");
1100                    let stderr_key = format!("{task_id}:stderr");
1101                    if terminal_at_registration {
1102                        registry.set_file_cursor(&stdout_key, 0);
1103                        registry.set_file_cursor(&stderr_key, 0);
1104                        terminal_matches.extend(registry.scan_file_new_bytes(
1105                            &stdout_key,
1106                            &task_id,
1107                            &stdout,
1108                        ));
1109                        terminal_matches.extend(registry.scan_file_new_bytes(
1110                            &stderr_key,
1111                            &task_id,
1112                            &stderr,
1113                        ));
1114                    } else {
1115                        registry.prime_file_cursor(&stdout_key, &stdout);
1116                        registry.prime_file_cursor(&stderr_key, &stderr);
1117                    }
1118                }
1119                BgMode::Pty => {
1120                    let pty_key = format!("{task_id}:pty");
1121                    if terminal_at_registration {
1122                        registry.set_file_cursor(&pty_key, 0);
1123                        terminal_matches
1124                            .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
1125                    } else {
1126                        registry.prime_file_cursor(&pty_key, &pty);
1127                    }
1128                }
1129            }
1130            watch_id
1131        };
1132
1133        if task.is_terminal() {
1134            if !scanned_terminal {
1135                terminal_matches = {
1136                    let mut registry = self
1137                        .inner
1138                        .watch_registry
1139                        .lock()
1140                        .map_err(|_| "watch_registry_poisoned")?;
1141                    match &mode {
1142                        BgMode::Pipes => {
1143                            let stdout_key = format!("{task_id}:stdout");
1144                            let stderr_key = format!("{task_id}:stderr");
1145                            registry.set_file_cursor(&stdout_key, 0);
1146                            registry.set_file_cursor(&stderr_key, 0);
1147                            let mut matches =
1148                                registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
1149                            matches.extend(registry.scan_file_new_bytes(
1150                                &stderr_key,
1151                                &task_id,
1152                                &stderr,
1153                            ));
1154                            matches
1155                        }
1156                        BgMode::Pty => {
1157                            let pty_key = format!("{task_id}:pty");
1158                            registry.set_file_cursor(&pty_key, 0);
1159                            registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
1160                        }
1161                    }
1162                };
1163            }
1164
1165            let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
1166            if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
1167                if watch_matched {
1168                    let _ = task.set_completion_delivered(true, self);
1169                    self.clear_task_watch_state(&task_id);
1170                }
1171                return Ok(watch_id);
1172            }
1173
1174            let completion = self
1175                .remove_pending_completion(&task_id)
1176                .or_else(|| self.completion_snapshot_for_task(&task));
1177            if terminal_matches.is_empty() {
1178                if let Some(completion) = completion.as_ref() {
1179                    self.emit_bash_watch_exit(completion);
1180                }
1181            } else {
1182                for pattern_match in terminal_matches {
1183                    self.emit_bash_pattern_match(&task.session_id, pattern_match);
1184                }
1185            }
1186            let _ = task.set_completion_delivered(true, self);
1187            self.clear_task_watch_state(&task_id);
1188        }
1189
1190        Ok(watch_id)
1191    }
1192
1193    pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
1194        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1195            registry.unregister(task_id, watch_id);
1196        }
1197    }
1198
1199    pub fn active_watch_count(&self, task_id: &str) -> usize {
1200        self.inner
1201            .watch_registry
1202            .lock()
1203            .map(|registry| registry.active_count(task_id))
1204            .unwrap_or(0)
1205    }
1206
1207    fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1208        self.inner
1209            .watch_registry
1210            .lock()
1211            .map(|registry| {
1212                (
1213                    registry.has_controlled_task(task_id),
1214                    registry.has_matched_task(task_id),
1215                )
1216            })
1217            .unwrap_or((false, false))
1218    }
1219
1220    fn task_has_watch_control(&self, task_id: &str) -> bool {
1221        self.inner
1222            .watch_registry
1223            .lock()
1224            .map(|registry| registry.has_controlled_task(task_id))
1225            .unwrap_or(false)
1226    }
1227
1228    fn clear_task_watch_state(&self, task_id: &str) {
1229        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1230            registry.clear_task(task_id);
1231        }
1232    }
1233
1234    pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1235        let (mode, stdout, stderr, pty) = match task.state.lock() {
1236            Ok(state) => (
1237                state.metadata.mode.clone(),
1238                task.paths.stdout.clone(),
1239                task.paths.stderr.clone(),
1240                task.paths.pty.clone(),
1241            ),
1242            Err(_) => return,
1243        };
1244        let mut matches = Vec::new();
1245        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1246            match mode {
1247                BgMode::Pipes => {
1248                    let stdout_key = format!("{}:stdout", task.task_id);
1249                    let stderr_key = format!("{}:stderr", task.task_id);
1250                    matches.extend(registry.scan_file_new_bytes(
1251                        &stdout_key,
1252                        &task.task_id,
1253                        &stdout,
1254                    ));
1255                    matches.extend(registry.scan_file_new_bytes(
1256                        &stderr_key,
1257                        &task.task_id,
1258                        &stderr,
1259                    ));
1260                }
1261                BgMode::Pty => {
1262                    let pty_key = format!("{}:pty", task.task_id);
1263                    matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1264                }
1265            }
1266        }
1267        for pattern_match in matches {
1268            self.emit_bash_pattern_match(&task.session_id, pattern_match);
1269        }
1270    }
1271
1272    pub fn status(
1273        &self,
1274        task_id: &str,
1275        session_id: &str,
1276        project_root: Option<&Path>,
1277        storage_dir: Option<&Path>,
1278        preview_bytes: usize,
1279    ) -> Option<BgTaskSnapshot> {
1280        let mut task = self.task_for_session(task_id, session_id);
1281        if task.is_none() {
1282            if let Some(storage_dir) = storage_dir {
1283                let _ = self.replay_session(storage_dir, session_id);
1284                task = self.task_for_session(task_id, session_id);
1285            }
1286        }
1287        let Some(task) = task else {
1288            return self.status_relaxed(
1289                task_id,
1290                session_id,
1291                project_root?,
1292                storage_dir?,
1293                preview_bytes,
1294            );
1295        };
1296        let _ = self.poll_task(&task);
1297        Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1298    }
1299
1300    fn status_relaxed_task(
1301        &self,
1302        task_id: &str,
1303        project_root: &Path,
1304        storage_dir: &Path,
1305    ) -> Option<Arc<BgTask>> {
1306        let canonical_project = canonicalized_path(project_root);
1307        match self.lookup_relaxed_task_from_db(task_id, project_root) {
1308            Some(Ok(Some(metadata))) => {
1309                if let Some(task) = self.task(task_id) {
1310                    let matches_project = task
1311                        .state
1312                        .lock()
1313                        .map(|state| {
1314                            state
1315                                .metadata
1316                                .project_root
1317                                .as_deref()
1318                                .map(canonicalized_path)
1319                                .as_deref()
1320                                == Some(canonical_project.as_path())
1321                        })
1322                        .unwrap_or(false);
1323                    return matches_project.then_some(task);
1324                }
1325                let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1326                if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1327                    return None;
1328                }
1329                return self.task(task_id);
1330            }
1331            Some(Ok(None)) => {
1332                crate::slog_info!(
1333                    "bash task relaxed DB miss for {}; falling back to disk",
1334                    task_id
1335                );
1336            }
1337            Some(Err(error)) => {
1338                crate::slog_warn!(
1339                    "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1340                    task_id,
1341                    error
1342                );
1343            }
1344            None => {
1345                crate::slog_info!(
1346                    "bash task relaxed DB unavailable for {}; falling back to disk",
1347                    task_id
1348                );
1349            }
1350        }
1351        let root = storage_dir.join("bash-tasks");
1352        let entries = fs::read_dir(&root).ok()?;
1353        for entry in entries.flatten() {
1354            let dir = entry.path();
1355            if !dir.is_dir() {
1356                continue;
1357            }
1358            let path = dir.join(format!("{task_id}.json"));
1359            if !path.exists() {
1360                continue;
1361            }
1362            let metadata = match read_task(&path) {
1363                Ok(metadata) => metadata,
1364                Err(error) => {
1365                    crate::slog_warn!(
1366                        "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1367                        path.display()
1368                    );
1369                    if let Err(quarantine_error) =
1370                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1371                    {
1372                        crate::slog_warn!(
1373                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1374                            path.display()
1375                        );
1376                    }
1377                    continue;
1378                }
1379            };
1380            let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1381            if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1382                continue;
1383            }
1384            if let Some(task) = self.task(task_id) {
1385                let matches_project = task
1386                    .state
1387                    .lock()
1388                    .map(|state| {
1389                        state
1390                            .metadata
1391                            .project_root
1392                            .as_deref()
1393                            .map(canonicalized_path)
1394                            .as_deref()
1395                            == Some(canonical_project.as_path())
1396                    })
1397                    .unwrap_or(false);
1398                return matches_project.then_some(task);
1399            }
1400            let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1401            if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1402                return None;
1403            }
1404            return self.task(task_id);
1405        }
1406        None
1407    }
1408
1409    fn lookup_relaxed_task_from_db(
1410        &self,
1411        task_id: &str,
1412        project_root: &Path,
1413    ) -> Option<Result<Option<PersistedTask>, String>> {
1414        let pool = self
1415            .inner
1416            .db_pool
1417            .read()
1418            .ok()
1419            .and_then(|slot| slot.clone())?;
1420        let harness = self
1421            .inner
1422            .db_harness
1423            .read()
1424            .ok()
1425            .and_then(|slot| slot.clone())?;
1426        let conn = match pool.lock() {
1427            Ok(conn) => conn,
1428            Err(_) => return Some(Err("db mutex poisoned".to_string())),
1429        };
1430        let project_key = crate::search_index::project_cache_key(project_root);
1431        Some(
1432            crate::db::bash_tasks::find_bash_task_for_project(
1433                &conn,
1434                &harness,
1435                &project_key,
1436                task_id,
1437            )
1438            .map(|row| row.map(PersistedTask::from))
1439            .map_err(|error| error.to_string()),
1440        )
1441    }
1442
1443    pub(super) fn status_relaxed(
1444        &self,
1445        task_id: &str,
1446        _session_id: &str,
1447        project_root: &Path,
1448        storage_dir: &Path,
1449        preview_bytes: usize,
1450    ) -> Option<BgTaskSnapshot> {
1451        let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1452        let _ = self.poll_task(&task);
1453        Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1454    }
1455
1456    pub fn kill_relaxed(
1457        &self,
1458        task_id: &str,
1459        project_root: &Path,
1460        storage_dir: &Path,
1461    ) -> Result<BgTaskSnapshot, String> {
1462        let task = self
1463            .status_relaxed_task(task_id, project_root, storage_dir)
1464            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1465        self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1466    }
1467
1468    pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1469        #[cfg(test)]
1470        self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1471
1472        let mut deleted = 0usize;
1473
1474        let root = storage_dir.join("bash-tasks");
1475        if root.exists() {
1476            let session_dirs = fs::read_dir(&root).map_err(|e| {
1477                format!(
1478                    "failed to read background task root {}: {e}",
1479                    root.display()
1480                )
1481            })?;
1482            for session_entry in session_dirs.flatten() {
1483                let session_dir = session_entry.path();
1484                if !session_dir.is_dir() {
1485                    continue;
1486                }
1487                let task_entries = match fs::read_dir(&session_dir) {
1488                    Ok(entries) => entries,
1489                    Err(error) => {
1490                        crate::slog_warn!(
1491                            "failed to read background task session dir {}: {error}",
1492                            session_dir.display()
1493                        );
1494                        continue;
1495                    }
1496                };
1497                for task_entry in task_entries.flatten() {
1498                    let json_path = task_entry.path();
1499                    if json_path
1500                        .extension()
1501                        .and_then(|extension| extension.to_str())
1502                        != Some("json")
1503                    {
1504                        continue;
1505                    }
1506                    if modified_within(&json_path, PERSISTED_GC_GRACE) {
1507                        continue;
1508                    }
1509                    let metadata = match read_task(&json_path) {
1510                        Ok(metadata) => metadata,
1511                        Err(error) => {
1512                            crate::slog_warn!(
1513                                "quarantining corrupt background task metadata {}: {error}",
1514                                json_path.display()
1515                            );
1516                            quarantine_task_json(
1517                                storage_dir,
1518                                &session_dir,
1519                                &json_path,
1520                                QuarantineKind::Corrupt,
1521                            )?;
1522                            continue;
1523                        }
1524                    };
1525                    if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1526                        continue;
1527                    }
1528                    let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1529                    match delete_task_bundle(&paths) {
1530                        Ok(()) => {
1531                            deleted += 1;
1532                            log::debug!(
1533                                "deleted persisted background task bundle {}",
1534                                metadata.task_id
1535                            );
1536                        }
1537                        Err(error) => {
1538                            crate::slog_warn!(
1539                                "failed to delete background task bundle {}: {error}",
1540                                metadata.task_id
1541                            );
1542                            continue;
1543                        }
1544                    }
1545                }
1546            }
1547        }
1548        gc_quarantine(storage_dir);
1549        Ok(deleted)
1550    }
1551
1552    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1553        let tasks = self
1554            .inner
1555            .tasks
1556            .lock()
1557            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1558            .unwrap_or_default();
1559        tasks
1560            .into_iter()
1561            .map(|task| {
1562                let _ = self.poll_task(&task);
1563                self.snapshot_with_terminal_cache(&task, preview_bytes)
1564            })
1565            .collect()
1566    }
1567
1568    /// Replace terminal pipe snapshots with the task's cached rendered output.
1569    /// Running tasks stay raw (tail-only) so agents debugging a live process see
1570    /// exactly what it emitted. PTY tasks are explicitly excluded: their raw
1571    /// terminal bytes are rendered by the plugin's PTY path, not the line
1572    /// compressor.
1573    fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1574        if !snapshot.info.status.is_terminal() || snapshot.info.mode == BgMode::Pty {
1575            return;
1576        }
1577        if let Some(cache) = self.ensure_terminal_output_cache(task) {
1578            snapshot.output_preview = cache.output_preview;
1579            snapshot.output_truncated = cache.output_truncated;
1580        }
1581    }
1582
1583    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1584        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1585    }
1586
1587    pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1588        let task = self
1589            .task_for_session(task_id, session_id)
1590            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1591        let terminal_after_promote = {
1592            let mut state = task
1593                .state
1594                .lock()
1595                .map_err(|_| "background task lock poisoned".to_string())?;
1596            let updated = self
1597                .update_task_metadata(&task.paths, |metadata| {
1598                    metadata.notify_on_completion = true;
1599                    metadata.completion_delivered = false;
1600                })
1601                .map_err(|e| format!("failed to promote background task: {e}"))?;
1602            state.metadata = updated;
1603            state.metadata.status.is_terminal()
1604        };
1605        if terminal_after_promote {
1606            self.post_terminal_transition(&task, true)?;
1607        }
1608        Ok(true)
1609    }
1610
1611    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1612        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1613            .map(|_| ())
1614    }
1615
1616    pub fn cleanup_finished(&self, older_than: Duration) {
1617        let cutoff = Instant::now().checked_sub(older_than);
1618        let removable_paths: Vec<(String, TaskPaths)> =
1619            if let Ok(mut tasks) = self.inner.tasks.lock() {
1620                let removable = tasks
1621                    .iter()
1622                    .filter_map(|(task_id, task)| {
1623                        let delivered_terminal = task
1624                            .state
1625                            .lock()
1626                            .map(|state| {
1627                                state.metadata.status.is_terminal()
1628                                    && state.metadata.completion_delivered
1629                            })
1630                            .unwrap_or(false);
1631                        if !delivered_terminal {
1632                            return None;
1633                        }
1634
1635                        let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1636                        let expired = match (terminal_at, cutoff) {
1637                            (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1638                            (Some(_), None) => true,
1639                            (None, _) => false,
1640                        };
1641                        expired.then(|| task_id.clone())
1642                    })
1643                    .collect::<Vec<_>>();
1644
1645                removable
1646                    .into_iter()
1647                    .filter_map(|task_id| {
1648                        tasks
1649                            .remove(&task_id)
1650                            .map(|task| (task_id, task.paths.clone()))
1651                    })
1652                    .collect()
1653            } else {
1654                Vec::new()
1655            };
1656
1657        for (task_id, paths) in removable_paths {
1658            match delete_task_bundle(&paths) {
1659                Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1660                Err(error) => crate::slog_warn!(
1661                    "failed to delete persisted background task bundle {task_id}: {error}"
1662                ),
1663            }
1664        }
1665    }
1666
1667    pub fn drain_completions(&self) -> Vec<BgCompletion> {
1668        self.drain_completions_for_session(None)
1669    }
1670
1671    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1672        let completions = match self.inner.completions.lock() {
1673            Ok(completions) => completions,
1674            Err(_) => return Vec::new(),
1675        };
1676
1677        completions
1678            .iter()
1679            .filter(|completion| completion_matches_session(completion, session_id))
1680            .cloned()
1681            .collect()
1682    }
1683
1684    pub fn has_completions_for_session(&self, session_id: Option<&str>) -> bool {
1685        match self.inner.completions.lock() {
1686            Ok(completions) => completions
1687                .iter()
1688                .any(|completion| completion_matches_session(completion, session_id)),
1689            // Bias to safety: if the queue state cannot be inspected cheaply,
1690            // let callers take the existing drain path rather than risk
1691            // suppressing a pending completion.
1692            Err(_) => true,
1693        }
1694    }
1695
1696    pub fn ack_completions_for_session(
1697        &self,
1698        session_id: Option<&str>,
1699        task_ids: &[String],
1700    ) -> Vec<String> {
1701        if task_ids.is_empty() {
1702            return Vec::new();
1703        }
1704        let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1705        let mut completion_sessions = HashMap::new();
1706        if let Ok(mut completions) = self.inner.completions.lock() {
1707            completions.retain(|completion| {
1708                let session_matches = session_id
1709                    .map(|session_id| completion.session_id == session_id)
1710                    .unwrap_or(true);
1711                if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1712                    completion_sessions
1713                        .insert(completion.task_id.clone(), completion.session_id.clone());
1714                    false
1715                } else {
1716                    true
1717                }
1718            });
1719        }
1720
1721        let mut delivered = Vec::new();
1722        for task_id in task_ids {
1723            let task = if let Some(session_id) = session_id {
1724                self.task_for_session(task_id, session_id)
1725            } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1726                self.task_for_session(task_id, completion_session_id)
1727            } else {
1728                self.task(task_id)
1729            };
1730            if let Some(task) = task {
1731                if task.set_completion_delivered(true, self).is_ok() {
1732                    delivered.push(task_id.clone());
1733                }
1734            }
1735        }
1736
1737        delivered
1738    }
1739
1740    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1741        self.inner
1742            .completions
1743            .lock()
1744            .map(|completions| {
1745                completions
1746                    .iter()
1747                    .filter(|completion| completion.session_id == session_id)
1748                    .cloned()
1749                    .collect()
1750            })
1751            .unwrap_or_default()
1752    }
1753
1754    fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1755        let mut completions = self.inner.completions.lock().ok()?;
1756        let idx = completions
1757            .iter()
1758            .position(|completion| completion.task_id == task_id)?;
1759        completions.remove(idx)
1760    }
1761
1762    fn completion_snapshot_for_task(&self, task: &Arc<BgTask>) -> Option<BgCompletion> {
1763        let snapshot = self.snapshot_with_terminal_cache(task, RUNNING_OUTPUT_PREVIEW_BYTES);
1764        if !snapshot.info.status.is_terminal() {
1765            return None;
1766        }
1767        let (output_preview, output_truncated) = if snapshot.info.mode == BgMode::Pty {
1768            (String::new(), false)
1769        } else {
1770            self.ensure_terminal_output_cache(task)
1771                .map(|cache| completion_preview_for_cache(&cache, snapshot.exit_code))
1772                .unwrap_or_else(|| (String::new(), false))
1773        };
1774        Some(BgCompletion {
1775            task_id: snapshot.info.task_id,
1776            session_id: task.session_id.clone(),
1777            status: snapshot.info.status,
1778            exit_code: snapshot.exit_code,
1779            command: snapshot.info.command,
1780            output_preview,
1781            output_truncated,
1782            original_tokens: None,
1783            compressed_tokens: None,
1784            tokens_skipped: false,
1785        })
1786    }
1787
1788    pub fn detach(&self) {
1789        self.inner.shutdown.store(true, Ordering::SeqCst);
1790        if let Ok(mut tasks) = self.inner.tasks.lock() {
1791            for task in tasks.values() {
1792                if let Ok(mut state) = task.state.lock() {
1793                    match &mut state.runtime {
1794                        TaskRuntime::Piped(child) => *child = None,
1795                        TaskRuntime::Pty(runtime) => *runtime = None,
1796                    }
1797                    state.detached = true;
1798                }
1799            }
1800            tasks.clear();
1801        }
1802    }
1803
1804    pub fn shutdown(&self) {
1805        let tasks = self
1806            .inner
1807            .tasks
1808            .lock()
1809            .map(|tasks| {
1810                tasks
1811                    .values()
1812                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
1813                    .collect::<Vec<_>>()
1814            })
1815            .unwrap_or_default();
1816        for (task_id, session_id) in tasks {
1817            let _ = self.kill(&task_id, &session_id);
1818        }
1819    }
1820
1821    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1822        if let Ok(state) = task.state.lock() {
1823            if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1824                // On Windows ConPTY, the reader may not observe EOF while the
1825                // master handle is still held in `PtyRuntime`. The waiter writes
1826                // the authoritative exit marker before setting `exit_observed`,
1827                // so once exit is observed we can finalize from that marker and
1828                // drop the runtime, which lets the reader finish. Waiting for
1829                // `reader_done && exit_observed` wedges completed PTY tasks on
1830                // Windows.
1831                if !pty.exit_observed.load(Ordering::SeqCst) {
1832                    return Ok(());
1833                }
1834            }
1835        }
1836        let marker = match read_exit_marker(&task.paths.exit) {
1837            Ok(Some(marker)) => marker,
1838            Ok(None) => return Ok(()),
1839            Err(error) => return Err(format!("failed to read exit marker: {error}")),
1840        };
1841        self.finalize_from_marker(task, marker, None)
1842    }
1843
    /// Check, without blocking, whether a task's process has gone away and
    /// react to it.
    ///
    /// * Piped task with a live `Child` handle: `try_wait`. Once the child has
    ///   exited, the handle is dropped and the task is flagged `detached` and
    ///   `child_exit_observed`. No terminal transition happens in that call; a
    ///   later call takes the detached branch below.
    /// * Piped task with no handle but `detached`: if the child is known to be
    ///   dead, `fail_without_exit_marker_if_needed` fails the task (unless an
    ///   exit marker exists). "Known dead" means either the exit was observed in
    ///   this process, or `child_pid` is no longer alive.
    /// * PTY task whose waiter reported exit: finalized through `poll_task`.
    ///
    /// All errors are swallowed. A poisoned state lock makes this a no-op.
    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
        // Decided under the state lock; the transition is posted after release.
        let mut needs_completion = false;
        {
            let Ok(mut state) = task.state.lock() else {
                return;
            };
            match &mut state.runtime {
                TaskRuntime::Piped(child_slot) => {
                    if let Some(child) = child_slot.as_mut() {
                        if matches!(child.try_wait(), Ok(Some(_))) {
                            *child_slot = None;
                            state.detached = true;
                            state.child_exit_observed = true;
                        }
                    } else if state.detached {
                        // Rehydrated tasks never observed their child's exit in this
                        // process (`child_exit_observed` starts false), so they fall
                        // back to probing `child_pid` liveness.
                        let child_known_dead = state.child_exit_observed
                            || state
                                .metadata
                                .child_pid
                                .is_some_and(|pid| !is_process_alive(pid));
                        if child_known_dead {
                            needs_completion =
                                self.fail_without_exit_marker_if_needed(task, &mut state);
                        }
                    }
                }
                TaskRuntime::Pty(Some(pty)) => {
                    if pty.exit_observed.load(Ordering::SeqCst) {
                        // Release the state lock first: `poll_task` (and the
                        // `finalize_from_marker` it calls) locks it again, and a std
                        // `Mutex` is not reentrant.
                        drop(state);
                        let _ = self.poll_task(task);
                        return;
                    }
                }
                TaskRuntime::Pty(None) => {}
            }
        }
        if needs_completion {
            let _ = self.post_terminal_transition(task, true);
        }
    }
1884
1885    fn fail_without_exit_marker_if_needed(
1886        &self,
1887        task: &Arc<BgTask>,
1888        state: &mut BgTaskState,
1889    ) -> bool {
1890        if state.metadata.status.is_terminal() {
1891            return false;
1892        }
1893        if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1894            return false;
1895        }
1896        let watch_controlled = self.task_has_watch_control(&task.task_id);
1897        let updated = self.update_task_metadata(&task.paths, |metadata| {
1898            metadata.mark_terminal(
1899                BgTaskStatus::Failed,
1900                None,
1901                Some("process exited without exit marker".to_string()),
1902            );
1903            if watch_controlled {
1904                metadata.completion_delivered = true;
1905            }
1906        });
1907        if let Ok(metadata) = updated {
1908            state.pending_terminal_override = None;
1909            state.metadata = metadata;
1910            task.mark_terminal_now();
1911            return true;
1912        }
1913        false
1914    }
1915
1916    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1917        self.inner
1918            .tasks
1919            .lock()
1920            .map(|tasks| {
1921                tasks
1922                    .values()
1923                    .filter(|task| task.is_running())
1924                    .cloned()
1925                    .collect()
1926            })
1927            .unwrap_or_default()
1928    }
1929
1930    fn insert_rehydrated_task(
1931        &self,
1932        metadata: PersistedTask,
1933        paths: TaskPaths,
1934        detached: bool,
1935    ) -> Result<(), String> {
1936        let task_id = metadata.task_id.clone();
1937        let session_id = metadata.session_id.clone();
1938        let started = started_instant_from_unix_millis(metadata.started_at);
1939        let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1940        let mode = metadata.mode.clone();
1941        let task = Arc::new(BgTask {
1942            task_id: task_id.clone(),
1943            session_id,
1944            paths: paths.clone(),
1945            started,
1946            last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1947            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1948            state: Mutex::new(BgTaskState {
1949                metadata,
1950                runtime: if mode == BgMode::Pty {
1951                    TaskRuntime::Pty(None)
1952                } else {
1953                    TaskRuntime::Piped(None)
1954                },
1955                detached,
1956                // Replay path: we never observed the child handle's exit
1957                // in this process (the previous AFT process did, but its
1958                // observation didn't survive restart). Leave this false so
1959                // the second-pass reap falls through to the
1960                // `is_process_alive(child_pid)` probe rather than declaring
1961                // failure based on stale evidence.
1962                child_exit_observed: false,
1963                buffer: if mode == BgMode::Pty {
1964                    BgBuffer::pty(paths.pty.clone())
1965                } else {
1966                    BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
1967                },
1968                terminal_output_cache: None,
1969                pending_terminal_override: None,
1970            }),
1971        });
1972        self.inner
1973            .tasks
1974            .lock()
1975            .map_err(|_| "background task registry lock poisoned".to_string())?
1976            .insert(task_id, task);
1977        Ok(())
1978    }
1979
1980    fn kill_with_status(
1981        &self,
1982        task_id: &str,
1983        session_id: &str,
1984        terminal_status: BgTaskStatus,
1985    ) -> Result<BgTaskSnapshot, String> {
1986        let task = self
1987            .task_for_session(task_id, session_id)
1988            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1989        let mut terminalized = false;
1990
1991        {
1992            let mut state = task
1993                .state
1994                .lock()
1995                .map_err(|_| "background task lock poisoned".to_string())?;
1996            if state.metadata.status.is_terminal() {
1997                state.pending_terminal_override = None;
1998            } else if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
1999                state.metadata =
2000                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
2001                if self.task_has_watch_control(&task.task_id) {
2002                    state.metadata.completion_delivered = true;
2003                }
2004                state.pending_terminal_override = None;
2005                task.mark_terminal_now();
2006                match &mut state.runtime {
2007                    // Exit marker already present: the child finished on its
2008                    // own before this kill observed it. Reap it rather than
2009                    // dropping the handle so it doesn't become a zombie
2010                    // (issue #91). The active-kill branch below already
2011                    // `wait()`s after signaling, so this is the only kill
2012                    // path that needed the explicit reap.
2013                    TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2014                    TaskRuntime::Pty(runtime) => *runtime = None,
2015                }
2016                state.detached = true;
2017                self.persist_task(&task.paths, &state.metadata)
2018                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2019                terminalized = true;
2020            } else {
2021                let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
2022                if !was_already_killing {
2023                    state.metadata.status = BgTaskStatus::Killing;
2024                    self.persist_task(&task.paths, &state.metadata)
2025                        .map_err(|e| format!("failed to persist killing state: {e}"))?;
2026                }
2027
2028                #[cfg(unix)]
2029                let pgid = state.metadata.pgid;
2030                #[cfg(windows)]
2031                let child_pid = state.metadata.child_pid;
2032                if !was_already_killing
2033                    && state.metadata.mode == BgMode::Pty
2034                    && terminal_status == BgTaskStatus::TimedOut
2035                {
2036                    state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
2037                }
2038
2039                #[cfg(windows)]
2040                let mut pty_forced_terminal_status: Option<BgTaskStatus> = None;
2041
2042                match &mut state.runtime {
2043                    TaskRuntime::Piped(child_slot) => {
2044                        #[cfg(unix)]
2045                        if let Some(pgid) = pgid {
2046                            terminate_pgid(pgid, child_slot.as_mut());
2047                        }
2048                        #[cfg(windows)]
2049                        if let Some(child) = child_slot.as_mut() {
2050                            super::process::terminate_process(child);
2051                        } else if let Some(pid) = child_pid {
2052                            terminate_pid(pid);
2053                        }
2054                        if let Some(child) = child_slot.as_mut() {
2055                            let _ = child.wait();
2056                        }
2057                        *child_slot = None;
2058                        state.detached = true;
2059
2060                        if !task.paths.exit.exists() {
2061                            write_kill_marker_if_absent(&task.paths.exit)
2062                                .map_err(|e| format!("failed to write kill marker: {e}"))?;
2063                        }
2064
2065                        let exit_code = if terminal_status == BgTaskStatus::TimedOut {
2066                            Some(124)
2067                        } else {
2068                            None
2069                        };
2070                        state
2071                            .metadata
2072                            .mark_terminal(terminal_status, exit_code, None);
2073                        if self.task_has_watch_control(&task.task_id) {
2074                            state.metadata.completion_delivered = true;
2075                        }
2076                        state.pending_terminal_override = None;
2077                        task.mark_terminal_now();
2078                        self.persist_task(&task.paths, &state.metadata)
2079                            .map_err(|e| format!("failed to persist killed state: {e}"))?;
2080                        terminalized = true;
2081                    }
2082                    TaskRuntime::Pty(Some(pty)) => {
2083                        pty.was_killed.store(true, Ordering::SeqCst);
2084                        if let Err(error) = pty.killer.kill() {
2085                            crate::slog_warn!(
2086                                "[pty-kill] {task_id} ChildKiller::kill failed: {error}"
2087                            );
2088                        }
2089                        if let Some(pid) = pty.child_pid {
2090                            #[cfg(unix)]
2091                            terminate_pgid(pid as i32, None);
2092                            #[cfg(windows)]
2093                            terminate_pid(pid);
2094                        }
2095                        drop(pty.master.take());
2096
2097                        #[cfg(windows)]
2098                        {
2099                            let default_status = if terminal_status == BgTaskStatus::TimedOut {
2100                                BgTaskStatus::TimedOut
2101                            } else {
2102                                BgTaskStatus::Killed
2103                            };
2104                            pty_forced_terminal_status = Some(
2105                                state
2106                                    .pending_terminal_override
2107                                    .take()
2108                                    .unwrap_or(default_status),
2109                            );
2110                        }
2111                    }
2112                    TaskRuntime::Pty(None) => {}
2113                }
2114
2115                #[cfg(windows)]
2116                if let Some(target_status) = pty_forced_terminal_status {
2117                    if !task.paths.exit.exists() {
2118                        write_kill_marker_if_absent(&task.paths.exit)
2119                            .map_err(|e| format!("failed to write kill marker: {e}"))?;
2120                    }
2121
2122                    let exit_code = if target_status == BgTaskStatus::TimedOut {
2123                        Some(124)
2124                    } else {
2125                        None
2126                    };
2127                    state.metadata.mark_terminal(target_status, exit_code, None);
2128                    if self.task_has_watch_control(&task.task_id) {
2129                        state.metadata.completion_delivered = true;
2130                    }
2131                    state.pending_terminal_override = None;
2132                    task.mark_terminal_now();
2133                    if let TaskRuntime::Pty(runtime) = &mut state.runtime {
2134                        *runtime = None;
2135                    }
2136                    state.detached = true;
2137                    self.persist_task(&task.paths, &state.metadata)
2138                        .map_err(|e| format!("failed to persist killed PTY state: {e}"))?;
2139                    terminalized = true;
2140                }
2141            }
2142        }
2143
2144        if terminalized {
2145            self.post_terminal_transition(&task, true)?;
2146        }
2147        Ok(self.snapshot_with_terminal_cache(&task, RUNNING_OUTPUT_PREVIEW_BYTES))
2148    }
2149
    /// Finalize a task from an exit `marker` read from disk, then run the
    /// post-terminal transition.
    ///
    /// If the task is already terminal, only a pending override is cleared.
    /// Otherwise the terminal metadata is persisted:
    /// - A PTY task whose marker is `ExitMarker::Killed` takes its status from
    ///   `pending_terminal_override`, defaulting to `Killed`. Exit code 124 is
    ///   recorded for `TimedOut`.
    /// - Any other marker is translated by `terminal_metadata_from_marker`.
    ///
    /// `reason` is forwarded into the terminal metadata. Watch-controlled tasks
    /// are also marked `completion_delivered`.
    ///
    /// # Errors
    /// The state lock is poisoned, persisting the terminal metadata fails, or
    /// `post_terminal_transition` fails.
    fn finalize_from_marker(
        &self,
        task: &Arc<BgTask>,
        marker: ExitMarker,
        reason: Option<String>,
    ) -> Result<(), String> {
        // Computed before the task's state lock is taken.
        let watch_controlled = self.task_has_watch_control(&task.task_id);
        // Captured under the lock so the PTY reader can be awaited after release.
        let mut pty_reader_done = None;
        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if state.metadata.status.is_terminal() {
                state.pending_terminal_override = None;
                return Ok(());
            }

            // NOTE(review): the override is taken before persisting. If
            // `update_task_metadata` fails below, it is lost, and a retried
            // finalization would report a timed-out PTY task as `Killed`.
            let pending_override = state.pending_terminal_override.take();
            let is_pty = state.metadata.mode == BgMode::Pty;
            let updated = self
                .update_task_metadata(&task.paths, |metadata| {
                    let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
                        let mut metadata = metadata.clone();
                        let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
                        let exit_code = if target_status == BgTaskStatus::TimedOut {
                            Some(124)
                        } else {
                            None
                        };
                        metadata.mark_terminal(target_status, exit_code, reason);
                        metadata
                    } else {
                        terminal_metadata_from_marker(metadata.clone(), marker, reason)
                    };
                    if watch_controlled {
                        new_metadata.completion_delivered = true;
                    }
                    *metadata = new_metadata;
                })
                .map_err(|e| format!("failed to persist terminal state: {e}"))?;
            state.metadata = updated;
            task.mark_terminal_now();
            match &mut state.runtime {
                // Reap the exited direct child instead of dropping it, so it
                // does not linger as a `<defunct>` zombie (issue #91). The
                // wrapper writes the exit marker as its final act, so the
                // child is already exiting and `wait()` returns immediately.
                TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
                TaskRuntime::Pty(runtime) => {
                    // Keep the reader's done flag, then drop the runtime; releasing
                    // the PTY handles lets the reader finish.
                    pty_reader_done = runtime
                        .as_ref()
                        .map(|runtime| Arc::clone(&runtime.reader_done));
                    *runtime = None;
                }
            }
            state.detached = true;
        }

        // Give the PTY reader up to 200ms (polling every 10ms) to finish,
        // presumably so trailing output is captured before the final scan below.
        if let Some(reader_done) = pty_reader_done {
            let deadline = Instant::now() + Duration::from_millis(200);
            while !reader_done.load(Ordering::SeqCst) && Instant::now() < deadline {
                std::thread::sleep(Duration::from_millis(10));
            }
        }

        // One final scan runs before terminal notification routing so bytes
        // printed immediately before exit can win over the exit safety net.
        self.scan_task_watch_output(task);

        self.post_terminal_transition(task, true)
    }
2222
2223    fn enqueue_completion_if_needed(
2224        &self,
2225        metadata: &PersistedTask,
2226        paths: Option<&TaskPaths>,
2227        emit_frame: bool,
2228    ) {
2229        if metadata.status.is_terminal() && !metadata.completion_delivered {
2230            let cache =
2231                paths.and_then(|paths| self.render_terminal_output_from_paths(metadata, paths));
2232            self.enqueue_completion_from_parts(metadata, None, paths, emit_frame, cache.as_ref());
2233        }
2234    }
2235
2236    fn render_terminal_output_from_paths(
2237        &self,
2238        metadata: &PersistedTask,
2239        paths: &TaskPaths,
2240    ) -> Option<TerminalOutputCache> {
2241        if metadata.mode == BgMode::Pty {
2242            return None;
2243        }
2244        let buffer = BgBuffer::new(paths.stdout.clone(), paths.stderr.clone());
2245        Some(self.render_terminal_output(metadata, &buffer))
2246    }
2247
    /// Finalize bookkeeping for a terminal task. It builds the completion
    /// record, records compression accounting and, when allowed, queues and
    /// emits the user-visible completion notification.
    ///
    /// Arguments:
    /// - `buffer`: live output buffer, if the caller still has one. When it is
    ///   `None` and the task is not PTY-mode, a buffer is opened from `paths`
    ///   for rendering.
    /// - `paths`: on-disk task files. They are also passed to
    ///   `completion_token_counts`, which reads from disk when `buffer` is `None`.
    /// - `emit_frame`: whether to also push a frame through the progress sender.
    /// - `terminal_render`: an already-rendered output cache. When `None`, the
    ///   output is rendered here from `buffer` (or from the buffer opened from
    ///   `paths`).
    ///
    /// The order of the steps below is deliberate:
    /// 1. Non-terminal tasks return immediately.
    /// 2. The compression event is recorded for every terminal task.
    /// 3. Watch-controlled tasks emit a watch-exit frame when `emit_frame` is
    ///    set and `task_watch_state` did not report a match. They then clear
    ///    their watch state and never reach the completions queue.
    /// 4. Tasks with `completion_delivered` already set stop here.
    /// 5. Otherwise the completion is pushed at most once per task id. If it
    ///    was newly pushed and `emit_frame` is set, it is emitted as a
    ///    `BashCompleted` frame.
    fn enqueue_completion_from_parts(
        &self,
        metadata: &PersistedTask,
        buffer: Option<&BgBuffer>,
        paths: Option<&TaskPaths>,
        emit_frame: bool,
        terminal_render: Option<&TerminalOutputCache>,
    ) {
        // Only the terminal-state guard prevents double-recording here. The
        // `completion_delivered` flag is NOT used to gate compression-event
        // recording. `mark_terminal` flips `completion_delivered=true`
        // immediately for tasks with `notify_on_completion=false` (foreground
        // bash polled via `bash_status`, which is the common case). Pre-emptive
        // delivery flagging is correct for the push-frame queue, where it
        // suppresses duplicate user-visible notifications, but it would
        // silently skip the database insert below. Compression event recording
        // is idempotent at the DB layer (unique on harness+session+task_id), so
        // re-entry is safe. The dedupe-by-queue check stays for the push-frame
        // side.
        if !metadata.status.is_terminal() {
            return;
        }

        // Open an on-disk buffer only when the caller supplied none and the
        // task is not PTY-mode.
        let owned_buffer = if buffer.is_none() && metadata.mode != BgMode::Pty {
            paths.map(|paths| BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()))
        } else {
            None
        };
        let render_buffer = buffer.or(owned_buffer.as_ref());
        // Reuse the caller's render if provided; otherwise render here.
        let owned_render = if terminal_render.is_none() {
            render_buffer.map(|buffer| self.render_terminal_output(metadata, buffer))
        } else {
            None
        };
        let render = terminal_render.or(owned_render.as_ref());

        // Completion reminders use the already-rendered terminal output and a
        // smaller, exit-aware head+tail cap. They never invoke the compressor
        // themselves.
        let (output_preview, output_truncated) = render
            .map(|cache| completion_preview_for_cache(cache, metadata.exit_code))
            .unwrap_or_else(|| (String::new(), false));

        // Token counts use the caller's `buffer` (or `paths` when it is None)
        // for the raw side, and the rendered preview for the compressed side.
        let token_counts = self.completion_token_counts(
            metadata,
            buffer,
            paths,
            render.map(|render| render.output_preview.as_str()),
        );
        let completion = BgCompletion {
            task_id: metadata.task_id.clone(),
            session_id: metadata.session_id.clone(),
            status: metadata.status.clone(),
            exit_code: metadata.exit_code,
            command: metadata.command.clone(),
            output_preview,
            output_truncated,
            original_tokens: token_counts.original_tokens,
            compressed_tokens: token_counts.compressed_tokens,
            tokens_skipped: token_counts.tokens_skipped,
        };

        // Record the compression event BEFORE the push-frame dedupe. Event
        // recording has its own idempotency at the DB layer (unique key on
        // harness+session+task_id), so it is safe to attempt for every
        // terminal-state finalize. Critically, this path runs even when
        // `mark_terminal` has pre-set `completion_delivered=true` for
        // foreground bash (`notify_on_completion=false`). That is the common
        // case for OpenCode/Pi `bash` tool calls. Previously this code lived
        // after the dedupe guard and never fired for foreground tasks, which
        // meant compression accounting was effectively dead for >99% of
        // real-world bash usage.
        self.record_compression_event_if_applicable(metadata, &token_counts);

        // Watch-controlled tasks are reported through the watch channel only.
        // They return here, before the `completion_delivered` gate and the
        // completions queue.
        let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
        if watch_controlled {
            if emit_frame && !watch_matched {
                self.emit_bash_watch_exit(&completion);
            }
            self.clear_task_watch_state(&metadata.task_id);
            return;
        }

        // The push-frame queue is gated on `completion_delivered` so foreground
        // bash with `notify_on_completion=false` does not leak a user-visible
        // completion notification. `mark_terminal` pre-sets
        // `completion_delivered=true` for those tasks. Honoring it here keeps
        // the suppression invariant asserted by the test
        // `no_notify_foreground_poll_completion_does_not_enqueue_completion`.
        // The compression-event recording above intentionally runs before this
        // gate so foreground bash still contributes to the session/project
        // aggregates.
        if metadata.completion_delivered {
            return;
        }

        // Push-frame queue dedupe stays per-task to prevent duplicate
        // user-visible completion notifications. A poisoned completions lock
        // is treated as "not pushed", so no frame is emitted either.
        let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
            if completions
                .iter()
                .any(|existing| existing.task_id == metadata.task_id)
            {
                false
            } else {
                completions.push_back(completion.clone());
                true
            }
        } else {
            false
        };

        if pushed && emit_frame {
            self.emit_bash_completed(completion);
        }
    }
2363
2364    fn record_compression_event_if_applicable(
2365        &self,
2366        metadata: &PersistedTask,
2367        token_counts: &CompletionTokenCounts,
2368    ) {
2369        if metadata.mode == BgMode::Pty {
2370            return;
2371        }
2372
2373        let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2374            token_counts.original_tokens,
2375            token_counts.compressed_tokens,
2376            token_counts.original_bytes,
2377            token_counts.compressed_bytes,
2378        ) {
2379            (
2380                Some(original_tokens),
2381                Some(compressed_tokens),
2382                Some(original_bytes),
2383                Some(compressed_bytes),
2384            ) => (
2385                original_tokens,
2386                compressed_tokens,
2387                original_bytes,
2388                compressed_bytes,
2389            ),
2390            _ => {
2391                crate::slog_warn!(
2392                    "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2393                    metadata.task_id
2394                );
2395                return;
2396            }
2397        };
2398
2399        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2400        let Some(pool) = pool else {
2401            crate::slog_warn!(
2402                "compression event skipped for {}: db_pool not initialized — was configure run?",
2403                metadata.task_id
2404            );
2405            return;
2406        };
2407        let harness = self
2408            .inner
2409            .db_harness
2410            .read()
2411            .ok()
2412            .and_then(|slot| slot.clone());
2413        let Some(harness) = harness else {
2414            crate::slog_warn!(
2415                "compression event insert skipped for {}: harness not configured",
2416                metadata.task_id
2417            );
2418            return;
2419        };
2420
2421        let project_root = metadata
2422            .project_root
2423            .as_deref()
2424            .unwrap_or(&metadata.workdir);
2425        let project_key = crate::search_index::project_cache_key(project_root);
2426        let row = crate::db::compression_events::CompressionEventRow {
2427            harness: &harness,
2428            session_id: Some(&metadata.session_id),
2429            project_key: &project_key,
2430            tool: "bash",
2431            task_id: Some(&metadata.task_id),
2432            command: Some(&metadata.command),
2433            compressor: if metadata.compressed {
2434                "registry"
2435            } else {
2436                "none"
2437            },
2438            original_bytes,
2439            compressed_bytes,
2440            original_tokens,
2441            compressed_tokens,
2442            created_at: unix_millis() as i64,
2443        };
2444
2445        let conn = match pool.lock() {
2446            Ok(conn) => conn,
2447            Err(_) => {
2448                crate::slog_warn!(
2449                    "compression event insert failed for {}: db mutex poisoned",
2450                    metadata.task_id
2451                );
2452                return;
2453            }
2454        };
2455        match crate::db::compression_events::insert_compression_event(&conn, &row) {
2456            Ok(_) => {
2457                // DEBUG-level: each foreground bash call records one of these,
2458                // which clutters info-level logs without adding diagnostic value.
2459                // Aggregate totals are visible via the status RPC / TUI sidebar.
2460                crate::slog_debug!(
2461                    "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2462                    metadata.task_id,
2463                    project_key,
2464                    metadata.session_id,
2465                    original_tokens,
2466                    compressed_tokens
2467                );
2468            }
2469            Err(error) => {
2470                crate::slog_warn!(
2471                    "compression event insert failed for {}: {}",
2472                    metadata.task_id,
2473                    error
2474                );
2475            }
2476        }
2477    }
2478
2479    fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2480        let Ok(progress_sender) = self
2481            .inner
2482            .progress_sender
2483            .lock()
2484            .map(|sender| sender.clone())
2485        else {
2486            return;
2487        };
2488        if let Some(sender) = progress_sender.as_ref() {
2489            sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2490                pattern_match.task_id,
2491                session_id.to_string(),
2492                pattern_match.watch_id,
2493                pattern_match.match_text,
2494                pattern_match.match_offset,
2495                pattern_match.context,
2496                pattern_match.once,
2497            )));
2498        }
2499    }
2500
2501    fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2502        let Ok(progress_sender) = self
2503            .inner
2504            .progress_sender
2505            .lock()
2506            .map(|sender| sender.clone())
2507        else {
2508            return;
2509        };
2510        let Some(sender) = progress_sender.as_ref() else {
2511            return;
2512        };
2513        let status = completion_status_text(&completion.status, completion.exit_code);
2514        let preview = completion.output_preview.trim_end();
2515        let context = if preview.is_empty() {
2516            format!("task {} exited ({status})", completion.task_id)
2517        } else {
2518            format!(
2519                "task {} exited ({status})
2520{preview}",
2521                completion.task_id
2522            )
2523        };
2524        sender(PushFrame::BashPatternMatch(
2525            BashPatternMatchFrame::task_exit(
2526                completion.task_id.clone(),
2527                completion.session_id.clone(),
2528                format!("exited ({status})"),
2529                context,
2530            ),
2531        ));
2532    }
2533
2534    fn emit_bash_completed(&self, completion: BgCompletion) {
2535        let Ok(progress_sender) = self
2536            .inner
2537            .progress_sender
2538            .lock()
2539            .map(|sender| sender.clone())
2540        else {
2541            return;
2542        };
2543        let Some(sender) = progress_sender.as_ref() else {
2544            return;
2545        };
2546        // Clone the callback out of the registry mutex before writing to stdout;
2547        // otherwise a blocked push-frame write could pin the mutex and starve
2548        // unrelated progress-sender updates.
2549        // Bg task transitions are discovered by the watchdog thread, so the
2550        // sender is shared behind a Mutex. It still uses the same stdout writer
2551        // closure as foreground progress frames, preserving the existing lock/
2552        // flush behavior in main.rs.
2553        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2554            completion.task_id,
2555            completion.session_id,
2556            completion.status,
2557            completion.exit_code,
2558            completion.command,
2559            completion.output_preview,
2560            completion.output_truncated,
2561            completion.original_tokens,
2562            completion.compressed_tokens,
2563            completion.tokens_skipped,
2564        )));
2565    }
2566
2567    fn completion_token_counts(
2568        &self,
2569        metadata: &PersistedTask,
2570        buffer: Option<&BgBuffer>,
2571        paths: Option<&TaskPaths>,
2572        rendered_output: Option<&str>,
2573    ) -> CompletionTokenCounts {
2574        if metadata.mode == BgMode::Pty {
2575            return CompletionTokenCounts::skipped();
2576        }
2577
2578        let raw = match buffer {
2579            Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2580            None => paths
2581                .map(|paths| {
2582                    read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2583                })
2584                .unwrap_or(TokenCountInput::Skipped),
2585        };
2586
2587        let TokenCountInput::Text(raw_output) = raw else {
2588            return CompletionTokenCounts::skipped();
2589        };
2590
2591        let original_tokens = token_count_u32(&raw_output);
2592        let original_bytes = raw_output.len() as i64;
2593        let compressed_output = rendered_output.unwrap_or(&raw_output);
2594        let compressed_tokens = token_count_u32(compressed_output);
2595        let compressed_bytes = compressed_output.len() as i64;
2596        CompletionTokenCounts {
2597            original_tokens: Some(original_tokens),
2598            compressed_tokens: Some(compressed_tokens),
2599            original_bytes: Some(original_bytes),
2600            compressed_bytes: Some(compressed_bytes),
2601            tokens_skipped: false,
2602        }
2603    }
2604
2605    pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2606        if !self
2607            .inner
2608            .long_running_reminder_enabled
2609            .load(Ordering::SeqCst)
2610        {
2611            return;
2612        }
2613        let interval_ms = self
2614            .inner
2615            .long_running_reminder_interval_ms
2616            .load(Ordering::SeqCst);
2617        if interval_ms == 0 {
2618            return;
2619        }
2620        let interval = Duration::from_millis(interval_ms);
2621        let now = Instant::now();
2622        let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2623            return;
2624        };
2625        let since = last_reminder_at.unwrap_or(task.started);
2626        if now.duration_since(since) < interval {
2627            return;
2628        }
2629        let command = task
2630            .state
2631            .lock()
2632            .map(|state| state.metadata.command.clone())
2633            .unwrap_or_default();
2634        *last_reminder_at = Some(now);
2635        self.emit_bash_long_running(BashLongRunningFrame::new(
2636            task.task_id.clone(),
2637            task.session_id.clone(),
2638            command,
2639            task.started.elapsed().as_millis() as u64,
2640        ));
2641    }
2642
2643    fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2644        let Ok(progress_sender) = self
2645            .inner
2646            .progress_sender
2647            .lock()
2648            .map(|sender| sender.clone())
2649        else {
2650            return;
2651        };
2652        if let Some(sender) = progress_sender.as_ref() {
2653            sender(PushFrame::BashLongRunning(frame));
2654        }
2655    }
2656
2657    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2658        self.inner
2659            .tasks
2660            .lock()
2661            .ok()
2662            .and_then(|tasks| tasks.get(task_id).cloned())
2663    }
2664
2665    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2666        self.task(task_id)
2667            .filter(|task| task.session_id == session_id)
2668    }
2669
2670    fn running_count(&self) -> usize {
2671        self.inner
2672            .tasks
2673            .lock()
2674            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2675            .unwrap_or(0)
2676    }
2677
2678    fn start_watchdog(&self) {
2679        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2680            super::watchdog::start(self.clone());
2681        }
2682    }
2683
2684    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2685        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2686    }
2687
2688    #[cfg(test)]
2689    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2690        self.task_for_session(task_id, session_id)
2691            .map(|task| task.paths.json.clone())
2692    }
2693
2694    #[cfg(test)]
2695    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2696        self.task_for_session(task_id, session_id)
2697            .map(|task| task.paths.exit.clone())
2698    }
2699
2700    /// Generate a `bash-{16hex}` slug that is unique against live tasks and queued completions.
2701    fn generate_unique_task_id(&self) -> Result<String, String> {
2702        for _ in 0..32 {
2703            let candidate = random_slug();
2704            let tasks = self
2705                .inner
2706                .tasks
2707                .lock()
2708                .map_err(|_| "background task registry lock poisoned".to_string())?;
2709            if tasks.contains_key(&candidate) {
2710                continue;
2711            }
2712            let completions = self
2713                .inner
2714                .completions
2715                .lock()
2716                .map_err(|_| "background completions lock poisoned".to_string())?;
2717            if completions
2718                .iter()
2719                .any(|completion| completion.task_id == candidate)
2720            {
2721                continue;
2722            }
2723            return Ok(candidate);
2724        }
2725        Err("failed to allocate unique background task id after 32 attempts".to_string())
2726    }
2727}
2728
2729fn render_compressed_with_recovery(
2730    buffer: &BgBuffer,
2731    mut compressed: CompressionResult,
2732    input_truncated: bool,
2733) -> TerminalOutputCache {
2734    // Preserve a single canonical trailing newline. A bare `.trim_end()` strips
2735    // the legitimate final newline that `echo` and most commands emit, so
2736    // agent-facing output diverged from native bash ("hello" vs "hello\n") and
2737    // broke the no-JSON-envelope contract. Collapse excess trailing blank lines
2738    // to one, but keep that one when the content had a trailing newline. NOTE:
2739    // the check must read the ORIGINAL text — strip_plain_truncation_marker_lines
2740    // rebuilds via `.lines().join("\n")`, which itself drops the trailing newline.
2741    let had_trailing_newline = compressed.text.ends_with('\n');
2742    let mut text = strip_plain_truncation_marker_lines(&compressed.text)
2743        .trim_end()
2744        .to_string();
2745    if had_trailing_newline && !text.is_empty() {
2746        text.push('\n');
2747    }
2748    compressed.text = text;
2749
2750    let output_path = buffer.output_path().map(|path| path.display().to_string());
2751    let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
2752    let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
2753    let mut recovery = RecoveryContext {
2754        dropped_by_class: compressed.dropped_by_class,
2755        had_inner_drop: compressed.had_inner_drop,
2756        offset_hint_eligible: compressed.offset_hint_eligible,
2757        offset_start_line: compressed.offset_start_line,
2758        byte_truncated: input_truncated,
2759        output_path: output_path.clone(),
2760        stderr_path: stderr_path.clone(),
2761        include_stderr_path,
2762    };
2763
2764    let (output_preview, output_truncated) =
2765        render_body_with_recovery_marker(&compressed.text, &mut recovery);
2766    TerminalOutputCache {
2767        output_preview,
2768        output_truncated,
2769        kind: TerminalOutputKind::Compressed,
2770        output_path,
2771        stderr_path,
2772        recovery: Some(recovery),
2773    }
2774}
2775
2776fn render_body_with_recovery_marker(body: &str, recovery: &mut RecoveryContext) -> (String, bool) {
2777    render_body_with_recovery_marker_at_cap(
2778        body,
2779        recovery,
2780        FINAL_OUTPUT_CAP_BYTES,
2781        cap_final_output,
2782        cap_final_output_with_marker,
2783    )
2784}
2785
2786fn render_raw_body_with_recovery_marker(
2787    body: &str,
2788    recovery: &mut RecoveryContext,
2789) -> (String, bool) {
2790    render_body_with_recovery_marker_at_cap(
2791        body,
2792        recovery,
2793        RAW_PASSTHROUGH_CAP_BYTES,
2794        |input| {
2795            super::output::cap_head_tail(
2796                input,
2797                RAW_PASSTHROUGH_CAP_BYTES,
2798                RAW_PASSTHROUGH_HEAD_BYTES,
2799                RAW_PASSTHROUGH_TAIL_BYTES,
2800            )
2801        },
2802        |input, marker| {
2803            super::output::cap_head_tail_with_marker(
2804                input,
2805                RAW_PASSTHROUGH_CAP_BYTES,
2806                RAW_PASSTHROUGH_HEAD_BYTES,
2807                RAW_PASSTHROUGH_TAIL_BYTES,
2808                marker,
2809            )
2810        },
2811    )
2812}
2813
2814fn render_body_with_recovery_marker_at_cap<F, G>(
2815    body: &str,
2816    recovery: &mut RecoveryContext,
2817    cap_bytes: usize,
2818    cap_plain: F,
2819    cap_with_marker: G,
2820) -> (String, bool)
2821where
2822    F: Fn(&str) -> super::output::CappedText,
2823    G: Fn(&str, &str) -> super::output::CappedText,
2824{
2825    let needs_marker = recovery.has_visible_drop();
2826    if body.len() > cap_bytes {
2827        recovery.byte_truncated = true;
2828        if let Some(marker) = recovery_marker(recovery) {
2829            let capped = cap_with_marker(body, &marker);
2830            return (capped.text, true);
2831        }
2832        let capped = cap_plain(body);
2833        return (capped.text, capped.truncated || needs_marker);
2834    }
2835
2836    if !needs_marker {
2837        return (body.to_string(), false);
2838    }
2839
2840    let Some(marker) = recovery_marker(recovery) else {
2841        return (body.to_string(), true);
2842    };
2843    let with_marker = append_recovery_marker(body, &marker);
2844    if with_marker.len() <= cap_bytes {
2845        return (with_marker, true);
2846    }
2847
2848    recovery.byte_truncated = true;
2849    let marker = recovery_marker(recovery).unwrap_or(marker);
2850    let capped = cap_with_marker(body, &marker);
2851    (capped.text, true)
2852}
2853
/// Append `marker` on its own line after `body`.
///
/// Trailing whitespace on `body` is trimmed so exactly one newline separates
/// the content from the marker. A body that is empty, or consists only of
/// whitespace, yields just the marker with no leading blank line.
fn append_recovery_marker(body: &str, marker: &str) -> String {
    let content = body.trim_end();
    if content.is_empty() {
        return marker.to_string();
    }
    let mut output = String::with_capacity(content.len() + 1 + marker.len());
    output.push_str(content);
    output.push('\n');
    output.push_str(marker);
    output
}
2863
2864fn recovery_marker(recovery: &RecoveryContext) -> Option<String> {
2865    let mut parts = Vec::new();
2866    for (class, count) in &recovery.dropped_by_class {
2867        let label = if *count == 1 {
2868            class.singular()
2869        } else {
2870            class.plural()
2871        };
2872        parts.push(format!("+{count} more {label}"));
2873    }
2874    if recovery.byte_truncated {
2875        parts.push("truncated output".to_string());
2876    } else if recovery.had_inner_drop && parts.is_empty() {
2877        parts.push("omitted output".to_string());
2878    }
2879
2880    if parts.is_empty() {
2881        return None;
2882    }
2883
2884    let hint = recovery_hint(recovery);
2885    Some(format!("[{}; {hint}]", parts.join(", ")))
2886}
2887
2888fn recovery_hint(recovery: &RecoveryContext) -> String {
2889    // AFT stores stdout/stderr separately and combines them in memory. Class caps,
2890    // middle truncation, and mixed stdout/stderr renders are not line-offset
2891    // portable. Only a single-file contiguous-prefix drop may use `tail -n +N`.
2892    if recovery.offset_hint_eligible
2893        && !recovery.byte_truncated
2894        && recovery.dropped_by_class.is_empty()
2895        && !recovery.include_stderr_path
2896    {
2897        if let (Some(path), Some(line)) =
2898            (recovery.output_path.as_deref(), recovery.offset_start_line)
2899        {
2900            return format!("see remaining: tail -n +{line} {}", quote_path(path));
2901        }
2902    }
2903
2904    let mut paths = Vec::new();
2905    if let Some(path) = recovery.output_path.as_deref() {
2906        paths.push(path);
2907    }
2908    if recovery.include_stderr_path {
2909        if let Some(path) = recovery.stderr_path.as_deref() {
2910            if !paths.contains(&path) {
2911                paths.push(path);
2912            }
2913        }
2914    }
2915
2916    if paths.is_empty() {
2917        return "full output unavailable".to_string();
2918    }
2919
2920    let reads = paths
2921        .into_iter()
2922        .map(|path| format!("read {}", quote_path(path)))
2923        .collect::<Vec<_>>()
2924        .join(" and ");
2925    format!("full output: {reads}")
2926}
2927
2928fn strip_plain_truncation_marker_lines(input: &str) -> String {
2929    input
2930        .lines()
2931        .filter(|line| !is_plain_truncation_marker(line.trim()))
2932        .collect::<Vec<_>>()
2933        .join("\n")
2934}
2935
2936fn strip_recovery_marker_lines(input: &str) -> String {
2937    input
2938        .lines()
2939        .filter(|line| !is_recovery_marker(line.trim()))
2940        .collect::<Vec<_>>()
2941        .join("\n")
2942}
2943
/// True when `line` is exactly `...<truncated N bytes>...`, where `N` is one
/// or more ASCII digits.
fn is_plain_truncation_marker(line: &str) -> bool {
    line.strip_prefix("...<truncated ")
        .and_then(|rest| rest.strip_suffix(" bytes>..."))
        .is_some_and(|digits| !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit()))
}
2953
/// True when `line` is a bracketed recovery marker containing one of the
/// recovery hints this module renders.
fn is_recovery_marker(line: &str) -> bool {
    const HINT_FRAGMENTS: [&str; 3] = [
        "full output: read ",
        "see remaining: tail -n +",
        "full output unavailable",
    ];
    let bracketed = line.starts_with('[') && line.ends_with(']');
    bracketed && HINT_FRAGMENTS.iter().any(|&fragment| line.contains(fragment))
}
2961
2962fn render_structured_output(command: &str, buffer: &BgBuffer) -> Option<TerminalOutputCache> {
2963    if !is_gh_structured_command(command) {
2964        return None;
2965    }
2966
2967    let output_path = buffer
2968        .output_path()
2969        .map(|path| path.display().to_string())?;
2970    let stdout_bytes = buffer.stream_len(StreamKind::Stdout);
2971    if stdout_bytes == 0 {
2972        return None;
2973    }
2974
2975    if stdout_bytes > STRUCTURED_OUTPUT_CAP_BYTES as u64 {
2976        if !stream_starts_like_json(buffer, StreamKind::Stdout) {
2977            return None;
2978        }
2979        return Some(TerminalOutputCache {
2980            output_preview: json_output_pointer(stdout_bytes, &output_path),
2981            output_truncated: true,
2982            kind: TerminalOutputKind::Structured,
2983            output_path: Some(output_path),
2984            stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
2985            recovery: None,
2986        });
2987    }
2988
2989    let stdout = buffer.read_stream_bounded(StreamKind::Stdout, STRUCTURED_OUTPUT_CAP_BYTES);
2990    if stdout.truncated || !is_structured_body(&stdout.text) {
2991        return None;
2992    }
2993
2994    Some(TerminalOutputCache {
2995        output_preview: stdout.text,
2996        output_truncated: false,
2997        kind: TerminalOutputKind::Structured,
2998        output_path: Some(output_path),
2999        stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
3000        recovery: None,
3001    })
3002}
3003
3004fn render_raw_passthrough(buffer: &BgBuffer) -> TerminalOutputCache {
3005    let raw = buffer.read_combined_head_tail(
3006        RAW_PASSTHROUGH_CAP_BYTES,
3007        RAW_PASSTHROUGH_HEAD_BYTES,
3008        RAW_PASSTHROUGH_TAIL_BYTES,
3009    );
3010    let output_path = buffer.output_path().map(|path| path.display().to_string());
3011    let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
3012    if !raw.truncated {
3013        return TerminalOutputCache {
3014            output_preview: raw.text,
3015            output_truncated: false,
3016            kind: TerminalOutputKind::Raw,
3017            output_path,
3018            stderr_path,
3019            recovery: None,
3020        };
3021    }
3022
3023    let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
3024    let mut recovery = RecoveryContext {
3025        dropped_by_class: BTreeMap::new(),
3026        had_inner_drop: false,
3027        offset_hint_eligible: false,
3028        offset_start_line: None,
3029        byte_truncated: true,
3030        output_path: output_path.clone(),
3031        stderr_path: stderr_path.clone(),
3032        include_stderr_path,
3033    };
3034    let (output_preview, output_truncated) =
3035        render_raw_body_with_recovery_marker(&raw.text, &mut recovery);
3036    TerminalOutputCache {
3037        output_preview,
3038        output_truncated,
3039        kind: TerminalOutputKind::Raw,
3040        output_path,
3041        stderr_path,
3042        recovery: Some(recovery),
3043    }
3044}
3045
3046fn completion_preview_for_cache(
3047    cache: &TerminalOutputCache,
3048    exit_code: Option<i32>,
3049) -> (String, bool) {
3050    // Reminder previews are sized by exit status: success gets a short tail,
3051    // failure keeps head+tail context (see output.rs completion caps).
3052    let exit_ok = exit_code == Some(0);
3053    let threshold = completion_preview_threshold(exit_ok);
3054    if cache.kind == TerminalOutputKind::Structured && cache.output_preview.len() > threshold {
3055        if let Some(path) = cache.output_path.as_deref() {
3056            return (
3057                json_output_pointer(cache.output_preview.len() as u64, path),
3058                true,
3059            );
3060        }
3061        return (cache.output_preview.clone(), cache.output_truncated);
3062    }
3063
3064    if let Some(recovery) = cache.recovery.as_ref() {
3065        if cache.output_preview.len() <= threshold {
3066            return (cache.output_preview.clone(), cache.output_truncated);
3067        }
3068        let body = strip_recovery_marker_lines(&cache.output_preview);
3069        let mut completion_recovery = recovery.clone();
3070        completion_recovery.byte_truncated = true;
3071        if let Some(marker) = recovery_marker(&completion_recovery) {
3072            let capped = cap_completion_output_with_marker(&body, &marker, exit_ok);
3073            return (capped.text, true);
3074        }
3075    }
3076
3077    let capped = cap_completion_output(&cache.output_preview, exit_ok);
3078    (capped.text, cache.output_truncated || capped.truncated)
3079}
3080
3081fn is_gh_structured_command(command: &str) -> bool {
3082    let normalized = crate::compress::normalize_command_for_dispatch(command)
3083        .unwrap_or_else(|| command.trim_start().to_string());
3084    let tokens = shell_words_for_flags(&normalized);
3085    let Some(head) = tokens.first() else {
3086        return false;
3087    };
3088    let head_name = Path::new(head)
3089        .file_name()
3090        .and_then(|name| name.to_str())
3091        .unwrap_or(head);
3092    if !(head_name == "gh" || head_name.eq_ignore_ascii_case("gh.exe")) {
3093        return false;
3094    }
3095    tokens.iter().any(|token| {
3096        matches!(token.as_str(), "--json" | "--jq" | "--template")
3097            || token.starts_with("--json=")
3098            || token.starts_with("--jq=")
3099            || token.starts_with("--template=")
3100    })
3101}
3102
/// Splits `command` into rough shell words for flag detection.
///
/// Supported syntax:
/// - Single and double quotes group characters and are removed.
/// - A backslash outside single quotes makes the next character literal.
/// - Unquoted whitespace and the separators `;`, `&`, `|` end the current
///   word; the separators themselves are dropped.
///
/// Empty words (for example from `""`) are never emitted. This is not a full
/// shell parser; it only needs to be good enough to spot flags.
fn shell_words_for_flags(command: &str) -> Vec<String> {
    let mut out: Vec<String> = Vec::new();
    let mut word = String::new();
    let (mut single, mut double, mut pending_escape) = (false, false, false);

    for ch in command.chars() {
        if pending_escape {
            pending_escape = false;
            word.push(ch);
            continue;
        }
        let quoted = single || double;
        match ch {
            '\\' if !single => pending_escape = true,
            '\'' if !double => single = !single,
            '"' if !single => double = !double,
            c if !quoted && (c.is_whitespace() || matches!(c, ';' | '&' | '|')) => {
                if !word.is_empty() {
                    out.push(std::mem::take(&mut word));
                }
            }
            c => word.push(c),
        }
    }

    if !word.is_empty() {
        out.push(word);
    }
    out
}
3147
3148fn is_structured_body(body: &str) -> bool {
3149    let trimmed = body.trim();
3150    if trimmed.is_empty() {
3151        return false;
3152    }
3153    if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
3154        return true;
3155    }
3156
3157    let mut saw_line = false;
3158    for line in trimmed
3159        .lines()
3160        .map(str::trim)
3161        .filter(|line| !line.is_empty())
3162    {
3163        saw_line = true;
3164        if serde_json::from_str::<serde_json::Value>(line).is_err() {
3165            return false;
3166        }
3167    }
3168    saw_line
3169}
3170
3171fn stream_starts_like_json(buffer: &BgBuffer, stream: StreamKind) -> bool {
3172    let path = match (buffer, stream) {
3173        (BgBuffer::Pipes { stdout_path, .. }, StreamKind::Stdout) => Some(stdout_path),
3174        (BgBuffer::Pipes { stderr_path, .. }, StreamKind::Stderr) => Some(stderr_path),
3175        (BgBuffer::Pty { combined_path }, _) => Some(combined_path),
3176    };
3177    let Some(path) = path else {
3178        return false;
3179    };
3180    let Ok(file) = std::fs::File::open(path) else {
3181        return false;
3182    };
3183    let mut limited = file.take(512);
3184    let mut bytes = Vec::new();
3185    if limited.read_to_end(&mut bytes).is_err() {
3186        return false;
3187    }
3188    String::from_utf8_lossy(&bytes)
3189        .chars()
3190        .find(|ch| !ch.is_whitespace())
3191        .is_some_and(|ch| matches!(ch, '{' | '[' | '"' | '-' | '0'..='9' | 't' | 'f' | 'n'))
3192}
3193
/// Token and byte accounting attached to a task's completion report.
///
/// Each count is optional. `tokens_skipped` flags the case where no counting
/// was done (see [`CompletionTokenCounts::skipped`]).
struct CompletionTokenCounts {
    original_tokens: Option<u32>,
    compressed_tokens: Option<u32>,
    original_bytes: Option<i64>,
    compressed_bytes: Option<i64>,
    tokens_skipped: bool,
}

impl CompletionTokenCounts {
    /// Accounting for a task whose output was not counted: every count is
    /// `None` and `tokens_skipped` is `true`.
    fn skipped() -> Self {
        let no_tokens: Option<u32> = None;
        let no_bytes: Option<i64> = None;
        Self {
            tokens_skipped: true,
            original_tokens: no_tokens,
            compressed_tokens: no_tokens,
            original_bytes: no_bytes,
            compressed_bytes: no_bytes,
        }
    }
}
3213
3214fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
3215    match status {
3216        BgTaskStatus::TimedOut => "timed out".to_string(),
3217        BgTaskStatus::Killed => "killed".to_string(),
3218        _ => exit_code
3219            .map(|code| format!("exit {code}"))
3220            .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
3221    }
3222}
3223
3224fn token_count_u32(text: &str) -> u32 {
3225    aft_tokenizer::count_tokens(text)
3226        .try_into()
3227        .unwrap_or(u32::MAX)
3228}
3229
3230impl Default for BgTaskRegistry {
3231    fn default() -> Self {
3232        Self::new(Arc::new(Mutex::new(None)))
3233    }
3234}
3235
/// Reports whether `path` was last modified strictly less than `grace` ago.
///
/// Returns `false` whenever that cannot be established: the path is missing
/// or unreadable, the platform lacks modification times, or the recorded
/// mtime lies in the future relative to the system clock.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|meta| meta.modified()) else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        Err(_) => false,
    }
}
3244
/// Resolves `path` to its canonical absolute form. If canonicalization fails
/// (for example because the path does not exist), the path is returned
/// exactly as given.
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
3248
/// Reconstructs a monotonic [`Instant`] for something that started at
/// `started_at` (milliseconds since the Unix epoch) by subtracting its
/// wall-clock age from `Instant::now()`.
///
/// Ages clamp to zero in two cases: a start time in the future, or a system
/// clock that reads before the epoch. If the monotonic clock cannot represent
/// the subtraction, the current instant is returned instead.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let age = Duration::from_millis(now_ms.saturating_sub(started_at));
    Instant::now().checked_sub(age).unwrap_or_else(Instant::now)
}
3260
3261fn gc_quarantine(storage_dir: &Path) {
3262    let quarantine_root = storage_dir.join("bash-tasks-quarantine");
3263    let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
3264        return;
3265    };
3266    for session_entry in session_dirs.flatten() {
3267        let session_quarantine_dir = session_entry.path();
3268        if !session_quarantine_dir.is_dir() {
3269            continue;
3270        }
3271        let entries = match fs::read_dir(&session_quarantine_dir) {
3272            Ok(entries) => entries,
3273            Err(error) => {
3274                crate::slog_warn!(
3275                    "failed to read background task quarantine dir {}: {error}",
3276                    session_quarantine_dir.display()
3277                );
3278                continue;
3279            }
3280        };
3281        for entry in entries.flatten() {
3282            let path = entry.path();
3283            if modified_within(&path, QUARANTINE_GC_GRACE) {
3284                continue;
3285            }
3286            let result = if path.is_dir() {
3287                fs::remove_dir_all(&path)
3288            } else {
3289                fs::remove_file(&path)
3290            };
3291            match result {
3292                Ok(()) => log::debug!(
3293                    "deleted old background task quarantine entry {}",
3294                    path.display()
3295                ),
3296                Err(error) => crate::slog_warn!(
3297                    "failed to delete old background task quarantine entry {}: {error}",
3298                    path.display()
3299                ),
3300            }
3301        }
3302        let _ = fs::remove_dir(&session_quarantine_dir);
3303    }
3304    let _ = fs::remove_dir(&quarantine_root);
3305}
3306
/// Selects how task files moved into quarantine are renamed
/// (see `quarantine_name`).
enum QuarantineKind {
    /// Suffix scheme: `<file>.corrupt-<unix_ts>`.
    Corrupt,
    /// Infix scheme: `<stem>.invalid.<unix_ts>.<ext>`, or
    /// `<file>.invalid.<unix_ts>` when the name has no stem/extension split.
    Invalid,
}
3311
3312fn quarantine_task_json(
3313    storage_dir: &Path,
3314    session_dir: &Path,
3315    json_path: &Path,
3316    kind: QuarantineKind,
3317) -> Result<(), String> {
3318    let session_hash = session_dir
3319        .file_name()
3320        .and_then(|name| name.to_str())
3321        .ok_or_else(|| {
3322            format!(
3323                "invalid background task session dir: {}",
3324                session_dir.display()
3325            )
3326        })?;
3327    let task_name = json_path
3328        .file_name()
3329        .and_then(|name| name.to_str())
3330        .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
3331    let unix_ts = SystemTime::now()
3332        .duration_since(UNIX_EPOCH)
3333        .map(|duration| duration.as_secs())
3334        .unwrap_or(0);
3335    let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
3336    fs::create_dir_all(&quarantine_dir).map_err(|e| {
3337        format!(
3338            "failed to create background task quarantine dir {}: {e}",
3339            quarantine_dir.display()
3340        )
3341    })?;
3342    let target_name = quarantine_name(task_name, unix_ts, &kind);
3343    let target = quarantine_dir.join(target_name);
3344    fs::rename(json_path, &target).map_err(|e| {
3345        format!(
3346            "failed to quarantine background task metadata {} to {}: {e}",
3347            json_path.display(),
3348            target.display()
3349        )
3350    })?;
3351
3352    for sibling in task_sibling_paths(json_path) {
3353        if !sibling.exists() {
3354            continue;
3355        }
3356        let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
3357            crate::slog_warn!(
3358                "skipping background task sibling with invalid name during quarantine: {}",
3359                sibling.display()
3360            );
3361            continue;
3362        };
3363        let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
3364        if let Err(error) = fs::rename(&sibling, &sibling_target) {
3365            crate::slog_warn!(
3366                "failed to quarantine background task sibling {} to {}: {error}",
3367                sibling.display(),
3368                sibling_target.display()
3369            );
3370        }
3371    }
3372
3373    let _ = fs::remove_dir(session_dir);
3374    Ok(())
3375}
3376
3377fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
3378    match kind {
3379        QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
3380        QuarantineKind::Invalid => {
3381            let path = Path::new(file_name);
3382            let stem = path.file_stem().and_then(|stem| stem.to_str());
3383            let extension = path.extension().and_then(|extension| extension.to_str());
3384            match (stem, extension) {
3385                (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
3386                _ => format!("{file_name}.invalid.{unix_ts}"),
3387            }
3388        }
3389    }
3390}
3391
/// Lists the paths of the files that accompany a task's JSON metadata in the
/// same directory: `<stem>.stdout`, `.stderr`, `.exit`, `.pty`, `.ps1`,
/// `.bat` and `.sh`, in that order.
///
/// Only the path text is inspected; the files need not exist. Returns an
/// empty list when `json_path` has no parent or no UTF-8 file stem.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 7] = ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"];

    match (
        json_path.parent(),
        json_path.file_stem().and_then(|stem| stem.to_str()),
    ) {
        (Some(dir), Some(stem)) => SIBLING_EXTENSIONS
            .iter()
            .map(|ext| dir.join(format!("{stem}.{ext}")))
            .collect(),
        _ => Vec::new(),
    }
}
3404
3405fn read_for_token_count_from_disk(
3406    metadata: &PersistedTask,
3407    paths: &TaskPaths,
3408    max_bytes_per_stream: usize,
3409) -> TokenCountInput {
3410    if metadata.mode == BgMode::Pty {
3411        return TokenCountInput::Skipped;
3412    }
3413    // Read up to `max_bytes_per_stream` bytes per stream rather than
3414    // refusing to tokenize anything when the file exceeds the cap.
3415    // Mirror the in-memory `BgBuffer::read_for_token_count` policy
3416    // (see comment there) — large outputs are exactly the tasks that
3417    // benefit most from compression accounting, so silent-skipping
3418    // them defeats the purpose of token tracking.
3419    let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
3420    let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
3421    match (stdout, stderr) {
3422        (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3423            String::from_utf8_lossy(&stdout).as_ref(),
3424            String::from_utf8_lossy(&stderr).as_ref(),
3425        )),
3426        (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
3427            String::from_utf8_lossy(&stdout).as_ref(),
3428            "",
3429        )),
3430        (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3431            "",
3432            String::from_utf8_lossy(&stderr).as_ref(),
3433        )),
3434        (Err(_), Err(_)) => TokenCountInput::Skipped,
3435    }
3436}
3437
/// Reads at most `max_bytes` bytes from the END of `path`.
///
/// Used for tokenization, where the most recent output is more representative
/// than an arbitrarily-capped beginning. The window is sized from the file
/// length observed at open time. The read is bounded with `take`, so the cap
/// holds even when `max_bytes == 0` or the file grows while being read.
///
/// # Errors
///
/// Returns `Err` if the file cannot be opened (genuinely missing or a
/// permissions error), or if its metadata, seek, or read fails.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};
    let mut file = std::fs::File::open(path)?;
    let len = file.metadata()?.len();
    let cap = u64::try_from(max_bytes).unwrap_or(u64::MAX);
    let read_len = len.min(cap);
    if read_len > 0 && len > read_len {
        // `read_len <= len`, and file sizes are bounded by the OS offset type
        // (i64), so the negation cannot overflow.
        file.seek(SeekFrom::End(-(read_len as i64)))?;
    }
    // `read_len <= max_bytes`, so it fits in usize.
    let mut bytes = Vec::with_capacity(read_len as usize);
    // Bound the read: without `take`, `max_bytes == 0` would skip the seek and
    // return the whole file, and concurrent appends would exceed the cap.
    file.take(read_len).read_to_end(&mut bytes)?;
    Ok(bytes)
}
3454
3455impl BgTask {
3456    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
3457        let state = self
3458            .state
3459            .lock()
3460            .unwrap_or_else(|poison| poison.into_inner());
3461        self.snapshot_locked(&state, preview_bytes)
3462    }
3463
3464    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
3465        let metadata = &state.metadata;
3466        let duration_ms = metadata.duration_ms.or_else(|| {
3467            metadata
3468                .status
3469                .is_terminal()
3470                .then(|| self.started.elapsed().as_millis() as u64)
3471        });
3472        let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
3473            (String::new(), false)
3474        } else if metadata.status.is_terminal() {
3475            state
3476                .terminal_output_cache
3477                .as_ref()
3478                .map(|cache| (cache.output_preview.clone(), cache.output_truncated))
3479                .unwrap_or_else(|| (String::new(), false))
3480        } else {
3481            state.buffer.read_tail(preview_bytes)
3482        };
3483        BgTaskSnapshot {
3484            info: BgTaskInfo {
3485                task_id: self.task_id.clone(),
3486                status: metadata.status.clone(),
3487                command: metadata.command.clone(),
3488                mode: metadata.mode.clone(),
3489                started_at: metadata.started_at,
3490                duration_ms,
3491            },
3492            exit_code: metadata.exit_code,
3493            child_pid: metadata.child_pid,
3494            workdir: metadata.workdir.display().to_string(),
3495            output_preview,
3496            output_truncated,
3497            output_path: state
3498                .buffer
3499                .output_path()
3500                .map(|path| path.display().to_string()),
3501            stderr_path: state
3502                .buffer
3503                .stderr_path()
3504                .map(|path| path.display().to_string()),
3505            pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
3506            pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
3507        }
3508    }
3509
3510    pub(crate) fn is_running(&self) -> bool {
3511        self.state
3512            .lock()
3513            .map(|state| {
3514                state.metadata.status == BgTaskStatus::Running
3515                    || (state.metadata.mode == BgMode::Pty
3516                        && state.metadata.status == BgTaskStatus::Killing)
3517            })
3518            .unwrap_or(false)
3519    }
3520
3521    fn is_terminal(&self) -> bool {
3522        self.state
3523            .lock()
3524            .map(|state| state.metadata.status.is_terminal())
3525            .unwrap_or(false)
3526    }
3527
3528    fn mark_terminal_now(&self) {
3529        if let Ok(mut terminal_at) = self.terminal_at.lock() {
3530            if terminal_at.is_none() {
3531                *terminal_at = Some(Instant::now());
3532            }
3533        }
3534    }
3535
3536    fn set_completion_delivered(
3537        &self,
3538        delivered: bool,
3539        registry: &BgTaskRegistry,
3540    ) -> Result<(), String> {
3541        let mut state = self
3542            .state
3543            .lock()
3544            .map_err(|_| "background task lock poisoned".to_string())?;
3545        let updated = registry
3546            .update_task_metadata(&self.paths, |metadata| {
3547                metadata.completion_delivered = delivered;
3548            })
3549            .map_err(|e| format!("failed to update completion delivery: {e}"))?;
3550        state.metadata = updated;
3551        Ok(())
3552    }
3553}
3554
/// Reaps an exited direct child handle and leaves `child_slot` empty.
///
/// Dropping a [`std::process::Child`] never `wait()`s on the OS process. On
/// Unix, a finished child that is only dropped lingers as a `<defunct>`
/// zombie until AFT itself exits (issue #91: `[mv] <defunct>`). Some terminal
/// transitions learn about completion from the exit-marker file rather than
/// from [`BgTaskRegistry::reap_child`]'s `try_wait()`. Those paths must reap
/// the handle here instead of merely clearing it.
///
/// The marker is produced by the wrapper's final statement (an atomic `mv`
/// rename), so by the time it is visible the child is already exiting. A
/// non-blocking `try_wait()` covers the common case. Only if the child has
/// not exited yet does this fall back to a blocking `wait()`, which is
/// expected to return almost immediately. If `try_wait()` itself errors, the
/// handle is dropped without waiting.
///
/// Callers hold the task state mutex, which serializes this against
/// `reap_child`: whichever path takes the lock first reaps and clears the
/// slot, and the other sees `None`, so the child is never waited on twice.
#[cfg(unix)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
    let Some(mut child) = child_slot.take() else {
        return;
    };
    if let Ok(None) = child.try_wait() {
        let _ = child.wait();
    }
}
3583
/// On Windows there is no zombie/`<defunct>` state to clean up: dropping the
/// [`Child`] closes its process handle, which is the correct release. The
/// slot is simply emptied, keeping the historical behavior so the documented
/// Windows PID-recycle handling in `reap_child` is unaffected.
#[cfg(windows)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
    drop(child_slot.take());
}
3592
3593fn terminal_metadata_from_marker(
3594    mut metadata: PersistedTask,
3595    marker: ExitMarker,
3596    reason: Option<String>,
3597) -> PersistedTask {
3598    match marker {
3599        ExitMarker::Code(code) => {
3600            let status = if code == 0 {
3601                BgTaskStatus::Completed
3602            } else {
3603                BgTaskStatus::Failed
3604            };
3605            metadata.mark_terminal(status, Some(code), reason);
3606        }
3607        ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
3608    }
3609    metadata
3610}
3611
3612#[cfg(unix)]
3613fn write_unix_command_script(command: &str, paths: &TaskPaths) -> Result<PathBuf, String> {
3614    let stem = paths
3615        .json
3616        .file_stem()
3617        .and_then(|stem| stem.to_str())
3618        .unwrap_or("wrapper");
3619    let script_path = paths.dir.join(format!("{stem}.sh"));
3620    fs::write(&script_path, command)
3621        .map_err(|e| format!("failed to write background bash command script: {e}"))?;
3622    Ok(script_path)
3623}
3624
/// Builds (but does not spawn) the detached wrapper command for a background
/// task on Unix.
///
/// The shell chosen by `resolve_posix_shell` runs a fixed `-c` program with
/// three positional parameters:
/// - `$0` = the shell itself;
/// - `$1` = `command_script`;
/// - `$2` = `exit_path`.
///
/// The program runs the script with that shell and saves its status in
/// `code`. It writes the status to a temporary file suffixed with the wrapper
/// shell's PID (`$$`), then `mv -f`s that file onto `exit_path`.
///
/// In the child, `setsid()` runs before `exec`, so the wrapper becomes the
/// leader of a new session and process group. The working directory,
/// environment and stdio are left for the caller to configure.
#[cfg(unix)]
fn detached_shell_command(command_script: &Path, exit_path: &Path) -> Command {
    let shell = resolve_posix_shell();
    let mut cmd = Command::new(&shell);
    // Keep the user-provided command body out of argv and shell `-c` parsing.
    // The direct child is still a tiny wrapper so it can write the authoritative
    // exit marker after the command script exits (including if that script calls
    // `exit`). Passing only file paths through `-c` avoids newline/quote/length
    // edge cases where a multi-command script can be mangled before execution.
    cmd.arg("-c")
        .arg(r#""$0" "$1"; code=$?; printf "%s" "$code" > "$2.tmp.$$"; mv -f "$2.tmp.$$" "$2""#)
        .arg(&shell)
        .arg(command_script)
        .arg(exit_path);
    // SAFETY: `pre_exec` closures run in the forked child before `exec`, where
    // only async-signal-safe work is permitted. This closure only calls
    // `setsid(2)` and, on failure, wraps `errno` via `last_os_error`; it takes
    // no locks and touches no shared state.
    unsafe {
        cmd.pre_exec(|| {
            if libc::setsid() == -1 {
                return Err(std::io::Error::last_os_error());
            }
            Ok(())
        });
    }
    cmd
}
3649
3650#[cfg(unix)]
3651fn resolve_posix_shell() -> PathBuf {
3652    static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
3653    POSIX_SHELL
3654        .get_or_init(|| {
3655            std::env::var_os("BASH")
3656                .filter(|value| !value.is_empty())
3657                .map(PathBuf::from)
3658                .filter(|path| path.exists())
3659                .or_else(|| which::which("bash").ok())
3660                .or_else(|| which::which("zsh").ok())
3661                .unwrap_or_else(|| PathBuf::from("/bin/sh"))
3662        })
3663        .clone()
3664}
3665
/// Builds (but does not spawn) the background wrapper command for one Windows
/// shell candidate.
///
/// The wrapper body from `shell.wrapper_script_bytes(command, exit_path)` is
/// written next to the task's other files as `<json stem>.<ext>`. The
/// extension is `ps1`, `bat`, or `sh` depending on the shell, and the stem
/// falls back to `wrapper`. The returned command runs that file via the
/// shell's binary, with `creation_flags` applied. The working directory,
/// environment and stdio are left for the caller to configure.
///
/// # Errors
///
/// Returns an error if the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;
    // Write the wrapper to a script file alongside the other task files,
    // then invoke the shell with the file path as a single clean
    // argument. This sidesteps the entire Windows command-line quoting
    // mess (Rust std-lib quoting + cmd /C parser + PowerShell -Command
    // parser all interacting with embedded quotes in the wrapper).
    //
    // Path arguments don't need quoting in the same problematic way
    // because: (1) we use no-space task IDs (bash-XXXXXXXX) so the path
    // contains no characters that need shell escaping; (2) the wrapper
    // body's internal quotes never reach the shell command line — the
    // shell reads them from disk by file syntax rules, not command-line
    // parser rules.
    let wrapper_body = shell.wrapper_script_bytes(command, exit_path);
    let wrapper_ext = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        // POSIX shells (git-bash etc.) receive the wrapper path as their
        // script argument (see the match below), so the file extension is
        // purely cosmetic; `.sh` matches what an operator would expect when
        // grepping the spill directory.
        WindowsShell::Posix(_) => "sh",
    };
    let wrapper_path = paths.dir.join(format!(
        "{}.{}",
        paths
            .json
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("wrapper"),
        wrapper_ext
    ));
    fs::write(&wrapper_path, wrapper_body)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    let mut cmd = Command::new(shell.binary().as_ref());
    match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => {
            // -File runs the script with no quoting issues. `-NoLogo`,
            // `-NoProfile`, etc. apply to the host before the file runs.
            cmd.args([
                "-NoLogo",
                "-NoProfile",
                "-NonInteractive",
                "-ExecutionPolicy",
                "Bypass",
                "-File",
            ]);
            cmd.arg(&wrapper_path);
        }
        WindowsShell::Cmd => {
            // `cmd /D /C "<bat-file-path>"` — invoking a .bat
            // file via /C is well-defined; the file's contents are
            // read line-by-line by cmd's batch processor, NOT
            // re-interpreted by the /C parser. This avoids the
            // "filename syntax incorrect" errors that came from
            // having complex compound commands on the cmd line.
            cmd.args(["/D", "/C"]);
            cmd.arg(&wrapper_path);
        }
        WindowsShell::Posix(_) => {
            // git-bash and other POSIX shells run the wrapper script as
            // `<binary> <wrapper-path>` (the wrapper is just a shell
            // script). No special flags are passed here; writing the exit
            // marker is left to the body produced by `wrapper_script_bytes`.
            cmd.arg(&wrapper_path);
        }
    }

    // Win32 process creation flags. The caller selects whether to include
    // CREATE_BREAKAWAY_FROM_JOB — see `spawn_detached_child`, which tries
    // with breakaway first and retries without it.
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
3748
3749/// Spawn a detached background bash child process.
3750///
3751/// On Unix this is a single spawn against `/bin/sh`. On Windows it walks
3752/// `WindowsShell::shell_candidates()` (pwsh.exe → powershell.exe →
3753/// cmd.exe) and retries with the next candidate when the previous one
3754/// fails to spawn with `NotFound` — the same runtime safety net the
3755/// foreground bash path has, so issue #27 callers landing on cmd.exe
3756/// fallback can also use background bash. The wrapper script is
3757/// regenerated per attempt because PowerShell wrappers embed the shell
3758/// binary by name; the stdout/stderr capture handles are also reopened
3759/// per attempt because `Command::spawn()` consumes them.
3760///
3761/// Errors other than `NotFound` (PermissionDenied, OutOfMemory, etc.)
3762/// return immediately without retry — they indicate a problem with the
3763/// resolved shell that retrying with a different shell won't fix.
3764fn spawn_detached_child(
3765    command: &str,
3766    paths: &TaskPaths,
3767    workdir: &Path,
3768    env: &HashMap<String, String>,
3769) -> Result<std::process::Child, String> {
3770    #[cfg(not(windows))]
3771    {
3772        let stdout = create_capture_file(&paths.stdout)
3773            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3774        let stderr = create_capture_file(&paths.stderr)
3775            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3776        let command_script = write_unix_command_script(command, paths)?;
3777        detached_shell_command(&command_script, &paths.exit)
3778            .current_dir(workdir)
3779            .envs(env)
3780            .stdin(Stdio::null())
3781            .stdout(Stdio::from(stdout))
3782            .stderr(Stdio::from(stderr))
3783            .spawn()
3784            .map_err(|e| format!("failed to spawn background bash command: {e}"))
3785    }
3786    #[cfg(windows)]
3787    {
3788        use crate::windows_shell::shell_candidates;
3789        // Spawn priority: pwsh → powershell → git-bash → cmd. Same as the
3790        // legacy foreground bash spawn path. v0.20 routes ALL bash through
3791        // this background spawn helper, including foreground tool calls
3792        // where the model writes PowerShell-syntax (`$var = ...`,
3793        // `Start-Sleep`, `Add-Content`) — those fail outright under cmd.
3794        // The earlier v0.18-era cmd-first override worked around a
3795        // PowerShell detached-output bug; that bug is fixed at the
3796        // process-flag layer (CREATE_NO_WINDOW instead of DETACHED_PROCESS,
3797        // see flag block below), so we no longer need to misroute PS
3798        // commands through cmd.
3799        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
3800        // Win32 process creation flags. We try with CREATE_BREAKAWAY_FROM_JOB
3801        // first (so the bg child outlives the AFT process when AFT is killed),
3802        // then fall back without it for environments where the parent is in a
3803        // Job Object that doesn't grant `JOB_OBJECT_LIMIT_BREAKAWAY_OK`. CI
3804        // runners (GitHub Actions windows-2022) and some MDM-managed corp
3805        // environments hit this — `CreateProcess` returns Access Denied (5).
3806        // Without breakaway, the child still runs detached but will be torn
3807        // down with the parent if the parent process group is signaled.
3808        //
3809        // We use CREATE_NO_WINDOW (no visible console window, but the
3810        // child still has a hidden console) rather than DETACHED_PROCESS
3811        // (no console at all). PowerShell-based wrappers that perform
3812        // file I/O via [System.IO.File] need a console handle to flush
3813        // stdout/stderr correctly even when redirected — under
3814        // DETACHED_PROCESS, pwsh sometimes silently exits before
3815        // executing later script statements (the Move-Item that writes
3816        // the exit marker never runs), leaving the bg task forever
3817        // marked Failed: process exited without exit marker. cmd.exe
3818        // wrappers tolerate DETACHED_PROCESS, but switching to
3819        // CREATE_NO_WINDOW costs nothing for cmd and unblocks pwsh.
3820        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
3821        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
3822        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
3823        let with_breakaway =
3824            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
3825        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
3826        let mut last_error: Option<String> = None;
3827        for (idx, shell) in candidates.iter().enumerate() {
3828            // Per-shell, try with breakaway first. If the process is in a
3829            // restrictive job, the breakaway flag triggers Access Denied
3830            // (os error 5). Retry once without breakaway.
3831            for &flags in &[with_breakaway, without_breakaway] {
3832                // Re-open capture handles per attempt; spawn() consumes them.
3833                let stdout = create_capture_file(&paths.stdout)
3834                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3835                let stderr = create_capture_file(&paths.stderr)
3836                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3837                let mut cmd =
3838                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
3839                cmd.current_dir(workdir)
3840                    .envs(env)
3841                    .stdin(Stdio::null())
3842                    .stdout(Stdio::from(stdout))
3843                    .stderr(Stdio::from(stderr));
3844                match cmd.spawn() {
3845                    Ok(child) => {
3846                        if idx > 0 {
3847                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
3848                             the cached PATH probe disagreed with runtime spawn — likely PATH \
3849                             inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
3850                            shell.binary(),
3851                            idx);
3852                        }
3853                        if flags == without_breakaway {
3854                            crate::slog_warn!(
3855                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
3856                             (likely a restrictive Job Object — CI sandbox or MDM policy). \
3857                             Spawned without breakaway; the bg task will be torn down if the \
3858                             AFT process group is killed."
3859                            );
3860                        }
3861                        return Ok(child);
3862                    }
3863                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
3864                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
3865                        shell.binary());
3866                        last_error = Some(format!("{}: {e}", shell.binary()));
3867                        // Skip the without-breakaway retry for NotFound — the
3868                        // binary itself is missing, breakaway flag is irrelevant.
3869                        break;
3870                    }
3871                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
3872                        // Access Denied during breakaway — retry without it.
3873                        crate::slog_warn!(
3874                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
3875                         Access Denied — retrying {} without breakaway",
3876                            shell.binary()
3877                        );
3878                        last_error = Some(format!("{}: {e}", shell.binary()));
3879                        continue;
3880                    }
3881                    Err(e) => {
3882                        return Err(format!(
3883                            "failed to spawn background bash command via {}: {e}",
3884                            shell.binary()
3885                        ));
3886                    }
3887                }
3888            }
3889        }
3890        Err(format!(
3891            "failed to spawn background bash command: no Windows shell could be spawned. \
3892             Last error: {}. PATH-probed candidates: {:?}",
3893            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
3894            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
3895        ))
3896    }
3897}
3898
3899fn random_slug() -> String {
3900    let mut bytes = [0u8; 4];
3901    // getrandom is a transitive dependency; use it directly for OS entropy.
3902    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3903        // Extremely unlikely fallback: time + pid mix.
3904        let t = SystemTime::now()
3905            .duration_since(UNIX_EPOCH)
3906            .map(|d| d.subsec_nanos())
3907            .unwrap_or(0);
3908        let p = std::process::id();
3909        bytes.copy_from_slice(&(t ^ p).to_le_bytes());
3910    });
3911    // `bash-` + 8 lowercase hex chars — compact, OS-entropy backed.
3912    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3913    format!("bash-{hex}")
3914}
3915
3916#[cfg(test)]
3917mod tests {
3918    use std::collections::HashMap;
3919    use std::fs;
3920    use std::sync::atomic::{AtomicBool, AtomicUsize};
3921    use std::sync::{Arc, Mutex};
3922    use std::time::Duration;
3923    #[cfg(windows)]
3924    use std::time::Instant;
3925
3926    use super::*;
3927
    // Platform-appropriate shell command that exits with status 0 immediately.
    #[cfg(unix)]
    const QUICK_SUCCESS_COMMAND: &str = "true";
    #[cfg(windows)]
    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";

    // Platform-appropriate shell command that keeps running for about five
    // seconds (`sleep 5` / `timeout /t 5`).
    #[cfg(unix)]
    const LONG_RUNNING_COMMAND: &str = "sleep 5";
    #[cfg(windows)]
    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3937
3938    fn insert_terminal_piped_task(
3939        registry: &BgTaskRegistry,
3940        dir: &tempfile::TempDir,
3941        command: &str,
3942        stdout: &str,
3943        stderr: &str,
3944        compressed: bool,
3945    ) -> (String, Arc<BgTask>) {
3946        let task_id = format!("bash-test-{}", random_slug());
3947        let paths = task_paths(dir.path(), "session", &task_id);
3948        fs::create_dir_all(&paths.dir).unwrap();
3949        fs::write(&paths.stdout, stdout).unwrap();
3950        fs::write(&paths.stderr, stderr).unwrap();
3951        let mut metadata = PersistedTask::starting(
3952            task_id.clone(),
3953            "session".to_string(),
3954            command.to_string(),
3955            dir.path().to_path_buf(),
3956            Some(dir.path().to_path_buf()),
3957            Some(30_000),
3958            true,
3959            compressed,
3960        );
3961        metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
3962        write_task(&paths.json, &metadata).unwrap();
3963        registry
3964            .insert_rehydrated_task(metadata, paths, true)
3965            .expect("insert terminal task");
3966        let task = registry.task_for_session(&task_id, "session").unwrap();
3967        (task_id, task)
3968    }
3969
3970    fn insert_terminal_pty_task(
3971        registry: &BgTaskRegistry,
3972        dir: &tempfile::TempDir,
3973        pty_output: &str,
3974    ) -> (String, Arc<BgTask>) {
3975        let task_id = format!("bash-test-{}", random_slug());
3976        let paths = task_paths(dir.path(), "session", &task_id);
3977        fs::create_dir_all(&paths.dir).unwrap();
3978        fs::write(&paths.pty, pty_output).unwrap();
3979        let mut metadata = PersistedTask::starting(
3980            task_id.clone(),
3981            "session".to_string(),
3982            "python".to_string(),
3983            dir.path().to_path_buf(),
3984            Some(dir.path().to_path_buf()),
3985            Some(30_000),
3986            true,
3987            true,
3988        );
3989        metadata.mode = BgMode::Pty;
3990        metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
3991        write_task(&paths.json, &metadata).unwrap();
3992        registry
3993            .insert_rehydrated_task(metadata, paths, true)
3994            .expect("insert terminal pty task");
3995        let task = registry.task_for_session(&task_id, "session").unwrap();
3996        (task_id, task)
3997    }
3998
3999    #[cfg(unix)]
4000    fn wait_for_terminal_snapshot(
4001        registry: &BgTaskRegistry,
4002        task_id: &str,
4003        session_id: &str,
4004        project: &Path,
4005        storage: &Path,
4006    ) -> BgTaskSnapshot {
4007        let started = Instant::now();
4008        loop {
4009            let snapshot = registry
4010                .status(task_id, session_id, Some(project), Some(storage), 4096)
4011                .expect("spawned task should be visible to status");
4012            if snapshot.info.status.is_terminal() {
4013                return snapshot;
4014            }
4015            assert!(
4016                started.elapsed() < Duration::from_secs(10),
4017                "timed out waiting for task {task_id} to finish; last status={:?}",
4018                snapshot.info.status
4019            );
4020            std::thread::sleep(Duration::from_millis(50));
4021        }
4022    }
4023
4024    #[cfg(unix)]
4025    #[test]
4026    fn multiline_pipeline_stdout_persists_all_lines_after_terminal_status() {
4027        let cases = [
4028            (
4029                "long-first",
4030                "sleep 0.5; printf 'one\\n' | cat\nprintf 'two\\n' | grep -c two\nprintf 'three\\n' | cat",
4031                vec!["one", "1", "three"],
4032            ),
4033            (
4034                "short-first",
4035                "printf 'one\\n' | cat\nsleep 0.2; printf 'two\\n' | grep -c two\nprintf 'three\\n' | cat",
4036                vec!["one", "1", "three"],
4037            ),
4038            (
4039                "failing-middle",
4040                "sleep 0.2; printf 'one\\n' | cat\nfalse; printf 'after-false\\n' | cat\nprintf 'three\\n' | cat",
4041                vec!["one", "after-false", "three"],
4042            ),
4043        ];
4044
4045        for (name, command, expected_lines) in cases {
4046            let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4047            let dir = tempfile::tempdir().unwrap();
4048            let session_id = format!("session-{name}");
4049            let task_id = registry
4050                .spawn(
4051                    command,
4052                    session_id.clone(),
4053                    dir.path().to_path_buf(),
4054                    HashMap::new(),
4055                    Some(Duration::from_secs(30)),
4056                    dir.path().to_path_buf(),
4057                    10,
4058                    true,
4059                    true,
4060                    Some(dir.path().to_path_buf()),
4061                )
4062                .unwrap();
4063
4064            let snapshot = wait_for_terminal_snapshot(
4065                &registry,
4066                &task_id,
4067                &session_id,
4068                dir.path(),
4069                dir.path(),
4070            );
4071            assert_eq!(
4072                snapshot.info.status,
4073                BgTaskStatus::Completed,
4074                "{name}: task should complete; snapshot={snapshot:?}"
4075            );
4076            assert_eq!(
4077                snapshot.exit_code,
4078                Some(0),
4079                "{name}: script should use the final command's exit code"
4080            );
4081
4082            let stdout_path = task_paths(dir.path(), &session_id, &task_id).stdout;
4083            let stdout = fs::read_to_string(&stdout_path).unwrap_or_else(|error| {
4084                panic!(
4085                    "{name}: failed to read raw stdout file {}: {error}",
4086                    stdout_path.display()
4087                )
4088            });
4089            let lines: Vec<&str> = stdout.lines().collect();
4090            assert_eq!(
4091                lines,
4092                expected_lines,
4093                "{name}: raw stdout file must include every newline-separated command's output; stdout_path={}",
4094                stdout_path.display()
4095            );
4096        }
4097    }
4098
4099    #[test]
4100    fn recognizes_all_recovery_marker_forms() {
4101        assert!(is_recovery_marker(
4102            "[truncated output; full output: read \"/tmp/out\"]"
4103        ));
4104        assert!(is_recovery_marker(
4105            "[omitted output; see remaining: tail -n +42 \"/tmp/out\"]"
4106        ));
4107        assert!(is_recovery_marker(
4108            "[truncated output; full output unavailable]"
4109        ));
4110    }
4111
4112    #[test]
4113    fn terminal_status_polls_use_cached_render_once_and_off_lock() {
4114        let registry = BgTaskRegistry::default();
4115        let dir = tempfile::tempdir().unwrap();
4116        let (_task_id, task) = insert_terminal_piped_task(
4117            &registry,
4118            &dir,
4119            "custom-tool --verbose",
4120            &"stdout line\n".repeat(200_000),
4121            "",
4122            true,
4123        );
4124        let calls = Arc::new(AtomicUsize::new(0));
4125        let saw_unlocked_state = Arc::new(AtomicBool::new(false));
4126        let task_holder = Arc::new(Mutex::new(Some(Arc::clone(&task))));
4127        let calls_for_closure = Arc::clone(&calls);
4128        let unlocked_for_closure = Arc::clone(&saw_unlocked_state);
4129        let task_for_closure = Arc::clone(&task_holder);
4130        registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4131            calls_for_closure.fetch_add(1, Ordering::SeqCst);
4132            if let Some(task) = task_for_closure.lock().unwrap().as_ref() {
4133                if task.state.try_lock().is_ok() {
4134                    unlocked_for_closure.store(true, Ordering::SeqCst);
4135                }
4136            }
4137            CompressionResult::new(format!("compressed {} bytes", output.len()))
4138        });
4139
4140        let first = registry
4141            .status(
4142                &task.task_id,
4143                "session",
4144                None,
4145                Some(dir.path()),
4146                RUNNING_OUTPUT_PREVIEW_BYTES,
4147            )
4148            .unwrap();
4149        let second = registry
4150            .status(
4151                &task.task_id,
4152                "session",
4153                None,
4154                Some(dir.path()),
4155                RUNNING_OUTPUT_PREVIEW_BYTES,
4156            )
4157            .unwrap();
4158        let listed = registry.list(RUNNING_OUTPUT_PREVIEW_BYTES);
4159
4160        assert_eq!(
4161            calls.load(Ordering::SeqCst),
4162            1,
4163            "terminal render must be cached"
4164        );
4165        assert!(
4166            saw_unlocked_state.load(Ordering::SeqCst),
4167            "compressor must run after releasing the task state lock"
4168        );
4169        assert!(first.output_preview.starts_with("compressed "));
4170        assert_eq!(second.output_preview, first.output_preview);
4171        assert_eq!(listed[0].output_preview, first.output_preview);
4172    }
4173
4174    #[test]
4175    fn completion_preview_success_keeps_tail_only() {
4176        // Exit-aware completion previews: a SUCCESSFUL task's reminder keeps a
4177        // short tail only — head context is noise when the command worked
4178        // (regression: the uniform 4 KiB head+tail cap flooded reminders with
4179        // ~1K tokens of build noise per completed task).
4180        let registry = BgTaskRegistry::default();
4181        let dir = tempfile::tempdir().unwrap();
4182        let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4183        let (_task_id, task) =
4184            insert_terminal_piped_task(&registry, &dir, "cat big.log", &output, "", false);
4185
4186        registry.post_terminal_transition(&task, true).unwrap();
4187        let completions = registry.drain_completions_for_session(Some("session"));
4188        assert_eq!(completions.len(), 1);
4189        let preview = &completions[0].output_preview;
4190        assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4191        assert!(!preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4192        assert!(completions[0].output_truncated);
4193    }
4194
4195    #[test]
4196    fn completion_preview_failure_keeps_head_and_tail() {
4197        // A FAILED task keeps a small head (first error / command banner) plus
4198        // a larger tail (tracebacks and summaries land at the end).
4199        let registry = BgTaskRegistry::default();
4200        let dir = tempfile::tempdir().unwrap();
4201        let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4202        let task_id = format!("bash-test-{}", random_slug());
4203        let paths = task_paths(dir.path(), "session", &task_id);
4204        fs::create_dir_all(&paths.dir).unwrap();
4205        fs::write(&paths.stdout, &output).unwrap();
4206        fs::write(&paths.stderr, "").unwrap();
4207        let mut metadata = PersistedTask::starting(
4208            task_id.clone(),
4209            "session".to_string(),
4210            "cat big.log".to_string(),
4211            dir.path().to_path_buf(),
4212            Some(dir.path().to_path_buf()),
4213            Some(30_000),
4214            true,
4215            false,
4216        );
4217        metadata.mark_terminal(BgTaskStatus::Failed, Some(1), None);
4218        write_task(&paths.json, &metadata).unwrap();
4219        registry
4220            .insert_rehydrated_task(metadata, paths, true)
4221            .expect("insert terminal task");
4222        let task = registry.task_for_session(&task_id, "session").unwrap();
4223
4224        registry.post_terminal_transition(&task, true).unwrap();
4225        let completions = registry.drain_completions_for_session(Some("session"));
4226        assert_eq!(completions.len(), 1);
4227        let preview = &completions[0].output_preview;
4228        assert!(preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4229        assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4230    }
4231
4232    #[test]
4233    fn has_completions_for_session_matches_pending_delivery() {
4234        let registry = BgTaskRegistry::default();
4235        assert!(!registry.has_completions_for_session(Some("session")));
4236        assert!(!registry.has_completions_for_session(None));
4237
4238        let dir = tempfile::tempdir().unwrap();
4239        let (_task_id, task) =
4240            insert_terminal_piped_task(&registry, &dir, QUICK_SUCCESS_COMMAND, "done\n", "", false);
4241        registry.post_terminal_transition(&task, true).unwrap();
4242
4243        assert!(registry.has_completions_for_session(Some("session")));
4244        assert!(registry.has_completions_for_session(None));
4245        assert!(!registry.has_completions_for_session(Some("other-session")));
4246
4247        let completions = registry.drain_completions_for_session(Some("session"));
4248        assert_eq!(completions.len(), 1);
4249        assert_eq!(completions[0].task_id, task.task_id);
4250    }
4251
4252    #[test]
4253    fn structured_gh_json_survives_intact_and_ignores_stderr() {
4254        let registry = BgTaskRegistry::default();
4255        let dir = tempfile::tempdir().unwrap();
4256        let calls = Arc::new(AtomicUsize::new(0));
4257        let calls_for_closure = Arc::clone(&calls);
4258        registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4259            calls_for_closure.fetch_add(1, Ordering::SeqCst);
4260            CompressionResult::new(output)
4261        });
4262        let (task_id, _task) = insert_terminal_piped_task(
4263            &registry,
4264            &dir,
4265            "gh pr view 123 --json body",
4266            "{\"body\":\"hello\"}",
4267            "warning: stderr must not join json",
4268            true,
4269        );
4270
4271        let snapshot = registry
4272            .status(
4273                &task_id,
4274                "session",
4275                None,
4276                Some(dir.path()),
4277                RUNNING_OUTPUT_PREVIEW_BYTES,
4278            )
4279            .unwrap();
4280
4281        assert_eq!(snapshot.output_preview, "{\"body\":\"hello\"}");
4282        assert!(!snapshot.output_preview.contains("warning"));
4283        assert!(!snapshot.output_truncated);
4284        assert_eq!(
4285            calls.load(Ordering::SeqCst),
4286            0,
4287            "structured JSON bypasses compression"
4288        );
4289    }
4290
4291    #[test]
4292    fn registry_emits_single_recovery_marker_for_class_drops() {
4293        let registry = BgTaskRegistry::default();
4294        let dir = tempfile::tempdir().unwrap();
4295        registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4296            let mut dropped = BTreeMap::new();
4297            dropped.insert(DropClass::Error, 18);
4298            dropped.insert(DropClass::Warning, 6);
4299            CompressionResult::with_class_drops("kept diagnostic", dropped)
4300        });
4301        let (task_id, task) =
4302            insert_terminal_piped_task(&registry, &dir, "custom-tool", "raw", "", true);
4303
4304        let snapshot = registry
4305            .status(
4306                &task_id,
4307                "session",
4308                None,
4309                Some(dir.path()),
4310                RUNNING_OUTPUT_PREVIEW_BYTES,
4311            )
4312            .unwrap();
4313
4314        assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4315        assert!(snapshot.output_preview.contains("+18 more errors"));
4316        assert!(snapshot.output_preview.contains("+6 more warnings"));
4317        assert!(snapshot
4318            .output_preview
4319            .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4320        assert!(!snapshot.output_preview.contains("tail -n +"));
4321        assert!(snapshot.output_truncated);
4322    }
4323
4324    #[test]
4325    fn registry_marker_reports_semantic_and_byte_drops_once() {
4326        let registry = BgTaskRegistry::default();
4327        let dir = tempfile::tempdir().unwrap();
4328        registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4329            let mut dropped = BTreeMap::new();
4330            dropped.insert(DropClass::Error, 1);
4331            CompressionResult::with_class_drops(
4332                format!("HEAD-SIGNAL\n{}TAIL-SIGNAL", "middle\n".repeat(8_000)),
4333                dropped,
4334            )
4335        });
4336        let (task_id, _task) =
4337            insert_terminal_piped_task(&registry, &dir, "custom-tool", "raw", "", true);
4338
4339        let snapshot = registry
4340            .status(
4341                &task_id,
4342                "session",
4343                None,
4344                Some(dir.path()),
4345                RUNNING_OUTPUT_PREVIEW_BYTES,
4346            )
4347            .unwrap();
4348
4349        assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4350        assert!(snapshot.output_preview.contains("+1 more error"));
4351        assert!(snapshot.output_preview.contains("truncated output"));
4352        assert!(snapshot.output_preview.contains("HEAD-SIGNAL"));
4353        assert!(snapshot.output_preview.contains("TAIL-SIGNAL"));
4354        assert!(!snapshot.output_preview.contains("...<truncated"));
4355        assert!(snapshot.output_truncated);
4356    }
4357
4358    #[test]
4359    fn cargo_stderr_class_drops_name_both_capture_paths() {
4360        let registry = BgTaskRegistry::default();
4361        let dir = tempfile::tempdir().unwrap();
4362        let filter_registry = crate::compress::toml_filter::FilterRegistry::default();
4363        registry.set_compressor_with_exit_code(move |command, output, exit_code| {
4364            crate::compress::compress_with_registry_exit_code(
4365                command,
4366                &output,
4367                exit_code,
4368                &filter_registry,
4369            )
4370        });
4371        let stderr = (0..22)
4372            .map(|index| {
4373                format!(
4374                    "error: cargo failure {index}\n  --> src/lib.rs:{}:1\n   |\n{} | boom\n",
4375                    index + 1,
4376                    index + 1
4377                )
4378            })
4379            .collect::<Vec<_>>()
4380            .join("\n");
4381        let (task_id, task) = insert_terminal_piped_task(
4382            &registry,
4383            &dir,
4384            "cargo check",
4385            "Finished dev [unoptimized] target(s) in 0.01s\n",
4386            &stderr,
4387            true,
4388        );
4389
4390        let snapshot = registry
4391            .status(
4392                &task_id,
4393                "session",
4394                None,
4395                Some(dir.path()),
4396                RUNNING_OUTPUT_PREVIEW_BYTES,
4397            )
4398            .unwrap();
4399
4400        assert!(snapshot.output_preview.contains("+2 more errors"));
4401        assert!(snapshot
4402            .output_preview
4403            .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4404        assert!(snapshot
4405            .output_preview
4406            .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4407        assert!(!snapshot.output_preview.contains("tail -n +"));
4408    }
4409
4410    #[test]
4411    fn over_ceiling_structured_json_uses_pointer_not_partial_json() {
4412        let registry = BgTaskRegistry::default();
4413        let dir = tempfile::tempdir().unwrap();
4414        let body = format!("{{\"body\":\"{}\"}}", "x".repeat(60 * 1024));
4415        let (task_id, task) = insert_terminal_piped_task(
4416            &registry,
4417            &dir,
4418            "cd /repo && gh pr view 123 --json body",
4419            &body,
4420            "",
4421            true,
4422        );
4423
4424        let snapshot = registry
4425            .status(
4426                &task_id,
4427                "session",
4428                None,
4429                Some(dir.path()),
4430                RUNNING_OUTPUT_PREVIEW_BYTES,
4431            )
4432            .unwrap();
4433
4434        assert!(snapshot.output_preview.starts_with("[JSON output "));
4435        assert!(snapshot
4436            .output_preview
4437            .contains(&task.paths.stdout.display().to_string()));
4438        assert!(!snapshot.output_preview.contains(&"x".repeat(1024)));
4439        assert!(snapshot.output_truncated);
4440    }
4441
4442    #[test]
4443    fn toml_strip_tail_cap_uses_full_output_hint_not_offset_hint() {
4444        let registry = BgTaskRegistry::default();
4445        let dir = tempfile::tempdir().unwrap();
4446        let filter_registry = crate::compress::toml_filter::build_registry(
4447            crate::compress::builtin_filters::ALL,
4448            None,
4449            None,
4450        );
4451        registry.set_compressor_with_exit_code(move |command, output, exit_code| {
4452            crate::compress::compress_with_registry_exit_code(
4453                command,
4454                &output,
4455                exit_code,
4456                &filter_registry,
4457            )
4458        });
4459        let stdout = format!(
4460            "make[1]: Entering directory `/tmp`\n{}",
4461            (0..100)
4462                .map(|index| format!("compile line {index}"))
4463                .collect::<Vec<_>>()
4464                .join("\n")
4465        );
4466        let (task_id, task) =
4467            insert_terminal_piped_task(&registry, &dir, "make all", &stdout, "", true);
4468
4469        let snapshot = registry
4470            .status(
4471                &task_id,
4472                "session",
4473                None,
4474                Some(dir.path()),
4475                RUNNING_OUTPUT_PREVIEW_BYTES,
4476            )
4477            .unwrap();
4478
4479        assert!(snapshot.output_preview.contains("compile line 99"));
4480        assert!(snapshot.output_preview.contains(&format!(
4481            "full output: read \"{}\"",
4482            task.paths.stdout.display()
4483        )));
4484        assert!(!snapshot
4485            .output_preview
4486            .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4487        assert!(!snapshot.output_preview.contains("tail -n +"));
4488    }
4489
4490    #[test]
4491    fn compressed_false_raw_passthrough_uses_wider_head_tail_cap() {
4492        let registry = BgTaskRegistry::default();
4493        let dir = tempfile::tempdir().unwrap();
4494        let output = format!("RAW-HEAD\n{}RAW-TAIL\n", "raw-middle\n".repeat(8_000));
4495        let (task_id, task) =
4496            insert_terminal_piped_task(&registry, &dir, "cat raw.log", &output, "RAW-ERR\n", false);
4497
4498        let snapshot = registry
4499            .status(
4500                &task_id,
4501                "session",
4502                None,
4503                Some(dir.path()),
4504                RUNNING_OUTPUT_PREVIEW_BYTES,
4505            )
4506            .unwrap();
4507
4508        assert!(snapshot.output_preview.contains("RAW-HEAD"));
4509        assert!(snapshot.output_preview.contains("RAW-TAIL"));
4510        assert!(snapshot.output_preview.contains("truncated output"));
4511        assert!(snapshot
4512            .output_preview
4513            .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4514        assert!(snapshot
4515            .output_preview
4516            .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4517        assert!(!snapshot.output_preview.contains("tail -n +"));
4518        assert!(snapshot.output_preview.len() > 16 * 1024);
4519        assert!(snapshot.output_truncated);
4520    }
4521
4522    #[test]
4523    fn pty_terminal_snapshot_bypasses_line_compression() {
4524        let registry = BgTaskRegistry::default();
4525        let dir = tempfile::tempdir().unwrap();
4526        let calls = Arc::new(AtomicUsize::new(0));
4527        let calls_for_closure = Arc::clone(&calls);
4528        registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4529            calls_for_closure.fetch_add(1, Ordering::SeqCst);
4530            CompressionResult::new(output)
4531        });
4532        let (task_id, _task) = insert_terminal_pty_task(&registry, &dir, "raw\u{1b}[31m pty bytes");
4533
4534        let snapshot = registry
4535            .status(
4536                &task_id,
4537                "session",
4538                None,
4539                Some(dir.path()),
4540                RUNNING_OUTPUT_PREVIEW_BYTES,
4541            )
4542            .unwrap();
4543
4544        assert_eq!(snapshot.info.mode, BgMode::Pty);
4545        assert_eq!(snapshot.output_preview, "");
4546        assert_eq!(calls.load(Ordering::SeqCst), 0);
4547    }
4548
4549    #[test]
4550    fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
4551        let registry = BgTaskRegistry::default();
4552        let dir = tempfile::tempdir().unwrap();
4553        let task_id = registry
4554            .spawn_pty(
4555                QUICK_SUCCESS_COMMAND,
4556                "session".to_string(),
4557                dir.path().to_path_buf(),
4558                HashMap::new(),
4559                Some(Duration::from_secs(30)),
4560                dir.path().to_path_buf(),
4561                10,
4562                true,
4563                false,
4564                Some(dir.path().to_path_buf()),
4565                50,
4566                120,
4567            )
4568            .unwrap();
4569
4570        let paths = task_paths(dir.path(), "session", &task_id);
4571        let metadata = read_task(&paths.json).unwrap();
4572        assert_eq!(
4573            metadata.schema_version,
4574            crate::bash_background::persistence::SCHEMA_VERSION
4575        );
4576        assert_eq!(metadata.mode, BgMode::Pty);
4577        assert_eq!(metadata.pty_rows, Some(50));
4578        assert_eq!(metadata.pty_cols, Some(120));
4579
4580        let snapshot = registry
4581            .status(&task_id, "session", None, Some(dir.path()), 1024)
4582            .unwrap();
4583        assert_eq!(snapshot.pty_rows, Some(50));
4584        assert_eq!(snapshot.pty_cols, Some(120));
4585    }
4586
4587    /// Spawn a child process that exits immediately and return it after
4588    /// it has terminated. Used by reap_child tests to simulate the
4589    /// "child exists and is dead" state when the watchdog has already
4590    /// nulled out the original child handle.
4591    fn spawn_dead_child() -> std::process::Child {
4592        #[cfg(unix)]
4593        let mut cmd = std::process::Command::new("true");
4594        #[cfg(windows)]
4595        let mut cmd = {
4596            let mut c = std::process::Command::new("cmd");
4597            c.args(["/c", "exit", "0"]);
4598            c
4599        };
4600        cmd.stdin(std::process::Stdio::null());
4601        cmd.stdout(std::process::Stdio::null());
4602        cmd.stderr(std::process::Stdio::null());
4603        let mut child = cmd.spawn().expect("spawn replacement child for reap test");
4604        // Poll try_wait() until the child actually exits, instead of calling
4605        // wait() which closes the OS handle. On Windows, after wait()
4606        // closes the handle, subsequent try_wait() calls (which reap_child
4607        // depends on) return Err — the test was inadvertently giving
4608        // reap_child an unusable child handle. Polling try_wait() keeps the
4609        // handle open and observes natural exit, matching the production
4610        // shape where the watchdog discovers an exited child for the first
4611        // time.
4612        let started = Instant::now();
4613        loop {
4614            match child.try_wait() {
4615                Ok(Some(_)) => break,
4616                Ok(None) => {
4617                    if started.elapsed() > Duration::from_secs(5) {
4618                        panic!("dead-child stand-in did not exit within 5s");
4619                    }
4620                    std::thread::sleep(Duration::from_millis(10));
4621                }
4622                Err(error) => panic!("dead-child try_wait failed: {error}"),
4623            }
4624        }
4625        child
4626    }
4627
    /// Acknowledging a completion must persist `completion_delivered` even
    /// when the in-memory completion queue no longer holds the entry. A fresh
    /// registry replaying the session must then not queue that completion
    /// again.
    #[test]
    fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
        let registry = BgTaskRegistry::default();
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                LONG_RUNNING_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        // Killing the task queues exactly one completion for the session.
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .unwrap();
        assert_eq!(
            registry
                .drain_completions_for_session(Some("session"))
                .len(),
            1
        );

        // Simulate the plugin consuming a sync bash_watch({ exit:true }) result
        // locally, so the Rust completion queue no longer holds the entry by
        // the time the ack arrives.
        registry.inner.completions.lock().unwrap().clear();

        // The ack must still succeed for the task and leave nothing to drain.
        assert_eq!(
            registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
            vec![task_id.clone()]
        );
        assert!(registry
            .drain_completions_for_session(Some("session"))
            .is_empty());

        // Delivery is recorded in the persisted task JSON...
        let paths = task_paths(dir.path(), "session", &task_id);
        let metadata = read_task(&paths.json).unwrap();
        assert!(metadata.completion_delivered);

        // ...so a fresh registry replaying the session does not resurrect it.
        let replayed = BgTaskRegistry::default();
        replayed
            .replay_session_inner(dir.path(), "session", None)
            .unwrap();
        assert!(replayed
            .drain_completions_for_session(Some("session"))
            .is_empty());
    }
4680
4681    #[test]
4682    fn register_watch_rejects_unknown_task() {
4683        let registry = BgTaskRegistry::default();
4684
4685        let result = registry.register_watch(
4686            "missing-task".to_string(),
4687            WatchPattern::Substring("READY".into()),
4688            true,
4689        );
4690
4691        assert_eq!(result, Err("task_not_found"));
4692    }
4693
    /// Registering a watch on a task that is already terminal must scan the
    /// output already captured on disk, rather than wait for new output.
    ///
    /// All of the following are observed as soon as `register_watch`
    /// returns:
    /// - a `BashPatternMatch` push frame for "READY" at offset 0;
    /// - no watch left active on the task;
    /// - `completion_delivered` persisted in the task JSON.
    #[test]
    fn register_watch_on_terminal_task_scans_existing_output() {
        // Capture every push frame the registry emits.
        let frames = Arc::new(Mutex::new(Vec::new()));
        let captured = Arc::clone(&frames);
        let sender: crate::context::ProgressSender = Arc::new(Box::new(move |frame| {
            captured.lock().unwrap().push(frame);
        })
            as Box<dyn Fn(PushFrame) + Send + Sync>);
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                LONG_RUNNING_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        // Stop the watchdog so it does not race with the manual output write
        // and kill below.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        // Plant known output directly in the task's stdout capture file, then
        // make the task terminal.
        let task = registry.task_for_session(&task_id, "session").unwrap();
        std::fs::write(&task.paths.stdout, "READY\n").unwrap();
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .unwrap();
        // Discard frames and queued completions produced by the kill, so the
        // assertions below only see what register_watch emits.
        frames.lock().unwrap().clear();
        registry.inner.completions.lock().unwrap().clear();

        // NOTE(review): the trailing `true` appears to make this a one-shot
        // watch (the active watch count is 0 afterwards). Confirm against
        // register_watch's signature.
        registry
            .register_watch(
                task_id.clone(),
                WatchPattern::Substring("READY".into()),
                true,
            )
            .unwrap();

        // Shadow the Arc with the locked guard for the inspection below.
        let frames = frames.lock().unwrap();
        let frame = frames
            .iter()
            .find_map(|frame| match frame {
                PushFrame::BashPatternMatch(frame) => Some(frame),
                _ => None,
            })
            .expect("terminal watch registration should emit pattern frame");
        assert_eq!(frame.reason, "pattern_match");
        assert_eq!(frame.task_id, task_id);
        assert_eq!(frame.session_id, "session");
        assert_eq!(frame.match_text, "READY");
        assert_eq!(frame.match_offset, 0);
        assert_eq!(registry.active_watch_count(&frame.task_id), 0);
        let metadata = read_task(&task.paths.json).unwrap();
        assert!(metadata.completion_delivered);
    }
4755
4756    #[test]
4757    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
4758        let registry = BgTaskRegistry::default();
4759        let dir = tempfile::tempdir().unwrap();
4760        let task_id = registry
4761            .spawn(
4762                QUICK_SUCCESS_COMMAND,
4763                "session".to_string(),
4764                dir.path().to_path_buf(),
4765                HashMap::new(),
4766                Some(Duration::from_secs(30)),
4767                dir.path().to_path_buf(),
4768                10,
4769                true,
4770                false,
4771                Some(dir.path().to_path_buf()),
4772            )
4773            .unwrap();
4774        registry
4775            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4776            .unwrap();
4777        let completions = registry.drain_completions_for_session(Some("session"));
4778        assert_eq!(completions.len(), 1);
4779        assert_eq!(
4780            registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
4781            vec![task_id.clone()]
4782        );
4783
4784        registry.cleanup_finished(Duration::ZERO);
4785
4786        assert!(registry.inner.tasks.lock().unwrap().is_empty());
4787    }
4788
4789    #[test]
4790    fn cleanup_finished_retains_undelivered_terminals() {
4791        let registry = BgTaskRegistry::default();
4792        let dir = tempfile::tempdir().unwrap();
4793        let task_id = registry
4794            .spawn(
4795                QUICK_SUCCESS_COMMAND,
4796                "session".to_string(),
4797                dir.path().to_path_buf(),
4798                HashMap::new(),
4799                Some(Duration::from_secs(30)),
4800                dir.path().to_path_buf(),
4801                10,
4802                true,
4803                false,
4804                Some(dir.path().to_path_buf()),
4805            )
4806            .unwrap();
4807        registry
4808            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4809            .unwrap();
4810
4811        registry.cleanup_finished(Duration::ZERO);
4812
4813        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
4814    }
4815
    /// Verify that the live watchdog path (reap_child) gives an exited
    /// child one watchdog pass for its exit marker to land, then marks the
    /// task Failed if the next pass still sees no marker.
    ///
    /// Cross-platform: spawns a quick-exiting command normally, then deletes
    /// the exit marker the wrapper wrote. This simulates a wrapper that
    /// crashed before persisting the exit code.
    #[test]
    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait for the child to actually exit and the wrapper to either
        // write the marker or fail. Then nuke the marker to simulate
        // wrapper crash before write. Poll up to 5s; this is plenty for a
        // `true`/`cmd /c exit 0` invocation. Any other runtime shape (for
        // example, the watchdog already released the handle) counts as
        // exited.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                match &mut state.runtime {
                    TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
                    _ => true,
                }
            };
            if exited {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Stop the watchdog so it doesn't race with our manual reap_child.
        // On fast Windows runners the watchdog ticks (every 500ms) can
        // observe the child exit and reap it before this test's assertion
        // fires. That leaves the runtime as `TaskRuntime::Piped(None)` with an
        // already-terminal status. We specifically want to test reap_child's
        // logic when invoked manually on a Running-but-actually-dead task, so
        // we need exclusive control over the reap path here.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        // Give the watchdog at most one tick (500ms) to notice shutdown
        // before we touch task state. Without this, an in-flight watchdog
        // iteration could still race with our state setup below.
        std::thread::sleep(Duration::from_millis(550));

        // Wrapper likely wrote the marker by now; remove it to simulate
        // a wrapper crash that exited before persisting the exit code.
        let _ = std::fs::remove_file(&task.paths.exit);

        // The watchdog may have already reaped the child handle and
        // marked the task terminal before we got here. Reset both so
        // reap_child has the "Running task whose child just exited"
        // shape it's designed to handle. If the original child handle is
        // gone, install a quick-exited stand-in so the first reap exercises
        // the same try_wait path as production.
        //
        // CRITICAL on Windows: the watchdog ticks fast enough that the
        // JSON on disk may already say `Completed`. `update_task` (called
        // by `reap_child`) reads from disk, applies the closure, but
        // ROLLS BACK if the original on-disk state was already terminal
        // (see persistence.rs::update_task). So we must reset BOTH
        // in-memory metadata AND the JSON on disk to a Running state to
        // give reap_child the fresh shape it expects to operate on.
        {
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            state.metadata.exit_code = None;
            state.metadata.finished_at = None;
            state.metadata.duration_ms = None;
            // Persist the reset state to disk so update_task's terminal
            // rollback guard sees a non-terminal starting point.
            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
                .expect("persist reset Running metadata for reap_child test");
            // If the watchdog already released the child handle
            // (`TaskRuntime::Piped(None)`), simulate "child exists and is
            // dead" so reap_child's try_wait path runs. Spawn a quick-exit
            // child as a stand-in.
            if matches!(state.runtime, TaskRuntime::Piped(None)) {
                state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
            }
        }
        // Clear the terminal_at marker too so mark_terminal_now() can fire
        // again inside reap_child.
        *task.terminal_at.lock().unwrap() = None;

        // Sanity: task is still Running per metadata (replay/poll hasn't
        // observed the missing marker yet).
        assert!(
            task.is_running(),
            "precondition: metadata.status == Running"
        );
        assert!(
            !task.paths.exit.exists(),
            "precondition: exit marker absent"
        );

        // First watchdog observation is intentionally insufficient to
        // declare failure. A missing marker may just mean the wrapper is
        // still completing its tmp-file-to-marker rename, so reap_child only
        // drops the child handle and switches to detached PID monitoring.
        registry.reap_child(&task);

        {
            let state = task.state.lock().unwrap();
            assert_eq!(
                state.metadata.status,
                BgTaskStatus::Running,
                "first reap must leave status Running while waiting one pass for marker"
            );
            assert_eq!(
                state.metadata.status_reason, None,
                "first reap must not record a failure reason"
            );
            assert!(
                matches!(state.runtime, TaskRuntime::Piped(None)),
                "child handle must be released after first reap"
            );
            assert!(
                state.detached,
                "task must be marked detached after first reap"
            );
        }

        // Second watchdog observation sees the detached PID is dead and the
        // marker is still absent. That is strong enough evidence that the
        // wrapper exited without persisting an exit code.
        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.metadata.status.is_terminal(),
            "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Failed,
            "must specifically be Failed (not Killed): status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status_reason.as_deref(),
            Some("process exited without exit marker"),
            "reason must match replay path's wording: {:?}",
            state.metadata.status_reason
        );
        assert!(
            matches!(state.runtime, TaskRuntime::Piped(None)),
            "child handle must stay released after second reap"
        );
        assert!(
            state.detached,
            "task must remain detached after second reap"
        );
    }
4993
4994    /// Companion to the above: when the exit marker DOES exist on disk
4995    /// at reap_child time, reap_child must NOT mark the task Failed.
4996    /// Instead it leaves status=Running and lets the next poll_task()
4997    /// cycle finalize via the marker.
4998    #[test]
4999    fn reap_child_preserves_running_when_exit_marker_exists() {
5000        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5001        let dir = tempfile::tempdir().unwrap();
5002        let task_id = registry
5003            .spawn(
5004                QUICK_SUCCESS_COMMAND,
5005                "session".to_string(),
5006                dir.path().to_path_buf(),
5007                HashMap::new(),
5008                Some(Duration::from_secs(30)),
5009                dir.path().to_path_buf(),
5010                10,
5011                true,
5012                false,
5013                Some(dir.path().to_path_buf()),
5014            )
5015            .unwrap();
5016
5017        let task = registry.task_for_session(&task_id, "session").unwrap();
5018
5019        // Wait for child to exit AND for the marker to land. Both happen
5020        // shortly after the wrapper finishes — but we want both observed.
5021        let started = Instant::now();
5022        loop {
5023            let exited = {
5024                let mut state = task.state.lock().unwrap();
5025                match &mut state.runtime {
5026                    TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
5027                    _ => true,
5028                }
5029            };
5030            if exited && task.paths.exit.exists() {
5031                break;
5032            }
5033            assert!(
5034                started.elapsed() < Duration::from_secs(5),
5035                "child should exit and write marker quickly"
5036            );
5037            std::thread::sleep(Duration::from_millis(20));
5038        }
5039
5040        // Stop the watchdog so it doesn't race with our manual reap_child.
5041        // On fast Windows runners the watchdog can call poll_task (which
5042        // finalizes via marker) before this test asserts the
5043        // "marker exists, status still Running" invariant. We want
5044        // exclusive control over the reap path.
5045        registry
5046            .inner
5047            .shutdown
5048            .store(true, std::sync::atomic::Ordering::SeqCst);
5049        std::thread::sleep(Duration::from_millis(550));
5050
5051        // If the watchdog already finalized the task before we stopped it,
5052        // restore the test setup: reset status to Running and ensure the
5053        // marker file is still on disk. We're testing reap_child's
5054        // behavior when called manually with both child-exited AND
5055        // marker-present, regardless of whether the watchdog beat us.
5056        {
5057            let mut state = task.state.lock().unwrap();
5058            state.metadata.status = BgTaskStatus::Running;
5059            state.metadata.status_reason = None;
5060            if matches!(state.runtime, TaskRuntime::Piped(None)) {
5061                state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
5062            }
5063        }
5064        *task.terminal_at.lock().unwrap() = None;
5065        // Make sure the marker is still on disk (poll_task removes it on
5066        // finalization). Recreate it if needed.
5067        if !task.paths.exit.exists() {
5068            std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
5069        }
5070
5071        // reap_child sees: child exited, marker exists. It should:
5072        //  - drop state.child / set state.detached = true
5073        //  - NOT change status (poll_task will finalize via marker next tick)
5074        registry.reap_child(&task);
5075
5076        let state = task.state.lock().unwrap();
5077        assert!(
5078            matches!(state.runtime, TaskRuntime::Piped(None)),
5079            "child handle still released even when marker exists"
5080        );
5081        assert!(
5082            state.detached,
5083            "task still marked detached even when marker exists"
5084        );
5085        // Status remains Running because reap_child defers to poll_task
5086        // when a marker exists. It would be wrong for reap to record the
5087        // marker outcome (poll_task does that with proper exit-code
5088        // parsing).
5089        assert_eq!(
5090            state.metadata.status,
5091            BgTaskStatus::Running,
5092            "reap_child must defer to poll_task when marker exists"
5093        );
5094    }
5095
5096    /// Read a process's `ps` state string ("Z", "S", "R", etc). Returns
5097    /// `None` once the PID has been fully reaped (no row), which is the
5098    /// post-reap state we want.
5099    #[cfg(unix)]
5100    fn pid_stat(pid: u32) -> Option<String> {
5101        let output = std::process::Command::new("ps")
5102            .args(["-o", "stat=", "-p", &pid.to_string()])
5103            .output()
5104            .ok()?;
5105        if !output.status.success() {
5106            return None;
5107        }
5108        let stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
5109        if stat.is_empty() {
5110            None
5111        } else {
5112            Some(stat)
5113        }
5114    }
5115
5116    /// A `<defunct>` zombie carries `ps` state starting with 'Z'.
5117    #[cfg(unix)]
5118    fn is_zombie(pid: u32) -> bool {
5119        pid_stat(pid).is_some_and(|stat| stat.starts_with('Z'))
5120    }
5121
5122    /// Spawn a child that exits immediately and wait — via `ps`, NOT
5123    /// `try_wait()`/`wait()` — until it is observably a `<defunct>` zombie,
5124    /// then return the still-unreaped handle. This reproduces the exact
5125    /// state issue #91 leaves behind: an exited OS child whose parent has
5126    /// not reaped it.
5127    #[cfg(unix)]
5128    fn spawn_unreaped_zombie() -> std::process::Child {
5129        let child = std::process::Command::new("true")
5130            .stdin(std::process::Stdio::null())
5131            .stdout(std::process::Stdio::null())
5132            .stderr(std::process::Stdio::null())
5133            .spawn()
5134            .expect("spawn zombie stand-in");
5135        let pid = child.id();
5136        let started = Instant::now();
5137        while !is_zombie(pid) {
5138            assert!(
5139                started.elapsed() < Duration::from_secs(5),
5140                "stand-in child should become a zombie within 5s"
5141            );
5142            std::thread::sleep(Duration::from_millis(10));
5143        }
5144        // Return WITHOUT reaping — the handle still owns an unwaited zombie.
5145        child
5146    }
5147
    /// Regression test for issue #91: the exit-marker terminal path
    /// (`poll_task` -> `finalize_from_marker`) must REAP the direct child
    /// handle, not merely drop it. Dropping a `std::process::Child` does not
    /// `wait()` on Unix, so the exited child lingers as a `[mv] <defunct>`
    /// zombie until AFT exits.
    ///
    /// We install a known-unreaped zombie into the task's child slot and
    /// drive the marker finalize path, then assert the child is gone (reaped)
    /// rather than still `<defunct>`.
    ///
    /// Unix-only: zombie state is detected by reading the `ps` state column.
    #[cfg(unix)]
    #[test]
    fn finalize_from_marker_reaps_child_no_zombie() {
        use std::sync::atomic::Ordering;

        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        // Stop the watchdog so the ONLY terminal-transition path under test
        // is the exit-marker finalize (not reap_child's try_wait, which would
        // reap the child for us and mask the bug). The sleep gives an
        // in-flight watchdog iteration time to observe the shutdown flag.
        registry.inner.shutdown.store(true, Ordering::SeqCst);
        std::thread::sleep(Duration::from_millis(550));

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait for the wrapper's exit marker to land. We deliberately do NOT
        // call try_wait()/wait() on the real child here — doing so would reap
        // it and defeat the test.
        let started = Instant::now();
        while !task.paths.exit.exists() {
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "exit marker should land quickly for `true`"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Reset to a fresh Running shape and install a guaranteed-unreaped
        // zombie as the child handle, so the finalize path's reap behavior is
        // exercised deterministically regardless of how the real child was
        // handled. Persist Running so update_task's terminal-rollback guard
        // sees a non-terminal starting point.
        let zombie_pid;
        {
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            state.metadata.exit_code = None;
            state.metadata.finished_at = None;
            state.metadata.duration_ms = None;
            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
                .expect("persist reset Running metadata");
            let zombie = spawn_unreaped_zombie();
            zombie_pid = zombie.id();
            // NOTE(review): this assignment presumably drops the real child
            // handle without wait(), so that process may itself linger until
            // the test binary exits. That is harmless here because only
            // `zombie_pid` is checked.
            state.runtime = TaskRuntime::Piped(Some(zombie));
        }
        // Allow the terminal transition to be recorded again.
        *task.terminal_at.lock().unwrap() = None;

        // Precondition: the installed child is genuinely a `<defunct>` zombie.
        assert!(
            is_zombie(zombie_pid),
            "precondition: stand-in child {zombie_pid} must be a zombie before finalize"
        );

        // Drive the exit-marker terminal path. Before the fix this nulled the
        // Child handle without wait(), leaving the zombie behind.
        registry.poll_task(&task).unwrap();

        {
            let state = task.state.lock().unwrap();
            assert!(
                matches!(state.runtime, TaskRuntime::Piped(None)),
                "child handle must be released after marker finalize"
            );
            assert!(
                state.metadata.status.is_terminal(),
                "task must be terminal after marker finalize: {:?}",
                state.metadata.status
            );
        }

        // The core assertion: the child must have been REAPED, not just
        // dropped. A reaped PID has no `ps` row (or at minimum is not 'Z').
        assert!(
            !is_zombie(zombie_pid),
            "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
             after the exit-marker terminal transition"
        );
    }
5251
    /// Kill-path companion to the issue #91 marker-finalize regression test.
    /// When a kill observes an already-present exit marker (the child
    /// finished on its own first), it must reap the child handle rather than
    /// dropping it.
    ///
    /// Unix-only: zombie state is detected by reading the `ps` state column.
    #[cfg(unix)]
    #[test]
    fn kill_with_existing_marker_reaps_child_no_zombie() {
        use std::sync::atomic::Ordering;

        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        // Stop the watchdog (and let an in-flight iteration notice) so the
        // kill below is the only path that can reap the child.
        registry.inner.shutdown.store(true, Ordering::SeqCst);
        std::thread::sleep(Duration::from_millis(550));

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait for the wrapper's exit marker without touching the real child
        // via try_wait()/wait().
        let started = Instant::now();
        while !task.paths.exit.exists() {
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "exit marker should land quickly for `true`"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Reset to a fresh Running shape (in memory and on disk) and install a
        // guaranteed-unreaped zombie as the child handle.
        let zombie_pid;
        {
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            state.metadata.exit_code = None;
            state.metadata.finished_at = None;
            state.metadata.duration_ms = None;
            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
                .expect("persist reset Running metadata");
            let zombie = spawn_unreaped_zombie();
            zombie_pid = zombie.id();
            // NOTE(review): as in the marker-finalize test, the replaced real
            // child handle is presumably dropped without wait(). Only
            // `zombie_pid` is checked below.
            state.runtime = TaskRuntime::Piped(Some(zombie));
        }
        // Allow the terminal transition to be recorded again.
        *task.terminal_at.lock().unwrap() = None;

        assert!(
            is_zombie(zombie_pid),
            "precondition: stand-in child {zombie_pid} must be a zombie before kill"
        );

        // Kill observes the existing marker and finalizes from it.
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .expect("kill should succeed");

        {
            let state = task.state.lock().unwrap();
            assert!(
                matches!(state.runtime, TaskRuntime::Piped(None)),
                "child handle must be released after marker-aware kill"
            );
            assert!(state.metadata.status.is_terminal());
        }

        // The core assertion: the stand-in was reaped, not just dropped.
        assert!(
            !is_zombie(zombie_pid),
            "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
             after a marker-aware kill"
        );
    }
5332
5333    #[test]
5334    fn cleanup_finished_keeps_running_tasks() {
5335        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5336        let dir = tempfile::tempdir().unwrap();
5337        let task_id = registry
5338            .spawn(
5339                LONG_RUNNING_COMMAND,
5340                "session".to_string(),
5341                dir.path().to_path_buf(),
5342                HashMap::new(),
5343                Some(Duration::from_secs(30)),
5344                dir.path().to_path_buf(),
5345                10,
5346                true,
5347                false,
5348                Some(dir.path().to_path_buf()),
5349            )
5350            .unwrap();
5351
5352        registry.cleanup_finished(Duration::ZERO);
5353
5354        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
5355        let _ = registry.kill(&task_id, "session");
5356    }
5357
5358    #[cfg(windows)]
5359    fn wait_for_file(path: &Path) -> String {
5360        let started = Instant::now();
5361        loop {
5362            if path.exists() {
5363                return fs::read_to_string(path).expect("read file");
5364            }
5365            assert!(
5366                started.elapsed() < Duration::from_secs(30),
5367                "timed out waiting for {}",
5368                path.display()
5369            );
5370            std::thread::sleep(Duration::from_millis(100));
5371        }
5372    }
5373
5374    #[cfg(windows)]
5375    fn spawn_windows_registry_command(
5376        command: &str,
5377    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
5378        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5379        let dir = tempfile::tempdir().unwrap();
5380        let task_id = registry
5381            .spawn(
5382                command,
5383                "session".to_string(),
5384                dir.path().to_path_buf(),
5385                HashMap::new(),
5386                Some(Duration::from_secs(30)),
5387                dir.path().to_path_buf(),
5388                10,
5389                false,
5390                false,
5391                Some(dir.path().to_path_buf()),
5392            )
5393            .unwrap();
5394        (registry, dir, task_id)
5395    }
5396
5397    #[cfg(windows)]
5398    #[test]
5399    fn windows_spawn_writes_exit_marker_for_zero_exit() {
5400        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
5401        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5402
5403        let content = wait_for_file(&exit_path);
5404
5405        assert_eq!(content.trim(), "0");
5406    }
5407
5408    #[cfg(windows)]
5409    #[test]
5410    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
5411        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
5412        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5413
5414        let content = wait_for_file(&exit_path);
5415
5416        assert_eq!(content.trim(), "42");
5417    }
5418
5419    #[cfg(windows)]
5420    #[test]
5421    fn windows_spawn_captures_stdout_to_disk() {
5422        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
5423        let task = registry.task_for_session(&task_id, "session").unwrap();
5424        let stdout_path = task.paths.stdout.clone();
5425        let exit_path = task.paths.exit.clone();
5426
5427        let _ = wait_for_file(&exit_path);
5428        let stdout = fs::read_to_string(stdout_path).expect("read stdout");
5429
5430        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
5431    }
5432
5433    #[cfg(windows)]
5434    #[test]
5435    fn windows_spawn_uses_pwsh_when_available() {
5436        // Without $SHELL set, $SHELL probe yields None and pwsh wins.
5437        // (We intentionally pass None for shell_env to keep this test
5438        // independent of the runner's actual env.)
5439        let candidates = crate::windows_shell::shell_candidates_with(
5440            |binary| match binary {
5441                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
5442                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
5443                _ => None,
5444            },
5445            || None,
5446        );
5447        let shell = candidates.first().expect("at least one candidate").clone();
5448        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
5449        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
5450    }
5451
5452    /// Issue #27 Oracle review P1, updated: cmd wrapper writes a `.bat` file
5453    /// that batch-evaluates `%ERRORLEVEL%` on its own line (line-by-line
5454    /// evaluation is the default for batch files; parse-time expansion only
5455    /// applies to compound `&`-chained inline commands). Capturing
5456    /// `%ERRORLEVEL%` into `set CODE=%ERRORLEVEL%` immediately after the user
5457    /// command runs records the real run-time exit code.
5458    #[cfg(windows)]
5459    #[test]
5460    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
5461        let exit_path = Path::new(r"C:\Temp\bash-test.exit");
5462        let script =
5463            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
5464
5465        // Batch wrapper: capture exit code into CODE on the line after the
5466        // user command, then write CODE to a temp marker file before
5467        // atomic-renaming it into place.
5468        assert!(
5469            script.contains("set CODE=%ERRORLEVEL%"),
5470            "wrapper must capture exit code into CODE: {script}"
5471        );
5472        assert!(
5473            script.contains("echo %CODE% >"),
5474            "wrapper must echo CODE to a temp marker file: {script}"
5475        );
5476        assert!(
5477            script.contains("move /Y"),
5478            "wrapper must use atomic move to write the marker: {script}"
5479        );
5480        // move output must be redirected to nul to avoid polluting the
5481        // user's captured stdout with "1 file(s) moved." lines.
5482        assert!(
5483            script.contains("> nul"),
5484            "wrapper must redirect move output to nul: {script}"
5485        );
5486        // exit /B %CODE% propagates the real exit code so wait() sees it.
5487        assert!(
5488            script.contains("exit /B %CODE%"),
5489            "wrapper must propagate the captured exit code: {script}"
5490        );
5491        assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
5492        assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
5493    }
5494
5495    /// `bg_command()` for Cmd no longer needs `/V:ON` — the wrapper is now
5496    /// written to a `.bat` file where batch-line evaluation captures
5497    /// `%ERRORLEVEL%` correctly without delayed expansion. We still need
5498    /// `/D` (skip AutoRun) and `/S` (simple quote-stripping for paths with
5499    /// internal `"`-quoting from `cmd_quote`).
5500    #[cfg(windows)]
5501    #[test]
5502    fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
5503        use crate::windows_shell::WindowsShell;
5504        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
5505        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5506        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5507        assert_eq!(
5508            args_strs,
5509            vec!["/D", "/S", "/C", "echo wrapped"],
5510            "Cmd::bg_command must prepend /D /S /C"
5511        );
5512    }
5513
5514    /// PowerShell variants don't need `/V:ON`-style flags; their args
5515    /// are the same for foreground (`command()`) and background
5516    /// (`bg_command()`).
5517    #[cfg(windows)]
5518    #[test]
5519    fn windows_shell_pwsh_bg_command_uses_standard_args() {
5520        use crate::windows_shell::WindowsShell;
5521        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
5522        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5523        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5524        assert!(
5525            args_strs.contains(&"-Command"),
5526            "Pwsh::bg_command must use -Command: {args_strs:?}"
5527        );
5528        assert!(
5529            args_strs.contains(&"Get-Date"),
5530            "Pwsh::bg_command must include the user command body"
5531        );
5532    }
5533
5534    /// Issue #27 Oracle review P1 + P2 test gap: end-to-end proof that the
5535    /// **cmd.exe-specific** wrapper path captures the user command's
5536    /// run-time exit code correctly. The existing
5537    /// `windows_spawn_writes_exit_marker_for_nonzero_exit` test would also
5538    /// pass with the buggy `%ERRORLEVEL%` wrapper if the Windows machine
5539    /// had pwsh.exe or powershell.exe on PATH (which is typical) — the
5540    /// outer wrapper would be PowerShell, not cmd, and PowerShell's
5541    /// `$LASTEXITCODE` captures the inner `cmd /c exit 42` correctly.
5542    ///
5543    /// This test directly spawns via `WindowsShell::Cmd.bg_command()` to
5544    /// force the cmd-wrapper code path, then writes the exit marker and
5545    /// asserts it contains "42" not "0". With the pre-fix `%ERRORLEVEL%`
5546    /// wrapper, this test would fail because `%ERRORLEVEL%` parse-time
5547    /// expansion would record cmd's startup ERRORLEVEL (typically 0)
5548    /// regardless of what the user command returned.
5549    /// **Disabled.** This test exercises `WindowsShell::Cmd.bg_command()` —
5550    /// the inline command-line wrapper helper that production code does
5551    /// NOT use anymore. v0.19.4 switched bg-bash to a file-based wrapper
5552    /// (`<task>.bat` / `<task>.ps1`) because the inline cmd-line quoting
5553    /// produced silent failures on Windows 11 (move /Y could not parse
5554    /// path arguments through cmd's /C parser). The `bg_command` helper
5555    /// is kept only for parity with `WindowsShell::Cmd.command()` shape;
5556    /// the production spawn path goes through `detached_shell_command_for`
5557    /// which writes the wrapper to disk and invokes `cmd /V:ON /D /C
5558    /// <bat-path>`.
5559    ///
5560    /// The `!ERRORLEVEL!` correctness this test was meant to verify is
5561    /// covered live by the Windows e2e harness scenario 2d
5562    /// (`bg bash records non-zero exit code (cmd /c exit 42)`), which
5563    /// exercises the real file-based wrapper end-to-end via the protocol.
5564    #[allow(dead_code)]
5565    #[cfg(any())] // disabled on all targets
5566    fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
5567}