Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::{Read, Write};
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::compress::caps::DropClass;
16use crate::compress::CompressionResult;
17use crate::context::SharedProgressSender;
18use crate::harness::Harness;
19use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
20
21#[cfg(unix)]
22use std::os::unix::process::CommandExt;
23#[cfg(windows)]
24use std::os::windows::process::CommandExt;
25
26use super::buffer::{combine_streams, BgBuffer, DiskTruncation, StreamKind, TokenCountInput};
27use super::output::{
28    cap_completion_output, cap_completion_output_with_marker, cap_final_output,
29    cap_final_output_with_marker, completion_preview_threshold, json_output_pointer, quote_path,
30    retained_json_output_pointer, COMPRESS_INPUT_CAP_BYTES, COMPRESS_INPUT_HEAD_BYTES,
31    COMPRESS_INPUT_TAIL_BYTES, FINAL_OUTPUT_CAP_BYTES, RAW_PASSTHROUGH_CAP_BYTES,
32    RAW_PASSTHROUGH_HEAD_BYTES, RAW_PASSTHROUGH_TAIL_BYTES, RUNNING_OUTPUT_PREVIEW_BYTES,
33    STRUCTURED_OUTPUT_CAP_BYTES,
34};
35use super::persistence::{
36    create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
37    task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
38    ExitMarker, PersistedTask, TaskPaths,
39};
40use super::process::is_process_alive;
41#[cfg(unix)]
42use super::process::terminate_pgid;
43#[cfg(windows)]
44use super::process::terminate_pid;
45use super::pty_process::spawn_pty_for_command;
46use super::pty_runtime::PtyRuntime;
47use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
48use super::{BgTaskInfo, BgTaskStatus};
/// Default timeout for background bash tasks: 30 minutes.
/// Agents can override per-call via the `timeout` parameter (in ms).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// 24 hours.
/// NOTE(review): presumably the age after which a task still recorded as
/// running is considered stale — the use site is outside this view; confirm.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
/// 24 hours.
/// NOTE(review): presumably the grace period before persisted task bundles
/// are garbage-collected — confirm against the GC code.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
/// 30 days.
/// NOTE(review): presumably the grace period before quarantined bundles are
/// garbage-collected — confirm against the GC code.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// 128 KiB per stream. `BgCompletion` documents its token counts as omitted
/// when any stream exceeds this tokenization cap.
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
57
/// Serializable completion record for a background bash task. Instances are
/// queued in `RegistryInner::completions`.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the task this completion describes.
    pub task_id: String,
    /// Intentionally omitted from serialized completion payloads: push frames
    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Status reported for the task in this completion.
    pub status: BgTaskStatus,
    /// Process exit code; `None` when no exit code is available.
    pub exit_code: Option<i32>,
    /// The command line the task was started with.
    pub command: String,
    /// Small head+tail preview of the cached terminal render at completion time,
    /// cached so push-frame consumers and `bash_drain_completions` callers see
    /// the same preview without racing against later output rotation. Empty
    /// when not captured (e.g., persisted task seen on startup before buffer
    /// reattachment).
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// True when the captured tail is shorter than the actual output (because
    /// rotation occurred or the output exceeds the preview cap). Plugins use
    /// this to render a `…` prefix and signal that `bash_status` would return
    /// more.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
    /// Token count for raw stdout+stderr before compression. Omitted when any
    /// stream exceeds the 128 KiB tokenization cap.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub original_tokens: Option<u32>,
    /// Token count for the compressed output generated from the same capped
    /// raw payload. Omitted when raw tokenization is skipped.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compressed_tokens: Option<u32>,
    /// True when a stream exceeded the tokenization cap and counts are absent.
    #[serde(default, skip_serializing_if = "is_false")]
    pub tokens_skipped: bool,
}
93
/// Serde `skip_serializing_if` predicate: `true` exactly when the flag is
/// `false`, so unset boolean fields are left out of the serialized payload.
fn is_false(v: &bool) -> bool {
    let &flag = v;
    !flag
}
97
/// Serializable point-in-time view of a background task.
///
/// The fields of `info` are flattened into the same serialized object; the
/// `pty_*` fields are left out of the payload when they are `None`.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Core task info, serialized inline via `#[serde(flatten)]`.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Process exit code; `None` when no exit code is available.
    pub exit_code: Option<i32>,
    /// PID of the direct child process, when known.
    pub child_pid: Option<u32>,
    /// Working directory of the task, as a string.
    pub workdir: String,
    /// Preview text of the task output.
    pub output_preview: String,
    /// Whether `output_preview` is shorter than the full output.
    pub output_truncated: bool,
    /// Path of the output capture file, when there is one.
    pub output_path: Option<String>,
    /// Path of the stderr capture file, when there is one.
    pub stderr_path: Option<String>,
    /// PTY row count (the PTY spawn path records it in the task metadata).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_rows: Option<u16>,
    /// PTY column count (the PTY spawn path records it in the task metadata).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_cols: Option<u16>,
    /// NOTE(review): presumably a rendered PTY screen for PTY-mode tasks —
    /// the code that fills it is outside this view; confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_screen: Option<String>,
}
116
/// Tags how a `TerminalOutputCache` was rendered.
///
/// `render_terminal_output` tags its PTY result as `Raw`; the other
/// variants are assigned by render helpers outside this view.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TerminalOutputKind {
    /// NOTE(review): presumably output that went through the installed
    /// compressor — confirm against `render_compressed_with_recovery`.
    Compressed,
    /// Output rendered without compression.
    Raw,
    /// NOTE(review): presumably output recognized by
    /// `render_structured_output` — confirm.
    Structured,
}
123
/// Rendered output of a task that reached a terminal status.
///
/// `ensure_terminal_output_cache` computes it once and stores it in
/// `BgTaskState::terminal_output_cache`; later calls return the stored copy.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TerminalOutputCache {
    /// Rendered preview text.
    output_preview: String,
    /// Whether the preview omits part of the output.
    output_truncated: bool,
    /// How the preview was rendered.
    kind: TerminalOutputKind,
    /// Path of the output capture file, when the buffer has one.
    output_path: Option<String>,
    /// Path of the stderr capture file, when the buffer has one.
    stderr_path: Option<String>,
    /// Drop/truncation bookkeeping; `None` for the PTY render in
    /// `render_terminal_output`.
    recovery: Option<RecoveryContext>,
}
133
/// Bookkeeping about what was dropped or truncated while rendering a task's
/// terminal output. `has_visible_drop` combines the drop-related fields.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RecoveryContext {
    /// Count per `DropClass`; a non-empty map counts as a visible drop.
    dropped_by_class: BTreeMap<DropClass, usize>,
    /// Counts as a visible drop when set.
    /// NOTE(review): exact meaning of "inner drop" is defined by the
    /// compressor, outside this view.
    had_inner_drop: bool,
    /// NOTE(review): presumably whether a line-offset hint may be shown to
    /// the reader — confirm against the renderer.
    offset_hint_eligible: bool,
    /// NOTE(review): presumably the first line for that offset hint — confirm.
    offset_start_line: Option<usize>,
    /// Counts as a visible drop when set.
    byte_truncated: bool,
    /// Any value above zero counts as a visible drop.
    disk_truncated_prefix_bytes: u64,
    /// Path of the output capture file, when known.
    output_path: Option<String>,
    /// Path of the stderr capture file, when known.
    stderr_path: Option<String>,
    /// NOTE(review): presumably whether `stderr_path` should be mentioned in
    /// recovery hints — confirm against the renderer.
    include_stderr_path: bool,
}
146
147impl RecoveryContext {
148    fn has_visible_drop(&self) -> bool {
149        self.byte_truncated
150            || self.disk_truncated_prefix_bytes > 0
151            || self.had_inner_drop
152            || !self.dropped_by_class.is_empty()
153    }
154}
155
/// Handle to the background-task registry. Cloning copies only the `Arc`,
/// so every clone shares the same `RegistryInner`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    /// Shared registry state.
    pub(crate) inner: Arc<RegistryInner>,
}
160
/// State shared by every clone of `BgTaskRegistry`.
pub(crate) struct RegistryInner {
    /// Registered tasks; the spawn paths insert each task under its task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Queue of completion records.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Progress sender handed to `BgTaskRegistry::new`.
    pub(crate) progress_sender: SharedProgressSender,
    /// NOTE(review): presumably makes `start_watchdog` a one-time start —
    /// that method is outside this view; confirm.
    watchdog_started: AtomicBool,
    /// Shutdown flag; initialized to `false`.
    pub(crate) shutdown: AtomicBool,
    /// Long-running reminder toggle; defaults to `true` and is set by
    /// `configure_long_running_reminders`.
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Long-running reminder interval in milliseconds; defaults to 600_000
    /// and is set by `configure_long_running_reminders`.
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Swapped to `true` by the first session replay so the persisted-task
    /// GC is attempted only once per registry.
    persisted_gc_started: AtomicBool,
    /// Test-only counter; starts at zero.
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Output compression callback. Set by `AppContext` after construction.
    /// Takes (command, raw_output, exit_code) and returns compressed text. Called from
    /// the watchdog thread when a task reaches a terminal state and from
    /// `bash_status`/`list` snapshot reads. When `None`, output is returned
    /// uncompressed.
    pub(crate) compressor:
        Mutex<Option<Box<dyn Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync>>>,
    /// Optional shared SQLite connection; installed by `set_db_pool`, removed
    /// by `clear_db_pool`, and used by `dual_write_task`.
    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
    /// Harness storage segment installed by `set_harness`; `dual_write_task`
    /// skips the DB write (with a warning) while it is unset.
    pub(crate) db_harness: RwLock<Option<String>>,
    /// Sending half of the capacity-1 wake channel created in `new`; a clone
    /// is handed to each PTY spawn.
    pub(crate) wake_tx: crossbeam_channel::Sender<()>,
    /// Receiving half of the wake channel.
    pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
    /// Registry of output watch patterns; starts out as the default value.
    pub(crate) watch_registry: Mutex<WatchRegistry>,
}
185
/// One registered background task: immutable identity plus lock-protected
/// mutable state.
pub(crate) struct BgTask {
    /// Task identifier; also the key in `RegistryInner::tasks`.
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk locations of the task's files.
    pub(crate) paths: TaskPaths,
    /// Captured with `Instant::now()` when the spawn paths build the task.
    pub(crate) started: Instant,
    /// Starts as `None`.
    /// NOTE(review): presumably the time of the last long-running reminder —
    /// the writer is outside this view; confirm.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// Starts as `None`.
    /// NOTE(review): presumably set when the task reaches a terminal
    /// status — the writer is outside this view; confirm.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state.
    pub(crate) state: Mutex<BgTaskState>,
}
195
/// Live process handle of a task, by spawn flavor.
///
/// The inner `Option` may be `None`; `write_pty` treats `Pty(None)` as an
/// exited task.
pub(crate) enum TaskRuntime {
    /// Task started through `spawn`, holding the `std::process::Child`.
    Piped(Option<Child>),
    /// Task started through `spawn_pty`, holding the PTY runtime.
    Pty(Option<PtyRuntime>),
}
200
/// Mutable per-task state, guarded by `BgTask::state`.
pub(crate) struct BgTaskState {
    /// In-memory copy of the persisted task metadata.
    pub(crate) metadata: PersistedTask,
    /// Live process handle.
    pub(crate) runtime: TaskRuntime,
    /// Initialized to `false` by the spawn paths.
    /// NOTE(review): the code that sets it is outside this view.
    pub(crate) detached: bool,
    /// True once `reap_child` has observed the direct child handle's exit
    /// via `try_wait()`. Used by the two-pass watchdog to skip the racy
    /// `is_process_alive(child_pid)` probe on the second pass — we already
    /// have authoritative evidence that the child is dead, no need to
    /// re-verify via PID liveness which is unreliable on Windows where
    /// PIDs can be recycled within seconds.
    ///
    /// Remains `false` on replay-restored tasks (those have a `child_pid`
    /// but never observed exit via this process's `try_wait()`), so those
    /// continue to fall through to the `is_process_alive` probe path.
    pub(crate) child_exit_observed: bool,
    /// Reader over the task's capture file(s).
    pub(crate) buffer: BgBuffer,
    /// Rendered terminal output, filled lazily by
    /// `ensure_terminal_output_cache`.
    terminal_output_cache: Option<TerminalOutputCache>,
    /// PTY-only: set for timeout kill intent before signaling the child.
    pub(crate) pending_terminal_override: Option<BgTaskStatus>,
}
221
222fn completion_matches_session(completion: &BgCompletion, session_id: Option<&str>) -> bool {
223    session_id
224        .map(|session_id| completion.session_id == session_id)
225        .unwrap_or(true)
226}
227
228impl BgTaskRegistry {
229    pub fn new(progress_sender: SharedProgressSender) -> Self {
230        let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
231        Self {
232            inner: Arc::new(RegistryInner {
233                tasks: Mutex::new(HashMap::new()),
234                completions: Mutex::new(VecDeque::new()),
235                progress_sender,
236                watchdog_started: AtomicBool::new(false),
237                shutdown: AtomicBool::new(false),
238                long_running_reminder_enabled: AtomicBool::new(true),
239                long_running_reminder_interval_ms: AtomicU64::new(600_000),
240                persisted_gc_started: AtomicBool::new(false),
241                #[cfg(test)]
242                persisted_gc_runs: AtomicU64::new(0),
243                compressor: Mutex::new(None),
244                db_pool: RwLock::new(None),
245                db_harness: RwLock::new(None),
246                wake_tx,
247                wake_rx,
248                watch_registry: Mutex::new(WatchRegistry::default()),
249            }),
250        }
251    }
252
253    pub fn set_harness(&self, harness: Harness) {
254        if let Ok(mut slot) = self.inner.db_harness.write() {
255            *slot = Some(harness.storage_segment());
256        }
257    }
258
259    pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
260        if let Ok(mut slot) = self.inner.db_pool.write() {
261            *slot = Some(conn);
262        }
263    }
264
265    pub fn clear_db_pool(&self) {
266        if let Ok(mut slot) = self.inner.db_pool.write() {
267            *slot = None;
268        }
269    }
270
271    /// Install the output-compression callback. Called by `main.rs` after
272    /// `AppContext` is constructed so that snapshot/completion paths can
273    /// invoke `compress::compress_with_registry` without holding a context
274    /// reference. When called multiple times, the latest installation wins.
275    pub fn set_compressor<F>(&self, compressor: F)
276    where
277        F: Fn(&str, String) -> CompressionResult + Send + Sync + 'static,
278    {
279        self.set_compressor_with_exit_code(move |command, output, _exit_code| {
280            compressor(command, output)
281        });
282    }
283
284    pub fn set_compressor_with_exit_code<F>(&self, compressor: F)
285    where
286        F: Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync + 'static,
287    {
288        if let Ok(mut slot) = self.inner.compressor.lock() {
289            *slot = Some(Box::new(compressor));
290        }
291    }
292
293    /// Apply the installed compressor (if any) to `output`. Returns `output`
294    /// untouched when no compressor is installed.
295    pub(crate) fn compress_output(
296        &self,
297        command: &str,
298        output: String,
299        exit_code: Option<i32>,
300    ) -> CompressionResult {
301        let Ok(slot) = self.inner.compressor.lock() else {
302            return CompressionResult::new(output);
303        };
304        match slot.as_ref() {
305            Some(compressor) => compressor(command, output, exit_code),
306            None => CompressionResult::new(output),
307        }
308    }
309
310    fn ensure_terminal_output_cache(&self, task: &Arc<BgTask>) -> Option<TerminalOutputCache> {
311        let (metadata, buffer) = {
312            let state = task.state.lock().ok()?;
313            if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
314                return None;
315            }
316            if let Some(cache) = state.terminal_output_cache.clone() {
317                return Some(cache);
318            }
319            (state.metadata.clone(), state.buffer.clone())
320        };
321
322        let mut cap_buffer = buffer.clone();
323        let disk_truncation = cap_buffer.enforce_terminal_cap();
324        let cache = self.render_terminal_output(&metadata, &cap_buffer, disk_truncation);
325        let mut state = task.state.lock().ok()?;
326        if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
327            return None;
328        }
329        if let Some(existing) = state.terminal_output_cache.clone() {
330            return Some(existing);
331        }
332        state.terminal_output_cache = Some(cache.clone());
333        Some(cache)
334    }
335
336    fn render_terminal_output(
337        &self,
338        metadata: &PersistedTask,
339        buffer: &BgBuffer,
340        disk_truncation: DiskTruncation,
341    ) -> TerminalOutputCache {
342        if metadata.mode == BgMode::Pty {
343            return TerminalOutputCache {
344                output_preview: String::new(),
345                output_truncated: false,
346                kind: TerminalOutputKind::Raw,
347                output_path: buffer.output_path().map(|path| path.display().to_string()),
348                stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
349                recovery: None,
350            };
351        }
352
353        if let Some(structured) =
354            render_structured_output(&metadata.command, buffer, disk_truncation)
355        {
356            return structured;
357        }
358
359        if !metadata.compressed {
360            return render_raw_passthrough(buffer, disk_truncation);
361        }
362
363        let raw = buffer.read_combined_head_tail(
364            COMPRESS_INPUT_CAP_BYTES,
365            COMPRESS_INPUT_HEAD_BYTES,
366            COMPRESS_INPUT_TAIL_BYTES,
367        );
368        let compressed = self.compress_output(&metadata.command, raw.text, metadata.exit_code);
369        render_compressed_with_recovery(buffer, compressed, raw.truncated, disk_truncation)
370    }
371
372    fn snapshot_with_terminal_cache(
373        &self,
374        task: &Arc<BgTask>,
375        preview_bytes: usize,
376    ) -> BgTaskSnapshot {
377        let mut snapshot = task.snapshot(preview_bytes);
378        self.maybe_compress_snapshot(task, &mut snapshot);
379        snapshot
380    }
381
382    fn post_terminal_transition(&self, task: &Arc<BgTask>, emit_frame: bool) -> Result<(), String> {
383        let (metadata, buffer) = {
384            let state = task
385                .state
386                .lock()
387                .map_err(|_| "background task lock poisoned".to_string())?;
388            if !state.metadata.status.is_terminal() {
389                return Ok(());
390            }
391            (state.metadata.clone(), state.buffer.clone())
392        };
393
394        let cache = self.ensure_terminal_output_cache(task);
395        self.enqueue_completion_from_parts(
396            &metadata,
397            Some(&buffer),
398            None,
399            emit_frame,
400            cache.as_ref(),
401        );
402        Ok(())
403    }
404
405    fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
406        write_task(&paths.json, metadata)?;
407        self.dual_write_task(paths, metadata);
408        Ok(())
409    }
410
411    fn update_task_metadata<F>(
412        &self,
413        paths: &TaskPaths,
414        update: F,
415    ) -> std::io::Result<PersistedTask>
416    where
417        F: FnOnce(&mut PersistedTask),
418    {
419        let metadata = update_task(&paths.json, update)?;
420        self.dual_write_task(paths, &metadata);
421        Ok(metadata)
422    }
423
424    fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
425        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
426        let Some(pool) = pool else {
427            return;
428        };
429        let harness = self
430            .inner
431            .db_harness
432            .read()
433            .ok()
434            .and_then(|slot| slot.clone());
435        let Some(harness) = harness else {
436            crate::slog_warn!(
437                "dual-write bash_task to DB skipped for {}: harness not configured",
438                metadata.task_id
439            );
440            return;
441        };
442        let row = match metadata.to_bash_task_row(&harness, paths) {
443            Ok(row) => row,
444            Err(error) => {
445                crate::slog_warn!(
446                    "dual-write bash_task to DB failed for {}: {}",
447                    metadata.task_id,
448                    error
449                );
450                return;
451            }
452        };
453        let conn = match pool.lock() {
454            Ok(conn) => conn,
455            Err(_) => {
456                crate::slog_warn!(
457                    "dual-write bash_task to DB failed for {}: db mutex poisoned",
458                    metadata.task_id
459                );
460                return;
461            }
462        };
463        if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
464            crate::slog_warn!(
465                "dual-write bash_task to DB failed for {}: {}",
466                metadata.task_id,
467                error
468            );
469        }
470    }
471
472    pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
473        self.inner
474            .long_running_reminder_enabled
475            .store(enabled, Ordering::SeqCst);
476        self.inner
477            .long_running_reminder_interval_ms
478            .store(interval_ms, Ordering::SeqCst);
479    }
480
481    #[cfg(unix)]
482    #[allow(clippy::too_many_arguments)]
483    pub fn spawn(
484        &self,
485        command: &str,
486        session_id: String,
487        workdir: PathBuf,
488        env: HashMap<String, String>,
489        timeout: Option<Duration>,
490        storage_dir: PathBuf,
491        max_running: usize,
492        notify_on_completion: bool,
493        compressed: bool,
494        project_root: Option<PathBuf>,
495    ) -> Result<String, String> {
496        self.start_watchdog();
497
498        let running = self.running_count();
499        if running >= max_running {
500            return Err(format!(
501                "background bash task limit exceeded: {running} running (max {max_running})"
502            ));
503        }
504
505        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
506        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
507        let task_id = self.generate_unique_task_id()?;
508        let paths = task_paths(&storage_dir, &session_id, &task_id);
509        fs::create_dir_all(&paths.dir)
510            .map_err(|e| format!("failed to create background task dir: {e}"))?;
511
512        let mut metadata = PersistedTask::starting(
513            task_id.clone(),
514            session_id.clone(),
515            command.to_string(),
516            workdir.clone(),
517            project_root,
518            timeout_ms,
519            notify_on_completion,
520            compressed,
521        );
522        self.persist_task(&paths, &metadata)
523            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
524
525        // Pre-create capture files so the watchdog/buffer can always
526        // open them for reading. The spawn helper opens its own handles
527        // per attempt because each `Command::spawn()` consumes them.
528        create_capture_file(&paths.stdout)
529            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
530        create_capture_file(&paths.stderr)
531            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
532
533        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
534            Ok(child) => child,
535            Err(error) => {
536                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
537                let _ = delete_task_bundle(&paths);
538                return Err(error);
539            }
540        };
541
542        let child_pid = child.id();
543        metadata.mark_running(child_pid, child_pid as i32);
544        self.persist_task(&paths, &metadata)
545            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
546
547        let task = Arc::new(BgTask {
548            task_id: task_id.clone(),
549            session_id,
550            paths: paths.clone(),
551            started: Instant::now(),
552            last_reminder_at: Mutex::new(None),
553            terminal_at: Mutex::new(None),
554            state: Mutex::new(BgTaskState {
555                metadata,
556                runtime: TaskRuntime::Piped(Some(child)),
557                detached: false,
558                child_exit_observed: false,
559                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
560                terminal_output_cache: None,
561                pending_terminal_override: None,
562            }),
563        });
564
565        self.inner
566            .tasks
567            .lock()
568            .map_err(|_| "background task registry lock poisoned".to_string())?
569            .insert(task_id.clone(), task);
570
571        Ok(task_id)
572    }
573
    /// Spawns `command` as a PTY-backed background task with a `rows` x
    /// `cols` terminal and returns the new task id.
    ///
    /// Same sequence as `spawn`: start the watchdog, enforce `max_running`,
    /// persist the initial metadata, create the capture file, spawn, persist
    /// the running metadata, register the task. The differences are PTY
    /// mode, the recorded terminal size, and a single PTY capture file
    /// instead of separate stdout/stderr files.
    ///
    /// `timeout` falls back to `DEFAULT_BG_TIMEOUT` when `None`.
    ///
    /// # Errors
    ///
    /// Returns a message when the running-task limit is reached, when task-id
    /// generation fails, or when any directory, capture-file, metadata, spawn
    /// or registration step fails. A failed PTY spawn deletes the partial
    /// task bundle.
    #[allow(clippy::too_many_arguments)]
    pub fn spawn_pty(
        &self,
        command: &str,
        session_id: String,
        workdir: PathBuf,
        env: HashMap<String, String>,
        timeout: Option<Duration>,
        storage_dir: PathBuf,
        max_running: usize,
        notify_on_completion: bool,
        compressed: bool,
        project_root: Option<PathBuf>,
        rows: u16,
        cols: u16,
    ) -> Result<String, String> {
        self.start_watchdog();

        // The check and the later insert are separate steps.
        let running = self.running_count();
        if running >= max_running {
            return Err(format!(
                "background bash task limit exceeded: {running} running (max {max_running})"
            ));
        }

        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
        let task_id = self.generate_unique_task_id()?;
        let paths = task_paths(&storage_dir, &session_id, &task_id);
        fs::create_dir_all(&paths.dir)
            .map_err(|e| format!("failed to create background task dir: {e}"))?;

        let mut metadata = PersistedTask::starting(
            task_id.clone(),
            session_id.clone(),
            command.to_string(),
            workdir.clone(),
            project_root,
            timeout_ms,
            notify_on_completion,
            compressed,
        );
        // Mark the record as PTY-backed and remember the terminal size.
        metadata.mode = BgMode::Pty;
        metadata.pty_rows = Some(rows);
        metadata.pty_cols = Some(cols);
        self.persist_task(&paths, &metadata)
            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
        create_capture_file(&paths.pty)
            .map_err(|e| format!("failed to create PTY capture file: {e}"))?;

        // The PTY spawn receives a clone of the registry's wake sender.
        let runtime = match spawn_pty_for_command(
            &task_id,
            &session_id,
            command,
            &paths,
            &workdir,
            &env,
            rows,
            cols,
            self.inner.wake_tx.clone(),
        ) {
            Ok(runtime) => runtime,
            Err(error) => {
                crate::slog_warn!(
                    "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
                );
                let _ = delete_task_bundle(&paths);
                return Err(error);
            }
        };

        // With a known PID the record is marked running with that PID as
        // its process group; without one only the status is updated.
        if let Some(child_pid) = runtime.child_pid {
            metadata.mark_running(child_pid, child_pid as i32);
        } else {
            metadata.status = BgTaskStatus::Running;
            metadata.pgid = None;
        }
        // NOTE(review): if this persist fails, `runtime` is dropped without
        // being registered and the bundle stays on disk. Whether dropping a
        // `PtyRuntime` stops the child is not visible here — confirm.
        self.persist_task(&paths, &metadata)
            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;

        let task = Arc::new(BgTask {
            task_id: task_id.clone(),
            session_id,
            paths: paths.clone(),
            started: Instant::now(),
            last_reminder_at: Mutex::new(None),
            terminal_at: Mutex::new(None),
            state: Mutex::new(BgTaskState {
                metadata,
                runtime: TaskRuntime::Pty(Some(runtime)),
                detached: false,
                child_exit_observed: false,
                buffer: BgBuffer::pty(paths.pty.clone()),
                terminal_output_cache: None,
                pending_terminal_override: None,
            }),
        });

        self.inner
            .tasks
            .lock()
            .map_err(|_| "background task registry lock poisoned".to_string())?
            .insert(task_id.clone(), task);

        Ok(task_id)
    }
