Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::{Read, Write};
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::compress::caps::DropClass;
16use crate::compress::CompressionResult;
17use crate::context::SharedProgressSender;
18use crate::harness::Harness;
19use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
20
21#[cfg(unix)]
22use std::os::unix::process::CommandExt;
23#[cfg(windows)]
24use std::os::windows::process::CommandExt;
25
26use super::buffer::{combine_streams, BgBuffer, DiskTruncation, StreamKind, TokenCountInput};
27use super::output::{
28    cap_completion_output, cap_completion_output_with_marker, cap_final_output,
29    cap_final_output_with_marker, completion_preview_threshold, json_output_pointer, quote_path,
30    retained_json_output_pointer, COMPRESS_INPUT_CAP_BYTES, COMPRESS_INPUT_HEAD_BYTES,
31    COMPRESS_INPUT_TAIL_BYTES, FINAL_OUTPUT_CAP_BYTES, RAW_PASSTHROUGH_CAP_BYTES,
32    RAW_PASSTHROUGH_HEAD_BYTES, RAW_PASSTHROUGH_TAIL_BYTES, RUNNING_OUTPUT_PREVIEW_BYTES,
33    STRUCTURED_OUTPUT_CAP_BYTES,
34};
35use super::persistence::{
36    create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
37    task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
38    ExitMarker, PersistedTask, TaskPaths,
39};
40use super::process::is_process_alive;
41#[cfg(unix)]
42use super::process::terminate_pgid;
43#[cfg(windows)]
44use super::process::terminate_pid;
45use super::pty_process::spawn_pty_for_command;
46use super::pty_runtime::PtyRuntime;
47use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
48use super::{BgTaskInfo, BgTaskStatus};
/// Default timeout for background bash tasks: 30 minutes.
/// Agents can override per-call via the `timeout` parameter (in ms).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// 24 hours.
/// NOTE(review): presumably the age after which a task still recorded as
/// running is treated as stale — confirm at the use site.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
/// 24 hours.
/// NOTE(review): presumably the grace period before persisted task bundles
/// become eligible for garbage collection — confirm at the use site.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
/// 30 days.
/// NOTE(review): presumably the grace period before quarantined bundles are
/// garbage collected — confirm at the use site.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// 128 KiB, expressed in bytes, applied per output stream.
/// NOTE(review): presumably the tokenization cap referred to by the token
/// count fields of `BgCompletion` — confirm at the use site.
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
57
/// Completion record for a background bash task that reached a terminal state.
///
/// Records are queued in `RegistryInner::completions`. `session_id` is kept
/// in memory (it is what `completion_matches_session` filters on) but is
/// never written to the serialized payload.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the task this completion belongs to.
    pub task_id: String,
    /// Intentionally omitted from serialized completion payloads: push frames
    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Task status recorded for this completion.
    pub status: BgTaskStatus,
    /// Exit code of the task, or `None` when no exit code is available.
    pub exit_code: Option<i32>,
    /// Command string of the task.
    pub command: String,
    /// Small head+tail preview of the cached terminal render at completion time,
    /// cached so push-frame consumers and `bash_drain_completions` callers see
    /// the same preview without racing against later output rotation. Empty
    /// when not captured (e.g., persisted task seen on startup before buffer
    /// reattachment).
    // Serialized only when non-empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// True when the captured tail is shorter than the actual output (because
    /// rotation occurred or the output exceeds the preview cap). Plugins use
    /// this to render a `…` prefix and signal that `bash_status` would return
    /// more.
    // Serialized only when `true` (see `is_false`).
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
    /// Token count for raw stdout+stderr before compression. Omitted when any
    /// stream exceeds the 128 KiB tokenization cap.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub original_tokens: Option<u32>,
    /// Token count for the compressed output generated from the same capped
    /// raw payload. Omitted when raw tokenization is skipped.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compressed_tokens: Option<u32>,
    /// True when a stream exceeded the tokenization cap and counts are absent.
    // Serialized only when `true` (see `is_false`).
    #[serde(default, skip_serializing_if = "is_false")]
    pub tokens_skipped: bool,
}
93
/// Serde `skip_serializing_if` predicate: returns `true` exactly when the
/// referenced flag is `false`, so unset boolean fields are left out.
fn is_false(flag: &bool) -> bool {
    matches!(*flag, false)
}
97
/// Serializable point-in-time view of a background task.
///
/// The fields of `info` are flattened into the same serialized object as the
/// fields declared here.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Core task information; serialized inline via `#[serde(flatten)]`.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code of the task, or `None` when no exit code is available.
    pub exit_code: Option<i32>,
    /// PID of the direct child process, when known.
    pub child_pid: Option<u32>,
    /// Working directory of the task, as a string.
    pub workdir: String,
    /// Preview of the task output.
    pub output_preview: String,
    /// Whether `output_preview` is shorter than the full output.
    pub output_truncated: bool,
    /// Path of the captured output file, when one exists.
    pub output_path: Option<String>,
    /// Path of the captured stderr file, when one exists.
    pub stderr_path: Option<String>,
    /// PTY row count; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_rows: Option<u16>,
    /// PTY column count; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_cols: Option<u16>,
}
114
/// Tag recorded on a `TerminalOutputCache` describing which rendering path
/// produced it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TerminalOutputKind {
    /// NOTE(review): presumably output that went through the installed
    /// compressor — confirm in `render_compressed_with_recovery`.
    Compressed,
    /// Unprocessed output. `render_terminal_output` also uses this tag for the
    /// empty placeholder it returns for PTY tasks.
    Raw,
    /// NOTE(review): presumably output produced by `render_structured_output`
    /// — confirm there.
    Structured,
}
121
/// Rendered output of a terminal task, as produced by
/// `render_terminal_output` and stored on `BgTaskState::terminal_output_cache`
/// by `ensure_terminal_output_cache` so it is rendered once and then reused.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TerminalOutputCache {
    /// Rendered preview text (empty for the PTY placeholder).
    output_preview: String,
    /// Whether the preview is shorter than the full output.
    output_truncated: bool,
    /// Which rendering path produced this cache entry.
    kind: TerminalOutputKind,
    /// Display string of the buffer's output path, when it has one.
    output_path: Option<String>,
    /// Display string of the buffer's stderr path, when it has one.
    stderr_path: Option<String>,
    /// Details about dropped or truncated content, when any were recorded.
    recovery: Option<RecoveryContext>,
}
131
/// Bookkeeping about content that was dropped or truncated while rendering a
/// task's output. `has_visible_drop` reports whether any of it applies.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RecoveryContext {
    /// Per-`DropClass` counts of dropped items; empty when nothing was
    /// dropped by class.
    dropped_by_class: BTreeMap<DropClass, usize>,
    /// NOTE(review): presumably set when a nested compression step dropped
    /// content — confirm where this is populated.
    had_inner_drop: bool,
    /// NOTE(review): presumably controls whether an offset hint may be shown
    /// to the reader — confirm where this is consumed.
    offset_hint_eligible: bool,
    /// NOTE(review): presumably the line an offset hint would start from —
    /// confirm where this is consumed.
    offset_start_line: Option<usize>,
    /// Whether the output was truncated by a byte cap.
    byte_truncated: bool,
    /// Number of prefix bytes truncated on disk; `0` means none.
    disk_truncated_prefix_bytes: u64,
    /// Path of the captured output file, when one exists.
    output_path: Option<String>,
    /// Path of the captured stderr file, when one exists.
    stderr_path: Option<String>,
    /// NOTE(review): presumably whether `stderr_path` should be mentioned in
    /// recovery hints — confirm where this is consumed.
    include_stderr_path: bool,
}
144
145impl RecoveryContext {
146    fn has_visible_drop(&self) -> bool {
147        self.byte_truncated
148            || self.disk_truncated_prefix_bytes > 0
149            || self.had_inner_drop
150            || !self.dropped_by_class.is_empty()
151    }
152}
153
/// Handle to the background bash task registry.
///
/// Cloning the handle is cheap: every clone points at the same
/// `RegistryInner` through the `Arc`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    /// Shared registry state.
    pub(crate) inner: Arc<RegistryInner>,
}
158
/// State shared by every clone of `BgTaskRegistry`.
pub(crate) struct RegistryInner {
    /// Registered tasks, keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Queue of completion records.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Progress sender supplied to `BgTaskRegistry::new`.
    pub(crate) progress_sender: SharedProgressSender,
    /// NOTE(review): presumably set once `start_watchdog` has launched the
    /// watchdog — confirm there.
    watchdog_started: AtomicBool,
    /// NOTE(review): presumably signals background threads to stop — confirm
    /// where this is read.
    pub(crate) shutdown: AtomicBool,
    /// Whether long-running reminders are enabled; `true` by default, changed
    /// through `configure_long_running_reminders`.
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Reminder interval in milliseconds; 600_000 by default, changed through
    /// `configure_long_running_reminders`.
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Swapped to `true` by `replay_session_inner` so that the persisted-task
    /// GC is attempted only by the first replay on this registry.
    persisted_gc_started: AtomicBool,
    /// Test-only counter. NOTE(review): presumably counts persisted-GC runs —
    /// confirm where it is incremented.
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Output compression callback. Set by `AppContext` after construction.
    /// Takes (command, raw_output, exit_code) and returns compressed text. Called from
    /// the watchdog thread when a task reaches a terminal state and from
    /// `bash_status`/`list` snapshot reads. When `None`, output is returned
    /// uncompressed.
    pub(crate) compressor:
        Mutex<Option<Box<dyn Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync>>>,
    /// Optional shared database connection used by `dual_write_task`; installed
    /// by `set_db_pool` and removed by `clear_db_pool`.
    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
    /// Harness storage segment installed by `set_harness`; `dual_write_task`
    /// skips the database write (with a warning) while this is `None`.
    pub(crate) db_harness: RwLock<Option<String>>,
    /// Sending half of the capacity-1 wake channel created in
    /// `BgTaskRegistry::new`; a clone is handed to `spawn_pty_for_command`.
    pub(crate) wake_tx: crossbeam_channel::Sender<()>,
    /// Receiving half of the wake channel.
    pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
    /// Registry of output watch patterns.
    pub(crate) watch_registry: Mutex<WatchRegistry>,
}
183
/// One registered background task: immutable identity plus lock-protected
/// mutable state.
pub(crate) struct BgTask {
    /// Unique id of the task; also its key in `RegistryInner::tasks`.
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk locations belonging to this task.
    pub(crate) paths: TaskPaths,
    /// Monotonic timestamp taken when this in-memory task object was built.
    pub(crate) started: Instant,
    /// Starts as `None`. NOTE(review): presumably the time of the most recent
    /// long-running reminder — confirm in the watchdog.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// Starts as `None`. NOTE(review): presumably the time the task was
    /// observed reaching a terminal status — confirm where this is written.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state; lock it to read or change metadata, runtime, or
    /// buffer.
    pub(crate) state: Mutex<BgTaskState>,
}
193
/// Process handle backing a task, by capture mode.
///
/// The inner `Option` allows the handle to be absent; `write_pty` reports
/// `task_exited` when it finds `Pty(None)`.
pub(crate) enum TaskRuntime {
    /// Task whose stdout/stderr are captured to files; holds the child handle.
    Piped(Option<Child>),
    /// Task running under a pseudo-terminal; holds the PTY runtime.
    Pty(Option<PtyRuntime>),
}
198
/// Mutable, lock-protected portion of a `BgTask`.
pub(crate) struct BgTaskState {
    /// Task metadata in the form that is persisted (status, command, exit
    /// code, mode, ...).
    pub(crate) metadata: PersistedTask,
    /// Handle to the running process, piped or PTY.
    pub(crate) runtime: TaskRuntime,
    /// Initialised to `false` when a task is spawned. NOTE(review): presumably
    /// marks tasks whose process is no longer owned by this registry —
    /// confirm where this is set.
    pub(crate) detached: bool,
    /// True once `reap_child` has observed the direct child handle's exit
    /// via `try_wait()`. Used by the two-pass watchdog to skip the racy
    /// `is_process_alive(child_pid)` probe on the second pass — we already
    /// have authoritative evidence that the child is dead, no need to
    /// re-verify via PID liveness which is unreliable on Windows where
    /// PIDs can be recycled within seconds.
    ///
    /// Remains `false` on replay-restored tasks (those have a `child_pid`
    /// but never observed exit via this process's `try_wait()`), so those
    /// continue to fall through to the `is_process_alive` probe path.
    pub(crate) child_exit_observed: bool,
    /// Reader over the task's capture file(s).
    pub(crate) buffer: BgBuffer,
    /// Rendered terminal output, filled in by `ensure_terminal_output_cache`
    /// the first time it is needed for a terminal, non-PTY task.
    terminal_output_cache: Option<TerminalOutputCache>,
    /// PTY-only: set for timeout kill intent before signaling the child.
    pub(crate) pending_terminal_override: Option<BgTaskStatus>,
}
219
220fn completion_matches_session(completion: &BgCompletion, session_id: Option<&str>) -> bool {
221    session_id
222        .map(|session_id| completion.session_id == session_id)
223        .unwrap_or(true)
224}
225
226impl BgTaskRegistry {
227    pub fn new(progress_sender: SharedProgressSender) -> Self {
228        let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
229        Self {
230            inner: Arc::new(RegistryInner {
231                tasks: Mutex::new(HashMap::new()),
232                completions: Mutex::new(VecDeque::new()),
233                progress_sender,
234                watchdog_started: AtomicBool::new(false),
235                shutdown: AtomicBool::new(false),
236                long_running_reminder_enabled: AtomicBool::new(true),
237                long_running_reminder_interval_ms: AtomicU64::new(600_000),
238                persisted_gc_started: AtomicBool::new(false),
239                #[cfg(test)]
240                persisted_gc_runs: AtomicU64::new(0),
241                compressor: Mutex::new(None),
242                db_pool: RwLock::new(None),
243                db_harness: RwLock::new(None),
244                wake_tx,
245                wake_rx,
246                watch_registry: Mutex::new(WatchRegistry::default()),
247            }),
248        }
249    }
250
251    pub fn set_harness(&self, harness: Harness) {
252        if let Ok(mut slot) = self.inner.db_harness.write() {
253            *slot = Some(harness.storage_segment());
254        }
255    }
256
257    pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
258        if let Ok(mut slot) = self.inner.db_pool.write() {
259            *slot = Some(conn);
260        }
261    }
262
263    pub fn clear_db_pool(&self) {
264        if let Ok(mut slot) = self.inner.db_pool.write() {
265            *slot = None;
266        }
267    }
268
269    /// Install the output-compression callback. Called by `main.rs` after
270    /// `AppContext` is constructed so that snapshot/completion paths can
271    /// invoke `compress::compress_with_registry` without holding a context
272    /// reference. When called multiple times, the latest installation wins.
273    pub fn set_compressor<F>(&self, compressor: F)
274    where
275        F: Fn(&str, String) -> CompressionResult + Send + Sync + 'static,
276    {
277        self.set_compressor_with_exit_code(move |command, output, _exit_code| {
278            compressor(command, output)
279        });
280    }
281
282    pub fn set_compressor_with_exit_code<F>(&self, compressor: F)
283    where
284        F: Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync + 'static,
285    {
286        if let Ok(mut slot) = self.inner.compressor.lock() {
287            *slot = Some(Box::new(compressor));
288        }
289    }
290
291    /// Apply the installed compressor (if any) to `output`. Returns `output`
292    /// untouched when no compressor is installed.
293    pub(crate) fn compress_output(
294        &self,
295        command: &str,
296        output: String,
297        exit_code: Option<i32>,
298    ) -> CompressionResult {
299        let Ok(slot) = self.inner.compressor.lock() else {
300            return CompressionResult::new(output);
301        };
302        match slot.as_ref() {
303            Some(compressor) => compressor(command, output, exit_code),
304            None => CompressionResult::new(output),
305        }
306    }
307
308    fn ensure_terminal_output_cache(&self, task: &Arc<BgTask>) -> Option<TerminalOutputCache> {
309        let (metadata, buffer) = {
310            let state = task.state.lock().ok()?;
311            if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
312                return None;
313            }
314            if let Some(cache) = state.terminal_output_cache.clone() {
315                return Some(cache);
316            }
317            (state.metadata.clone(), state.buffer.clone())
318        };
319
320        let mut cap_buffer = buffer.clone();
321        let disk_truncation = cap_buffer.enforce_terminal_cap();
322        let cache = self.render_terminal_output(&metadata, &cap_buffer, disk_truncation);
323        let mut state = task.state.lock().ok()?;
324        if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
325            return None;
326        }
327        if let Some(existing) = state.terminal_output_cache.clone() {
328            return Some(existing);
329        }
330        state.terminal_output_cache = Some(cache.clone());
331        Some(cache)
332    }
333
334    fn render_terminal_output(
335        &self,
336        metadata: &PersistedTask,
337        buffer: &BgBuffer,
338        disk_truncation: DiskTruncation,
339    ) -> TerminalOutputCache {
340        if metadata.mode == BgMode::Pty {
341            return TerminalOutputCache {
342                output_preview: String::new(),
343                output_truncated: false,
344                kind: TerminalOutputKind::Raw,
345                output_path: buffer.output_path().map(|path| path.display().to_string()),
346                stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
347                recovery: None,
348            };
349        }
350
351        if let Some(structured) =
352            render_structured_output(&metadata.command, buffer, disk_truncation)
353        {
354            return structured;
355        }
356
357        if !metadata.compressed {
358            return render_raw_passthrough(buffer, disk_truncation);
359        }
360
361        let raw = buffer.read_combined_head_tail(
362            COMPRESS_INPUT_CAP_BYTES,
363            COMPRESS_INPUT_HEAD_BYTES,
364            COMPRESS_INPUT_TAIL_BYTES,
365        );
366        let compressed = self.compress_output(&metadata.command, raw.text, metadata.exit_code);
367        render_compressed_with_recovery(buffer, compressed, raw.truncated, disk_truncation)
368    }
369
370    fn snapshot_with_terminal_cache(
371        &self,
372        task: &Arc<BgTask>,
373        preview_bytes: usize,
374    ) -> BgTaskSnapshot {
375        let mut snapshot = task.snapshot(preview_bytes);
376        self.maybe_compress_snapshot(task, &mut snapshot);
377        snapshot
378    }
379
380    fn post_terminal_transition(&self, task: &Arc<BgTask>, emit_frame: bool) -> Result<(), String> {
381        let (metadata, buffer) = {
382            let state = task
383                .state
384                .lock()
385                .map_err(|_| "background task lock poisoned".to_string())?;
386            if !state.metadata.status.is_terminal() {
387                return Ok(());
388            }
389            (state.metadata.clone(), state.buffer.clone())
390        };
391
392        let cache = self.ensure_terminal_output_cache(task);
393        self.enqueue_completion_from_parts(
394            &metadata,
395            Some(&buffer),
396            None,
397            emit_frame,
398            cache.as_ref(),
399        );
400        Ok(())
401    }
402
403    fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
404        write_task(&paths.json, metadata)?;
405        self.dual_write_task(paths, metadata);
406        Ok(())
407    }
408
409    fn update_task_metadata<F>(
410        &self,
411        paths: &TaskPaths,
412        update: F,
413    ) -> std::io::Result<PersistedTask>
414    where
415        F: FnOnce(&mut PersistedTask),
416    {
417        let metadata = update_task(&paths.json, update)?;
418        self.dual_write_task(paths, &metadata);
419        Ok(metadata)
420    }
421
422    fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
423        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
424        let Some(pool) = pool else {
425            return;
426        };
427        let harness = self
428            .inner
429            .db_harness
430            .read()
431            .ok()
432            .and_then(|slot| slot.clone());
433        let Some(harness) = harness else {
434            crate::slog_warn!(
435                "dual-write bash_task to DB skipped for {}: harness not configured",
436                metadata.task_id
437            );
438            return;
439        };
440        let row = match metadata.to_bash_task_row(&harness, paths) {
441            Ok(row) => row,
442            Err(error) => {
443                crate::slog_warn!(
444                    "dual-write bash_task to DB failed for {}: {}",
445                    metadata.task_id,
446                    error
447                );
448                return;
449            }
450        };
451        let conn = match pool.lock() {
452            Ok(conn) => conn,
453            Err(_) => {
454                crate::slog_warn!(
455                    "dual-write bash_task to DB failed for {}: db mutex poisoned",
456                    metadata.task_id
457                );
458                return;
459            }
460        };
461        if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
462            crate::slog_warn!(
463                "dual-write bash_task to DB failed for {}: {}",
464                metadata.task_id,
465                error
466            );
467        }
468    }
469
470    pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
471        self.inner
472            .long_running_reminder_enabled
473            .store(enabled, Ordering::SeqCst);
474        self.inner
475            .long_running_reminder_interval_ms
476            .store(interval_ms, Ordering::SeqCst);
477    }
478
479    #[cfg(unix)]
480    #[allow(clippy::too_many_arguments)]
481    pub fn spawn(
482        &self,
483        command: &str,
484        session_id: String,
485        workdir: PathBuf,
486        env: HashMap<String, String>,
487        timeout: Option<Duration>,
488        storage_dir: PathBuf,
489        max_running: usize,
490        notify_on_completion: bool,
491        compressed: bool,
492        project_root: Option<PathBuf>,
493    ) -> Result<String, String> {
494        self.start_watchdog();
495
496        let running = self.running_count();
497        if running >= max_running {
498            return Err(format!(
499                "background bash task limit exceeded: {running} running (max {max_running})"
500            ));
501        }
502
503        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
504        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
505        let task_id = self.generate_unique_task_id()?;
506        let paths = task_paths(&storage_dir, &session_id, &task_id);
507        fs::create_dir_all(&paths.dir)
508            .map_err(|e| format!("failed to create background task dir: {e}"))?;
509
510        let mut metadata = PersistedTask::starting(
511            task_id.clone(),
512            session_id.clone(),
513            command.to_string(),
514            workdir.clone(),
515            project_root,
516            timeout_ms,
517            notify_on_completion,
518            compressed,
519        );
520        self.persist_task(&paths, &metadata)
521            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
522
523        // Pre-create capture files so the watchdog/buffer can always
524        // open them for reading. The spawn helper opens its own handles
525        // per attempt because each `Command::spawn()` consumes them.
526        create_capture_file(&paths.stdout)
527            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
528        create_capture_file(&paths.stderr)
529            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
530
531        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
532            Ok(child) => child,
533            Err(error) => {
534                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
535                let _ = delete_task_bundle(&paths);
536                return Err(error);
537            }
538        };
539
540        let child_pid = child.id();
541        metadata.mark_running(child_pid, child_pid as i32);
542        self.persist_task(&paths, &metadata)
543            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
544
545        let task = Arc::new(BgTask {
546            task_id: task_id.clone(),
547            session_id,
548            paths: paths.clone(),
549            started: Instant::now(),
550            last_reminder_at: Mutex::new(None),
551            terminal_at: Mutex::new(None),
552            state: Mutex::new(BgTaskState {
553                metadata,
554                runtime: TaskRuntime::Piped(Some(child)),
555                detached: false,
556                child_exit_observed: false,
557                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
558                terminal_output_cache: None,
559                pending_terminal_override: None,
560            }),
561        });
562
563        self.inner
564            .tasks
565            .lock()
566            .map_err(|_| "background task registry lock poisoned".to_string())?
567            .insert(task_id.clone(), task);
568
569        Ok(task_id)
570    }
571
572    #[allow(clippy::too_many_arguments)]
573    pub fn spawn_pty(
574        &self,
575        command: &str,
576        session_id: String,
577        workdir: PathBuf,
578        env: HashMap<String, String>,
579        timeout: Option<Duration>,
580        storage_dir: PathBuf,
581        max_running: usize,
582        notify_on_completion: bool,
583        compressed: bool,
584        project_root: Option<PathBuf>,
585        rows: u16,
586        cols: u16,
587    ) -> Result<String, String> {
588        self.start_watchdog();
589
590        let running = self.running_count();
591        if running >= max_running {
592            return Err(format!(
593                "background bash task limit exceeded: {running} running (max {max_running})"
594            ));
595        }
596
597        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
598        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
599        let task_id = self.generate_unique_task_id()?;
600        let paths = task_paths(&storage_dir, &session_id, &task_id);
601        fs::create_dir_all(&paths.dir)
602            .map_err(|e| format!("failed to create background task dir: {e}"))?;
603
604        let mut metadata = PersistedTask::starting(
605            task_id.clone(),
606            session_id.clone(),
607            command.to_string(),
608            workdir.clone(),
609            project_root,
610            timeout_ms,
611            notify_on_completion,
612            compressed,
613        );
614        metadata.mode = BgMode::Pty;
615        metadata.pty_rows = Some(rows);
616        metadata.pty_cols = Some(cols);
617        self.persist_task(&paths, &metadata)
618            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
619        create_capture_file(&paths.pty)
620            .map_err(|e| format!("failed to create PTY capture file: {e}"))?;
621
622        let runtime = match spawn_pty_for_command(
623            &task_id,
624            &session_id,
625            command,
626            &paths,
627            &workdir,
628            &env,
629            rows,
630            cols,
631            self.inner.wake_tx.clone(),
632        ) {
633            Ok(runtime) => runtime,
634            Err(error) => {
635                crate::slog_warn!(
636                    "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
637                );
638                let _ = delete_task_bundle(&paths);
639                return Err(error);
640            }
641        };
642
643        if let Some(child_pid) = runtime.child_pid {
644            metadata.mark_running(child_pid, child_pid as i32);
645        } else {
646            metadata.status = BgTaskStatus::Running;
647            metadata.pgid = None;
648        }
649        self.persist_task(&paths, &metadata)
650            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
651
652        let task = Arc::new(BgTask {
653            task_id: task_id.clone(),
654            session_id,
655            paths: paths.clone(),
656            started: Instant::now(),
657            last_reminder_at: Mutex::new(None),
658            terminal_at: Mutex::new(None),
659            state: Mutex::new(BgTaskState {
660                metadata,
661                runtime: TaskRuntime::Pty(Some(runtime)),
662                detached: false,
663                child_exit_observed: false,
664                buffer: BgBuffer::pty(paths.pty.clone()),
665                terminal_output_cache: None,
666                pending_terminal_override: None,
667            }),
668        });
669
670        self.inner
671            .tasks
672            .lock()
673            .map_err(|_| "background task registry lock poisoned".to_string())?
674            .insert(task_id.clone(), task);
675
676        Ok(task_id)
677    }
678
679    #[cfg(windows)]
680    #[allow(clippy::too_many_arguments)]
681    pub fn spawn(
682        &self,
683        command: &str,
684        session_id: String,
685        workdir: PathBuf,
686        env: HashMap<String, String>,
687        timeout: Option<Duration>,
688        storage_dir: PathBuf,
689        max_running: usize,
690        notify_on_completion: bool,
691        compressed: bool,
692        project_root: Option<PathBuf>,
693    ) -> Result<String, String> {
694        self.start_watchdog();
695
696        let running = self.running_count();
697        if running >= max_running {
698            return Err(format!(
699                "background bash task limit exceeded: {running} running (max {max_running})"
700            ));
701        }
702
703        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
704        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
705        let task_id = self.generate_unique_task_id()?;
706        let paths = task_paths(&storage_dir, &session_id, &task_id);
707        fs::create_dir_all(&paths.dir)
708            .map_err(|e| format!("failed to create background task dir: {e}"))?;
709
710        let mut metadata = PersistedTask::starting(
711            task_id.clone(),
712            session_id.clone(),
713            command.to_string(),
714            workdir.clone(),
715            project_root,
716            timeout_ms,
717            notify_on_completion,
718            compressed,
719        );
720        self.persist_task(&paths, &metadata)
721            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
722
723        // Capture files are pre-created so the watchdog/buffer can always
724        // open them for reading even if the child hasn't written anything
725        // yet. The spawn helper opens its own handles per attempt because
726        // each `Command::spawn()` consumes them, and on Windows we may
727        // retry across multiple shell candidates if the first one fails.
728        create_capture_file(&paths.stdout)
729            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
730        create_capture_file(&paths.stderr)
731            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
732
733        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
734            Ok(child) => child,
735            Err(error) => {
736                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
737                let _ = delete_task_bundle(&paths);
738                return Err(error);
739            }
740        };
741
742        let child_pid = child.id();
743        metadata.status = BgTaskStatus::Running;
744        metadata.child_pid = Some(child_pid);
745        metadata.pgid = None;
746        self.persist_task(&paths, &metadata)
747            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
748
749        let task = Arc::new(BgTask {
750            task_id: task_id.clone(),
751            session_id,
752            paths: paths.clone(),
753            started: Instant::now(),
754            last_reminder_at: Mutex::new(None),
755            terminal_at: Mutex::new(None),
756            state: Mutex::new(BgTaskState {
757                metadata,
758                runtime: TaskRuntime::Piped(Some(child)),
759                detached: false,
760                child_exit_observed: false,
761                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
762                terminal_output_cache: None,
763                pending_terminal_override: None,
764            }),
765        });
766
767        self.inner
768            .tasks
769            .lock()
770            .map_err(|_| "background task registry lock poisoned".to_string())?
771            .insert(task_id.clone(), task);
772
773        Ok(task_id)
774    }
775
776    pub fn write_pty(
777        &self,
778        task_id: &str,
779        session_id: &str,
780        input: &[u8],
781    ) -> Result<usize, String> {
782        let task = self
783            .task_for_session(task_id, session_id)
784            .ok_or_else(|| "task_not_found".to_string())?;
785
786        let writer = {
787            let state = task
788                .state
789                .lock()
790                .map_err(|_| "background task lock poisoned".to_string())?;
791            if state.metadata.mode != BgMode::Pty {
792                return Err("task_not_pty".to_string());
793            }
794            if state.metadata.status.is_terminal() {
795                return Err("task_exited".to_string());
796            }
797            match &state.runtime {
798                TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
799                TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
800                TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
801            }
802        };
803
804        let mut writer = writer
805            .lock()
806            .map_err(|_| "PTY writer lock poisoned".to_string())?;
807        writer
808            .write_all(input)
809            .map_err(|error| format!("failed to write to PTY: {error}"))?;
810        writer
811            .flush()
812            .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
813        Ok(input.len())
814    }
815
816    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
817        self.replay_session_inner(storage_dir, session_id, None)
818    }
819
820    pub fn replay_session_for_project(
821        &self,
822        storage_dir: &Path,
823        session_id: &str,
824        project_root: &Path,
825    ) -> Result<(), String> {
826        self.replay_session_inner(storage_dir, session_id, Some(project_root))
827    }
828
829    fn replay_session_inner(
830        &self,
831        storage_dir: &Path,
832        session_id: &str,
833        project_root: Option<&Path>,
834    ) -> Result<(), String> {
835        self.start_watchdog();
836        if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
837            if let Err(error) = self.maybe_gc_persisted(storage_dir) {
838                crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
839            }
840        }
841
842        let canonical_project = project_root.map(canonicalized_path);
843        // Replay strategy: DB is the post-v0.27 source of truth. Disk
844        // fallback handles pre-v0.27 tasks that haven't been migrated and
845        // the cold-start `__default__` namespace (configure runs before any
846        // user session exists, so plugin-init triggers a session-less DB
847        // lookup that will be empty until a real session writes a task).
848        //
849        // We deliberately keep the empty-DB / empty-disk path silent — it's
850        // the normal startup case and would otherwise fire on every configure
851        // (see GitHub user report against v0.27.0). INFO-level logs only when
852        // disk actually returned tasks (real migration signal); WARN when the
853        // DB lookup itself errored.
854        let tasks = match self.replay_session_from_db(session_id) {
855            Some(Ok(tasks)) if !tasks.is_empty() => tasks,
856            Some(Ok(_)) => {
857                let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
858                if !disk_tasks.is_empty() {
859                    crate::slog_info!(
860                        "bash task replay: 0 in DB for session {}, {} from disk fallback",
861                        session_id,
862                        disk_tasks.len()
863                    );
864                }
865                disk_tasks
866            }
867            Some(Err(error)) => {
868                crate::slog_warn!(
869                    "bash task replay DB lookup failed for session {}; falling back to disk: {}",
870                    session_id,
871                    error
872                );
873                self.replay_session_from_disk(storage_dir, session_id)?
874            }
875            None => {
876                // DB pool unconfigured — common in tests + before harness is set.
877                self.replay_session_from_disk(storage_dir, session_id)?
878            }
879        };
880
881        for mut metadata in tasks {
882            if metadata.session_id != session_id {
883                continue;
884            }
885            if let Some(canonical_project) = canonical_project.as_deref() {
886                let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
887                if metadata_project.as_deref() != Some(canonical_project) {
888                    continue;
889                }
890            }
891
892            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
893            match metadata.status {
894                BgTaskStatus::Starting => {
895                    let completion_was_delivered = metadata.completion_delivered;
896                    metadata.mark_terminal(
897                        BgTaskStatus::Failed,
898                        None,
899                        Some("spawn aborted".to_string()),
900                    );
901                    metadata.completion_delivered |= completion_was_delivered;
902                    let _ = self.persist_task(&paths, &metadata);
903                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
904                    self.insert_rehydrated_task(metadata, paths, true)?;
905                }
906                BgTaskStatus::Running | BgTaskStatus::Killing => {
907                    if metadata.mode == BgMode::Pty {
908                        if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
909                            let completion_was_delivered = metadata.completion_delivered;
910                            metadata = terminal_metadata_from_marker(metadata, marker, None);
911                            metadata.completion_delivered |= completion_was_delivered;
912                            let _ = self.persist_task(&paths, &metadata);
913                            self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
914                            self.insert_rehydrated_task(metadata, paths, true)?;
915                        } else if metadata.status.is_terminal() {
916                            self.insert_rehydrated_task(metadata, paths, true)?;
917                        } else {
918                            let completion_was_delivered = metadata.completion_delivered;
919                            metadata.mark_terminal(
920                                BgTaskStatus::Killed,
921                                None,
922                                Some("pty_lost_on_bridge_restart".to_string()),
923                            );
924                            metadata.completion_delivered |= completion_was_delivered;
925                            let _ = self.persist_task(&paths, &metadata);
926                            self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
927                            self.insert_rehydrated_task(metadata, paths, true)?;
928                        }
929                    } else if self.running_metadata_is_stale(&metadata) {
930                        let completion_was_delivered = metadata.completion_delivered;
931                        metadata.mark_terminal(
932                            BgTaskStatus::Killed,
933                            None,
934                            Some("orphaned (>24h)".to_string()),
935                        );
936                        metadata.completion_delivered |= completion_was_delivered;
937                        if !paths.exit.exists() {
938                            let _ = write_kill_marker_if_absent(&paths.exit);
939                        }
940                        let _ = self.persist_task(&paths, &metadata);
941                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
942                        self.insert_rehydrated_task(metadata, paths, true)?;
943                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
944                        let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
945                            "recovered from inconsistent killing state on replay".to_string()
946                        });
947                        if reason.is_some() {
948                            crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
949                            metadata.task_id);
950                        }
951                        let completion_was_delivered = metadata.completion_delivered;
952                        metadata = terminal_metadata_from_marker(metadata, marker, reason);
953                        metadata.completion_delivered |= completion_was_delivered;
954                        let _ = self.persist_task(&paths, &metadata);
955                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
956                        self.insert_rehydrated_task(metadata, paths, true)?;
957                    } else if metadata.status == BgTaskStatus::Killing {
958                        if !paths.exit.exists() {
959                            let _ = write_kill_marker_if_absent(&paths.exit);
960                        }
961                        let completion_was_delivered = metadata.completion_delivered;
962                        metadata.mark_terminal(
963                            BgTaskStatus::Killed,
964                            None,
965                            Some("recovered from inconsistent killing state on replay".to_string()),
966                        );
967                        metadata.completion_delivered |= completion_was_delivered;
968                        let _ = self.persist_task(&paths, &metadata);
969                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
970                        self.insert_rehydrated_task(metadata, paths, true)?;
971                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
972                        let completion_was_delivered = metadata.completion_delivered;
973                        metadata.mark_terminal(
974                            BgTaskStatus::Failed,
975                            None,
976                            Some("process exited without exit marker".to_string()),
977                        );
978                        metadata.completion_delivered |= completion_was_delivered;
979                        let _ = self.persist_task(&paths, &metadata);
980                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
981                        self.insert_rehydrated_task(metadata, paths, true)?;
982                    } else {
983                        self.insert_rehydrated_task(metadata, paths, true)?;
984                    }
985                }
986                _ if metadata.status.is_terminal() => {
987                    // Borrow `paths` for the completion enqueue BEFORE
988                    // `insert_rehydrated_task` consumes it. The completion
989                    // helper only reads from `paths` (stdout/stderr/exit) to
990                    // reconstruct a tail preview, so it must see the same
991                    // paths the rehydrated task will own.
992                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
993                    self.insert_rehydrated_task(metadata, paths, true)?;
994                }
995                _ => {}
996            }
997        }
998
999        Ok(())
1000    }
1001
1002    fn replay_session_from_db(
1003        &self,
1004        session_id: &str,
1005    ) -> Option<Result<Vec<PersistedTask>, String>> {
1006        let pool = self
1007            .inner
1008            .db_pool
1009            .read()
1010            .ok()
1011            .and_then(|slot| slot.clone())?;
1012        let harness = self
1013            .inner
1014            .db_harness
1015            .read()
1016            .ok()
1017            .and_then(|slot| slot.clone())?;
1018        let conn = match pool.lock() {
1019            Ok(conn) => conn,
1020            Err(_) => return Some(Err("db mutex poisoned".to_string())),
1021        };
1022        Some(
1023            crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
1024                .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
1025                .map_err(|error| error.to_string()),
1026        )
1027    }
1028
1029    fn replay_session_from_disk(
1030        &self,
1031        storage_dir: &Path,
1032        session_id: &str,
1033    ) -> Result<Vec<PersistedTask>, String> {
1034        let dir = session_tasks_dir(storage_dir, session_id);
1035        if !dir.exists() {
1036            return Ok(Vec::new());
1037        }
1038
1039        let entries = fs::read_dir(&dir)
1040            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
1041        let mut tasks = Vec::new();
1042        for entry in entries.flatten() {
1043            let path = entry.path();
1044            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
1045                continue;
1046            }
1047            match read_task(&path) {
1048                Ok(metadata) => tasks.push(metadata),
1049                Err(error) => {
1050                    crate::slog_warn!(
1051                        "quarantining invalid background task metadata {} during replay: {error}",
1052                        path.display()
1053                    );
1054                    if let Err(quarantine_error) =
1055                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1056                    {
1057                        crate::slog_warn!(
1058                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1059                            path.display()
1060                        );
1061                    }
1062                }
1063            }
1064        }
1065        Ok(tasks)
1066    }
1067
1068    pub fn register_watch(
1069        &self,
1070        task_id: String,
1071        pattern: WatchPattern,
1072        once: bool,
1073    ) -> Result<String, &'static str> {
1074        let task = self.task(&task_id).ok_or("task_not_found")?;
1075        let (mode, terminal_at_registration, stdout, stderr, pty) = task
1076            .state
1077            .lock()
1078            .map(|state| {
1079                (
1080                    state.metadata.mode.clone(),
1081                    state.metadata.status.is_terminal(),
1082                    task.paths.stdout.clone(),
1083                    task.paths.stderr.clone(),
1084                    task.paths.pty.clone(),
1085                )
1086            })
1087            .map_err(|_| "background_task_lock_poisoned")?;
1088
1089        let mut terminal_matches = Vec::new();
1090        let scanned_terminal = terminal_at_registration;
1091        let watch_id = {
1092            let mut registry = self
1093                .inner
1094                .watch_registry
1095                .lock()
1096                .map_err(|_| "watch_registry_poisoned")?;
1097            let watch_id = registry.register(task_id.clone(), pattern, once)?;
1098            match &mode {
1099                BgMode::Pipes => {
1100                    let stdout_key = format!("{task_id}:stdout");
1101                    let stderr_key = format!("{task_id}:stderr");
1102                    if terminal_at_registration {
1103                        registry.set_file_cursor(&stdout_key, 0);
1104                        registry.set_file_cursor(&stderr_key, 0);
1105                        terminal_matches.extend(registry.scan_file_new_bytes(
1106                            &stdout_key,
1107                            &task_id,
1108                            &stdout,
1109                        ));
1110                        terminal_matches.extend(registry.scan_file_new_bytes(
1111                            &stderr_key,
1112                            &task_id,
1113                            &stderr,
1114                        ));
1115                    } else {
1116                        registry.prime_file_cursor(&stdout_key, &stdout);
1117                        registry.prime_file_cursor(&stderr_key, &stderr);
1118                    }
1119                }
1120                BgMode::Pty => {
1121                    let pty_key = format!("{task_id}:pty");
1122                    if terminal_at_registration {
1123                        registry.set_file_cursor(&pty_key, 0);
1124                        terminal_matches
1125                            .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
1126                    } else {
1127                        registry.prime_file_cursor(&pty_key, &pty);
1128                    }
1129                }
1130            }
1131            watch_id
1132        };
1133
1134        if task.is_terminal() {
1135            if !scanned_terminal {
1136                terminal_matches = {
1137                    let mut registry = self
1138                        .inner
1139                        .watch_registry
1140                        .lock()
1141                        .map_err(|_| "watch_registry_poisoned")?;
1142                    match &mode {
1143                        BgMode::Pipes => {
1144                            let stdout_key = format!("{task_id}:stdout");
1145                            let stderr_key = format!("{task_id}:stderr");
1146                            registry.set_file_cursor(&stdout_key, 0);
1147                            registry.set_file_cursor(&stderr_key, 0);
1148                            let mut matches =
1149                                registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
1150                            matches.extend(registry.scan_file_new_bytes(
1151                                &stderr_key,
1152                                &task_id,
1153                                &stderr,
1154                            ));
1155                            matches
1156                        }
1157                        BgMode::Pty => {
1158                            let pty_key = format!("{task_id}:pty");
1159                            registry.set_file_cursor(&pty_key, 0);
1160                            registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
1161                        }
1162                    }
1163                };
1164            }
1165
1166            let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
1167            if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
1168                if watch_matched {
1169                    let _ = task.set_completion_delivered(true, self);
1170                    self.clear_task_watch_state(&task_id);
1171                }
1172                return Ok(watch_id);
1173            }
1174
1175            let completion = self
1176                .remove_pending_completion(&task_id)
1177                .or_else(|| self.completion_snapshot_for_task(&task));
1178            if terminal_matches.is_empty() {
1179                if let Some(completion) = completion.as_ref() {
1180                    self.emit_bash_watch_exit(completion);
1181                }
1182            } else {
1183                for pattern_match in terminal_matches {
1184                    self.emit_bash_pattern_match(&task.session_id, pattern_match);
1185                }
1186            }
1187            let _ = task.set_completion_delivered(true, self);
1188            self.clear_task_watch_state(&task_id);
1189        }
1190
1191        Ok(watch_id)
1192    }
1193
1194    pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
1195        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1196            registry.unregister(task_id, watch_id);
1197        }
1198    }
1199
1200    pub fn active_watch_count(&self, task_id: &str) -> usize {
1201        self.inner
1202            .watch_registry
1203            .lock()
1204            .map(|registry| registry.active_count(task_id))
1205            .unwrap_or(0)
1206    }
1207
1208    fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1209        self.inner
1210            .watch_registry
1211            .lock()
1212            .map(|registry| {
1213                (
1214                    registry.has_controlled_task(task_id),
1215                    registry.has_matched_task(task_id),
1216                )
1217            })
1218            .unwrap_or((false, false))
1219    }
1220
1221    fn task_has_watch_control(&self, task_id: &str) -> bool {
1222        self.inner
1223            .watch_registry
1224            .lock()
1225            .map(|registry| registry.has_controlled_task(task_id))
1226            .unwrap_or(false)
1227    }
1228
1229    fn clear_task_watch_state(&self, task_id: &str) {
1230        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1231            registry.clear_task(task_id);
1232        }
1233    }
1234
1235    pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1236        let (mode, stdout, stderr, pty) = match task.state.lock() {
1237            Ok(state) => (
1238                state.metadata.mode.clone(),
1239                task.paths.stdout.clone(),
1240                task.paths.stderr.clone(),
1241                task.paths.pty.clone(),
1242            ),
1243            Err(_) => return,
1244        };
1245        let mut matches = Vec::new();
1246        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1247            match mode {
1248                BgMode::Pipes => {
1249                    let stdout_key = format!("{}:stdout", task.task_id);
1250                    let stderr_key = format!("{}:stderr", task.task_id);
1251                    matches.extend(registry.scan_file_new_bytes(
1252                        &stdout_key,
1253                        &task.task_id,
1254                        &stdout,
1255                    ));
1256                    matches.extend(registry.scan_file_new_bytes(
1257                        &stderr_key,
1258                        &task.task_id,
1259                        &stderr,
1260                    ));
1261                }
1262                BgMode::Pty => {
1263                    let pty_key = format!("{}:pty", task.task_id);
1264                    matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1265                }
1266            }
1267        }
1268        for pattern_match in matches {
1269            self.emit_bash_pattern_match(&task.session_id, pattern_match);
1270        }
1271    }
1272
1273    pub fn status(
1274        &self,
1275        task_id: &str,
1276        session_id: &str,
1277        project_root: Option<&Path>,
1278        storage_dir: Option<&Path>,
1279        preview_bytes: usize,
1280    ) -> Option<BgTaskSnapshot> {
1281        let mut task = self.task_for_session(task_id, session_id);
1282        if task.is_none() {
1283            if let Some(storage_dir) = storage_dir {
1284                let _ = if let Some(project_root) = project_root {
1285                    self.replay_session_for_project(storage_dir, session_id, project_root)
1286                } else {
1287                    self.replay_session(storage_dir, session_id)
1288                };
1289                task = self.task_for_session(task_id, session_id);
1290            }
1291        }
1292        let Some(task) = task else {
1293            return self.status_relaxed(
1294                task_id,
1295                session_id,
1296                project_root?,
1297                storage_dir?,
1298                preview_bytes,
1299            );
1300        };
1301        let _ = self.poll_task(&task);
1302        Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1303    }
1304
1305    fn status_relaxed_task(
1306        &self,
1307        task_id: &str,
1308        project_root: &Path,
1309        storage_dir: &Path,
1310    ) -> Option<Arc<BgTask>> {
1311        let canonical_project = canonicalized_path(project_root);
1312        match self.lookup_relaxed_task_from_db(task_id, project_root) {
1313            Some(Ok(Some(metadata))) => {
1314                if let Some(task) = self.task(task_id) {
1315                    let matches_project = task
1316                        .state
1317                        .lock()
1318                        .map(|state| {
1319                            state
1320                                .metadata
1321                                .project_root
1322                                .as_deref()
1323                                .map(canonicalized_path)
1324                                .as_deref()
1325                                == Some(canonical_project.as_path())
1326                        })
1327                        .unwrap_or(false);
1328                    return matches_project.then_some(task);
1329                }
1330                let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1331                if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1332                    return None;
1333                }
1334                return self.task(task_id);
1335            }
1336            Some(Ok(None)) => {
1337                crate::slog_info!(
1338                    "bash task relaxed DB miss for {}; falling back to disk",
1339                    task_id
1340                );
1341            }
1342            Some(Err(error)) => {
1343                crate::slog_warn!(
1344                    "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1345                    task_id,
1346                    error
1347                );
1348            }
1349            None => {
1350                crate::slog_info!(
1351                    "bash task relaxed DB unavailable for {}; falling back to disk",
1352                    task_id
1353                );
1354            }
1355        }
1356        let root = storage_dir.join("bash-tasks");
1357        let entries = fs::read_dir(&root).ok()?;
1358        for entry in entries.flatten() {
1359            let dir = entry.path();
1360            if !dir.is_dir() {
1361                continue;
1362            }
1363            let path = dir.join(format!("{task_id}.json"));
1364            if !path.exists() {
1365                continue;
1366            }
1367            let metadata = match read_task(&path) {
1368                Ok(metadata) => metadata,
1369                Err(error) => {
1370                    crate::slog_warn!(
1371                        "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1372                        path.display()
1373                    );
1374                    if let Err(quarantine_error) =
1375                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1376                    {
1377                        crate::slog_warn!(
1378                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1379                            path.display()
1380                        );
1381                    }
1382                    continue;
1383                }
1384            };
1385            let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1386            if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1387                continue;
1388            }
1389            if let Some(task) = self.task(task_id) {
1390                let matches_project = task
1391                    .state
1392                    .lock()
1393                    .map(|state| {
1394                        state
1395                            .metadata
1396                            .project_root
1397                            .as_deref()
1398                            .map(canonicalized_path)
1399                            .as_deref()
1400                            == Some(canonical_project.as_path())
1401                    })
1402                    .unwrap_or(false);
1403                return matches_project.then_some(task);
1404            }
1405            let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1406            if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1407                return None;
1408            }
1409            return self.task(task_id);
1410        }
1411        None
1412    }
1413
1414    fn lookup_relaxed_task_from_db(
1415        &self,
1416        task_id: &str,
1417        project_root: &Path,
1418    ) -> Option<Result<Option<PersistedTask>, String>> {
1419        let pool = self
1420            .inner
1421            .db_pool
1422            .read()
1423            .ok()
1424            .and_then(|slot| slot.clone())?;
1425        let harness = self
1426            .inner
1427            .db_harness
1428            .read()
1429            .ok()
1430            .and_then(|slot| slot.clone())?;
1431        let conn = match pool.lock() {
1432            Ok(conn) => conn,
1433            Err(_) => return Some(Err("db mutex poisoned".to_string())),
1434        };
1435        let project_key = crate::path_identity::project_scope_key(project_root);
1436        Some(
1437            crate::db::bash_tasks::find_bash_task_for_project(
1438                &conn,
1439                &harness,
1440                &project_key,
1441                task_id,
1442            )
1443            .map(|row| row.map(PersistedTask::from))
1444            .map_err(|error| error.to_string()),
1445        )
1446    }
1447
1448    pub(super) fn status_relaxed(
1449        &self,
1450        task_id: &str,
1451        _session_id: &str,
1452        project_root: &Path,
1453        storage_dir: &Path,
1454        preview_bytes: usize,
1455    ) -> Option<BgTaskSnapshot> {
1456        let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1457        let _ = self.poll_task(&task);
1458        Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1459    }
1460
1461    pub fn kill_relaxed(
1462        &self,
1463        task_id: &str,
1464        project_root: &Path,
1465        storage_dir: &Path,
1466    ) -> Result<BgTaskSnapshot, String> {
1467        let task = self
1468            .status_relaxed_task(task_id, project_root, storage_dir)
1469            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1470        self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1471    }
1472
1473    pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1474        #[cfg(test)]
1475        self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1476
1477        let mut deleted = 0usize;
1478
1479        let root = storage_dir.join("bash-tasks");
1480        if root.exists() {
1481            let session_dirs = fs::read_dir(&root).map_err(|e| {
1482                format!(
1483                    "failed to read background task root {}: {e}",
1484                    root.display()
1485                )
1486            })?;
1487            for session_entry in session_dirs.flatten() {
1488                let session_dir = session_entry.path();
1489                if !session_dir.is_dir() {
1490                    continue;
1491                }
1492                let task_entries = match fs::read_dir(&session_dir) {
1493                    Ok(entries) => entries,
1494                    Err(error) => {
1495                        crate::slog_warn!(
1496                            "failed to read background task session dir {}: {error}",
1497                            session_dir.display()
1498                        );
1499                        continue;
1500                    }
1501                };
1502                for task_entry in task_entries.flatten() {
1503                    let json_path = task_entry.path();
1504                    if json_path
1505                        .extension()
1506                        .and_then(|extension| extension.to_str())
1507                        != Some("json")
1508                    {
1509                        continue;
1510                    }
1511                    if modified_within(&json_path, PERSISTED_GC_GRACE) {
1512                        continue;
1513                    }
1514                    let metadata = match read_task(&json_path) {
1515                        Ok(metadata) => metadata,
1516                        Err(error) => {
1517                            crate::slog_warn!(
1518                                "quarantining corrupt background task metadata {}: {error}",
1519                                json_path.display()
1520                            );
1521                            quarantine_task_json(
1522                                storage_dir,
1523                                &session_dir,
1524                                &json_path,
1525                                QuarantineKind::Corrupt,
1526                            )?;
1527                            continue;
1528                        }
1529                    };
1530                    if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1531                        continue;
1532                    }
1533                    let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1534                    match delete_task_bundle(&paths) {
1535                        Ok(()) => {
1536                            deleted += 1;
1537                            log::debug!(
1538                                "deleted persisted background task bundle {}",
1539                                metadata.task_id
1540                            );
1541                        }
1542                        Err(error) => {
1543                            crate::slog_warn!(
1544                                "failed to delete background task bundle {}: {error}",
1545                                metadata.task_id
1546                            );
1547                            continue;
1548                        }
1549                    }
1550                }
1551            }
1552        }
1553        gc_quarantine(storage_dir);
1554        Ok(deleted)
1555    }
1556
1557    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1558        let tasks = self
1559            .inner
1560            .tasks
1561            .lock()
1562            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1563            .unwrap_or_default();
1564        tasks
1565            .into_iter()
1566            .map(|task| {
1567                let _ = self.poll_task(&task);
1568                self.snapshot_with_terminal_cache(&task, preview_bytes)
1569            })
1570            .collect()
1571    }
1572
1573    /// Replace terminal pipe snapshots with the task's cached rendered output.
1574    /// Running tasks stay raw (tail-only) so agents debugging a live process see
1575    /// exactly what it emitted. PTY tasks are explicitly excluded: their raw
1576    /// terminal bytes are rendered by the plugin's PTY path, not the line
1577    /// compressor.
1578    fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1579        if !snapshot.info.status.is_terminal() || snapshot.info.mode == BgMode::Pty {
1580            return;
1581        }
1582        if let Some(cache) = self.ensure_terminal_output_cache(task) {
1583            snapshot.output_preview = cache.output_preview;
1584            snapshot.output_truncated = cache.output_truncated;
1585        }
1586    }
1587
1588    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1589        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1590    }
1591
1592    pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1593        let task = self
1594            .task_for_session(task_id, session_id)
1595            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1596        let terminal_after_promote = {
1597            let mut state = task
1598                .state
1599                .lock()
1600                .map_err(|_| "background task lock poisoned".to_string())?;
1601            let updated = self
1602                .update_task_metadata(&task.paths, |metadata| {
1603                    metadata.notify_on_completion = true;
1604                    metadata.completion_delivered = false;
1605                })
1606                .map_err(|e| format!("failed to promote background task: {e}"))?;
1607            state.metadata = updated;
1608            state.metadata.status.is_terminal()
1609        };
1610        if terminal_after_promote {
1611            self.post_terminal_transition(&task, true)?;
1612        }
1613        Ok(true)
1614    }
1615
1616    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1617        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1618            .map(|_| ())
1619    }
1620
1621    pub fn cleanup_finished(&self, older_than: Duration) {
1622        let cutoff = Instant::now().checked_sub(older_than);
1623        let removable_paths: Vec<(String, TaskPaths)> =
1624            if let Ok(mut tasks) = self.inner.tasks.lock() {
1625                let removable = tasks
1626                    .iter()
1627                    .filter_map(|(task_id, task)| {
1628                        let delivered_terminal = task
1629                            .state
1630                            .lock()
1631                            .map(|state| {
1632                                state.metadata.status.is_terminal()
1633                                    && state.metadata.completion_delivered
1634                            })
1635                            .unwrap_or(false);
1636                        if !delivered_terminal {
1637                            return None;
1638                        }
1639
1640                        let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1641                        let expired = match (terminal_at, cutoff) {
1642                            (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1643                            (Some(_), None) => true,
1644                            (None, _) => false,
1645                        };
1646                        expired.then(|| task_id.clone())
1647                    })
1648                    .collect::<Vec<_>>();
1649
1650                removable
1651                    .into_iter()
1652                    .filter_map(|task_id| {
1653                        tasks
1654                            .remove(&task_id)
1655                            .map(|task| (task_id, task.paths.clone()))
1656                    })
1657                    .collect()
1658            } else {
1659                Vec::new()
1660            };
1661
1662        for (task_id, paths) in removable_paths {
1663            match delete_task_bundle(&paths) {
1664                Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1665                Err(error) => crate::slog_warn!(
1666                    "failed to delete persisted background task bundle {task_id}: {error}"
1667                ),
1668            }
1669        }
1670    }
1671
1672    pub fn drain_completions(&self) -> Vec<BgCompletion> {
1673        self.drain_completions_for_session(None)
1674    }
1675
1676    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1677        let completions = match self.inner.completions.lock() {
1678            Ok(completions) => completions,
1679            Err(_) => return Vec::new(),
1680        };
1681
1682        completions
1683            .iter()
1684            .filter(|completion| completion_matches_session(completion, session_id))
1685            .cloned()
1686            .collect()
1687    }
1688
1689    pub fn has_completions_for_session(&self, session_id: Option<&str>) -> bool {
1690        match self.inner.completions.lock() {
1691            Ok(completions) => completions
1692                .iter()
1693                .any(|completion| completion_matches_session(completion, session_id)),
1694            // Bias to safety: if the queue state cannot be inspected cheaply,
1695            // let callers take the existing drain path rather than risk
1696            // suppressing a pending completion.
1697            Err(_) => true,
1698        }
1699    }
1700
1701    pub fn ack_completions_for_session(
1702        &self,
1703        session_id: Option<&str>,
1704        task_ids: &[String],
1705    ) -> Vec<String> {
1706        if task_ids.is_empty() {
1707            return Vec::new();
1708        }
1709        let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1710        let mut completion_sessions = HashMap::new();
1711        if let Ok(mut completions) = self.inner.completions.lock() {
1712            completions.retain(|completion| {
1713                let session_matches = session_id
1714                    .map(|session_id| completion.session_id == session_id)
1715                    .unwrap_or(true);
1716                if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1717                    completion_sessions
1718                        .insert(completion.task_id.clone(), completion.session_id.clone());
1719                    false
1720                } else {
1721                    true
1722                }
1723            });
1724        }
1725
1726        let mut delivered = Vec::new();
1727        for task_id in task_ids {
1728            let task = if let Some(session_id) = session_id {
1729                self.task_for_session(task_id, session_id)
1730            } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1731                self.task_for_session(task_id, completion_session_id)
1732            } else {
1733                self.task(task_id)
1734            };
1735            if let Some(task) = task {
1736                if task.set_completion_delivered(true, self).is_ok() {
1737                    delivered.push(task_id.clone());
1738                }
1739            }
1740        }
1741
1742        delivered
1743    }
1744
1745    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1746        self.inner
1747            .completions
1748            .lock()
1749            .map(|completions| {
1750                completions
1751                    .iter()
1752                    .filter(|completion| completion.session_id == session_id)
1753                    .cloned()
1754                    .collect()
1755            })
1756            .unwrap_or_default()
1757    }
1758
1759    fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1760        let mut completions = self.inner.completions.lock().ok()?;
1761        let idx = completions
1762            .iter()
1763            .position(|completion| completion.task_id == task_id)?;
1764        completions.remove(idx)
1765    }
1766
1767    fn completion_snapshot_for_task(&self, task: &Arc<BgTask>) -> Option<BgCompletion> {
1768        let snapshot = self.snapshot_with_terminal_cache(task, RUNNING_OUTPUT_PREVIEW_BYTES);
1769        if !snapshot.info.status.is_terminal() {
1770            return None;
1771        }
1772        let (output_preview, output_truncated) = if snapshot.info.mode == BgMode::Pty {
1773            (String::new(), false)
1774        } else {
1775            self.ensure_terminal_output_cache(task)
1776                .map(|cache| completion_preview_for_cache(&cache, snapshot.exit_code))
1777                .unwrap_or_else(|| (String::new(), false))
1778        };
1779        Some(BgCompletion {
1780            task_id: snapshot.info.task_id,
1781            session_id: task.session_id.clone(),
1782            status: snapshot.info.status,
1783            exit_code: snapshot.exit_code,
1784            command: snapshot.info.command,
1785            output_preview,
1786            output_truncated,
1787            original_tokens: None,
1788            compressed_tokens: None,
1789            tokens_skipped: false,
1790        })
1791    }
1792
1793    pub fn detach(&self) {
1794        self.inner.shutdown.store(true, Ordering::SeqCst);
1795        if let Ok(mut tasks) = self.inner.tasks.lock() {
1796            for task in tasks.values() {
1797                if let Ok(mut state) = task.state.lock() {
1798                    match &mut state.runtime {
1799                        TaskRuntime::Piped(child) => *child = None,
1800                        TaskRuntime::Pty(runtime) => *runtime = None,
1801                    }
1802                    state.detached = true;
1803                }
1804            }
1805            tasks.clear();
1806        }
1807    }
1808
1809    pub fn shutdown(&self) {
1810        let tasks = self
1811            .inner
1812            .tasks
1813            .lock()
1814            .map(|tasks| {
1815                tasks
1816                    .values()
1817                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
1818                    .collect::<Vec<_>>()
1819            })
1820            .unwrap_or_default();
1821        for (task_id, session_id) in tasks {
1822            let _ = self.kill(&task_id, &session_id);
1823        }
1824    }
1825
1826    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1827        if let Ok(state) = task.state.lock() {
1828            if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1829                // On Windows ConPTY, the reader may not observe EOF while the
1830                // master handle is still held in `PtyRuntime`. The waiter writes
1831                // the authoritative exit marker before setting `exit_observed`,
1832                // so once exit is observed we can finalize from that marker and
1833                // drop the runtime, which lets the reader finish. Waiting for
1834                // `reader_done && exit_observed` wedges completed PTY tasks on
1835                // Windows.
1836                if !pty.exit_observed.load(Ordering::SeqCst) {
1837                    return Ok(());
1838                }
1839            }
1840        }
1841        let marker = match read_exit_marker(&task.paths.exit) {
1842            Ok(Some(marker)) => marker,
1843            Ok(None) => return Ok(()),
1844            Err(error) => return Err(format!("failed to read exit marker: {error}")),
1845        };
1846        self.finalize_from_marker(task, marker, None)
1847    }
1848
1849    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1850        let mut needs_completion = false;
1851        {
1852            let Ok(mut state) = task.state.lock() else {
1853                return;
1854            };
1855            match &mut state.runtime {
1856                TaskRuntime::Piped(child_slot) => {
1857                    if let Some(child) = child_slot.as_mut() {
1858                        if matches!(child.try_wait(), Ok(Some(_))) {
1859                            *child_slot = None;
1860                            state.detached = true;
1861                            state.child_exit_observed = true;
1862                        }
1863                    } else if state.detached {
1864                        let child_known_dead = state.child_exit_observed
1865                            || state
1866                                .metadata
1867                                .child_pid
1868                                .is_some_and(|pid| !is_process_alive(pid));
1869                        if child_known_dead {
1870                            needs_completion =
1871                                self.fail_without_exit_marker_if_needed(task, &mut state);
1872                        }
1873                    }
1874                }
1875                TaskRuntime::Pty(Some(pty)) => {
1876                    if pty.exit_observed.load(Ordering::SeqCst) {
1877                        drop(state);
1878                        let _ = self.poll_task(task);
1879                        return;
1880                    }
1881                }
1882                TaskRuntime::Pty(None) => {}
1883            }
1884        }
1885        if needs_completion {
1886            let _ = self.post_terminal_transition(task, true);
1887        }
1888    }
1889
1890    fn fail_without_exit_marker_if_needed(
1891        &self,
1892        task: &Arc<BgTask>,
1893        state: &mut BgTaskState,
1894    ) -> bool {
1895        if state.metadata.status.is_terminal() {
1896            return false;
1897        }
1898        if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1899            return false;
1900        }
1901        let watch_controlled = self.task_has_watch_control(&task.task_id);
1902        let updated = self.update_task_metadata(&task.paths, |metadata| {
1903            metadata.mark_terminal(
1904                BgTaskStatus::Failed,
1905                None,
1906                Some("process exited without exit marker".to_string()),
1907            );
1908            if watch_controlled {
1909                metadata.completion_delivered = true;
1910            }
1911        });
1912        if let Ok(metadata) = updated {
1913            state.pending_terminal_override = None;
1914            state.metadata = metadata;
1915            task.mark_terminal_now();
1916            return true;
1917        }
1918        false
1919    }
1920
1921    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1922        self.inner
1923            .tasks
1924            .lock()
1925            .map(|tasks| {
1926                tasks
1927                    .values()
1928                    .filter(|task| task.is_running())
1929                    .cloned()
1930                    .collect()
1931            })
1932            .unwrap_or_default()
1933    }
1934
1935    fn insert_rehydrated_task(
1936        &self,
1937        metadata: PersistedTask,
1938        paths: TaskPaths,
1939        detached: bool,
1940    ) -> Result<(), String> {
1941        let task_id = metadata.task_id.clone();
1942        let session_id = metadata.session_id.clone();
1943        let started = started_instant_from_unix_millis(metadata.started_at);
1944        let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1945        let mode = metadata.mode.clone();
1946        let task = Arc::new(BgTask {
1947            task_id: task_id.clone(),
1948            session_id,
1949            paths: paths.clone(),
1950            started,
1951            last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1952            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1953            state: Mutex::new(BgTaskState {
1954                metadata,
1955                runtime: if mode == BgMode::Pty {
1956                    TaskRuntime::Pty(None)
1957                } else {
1958                    TaskRuntime::Piped(None)
1959                },
1960                detached,
1961                // Replay path: we never observed the child handle's exit
1962                // in this process (the previous AFT process did, but its
1963                // observation didn't survive restart). Leave this false so
1964                // the second-pass reap falls through to the
1965                // `is_process_alive(child_pid)` probe rather than declaring
1966                // failure based on stale evidence.
1967                child_exit_observed: false,
1968                buffer: if mode == BgMode::Pty {
1969                    BgBuffer::pty(paths.pty.clone())
1970                } else {
1971                    BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
1972                },
1973                terminal_output_cache: None,
1974                pending_terminal_override: None,
1975            }),
1976        });
1977        self.inner
1978            .tasks
1979            .lock()
1980            .map_err(|_| "background task registry lock poisoned".to_string())?
1981            .insert(task_id, task);
1982        Ok(())
1983    }
1984
1985    fn kill_with_status(
1986        &self,
1987        task_id: &str,
1988        session_id: &str,
1989        terminal_status: BgTaskStatus,
1990    ) -> Result<BgTaskSnapshot, String> {
1991        let task = self
1992            .task_for_session(task_id, session_id)
1993            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1994        let mut terminalized = false;
1995
1996        {
1997            let mut state = task
1998                .state
1999                .lock()
2000                .map_err(|_| "background task lock poisoned".to_string())?;
2001            if state.metadata.status.is_terminal() {
2002                state.pending_terminal_override = None;
2003            } else if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
2004                state.metadata =
2005                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
2006                if self.task_has_watch_control(&task.task_id) {
2007                    state.metadata.completion_delivered = true;
2008                }
2009                state.pending_terminal_override = None;
2010                task.mark_terminal_now();
2011                match &mut state.runtime {
2012                    // Exit marker already present: the child finished on its
2013                    // own before this kill observed it. Reap it rather than
2014                    // dropping the handle so it doesn't become a zombie
2015                    // (issue #91). The active-kill branch below already
2016                    // `wait()`s after signaling, so this is the only kill
2017                    // path that needed the explicit reap.
2018                    TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2019                    TaskRuntime::Pty(runtime) => *runtime = None,
2020                }
2021                state.detached = true;
2022                self.persist_task(&task.paths, &state.metadata)
2023                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2024                terminalized = true;
2025            } else {
2026                let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
2027                if !was_already_killing {
2028                    state.metadata.status = BgTaskStatus::Killing;
2029                    self.persist_task(&task.paths, &state.metadata)
2030                        .map_err(|e| format!("failed to persist killing state: {e}"))?;
2031                }
2032
2033                #[cfg(unix)]
2034                let pgid = state.metadata.pgid;
2035                #[cfg(windows)]
2036                let child_pid = state.metadata.child_pid;
2037                if !was_already_killing
2038                    && state.metadata.mode == BgMode::Pty
2039                    && terminal_status == BgTaskStatus::TimedOut
2040                {
2041                    state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
2042                }
2043
2044                #[cfg(windows)]
2045                let mut pty_forced_terminal_status: Option<BgTaskStatus> = None;
2046
2047                match &mut state.runtime {
2048                    TaskRuntime::Piped(child_slot) => {
2049                        #[cfg(unix)]
2050                        if let Some(pgid) = pgid {
2051                            terminate_pgid(pgid, child_slot.as_mut());
2052                        }
2053                        #[cfg(windows)]
2054                        if let Some(child) = child_slot.as_mut() {
2055                            super::process::terminate_process(child);
2056                        } else if let Some(pid) = child_pid {
2057                            terminate_pid(pid);
2058                        }
2059                        if let Some(child) = child_slot.as_mut() {
2060                            let _ = child.wait();
2061                        }
2062                        *child_slot = None;
2063                        state.detached = true;
2064
2065                        if !task.paths.exit.exists() {
2066                            write_kill_marker_if_absent(&task.paths.exit)
2067                                .map_err(|e| format!("failed to write kill marker: {e}"))?;
2068                        }
2069
2070                        let exit_code = terminal_exit_code_for_status(&terminal_status);
2071                        state
2072                            .metadata
2073                            .mark_terminal(terminal_status, exit_code, None);
2074                        if self.task_has_watch_control(&task.task_id) {
2075                            state.metadata.completion_delivered = true;
2076                        }
2077                        state.pending_terminal_override = None;
2078                        task.mark_terminal_now();
2079                        self.persist_task(&task.paths, &state.metadata)
2080                            .map_err(|e| format!("failed to persist killed state: {e}"))?;
2081                        terminalized = true;
2082                    }
2083                    TaskRuntime::Pty(Some(pty)) => {
2084                        pty.was_killed.store(true, Ordering::SeqCst);
2085                        if let Err(error) = pty.killer.kill() {
2086                            crate::slog_warn!(
2087                                "[pty-kill] {task_id} ChildKiller::kill failed: {error}"
2088                            );
2089                        }
2090                        if let Some(pid) = pty.child_pid {
2091                            #[cfg(unix)]
2092                            terminate_pgid(pid as i32, None);
2093                            #[cfg(windows)]
2094                            terminate_pid(pid);
2095                        }
2096                        drop(pty.master.take());
2097
2098                        #[cfg(windows)]
2099                        {
2100                            let default_status = if terminal_status == BgTaskStatus::TimedOut {
2101                                BgTaskStatus::TimedOut
2102                            } else {
2103                                BgTaskStatus::Killed
2104                            };
2105                            pty_forced_terminal_status = Some(
2106                                state
2107                                    .pending_terminal_override
2108                                    .take()
2109                                    .unwrap_or(default_status),
2110                            );
2111                        }
2112                    }
2113                    TaskRuntime::Pty(None) => {}
2114                }
2115
2116                #[cfg(windows)]
2117                if let Some(target_status) = pty_forced_terminal_status {
2118                    if !task.paths.exit.exists() {
2119                        write_kill_marker_if_absent(&task.paths.exit)
2120                            .map_err(|e| format!("failed to write kill marker: {e}"))?;
2121                    }
2122
2123                    let exit_code = terminal_exit_code_for_status(&target_status);
2124                    state.metadata.mark_terminal(target_status, exit_code, None);
2125                    if self.task_has_watch_control(&task.task_id) {
2126                        state.metadata.completion_delivered = true;
2127                    }
2128                    state.pending_terminal_override = None;
2129                    task.mark_terminal_now();
2130                    if let TaskRuntime::Pty(runtime) = &mut state.runtime {
2131                        *runtime = None;
2132                    }
2133                    state.detached = true;
2134                    self.persist_task(&task.paths, &state.metadata)
2135                        .map_err(|e| format!("failed to persist killed PTY state: {e}"))?;
2136                    terminalized = true;
2137                }
2138            }
2139        }
2140
2141        if terminalized {
2142            self.post_terminal_transition(&task, true)?;
2143        }
2144        Ok(self.snapshot_with_terminal_cache(&task, RUNNING_OUTPUT_PREVIEW_BYTES))
2145    }
2146
2147    fn finalize_from_marker(
2148        &self,
2149        task: &Arc<BgTask>,
2150        marker: ExitMarker,
2151        reason: Option<String>,
2152    ) -> Result<(), String> {
2153        let watch_controlled = self.task_has_watch_control(&task.task_id);
2154        let mut pty_reader_done = None;
2155        {
2156            let mut state = task
2157                .state
2158                .lock()
2159                .map_err(|_| "background task lock poisoned".to_string())?;
2160            if state.metadata.status.is_terminal() {
2161                state.pending_terminal_override = None;
2162                return Ok(());
2163            }
2164
2165            let pending_override = state.pending_terminal_override.take();
2166            let is_pty = state.metadata.mode == BgMode::Pty;
2167            let updated = self
2168                .update_task_metadata(&task.paths, |metadata| {
2169                    let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
2170                        let mut metadata = metadata.clone();
2171                        let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
2172                        let exit_code = terminal_exit_code_for_status(&target_status);
2173                        metadata.mark_terminal(target_status, exit_code, reason);
2174                        metadata
2175                    } else {
2176                        terminal_metadata_from_marker(metadata.clone(), marker, reason)
2177                    };
2178                    if watch_controlled {
2179                        new_metadata.completion_delivered = true;
2180                    }
2181                    *metadata = new_metadata;
2182                })
2183                .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2184            state.metadata = updated;
2185            task.mark_terminal_now();
2186            match &mut state.runtime {
2187                // Reap the exited direct child instead of dropping it, so it
2188                // does not linger as a `<defunct>` zombie (issue #91). The
2189                // wrapper writes the exit marker as its final act, so the
2190                // child is already exiting and `wait()` returns immediately.
2191                TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2192                TaskRuntime::Pty(runtime) => {
2193                    pty_reader_done = runtime
2194                        .as_ref()
2195                        .map(|runtime| Arc::clone(&runtime.reader_done));
2196                    *runtime = None;
2197                }
2198            }
2199            state.detached = true;
2200        }
2201
2202        if let Some(reader_done) = pty_reader_done {
2203            let deadline = Instant::now() + Duration::from_millis(200);
2204            while !reader_done.load(Ordering::SeqCst) && Instant::now() < deadline {
2205                std::thread::sleep(Duration::from_millis(10));
2206            }
2207        }
2208
2209        // One final scan runs before terminal notification routing so bytes
2210        // printed immediately before exit can win over the exit safety net.
2211        self.scan_task_watch_output(task);
2212
2213        self.post_terminal_transition(task, true)
2214    }
2215
2216    fn enqueue_completion_if_needed(
2217        &self,
2218        metadata: &PersistedTask,
2219        paths: Option<&TaskPaths>,
2220        emit_frame: bool,
2221    ) {
2222        if metadata.status.is_terminal() && !metadata.completion_delivered {
2223            let cache =
2224                paths.and_then(|paths| self.render_terminal_output_from_paths(metadata, paths));
2225            self.enqueue_completion_from_parts(metadata, None, paths, emit_frame, cache.as_ref());
2226        }
2227    }
2228
2229    fn render_terminal_output_from_paths(
2230        &self,
2231        metadata: &PersistedTask,
2232        paths: &TaskPaths,
2233    ) -> Option<TerminalOutputCache> {
2234        if metadata.mode == BgMode::Pty {
2235            return None;
2236        }
2237        let mut buffer = BgBuffer::new(paths.stdout.clone(), paths.stderr.clone());
2238        let disk_truncation = buffer.enforce_terminal_cap();
2239        Some(self.render_terminal_output(metadata, &buffer, disk_truncation))
2240    }
2241
2242    fn enqueue_completion_from_parts(
2243        &self,
2244        metadata: &PersistedTask,
2245        buffer: Option<&BgBuffer>,
2246        paths: Option<&TaskPaths>,
2247        emit_frame: bool,
2248        terminal_render: Option<&TerminalOutputCache>,
2249    ) {
2250        // Only the terminal-state guard prevents double-recording here. The
2251        // `completion_delivered` flag is NOT used to gate compression-event
2252        // recording, because `mark_terminal` flips `completion_delivered=true`
2253        // immediately for tasks with `notify_on_completion=false` (foreground
2254        // bash polled via `bash_status`, which is the common case). Pre-emptive
2255        // delivery flagging is correct for the push-frame queue (suppresses
2256        // duplicate user-visible notifications) but would silently skip the
2257        // database insert below. Compression event recording is idempotent at
2258        // the DB layer (unique on harness+session+task_id), so re-entry is
2259        // safe; the dedupe-by-queue check stays for the push frame side.
2260        if !metadata.status.is_terminal() {
2261            return;
2262        }
2263
2264        let owned_buffer = if buffer.is_none() && metadata.mode != BgMode::Pty {
2265            paths.map(|paths| BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()))
2266        } else {
2267            None
2268        };
2269        let render_buffer = buffer.or(owned_buffer.as_ref());
2270        let owned_render = if terminal_render.is_none() {
2271            render_buffer.map(|buffer| {
2272                let mut capped_buffer = buffer.clone();
2273                let disk_truncation = capped_buffer.enforce_terminal_cap();
2274                self.render_terminal_output(metadata, &capped_buffer, disk_truncation)
2275            })
2276        } else {
2277            None
2278        };
2279        let render = terminal_render.or(owned_render.as_ref());
2280
2281        // Completion reminders use the already-rendered terminal output and a
2282        // smaller, exit-aware head+tail cap. They never invoke the compressor
2283        // themselves.
2284        let (output_preview, output_truncated) = render
2285            .map(|cache| completion_preview_for_cache(cache, metadata.exit_code))
2286            .unwrap_or_else(|| (String::new(), false));
2287
2288        let token_counts = self.completion_token_counts(
2289            metadata,
2290            buffer,
2291            paths,
2292            render.map(|render| render.output_preview.as_str()),
2293        );
2294        let completion = BgCompletion {
2295            task_id: metadata.task_id.clone(),
2296            session_id: metadata.session_id.clone(),
2297            status: metadata.status.clone(),
2298            exit_code: metadata.exit_code,
2299            command: metadata.command.clone(),
2300            output_preview,
2301            output_truncated,
2302            original_tokens: token_counts.original_tokens,
2303            compressed_tokens: token_counts.compressed_tokens,
2304            tokens_skipped: token_counts.tokens_skipped,
2305        };
2306
2307        // Record the compression event BEFORE the push-frame dedupe. Event
2308        // recording has its own idempotency at the DB layer (unique key on
2309        // harness+session+task_id), so it's safe to attempt for every
2310        // terminal-state finalize. Critically, this path runs even when
2311        // `completion_delivered=true` was pre-set by `mark_terminal` for
2312        // foreground bash (`notify_on_completion=false`) — which is the common
2313        // case for OpenCode/Pi `bash` tool calls. Previously this code lived
2314        // after the dedupe guard and never fired for foreground tasks, which
2315        // meant compression accounting was effectively dead for >99% of
2316        // real-world bash usage.
2317        self.record_compression_event_if_applicable(metadata, &token_counts);
2318
2319        let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
2320        if watch_controlled {
2321            if emit_frame && !watch_matched {
2322                self.emit_bash_watch_exit(&completion);
2323            }
2324            self.clear_task_watch_state(&metadata.task_id);
2325            return;
2326        }
2327
2328        // Push-frame queue is gated on `completion_delivered` so foreground
2329        // bash with `notify_on_completion=false` does not leak a user-visible
2330        // completion notification. `mark_terminal` pre-sets
2331        // `completion_delivered=true` for those tasks; honoring it here keeps
2332        // the suppression invariant the test
2333        // `no_notify_foreground_poll_completion_does_not_enqueue_completion`
2334        // asserts. The compression-event recording above intentionally runs
2335        // before this gate so foreground bash still contributes to the
2336        // session/project aggregates.
2337        if metadata.completion_delivered {
2338            return;
2339        }
2340
2341        // Push-frame queue dedupe stays per-task to prevent duplicate
2342        // user-visible completion notifications.
2343        let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
2344            if completions
2345                .iter()
2346                .any(|existing| existing.task_id == metadata.task_id)
2347            {
2348                false
2349            } else {
2350                completions.push_back(completion.clone());
2351                true
2352            }
2353        } else {
2354            false
2355        };
2356
2357        if pushed && emit_frame {
2358            self.emit_bash_completed(completion);
2359        }
2360    }
2361
2362    fn record_compression_event_if_applicable(
2363        &self,
2364        metadata: &PersistedTask,
2365        token_counts: &CompletionTokenCounts,
2366    ) {
2367        if metadata.mode == BgMode::Pty {
2368            return;
2369        }
2370
2371        let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2372            token_counts.original_tokens,
2373            token_counts.compressed_tokens,
2374            token_counts.original_bytes,
2375            token_counts.compressed_bytes,
2376        ) {
2377            (
2378                Some(original_tokens),
2379                Some(compressed_tokens),
2380                Some(original_bytes),
2381                Some(compressed_bytes),
2382            ) => (
2383                original_tokens,
2384                compressed_tokens,
2385                original_bytes,
2386                compressed_bytes,
2387            ),
2388            _ => {
2389                crate::slog_warn!(
2390                    "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2391                    metadata.task_id
2392                );
2393                return;
2394            }
2395        };
2396
2397        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2398        let Some(pool) = pool else {
2399            crate::slog_warn!(
2400                "compression event skipped for {}: db_pool not initialized — was configure run?",
2401                metadata.task_id
2402            );
2403            return;
2404        };
2405        let harness = self
2406            .inner
2407            .db_harness
2408            .read()
2409            .ok()
2410            .and_then(|slot| slot.clone());
2411        let Some(harness) = harness else {
2412            crate::slog_warn!(
2413                "compression event insert skipped for {}: harness not configured",
2414                metadata.task_id
2415            );
2416            return;
2417        };
2418
2419        let project_root = metadata
2420            .project_root
2421            .as_deref()
2422            .unwrap_or(&metadata.workdir);
2423        let project_key = crate::path_identity::project_scope_key(project_root);
2424        let row = crate::db::compression_events::CompressionEventRow {
2425            harness: &harness,
2426            session_id: Some(&metadata.session_id),
2427            project_key: &project_key,
2428            tool: "bash",
2429            task_id: Some(&metadata.task_id),
2430            command: Some(&metadata.command),
2431            compressor: if metadata.compressed {
2432                "registry"
2433            } else {
2434                "none"
2435            },
2436            original_bytes,
2437            compressed_bytes,
2438            original_tokens,
2439            compressed_tokens,
2440            created_at: unix_millis() as i64,
2441        };
2442
2443        let conn = match pool.lock() {
2444            Ok(conn) => conn,
2445            Err(_) => {
2446                crate::slog_warn!(
2447                    "compression event insert failed for {}: db mutex poisoned",
2448                    metadata.task_id
2449                );
2450                return;
2451            }
2452        };
2453        match crate::db::compression_events::insert_compression_event(&conn, &row) {
2454            Ok(_) => {
2455                // DEBUG-level: each foreground bash call records one of these,
2456                // which clutters info-level logs without adding diagnostic value.
2457                // Aggregate totals are visible via the status RPC / TUI sidebar.
2458                crate::slog_debug!(
2459                    "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2460                    metadata.task_id,
2461                    project_key,
2462                    metadata.session_id,
2463                    original_tokens,
2464                    compressed_tokens
2465                );
2466            }
2467            Err(error) => {
2468                crate::slog_warn!(
2469                    "compression event insert failed for {}: {}",
2470                    metadata.task_id,
2471                    error
2472                );
2473            }
2474        }
2475    }
2476
2477    fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2478        let Ok(progress_sender) = self
2479            .inner
2480            .progress_sender
2481            .lock()
2482            .map(|sender| sender.clone())
2483        else {
2484            return;
2485        };
2486        if let Some(sender) = progress_sender.as_ref() {
2487            sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2488                pattern_match.task_id,
2489                session_id.to_string(),
2490                pattern_match.watch_id,
2491                pattern_match.match_text,
2492                pattern_match.match_offset,
2493                pattern_match.context,
2494                pattern_match.once,
2495            )));
2496        }
2497    }
2498
2499    fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2500        let Ok(progress_sender) = self
2501            .inner
2502            .progress_sender
2503            .lock()
2504            .map(|sender| sender.clone())
2505        else {
2506            return;
2507        };
2508        let Some(sender) = progress_sender.as_ref() else {
2509            return;
2510        };
2511        let status = completion_status_text(&completion.status, completion.exit_code);
2512        let preview = completion.output_preview.trim_end();
2513        let context = if preview.is_empty() {
2514            format!("task {} exited ({status})", completion.task_id)
2515        } else {
2516            format!(
2517                "task {} exited ({status})
2518{preview}",
2519                completion.task_id
2520            )
2521        };
2522        sender(PushFrame::BashPatternMatch(
2523            BashPatternMatchFrame::task_exit(
2524                completion.task_id.clone(),
2525                completion.session_id.clone(),
2526                format!("exited ({status})"),
2527                context,
2528            ),
2529        ));
2530    }
2531
2532    fn emit_bash_completed(&self, completion: BgCompletion) {
2533        let Ok(progress_sender) = self
2534            .inner
2535            .progress_sender
2536            .lock()
2537            .map(|sender| sender.clone())
2538        else {
2539            return;
2540        };
2541        let Some(sender) = progress_sender.as_ref() else {
2542            return;
2543        };
2544        // Clone the callback out of the registry mutex before writing to stdout;
2545        // otherwise a blocked push-frame write could pin the mutex and starve
2546        // unrelated progress-sender updates.
2547        // Bg task transitions are discovered by the watchdog thread, so the
2548        // sender is shared behind a Mutex. It still uses the same stdout writer
2549        // closure as foreground progress frames, preserving the existing lock/
2550        // flush behavior in main.rs.
2551        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2552            completion.task_id,
2553            completion.session_id,
2554            completion.status,
2555            completion.exit_code,
2556            completion.command,
2557            completion.output_preview,
2558            completion.output_truncated,
2559            completion.original_tokens,
2560            completion.compressed_tokens,
2561            completion.tokens_skipped,
2562        )));
2563    }
2564
2565    fn completion_token_counts(
2566        &self,
2567        metadata: &PersistedTask,
2568        buffer: Option<&BgBuffer>,
2569        paths: Option<&TaskPaths>,
2570        rendered_output: Option<&str>,
2571    ) -> CompletionTokenCounts {
2572        if metadata.mode == BgMode::Pty {
2573            return CompletionTokenCounts::skipped();
2574        }
2575
2576        let raw = match buffer {
2577            Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2578            None => paths
2579                .map(|paths| {
2580                    read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2581                })
2582                .unwrap_or(TokenCountInput::Skipped),
2583        };
2584
2585        let TokenCountInput::Text(raw_output) = raw else {
2586            return CompletionTokenCounts::skipped();
2587        };
2588
2589        let original_tokens = token_count_u32(&raw_output);
2590        let original_bytes = raw_output.len() as i64;
2591        let compressed_output = rendered_output.unwrap_or(&raw_output);
2592        let compressed_tokens = token_count_u32(compressed_output);
2593        let compressed_bytes = compressed_output.len() as i64;
2594        CompletionTokenCounts {
2595            original_tokens: Some(original_tokens),
2596            compressed_tokens: Some(compressed_tokens),
2597            original_bytes: Some(original_bytes),
2598            compressed_bytes: Some(compressed_bytes),
2599            tokens_skipped: false,
2600        }
2601    }
2602
2603    pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2604        if !self
2605            .inner
2606            .long_running_reminder_enabled
2607            .load(Ordering::SeqCst)
2608        {
2609            return;
2610        }
2611        let interval_ms = self
2612            .inner
2613            .long_running_reminder_interval_ms
2614            .load(Ordering::SeqCst);
2615        if interval_ms == 0 {
2616            return;
2617        }
2618        let interval = Duration::from_millis(interval_ms);
2619        let now = Instant::now();
2620        let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2621            return;
2622        };
2623        let since = last_reminder_at.unwrap_or(task.started);
2624        if now.duration_since(since) < interval {
2625            return;
2626        }
2627        let command = task
2628            .state
2629            .lock()
2630            .map(|state| state.metadata.command.clone())
2631            .unwrap_or_default();
2632        *last_reminder_at = Some(now);
2633        self.emit_bash_long_running(BashLongRunningFrame::new(
2634            task.task_id.clone(),
2635            task.session_id.clone(),
2636            command,
2637            task.started.elapsed().as_millis() as u64,
2638        ));
2639    }
2640
2641    fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2642        let Ok(progress_sender) = self
2643            .inner
2644            .progress_sender
2645            .lock()
2646            .map(|sender| sender.clone())
2647        else {
2648            return;
2649        };
2650        if let Some(sender) = progress_sender.as_ref() {
2651            sender(PushFrame::BashLongRunning(frame));
2652        }
2653    }
2654
2655    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2656        self.inner
2657            .tasks
2658            .lock()
2659            .ok()
2660            .and_then(|tasks| tasks.get(task_id).cloned())
2661    }
2662
2663    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2664        self.task(task_id)
2665            .filter(|task| task.session_id == session_id)
2666    }
2667
2668    fn running_count(&self) -> usize {
2669        self.inner
2670            .tasks
2671            .lock()
2672            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2673            .unwrap_or(0)
2674    }
2675
2676    fn start_watchdog(&self) {
2677        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2678            super::watchdog::start(self.clone());
2679        }
2680    }
2681
2682    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2683        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2684    }
2685
2686    #[cfg(test)]
2687    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2688        self.task_for_session(task_id, session_id)
2689            .map(|task| task.paths.json.clone())
2690    }
2691
2692    #[cfg(test)]
2693    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2694        self.task_for_session(task_id, session_id)
2695            .map(|task| task.paths.exit.clone())
2696    }
2697
2698    /// Generate a `bash-{16hex}` slug that is unique against live tasks and queued completions.
2699    fn generate_unique_task_id(&self) -> Result<String, String> {
2700        for _ in 0..32 {
2701            let candidate = random_slug();
2702            let tasks = self
2703                .inner
2704                .tasks
2705                .lock()
2706                .map_err(|_| "background task registry lock poisoned".to_string())?;
2707            if tasks.contains_key(&candidate) {
2708                continue;
2709            }
2710            let completions = self
2711                .inner
2712                .completions
2713                .lock()
2714                .map_err(|_| "background completions lock poisoned".to_string())?;
2715            if completions
2716                .iter()
2717                .any(|completion| completion.task_id == candidate)
2718            {
2719                continue;
2720            }
2721            return Ok(candidate);
2722        }
2723        Err("failed to allocate unique background task id after 32 attempts".to_string())
2724    }
2725}
2726
2727fn render_compressed_with_recovery(
2728    buffer: &BgBuffer,
2729    mut compressed: CompressionResult,
2730    input_truncated: bool,
2731    disk_truncation: DiskTruncation,
2732) -> TerminalOutputCache {
2733    // Preserve a single canonical trailing newline. A bare `.trim_end()` strips
2734    // the legitimate final newline that `echo` and most commands emit, so
2735    // agent-facing output diverged from native bash ("hello" vs "hello\n") and
2736    // broke the no-JSON-envelope contract. Collapse excess trailing blank lines
2737    // to one, but keep that one when the content had a trailing newline. NOTE:
2738    // the check must read the ORIGINAL text — strip_plain_truncation_marker_lines
2739    // rebuilds via `.lines().join("\n")`, which itself drops the trailing newline.
2740    let had_trailing_newline = compressed.text.ends_with('\n');
2741    let mut text = strip_plain_truncation_marker_lines(&compressed.text)
2742        .trim_end()
2743        .to_string();
2744    if had_trailing_newline && !text.is_empty() {
2745        text.push('\n');
2746    }
2747    compressed.text = text;
2748
2749    let output_path = buffer.output_path().map(|path| path.display().to_string());
2750    let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
2751    let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
2752    let mut recovery = RecoveryContext {
2753        dropped_by_class: compressed.dropped_by_class,
2754        had_inner_drop: compressed.had_inner_drop,
2755        offset_hint_eligible: compressed.offset_hint_eligible,
2756        offset_start_line: compressed.offset_start_line,
2757        byte_truncated: input_truncated,
2758        disk_truncated_prefix_bytes: disk_truncation.total_prefix_bytes(),
2759        output_path: output_path.clone(),
2760        stderr_path: stderr_path.clone(),
2761        include_stderr_path,
2762    };
2763
2764    let (output_preview, output_truncated) =
2765        render_body_with_recovery_marker(&compressed.text, &mut recovery);
2766    TerminalOutputCache {
2767        output_preview,
2768        output_truncated,
2769        kind: TerminalOutputKind::Compressed,
2770        output_path,
2771        stderr_path,
2772        recovery: Some(recovery),
2773    }
2774}
2775
2776fn render_body_with_recovery_marker(body: &str, recovery: &mut RecoveryContext) -> (String, bool) {
2777    render_body_with_recovery_marker_at_cap(
2778        body,
2779        recovery,
2780        FINAL_OUTPUT_CAP_BYTES,
2781        cap_final_output,
2782        cap_final_output_with_marker,
2783    )
2784}
2785
2786fn render_raw_body_with_recovery_marker(
2787    body: &str,
2788    recovery: &mut RecoveryContext,
2789) -> (String, bool) {
2790    render_body_with_recovery_marker_at_cap(
2791        body,
2792        recovery,
2793        RAW_PASSTHROUGH_CAP_BYTES,
2794        |input| {
2795            super::output::cap_head_tail(
2796                input,
2797                RAW_PASSTHROUGH_CAP_BYTES,
2798                RAW_PASSTHROUGH_HEAD_BYTES,
2799                RAW_PASSTHROUGH_TAIL_BYTES,
2800            )
2801        },
2802        |input, marker| {
2803            super::output::cap_head_tail_with_marker(
2804                input,
2805                RAW_PASSTHROUGH_CAP_BYTES,
2806                RAW_PASSTHROUGH_HEAD_BYTES,
2807                RAW_PASSTHROUGH_TAIL_BYTES,
2808                marker,
2809            )
2810        },
2811    )
2812}
2813
2814fn render_body_with_recovery_marker_at_cap<F, G>(
2815    body: &str,
2816    recovery: &mut RecoveryContext,
2817    cap_bytes: usize,
2818    cap_plain: F,
2819    cap_with_marker: G,
2820) -> (String, bool)
2821where
2822    F: Fn(&str) -> super::output::CappedText,
2823    G: Fn(&str, &str) -> super::output::CappedText,
2824{
2825    let needs_marker = recovery.has_visible_drop();
2826    if body.len() > cap_bytes {
2827        recovery.byte_truncated = true;
2828        if let Some(marker) = recovery_marker(recovery) {
2829            let capped = cap_with_marker(body, &marker);
2830            return (capped.text, true);
2831        }
2832        let capped = cap_plain(body);
2833        return (capped.text, capped.truncated || needs_marker);
2834    }
2835
2836    if !needs_marker {
2837        return (body.to_string(), false);
2838    }
2839
2840    let Some(marker) = recovery_marker(recovery) else {
2841        return (body.to_string(), true);
2842    };
2843    let with_marker = append_recovery_marker(body, &marker);
2844    if with_marker.len() <= cap_bytes {
2845        return (with_marker, true);
2846    }
2847
2848    recovery.byte_truncated = true;
2849    let marker = recovery_marker(recovery).unwrap_or(marker);
2850    let capped = cap_with_marker(body, &marker);
2851    (capped.text, true)
2852}
2853
/// Appends `marker` on its own line after `body`.
///
/// Trailing whitespace of `body` is removed first so exactly one newline
/// separates the content from the marker. When `body` has no content left
/// after trimming (empty or whitespace-only), the marker is returned alone —
/// no leading blank line is emitted.
fn append_recovery_marker(body: &str, marker: &str) -> String {
    let content = body.trim_end();
    // Check the trimmed text: a body of only newlines/spaces would otherwise
    // yield "\n<marker>" with a stray leading newline.
    if content.is_empty() {
        return marker.to_string();
    }
    let mut output = String::with_capacity(content.len() + 1 + marker.len());
    output.push_str(content);
    output.push('\n');
    output.push_str(marker);
    output
}
2863
2864fn recovery_marker(recovery: &RecoveryContext) -> Option<String> {
2865    let mut parts = Vec::new();
2866    for (class, count) in &recovery.dropped_by_class {
2867        let label = if *count == 1 {
2868            class.singular()
2869        } else {
2870            class.plural()
2871        };
2872        parts.push(format!("+{count} more {label}"));
2873    }
2874    if recovery.byte_truncated {
2875        parts.push("truncated output".to_string());
2876    }
2877    let disk_truncated_prefix_bytes = recovery.disk_truncated_prefix_bytes;
2878    if disk_truncated_prefix_bytes > 0 {
2879        parts.push(format!(
2880            "truncated {disk_truncated_prefix_bytes} bytes from saved output prefix"
2881        ));
2882    } else if recovery.had_inner_drop && parts.is_empty() {
2883        parts.push("omitted output".to_string());
2884    }
2885
2886    if parts.is_empty() {
2887        return None;
2888    }
2889
2890    let hint = recovery_hint(recovery);
2891    Some(format!("[{}; {hint}]", parts.join(", ")))
2892}
2893
2894fn recovery_hint(recovery: &RecoveryContext) -> String {
2895    // AFT stores stdout/stderr separately and combines them in memory. Class caps,
2896    // middle truncation, and mixed stdout/stderr renders are not line-offset
2897    // portable. Only a single-file contiguous-prefix drop may use `tail -n +N`.
2898    if recovery.offset_hint_eligible
2899        && !recovery.byte_truncated
2900        && recovery.dropped_by_class.is_empty()
2901        && !recovery.include_stderr_path
2902    {
2903        if let (Some(path), Some(line)) =
2904            (recovery.output_path.as_deref(), recovery.offset_start_line)
2905        {
2906            return format!("see remaining: tail -n +{line} {}", quote_path(path));
2907        }
2908    }
2909
2910    let mut paths = Vec::new();
2911    if let Some(path) = recovery.output_path.as_deref() {
2912        paths.push(path);
2913    }
2914    if recovery.include_stderr_path {
2915        if let Some(path) = recovery.stderr_path.as_deref() {
2916            if !paths.contains(&path) {
2917                paths.push(path);
2918            }
2919        }
2920    }
2921
2922    if paths.is_empty() {
2923        return "full output unavailable".to_string();
2924    }
2925
2926    let reads = paths
2927        .into_iter()
2928        .map(|path| format!("read {}", quote_path(path)))
2929        .collect::<Vec<_>>()
2930        .join(" and ");
2931    if recovery.disk_truncated_prefix_bytes > 0 {
2932        format!("retained output: {reads}")
2933    } else {
2934        format!("full output: {reads}")
2935    }
2936}
2937
2938fn strip_plain_truncation_marker_lines(input: &str) -> String {
2939    input
2940        .lines()
2941        .filter(|line| !is_plain_truncation_marker(line.trim()))
2942        .collect::<Vec<_>>()
2943        .join("\n")
2944}
2945
2946fn strip_recovery_marker_lines(input: &str) -> String {
2947    input
2948        .lines()
2949        .filter(|line| !is_recovery_marker(line.trim()))
2950        .collect::<Vec<_>>()
2951        .join("\n")
2952}
2953
/// Returns true when `line` is exactly `...<truncated N bytes>...` with `N`
/// being one or more ASCII digits. No surrounding whitespace is tolerated.
fn is_plain_truncation_marker(line: &str) -> bool {
    line.strip_prefix("...<truncated ")
        .and_then(|rest| rest.strip_suffix(" bytes>..."))
        .map_or(false, |digits| {
            !digits.is_empty() && digits.bytes().all(|byte| byte.is_ascii_digit())
        })
}
2963
/// Returns true when `line` is wrapped in square brackets and contains one
/// of the hint phrases that recovery markers carry.
fn is_recovery_marker(line: &str) -> bool {
    const HINT_PHRASES: [&str; 4] = [
        "full output: read ",
        "retained output: read ",
        "see remaining: tail -n +",
        "full output unavailable",
    ];

    let bracketed = line.starts_with('[') && line.ends_with(']');
    bracketed && HINT_PHRASES.iter().any(|phrase| line.contains(*phrase))
}
2972
2973fn render_structured_output(
2974    command: &str,
2975    buffer: &BgBuffer,
2976    disk_truncation: DiskTruncation,
2977) -> Option<TerminalOutputCache> {
2978    if !is_gh_structured_command(command) {
2979        return None;
2980    }
2981
2982    let output_path = buffer
2983        .output_path()
2984        .map(|path| path.display().to_string())?;
2985    let stdout_bytes = buffer.stream_len(StreamKind::Stdout);
2986    if stdout_bytes == 0 {
2987        return None;
2988    }
2989
2990    if stdout_bytes > STRUCTURED_OUTPUT_CAP_BYTES as u64 {
2991        if !stream_starts_like_json(buffer, StreamKind::Stdout) {
2992            return None;
2993        }
2994        let output_preview = if disk_truncation.total_prefix_bytes() > 0 {
2995            retained_json_output_pointer(
2996                stdout_bytes,
2997                &output_path,
2998                disk_truncation.total_prefix_bytes(),
2999            )
3000        } else {
3001            json_output_pointer(stdout_bytes, &output_path)
3002        };
3003        return Some(TerminalOutputCache {
3004            output_preview,
3005            output_truncated: true,
3006            kind: TerminalOutputKind::Structured,
3007            output_path: Some(output_path),
3008            stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
3009            recovery: None,
3010        });
3011    }
3012
3013    let stdout = buffer.read_stream_bounded(StreamKind::Stdout, STRUCTURED_OUTPUT_CAP_BYTES);
3014    if stdout.truncated || !is_structured_body(&stdout.text) {
3015        return None;
3016    }
3017
3018    Some(TerminalOutputCache {
3019        output_preview: stdout.text,
3020        output_truncated: false,
3021        kind: TerminalOutputKind::Structured,
3022        output_path: Some(output_path),
3023        stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
3024        recovery: None,
3025    })
3026}
3027
3028fn render_raw_passthrough(
3029    buffer: &BgBuffer,
3030    disk_truncation: DiskTruncation,
3031) -> TerminalOutputCache {
3032    let raw = buffer.read_combined_head_tail(
3033        RAW_PASSTHROUGH_CAP_BYTES,
3034        RAW_PASSTHROUGH_HEAD_BYTES,
3035        RAW_PASSTHROUGH_TAIL_BYTES,
3036    );
3037    let output_path = buffer.output_path().map(|path| path.display().to_string());
3038    let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
3039    if !raw.truncated && disk_truncation.total_prefix_bytes() == 0 {
3040        return TerminalOutputCache {
3041            output_preview: raw.text,
3042            output_truncated: false,
3043            kind: TerminalOutputKind::Raw,
3044            output_path,
3045            stderr_path,
3046            recovery: None,
3047        };
3048    }
3049
3050    let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
3051    let mut recovery = RecoveryContext {
3052        dropped_by_class: BTreeMap::new(),
3053        had_inner_drop: false,
3054        offset_hint_eligible: false,
3055        offset_start_line: None,
3056        byte_truncated: raw.truncated,
3057        disk_truncated_prefix_bytes: disk_truncation.total_prefix_bytes(),
3058        output_path: output_path.clone(),
3059        stderr_path: stderr_path.clone(),
3060        include_stderr_path,
3061    };
3062    let (output_preview, output_truncated) =
3063        render_raw_body_with_recovery_marker(&raw.text, &mut recovery);
3064    TerminalOutputCache {
3065        output_preview,
3066        output_truncated,
3067        kind: TerminalOutputKind::Raw,
3068        output_path,
3069        stderr_path,
3070        recovery: Some(recovery),
3071    }
3072}
3073
3074fn completion_preview_for_cache(
3075    cache: &TerminalOutputCache,
3076    exit_code: Option<i32>,
3077) -> (String, bool) {
3078    // Reminder previews are sized by exit status: success gets a short tail,
3079    // failure keeps head+tail context (see output.rs completion caps).
3080    let exit_ok = exit_code == Some(0);
3081    let threshold = completion_preview_threshold(exit_ok);
3082    if cache.kind == TerminalOutputKind::Structured && cache.output_preview.len() > threshold {
3083        if let Some(path) = cache.output_path.as_deref() {
3084            return (
3085                json_output_pointer(cache.output_preview.len() as u64, path),
3086                true,
3087            );
3088        }
3089        return (cache.output_preview.clone(), cache.output_truncated);
3090    }
3091
3092    if let Some(recovery) = cache.recovery.as_ref() {
3093        if cache.output_preview.len() <= threshold {
3094            return (cache.output_preview.clone(), cache.output_truncated);
3095        }
3096        let body = strip_recovery_marker_lines(&cache.output_preview);
3097        let mut completion_recovery = recovery.clone();
3098        completion_recovery.byte_truncated = true;
3099        if let Some(marker) = recovery_marker(&completion_recovery) {
3100            let capped = cap_completion_output_with_marker(&body, &marker, exit_ok);
3101            return (capped.text, true);
3102        }
3103    }
3104
3105    let capped = cap_completion_output(&cache.output_preview, exit_ok);
3106    (capped.text, cache.output_truncated || capped.truncated)
3107}
3108
3109fn is_gh_structured_command(command: &str) -> bool {
3110    let Some(normalized) = crate::compress::plain_command_for_structured_output(command) else {
3111        return false;
3112    };
3113    let tokens = shell_words_for_flags(&normalized);
3114    let Some(head) = tokens.first() else {
3115        return false;
3116    };
3117    let head_name = Path::new(head)
3118        .file_name()
3119        .and_then(|name| name.to_str())
3120        .unwrap_or(head);
3121    if !(head_name == "gh" || head_name.eq_ignore_ascii_case("gh.exe")) {
3122        return false;
3123    }
3124    tokens.iter().any(|token| {
3125        matches!(token.as_str(), "--json" | "--jq" | "--template")
3126            || token.starts_with("--json=")
3127            || token.starts_with("--jq=")
3128            || token.starts_with("--template=")
3129    })
3130}
3131
/// Splits `command` into words using a simplified shell-like tokenizer.
///
/// * Unquoted whitespace and the operators `;`, `&`, `|` end the current word
///   and are dropped.
/// * Single and double quotes group characters and are removed from the word.
/// * A backslash makes the next character literal, except inside single
///   quotes where the backslash itself is kept.
/// * Empty words are never emitted, and a trailing lone backslash is dropped.
fn shell_words_for_flags(command: &str) -> Vec<String> {
    let mut words: Vec<String> = Vec::new();
    let mut pending = String::new();
    // `Some(q)` while inside a quote opened by `q`; only one kind can be open
    // at a time.
    let mut open_quote: Option<char> = None;
    let mut next_is_literal = false;

    for ch in command.chars() {
        if next_is_literal {
            pending.push(ch);
            next_is_literal = false;
            continue;
        }
        match (ch, open_quote) {
            ('\\', None | Some('"')) => next_is_literal = true,
            ('\'' | '"', None) => open_quote = Some(ch),
            ('\'', Some('\'')) | ('"', Some('"')) => open_quote = None,
            (c, None) if c.is_whitespace() || matches!(c, ';' | '&' | '|') => {
                if !pending.is_empty() {
                    words.push(std::mem::take(&mut pending));
                }
            }
            _ => pending.push(ch),
        }
    }

    if !pending.is_empty() {
        words.push(pending);
    }
    words
}
3176
3177fn is_structured_body(body: &str) -> bool {
3178    let trimmed = body.trim();
3179    if trimmed.is_empty() {
3180        return false;
3181    }
3182    if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
3183        return true;
3184    }
3185
3186    let mut saw_line = false;
3187    for line in trimmed
3188        .lines()
3189        .map(str::trim)
3190        .filter(|line| !line.is_empty())
3191    {
3192        saw_line = true;
3193        if serde_json::from_str::<serde_json::Value>(line).is_err() {
3194            return false;
3195        }
3196    }
3197    saw_line
3198}
3199
3200fn stream_starts_like_json(buffer: &BgBuffer, stream: StreamKind) -> bool {
3201    let path = match (buffer, stream) {
3202        (BgBuffer::Pipes { stdout_path, .. }, StreamKind::Stdout) => Some(stdout_path),
3203        (BgBuffer::Pipes { stderr_path, .. }, StreamKind::Stderr) => Some(stderr_path),
3204        (BgBuffer::Pty { combined_path }, _) => Some(combined_path),
3205    };
3206    let Some(path) = path else {
3207        return false;
3208    };
3209    let Ok(file) = std::fs::File::open(path) else {
3210        return false;
3211    };
3212    let mut limited = file.take(512);
3213    let mut bytes = Vec::new();
3214    if limited.read_to_end(&mut bytes).is_err() {
3215        return false;
3216    }
3217    String::from_utf8_lossy(&bytes)
3218        .chars()
3219        .find(|ch| !ch.is_whitespace())
3220        .is_some_and(|ch| matches!(ch, '{' | '[' | '"' | '-' | '0'..='9' | 't' | 'f' | 'n'))
3221}
3222
/// Token and byte measurements attached to a task completion.
///
/// Every measurement is optional. `tokens_skipped` is set by [`Self::skipped`],
/// which records no measurements at all.
// NOTE(review): the "original"/"compressed" naming suggests before/after
// compression sizes; confirm against the code that fills these in.
struct CompletionTokenCounts {
    original_tokens: Option<u32>,
    compressed_tokens: Option<u32>,
    original_bytes: Option<i64>,
    compressed_bytes: Option<i64>,
    tokens_skipped: bool,
}

