Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::Write;
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::context::SharedProgressSender;
16use crate::harness::Harness;
17use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
18
19#[cfg(unix)]
20use std::os::unix::process::CommandExt;
21#[cfg(windows)]
22use std::os::windows::process::CommandExt;
23
24use super::buffer::{combine_streams, BgBuffer, TokenCountInput};
25use super::persistence::{
26    create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
27    task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
28    ExitMarker, PersistedTask, TaskPaths,
29};
30use super::process::is_process_alive;
31#[cfg(unix)]
32use super::process::terminate_pgid;
33#[cfg(windows)]
34use super::process::terminate_pid;
35use super::pty_process::spawn_pty_for_command;
36use super::pty_runtime::PtyRuntime;
37use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
38use super::{BgTaskInfo, BgTaskStatus};
39// Note: `resolve_windows_shell` is no longer imported at module scope —
40// production code in `spawn_detached_child` uses `shell_candidates()`
41// with retry instead, and the function remains in `windows_shell.rs`
42// for tests and as a future helper.
43
44/// Default timeout for background bash tasks: 30 minutes.
45/// Agents can override per-call via the `timeout` parameter (in ms).
46const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
47const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
48const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
49const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);
50
51/// Tail-bytes captured into BashCompletedFrame and BgCompletion records so the
52/// plugin can inline a preview into the system-reminder. Sized for ~3-4 lines
53/// of typical command output (git status, test results, exit messages) — short
54/// enough that round-tripping multiple completions in one reminder stays well
55/// under the model's context budget but long enough that most successful runs
56/// don't need a follow-up `bash_status` call.
57const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
58const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
59
60#[derive(Debug, Clone, Serialize)]
61pub struct BgCompletion {
62    pub task_id: String,
63    /// Intentionally omitted from serialized completion payloads: push frames
64    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
65    #[serde(skip_serializing)]
66    pub session_id: String,
67    pub status: BgTaskStatus,
68    pub exit_code: Option<i32>,
69    pub command: String,
70    /// Tail of stdout+stderr (≤300 bytes) at completion time, read once and
71    /// cached so push-frame consumers and `bash_drain_completions` callers see
72    /// the same preview without racing against later output rotation. Empty
73    /// when not captured (e.g., persisted task seen on startup before buffer
74    /// reattachment).
75    #[serde(default, skip_serializing_if = "String::is_empty")]
76    pub output_preview: String,
77    /// True when the captured tail is shorter than the actual output (because
78    /// rotation occurred or the output exceeds the preview cap). Plugins use
79    /// this to render a `…` prefix and signal that `bash_status` would return
80    /// more.
81    #[serde(default, skip_serializing_if = "is_false")]
82    pub output_truncated: bool,
83    /// Token count for raw stdout+stderr before compression. Omitted when any
84    /// stream exceeds the 128 KiB tokenization cap.
85    #[serde(default, skip_serializing_if = "Option::is_none")]
86    pub original_tokens: Option<u32>,
87    /// Token count for the compressed output generated from the same capped
88    /// raw payload. Omitted when raw tokenization is skipped.
89    #[serde(default, skip_serializing_if = "Option::is_none")]
90    pub compressed_tokens: Option<u32>,
91    /// True when a stream exceeded the tokenization cap and counts are absent.
92    #[serde(default, skip_serializing_if = "is_false")]
93    pub tokens_skipped: bool,
94}
95
96fn is_false(v: &bool) -> bool {
97    !*v
98}
99
100#[derive(Debug, Clone, Serialize)]
101pub struct BgTaskSnapshot {
102    #[serde(flatten)]
103    pub info: BgTaskInfo,
104    pub exit_code: Option<i32>,
105    pub child_pid: Option<u32>,
106    pub workdir: String,
107    pub output_preview: String,
108    pub output_truncated: bool,
109    pub output_path: Option<String>,
110    pub stderr_path: Option<String>,
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub pty_rows: Option<u16>,
113    #[serde(skip_serializing_if = "Option::is_none")]
114    pub pty_cols: Option<u16>,
115}
116
117#[derive(Clone)]
118pub struct BgTaskRegistry {
119    pub(crate) inner: Arc<RegistryInner>,
120}
121
122pub(crate) struct RegistryInner {
123    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
124    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
125    pub(crate) progress_sender: SharedProgressSender,
126    watchdog_started: AtomicBool,
127    pub(crate) shutdown: AtomicBool,
128    pub(crate) long_running_reminder_enabled: AtomicBool,
129    pub(crate) long_running_reminder_interval_ms: AtomicU64,
130    persisted_gc_started: AtomicBool,
131    #[cfg(test)]
132    persisted_gc_runs: AtomicU64,
133    /// Output compression callback. Set by `AppContext` after construction.
134    /// Takes (command, raw_output) and returns compressed text. Called from
135    /// the watchdog thread when a task reaches a terminal state and from
136    /// `bash_status`/`list` snapshot reads. When `None`, output is returned
137    /// uncompressed.
138    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
139    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
140    pub(crate) db_harness: RwLock<Option<String>>,
141    pub(crate) wake_tx: crossbeam_channel::Sender<()>,
142    pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
143    pub(crate) watch_registry: Mutex<WatchRegistry>,
144}
145
146pub(crate) struct BgTask {
147    pub(crate) task_id: String,
148    pub(crate) session_id: String,
149    pub(crate) paths: TaskPaths,
150    pub(crate) started: Instant,
151    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
152    pub(crate) terminal_at: Mutex<Option<Instant>>,
153    pub(crate) state: Mutex<BgTaskState>,
154}
155
156pub(crate) enum TaskRuntime {
157    Piped(Option<Child>),
158    Pty(Option<PtyRuntime>),
159}
160
161pub(crate) struct BgTaskState {
162    pub(crate) metadata: PersistedTask,
163    pub(crate) runtime: TaskRuntime,
164    pub(crate) detached: bool,
165    /// True once `reap_child` has observed the direct child handle's exit
166    /// via `try_wait()`. Used by the two-pass watchdog to skip the racy
167    /// `is_process_alive(child_pid)` probe on the second pass — we already
168    /// have authoritative evidence that the child is dead, no need to
169    /// re-verify via PID liveness which is unreliable on Windows where
170    /// PIDs can be recycled within seconds.
171    ///
172    /// Remains `false` on replay-restored tasks (those have a `child_pid`
173    /// but never observed exit via this process's `try_wait()`), so those
174    /// continue to fall through to the `is_process_alive` probe path.
175    pub(crate) child_exit_observed: bool,
176    pub(crate) buffer: BgBuffer,
177    /// PTY-only: set for timeout kill intent before signaling the child.
178    pub(crate) pending_terminal_override: Option<BgTaskStatus>,
179}
180
181impl BgTaskRegistry {
182    pub fn new(progress_sender: SharedProgressSender) -> Self {
183        let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
184        Self {
185            inner: Arc::new(RegistryInner {
186                tasks: Mutex::new(HashMap::new()),
187                completions: Mutex::new(VecDeque::new()),
188                progress_sender,
189                watchdog_started: AtomicBool::new(false),
190                shutdown: AtomicBool::new(false),
191                long_running_reminder_enabled: AtomicBool::new(true),
192                long_running_reminder_interval_ms: AtomicU64::new(600_000),
193                persisted_gc_started: AtomicBool::new(false),
194                #[cfg(test)]
195                persisted_gc_runs: AtomicU64::new(0),
196                compressor: Mutex::new(None),
197                db_pool: RwLock::new(None),
198                db_harness: RwLock::new(None),
199                wake_tx,
200                wake_rx,
201                watch_registry: Mutex::new(WatchRegistry::default()),
202            }),
203        }
204    }
205
206    pub fn set_harness(&self, harness: Harness) {
207        if let Ok(mut slot) = self.inner.db_harness.write() {
208            *slot = Some(harness.as_str().to_string());
209        }
210    }
211
212    pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
213        if let Ok(mut slot) = self.inner.db_pool.write() {
214            *slot = Some(conn);
215        }
216    }
217
218    pub fn clear_db_pool(&self) {
219        if let Ok(mut slot) = self.inner.db_pool.write() {
220            *slot = None;
221        }
222    }
223
224    /// Install the output-compression callback. Called by `main.rs` after
225    /// `AppContext` is constructed so that snapshot/completion paths can
226    /// invoke `compress::compress_with_registry` without holding a context
227    /// reference. When called multiple times, the latest installation wins.
228    pub fn set_compressor<F>(&self, compressor: F)
229    where
230        F: Fn(&str, String) -> String + Send + Sync + 'static,
231    {
232        if let Ok(mut slot) = self.inner.compressor.lock() {
233            *slot = Some(Box::new(compressor));
234        }
235    }
236
237    /// Apply the installed compressor (if any) to `output`. Returns `output`
238    /// untouched when no compressor is installed.
239    pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
240        let Ok(slot) = self.inner.compressor.lock() else {
241            return output;
242        };
243        match slot.as_ref() {
244            Some(compressor) => compressor(command, output),
245            None => output,
246        }
247    }
248
249    fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
250        write_task(&paths.json, metadata)?;
251        self.dual_write_task(paths, metadata);
252        Ok(())
253    }
254
255    fn update_task_metadata<F>(
256        &self,
257        paths: &TaskPaths,
258        update: F,
259    ) -> std::io::Result<PersistedTask>
260    where
261        F: FnOnce(&mut PersistedTask),
262    {
263        let metadata = update_task(&paths.json, update)?;
264        self.dual_write_task(paths, &metadata);
265        Ok(metadata)
266    }
267
268    fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
269        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
270        let Some(pool) = pool else {
271            return;
272        };
273        let harness = self
274            .inner
275            .db_harness
276            .read()
277            .ok()
278            .and_then(|slot| slot.clone());
279        let Some(harness) = harness else {
280            crate::slog_warn!(
281                "dual-write bash_task to DB skipped for {}: harness not configured",
282                metadata.task_id
283            );
284            return;
285        };
286        let row = match metadata.to_bash_task_row(&harness, paths) {
287            Ok(row) => row,
288            Err(error) => {
289                crate::slog_warn!(
290                    "dual-write bash_task to DB failed for {}: {}",
291                    metadata.task_id,
292                    error
293                );
294                return;
295            }
296        };
297        let conn = match pool.lock() {
298            Ok(conn) => conn,
299            Err(_) => {
300                crate::slog_warn!(
301                    "dual-write bash_task to DB failed for {}: db mutex poisoned",
302                    metadata.task_id
303                );
304                return;
305            }
306        };
307        if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
308            crate::slog_warn!(
309                "dual-write bash_task to DB failed for {}: {}",
310                metadata.task_id,
311                error
312            );
313        }
314    }
315
316    pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
317        self.inner
318            .long_running_reminder_enabled
319            .store(enabled, Ordering::SeqCst);
320        self.inner
321            .long_running_reminder_interval_ms
322            .store(interval_ms, Ordering::SeqCst);
323    }
324
325    #[cfg(unix)]
326    #[allow(clippy::too_many_arguments)]
327    pub fn spawn(
328        &self,
329        command: &str,
330        session_id: String,
331        workdir: PathBuf,
332        env: HashMap<String, String>,
333        timeout: Option<Duration>,
334        storage_dir: PathBuf,
335        max_running: usize,
336        notify_on_completion: bool,
337        compressed: bool,
338        project_root: Option<PathBuf>,
339    ) -> Result<String, String> {
340        self.start_watchdog();
341
342        let running = self.running_count();
343        if running >= max_running {
344            return Err(format!(
345                "background bash task limit exceeded: {running} running (max {max_running})"
346            ));
347        }
348
349        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
350        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
351        let task_id = self.generate_unique_task_id()?;
352        let paths = task_paths(&storage_dir, &session_id, &task_id);
353        fs::create_dir_all(&paths.dir)
354            .map_err(|e| format!("failed to create background task dir: {e}"))?;
355
356        let mut metadata = PersistedTask::starting(
357            task_id.clone(),
358            session_id.clone(),
359            command.to_string(),
360            workdir.clone(),
361            project_root,
362            timeout_ms,
363            notify_on_completion,
364            compressed,
365        );
366        self.persist_task(&paths, &metadata)
367            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
368
369        // Pre-create capture files so the watchdog/buffer can always
370        // open them for reading. The spawn helper opens its own handles
371        // per attempt because each `Command::spawn()` consumes them.
372        create_capture_file(&paths.stdout)
373            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
374        create_capture_file(&paths.stderr)
375            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
376
377        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
378            Ok(child) => child,
379            Err(error) => {
380                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
381                let _ = delete_task_bundle(&paths);
382                return Err(error);
383            }
384        };
385
386        let child_pid = child.id();
387        metadata.mark_running(child_pid, child_pid as i32);
388        self.persist_task(&paths, &metadata)
389            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
390
391        let task = Arc::new(BgTask {
392            task_id: task_id.clone(),
393            session_id,
394            paths: paths.clone(),
395            started: Instant::now(),
396            last_reminder_at: Mutex::new(None),
397            terminal_at: Mutex::new(None),
398            state: Mutex::new(BgTaskState {
399                metadata,
400                runtime: TaskRuntime::Piped(Some(child)),
401                detached: false,
402                child_exit_observed: false,
403                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
404                pending_terminal_override: None,
405            }),
406        });
407
408        self.inner
409            .tasks
410            .lock()
411            .map_err(|_| "background task registry lock poisoned".to_string())?
412            .insert(task_id.clone(), task);
413
414        Ok(task_id)
415    }
416
417    #[allow(clippy::too_many_arguments)]
418    pub fn spawn_pty(
419        &self,
420        command: &str,
421        session_id: String,
422        workdir: PathBuf,
423        env: HashMap<String, String>,
424        timeout: Option<Duration>,
425        storage_dir: PathBuf,
426        max_running: usize,
427        notify_on_completion: bool,
428        compressed: bool,
429        project_root: Option<PathBuf>,
430        rows: u16,
431        cols: u16,
432    ) -> Result<String, String> {
433        self.start_watchdog();
434
435        let running = self.running_count();
436        if running >= max_running {
437            return Err(format!(
438                "background bash task limit exceeded: {running} running (max {max_running})"
439            ));
440        }
441
442        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
443        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
444        let task_id = self.generate_unique_task_id()?;
445        let paths = task_paths(&storage_dir, &session_id, &task_id);
446        fs::create_dir_all(&paths.dir)
447            .map_err(|e| format!("failed to create background task dir: {e}"))?;
448
449        let mut metadata = PersistedTask::starting(
450            task_id.clone(),
451            session_id.clone(),
452            command.to_string(),
453            workdir.clone(),
454            project_root,
455            timeout_ms,
456            notify_on_completion,
457            compressed,
458        );
459        metadata.mode = BgMode::Pty;
460        metadata.pty_rows = Some(rows);
461        metadata.pty_cols = Some(cols);
462        self.persist_task(&paths, &metadata)
463            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
464        create_capture_file(&paths.pty)
465            .map_err(|e| format!("failed to create PTY capture file: {e}"))?;
466
467        let runtime = match spawn_pty_for_command(
468            &task_id,
469            &session_id,
470            command,
471            &paths,
472            &workdir,
473            &env,
474            rows,
475            cols,
476            self.inner.wake_tx.clone(),
477        ) {
478            Ok(runtime) => runtime,
479            Err(error) => {
480                crate::slog_warn!(
481                    "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
482                );
483                let _ = delete_task_bundle(&paths);
484                return Err(error);
485            }
486        };
487
488        if let Some(child_pid) = runtime.child_pid {
489            metadata.mark_running(child_pid, child_pid as i32);
490        } else {
491            metadata.status = BgTaskStatus::Running;
492            metadata.pgid = None;
493        }
494        self.persist_task(&paths, &metadata)
495            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
496
497        let task = Arc::new(BgTask {
498            task_id: task_id.clone(),
499            session_id,
500            paths: paths.clone(),
501            started: Instant::now(),
502            last_reminder_at: Mutex::new(None),
503            terminal_at: Mutex::new(None),
504            state: Mutex::new(BgTaskState {
505                metadata,
506                runtime: TaskRuntime::Pty(Some(runtime)),
507                detached: false,
508                child_exit_observed: false,
509                buffer: BgBuffer::pty(paths.pty.clone()),
510                pending_terminal_override: None,
511            }),
512        });
513
514        self.inner
515            .tasks
516            .lock()
517            .map_err(|_| "background task registry lock poisoned".to_string())?
518            .insert(task_id.clone(), task);
519
520        Ok(task_id)
521    }
522
523    #[cfg(windows)]
524    #[allow(clippy::too_many_arguments)]
525    pub fn spawn(
526        &self,
527        command: &str,
528        session_id: String,
529        workdir: PathBuf,
530        env: HashMap<String, String>,
531        timeout: Option<Duration>,
532        storage_dir: PathBuf,
533        max_running: usize,
534        notify_on_completion: bool,
535        compressed: bool,
536        project_root: Option<PathBuf>,
537    ) -> Result<String, String> {
538        self.start_watchdog();
539
540        let running = self.running_count();
541        if running >= max_running {
542            return Err(format!(
543                "background bash task limit exceeded: {running} running (max {max_running})"
544            ));
545        }
546
547        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
548        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
549        let task_id = self.generate_unique_task_id()?;
550        let paths = task_paths(&storage_dir, &session_id, &task_id);
551        fs::create_dir_all(&paths.dir)
552            .map_err(|e| format!("failed to create background task dir: {e}"))?;
553
554        let mut metadata = PersistedTask::starting(
555            task_id.clone(),
556            session_id.clone(),
557            command.to_string(),
558            workdir.clone(),
559            project_root,
560            timeout_ms,
561            notify_on_completion,
562            compressed,
563        );
564        self.persist_task(&paths, &metadata)
565            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
566
567        // Capture files are pre-created so the watchdog/buffer can always
568        // open them for reading even if the child hasn't written anything
569        // yet. The spawn helper opens its own handles per attempt because
570        // each `Command::spawn()` consumes them, and on Windows we may
571        // retry across multiple shell candidates if the first one fails.
572        create_capture_file(&paths.stdout)
573            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
574        create_capture_file(&paths.stderr)
575            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
576
577        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
578            Ok(child) => child,
579            Err(error) => {
580                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
581                let _ = delete_task_bundle(&paths);
582                return Err(error);
583            }
584        };
585
586        let child_pid = child.id();
587        metadata.status = BgTaskStatus::Running;
588        metadata.child_pid = Some(child_pid);
589        metadata.pgid = None;
590        self.persist_task(&paths, &metadata)
591            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
592
593        let task = Arc::new(BgTask {
594            task_id: task_id.clone(),
595            session_id,
596            paths: paths.clone(),
597            started: Instant::now(),
598            last_reminder_at: Mutex::new(None),
599            terminal_at: Mutex::new(None),
600            state: Mutex::new(BgTaskState {
601                metadata,
602                runtime: TaskRuntime::Piped(Some(child)),
603                detached: false,
604                child_exit_observed: false,
605                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
606                pending_terminal_override: None,
607            }),
608        });
609
610        self.inner
611            .tasks
612            .lock()
613            .map_err(|_| "background task registry lock poisoned".to_string())?
614            .insert(task_id.clone(), task);
615
616        Ok(task_id)
617    }
618
619    pub fn write_pty(
620        &self,
621        task_id: &str,
622        session_id: &str,
623        input: &[u8],
624    ) -> Result<usize, String> {
625        let task = self
626            .task_for_session(task_id, session_id)
627            .ok_or_else(|| "task_not_found".to_string())?;
628
629        let writer = {
630            let state = task
631                .state
632                .lock()
633                .map_err(|_| "background task lock poisoned".to_string())?;
634            if state.metadata.mode != BgMode::Pty {
635                return Err("task_not_pty".to_string());
636            }
637            if state.metadata.status.is_terminal() {
638                return Err("task_exited".to_string());
639            }
640            match &state.runtime {
641                TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
642                TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
643                TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
644            }
645        };
646
647        let mut writer = writer
648            .lock()
649            .map_err(|_| "PTY writer lock poisoned".to_string())?;
650        writer
651            .write_all(input)
652            .map_err(|error| format!("failed to write to PTY: {error}"))?;
653        writer
654            .flush()
655            .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
656        Ok(input.len())
657    }
658
659    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
660        self.replay_session_inner(storage_dir, session_id, None)
661    }
662
663    pub fn replay_session_for_project(
664        &self,
665        storage_dir: &Path,
666        session_id: &str,
667        project_root: &Path,
668    ) -> Result<(), String> {
669        self.replay_session_inner(storage_dir, session_id, Some(project_root))
670    }
671
672    fn replay_session_inner(
673        &self,
674        storage_dir: &Path,
675        session_id: &str,
676        project_root: Option<&Path>,
677    ) -> Result<(), String> {
678        self.start_watchdog();
679        if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
680            if let Err(error) = self.maybe_gc_persisted(storage_dir) {
681                crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
682            }
683        }
684
685        let canonical_project = project_root.map(canonicalized_path);
686        // Replay strategy: DB is the post-v0.27 source of truth. Disk
687        // fallback handles pre-v0.27 tasks that haven't been migrated and
688        // the cold-start `__default__` namespace (configure runs before any
689        // user session exists, so plugin-init triggers a session-less DB
690        // lookup that will be empty until a real session writes a task).
691        //
692        // We deliberately keep the empty-DB / empty-disk path silent — it's
693        // the normal startup case and would otherwise fire on every configure
694        // (see GitHub user report against v0.27.0). INFO-level logs only when
695        // disk actually returned tasks (real migration signal); WARN when the
696        // DB lookup itself errored.
697        let tasks = match self.replay_session_from_db(session_id) {
698            Some(Ok(tasks)) if !tasks.is_empty() => tasks,
699            Some(Ok(_)) => {
700                let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
701                if !disk_tasks.is_empty() {
702                    crate::slog_info!(
703                        "bash task replay: 0 in DB for session {}, {} from disk fallback",
704                        session_id,
705                        disk_tasks.len()
706                    );
707                }
708                disk_tasks
709            }
710            Some(Err(error)) => {
711                crate::slog_warn!(
712                    "bash task replay DB lookup failed for session {}; falling back to disk: {}",
713                    session_id,
714                    error
715                );
716                self.replay_session_from_disk(storage_dir, session_id)?
717            }
718            None => {
719                // DB pool unconfigured — common in tests + before harness is set.
720                self.replay_session_from_disk(storage_dir, session_id)?
721            }
722        };
723
724        for mut metadata in tasks {
725            if metadata.session_id != session_id {
726                continue;
727            }
728            if let Some(canonical_project) = canonical_project.as_deref() {
729                let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
730                if metadata_project.as_deref() != Some(canonical_project) {
731                    continue;
732                }
733            }
734
735            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
736            match metadata.status {
737                BgTaskStatus::Starting => {
738                    let completion_was_delivered = metadata.completion_delivered;
739                    metadata.mark_terminal(
740                        BgTaskStatus::Failed,
741                        None,
742                        Some("spawn aborted".to_string()),
743                    );
744                    metadata.completion_delivered |= completion_was_delivered;
745                    let _ = self.persist_task(&paths, &metadata);
746                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
747                    self.insert_rehydrated_task(metadata, paths, true)?;
748                }
749                BgTaskStatus::Running | BgTaskStatus::Killing => {
750                    if metadata.mode == BgMode::Pty {
751                        if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
752                            let completion_was_delivered = metadata.completion_delivered;
753                            metadata = terminal_metadata_from_marker(metadata, marker, None);
754                            metadata.completion_delivered |= completion_was_delivered;
755                            let _ = self.persist_task(&paths, &metadata);
756                            self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
757                            self.insert_rehydrated_task(metadata, paths, true)?;
758                        } else if metadata.status.is_terminal() {
759                            self.insert_rehydrated_task(metadata, paths, true)?;
760                        } else {
761                            let completion_was_delivered = metadata.completion_delivered;
762                            metadata.mark_terminal(
763                                BgTaskStatus::Killed,
764                                None,
765                                Some("pty_lost_on_bridge_restart".to_string()),
766                            );
767                            metadata.completion_delivered |= completion_was_delivered;
768                            let _ = self.persist_task(&paths, &metadata);
769                            self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
770                            self.insert_rehydrated_task(metadata, paths, true)?;
771                        }
772                    } else if self.running_metadata_is_stale(&metadata) {
773                        let completion_was_delivered = metadata.completion_delivered;
774                        metadata.mark_terminal(
775                            BgTaskStatus::Killed,
776                            None,
777                            Some("orphaned (>24h)".to_string()),
778                        );
779                        metadata.completion_delivered |= completion_was_delivered;
780                        if !paths.exit.exists() {
781                            let _ = write_kill_marker_if_absent(&paths.exit);
782                        }
783                        let _ = self.persist_task(&paths, &metadata);
784                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
785                        self.insert_rehydrated_task(metadata, paths, true)?;
786                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
787                        let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
788                            "recovered from inconsistent killing state on replay".to_string()
789                        });
790                        if reason.is_some() {
791                            crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
792                            metadata.task_id);
793                        }
794                        let completion_was_delivered = metadata.completion_delivered;
795                        metadata = terminal_metadata_from_marker(metadata, marker, reason);
796                        metadata.completion_delivered |= completion_was_delivered;
797                        let _ = self.persist_task(&paths, &metadata);
798                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
799                        self.insert_rehydrated_task(metadata, paths, true)?;
800                    } else if metadata.status == BgTaskStatus::Killing {
801                        if !paths.exit.exists() {
802                            let _ = write_kill_marker_if_absent(&paths.exit);
803                        }
804                        let completion_was_delivered = metadata.completion_delivered;
805                        metadata.mark_terminal(
806                            BgTaskStatus::Killed,
807                            None,
808                            Some("recovered from inconsistent killing state on replay".to_string()),
809                        );
810                        metadata.completion_delivered |= completion_was_delivered;
811                        let _ = self.persist_task(&paths, &metadata);
812                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
813                        self.insert_rehydrated_task(metadata, paths, true)?;
814                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
815                        let completion_was_delivered = metadata.completion_delivered;
816                        metadata.mark_terminal(
817                            BgTaskStatus::Failed,
818                            None,
819                            Some("process exited without exit marker".to_string()),
820                        );
821                        metadata.completion_delivered |= completion_was_delivered;
822                        let _ = self.persist_task(&paths, &metadata);
823                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
824                        self.insert_rehydrated_task(metadata, paths, true)?;
825                    } else {
826                        self.insert_rehydrated_task(metadata, paths, true)?;
827                    }
828                }
829                _ if metadata.status.is_terminal() => {
830                    // Borrow `paths` for the completion enqueue BEFORE
831                    // `insert_rehydrated_task` consumes it. The completion
832                    // helper only reads from `paths` (stdout/stderr/exit) to
833                    // reconstruct a tail preview, so it must see the same
834                    // paths the rehydrated task will own.
835                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
836                    self.insert_rehydrated_task(metadata, paths, true)?;
837                }
838                _ => {}
839            }
840        }
841
842        Ok(())
843    }
844
845    fn replay_session_from_db(
846        &self,
847        session_id: &str,
848    ) -> Option<Result<Vec<PersistedTask>, String>> {
849        let pool = self
850            .inner
851            .db_pool
852            .read()
853            .ok()
854            .and_then(|slot| slot.clone())?;
855        let harness = self
856            .inner
857            .db_harness
858            .read()
859            .ok()
860            .and_then(|slot| slot.clone())?;
861        let conn = match pool.lock() {
862            Ok(conn) => conn,
863            Err(_) => return Some(Err("db mutex poisoned".to_string())),
864        };
865        Some(
866            crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
867                .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
868                .map_err(|error| error.to_string()),
869        )
870    }
871
872    fn replay_session_from_disk(
873        &self,
874        storage_dir: &Path,
875        session_id: &str,
876    ) -> Result<Vec<PersistedTask>, String> {
877        let dir = session_tasks_dir(storage_dir, session_id);
878        if !dir.exists() {
879            return Ok(Vec::new());
880        }
881
882        let entries = fs::read_dir(&dir)
883            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
884        let mut tasks = Vec::new();
885        for entry in entries.flatten() {
886            let path = entry.path();
887            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
888                continue;
889            }
890            match read_task(&path) {
891                Ok(metadata) => tasks.push(metadata),
892                Err(error) => {
893                    crate::slog_warn!(
894                        "quarantining invalid background task metadata {} during replay: {error}",
895                        path.display()
896                    );
897                    if let Err(quarantine_error) =
898                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
899                    {
900                        crate::slog_warn!(
901                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
902                            path.display()
903                        );
904                    }
905                }
906            }
907        }
908        Ok(tasks)
909    }
910
911    pub fn register_watch(
912        &self,
913        task_id: String,
914        pattern: WatchPattern,
915        once: bool,
916    ) -> Result<String, &'static str> {
917        let task = self.task(&task_id).ok_or("task_not_found")?;
918        let (mode, terminal_at_registration, stdout, stderr, pty) = task
919            .state
920            .lock()
921            .map(|state| {
922                (
923                    state.metadata.mode.clone(),
924                    state.metadata.status.is_terminal(),
925                    task.paths.stdout.clone(),
926                    task.paths.stderr.clone(),
927                    task.paths.pty.clone(),
928                )
929            })
930            .map_err(|_| "background_task_lock_poisoned")?;
931
932        let mut terminal_matches = Vec::new();
933        let scanned_terminal = terminal_at_registration;
934        let watch_id = {
935            let mut registry = self
936                .inner
937                .watch_registry
938                .lock()
939                .map_err(|_| "watch_registry_poisoned")?;
940            let watch_id = registry.register(task_id.clone(), pattern, once)?;
941            match &mode {
942                BgMode::Pipes => {
943                    let stdout_key = format!("{task_id}:stdout");
944                    let stderr_key = format!("{task_id}:stderr");
945                    if terminal_at_registration {
946                        registry.set_file_cursor(&stdout_key, 0);
947                        registry.set_file_cursor(&stderr_key, 0);
948                        terminal_matches.extend(registry.scan_file_new_bytes(
949                            &stdout_key,
950                            &task_id,
951                            &stdout,
952                        ));
953                        terminal_matches.extend(registry.scan_file_new_bytes(
954                            &stderr_key,
955                            &task_id,
956                            &stderr,
957                        ));
958                    } else {
959                        registry.prime_file_cursor(&stdout_key, &stdout);
960                        registry.prime_file_cursor(&stderr_key, &stderr);
961                    }
962                }
963                BgMode::Pty => {
964                    let pty_key = format!("{task_id}:pty");
965                    if terminal_at_registration {
966                        registry.set_file_cursor(&pty_key, 0);
967                        terminal_matches
968                            .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
969                    } else {
970                        registry.prime_file_cursor(&pty_key, &pty);
971                    }
972                }
973            }
974            watch_id
975        };
976
977        if task.is_terminal() {
978            if !scanned_terminal {
979                terminal_matches = {
980                    let mut registry = self
981                        .inner
982                        .watch_registry
983                        .lock()
984                        .map_err(|_| "watch_registry_poisoned")?;
985                    match &mode {
986                        BgMode::Pipes => {
987                            let stdout_key = format!("{task_id}:stdout");
988                            let stderr_key = format!("{task_id}:stderr");
989                            registry.set_file_cursor(&stdout_key, 0);
990                            registry.set_file_cursor(&stderr_key, 0);
991                            let mut matches =
992                                registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
993                            matches.extend(registry.scan_file_new_bytes(
994                                &stderr_key,
995                                &task_id,
996                                &stderr,
997                            ));
998                            matches
999                        }
1000                        BgMode::Pty => {
1001                            let pty_key = format!("{task_id}:pty");
1002                            registry.set_file_cursor(&pty_key, 0);
1003                            registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
1004                        }
1005                    }
1006                };
1007            }
1008
1009            let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
1010            if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
1011                if watch_matched {
1012                    let _ = task.set_completion_delivered(true, self);
1013                    self.clear_task_watch_state(&task_id);
1014                }
1015                return Ok(watch_id);
1016            }
1017
1018            let completion = self
1019                .remove_pending_completion(&task_id)
1020                .or_else(|| self.completion_snapshot_for_task(&task, BG_COMPLETION_PREVIEW_BYTES));
1021            if terminal_matches.is_empty() {
1022                if let Some(completion) = completion.as_ref() {
1023                    self.emit_bash_watch_exit(completion);
1024                }
1025            } else {
1026                for pattern_match in terminal_matches {
1027                    self.emit_bash_pattern_match(&task.session_id, pattern_match);
1028                }
1029            }
1030            let _ = task.set_completion_delivered(true, self);
1031            self.clear_task_watch_state(&task_id);
1032        }
1033
1034        Ok(watch_id)
1035    }
1036
1037    pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
1038        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1039            registry.unregister(task_id, watch_id);
1040        }
1041    }
1042
1043    pub fn active_watch_count(&self, task_id: &str) -> usize {
1044        self.inner
1045            .watch_registry
1046            .lock()
1047            .map(|registry| registry.active_count(task_id))
1048            .unwrap_or(0)
1049    }
1050
1051    fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1052        self.inner
1053            .watch_registry
1054            .lock()
1055            .map(|registry| {
1056                (
1057                    registry.has_controlled_task(task_id),
1058                    registry.has_matched_task(task_id),
1059                )
1060            })
1061            .unwrap_or((false, false))
1062    }
1063
1064    fn task_has_watch_control(&self, task_id: &str) -> bool {
1065        self.inner
1066            .watch_registry
1067            .lock()
1068            .map(|registry| registry.has_controlled_task(task_id))
1069            .unwrap_or(false)
1070    }
1071
1072    fn clear_task_watch_state(&self, task_id: &str) {
1073        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1074            registry.clear_task(task_id);
1075        }
1076    }
1077
1078    pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1079        let (mode, stdout, stderr, pty) = match task.state.lock() {
1080            Ok(state) => (
1081                state.metadata.mode.clone(),
1082                task.paths.stdout.clone(),
1083                task.paths.stderr.clone(),
1084                task.paths.pty.clone(),
1085            ),
1086            Err(_) => return,
1087        };
1088        let mut matches = Vec::new();
1089        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1090            match mode {
1091                BgMode::Pipes => {
1092                    let stdout_key = format!("{}:stdout", task.task_id);
1093                    let stderr_key = format!("{}:stderr", task.task_id);
1094                    matches.extend(registry.scan_file_new_bytes(
1095                        &stdout_key,
1096                        &task.task_id,
1097                        &stdout,
1098                    ));
1099                    matches.extend(registry.scan_file_new_bytes(
1100                        &stderr_key,
1101                        &task.task_id,
1102                        &stderr,
1103                    ));
1104                }
1105                BgMode::Pty => {
1106                    let pty_key = format!("{}:pty", task.task_id);
1107                    matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1108                }
1109            }
1110        }
1111        for pattern_match in matches {
1112            self.emit_bash_pattern_match(&task.session_id, pattern_match);
1113        }
1114    }
1115
1116    pub fn status(
1117        &self,
1118        task_id: &str,
1119        session_id: &str,
1120        project_root: Option<&Path>,
1121        storage_dir: Option<&Path>,
1122        preview_bytes: usize,
1123    ) -> Option<BgTaskSnapshot> {
1124        let mut task = self.task_for_session(task_id, session_id);
1125        if task.is_none() {
1126            if let Some(storage_dir) = storage_dir {
1127                let _ = self.replay_session(storage_dir, session_id);
1128                task = self.task_for_session(task_id, session_id);
1129            }
1130        }
1131        let Some(task) = task else {
1132            return self.status_relaxed(
1133                task_id,
1134                session_id,
1135                project_root?,
1136                storage_dir?,
1137                preview_bytes,
1138            );
1139        };
1140        let _ = self.poll_task(&task);
1141        let mut snapshot = task.snapshot(preview_bytes);
1142        self.maybe_compress_snapshot(&task, &mut snapshot);
1143        Some(snapshot)
1144    }
1145
1146    fn status_relaxed_task(
1147        &self,
1148        task_id: &str,
1149        project_root: &Path,
1150        storage_dir: &Path,
1151    ) -> Option<Arc<BgTask>> {
1152        let canonical_project = canonicalized_path(project_root);
1153        match self.lookup_relaxed_task_from_db(task_id, project_root) {
1154            Some(Ok(Some(metadata))) => {
1155                if let Some(task) = self.task(task_id) {
1156                    let matches_project = task
1157                        .state
1158                        .lock()
1159                        .map(|state| {
1160                            state
1161                                .metadata
1162                                .project_root
1163                                .as_deref()
1164                                .map(canonicalized_path)
1165                                .as_deref()
1166                                == Some(canonical_project.as_path())
1167                        })
1168                        .unwrap_or(false);
1169                    return matches_project.then_some(task);
1170                }
1171                let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1172                if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1173                    return None;
1174                }
1175                return self.task(task_id);
1176            }
1177            Some(Ok(None)) => {
1178                crate::slog_info!(
1179                    "bash task relaxed DB miss for {}; falling back to disk",
1180                    task_id
1181                );
1182            }
1183            Some(Err(error)) => {
1184                crate::slog_warn!(
1185                    "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1186                    task_id,
1187                    error
1188                );
1189            }
1190            None => {
1191                crate::slog_info!(
1192                    "bash task relaxed DB unavailable for {}; falling back to disk",
1193                    task_id
1194                );
1195            }
1196        }
1197        let root = storage_dir.join("bash-tasks");
1198        let entries = fs::read_dir(&root).ok()?;
1199        for entry in entries.flatten() {
1200            let dir = entry.path();
1201            if !dir.is_dir() {
1202                continue;
1203            }
1204            let path = dir.join(format!("{task_id}.json"));
1205            if !path.exists() {
1206                continue;
1207            }
1208            let metadata = match read_task(&path) {
1209                Ok(metadata) => metadata,
1210                Err(error) => {
1211                    crate::slog_warn!(
1212                        "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1213                        path.display()
1214                    );
1215                    if let Err(quarantine_error) =
1216                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1217                    {
1218                        crate::slog_warn!(
1219                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1220                            path.display()
1221                        );
1222                    }
1223                    continue;
1224                }
1225            };
1226            let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1227            if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1228                continue;
1229            }
1230            if let Some(task) = self.task(task_id) {
1231                let matches_project = task
1232                    .state
1233                    .lock()
1234                    .map(|state| {
1235                        state
1236                            .metadata
1237                            .project_root
1238                            .as_deref()
1239                            .map(canonicalized_path)
1240                            .as_deref()
1241                            == Some(canonical_project.as_path())
1242                    })
1243                    .unwrap_or(false);
1244                return matches_project.then_some(task);
1245            }
1246            let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1247            if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1248                return None;
1249            }
1250            return self.task(task_id);
1251        }
1252        None
1253    }
1254
1255    fn lookup_relaxed_task_from_db(
1256        &self,
1257        task_id: &str,
1258        project_root: &Path,
1259    ) -> Option<Result<Option<PersistedTask>, String>> {
1260        let pool = self
1261            .inner
1262            .db_pool
1263            .read()
1264            .ok()
1265            .and_then(|slot| slot.clone())?;
1266        let harness = self
1267            .inner
1268            .db_harness
1269            .read()
1270            .ok()
1271            .and_then(|slot| slot.clone())?;
1272        let conn = match pool.lock() {
1273            Ok(conn) => conn,
1274            Err(_) => return Some(Err("db mutex poisoned".to_string())),
1275        };
1276        let project_key = crate::search_index::project_cache_key(project_root);
1277        Some(
1278            crate::db::bash_tasks::find_bash_task_for_project(
1279                &conn,
1280                &harness,
1281                &project_key,
1282                task_id,
1283            )
1284            .map(|row| row.map(PersistedTask::from))
1285            .map_err(|error| error.to_string()),
1286        )
1287    }
1288
1289    pub(super) fn status_relaxed(
1290        &self,
1291        task_id: &str,
1292        _session_id: &str,
1293        project_root: &Path,
1294        storage_dir: &Path,
1295        preview_bytes: usize,
1296    ) -> Option<BgTaskSnapshot> {
1297        let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1298        let _ = self.poll_task(&task);
1299        let mut snapshot = task.snapshot(preview_bytes);
1300        self.maybe_compress_snapshot(&task, &mut snapshot);
1301        Some(snapshot)
1302    }
1303
1304    pub fn kill_relaxed(
1305        &self,
1306        task_id: &str,
1307        project_root: &Path,
1308        storage_dir: &Path,
1309    ) -> Result<BgTaskSnapshot, String> {
1310        let task = self
1311            .status_relaxed_task(task_id, project_root, storage_dir)
1312            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1313        self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1314    }
1315
1316    pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1317        #[cfg(test)]
1318        self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1319
1320        let mut deleted = 0usize;
1321
1322        let root = storage_dir.join("bash-tasks");
1323        if root.exists() {
1324            let session_dirs = fs::read_dir(&root).map_err(|e| {
1325                format!(
1326                    "failed to read background task root {}: {e}",
1327                    root.display()
1328                )
1329            })?;
1330            for session_entry in session_dirs.flatten() {
1331                let session_dir = session_entry.path();
1332                if !session_dir.is_dir() {
1333                    continue;
1334                }
1335                let task_entries = match fs::read_dir(&session_dir) {
1336                    Ok(entries) => entries,
1337                    Err(error) => {
1338                        crate::slog_warn!(
1339                            "failed to read background task session dir {}: {error}",
1340                            session_dir.display()
1341                        );
1342                        continue;
1343                    }
1344                };
1345                for task_entry in task_entries.flatten() {
1346                    let json_path = task_entry.path();
1347                    if json_path
1348                        .extension()
1349                        .and_then(|extension| extension.to_str())
1350                        != Some("json")
1351                    {
1352                        continue;
1353                    }
1354                    if modified_within(&json_path, PERSISTED_GC_GRACE) {
1355                        continue;
1356                    }
1357                    let metadata = match read_task(&json_path) {
1358                        Ok(metadata) => metadata,
1359                        Err(error) => {
1360                            crate::slog_warn!(
1361                                "quarantining corrupt background task metadata {}: {error}",
1362                                json_path.display()
1363                            );
1364                            quarantine_task_json(
1365                                storage_dir,
1366                                &session_dir,
1367                                &json_path,
1368                                QuarantineKind::Corrupt,
1369                            )?;
1370                            continue;
1371                        }
1372                    };
1373                    if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1374                        continue;
1375                    }
1376                    let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1377                    match delete_task_bundle(&paths) {
1378                        Ok(()) => {
1379                            deleted += 1;
1380                            log::debug!(
1381                                "deleted persisted background task bundle {}",
1382                                metadata.task_id
1383                            );
1384                        }
1385                        Err(error) => {
1386                            crate::slog_warn!(
1387                                "failed to delete background task bundle {}: {error}",
1388                                metadata.task_id
1389                            );
1390                            continue;
1391                        }
1392                    }
1393                }
1394            }
1395        }
1396        gc_quarantine(storage_dir);
1397        Ok(deleted)
1398    }
1399
1400    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1401        let tasks = self
1402            .inner
1403            .tasks
1404            .lock()
1405            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1406            .unwrap_or_default();
1407        tasks
1408            .into_iter()
1409            .map(|task| {
1410                let _ = self.poll_task(&task);
1411                let mut snapshot = task.snapshot(preview_bytes);
1412                self.maybe_compress_snapshot(&task, &mut snapshot);
1413                snapshot
1414            })
1415            .collect()
1416    }
1417
1418    /// Compress `output_preview` in place when the task is in a terminal
1419    /// state. Live tail of running tasks stays raw so agents debugging
1420    /// long-running bash see exactly what the process emitted, not a
1421    /// heuristic-collapsed view. Per-task opt-out via the `compressed`
1422    /// field on `PersistedTask` short-circuits before the compress pipeline.
1423    fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1424        if !snapshot.info.status.is_terminal() {
1425            return;
1426        }
1427        let (compressed_flag, mode) = task
1428            .state
1429            .lock()
1430            .map(|state| (state.metadata.compressed, state.metadata.mode.clone()))
1431            .unwrap_or((true, BgMode::Pipes));
1432        if mode == BgMode::Pty {
1433            return;
1434        }
1435        if !compressed_flag {
1436            return;
1437        }
1438        let raw = std::mem::take(&mut snapshot.output_preview);
1439        snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
1440    }
1441
1442    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1443        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1444    }
1445
1446    pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1447        let task = self
1448            .task_for_session(task_id, session_id)
1449            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1450        let mut state = task
1451            .state
1452            .lock()
1453            .map_err(|_| "background task lock poisoned".to_string())?;
1454        let updated = self
1455            .update_task_metadata(&task.paths, |metadata| {
1456                metadata.notify_on_completion = true;
1457                metadata.completion_delivered = false;
1458            })
1459            .map_err(|e| format!("failed to promote background task: {e}"))?;
1460        state.metadata = updated;
1461        if state.metadata.status.is_terminal() {
1462            state.buffer.enforce_terminal_cap();
1463            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1464        }
1465        Ok(true)
1466    }
1467
1468    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1469        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1470            .map(|_| ())
1471    }
1472
1473    pub fn cleanup_finished(&self, older_than: Duration) {
1474        let cutoff = Instant::now().checked_sub(older_than);
1475        let removable_paths: Vec<(String, TaskPaths)> =
1476            if let Ok(mut tasks) = self.inner.tasks.lock() {
1477                let removable = tasks
1478                    .iter()
1479                    .filter_map(|(task_id, task)| {
1480                        let delivered_terminal = task
1481                            .state
1482                            .lock()
1483                            .map(|state| {
1484                                state.metadata.status.is_terminal()
1485                                    && state.metadata.completion_delivered
1486                            })
1487                            .unwrap_or(false);
1488                        if !delivered_terminal {
1489                            return None;
1490                        }
1491
1492                        let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1493                        let expired = match (terminal_at, cutoff) {
1494                            (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1495                            (Some(_), None) => true,
1496                            (None, _) => false,
1497                        };
1498                        expired.then(|| task_id.clone())
1499                    })
1500                    .collect::<Vec<_>>();
1501
1502                removable
1503                    .into_iter()
1504                    .filter_map(|task_id| {
1505                        tasks
1506                            .remove(&task_id)
1507                            .map(|task| (task_id, task.paths.clone()))
1508                    })
1509                    .collect()
1510            } else {
1511                Vec::new()
1512            };
1513
1514        for (task_id, paths) in removable_paths {
1515            match delete_task_bundle(&paths) {
1516                Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1517                Err(error) => crate::slog_warn!(
1518                    "failed to delete persisted background task bundle {task_id}: {error}"
1519                ),
1520            }
1521        }
1522    }
1523
1524    pub fn drain_completions(&self) -> Vec<BgCompletion> {
1525        self.drain_completions_for_session(None)
1526    }
1527
1528    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1529        let completions = match self.inner.completions.lock() {
1530            Ok(completions) => completions,
1531            Err(_) => return Vec::new(),
1532        };
1533
1534        completions
1535            .iter()
1536            .filter(|completion| {
1537                session_id
1538                    .map(|session_id| completion.session_id == session_id)
1539                    .unwrap_or(true)
1540            })
1541            .cloned()
1542            .collect()
1543    }
1544
1545    pub fn ack_completions_for_session(
1546        &self,
1547        session_id: Option<&str>,
1548        task_ids: &[String],
1549    ) -> Vec<String> {
1550        if task_ids.is_empty() {
1551            return Vec::new();
1552        }
1553        let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1554        let mut completion_sessions = HashMap::new();
1555        if let Ok(mut completions) = self.inner.completions.lock() {
1556            completions.retain(|completion| {
1557                let session_matches = session_id
1558                    .map(|session_id| completion.session_id == session_id)
1559                    .unwrap_or(true);
1560                if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1561                    completion_sessions
1562                        .insert(completion.task_id.clone(), completion.session_id.clone());
1563                    false
1564                } else {
1565                    true
1566                }
1567            });
1568        }
1569
1570        let mut delivered = Vec::new();
1571        for task_id in task_ids {
1572            let task = if let Some(session_id) = session_id {
1573                self.task_for_session(task_id, session_id)
1574            } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1575                self.task_for_session(task_id, completion_session_id)
1576            } else {
1577                self.task(task_id)
1578            };
1579            if let Some(task) = task {
1580                if task.set_completion_delivered(true, self).is_ok() {
1581                    delivered.push(task_id.clone());
1582                }
1583            }
1584        }
1585
1586        delivered
1587    }
1588
1589    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1590        self.inner
1591            .completions
1592            .lock()
1593            .map(|completions| {
1594                completions
1595                    .iter()
1596                    .filter(|completion| completion.session_id == session_id)
1597                    .cloned()
1598                    .collect()
1599            })
1600            .unwrap_or_default()
1601    }
1602
1603    fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1604        let mut completions = self.inner.completions.lock().ok()?;
1605        let idx = completions
1606            .iter()
1607            .position(|completion| completion.task_id == task_id)?;
1608        completions.remove(idx)
1609    }
1610
1611    fn completion_snapshot_for_task(
1612        &self,
1613        task: &Arc<BgTask>,
1614        preview_bytes: usize,
1615    ) -> Option<BgCompletion> {
1616        let snapshot = task.snapshot(preview_bytes);
1617        if !snapshot.info.status.is_terminal() {
1618            return None;
1619        }
1620        let output_preview = if snapshot.info.mode == BgMode::Pty {
1621            String::new()
1622        } else {
1623            let compressed = task
1624                .state
1625                .lock()
1626                .map(|state| state.metadata.compressed)
1627                .unwrap_or(true);
1628            if compressed {
1629                self.compress_output(&snapshot.info.command, snapshot.output_preview)
1630            } else {
1631                snapshot.output_preview
1632            }
1633        };
1634        Some(BgCompletion {
1635            task_id: snapshot.info.task_id,
1636            session_id: task.session_id.clone(),
1637            status: snapshot.info.status,
1638            exit_code: snapshot.exit_code,
1639            command: snapshot.info.command,
1640            output_preview,
1641            output_truncated: snapshot.output_truncated,
1642            original_tokens: None,
1643            compressed_tokens: None,
1644            tokens_skipped: false,
1645        })
1646    }
1647
1648    pub fn detach(&self) {
1649        self.inner.shutdown.store(true, Ordering::SeqCst);
1650        if let Ok(mut tasks) = self.inner.tasks.lock() {
1651            for task in tasks.values() {
1652                if let Ok(mut state) = task.state.lock() {
1653                    match &mut state.runtime {
1654                        TaskRuntime::Piped(child) => *child = None,
1655                        TaskRuntime::Pty(runtime) => *runtime = None,
1656                    }
1657                    state.detached = true;
1658                }
1659            }
1660            tasks.clear();
1661        }
1662    }
1663
1664    pub fn shutdown(&self) {
1665        let tasks = self
1666            .inner
1667            .tasks
1668            .lock()
1669            .map(|tasks| {
1670                tasks
1671                    .values()
1672                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
1673                    .collect::<Vec<_>>()
1674            })
1675            .unwrap_or_default();
1676        for (task_id, session_id) in tasks {
1677            let _ = self.kill(&task_id, &session_id);
1678        }
1679    }
1680
1681    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1682        if let Ok(state) = task.state.lock() {
1683            if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1684                // On Windows ConPTY, the reader may not observe EOF while the
1685                // master handle is still held in `PtyRuntime`. The waiter writes
1686                // the authoritative exit marker before setting `exit_observed`,
1687                // so once exit is observed we can finalize from that marker and
1688                // drop the runtime, which lets the reader finish. Waiting for
1689                // `reader_done && exit_observed` wedges completed PTY tasks on
1690                // Windows.
1691                if !pty.exit_observed.load(Ordering::SeqCst) {
1692                    return Ok(());
1693                }
1694            }
1695        }
1696        let marker = match read_exit_marker(&task.paths.exit) {
1697            Ok(Some(marker)) => marker,
1698            Ok(None) => return Ok(()),
1699            Err(error) => return Err(format!("failed to read exit marker: {error}")),
1700        };
1701        self.finalize_from_marker(task, marker, None)
1702    }
1703
1704    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1705        let Ok(mut state) = task.state.lock() else {
1706            return;
1707        };
1708        match &mut state.runtime {
1709            TaskRuntime::Piped(child_slot) => {
1710                if let Some(child) = child_slot.as_mut() {
1711                    if matches!(child.try_wait(), Ok(Some(_))) {
1712                        *child_slot = None;
1713                        state.detached = true;
1714                        state.child_exit_observed = true;
1715                    }
1716                } else if state.detached {
1717                    let child_known_dead = state.child_exit_observed
1718                        || state
1719                            .metadata
1720                            .child_pid
1721                            .is_some_and(|pid| !is_process_alive(pid));
1722                    if child_known_dead {
1723                        self.fail_without_exit_marker_if_needed(task, &mut state);
1724                    }
1725                }
1726            }
1727            TaskRuntime::Pty(Some(pty)) => {
1728                if pty.exit_observed.load(Ordering::SeqCst) {
1729                    drop(state);
1730                    let _ = self.poll_task(task);
1731                }
1732            }
1733            TaskRuntime::Pty(None) => {}
1734        }
1735    }
1736
1737    fn fail_without_exit_marker_if_needed(&self, task: &Arc<BgTask>, state: &mut BgTaskState) {
1738        if state.metadata.status.is_terminal() {
1739            return;
1740        }
1741        if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1742            return;
1743        }
1744        let watch_controlled = self.task_has_watch_control(&task.task_id);
1745        let updated = self.update_task_metadata(&task.paths, |metadata| {
1746            metadata.mark_terminal(
1747                BgTaskStatus::Failed,
1748                None,
1749                Some("process exited without exit marker".to_string()),
1750            );
1751            if watch_controlled {
1752                metadata.completion_delivered = true;
1753            }
1754        });
1755        if let Ok(metadata) = updated {
1756            state.pending_terminal_override = None;
1757            state.metadata = metadata;
1758            task.mark_terminal_now();
1759            state.buffer.enforce_terminal_cap();
1760            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1761        }
1762    }
1763
1764    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1765        self.inner
1766            .tasks
1767            .lock()
1768            .map(|tasks| {
1769                tasks
1770                    .values()
1771                    .filter(|task| task.is_running())
1772                    .cloned()
1773                    .collect()
1774            })
1775            .unwrap_or_default()
1776    }
1777
1778    fn insert_rehydrated_task(
1779        &self,
1780        metadata: PersistedTask,
1781        paths: TaskPaths,
1782        detached: bool,
1783    ) -> Result<(), String> {
1784        let task_id = metadata.task_id.clone();
1785        let session_id = metadata.session_id.clone();
1786        let started = started_instant_from_unix_millis(metadata.started_at);
1787        let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1788        let mode = metadata.mode.clone();
1789        let task = Arc::new(BgTask {
1790            task_id: task_id.clone(),
1791            session_id,
1792            paths: paths.clone(),
1793            started,
1794            last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1795            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1796            state: Mutex::new(BgTaskState {
1797                metadata,
1798                runtime: if mode == BgMode::Pty {
1799                    TaskRuntime::Pty(None)
1800                } else {
1801                    TaskRuntime::Piped(None)
1802                },
1803                detached,
1804                // Replay path: we never observed the child handle's exit
1805                // in this process (the previous AFT process did, but its
1806                // observation didn't survive restart). Leave this false so
1807                // the second-pass reap falls through to the
1808                // `is_process_alive(child_pid)` probe rather than declaring
1809                // failure based on stale evidence.
1810                child_exit_observed: false,
1811                buffer: if mode == BgMode::Pty {
1812                    BgBuffer::pty(paths.pty.clone())
1813                } else {
1814                    BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
1815                },
1816                pending_terminal_override: None,
1817            }),
1818        });
1819        self.inner
1820            .tasks
1821            .lock()
1822            .map_err(|_| "background task registry lock poisoned".to_string())?
1823            .insert(task_id, task);
1824        Ok(())
1825    }
1826
1827    fn kill_with_status(
1828        &self,
1829        task_id: &str,
1830        session_id: &str,
1831        terminal_status: BgTaskStatus,
1832    ) -> Result<BgTaskSnapshot, String> {
1833        let task = self
1834            .task_for_session(task_id, session_id)
1835            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1836
1837        {
1838            let mut state = task
1839                .state
1840                .lock()
1841                .map_err(|_| "background task lock poisoned".to_string())?;
1842            if state.metadata.status.is_terminal() {
1843                state.pending_terminal_override = None;
1844                return Ok(task.snapshot_locked(&state, 5 * 1024));
1845            }
1846
1847            if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
1848                state.metadata =
1849                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
1850                if self.task_has_watch_control(&task.task_id) {
1851                    state.metadata.completion_delivered = true;
1852                }
1853                state.pending_terminal_override = None;
1854                task.mark_terminal_now();
1855                match &mut state.runtime {
1856                    // Exit marker already present: the child finished on its
1857                    // own before this kill observed it. Reap it rather than
1858                    // dropping the handle so it doesn't become a zombie
1859                    // (issue #91). The active-kill branch below already
1860                    // `wait()`s after signaling, so this is the only kill
1861                    // path that needed the explicit reap.
1862                    TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
1863                    TaskRuntime::Pty(runtime) => *runtime = None,
1864                }
1865                state.detached = true;
1866                state.buffer.enforce_terminal_cap();
1867                self.persist_task(&task.paths, &state.metadata)
1868                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1869                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1870                return Ok(task.snapshot_locked(&state, 5 * 1024));
1871            }
1872
1873            let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
1874            if !was_already_killing {
1875                state.metadata.status = BgTaskStatus::Killing;
1876                self.persist_task(&task.paths, &state.metadata)
1877                    .map_err(|e| format!("failed to persist killing state: {e}"))?;
1878            }
1879
1880            #[cfg(unix)]
1881            let pgid = state.metadata.pgid;
1882            #[cfg(windows)]
1883            let child_pid = state.metadata.child_pid;
1884            if !was_already_killing
1885                && state.metadata.mode == BgMode::Pty
1886                && terminal_status == BgTaskStatus::TimedOut
1887            {
1888                state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
1889            }
1890
1891            #[cfg(windows)]
1892            let mut pty_forced_terminal_status: Option<BgTaskStatus> = None;
1893
1894            match &mut state.runtime {
1895                TaskRuntime::Piped(child_slot) => {
1896                    #[cfg(unix)]
1897                    if let Some(pgid) = pgid {
1898                        terminate_pgid(pgid, child_slot.as_mut());
1899                    }
1900                    #[cfg(windows)]
1901                    if let Some(child) = child_slot.as_mut() {
1902                        super::process::terminate_process(child);
1903                    } else if let Some(pid) = child_pid {
1904                        terminate_pid(pid);
1905                    }
1906                    if let Some(child) = child_slot.as_mut() {
1907                        let _ = child.wait();
1908                    }
1909                    *child_slot = None;
1910                    state.detached = true;
1911
1912                    if !task.paths.exit.exists() {
1913                        write_kill_marker_if_absent(&task.paths.exit)
1914                            .map_err(|e| format!("failed to write kill marker: {e}"))?;
1915                    }
1916
1917                    let exit_code = if terminal_status == BgTaskStatus::TimedOut {
1918                        Some(124)
1919                    } else {
1920                        None
1921                    };
1922                    state
1923                        .metadata
1924                        .mark_terminal(terminal_status, exit_code, None);
1925                    if self.task_has_watch_control(&task.task_id) {
1926                        state.metadata.completion_delivered = true;
1927                    }
1928                    state.pending_terminal_override = None;
1929                    task.mark_terminal_now();
1930                    self.persist_task(&task.paths, &state.metadata)
1931                        .map_err(|e| format!("failed to persist killed state: {e}"))?;
1932                    state.buffer.enforce_terminal_cap();
1933                    self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1934                }
1935                TaskRuntime::Pty(Some(pty)) => {
1936                    pty.was_killed.store(true, Ordering::SeqCst);
1937                    if let Err(error) = pty.killer.kill() {
1938                        crate::slog_warn!("[pty-kill] {task_id} ChildKiller::kill failed: {error}");
1939                    }
1940                    if let Some(pid) = pty.child_pid {
1941                        #[cfg(unix)]
1942                        terminate_pgid(pid as i32, None);
1943                        #[cfg(windows)]
1944                        terminate_pid(pid);
1945                    }
1946                    drop(pty.master.take());
1947
1948                    #[cfg(windows)]
1949                    {
1950                        let default_status = if terminal_status == BgTaskStatus::TimedOut {
1951                            BgTaskStatus::TimedOut
1952                        } else {
1953                            BgTaskStatus::Killed
1954                        };
1955                        pty_forced_terminal_status = Some(
1956                            state
1957                                .pending_terminal_override
1958                                .take()
1959                                .unwrap_or(default_status),
1960                        );
1961                    }
1962                }
1963                TaskRuntime::Pty(None) => {}
1964            }
1965
1966            #[cfg(windows)]
1967            if let Some(target_status) = pty_forced_terminal_status {
1968                if !task.paths.exit.exists() {
1969                    write_kill_marker_if_absent(&task.paths.exit)
1970                        .map_err(|e| format!("failed to write kill marker: {e}"))?;
1971                }
1972
1973                let exit_code = if target_status == BgTaskStatus::TimedOut {
1974                    Some(124)
1975                } else {
1976                    None
1977                };
1978                state.metadata.mark_terminal(target_status, exit_code, None);
1979                if self.task_has_watch_control(&task.task_id) {
1980                    state.metadata.completion_delivered = true;
1981                }
1982                state.pending_terminal_override = None;
1983                task.mark_terminal_now();
1984                if let TaskRuntime::Pty(runtime) = &mut state.runtime {
1985                    *runtime = None;
1986                }
1987                state.detached = true;
1988                self.persist_task(&task.paths, &state.metadata)
1989                    .map_err(|e| format!("failed to persist killed PTY state: {e}"))?;
1990                state.buffer.enforce_terminal_cap();
1991                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1992            }
1993        }
1994
1995        Ok(task.snapshot(5 * 1024))
1996    }
1997
1998    fn finalize_from_marker(
1999        &self,
2000        task: &Arc<BgTask>,
2001        marker: ExitMarker,
2002        reason: Option<String>,
2003    ) -> Result<(), String> {
2004        let watch_controlled = self.task_has_watch_control(&task.task_id);
2005        let mut pty_reader_done = None;
2006        {
2007            let mut state = task
2008                .state
2009                .lock()
2010                .map_err(|_| "background task lock poisoned".to_string())?;
2011            if state.metadata.status.is_terminal() {
2012                state.pending_terminal_override = None;
2013                return Ok(());
2014            }
2015
2016            let pending_override = state.pending_terminal_override.take();
2017            let is_pty = state.metadata.mode == BgMode::Pty;
2018            let updated = self
2019                .update_task_metadata(&task.paths, |metadata| {
2020                    let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
2021                        let mut metadata = metadata.clone();
2022                        let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
2023                        let exit_code = if target_status == BgTaskStatus::TimedOut {
2024                            Some(124)
2025                        } else {
2026                            None
2027                        };
2028                        metadata.mark_terminal(target_status, exit_code, reason);
2029                        metadata
2030                    } else {
2031                        terminal_metadata_from_marker(metadata.clone(), marker, reason)
2032                    };
2033                    if watch_controlled {
2034                        new_metadata.completion_delivered = true;
2035                    }
2036                    *metadata = new_metadata;
2037                })
2038                .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2039            state.metadata = updated;
2040            task.mark_terminal_now();
2041            match &mut state.runtime {
2042                // Reap the exited direct child instead of dropping it, so it
2043                // does not linger as a `<defunct>` zombie (issue #91). The
2044                // wrapper writes the exit marker as its final act, so the
2045                // child is already exiting and `wait()` returns immediately.
2046                TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2047                TaskRuntime::Pty(runtime) => {
2048                    pty_reader_done = runtime
2049                        .as_ref()
2050                        .map(|runtime| Arc::clone(&runtime.reader_done));
2051                    *runtime = None;
2052                }
2053            }
2054            state.detached = true;
2055        }
2056
2057        if let Some(reader_done) = pty_reader_done {
2058            let deadline = Instant::now() + Duration::from_millis(200);
2059            while !reader_done.load(Ordering::SeqCst) && Instant::now() < deadline {
2060                std::thread::sleep(Duration::from_millis(10));
2061            }
2062        }
2063
2064        // One final scan runs before terminal notification routing so bytes
2065        // printed immediately before exit can win over the exit safety net.
2066        self.scan_task_watch_output(task);
2067
2068        let mut state = task
2069            .state
2070            .lock()
2071            .map_err(|_| "background task lock poisoned".to_string())?;
2072        state.buffer.enforce_terminal_cap();
2073        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
2074        Ok(())
2075    }
2076
2077    fn enqueue_completion_if_needed(
2078        &self,
2079        metadata: &PersistedTask,
2080        paths: Option<&TaskPaths>,
2081        emit_frame: bool,
2082    ) {
2083        if metadata.status.is_terminal() && !metadata.completion_delivered {
2084            self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
2085        }
2086    }
2087
2088    fn enqueue_completion_locked(
2089        &self,
2090        metadata: &PersistedTask,
2091        buffer: Option<&BgBuffer>,
2092        emit_frame: bool,
2093    ) {
2094        self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
2095    }
2096
2097    fn enqueue_completion_from_parts(
2098        &self,
2099        metadata: &PersistedTask,
2100        buffer: Option<&BgBuffer>,
2101        paths: Option<&TaskPaths>,
2102        emit_frame: bool,
2103    ) {
2104        // Only the terminal-state guard prevents double-recording here. The
2105        // `completion_delivered` flag is NOT used to gate compression-event
2106        // recording, because `mark_terminal` flips `completion_delivered=true`
2107        // immediately for tasks with `notify_on_completion=false` (foreground
2108        // bash polled via `bash_status`, which is the common case). Pre-emptive
2109        // delivery flagging is correct for the push-frame queue (suppresses
2110        // duplicate user-visible notifications) but would silently skip the
2111        // database insert below. Compression event recording is idempotent at
2112        // the DB layer (unique on harness+session+task_id), so re-entry is
2113        // safe; the dedupe-by-queue check stays for the push frame side.
2114        if !metadata.status.is_terminal() {
2115            return;
2116        }
2117        // Read tail once at completion time and cache on the BgCompletion so
2118        // both the push-frame consumer (running session) and any later
2119        // `bash_drain_completions` poll (different session, restart) see the
2120        // same preview without racing against rotation.
2121        let (raw_preview, output_truncated) = if metadata.mode == BgMode::Pty {
2122            (String::new(), false)
2123        } else {
2124            match buffer {
2125                Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
2126                None => paths
2127                    .map(|paths| read_tail_from_disk(metadata, paths, BG_COMPLETION_PREVIEW_BYTES))
2128                    .unwrap_or_else(|| (String::new(), false)),
2129            }
2130        };
2131        // Compress at completion time so push-frame consumers and later
2132        // `bash_drain_completions` poll-callers see the same compressed text.
2133        // Per-task `compressed: false` opts out; otherwise the compressor is
2134        // a no-op when `experimental.bash.compress=false`.
2135        let output_preview = if metadata.compressed {
2136            self.compress_output(&metadata.command, raw_preview)
2137        } else {
2138            raw_preview
2139        };
2140        let token_counts = self.completion_token_counts(metadata, buffer, paths);
2141        let completion = BgCompletion {
2142            task_id: metadata.task_id.clone(),
2143            session_id: metadata.session_id.clone(),
2144            status: metadata.status.clone(),
2145            exit_code: metadata.exit_code,
2146            command: metadata.command.clone(),
2147            output_preview,
2148            output_truncated,
2149            original_tokens: token_counts.original_tokens,
2150            compressed_tokens: token_counts.compressed_tokens,
2151            tokens_skipped: token_counts.tokens_skipped,
2152        };
2153
2154        // Record the compression event BEFORE the push-frame dedupe. Event
2155        // recording has its own idempotency at the DB layer (unique key on
2156        // harness+session+task_id), so it's safe to attempt for every
2157        // terminal-state finalize. Critically, this path runs even when
2158        // `completion_delivered=true` was pre-set by `mark_terminal` for
2159        // foreground bash (`notify_on_completion=false`) — which is the common
2160        // case for OpenCode/Pi `bash` tool calls. Previously this code lived
2161        // after the dedupe guard and never fired for foreground tasks, which
2162        // meant compression accounting was effectively dead for >99% of
2163        // real-world bash usage.
2164        self.record_compression_event_if_applicable(metadata, &token_counts);
2165
2166        let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
2167        if watch_controlled {
2168            if emit_frame && !watch_matched {
2169                self.emit_bash_watch_exit(&completion);
2170            }
2171            self.clear_task_watch_state(&metadata.task_id);
2172            return;
2173        }
2174
2175        // Push-frame queue is gated on `completion_delivered` so foreground
2176        // bash with `notify_on_completion=false` does not leak a user-visible
2177        // completion notification. `mark_terminal` pre-sets
2178        // `completion_delivered=true` for those tasks; honoring it here keeps
2179        // the suppression invariant the test
2180        // `no_notify_foreground_poll_completion_does_not_enqueue_completion`
2181        // asserts. The compression-event recording above intentionally runs
2182        // before this gate so foreground bash still contributes to the
2183        // session/project aggregates.
2184        if metadata.completion_delivered {
2185            return;
2186        }
2187
2188        // Push-frame queue dedupe stays per-task to prevent duplicate
2189        // user-visible completion notifications.
2190        let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
2191            if completions
2192                .iter()
2193                .any(|existing| existing.task_id == metadata.task_id)
2194            {
2195                false
2196            } else {
2197                completions.push_back(completion.clone());
2198                true
2199            }
2200        } else {
2201            false
2202        };
2203
2204        if pushed && emit_frame {
2205            self.emit_bash_completed(completion);
2206        }
2207    }
2208
2209    fn record_compression_event_if_applicable(
2210        &self,
2211        metadata: &PersistedTask,
2212        token_counts: &CompletionTokenCounts,
2213    ) {
2214        if metadata.mode == BgMode::Pty {
2215            return;
2216        }
2217
2218        let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2219            token_counts.original_tokens,
2220            token_counts.compressed_tokens,
2221            token_counts.original_bytes,
2222            token_counts.compressed_bytes,
2223        ) {
2224            (
2225                Some(original_tokens),
2226                Some(compressed_tokens),
2227                Some(original_bytes),
2228                Some(compressed_bytes),
2229            ) => (
2230                original_tokens,
2231                compressed_tokens,
2232                original_bytes,
2233                compressed_bytes,
2234            ),
2235            _ => {
2236                crate::slog_warn!(
2237                    "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2238                    metadata.task_id
2239                );
2240                return;
2241            }
2242        };
2243
2244        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2245        let Some(pool) = pool else {
2246            crate::slog_warn!(
2247                "compression event skipped for {}: db_pool not initialized — was configure run?",
2248                metadata.task_id
2249            );
2250            return;
2251        };
2252        let harness = self
2253            .inner
2254            .db_harness
2255            .read()
2256            .ok()
2257            .and_then(|slot| slot.clone());
2258        let Some(harness) = harness else {
2259            crate::slog_warn!(
2260                "compression event insert skipped for {}: harness not configured",
2261                metadata.task_id
2262            );
2263            return;
2264        };
2265
2266        let project_root = metadata
2267            .project_root
2268            .as_deref()
2269            .unwrap_or(&metadata.workdir);
2270        let project_key = crate::search_index::project_cache_key(project_root);
2271        let row = crate::db::compression_events::CompressionEventRow {
2272            harness: &harness,
2273            session_id: Some(&metadata.session_id),
2274            project_key: &project_key,
2275            tool: "bash",
2276            task_id: Some(&metadata.task_id),
2277            command: Some(&metadata.command),
2278            compressor: if metadata.compressed {
2279                "registry"
2280            } else {
2281                "none"
2282            },
2283            original_bytes,
2284            compressed_bytes,
2285            original_tokens,
2286            compressed_tokens,
2287            created_at: unix_millis() as i64,
2288        };
2289
2290        let conn = match pool.lock() {
2291            Ok(conn) => conn,
2292            Err(_) => {
2293                crate::slog_warn!(
2294                    "compression event insert failed for {}: db mutex poisoned",
2295                    metadata.task_id
2296                );
2297                return;
2298            }
2299        };
2300        match crate::db::compression_events::insert_compression_event(&conn, &row) {
2301            Ok(_) => {
2302                // DEBUG-level: each foreground bash call records one of these,
2303                // which clutters info-level logs without adding diagnostic value.
2304                // Aggregate totals are visible via the status RPC / TUI sidebar.
2305                crate::slog_debug!(
2306                    "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2307                    metadata.task_id,
2308                    project_key,
2309                    metadata.session_id,
2310                    original_tokens,
2311                    compressed_tokens
2312                );
2313            }
2314            Err(error) => {
2315                crate::slog_warn!(
2316                    "compression event insert failed for {}: {}",
2317                    metadata.task_id,
2318                    error
2319                );
2320            }
2321        }
2322    }
2323
2324    fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2325        let Ok(progress_sender) = self
2326            .inner
2327            .progress_sender
2328            .lock()
2329            .map(|sender| sender.clone())
2330        else {
2331            return;
2332        };
2333        if let Some(sender) = progress_sender.as_ref() {
2334            sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2335                pattern_match.task_id,
2336                session_id.to_string(),
2337                pattern_match.watch_id,
2338                pattern_match.match_text,
2339                pattern_match.match_offset,
2340                pattern_match.context,
2341                pattern_match.once,
2342            )));
2343        }
2344    }
2345
2346    fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2347        let Ok(progress_sender) = self
2348            .inner
2349            .progress_sender
2350            .lock()
2351            .map(|sender| sender.clone())
2352        else {
2353            return;
2354        };
2355        let Some(sender) = progress_sender.as_ref() else {
2356            return;
2357        };
2358        let status = completion_status_text(&completion.status, completion.exit_code);
2359        let preview = completion.output_preview.trim_end();
2360        let context = if preview.is_empty() {
2361            format!("task {} exited ({status})", completion.task_id)
2362        } else {
2363            format!(
2364                "task {} exited ({status})
2365{preview}",
2366                completion.task_id
2367            )
2368        };
2369        sender(PushFrame::BashPatternMatch(
2370            BashPatternMatchFrame::task_exit(
2371                completion.task_id.clone(),
2372                completion.session_id.clone(),
2373                format!("exited ({status})"),
2374                context,
2375            ),
2376        ));
2377    }
2378
2379    fn emit_bash_completed(&self, completion: BgCompletion) {
2380        let Ok(progress_sender) = self
2381            .inner
2382            .progress_sender
2383            .lock()
2384            .map(|sender| sender.clone())
2385        else {
2386            return;
2387        };
2388        let Some(sender) = progress_sender.as_ref() else {
2389            return;
2390        };
2391        // Clone the callback out of the registry mutex before writing to stdout;
2392        // otherwise a blocked push-frame write could pin the mutex and starve
2393        // unrelated progress-sender updates.
2394        // Bg task transitions are discovered by the watchdog thread, so the
2395        // sender is shared behind a Mutex. It still uses the same stdout writer
2396        // closure as foreground progress frames, preserving the existing lock/
2397        // flush behavior in main.rs.
2398        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2399            completion.task_id,
2400            completion.session_id,
2401            completion.status,
2402            completion.exit_code,
2403            completion.command,
2404            completion.output_preview,
2405            completion.output_truncated,
2406            completion.original_tokens,
2407            completion.compressed_tokens,
2408            completion.tokens_skipped,
2409        )));
2410    }
2411
2412    fn completion_token_counts(
2413        &self,
2414        metadata: &PersistedTask,
2415        buffer: Option<&BgBuffer>,
2416        paths: Option<&TaskPaths>,
2417    ) -> CompletionTokenCounts {
2418        if metadata.mode == BgMode::Pty {
2419            return CompletionTokenCounts::skipped();
2420        }
2421
2422        let raw = match buffer {
2423            Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2424            None => paths
2425                .map(|paths| {
2426                    read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2427                })
2428                .unwrap_or(TokenCountInput::Skipped),
2429        };
2430
2431        let TokenCountInput::Text(raw_output) = raw else {
2432            return CompletionTokenCounts::skipped();
2433        };
2434
2435        let original_tokens = token_count_u32(&raw_output);
2436        let original_bytes = raw_output.len() as i64;
2437        let compressed_output = if metadata.compressed {
2438            self.compress_output(&metadata.command, raw_output)
2439        } else {
2440            raw_output
2441        };
2442        let compressed_tokens = token_count_u32(&compressed_output);
2443        let compressed_bytes = compressed_output.len() as i64;
2444        CompletionTokenCounts {
2445            original_tokens: Some(original_tokens),
2446            compressed_tokens: Some(compressed_tokens),
2447            original_bytes: Some(original_bytes),
2448            compressed_bytes: Some(compressed_bytes),
2449            tokens_skipped: false,
2450        }
2451    }
2452
2453    pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2454        if !self
2455            .inner
2456            .long_running_reminder_enabled
2457            .load(Ordering::SeqCst)
2458        {
2459            return;
2460        }
2461        let interval_ms = self
2462            .inner
2463            .long_running_reminder_interval_ms
2464            .load(Ordering::SeqCst);
2465        if interval_ms == 0 {
2466            return;
2467        }
2468        let interval = Duration::from_millis(interval_ms);
2469        let now = Instant::now();
2470        let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2471            return;
2472        };
2473        let since = last_reminder_at.unwrap_or(task.started);
2474        if now.duration_since(since) < interval {
2475            return;
2476        }
2477        let command = task
2478            .state
2479            .lock()
2480            .map(|state| state.metadata.command.clone())
2481            .unwrap_or_default();
2482        *last_reminder_at = Some(now);
2483        self.emit_bash_long_running(BashLongRunningFrame::new(
2484            task.task_id.clone(),
2485            task.session_id.clone(),
2486            command,
2487            task.started.elapsed().as_millis() as u64,
2488        ));
2489    }
2490
2491    fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2492        let Ok(progress_sender) = self
2493            .inner
2494            .progress_sender
2495            .lock()
2496            .map(|sender| sender.clone())
2497        else {
2498            return;
2499        };
2500        if let Some(sender) = progress_sender.as_ref() {
2501            sender(PushFrame::BashLongRunning(frame));
2502        }
2503    }
2504
2505    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2506        self.inner
2507            .tasks
2508            .lock()
2509            .ok()
2510            .and_then(|tasks| tasks.get(task_id).cloned())
2511    }
2512
2513    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2514        self.task(task_id)
2515            .filter(|task| task.session_id == session_id)
2516    }
2517
2518    fn running_count(&self) -> usize {
2519        self.inner
2520            .tasks
2521            .lock()
2522            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2523            .unwrap_or(0)
2524    }
2525
2526    fn start_watchdog(&self) {
2527        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2528            super::watchdog::start(self.clone());
2529        }
2530    }
2531
2532    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2533        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2534    }
2535
2536    #[cfg(test)]
2537    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2538        self.task_for_session(task_id, session_id)
2539            .map(|task| task.paths.json.clone())
2540    }
2541
2542    #[cfg(test)]
2543    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2544        self.task_for_session(task_id, session_id)
2545            .map(|task| task.paths.exit.clone())
2546    }
2547
2548    /// Generate a `bash-{16hex}` slug that is unique against live tasks and queued completions.
2549    fn generate_unique_task_id(&self) -> Result<String, String> {
2550        for _ in 0..32 {
2551            let candidate = random_slug();
2552            let tasks = self
2553                .inner
2554                .tasks
2555                .lock()
2556                .map_err(|_| "background task registry lock poisoned".to_string())?;
2557            if tasks.contains_key(&candidate) {
2558                continue;
2559            }
2560            let completions = self
2561                .inner
2562                .completions
2563                .lock()
2564                .map_err(|_| "background completions lock poisoned".to_string())?;
2565            if completions
2566                .iter()
2567                .any(|completion| completion.task_id == candidate)
2568            {
2569                continue;
2570            }
2571            return Ok(candidate);
2572        }
2573        Err("failed to allocate unique background task id after 32 attempts".to_string())
2574    }
2575}
2576
2577struct CompletionTokenCounts {
2578    original_tokens: Option<u32>,
2579    compressed_tokens: Option<u32>,
2580    original_bytes: Option<i64>,
2581    compressed_bytes: Option<i64>,
2582    tokens_skipped: bool,
2583}
2584
2585impl CompletionTokenCounts {
2586    fn skipped() -> Self {
2587        Self {
2588            original_tokens: None,
2589            compressed_tokens: None,
2590            original_bytes: None,
2591            compressed_bytes: None,
2592            tokens_skipped: true,
2593        }
2594    }
2595}
2596
2597fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
2598    match status {
2599        BgTaskStatus::TimedOut => "timed out".to_string(),
2600        BgTaskStatus::Killed => "killed".to_string(),
2601        _ => exit_code
2602            .map(|code| format!("exit {code}"))
2603            .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
2604    }
2605}
2606
2607fn token_count_u32(text: &str) -> u32 {
2608    aft_tokenizer::count_tokens(text)
2609        .try_into()
2610        .unwrap_or(u32::MAX)
2611}
2612
2613impl Default for BgTaskRegistry {
2614    fn default() -> Self {
2615        Self::new(Arc::new(Mutex::new(None)))
2616    }
2617}
2618
2619fn modified_within(path: &Path, grace: Duration) -> bool {
2620    fs::metadata(path)
2621        .and_then(|metadata| metadata.modified())
2622        .ok()
2623        .and_then(|modified| SystemTime::now().duration_since(modified).ok())
2624        .map(|age| age < grace)
2625        .unwrap_or(false)
2626}
2627
2628fn canonicalized_path(path: &Path) -> PathBuf {
2629    fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf())
2630}
2631
2632fn started_instant_from_unix_millis(started_at: u64) -> Instant {
2633    let now_ms = SystemTime::now()
2634        .duration_since(UNIX_EPOCH)
2635        .ok()
2636        .map(|duration| duration.as_millis() as u64)
2637        .unwrap_or(started_at);
2638    let elapsed_ms = now_ms.saturating_sub(started_at);
2639    Instant::now()
2640        .checked_sub(Duration::from_millis(elapsed_ms))
2641        .unwrap_or_else(Instant::now)
2642}
2643
2644fn gc_quarantine(storage_dir: &Path) {
2645    let quarantine_root = storage_dir.join("bash-tasks-quarantine");
2646    let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
2647        return;
2648    };
2649    for session_entry in session_dirs.flatten() {
2650        let session_quarantine_dir = session_entry.path();
2651        if !session_quarantine_dir.is_dir() {
2652            continue;
2653        }
2654        let entries = match fs::read_dir(&session_quarantine_dir) {
2655            Ok(entries) => entries,
2656            Err(error) => {
2657                crate::slog_warn!(
2658                    "failed to read background task quarantine dir {}: {error}",
2659                    session_quarantine_dir.display()
2660                );
2661                continue;
2662            }
2663        };
2664        for entry in entries.flatten() {
2665            let path = entry.path();
2666            if modified_within(&path, QUARANTINE_GC_GRACE) {
2667                continue;
2668            }
2669            let result = if path.is_dir() {
2670                fs::remove_dir_all(&path)
2671            } else {
2672                fs::remove_file(&path)
2673            };
2674            match result {
2675                Ok(()) => log::debug!(
2676                    "deleted old background task quarantine entry {}",
2677                    path.display()
2678                ),
2679                Err(error) => crate::slog_warn!(
2680                    "failed to delete old background task quarantine entry {}: {error}",
2681                    path.display()
2682                ),
2683            }
2684        }
2685        let _ = fs::remove_dir(&session_quarantine_dir);
2686    }
2687    let _ = fs::remove_dir(&quarantine_root);
2688}
2689
2690enum QuarantineKind {
2691    Corrupt,
2692    Invalid,
2693}
2694
2695fn quarantine_task_json(
2696    storage_dir: &Path,
2697    session_dir: &Path,
2698    json_path: &Path,
2699    kind: QuarantineKind,
2700) -> Result<(), String> {
2701    let session_hash = session_dir
2702        .file_name()
2703        .and_then(|name| name.to_str())
2704        .ok_or_else(|| {
2705            format!(
2706                "invalid background task session dir: {}",
2707                session_dir.display()
2708            )
2709        })?;
2710    let task_name = json_path
2711        .file_name()
2712        .and_then(|name| name.to_str())
2713        .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
2714    let unix_ts = SystemTime::now()
2715        .duration_since(UNIX_EPOCH)
2716        .map(|duration| duration.as_secs())
2717        .unwrap_or(0);
2718    let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
2719    fs::create_dir_all(&quarantine_dir).map_err(|e| {
2720        format!(
2721            "failed to create background task quarantine dir {}: {e}",
2722            quarantine_dir.display()
2723        )
2724    })?;
2725    let target_name = quarantine_name(task_name, unix_ts, &kind);
2726    let target = quarantine_dir.join(target_name);
2727    fs::rename(json_path, &target).map_err(|e| {
2728        format!(
2729            "failed to quarantine background task metadata {} to {}: {e}",
2730            json_path.display(),
2731            target.display()
2732        )
2733    })?;
2734
2735    for sibling in task_sibling_paths(json_path) {
2736        if !sibling.exists() {
2737            continue;
2738        }
2739        let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
2740            crate::slog_warn!(
2741                "skipping background task sibling with invalid name during quarantine: {}",
2742                sibling.display()
2743            );
2744            continue;
2745        };
2746        let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
2747        if let Err(error) = fs::rename(&sibling, &sibling_target) {
2748            crate::slog_warn!(
2749                "failed to quarantine background task sibling {} to {}: {error}",
2750                sibling.display(),
2751                sibling_target.display()
2752            );
2753        }
2754    }
2755
2756    let _ = fs::remove_dir(session_dir);
2757    Ok(())
2758}
2759
2760fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
2761    match kind {
2762        QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
2763        QuarantineKind::Invalid => {
2764            let path = Path::new(file_name);
2765            let stem = path.file_stem().and_then(|stem| stem.to_str());
2766            let extension = path.extension().and_then(|extension| extension.to_str());
2767            match (stem, extension) {
2768                (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
2769                _ => format!("{file_name}.invalid.{unix_ts}"),
2770            }
2771        }
2772    }
2773}
2774
2775fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
2776    let Some(parent) = json_path.parent() else {
2777        return Vec::new();
2778    };
2779    let Some(stem) = json_path.file_stem().and_then(|stem| stem.to_str()) else {
2780        return Vec::new();
2781    };
2782    ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"]
2783        .into_iter()
2784        .map(|extension| parent.join(format!("{stem}.{extension}")))
2785        .collect()
2786}
2787
2788fn read_tail_from_disk(
2789    metadata: &PersistedTask,
2790    paths: &TaskPaths,
2791    max_bytes: usize,
2792) -> (String, bool) {
2793    if metadata.mode == BgMode::Pty {
2794        return read_file_tail_capped(&paths.pty, max_bytes)
2795            .map(|bytes| {
2796                let truncated = fs::metadata(&paths.pty)
2797                    .map(|metadata| metadata.len() > max_bytes as u64)
2798                    .unwrap_or(false);
2799                (String::from_utf8_lossy(&bytes).into_owned(), truncated)
2800            })
2801            .unwrap_or_else(|_| (String::new(), false));
2802    }
2803    let stdout = fs::read(&paths.stdout).unwrap_or_default();
2804    let stderr = fs::read(&paths.stderr).unwrap_or_default();
2805    let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
2806    bytes.extend_from_slice(&stdout);
2807    bytes.extend_from_slice(&stderr);
2808    if bytes.len() <= max_bytes {
2809        return (String::from_utf8_lossy(&bytes).into_owned(), false);
2810    }
2811    let start = bytes.len().saturating_sub(max_bytes);
2812    (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
2813}
2814
2815fn read_for_token_count_from_disk(
2816    metadata: &PersistedTask,
2817    paths: &TaskPaths,
2818    max_bytes_per_stream: usize,
2819) -> TokenCountInput {
2820    if metadata.mode == BgMode::Pty {
2821        return TokenCountInput::Skipped;
2822    }
2823    // Read up to `max_bytes_per_stream` bytes per stream rather than
2824    // refusing to tokenize anything when the file exceeds the cap.
2825    // Mirror the in-memory `BgBuffer::read_for_token_count` policy
2826    // (see comment there) — large outputs are exactly the tasks that
2827    // benefit most from compression accounting, so silent-skipping
2828    // them defeats the purpose of token tracking.
2829    let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
2830    let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
2831    match (stdout, stderr) {
2832        (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2833            String::from_utf8_lossy(&stdout).as_ref(),
2834            String::from_utf8_lossy(&stderr).as_ref(),
2835        )),
2836        (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
2837            String::from_utf8_lossy(&stdout).as_ref(),
2838            "",
2839        )),
2840        (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2841            "",
2842            String::from_utf8_lossy(&stderr).as_ref(),
2843        )),
2844        (Err(_), Err(_)) => TokenCountInput::Skipped,
2845    }
2846}
2847
2848/// Read at most `max_bytes` bytes from the END of `path`. Used for
2849/// tokenization where the most recent output is more representative than
2850/// an arbitrarily-capped beginning. Returns `Err` if the file cannot be
2851/// opened (genuinely missing or permissions error).
2852fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
2853    use std::io::{Read, Seek, SeekFrom};
2854    let mut file = std::fs::File::open(path)?;
2855    let len = file.metadata()?.len();
2856    let read_len = len.min(max_bytes as u64);
2857    if read_len > 0 && len > max_bytes as u64 {
2858        file.seek(SeekFrom::End(-(read_len as i64)))?;
2859    }
2860    let mut bytes = Vec::with_capacity(read_len as usize);
2861    file.read_to_end(&mut bytes)?;
2862    Ok(bytes)
2863}
2864
2865impl BgTask {
2866    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
2867        let state = self
2868            .state
2869            .lock()
2870            .unwrap_or_else(|poison| poison.into_inner());
2871        self.snapshot_locked(&state, preview_bytes)
2872    }
2873
2874    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
2875        let metadata = &state.metadata;
2876        let duration_ms = metadata.duration_ms.or_else(|| {
2877            metadata
2878                .status
2879                .is_terminal()
2880                .then(|| self.started.elapsed().as_millis() as u64)
2881        });
2882        let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
2883            (String::new(), false)
2884        } else {
2885            state.buffer.read_tail(preview_bytes)
2886        };
2887        BgTaskSnapshot {
2888            info: BgTaskInfo {
2889                task_id: self.task_id.clone(),
2890                status: metadata.status.clone(),
2891                command: metadata.command.clone(),
2892                mode: metadata.mode.clone(),
2893                started_at: metadata.started_at,
2894                duration_ms,
2895            },
2896            exit_code: metadata.exit_code,
2897            child_pid: metadata.child_pid,
2898            workdir: metadata.workdir.display().to_string(),
2899            output_preview,
2900            output_truncated,
2901            output_path: state
2902                .buffer
2903                .output_path()
2904                .map(|path| path.display().to_string()),
2905            stderr_path: state
2906                .buffer
2907                .stderr_path()
2908                .map(|path| path.display().to_string()),
2909            pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
2910            pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
2911        }
2912    }
2913
2914    pub(crate) fn is_running(&self) -> bool {
2915        self.state
2916            .lock()
2917            .map(|state| {
2918                state.metadata.status == BgTaskStatus::Running
2919                    || (state.metadata.mode == BgMode::Pty
2920                        && state.metadata.status == BgTaskStatus::Killing)
2921            })
2922            .unwrap_or(false)
2923    }
2924
2925    fn is_terminal(&self) -> bool {
2926        self.state
2927            .lock()
2928            .map(|state| state.metadata.status.is_terminal())
2929            .unwrap_or(false)
2930    }
2931
2932    fn mark_terminal_now(&self) {
2933        if let Ok(mut terminal_at) = self.terminal_at.lock() {
2934            if terminal_at.is_none() {
2935                *terminal_at = Some(Instant::now());
2936            }
2937        }
2938    }
2939
2940    fn set_completion_delivered(
2941        &self,
2942        delivered: bool,
2943        registry: &BgTaskRegistry,
2944    ) -> Result<(), String> {
2945        let mut state = self
2946            .state
2947            .lock()
2948            .map_err(|_| "background task lock poisoned".to_string())?;
2949        let updated = registry
2950            .update_task_metadata(&self.paths, |metadata| {
2951                metadata.completion_delivered = delivered;
2952            })
2953            .map_err(|e| format!("failed to update completion delivery: {e}"))?;
2954        state.metadata = updated;
2955        Ok(())
2956    }
2957}
2958
2959/// Reap an exited direct child handle, then clear the slot.
2960///
2961/// Dropping a [`std::process::Child`] does NOT `wait()` on the underlying OS
2962/// process. On Unix a finished-but-unreaped child lingers as a `<defunct>`
2963/// zombie until the AFT process itself exits (issue #91: `[mv] <defunct>`).
2964/// The terminal-transition paths that learn of completion from the
2965/// exit-marker file — rather than from [`BgTaskRegistry::reap_child`]'s
2966/// `try_wait()` — must therefore reap the handle explicitly instead of just
2967/// nulling it.
2968///
2969/// The exit marker is written by the wrapper's final statement (an atomic
2970/// `mv` rename), so by the time we observe the marker the direct child has
2971/// finished its work and is exiting; `wait()` returns essentially
2972/// immediately. We attempt a non-blocking `try_wait()` first so the common
2973/// case never blocks at all, falling back to a (bounded) `wait()` only to
2974/// cover the microsecond window between the rename and process teardown.
2975///
2976/// Callers hold the task state mutex, so this is serialized against
2977/// `reap_child` — there is no double-`wait()` hazard: whichever path acquires
2978/// the lock first reaps and clears the slot, and the other observes `None`.
2979#[cfg(unix)]
2980fn reap_piped_child(child_slot: &mut Option<Child>) {
2981    if let Some(mut child) = child_slot.take() {
2982        if matches!(child.try_wait(), Ok(None)) {
2983            let _ = child.wait();
2984        }
2985    }
2986}
2987
2988/// Windows has no zombie/`<defunct>` concept: dropping the [`Child`] closes
2989/// the process handle, which is the correct release. Preserve the historical
2990/// behavior of simply clearing the slot so the documented Windows PID-recycle
2991/// handling in `reap_child` is unaffected.
2992#[cfg(windows)]
2993fn reap_piped_child(child_slot: &mut Option<Child>) {
2994    *child_slot = None;
2995}
2996
2997fn terminal_metadata_from_marker(
2998    mut metadata: PersistedTask,
2999    marker: ExitMarker,
3000    reason: Option<String>,
3001) -> PersistedTask {
3002    match marker {
3003        ExitMarker::Code(code) => {
3004            let status = if code == 0 {
3005                BgTaskStatus::Completed
3006            } else {
3007                BgTaskStatus::Failed
3008            };
3009            metadata.mark_terminal(status, Some(code), reason);
3010        }
3011        ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
3012    }
3013    metadata
3014}
3015
3016#[cfg(unix)]
3017fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
3018    let shell = resolve_posix_shell();
3019    let mut cmd = Command::new(&shell);
3020    cmd.arg("-c")
3021        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
3022        .arg(&shell)
3023        .arg(command)
3024        .arg(exit_path);
3025    unsafe {
3026        cmd.pre_exec(|| {
3027            if libc::setsid() == -1 {
3028                return Err(std::io::Error::last_os_error());
3029            }
3030            Ok(())
3031        });
3032    }
3033    cmd
3034}
3035
3036#[cfg(unix)]
3037fn resolve_posix_shell() -> PathBuf {
3038    static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
3039    POSIX_SHELL
3040        .get_or_init(|| {
3041            std::env::var_os("BASH")
3042                .filter(|value| !value.is_empty())
3043                .map(PathBuf::from)
3044                .filter(|path| path.exists())
3045                .or_else(|| which::which("bash").ok())
3046                .or_else(|| which::which("zsh").ok())
3047                .unwrap_or_else(|| PathBuf::from("/bin/sh"))
3048        })
3049        .clone()
3050}
3051
3052#[cfg(windows)]
3053fn detached_shell_command_for(
3054    shell: crate::windows_shell::WindowsShell,
3055    command: &str,
3056    exit_path: &Path,
3057    paths: &TaskPaths,
3058    creation_flags: u32,
3059) -> Result<Command, String> {
3060    use crate::windows_shell::WindowsShell;
3061    // Write the wrapper to a temp file alongside the other task files,
3062    // then invoke the shell with the file path as a single clean
3063    // argument. This sidesteps the entire Windows command-line quoting
3064    // mess (Rust std-lib quoting + cmd /C parser + PowerShell -Command
3065    // parser all interacting with embedded quotes in the wrapper).
3066    //
3067    // Path arguments don't need quoting in the same problematic way
3068    // because: (1) we use no-space task IDs (bash-XXXXXXXX) so the path
3069    // contains no characters that need shell escaping; (2) the wrapper
3070    // body's internal quotes never reach the shell command line — the
3071    // shell reads them from disk by file syntax rules, not command-line
3072    // parser rules.
3073    let wrapper_body = shell.wrapper_script_bytes(command, exit_path);
3074    let wrapper_ext = match shell {
3075        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
3076        WindowsShell::Cmd => "bat",
3077        // POSIX shells (git-bash etc.) execute the wrapper through `-c`,
3078        // so the file extension is purely cosmetic; `.sh` matches what an
3079        // operator would expect when grepping the spill directory.
3080        WindowsShell::Posix(_) => "sh",
3081    };
3082    let wrapper_path = paths.dir.join(format!(
3083        "{}.{}",
3084        paths
3085            .json
3086            .file_stem()
3087            .and_then(|s| s.to_str())
3088            .unwrap_or("wrapper"),
3089        wrapper_ext
3090    ));
3091    fs::write(&wrapper_path, wrapper_body)
3092        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;
3093
3094    let mut cmd = Command::new(shell.binary().as_ref());
3095    match shell {
3096        WindowsShell::Pwsh | WindowsShell::Powershell => {
3097            // -File runs the script with no quoting issues. `-NoLogo`,
3098            // `-NoProfile`, etc. apply to the host before the file runs.
3099            cmd.args([
3100                "-NoLogo",
3101                "-NoProfile",
3102                "-NonInteractive",
3103                "-ExecutionPolicy",
3104                "Bypass",
3105                "-File",
3106            ]);
3107            cmd.arg(&wrapper_path);
3108        }
3109        WindowsShell::Cmd => {
3110            // `cmd /D /C "<bat-file-path>"` — invoking a .bat
3111            // file via /C is well-defined; the file's contents are
3112            // read line-by-line by cmd's batch processor, NOT
3113            // re-interpreted by the /C parser. This avoids the
3114            // "filename syntax incorrect" errors that came from
3115            // having complex compound commands on the cmd line.
3116            cmd.args(["/D", "/C"]);
3117            cmd.arg(&wrapper_path);
3118        }
3119        WindowsShell::Posix(_) => {
3120            // git-bash and other POSIX shells run the wrapper script with
3121            // `<binary> <wrapper-path>` (the wrapper is just a shell
3122            // script). No special flags needed — the `trap` and atomic
3123            // exit-marker rename in `wrapper_script` are POSIX-standard.
3124            cmd.arg(&wrapper_path);
3125        }
3126    }
3127
3128    // Win32 process creation flags. Caller selects whether to include
3129    // CREATE_BREAKAWAY_FROM_JOB — see `detached_shell_command_for` callers
3130    // for the breakaway-fallback strategy.
3131    cmd.creation_flags(creation_flags);
3132    Ok(cmd)
3133}
3134
3135/// Spawn a detached background bash child process.
3136///
3137/// On Unix this is a single spawn against `/bin/sh`. On Windows it walks
3138/// `WindowsShell::shell_candidates()` (pwsh.exe → powershell.exe →
3139/// cmd.exe) and retries with the next candidate when the previous one
3140/// fails to spawn with `NotFound` — the same runtime safety net the
3141/// foreground bash path has, so issue #27 callers landing on cmd.exe
3142/// fallback can also use background bash. The wrapper script is
3143/// regenerated per attempt because PowerShell wrappers embed the shell
3144/// binary by name; the stdout/stderr capture handles are also reopened
3145/// per attempt because `Command::spawn()` consumes them.
3146///
3147/// Errors other than `NotFound` (PermissionDenied, OutOfMemory, etc.)
3148/// return immediately without retry — they indicate a problem with the
3149/// resolved shell that retrying with a different shell won't fix.
3150fn spawn_detached_child(
3151    command: &str,
3152    paths: &TaskPaths,
3153    workdir: &Path,
3154    env: &HashMap<String, String>,
3155) -> Result<std::process::Child, String> {
3156    #[cfg(not(windows))]
3157    {
3158        let stdout = create_capture_file(&paths.stdout)
3159            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3160        let stderr = create_capture_file(&paths.stderr)
3161            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3162        detached_shell_command(command, &paths.exit)
3163            .current_dir(workdir)
3164            .envs(env)
3165            .stdin(Stdio::null())
3166            .stdout(Stdio::from(stdout))
3167            .stderr(Stdio::from(stderr))
3168            .spawn()
3169            .map_err(|e| format!("failed to spawn background bash command: {e}"))
3170    }
3171    #[cfg(windows)]
3172    {
3173        use crate::windows_shell::shell_candidates;
3174        // Spawn priority: pwsh → powershell → git-bash → cmd. Same as the
3175        // legacy foreground bash spawn path. v0.20 routes ALL bash through
3176        // this background spawn helper, including foreground tool calls
3177        // where the model writes PowerShell-syntax (`$var = ...`,
3178        // `Start-Sleep`, `Add-Content`) — those fail outright under cmd.
3179        // The earlier v0.18-era cmd-first override worked around a
3180        // PowerShell detached-output bug; that bug is fixed at the
3181        // process-flag layer (CREATE_NO_WINDOW instead of DETACHED_PROCESS,
3182        // see flag block below), so we no longer need to misroute PS
3183        // commands through cmd.
3184        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
3185        // Win32 process creation flags. We try with CREATE_BREAKAWAY_FROM_JOB
3186        // first (so the bg child outlives the AFT process when AFT is killed),
3187        // then fall back without it for environments where the parent is in a
3188        // Job Object that doesn't grant `JOB_OBJECT_LIMIT_BREAKAWAY_OK`. CI
3189        // runners (GitHub Actions windows-2022) and some MDM-managed corp
3190        // environments hit this — `CreateProcess` returns Access Denied (5).
3191        // Without breakaway, the child still runs detached but will be torn
3192        // down with the parent if the parent process group is signaled.
3193        //
3194        // We use CREATE_NO_WINDOW (no visible console window, but the
3195        // child still has a hidden console) rather than DETACHED_PROCESS
3196        // (no console at all). PowerShell-based wrappers that perform
3197        // file I/O via [System.IO.File] need a console handle to flush
3198        // stdout/stderr correctly even when redirected — under
3199        // DETACHED_PROCESS, pwsh sometimes silently exits before
3200        // executing later script statements (the Move-Item that writes
3201        // the exit marker never runs), leaving the bg task forever
3202        // marked Failed: process exited without exit marker. cmd.exe
3203        // wrappers tolerate DETACHED_PROCESS, but switching to
3204        // CREATE_NO_WINDOW costs nothing for cmd and unblocks pwsh.
3205        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
3206        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
3207        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
3208        let with_breakaway =
3209            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
3210        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
3211        let mut last_error: Option<String> = None;
3212        for (idx, shell) in candidates.iter().enumerate() {
3213            // Per-shell, try with breakaway first. If the process is in a
3214            // restrictive job, the breakaway flag triggers Access Denied
3215            // (os error 5). Retry once without breakaway.
3216            for &flags in &[with_breakaway, without_breakaway] {
3217                // Re-open capture handles per attempt; spawn() consumes them.
3218                let stdout = create_capture_file(&paths.stdout)
3219                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3220                let stderr = create_capture_file(&paths.stderr)
3221                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3222                let mut cmd =
3223                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
3224                cmd.current_dir(workdir)
3225                    .envs(env)
3226                    .stdin(Stdio::null())
3227                    .stdout(Stdio::from(stdout))
3228                    .stderr(Stdio::from(stderr));
3229                match cmd.spawn() {
3230                    Ok(child) => {
3231                        if idx > 0 {
3232                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
3233                             the cached PATH probe disagreed with runtime spawn — likely PATH \
3234                             inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
3235                            shell.binary(),
3236                            idx);
3237                        }
3238                        if flags == without_breakaway {
3239                            crate::slog_warn!(
3240                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
3241                             (likely a restrictive Job Object — CI sandbox or MDM policy). \
3242                             Spawned without breakaway; the bg task will be torn down if the \
3243                             AFT process group is killed."
3244                            );
3245                        }
3246                        return Ok(child);
3247                    }
3248                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
3249                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
3250                        shell.binary());
3251                        last_error = Some(format!("{}: {e}", shell.binary()));
3252                        // Skip the without-breakaway retry for NotFound — the
3253                        // binary itself is missing, breakaway flag is irrelevant.
3254                        break;
3255                    }
3256                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
3257                        // Access Denied during breakaway — retry without it.
3258                        crate::slog_warn!(
3259                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
3260                         Access Denied — retrying {} without breakaway",
3261                            shell.binary()
3262                        );
3263                        last_error = Some(format!("{}: {e}", shell.binary()));
3264                        continue;
3265                    }
3266                    Err(e) => {
3267                        return Err(format!(
3268                            "failed to spawn background bash command via {}: {e}",
3269                            shell.binary()
3270                        ));
3271                    }
3272                }
3273            }
3274        }
3275        Err(format!(
3276            "failed to spawn background bash command: no Windows shell could be spawned. \
3277             Last error: {}. PATH-probed candidates: {:?}",
3278            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
3279            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
3280        ))
3281    }
3282}
3283
3284fn random_slug() -> String {
3285    let mut bytes = [0u8; 4];
3286    // getrandom is a transitive dependency; use it directly for OS entropy.
3287    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3288        // Extremely unlikely fallback: time + pid mix.
3289        let t = SystemTime::now()
3290            .duration_since(UNIX_EPOCH)
3291            .map(|d| d.subsec_nanos())
3292            .unwrap_or(0);
3293        let p = std::process::id();
3294        bytes.copy_from_slice(&(t ^ p).to_le_bytes());
3295    });
3296    // `bash-` + 8 lowercase hex chars — compact, OS-entropy backed.
3297    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3298    format!("bash-{hex}")
3299}
3300
3301#[cfg(test)]
3302mod tests {
3303    use std::collections::HashMap;
3304    #[cfg(windows)]
3305    use std::fs;
3306    use std::sync::{Arc, Mutex};
3307    use std::time::Duration;
3308    #[cfg(windows)]
3309    use std::time::Instant;
3310
3311    use super::*;
3312
3313    #[cfg(unix)]
3314    const QUICK_SUCCESS_COMMAND: &str = "true";
3315    #[cfg(windows)]
3316    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
3317
3318    #[cfg(unix)]
3319    const LONG_RUNNING_COMMAND: &str = "sleep 5";
3320    #[cfg(windows)]
3321    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3322
3323    #[test]
3324    fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
3325        let registry = BgTaskRegistry::default();
3326        let dir = tempfile::tempdir().unwrap();
3327        let task_id = registry
3328            .spawn_pty(
3329                QUICK_SUCCESS_COMMAND,
3330                "session".to_string(),
3331                dir.path().to_path_buf(),
3332                HashMap::new(),
3333                Some(Duration::from_secs(30)),
3334                dir.path().to_path_buf(),
3335                10,
3336                true,
3337                false,
3338                Some(dir.path().to_path_buf()),
3339                50,
3340                120,
3341            )
3342            .unwrap();
3343
3344        let paths = task_paths(dir.path(), "session", &task_id);
3345        let metadata = read_task(&paths.json).unwrap();
3346        assert_eq!(
3347            metadata.schema_version,
3348            crate::bash_background::persistence::SCHEMA_VERSION
3349        );
3350        assert_eq!(metadata.mode, BgMode::Pty);
3351        assert_eq!(metadata.pty_rows, Some(50));
3352        assert_eq!(metadata.pty_cols, Some(120));
3353
3354        let snapshot = registry
3355            .status(&task_id, "session", None, Some(dir.path()), 1024)
3356            .unwrap();
3357        assert_eq!(snapshot.pty_rows, Some(50));
3358        assert_eq!(snapshot.pty_cols, Some(120));
3359    }
3360
3361    /// Spawn a child process that exits immediately and return it after
3362    /// it has terminated. Used by reap_child tests to simulate the
3363    /// "child exists and is dead" state when the watchdog has already
3364    /// nulled out the original child handle.
3365    fn spawn_dead_child() -> std::process::Child {
3366        #[cfg(unix)]
3367        let mut cmd = std::process::Command::new("true");
3368        #[cfg(windows)]
3369        let mut cmd = {
3370            let mut c = std::process::Command::new("cmd");
3371            c.args(["/c", "exit", "0"]);
3372            c
3373        };
3374        cmd.stdin(std::process::Stdio::null());
3375        cmd.stdout(std::process::Stdio::null());
3376        cmd.stderr(std::process::Stdio::null());
3377        let mut child = cmd.spawn().expect("spawn replacement child for reap test");
3378        // Poll try_wait() until the child actually exits, instead of calling
3379        // wait() which closes the OS handle. On Windows, after wait()
3380        // closes the handle, subsequent try_wait() calls (which reap_child
3381        // depends on) return Err — the test was inadvertently giving
3382        // reap_child an unusable child handle. Polling try_wait() keeps the
3383        // handle open and observes natural exit, matching the production
3384        // shape where the watchdog discovers an exited child for the first
3385        // time.
3386        let started = Instant::now();
3387        loop {
3388            match child.try_wait() {
3389                Ok(Some(_)) => break,
3390                Ok(None) => {
3391                    if started.elapsed() > Duration::from_secs(5) {
3392                        panic!("dead-child stand-in did not exit within 5s");
3393                    }
3394                    std::thread::sleep(Duration::from_millis(10));
3395                }
3396                Err(error) => panic!("dead-child try_wait failed: {error}"),
3397            }
3398        }
3399        child
3400    }
3401
3402    #[test]
3403    fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
3404        let registry = BgTaskRegistry::default();
3405        let dir = tempfile::tempdir().unwrap();
3406        let task_id = registry
3407            .spawn(
3408                LONG_RUNNING_COMMAND,
3409                "session".to_string(),
3410                dir.path().to_path_buf(),
3411                HashMap::new(),
3412                Some(Duration::from_secs(30)),
3413                dir.path().to_path_buf(),
3414                10,
3415                true,
3416                false,
3417                Some(dir.path().to_path_buf()),
3418            )
3419            .unwrap();
3420        registry
3421            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3422            .unwrap();
3423        assert_eq!(
3424            registry
3425                .drain_completions_for_session(Some("session"))
3426                .len(),
3427            1
3428        );
3429
3430        // Simulate the plugin consuming a sync bash_watch({ exit:true }) result
3431        // locally before the Rust completion queue is drained/acked.
3432        registry.inner.completions.lock().unwrap().clear();
3433
3434        assert_eq!(
3435            registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
3436            vec![task_id.clone()]
3437        );
3438        assert!(registry
3439            .drain_completions_for_session(Some("session"))
3440            .is_empty());
3441
3442        let paths = task_paths(dir.path(), "session", &task_id);
3443        let metadata = read_task(&paths.json).unwrap();
3444        assert!(metadata.completion_delivered);
3445
3446        let replayed = BgTaskRegistry::default();
3447        replayed
3448            .replay_session_inner(dir.path(), "session", None)
3449            .unwrap();
3450        assert!(replayed
3451            .drain_completions_for_session(Some("session"))
3452            .is_empty());
3453    }
3454
3455    #[test]
3456    fn register_watch_rejects_unknown_task() {
3457        let registry = BgTaskRegistry::default();
3458
3459        let result = registry.register_watch(
3460            "missing-task".to_string(),
3461            WatchPattern::Substring("READY".into()),
3462            true,
3463        );
3464
3465        assert_eq!(result, Err("task_not_found"));
3466    }
3467
3468    #[test]
3469    fn register_watch_on_terminal_task_scans_existing_output() {
3470        let frames = Arc::new(Mutex::new(Vec::new()));
3471        let captured = Arc::clone(&frames);
3472        let sender: crate::context::ProgressSender = Arc::new(Box::new(move |frame| {
3473            captured.lock().unwrap().push(frame);
3474        })
3475            as Box<dyn Fn(PushFrame) + Send + Sync>);
3476        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));
3477        let dir = tempfile::tempdir().unwrap();
3478        let task_id = registry
3479            .spawn(
3480                LONG_RUNNING_COMMAND,
3481                "session".to_string(),
3482                dir.path().to_path_buf(),
3483                HashMap::new(),
3484                Some(Duration::from_secs(30)),
3485                dir.path().to_path_buf(),
3486                10,
3487                true,
3488                false,
3489                Some(dir.path().to_path_buf()),
3490            )
3491            .unwrap();
3492        registry
3493            .inner
3494            .shutdown
3495            .store(true, std::sync::atomic::Ordering::SeqCst);
3496        let task = registry.task_for_session(&task_id, "session").unwrap();
3497        std::fs::write(&task.paths.stdout, "READY\n").unwrap();
3498        registry
3499            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3500            .unwrap();
3501        frames.lock().unwrap().clear();
3502        registry.inner.completions.lock().unwrap().clear();
3503
3504        registry
3505            .register_watch(
3506                task_id.clone(),
3507                WatchPattern::Substring("READY".into()),
3508                true,
3509            )
3510            .unwrap();
3511
3512        let frames = frames.lock().unwrap();
3513        let frame = frames
3514            .iter()
3515            .find_map(|frame| match frame {
3516                PushFrame::BashPatternMatch(frame) => Some(frame),
3517                _ => None,
3518            })
3519            .expect("terminal watch registration should emit pattern frame");
3520        assert_eq!(frame.reason, "pattern_match");
3521        assert_eq!(frame.task_id, task_id);
3522        assert_eq!(frame.session_id, "session");
3523        assert_eq!(frame.match_text, "READY");
3524        assert_eq!(frame.match_offset, 0);
3525        assert_eq!(registry.active_watch_count(&frame.task_id), 0);
3526        let metadata = read_task(&task.paths.json).unwrap();
3527        assert!(metadata.completion_delivered);
3528    }
3529
3530    #[test]
3531    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
3532        let registry = BgTaskRegistry::default();
3533        let dir = tempfile::tempdir().unwrap();
3534        let task_id = registry
3535            .spawn(
3536                QUICK_SUCCESS_COMMAND,
3537                "session".to_string(),
3538                dir.path().to_path_buf(),
3539                HashMap::new(),
3540                Some(Duration::from_secs(30)),
3541                dir.path().to_path_buf(),
3542                10,
3543                true,
3544                false,
3545                Some(dir.path().to_path_buf()),
3546            )
3547            .unwrap();
3548        registry
3549            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3550            .unwrap();
3551        let completions = registry.drain_completions_for_session(Some("session"));
3552        assert_eq!(completions.len(), 1);
3553        assert_eq!(
3554            registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
3555            vec![task_id.clone()]
3556        );
3557
3558        registry.cleanup_finished(Duration::ZERO);
3559
3560        assert!(registry.inner.tasks.lock().unwrap().is_empty());
3561    }
3562
3563    #[test]
3564    fn cleanup_finished_retains_undelivered_terminals() {
3565        let registry = BgTaskRegistry::default();
3566        let dir = tempfile::tempdir().unwrap();
3567        let task_id = registry
3568            .spawn(
3569                QUICK_SUCCESS_COMMAND,
3570                "session".to_string(),
3571                dir.path().to_path_buf(),
3572                HashMap::new(),
3573                Some(Duration::from_secs(30)),
3574                dir.path().to_path_buf(),
3575                10,
3576                true,
3577                false,
3578                Some(dir.path().to_path_buf()),
3579            )
3580            .unwrap();
3581        registry
3582            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3583            .unwrap();
3584
3585        registry.cleanup_finished(Duration::ZERO);
3586
3587        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
3588    }
3589
3590    /// Verify that the live watchdog path (reap_child) gives an exited
3591    /// child one watchdog pass for its exit marker to land, then marks the
3592    /// task Failed if the next pass still sees no marker.
3593    ///
3594    /// Cross-platform: uses a quick-exiting command that does NOT go
3595    /// through the wrapper script (we manually clear the exit marker
3596    /// after spawn to simulate the wrapper crashing before write).
3597    #[test]
3598    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
3599        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
3600        let dir = tempfile::tempdir().unwrap();
3601        let task_id = registry
3602            .spawn(
3603                QUICK_SUCCESS_COMMAND,
3604                "session".to_string(),
3605                dir.path().to_path_buf(),
3606                HashMap::new(),
3607                Some(Duration::from_secs(30)),
3608                dir.path().to_path_buf(),
3609                10,
3610                true,
3611                false,
3612                Some(dir.path().to_path_buf()),
3613            )
3614            .unwrap();
3615
3616        let task = registry.task_for_session(&task_id, "session").unwrap();
3617
3618        // Wait for the child to actually exit and the wrapper to either
3619        // write the marker or fail. Then nuke the marker to simulate
3620        // wrapper crash before write. Poll up to 5s; this is plenty for a
3621        // `true`/`cmd /c exit 0` invocation.
3622        let started = Instant::now();
3623        loop {
3624            let exited = {
3625                let mut state = task.state.lock().unwrap();
3626                match &mut state.runtime {
3627                    TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
3628                    _ => true,
3629                }
3630            };
3631            if exited {
3632                break;
3633            }
3634            assert!(
3635                started.elapsed() < Duration::from_secs(5),
3636                "child should exit quickly"
3637            );
3638            std::thread::sleep(Duration::from_millis(20));
3639        }
3640
3641        // Stop the watchdog so it doesn't race with our manual reap_child.
3642        // On fast Windows runners the watchdog ticks (every 500ms) can
3643        // observe the child exit and reap it before this test's assertion
3644        // fires, leaving us with state.child = None and an already-terminal
3645        // status. We specifically want to test reap_child's logic when
3646        // invoked manually on a Running-but-actually-dead task, so we need
3647        // exclusive control over the reap path here.
3648        registry
3649            .inner
3650            .shutdown
3651            .store(true, std::sync::atomic::Ordering::SeqCst);
3652        // Give the watchdog at most one tick (500ms) to notice shutdown
3653        // before we touch task state. Without this, an in-flight watchdog
3654        // iteration could still race with our state setup below.
3655        std::thread::sleep(Duration::from_millis(550));
3656
3657        // Wrapper likely wrote the marker by now; remove it to simulate
3658        // a wrapper crash that exited before persisting the exit code.
3659        let _ = std::fs::remove_file(&task.paths.exit);
3660
3661        // The watchdog may have already reaped the child handle and
3662        // marked the task terminal before we got here. Reset both so
3663        // reap_child has the "Running task whose child just exited"
3664        // shape it's designed to handle. If the original child handle is
3665        // gone, install a quick-exited stand-in so the first reap exercises
3666        // the same try_wait path as production.
3667        //
3668        // CRITICAL on Windows: the watchdog ticks fast enough that the
3669        // JSON on disk may already say `Completed`. `update_task` (called
3670        // by `reap_child`) reads from disk, applies the closure, but
3671        // ROLLS BACK if the original on-disk state was already terminal
3672        // (see persistence.rs::update_task). So we must reset BOTH
3673        // in-memory metadata AND the JSON on disk to a Running state to
3674        // give reap_child the fresh shape it expects to operate on.
3675        {
3676            let mut state = task.state.lock().unwrap();
3677            state.metadata.status = BgTaskStatus::Running;
3678            state.metadata.status_reason = None;
3679            state.metadata.exit_code = None;
3680            state.metadata.finished_at = None;
3681            state.metadata.duration_ms = None;
3682            // Persist the reset state to disk so update_task's terminal
3683            // rollback guard sees a non-terminal starting point.
3684            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
3685                .expect("persist reset Running metadata for reap_child test");
3686            // If the watchdog already nulled state.child, we need to
3687            // simulate "child exists and is dead" so reap_child's
3688            // try_wait path runs. Spawn a quick-exit child as a stand-in.
3689            if matches!(state.runtime, TaskRuntime::Piped(None)) {
3690                state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
3691            }
3692        }
3693        // Clear the terminal_at marker too so mark_terminal_now() can fire
3694        // again inside reap_child.
3695        *task.terminal_at.lock().unwrap() = None;
3696
3697        // Sanity: task is still Running per metadata (replay/poll hasn't
3698        // observed the missing marker yet).
3699        assert!(
3700            task.is_running(),
3701            "precondition: metadata.status == Running"
3702        );
3703        assert!(
3704            !task.paths.exit.exists(),
3705            "precondition: exit marker absent"
3706        );
3707
3708        // First watchdog observation is intentionally insufficient to
3709        // declare failure. A missing marker may just mean the wrapper is
3710        // still completing its tmp-file-to-marker rename, so reap_child only
3711        // drops the child handle and switches to detached PID monitoring.
3712        registry.reap_child(&task);
3713
3714        {
3715            let state = task.state.lock().unwrap();
3716            assert_eq!(
3717                state.metadata.status,
3718                BgTaskStatus::Running,
3719                "first reap must leave status Running while waiting one pass for marker"
3720            );
3721            assert_eq!(
3722                state.metadata.status_reason, None,
3723                "first reap must not record a failure reason"
3724            );
3725            assert!(
3726                matches!(state.runtime, TaskRuntime::Piped(None)),
3727                "child handle must be released after first reap"
3728            );
3729            assert!(
3730                state.detached,
3731                "task must be marked detached after first reap"
3732            );
3733        }
3734
3735        // Second watchdog observation sees the detached PID is dead and the
3736        // marker is still absent. That is strong enough evidence that the
3737        // wrapper exited without persisting an exit code.
3738        registry.reap_child(&task);
3739
3740        let state = task.state.lock().unwrap();
3741        assert!(
3742            state.metadata.status.is_terminal(),
3743            "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
3744            state.metadata.status
3745        );
3746        assert_eq!(
3747            state.metadata.status,
3748            BgTaskStatus::Failed,
3749            "must specifically be Failed (not Killed): status={:?}",
3750            state.metadata.status
3751        );
3752        assert_eq!(
3753            state.metadata.status_reason.as_deref(),
3754            Some("process exited without exit marker"),
3755            "reason must match replay path's wording: {:?}",
3756            state.metadata.status_reason
3757        );
3758        assert!(
3759            matches!(state.runtime, TaskRuntime::Piped(None)),
3760            "child handle must stay released after second reap"
3761        );
3762        assert!(
3763            state.detached,
3764            "task must remain detached after second reap"
3765        );
3766    }
3767
3768    /// Companion to the above: when the exit marker DOES exist on disk
3769    /// at reap_child time, reap_child must NOT mark the task Failed.
3770    /// Instead it leaves status=Running and lets the next poll_task()
3771    /// cycle finalize via the marker.
3772    #[test]
3773    fn reap_child_preserves_running_when_exit_marker_exists() {
3774        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
3775        let dir = tempfile::tempdir().unwrap();
3776        let task_id = registry
3777            .spawn(
3778                QUICK_SUCCESS_COMMAND,
3779                "session".to_string(),
3780                dir.path().to_path_buf(),
3781                HashMap::new(),
3782                Some(Duration::from_secs(30)),
3783                dir.path().to_path_buf(),
3784                10,
3785                true,
3786                false,
3787                Some(dir.path().to_path_buf()),
3788            )
3789            .unwrap();
3790
3791        let task = registry.task_for_session(&task_id, "session").unwrap();
3792
3793        // Wait for child to exit AND for the marker to land. Both happen
3794        // shortly after the wrapper finishes — but we want both observed.
3795        let started = Instant::now();
3796        loop {
3797            let exited = {
3798                let mut state = task.state.lock().unwrap();
3799                match &mut state.runtime {
3800                    TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
3801                    _ => true,
3802                }
3803            };
3804            if exited && task.paths.exit.exists() {
3805                break;
3806            }
3807            assert!(
3808                started.elapsed() < Duration::from_secs(5),
3809                "child should exit and write marker quickly"
3810            );
3811            std::thread::sleep(Duration::from_millis(20));
3812        }
3813
3814        // Stop the watchdog so it doesn't race with our manual reap_child.
3815        // On fast Windows runners the watchdog can call poll_task (which
3816        // finalizes via marker) before this test asserts the
3817        // "marker exists, status still Running" invariant. We want
3818        // exclusive control over the reap path.
3819        registry
3820            .inner
3821            .shutdown
3822            .store(true, std::sync::atomic::Ordering::SeqCst);
3823        std::thread::sleep(Duration::from_millis(550));
3824
3825        // If the watchdog already finalized the task before we stopped it,
3826        // restore the test setup: reset status to Running and ensure the
3827        // marker file is still on disk. We're testing reap_child's
3828        // behavior when called manually with both child-exited AND
3829        // marker-present, regardless of whether the watchdog beat us.
3830        {
3831            let mut state = task.state.lock().unwrap();
3832            state.metadata.status = BgTaskStatus::Running;
3833            state.metadata.status_reason = None;
3834            if matches!(state.runtime, TaskRuntime::Piped(None)) {
3835                state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
3836            }
3837        }
3838        *task.terminal_at.lock().unwrap() = None;
3839        // Make sure the marker is still on disk (poll_task removes it on
3840        // finalization). Recreate it if needed.
3841        if !task.paths.exit.exists() {
3842            std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
3843        }
3844
3845        // reap_child sees: child exited, marker exists. It should:
3846        //  - drop state.child / set state.detached = true
3847        //  - NOT change status (poll_task will finalize via marker next tick)
3848        registry.reap_child(&task);
3849
3850        let state = task.state.lock().unwrap();
3851        assert!(
3852            matches!(state.runtime, TaskRuntime::Piped(None)),
3853            "child handle still released even when marker exists"
3854        );
3855        assert!(
3856            state.detached,
3857            "task still marked detached even when marker exists"
3858        );
3859        // Status remains Running because reap_child defers to poll_task
3860        // when a marker exists. It would be wrong for reap to record the
3861        // marker outcome (poll_task does that with proper exit-code
3862        // parsing).
3863        assert_eq!(
3864            state.metadata.status,
3865            BgTaskStatus::Running,
3866            "reap_child must defer to poll_task when marker exists"
3867        );
3868    }
3869
3870    /// Read a process's `ps` state string ("Z", "S", "R", etc). Returns
3871    /// `None` once the PID has been fully reaped (no row), which is the
3872    /// post-reap state we want.
3873    #[cfg(unix)]
3874    fn pid_stat(pid: u32) -> Option<String> {
3875        let output = std::process::Command::new("ps")
3876            .args(["-o", "stat=", "-p", &pid.to_string()])
3877            .output()
3878            .ok()?;
3879        if !output.status.success() {
3880            return None;
3881        }
3882        let stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
3883        if stat.is_empty() {
3884            None
3885        } else {
3886            Some(stat)
3887        }
3888    }
3889
3890    /// A `<defunct>` zombie carries `ps` state starting with 'Z'.
3891    #[cfg(unix)]
3892    fn is_zombie(pid: u32) -> bool {
3893        pid_stat(pid).is_some_and(|stat| stat.starts_with('Z'))
3894    }
3895
3896    /// Spawn a child that exits immediately and wait — via `ps`, NOT
3897    /// `try_wait()`/`wait()` — until it is observably a `<defunct>` zombie,
3898    /// then return the still-unreaped handle. This reproduces the exact
3899    /// state issue #91 leaves behind: an exited OS child whose parent has
3900    /// not reaped it.
3901    #[cfg(unix)]
3902    fn spawn_unreaped_zombie() -> std::process::Child {
3903        let child = std::process::Command::new("true")
3904            .stdin(std::process::Stdio::null())
3905            .stdout(std::process::Stdio::null())
3906            .stderr(std::process::Stdio::null())
3907            .spawn()
3908            .expect("spawn zombie stand-in");
3909        let pid = child.id();
3910        let started = Instant::now();
3911        while !is_zombie(pid) {
3912            assert!(
3913                started.elapsed() < Duration::from_secs(5),
3914                "stand-in child should become a zombie within 5s"
3915            );
3916            std::thread::sleep(Duration::from_millis(10));
3917        }
3918        // Return WITHOUT reaping — the handle still owns an unwaited zombie.
3919        child
3920    }
3921
3922    /// Regression test for issue #91: the exit-marker terminal path
3923    /// (`poll_task` -> `finalize_from_marker`) must REAP the direct child
3924    /// handle, not merely drop it. Dropping a `std::process::Child` does not
3925    /// `wait()` on Unix, so the exited child lingers as a `[mv] <defunct>`
3926    /// zombie until AFT exits.
3927    ///
3928    /// We install a known-unreaped zombie into the task's child slot and
3929    /// drive the marker finalize path, then assert the child is gone (reaped)
3930    /// rather than still `<defunct>`.
3931    #[cfg(unix)]
3932    #[test]
3933    fn finalize_from_marker_reaps_child_no_zombie() {
3934        use std::sync::atomic::Ordering;
3935
3936        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
3937        let dir = tempfile::tempdir().unwrap();
3938        let task_id = registry
3939            .spawn(
3940                QUICK_SUCCESS_COMMAND,
3941                "session".to_string(),
3942                dir.path().to_path_buf(),
3943                HashMap::new(),
3944                Some(Duration::from_secs(30)),
3945                dir.path().to_path_buf(),
3946                10,
3947                true,
3948                false,
3949                Some(dir.path().to_path_buf()),
3950            )
3951            .unwrap();
3952
3953        // Stop the watchdog so the ONLY terminal-transition path under test
3954        // is the exit-marker finalize (not reap_child's try_wait, which would
3955        // reap the child for us and mask the bug).
3956        registry.inner.shutdown.store(true, Ordering::SeqCst);
3957        std::thread::sleep(Duration::from_millis(550));
3958
3959        let task = registry.task_for_session(&task_id, "session").unwrap();
3960
3961        // Wait for the wrapper's exit marker to land. We deliberately do NOT
3962        // call try_wait()/wait() on the real child here — doing so would reap
3963        // it and defeat the test.
3964        let started = Instant::now();
3965        while !task.paths.exit.exists() {
3966            assert!(
3967                started.elapsed() < Duration::from_secs(5),
3968                "exit marker should land quickly for `true`"
3969            );
3970            std::thread::sleep(Duration::from_millis(20));
3971        }
3972
3973        // Reset to a fresh Running shape and install a guaranteed-unreaped
3974        // zombie as the child handle, so the finalize path's reap behavior is
3975        // exercised deterministically regardless of how the real child was
3976        // handled. Persist Running so update_task's terminal-rollback guard
3977        // sees a non-terminal starting point.
3978        let zombie_pid;
3979        {
3980            let mut state = task.state.lock().unwrap();
3981            state.metadata.status = BgTaskStatus::Running;
3982            state.metadata.status_reason = None;
3983            state.metadata.exit_code = None;
3984            state.metadata.finished_at = None;
3985            state.metadata.duration_ms = None;
3986            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
3987                .expect("persist reset Running metadata");
3988            let zombie = spawn_unreaped_zombie();
3989            zombie_pid = zombie.id();
3990            state.runtime = TaskRuntime::Piped(Some(zombie));
3991        }
3992        *task.terminal_at.lock().unwrap() = None;
3993
3994        // Precondition: the installed child is genuinely a `<defunct>` zombie.
3995        assert!(
3996            is_zombie(zombie_pid),
3997            "precondition: stand-in child {zombie_pid} must be a zombie before finalize"
3998        );
3999
4000        // Drive the exit-marker terminal path. Before the fix this nulled the
4001        // Child handle without wait(), leaving the zombie behind.
4002        registry.poll_task(&task).unwrap();
4003
4004        {
4005            let state = task.state.lock().unwrap();
4006            assert!(
4007                matches!(state.runtime, TaskRuntime::Piped(None)),
4008                "child handle must be released after marker finalize"
4009            );
4010            assert!(
4011                state.metadata.status.is_terminal(),
4012                "task must be terminal after marker finalize: {:?}",
4013                state.metadata.status
4014            );
4015        }
4016
4017        // The core assertion: the child must have been REAPED, not just
4018        // dropped. A reaped PID has no `ps` row (or at minimum is not 'Z').
4019        assert!(
4020            !is_zombie(zombie_pid),
4021            "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
4022             after the exit-marker terminal transition"
4023        );
4024    }
4025
4026    /// Companion to the above for the kill path: when a kill observes an
4027    /// already-present exit marker (the child finished on its own first), it
4028    /// must reap the child handle rather than dropping it.
4029    #[cfg(unix)]
4030    #[test]
4031    fn kill_with_existing_marker_reaps_child_no_zombie() {
4032        use std::sync::atomic::Ordering;
4033
4034        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4035        let dir = tempfile::tempdir().unwrap();
4036        let task_id = registry
4037            .spawn(
4038                QUICK_SUCCESS_COMMAND,
4039                "session".to_string(),
4040                dir.path().to_path_buf(),
4041                HashMap::new(),
4042                Some(Duration::from_secs(30)),
4043                dir.path().to_path_buf(),
4044                10,
4045                true,
4046                false,
4047                Some(dir.path().to_path_buf()),
4048            )
4049            .unwrap();
4050
4051        registry.inner.shutdown.store(true, Ordering::SeqCst);
4052        std::thread::sleep(Duration::from_millis(550));
4053
4054        let task = registry.task_for_session(&task_id, "session").unwrap();
4055
4056        let started = Instant::now();
4057        while !task.paths.exit.exists() {
4058            assert!(
4059                started.elapsed() < Duration::from_secs(5),
4060                "exit marker should land quickly for `true`"
4061            );
4062            std::thread::sleep(Duration::from_millis(20));
4063        }
4064
4065        let zombie_pid;
4066        {
4067            let mut state = task.state.lock().unwrap();
4068            state.metadata.status = BgTaskStatus::Running;
4069            state.metadata.status_reason = None;
4070            state.metadata.exit_code = None;
4071            state.metadata.finished_at = None;
4072            state.metadata.duration_ms = None;
4073            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
4074                .expect("persist reset Running metadata");
4075            let zombie = spawn_unreaped_zombie();
4076            zombie_pid = zombie.id();
4077            state.runtime = TaskRuntime::Piped(Some(zombie));
4078        }
4079        *task.terminal_at.lock().unwrap() = None;
4080
4081        assert!(
4082            is_zombie(zombie_pid),
4083            "precondition: stand-in child {zombie_pid} must be a zombie before kill"
4084        );
4085
4086        // Kill observes the existing marker and finalizes from it.
4087        registry
4088            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4089            .expect("kill should succeed");
4090
4091        {
4092            let state = task.state.lock().unwrap();
4093            assert!(
4094                matches!(state.runtime, TaskRuntime::Piped(None)),
4095                "child handle must be released after marker-aware kill"
4096            );
4097            assert!(state.metadata.status.is_terminal());
4098        }
4099
4100        assert!(
4101            !is_zombie(zombie_pid),
4102            "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
4103             after a marker-aware kill"
4104        );
4105    }
4106
4107    #[test]
4108    fn cleanup_finished_keeps_running_tasks() {
4109        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4110        let dir = tempfile::tempdir().unwrap();
4111        let task_id = registry
4112            .spawn(
4113                LONG_RUNNING_COMMAND,
4114                "session".to_string(),
4115                dir.path().to_path_buf(),
4116                HashMap::new(),
4117                Some(Duration::from_secs(30)),
4118                dir.path().to_path_buf(),
4119                10,
4120                true,
4121                false,
4122                Some(dir.path().to_path_buf()),
4123            )
4124            .unwrap();
4125
4126        registry.cleanup_finished(Duration::ZERO);
4127
4128        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
4129        let _ = registry.kill(&task_id, "session");
4130    }
4131
4132    #[cfg(windows)]
4133    fn wait_for_file(path: &Path) -> String {
4134        let started = Instant::now();
4135        loop {
4136            if path.exists() {
4137                return fs::read_to_string(path).expect("read file");
4138            }
4139            assert!(
4140                started.elapsed() < Duration::from_secs(30),
4141                "timed out waiting for {}",
4142                path.display()
4143            );
4144            std::thread::sleep(Duration::from_millis(100));
4145        }
4146    }
4147
4148    #[cfg(windows)]
4149    fn spawn_windows_registry_command(
4150        command: &str,
4151    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
4152        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4153        let dir = tempfile::tempdir().unwrap();
4154        let task_id = registry
4155            .spawn(
4156                command,
4157                "session".to_string(),
4158                dir.path().to_path_buf(),
4159                HashMap::new(),
4160                Some(Duration::from_secs(30)),
4161                dir.path().to_path_buf(),
4162                10,
4163                false,
4164                false,
4165                Some(dir.path().to_path_buf()),
4166            )
4167            .unwrap();
4168        (registry, dir, task_id)
4169    }
4170
4171    #[cfg(windows)]
4172    #[test]
4173    fn windows_spawn_writes_exit_marker_for_zero_exit() {
4174        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
4175        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
4176
4177        let content = wait_for_file(&exit_path);
4178
4179        assert_eq!(content.trim(), "0");
4180    }
4181
4182    #[cfg(windows)]
4183    #[test]
4184    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
4185        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
4186        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
4187
4188        let content = wait_for_file(&exit_path);
4189
4190        assert_eq!(content.trim(), "42");
4191    }
4192
4193    #[cfg(windows)]
4194    #[test]
4195    fn windows_spawn_captures_stdout_to_disk() {
4196        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
4197        let task = registry.task_for_session(&task_id, "session").unwrap();
4198        let stdout_path = task.paths.stdout.clone();
4199        let exit_path = task.paths.exit.clone();
4200
4201        let _ = wait_for_file(&exit_path);
4202        let stdout = fs::read_to_string(stdout_path).expect("read stdout");
4203
4204        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
4205    }
4206
4207    #[cfg(windows)]
4208    #[test]
4209    fn windows_spawn_uses_pwsh_when_available() {
4210        // Without $SHELL set, $SHELL probe yields None and pwsh wins.
4211        // (We intentionally pass None for shell_env to keep this test
4212        // independent of the runner's actual env.)
4213        let candidates = crate::windows_shell::shell_candidates_with(
4214            |binary| match binary {
4215                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
4216                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
4217                _ => None,
4218            },
4219            || None,
4220        );
4221        let shell = candidates.first().expect("at least one candidate").clone();
4222        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
4223        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
4224    }
4225
4226    /// Issue #27 Oracle review P1, updated: cmd wrapper writes a `.bat` file
4227    /// that batch-evaluates `%ERRORLEVEL%` on its own line (line-by-line
4228    /// evaluation is the default for batch files; parse-time expansion only
4229    /// applies to compound `&`-chained inline commands). Capturing
4230    /// `%ERRORLEVEL%` into `set CODE=%ERRORLEVEL%` immediately after the user
4231    /// command runs records the real run-time exit code.
4232    #[cfg(windows)]
4233    #[test]
4234    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
4235        let exit_path = Path::new(r"C:\Temp\bash-test.exit");
4236        let script =
4237            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
4238
4239        // Batch wrapper: capture exit code into CODE on the line after the
4240        // user command, then write CODE to a temp marker file before
4241        // atomic-renaming it into place.
4242        assert!(
4243            script.contains("set CODE=%ERRORLEVEL%"),
4244            "wrapper must capture exit code into CODE: {script}"
4245        );
4246        assert!(
4247            script.contains("echo %CODE% >"),
4248            "wrapper must echo CODE to a temp marker file: {script}"
4249        );
4250        assert!(
4251            script.contains("move /Y"),
4252            "wrapper must use atomic move to write the marker: {script}"
4253        );
4254        // move output must be redirected to nul to avoid polluting the
4255        // user's captured stdout with "1 file(s) moved." lines.
4256        assert!(
4257            script.contains("> nul"),
4258            "wrapper must redirect move output to nul: {script}"
4259        );
4260        // exit /B %CODE% propagates the real exit code so wait() sees it.
4261        assert!(
4262            script.contains("exit /B %CODE%"),
4263            "wrapper must propagate the captured exit code: {script}"
4264        );
4265        assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
4266        assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
4267    }
4268
4269    /// `bg_command()` for Cmd no longer needs `/V:ON` — the wrapper is now
4270    /// written to a `.bat` file where batch-line evaluation captures
4271    /// `%ERRORLEVEL%` correctly without delayed expansion. We still need
4272    /// `/D` (skip AutoRun) and `/S` (simple quote-stripping for paths with
4273    /// internal `"`-quoting from `cmd_quote`).
4274    #[cfg(windows)]
4275    #[test]
4276    fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
4277        use crate::windows_shell::WindowsShell;
4278        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
4279        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
4280        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
4281        assert_eq!(
4282            args_strs,
4283            vec!["/D", "/S", "/C", "echo wrapped"],
4284            "Cmd::bg_command must prepend /D /S /C"
4285        );
4286    }
4287
4288    /// PowerShell variants don't need `/V:ON`-style flags; their args
4289    /// are the same for foreground (`command()`) and background
4290    /// (`bg_command()`).
4291    #[cfg(windows)]
4292    #[test]
4293    fn windows_shell_pwsh_bg_command_uses_standard_args() {
4294        use crate::windows_shell::WindowsShell;
4295        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
4296        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
4297        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
4298        assert!(
4299            args_strs.contains(&"-Command"),
4300            "Pwsh::bg_command must use -Command: {args_strs:?}"
4301        );
4302        assert!(
4303            args_strs.contains(&"Get-Date"),
4304            "Pwsh::bg_command must include the user command body"
4305        );
4306    }
4307
4308    /// Issue #27 Oracle review P1 + P2 test gap: end-to-end proof that the
4309    /// **cmd.exe-specific** wrapper path captures the user command's
4310    /// run-time exit code correctly. The existing
4311    /// `windows_spawn_writes_exit_marker_for_nonzero_exit` test would also
4312    /// pass with the buggy `%ERRORLEVEL%` wrapper if the Windows machine
4313    /// had pwsh.exe or powershell.exe on PATH (which is typical) — the
4314    /// outer wrapper would be PowerShell, not cmd, and PowerShell's
4315    /// `$LASTEXITCODE` captures the inner `cmd /c exit 42` correctly.
4316    ///
4317    /// This test directly spawns via `WindowsShell::Cmd.bg_command()` to
4318    /// force the cmd-wrapper code path, then writes the exit marker and
4319    /// asserts it contains "42" not "0". With the pre-fix `%ERRORLEVEL%`
4320    /// wrapper, this test would fail because `%ERRORLEVEL%` parse-time
4321    /// expansion would record cmd's startup ERRORLEVEL (typically 0)
4322    /// regardless of what the user command returned.
4323    /// **Disabled.** This test exercises `WindowsShell::Cmd.bg_command()` —
4324    /// the inline command-line wrapper helper that production code does
4325    /// NOT use anymore. v0.19.4 switched bg-bash to a file-based wrapper
4326    /// (`<task>.bat` / `<task>.ps1`) because the inline cmd-line quoting
4327    /// produced silent failures on Windows 11 (move /Y could not parse
4328    /// path arguments through cmd's /C parser). The `bg_command` helper
4329    /// is kept only for parity with `WindowsShell::Cmd.command()` shape;
4330    /// the production spawn path goes through `detached_shell_command_for`
4331    /// which writes the wrapper to disk and invokes `cmd /V:ON /D /C
4332    /// <bat-path>`.
4333    ///
4334    /// The `!ERRORLEVEL!` correctness this test was meant to verify is
4335    /// covered live by the Windows e2e harness scenario 2d
4336    /// (`bg bash records non-zero exit code (cmd /c exit 42)`), which
4337    /// exercises the real file-based wrapper end-to-end via the protocol.
4338    #[allow(dead_code)]
4339    #[cfg(any())] // disabled on all targets
4340    fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
4341}