680
681    #[cfg(windows)]
682    #[allow(clippy::too_many_arguments)]
683    pub fn spawn(
684        &self,
685        command: &str,
686        session_id: String,
687        workdir: PathBuf,
688        env: HashMap<String, String>,
689        timeout: Option<Duration>,
690        storage_dir: PathBuf,
691        max_running: usize,
692        notify_on_completion: bool,
693        compressed: bool,
694        project_root: Option<PathBuf>,
695    ) -> Result<String, String> {
696        self.start_watchdog();
697
698        let running = self.running_count();
699        if running >= max_running {
700            return Err(format!(
701                "background bash task limit exceeded: {running} running (max {max_running})"
702            ));
703        }
704
705        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
706        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
707        let task_id = self.generate_unique_task_id()?;
708        let paths = task_paths(&storage_dir, &session_id, &task_id);
709        fs::create_dir_all(&paths.dir)
710            .map_err(|e| format!("failed to create background task dir: {e}"))?;
711
712        let mut metadata = PersistedTask::starting(
713            task_id.clone(),
714            session_id.clone(),
715            command.to_string(),
716            workdir.clone(),
717            project_root,
718            timeout_ms,
719            notify_on_completion,
720            compressed,
721        );
722        self.persist_task(&paths, &metadata)
723            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
724
725        // Capture files are pre-created so the watchdog/buffer can always
726        // open them for reading even if the child hasn't written anything
727        // yet. The spawn helper opens its own handles per attempt because
728        // each `Command::spawn()` consumes them, and on Windows we may
729        // retry across multiple shell candidates if the first one fails.
730        create_capture_file(&paths.stdout)
731            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
732        create_capture_file(&paths.stderr)
733            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
734
735        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
736            Ok(child) => child,
737            Err(error) => {
738                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
739                let _ = delete_task_bundle(&paths);
740                return Err(error);
741            }
742        };
743
744        let child_pid = child.id();
745        metadata.status = BgTaskStatus::Running;
746        metadata.child_pid = Some(child_pid);
747        metadata.pgid = None;
748        self.persist_task(&paths, &metadata)
749            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
750
751        let task = Arc::new(BgTask {
752            task_id: task_id.clone(),
753            session_id,
754            paths: paths.clone(),
755            started: Instant::now(),
756            last_reminder_at: Mutex::new(None),
757            terminal_at: Mutex::new(None),
758            state: Mutex::new(BgTaskState {
759                metadata,
760                runtime: TaskRuntime::Piped(Some(child)),
761                detached: false,
762                child_exit_observed: false,
763                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
764                terminal_output_cache: None,
765                pending_terminal_override: None,
766            }),
767        });
768
769        self.inner
770            .tasks
771            .lock()
772            .map_err(|_| "background task registry lock poisoned".to_string())?
773            .insert(task_id.clone(), task);
774
775        Ok(task_id)
776    }
777
778    pub fn write_pty(
779        &self,
780        task_id: &str,
781        session_id: &str,
782        input: &[u8],
783    ) -> Result<usize, String> {
784        let task = self
785            .task_for_session(task_id, session_id)
786            .ok_or_else(|| "task_not_found".to_string())?;
787
788        let writer = {
789            let state = task
790                .state
791                .lock()
792                .map_err(|_| "background task lock poisoned".to_string())?;
793            if state.metadata.mode != BgMode::Pty {
794                return Err("task_not_pty".to_string());
795            }
796            if state.metadata.status.is_terminal() {
797                return Err("task_exited".to_string());
798            }
799            match &state.runtime {
800                TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
801                TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
802                TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
803            }
804        };
805
806        let mut writer = writer
807            .lock()
808            .map_err(|_| "PTY writer lock poisoned".to_string())?;
809        writer
810            .write_all(input)
811            .map_err(|error| format!("failed to write to PTY: {error}"))?;
812        writer
813            .flush()
814            .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
815        Ok(input.len())
816    }
817
818    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
819        self.replay_session_inner(storage_dir, session_id, None)
820    }
821
822    pub fn replay_session_for_project(
823        &self,
824        storage_dir: &Path,
825        session_id: &str,
826        project_root: &Path,
827    ) -> Result<(), String> {
828        self.replay_session_inner(storage_dir, session_id, Some(project_root))
829    }
830
    /// Shared implementation behind [`Self::replay_session`] and
    /// [`Self::replay_session_for_project`].
    ///
    /// Steps, in order:
    /// 1. Starts the watchdog and, the first time this runs (guarded by the
    ///    `persisted_gc_started` flag), attempts a persisted-task GC. A GC
    ///    failure is only logged.
    /// 2. Loads the session's persisted tasks from the DB, falling back to
    ///    the on-disk JSON files when the DB is empty, errors, or is not
    ///    configured.
    /// 3. For every task belonging to `session_id` (and to `project_root`
    ///    when one is given), reconciles the persisted status with what can
    ///    be observed now and inserts the task into the in-memory registry.
    ///
    /// # Errors
    ///
    /// Returns an error when the disk fallback fails or when
    /// `insert_rehydrated_task` fails. Failures of `persist_task` are ignored.
    fn replay_session_inner(
        &self,
        storage_dir: &Path,
        session_id: &str,
        project_root: Option<&Path>,
    ) -> Result<(), String> {
        self.start_watchdog();
        // `swap` returns the previous value, so only the first caller in this
        // process sees `false` and runs the GC.
        if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
            if let Err(error) = self.maybe_gc_persisted(storage_dir) {
                crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
            }
        }

        let canonical_project = project_root.map(canonicalized_path);
        // Replay strategy: DB is the post-v0.27 source of truth. Disk
        // fallback handles pre-v0.27 tasks that haven't been migrated and
        // the cold-start `__default__` namespace (configure runs before any
        // user session exists, so plugin-init triggers a session-less DB
        // lookup that will be empty until a real session writes a task).
        //
        // We deliberately keep the empty-DB / empty-disk path silent — it's
        // the normal startup case and would otherwise fire on every configure
        // (see GitHub user report against v0.27.0). INFO-level logs only when
        // disk actually returned tasks (real migration signal); WARN when the
        // DB lookup itself errored.
        let tasks = match self.replay_session_from_db(session_id) {
            Some(Ok(tasks)) if !tasks.is_empty() => tasks,
            Some(Ok(_)) => {
                let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
                if !disk_tasks.is_empty() {
                    crate::slog_info!(
                        "bash task replay: 0 in DB for session {}, {} from disk fallback",
                        session_id,
                        disk_tasks.len()
                    );
                }
                disk_tasks
            }
            Some(Err(error)) => {
                crate::slog_warn!(
                    "bash task replay DB lookup failed for session {}; falling back to disk: {}",
                    session_id,
                    error
                );
                self.replay_session_from_disk(storage_dir, session_id)?
            }
            None => {
                // DB pool unconfigured — common in tests + before harness is set.
                self.replay_session_from_disk(storage_dir, session_id)?
            }
        };

        for mut metadata in tasks {
            // Skip records that belong to another session.
            if metadata.session_id != session_id {
                continue;
            }
            // With a project filter, skip tasks whose canonicalized project
            // root differs (tasks without a recorded root never match).
            if let Some(canonical_project) = canonical_project.as_deref() {
                let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
                if metadata_project.as_deref() != Some(canonical_project) {
                    continue;
                }
            }

            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
            // Every branch below that changes the status follows the same
            // sequence: remember `completion_delivered`, update the status,
            // OR the remembered flag back in, persist (best effort), enqueue
            // a completion if needed, then insert the rehydrated task.
            // NOTE(review): the `|=` presumably guards against the status
            // update resetting `completion_delivered` — confirm against
            // `mark_terminal` / `terminal_metadata_from_marker`.
            match metadata.status {
                // Persisted as `Starting`: recorded as failed with reason
                // "spawn aborted".
                BgTaskStatus::Starting => {
                    let completion_was_delivered = metadata.completion_delivered;
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("spawn aborted".to_string()),
                    );
                    metadata.completion_delivered |= completion_was_delivered;
                    let _ = self.persist_task(&paths, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    self.insert_rehydrated_task(metadata, paths, true)?;
                }
                BgTaskStatus::Running | BgTaskStatus::Killing => {
                    if metadata.mode == BgMode::Pty {
                        // PTY task: use the exit marker when one can be read;
                        // otherwise record it as killed with reason
                        // "pty_lost_on_bridge_restart".
                        if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
                            let completion_was_delivered = metadata.completion_delivered;
                            metadata = terminal_metadata_from_marker(metadata, marker, None);
                            metadata.completion_delivered |= completion_was_delivered;
                            let _ = self.persist_task(&paths, &metadata);
                            self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                            self.insert_rehydrated_task(metadata, paths, true)?;
                        // NOTE(review): we are inside the `Running | Killing`
                        // arm, so this branch looks unreachable unless
                        // `is_terminal()` is true for one of those — confirm.
                        } else if metadata.status.is_terminal() {
                            self.insert_rehydrated_task(metadata, paths, true)?;
                        } else {
                            let completion_was_delivered = metadata.completion_delivered;
                            metadata.mark_terminal(
                                BgTaskStatus::Killed,
                                None,
                                Some("pty_lost_on_bridge_restart".to_string()),
                            );
                            metadata.completion_delivered |= completion_was_delivered;
                            let _ = self.persist_task(&paths, &metadata);
                            self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                            self.insert_rehydrated_task(metadata, paths, true)?;
                        }
                    } else if self.running_metadata_is_stale(&metadata) {
                        // Stale per `running_metadata_is_stale`: recorded as
                        // killed, and a kill marker is written if no exit
                        // file exists yet.
                        let completion_was_delivered = metadata.completion_delivered;
                        metadata.mark_terminal(
                            BgTaskStatus::Killed,
                            None,
                            Some("orphaned (>24h)".to_string()),
                        );
                        metadata.completion_delivered |= completion_was_delivered;
                        if !paths.exit.exists() {
                            let _ = write_kill_marker_if_absent(&paths.exit);
                        }
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
                        // An exit marker is readable: derive the terminal
                        // state from it. A reason is attached (and a warning
                        // logged) only when the persisted status was `Killing`.
                        let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
                            "recovered from inconsistent killing state on replay".to_string()
                        });
                        if reason.is_some() {
                            crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
                            metadata.task_id);
                        }
                        let completion_was_delivered = metadata.completion_delivered;
                        metadata = terminal_metadata_from_marker(metadata, marker, reason);
                        metadata.completion_delivered |= completion_was_delivered;
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else if metadata.status == BgTaskStatus::Killing {
                        // `Killing` with no readable marker: write a kill
                        // marker if the exit file is absent and record the
                        // task as killed.
                        if !paths.exit.exists() {
                            let _ = write_kill_marker_if_absent(&paths.exit);
                        }
                        let completion_was_delivered = metadata.completion_delivered;
                        metadata.mark_terminal(
                            BgTaskStatus::Killed,
                            None,
                            Some("recovered from inconsistent killing state on replay".to_string()),
                        );
                        metadata.completion_delivered |= completion_was_delivered;
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
                        // A PID was recorded but the process is no longer
                        // alive and left no exit marker: recorded as failed.
                        let completion_was_delivered = metadata.completion_delivered;
                        metadata.mark_terminal(
                            BgTaskStatus::Failed,
                            None,
                            Some("process exited without exit marker".to_string()),
                        );
                        metadata.completion_delivered |= completion_was_delivered;
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else {
                        // None of the checks above applied: rehydrate the
                        // task with its persisted status unchanged.
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    }
                }
                _ if metadata.status.is_terminal() => {
                    // Borrow `paths` for the completion enqueue BEFORE
                    // `insert_rehydrated_task` consumes it. The completion
                    // helper only reads from `paths` (stdout/stderr/exit) to
                    // reconstruct a tail preview, so it must see the same
                    // paths the rehydrated task will own.
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    self.insert_rehydrated_task(metadata, paths, true)?;
                }
                // Any remaining status is not rehydrated.
                _ => {}
            }
        }

        Ok(())
    }