impl CompletionTokenCounts {
    /// Returns the "nothing was counted" value: every measurement is `None`
    /// and `tokens_skipped` is `true`.
    fn skipped() -> Self {
        Self {
            tokens_skipped: true,
            original_tokens: None,
            compressed_tokens: None,
            original_bytes: None,
            compressed_bytes: None,
        }
    }
}
3242
3243fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
3244    match status {
3245        BgTaskStatus::TimedOut => "timed out".to_string(),
3246        BgTaskStatus::Killed => "killed".to_string(),
3247        _ => exit_code
3248            .map(|code| format!("exit {code}"))
3249            .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
3250    }
3251}
3252
3253fn token_count_u32(text: &str) -> u32 {
3254    aft_tokenizer::count_tokens(text)
3255        .try_into()
3256        .unwrap_or(u32::MAX)
3257}
3258
3259impl Default for BgTaskRegistry {
3260    fn default() -> Self {
3261        Self::new(Arc::new(Mutex::new(None)))
3262    }
3263}
3264
/// Returns `true` when `path` was last modified strictly less than `grace`
/// ago.
///
/// Any failure yields `false`: missing or unreadable metadata, a platform
/// without modification times, or a modification time that lies in the future
/// relative to the current system clock.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(metadata) = fs::metadata(path) else {
        return false;
    };
    let Ok(modified) = metadata.modified() else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        Err(_) => false,
    }
}
3273
/// Canonicalizes `path`, falling back to an owned copy of the input when
/// canonicalization fails (for example because the path does not exist).
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
3277
/// Converts a wall-clock start time (`started_at`, Unix epoch milliseconds)
/// into an [`Instant`] by subtracting the elapsed wall-clock time from
/// `Instant::now()`.
///
/// A start time in the future counts as zero elapsed time. If the system clock
/// reads before the Unix epoch, or the subtraction cannot be represented, the
/// current instant is returned.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let age = Duration::from_millis(now_ms.saturating_sub(started_at));
    match Instant::now().checked_sub(age) {
        Some(started) => started,
        None => Instant::now(),
    }
}
3289
3290fn gc_quarantine(storage_dir: &Path) {
3291    let quarantine_root = storage_dir.join("bash-tasks-quarantine");
3292    let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
3293        return;
3294    };
3295    for session_entry in session_dirs.flatten() {
3296        let session_quarantine_dir = session_entry.path();
3297        if !session_quarantine_dir.is_dir() {
3298            continue;
3299        }
3300        let entries = match fs::read_dir(&session_quarantine_dir) {
3301            Ok(entries) => entries,
3302            Err(error) => {
3303                crate::slog_warn!(
3304                    "failed to read background task quarantine dir {}: {error}",
3305                    session_quarantine_dir.display()
3306                );
3307                continue;
3308            }
3309        };
3310        for entry in entries.flatten() {
3311            let path = entry.path();
3312            if modified_within(&path, QUARANTINE_GC_GRACE) {
3313                continue;
3314            }
3315            let result = if path.is_dir() {
3316                fs::remove_dir_all(&path)
3317            } else {
3318                fs::remove_file(&path)
3319            };
3320            match result {
3321                Ok(()) => log::debug!(
3322                    "deleted old background task quarantine entry {}",
3323                    path.display()
3324                ),
3325                Err(error) => crate::slog_warn!(
3326                    "failed to delete old background task quarantine entry {}: {error}",
3327                    path.display()
3328                ),
3329            }
3330        }
3331        let _ = fs::remove_dir(&session_quarantine_dir);
3332    }
3333    let _ = fs::remove_dir(&quarantine_root);
3334}
3335
/// Why a task file is being quarantined. `quarantine_name` uses this to pick
/// the naming scheme of the renamed file.
enum QuarantineKind {
    /// Named `<file_name>.corrupt-<unix_ts>`.
    Corrupt,
    /// Named `<stem>.invalid.<unix_ts>.<extension>`, or
    /// `<file_name>.invalid.<unix_ts>` when the name has no stem/extension pair.
    Invalid,
}
3340
3341fn quarantine_task_json(
3342    storage_dir: &Path,
3343    session_dir: &Path,
3344    json_path: &Path,
3345    kind: QuarantineKind,
3346) -> Result<(), String> {
3347    let session_hash = session_dir
3348        .file_name()
3349        .and_then(|name| name.to_str())
3350        .ok_or_else(|| {
3351            format!(
3352                "invalid background task session dir: {}",
3353                session_dir.display()
3354            )
3355        })?;
3356    let task_name = json_path
3357        .file_name()
3358        .and_then(|name| name.to_str())
3359        .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
3360    let unix_ts = SystemTime::now()
3361        .duration_since(UNIX_EPOCH)
3362        .map(|duration| duration.as_secs())
3363        .unwrap_or(0);
3364    let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
3365    fs::create_dir_all(&quarantine_dir).map_err(|e| {
3366        format!(
3367            "failed to create background task quarantine dir {}: {e}",
3368            quarantine_dir.display()
3369        )
3370    })?;
3371    let target_name = quarantine_name(task_name, unix_ts, &kind);
3372    let target = quarantine_dir.join(target_name);
3373    fs::rename(json_path, &target).map_err(|e| {
3374        format!(
3375            "failed to quarantine background task metadata {} to {}: {e}",
3376            json_path.display(),
3377            target.display()
3378        )
3379    })?;
3380
3381    for sibling in task_sibling_paths(json_path) {
3382        if !sibling.exists() {
3383            continue;
3384        }
3385        let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
3386            crate::slog_warn!(
3387                "skipping background task sibling with invalid name during quarantine: {}",
3388                sibling.display()
3389            );
3390            continue;
3391        };
3392        let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
3393        if let Err(error) = fs::rename(&sibling, &sibling_target) {
3394            crate::slog_warn!(
3395                "failed to quarantine background task sibling {} to {}: {error}",
3396                sibling.display(),
3397                sibling_target.display()
3398            );
3399        }
3400    }
3401
3402    let _ = fs::remove_dir(session_dir);
3403    Ok(())
3404}
3405
3406fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
3407    match kind {
3408        QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
3409        QuarantineKind::Invalid => {
3410            let path = Path::new(file_name);
3411            let stem = path.file_stem().and_then(|stem| stem.to_str());
3412            let extension = path.extension().and_then(|extension| extension.to_str());
3413            match (stem, extension) {
3414                (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
3415                _ => format!("{file_name}.invalid.{unix_ts}"),
3416            }
3417        }
3418    }
3419}
3420
/// Lists the candidate sibling files of a task's metadata JSON: same directory
/// and stem, with the extensions `stdout`, `stderr`, `exit`, `pty`, `ps1`,
/// `bat` and `sh`, in that order.
///
/// Returns an empty list when `json_path` has no parent or no UTF-8 stem. The
/// paths are not checked for existence.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 7] = ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"];

    let (Some(parent), Some(stem)) = (
        json_path.parent(),
        json_path.file_stem().and_then(|stem| stem.to_str()),
    ) else {
        return Vec::new();
    };

    let mut siblings = Vec::with_capacity(SIBLING_EXTENSIONS.len());
    for extension in SIBLING_EXTENSIONS {
        siblings.push(parent.join(format!("{stem}.{extension}")));
    }
    siblings
}
3433
3434fn read_for_token_count_from_disk(
3435    metadata: &PersistedTask,
3436    paths: &TaskPaths,
3437    max_bytes_per_stream: usize,
3438) -> TokenCountInput {
3439    if metadata.mode == BgMode::Pty {
3440        return TokenCountInput::Skipped;
3441    }
3442    // Read up to `max_bytes_per_stream` bytes per stream rather than
3443    // refusing to tokenize anything when the file exceeds the cap.
3444    // Mirror the in-memory `BgBuffer::read_for_token_count` policy
3445    // (see comment there) — large outputs are exactly the tasks that
3446    // benefit most from compression accounting, so silent-skipping
3447    // them defeats the purpose of token tracking.
3448    let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
3449    let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
3450    match (stdout, stderr) {
3451        (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3452            String::from_utf8_lossy(&stdout).as_ref(),
3453            String::from_utf8_lossy(&stderr).as_ref(),
3454        )),
3455        (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
3456            String::from_utf8_lossy(&stdout).as_ref(),
3457            "",
3458        )),
3459        (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3460            "",
3461            String::from_utf8_lossy(&stderr).as_ref(),
3462        )),
3463        (Err(_), Err(_)) => TokenCountInput::Skipped,
3464    }
3465}
3466
/// Read at most `max_bytes` bytes from the END of `path`. Used for
/// tokenization where the most recent output is more representative than
/// an arbitrarily-capped beginning.
///
/// The cap is a hard limit: `max_bytes == 0` yields an empty buffer, and a
/// file that keeps growing while it is being read cannot push the result past
/// `max_bytes`.
///
/// # Errors
///
/// Returns `Err` if the file cannot be opened (genuinely missing or
/// permissions error), or if querying its length, seeking, or reading fails.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};
    let mut file = std::fs::File::open(path)?;
    let len = file.metadata()?.len();
    let read_len = len.min(max_bytes as u64);
    if read_len > 0 && len > read_len {
        // `read_len <= len`, and the negated value is a valid end-relative
        // offset for any file length the OS can report.
        file.seek(SeekFrom::End(-(read_len as i64)))?;
    }
    // `read_len <= max_bytes`, so the conversion back to `usize` is lossless.
    let mut bytes = Vec::with_capacity(read_len as usize);
    // Bound the read itself. Without `take`, a zero cap skipped the seek and
    // then read the whole file, and concurrent appends were read past the cap.
    file.take(read_len).read_to_end(&mut bytes)?;
    Ok(bytes)
}
3483
3484impl BgTask {
3485    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
3486        let state = self
3487            .state
3488            .lock()
3489            .unwrap_or_else(|poison| poison.into_inner());
3490        self.snapshot_locked(&state, preview_bytes)
3491    }
3492
3493    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
3494        let metadata = &state.metadata;
3495        let duration_ms = metadata.duration_ms.or_else(|| {
3496            metadata
3497                .status
3498                .is_terminal()
3499                .then(|| self.started.elapsed().as_millis() as u64)
3500        });
3501        let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
3502            (String::new(), false)
3503        } else if metadata.status.is_terminal() {
3504            state
3505                .terminal_output_cache
3506                .as_ref()
3507                .map(|cache| (cache.output_preview.clone(), cache.output_truncated))
3508                .unwrap_or_else(|| (String::new(), false))
3509        } else {
3510            state.buffer.read_tail(preview_bytes)
3511        };
3512        BgTaskSnapshot {
3513            info: BgTaskInfo {
3514                task_id: self.task_id.clone(),
3515                status: metadata.status.clone(),
3516                command: metadata.command.clone(),
3517                mode: metadata.mode.clone(),
3518                started_at: metadata.started_at,
3519                duration_ms,
3520            },
3521            exit_code: metadata.exit_code,
3522            child_pid: metadata.child_pid,
3523            workdir: metadata.workdir.display().to_string(),
3524            output_preview,
3525            output_truncated,
3526            output_path: state
3527                .buffer
3528                .output_path()
3529                .map(|path| path.display().to_string()),
3530            stderr_path: state
3531                .buffer
3532                .stderr_path()
3533                .map(|path| path.display().to_string()),
3534            pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
3535            pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
3536        }
3537    }
3538
3539    pub(crate) fn is_running(&self) -> bool {
3540        self.state
3541            .lock()
3542            .map(|state| {
3543                state.metadata.status == BgTaskStatus::Running
3544                    || (state.metadata.mode == BgMode::Pty
3545                        && state.metadata.status == BgTaskStatus::Killing)
3546            })
3547            .unwrap_or(false)
3548    }
3549
3550    fn is_terminal(&self) -> bool {
3551        self.state
3552            .lock()
3553            .map(|state| state.metadata.status.is_terminal())
3554            .unwrap_or(false)
3555    }
3556
3557    fn mark_terminal_now(&self) {
3558        if let Ok(mut terminal_at) = self.terminal_at.lock() {
3559            if terminal_at.is_none() {
3560                *terminal_at = Some(Instant::now());
3561            }
3562        }
3563    }
3564
3565    fn set_completion_delivered(
3566        &self,
3567        delivered: bool,
3568        registry: &BgTaskRegistry,
3569    ) -> Result<(), String> {
3570        let mut state = self
3571            .state
3572            .lock()
3573            .map_err(|_| "background task lock poisoned".to_string())?;
3574        let updated = registry
3575            .update_task_metadata(&self.paths, |metadata| {
3576                metadata.completion_delivered = delivered;
3577            })
3578            .map_err(|e| format!("failed to update completion delivery: {e}"))?;
3579        state.metadata = updated;
3580        Ok(())
3581    }
3582}
3583
/// Takes the direct child handle out of `child_slot` and reaps it.
///
/// Dropping a [`std::process::Child`] does not `wait()` on the OS process, so
/// on Unix a finished child that nobody waited for stays around as a
/// `<defunct>` zombie until the AFT process exits (issue #91:
/// `[mv] <defunct>`). Code paths that learn about completion from the
/// exit-marker file, rather than from [`BgTaskRegistry::reap_child`]'s
/// `try_wait()`, therefore have to reap explicitly instead of merely clearing
/// the slot.
///
/// The wrapper writes the exit marker as its last statement (an atomic `mv`
/// rename), so once the marker is visible the child is already on its way
/// out. A non-blocking `try_wait()` is attempted first so the common case
/// never blocks; `wait()` is used only when the child has not been reaped
/// yet, which covers the brief window between the rename and process
/// teardown. If `try_wait()` itself errors, no `wait()` is attempted.
///
/// Callers hold the task state mutex, which serializes this against
/// `reap_child`: whichever path gets the lock first reaps and empties the
/// slot, and the other sees `None`, so the child is never waited on twice.
#[cfg(unix)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
    let Some(mut child) = child_slot.take() else {
        return;
    };
    if let Ok(None) = child.try_wait() {
        let _ = child.wait();
    }
}
3612
/// Windows counterpart: there is no zombie/`<defunct>` state to clean up, and
/// dropping the [`Child`] closes the process handle, which is the correct
/// release. The slot is simply emptied, preserving the historical behavior so
/// the documented Windows PID-recycle handling in `reap_child` is unaffected.
#[cfg(windows)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
    drop(child_slot.take());
}
3621
3622fn terminal_metadata_from_marker(
3623    mut metadata: PersistedTask,
3624    marker: ExitMarker,
3625    reason: Option<String>,
3626) -> PersistedTask {
3627    match marker {
3628        ExitMarker::Code(code) => {
3629            let status = if code == 0 {
3630                BgTaskStatus::Completed
3631            } else {
3632                BgTaskStatus::Failed
3633            };
3634            metadata.mark_terminal(status, Some(code), reason);
3635        }
3636        ExitMarker::Killed => metadata.mark_terminal(
3637            BgTaskStatus::Killed,
3638            terminal_exit_code_for_status(&BgTaskStatus::Killed),
3639            reason,
3640        ),
3641    }
3642    metadata
3643}
3644
3645fn terminal_exit_code_for_status(status: &BgTaskStatus) -> Option<i32> {
3646    match status {
3647        BgTaskStatus::TimedOut => Some(124),
3648        BgTaskStatus::Killed => Some(137),
3649        _ => None,
3650    }
3651}
3652
3653#[cfg(unix)]
3654fn write_unix_command_script(command: &str, paths: &TaskPaths) -> Result<PathBuf, String> {
3655    let stem = paths
3656        .json
3657        .file_stem()
3658        .and_then(|stem| stem.to_str())
3659        .unwrap_or("wrapper");
3660    let script_path = paths.dir.join(format!("{stem}.sh"));
3661    fs::write(&script_path, command)
3662        .map_err(|e| format!("failed to write background bash command script: {e}"))?;
3663    Ok(script_path)
3664}
3665
3666#[cfg(unix)]
3667fn detached_shell_command(command_script: &Path, exit_path: &Path) -> Command {
3668    let shell = resolve_posix_shell();
3669    let mut cmd = Command::new(&shell);
3670    // Keep the user-provided command body out of argv and shell `-c` parsing.
3671    // The direct child is still a tiny wrapper so it can write the authoritative
3672    // exit marker after the command script exits (including if that script calls
3673    // `exit`). Passing only file paths through `-c` avoids newline/quote/length
3674    // edge cases where a multi-command script can be mangled before execution.
3675    cmd.arg("-c")
3676        .arg(r#""$0" "$1"; code=$?; printf "%s" "$code" > "$2.tmp.$$"; mv -f "$2.tmp.$$" "$2""#)
3677        .arg(&shell)
3678        .arg(command_script)
3679        .arg(exit_path);
3680    unsafe {
3681        cmd.pre_exec(|| {
3682            if libc::setsid() == -1 {
3683                return Err(std::io::Error::last_os_error());
3684            }
3685            Ok(())
3686        });
3687    }
3688    cmd
3689}
3690
3691#[cfg(unix)]
3692fn resolve_posix_shell() -> PathBuf {
3693    static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
3694    POSIX_SHELL
3695        .get_or_init(|| {
3696            std::env::var_os("BASH")
3697                .filter(|value| !value.is_empty())
3698                .map(PathBuf::from)
3699                .filter(|path| path.exists())
3700                .or_else(|| which::which("bash").ok())
3701                .or_else(|| which::which("zsh").ok())
3702                .unwrap_or_else(|| PathBuf::from("/bin/sh"))
3703        })
3704        .clone()
3705}
3706
/// Builds (but does not spawn) the detached wrapper command for a Windows
/// background task using the given `shell`.
///
/// The wrapper produced by `shell.wrapper_script_bytes` is written next to the
/// other task files as `<stem>.<ext>` (`ps1` for PowerShell hosts, `bat` for
/// cmd, `sh` for POSIX shells). The shell is then invoked with that file path
/// as a single argument, and `creation_flags` is applied to the command.
///
/// # Errors
///
/// Returns a message when the wrapper file cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    // `-File` runs the script with no quoting issues; the other switches apply
    // to the PowerShell host before the file runs.
    const POWERSHELL_HOST_ARGS: &[&str] = &[
        "-NoLogo",
        "-NoProfile",
        "-NonInteractive",
        "-ExecutionPolicy",
        "Bypass",
        "-File",
    ];
    // `cmd /D /C "<bat-file-path>"`: the .bat contents are read line-by-line
    // by cmd's batch processor, not re-interpreted by the /C parser. This
    // avoids the "filename syntax incorrect" errors that came from putting
    // complex compound commands on the cmd line.
    const CMD_HOST_ARGS: &[&str] = &["/D", "/C"];
    // git-bash and other POSIX shells run the wrapper as
    // `<binary> <wrapper-path>`; no special flags are needed because the
    // `trap` and atomic exit-marker rename in `wrapper_script` are
    // POSIX-standard.
    const POSIX_HOST_ARGS: &[&str] = &[];

    // Going through a wrapper file sidesteps Windows command-line quoting
    // entirely (Rust std-lib quoting + cmd /C parser + PowerShell -Command
    // parser all interacting with embedded quotes). The path argument is safe
    // because task IDs (bash-XXXXXXXX) contain no spaces or characters that
    // need escaping. The wrapper body's quotes never reach a command-line
    // parser; the shell reads them from disk by file syntax rules.
    let wrapper_body = shell.wrapper_script_bytes(command, exit_path);
    let (wrapper_ext, host_args) = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => ("ps1", POWERSHELL_HOST_ARGS),
        WindowsShell::Cmd => ("bat", CMD_HOST_ARGS),
        // For POSIX shells the extension is purely cosmetic; `.sh` matches
        // what an operator would expect when grepping the spill directory.
        WindowsShell::Posix(_) => ("sh", POSIX_HOST_ARGS),
    };

    let stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{stem}.{wrapper_ext}"));
    fs::write(&wrapper_path, wrapper_body)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(host_args);
    cmd.arg(&wrapper_path);

    // Win32 process creation flags. The caller decides whether
    // CREATE_BREAKAWAY_FROM_JOB is included; see the callers of this function
    // for the breakaway-fallback strategy.
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
3789
3790/// Spawn a detached background bash child process.
3791///
/// On Unix this is a single spawn against the shell picked by
/// `resolve_posix_shell` (`$BASH`, then `bash`, `zsh`, `/bin/sh`). On Windows it walks
3793/// `WindowsShell::shell_candidates()` (pwsh.exe → powershell.exe →
3794/// cmd.exe) and retries with the next candidate when the previous one
3795/// fails to spawn with `NotFound` — the same runtime safety net the
3796/// foreground bash path has, so issue #27 callers landing on cmd.exe
3797/// fallback can also use background bash. The wrapper script is
3798/// regenerated per attempt because PowerShell wrappers embed the shell
3799/// binary by name; the stdout/stderr capture handles are also reopened
3800/// per attempt because `Command::spawn()` consumes them.
3801///
3802/// Errors other than `NotFound` (PermissionDenied, OutOfMemory, etc.)
3803/// return immediately without retry — they indicate a problem with the
3804/// resolved shell that retrying with a different shell won't fix.
3805fn spawn_detached_child(
3806    command: &str,
3807    paths: &TaskPaths,
3808    workdir: &Path,
3809    env: &HashMap<String, String>,
3810) -> Result<std::process::Child, String> {
3811    #[cfg(not(windows))]
3812    {
3813        let stdout = create_capture_file(&paths.stdout)
3814            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3815        let stderr = create_capture_file(&paths.stderr)
3816            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3817        let command_script = write_unix_command_script(command, paths)?;
3818        detached_shell_command(&command_script, &paths.exit)
3819            .current_dir(workdir)
3820            .envs(env)
3821            .stdin(Stdio::null())
3822            .stdout(Stdio::from(stdout))
3823            .stderr(Stdio::from(stderr))
3824            .spawn()
3825            .map_err(|e| format!("failed to spawn background bash command: {e}"))
3826    }
3827    #[cfg(windows)]
3828    {
3829        use crate::windows_shell::shell_candidates;
3830        // Spawn priority: pwsh → powershell → git-bash → cmd. Same as the
3831        // legacy foreground bash spawn path. v0.20 routes ALL bash through
3832        // this background spawn helper, including foreground tool calls
3833        // where the model writes PowerShell-syntax (`$var = ...`,
3834        // `Start-Sleep`, `Add-Content`) — those fail outright under cmd.
3835        // The earlier v0.18-era cmd-first override worked around a
3836        // PowerShell detached-output bug; that bug is fixed at the
3837        // process-flag layer (CREATE_NO_WINDOW instead of DETACHED_PROCESS,
3838        // see flag block below), so we no longer need to misroute PS
3839        // commands through cmd.
3840        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
3841        // Win32 process creation flags. We try with CREATE_BREAKAWAY_FROM_JOB
3842        // first (so the bg child outlives the AFT process when AFT is killed),
3843        // then fall back without it for environments where the parent is in a
3844        // Job Object that doesn't grant `JOB_OBJECT_LIMIT_BREAKAWAY_OK`. CI
3845        // runners (GitHub Actions windows-2022) and some MDM-managed corp
3846        // environments hit this — `CreateProcess` returns Access Denied (5).
3847        // Without breakaway, the child still runs detached but will be torn
3848        // down with the parent if the parent process group is signaled.
3849        //
3850        // We use CREATE_NO_WINDOW (no visible console window, but the
3851        // child still has a hidden console) rather than DETACHED_PROCESS
3852        // (no console at all). PowerShell-based wrappers that perform
3853        // file I/O via [System.IO.File] need a console handle to flush
3854        // stdout/stderr correctly even when redirected — under
3855        // DETACHED_PROCESS, pwsh sometimes silently exits before
3856        // executing later script statements (the Move-Item that writes
3857        // the exit marker never runs), leaving the bg task forever
3858        // marked Failed: process exited without exit marker. cmd.exe
3859        // wrappers tolerate DETACHED_PROCESS, but switching to
3860        // CREATE_NO_WINDOW costs nothing for cmd and unblocks pwsh.
3861        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
3862        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
3863        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
3864        let with_breakaway =
3865            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
3866        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
3867        let mut last_error: Option<String> = None;
3868        for (idx, shell) in candidates.iter().enumerate() {
3869            // Per-shell, try with breakaway first. If the process is in a
3870            // restrictive job, the breakaway flag triggers Access Denied
3871            // (os error 5). Retry once without breakaway.
3872            for &flags in &[with_breakaway, without_breakaway] {
3873                // Re-open capture handles per attempt; spawn() consumes them.
3874                let stdout = create_capture_file(&paths.stdout)
3875                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3876                let stderr = create_capture_file(&paths.stderr)
3877                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3878                let mut cmd =
3879                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
3880                cmd.current_dir(workdir)
3881                    .envs(env)
3882                    .stdin(Stdio::null())
3883                    .stdout(Stdio::from(stdout))
3884                    .stderr(Stdio::from(stderr));
3885                match cmd.spawn() {
3886                    Ok(child) => {
3887                        if idx > 0 {
3888                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
3889                             the cached PATH probe disagreed with runtime spawn — likely PATH \
3890                             inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
3891                            shell.binary(),
3892                            idx);
3893                        }
3894                        if flags == without_breakaway {
3895                            crate::slog_warn!(
3896                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
3897                             (likely a restrictive Job Object — CI sandbox or MDM policy). \
3898                             Spawned without breakaway; the bg task will be torn down if the \
3899                             AFT process group is killed."
3900                            );
3901                        }
3902                        return Ok(child);
3903                    }
3904                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
3905                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
3906                        shell.binary());
3907                        last_error = Some(format!("{}: {e}", shell.binary()));
3908                        // Skip the without-breakaway retry for NotFound — the
3909                        // binary itself is missing, breakaway flag is irrelevant.
3910                        break;
3911                    }
3912                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
3913                        // Access Denied during breakaway — retry without it.
3914                        crate::slog_warn!(
3915                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
3916                         Access Denied — retrying {} without breakaway",
3917                            shell.binary()
3918                        );
3919                        last_error = Some(format!("{}: {e}", shell.binary()));
3920                        continue;
3921                    }
3922                    Err(e) => {
3923                        return Err(format!(
3924                            "failed to spawn background bash command via {}: {e}",
3925                            shell.binary()
3926                        ));
3927                    }
3928                }
3929            }
3930        }
3931        Err(format!(
3932            "failed to spawn background bash command: no Windows shell could be spawned. \
3933             Last error: {}. PATH-probed candidates: {:?}",
3934            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
3935            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
3936        ))
3937    }
3938}
3939
3940fn random_slug() -> String {
3941    let mut bytes = [0u8; 4];
3942    // getrandom is a transitive dependency; use it directly for OS entropy.
3943    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3944        // Extremely unlikely fallback: time + pid mix.
3945        let t = SystemTime::now()
3946            .duration_since(UNIX_EPOCH)
3947            .map(|d| d.subsec_nanos())
3948            .unwrap_or(0);
3949        let p = std::process::id();
3950        bytes.copy_from_slice(&(t ^ p).to_le_bytes());
3951    });
3952    // `bash-` + 8 lowercase hex chars — compact, OS-entropy backed.
3953    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3954    format!("bash-{hex}")
3955}
3956
3957#[cfg(test)]
3958mod tests {
3959    use std::collections::HashMap;
3960    use std::fs;
3961    use std::sync::atomic::{AtomicBool, AtomicUsize};
3962    use std::sync::{Arc, Mutex};
3963    use std::time::Duration;
3964    #[cfg(windows)]
3965    use std::time::Instant;
3966
3967    use super::*;
3968
    // Command line that exits successfully right away. Each platform gets its
    // own spelling, selected at compile time by the `cfg` attributes.
    #[cfg(unix)]
    const QUICK_SUCCESS_COMMAND: &str = "true";
    #[cfg(windows)]
    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";

    // Command line that stays alive for roughly five seconds. The Windows
    // variant redirects `timeout`'s own output to `nul`.
    #[cfg(unix)]
    const LONG_RUNNING_COMMAND: &str = "sleep 5";
    #[cfg(windows)]
    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3978
3979    #[test]
3980    fn gh_structured_detection_rejects_piped_commands() {
3981        assert!(is_gh_structured_command(
3982            "gh issue list --json number,title"
3983        ));
3984        assert!(is_gh_structured_command(
3985            "cd repo && gh issue list --json number,title"
3986        ));
3987
3988        assert!(!is_gh_structured_command(
3989            "gh issue list --json number,title | jq '.[]'"
3990        ));
3991        assert!(!is_gh_structured_command(
3992            "gh issue list --json number,title |"
3993        ));
3994    }
3995
3996    fn insert_terminal_piped_task(
3997        registry: &BgTaskRegistry,
3998        dir: &tempfile::TempDir,
3999        command: &str,
4000        stdout: &str,
4001        stderr: &str,
4002        compressed: bool,
4003    ) -> (String, Arc<BgTask>) {
4004        let task_id = format!("bash-test-{}", random_slug());
4005        let paths = task_paths(dir.path(), "session", &task_id);
4006        fs::create_dir_all(&paths.dir).unwrap();
4007        fs::write(&paths.stdout, stdout).unwrap();
4008        fs::write(&paths.stderr, stderr).unwrap();
4009        let mut metadata = PersistedTask::starting(
4010            task_id.clone(),
4011            "session".to_string(),
4012            command.to_string(),
4013            dir.path().to_path_buf(),
4014            Some(dir.path().to_path_buf()),
4015            Some(30_000),
4016            true,
4017            compressed,
4018        );
4019        metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
4020        write_task(&paths.json, &metadata).unwrap();
4021        registry
4022            .insert_rehydrated_task(metadata, paths, true)
4023            .expect("insert terminal task");
4024        let task = registry.task_for_session(&task_id, "session").unwrap();
4025        (task_id, task)
4026    }
4027
4028    fn insert_terminal_pty_task(
4029        registry: &BgTaskRegistry,
4030        dir: &tempfile::TempDir,
4031        pty_output: &str,
4032    ) -> (String, Arc<BgTask>) {
4033        let task_id = format!("bash-test-{}", random_slug());
4034        let paths = task_paths(dir.path(), "session", &task_id);
4035        fs::create_dir_all(&paths.dir).unwrap();
4036        fs::write(&paths.pty, pty_output).unwrap();
4037        let mut metadata = PersistedTask::starting(
4038            task_id.clone(),
4039            "session".to_string(),
4040            "python".to_string(),
4041            dir.path().to_path_buf(),
4042            Some(dir.path().to_path_buf()),
4043            Some(30_000),
4044            true,
4045            true,
4046        );
4047        metadata.mode = BgMode::Pty;
4048        metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
4049        write_task(&paths.json, &metadata).unwrap();
4050        registry
4051            .insert_rehydrated_task(metadata, paths, true)
4052            .expect("insert terminal pty task");
4053        let task = registry.task_for_session(&task_id, "session").unwrap();
4054        (task_id, task)
4055    }
4056
4057    #[cfg(unix)]
4058    fn wait_for_terminal_snapshot(
4059        registry: &BgTaskRegistry,
4060        task_id: &str,
4061        session_id: &str,
4062        project: &Path,
4063        storage: &Path,
4064    ) -> BgTaskSnapshot {
4065        let started = Instant::now();
4066        loop {
4067            let snapshot = registry
4068                .status(task_id, session_id, Some(project), Some(storage), 4096)
4069                .expect("spawned task should be visible to status");
4070            if snapshot.info.status.is_terminal() {
4071                return snapshot;
4072            }
4073            assert!(
4074                started.elapsed() < Duration::from_secs(10),
4075                "timed out waiting for task {task_id} to finish; last status={:?}",
4076                snapshot.info.status
4077            );
4078            std::thread::sleep(Duration::from_millis(50));
4079        }
4080    }
4081
4082    fn write_running_project_task(storage: &Path, project: &Path, session: &str, task_id: &str) {
4083        let paths = task_paths(storage, session, task_id);
4084        let mut metadata = PersistedTask::starting(
4085            task_id.to_string(),
4086            session.to_string(),
4087            "sleep 60".to_string(),
4088            project.to_path_buf(),
4089            Some(project.to_path_buf()),
4090            Some(30_000),
4091            true,
4092            true,
4093        );
4094        metadata.status = BgTaskStatus::Running;
4095        write_task(&paths.json, &metadata).unwrap();
4096        fs::write(&paths.stdout, "still running\n").unwrap();
4097        fs::write(&paths.stderr, "").unwrap();
4098    }
4099
4100    #[test]
4101    fn status_replay_filters_same_session_by_project_root() {
4102        let project_a = tempfile::tempdir().unwrap();
4103        let project_b = tempfile::tempdir().unwrap();
4104        let storage = tempfile::tempdir().unwrap();
4105        let session = "shared-session";
4106        let task_id = "bash-project-a";
4107        write_running_project_task(storage.path(), project_a.path(), session, task_id);
4108
4109        let actor_b = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4110        assert!(actor_b
4111            .status(
4112                task_id,
4113                session,
4114                Some(project_b.path()),
4115                Some(storage.path()),
4116                1024,
4117            )
4118            .is_none());
4119        assert!(actor_b.task_for_session(task_id, session).is_none());
4120
4121        let actor_a = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4122        let snapshot = actor_a
4123            .status(
4124                task_id,
4125                session,
4126                Some(project_a.path()),
4127                Some(storage.path()),
4128                1024,
4129            )
4130            .expect("owning project should replay its task");
4131        assert_eq!(snapshot.info.status, BgTaskStatus::Running);
4132    }
4133
4134    #[cfg(unix)]
4135    #[test]
4136    fn multiline_pipeline_stdout_persists_all_lines_after_terminal_status() {
4137        let cases = [
4138            (
4139                "long-first",
4140                "sleep 0.5; printf 'one\\n' | cat\nprintf 'two\\n' | grep -c two\nprintf 'three\\n' | cat",
4141                vec!["one", "1", "three"],
4142            ),
4143            (
4144                "short-first",
4145                "printf 'one\\n' | cat\nsleep 0.2; printf 'two\\n' | grep -c two\nprintf 'three\\n' | cat",
4146                vec!["one", "1", "three"],
4147            ),
4148            (
4149                "failing-middle",
4150                "sleep 0.2; printf 'one\\n' | cat\nfalse; printf 'after-false\\n' | cat\nprintf 'three\\n' | cat",
4151                vec!["one", "after-false", "three"],
4152            ),
4153        ];
4154
4155        for (name, command, expected_lines) in cases {
4156            let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4157            let dir = tempfile::tempdir().unwrap();
4158            let session_id = format!("session-{name}");
4159            let task_id = registry
4160                .spawn(
4161                    command,
4162                    session_id.clone(),
4163                    dir.path().to_path_buf(),
4164                    HashMap::new(),
4165                    Some(Duration::from_secs(30)),
4166                    dir.path().to_path_buf(),
4167                    10,
4168                    true,
4169                    true,
4170                    Some(dir.path().to_path_buf()),
4171                )
4172                .unwrap();
4173
4174            let snapshot = wait_for_terminal_snapshot(
4175                &registry,
4176                &task_id,
4177                &session_id,
4178                dir.path(),
4179                dir.path(),
4180            );
4181            assert_eq!(
4182                snapshot.info.status,
4183                BgTaskStatus::Completed,
4184                "{name}: task should complete; snapshot={snapshot:?}"
4185            );
4186            assert_eq!(
4187                snapshot.exit_code,
4188                Some(0),
4189                "{name}: script should use the final command's exit code"
4190            );
4191
4192            let stdout_path = task_paths(dir.path(), &session_id, &task_id).stdout;
4193            let stdout = fs::read_to_string(&stdout_path).unwrap_or_else(|error| {
4194                panic!(
4195                    "{name}: failed to read raw stdout file {}: {error}",
4196                    stdout_path.display()
4197                )
4198            });
4199            let lines: Vec<&str> = stdout.lines().collect();
4200            assert_eq!(
4201                lines,
4202                expected_lines,
4203                "{name}: raw stdout file must include every newline-separated command's output; stdout_path={}",
4204                stdout_path.display()
4205            );
4206        }
4207    }
4208
4209    #[test]
4210    fn recognizes_all_recovery_marker_forms() {
4211        assert!(is_recovery_marker(
4212            "[truncated output; full output: read \"/tmp/out\"]"
4213        ));
4214        assert!(is_recovery_marker(
4215            "[omitted output; see remaining: tail -n +42 \"/tmp/out\"]"
4216        ));
4217        assert!(is_recovery_marker(
4218            "[truncated output; full output unavailable]"
4219        ));
4220        assert!(is_recovery_marker(
4221            r#"[truncated 123 bytes from saved output prefix; retained output: read "/tmp/out"]"#
4222        ));
4223    }
4224
4225    #[test]
4226    fn recovery_marker_reports_disk_prefix_truncation_as_retained_output() {
4227        let recovery = RecoveryContext {
4228            dropped_by_class: BTreeMap::new(),
4229            had_inner_drop: false,
4230            offset_hint_eligible: false,
4231            offset_start_line: None,
4232            byte_truncated: false,
4233            disk_truncated_prefix_bytes: 4096,
4234            output_path: Some("/tmp/stdout".to_string()),
4235            stderr_path: None,
4236            include_stderr_path: false,
4237        };
4238
4239        let marker = recovery_marker(&recovery).expect("disk truncation must emit marker");
4240
4241        assert!(marker.contains("truncated 4096 bytes from saved output prefix"));
4242        assert!(marker.contains(r#"retained output: read "/tmp/stdout""#));
4243        assert!(!marker.contains("full output: read"));
4244    }
4245
4246    #[test]
4247    fn killed_exit_marker_sets_nonzero_sentinel_exit_code() {
4248        let metadata = PersistedTask::starting(
4249            "task".to_string(),
4250            "session".to_string(),
4251            "cargo test".to_string(),
4252            PathBuf::from("/tmp"),
4253            None,
4254            None,
4255            true,
4256            true,
4257        );
4258
4259        let terminal = terminal_metadata_from_marker(metadata, ExitMarker::Killed, None);
4260
4261        assert_eq!(terminal.status, BgTaskStatus::Killed);
4262        assert_eq!(terminal.exit_code, Some(137));
4263    }
4264
4265    #[test]
4266    fn terminal_status_polls_use_cached_render_once_and_off_lock() {
4267        let registry = BgTaskRegistry::default();
4268        let dir = tempfile::tempdir().unwrap();
4269        let (_task_id, task) = insert_terminal_piped_task(
4270            &registry,
4271            &dir,
4272            "custom-tool --verbose",
4273            &"stdout line\n".repeat(200_000),
4274            "",
4275            true,
4276        );
4277        let calls = Arc::new(AtomicUsize::new(0));
4278        let saw_unlocked_state = Arc::new(AtomicBool::new(false));
4279        let task_holder = Arc::new(Mutex::new(Some(Arc::clone(&task))));
4280        let calls_for_closure = Arc::clone(&calls);
4281        let unlocked_for_closure = Arc::clone(&saw_unlocked_state);
4282        let task_for_closure = Arc::clone(&task_holder);
4283        registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4284            calls_for_closure.fetch_add(1, Ordering::SeqCst);
4285            if let Some(task) = task_for_closure.lock().unwrap().as_ref() {
4286                if task.state.try_lock().is_ok() {
4287                    unlocked_for_closure.store(true, Ordering::SeqCst);
4288                }
4289            }
4290            CompressionResult::new(format!("compressed {} bytes", output.len()))
4291        });
4292
4293        let first = registry
4294            .status(
4295                &task.task_id,
4296                "session",
4297                None,
4298                Some(dir.path()),
4299                RUNNING_OUTPUT_PREVIEW_BYTES,
4300            )
4301            .unwrap();
4302        let second = registry
4303            .status(
4304                &task.task_id,
4305                "session",
4306                None,
4307                Some(dir.path()),
4308                RUNNING_OUTPUT_PREVIEW_BYTES,
4309            )
4310            .unwrap();
4311        let listed = registry.list(RUNNING_OUTPUT_PREVIEW_BYTES);
4312
4313        assert_eq!(
4314            calls.load(Ordering::SeqCst),
4315            1,
4316            "terminal render must be cached"
4317        );
4318        assert!(
4319            saw_unlocked_state.load(Ordering::SeqCst),
4320            "compressor must run after releasing the task state lock"
4321        );
4322        assert!(first.output_preview.starts_with("compressed "));
4323        assert_eq!(second.output_preview, first.output_preview);
4324        assert_eq!(listed[0].output_preview, first.output_preview);
4325    }
4326
4327    #[test]
4328    fn completion_preview_success_keeps_tail_only() {
4329        // Exit-aware completion previews: a SUCCESSFUL task's reminder keeps a
4330        // short tail only — head context is noise when the command worked
4331        // (regression: the uniform 4 KiB head+tail cap flooded reminders with
4332        // ~1K tokens of build noise per completed task).
4333        let registry = BgTaskRegistry::default();
4334        let dir = tempfile::tempdir().unwrap();
4335        let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4336        let (_task_id, task) =
4337            insert_terminal_piped_task(&registry, &dir, "cat big.log", &output, "", false);
4338
4339        registry.post_terminal_transition(&task, true).unwrap();
4340        let completions = registry.drain_completions_for_session(Some("session"));
4341        assert_eq!(completions.len(), 1);
4342        let preview = &completions[0].output_preview;
4343        assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4344        assert!(!preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4345        assert!(completions[0].output_truncated);
4346    }
4347
4348    #[test]
4349    fn completion_preview_failure_keeps_head_and_tail() {
4350        // A FAILED task keeps a small head (first error / command banner) plus
4351        // a larger tail (tracebacks and summaries land at the end).
4352        let registry = BgTaskRegistry::default();
4353        let dir = tempfile::tempdir().unwrap();
4354        let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4355        let task_id = format!("bash-test-{}", random_slug());
4356        let paths = task_paths(dir.path(), "session", &task_id);
4357        fs::create_dir_all(&paths.dir).unwrap();
4358        fs::write(&paths.stdout, &output).unwrap();
4359        fs::write(&paths.stderr, "").unwrap();
4360        let mut metadata = PersistedTask::starting(
4361            task_id.clone(),
4362            "session".to_string(),
4363            "cat big.log".to_string(),
4364            dir.path().to_path_buf(),
4365            Some(dir.path().to_path_buf()),
4366            Some(30_000),
4367            true,
4368            false,
4369        );
4370        metadata.mark_terminal(BgTaskStatus::Failed, Some(1), None);
4371        write_task(&paths.json, &metadata).unwrap();
4372        registry
4373            .insert_rehydrated_task(metadata, paths, true)
4374            .expect("insert terminal task");
4375        let task = registry.task_for_session(&task_id, "session").unwrap();
4376
4377        registry.post_terminal_transition(&task, true).unwrap();
4378        let completions = registry.drain_completions_for_session(Some("session"));
4379        assert_eq!(completions.len(), 1);
4380        let preview = &completions[0].output_preview;
4381        assert!(preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4382        assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4383    }
4384
4385    #[test]
4386    fn has_completions_for_session_matches_pending_delivery() {
4387        let registry = BgTaskRegistry::default();
4388        assert!(!registry.has_completions_for_session(Some("session")));
4389        assert!(!registry.has_completions_for_session(None));
4390
4391        let dir = tempfile::tempdir().unwrap();
4392        let (_task_id, task) =
4393            insert_terminal_piped_task(&registry, &dir, QUICK_SUCCESS_COMMAND, "done\n", "", false);
4394        registry.post_terminal_transition(&task, true).unwrap();
4395
4396        assert!(registry.has_completions_for_session(Some("session")));
4397        assert!(registry.has_completions_for_session(None));
4398        assert!(!registry.has_completions_for_session(Some("other-session")));
4399
4400        let completions = registry.drain_completions_for_session(Some("session"));
4401        assert_eq!(completions.len(), 1);
4402        assert_eq!(completions[0].task_id, task.task_id);
4403    }
4404
4405    #[test]
4406    fn structured_gh_json_survives_intact_and_ignores_stderr() {
4407        let registry = BgTaskRegistry::default();
4408        let dir = tempfile::tempdir().unwrap();
4409        let calls = Arc::new(AtomicUsize::new(0));
4410        let calls_for_closure = Arc::clone(&calls);
4411        registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4412            calls_for_closure.fetch_add(1, Ordering::SeqCst);
4413            CompressionResult::new(output)
4414        });
4415        let (task_id, _task) = insert_terminal_piped_task(
4416            &registry,
4417            &dir,
4418            "gh pr view 123 --json body",
4419            "{\"body\":\"hello\"}",
4420            "warning: stderr must not join json",
4421            true,
4422        );
4423
4424        let snapshot = registry
4425            .status(
4426                &task_id,
4427                "session",
4428                None,
4429                Some(dir.path()),
4430                RUNNING_OUTPUT_PREVIEW_BYTES,
4431            )
4432            .unwrap();
4433
4434        assert_eq!(snapshot.output_preview, "{\"body\":\"hello\"}");
4435        assert!(!snapshot.output_preview.contains("warning"));
4436        assert!(!snapshot.output_truncated);
4437        assert_eq!(
4438            calls.load(Ordering::SeqCst),
4439            0,
4440            "structured JSON bypasses compression"
4441        );
4442    }
4443
4444    #[test]
4445    fn registry_emits_single_recovery_marker_for_class_drops() {
4446        let registry = BgTaskRegistry::default();
4447        let dir = tempfile::tempdir().unwrap();
4448        registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4449            let mut dropped = BTreeMap::new();
4450            dropped.insert(DropClass::Error, 18);
4451            dropped.insert(DropClass::Warning, 6);
4452            CompressionResult::with_class_drops("kept diagnostic", dropped)
4453        });
4454        let (task_id, task) =
4455            insert_terminal_piped_task(&registry, &dir, "custom-tool", "raw", "", true);
4456
4457        let snapshot = registry
4458            .status(
4459                &task_id,
4460                "session",
4461                None,
4462                Some(dir.path()),
4463                RUNNING_OUTPUT_PREVIEW_BYTES,
4464            )
4465            .unwrap();
4466
4467        assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4468        assert!(snapshot.output_preview.contains("+18 more errors"));
4469        assert!(snapshot.output_preview.contains("+6 more warnings"));
4470        assert!(snapshot
4471            .output_preview
4472            .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4473        assert!(!snapshot.output_preview.contains("tail -n +"));
4474        assert!(snapshot.output_truncated);
4475    }
4476
4477    #[test]
4478    fn registry_marker_reports_semantic_and_byte_drops_once() {
4479        let registry = BgTaskRegistry::default();
4480        let dir = tempfile::tempdir().unwrap();
4481        registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4482            let mut dropped = BTreeMap::new();
4483            dropped.insert(DropClass::Error, 1);
4484            CompressionResult::with_class_drops(
4485                format!("HEAD-SIGNAL\n{}TAIL-SIGNAL", "middle\n".repeat(8_000)),
4486                dropped,
4487            )
4488        });
4489        let (task_id, _task) =
4490            insert_terminal_piped_task(&registry, &dir, "custom-tool", "raw", "", true);
4491
4492        let snapshot = registry
4493            .status(
4494                &task_id,
4495                "session",
4496                None,
4497                Some(dir.path()),
4498                RUNNING_OUTPUT_PREVIEW_BYTES,
4499            )
4500            .unwrap();
4501
4502        assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4503        assert!(snapshot.output_preview.contains("+1 more error"));
4504        assert!(snapshot.output_preview.contains("truncated output"));
4505        assert!(snapshot.output_preview.contains("HEAD-SIGNAL"));
4506        assert!(snapshot.output_preview.contains("TAIL-SIGNAL"));
4507        assert!(!snapshot.output_preview.contains("...<truncated"));
4508        assert!(snapshot.output_truncated);
4509    }
4510
4511    #[test]
4512    fn cargo_stderr_class_drops_name_both_capture_paths() {
4513        let registry = BgTaskRegistry::default();
4514        let dir = tempfile::tempdir().unwrap();
4515        let filter_registry = crate::compress::toml_filter::FilterRegistry::default();
4516        registry.set_compressor_with_exit_code(move |command, output, exit_code| {
4517            crate::compress::compress_with_registry_exit_code(
4518                command,
4519                &output,
4520                exit_code,
4521                &filter_registry,
4522            )
4523        });
4524        let stderr = (0..22)
4525            .map(|index| {
4526                format!(
4527                    "error: cargo failure {index}\n  --> src/lib.rs:{}:1\n   |\n{} | boom\n",
4528                    index + 1,
4529                    index + 1
4530                )
4531            })
4532            .collect::<Vec<_>>()
4533            .join("\n");
4534        let (task_id, task) = insert_terminal_piped_task(
4535            &registry,
4536            &dir,
4537            "cargo check",
4538            "Finished dev [unoptimized] target(s) in 0.01s\n",
4539            &stderr,
4540            true,
4541        );
4542
4543        let snapshot = registry
4544            .status(
4545                &task_id,
4546                "session",
4547                None,
4548                Some(dir.path()),
4549                RUNNING_OUTPUT_PREVIEW_BYTES,
4550            )
4551            .unwrap();
4552
4553        assert!(snapshot.output_preview.contains("+2 more errors"));
4554        assert!(snapshot
4555            .output_preview
4556            .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4557        assert!(snapshot
4558            .output_preview
4559            .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4560        assert!(!snapshot.output_preview.contains("tail -n +"));
4561    }
4562
4563    #[test]
4564    fn over_ceiling_structured_json_uses_pointer_not_partial_json() {
4565        let registry = BgTaskRegistry::default();
4566        let dir = tempfile::tempdir().unwrap();
4567        let body = format!("{{\"body\":\"{}\"}}", "x".repeat(60 * 1024));
4568        let (task_id, task) = insert_terminal_piped_task(
4569            &registry,
4570            &dir,
4571            "cd /repo && gh pr view 123 --json body",
4572            &body,
4573            "",
4574            true,
4575        );
4576
4577        let snapshot = registry
4578            .status(
4579                &task_id,
4580                "session",
4581                None,
4582                Some(dir.path()),
4583                RUNNING_OUTPUT_PREVIEW_BYTES,
4584            )
4585            .unwrap();
4586
4587        assert!(snapshot.output_preview.starts_with("[JSON output "));
4588        assert!(snapshot
4589            .output_preview
4590            .contains(&task.paths.stdout.display().to_string()));
4591        assert!(!snapshot.output_preview.contains(&"x".repeat(1024)));
4592        assert!(snapshot.output_truncated);
4593    }
4594
4595    #[test]
4596    fn toml_strip_tail_cap_uses_full_output_hint_not_offset_hint() {
4597        let registry = BgTaskRegistry::default();
4598        let dir = tempfile::tempdir().unwrap();
4599        let filter_registry = crate::compress::toml_filter::build_registry(
4600            crate::compress::builtin_filters::ALL,
4601            None,
4602            None,
4603        );
4604        registry.set_compressor_with_exit_code(move |command, output, exit_code| {
4605            crate::compress::compress_with_registry_exit_code(
4606                command,
4607                &output,
4608                exit_code,
4609                &filter_registry,
4610            )
4611        });
4612        let stdout = format!(
4613            "make[1]: Entering directory `/tmp`\n{}",
4614            (0..100)
4615                .map(|index| format!("compile line {index}"))
4616                .collect::<Vec<_>>()
4617                .join("\n")
4618        );
4619        let (task_id, task) =
4620            insert_terminal_piped_task(&registry, &dir, "make all", &stdout, "", true);
4621
4622        let snapshot = registry
4623            .status(
4624                &task_id,
4625                "session",
4626                None,
4627                Some(dir.path()),
4628                RUNNING_OUTPUT_PREVIEW_BYTES,
4629            )
4630            .unwrap();
4631
4632        assert!(snapshot.output_preview.contains("compile line 99"));
4633        assert!(snapshot.output_preview.contains(&format!(
4634            "full output: read \"{}\"",
4635            task.paths.stdout.display()
4636        )));
4637        assert!(!snapshot
4638            .output_preview
4639            .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4640        assert!(!snapshot.output_preview.contains("tail -n +"));
4641    }
4642
4643    #[test]
4644    fn compressed_false_raw_passthrough_uses_wider_head_tail_cap() {
4645        let registry = BgTaskRegistry::default();
4646        let dir = tempfile::tempdir().unwrap();
4647        let output = format!("RAW-HEAD\n{}RAW-TAIL\n", "raw-middle\n".repeat(8_000));
4648        let (task_id, task) =
4649            insert_terminal_piped_task(&registry, &dir, "cat raw.log", &output, "RAW-ERR\n", false);
4650
4651        let snapshot = registry
4652            .status(
4653                &task_id,
4654                "session",
4655                None,
4656                Some(dir.path()),
4657                RUNNING_OUTPUT_PREVIEW_BYTES,
4658            )
4659            .unwrap();
4660
4661        assert!(snapshot.output_preview.contains("RAW-HEAD"));
4662        assert!(snapshot.output_preview.contains("RAW-TAIL"));
4663        assert!(snapshot.output_preview.contains("truncated output"));
4664        assert!(snapshot
4665            .output_preview
4666            .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4667        assert!(snapshot
4668            .output_preview
4669            .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4670        assert!(!snapshot.output_preview.contains("tail -n +"));
4671        assert!(snapshot.output_preview.len() > 16 * 1024);
4672        assert!(snapshot.output_truncated);
4673    }
4674
4675    #[test]
4676    fn pty_terminal_snapshot_bypasses_line_compression() {
4677        let registry = BgTaskRegistry::default();
4678        let dir = tempfile::tempdir().unwrap();
4679        let calls = Arc::new(AtomicUsize::new(0));
4680        let calls_for_closure = Arc::clone(&calls);
4681        registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4682            calls_for_closure.fetch_add(1, Ordering::SeqCst);
4683            CompressionResult::new(output)
4684        });
4685        let (task_id, _task) = insert_terminal_pty_task(&registry, &dir, "raw\u{1b}[31m pty bytes");
4686
4687        let snapshot = registry
4688            .status(
4689                &task_id,
4690                "session",
4691                None,
4692                Some(dir.path()),
4693                RUNNING_OUTPUT_PREVIEW_BYTES,
4694            )
4695            .unwrap();
4696
4697        assert_eq!(snapshot.info.mode, BgMode::Pty);
4698        assert_eq!(snapshot.output_preview, "");
4699        assert_eq!(calls.load(Ordering::SeqCst), 0);
4700    }
4701
4702    #[test]
4703    fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
4704        let registry = BgTaskRegistry::default();
4705        let dir = tempfile::tempdir().unwrap();
4706        let task_id = registry
4707            .spawn_pty(
4708                QUICK_SUCCESS_COMMAND,
4709                "session".to_string(),
4710                dir.path().to_path_buf(),
4711                HashMap::new(),
4712                Some(Duration::from_secs(30)),
4713                dir.path().to_path_buf(),
4714                10,
4715                true,
4716                false,
4717                Some(dir.path().to_path_buf()),
4718                50,
4719                120,
4720            )
4721            .unwrap();
4722
4723        let paths = task_paths(dir.path(), "session", &task_id);
4724        let metadata = read_task(&paths.json).unwrap();
4725        assert_eq!(
4726            metadata.schema_version,
4727            crate::bash_background::persistence::SCHEMA_VERSION
4728        );
4729        assert_eq!(metadata.mode, BgMode::Pty);
4730        assert_eq!(metadata.pty_rows, Some(50));
4731        assert_eq!(metadata.pty_cols, Some(120));
4732
4733        let snapshot = registry
4734            .status(&task_id, "session", None, Some(dir.path()), 1024)
4735            .unwrap();
4736        assert_eq!(snapshot.pty_rows, Some(50));
4737        assert_eq!(snapshot.pty_cols, Some(120));
4738    }
4739
4740    /// Spawn a child process that exits immediately and return it after
4741    /// it has terminated. Used by reap_child tests to simulate the
4742    /// "child exists and is dead" state when the watchdog has already
4743    /// nulled out the original child handle.
4744    fn spawn_dead_child() -> std::process::Child {
4745        #[cfg(unix)]
4746        let mut cmd = std::process::Command::new("true");
4747        #[cfg(windows)]
4748        let mut cmd = {
4749            let mut c = std::process::Command::new("cmd");
4750            c.args(["/c", "exit", "0"]);
4751            c
4752        };
4753        cmd.stdin(std::process::Stdio::null());
4754        cmd.stdout(std::process::Stdio::null());
4755        cmd.stderr(std::process::Stdio::null());
4756        let mut child = cmd.spawn().expect("spawn replacement child for reap test");
4757        // Poll try_wait() until the child actually exits, instead of calling
4758        // wait() which closes the OS handle. On Windows, after wait()
4759        // closes the handle, subsequent try_wait() calls (which reap_child
4760        // depends on) return Err — the test was inadvertently giving
4761        // reap_child an unusable child handle. Polling try_wait() keeps the
4762        // handle open and observes natural exit, matching the production
4763        // shape where the watchdog discovers an exited child for the first
4764        // time.
4765        let started = Instant::now();
4766        loop {
4767            match child.try_wait() {
4768                Ok(Some(_)) => break,
4769                Ok(None) => {
4770                    if started.elapsed() > Duration::from_secs(5) {
4771                        panic!("dead-child stand-in did not exit within 5s");
4772                    }
4773                    std::thread::sleep(Duration::from_millis(10));
4774                }
4775                Err(error) => panic!("dead-child try_wait failed: {error}"),
4776            }
4777        }
4778        child
4779    }
4780
4781    #[test]
4782    fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
4783        let registry = BgTaskRegistry::default();
4784        let dir = tempfile::tempdir().unwrap();
4785        let task_id = registry
4786            .spawn(
4787                LONG_RUNNING_COMMAND,
4788                "session".to_string(),
4789                dir.path().to_path_buf(),
4790                HashMap::new(),
4791                Some(Duration::from_secs(30)),
4792                dir.path().to_path_buf(),
4793                10,
4794                true,
4795                false,
4796                Some(dir.path().to_path_buf()),
4797            )
4798            .unwrap();
4799        registry
4800            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4801            .unwrap();
4802        assert_eq!(
4803            registry
4804                .drain_completions_for_session(Some("session"))
4805                .len(),
4806            1
4807        );
4808
4809        // Simulate the plugin consuming a sync bash_watch({ exit:true }) result
4810        // locally before the Rust completion queue is drained/acked.
4811        registry.inner.completions.lock().unwrap().clear();
4812
4813        assert_eq!(
4814            registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
4815            vec![task_id.clone()]
4816        );
4817        assert!(registry
4818            .drain_completions_for_session(Some("session"))
4819            .is_empty());
4820
4821        let paths = task_paths(dir.path(), "session", &task_id);
4822        let metadata = read_task(&paths.json).unwrap();
4823        assert!(metadata.completion_delivered);
4824
4825        let replayed = BgTaskRegistry::default();
4826        replayed
4827            .replay_session_inner(dir.path(), "session", None)
4828            .unwrap();
4829        assert!(replayed
4830            .drain_completions_for_session(Some("session"))
4831            .is_empty());
4832    }
4833
4834    #[test]
4835    fn register_watch_rejects_unknown_task() {
4836        let registry = BgTaskRegistry::default();
4837
4838        let result = registry.register_watch(
4839            "missing-task".to_string(),
4840            WatchPattern::Substring("READY".into()),
4841            true,
4842        );
4843
4844        assert_eq!(result, Err("task_not_found"));
4845    }
4846
4847    #[test]
4848    fn register_watch_on_terminal_task_scans_existing_output() {
4849        let frames = Arc::new(Mutex::new(Vec::new()));
4850        let captured = Arc::clone(&frames);
4851        let sender: crate::context::ProgressSender = Arc::new(Box::new(move |frame| {
4852            captured.lock().unwrap().push(frame);
4853        })
4854            as Box<dyn Fn(PushFrame) + Send + Sync>);
4855        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));
4856        let dir = tempfile::tempdir().unwrap();
4857        let task_id = registry
4858            .spawn(
4859                LONG_RUNNING_COMMAND,
4860                "session".to_string(),
4861                dir.path().to_path_buf(),
4862                HashMap::new(),
4863                Some(Duration::from_secs(30)),
4864                dir.path().to_path_buf(),
4865                10,
4866                true,
4867                false,
4868                Some(dir.path().to_path_buf()),
4869            )
4870            .unwrap();
4871        registry
4872            .inner
4873            .shutdown
4874            .store(true, std::sync::atomic::Ordering::SeqCst);
4875        let task = registry.task_for_session(&task_id, "session").unwrap();
4876        std::fs::write(&task.paths.stdout, "READY\n").unwrap();
4877        registry
4878            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4879            .unwrap();
4880        frames.lock().unwrap().clear();
4881        registry.inner.completions.lock().unwrap().clear();
4882
4883        registry
4884            .register_watch(
4885                task_id.clone(),
4886                WatchPattern::Substring("READY".into()),
4887                true,
4888            )
4889            .unwrap();
4890
4891        let frames = frames.lock().unwrap();
4892        let frame = frames
4893            .iter()
4894            .find_map(|frame| match frame {
4895                PushFrame::BashPatternMatch(frame) => Some(frame),
4896                _ => None,
4897            })
4898            .expect("terminal watch registration should emit pattern frame");
4899        assert_eq!(frame.reason, "pattern_match");
4900        assert_eq!(frame.task_id, task_id);
4901        assert_eq!(frame.session_id, "session");
4902        assert_eq!(frame.match_text, "READY");
4903        assert_eq!(frame.match_offset, 0);
4904        assert_eq!(registry.active_watch_count(&frame.task_id), 0);
4905        let metadata = read_task(&task.paths.json).unwrap();
4906        assert!(metadata.completion_delivered);
4907    }
4908
4909    #[test]
4910    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
4911        let registry = BgTaskRegistry::default();
4912        let dir = tempfile::tempdir().unwrap();
4913        let task_id = registry
4914            .spawn(
4915                QUICK_SUCCESS_COMMAND,
4916                "session".to_string(),
4917                dir.path().to_path_buf(),
4918                HashMap::new(),
4919                Some(Duration::from_secs(30)),
4920                dir.path().to_path_buf(),
4921                10,
4922                true,
4923                false,
4924                Some(dir.path().to_path_buf()),
4925            )
4926            .unwrap();
4927        registry
4928            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4929            .unwrap();
4930        let completions = registry.drain_completions_for_session(Some("session"));
4931        assert_eq!(completions.len(), 1);
4932        assert_eq!(
4933            registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
4934            vec![task_id.clone()]
4935        );
4936
4937        registry.cleanup_finished(Duration::ZERO);
4938
4939        assert!(registry.inner.tasks.lock().unwrap().is_empty());
4940    }
4941
4942    #[test]
4943    fn cleanup_finished_retains_undelivered_terminals() {
4944        let registry = BgTaskRegistry::default();
4945        let dir = tempfile::tempdir().unwrap();
4946        let task_id = registry
4947            .spawn(
4948                QUICK_SUCCESS_COMMAND,
4949                "session".to_string(),
4950                dir.path().to_path_buf(),
4951                HashMap::new(),
4952                Some(Duration::from_secs(30)),
4953                dir.path().to_path_buf(),
4954                10,
4955                true,
4956                false,
4957                Some(dir.path().to_path_buf()),
4958            )
4959            .unwrap();
4960        registry
4961            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4962            .unwrap();
4963
4964        registry.cleanup_finished(Duration::ZERO);
4965
4966        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
4967    }
4968
4969    /// Verify that the live watchdog path (reap_child) gives an exited
4970    /// child one watchdog pass for its exit marker to land, then marks the
4971    /// task Failed if the next pass still sees no marker.
4972    ///
4973    /// Cross-platform: uses a quick-exiting command that does NOT go
4974    /// through the wrapper script (we manually clear the exit marker
4975    /// after spawn to simulate the wrapper crashing before write).
4976    #[test]
4977    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
4978        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4979        let dir = tempfile::tempdir().unwrap();
4980        let task_id = registry
4981            .spawn(
4982                QUICK_SUCCESS_COMMAND,
4983                "session".to_string(),
4984                dir.path().to_path_buf(),
4985                HashMap::new(),
4986                Some(Duration::from_secs(30)),
4987                dir.path().to_path_buf(),
4988                10,
4989                true,
4990                false,
4991                Some(dir.path().to_path_buf()),
4992            )
4993            .unwrap();
4994
4995        let task = registry.task_for_session(&task_id, "session").unwrap();
4996
4997        // Wait for the child to actually exit and the wrapper to either
4998        // write the marker or fail. Then nuke the marker to simulate
4999        // wrapper crash before write. Poll up to 5s; this is plenty for a
5000        // `true`/`cmd /c exit 0` invocation.
5001        let started = Instant::now();
5002        loop {
5003            let exited = {
5004                let mut state = task.state.lock().unwrap();
5005                match &mut state.runtime {
5006                    TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
5007                    _ => true,
5008                }
5009            };
5010            if exited {
5011                break;
5012            }
5013            assert!(
5014                started.elapsed() < Duration::from_secs(5),
5015                "child should exit quickly"
5016            );
5017            std::thread::sleep(Duration::from_millis(20));
5018        }
5019
5020        // Stop the watchdog so it doesn't race with our manual reap_child.
5021        // On fast Windows runners the watchdog ticks (every 500ms) can
5022        // observe the child exit and reap it before this test's assertion
5023        // fires, leaving us with state.child = None and an already-terminal
5024        // status. We specifically want to test reap_child's logic when
5025        // invoked manually on a Running-but-actually-dead task, so we need
5026        // exclusive control over the reap path here.
5027        registry
5028            .inner
5029            .shutdown
5030            .store(true, std::sync::atomic::Ordering::SeqCst);
5031        // Give the watchdog at most one tick (500ms) to notice shutdown
5032        // before we touch task state. Without this, an in-flight watchdog
5033        // iteration could still race with our state setup below.
5034        std::thread::sleep(Duration::from_millis(550));
5035
5036        // Wrapper likely wrote the marker by now; remove it to simulate
5037        // a wrapper crash that exited before persisting the exit code.
5038        let _ = std::fs::remove_file(&task.paths.exit);
5039
5040        // The watchdog may have already reaped the child handle and
5041        // marked the task terminal before we got here. Reset both so
5042        // reap_child has the "Running task whose child just exited"
5043        // shape it's designed to handle. If the original child handle is
5044        // gone, install a quick-exited stand-in so the first reap exercises
5045        // the same try_wait path as production.
5046        //
5047        // CRITICAL on Windows: the watchdog ticks fast enough that the
5048        // JSON on disk may already say `Completed`. `update_task` (called
5049        // by `reap_child`) reads from disk, applies the closure, but
5050        // ROLLS BACK if the original on-disk state was already terminal
5051        // (see persistence.rs::update_task). So we must reset BOTH
5052        // in-memory metadata AND the JSON on disk to a Running state to
5053        // give reap_child the fresh shape it expects to operate on.
5054        {
5055            let mut state = task.state.lock().unwrap();
5056            state.metadata.status = BgTaskStatus::Running;
5057            state.metadata.status_reason = None;
5058            state.metadata.exit_code = None;
5059            state.metadata.finished_at = None;
5060            state.metadata.duration_ms = None;
5061            // Persist the reset state to disk so update_task's terminal
5062            // rollback guard sees a non-terminal starting point.
5063            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5064                .expect("persist reset Running metadata for reap_child test");
5065            // If the watchdog already nulled state.child, we need to
5066            // simulate "child exists and is dead" so reap_child's
5067            // try_wait path runs. Spawn a quick-exit child as a stand-in.
5068            if matches!(state.runtime, TaskRuntime::Piped(None)) {
5069                state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
5070            }
5071        }
5072        // Clear the terminal_at marker too so mark_terminal_now() can fire
5073        // again inside reap_child.
5074        *task.terminal_at.lock().unwrap() = None;
5075
5076        // Sanity: task is still Running per metadata (replay/poll hasn't
5077        // observed the missing marker yet).
5078        assert!(
5079            task.is_running(),
5080            "precondition: metadata.status == Running"
5081        );
5082        assert!(
5083            !task.paths.exit.exists(),
5084            "precondition: exit marker absent"
5085        );
5086
5087        // First watchdog observation is intentionally insufficient to
5088        // declare failure. A missing marker may just mean the wrapper is
5089        // still completing its tmp-file-to-marker rename, so reap_child only
5090        // drops the child handle and switches to detached PID monitoring.
5091        registry.reap_child(&task);
5092
5093        {
5094            let state = task.state.lock().unwrap();
5095            assert_eq!(
5096                state.metadata.status,
5097                BgTaskStatus::Running,
5098                "first reap must leave status Running while waiting one pass for marker"
5099            );
5100            assert_eq!(
5101                state.metadata.status_reason, None,
5102                "first reap must not record a failure reason"
5103            );
5104            assert!(
5105                matches!(state.runtime, TaskRuntime::Piped(None)),
5106                "child handle must be released after first reap"
5107            );
5108            assert!(
5109                state.detached,
5110                "task must be marked detached after first reap"
5111            );
5112        }
5113
5114        // Second watchdog observation sees the detached PID is dead and the
5115        // marker is still absent. That is strong enough evidence that the
5116        // wrapper exited without persisting an exit code.
5117        registry.reap_child(&task);
5118
5119        let state = task.state.lock().unwrap();
5120        assert!(
5121            state.metadata.status.is_terminal(),
5122            "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
5123            state.metadata.status
5124        );
5125        assert_eq!(
5126            state.metadata.status,
5127            BgTaskStatus::Failed,
5128            "must specifically be Failed (not Killed): status={:?}",
5129            state.metadata.status
5130        );
5131        assert_eq!(
5132            state.metadata.status_reason.as_deref(),
5133            Some("process exited without exit marker"),
5134            "reason must match replay path's wording: {:?}",
5135            state.metadata.status_reason
5136        );
5137        assert!(
5138            matches!(state.runtime, TaskRuntime::Piped(None)),
5139            "child handle must stay released after second reap"
5140        );
5141        assert!(
5142            state.detached,
5143            "task must remain detached after second reap"
5144        );
5145    }
5146
5147    /// Companion to the above: when the exit marker DOES exist on disk
5148    /// at reap_child time, reap_child must NOT mark the task Failed.
5149    /// Instead it leaves status=Running and lets the next poll_task()
5150    /// cycle finalize via the marker.
5151    #[test]
5152    fn reap_child_preserves_running_when_exit_marker_exists() {
5153        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5154        let dir = tempfile::tempdir().unwrap();
5155        let task_id = registry
5156            .spawn(
5157                QUICK_SUCCESS_COMMAND,
5158                "session".to_string(),
5159                dir.path().to_path_buf(),
5160                HashMap::new(),
5161                Some(Duration::from_secs(30)),
5162                dir.path().to_path_buf(),
5163                10,
5164                true,
5165                false,
5166                Some(dir.path().to_path_buf()),
5167            )
5168            .unwrap();
5169
5170        let task = registry.task_for_session(&task_id, "session").unwrap();
5171
5172        // Wait for child to exit AND for the marker to land. Both happen
5173        // shortly after the wrapper finishes — but we want both observed.
5174        let started = Instant::now();
5175        loop {
5176            let exited = {
5177                let mut state = task.state.lock().unwrap();
5178                match &mut state.runtime {
5179                    TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
5180                    _ => true,
5181                }
5182            };
5183            if exited && task.paths.exit.exists() {
5184                break;
5185            }
5186            assert!(
5187                started.elapsed() < Duration::from_secs(5),
5188                "child should exit and write marker quickly"
5189            );
5190            std::thread::sleep(Duration::from_millis(20));
5191        }
5192
5193        // Stop the watchdog so it doesn't race with our manual reap_child.
5194        // On fast Windows runners the watchdog can call poll_task (which
5195        // finalizes via marker) before this test asserts the
5196        // "marker exists, status still Running" invariant. We want
5197        // exclusive control over the reap path.
5198        registry
5199            .inner
5200            .shutdown
5201            .store(true, std::sync::atomic::Ordering::SeqCst);
5202        std::thread::sleep(Duration::from_millis(550));
5203
5204        // If the watchdog already finalized the task before we stopped it,
5205        // restore the test setup: reset status to Running and ensure the
5206        // marker file is still on disk. We're testing reap_child's
5207        // behavior when called manually with both child-exited AND
5208        // marker-present, regardless of whether the watchdog beat us.
5209        {
5210            let mut state = task.state.lock().unwrap();
5211            state.metadata.status = BgTaskStatus::Running;
5212            state.metadata.status_reason = None;
5213            if matches!(state.runtime, TaskRuntime::Piped(None)) {
5214                state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
5215            }
5216        }
5217        *task.terminal_at.lock().unwrap() = None;
5218        // Make sure the marker is still on disk (poll_task removes it on
5219        // finalization). Recreate it if needed.
5220        if !task.paths.exit.exists() {
5221            std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
5222        }
5223
5224        // reap_child sees: child exited, marker exists. It should:
5225        //  - drop state.child / set state.detached = true
5226        //  - NOT change status (poll_task will finalize via marker next tick)
5227        registry.reap_child(&task);
5228
5229        let state = task.state.lock().unwrap();
5230        assert!(
5231            matches!(state.runtime, TaskRuntime::Piped(None)),
5232            "child handle still released even when marker exists"
5233        );
5234        assert!(
5235            state.detached,
5236            "task still marked detached even when marker exists"
5237        );
5238        // Status remains Running because reap_child defers to poll_task
5239        // when a marker exists. It would be wrong for reap to record the
5240        // marker outcome (poll_task does that with proper exit-code
5241        // parsing).
5242        assert_eq!(
5243            state.metadata.status,
5244            BgTaskStatus::Running,
5245            "reap_child must defer to poll_task when marker exists"
5246        );
5247    }
5248
5249    /// Read a process's `ps` state string ("Z", "S", "R", etc). Returns
5250    /// `None` once the PID has been fully reaped (no row), which is the
5251    /// post-reap state we want.
5252    #[cfg(unix)]
5253    fn pid_stat(pid: u32) -> Option<String> {
5254        let output = std::process::Command::new("ps")
5255            .args(["-o", "stat=", "-p", &pid.to_string()])
5256            .output()
5257            .ok()?;
5258        if !output.status.success() {
5259            return None;
5260        }
5261        let stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
5262        if stat.is_empty() {
5263            None
5264        } else {
5265            Some(stat)
5266        }
5267    }
5268
5269    /// A `<defunct>` zombie carries `ps` state starting with 'Z'.
5270    #[cfg(unix)]
5271    fn is_zombie(pid: u32) -> bool {
5272        pid_stat(pid).is_some_and(|stat| stat.starts_with('Z'))
5273    }
5274
5275    /// Spawn a child that exits immediately and wait — via `ps`, NOT
5276    /// `try_wait()`/`wait()` — until it is observably a `<defunct>` zombie,
5277    /// then return the still-unreaped handle. This reproduces the exact
5278    /// state issue #91 leaves behind: an exited OS child whose parent has
5279    /// not reaped it.
5280    #[cfg(unix)]
5281    fn spawn_unreaped_zombie() -> std::process::Child {
5282        let child = std::process::Command::new("true")
5283            .stdin(std::process::Stdio::null())
5284            .stdout(std::process::Stdio::null())
5285            .stderr(std::process::Stdio::null())
5286            .spawn()
5287            .expect("spawn zombie stand-in");
5288        let pid = child.id();
5289        let started = Instant::now();
5290        while !is_zombie(pid) {
5291            assert!(
5292                started.elapsed() < Duration::from_secs(5),
5293                "stand-in child should become a zombie within 5s"
5294            );
5295            std::thread::sleep(Duration::from_millis(10));
5296        }
5297        // Return WITHOUT reaping — the handle still owns an unwaited zombie.
5298        child
5299    }
5300
5301    /// Regression test for issue #91: the exit-marker terminal path
5302    /// (`poll_task` -> `finalize_from_marker`) must REAP the direct child
5303    /// handle, not merely drop it. Dropping a `std::process::Child` does not
5304    /// `wait()` on Unix, so the exited child lingers as a `[mv] <defunct>`
5305    /// zombie until AFT exits.
5306    ///
5307    /// We install a known-unreaped zombie into the task's child slot and
5308    /// drive the marker finalize path, then assert the child is gone (reaped)
5309    /// rather than still `<defunct>`.
5310    #[cfg(unix)]
5311    #[test]
5312    fn finalize_from_marker_reaps_child_no_zombie() {
5313        use std::sync::atomic::Ordering;
5314
5315        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5316        let dir = tempfile::tempdir().unwrap();
5317        let task_id = registry
5318            .spawn(
5319                QUICK_SUCCESS_COMMAND,
5320                "session".to_string(),
5321                dir.path().to_path_buf(),
5322                HashMap::new(),
5323                Some(Duration::from_secs(30)),
5324                dir.path().to_path_buf(),
5325                10,
5326                true,
5327                false,
5328                Some(dir.path().to_path_buf()),
5329            )
5330            .unwrap();
5331
5332        // Stop the watchdog so the ONLY terminal-transition path under test
5333        // is the exit-marker finalize (not reap_child's try_wait, which would
5334        // reap the child for us and mask the bug).
5335        registry.inner.shutdown.store(true, Ordering::SeqCst);
5336        std::thread::sleep(Duration::from_millis(550));
5337
5338        let task = registry.task_for_session(&task_id, "session").unwrap();
5339
5340        // Wait for the wrapper's exit marker to land. We deliberately do NOT
5341        // call try_wait()/wait() on the real child here — doing so would reap
5342        // it and defeat the test.
5343        let started = Instant::now();
5344        while !task.paths.exit.exists() {
5345            assert!(
5346                started.elapsed() < Duration::from_secs(5),
5347                "exit marker should land quickly for `true`"
5348            );
5349            std::thread::sleep(Duration::from_millis(20));
5350        }
5351
5352        // Reset to a fresh Running shape and install a guaranteed-unreaped
5353        // zombie as the child handle, so the finalize path's reap behavior is
5354        // exercised deterministically regardless of how the real child was
5355        // handled. Persist Running so update_task's terminal-rollback guard
5356        // sees a non-terminal starting point.
5357        let zombie_pid;
5358        {
5359            let mut state = task.state.lock().unwrap();
5360            state.metadata.status = BgTaskStatus::Running;
5361            state.metadata.status_reason = None;
5362            state.metadata.exit_code = None;
5363            state.metadata.finished_at = None;
5364            state.metadata.duration_ms = None;
5365            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5366                .expect("persist reset Running metadata");
5367            let zombie = spawn_unreaped_zombie();
5368            zombie_pid = zombie.id();
5369            state.runtime = TaskRuntime::Piped(Some(zombie));
5370        }
5371        *task.terminal_at.lock().unwrap() = None;
5372
5373        // Precondition: the installed child is genuinely a `<defunct>` zombie.
5374        assert!(
5375            is_zombie(zombie_pid),
5376            "precondition: stand-in child {zombie_pid} must be a zombie before finalize"
5377        );
5378
5379        // Drive the exit-marker terminal path. Before the fix this nulled the
5380        // Child handle without wait(), leaving the zombie behind.
5381        registry.poll_task(&task).unwrap();
5382
5383        {
5384            let state = task.state.lock().unwrap();
5385            assert!(
5386                matches!(state.runtime, TaskRuntime::Piped(None)),
5387                "child handle must be released after marker finalize"
5388            );
5389            assert!(
5390                state.metadata.status.is_terminal(),
5391                "task must be terminal after marker finalize: {:?}",
5392                state.metadata.status
5393            );
5394        }
5395
5396        // The core assertion: the child must have been REAPED, not just
5397        // dropped. A reaped PID has no `ps` row (or at minimum is not 'Z').
5398        assert!(
5399            !is_zombie(zombie_pid),
5400            "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
5401             after the exit-marker terminal transition"
5402        );
5403    }
5404
5405    /// Companion to the above for the kill path: when a kill observes an
5406    /// already-present exit marker (the child finished on its own first), it
5407    /// must reap the child handle rather than dropping it.
5408    #[cfg(unix)]
5409    #[test]
5410    fn kill_with_existing_marker_reaps_child_no_zombie() {
5411        use std::sync::atomic::Ordering;
5412
5413        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5414        let dir = tempfile::tempdir().unwrap();
5415        let task_id = registry
5416            .spawn(
5417                QUICK_SUCCESS_COMMAND,
5418                "session".to_string(),
5419                dir.path().to_path_buf(),
5420                HashMap::new(),
5421                Some(Duration::from_secs(30)),
5422                dir.path().to_path_buf(),
5423                10,
5424                true,
5425                false,
5426                Some(dir.path().to_path_buf()),
5427            )
5428            .unwrap();
5429
5430        registry.inner.shutdown.store(true, Ordering::SeqCst);
5431        std::thread::sleep(Duration::from_millis(550));
5432
5433        let task = registry.task_for_session(&task_id, "session").unwrap();
5434
5435        let started = Instant::now();
5436        while !task.paths.exit.exists() {
5437            assert!(
5438                started.elapsed() < Duration::from_secs(5),
5439                "exit marker should land quickly for `true`"
5440            );
5441            std::thread::sleep(Duration::from_millis(20));
5442        }
5443
5444        let zombie_pid;
5445        {
5446            let mut state = task.state.lock().unwrap();
5447            state.metadata.status = BgTaskStatus::Running;
5448            state.metadata.status_reason = None;
5449            state.metadata.exit_code = None;
5450            state.metadata.finished_at = None;
5451            state.metadata.duration_ms = None;
5452            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5453                .expect("persist reset Running metadata");
5454            let zombie = spawn_unreaped_zombie();
5455            zombie_pid = zombie.id();
5456            state.runtime = TaskRuntime::Piped(Some(zombie));
5457        }
5458        *task.terminal_at.lock().unwrap() = None;
5459
5460        assert!(
5461            is_zombie(zombie_pid),
5462            "precondition: stand-in child {zombie_pid} must be a zombie before kill"
5463        );
5464
5465        // Kill observes the existing marker and finalizes from it.
5466        registry
5467            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
5468            .expect("kill should succeed");
5469
5470        {
5471            let state = task.state.lock().unwrap();
5472            assert!(
5473                matches!(state.runtime, TaskRuntime::Piped(None)),
5474                "child handle must be released after marker-aware kill"
5475            );
5476            assert!(state.metadata.status.is_terminal());
5477        }
5478
5479        assert!(
5480            !is_zombie(zombie_pid),
5481            "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
5482             after a marker-aware kill"
5483        );
5484    }
5485
5486    #[test]
5487    fn cleanup_finished_keeps_running_tasks() {
5488        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5489        let dir = tempfile::tempdir().unwrap();
5490        let task_id = registry
5491            .spawn(
5492                LONG_RUNNING_COMMAND,
5493                "session".to_string(),
5494                dir.path().to_path_buf(),
5495                HashMap::new(),
5496                Some(Duration::from_secs(30)),
5497                dir.path().to_path_buf(),
5498                10,
5499                true,
5500                false,
5501                Some(dir.path().to_path_buf()),
5502            )
5503            .unwrap();
5504
5505        registry.cleanup_finished(Duration::ZERO);
5506
5507        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
5508        let _ = registry.kill(&task_id, "session");
5509    }
5510
5511    #[cfg(windows)]
5512    fn wait_for_file(path: &Path) -> String {
5513        let started = Instant::now();
5514        loop {
5515            if path.exists() {
5516                return fs::read_to_string(path).expect("read file");
5517            }
5518            assert!(
5519                started.elapsed() < Duration::from_secs(30),
5520                "timed out waiting for {}",
5521                path.display()
5522            );
5523            std::thread::sleep(Duration::from_millis(100));
5524        }
5525    }
5526
5527    #[cfg(windows)]
5528    fn spawn_windows_registry_command(
5529        command: &str,
5530    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
5531        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5532        let dir = tempfile::tempdir().unwrap();
5533        let task_id = registry
5534            .spawn(
5535                command,
5536                "session".to_string(),
5537                dir.path().to_path_buf(),
5538                HashMap::new(),
5539                Some(Duration::from_secs(30)),
5540                dir.path().to_path_buf(),
5541                10,
5542                false,
5543                false,
5544                Some(dir.path().to_path_buf()),
5545            )
5546            .unwrap();
5547        (registry, dir, task_id)
5548    }
5549
5550    #[cfg(windows)]
5551    #[test]
5552    fn windows_spawn_writes_exit_marker_for_zero_exit() {
5553        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
5554        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5555
5556        let content = wait_for_file(&exit_path);
5557
5558        assert_eq!(content.trim(), "0");
5559    }
5560
5561    #[cfg(windows)]
5562    #[test]
5563    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
5564        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
5565        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5566
5567        let content = wait_for_file(&exit_path);
5568
5569        assert_eq!(content.trim(), "42");
5570    }
5571
5572    #[cfg(windows)]
5573    #[test]
5574    fn windows_spawn_captures_stdout_to_disk() {
5575        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
5576        let task = registry.task_for_session(&task_id, "session").unwrap();
5577        let stdout_path = task.paths.stdout.clone();
5578        let exit_path = task.paths.exit.clone();
5579
5580        let _ = wait_for_file(&exit_path);
5581        let stdout = fs::read_to_string(stdout_path).expect("read stdout");
5582
5583        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
5584    }
5585
5586    #[cfg(windows)]
5587    #[test]
5588    fn windows_spawn_uses_pwsh_when_available() {
5589        // Without $SHELL set, $SHELL probe yields None and pwsh wins.
5590        // (We intentionally pass None for shell_env to keep this test
5591        // independent of the runner's actual env.)
5592        let candidates = crate::windows_shell::shell_candidates_with(
5593            |binary| match binary {
5594                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
5595                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
5596                _ => None,
5597            },
5598            || None,
5599        );
5600        let shell = candidates.first().expect("at least one candidate").clone();
5601        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
5602        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
5603    }
5604
5605    /// Issue #27 Oracle review P1, updated: cmd wrapper writes a `.bat` file
5606    /// that batch-evaluates `%ERRORLEVEL%` on its own line (line-by-line
5607    /// evaluation is the default for batch files; parse-time expansion only
5608    /// applies to compound `&`-chained inline commands). Capturing
5609    /// `%ERRORLEVEL%` into `set CODE=%ERRORLEVEL%` immediately after the user
5610    /// command runs records the real run-time exit code.
5611    #[cfg(windows)]
5612    #[test]
5613    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
5614        let exit_path = Path::new(r"C:\Temp\bash-test.exit");
5615        let script =
5616            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
5617
5618        // Batch wrapper: capture exit code into CODE on the line after the
5619        // user command, then write CODE to a temp marker file before
5620        // atomic-renaming it into place.
5621        assert!(
5622            script.contains("set CODE=%ERRORLEVEL%"),
5623            "wrapper must capture exit code into CODE: {script}"
5624        );
5625        assert!(
5626            script.contains("echo %CODE% >"),
5627            "wrapper must echo CODE to a temp marker file: {script}"
5628        );
5629        assert!(
5630            script.contains("move /Y"),
5631            "wrapper must use atomic move to write the marker: {script}"
5632        );
5633        // move output must be redirected to nul to avoid polluting the
5634        // user's captured stdout with "1 file(s) moved." lines.
5635        assert!(
5636            script.contains("> nul"),
5637            "wrapper must redirect move output to nul: {script}"
5638        );
5639        // exit /B %CODE% propagates the real exit code so wait() sees it.
5640        assert!(
5641            script.contains("exit /B %CODE%"),
5642            "wrapper must propagate the captured exit code: {script}"
5643        );
5644        assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
5645        assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
5646    }
5647
5648    /// `bg_command()` for Cmd no longer needs `/V:ON` — the wrapper is now
5649    /// written to a `.bat` file where batch-line evaluation captures
5650    /// `%ERRORLEVEL%` correctly without delayed expansion. We still need
5651    /// `/D` (skip AutoRun) and `/S` (simple quote-stripping for paths with
5652    /// internal `"`-quoting from `cmd_quote`).
5653    #[cfg(windows)]
5654    #[test]
5655    fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
5656        use crate::windows_shell::WindowsShell;
5657        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
5658        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5659        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5660        assert_eq!(
5661            args_strs,
5662            vec!["/D", "/S", "/C", "echo wrapped"],
5663            "Cmd::bg_command must prepend /D /S /C"
5664        );
5665    }
5666
5667    /// PowerShell variants don't need `/V:ON`-style flags; their
5668    /// `bg_command()` args stay on the standard `-Command` path.
5669    #[cfg(windows)]
5670    #[test]
5671    fn windows_shell_pwsh_bg_command_uses_standard_args() {
5672        use crate::windows_shell::WindowsShell;
5673        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
5674        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5675        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5676        assert!(
5677            args_strs.contains(&"-Command"),
5678            "Pwsh::bg_command must use -Command: {args_strs:?}"
5679        );
5680        assert!(
5681            args_strs.contains(&"Get-Date"),
5682            "Pwsh::bg_command must include the user command body"
5683        );
5684    }
5685}