1003
1004    fn replay_session_from_db(
1005        &self,
1006        session_id: &str,
1007    ) -> Option<Result<Vec<PersistedTask>, String>> {
1008        let pool = self
1009            .inner
1010            .db_pool
1011            .read()
1012            .ok()
1013            .and_then(|slot| slot.clone())?;
1014        let harness = self
1015            .inner
1016            .db_harness
1017            .read()
1018            .ok()
1019            .and_then(|slot| slot.clone())?;
1020        let conn = match pool.lock() {
1021            Ok(conn) => conn,
1022            Err(_) => return Some(Err("db mutex poisoned".to_string())),
1023        };
1024        Some(
1025            crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
1026                .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
1027                .map_err(|error| error.to_string()),
1028        )
1029    }
1030
1031    fn replay_session_from_disk(
1032        &self,
1033        storage_dir: &Path,
1034        session_id: &str,
1035    ) -> Result<Vec<PersistedTask>, String> {
1036        let dir = session_tasks_dir(storage_dir, session_id);
1037        if !dir.exists() {
1038            return Ok(Vec::new());
1039        }
1040
1041        let entries = fs::read_dir(&dir)
1042            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
1043        let mut tasks = Vec::new();
1044        for entry in entries.flatten() {
1045            let path = entry.path();
1046            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
1047                continue;
1048            }
1049            match read_task(&path) {
1050                Ok(metadata) => tasks.push(metadata),
1051                Err(error) => {
1052                    crate::slog_warn!(
1053                        "quarantining invalid background task metadata {} during replay: {error}",
1054                        path.display()
1055                    );
1056                    if let Err(quarantine_error) =
1057                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1058                    {
1059                        crate::slog_warn!(
1060                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1061                            path.display()
1062                        );
1063                    }
1064                }
1065            }
1066        }
1067        Ok(tasks)
1068    }
1069
1070    pub fn register_watch(
1071        &self,
1072        task_id: String,
1073        pattern: WatchPattern,
1074        once: bool,
1075    ) -> Result<String, &'static str> {
1076        let task = self.task(&task_id).ok_or("task_not_found")?;
1077        let (mode, terminal_at_registration, stdout, stderr, pty) = task
1078            .state
1079            .lock()
1080            .map(|state| {
1081                (
1082                    state.metadata.mode.clone(),
1083                    state.metadata.status.is_terminal(),
1084                    task.paths.stdout.clone(),
1085                    task.paths.stderr.clone(),
1086                    task.paths.pty.clone(),
1087                )
1088            })
1089            .map_err(|_| "background_task_lock_poisoned")?;
1090
1091        let mut terminal_matches = Vec::new();
1092        let scanned_terminal = terminal_at_registration;
1093        let watch_id = {
1094            let mut registry = self
1095                .inner
1096                .watch_registry
1097                .lock()
1098                .map_err(|_| "watch_registry_poisoned")?;
1099            let watch_id = registry.register(task_id.clone(), pattern, once)?;
1100            match &mode {
1101                BgMode::Pipes => {
1102                    let stdout_key = format!("{task_id}:stdout");
1103                    let stderr_key = format!("{task_id}:stderr");
1104                    if terminal_at_registration {
1105                        registry.set_file_cursor(&stdout_key, 0);
1106                        registry.set_file_cursor(&stderr_key, 0);
1107                        terminal_matches.extend(registry.scan_file_new_bytes(
1108                            &stdout_key,
1109                            &task_id,
1110                            &stdout,
1111                        ));
1112                        terminal_matches.extend(registry.scan_file_new_bytes(
1113                            &stderr_key,
1114                            &task_id,
1115                            &stderr,
1116                        ));
1117                    } else {
1118                        registry.prime_file_cursor(&stdout_key, &stdout);
1119                        registry.prime_file_cursor(&stderr_key, &stderr);
1120                    }
1121                }
1122                BgMode::Pty => {
1123                    let pty_key = format!("{task_id}:pty");
1124                    if terminal_at_registration {
1125                        registry.set_file_cursor(&pty_key, 0);
1126                        terminal_matches
1127                            .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
1128                    } else {
1129                        registry.prime_file_cursor(&pty_key, &pty);
1130                    }
1131                }
1132            }
1133            watch_id
1134        };
1135
1136        if task.is_terminal() {
1137            if !scanned_terminal {
1138                terminal_matches = {
1139                    let mut registry = self
1140                        .inner
1141                        .watch_registry
1142                        .lock()
1143                        .map_err(|_| "watch_registry_poisoned")?;
1144                    match &mode {
1145                        BgMode::Pipes => {
1146                            let stdout_key = format!("{task_id}:stdout");
1147                            let stderr_key = format!("{task_id}:stderr");
1148                            registry.set_file_cursor(&stdout_key, 0);
1149                            registry.set_file_cursor(&stderr_key, 0);
1150                            let mut matches =
1151                                registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
1152                            matches.extend(registry.scan_file_new_bytes(
1153                                &stderr_key,
1154                                &task_id,
1155                                &stderr,
1156                            ));
1157                            matches
1158                        }
1159                        BgMode::Pty => {
1160                            let pty_key = format!("{task_id}:pty");
1161                            registry.set_file_cursor(&pty_key, 0);
1162                            registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
1163                        }
1164                    }
1165                };
1166            }
1167
1168            let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
1169            if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
1170                if watch_matched {
1171                    let _ = task.set_completion_delivered(true, self);
1172                    self.clear_task_watch_state(&task_id);
1173                }
1174                return Ok(watch_id);
1175            }
1176
1177            let completion = self
1178                .remove_pending_completion(&task_id)
1179                .or_else(|| self.completion_snapshot_for_task(&task));
1180            if terminal_matches.is_empty() {
1181                if let Some(completion) = completion.as_ref() {
1182                    self.emit_bash_watch_exit(completion);
1183                }
1184            } else {
1185                for pattern_match in terminal_matches {
1186                    self.emit_bash_pattern_match(&task.session_id, pattern_match);
1187                }
1188            }
1189            let _ = task.set_completion_delivered(true, self);
1190            self.clear_task_watch_state(&task_id);
1191        }
1192
1193        Ok(watch_id)
1194    }
1195
1196    pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
1197        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1198            registry.unregister(task_id, watch_id);
1199        }
1200    }
1201
1202    pub fn active_watch_count(&self, task_id: &str) -> usize {
1203        self.inner
1204            .watch_registry
1205            .lock()
1206            .map(|registry| registry.active_count(task_id))
1207            .unwrap_or(0)
1208    }
1209
1210    fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1211        self.inner
1212            .watch_registry
1213            .lock()
1214            .map(|registry| {
1215                (
1216                    registry.has_controlled_task(task_id),
1217                    registry.has_matched_task(task_id),
1218                )
1219            })
1220            .unwrap_or((false, false))
1221    }
1222
1223    fn task_has_watch_control(&self, task_id: &str) -> bool {
1224        self.inner
1225            .watch_registry
1226            .lock()
1227            .map(|registry| registry.has_controlled_task(task_id))
1228            .unwrap_or(false)
1229    }
1230
1231    fn clear_task_watch_state(&self, task_id: &str) {
1232        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1233            registry.clear_task(task_id);
1234        }
1235    }
1236
1237    pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1238        let (mode, stdout, stderr, pty) = match task.state.lock() {
1239            Ok(state) => (
1240                state.metadata.mode.clone(),
1241                task.paths.stdout.clone(),
1242                task.paths.stderr.clone(),
1243                task.paths.pty.clone(),
1244            ),
1245            Err(_) => return,
1246        };
1247        let mut matches = Vec::new();
1248        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1249            match mode {
1250                BgMode::Pipes => {
1251                    let stdout_key = format!("{}:stdout", task.task_id);
1252                    let stderr_key = format!("{}:stderr", task.task_id);
1253                    matches.extend(registry.scan_file_new_bytes(
1254                        &stdout_key,
1255                        &task.task_id,
1256                        &stdout,
1257                    ));
1258                    matches.extend(registry.scan_file_new_bytes(
1259                        &stderr_key,
1260                        &task.task_id,
1261                        &stderr,
1262                    ));
1263                }
1264                BgMode::Pty => {
1265                    let pty_key = format!("{}:pty", task.task_id);
1266                    matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1267                }
1268            }
1269        }
1270        for pattern_match in matches {
1271            self.emit_bash_pattern_match(&task.session_id, pattern_match);
1272        }
1273    }
1274
1275    pub fn status(
1276        &self,
1277        task_id: &str,
1278        session_id: &str,
1279        project_root: Option<&Path>,
1280        storage_dir: Option<&Path>,
1281        preview_bytes: usize,
1282    ) -> Option<BgTaskSnapshot> {
1283        let mut task = self.task_for_session(task_id, session_id);
1284        if task.is_none() {
1285            if let Some(storage_dir) = storage_dir {
1286                let _ = if let Some(project_root) = project_root {
1287                    self.replay_session_for_project(storage_dir, session_id, project_root)
1288                } else {
1289                    self.replay_session(storage_dir, session_id)
1290                };
1291                task = self.task_for_session(task_id, session_id);
1292            }
1293        }
1294        let Some(task) = task else {
1295            return self.status_relaxed(
1296                task_id,
1297                session_id,
1298                project_root?,
1299                storage_dir?,
1300                preview_bytes,
1301            );
1302        };
1303        let _ = self.poll_task(&task);
1304        Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1305    }
1306
1307    fn status_relaxed_task(
1308        &self,
1309        task_id: &str,
1310        project_root: &Path,
1311        storage_dir: &Path,
1312    ) -> Option<Arc<BgTask>> {
1313        let canonical_project = canonicalized_path(project_root);
1314        match self.lookup_relaxed_task_from_db(task_id, project_root) {
1315            Some(Ok(Some(metadata))) => {
1316                if let Some(task) = self.task(task_id) {
1317                    let matches_project = task
1318                        .state
1319                        .lock()
1320                        .map(|state| {
1321                            state
1322                                .metadata
1323                                .project_root
1324                                .as_deref()
1325                                .map(canonicalized_path)
1326                                .as_deref()
1327                                == Some(canonical_project.as_path())
1328                        })
1329                        .unwrap_or(false);
1330                    return matches_project.then_some(task);
1331                }
1332                let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1333                if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1334                    return None;
1335                }
1336                return self.task(task_id);
1337            }
1338            Some(Ok(None)) => {
1339                crate::slog_info!(
1340                    "bash task relaxed DB miss for {}; falling back to disk",
1341                    task_id
1342                );
1343            }
1344            Some(Err(error)) => {
1345                crate::slog_warn!(
1346                    "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1347                    task_id,
1348                    error
1349                );
1350            }
1351            None => {
1352                crate::slog_info!(
1353                    "bash task relaxed DB unavailable for {}; falling back to disk",
1354                    task_id
1355                );
1356            }
1357        }
1358        let root = storage_dir.join("bash-tasks");
1359        let entries = fs::read_dir(&root).ok()?;
1360        for entry in entries.flatten() {
1361            let dir = entry.path();
1362            if !dir.is_dir() {
1363                continue;
1364            }
1365            let path = dir.join(format!("{task_id}.json"));
1366            if !path.exists() {
1367                continue;
1368            }
1369            let metadata = match read_task(&path) {
1370                Ok(metadata) => metadata,
1371                Err(error) => {
1372                    crate::slog_warn!(
1373                        "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1374                        path.display()
1375                    );
1376                    if let Err(quarantine_error) =
1377                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1378                    {
1379                        crate::slog_warn!(
1380                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1381                            path.display()
1382                        );
1383                    }
1384                    continue;
1385                }
1386            };
1387            let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1388            if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1389                continue;
1390            }
1391            if let Some(task) = self.task(task_id) {
1392                let matches_project = task
1393                    .state
1394                    .lock()
1395                    .map(|state| {
1396                        state
1397                            .metadata
1398                            .project_root
1399                            .as_deref()
1400                            .map(canonicalized_path)
1401                            .as_deref()
1402                            == Some(canonical_project.as_path())
1403                    })
1404                    .unwrap_or(false);
1405                return matches_project.then_some(task);
1406            }
1407            let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1408            if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1409                return None;
1410            }
1411            return self.task(task_id);
1412        }
1413        None
1414    }
1415
1416    fn lookup_relaxed_task_from_db(
1417        &self,
1418        task_id: &str,
1419        project_root: &Path,
1420    ) -> Option<Result<Option<PersistedTask>, String>> {
1421        let pool = self
1422            .inner
1423            .db_pool
1424            .read()
1425            .ok()
1426            .and_then(|slot| slot.clone())?;
1427        let harness = self
1428            .inner
1429            .db_harness
1430            .read()
1431            .ok()
1432            .and_then(|slot| slot.clone())?;
1433        let conn = match pool.lock() {
1434            Ok(conn) => conn,
1435            Err(_) => return Some(Err("db mutex poisoned".to_string())),
1436        };
1437        let project_key = crate::path_identity::project_scope_key(project_root);
1438        Some(
1439            crate::db::bash_tasks::find_bash_task_for_project(
1440                &conn,
1441                &harness,
1442                &project_key,
1443                task_id,
1444            )
1445            .map(|row| row.map(PersistedTask::from))
1446            .map_err(|error| error.to_string()),
1447        )
1448    }
1449
1450    pub(super) fn status_relaxed(
1451        &self,
1452        task_id: &str,
1453        _session_id: &str,
1454        project_root: &Path,
1455        storage_dir: &Path,
1456        preview_bytes: usize,
1457    ) -> Option<BgTaskSnapshot> {
1458        let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1459        let _ = self.poll_task(&task);
1460        Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1461    }
1462
1463    pub fn kill_relaxed(
1464        &self,
1465        task_id: &str,
1466        project_root: &Path,
1467        storage_dir: &Path,
1468    ) -> Result<BgTaskSnapshot, String> {
1469        let task = self
1470            .status_relaxed_task(task_id, project_root, storage_dir)
1471            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1472        self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1473    }
1474
1475    pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1476        #[cfg(test)]
1477        self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1478
1479        let mut deleted = 0usize;
1480
1481        let root = storage_dir.join("bash-tasks");
1482        if root.exists() {
1483            let session_dirs = fs::read_dir(&root).map_err(|e| {
1484                format!(
1485                    "failed to read background task root {}: {e}",
1486                    root.display()
1487                )
1488            })?;
1489            for session_entry in session_dirs.flatten() {
1490                let session_dir = session_entry.path();
1491                if !session_dir.is_dir() {
1492                    continue;
1493                }
1494                let task_entries = match fs::read_dir(&session_dir) {
1495                    Ok(entries) => entries,
1496                    Err(error) => {
1497                        crate::slog_warn!(
1498                            "failed to read background task session dir {}: {error}",
1499                            session_dir.display()
1500                        );
1501                        continue;
1502                    }
1503                };
1504                for task_entry in task_entries.flatten() {
1505                    let json_path = task_entry.path();
1506                    if json_path
1507                        .extension()
1508                        .and_then(|extension| extension.to_str())
1509                        != Some("json")
1510                    {
1511                        continue;
1512                    }
1513                    if modified_within(&json_path, PERSISTED_GC_GRACE) {
1514                        continue;
1515                    }
1516                    let metadata = match read_task(&json_path) {
1517                        Ok(metadata) => metadata,
1518                        Err(error) => {
1519                            crate::slog_warn!(
1520                                "quarantining corrupt background task metadata {}: {error}",
1521                                json_path.display()
1522                            );
1523                            quarantine_task_json(
1524                                storage_dir,
1525                                &session_dir,
1526                                &json_path,
1527                                QuarantineKind::Corrupt,
1528                            )?;
1529                            continue;
1530                        }
1531                    };
1532                    if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1533                        continue;
1534                    }
1535                    let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1536                    match delete_task_bundle(&paths) {
1537                        Ok(()) => {
1538                            deleted += 1;
1539                            log::debug!(
1540                                "deleted persisted background task bundle {}",
1541                                metadata.task_id
1542                            );
1543                        }
1544                        Err(error) => {
1545                            crate::slog_warn!(
1546                                "failed to delete background task bundle {}: {error}",
1547                                metadata.task_id
1548                            );
1549                            continue;
1550                        }
1551                    }
1552                }
1553            }
1554        }
1555        gc_quarantine(storage_dir);
1556        Ok(deleted)
1557    }
1558
1559    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1560        let tasks = self
1561            .inner
1562            .tasks
1563            .lock()
1564            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1565            .unwrap_or_default();
1566        tasks
1567            .into_iter()
1568            .map(|task| {
1569                let _ = self.poll_task(&task);
1570                self.snapshot_with_terminal_cache(&task, preview_bytes)
1571            })
1572            .collect()
1573    }
1574
1575    /// Replace terminal pipe snapshots with the task's cached rendered output.
1576    /// Running tasks stay raw (tail-only) so agents debugging a live process see
1577    /// exactly what it emitted. PTY tasks are explicitly excluded: their raw
1578    /// terminal bytes are rendered by the plugin's PTY path, not the line
1579    /// compressor.
1580    fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1581        if !snapshot.info.status.is_terminal() || snapshot.info.mode == BgMode::Pty {
1582            return;
1583        }
1584        if let Some(cache) = self.ensure_terminal_output_cache(task) {
1585            snapshot.output_preview = cache.output_preview;
1586            snapshot.output_truncated = cache.output_truncated;
1587        }
1588    }
1589
1590    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1591        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1592    }
1593
1594    pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1595        let task = self
1596            .task_for_session(task_id, session_id)
1597            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1598        let terminal_after_promote = {
1599            let mut state = task
1600                .state
1601                .lock()
1602                .map_err(|_| "background task lock poisoned".to_string())?;
1603            let updated = self
1604                .update_task_metadata(&task.paths, |metadata| {
1605                    metadata.notify_on_completion = true;
1606                    metadata.completion_delivered = false;
1607                })
1608                .map_err(|e| format!("failed to promote background task: {e}"))?;
1609            state.metadata = updated;
1610            state.metadata.status.is_terminal()
1611        };
1612        if terminal_after_promote {
1613            self.post_terminal_transition(&task, true)?;
1614        }
1615        Ok(true)
1616    }
1617
1618    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1619        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1620            .map(|_| ())
1621    }
1622
1623    pub fn cleanup_finished(&self, older_than: Duration) {
1624        let cutoff = Instant::now().checked_sub(older_than);
1625        let removable_paths: Vec<(String, TaskPaths)> =
1626            if let Ok(mut tasks) = self.inner.tasks.lock() {
1627                let removable = tasks
1628                    .iter()
1629                    .filter_map(|(task_id, task)| {
1630                        let delivered_terminal = task
1631                            .state
1632                            .lock()
1633                            .map(|state| {
1634                                state.metadata.status.is_terminal()
1635                                    && state.metadata.completion_delivered
1636                            })
1637                            .unwrap_or(false);
1638                        if !delivered_terminal {
1639                            return None;
1640                        }
1641
1642                        let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1643                        let expired = match (terminal_at, cutoff) {
1644                            (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1645                            (Some(_), None) => true,
1646                            (None, _) => false,
1647                        };
1648                        expired.then(|| task_id.clone())
1649                    })
1650                    .collect::<Vec<_>>();
1651
1652                removable
1653                    .into_iter()
1654                    .filter_map(|task_id| {
1655                        tasks
1656                            .remove(&task_id)
1657                            .map(|task| (task_id, task.paths.clone()))
1658                    })
1659                    .collect()
1660            } else {
1661                Vec::new()
1662            };
1663
1664        for (task_id, paths) in removable_paths {
1665            match delete_task_bundle(&paths) {
1666                Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1667                Err(error) => crate::slog_warn!(
1668                    "failed to delete persisted background task bundle {task_id}: {error}"
1669                ),
1670            }
1671        }
1672    }
1673
1674    pub fn drain_completions(&self) -> Vec<BgCompletion> {
1675        self.drain_completions_for_session(None)
1676    }
1677
1678    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1679        let completions = match self.inner.completions.lock() {
1680            Ok(completions) => completions,
1681            Err(_) => return Vec::new(),
1682        };
1683
1684        completions
1685            .iter()
1686            .filter(|completion| completion_matches_session(completion, session_id))
1687            .cloned()
1688            .collect()
1689    }
1690
1691    pub fn has_completions_for_session(&self, session_id: Option<&str>) -> bool {
1692        match self.inner.completions.lock() {
1693            Ok(completions) => completions
1694                .iter()
1695                .any(|completion| completion_matches_session(completion, session_id)),
1696            // Bias to safety: if the queue state cannot be inspected cheaply,
1697            // let callers take the existing drain path rather than risk
1698            // suppressing a pending completion.
1699            Err(_) => true,
1700        }
1701    }
1702
1703    pub fn ack_completions_for_session(
1704        &self,
1705        session_id: Option<&str>,
1706        task_ids: &[String],
1707    ) -> Vec<String> {
1708        if task_ids.is_empty() {
1709            return Vec::new();
1710        }
1711        let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1712        let mut completion_sessions = HashMap::new();
1713        if let Ok(mut completions) = self.inner.completions.lock() {
1714            completions.retain(|completion| {
1715                let session_matches = session_id
1716                    .map(|session_id| completion.session_id == session_id)
1717                    .unwrap_or(true);
1718                if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1719                    completion_sessions
1720                        .insert(completion.task_id.clone(), completion.session_id.clone());
1721                    false
1722                } else {
1723                    true
1724                }
1725            });
1726        }
1727
1728        let mut delivered = Vec::new();
1729        for task_id in task_ids {
1730            let task = if let Some(session_id) = session_id {
1731                self.task_for_session(task_id, session_id)
1732            } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1733                self.task_for_session(task_id, completion_session_id)
1734            } else {
1735                self.task(task_id)
1736            };
1737            if let Some(task) = task {
1738                if task.set_completion_delivered(true, self).is_ok() {
1739                    delivered.push(task_id.clone());
1740                }
1741            }
1742        }
1743
1744        delivered
1745    }
1746
1747    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1748        self.inner
1749            .completions
1750            .lock()
1751            .map(|completions| {
1752                completions
1753                    .iter()
1754                    .filter(|completion| completion.session_id == session_id)
1755                    .cloned()
1756                    .collect()
1757            })
1758            .unwrap_or_default()
1759    }
1760
1761    fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1762        let mut completions = self.inner.completions.lock().ok()?;
1763        let idx = completions
1764            .iter()
1765            .position(|completion| completion.task_id == task_id)?;
1766        completions.remove(idx)
1767    }
1768
1769    fn completion_snapshot_for_task(&self, task: &Arc<BgTask>) -> Option<BgCompletion> {
1770        let snapshot = self.snapshot_with_terminal_cache(task, RUNNING_OUTPUT_PREVIEW_BYTES);
1771        if !snapshot.info.status.is_terminal() {
1772            return None;
1773        }
1774        let (output_preview, output_truncated) = if snapshot.info.mode == BgMode::Pty {
1775            (String::new(), false)
1776        } else {
1777            self.ensure_terminal_output_cache(task)
1778                .map(|cache| completion_preview_for_cache(&cache, snapshot.exit_code))
1779                .unwrap_or_else(|| (String::new(), false))
1780        };
1781        Some(BgCompletion {
1782            task_id: snapshot.info.task_id,
1783            session_id: task.session_id.clone(),
1784            status: snapshot.info.status,
1785            exit_code: snapshot.exit_code,
1786            command: snapshot.info.command,
1787            output_preview,
1788            output_truncated,
1789            original_tokens: None,
1790            compressed_tokens: None,
1791            tokens_skipped: false,
1792        })
1793    }
1794
1795    pub fn detach(&self) {
1796        self.inner.shutdown.store(true, Ordering::SeqCst);
1797        if let Ok(mut tasks) = self.inner.tasks.lock() {
1798            for task in tasks.values() {
1799                if let Ok(mut state) = task.state.lock() {
1800                    match &mut state.runtime {
1801                        TaskRuntime::Piped(child) => *child = None,
1802                        TaskRuntime::Pty(runtime) => *runtime = None,
1803                    }
1804                    state.detached = true;
1805                }
1806            }
1807            tasks.clear();
1808        }
1809    }
1810
1811    pub fn shutdown(&self) {
1812        let tasks = self
1813            .inner
1814            .tasks
1815            .lock()
1816            .map(|tasks| {
1817                tasks
1818                    .values()
1819                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
1820                    .collect::<Vec<_>>()
1821            })
1822            .unwrap_or_default();
1823        for (task_id, session_id) in tasks {
1824            let _ = self.kill(&task_id, &session_id);
1825        }
1826    }
1827
1828    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1829        if let Ok(state) = task.state.lock() {
1830            if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1831                // On Windows ConPTY, the reader may not observe EOF while the
1832                // master handle is still held in `PtyRuntime`. The waiter writes
1833                // the authoritative exit marker before setting `exit_observed`,
1834                // so once exit is observed we can finalize from that marker and
1835                // drop the runtime, which lets the reader finish. Waiting for
1836                // `reader_done && exit_observed` wedges completed PTY tasks on
1837                // Windows.
1838                if !pty.exit_observed.load(Ordering::SeqCst) {
1839                    return Ok(());
1840                }
1841            }
1842        }
1843        let marker = match read_exit_marker(&task.paths.exit) {
1844            Ok(Some(marker)) => marker,
1845            Ok(None) => return Ok(()),
1846            Err(error) => return Err(format!("failed to read exit marker: {error}")),
1847        };
1848        self.finalize_from_marker(task, marker, None)
1849    }
1850
    /// Reap an exited child for `task` and, where needed, move the task to a
    /// terminal state.
    ///
    /// Behavior by runtime:
    /// * Piped with a live handle: a non-blocking `try_wait`; if the child has
    ///   exited, the handle is cleared and the task is flagged detached with
    ///   its exit observed.
    /// * Piped without a handle but detached: if the child is known to be dead
    ///   (exit observed earlier, or the recorded pid is no longer alive), the
    ///   task is failed via `fail_without_exit_marker_if_needed`.
    /// * PTY with a runtime that has observed exit: the state lock is released
    ///   and the task is polled (poll errors ignored).
    /// * PTY without a runtime: nothing to do.
    ///
    /// A poisoned state lock makes this a no-op.
    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
        let mut needs_completion = false;
        {
            // The guard is scoped to this block so it is released before the
            // post-terminal transition below.
            let Ok(mut state) = task.state.lock() else {
                return;
            };
            match &mut state.runtime {
                TaskRuntime::Piped(child_slot) => {
                    if let Some(child) = child_slot.as_mut() {
                        // `try_wait` does not block; only a definite exit
                        // status clears the handle.
                        if matches!(child.try_wait(), Ok(Some(_))) {
                            *child_slot = None;
                            state.detached = true;
                            state.child_exit_observed = true;
                        }
                    } else if state.detached {
                        // No handle to wait on: rely on the earlier
                        // observation or probe the recorded pid.
                        let child_known_dead = state.child_exit_observed
                            || state
                                .metadata
                                .child_pid
                                .is_some_and(|pid| !is_process_alive(pid));
                        if child_known_dead {
                            needs_completion =
                                self.fail_without_exit_marker_if_needed(task, &mut state);
                        }
                    }
                }
                TaskRuntime::Pty(Some(pty)) => {
                    if pty.exit_observed.load(Ordering::SeqCst) {
                        // `poll_task` locks the task state itself, so the
                        // guard must be released first.
                        drop(state);
                        let _ = self.poll_task(task);
                        return;
                    }
                }
                TaskRuntime::Pty(None) => {}
            }
        }
        if needs_completion {
            // Best-effort: a failed transition is not reported to the caller.
            let _ = self.post_terminal_transition(task, true);
        }
    }
1891
1892    fn fail_without_exit_marker_if_needed(
1893        &self,
1894        task: &Arc<BgTask>,
1895        state: &mut BgTaskState,
1896    ) -> bool {
1897        if state.metadata.status.is_terminal() {
1898            return false;
1899        }
1900        if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1901            return false;
1902        }
1903        let watch_controlled = self.task_has_watch_control(&task.task_id);
1904        let updated = self.update_task_metadata(&task.paths, |metadata| {
1905            metadata.mark_terminal(
1906                BgTaskStatus::Failed,
1907                None,
1908                Some("process exited without exit marker".to_string()),
1909            );
1910            if watch_controlled {
1911                metadata.completion_delivered = true;
1912            }
1913        });
1914        if let Ok(metadata) = updated {
1915            state.pending_terminal_override = None;
1916            state.metadata = metadata;
1917            task.mark_terminal_now();
1918            return true;
1919        }
1920        false
1921    }
1922
1923    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1924        self.inner
1925            .tasks
1926            .lock()
1927            .map(|tasks| {
1928                tasks
1929                    .values()
1930                    .filter(|task| task.is_running())
1931                    .cloned()
1932                    .collect()
1933            })
1934            .unwrap_or_default()
1935    }
1936
1937    fn insert_rehydrated_task(
1938        &self,
1939        metadata: PersistedTask,
1940        paths: TaskPaths,
1941        detached: bool,
1942    ) -> Result<(), String> {
1943        let task_id = metadata.task_id.clone();
1944        let session_id = metadata.session_id.clone();
1945        let started = started_instant_from_unix_millis(metadata.started_at);
1946        let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1947        let mode = metadata.mode.clone();
1948        let task = Arc::new(BgTask {
1949            task_id: task_id.clone(),
1950            session_id,
1951            paths: paths.clone(),
1952            started,
1953            last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1954            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1955            state: Mutex::new(BgTaskState {
1956                metadata,
1957                runtime: if mode == BgMode::Pty {
1958                    TaskRuntime::Pty(None)
1959                } else {
1960                    TaskRuntime::Piped(None)
1961                },
1962                detached,
1963                // Replay path: we never observed the child handle's exit
1964                // in this process (the previous AFT process did, but its
1965                // observation didn't survive restart). Leave this false so
1966                // the second-pass reap falls through to the
1967                // `is_process_alive(child_pid)` probe rather than declaring
1968                // failure based on stale evidence.
1969                child_exit_observed: false,
1970                buffer: if mode == BgMode::Pty {
1971                    BgBuffer::pty(paths.pty.clone())
1972                } else {
1973                    BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
1974                },
1975                terminal_output_cache: None,
1976                pending_terminal_override: None,
1977            }),
1978        });
1979        self.inner
1980            .tasks
1981            .lock()
1982            .map_err(|_| "background task registry lock poisoned".to_string())?
1983            .insert(task_id, task);
1984        Ok(())
1985    }
1986
    /// Kill a task and record `terminal_status` as its outcome where this
    /// function is the one that terminalizes it.
    ///
    /// Under the task's state lock, one of three paths is taken:
    /// 1. The task is already terminal: only the pending override is cleared.
    /// 2. An exit marker already exists: the task is finalized from that
    ///    marker (the requested `terminal_status` is not applied), the child
    ///    is reaped / the PTY runtime dropped, and the result is persisted.
    /// 3. Otherwise the task is moved to `Killing` (persisted once), the
    ///    process is signaled, and:
    ///    * piped tasks are waited on, get a kill marker if none exists, and
    ///      are marked terminal with `terminal_status`;
    ///    * PTY tasks with a runtime are signaled; on Windows they are also
    ///      force-marked terminal here, while on other targets this function
    ///      does not mark them terminal;
    ///    * PTY tasks without a runtime are left as they are.
    ///
    /// After the lock is released, the post-terminal transition runs if this
    /// call terminalized the task, and a snapshot is returned.
    ///
    /// # Errors
    /// Returns an error string when the task is unknown for `session_id`, the
    /// task lock is poisoned, a state persist or kill-marker write fails, or
    /// the post-terminal transition fails.
    fn kill_with_status(
        &self,
        task_id: &str,
        session_id: &str,
        terminal_status: BgTaskStatus,
    ) -> Result<BgTaskSnapshot, String> {
        let task = self
            .task_for_session(task_id, session_id)
            .ok_or_else(|| format!("background task not found: {task_id}"))?;
        // Set only by the branches that actually move the task to a terminal
        // state in this call.
        let mut terminalized = false;

        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if state.metadata.status.is_terminal() {
                // Nothing to kill; just discard any stale override.
                state.pending_terminal_override = None;
            } else if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
                // The marker, not `terminal_status`, determines the outcome.
                state.metadata =
                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
                if self.task_has_watch_control(&task.task_id) {
                    state.metadata.completion_delivered = true;
                }
                state.pending_terminal_override = None;
                task.mark_terminal_now();
                match &mut state.runtime {
                    // Exit marker already present: the child finished on its
                    // own before this kill observed it. Reap it rather than
                    // dropping the handle so it doesn't become a zombie
                    // (issue #91). The active-kill branch below already
                    // `wait()`s after signaling, so this is the only kill
                    // path that needed the explicit reap.
                    TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
                    TaskRuntime::Pty(runtime) => *runtime = None,
                }
                state.detached = true;
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
                terminalized = true;
            } else {
                // Persist the `Killing` transition only the first time; a
                // repeated kill request reuses the existing state.
                let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
                if !was_already_killing {
                    state.metadata.status = BgTaskStatus::Killing;
                    self.persist_task(&task.paths, &state.metadata)
                        .map_err(|e| format!("failed to persist killing state: {e}"))?;
                }

                #[cfg(unix)]
                let pgid = state.metadata.pgid;
                #[cfg(windows)]
                let child_pid = state.metadata.child_pid;
                // Remember that a PTY kill was a timeout so the later
                // finalization can report `TimedOut` instead of `Killed`.
                if !was_already_killing
                    && state.metadata.mode == BgMode::Pty
                    && terminal_status == BgTaskStatus::TimedOut
                {
                    state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
                }

                #[cfg(windows)]
                let mut pty_forced_terminal_status: Option<BgTaskStatus> = None;

                match &mut state.runtime {
                    TaskRuntime::Piped(child_slot) => {
                        #[cfg(unix)]
                        if let Some(pgid) = pgid {
                            terminate_pgid(pgid, child_slot.as_mut());
                        }
                        #[cfg(windows)]
                        if let Some(child) = child_slot.as_mut() {
                            super::process::terminate_process(child);
                        } else if let Some(pid) = child_pid {
                            terminate_pid(pid);
                        }
                        // Blocking wait on the child handle, if we still hold
                        // one; the wait result is ignored.
                        if let Some(child) = child_slot.as_mut() {
                            let _ = child.wait();
                        }
                        *child_slot = None;
                        state.detached = true;

                        if !task.paths.exit.exists() {
                            write_kill_marker_if_absent(&task.paths.exit)
                                .map_err(|e| format!("failed to write kill marker: {e}"))?;
                        }

                        let exit_code = terminal_exit_code_for_status(&terminal_status);
                        state
                            .metadata
                            .mark_terminal(terminal_status, exit_code, None);
                        if self.task_has_watch_control(&task.task_id) {
                            state.metadata.completion_delivered = true;
                        }
                        state.pending_terminal_override = None;
                        task.mark_terminal_now();
                        self.persist_task(&task.paths, &state.metadata)
                            .map_err(|e| format!("failed to persist killed state: {e}"))?;
                        terminalized = true;
                    }
                    TaskRuntime::Pty(Some(pty)) => {
                        // Flag the kill before signaling; a failed
                        // `ChildKiller::kill` is logged, not propagated.
                        pty.was_killed.store(true, Ordering::SeqCst);
                        if let Err(error) = pty.killer.kill() {
                            crate::slog_warn!(
                                "[pty-kill] {task_id} ChildKiller::kill failed: {error}"
                            );
                        }
                        if let Some(pid) = pty.child_pid {
                            #[cfg(unix)]
                            terminate_pgid(pid as i32, None);
                            #[cfg(windows)]
                            terminate_pid(pid);
                        }
                        // Release the PTY master handle held by the runtime.
                        drop(pty.master.take());

                        #[cfg(windows)]
                        {
                            let default_status = if terminal_status == BgTaskStatus::TimedOut {
                                BgTaskStatus::TimedOut
                            } else {
                                BgTaskStatus::Killed
                            };
                            pty_forced_terminal_status = Some(
                                state
                                    .pending_terminal_override
                                    .take()
                                    .unwrap_or(default_status),
                            );
                        }
                    }
                    // No runtime handle to signal.
                    TaskRuntime::Pty(None) => {}
                }

                // Windows only: the PTY task is marked terminal right here
                // instead of waiting for a later finalization.
                #[cfg(windows)]
                if let Some(target_status) = pty_forced_terminal_status {
                    if !task.paths.exit.exists() {
                        write_kill_marker_if_absent(&task.paths.exit)
                            .map_err(|e| format!("failed to write kill marker: {e}"))?;
                    }

                    let exit_code = terminal_exit_code_for_status(&target_status);
                    state.metadata.mark_terminal(target_status, exit_code, None);
                    if self.task_has_watch_control(&task.task_id) {
                        state.metadata.completion_delivered = true;
                    }
                    state.pending_terminal_override = None;
                    task.mark_terminal_now();
                    if let TaskRuntime::Pty(runtime) = &mut state.runtime {
                        *runtime = None;
                    }
                    state.detached = true;
                    self.persist_task(&task.paths, &state.metadata)
                        .map_err(|e| format!("failed to persist killed PTY state: {e}"))?;
                    terminalized = true;
                }
            }
        }

        // The state lock is released by now.
        if terminalized {
            self.post_terminal_transition(&task, true)?;
        }
        Ok(self.snapshot_with_terminal_cache(&task, RUNNING_OUTPUT_PREVIEW_BYTES))
    }
2148
    /// Move `task` to its terminal state based on an exit `marker`.
    ///
    /// If the task is already terminal, only the pending override is cleared.
    /// Otherwise the persisted metadata is updated (and mirrored in memory):
    /// * for a PTY task with a `Killed` marker, the status is the pending
    ///   override if one was set, else `Killed`;
    /// * in every other case the status is derived from the marker via
    ///   `terminal_metadata_from_marker`.
    ///
    /// Watch-controlled tasks are flagged as already delivered. The runtime
    /// handle is then released (piped child reaped, PTY runtime dropped), the
    /// PTY reader is given up to 200 ms to finish, a final watch scan runs,
    /// and the post-terminal transition is performed.
    ///
    /// `reason` is forwarded into the terminal metadata.
    ///
    /// # Errors
    /// Returns an error string if the task lock is poisoned, persisting the
    /// terminal state fails, or the post-terminal transition fails.
    fn finalize_from_marker(
        &self,
        task: &Arc<BgTask>,
        marker: ExitMarker,
        reason: Option<String>,
    ) -> Result<(), String> {
        let watch_controlled = self.task_has_watch_control(&task.task_id);
        // Filled in only for PTY tasks that still had a runtime.
        let mut pty_reader_done = None;
        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if state.metadata.status.is_terminal() {
                state.pending_terminal_override = None;
                return Ok(());
            }

            // Consume the override so it cannot leak into a later call.
            let pending_override = state.pending_terminal_override.take();
            let is_pty = state.metadata.mode == BgMode::Pty;
            let updated = self
                .update_task_metadata(&task.paths, |metadata| {
                    let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
                        let mut metadata = metadata.clone();
                        let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
                        let exit_code = terminal_exit_code_for_status(&target_status);
                        metadata.mark_terminal(target_status, exit_code, reason);
                        metadata
                    } else {
                        terminal_metadata_from_marker(metadata.clone(), marker, reason)
                    };
                    if watch_controlled {
                        new_metadata.completion_delivered = true;
                    }
                    *metadata = new_metadata;
                })
                .map_err(|e| format!("failed to persist terminal state: {e}"))?;
            state.metadata = updated;
            task.mark_terminal_now();
            match &mut state.runtime {
                // Reap the exited direct child instead of dropping it, so it
                // does not linger as a `<defunct>` zombie (issue #91). The
                // wrapper writes the exit marker as its final act, so the
                // child is already exiting and `wait()` returns immediately.
                TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
                TaskRuntime::Pty(runtime) => {
                    // Keep a handle on the reader's done flag before the
                    // runtime itself is dropped.
                    pty_reader_done = runtime
                        .as_ref()
                        .map(|runtime| Arc::clone(&runtime.reader_done));
                    *runtime = None;
                }
            }
            state.detached = true;
        }

        // Bounded wait (at most 200 ms, polled every 10 ms) for the PTY reader
        // to signal completion; runs with the state lock released.
        if let Some(reader_done) = pty_reader_done {
            let deadline = Instant::now() + Duration::from_millis(200);
            while !reader_done.load(Ordering::SeqCst) && Instant::now() < deadline {
                std::thread::sleep(Duration::from_millis(10));
            }
        }

        // One final scan runs before terminal notification routing so bytes
        // printed immediately before exit can win over the exit safety net.
        self.scan_task_watch_output(task);

        self.post_terminal_transition(task, true)
    }
2217
2218    fn enqueue_completion_if_needed(
2219        &self,
2220        metadata: &PersistedTask,
2221        paths: Option<&TaskPaths>,
2222        emit_frame: bool,
2223    ) {
2224        if metadata.status.is_terminal() && !metadata.completion_delivered {
2225            let cache =
2226                paths.and_then(|paths| self.render_terminal_output_from_paths(metadata, paths));
2227            self.enqueue_completion_from_parts(metadata, None, paths, emit_frame, cache.as_ref());
2228        }
2229    }
2230
2231    fn render_terminal_output_from_paths(
2232        &self,
2233        metadata: &PersistedTask,
2234        paths: &TaskPaths,
2235    ) -> Option<TerminalOutputCache> {
2236        if metadata.mode == BgMode::Pty {
2237            return None;
2238        }
2239        let mut buffer = BgBuffer::new(paths.stdout.clone(), paths.stderr.clone());
2240        let disk_truncation = buffer.enforce_terminal_cap();
2241        Some(self.render_terminal_output(metadata, &buffer, disk_truncation))
2242    }
2243
2244    fn enqueue_completion_from_parts(
2245        &self,
2246        metadata: &PersistedTask,
2247        buffer: Option<&BgBuffer>,
2248        paths: Option<&TaskPaths>,
2249        emit_frame: bool,
2250        terminal_render: Option<&TerminalOutputCache>,
2251    ) {
2252        // Only the terminal-state guard prevents double-recording here. The
2253        // `completion_delivered` flag is NOT used to gate compression-event
2254        // recording, because `mark_terminal` flips `completion_delivered=true`
2255        // immediately for tasks with `notify_on_completion=false` (foreground
2256        // bash polled via `bash_status`, which is the common case). Pre-emptive
2257        // delivery flagging is correct for the push-frame queue (suppresses
2258        // duplicate user-visible notifications) but would silently skip the
2259        // database insert below. Compression event recording is idempotent at
2260        // the DB layer (unique on harness+session+task_id), so re-entry is
2261        // safe; the dedupe-by-queue check stays for the push frame side.
2262        if !metadata.status.is_terminal() {
2263            return;
2264        }
2265
2266        let owned_buffer = if buffer.is_none() && metadata.mode != BgMode::Pty {
2267            paths.map(|paths| BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()))
2268        } else {
2269            None
2270        };
2271        let render_buffer = buffer.or(owned_buffer.as_ref());
2272        let owned_render = if terminal_render.is_none() {
2273            render_buffer.map(|buffer| {
2274                let mut capped_buffer = buffer.clone();
2275                let disk_truncation = capped_buffer.enforce_terminal_cap();
2276                self.render_terminal_output(metadata, &capped_buffer, disk_truncation)
2277            })
2278        } else {
2279            None
2280        };
2281        let render = terminal_render.or(owned_render.as_ref());
2282
2283        // Completion reminders use the already-rendered terminal output and a
2284        // smaller, exit-aware head+tail cap. They never invoke the compressor
2285        // themselves.
2286        let (output_preview, output_truncated) = render
2287            .map(|cache| completion_preview_for_cache(cache, metadata.exit_code))
2288            .unwrap_or_else(|| (String::new(), false));
2289
2290        let token_counts = self.completion_token_counts(
2291            metadata,
2292            buffer,
2293            paths,
2294            render.map(|render| render.output_preview.as_str()),
2295        );
2296        let completion = BgCompletion {
2297            task_id: metadata.task_id.clone(),
2298            session_id: metadata.session_id.clone(),
2299            status: metadata.status.clone(),
2300            exit_code: metadata.exit_code,
2301            command: metadata.command.clone(),
2302            output_preview,
2303            output_truncated,
2304            original_tokens: token_counts.original_tokens,
2305            compressed_tokens: token_counts.compressed_tokens,
2306            tokens_skipped: token_counts.tokens_skipped,
2307        };
2308
2309        // Record the compression event BEFORE the push-frame dedupe. Event
2310        // recording has its own idempotency at the DB layer (unique key on
2311        // harness+session+task_id), so it's safe to attempt for every
2312        // terminal-state finalize. Critically, this path runs even when
2313        // `completion_delivered=true` was pre-set by `mark_terminal` for
2314        // foreground bash (`notify_on_completion=false`) — which is the common
2315        // case for OpenCode/Pi `bash` tool calls. Previously this code lived
2316        // after the dedupe guard and never fired for foreground tasks, which
2317        // meant compression accounting was effectively dead for >99% of
2318        // real-world bash usage.
2319        self.record_compression_event_if_applicable(metadata, &token_counts);
2320
2321        let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
2322        if watch_controlled {
2323            if emit_frame && !watch_matched {
2324                self.emit_bash_watch_exit(&completion);
2325            }
2326            self.clear_task_watch_state(&metadata.task_id);
2327            return;
2328        }
2329
2330        // Push-frame queue is gated on `completion_delivered` so foreground
2331        // bash with `notify_on_completion=false` does not leak a user-visible
2332        // completion notification. `mark_terminal` pre-sets
2333        // `completion_delivered=true` for those tasks; honoring it here keeps
2334        // the suppression invariant the test
2335        // `no_notify_foreground_poll_completion_does_not_enqueue_completion`
2336        // asserts. The compression-event recording above intentionally runs
2337        // before this gate so foreground bash still contributes to the
2338        // session/project aggregates.
2339        if metadata.completion_delivered {
2340            return;
2341        }
2342
2343        // Push-frame queue dedupe stays per-task to prevent duplicate
2344        // user-visible completion notifications.
2345        let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
2346            if completions
2347                .iter()
2348                .any(|existing| existing.task_id == metadata.task_id)
2349            {
2350                false
2351            } else {
2352                completions.push_back(completion.clone());
2353                true
2354            }
2355        } else {
2356            false
2357        };
2358
2359        if pushed && emit_frame {
2360            self.emit_bash_completed(completion);
2361        }
2362    }
2363
2364    fn record_compression_event_if_applicable(
2365        &self,
2366        metadata: &PersistedTask,
2367        token_counts: &CompletionTokenCounts,
2368    ) {
2369        if metadata.mode == BgMode::Pty {
2370            return;
2371        }
2372
2373        let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2374            token_counts.original_tokens,
2375            token_counts.compressed_tokens,
2376            token_counts.original_bytes,
2377            token_counts.compressed_bytes,
2378        ) {
2379            (
2380                Some(original_tokens),
2381                Some(compressed_tokens),
2382                Some(original_bytes),
2383                Some(compressed_bytes),
2384            ) => (
2385                original_tokens,
2386                compressed_tokens,
2387                original_bytes,
2388                compressed_bytes,
2389            ),
2390            _ => {
2391                crate::slog_warn!(
2392                    "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2393                    metadata.task_id
2394                );
2395                return;
2396            }
2397        };
2398
2399        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2400        let Some(pool) = pool else {
2401            crate::slog_warn!(
2402                "compression event skipped for {}: db_pool not initialized — was configure run?",
2403                metadata.task_id
2404            );
2405            return;
2406        };
2407        let harness = self
2408            .inner
2409            .db_harness
2410            .read()
2411            .ok()
2412            .and_then(|slot| slot.clone());
2413        let Some(harness) = harness else {
2414            crate::slog_warn!(
2415                "compression event insert skipped for {}: harness not configured",
2416                metadata.task_id
2417            );
2418            return;
2419        };
2420
2421        let project_root = metadata
2422            .project_root
2423            .as_deref()
2424            .unwrap_or(&metadata.workdir);
2425        let project_key = crate::path_identity::project_scope_key(project_root);
2426        let row = crate::db::compression_events::CompressionEventRow {
2427            harness: &harness,
2428            session_id: Some(&metadata.session_id),
2429            project_key: &project_key,
2430            tool: "bash",
2431            task_id: Some(&metadata.task_id),
2432            command: Some(&metadata.command),
2433            compressor: if metadata.compressed {
2434                "registry"
2435            } else {
2436                "none"
2437            },
2438            original_bytes,
2439            compressed_bytes,
2440            original_tokens,
2441            compressed_tokens,
2442            created_at: unix_millis() as i64,
2443        };
2444
2445        let conn = match pool.lock() {
2446            Ok(conn) => conn,
2447            Err(_) => {
2448                crate::slog_warn!(
2449                    "compression event insert failed for {}: db mutex poisoned",
2450                    metadata.task_id
2451                );
2452                return;
2453            }
2454        };
2455        match crate::db::compression_events::insert_compression_event(&conn, &row) {
2456            Ok(_) => {
2457                // DEBUG-level: each foreground bash call records one of these,
2458                // which clutters info-level logs without adding diagnostic value.
2459                // Aggregate totals are visible via the status RPC / TUI sidebar.
2460                crate::slog_debug!(
2461                    "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2462                    metadata.task_id,
2463                    project_key,
2464                    metadata.session_id,
2465                    original_tokens,
2466                    compressed_tokens
2467                );
2468            }
2469            Err(error) => {
2470                crate::slog_warn!(
2471                    "compression event insert failed for {}: {}",
2472                    metadata.task_id,
2473                    error
2474                );
2475            }
2476        }
2477    }
2478
2479    fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2480        let Ok(progress_sender) = self
2481            .inner
2482            .progress_sender
2483            .lock()
2484            .map(|sender| sender.clone())
2485        else {
2486            return;
2487        };
2488        if let Some(sender) = progress_sender.as_ref() {
2489            sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2490                pattern_match.task_id,
2491                session_id.to_string(),
2492                pattern_match.watch_id,
2493                pattern_match.match_text,
2494                pattern_match.match_offset,
2495                pattern_match.context,
2496                pattern_match.once,
2497            )));
2498        }
2499    }
2500
2501    fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2502        let Ok(progress_sender) = self
2503            .inner
2504            .progress_sender
2505            .lock()
2506            .map(|sender| sender.clone())
2507        else {
2508            return;
2509        };
2510        let Some(sender) = progress_sender.as_ref() else {
2511            return;
2512        };
2513        let status = completion_status_text(&completion.status, completion.exit_code);
2514        let preview = completion.output_preview.trim_end();
2515        let context = if preview.is_empty() {
2516            format!("task {} exited ({status})", completion.task_id)
2517        } else {
2518            format!(
2519                "task {} exited ({status})
2520{preview}",
2521                completion.task_id
2522            )
2523        };
2524        sender(PushFrame::BashPatternMatch(
2525            BashPatternMatchFrame::task_exit(
2526                completion.task_id.clone(),
2527                completion.session_id.clone(),
2528                format!("exited ({status})"),
2529                context,
2530            ),
2531        ));
2532    }
2533
2534    fn emit_bash_completed(&self, completion: BgCompletion) {
2535        let Ok(progress_sender) = self
2536            .inner
2537            .progress_sender
2538            .lock()
2539            .map(|sender| sender.clone())
2540        else {
2541            return;
2542        };
2543        let Some(sender) = progress_sender.as_ref() else {
2544            return;
2545        };
2546        // Clone the callback out of the registry mutex before writing to stdout;
2547        // otherwise a blocked push-frame write could pin the mutex and starve
2548        // unrelated progress-sender updates.
2549        // Bg task transitions are discovered by the watchdog thread, so the
2550        // sender is shared behind a Mutex. It still uses the same stdout writer
2551        // closure as foreground progress frames, preserving the existing lock/
2552        // flush behavior in main.rs.
2553        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2554            completion.task_id,
2555            completion.session_id,
2556            completion.status,
2557            completion.exit_code,
2558            completion.command,
2559            completion.output_preview,
2560            completion.output_truncated,
2561            completion.original_tokens,
2562            completion.compressed_tokens,
2563            completion.tokens_skipped,
2564        )));
2565    }
2566
2567    fn completion_token_counts(
2568        &self,
2569        metadata: &PersistedTask,
2570        buffer: Option<&BgBuffer>,
2571        paths: Option<&TaskPaths>,
2572        rendered_output: Option<&str>,
2573    ) -> CompletionTokenCounts {
2574        if metadata.mode == BgMode::Pty {
2575            return CompletionTokenCounts::skipped();
2576        }
2577
2578        let raw = match buffer {
2579            Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2580            None => paths
2581                .map(|paths| {
2582                    read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2583                })
2584                .unwrap_or(TokenCountInput::Skipped),
2585        };
2586
2587        let TokenCountInput::Text(raw_output) = raw else {
2588            return CompletionTokenCounts::skipped();
2589        };
2590
2591        let original_tokens = token_count_u32(&raw_output);
2592        let original_bytes = raw_output.len() as i64;
2593        let compressed_output = rendered_output.unwrap_or(&raw_output);
2594        let compressed_tokens = token_count_u32(compressed_output);
2595        let compressed_bytes = compressed_output.len() as i64;
2596        CompletionTokenCounts {
2597            original_tokens: Some(original_tokens),
2598            compressed_tokens: Some(compressed_tokens),
2599            original_bytes: Some(original_bytes),
2600            compressed_bytes: Some(compressed_bytes),
2601            tokens_skipped: false,
2602        }
2603    }
2604
2605    pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2606        if !self
2607            .inner
2608            .long_running_reminder_enabled
2609            .load(Ordering::SeqCst)
2610        {
2611            return;
2612        }
2613        let interval_ms = self
2614            .inner
2615            .long_running_reminder_interval_ms
2616            .load(Ordering::SeqCst);
2617        if interval_ms == 0 {
2618            return;
2619        }
2620        let interval = Duration::from_millis(interval_ms);
2621        let now = Instant::now();
2622        let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2623            return;
2624        };
2625        let since = last_reminder_at.unwrap_or(task.started);
2626        if now.duration_since(since) < interval {
2627            return;
2628        }
2629        let command = task
2630            .state
2631            .lock()
2632            .map(|state| state.metadata.command.clone())
2633            .unwrap_or_default();
2634        *last_reminder_at = Some(now);
2635        self.emit_bash_long_running(BashLongRunningFrame::new(
2636            task.task_id.clone(),
2637            task.session_id.clone(),
2638            command,
2639            task.started.elapsed().as_millis() as u64,
2640        ));
2641    }
2642
2643    fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2644        let Ok(progress_sender) = self
2645            .inner
2646            .progress_sender
2647            .lock()
2648            .map(|sender| sender.clone())
2649        else {
2650            return;
2651        };
2652        if let Some(sender) = progress_sender.as_ref() {
2653            sender(PushFrame::BashLongRunning(frame));
2654        }
2655    }
2656
2657    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2658        self.inner
2659            .tasks
2660            .lock()
2661            .ok()
2662            .and_then(|tasks| tasks.get(task_id).cloned())
2663    }
2664
2665    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2666        self.task(task_id)
2667            .filter(|task| task.session_id == session_id)
2668    }
2669
2670    fn running_count(&self) -> usize {
2671        self.inner
2672            .tasks
2673            .lock()
2674            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2675            .unwrap_or(0)
2676    }
2677
2678    fn start_watchdog(&self) {
2679        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2680            super::watchdog::start(self.clone());
2681        }
2682    }
2683
2684    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2685        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2686    }
2687
2688    #[cfg(test)]
2689    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2690        self.task_for_session(task_id, session_id)
2691            .map(|task| task.paths.json.clone())
2692    }
2693
2694    #[cfg(test)]
2695    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2696        self.task_for_session(task_id, session_id)
2697            .map(|task| task.paths.exit.clone())
2698    }
2699
2700    /// Generate a `bash-{16hex}` slug that is unique against live tasks and queued completions.
2701    fn generate_unique_task_id(&self) -> Result<String, String> {
2702        for _ in 0..32 {
2703            let candidate = random_slug();
2704            let tasks = self
2705                .inner
2706                .tasks
2707                .lock()
2708                .map_err(|_| "background task registry lock poisoned".to_string())?;
2709            if tasks.contains_key(&candidate) {
2710                continue;
2711            }
2712            let completions = self
2713                .inner
2714                .completions
2715                .lock()
2716                .map_err(|_| "background completions lock poisoned".to_string())?;
2717            if completions
2718                .iter()
2719                .any(|completion| completion.task_id == candidate)
2720            {
2721                continue;
2722            }
2723            return Ok(candidate);
2724        }
2725        Err("failed to allocate unique background task id after 32 attempts".to_string())
2726    }
2727}
2728
2729fn render_compressed_with_recovery(
2730    buffer: &BgBuffer,
2731    mut compressed: CompressionResult,
2732    input_truncated: bool,
2733    disk_truncation: DiskTruncation,
2734) -> TerminalOutputCache {
2735    // Preserve a single canonical trailing newline. A bare `.trim_end()` strips
2736    // the legitimate final newline that `echo` and most commands emit, so
2737    // agent-facing output diverged from native bash ("hello" vs "hello\n") and
2738    // broke the no-JSON-envelope contract. Collapse excess trailing blank lines
2739    // to one, but keep that one when the content had a trailing newline. NOTE:
2740    // the check must read the ORIGINAL text — strip_plain_truncation_marker_lines
2741    // rebuilds via `.lines().join("\n")`, which itself drops the trailing newline.
2742    let had_trailing_newline = compressed.text.ends_with('\n');
2743    let mut text = strip_plain_truncation_marker_lines(&compressed.text)
2744        .trim_end()
2745        .to_string();
2746    if had_trailing_newline && !text.is_empty() {
2747        text.push('\n');
2748    }
2749    compressed.text = text;
2750
2751    let output_path = buffer.output_path().map(|path| path.display().to_string());
2752    let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
2753    let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
2754    let mut recovery = RecoveryContext {
2755        dropped_by_class: compressed.dropped_by_class,
2756        had_inner_drop: compressed.had_inner_drop,
2757        offset_hint_eligible: compressed.offset_hint_eligible,
2758        offset_start_line: compressed.offset_start_line,
2759        byte_truncated: input_truncated,
2760        disk_truncated_prefix_bytes: disk_truncation.total_prefix_bytes(),
2761        output_path: output_path.clone(),
2762        stderr_path: stderr_path.clone(),
2763        include_stderr_path,
2764    };
2765
2766    let (output_preview, output_truncated) =
2767        render_body_with_recovery_marker(&compressed.text, &mut recovery);
2768    TerminalOutputCache {
2769        output_preview,
2770        output_truncated,
2771        kind: TerminalOutputKind::Compressed,
2772        output_path,
2773        stderr_path,
2774        recovery: Some(recovery),
2775    }
2776}
2777
2778fn render_body_with_recovery_marker(body: &str, recovery: &mut RecoveryContext) -> (String, bool) {
2779    render_body_with_recovery_marker_at_cap(
2780        body,
2781        recovery,
2782        FINAL_OUTPUT_CAP_BYTES,
2783        cap_final_output,
2784        cap_final_output_with_marker,
2785    )
2786}
2787
2788fn render_raw_body_with_recovery_marker(
2789    body: &str,
2790    recovery: &mut RecoveryContext,
2791) -> (String, bool) {
2792    render_body_with_recovery_marker_at_cap(
2793        body,
2794        recovery,
2795        RAW_PASSTHROUGH_CAP_BYTES,
2796        |input| {
2797            super::output::cap_head_tail(
2798                input,
2799                RAW_PASSTHROUGH_CAP_BYTES,
2800                RAW_PASSTHROUGH_HEAD_BYTES,
2801                RAW_PASSTHROUGH_TAIL_BYTES,
2802            )
2803        },
2804        |input, marker| {
2805            super::output::cap_head_tail_with_marker(
2806                input,
2807                RAW_PASSTHROUGH_CAP_BYTES,
2808                RAW_PASSTHROUGH_HEAD_BYTES,
2809                RAW_PASSTHROUGH_TAIL_BYTES,
2810                marker,
2811            )
2812        },
2813    )
2814}
2815
2816fn render_body_with_recovery_marker_at_cap<F, G>(
2817    body: &str,
2818    recovery: &mut RecoveryContext,
2819    cap_bytes: usize,
2820    cap_plain: F,
2821    cap_with_marker: G,
2822) -> (String, bool)
2823where
2824    F: Fn(&str) -> super::output::CappedText,
2825    G: Fn(&str, &str) -> super::output::CappedText,
2826{
2827    let needs_marker = recovery.has_visible_drop();
2828    if body.len() > cap_bytes {
2829        recovery.byte_truncated = true;
2830        if let Some(marker) = recovery_marker(recovery) {
2831            let capped = cap_with_marker(body, &marker);
2832            return (capped.text, true);
2833        }
2834        let capped = cap_plain(body);
2835        return (capped.text, capped.truncated || needs_marker);
2836    }
2837
2838    if !needs_marker {
2839        return (body.to_string(), false);
2840    }
2841
2842    let Some(marker) = recovery_marker(recovery) else {
2843        return (body.to_string(), true);
2844    };
2845    let with_marker = append_recovery_marker(body, &marker);
2846    if with_marker.len() <= cap_bytes {
2847        return (with_marker, true);
2848    }
2849
2850    recovery.byte_truncated = true;
2851    let marker = recovery_marker(recovery).unwrap_or(marker);
2852    let capped = cap_with_marker(body, &marker);
2853    (capped.text, true)
2854}
2855
/// Appends `marker` on its own line after `body`.
///
/// Trailing whitespace of `body` is removed first, so exactly one newline
/// separates the body from the marker. When nothing is left after trimming
/// (empty or whitespace-only body), the marker is returned alone, without a
/// leading blank line.
fn append_recovery_marker(body: &str, marker: &str) -> String {
    // Trim before the emptiness check: a body such as "\n" carries no visible
    // content and must not leave a stray empty line in front of the marker.
    let trimmed = body.trim_end();
    if trimmed.is_empty() {
        return marker.to_string();
    }

    let mut output = String::with_capacity(trimmed.len() + 1 + marker.len());
    output.push_str(trimmed);
    output.push('\n');
    output.push_str(marker);
    output
}
2865
2866fn recovery_marker(recovery: &RecoveryContext) -> Option<String> {
2867    let mut parts = Vec::new();
2868    for (class, count) in &recovery.dropped_by_class {
2869        let label = if *count == 1 {
2870            class.singular()
2871        } else {
2872            class.plural()
2873        };
2874        parts.push(format!("+{count} more {label}"));
2875    }
2876    if recovery.byte_truncated {
2877        parts.push("truncated output".to_string());
2878    }
2879    let disk_truncated_prefix_bytes = recovery.disk_truncated_prefix_bytes;
2880    if disk_truncated_prefix_bytes > 0 {
2881        parts.push(format!(
2882            "truncated {disk_truncated_prefix_bytes} bytes from saved output prefix"
2883        ));
2884    } else if recovery.had_inner_drop && parts.is_empty() {
2885        parts.push("omitted output".to_string());
2886    }
2887
2888    if parts.is_empty() {
2889        return None;
2890    }
2891
2892    let hint = recovery_hint(recovery);
2893    Some(format!("[{}; {hint}]", parts.join(", ")))
2894}
2895
2896fn recovery_hint(recovery: &RecoveryContext) -> String {
2897    // AFT stores stdout/stderr separately and combines them in memory. Class caps,
2898    // middle truncation, and mixed stdout/stderr renders are not line-offset
2899    // portable. Only a single-file contiguous-prefix drop may use `tail -n +N`.
2900    if recovery.offset_hint_eligible
2901        && !recovery.byte_truncated
2902        && recovery.dropped_by_class.is_empty()
2903        && !recovery.include_stderr_path
2904    {
2905        if let (Some(path), Some(line)) =
2906            (recovery.output_path.as_deref(), recovery.offset_start_line)
2907        {
2908            return format!("see remaining: tail -n +{line} {}", quote_path(path));
2909        }
2910    }
2911
2912    let mut paths = Vec::new();
2913    if let Some(path) = recovery.output_path.as_deref() {
2914        paths.push(path);
2915    }
2916    if recovery.include_stderr_path {
2917        if let Some(path) = recovery.stderr_path.as_deref() {
2918            if !paths.contains(&path) {
2919                paths.push(path);
2920            }
2921        }
2922    }
2923
2924    if paths.is_empty() {
2925        return "full output unavailable".to_string();
2926    }
2927
2928    let reads = paths
2929        .into_iter()
2930        .map(|path| format!("read {}", quote_path(path)))
2931        .collect::<Vec<_>>()
2932        .join(" and ");
2933    if recovery.disk_truncated_prefix_bytes > 0 {
2934        format!("retained output: {reads}")
2935    } else {
2936        format!("full output: {reads}")
2937    }
2938}
2939
2940fn strip_plain_truncation_marker_lines(input: &str) -> String {
2941    input
2942        .lines()
2943        .filter(|line| !is_plain_truncation_marker(line.trim()))
2944        .collect::<Vec<_>>()
2945        .join("\n")
2946}
2947
2948fn strip_recovery_marker_lines(input: &str) -> String {
2949    input
2950        .lines()
2951        .filter(|line| !is_recovery_marker(line.trim()))
2952        .collect::<Vec<_>>()
2953        .join("\n")
2954}
2955
/// Reports whether `line` is exactly `...<truncated N bytes>...`, where `N` is
/// one or more ASCII digits. The caller is expected to trim the line first;
/// surrounding whitespace makes the match fail.
fn is_plain_truncation_marker(line: &str) -> bool {
    line.strip_prefix("...<truncated ")
        .and_then(|rest| rest.strip_suffix(" bytes>..."))
        .is_some_and(|digits| !digits.is_empty() && digits.bytes().all(|b| b.is_ascii_digit()))
}
2965
/// Reports whether `line` looks like a recovery marker: wrapped in `[` … `]`
/// and containing one of the known recovery hint phrases.
fn is_recovery_marker(line: &str) -> bool {
    const HINT_PHRASES: [&str; 4] = [
        "full output: read ",
        "retained output: read ",
        "see remaining: tail -n +",
        "full output unavailable",
    ];

    let bracketed = line.starts_with('[') && line.ends_with(']');
    bracketed && HINT_PHRASES.iter().any(|phrase| line.contains(*phrase))
}
2974
2975fn render_structured_output(
2976    command: &str,
2977    buffer: &BgBuffer,
2978    disk_truncation: DiskTruncation,
2979) -> Option<TerminalOutputCache> {
2980    if !is_gh_structured_command(command) {
2981        return None;
2982    }
2983
2984    let output_path = buffer
2985        .output_path()
2986        .map(|path| path.display().to_string())?;
2987    let stdout_bytes = buffer.stream_len(StreamKind::Stdout);
2988    if stdout_bytes == 0 {
2989        return None;
2990    }
2991
2992    if stdout_bytes > STRUCTURED_OUTPUT_CAP_BYTES as u64 {
2993        if !stream_starts_like_json(buffer, StreamKind::Stdout) {
2994            return None;
2995        }
2996        let output_preview = if disk_truncation.total_prefix_bytes() > 0 {
2997            retained_json_output_pointer(
2998                stdout_bytes,
2999                &output_path,
3000                disk_truncation.total_prefix_bytes(),
3001            )
3002        } else {
3003            json_output_pointer(stdout_bytes, &output_path)
3004        };
3005        return Some(TerminalOutputCache {
3006            output_preview,
3007            output_truncated: true,
3008            kind: TerminalOutputKind::Structured,
3009            output_path: Some(output_path),
3010            stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
3011            recovery: None,
3012        });
3013    }
3014
3015    let stdout = buffer.read_stream_bounded(StreamKind::Stdout, STRUCTURED_OUTPUT_CAP_BYTES);
3016    if stdout.truncated || !is_structured_body(&stdout.text) {
3017        return None;
3018    }
3019
3020    Some(TerminalOutputCache {
3021        output_preview: stdout.text,
3022        output_truncated: false,
3023        kind: TerminalOutputKind::Structured,
3024        output_path: Some(output_path),
3025        stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
3026        recovery: None,
3027    })
3028}
3029
3030fn render_raw_passthrough(
3031    buffer: &BgBuffer,
3032    disk_truncation: DiskTruncation,
3033) -> TerminalOutputCache {
3034    let raw = buffer.read_combined_head_tail(
3035        RAW_PASSTHROUGH_CAP_BYTES,
3036        RAW_PASSTHROUGH_HEAD_BYTES,
3037        RAW_PASSTHROUGH_TAIL_BYTES,
3038    );
3039    let output_path = buffer.output_path().map(|path| path.display().to_string());
3040    let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
3041    if !raw.truncated && disk_truncation.total_prefix_bytes() == 0 {
3042        return TerminalOutputCache {
3043            output_preview: raw.text,
3044            output_truncated: false,
3045            kind: TerminalOutputKind::Raw,
3046            output_path,
3047            stderr_path,
3048            recovery: None,
3049        };
3050    }
3051
3052    let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
3053    let mut recovery = RecoveryContext {
3054        dropped_by_class: BTreeMap::new(),
3055        had_inner_drop: false,
3056        offset_hint_eligible: false,
3057        offset_start_line: None,
3058        byte_truncated: raw.truncated,
3059        disk_truncated_prefix_bytes: disk_truncation.total_prefix_bytes(),
3060        output_path: output_path.clone(),
3061        stderr_path: stderr_path.clone(),
3062        include_stderr_path,
3063    };
3064    let (output_preview, output_truncated) =
3065        render_raw_body_with_recovery_marker(&raw.text, &mut recovery);
3066    TerminalOutputCache {
3067        output_preview,
3068        output_truncated,
3069        kind: TerminalOutputKind::Raw,
3070        output_path,
3071        stderr_path,
3072        recovery: Some(recovery),
3073    }
3074}
3075
3076fn completion_preview_for_cache(
3077    cache: &TerminalOutputCache,
3078    exit_code: Option<i32>,
3079) -> (String, bool) {
3080    // Reminder previews are sized by exit status: success gets a short tail,
3081    // failure keeps head+tail context (see output.rs completion caps).
3082    let exit_ok = exit_code == Some(0);
3083    let threshold = completion_preview_threshold(exit_ok);
3084    if cache.kind == TerminalOutputKind::Structured && cache.output_preview.len() > threshold {
3085        if let Some(path) = cache.output_path.as_deref() {
3086            return (
3087                json_output_pointer(cache.output_preview.len() as u64, path),
3088                true,
3089            );
3090        }
3091        return (cache.output_preview.clone(), cache.output_truncated);
3092    }
3093
3094    if let Some(recovery) = cache.recovery.as_ref() {
3095        if cache.output_preview.len() <= threshold {
3096            return (cache.output_preview.clone(), cache.output_truncated);
3097        }
3098        let body = strip_recovery_marker_lines(&cache.output_preview);
3099        let mut completion_recovery = recovery.clone();
3100        completion_recovery.byte_truncated = true;
3101        if let Some(marker) = recovery_marker(&completion_recovery) {
3102            let capped = cap_completion_output_with_marker(&body, &marker, exit_ok);
3103            return (capped.text, true);
3104        }
3105    }
3106
3107    let capped = cap_completion_output(&cache.output_preview, exit_ok);
3108    (capped.text, cache.output_truncated || capped.truncated)
3109}
3110
3111fn is_gh_structured_command(command: &str) -> bool {
3112    let Some(normalized) = crate::compress::plain_command_for_structured_output(command) else {
3113        return false;
3114    };
3115    let tokens = shell_words_for_flags(&normalized);
3116    let Some(head) = tokens.first() else {
3117        return false;
3118    };
3119    let head_name = Path::new(head)
3120        .file_name()
3121        .and_then(|name| name.to_str())
3122        .unwrap_or(head);
3123    if !(head_name == "gh" || head_name.eq_ignore_ascii_case("gh.exe")) {
3124        return false;
3125    }
3126    tokens.iter().any(|token| {
3127        matches!(token.as_str(), "--json" | "--jq" | "--template")
3128            || token.starts_with("--json=")
3129            || token.starts_with("--jq=")
3130            || token.starts_with("--template=")
3131    })
3132}
3133
/// Splits `command` into words with a minimal shell-like tokenizer.
///
/// Rules, as implemented here:
/// - a backslash outside single quotes makes the next character literal;
/// - single and double quotes group characters and are themselves dropped;
/// - unquoted whitespace and unquoted `;`, `&`, `|` end the current word.
///
/// Empty words are never emitted.
fn shell_words_for_flags(command: &str) -> Vec<String> {
    #[derive(Clone, Copy)]
    enum Quote {
        Outside,
        Single,
        Double,
    }

    let mut words: Vec<String> = Vec::new();
    let mut word = String::new();
    let mut quote = Quote::Outside;
    let mut take_literally = false;

    for ch in command.chars() {
        if take_literally {
            word.push(ch);
            take_literally = false;
            continue;
        }
        match (quote, ch) {
            // Backslash escapes everywhere except inside single quotes.
            (Quote::Outside | Quote::Double, '\\') => take_literally = true,
            (Quote::Outside, '\'') => quote = Quote::Single,
            (Quote::Single, '\'') => quote = Quote::Outside,
            (Quote::Outside, '"') => quote = Quote::Double,
            (Quote::Double, '"') => quote = Quote::Outside,
            (Quote::Outside, sep) if sep.is_whitespace() || matches!(sep, ';' | '&' | '|') => {
                if !word.is_empty() {
                    words.push(std::mem::take(&mut word));
                }
            }
            _ => word.push(ch),
        }
    }

    if !word.is_empty() {
        words.push(word);
    }
    words
}
3178
3179fn is_structured_body(body: &str) -> bool {
3180    let trimmed = body.trim();
3181    if trimmed.is_empty() {
3182        return false;
3183    }
3184    if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
3185        return true;
3186    }
3187
3188    let mut saw_line = false;
3189    for line in trimmed
3190        .lines()
3191        .map(str::trim)
3192        .filter(|line| !line.is_empty())
3193    {
3194        saw_line = true;
3195        if serde_json::from_str::<serde_json::Value>(line).is_err() {
3196            return false;
3197        }
3198    }
3199    saw_line
3200}
3201
3202fn stream_starts_like_json(buffer: &BgBuffer, stream: StreamKind) -> bool {
3203    let path = match (buffer, stream) {
3204        (BgBuffer::Pipes { stdout_path, .. }, StreamKind::Stdout) => Some(stdout_path),
3205        (BgBuffer::Pipes { stderr_path, .. }, StreamKind::Stderr) => Some(stderr_path),
3206        (BgBuffer::Pty { combined_path }, _) => Some(combined_path),
3207    };
3208    let Some(path) = path else {
3209        return false;
3210    };
3211    let Ok(file) = std::fs::File::open(path) else {
3212        return false;
3213    };
3214    let mut limited = file.take(512);
3215    let mut bytes = Vec::new();
3216    if limited.read_to_end(&mut bytes).is_err() {
3217        return false;
3218    }
3219    String::from_utf8_lossy(&bytes)
3220        .chars()
3221        .find(|ch| !ch.is_whitespace())
3222        .is_some_and(|ch| matches!(ch, '{' | '[' | '"' | '-' | '0'..='9' | 't' | 'f' | 'n'))
3223}
3224
/// Token and byte measurements for a task's output, before and after
/// compression. Every measurement is optional.
// NOTE(review): presumably attached to completion reporting; confirm at the
// call sites, which are outside this part of the file.
struct CompletionTokenCounts {
    /// Token count of the original output, when measured.
    original_tokens: Option<u32>,
    /// Token count of the compressed output, when measured.
    compressed_tokens: Option<u32>,
    /// Byte size of the original output, when measured.
    original_bytes: Option<i64>,
    /// Byte size of the compressed output, when measured.
    compressed_bytes: Option<i64>,
    /// Set when token counting was skipped (see `skipped`).
    tokens_skipped: bool,
}

impl CompletionTokenCounts {
    /// Returns counts with every measurement unset and `tokens_skipped` set
    /// to `true`.
    fn skipped() -> Self {
        Self {
            original_tokens: None,
            compressed_tokens: None,
            original_bytes: None,
            compressed_bytes: None,
            tokens_skipped: true,
        }
    }
}
3244
3245fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
3246    match status {
3247        BgTaskStatus::TimedOut => "timed out".to_string(),
3248        BgTaskStatus::Killed => "killed".to_string(),
3249        _ => exit_code
3250            .map(|code| format!("exit {code}"))
3251            .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
3252    }
3253}
3254
3255fn token_count_u32(text: &str) -> u32 {
3256    aft_tokenizer::count_tokens(text)
3257        .try_into()
3258        .unwrap_or(u32::MAX)
3259}
3260
3261impl Default for BgTaskRegistry {
3262    fn default() -> Self {
3263        Self::new(Arc::new(Mutex::new(None)))
3264    }
3265}
3266
/// Reports whether `path` was modified less than `grace` ago.
///
/// Returns `false` when the metadata or modification time cannot be read, or
/// when the modification time lies after the current system time.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|metadata| metadata.modified()) else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        Err(_) => false,
    }
}
3275
/// Canonicalizes `path`, falling back to an owned copy of the input when
/// canonicalization fails (for example because the path does not exist).
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
3279
/// Converts a wall-clock start time (milliseconds since the Unix epoch) into
/// an `Instant` lying the same distance in the past.
///
/// A start time in the future, or an unreadable system clock, counts as zero
/// elapsed time. If the subtraction cannot be represented as an `Instant`,
/// the current instant is returned.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let age = Duration::from_millis(now_ms.saturating_sub(started_at));
    match Instant::now().checked_sub(age) {
        Some(started) => started,
        None => Instant::now(),
    }
}
3291
3292fn gc_quarantine(storage_dir: &Path) {
3293    let quarantine_root = storage_dir.join("bash-tasks-quarantine");
3294    let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
3295        return;
3296    };
3297    for session_entry in session_dirs.flatten() {
3298        let session_quarantine_dir = session_entry.path();
3299        if !session_quarantine_dir.is_dir() {
3300            continue;
3301        }
3302        let entries = match fs::read_dir(&session_quarantine_dir) {
3303            Ok(entries) => entries,
3304            Err(error) => {
3305                crate::slog_warn!(
3306                    "failed to read background task quarantine dir {}: {error}",
3307                    session_quarantine_dir.display()
3308                );
3309                continue;
3310            }
3311        };
3312        for entry in entries.flatten() {
3313            let path = entry.path();
3314            if modified_within(&path, QUARANTINE_GC_GRACE) {
3315                continue;
3316            }
3317            let result = if path.is_dir() {
3318                fs::remove_dir_all(&path)
3319            } else {
3320                fs::remove_file(&path)
3321            };
3322            match result {
3323                Ok(()) => log::debug!(
3324                    "deleted old background task quarantine entry {}",
3325                    path.display()
3326                ),
3327                Err(error) => crate::slog_warn!(
3328                    "failed to delete old background task quarantine entry {}: {error}",
3329                    path.display()
3330                ),
3331            }
3332        }
3333        let _ = fs::remove_dir(&session_quarantine_dir);
3334    }
3335    let _ = fs::remove_dir(&quarantine_root);
3336}
3337
/// Why a task's metadata file is being quarantined. The variant selects the
/// naming scheme `quarantine_name` applies to the moved files.
enum QuarantineKind {
    /// Renamed to `<file_name>.corrupt-<unix_ts>`.
    Corrupt,
    /// Renamed to `<stem>.invalid.<unix_ts>.<extension>`, or
    /// `<file_name>.invalid.<unix_ts>` when stem or extension is missing.
    Invalid,
}
3342
3343fn quarantine_task_json(
3344    storage_dir: &Path,
3345    session_dir: &Path,
3346    json_path: &Path,
3347    kind: QuarantineKind,
3348) -> Result<(), String> {
3349    let session_hash = session_dir
3350        .file_name()
3351        .and_then(|name| name.to_str())
3352        .ok_or_else(|| {
3353            format!(
3354                "invalid background task session dir: {}",
3355                session_dir.display()
3356            )
3357        })?;
3358    let task_name = json_path
3359        .file_name()
3360        .and_then(|name| name.to_str())
3361        .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
3362    let unix_ts = SystemTime::now()
3363        .duration_since(UNIX_EPOCH)
3364        .map(|duration| duration.as_secs())
3365        .unwrap_or(0);
3366    let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
3367    fs::create_dir_all(&quarantine_dir).map_err(|e| {
3368        format!(
3369            "failed to create background task quarantine dir {}: {e}",
3370            quarantine_dir.display()
3371        )
3372    })?;
3373    let target_name = quarantine_name(task_name, unix_ts, &kind);
3374    let target = quarantine_dir.join(target_name);
3375    fs::rename(json_path, &target).map_err(|e| {
3376        format!(
3377            "failed to quarantine background task metadata {} to {}: {e}",
3378            json_path.display(),
3379            target.display()
3380        )
3381    })?;
3382
3383    for sibling in task_sibling_paths(json_path) {
3384        if !sibling.exists() {
3385            continue;
3386        }
3387        let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
3388            crate::slog_warn!(
3389                "skipping background task sibling with invalid name during quarantine: {}",
3390                sibling.display()
3391            );
3392            continue;
3393        };
3394        let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
3395        if let Err(error) = fs::rename(&sibling, &sibling_target) {
3396            crate::slog_warn!(
3397                "failed to quarantine background task sibling {} to {}: {error}",
3398                sibling.display(),
3399                sibling_target.display()
3400            );
3401        }
3402    }
3403
3404    let _ = fs::remove_dir(session_dir);
3405    Ok(())
3406}
3407
3408fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
3409    match kind {
3410        QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
3411        QuarantineKind::Invalid => {
3412            let path = Path::new(file_name);
3413            let stem = path.file_stem().and_then(|stem| stem.to_str());
3414            let extension = path.extension().and_then(|extension| extension.to_str());
3415            match (stem, extension) {
3416                (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
3417                _ => format!("{file_name}.invalid.{unix_ts}"),
3418            }
3419        }
3420    }
3421}
3422
/// Lists the candidate sibling files of a task's JSON metadata file: same
/// directory and stem, with each of the known task-file extensions.
///
/// Existence is not checked. Returns an empty list when `json_path` has no
/// parent or no UTF-8 file stem.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 7] = ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"];

    let parent = json_path.parent();
    let stem = json_path.file_stem().and_then(|stem| stem.to_str());
    let (Some(parent), Some(stem)) = (parent, stem) else {
        return Vec::new();
    };

    let mut siblings = Vec::with_capacity(SIBLING_EXTENSIONS.len());
    for extension in SIBLING_EXTENSIONS {
        siblings.push(parent.join(format!("{stem}.{extension}")));
    }
    siblings
}
3435
3436fn read_for_token_count_from_disk(
3437    metadata: &PersistedTask,
3438    paths: &TaskPaths,
3439    max_bytes_per_stream: usize,
3440) -> TokenCountInput {
3441    if metadata.mode == BgMode::Pty {
3442        return TokenCountInput::Skipped;
3443    }
3444    // Read up to `max_bytes_per_stream` bytes per stream rather than
3445    // refusing to tokenize anything when the file exceeds the cap.
3446    // Mirror the in-memory `BgBuffer::read_for_token_count` policy
3447    // (see comment there) — large outputs are exactly the tasks that
3448    // benefit most from compression accounting, so silent-skipping
3449    // them defeats the purpose of token tracking.
3450    let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
3451    let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
3452    match (stdout, stderr) {
3453        (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3454            String::from_utf8_lossy(&stdout).as_ref(),
3455            String::from_utf8_lossy(&stderr).as_ref(),
3456        )),
3457        (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
3458            String::from_utf8_lossy(&stdout).as_ref(),
3459            "",
3460        )),
3461        (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3462            "",
3463            String::from_utf8_lossy(&stderr).as_ref(),
3464        )),
3465        (Err(_), Err(_)) => TokenCountInput::Skipped,
3466    }
3467}
3468
/// Read at most `max_bytes` bytes from the END of `path`. Used for
/// tokenization where the most recent output is more representative than
/// an arbitrarily-capped beginning.
///
/// The cap is enforced on the read itself, so the result never exceeds
/// `max_bytes` even when `max_bytes` is 0 or when the file grows between the
/// size check and the read.
///
/// # Errors
///
/// Returns `Err` if the file cannot be opened (genuinely missing or
/// permissions error) or if querying its size, seeking, or reading fails.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};
    let mut file = std::fs::File::open(path)?;
    let len = file.metadata()?.len();
    let cap = max_bytes as u64;
    let read_len = len.min(cap);
    if read_len > 0 && len > cap {
        // Skip everything except the final `read_len` bytes.
        let back = i64::try_from(read_len).unwrap_or(i64::MAX);
        file.seek(SeekFrom::End(-back))?;
    }
    let mut bytes = Vec::with_capacity(read_len as usize);
    // `take` bounds the read; a bare `read_to_end` would return the whole
    // file for `max_bytes == 0` and any bytes appended after the size check.
    file.take(cap).read_to_end(&mut bytes)?;
    Ok(bytes)
}
3485
3486impl BgTask {
3487    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
3488        let state = self
3489            .state
3490            .lock()
3491            .unwrap_or_else(|poison| poison.into_inner());
3492        self.snapshot_locked(&state, preview_bytes)
3493    }
3494
3495    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
3496        let metadata = &state.metadata;
3497        let duration_ms = metadata.duration_ms.or_else(|| {
3498            metadata
3499                .status
3500                .is_terminal()
3501                .then(|| self.started.elapsed().as_millis() as u64)
3502        });
3503        let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
3504            (String::new(), false)
3505        } else if metadata.status.is_terminal() {
3506            state
3507                .terminal_output_cache
3508                .as_ref()
3509                .map(|cache| (cache.output_preview.clone(), cache.output_truncated))
3510                .unwrap_or_else(|| (String::new(), false))
3511        } else {
3512            state.buffer.read_tail(preview_bytes)
3513        };
3514        BgTaskSnapshot {
3515            info: BgTaskInfo {
3516                task_id: self.task_id.clone(),
3517                status: metadata.status.clone(),
3518                command: metadata.command.clone(),
3519                mode: metadata.mode.clone(),
3520                started_at: metadata.started_at,
3521                duration_ms,
3522            },
3523            exit_code: metadata.exit_code,
3524            child_pid: metadata.child_pid,
3525            workdir: metadata.workdir.display().to_string(),
3526            output_preview,
3527            output_truncated,
3528            output_path: state
3529                .buffer
3530                .output_path()
3531                .map(|path| path.display().to_string()),
3532            stderr_path: state
3533                .buffer
3534                .stderr_path()
3535                .map(|path| path.display().to_string()),
3536            pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
3537            pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
3538            pty_screen: None,
3539        }
3540    }
3541
3542    pub(crate) fn is_running(&self) -> bool {
3543        self.state
3544            .lock()
3545            .map(|state| {
3546                state.metadata.status == BgTaskStatus::Running
3547                    || (state.metadata.mode == BgMode::Pty
3548                        && state.metadata.status == BgTaskStatus::Killing)
3549            })
3550            .unwrap_or(false)
3551    }
3552
3553    fn is_terminal(&self) -> bool {
3554        self.state
3555            .lock()
3556            .map(|state| state.metadata.status.is_terminal())
3557            .unwrap_or(false)
3558    }
3559
3560    fn mark_terminal_now(&self) {
3561        if let Ok(mut terminal_at) = self.terminal_at.lock() {
3562            if terminal_at.is_none() {
3563                *terminal_at = Some(Instant::now());
3564            }
3565        }
3566    }
3567
3568    fn set_completion_delivered(
3569        &self,
3570        delivered: bool,
3571        registry: &BgTaskRegistry,
3572    ) -> Result<(), String> {
3573        let mut state = self
3574            .state
3575            .lock()
3576            .map_err(|_| "background task lock poisoned".to_string())?;
3577        let updated = registry
3578            .update_task_metadata(&self.paths, |metadata| {
3579                metadata.completion_delivered = delivered;
3580            })
3581            .map_err(|e| format!("failed to update completion delivery: {e}"))?;
3582        state.metadata = updated;
3583        Ok(())
3584    }
3585}
3586
/// Takes the direct child handle out of `child_slot` and reaps it.
///
/// Dropping a [`std::process::Child`] does not `wait()` on the OS process, so
/// on Unix a finished child that is never waited on stays behind as a
/// `<defunct>` zombie until the AFT process exits (issue #91). Terminal
/// transitions that learn of completion from the exit-marker file, rather
/// than from [`BgTaskRegistry::reap_child`]'s `try_wait()`, therefore have to
/// reap the handle explicitly instead of just clearing the slot.
///
/// The wrapper writes the exit marker as its final statement (an atomic `mv`
/// rename), so once the marker is visible the child is finishing. A
/// non-blocking `try_wait()` is attempted first; only if the child has not
/// been collected yet does this fall back to `wait()`, which is expected to
/// return almost immediately.
///
/// Callers hold the task state mutex, which serializes this against
/// `reap_child`: whichever path takes the lock first reaps and empties the
/// slot, and the other one finds `None`.
#[cfg(unix)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
    let Some(mut child) = child_slot.take() else {
        return;
    };
    // `Ok(Some(_))` means already reaped; an error leaves nothing to retry.
    let not_yet_exited = matches!(child.try_wait(), Ok(None));
    if not_yet_exited {
        let _ = child.wait();
    }
}

/// Windows has no zombie/`<defunct>` state: dropping the [`Child`] closes the
/// process handle, which is the correct release. The slot is simply emptied,
/// preserving the historical behavior so the documented Windows PID-recycle
/// handling in `reap_child` is unaffected.
#[cfg(windows)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
    drop(child_slot.take());
}
3624
3625fn terminal_metadata_from_marker(
3626    mut metadata: PersistedTask,
3627    marker: ExitMarker,
3628    reason: Option<String>,
3629) -> PersistedTask {
3630    match marker {
3631        ExitMarker::Code(code) => {
3632            let status = if code == 0 {
3633                BgTaskStatus::Completed
3634            } else {
3635                BgTaskStatus::Failed
3636            };
3637            metadata.mark_terminal(status, Some(code), reason);
3638        }
3639        ExitMarker::Killed => metadata.mark_terminal(
3640            BgTaskStatus::Killed,
3641            terminal_exit_code_for_status(&BgTaskStatus::Killed),
3642            reason,
3643        ),
3644    }
3645    metadata
3646}
3647
3648fn terminal_exit_code_for_status(status: &BgTaskStatus) -> Option<i32> {
3649    match status {
3650        BgTaskStatus::TimedOut => Some(124),
3651        BgTaskStatus::Killed => Some(137),
3652        _ => None,
3653    }
3654}
3655
3656#[cfg(unix)]
3657fn write_unix_command_script(command: &str, paths: &TaskPaths) -> Result<PathBuf, String> {
3658    let stem = paths
3659        .json
3660        .file_stem()
3661        .and_then(|stem| stem.to_str())
3662        .unwrap_or("wrapper");
3663    let script_path = paths.dir.join(format!("{stem}.sh"));
3664    fs::write(&script_path, command)
3665        .map_err(|e| format!("failed to write background bash command script: {e}"))?;
3666    Ok(script_path)
3667}
3668
3669#[cfg(unix)]
3670fn detached_shell_command(command_script: &Path, exit_path: &Path) -> Command {
3671    let shell = resolve_posix_shell();
3672    let mut cmd = Command::new(&shell);
3673    // Keep the user-provided command body out of argv and shell `-c` parsing.
3674    // The direct child is still a tiny wrapper so it can write the authoritative
3675    // exit marker after the command script exits (including if that script calls
3676    // `exit`). Passing only file paths through `-c` avoids newline/quote/length
3677    // edge cases where a multi-command script can be mangled before execution.
3678    cmd.arg("-c")
3679        .arg(r#""$0" "$1"; code=$?; printf "%s" "$code" > "$2.tmp.$$"; mv -f "$2.tmp.$$" "$2""#)
3680        .arg(&shell)
3681        .arg(command_script)
3682        .arg(exit_path);
3683    unsafe {
3684        cmd.pre_exec(|| {
3685            if libc::setsid() == -1 {
3686                return Err(std::io::Error::last_os_error());
3687            }
3688            Ok(())
3689        });
3690    }
3691    cmd
3692}
3693
3694#[cfg(unix)]
3695fn resolve_posix_shell() -> PathBuf {
3696    static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
3697    POSIX_SHELL
3698        .get_or_init(|| {
3699            std::env::var_os("BASH")
3700                .filter(|value| !value.is_empty())
3701                .map(PathBuf::from)
3702                .filter(|path| path.exists())
3703                .or_else(|| which::which("bash").ok())
3704                .or_else(|| which::which("zsh").ok())
3705                .unwrap_or_else(|| PathBuf::from("/bin/sh"))
3706        })
3707        .clone()
3708}
3709
/// Builds the Windows command that runs `command` through `shell` by way of
/// a wrapper script written next to the task's other files.
///
/// The wrapper goes to disk and the shell receives only its path as a single
/// argument. That sidesteps Windows command-line quoting entirely (Rust's
/// std-lib quoting, the `cmd /C` parser and the PowerShell `-Command` parser
/// all interacting with embedded quotes): the wrapper's own quotes are read
/// from the file and never reach a command line. The path itself is built
/// from no-space task IDs (`bash-XXXXXXXX`), so it needs no shell escaping.
///
/// `creation_flags` are the Win32 process creation flags; the caller decides
/// whether `CREATE_BREAKAWAY_FROM_JOB` is among them.
///
/// # Errors
///
/// Returns a message when the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    let wrapper_ext = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        // For POSIX shells (git-bash etc.) the extension is cosmetic; `.sh`
        // is what an operator grepping the spill directory would expect.
        WindowsShell::Posix(_) => "sh",
    };
    let stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{stem}.{wrapper_ext}"));

    let wrapper_body = shell.wrapper_script_bytes(command, exit_path);
    fs::write(&wrapper_path, wrapper_body)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    // Arguments that precede the wrapper path for each shell family.
    let leading_args: &[&str] = match shell {
        // `-File` runs the script with no quoting issues; the other switches
        // configure the host before the file runs.
        WindowsShell::Pwsh | WindowsShell::Powershell => &[
            "-NoLogo",
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
        ],
        // `cmd /D /C "<bat-file-path>"`: the batch processor reads the file
        // line by line, so its contents are not re-interpreted by the /C
        // parser. This avoids the "filename syntax incorrect" errors that
        // complex compound commands on the cmd line produced.
        WindowsShell::Cmd => &["/D", "/C"],
        // POSIX shells run the wrapper as `<binary> <wrapper-path>`.
        WindowsShell::Posix(_) => &[],
    };

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(leading_args);
    cmd.arg(&wrapper_path);
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
3792
3793/// Spawn a detached background bash child process.
3794///
/// On Unix this is a single spawn of the shell picked by
/// `resolve_posix_shell` (`$BASH`, then `bash`, then `zsh`, falling back to
/// `/bin/sh`). On Windows it walks `windows_shell::shell_candidates()`
/// (pwsh.exe → powershell.exe → git-bash → cmd.exe) and retries with the
/// next candidate when the previous one
3798/// fails to spawn with `NotFound` — the same runtime safety net the
3799/// foreground bash path has, so issue #27 callers landing on cmd.exe
3800/// fallback can also use background bash. The wrapper script is
3801/// regenerated per attempt because PowerShell wrappers embed the shell
3802/// binary by name; the stdout/stderr capture handles are also reopened
3803/// per attempt because `Command::spawn()` consumes them.
3804///
3805/// Errors other than `NotFound` (PermissionDenied, OutOfMemory, etc.)
3806/// return immediately without retry — they indicate a problem with the
3807/// resolved shell that retrying with a different shell won't fix.
3808fn spawn_detached_child(
3809    command: &str,
3810    paths: &TaskPaths,
3811    workdir: &Path,
3812    env: &HashMap<String, String>,
3813) -> Result<std::process::Child, String> {
3814    #[cfg(not(windows))]
3815    {
3816        let stdout = create_capture_file(&paths.stdout)
3817            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3818        let stderr = create_capture_file(&paths.stderr)
3819            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3820        let command_script = write_unix_command_script(command, paths)?;
3821        detached_shell_command(&command_script, &paths.exit)
3822            .current_dir(workdir)
3823            .envs(env)
3824            .stdin(Stdio::null())
3825            .stdout(Stdio::from(stdout))
3826            .stderr(Stdio::from(stderr))
3827            .spawn()
3828            .map_err(|e| format!("failed to spawn background bash command: {e}"))
3829    }
3830    #[cfg(windows)]
3831    {
3832        use crate::windows_shell::shell_candidates;
3833        // Spawn priority: pwsh → powershell → git-bash → cmd. Same as the
3834        // legacy foreground bash spawn path. v0.20 routes ALL bash through
3835        // this background spawn helper, including foreground tool calls
3836        // where the model writes PowerShell-syntax (`$var = ...`,
3837        // `Start-Sleep`, `Add-Content`) — those fail outright under cmd.
3838        // The earlier v0.18-era cmd-first override worked around a
3839        // PowerShell detached-output bug; that bug is fixed at the
3840        // process-flag layer (CREATE_NO_WINDOW instead of DETACHED_PROCESS,
3841        // see flag block below), so we no longer need to misroute PS
3842        // commands through cmd.
3843        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
3844        // Win32 process creation flags. We try with CREATE_BREAKAWAY_FROM_JOB
3845        // first (so the bg child outlives the AFT process when AFT is killed),
3846        // then fall back without it for environments where the parent is in a
3847        // Job Object that doesn't grant `JOB_OBJECT_LIMIT_BREAKAWAY_OK`. CI
3848        // runners (GitHub Actions windows-2022) and some MDM-managed corp
3849        // environments hit this — `CreateProcess` returns Access Denied (5).
3850        // Without breakaway, the child still runs detached but will be torn
3851        // down with the parent if the parent process group is signaled.
3852        //
3853        // We use CREATE_NO_WINDOW (no visible console window, but the
3854        // child still has a hidden console) rather than DETACHED_PROCESS
3855        // (no console at all). PowerShell-based wrappers that perform
3856        // file I/O via [System.IO.File] need a console handle to flush
3857        // stdout/stderr correctly even when redirected — under
3858        // DETACHED_PROCESS, pwsh sometimes silently exits before
3859        // executing later script statements (the Move-Item that writes
3860        // the exit marker never runs), leaving the bg task forever
3861        // marked Failed: process exited without exit marker. cmd.exe
3862        // wrappers tolerate DETACHED_PROCESS, but switching to
3863        // CREATE_NO_WINDOW costs nothing for cmd and unblocks pwsh.
3864        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
3865        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
3866        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
3867        let with_breakaway =
3868            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
3869        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
3870        let mut last_error: Option<String> = None;
3871        for (idx, shell) in candidates.iter().enumerate() {
3872            // Per-shell, try with breakaway first. If the process is in a
3873            // restrictive job, the breakaway flag triggers Access Denied
3874            // (os error 5). Retry once without breakaway.
3875            for &flags in &[with_breakaway, without_breakaway] {
3876                // Re-open capture handles per attempt; spawn() consumes them.
3877                let stdout = create_capture_file(&paths.stdout)
3878                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3879                let stderr = create_capture_file(&paths.stderr)
3880                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3881                let mut cmd =
3882                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
3883                cmd.current_dir(workdir)
3884                    .envs(env)
3885                    .stdin(Stdio::null())
3886                    .stdout(Stdio::from(stdout))
3887                    .stderr(Stdio::from(stderr));
3888                match cmd.spawn() {
3889                    Ok(child) => {
3890                        if idx > 0 {
3891                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
3892                             the cached PATH probe disagreed with runtime spawn — likely PATH \
3893                             inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
3894                            shell.binary(),
3895                            idx);
3896                        }
3897                        if flags == without_breakaway {
3898                            crate::slog_warn!(
3899                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
3900                             (likely a restrictive Job Object — CI sandbox or MDM policy). \
3901                             Spawned without breakaway; the bg task will be torn down if the \
3902                             AFT process group is killed."
3903                            );
3904                        }
3905                        return Ok(child);
3906                    }
3907                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
3908                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
3909                        shell.binary());
3910                        last_error = Some(format!("{}: {e}", shell.binary()));
3911                        // Skip the without-breakaway retry for NotFound — the
3912                        // binary itself is missing, breakaway flag is irrelevant.
3913                        break;
3914                    }
3915                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
3916                        // Access Denied during breakaway — retry without it.
3917                        crate::slog_warn!(
3918                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
3919                         Access Denied — retrying {} without breakaway",
3920                            shell.binary()
3921                        );
3922                        last_error = Some(format!("{}: {e}", shell.binary()));
3923                        continue;
3924                    }
3925                    Err(e) => {
3926                        return Err(format!(
3927                            "failed to spawn background bash command via {}: {e}",
3928                            shell.binary()
3929                        ));
3930                    }
3931                }
3932            }
3933        }
3934        Err(format!(
3935            "failed to spawn background bash command: no Windows shell could be spawned. \
3936             Last error: {}. PATH-probed candidates: {:?}",
3937            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
3938            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
3939        ))
3940    }
3941}
3942
3943fn random_slug() -> String {
3944    // 8 bytes = 64-bit entropy → `bash-{16hex}`, matching the documented contract
3945    // at `generate_unique_task_id`. The width is load-bearing for the subc
3946    // delivery dedup: a plugin can retain a delivered task id awaiting ack that
3947    // Rust has already dropped (a lost ack response), and Rust's uniqueness check
3948    // cannot see that plugin-side set — so id reuse must be made negligible by
3949    // entropy alone. 32-bit was reusable within a long session and could let a new
3950    // task collide with such a stale id and be silently skipped (audit R3 #3).
3951    let mut bytes = [0u8; 8];
3952    // getrandom is a transitive dependency; use it directly for OS entropy.
3953    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3954        // Extremely unlikely fallback: time + pid mix across all 8 bytes.
3955        let t = SystemTime::now()
3956            .duration_since(UNIX_EPOCH)
3957            .map(|d| d.as_nanos() as u64)
3958            .unwrap_or(0);
3959        let p = u64::from(std::process::id());
3960        bytes.copy_from_slice(&(t ^ p.rotate_left(32)).to_le_bytes());
3961    });
3962    // `bash-` + 16 lowercase hex chars — compact, OS-entropy backed.
3963    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3964    format!("bash-{hex}")
3965}
3966
3967#[cfg(test)]
3968mod tests {
3969    use std::collections::HashMap;
3970    use std::fs;
3971    use std::sync::atomic::{AtomicBool, AtomicUsize};
3972    use std::sync::{Arc, Mutex};
3973    use std::time::Duration;
3974    #[cfg(windows)]
3975    use std::time::Instant;
3976
3977    use super::*;
3978
    // Shell command that finishes immediately with a zero exit status, one
    // definition per platform.
    #[cfg(unix)]
    const QUICK_SUCCESS_COMMAND: &str = "true";
    #[cfg(windows)]
    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";

    // Shell command meant to stay alive for about five seconds, one definition
    // per platform.
    // NOTE(review): Windows `timeout` is reported to exit immediately when its
    // stdin is redirected, and the spawn helper in this file passes
    // `Stdio::null()` as stdin — confirm this really waits ~5s on Windows.
    #[cfg(unix)]
    const LONG_RUNNING_COMMAND: &str = "sleep 5";
    #[cfg(windows)]
    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3988
3989    #[test]
3990    fn gh_structured_detection_rejects_piped_commands() {
3991        assert!(is_gh_structured_command(
3992            "gh issue list --json number,title"
3993        ));
3994        assert!(is_gh_structured_command(
3995            "cd repo && gh issue list --json number,title"
3996        ));
3997
3998        assert!(!is_gh_structured_command(
3999            "gh issue list --json number,title | jq '.[]'"
4000        ));
4001        assert!(!is_gh_structured_command(
4002            "gh issue list --json number,title |"
4003        ));
4004    }
4005
4006    fn insert_terminal_piped_task(
4007        registry: &BgTaskRegistry,
4008        dir: &tempfile::TempDir,
4009        command: &str,
4010        stdout: &str,
4011        stderr: &str,
4012        compressed: bool,
4013    ) -> (String, Arc<BgTask>) {
4014        let task_id = format!("bash-test-{}", random_slug());
4015        let paths = task_paths(dir.path(), "session", &task_id);
4016        fs::create_dir_all(&paths.dir).unwrap();
4017        fs::write(&paths.stdout, stdout).unwrap();
4018        fs::write(&paths.stderr, stderr).unwrap();
4019        let mut metadata = PersistedTask::starting(
4020            task_id.clone(),
4021            "session".to_string(),
4022            command.to_string(),
4023            dir.path().to_path_buf(),
4024            Some(dir.path().to_path_buf()),
4025            Some(30_000),
4026            true,
4027            compressed,
4028        );
4029        metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
4030        write_task(&paths.json, &metadata).unwrap();
4031        registry
4032            .insert_rehydrated_task(metadata, paths, true)
4033            .expect("insert terminal task");
4034        let task = registry.task_for_session(&task_id, "session").unwrap();
4035        (task_id, task)
4036    }
4037
4038    fn insert_terminal_pty_task(
4039        registry: &BgTaskRegistry,
4040        dir: &tempfile::TempDir,
4041        pty_output: &str,
4042    ) -> (String, Arc<BgTask>) {
4043        let task_id = format!("bash-test-{}", random_slug());
4044        let paths = task_paths(dir.path(), "session", &task_id);
4045        fs::create_dir_all(&paths.dir).unwrap();
4046        fs::write(&paths.pty, pty_output).unwrap();
4047        let mut metadata = PersistedTask::starting(
4048            task_id.clone(),
4049            "session".to_string(),
4050            "python".to_string(),
4051            dir.path().to_path_buf(),
4052            Some(dir.path().to_path_buf()),
4053            Some(30_000),
4054            true,
4055            true,
4056        );
4057        metadata.mode = BgMode::Pty;
4058        metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
4059        write_task(&paths.json, &metadata).unwrap();
4060        registry
4061            .insert_rehydrated_task(metadata, paths, true)
4062            .expect("insert terminal pty task");
4063        let task = registry.task_for_session(&task_id, "session").unwrap();
4064        (task_id, task)
4065    }
4066
4067    #[cfg(unix)]
4068    fn wait_for_terminal_snapshot(
4069        registry: &BgTaskRegistry,
4070        task_id: &str,
4071        session_id: &str,
4072        project: &Path,
4073        storage: &Path,
4074    ) -> BgTaskSnapshot {
4075        let started = Instant::now();
4076        loop {
4077            let snapshot = registry
4078                .status(task_id, session_id, Some(project), Some(storage), 4096)
4079                .expect("spawned task should be visible to status");
4080            if snapshot.info.status.is_terminal() {
4081                return snapshot;
4082            }
4083            assert!(
4084                started.elapsed() < Duration::from_secs(10),
4085                "timed out waiting for task {task_id} to finish; last status={:?}",
4086                snapshot.info.status
4087            );
4088            std::thread::sleep(Duration::from_millis(50));
4089        }
4090    }
4091
4092    fn write_running_project_task(storage: &Path, project: &Path, session: &str, task_id: &str) {
4093        let paths = task_paths(storage, session, task_id);
4094        let mut metadata = PersistedTask::starting(
4095            task_id.to_string(),
4096            session.to_string(),
4097            "sleep 60".to_string(),
4098            project.to_path_buf(),
4099            Some(project.to_path_buf()),
4100            Some(30_000),
4101            true,
4102            true,
4103        );
4104        metadata.status = BgTaskStatus::Running;
4105        write_task(&paths.json, &metadata).unwrap();
4106        fs::write(&paths.stdout, "still running\n").unwrap();
4107        fs::write(&paths.stderr, "").unwrap();
4108    }
4109
4110    #[test]
4111    fn status_replay_filters_same_session_by_project_root() {
4112        let project_a = tempfile::tempdir().unwrap();
4113        let project_b = tempfile::tempdir().unwrap();
4114        let storage = tempfile::tempdir().unwrap();
4115        let session = "shared-session";
4116        let task_id = "bash-project-a";
4117        write_running_project_task(storage.path(), project_a.path(), session, task_id);
4118
4119        let actor_b = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4120        assert!(actor_b
4121            .status(
4122                task_id,
4123                session,
4124                Some(project_b.path()),
4125                Some(storage.path()),
4126                1024,
4127            )
4128            .is_none());
4129        assert!(actor_b.task_for_session(task_id, session).is_none());
4130
4131        let actor_a = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4132        let snapshot = actor_a
4133            .status(
4134                task_id,
4135                session,
4136                Some(project_a.path()),
4137                Some(storage.path()),
4138                1024,
4139            )
4140            .expect("owning project should replay its task");
4141        assert_eq!(snapshot.info.status, BgTaskStatus::Running);
4142    }
4143
4144    #[cfg(unix)]
4145    #[test]
4146    fn multiline_pipeline_stdout_persists_all_lines_after_terminal_status() {
4147        let cases = [
4148            (
4149                "long-first",
4150                "sleep 0.5; printf 'one\\n' | cat\nprintf 'two\\n' | grep -c two\nprintf 'three\\n' | cat",
4151                vec!["one", "1", "three"],
4152            ),
4153            (
4154                "short-first",
4155                "printf 'one\\n' | cat\nsleep 0.2; printf 'two\\n' | grep -c two\nprintf 'three\\n' | cat",
4156                vec!["one", "1", "three"],
4157            ),
4158            (
4159                "failing-middle",
4160                "sleep 0.2; printf 'one\\n' | cat\nfalse; printf 'after-false\\n' | cat\nprintf 'three\\n' | cat",
4161                vec!["one", "after-false", "three"],
4162            ),
4163        ];
4164
4165        for (name, command, expected_lines) in cases {
4166            let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4167            let dir = tempfile::tempdir().unwrap();
4168            let session_id = format!("session-{name}");
4169            let task_id = registry
4170                .spawn(
4171                    command,
4172                    session_id.clone(),
4173                    dir.path().to_path_buf(),
4174                    HashMap::new(),
4175                    Some(Duration::from_secs(30)),
4176                    dir.path().to_path_buf(),
4177                    10,
4178                    true,
4179                    true,
4180                    Some(dir.path().to_path_buf()),
4181                )
4182                .unwrap();
4183
4184            let snapshot = wait_for_terminal_snapshot(
4185                &registry,
4186                &task_id,
4187                &session_id,
4188                dir.path(),
4189                dir.path(),
4190            );
4191            assert_eq!(
4192                snapshot.info.status,
4193                BgTaskStatus::Completed,
4194                "{name}: task should complete; snapshot={snapshot:?}"
4195            );
4196            assert_eq!(
4197                snapshot.exit_code,
4198                Some(0),
4199                "{name}: script should use the final command's exit code"
4200            );
4201
4202            let stdout_path = task_paths(dir.path(), &session_id, &task_id).stdout;
4203            let stdout = fs::read_to_string(&stdout_path).unwrap_or_else(|error| {
4204                panic!(
4205                    "{name}: failed to read raw stdout file {}: {error}",
4206                    stdout_path.display()
4207                )
4208            });
4209            let lines: Vec<&str> = stdout.lines().collect();
4210            assert_eq!(
4211                lines,
4212                expected_lines,
4213                "{name}: raw stdout file must include every newline-separated command's output; stdout_path={}",
4214                stdout_path.display()
4215            );
4216        }
4217    }
4218
4219    #[test]
4220    fn recognizes_all_recovery_marker_forms() {
4221        assert!(is_recovery_marker(
4222            "[truncated output; full output: read \"/tmp/out\"]"
4223        ));
4224        assert!(is_recovery_marker(
4225            "[omitted output; see remaining: tail -n +42 \"/tmp/out\"]"
4226        ));
4227        assert!(is_recovery_marker(
4228            "[truncated output; full output unavailable]"
4229        ));
4230        assert!(is_recovery_marker(
4231            r#"[truncated 123 bytes from saved output prefix; retained output: read "/tmp/out"]"#
4232        ));
4233    }
4234
4235    #[test]
4236    fn recovery_marker_reports_disk_prefix_truncation_as_retained_output() {
4237        let recovery = RecoveryContext {
4238            dropped_by_class: BTreeMap::new(),
4239            had_inner_drop: false,
4240            offset_hint_eligible: false,
4241            offset_start_line: None,
4242            byte_truncated: false,
4243            disk_truncated_prefix_bytes: 4096,
4244            output_path: Some("/tmp/stdout".to_string()),
4245            stderr_path: None,
4246            include_stderr_path: false,
4247        };
4248
4249        let marker = recovery_marker(&recovery).expect("disk truncation must emit marker");
4250
4251        assert!(marker.contains("truncated 4096 bytes from saved output prefix"));
4252        assert!(marker.contains(r#"retained output: read "/tmp/stdout""#));
4253        assert!(!marker.contains("full output: read"));
4254    }
4255
4256    #[test]
4257    fn killed_exit_marker_sets_nonzero_sentinel_exit_code() {
4258        let metadata = PersistedTask::starting(
4259            "task".to_string(),
4260            "session".to_string(),
4261            "cargo test".to_string(),
4262            PathBuf::from("/tmp"),
4263            None,
4264            None,
4265            true,
4266            true,
4267        );
4268
4269        let terminal = terminal_metadata_from_marker(metadata, ExitMarker::Killed, None);
4270
4271        assert_eq!(terminal.status, BgTaskStatus::Killed);
4272        assert_eq!(terminal.exit_code, Some(137));
4273    }
4274
4275    #[test]
4276    fn terminal_status_polls_use_cached_render_once_and_off_lock() {
4277        let registry = BgTaskRegistry::default();
4278        let dir = tempfile::tempdir().unwrap();
4279        let (_task_id, task) = insert_terminal_piped_task(
4280            &registry,
4281            &dir,
4282            "custom-tool --verbose",
4283            &"stdout line\n".repeat(200_000),
4284            "",
4285            true,
4286        );
4287        let calls = Arc::new(AtomicUsize::new(0));
4288        let saw_unlocked_state = Arc::new(AtomicBool::new(false));
4289        let task_holder = Arc::new(Mutex::new(Some(Arc::clone(&task))));
4290        let calls_for_closure = Arc::clone(&calls);
4291        let unlocked_for_closure = Arc::clone(&saw_unlocked_state);
4292        let task_for_closure = Arc::clone(&task_holder);
4293        registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4294            calls_for_closure.fetch_add(1, Ordering::SeqCst);
4295            if let Some(task) = task_for_closure.lock().unwrap().as_ref() {
4296                if task.state.try_lock().is_ok() {
4297                    unlocked_for_closure.store(true, Ordering::SeqCst);
4298                }
4299            }
4300            CompressionResult::new(format!("compressed {} bytes", output.len()))
4301        });
4302
4303        let first = registry
4304            .status(
4305                &task.task_id,
4306                "session",
4307                None,
4308                Some(dir.path()),
4309                RUNNING_OUTPUT_PREVIEW_BYTES,
4310            )
4311            .unwrap();
4312        let second = registry
4313            .status(
4314                &task.task_id,
4315                "session",
4316                None,
4317                Some(dir.path()),
4318                RUNNING_OUTPUT_PREVIEW_BYTES,
4319            )
4320            .unwrap();
4321        let listed = registry.list(RUNNING_OUTPUT_PREVIEW_BYTES);
4322
4323        assert_eq!(
4324            calls.load(Ordering::SeqCst),
4325            1,
4326            "terminal render must be cached"
4327        );
4328        assert!(
4329            saw_unlocked_state.load(Ordering::SeqCst),
4330            "compressor must run after releasing the task state lock"
4331        );
4332        assert!(first.output_preview.starts_with("compressed "));
4333        assert_eq!(second.output_preview, first.output_preview);
4334        assert_eq!(listed[0].output_preview, first.output_preview);
4335    }
4336
4337    #[test]
4338    fn completion_preview_success_keeps_tail_only() {
4339        // Exit-aware completion previews: a SUCCESSFUL task's reminder keeps a
4340        // short tail only — head context is noise when the command worked
4341        // (regression: the uniform 4 KiB head+tail cap flooded reminders with
4342        // ~1K tokens of build noise per completed task).
4343        let registry = BgTaskRegistry::default();
4344        let dir = tempfile::tempdir().unwrap();
4345        let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4346        let (_task_id, task) =
4347            insert_terminal_piped_task(&registry, &dir, "cat big.log", &output, "", false);
4348
4349        registry.post_terminal_transition(&task, true).unwrap();
4350        let completions = registry.drain_completions_for_session(Some("session"));
4351        assert_eq!(completions.len(), 1);
4352        let preview = &completions[0].output_preview;
4353        assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4354        assert!(!preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4355        assert!(completions[0].output_truncated);
4356    }
4357
4358    #[test]
4359    fn completion_preview_failure_keeps_head_and_tail() {
4360        // A FAILED task keeps a small head (first error / command banner) plus
4361        // a larger tail (tracebacks and summaries land at the end).
4362        let registry = BgTaskRegistry::default();
4363        let dir = tempfile::tempdir().unwrap();
4364        let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4365        let task_id = format!("bash-test-{}", random_slug());
4366        let paths = task_paths(dir.path(), "session", &task_id);
4367        fs::create_dir_all(&paths.dir).unwrap();
4368        fs::write(&paths.stdout, &output).unwrap();
4369        fs::write(&paths.stderr, "").unwrap();
4370        let mut metadata = PersistedTask::starting(
4371            task_id.clone(),
4372            "session".to_string(),
4373            "cat big.log".to_string(),
4374            dir.path().to_path_buf(),
4375            Some(dir.path().to_path_buf()),
4376            Some(30_000),
4377            true,
4378            false,
4379        );
4380        metadata.mark_terminal(BgTaskStatus::Failed, Some(1), None);
4381        write_task(&paths.json, &metadata).unwrap();
4382        registry
4383            .insert_rehydrated_task(metadata, paths, true)
4384            .expect("insert terminal task");
4385        let task = registry.task_for_session(&task_id, "session").unwrap();
4386
4387        registry.post_terminal_transition(&task, true).unwrap();
4388        let completions = registry.drain_completions_for_session(Some("session"));
4389        assert_eq!(completions.len(), 1);
4390        let preview = &completions[0].output_preview;
4391        assert!(preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4392        assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4393    }
4394
4395    #[test]
4396    fn has_completions_for_session_matches_pending_delivery() {
4397        let registry = BgTaskRegistry::default();
4398        assert!(!registry.has_completions_for_session(Some("session")));
4399        assert!(!registry.has_completions_for_session(None));
4400
4401        let dir = tempfile::tempdir().unwrap();
4402        let (_task_id, task) =
4403            insert_terminal_piped_task(&registry, &dir, QUICK_SUCCESS_COMMAND, "done\n", "", false);
4404        registry.post_terminal_transition(&task, true).unwrap();
4405
4406        assert!(registry.has_completions_for_session(Some("session")));
4407        assert!(registry.has_completions_for_session(None));
4408        assert!(!registry.has_completions_for_session(Some("other-session")));
4409
4410        let completions = registry.drain_completions_for_session(Some("session"));
4411        assert_eq!(completions.len(), 1);
4412        assert_eq!(completions[0].task_id, task.task_id);
4413    }
4414
4415    #[test]
4416    fn structured_gh_json_survives_intact_and_ignores_stderr() {
4417        let registry = BgTaskRegistry::default();
4418        let dir = tempfile::tempdir().unwrap();
4419        let calls = Arc::new(AtomicUsize::new(0));
4420        let calls_for_closure = Arc::clone(&calls);
4421        registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4422            calls_for_closure.fetch_add(1, Ordering::SeqCst);
4423            CompressionResult::new(output)
4424        });
4425        let (task_id, _task) = insert_terminal_piped_task(
4426            &registry,
4427            &dir,
4428            "gh pr view 123 --json body",
4429            "{\"body\":\"hello\"}",
4430            "warning: stderr must not join json",
4431            true,
4432        );
4433
4434        let snapshot = registry
4435            .status(
4436                &task_id,
4437                "session",
4438                None,
4439                Some(dir.path()),
4440                RUNNING_OUTPUT_PREVIEW_BYTES,
4441            )
4442            .unwrap();
4443
4444        assert_eq!(snapshot.output_preview, "{\"body\":\"hello\"}");
4445        assert!(!snapshot.output_preview.contains("warning"));
4446        assert!(!snapshot.output_truncated);
4447        assert_eq!(
4448            calls.load(Ordering::SeqCst),
4449            0,
4450            "structured JSON bypasses compression"
4451        );
4452    }
4453
4454    #[test]
4455    fn registry_emits_single_recovery_marker_for_class_drops() {
4456        let registry = BgTaskRegistry::default();
4457        let dir = tempfile::tempdir().unwrap();
4458        registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4459            let mut dropped = BTreeMap::new();
4460            dropped.insert(DropClass::Error, 18);
4461            dropped.insert(DropClass::Warning, 6);
4462            CompressionResult::with_class_drops("kept diagnostic", dropped)
4463        });
4464        let (task_id, task) =
4465            insert_terminal_piped_task(&registry, &dir, "custom-tool", "raw", "", true);
4466
4467        let snapshot = registry
4468            .status(
4469                &task_id,
4470                "session",
4471                None,
4472                Some(dir.path()),
4473                RUNNING_OUTPUT_PREVIEW_BYTES,
4474            )
4475            .unwrap();
4476
4477        assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4478        assert!(snapshot.output_preview.contains("+18 more errors"));
4479        assert!(snapshot.output_preview.contains("+6 more warnings"));
4480        assert!(snapshot
4481            .output_preview
4482            .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4483        assert!(!snapshot.output_preview.contains("tail -n +"));
4484        assert!(snapshot.output_truncated);
4485    }
4486
4487    #[test]
4488    fn registry_marker_reports_semantic_and_byte_drops_once() {
4489        let registry = BgTaskRegistry::default();
4490        let dir = tempfile::tempdir().unwrap();
4491        registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4492            let mut dropped = BTreeMap::new();
4493            dropped.insert(DropClass::Error, 1);
4494            CompressionResult::with_class_drops(
4495                format!("HEAD-SIGNAL\n{}TAIL-SIGNAL", "middle\n".repeat(8_000)),
4496                dropped,
4497            )
4498        });
4499        let (task_id, _task) =
4500            insert_terminal_piped_task(&registry, &dir, "custom-tool", "raw", "", true);
4501
4502        let snapshot = registry
4503            .status(
4504                &task_id,
4505                "session",
4506                None,
4507                Some(dir.path()),
4508                RUNNING_OUTPUT_PREVIEW_BYTES,
4509            )
4510            .unwrap();
4511
4512        assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4513        assert!(snapshot.output_preview.contains("+1 more error"));
4514        assert!(snapshot.output_preview.contains("truncated output"));
4515        assert!(snapshot.output_preview.contains("HEAD-SIGNAL"));
4516        assert!(snapshot.output_preview.contains("TAIL-SIGNAL"));
4517        assert!(!snapshot.output_preview.contains("...<truncated"));
4518        assert!(snapshot.output_truncated);
4519    }
4520
/// A `cargo check` task with 22 error diagnostics on stderr must produce a
/// preview that carries the "+2 more errors" summary and `read "<path>"`
/// hints for BOTH capture files, and never a `tail -n +` offset hint.
#[test]
fn cargo_stderr_class_drops_name_both_capture_paths() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let filters = crate::compress::toml_filter::FilterRegistry::default();
    registry.set_compressor_with_exit_code(move |cmd, raw, code| {
        crate::compress::compress_with_registry_exit_code(cmd, &raw, code, &filters)
    });

    // Build 22 rustc-style diagnostics; each already ends in a newline and
    // the join adds one more between neighbours.
    let mut diagnostics = Vec::new();
    for index in 0..22 {
        let line = index + 1;
        diagnostics.push(format!(
            "error: cargo failure {index}\n  --> src/lib.rs:{}:1\n   |\n{} | boom\n",
            line, line
        ));
    }
    let stderr = diagnostics.join("\n");

    let (task_id, task) = insert_terminal_piped_task(
        &registry,
        &dir,
        "cargo check",
        "Finished dev [unoptimized] target(s) in 0.01s\n",
        &stderr,
        true,
    );

    let snapshot = registry
        .status(
            &task_id,
            "session",
            None,
            Some(dir.path()),
            RUNNING_OUTPUT_PREVIEW_BYTES,
        )
        .unwrap();
    let preview = &snapshot.output_preview;

    let stdout_hint = format!("read \"{}\"", task.paths.stdout.display());
    let stderr_hint = format!("read \"{}\"", task.paths.stderr.display());

    assert!(preview.contains("+2 more errors"));
    assert!(preview.contains(&stdout_hint));
    assert!(preview.contains(&stderr_hint));
    assert!(!preview.contains("tail -n +"));
}
4572
/// A ~60 KiB JSON document on stdout must be replaced by a
/// "[JSON output ..." pointer that names the stdout capture file, instead of
/// a cut-off (and therefore invalid) JSON fragment.
#[test]
fn over_ceiling_structured_json_uses_pointer_not_partial_json() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();

    let filler = "x".repeat(60 * 1024);
    let body = format!("{{\"body\":\"{}\"}}", filler);
    let (task_id, task) = insert_terminal_piped_task(
        &registry,
        &dir,
        "cd /repo && gh pr view 123 --json body",
        &body,
        "",
        true,
    );

    let snapshot = registry
        .status(
            &task_id,
            "session",
            None,
            Some(dir.path()),
            RUNNING_OUTPUT_PREVIEW_BYTES,
        )
        .unwrap();
    let preview = &snapshot.output_preview;

    let stdout_path = task.paths.stdout.display().to_string();
    // A 1 KiB run of the filler byte would mean raw JSON leaked through.
    let leaked_payload = "x".repeat(1024);

    assert!(preview.starts_with("[JSON output "));
    assert!(preview.contains(&stdout_path));
    assert!(!preview.contains(&leaked_payload));
    assert!(snapshot.output_truncated);
}
4604
/// With the builtin filters installed, a `make all` run must keep the last
/// output line in the preview and advertise the stdout capture through a
/// `full output: read "<path>"` hint. It must not mention the stderr capture
/// or a `tail -n +` offset.
#[test]
fn toml_strip_tail_cap_uses_full_output_hint_not_offset_hint() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let filters = crate::compress::toml_filter::build_registry(
        crate::compress::builtin_filters::ALL,
        None,
        None,
    );
    registry.set_compressor_with_exit_code(move |cmd, raw, code| {
        crate::compress::compress_with_registry_exit_code(cmd, &raw, code, &filters)
    });

    // One make banner line followed by 100 numbered compile lines.
    let compile_lines = (0..100)
        .map(|index| format!("compile line {index}"))
        .collect::<Vec<_>>()
        .join("\n");
    let stdout = format!("make[1]: Entering directory `/tmp`\n{}", compile_lines);

    let (task_id, task) =
        insert_terminal_piped_task(&registry, &dir, "make all", &stdout, "", true);

    let snapshot = registry
        .status(
            &task_id,
            "session",
            None,
            Some(dir.path()),
            RUNNING_OUTPUT_PREVIEW_BYTES,
        )
        .unwrap();
    let preview = &snapshot.output_preview;

    let full_output_hint = format!(
        "full output: read \"{}\"",
        task.paths.stdout.display()
    );
    let stderr_hint = format!("read \"{}\"", task.paths.stderr.display());

    assert!(preview.contains("compile line 99"));
    assert!(preview.contains(&full_output_hint));
    assert!(!preview.contains(&stderr_hint));
    assert!(!preview.contains("tail -n +"));
}
4652
/// When the helper's final flag is `false`, a large raw log must come back as
/// a head+tail preview: both sentinels survive, a "truncated output" marker
/// and `read "<path>"` hints for both capture files are present, no
/// `tail -n +` hint appears, and the preview is larger than 16 KiB.
#[test]
fn compressed_false_raw_passthrough_uses_wider_head_tail_cap() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();

    let middle = "raw-middle\n".repeat(8_000);
    let output = format!("RAW-HEAD\n{}RAW-TAIL\n", middle);
    let (task_id, task) =
        insert_terminal_piped_task(&registry, &dir, "cat raw.log", &output, "RAW-ERR\n", false);

    let snapshot = registry
        .status(
            &task_id,
            "session",
            None,
            Some(dir.path()),
            RUNNING_OUTPUT_PREVIEW_BYTES,
        )
        .unwrap();
    let preview = &snapshot.output_preview;

    let stdout_hint = format!("read \"{}\"", task.paths.stdout.display());
    let stderr_hint = format!("read \"{}\"", task.paths.stderr.display());

    assert!(preview.contains("RAW-HEAD"));
    assert!(preview.contains("RAW-TAIL"));
    assert!(preview.contains("truncated output"));
    assert!(preview.contains(&stdout_hint));
    assert!(preview.contains(&stderr_hint));
    assert!(!preview.contains("tail -n +"));
    assert!(preview.len() > 16 * 1024);
    assert!(snapshot.output_truncated);
}
4684
/// A terminal PTY task must never be routed through the compressor: the
/// counting compressor installed here has to stay at zero calls, and the
/// snapshot reports PTY mode with an empty preview.
#[test]
fn pty_terminal_snapshot_bypasses_line_compression() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();

    // Count every compressor invocation; the compressor itself is a pass-through.
    let calls = Arc::new(AtomicUsize::new(0));
    let counter = Arc::clone(&calls);
    registry.set_compressor_with_exit_code(move |_cmd, raw, _code| {
        counter.fetch_add(1, Ordering::SeqCst);
        CompressionResult::new(raw)
    });

    let (task_id, _task) = insert_terminal_pty_task(&registry, &dir, "raw\u{1b}[31m pty bytes");

    let snapshot = registry
        .status(
            &task_id,
            "session",
            None,
            Some(dir.path()),
            RUNNING_OUTPUT_PREVIEW_BYTES,
        )
        .unwrap();

    assert_eq!(snapshot.info.mode, BgMode::Pty);
    assert_eq!(snapshot.output_preview, "");
    assert_eq!(calls.load(Ordering::SeqCst), 0);
}
4711
/// The rows/cols passed to `spawn_pty` (50 x 120) must be written to the
/// task's persisted JSON, together with the current schema version and PTY
/// mode, and must be echoed back by `status`.
#[test]
fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path();

    let task_id = registry
        .spawn_pty(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            root.to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.to_path_buf(),
            10,
            true,
            false,
            Some(root.to_path_buf()),
            50,
            120,
        )
        .unwrap();

    // What landed on disk.
    let paths = task_paths(root, "session", &task_id);
    let persisted = read_task(&paths.json).unwrap();
    assert_eq!(
        persisted.schema_version,
        crate::bash_background::persistence::SCHEMA_VERSION
    );
    assert_eq!(persisted.mode, BgMode::Pty);
    assert_eq!(persisted.pty_rows, Some(50));
    assert_eq!(persisted.pty_cols, Some(120));

    // What the registry reports back.
    let snapshot = registry
        .status(&task_id, "session", None, Some(root), 1024)
        .unwrap();
    assert_eq!(snapshot.pty_rows, Some(50));
    assert_eq!(snapshot.pty_cols, Some(120));
}
4749
/// Launches a trivially short process and hands back its handle once the
/// process has terminated.
///
/// The reap_child tests use this as a stand-in for the "child exists and is
/// dead" state after the watchdog has already cleared the original handle.
///
/// Panics if the process cannot be spawned, if `try_wait()` errors, or if
/// the process is still alive after five seconds.
fn spawn_dead_child() -> std::process::Child {
    #[cfg(unix)]
    let mut command = std::process::Command::new("true");
    #[cfg(windows)]
    let mut command = {
        let mut shell = std::process::Command::new("cmd");
        shell.args(["/c", "exit", "0"]);
        shell
    };
    command
        .stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null());
    let mut child = command
        .spawn()
        .expect("spawn replacement child for reap test");

    // Deliberately poll with try_wait() rather than block in wait(): per the
    // original author's note, wait() closes the OS handle on Windows and
    // later try_wait() calls (which reap_child relies on) then return Err.
    // Polling leaves the handle usable and mirrors production, where the
    // watchdog is the first to observe the exit.
    let poll_started = Instant::now();
    loop {
        match child.try_wait() {
            Ok(Some(_)) => return child,
            Ok(None) if poll_started.elapsed() > Duration::from_secs(5) => {
                panic!("dead-child stand-in did not exit within 5s");
            }
            Ok(None) => std::thread::sleep(Duration::from_millis(10)),
            Err(error) => panic!("dead-child try_wait failed: {error}"),
        }
    }
}
4790
/// Acking a task must persist `completion_delivered` even when its entry has
/// already vanished from the in-memory completion queue, and a registry
/// replayed from disk afterwards must not re-queue that completion.
#[test]
fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path();

    let task_id = registry
        .spawn(
            LONG_RUNNING_COMMAND,
            "session".to_string(),
            root.to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.to_path_buf(),
            10,
            true,
            false,
            Some(root.to_path_buf()),
        )
        .unwrap();
    registry
        .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
        .unwrap();

    let drained_count = registry
        .drain_completions_for_session(Some("session"))
        .len();
    assert_eq!(drained_count, 1);

    // Stand-in for the plugin consuming a sync bash_watch({ exit:true })
    // result locally before the Rust completion queue is drained/acked.
    registry.inner.completions.lock().unwrap().clear();

    let acked =
        registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id));
    assert_eq!(acked, vec![task_id.clone()]);
    assert!(registry
        .drain_completions_for_session(Some("session"))
        .is_empty());

    // The delivery flag must have reached the on-disk record.
    let paths = task_paths(root, "session", &task_id);
    let persisted = read_task(&paths.json).unwrap();
    assert!(persisted.completion_delivered);

    // A brand-new registry replaying the session must stay quiet.
    let replayed = BgTaskRegistry::default();
    replayed
        .replay_session_inner(root, "session", None)
        .unwrap();
    assert!(replayed
        .drain_completions_for_session(Some("session"))
        .is_empty());
}
4843
/// Registering a watch against a task id the registry has never seen must
/// fail with the `"task_not_found"` error.
#[test]
fn register_watch_rejects_unknown_task() {
    let registry = BgTaskRegistry::default();
    let pattern = WatchPattern::Substring("READY".into());

    assert_eq!(
        registry.register_watch("missing-task".to_string(), pattern, true),
        Err("task_not_found")
    );
}
4856
/// Registering a watch on an already-terminal task must scan the output
/// already captured on disk: a pattern-match frame is pushed for the existing
/// "READY" text, no watch remains active, and the persisted task record ends
/// up with `completion_delivered` set.
#[test]
fn register_watch_on_terminal_task_scans_existing_output() {
    // Progress sender that records every pushed frame for later inspection.
    let frames = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&frames);
    let sender: crate::context::ProgressSender = Arc::new(Box::new(move |frame| {
        sink.lock().unwrap().push(frame);
    })
        as Box<dyn Fn(PushFrame) + Send + Sync>);
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path();

    let task_id = registry
        .spawn(
            LONG_RUNNING_COMMAND,
            "session".to_string(),
            root.to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.to_path_buf(),
            10,
            true,
            false,
            Some(root.to_path_buf()),
        )
        .unwrap();
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);

    // Plant the text the watch will look for, then make the task terminal.
    let task = registry.task_for_session(&task_id, "session").unwrap();
    std::fs::write(&task.paths.stdout, "READY\n").unwrap();
    registry
        .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
        .unwrap();

    // Discard everything produced so far so only the registration's own
    // frames are inspected below.
    frames.lock().unwrap().clear();
    registry.inner.completions.lock().unwrap().clear();

    registry
        .register_watch(
            task_id.clone(),
            WatchPattern::Substring("READY".into()),
            true,
        )
        .unwrap();

    let recorded = frames.lock().unwrap();
    let hit = recorded
        .iter()
        .find_map(|frame| {
            if let PushFrame::BashPatternMatch(inner) = frame {
                Some(inner)
            } else {
                None
            }
        })
        .expect("terminal watch registration should emit pattern frame");
    assert_eq!(hit.reason, "pattern_match");
    assert_eq!(hit.task_id, task_id);
    assert_eq!(hit.session_id, "session");
    assert_eq!(hit.match_text, "READY");
    assert_eq!(hit.match_offset, 0);
    assert_eq!(registry.active_watch_count(&hit.task_id), 0);

    let persisted = read_task(&task.paths.json).unwrap();
    assert!(persisted.completion_delivered);
}
4918
/// Once a killed task's completion has been drained and acked,
/// `cleanup_finished` with a zero threshold must remove it from the
/// registry's task map.
#[test]
fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path();

    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            root.to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.to_path_buf(),
            10,
            true,
            false,
            Some(root.to_path_buf()),
        )
        .unwrap();
    registry
        .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
        .unwrap();

    // Deliver the completion: drain it, then acknowledge it.
    let completions = registry.drain_completions_for_session(Some("session"));
    assert_eq!(completions.len(), 1);
    let acked =
        registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id));
    assert_eq!(acked, vec![task_id.clone()]);

    registry.cleanup_finished(Duration::ZERO);

    assert!(registry.inner.tasks.lock().unwrap().is_empty());
}
4951
/// A killed task whose completion was never drained or acked must still be
/// in the task map after `cleanup_finished`, even with a zero threshold.
#[test]
fn cleanup_finished_retains_undelivered_terminals() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path();

    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            root.to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.to_path_buf(),
            10,
            true,
            false,
            Some(root.to_path_buf()),
        )
        .unwrap();
    registry
        .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
        .unwrap();

    // No drain and no ack here: the completion stays undelivered.
    registry.cleanup_finished(Duration::ZERO);

    let still_tracked = registry.inner.tasks.lock().unwrap().contains_key(&task_id);
    assert!(still_tracked);
}
4978
/// Exercises the live watchdog path (`reap_child`) for a child that exited
/// without leaving an exit marker: the first pass only releases the child
/// handle and waits, and the second pass marks the task Failed because the
/// marker is still missing.
///
/// Cross-platform: a quick-exiting command is spawned and its exit marker is
/// removed by hand afterwards, simulating a wrapper that crashed before it
/// could write the marker.
#[test]
fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            root.to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.to_path_buf(),
            10,
            true,
            false,
            Some(root.to_path_buf()),
        )
        .unwrap();

    let task = registry.task_for_session(&task_id, "session").unwrap();

    // Block (for at most 5s, ample for `true` / `cmd /c exit 0`) until the
    // child has really exited. A runtime slot that no longer holds a child
    // counts as "exited" too.
    let child_has_exited = || {
        let mut guard = task.state.lock().unwrap();
        match &mut guard.runtime {
            TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
            _ => true,
        }
    };
    let poll_started = Instant::now();
    while !child_has_exited() {
        assert!(
            poll_started.elapsed() < Duration::from_secs(5),
            "child should exit quickly"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // Halt the watchdog so it cannot compete with the manual reap_child
    // calls below. On fast Windows runners its 500ms tick can notice the
    // exit and reap the child first, leaving an empty child slot and an
    // already-terminal status, whereas this test wants reap_child invoked
    // by hand on a Running-but-actually-dead task.
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);
    // Allow a little more than one tick (500ms) for the watchdog to observe
    // shutdown, so no in-flight iteration races with the state setup below.
    std::thread::sleep(Duration::from_millis(550));

    // The wrapper has most likely written its marker by now; delete it to
    // fake a wrapper crash that happened before the exit code was persisted.
    let _ = std::fs::remove_file(&task.paths.exit);

    // The watchdog may already have reaped the child and finalized the task.
    // Rewind both so reap_child sees a "Running task whose child just
    // exited". If the original handle is gone, a quick-exited stand-in is
    // installed so the first reap goes through the same try_wait path as
    // production.
    //
    // CRITICAL on Windows: the JSON on disk may already say `Completed`.
    // `update_task` (called by `reap_child`) reads from disk, applies the
    // closure, but ROLLS BACK if the original on-disk state was already
    // terminal (see persistence.rs::update_task). Hence BOTH the in-memory
    // metadata AND the on-disk JSON are reset to Running.
    {
        let mut guard = task.state.lock().unwrap();
        guard.metadata.status = BgTaskStatus::Running;
        guard.metadata.status_reason = None;
        guard.metadata.exit_code = None;
        guard.metadata.finished_at = None;
        guard.metadata.duration_ms = None;
        // Write the rewound record so update_task's terminal rollback guard
        // starts from a non-terminal state.
        crate::bash_background::persistence::write_task(&task.paths.json, &guard.metadata)
            .expect("persist reset Running metadata for reap_child test");
        // An empty child slot means the watchdog got there first; supply a
        // dead stand-in so reap_child's try_wait path still runs.
        if matches!(guard.runtime, TaskRuntime::Piped(None)) {
            guard.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
        }
    }
    // Drop the terminal_at stamp as well so mark_terminal_now() can fire
    // again inside reap_child.
    *task.terminal_at.lock().unwrap() = None;

    // Preconditions: metadata still says Running and no marker is on disk.
    assert!(
        task.is_running(),
        "precondition: metadata.status == Running"
    );
    assert!(
        !task.paths.exit.exists(),
        "precondition: exit marker absent"
    );

    // Pass one must NOT declare failure. A missing marker might only mean
    // the wrapper is mid-way through its tmp-file-to-marker rename, so
    // reap_child just drops the child handle and switches to detached PID
    // monitoring.
    registry.reap_child(&task);

    {
        let after_first = task.state.lock().unwrap();
        assert_eq!(
            after_first.metadata.status,
            BgTaskStatus::Running,
            "first reap must leave status Running while waiting one pass for marker"
        );
        assert_eq!(
            after_first.metadata.status_reason, None,
            "first reap must not record a failure reason"
        );
        assert!(
            matches!(after_first.runtime, TaskRuntime::Piped(None)),
            "child handle must be released after first reap"
        );
        assert!(
            after_first.detached,
            "task must be marked detached after first reap"
        );
    }

    // Pass two finds the detached PID dead and the marker still absent,
    // which is strong enough evidence that the wrapper exited without
    // persisting an exit code.
    registry.reap_child(&task);

    let after_second = task.state.lock().unwrap();
    assert!(
        after_second.metadata.status.is_terminal(),
        "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
        after_second.metadata.status
    );
    assert_eq!(
        after_second.metadata.status,
        BgTaskStatus::Failed,
        "must specifically be Failed (not Killed): status={:?}",
        after_second.metadata.status
    );
    assert_eq!(
        after_second.metadata.status_reason.as_deref(),
        Some("process exited without exit marker"),
        "reason must match replay path's wording: {:?}",
        after_second.metadata.status_reason
    );
    assert!(
        matches!(after_second.runtime, TaskRuntime::Piped(None)),
        "child handle must stay released after second reap"
    );
    assert!(
        after_second.detached,
        "task must remain detached after second reap"
    );
}
5156
/// Counterpart of the missing-marker test: when the exit marker IS on disk
/// at the moment `reap_child` runs, the task must NOT be marked Failed.
/// reap_child releases the child handle and leaves status=Running so the
/// next poll_task() cycle can finalize from the marker.
#[test]
fn reap_child_preserves_running_when_exit_marker_exists() {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            root.to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.to_path_buf(),
            10,
            true,
            false,
            Some(root.to_path_buf()),
        )
        .unwrap();

    let task = registry.task_for_session(&task_id, "session").unwrap();

    // Both conditions must be observed before moving on: the child has
    // exited AND the wrapper's marker has landed on disk.
    let child_has_exited = || {
        let mut guard = task.state.lock().unwrap();
        match &mut guard.runtime {
            TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
            _ => true,
        }
    };
    let poll_started = Instant::now();
    while !(child_has_exited() && task.paths.exit.exists()) {
        assert!(
            poll_started.elapsed() < Duration::from_secs(5),
            "child should exit and write marker quickly"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // Halt the watchdog so it cannot compete with the manual reap_child
    // call. On fast Windows runners it can run poll_task (which finalizes
    // via the marker) before the "marker exists, status still Running"
    // invariant is asserted here.
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);
    std::thread::sleep(Duration::from_millis(550));

    // If the watchdog finalized the task before it was stopped, rebuild the
    // scenario: status back to Running and a (dead) child in the slot. What
    // is under test is reap_child called by hand with child-exited AND
    // marker-present, whether or not the watchdog got there first.
    {
        let mut guard = task.state.lock().unwrap();
        guard.metadata.status = BgTaskStatus::Running;
        guard.metadata.status_reason = None;
        if matches!(guard.runtime, TaskRuntime::Piped(None)) {
            guard.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
        }
    }
    *task.terminal_at.lock().unwrap() = None;
    // poll_task deletes the marker when it finalizes, so put one back if it
    // has gone missing.
    if !task.paths.exit.exists() {
        std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
    }

    // With the child exited and the marker present, reap_child should
    //  - release the child handle and set the detached flag
    //  - leave the status alone (poll_task finalizes via marker next tick)
    registry.reap_child(&task);

    let after_reap = task.state.lock().unwrap();
    assert!(
        matches!(after_reap.runtime, TaskRuntime::Piped(None)),
        "child handle still released even when marker exists"
    );
    assert!(
        after_reap.detached,
        "task still marked detached even when marker exists"
    );
    // Running is expected here: recording the marker's outcome is
    // poll_task's job (it does the proper exit-code parsing), so reap_child
    // must not do it.
    assert_eq!(
        after_reap.metadata.status,
        BgTaskStatus::Running,
        "reap_child must defer to poll_task when marker exists"
    );
}
5258
/// Asks `ps` for the state column of `pid` (values such as "Z", "S", "R").
///
/// Yields `None` when `ps` cannot be run, exits unsuccessfully, or prints
/// nothing for the PID. The callers treat a missing row as the PID having
/// been fully reaped, which is the post-reap state they look for.
#[cfg(unix)]
fn pid_stat(pid: u32) -> Option<String> {
    let pid_arg = pid.to_string();
    let ps = std::process::Command::new("ps")
        .args(["-o", "stat=", "-p", pid_arg.as_str()])
        .output()
        .ok()?;
    if !ps.status.success() {
        return None;
    }
    let raw = String::from_utf8_lossy(&ps.stdout);
    let state = raw.trim();
    (!state.is_empty()).then(|| state.to_string())
}
5278
5279    /// A `<defunct>` zombie carries `ps` state starting with 'Z'.
5280    #[cfg(unix)]
5281    fn is_zombie(pid: u32) -> bool {
5282        pid_stat(pid).is_some_and(|stat| stat.starts_with('Z'))
5283    }
5284
5285    /// Spawn a child that exits immediately and wait — via `ps`, NOT
5286    /// `try_wait()`/`wait()` — until it is observably a `<defunct>` zombie,
5287    /// then return the still-unreaped handle. This reproduces the exact
5288    /// state issue #91 leaves behind: an exited OS child whose parent has
5289    /// not reaped it.
5290    #[cfg(unix)]
5291    fn spawn_unreaped_zombie() -> std::process::Child {
5292        let child = std::process::Command::new("true")
5293            .stdin(std::process::Stdio::null())
5294            .stdout(std::process::Stdio::null())
5295            .stderr(std::process::Stdio::null())
5296            .spawn()
5297            .expect("spawn zombie stand-in");
5298        let pid = child.id();
5299        let started = Instant::now();
5300        while !is_zombie(pid) {
5301            assert!(
5302                started.elapsed() < Duration::from_secs(5),
5303                "stand-in child should become a zombie within 5s"
5304            );
5305            std::thread::sleep(Duration::from_millis(10));
5306        }
5307        // Return WITHOUT reaping — the handle still owns an unwaited zombie.
5308        child
5309    }
5310
5311    /// Regression test for issue #91: the exit-marker terminal path
5312    /// (`poll_task` -> `finalize_from_marker`) must REAP the direct child
5313    /// handle, not merely drop it. Dropping a `std::process::Child` does not
5314    /// `wait()` on Unix, so the exited child lingers as a `[mv] <defunct>`
5315    /// zombie until AFT exits.
5316    ///
5317    /// We install a known-unreaped zombie into the task's child slot and
5318    /// drive the marker finalize path, then assert the child is gone (reaped)
5319    /// rather than still `<defunct>`.
5320    #[cfg(unix)]
5321    #[test]
5322    fn finalize_from_marker_reaps_child_no_zombie() {
5323        use std::sync::atomic::Ordering;
5324
5325        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5326        let dir = tempfile::tempdir().unwrap();
5327        let task_id = registry
5328            .spawn(
5329                QUICK_SUCCESS_COMMAND,
5330                "session".to_string(),
5331                dir.path().to_path_buf(),
5332                HashMap::new(),
5333                Some(Duration::from_secs(30)),
5334                dir.path().to_path_buf(),
5335                10,
5336                true,
5337                false,
5338                Some(dir.path().to_path_buf()),
5339            )
5340            .unwrap();
5341
5342        // Stop the watchdog so the ONLY terminal-transition path under test
5343        // is the exit-marker finalize (not reap_child's try_wait, which would
5344        // reap the child for us and mask the bug).
5345        registry.inner.shutdown.store(true, Ordering::SeqCst);
5346        std::thread::sleep(Duration::from_millis(550));
5347
5348        let task = registry.task_for_session(&task_id, "session").unwrap();
5349
5350        // Wait for the wrapper's exit marker to land. We deliberately do NOT
5351        // call try_wait()/wait() on the real child here — doing so would reap
5352        // it and defeat the test.
5353        let started = Instant::now();
5354        while !task.paths.exit.exists() {
5355            assert!(
5356                started.elapsed() < Duration::from_secs(5),
5357                "exit marker should land quickly for `true`"
5358            );
5359            std::thread::sleep(Duration::from_millis(20));
5360        }
5361
5362        // Reset to a fresh Running shape and install a guaranteed-unreaped
5363        // zombie as the child handle, so the finalize path's reap behavior is
5364        // exercised deterministically regardless of how the real child was
5365        // handled. Persist Running so update_task's terminal-rollback guard
5366        // sees a non-terminal starting point.
5367        let zombie_pid;
5368        {
5369            let mut state = task.state.lock().unwrap();
5370            state.metadata.status = BgTaskStatus::Running;
5371            state.metadata.status_reason = None;
5372            state.metadata.exit_code = None;
5373            state.metadata.finished_at = None;
5374            state.metadata.duration_ms = None;
5375            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5376                .expect("persist reset Running metadata");
5377            let zombie = spawn_unreaped_zombie();
5378            zombie_pid = zombie.id();
5379            state.runtime = TaskRuntime::Piped(Some(zombie));
5380        }
5381        *task.terminal_at.lock().unwrap() = None;
5382
5383        // Precondition: the installed child is genuinely a `<defunct>` zombie.
5384        assert!(
5385            is_zombie(zombie_pid),
5386            "precondition: stand-in child {zombie_pid} must be a zombie before finalize"
5387        );
5388
5389        // Drive the exit-marker terminal path. Before the fix this nulled the
5390        // Child handle without wait(), leaving the zombie behind.
5391        registry.poll_task(&task).unwrap();
5392
5393        {
5394            let state = task.state.lock().unwrap();
5395            assert!(
5396                matches!(state.runtime, TaskRuntime::Piped(None)),
5397                "child handle must be released after marker finalize"
5398            );
5399            assert!(
5400                state.metadata.status.is_terminal(),
5401                "task must be terminal after marker finalize: {:?}",
5402                state.metadata.status
5403            );
5404        }
5405
5406        // The core assertion: the child must have been REAPED, not just
5407        // dropped. A reaped PID has no `ps` row (or at minimum is not 'Z').
5408        assert!(
5409            !is_zombie(zombie_pid),
5410            "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
5411             after the exit-marker terminal transition"
5412        );
5413    }
5414
5415    /// Companion to the above for the kill path: when a kill observes an
5416    /// already-present exit marker (the child finished on its own first), it
5417    /// must reap the child handle rather than dropping it.
5418    #[cfg(unix)]
5419    #[test]
5420    fn kill_with_existing_marker_reaps_child_no_zombie() {
5421        use std::sync::atomic::Ordering;
5422
5423        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5424        let dir = tempfile::tempdir().unwrap();
5425        let task_id = registry
5426            .spawn(
5427                QUICK_SUCCESS_COMMAND,
5428                "session".to_string(),
5429                dir.path().to_path_buf(),
5430                HashMap::new(),
5431                Some(Duration::from_secs(30)),
5432                dir.path().to_path_buf(),
5433                10,
5434                true,
5435                false,
5436                Some(dir.path().to_path_buf()),
5437            )
5438            .unwrap();
5439
5440        registry.inner.shutdown.store(true, Ordering::SeqCst);
5441        std::thread::sleep(Duration::from_millis(550));
5442
5443        let task = registry.task_for_session(&task_id, "session").unwrap();
5444
5445        let started = Instant::now();
5446        while !task.paths.exit.exists() {
5447            assert!(
5448                started.elapsed() < Duration::from_secs(5),
5449                "exit marker should land quickly for `true`"
5450            );
5451            std::thread::sleep(Duration::from_millis(20));
5452        }
5453
5454        let zombie_pid;
5455        {
5456            let mut state = task.state.lock().unwrap();
5457            state.metadata.status = BgTaskStatus::Running;
5458            state.metadata.status_reason = None;
5459            state.metadata.exit_code = None;
5460            state.metadata.finished_at = None;
5461            state.metadata.duration_ms = None;
5462            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5463                .expect("persist reset Running metadata");
5464            let zombie = spawn_unreaped_zombie();
5465            zombie_pid = zombie.id();
5466            state.runtime = TaskRuntime::Piped(Some(zombie));
5467        }
5468        *task.terminal_at.lock().unwrap() = None;
5469
5470        assert!(
5471            is_zombie(zombie_pid),
5472            "precondition: stand-in child {zombie_pid} must be a zombie before kill"
5473        );
5474
5475        // Kill observes the existing marker and finalizes from it.
5476        registry
5477            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
5478            .expect("kill should succeed");
5479
5480        {
5481            let state = task.state.lock().unwrap();
5482            assert!(
5483                matches!(state.runtime, TaskRuntime::Piped(None)),
5484                "child handle must be released after marker-aware kill"
5485            );
5486            assert!(state.metadata.status.is_terminal());
5487        }
5488
5489        assert!(
5490            !is_zombie(zombie_pid),
5491            "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
5492             after a marker-aware kill"
5493        );
5494    }
5495
5496    #[test]
5497    fn cleanup_finished_keeps_running_tasks() {
5498        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5499        let dir = tempfile::tempdir().unwrap();
5500        let task_id = registry
5501            .spawn(
5502                LONG_RUNNING_COMMAND,
5503                "session".to_string(),
5504                dir.path().to_path_buf(),
5505                HashMap::new(),
5506                Some(Duration::from_secs(30)),
5507                dir.path().to_path_buf(),
5508                10,
5509                true,
5510                false,
5511                Some(dir.path().to_path_buf()),
5512            )
5513            .unwrap();
5514
5515        registry.cleanup_finished(Duration::ZERO);
5516
5517        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
5518        let _ = registry.kill(&task_id, "session");
5519    }
5520
5521    #[cfg(windows)]
5522    fn wait_for_file(path: &Path) -> String {
5523        let started = Instant::now();
5524        loop {
5525            if path.exists() {
5526                return fs::read_to_string(path).expect("read file");
5527            }
5528            assert!(
5529                started.elapsed() < Duration::from_secs(30),
5530                "timed out waiting for {}",
5531                path.display()
5532            );
5533            std::thread::sleep(Duration::from_millis(100));
5534        }
5535    }
5536
5537    #[cfg(windows)]
5538    fn spawn_windows_registry_command(
5539        command: &str,
5540    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
5541        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5542        let dir = tempfile::tempdir().unwrap();
5543        let task_id = registry
5544            .spawn(
5545                command,
5546                "session".to_string(),
5547                dir.path().to_path_buf(),
5548                HashMap::new(),
5549                Some(Duration::from_secs(30)),
5550                dir.path().to_path_buf(),
5551                10,
5552                false,
5553                false,
5554                Some(dir.path().to_path_buf()),
5555            )
5556            .unwrap();
5557        (registry, dir, task_id)
5558    }
5559
5560    #[cfg(windows)]
5561    #[test]
5562    fn windows_spawn_writes_exit_marker_for_zero_exit() {
5563        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
5564        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5565
5566        let content = wait_for_file(&exit_path);
5567
5568        assert_eq!(content.trim(), "0");
5569    }
5570
5571    #[cfg(windows)]
5572    #[test]
5573    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
5574        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
5575        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5576
5577        let content = wait_for_file(&exit_path);
5578
5579        assert_eq!(content.trim(), "42");
5580    }
5581
5582    #[cfg(windows)]
5583    #[test]
5584    fn windows_spawn_captures_stdout_to_disk() {
5585        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
5586        let task = registry.task_for_session(&task_id, "session").unwrap();
5587        let stdout_path = task.paths.stdout.clone();
5588        let exit_path = task.paths.exit.clone();
5589
5590        let _ = wait_for_file(&exit_path);
5591        let stdout = fs::read_to_string(stdout_path).expect("read stdout");
5592
5593        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
5594    }
5595
5596    #[cfg(windows)]
5597    #[test]
5598    fn windows_spawn_uses_pwsh_when_available() {
5599        // Without $SHELL set, $SHELL probe yields None and pwsh wins.
5600        // (We intentionally pass None for shell_env to keep this test
5601        // independent of the runner's actual env.)
5602        let candidates = crate::windows_shell::shell_candidates_with(
5603            |binary| match binary {
5604                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
5605                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
5606                _ => None,
5607            },
5608            || None,
5609        );
5610        let shell = candidates.first().expect("at least one candidate").clone();
5611        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
5612        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
5613    }
5614
5615    /// Issue #27 Oracle review P1, updated: cmd wrapper writes a `.bat` file
5616    /// that batch-evaluates `%ERRORLEVEL%` on its own line (line-by-line
5617    /// evaluation is the default for batch files; parse-time expansion only
5618    /// applies to compound `&`-chained inline commands). Capturing
5619    /// `%ERRORLEVEL%` into `set CODE=%ERRORLEVEL%` immediately after the user
5620    /// command runs records the real run-time exit code.
5621    #[cfg(windows)]
5622    #[test]
5623    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
5624        let exit_path = Path::new(r"C:\Temp\bash-test.exit");
5625        let script =
5626            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
5627
5628        // Batch wrapper: capture exit code into CODE on the line after the
5629        // user command, then write CODE to a temp marker file before
5630        // atomic-renaming it into place.
5631        assert!(
5632            script.contains("set CODE=%ERRORLEVEL%"),
5633            "wrapper must capture exit code into CODE: {script}"
5634        );
5635        assert!(
5636            script.contains("echo %CODE% >"),
5637            "wrapper must echo CODE to a temp marker file: {script}"
5638        );
5639        assert!(
5640            script.contains("move /Y"),
5641            "wrapper must use atomic move to write the marker: {script}"
5642        );
5643        // move output must be redirected to nul to avoid polluting the
5644        // user's captured stdout with "1 file(s) moved." lines.
5645        assert!(
5646            script.contains("> nul"),
5647            "wrapper must redirect move output to nul: {script}"
5648        );
5649        // exit /B %CODE% propagates the real exit code so wait() sees it.
5650        assert!(
5651            script.contains("exit /B %CODE%"),
5652            "wrapper must propagate the captured exit code: {script}"
5653        );
5654        assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
5655        assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
5656    }
5657
5658    /// `bg_command()` for Cmd no longer needs `/V:ON` — the wrapper is now
5659    /// written to a `.bat` file where batch-line evaluation captures
5660    /// `%ERRORLEVEL%` correctly without delayed expansion. We still need
5661    /// `/D` (skip AutoRun) and `/S` (simple quote-stripping for paths with
5662    /// internal `"`-quoting from `cmd_quote`).
5663    #[cfg(windows)]
5664    #[test]
5665    fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
5666        use crate::windows_shell::WindowsShell;
5667        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
5668        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5669        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5670        assert_eq!(
5671            args_strs,
5672            vec!["/D", "/S", "/C", "echo wrapped"],
5673            "Cmd::bg_command must prepend /D /S /C"
5674        );
5675    }
5676
5677    /// PowerShell variants don't need `/V:ON`-style flags; their
5678    /// `bg_command()` args stay on the standard `-Command` path.
5679    #[cfg(windows)]
5680    #[test]
5681    fn windows_shell_pwsh_bg_command_uses_standard_args() {
5682        use crate::windows_shell::WindowsShell;
5683        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
5684        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5685        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5686        assert!(
5687            args_strs.contains(&"-Command"),
5688            "Pwsh::bg_command must use -Command: {args_strs:?}"
5689        );
5690        assert!(
5691            args_strs.contains(&"Get-Date"),
5692            "Pwsh::bg_command must include the user command body"
5693        );
5694    }
5695}