Skip to main content

aft/bash_background/
registry.rs

1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::{Read, Write};
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::compress::caps::DropClass;
16use crate::compress::CompressionResult;
17use crate::context::SharedProgressSender;
18use crate::harness::Harness;
19use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
20
21#[cfg(unix)]
22use std::os::unix::process::CommandExt;
23#[cfg(windows)]
24use std::os::windows::process::CommandExt;
25
26use super::buffer::{combine_streams, BgBuffer, StreamKind, TokenCountInput};
27use super::output::{
28    cap_completion_output, cap_completion_output_with_marker, cap_final_output,
29    cap_final_output_with_marker, completion_preview_threshold, json_output_pointer, quote_path,
30    COMPRESS_INPUT_CAP_BYTES, COMPRESS_INPUT_HEAD_BYTES, COMPRESS_INPUT_TAIL_BYTES,
31    FINAL_OUTPUT_CAP_BYTES, RAW_PASSTHROUGH_CAP_BYTES, RAW_PASSTHROUGH_HEAD_BYTES,
32    RAW_PASSTHROUGH_TAIL_BYTES, RUNNING_OUTPUT_PREVIEW_BYTES, STRUCTURED_OUTPUT_CAP_BYTES,
33};
34use super::persistence::{
35    create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
36    task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
37    ExitMarker, PersistedTask, TaskPaths,
38};
39use super::process::is_process_alive;
40#[cfg(unix)]
41use super::process::terminate_pgid;
42#[cfg(windows)]
43use super::process::terminate_pid;
44use super::pty_process::spawn_pty_for_command;
45use super::pty_runtime::PtyRuntime;
46use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
47use super::{BgTaskInfo, BgTaskStatus};
/// Default timeout for background bash tasks: 30 minutes.
/// Agents can override per-call via the `timeout` parameter (in ms).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// 24 hours. NOTE(review): going by the name, the age after which a task still
/// recorded as running is treated as stale — confirm at the use site.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
/// 24 hours. NOTE(review): presumably the grace period before persisted task
/// bundles become eligible for garbage collection — confirm at the use site.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
/// 30 days. NOTE(review): presumably the grace period before quarantined
/// bundles become eligible for garbage collection — confirm at the use site.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// 128 KiB. NOTE(review): presumably the per-stream tokenization cap that the
/// `BgCompletion` token-count fields refer to — confirm at the use site.
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
56
/// Completion record for a background task, as stored in the registry's
/// `completions` queue and serialized for consumers.
///
/// Optional and boolean fields are skipped during serialization when they are
/// empty, `None`, or `false`, so absent keys mean "not set".
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    pub task_id: String,
    /// Intentionally omitted from serialized completion payloads: push frames
    /// carry `session_id` at the BashCompletedFrame envelope level for routing.
    #[serde(skip_serializing)]
    pub session_id: String,
    pub status: BgTaskStatus,
    pub exit_code: Option<i32>,
    pub command: String,
    /// Small head+tail preview of the cached terminal render at completion time,
    /// cached so push-frame consumers and `bash_drain_completions` callers see
    /// the same preview without racing against later output rotation. Empty
    /// when not captured (e.g., persisted task seen on startup before buffer
    /// reattachment).
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// True when the captured tail is shorter than the actual output (because
    /// rotation occurred or the output exceeds the preview cap). Plugins use
    /// this to render a `…` prefix and signal that `bash_status` would return
    /// more.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
    /// Token count for raw stdout+stderr before compression. Omitted when any
    /// stream exceeds the 128 KiB tokenization cap.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub original_tokens: Option<u32>,
    /// Token count for the compressed output generated from the same capped
    /// raw payload. Omitted when raw tokenization is skipped.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compressed_tokens: Option<u32>,
    /// True when a stream exceeded the tokenization cap and counts are absent.
    #[serde(default, skip_serializing_if = "is_false")]
    pub tokens_skipped: bool,
}
92
/// Serde `skip_serializing_if` predicate: returns `true` exactly when the
/// referenced flag is `false`, so unset booleans are left out of the payload.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
96
/// Serializable view of a background task.
///
/// The `BgTaskInfo` fields are flattened into the same object as the
/// snapshot's own fields when serialized.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    #[serde(flatten)]
    pub info: BgTaskInfo,
    pub exit_code: Option<i32>,
    pub child_pid: Option<u32>,
    pub workdir: String,
    pub output_preview: String,
    pub output_truncated: bool,
    pub output_path: Option<String>,
    pub stderr_path: Option<String>,
    // The PTY dimensions are left out of the payload entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_rows: Option<u16>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_cols: Option<u16>,
}
113
/// Tag recorded on a `TerminalOutputCache` describing how its preview was
/// produced.
// NOTE(review): variant meanings are inferred from their names (compressor
// output, raw passthrough, structured rendering) — confirm against the
// render helpers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TerminalOutputKind {
    Compressed,
    Raw,
    Structured,
}
120
/// Rendered output of a task that has reached a terminal status.
///
/// Stored in `BgTaskState::terminal_output_cache` so the rendering is computed
/// once and then reused.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TerminalOutputCache {
    output_preview: String,
    output_truncated: bool,
    kind: TerminalOutputKind,
    output_path: Option<String>,
    stderr_path: Option<String>,
    recovery: Option<RecoveryContext>,
}
130
/// Bookkeeping about what was dropped or truncated while rendering output.
// NOTE(review): field meanings below are inferred from their names; confirm
// against the code that builds and consumes this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
struct RecoveryContext {
    // Count of dropped items per `DropClass`.
    dropped_by_class: BTreeMap<DropClass, usize>,
    had_inner_drop: bool,
    offset_hint_eligible: bool,
    offset_start_line: Option<usize>,
    byte_truncated: bool,
    output_path: Option<String>,
    stderr_path: Option<String>,
    include_stderr_path: bool,
}
142
143impl RecoveryContext {
144    fn has_visible_drop(&self) -> bool {
145        self.byte_truncated || self.had_inner_drop || !self.dropped_by_class.is_empty()
146    }
147}
148
/// Handle to the background-task registry.
///
/// `Clone` is derived and the only field is an `Arc<RegistryInner>`, so every
/// clone refers to the same underlying registry state.
#[derive(Clone)]
pub struct BgTaskRegistry {
    pub(crate) inner: Arc<RegistryInner>,
}
153
/// Shared state behind every `BgTaskRegistry` handle.
pub(crate) struct RegistryInner {
    /// Registered tasks, keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Queue of completion records.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    pub(crate) progress_sender: SharedProgressSender,
    // NOTE(review): presumably guards against starting the watchdog twice —
    // confirm in `start_watchdog`.
    watchdog_started: AtomicBool,
    pub(crate) shutdown: AtomicBool,
    /// Long-running reminder switch; initialised to `true` by `new` and set by
    /// `configure_long_running_reminders`.
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Long-running reminder interval in milliseconds; initialised to
    /// `600_000` by `new` and set by `configure_long_running_reminders`.
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Swapped to `true` by session replay so the persisted-task GC is
    /// attempted only by the first replay.
    persisted_gc_started: AtomicBool,
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Output compression callback. Set by `AppContext` after construction.
    /// Takes (command, raw_output, exit_code) and returns compressed text. Called from
    /// the watchdog thread when a task reaches a terminal state and from
    /// `bash_status`/`list` snapshot reads. When `None`, output is returned
    /// uncompressed.
    pub(crate) compressor:
        Mutex<Option<Box<dyn Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync>>>,
    /// Optional database connection used for the best-effort DB mirror of task
    /// metadata; `None` disables the mirror.
    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
    /// Harness name used when building DB rows; the mirror is skipped with a
    /// warning while this is `None`.
    pub(crate) db_harness: RwLock<Option<String>>,
    /// Sending half of a capacity-1 wake channel; a clone is handed to each
    /// PTY spawn.
    pub(crate) wake_tx: crossbeam_channel::Sender<()>,
    /// Receiving half of the wake channel.
    pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
    pub(crate) watch_registry: Mutex<WatchRegistry>,
}
178
/// One registered background task: immutable identity plus lock-protected
/// mutable state.
pub(crate) struct BgTask {
    pub(crate) task_id: String,
    pub(crate) session_id: String,
    /// On-disk locations belonging to this task.
    pub(crate) paths: TaskPaths,
    /// Monotonic instant captured when the task object was created.
    pub(crate) started: Instant,
    // NOTE(review): presumably the time of the last long-running reminder —
    // confirm in the watchdog.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    // NOTE(review): presumably set when the task reaches a terminal status —
    // confirm at the write site.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    pub(crate) state: Mutex<BgTaskState>,
}
188
/// Live process handle of a task, by spawn mode.
///
/// `write_pty` treats `Pty(None)` as an exited task.
// NOTE(review): presumably the inner `Option` becomes `None` once the handle
// has been taken or was never attached — confirm.
pub(crate) enum TaskRuntime {
    /// Task started as a regular child process.
    Piped(Option<Child>),
    /// Task started under a pseudo-terminal.
    Pty(Option<PtyRuntime>),
}
193
/// Mutable per-task state, guarded by `BgTask::state`.
pub(crate) struct BgTaskState {
    /// Task record as persisted to disk (and mirrored to the DB when configured).
    pub(crate) metadata: PersistedTask,
    pub(crate) runtime: TaskRuntime,
    pub(crate) detached: bool,
    /// True once `reap_child` has observed the direct child handle's exit
    /// via `try_wait()`. Used by the two-pass watchdog to skip the racy
    /// `is_process_alive(child_pid)` probe on the second pass — we already
    /// have authoritative evidence that the child is dead, no need to
    /// re-verify via PID liveness which is unreliable on Windows where
    /// PIDs can be recycled within seconds.
    ///
    /// Remains `false` on replay-restored tasks (those have a `child_pid`
    /// but never observed exit via this process's `try_wait()`), so those
    /// continue to fall through to the `is_process_alive` probe path.
    pub(crate) child_exit_observed: bool,
    /// Reader over the task's capture file(s).
    pub(crate) buffer: BgBuffer,
    /// Rendered terminal output, filled in at most once by
    /// `ensure_terminal_output_cache`.
    terminal_output_cache: Option<TerminalOutputCache>,
    /// PTY-only: set for timeout kill intent before signaling the child.
    pub(crate) pending_terminal_override: Option<BgTaskStatus>,
}
214
215fn completion_matches_session(completion: &BgCompletion, session_id: Option<&str>) -> bool {
216    session_id
217        .map(|session_id| completion.session_id == session_id)
218        .unwrap_or(true)
219}
220
221impl BgTaskRegistry {
222    pub fn new(progress_sender: SharedProgressSender) -> Self {
223        let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
224        Self {
225            inner: Arc::new(RegistryInner {
226                tasks: Mutex::new(HashMap::new()),
227                completions: Mutex::new(VecDeque::new()),
228                progress_sender,
229                watchdog_started: AtomicBool::new(false),
230                shutdown: AtomicBool::new(false),
231                long_running_reminder_enabled: AtomicBool::new(true),
232                long_running_reminder_interval_ms: AtomicU64::new(600_000),
233                persisted_gc_started: AtomicBool::new(false),
234                #[cfg(test)]
235                persisted_gc_runs: AtomicU64::new(0),
236                compressor: Mutex::new(None),
237                db_pool: RwLock::new(None),
238                db_harness: RwLock::new(None),
239                wake_tx,
240                wake_rx,
241                watch_registry: Mutex::new(WatchRegistry::default()),
242            }),
243        }
244    }
245
246    pub fn set_harness(&self, harness: Harness) {
247        if let Ok(mut slot) = self.inner.db_harness.write() {
248            *slot = Some(harness.as_str().to_string());
249        }
250    }
251
252    pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
253        if let Ok(mut slot) = self.inner.db_pool.write() {
254            *slot = Some(conn);
255        }
256    }
257
258    pub fn clear_db_pool(&self) {
259        if let Ok(mut slot) = self.inner.db_pool.write() {
260            *slot = None;
261        }
262    }
263
264    /// Install the output-compression callback. Called by `main.rs` after
265    /// `AppContext` is constructed so that snapshot/completion paths can
266    /// invoke `compress::compress_with_registry` without holding a context
267    /// reference. When called multiple times, the latest installation wins.
268    pub fn set_compressor<F>(&self, compressor: F)
269    where
270        F: Fn(&str, String) -> CompressionResult + Send + Sync + 'static,
271    {
272        self.set_compressor_with_exit_code(move |command, output, _exit_code| {
273            compressor(command, output)
274        });
275    }
276
277    pub fn set_compressor_with_exit_code<F>(&self, compressor: F)
278    where
279        F: Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync + 'static,
280    {
281        if let Ok(mut slot) = self.inner.compressor.lock() {
282            *slot = Some(Box::new(compressor));
283        }
284    }
285
286    /// Apply the installed compressor (if any) to `output`. Returns `output`
287    /// untouched when no compressor is installed.
288    pub(crate) fn compress_output(
289        &self,
290        command: &str,
291        output: String,
292        exit_code: Option<i32>,
293    ) -> CompressionResult {
294        let Ok(slot) = self.inner.compressor.lock() else {
295            return CompressionResult::new(output);
296        };
297        match slot.as_ref() {
298            Some(compressor) => compressor(command, output, exit_code),
299            None => CompressionResult::new(output),
300        }
301    }
302
    /// Returns the cached terminal rendering for `task`, computing and storing
    /// it on first use.
    ///
    /// Returns `None` when the state lock is poisoned, the task has not reached
    /// a terminal status, or the task runs in PTY mode.
    fn ensure_terminal_output_cache(&self, task: &Arc<BgTask>) -> Option<TerminalOutputCache> {
        // First critical section: bail out or reuse an existing cache; otherwise
        // copy what rendering needs so the lock is not held while rendering.
        let (metadata, buffer) = {
            let state = task.state.lock().ok()?;
            if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
                return None;
            }
            if let Some(cache) = state.terminal_output_cache.clone() {
                return Some(cache);
            }
            (state.metadata.clone(), state.buffer.clone())
        };

        // NOTE(review): `cap_buffer` is capped and then dropped unused; rendering
        // below reads `buffer`. Presumably `enforce_terminal_cap` acts on backing
        // storage shared by both clones — confirm.
        let mut cap_buffer = buffer.clone();
        cap_buffer.enforce_terminal_cap();
        let cache = self.render_terminal_output(&metadata, &buffer);
        // Second critical section: re-check, because the state was unlocked while
        // rendering and another caller may have stored a cache in the meantime.
        let mut state = task.state.lock().ok()?;
        if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
            return None;
        }
        if let Some(existing) = state.terminal_output_cache.clone() {
            return Some(existing);
        }
        state.terminal_output_cache = Some(cache.clone());
        Some(cache)
    }
328
329    fn render_terminal_output(
330        &self,
331        metadata: &PersistedTask,
332        buffer: &BgBuffer,
333    ) -> TerminalOutputCache {
334        if metadata.mode == BgMode::Pty {
335            return TerminalOutputCache {
336                output_preview: String::new(),
337                output_truncated: false,
338                kind: TerminalOutputKind::Raw,
339                output_path: buffer.output_path().map(|path| path.display().to_string()),
340                stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
341                recovery: None,
342            };
343        }
344
345        if let Some(structured) = render_structured_output(&metadata.command, buffer) {
346            return structured;
347        }
348
349        if !metadata.compressed {
350            return render_raw_passthrough(buffer);
351        }
352
353        let raw = buffer.read_combined_head_tail(
354            COMPRESS_INPUT_CAP_BYTES,
355            COMPRESS_INPUT_HEAD_BYTES,
356            COMPRESS_INPUT_TAIL_BYTES,
357        );
358        let compressed = self.compress_output(&metadata.command, raw.text, metadata.exit_code);
359        render_compressed_with_recovery(buffer, compressed, raw.truncated)
360    }
361
362    fn snapshot_with_terminal_cache(
363        &self,
364        task: &Arc<BgTask>,
365        preview_bytes: usize,
366    ) -> BgTaskSnapshot {
367        let mut snapshot = task.snapshot(preview_bytes);
368        self.maybe_compress_snapshot(task, &mut snapshot);
369        snapshot
370    }
371
    /// Follow-up work once a task is terminal: ensures the terminal output
    /// cache exists and enqueues the completion record.
    ///
    /// Does nothing (and returns `Ok`) if the task is not terminal. `emit_frame`
    /// is forwarded unchanged to `enqueue_completion_from_parts`.
    ///
    /// # Errors
    /// Returns an error string when the task state lock is poisoned.
    fn post_terminal_transition(&self, task: &Arc<BgTask>, emit_frame: bool) -> Result<(), String> {
        // Copy metadata and buffer out so the state lock is released before the
        // rendering and enqueue work below.
        let (metadata, buffer) = {
            let state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if !state.metadata.status.is_terminal() {
                return Ok(());
            }
            (state.metadata.clone(), state.buffer.clone())
        };

        // NOTE(review): `cap_buffer` is capped and then dropped unused; the
        // uncapped `buffer` is what gets passed on. Presumably
        // `enforce_terminal_cap` acts on shared backing storage — confirm.
        let mut cap_buffer = buffer.clone();
        cap_buffer.enforce_terminal_cap();
        // `cache` is `None` for PTY tasks and on lock poisoning.
        let cache = self.ensure_terminal_output_cache(task);
        self.enqueue_completion_from_parts(
            &metadata,
            Some(&buffer),
            None,
            emit_frame,
            cache.as_ref(),
        );
        Ok(())
    }
396
397    fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
398        write_task(&paths.json, metadata)?;
399        self.dual_write_task(paths, metadata);
400        Ok(())
401    }
402
403    fn update_task_metadata<F>(
404        &self,
405        paths: &TaskPaths,
406        update: F,
407    ) -> std::io::Result<PersistedTask>
408    where
409        F: FnOnce(&mut PersistedTask),
410    {
411        let metadata = update_task(&paths.json, update)?;
412        self.dual_write_task(paths, &metadata);
413        Ok(metadata)
414    }
415
416    fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
417        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
418        let Some(pool) = pool else {
419            return;
420        };
421        let harness = self
422            .inner
423            .db_harness
424            .read()
425            .ok()
426            .and_then(|slot| slot.clone());
427        let Some(harness) = harness else {
428            crate::slog_warn!(
429                "dual-write bash_task to DB skipped for {}: harness not configured",
430                metadata.task_id
431            );
432            return;
433        };
434        let row = match metadata.to_bash_task_row(&harness, paths) {
435            Ok(row) => row,
436            Err(error) => {
437                crate::slog_warn!(
438                    "dual-write bash_task to DB failed for {}: {}",
439                    metadata.task_id,
440                    error
441                );
442                return;
443            }
444        };
445        let conn = match pool.lock() {
446            Ok(conn) => conn,
447            Err(_) => {
448                crate::slog_warn!(
449                    "dual-write bash_task to DB failed for {}: db mutex poisoned",
450                    metadata.task_id
451                );
452                return;
453            }
454        };
455        if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
456            crate::slog_warn!(
457                "dual-write bash_task to DB failed for {}: {}",
458                metadata.task_id,
459                error
460            );
461        }
462    }
463
464    pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
465        self.inner
466            .long_running_reminder_enabled
467            .store(enabled, Ordering::SeqCst);
468        self.inner
469            .long_running_reminder_interval_ms
470            .store(interval_ms, Ordering::SeqCst);
471    }
472
473    #[cfg(unix)]
474    #[allow(clippy::too_many_arguments)]
475    pub fn spawn(
476        &self,
477        command: &str,
478        session_id: String,
479        workdir: PathBuf,
480        env: HashMap<String, String>,
481        timeout: Option<Duration>,
482        storage_dir: PathBuf,
483        max_running: usize,
484        notify_on_completion: bool,
485        compressed: bool,
486        project_root: Option<PathBuf>,
487    ) -> Result<String, String> {
488        self.start_watchdog();
489
490        let running = self.running_count();
491        if running >= max_running {
492            return Err(format!(
493                "background bash task limit exceeded: {running} running (max {max_running})"
494            ));
495        }
496
497        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
498        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
499        let task_id = self.generate_unique_task_id()?;
500        let paths = task_paths(&storage_dir, &session_id, &task_id);
501        fs::create_dir_all(&paths.dir)
502            .map_err(|e| format!("failed to create background task dir: {e}"))?;
503
504        let mut metadata = PersistedTask::starting(
505            task_id.clone(),
506            session_id.clone(),
507            command.to_string(),
508            workdir.clone(),
509            project_root,
510            timeout_ms,
511            notify_on_completion,
512            compressed,
513        );
514        self.persist_task(&paths, &metadata)
515            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
516
517        // Pre-create capture files so the watchdog/buffer can always
518        // open them for reading. The spawn helper opens its own handles
519        // per attempt because each `Command::spawn()` consumes them.
520        create_capture_file(&paths.stdout)
521            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
522        create_capture_file(&paths.stderr)
523            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
524
525        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
526            Ok(child) => child,
527            Err(error) => {
528                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
529                let _ = delete_task_bundle(&paths);
530                return Err(error);
531            }
532        };
533
534        let child_pid = child.id();
535        metadata.mark_running(child_pid, child_pid as i32);
536        self.persist_task(&paths, &metadata)
537            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
538
539        let task = Arc::new(BgTask {
540            task_id: task_id.clone(),
541            session_id,
542            paths: paths.clone(),
543            started: Instant::now(),
544            last_reminder_at: Mutex::new(None),
545            terminal_at: Mutex::new(None),
546            state: Mutex::new(BgTaskState {
547                metadata,
548                runtime: TaskRuntime::Piped(Some(child)),
549                detached: false,
550                child_exit_observed: false,
551                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
552                terminal_output_cache: None,
553                pending_terminal_override: None,
554            }),
555        });
556
557        self.inner
558            .tasks
559            .lock()
560            .map_err(|_| "background task registry lock poisoned".to_string())?
561            .insert(task_id.clone(), task);
562
563        Ok(task_id)
564    }
565
566    #[allow(clippy::too_many_arguments)]
567    pub fn spawn_pty(
568        &self,
569        command: &str,
570        session_id: String,
571        workdir: PathBuf,
572        env: HashMap<String, String>,
573        timeout: Option<Duration>,
574        storage_dir: PathBuf,
575        max_running: usize,
576        notify_on_completion: bool,
577        compressed: bool,
578        project_root: Option<PathBuf>,
579        rows: u16,
580        cols: u16,
581    ) -> Result<String, String> {
582        self.start_watchdog();
583
584        let running = self.running_count();
585        if running >= max_running {
586            return Err(format!(
587                "background bash task limit exceeded: {running} running (max {max_running})"
588            ));
589        }
590
591        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
592        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
593        let task_id = self.generate_unique_task_id()?;
594        let paths = task_paths(&storage_dir, &session_id, &task_id);
595        fs::create_dir_all(&paths.dir)
596            .map_err(|e| format!("failed to create background task dir: {e}"))?;
597
598        let mut metadata = PersistedTask::starting(
599            task_id.clone(),
600            session_id.clone(),
601            command.to_string(),
602            workdir.clone(),
603            project_root,
604            timeout_ms,
605            notify_on_completion,
606            compressed,
607        );
608        metadata.mode = BgMode::Pty;
609        metadata.pty_rows = Some(rows);
610        metadata.pty_cols = Some(cols);
611        self.persist_task(&paths, &metadata)
612            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
613        create_capture_file(&paths.pty)
614            .map_err(|e| format!("failed to create PTY capture file: {e}"))?;
615
616        let runtime = match spawn_pty_for_command(
617            &task_id,
618            &session_id,
619            command,
620            &paths,
621            &workdir,
622            &env,
623            rows,
624            cols,
625            self.inner.wake_tx.clone(),
626        ) {
627            Ok(runtime) => runtime,
628            Err(error) => {
629                crate::slog_warn!(
630                    "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
631                );
632                let _ = delete_task_bundle(&paths);
633                return Err(error);
634            }
635        };
636
637        if let Some(child_pid) = runtime.child_pid {
638            metadata.mark_running(child_pid, child_pid as i32);
639        } else {
640            metadata.status = BgTaskStatus::Running;
641            metadata.pgid = None;
642        }
643        self.persist_task(&paths, &metadata)
644            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
645
646        let task = Arc::new(BgTask {
647            task_id: task_id.clone(),
648            session_id,
649            paths: paths.clone(),
650            started: Instant::now(),
651            last_reminder_at: Mutex::new(None),
652            terminal_at: Mutex::new(None),
653            state: Mutex::new(BgTaskState {
654                metadata,
655                runtime: TaskRuntime::Pty(Some(runtime)),
656                detached: false,
657                child_exit_observed: false,
658                buffer: BgBuffer::pty(paths.pty.clone()),
659                terminal_output_cache: None,
660                pending_terminal_override: None,
661            }),
662        });
663
664        self.inner
665            .tasks
666            .lock()
667            .map_err(|_| "background task registry lock poisoned".to_string())?
668            .insert(task_id.clone(), task);
669
670        Ok(task_id)
671    }
672
673    #[cfg(windows)]
674    #[allow(clippy::too_many_arguments)]
675    pub fn spawn(
676        &self,
677        command: &str,
678        session_id: String,
679        workdir: PathBuf,
680        env: HashMap<String, String>,
681        timeout: Option<Duration>,
682        storage_dir: PathBuf,
683        max_running: usize,
684        notify_on_completion: bool,
685        compressed: bool,
686        project_root: Option<PathBuf>,
687    ) -> Result<String, String> {
688        self.start_watchdog();
689
690        let running = self.running_count();
691        if running >= max_running {
692            return Err(format!(
693                "background bash task limit exceeded: {running} running (max {max_running})"
694            ));
695        }
696
697        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
698        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
699        let task_id = self.generate_unique_task_id()?;
700        let paths = task_paths(&storage_dir, &session_id, &task_id);
701        fs::create_dir_all(&paths.dir)
702            .map_err(|e| format!("failed to create background task dir: {e}"))?;
703
704        let mut metadata = PersistedTask::starting(
705            task_id.clone(),
706            session_id.clone(),
707            command.to_string(),
708            workdir.clone(),
709            project_root,
710            timeout_ms,
711            notify_on_completion,
712            compressed,
713        );
714        self.persist_task(&paths, &metadata)
715            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
716
717        // Capture files are pre-created so the watchdog/buffer can always
718        // open them for reading even if the child hasn't written anything
719        // yet. The spawn helper opens its own handles per attempt because
720        // each `Command::spawn()` consumes them, and on Windows we may
721        // retry across multiple shell candidates if the first one fails.
722        create_capture_file(&paths.stdout)
723            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
724        create_capture_file(&paths.stderr)
725            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
726
727        let child = match spawn_detached_child(command, &paths, &workdir, &env) {
728            Ok(child) => child,
729            Err(error) => {
730                crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
731                let _ = delete_task_bundle(&paths);
732                return Err(error);
733            }
734        };
735
736        let child_pid = child.id();
737        metadata.status = BgTaskStatus::Running;
738        metadata.child_pid = Some(child_pid);
739        metadata.pgid = None;
740        self.persist_task(&paths, &metadata)
741            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
742
743        let task = Arc::new(BgTask {
744            task_id: task_id.clone(),
745            session_id,
746            paths: paths.clone(),
747            started: Instant::now(),
748            last_reminder_at: Mutex::new(None),
749            terminal_at: Mutex::new(None),
750            state: Mutex::new(BgTaskState {
751                metadata,
752                runtime: TaskRuntime::Piped(Some(child)),
753                detached: false,
754                child_exit_observed: false,
755                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
756                terminal_output_cache: None,
757                pending_terminal_override: None,
758            }),
759        });
760
761        self.inner
762            .tasks
763            .lock()
764            .map_err(|_| "background task registry lock poisoned".to_string())?
765            .insert(task_id.clone(), task);
766
767        Ok(task_id)
768    }
769
770    pub fn write_pty(
771        &self,
772        task_id: &str,
773        session_id: &str,
774        input: &[u8],
775    ) -> Result<usize, String> {
776        let task = self
777            .task_for_session(task_id, session_id)
778            .ok_or_else(|| "task_not_found".to_string())?;
779
780        let writer = {
781            let state = task
782                .state
783                .lock()
784                .map_err(|_| "background task lock poisoned".to_string())?;
785            if state.metadata.mode != BgMode::Pty {
786                return Err("task_not_pty".to_string());
787            }
788            if state.metadata.status.is_terminal() {
789                return Err("task_exited".to_string());
790            }
791            match &state.runtime {
792                TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
793                TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
794                TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
795            }
796        };
797
798        let mut writer = writer
799            .lock()
800            .map_err(|_| "PTY writer lock poisoned".to_string())?;
801        writer
802            .write_all(input)
803            .map_err(|error| format!("failed to write to PTY: {error}"))?;
804        writer
805            .flush()
806            .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
807        Ok(input.len())
808    }
809
810    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
811        self.replay_session_inner(storage_dir, session_id, None)
812    }
813
814    pub fn replay_session_for_project(
815        &self,
816        storage_dir: &Path,
817        session_id: &str,
818        project_root: &Path,
819    ) -> Result<(), String> {
820        self.replay_session_inner(storage_dir, session_id, Some(project_root))
821    }
822
823    fn replay_session_inner(
824        &self,
825        storage_dir: &Path,
826        session_id: &str,
827        project_root: Option<&Path>,
828    ) -> Result<(), String> {
829        self.start_watchdog();
830        if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
831            if let Err(error) = self.maybe_gc_persisted(storage_dir) {
832                crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
833            }
834        }
835
836        let canonical_project = project_root.map(canonicalized_path);
837        // Replay strategy: DB is the post-v0.27 source of truth. Disk
838        // fallback handles pre-v0.27 tasks that haven't been migrated and
839        // the cold-start `__default__` namespace (configure runs before any
840        // user session exists, so plugin-init triggers a session-less DB
841        // lookup that will be empty until a real session writes a task).
842        //
843        // We deliberately keep the empty-DB / empty-disk path silent — it's
844        // the normal startup case and would otherwise fire on every configure
845        // (see GitHub user report against v0.27.0). INFO-level logs only when
846        // disk actually returned tasks (real migration signal); WARN when the
847        // DB lookup itself errored.
848        let tasks = match self.replay_session_from_db(session_id) {
849            Some(Ok(tasks)) if !tasks.is_empty() => tasks,
850            Some(Ok(_)) => {
851                let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
852                if !disk_tasks.is_empty() {
853                    crate::slog_info!(
854                        "bash task replay: 0 in DB for session {}, {} from disk fallback",
855                        session_id,
856                        disk_tasks.len()
857                    );
858                }
859                disk_tasks
860            }
861            Some(Err(error)) => {
862                crate::slog_warn!(
863                    "bash task replay DB lookup failed for session {}; falling back to disk: {}",
864                    session_id,
865                    error
866                );
867                self.replay_session_from_disk(storage_dir, session_id)?
868            }
869            None => {
870                // DB pool unconfigured — common in tests + before harness is set.
871                self.replay_session_from_disk(storage_dir, session_id)?
872            }
873        };
874
875        for mut metadata in tasks {
876            if metadata.session_id != session_id {
877                continue;
878            }
879            if let Some(canonical_project) = canonical_project.as_deref() {
880                let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
881                if metadata_project.as_deref() != Some(canonical_project) {
882                    continue;
883                }
884            }
885
886            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
887            match metadata.status {
888                BgTaskStatus::Starting => {
889                    let completion_was_delivered = metadata.completion_delivered;
890                    metadata.mark_terminal(
891                        BgTaskStatus::Failed,
892                        None,
893                        Some("spawn aborted".to_string()),
894                    );
895                    metadata.completion_delivered |= completion_was_delivered;
896                    let _ = self.persist_task(&paths, &metadata);
897                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
898                    self.insert_rehydrated_task(metadata, paths, true)?;
899                }
900                BgTaskStatus::Running | BgTaskStatus::Killing => {
901                    if metadata.mode == BgMode::Pty {
902                        if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
903                            let completion_was_delivered = metadata.completion_delivered;
904                            metadata = terminal_metadata_from_marker(metadata, marker, None);
905                            metadata.completion_delivered |= completion_was_delivered;
906                            let _ = self.persist_task(&paths, &metadata);
907                            self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
908                            self.insert_rehydrated_task(metadata, paths, true)?;
909                        } else if metadata.status.is_terminal() {
910                            self.insert_rehydrated_task(metadata, paths, true)?;
911                        } else {
912                            let completion_was_delivered = metadata.completion_delivered;
913                            metadata.mark_terminal(
914                                BgTaskStatus::Killed,
915                                None,
916                                Some("pty_lost_on_bridge_restart".to_string()),
917                            );
918                            metadata.completion_delivered |= completion_was_delivered;
919                            let _ = self.persist_task(&paths, &metadata);
920                            self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
921                            self.insert_rehydrated_task(metadata, paths, true)?;
922                        }
923                    } else if self.running_metadata_is_stale(&metadata) {
924                        let completion_was_delivered = metadata.completion_delivered;
925                        metadata.mark_terminal(
926                            BgTaskStatus::Killed,
927                            None,
928                            Some("orphaned (>24h)".to_string()),
929                        );
930                        metadata.completion_delivered |= completion_was_delivered;
931                        if !paths.exit.exists() {
932                            let _ = write_kill_marker_if_absent(&paths.exit);
933                        }
934                        let _ = self.persist_task(&paths, &metadata);
935                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
936                        self.insert_rehydrated_task(metadata, paths, true)?;
937                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
938                        let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
939                            "recovered from inconsistent killing state on replay".to_string()
940                        });
941                        if reason.is_some() {
942                            crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
943                            metadata.task_id);
944                        }
945                        let completion_was_delivered = metadata.completion_delivered;
946                        metadata = terminal_metadata_from_marker(metadata, marker, reason);
947                        metadata.completion_delivered |= completion_was_delivered;
948                        let _ = self.persist_task(&paths, &metadata);
949                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
950                        self.insert_rehydrated_task(metadata, paths, true)?;
951                    } else if metadata.status == BgTaskStatus::Killing {
952                        if !paths.exit.exists() {
953                            let _ = write_kill_marker_if_absent(&paths.exit);
954                        }
955                        let completion_was_delivered = metadata.completion_delivered;
956                        metadata.mark_terminal(
957                            BgTaskStatus::Killed,
958                            None,
959                            Some("recovered from inconsistent killing state on replay".to_string()),
960                        );
961                        metadata.completion_delivered |= completion_was_delivered;
962                        let _ = self.persist_task(&paths, &metadata);
963                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
964                        self.insert_rehydrated_task(metadata, paths, true)?;
965                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
966                        let completion_was_delivered = metadata.completion_delivered;
967                        metadata.mark_terminal(
968                            BgTaskStatus::Failed,
969                            None,
970                            Some("process exited without exit marker".to_string()),
971                        );
972                        metadata.completion_delivered |= completion_was_delivered;
973                        let _ = self.persist_task(&paths, &metadata);
974                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
975                        self.insert_rehydrated_task(metadata, paths, true)?;
976                    } else {
977                        self.insert_rehydrated_task(metadata, paths, true)?;
978                    }
979                }
980                _ if metadata.status.is_terminal() => {
981                    // Borrow `paths` for the completion enqueue BEFORE
982                    // `insert_rehydrated_task` consumes it. The completion
983                    // helper only reads from `paths` (stdout/stderr/exit) to
984                    // reconstruct a tail preview, so it must see the same
985                    // paths the rehydrated task will own.
986                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
987                    self.insert_rehydrated_task(metadata, paths, true)?;
988                }
989                _ => {}
990            }
991        }
992
993        Ok(())
994    }
995
996    fn replay_session_from_db(
997        &self,
998        session_id: &str,
999    ) -> Option<Result<Vec<PersistedTask>, String>> {
1000        let pool = self
1001            .inner
1002            .db_pool
1003            .read()
1004            .ok()
1005            .and_then(|slot| slot.clone())?;
1006        let harness = self
1007            .inner
1008            .db_harness
1009            .read()
1010            .ok()
1011            .and_then(|slot| slot.clone())?;
1012        let conn = match pool.lock() {
1013            Ok(conn) => conn,
1014            Err(_) => return Some(Err("db mutex poisoned".to_string())),
1015        };
1016        Some(
1017            crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
1018                .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
1019                .map_err(|error| error.to_string()),
1020        )
1021    }
1022
1023    fn replay_session_from_disk(
1024        &self,
1025        storage_dir: &Path,
1026        session_id: &str,
1027    ) -> Result<Vec<PersistedTask>, String> {
1028        let dir = session_tasks_dir(storage_dir, session_id);
1029        if !dir.exists() {
1030            return Ok(Vec::new());
1031        }
1032
1033        let entries = fs::read_dir(&dir)
1034            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
1035        let mut tasks = Vec::new();
1036        for entry in entries.flatten() {
1037            let path = entry.path();
1038            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
1039                continue;
1040            }
1041            match read_task(&path) {
1042                Ok(metadata) => tasks.push(metadata),
1043                Err(error) => {
1044                    crate::slog_warn!(
1045                        "quarantining invalid background task metadata {} during replay: {error}",
1046                        path.display()
1047                    );
1048                    if let Err(quarantine_error) =
1049                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1050                    {
1051                        crate::slog_warn!(
1052                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1053                            path.display()
1054                        );
1055                    }
1056                }
1057            }
1058        }
1059        Ok(tasks)
1060    }
1061
1062    pub fn register_watch(
1063        &self,
1064        task_id: String,
1065        pattern: WatchPattern,
1066        once: bool,
1067    ) -> Result<String, &'static str> {
1068        let task = self.task(&task_id).ok_or("task_not_found")?;
1069        let (mode, terminal_at_registration, stdout, stderr, pty) = task
1070            .state
1071            .lock()
1072            .map(|state| {
1073                (
1074                    state.metadata.mode.clone(),
1075                    state.metadata.status.is_terminal(),
1076                    task.paths.stdout.clone(),
1077                    task.paths.stderr.clone(),
1078                    task.paths.pty.clone(),
1079                )
1080            })
1081            .map_err(|_| "background_task_lock_poisoned")?;
1082
1083        let mut terminal_matches = Vec::new();
1084        let scanned_terminal = terminal_at_registration;
1085        let watch_id = {
1086            let mut registry = self
1087                .inner
1088                .watch_registry
1089                .lock()
1090                .map_err(|_| "watch_registry_poisoned")?;
1091            let watch_id = registry.register(task_id.clone(), pattern, once)?;
1092            match &mode {
1093                BgMode::Pipes => {
1094                    let stdout_key = format!("{task_id}:stdout");
1095                    let stderr_key = format!("{task_id}:stderr");
1096                    if terminal_at_registration {
1097                        registry.set_file_cursor(&stdout_key, 0);
1098                        registry.set_file_cursor(&stderr_key, 0);
1099                        terminal_matches.extend(registry.scan_file_new_bytes(
1100                            &stdout_key,
1101                            &task_id,
1102                            &stdout,
1103                        ));
1104                        terminal_matches.extend(registry.scan_file_new_bytes(
1105                            &stderr_key,
1106                            &task_id,
1107                            &stderr,
1108                        ));
1109                    } else {
1110                        registry.prime_file_cursor(&stdout_key, &stdout);
1111                        registry.prime_file_cursor(&stderr_key, &stderr);
1112                    }
1113                }
1114                BgMode::Pty => {
1115                    let pty_key = format!("{task_id}:pty");
1116                    if terminal_at_registration {
1117                        registry.set_file_cursor(&pty_key, 0);
1118                        terminal_matches
1119                            .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
1120                    } else {
1121                        registry.prime_file_cursor(&pty_key, &pty);
1122                    }
1123                }
1124            }
1125            watch_id
1126        };
1127
1128        if task.is_terminal() {
1129            if !scanned_terminal {
1130                terminal_matches = {
1131                    let mut registry = self
1132                        .inner
1133                        .watch_registry
1134                        .lock()
1135                        .map_err(|_| "watch_registry_poisoned")?;
1136                    match &mode {
1137                        BgMode::Pipes => {
1138                            let stdout_key = format!("{task_id}:stdout");
1139                            let stderr_key = format!("{task_id}:stderr");
1140                            registry.set_file_cursor(&stdout_key, 0);
1141                            registry.set_file_cursor(&stderr_key, 0);
1142                            let mut matches =
1143                                registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
1144                            matches.extend(registry.scan_file_new_bytes(
1145                                &stderr_key,
1146                                &task_id,
1147                                &stderr,
1148                            ));
1149                            matches
1150                        }
1151                        BgMode::Pty => {
1152                            let pty_key = format!("{task_id}:pty");
1153                            registry.set_file_cursor(&pty_key, 0);
1154                            registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
1155                        }
1156                    }
1157                };
1158            }
1159
1160            let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
1161            if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
1162                if watch_matched {
1163                    let _ = task.set_completion_delivered(true, self);
1164                    self.clear_task_watch_state(&task_id);
1165                }
1166                return Ok(watch_id);
1167            }
1168
1169            let completion = self
1170                .remove_pending_completion(&task_id)
1171                .or_else(|| self.completion_snapshot_for_task(&task));
1172            if terminal_matches.is_empty() {
1173                if let Some(completion) = completion.as_ref() {
1174                    self.emit_bash_watch_exit(completion);
1175                }
1176            } else {
1177                for pattern_match in terminal_matches {
1178                    self.emit_bash_pattern_match(&task.session_id, pattern_match);
1179                }
1180            }
1181            let _ = task.set_completion_delivered(true, self);
1182            self.clear_task_watch_state(&task_id);
1183        }
1184
1185        Ok(watch_id)
1186    }
1187
1188    pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
1189        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1190            registry.unregister(task_id, watch_id);
1191        }
1192    }
1193
1194    pub fn active_watch_count(&self, task_id: &str) -> usize {
1195        self.inner
1196            .watch_registry
1197            .lock()
1198            .map(|registry| registry.active_count(task_id))
1199            .unwrap_or(0)
1200    }
1201
1202    fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1203        self.inner
1204            .watch_registry
1205            .lock()
1206            .map(|registry| {
1207                (
1208                    registry.has_controlled_task(task_id),
1209                    registry.has_matched_task(task_id),
1210                )
1211            })
1212            .unwrap_or((false, false))
1213    }
1214
1215    fn task_has_watch_control(&self, task_id: &str) -> bool {
1216        self.inner
1217            .watch_registry
1218            .lock()
1219            .map(|registry| registry.has_controlled_task(task_id))
1220            .unwrap_or(false)
1221    }
1222
1223    fn clear_task_watch_state(&self, task_id: &str) {
1224        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1225            registry.clear_task(task_id);
1226        }
1227    }
1228
1229    pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1230        let (mode, stdout, stderr, pty) = match task.state.lock() {
1231            Ok(state) => (
1232                state.metadata.mode.clone(),
1233                task.paths.stdout.clone(),
1234                task.paths.stderr.clone(),
1235                task.paths.pty.clone(),
1236            ),
1237            Err(_) => return,
1238        };
1239        let mut matches = Vec::new();
1240        if let Ok(mut registry) = self.inner.watch_registry.lock() {
1241            match mode {
1242                BgMode::Pipes => {
1243                    let stdout_key = format!("{}:stdout", task.task_id);
1244                    let stderr_key = format!("{}:stderr", task.task_id);
1245                    matches.extend(registry.scan_file_new_bytes(
1246                        &stdout_key,
1247                        &task.task_id,
1248                        &stdout,
1249                    ));
1250                    matches.extend(registry.scan_file_new_bytes(
1251                        &stderr_key,
1252                        &task.task_id,
1253                        &stderr,
1254                    ));
1255                }
1256                BgMode::Pty => {
1257                    let pty_key = format!("{}:pty", task.task_id);
1258                    matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1259                }
1260            }
1261        }
1262        for pattern_match in matches {
1263            self.emit_bash_pattern_match(&task.session_id, pattern_match);
1264        }
1265    }
1266
1267    pub fn status(
1268        &self,
1269        task_id: &str,
1270        session_id: &str,
1271        project_root: Option<&Path>,
1272        storage_dir: Option<&Path>,
1273        preview_bytes: usize,
1274    ) -> Option<BgTaskSnapshot> {
1275        let mut task = self.task_for_session(task_id, session_id);
1276        if task.is_none() {
1277            if let Some(storage_dir) = storage_dir {
1278                let _ = self.replay_session(storage_dir, session_id);
1279                task = self.task_for_session(task_id, session_id);
1280            }
1281        }
1282        let Some(task) = task else {
1283            return self.status_relaxed(
1284                task_id,
1285                session_id,
1286                project_root?,
1287                storage_dir?,
1288                preview_bytes,
1289            );
1290        };
1291        let _ = self.poll_task(&task);
1292        Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1293    }
1294
1295    fn status_relaxed_task(
1296        &self,
1297        task_id: &str,
1298        project_root: &Path,
1299        storage_dir: &Path,
1300    ) -> Option<Arc<BgTask>> {
1301        let canonical_project = canonicalized_path(project_root);
1302        match self.lookup_relaxed_task_from_db(task_id, project_root) {
1303            Some(Ok(Some(metadata))) => {
1304                if let Some(task) = self.task(task_id) {
1305                    let matches_project = task
1306                        .state
1307                        .lock()
1308                        .map(|state| {
1309                            state
1310                                .metadata
1311                                .project_root
1312                                .as_deref()
1313                                .map(canonicalized_path)
1314                                .as_deref()
1315                                == Some(canonical_project.as_path())
1316                        })
1317                        .unwrap_or(false);
1318                    return matches_project.then_some(task);
1319                }
1320                let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1321                if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1322                    return None;
1323                }
1324                return self.task(task_id);
1325            }
1326            Some(Ok(None)) => {
1327                crate::slog_info!(
1328                    "bash task relaxed DB miss for {}; falling back to disk",
1329                    task_id
1330                );
1331            }
1332            Some(Err(error)) => {
1333                crate::slog_warn!(
1334                    "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1335                    task_id,
1336                    error
1337                );
1338            }
1339            None => {
1340                crate::slog_info!(
1341                    "bash task relaxed DB unavailable for {}; falling back to disk",
1342                    task_id
1343                );
1344            }
1345        }
1346        let root = storage_dir.join("bash-tasks");
1347        let entries = fs::read_dir(&root).ok()?;
1348        for entry in entries.flatten() {
1349            let dir = entry.path();
1350            if !dir.is_dir() {
1351                continue;
1352            }
1353            let path = dir.join(format!("{task_id}.json"));
1354            if !path.exists() {
1355                continue;
1356            }
1357            let metadata = match read_task(&path) {
1358                Ok(metadata) => metadata,
1359                Err(error) => {
1360                    crate::slog_warn!(
1361                        "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1362                        path.display()
1363                    );
1364                    if let Err(quarantine_error) =
1365                        quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1366                    {
1367                        crate::slog_warn!(
1368                            "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1369                            path.display()
1370                        );
1371                    }
1372                    continue;
1373                }
1374            };
1375            let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1376            if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1377                continue;
1378            }
1379            if let Some(task) = self.task(task_id) {
1380                let matches_project = task
1381                    .state
1382                    .lock()
1383                    .map(|state| {
1384                        state
1385                            .metadata
1386                            .project_root
1387                            .as_deref()
1388                            .map(canonicalized_path)
1389                            .as_deref()
1390                            == Some(canonical_project.as_path())
1391                    })
1392                    .unwrap_or(false);
1393                return matches_project.then_some(task);
1394            }
1395            let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1396            if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1397                return None;
1398            }
1399            return self.task(task_id);
1400        }
1401        None
1402    }
1403
1404    fn lookup_relaxed_task_from_db(
1405        &self,
1406        task_id: &str,
1407        project_root: &Path,
1408    ) -> Option<Result<Option<PersistedTask>, String>> {
1409        let pool = self
1410            .inner
1411            .db_pool
1412            .read()
1413            .ok()
1414            .and_then(|slot| slot.clone())?;
1415        let harness = self
1416            .inner
1417            .db_harness
1418            .read()
1419            .ok()
1420            .and_then(|slot| slot.clone())?;
1421        let conn = match pool.lock() {
1422            Ok(conn) => conn,
1423            Err(_) => return Some(Err("db mutex poisoned".to_string())),
1424        };
1425        let project_key = crate::search_index::project_cache_key(project_root);
1426        Some(
1427            crate::db::bash_tasks::find_bash_task_for_project(
1428                &conn,
1429                &harness,
1430                &project_key,
1431                task_id,
1432            )
1433            .map(|row| row.map(PersistedTask::from))
1434            .map_err(|error| error.to_string()),
1435        )
1436    }
1437
1438    pub(super) fn status_relaxed(
1439        &self,
1440        task_id: &str,
1441        _session_id: &str,
1442        project_root: &Path,
1443        storage_dir: &Path,
1444        preview_bytes: usize,
1445    ) -> Option<BgTaskSnapshot> {
1446        let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1447        let _ = self.poll_task(&task);
1448        Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1449    }
1450
1451    pub fn kill_relaxed(
1452        &self,
1453        task_id: &str,
1454        project_root: &Path,
1455        storage_dir: &Path,
1456    ) -> Result<BgTaskSnapshot, String> {
1457        let task = self
1458            .status_relaxed_task(task_id, project_root, storage_dir)
1459            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1460        self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1461    }
1462
1463    pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1464        #[cfg(test)]
1465        self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1466
1467        let mut deleted = 0usize;
1468
1469        let root = storage_dir.join("bash-tasks");
1470        if root.exists() {
1471            let session_dirs = fs::read_dir(&root).map_err(|e| {
1472                format!(
1473                    "failed to read background task root {}: {e}",
1474                    root.display()
1475                )
1476            })?;
1477            for session_entry in session_dirs.flatten() {
1478                let session_dir = session_entry.path();
1479                if !session_dir.is_dir() {
1480                    continue;
1481                }
1482                let task_entries = match fs::read_dir(&session_dir) {
1483                    Ok(entries) => entries,
1484                    Err(error) => {
1485                        crate::slog_warn!(
1486                            "failed to read background task session dir {}: {error}",
1487                            session_dir.display()
1488                        );
1489                        continue;
1490                    }
1491                };
1492                for task_entry in task_entries.flatten() {
1493                    let json_path = task_entry.path();
1494                    if json_path
1495                        .extension()
1496                        .and_then(|extension| extension.to_str())
1497                        != Some("json")
1498                    {
1499                        continue;
1500                    }
1501                    if modified_within(&json_path, PERSISTED_GC_GRACE) {
1502                        continue;
1503                    }
1504                    let metadata = match read_task(&json_path) {
1505                        Ok(metadata) => metadata,
1506                        Err(error) => {
1507                            crate::slog_warn!(
1508                                "quarantining corrupt background task metadata {}: {error}",
1509                                json_path.display()
1510                            );
1511                            quarantine_task_json(
1512                                storage_dir,
1513                                &session_dir,
1514                                &json_path,
1515                                QuarantineKind::Corrupt,
1516                            )?;
1517                            continue;
1518                        }
1519                    };
1520                    if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1521                        continue;
1522                    }
1523                    let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1524                    match delete_task_bundle(&paths) {
1525                        Ok(()) => {
1526                            deleted += 1;
1527                            log::debug!(
1528                                "deleted persisted background task bundle {}",
1529                                metadata.task_id
1530                            );
1531                        }
1532                        Err(error) => {
1533                            crate::slog_warn!(
1534                                "failed to delete background task bundle {}: {error}",
1535                                metadata.task_id
1536                            );
1537                            continue;
1538                        }
1539                    }
1540                }
1541            }
1542        }
1543        gc_quarantine(storage_dir);
1544        Ok(deleted)
1545    }
1546
1547    pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1548        let tasks = self
1549            .inner
1550            .tasks
1551            .lock()
1552            .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1553            .unwrap_or_default();
1554        tasks
1555            .into_iter()
1556            .map(|task| {
1557                let _ = self.poll_task(&task);
1558                self.snapshot_with_terminal_cache(&task, preview_bytes)
1559            })
1560            .collect()
1561    }
1562
1563    /// Replace terminal pipe snapshots with the task's cached rendered output.
1564    /// Running tasks stay raw (tail-only) so agents debugging a live process see
1565    /// exactly what it emitted. PTY tasks are explicitly excluded: their raw
1566    /// terminal bytes are rendered by the plugin's PTY path, not the line
1567    /// compressor.
1568    fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1569        if !snapshot.info.status.is_terminal() || snapshot.info.mode == BgMode::Pty {
1570            return;
1571        }
1572        if let Some(cache) = self.ensure_terminal_output_cache(task) {
1573            snapshot.output_preview = cache.output_preview;
1574            snapshot.output_truncated = cache.output_truncated;
1575        }
1576    }
1577
1578    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1579        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1580    }
1581
1582    pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1583        let task = self
1584            .task_for_session(task_id, session_id)
1585            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1586        let terminal_after_promote = {
1587            let mut state = task
1588                .state
1589                .lock()
1590                .map_err(|_| "background task lock poisoned".to_string())?;
1591            let updated = self
1592                .update_task_metadata(&task.paths, |metadata| {
1593                    metadata.notify_on_completion = true;
1594                    metadata.completion_delivered = false;
1595                })
1596                .map_err(|e| format!("failed to promote background task: {e}"))?;
1597            state.metadata = updated;
1598            state.metadata.status.is_terminal()
1599        };
1600        if terminal_after_promote {
1601            self.post_terminal_transition(&task, true)?;
1602        }
1603        Ok(true)
1604    }
1605
1606    pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1607        self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1608            .map(|_| ())
1609    }
1610
1611    pub fn cleanup_finished(&self, older_than: Duration) {
1612        let cutoff = Instant::now().checked_sub(older_than);
1613        let removable_paths: Vec<(String, TaskPaths)> =
1614            if let Ok(mut tasks) = self.inner.tasks.lock() {
1615                let removable = tasks
1616                    .iter()
1617                    .filter_map(|(task_id, task)| {
1618                        let delivered_terminal = task
1619                            .state
1620                            .lock()
1621                            .map(|state| {
1622                                state.metadata.status.is_terminal()
1623                                    && state.metadata.completion_delivered
1624                            })
1625                            .unwrap_or(false);
1626                        if !delivered_terminal {
1627                            return None;
1628                        }
1629
1630                        let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1631                        let expired = match (terminal_at, cutoff) {
1632                            (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1633                            (Some(_), None) => true,
1634                            (None, _) => false,
1635                        };
1636                        expired.then(|| task_id.clone())
1637                    })
1638                    .collect::<Vec<_>>();
1639
1640                removable
1641                    .into_iter()
1642                    .filter_map(|task_id| {
1643                        tasks
1644                            .remove(&task_id)
1645                            .map(|task| (task_id, task.paths.clone()))
1646                    })
1647                    .collect()
1648            } else {
1649                Vec::new()
1650            };
1651
1652        for (task_id, paths) in removable_paths {
1653            match delete_task_bundle(&paths) {
1654                Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1655                Err(error) => crate::slog_warn!(
1656                    "failed to delete persisted background task bundle {task_id}: {error}"
1657                ),
1658            }
1659        }
1660    }
1661
1662    pub fn drain_completions(&self) -> Vec<BgCompletion> {
1663        self.drain_completions_for_session(None)
1664    }
1665
1666    pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1667        let completions = match self.inner.completions.lock() {
1668            Ok(completions) => completions,
1669            Err(_) => return Vec::new(),
1670        };
1671
1672        completions
1673            .iter()
1674            .filter(|completion| completion_matches_session(completion, session_id))
1675            .cloned()
1676            .collect()
1677    }
1678
1679    pub fn has_completions_for_session(&self, session_id: Option<&str>) -> bool {
1680        match self.inner.completions.lock() {
1681            Ok(completions) => completions
1682                .iter()
1683                .any(|completion| completion_matches_session(completion, session_id)),
1684            // Bias to safety: if the queue state cannot be inspected cheaply,
1685            // let callers take the existing drain path rather than risk
1686            // suppressing a pending completion.
1687            Err(_) => true,
1688        }
1689    }
1690
1691    pub fn ack_completions_for_session(
1692        &self,
1693        session_id: Option<&str>,
1694        task_ids: &[String],
1695    ) -> Vec<String> {
1696        if task_ids.is_empty() {
1697            return Vec::new();
1698        }
1699        let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1700        let mut completion_sessions = HashMap::new();
1701        if let Ok(mut completions) = self.inner.completions.lock() {
1702            completions.retain(|completion| {
1703                let session_matches = session_id
1704                    .map(|session_id| completion.session_id == session_id)
1705                    .unwrap_or(true);
1706                if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1707                    completion_sessions
1708                        .insert(completion.task_id.clone(), completion.session_id.clone());
1709                    false
1710                } else {
1711                    true
1712                }
1713            });
1714        }
1715
1716        let mut delivered = Vec::new();
1717        for task_id in task_ids {
1718            let task = if let Some(session_id) = session_id {
1719                self.task_for_session(task_id, session_id)
1720            } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1721                self.task_for_session(task_id, completion_session_id)
1722            } else {
1723                self.task(task_id)
1724            };
1725            if let Some(task) = task {
1726                if task.set_completion_delivered(true, self).is_ok() {
1727                    delivered.push(task_id.clone());
1728                }
1729            }
1730        }
1731
1732        delivered
1733    }
1734
1735    pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1736        self.inner
1737            .completions
1738            .lock()
1739            .map(|completions| {
1740                completions
1741                    .iter()
1742                    .filter(|completion| completion.session_id == session_id)
1743                    .cloned()
1744                    .collect()
1745            })
1746            .unwrap_or_default()
1747    }
1748
1749    fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1750        let mut completions = self.inner.completions.lock().ok()?;
1751        let idx = completions
1752            .iter()
1753            .position(|completion| completion.task_id == task_id)?;
1754        completions.remove(idx)
1755    }
1756
1757    fn completion_snapshot_for_task(&self, task: &Arc<BgTask>) -> Option<BgCompletion> {
1758        let snapshot = self.snapshot_with_terminal_cache(task, RUNNING_OUTPUT_PREVIEW_BYTES);
1759        if !snapshot.info.status.is_terminal() {
1760            return None;
1761        }
1762        let (output_preview, output_truncated) = if snapshot.info.mode == BgMode::Pty {
1763            (String::new(), false)
1764        } else {
1765            self.ensure_terminal_output_cache(task)
1766                .map(|cache| completion_preview_for_cache(&cache, snapshot.exit_code))
1767                .unwrap_or_else(|| (String::new(), false))
1768        };
1769        Some(BgCompletion {
1770            task_id: snapshot.info.task_id,
1771            session_id: task.session_id.clone(),
1772            status: snapshot.info.status,
1773            exit_code: snapshot.exit_code,
1774            command: snapshot.info.command,
1775            output_preview,
1776            output_truncated,
1777            original_tokens: None,
1778            compressed_tokens: None,
1779            tokens_skipped: false,
1780        })
1781    }
1782
1783    pub fn detach(&self) {
1784        self.inner.shutdown.store(true, Ordering::SeqCst);
1785        if let Ok(mut tasks) = self.inner.tasks.lock() {
1786            for task in tasks.values() {
1787                if let Ok(mut state) = task.state.lock() {
1788                    match &mut state.runtime {
1789                        TaskRuntime::Piped(child) => *child = None,
1790                        TaskRuntime::Pty(runtime) => *runtime = None,
1791                    }
1792                    state.detached = true;
1793                }
1794            }
1795            tasks.clear();
1796        }
1797    }
1798
1799    pub fn shutdown(&self) {
1800        let tasks = self
1801            .inner
1802            .tasks
1803            .lock()
1804            .map(|tasks| {
1805                tasks
1806                    .values()
1807                    .map(|task| (task.task_id.clone(), task.session_id.clone()))
1808                    .collect::<Vec<_>>()
1809            })
1810            .unwrap_or_default();
1811        for (task_id, session_id) in tasks {
1812            let _ = self.kill(&task_id, &session_id);
1813        }
1814    }
1815
1816    pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1817        if let Ok(state) = task.state.lock() {
1818            if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1819                // On Windows ConPTY, the reader may not observe EOF while the
1820                // master handle is still held in `PtyRuntime`. The waiter writes
1821                // the authoritative exit marker before setting `exit_observed`,
1822                // so once exit is observed we can finalize from that marker and
1823                // drop the runtime, which lets the reader finish. Waiting for
1824                // `reader_done && exit_observed` wedges completed PTY tasks on
1825                // Windows.
1826                if !pty.exit_observed.load(Ordering::SeqCst) {
1827                    return Ok(());
1828                }
1829            }
1830        }
1831        let marker = match read_exit_marker(&task.paths.exit) {
1832            Ok(Some(marker)) => marker,
1833            Ok(None) => return Ok(()),
1834            Err(error) => return Err(format!("failed to read exit marker: {error}")),
1835        };
1836        self.finalize_from_marker(task, marker, None)
1837    }
1838
1839    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1840        let mut needs_completion = false;
1841        {
1842            let Ok(mut state) = task.state.lock() else {
1843                return;
1844            };
1845            match &mut state.runtime {
1846                TaskRuntime::Piped(child_slot) => {
1847                    if let Some(child) = child_slot.as_mut() {
1848                        if matches!(child.try_wait(), Ok(Some(_))) {
1849                            *child_slot = None;
1850                            state.detached = true;
1851                            state.child_exit_observed = true;
1852                        }
1853                    } else if state.detached {
1854                        let child_known_dead = state.child_exit_observed
1855                            || state
1856                                .metadata
1857                                .child_pid
1858                                .is_some_and(|pid| !is_process_alive(pid));
1859                        if child_known_dead {
1860                            needs_completion =
1861                                self.fail_without_exit_marker_if_needed(task, &mut state);
1862                        }
1863                    }
1864                }
1865                TaskRuntime::Pty(Some(pty)) => {
1866                    if pty.exit_observed.load(Ordering::SeqCst) {
1867                        drop(state);
1868                        let _ = self.poll_task(task);
1869                        return;
1870                    }
1871                }
1872                TaskRuntime::Pty(None) => {}
1873            }
1874        }
1875        if needs_completion {
1876            let _ = self.post_terminal_transition(task, true);
1877        }
1878    }
1879
1880    fn fail_without_exit_marker_if_needed(
1881        &self,
1882        task: &Arc<BgTask>,
1883        state: &mut BgTaskState,
1884    ) -> bool {
1885        if state.metadata.status.is_terminal() {
1886            return false;
1887        }
1888        if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1889            return false;
1890        }
1891        let watch_controlled = self.task_has_watch_control(&task.task_id);
1892        let updated = self.update_task_metadata(&task.paths, |metadata| {
1893            metadata.mark_terminal(
1894                BgTaskStatus::Failed,
1895                None,
1896                Some("process exited without exit marker".to_string()),
1897            );
1898            if watch_controlled {
1899                metadata.completion_delivered = true;
1900            }
1901        });
1902        if let Ok(metadata) = updated {
1903            state.pending_terminal_override = None;
1904            state.metadata = metadata;
1905            task.mark_terminal_now();
1906            return true;
1907        }
1908        false
1909    }
1910
1911    pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1912        self.inner
1913            .tasks
1914            .lock()
1915            .map(|tasks| {
1916                tasks
1917                    .values()
1918                    .filter(|task| task.is_running())
1919                    .cloned()
1920                    .collect()
1921            })
1922            .unwrap_or_default()
1923    }
1924
1925    fn insert_rehydrated_task(
1926        &self,
1927        metadata: PersistedTask,
1928        paths: TaskPaths,
1929        detached: bool,
1930    ) -> Result<(), String> {
1931        let task_id = metadata.task_id.clone();
1932        let session_id = metadata.session_id.clone();
1933        let started = started_instant_from_unix_millis(metadata.started_at);
1934        let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1935        let mode = metadata.mode.clone();
1936        let task = Arc::new(BgTask {
1937            task_id: task_id.clone(),
1938            session_id,
1939            paths: paths.clone(),
1940            started,
1941            last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1942            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1943            state: Mutex::new(BgTaskState {
1944                metadata,
1945                runtime: if mode == BgMode::Pty {
1946                    TaskRuntime::Pty(None)
1947                } else {
1948                    TaskRuntime::Piped(None)
1949                },
1950                detached,
1951                // Replay path: we never observed the child handle's exit
1952                // in this process (the previous AFT process did, but its
1953                // observation didn't survive restart). Leave this false so
1954                // the second-pass reap falls through to the
1955                // `is_process_alive(child_pid)` probe rather than declaring
1956                // failure based on stale evidence.
1957                child_exit_observed: false,
1958                buffer: if mode == BgMode::Pty {
1959                    BgBuffer::pty(paths.pty.clone())
1960                } else {
1961                    BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
1962                },
1963                terminal_output_cache: None,
1964                pending_terminal_override: None,
1965            }),
1966        });
1967        self.inner
1968            .tasks
1969            .lock()
1970            .map_err(|_| "background task registry lock poisoned".to_string())?
1971            .insert(task_id, task);
1972        Ok(())
1973    }
1974
1975    fn kill_with_status(
1976        &self,
1977        task_id: &str,
1978        session_id: &str,
1979        terminal_status: BgTaskStatus,
1980    ) -> Result<BgTaskSnapshot, String> {
1981        let task = self
1982            .task_for_session(task_id, session_id)
1983            .ok_or_else(|| format!("background task not found: {task_id}"))?;
1984        let mut terminalized = false;
1985
1986        {
1987            let mut state = task
1988                .state
1989                .lock()
1990                .map_err(|_| "background task lock poisoned".to_string())?;
1991            if state.metadata.status.is_terminal() {
1992                state.pending_terminal_override = None;
1993            } else if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
1994                state.metadata =
1995                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
1996                if self.task_has_watch_control(&task.task_id) {
1997                    state.metadata.completion_delivered = true;
1998                }
1999                state.pending_terminal_override = None;
2000                task.mark_terminal_now();
2001                match &mut state.runtime {
2002                    // Exit marker already present: the child finished on its
2003                    // own before this kill observed it. Reap it rather than
2004                    // dropping the handle so it doesn't become a zombie
2005                    // (issue #91). The active-kill branch below already
2006                    // `wait()`s after signaling, so this is the only kill
2007                    // path that needed the explicit reap.
2008                    TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2009                    TaskRuntime::Pty(runtime) => *runtime = None,
2010                }
2011                state.detached = true;
2012                self.persist_task(&task.paths, &state.metadata)
2013                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2014                terminalized = true;
2015            } else {
2016                let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
2017                if !was_already_killing {
2018                    state.metadata.status = BgTaskStatus::Killing;
2019                    self.persist_task(&task.paths, &state.metadata)
2020                        .map_err(|e| format!("failed to persist killing state: {e}"))?;
2021                }
2022
2023                #[cfg(unix)]
2024                let pgid = state.metadata.pgid;
2025                #[cfg(windows)]
2026                let child_pid = state.metadata.child_pid;
2027                if !was_already_killing
2028                    && state.metadata.mode == BgMode::Pty
2029                    && terminal_status == BgTaskStatus::TimedOut
2030                {
2031                    state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
2032                }
2033
2034                #[cfg(windows)]
2035                let mut pty_forced_terminal_status: Option<BgTaskStatus> = None;
2036
2037                match &mut state.runtime {
2038                    TaskRuntime::Piped(child_slot) => {
2039                        #[cfg(unix)]
2040                        if let Some(pgid) = pgid {
2041                            terminate_pgid(pgid, child_slot.as_mut());
2042                        }
2043                        #[cfg(windows)]
2044                        if let Some(child) = child_slot.as_mut() {
2045                            super::process::terminate_process(child);
2046                        } else if let Some(pid) = child_pid {
2047                            terminate_pid(pid);
2048                        }
2049                        if let Some(child) = child_slot.as_mut() {
2050                            let _ = child.wait();
2051                        }
2052                        *child_slot = None;
2053                        state.detached = true;
2054
2055                        if !task.paths.exit.exists() {
2056                            write_kill_marker_if_absent(&task.paths.exit)
2057                                .map_err(|e| format!("failed to write kill marker: {e}"))?;
2058                        }
2059
2060                        let exit_code = if terminal_status == BgTaskStatus::TimedOut {
2061                            Some(124)
2062                        } else {
2063                            None
2064                        };
2065                        state
2066                            .metadata
2067                            .mark_terminal(terminal_status, exit_code, None);
2068                        if self.task_has_watch_control(&task.task_id) {
2069                            state.metadata.completion_delivered = true;
2070                        }
2071                        state.pending_terminal_override = None;
2072                        task.mark_terminal_now();
2073                        self.persist_task(&task.paths, &state.metadata)
2074                            .map_err(|e| format!("failed to persist killed state: {e}"))?;
2075                        terminalized = true;
2076                    }
2077                    TaskRuntime::Pty(Some(pty)) => {
2078                        pty.was_killed.store(true, Ordering::SeqCst);
2079                        if let Err(error) = pty.killer.kill() {
2080                            crate::slog_warn!(
2081                                "[pty-kill] {task_id} ChildKiller::kill failed: {error}"
2082                            );
2083                        }
2084                        if let Some(pid) = pty.child_pid {
2085                            #[cfg(unix)]
2086                            terminate_pgid(pid as i32, None);
2087                            #[cfg(windows)]
2088                            terminate_pid(pid);
2089                        }
2090                        drop(pty.master.take());
2091
2092                        #[cfg(windows)]
2093                        {
2094                            let default_status = if terminal_status == BgTaskStatus::TimedOut {
2095                                BgTaskStatus::TimedOut
2096                            } else {
2097                                BgTaskStatus::Killed
2098                            };
2099                            pty_forced_terminal_status = Some(
2100                                state
2101                                    .pending_terminal_override
2102                                    .take()
2103                                    .unwrap_or(default_status),
2104                            );
2105                        }
2106                    }
2107                    TaskRuntime::Pty(None) => {}
2108                }
2109
2110                #[cfg(windows)]
2111                if let Some(target_status) = pty_forced_terminal_status {
2112                    if !task.paths.exit.exists() {
2113                        write_kill_marker_if_absent(&task.paths.exit)
2114                            .map_err(|e| format!("failed to write kill marker: {e}"))?;
2115                    }
2116
2117                    let exit_code = if target_status == BgTaskStatus::TimedOut {
2118                        Some(124)
2119                    } else {
2120                        None
2121                    };
2122                    state.metadata.mark_terminal(target_status, exit_code, None);
2123                    if self.task_has_watch_control(&task.task_id) {
2124                        state.metadata.completion_delivered = true;
2125                    }
2126                    state.pending_terminal_override = None;
2127                    task.mark_terminal_now();
2128                    if let TaskRuntime::Pty(runtime) = &mut state.runtime {
2129                        *runtime = None;
2130                    }
2131                    state.detached = true;
2132                    self.persist_task(&task.paths, &state.metadata)
2133                        .map_err(|e| format!("failed to persist killed PTY state: {e}"))?;
2134                    terminalized = true;
2135                }
2136            }
2137        }
2138
2139        if terminalized {
2140            self.post_terminal_transition(&task, true)?;
2141        }
2142        Ok(self.snapshot_with_terminal_cache(&task, RUNNING_OUTPUT_PREVIEW_BYTES))
2143    }
2144
2145    fn finalize_from_marker(
2146        &self,
2147        task: &Arc<BgTask>,
2148        marker: ExitMarker,
2149        reason: Option<String>,
2150    ) -> Result<(), String> {
2151        let watch_controlled = self.task_has_watch_control(&task.task_id);
2152        let mut pty_reader_done = None;
2153        {
2154            let mut state = task
2155                .state
2156                .lock()
2157                .map_err(|_| "background task lock poisoned".to_string())?;
2158            if state.metadata.status.is_terminal() {
2159                state.pending_terminal_override = None;
2160                return Ok(());
2161            }
2162
2163            let pending_override = state.pending_terminal_override.take();
2164            let is_pty = state.metadata.mode == BgMode::Pty;
2165            let updated = self
2166                .update_task_metadata(&task.paths, |metadata| {
2167                    let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
2168                        let mut metadata = metadata.clone();
2169                        let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
2170                        let exit_code = if target_status == BgTaskStatus::TimedOut {
2171                            Some(124)
2172                        } else {
2173                            None
2174                        };
2175                        metadata.mark_terminal(target_status, exit_code, reason);
2176                        metadata
2177                    } else {
2178                        terminal_metadata_from_marker(metadata.clone(), marker, reason)
2179                    };
2180                    if watch_controlled {
2181                        new_metadata.completion_delivered = true;
2182                    }
2183                    *metadata = new_metadata;
2184                })
2185                .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2186            state.metadata = updated;
2187            task.mark_terminal_now();
2188            match &mut state.runtime {
2189                // Reap the exited direct child instead of dropping it, so it
2190                // does not linger as a `<defunct>` zombie (issue #91). The
2191                // wrapper writes the exit marker as its final act, so the
2192                // child is already exiting and `wait()` returns immediately.
2193                TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2194                TaskRuntime::Pty(runtime) => {
2195                    pty_reader_done = runtime
2196                        .as_ref()
2197                        .map(|runtime| Arc::clone(&runtime.reader_done));
2198                    *runtime = None;
2199                }
2200            }
2201            state.detached = true;
2202        }
2203
2204        if let Some(reader_done) = pty_reader_done {
2205            let deadline = Instant::now() + Duration::from_millis(200);
2206            while !reader_done.load(Ordering::SeqCst) && Instant::now() < deadline {
2207                std::thread::sleep(Duration::from_millis(10));
2208            }
2209        }
2210
2211        // One final scan runs before terminal notification routing so bytes
2212        // printed immediately before exit can win over the exit safety net.
2213        self.scan_task_watch_output(task);
2214
2215        self.post_terminal_transition(task, true)
2216    }
2217
2218    fn enqueue_completion_if_needed(
2219        &self,
2220        metadata: &PersistedTask,
2221        paths: Option<&TaskPaths>,
2222        emit_frame: bool,
2223    ) {
2224        if metadata.status.is_terminal() && !metadata.completion_delivered {
2225            let cache =
2226                paths.and_then(|paths| self.render_terminal_output_from_paths(metadata, paths));
2227            self.enqueue_completion_from_parts(metadata, None, paths, emit_frame, cache.as_ref());
2228        }
2229    }
2230
2231    fn render_terminal_output_from_paths(
2232        &self,
2233        metadata: &PersistedTask,
2234        paths: &TaskPaths,
2235    ) -> Option<TerminalOutputCache> {
2236        if metadata.mode == BgMode::Pty {
2237            return None;
2238        }
2239        let buffer = BgBuffer::new(paths.stdout.clone(), paths.stderr.clone());
2240        Some(self.render_terminal_output(metadata, &buffer))
2241    }
2242
    /// Builds the completion record for a terminal task, records its
    /// compression event, and routes the user-visible notification.
    ///
    /// * `metadata` — persisted task state; nothing happens unless its status
    ///   is terminal.
    /// * `buffer` — buffer to read output from, if the caller has one. When it
    ///   is `None` and the task is not PTY-mode, a buffer is built from `paths`.
    /// * `paths` — on-disk capture locations, used as the fallback output
    ///   source and forwarded to the token counter.
    /// * `emit_frame` — whether a push frame may be sent in addition to the
    ///   queue entry.
    /// * `terminal_render` — already-rendered terminal output to reuse; when
    ///   `None`, the output is rendered here from whichever buffer is available.
    ///
    /// Routing order: the compression event is recorded first, then
    /// watch-controlled tasks are handled (and return), then the
    /// `completion_delivered` gate applies, and finally the completion is
    /// pushed onto the queue unless an entry for the same task already exists.
    fn enqueue_completion_from_parts(
        &self,
        metadata: &PersistedTask,
        buffer: Option<&BgBuffer>,
        paths: Option<&TaskPaths>,
        emit_frame: bool,
        terminal_render: Option<&TerminalOutputCache>,
    ) {
        // Only the terminal-state guard prevents double-recording here. The
        // `completion_delivered` flag is NOT used to gate compression-event
        // recording, because `mark_terminal` flips `completion_delivered=true`
        // immediately for tasks with `notify_on_completion=false` (foreground
        // bash polled via `bash_status`, which is the common case). Pre-emptive
        // delivery flagging is correct for the push-frame queue (suppresses
        // duplicate user-visible notifications) but would silently skip the
        // database insert below. Compression event recording is idempotent at
        // the DB layer (unique on harness+session+task_id), so re-entry is
        // safe; the dedupe-by-queue check stays for the push frame side.
        if !metadata.status.is_terminal() {
            return;
        }

        // Prefer the caller's buffer; otherwise fall back to one built from
        // the capture paths (never for PTY-mode tasks).
        let owned_buffer = if buffer.is_none() && metadata.mode != BgMode::Pty {
            paths.map(|paths| BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()))
        } else {
            None
        };
        let render_buffer = buffer.or(owned_buffer.as_ref());
        // Same pattern for the rendered output: reuse the caller's render when
        // given, otherwise render from whichever buffer was resolved above.
        let owned_render = if terminal_render.is_none() {
            render_buffer.map(|buffer| self.render_terminal_output(metadata, buffer))
        } else {
            None
        };
        let render = terminal_render.or(owned_render.as_ref());

        // Completion reminders use the already-rendered terminal output and a
        // smaller, exit-aware head+tail cap. They never invoke the compressor
        // themselves.
        let (output_preview, output_truncated) = render
            .map(|cache| completion_preview_for_cache(cache, metadata.exit_code))
            .unwrap_or_else(|| (String::new(), false));

        // Token counts compare the raw output against the full rendered
        // preview (not the shorter completion preview computed above).
        let token_counts = self.completion_token_counts(
            metadata,
            buffer,
            paths,
            render.map(|render| render.output_preview.as_str()),
        );
        let completion = BgCompletion {
            task_id: metadata.task_id.clone(),
            session_id: metadata.session_id.clone(),
            status: metadata.status.clone(),
            exit_code: metadata.exit_code,
            command: metadata.command.clone(),
            output_preview,
            output_truncated,
            original_tokens: token_counts.original_tokens,
            compressed_tokens: token_counts.compressed_tokens,
            tokens_skipped: token_counts.tokens_skipped,
        };

        // Record the compression event BEFORE the push-frame dedupe. Event
        // recording has its own idempotency at the DB layer (unique key on
        // harness+session+task_id), so it's safe to attempt for every
        // terminal-state finalize. Critically, this path runs even when
        // `completion_delivered=true` was pre-set by `mark_terminal` for
        // foreground bash (`notify_on_completion=false`) — which is the common
        // case for OpenCode/Pi `bash` tool calls. Previously this code lived
        // after the dedupe guard and never fired for foreground tasks, which
        // meant compression accounting was effectively dead for >99% of
        // real-world bash usage.
        self.record_compression_event_if_applicable(metadata, &token_counts);

        // Watch-controlled tasks never reach the completion queue: at most a
        // watch-exit frame is sent (only if no pattern matched), the watch
        // state is cleared, and the function returns.
        let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
        if watch_controlled {
            if emit_frame && !watch_matched {
                self.emit_bash_watch_exit(&completion);
            }
            self.clear_task_watch_state(&metadata.task_id);
            return;
        }

        // Push-frame queue is gated on `completion_delivered` so foreground
        // bash with `notify_on_completion=false` does not leak a user-visible
        // completion notification. `mark_terminal` pre-sets
        // `completion_delivered=true` for those tasks; honoring it here keeps
        // the suppression invariant the test
        // `no_notify_foreground_poll_completion_does_not_enqueue_completion`
        // asserts. The compression-event recording above intentionally runs
        // before this gate so foreground bash still contributes to the
        // session/project aggregates.
        if metadata.completion_delivered {
            return;
        }

        // Push-frame queue dedupe stays per-task to prevent duplicate
        // user-visible completion notifications.
        // A poisoned queue lock is treated the same as "not pushed".
        let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
            if completions
                .iter()
                .any(|existing| existing.task_id == metadata.task_id)
            {
                false
            } else {
                completions.push_back(completion.clone());
                true
            }
        } else {
            false
        };

        // The frame is only sent for a completion that was newly queued.
        if pushed && emit_frame {
            self.emit_bash_completed(completion);
        }
    }
2358
2359    fn record_compression_event_if_applicable(
2360        &self,
2361        metadata: &PersistedTask,
2362        token_counts: &CompletionTokenCounts,
2363    ) {
2364        if metadata.mode == BgMode::Pty {
2365            return;
2366        }
2367
2368        let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2369            token_counts.original_tokens,
2370            token_counts.compressed_tokens,
2371            token_counts.original_bytes,
2372            token_counts.compressed_bytes,
2373        ) {
2374            (
2375                Some(original_tokens),
2376                Some(compressed_tokens),
2377                Some(original_bytes),
2378                Some(compressed_bytes),
2379            ) => (
2380                original_tokens,
2381                compressed_tokens,
2382                original_bytes,
2383                compressed_bytes,
2384            ),
2385            _ => {
2386                crate::slog_warn!(
2387                    "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2388                    metadata.task_id
2389                );
2390                return;
2391            }
2392        };
2393
2394        let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2395        let Some(pool) = pool else {
2396            crate::slog_warn!(
2397                "compression event skipped for {}: db_pool not initialized — was configure run?",
2398                metadata.task_id
2399            );
2400            return;
2401        };
2402        let harness = self
2403            .inner
2404            .db_harness
2405            .read()
2406            .ok()
2407            .and_then(|slot| slot.clone());
2408        let Some(harness) = harness else {
2409            crate::slog_warn!(
2410                "compression event insert skipped for {}: harness not configured",
2411                metadata.task_id
2412            );
2413            return;
2414        };
2415
2416        let project_root = metadata
2417            .project_root
2418            .as_deref()
2419            .unwrap_or(&metadata.workdir);
2420        let project_key = crate::search_index::project_cache_key(project_root);
2421        let row = crate::db::compression_events::CompressionEventRow {
2422            harness: &harness,
2423            session_id: Some(&metadata.session_id),
2424            project_key: &project_key,
2425            tool: "bash",
2426            task_id: Some(&metadata.task_id),
2427            command: Some(&metadata.command),
2428            compressor: if metadata.compressed {
2429                "registry"
2430            } else {
2431                "none"
2432            },
2433            original_bytes,
2434            compressed_bytes,
2435            original_tokens,
2436            compressed_tokens,
2437            created_at: unix_millis() as i64,
2438        };
2439
2440        let conn = match pool.lock() {
2441            Ok(conn) => conn,
2442            Err(_) => {
2443                crate::slog_warn!(
2444                    "compression event insert failed for {}: db mutex poisoned",
2445                    metadata.task_id
2446                );
2447                return;
2448            }
2449        };
2450        match crate::db::compression_events::insert_compression_event(&conn, &row) {
2451            Ok(_) => {
2452                // DEBUG-level: each foreground bash call records one of these,
2453                // which clutters info-level logs without adding diagnostic value.
2454                // Aggregate totals are visible via the status RPC / TUI sidebar.
2455                crate::slog_debug!(
2456                    "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2457                    metadata.task_id,
2458                    project_key,
2459                    metadata.session_id,
2460                    original_tokens,
2461                    compressed_tokens
2462                );
2463            }
2464            Err(error) => {
2465                crate::slog_warn!(
2466                    "compression event insert failed for {}: {}",
2467                    metadata.task_id,
2468                    error
2469                );
2470            }
2471        }
2472    }
2473
2474    fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2475        let Ok(progress_sender) = self
2476            .inner
2477            .progress_sender
2478            .lock()
2479            .map(|sender| sender.clone())
2480        else {
2481            return;
2482        };
2483        if let Some(sender) = progress_sender.as_ref() {
2484            sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2485                pattern_match.task_id,
2486                session_id.to_string(),
2487                pattern_match.watch_id,
2488                pattern_match.match_text,
2489                pattern_match.match_offset,
2490                pattern_match.context,
2491                pattern_match.once,
2492            )));
2493        }
2494    }
2495
2496    fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2497        let Ok(progress_sender) = self
2498            .inner
2499            .progress_sender
2500            .lock()
2501            .map(|sender| sender.clone())
2502        else {
2503            return;
2504        };
2505        let Some(sender) = progress_sender.as_ref() else {
2506            return;
2507        };
2508        let status = completion_status_text(&completion.status, completion.exit_code);
2509        let preview = completion.output_preview.trim_end();
2510        let context = if preview.is_empty() {
2511            format!("task {} exited ({status})", completion.task_id)
2512        } else {
2513            format!(
2514                "task {} exited ({status})
2515{preview}",
2516                completion.task_id
2517            )
2518        };
2519        sender(PushFrame::BashPatternMatch(
2520            BashPatternMatchFrame::task_exit(
2521                completion.task_id.clone(),
2522                completion.session_id.clone(),
2523                format!("exited ({status})"),
2524                context,
2525            ),
2526        ));
2527    }
2528
2529    fn emit_bash_completed(&self, completion: BgCompletion) {
2530        let Ok(progress_sender) = self
2531            .inner
2532            .progress_sender
2533            .lock()
2534            .map(|sender| sender.clone())
2535        else {
2536            return;
2537        };
2538        let Some(sender) = progress_sender.as_ref() else {
2539            return;
2540        };
2541        // Clone the callback out of the registry mutex before writing to stdout;
2542        // otherwise a blocked push-frame write could pin the mutex and starve
2543        // unrelated progress-sender updates.
2544        // Bg task transitions are discovered by the watchdog thread, so the
2545        // sender is shared behind a Mutex. It still uses the same stdout writer
2546        // closure as foreground progress frames, preserving the existing lock/
2547        // flush behavior in main.rs.
2548        sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2549            completion.task_id,
2550            completion.session_id,
2551            completion.status,
2552            completion.exit_code,
2553            completion.command,
2554            completion.output_preview,
2555            completion.output_truncated,
2556            completion.original_tokens,
2557            completion.compressed_tokens,
2558            completion.tokens_skipped,
2559        )));
2560    }
2561
2562    fn completion_token_counts(
2563        &self,
2564        metadata: &PersistedTask,
2565        buffer: Option<&BgBuffer>,
2566        paths: Option<&TaskPaths>,
2567        rendered_output: Option<&str>,
2568    ) -> CompletionTokenCounts {
2569        if metadata.mode == BgMode::Pty {
2570            return CompletionTokenCounts::skipped();
2571        }
2572
2573        let raw = match buffer {
2574            Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2575            None => paths
2576                .map(|paths| {
2577                    read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2578                })
2579                .unwrap_or(TokenCountInput::Skipped),
2580        };
2581
2582        let TokenCountInput::Text(raw_output) = raw else {
2583            return CompletionTokenCounts::skipped();
2584        };
2585
2586        let original_tokens = token_count_u32(&raw_output);
2587        let original_bytes = raw_output.len() as i64;
2588        let compressed_output = rendered_output.unwrap_or(&raw_output);
2589        let compressed_tokens = token_count_u32(compressed_output);
2590        let compressed_bytes = compressed_output.len() as i64;
2591        CompletionTokenCounts {
2592            original_tokens: Some(original_tokens),
2593            compressed_tokens: Some(compressed_tokens),
2594            original_bytes: Some(original_bytes),
2595            compressed_bytes: Some(compressed_bytes),
2596            tokens_skipped: false,
2597        }
2598    }
2599
2600    pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2601        if !self
2602            .inner
2603            .long_running_reminder_enabled
2604            .load(Ordering::SeqCst)
2605        {
2606            return;
2607        }
2608        let interval_ms = self
2609            .inner
2610            .long_running_reminder_interval_ms
2611            .load(Ordering::SeqCst);
2612        if interval_ms == 0 {
2613            return;
2614        }
2615        let interval = Duration::from_millis(interval_ms);
2616        let now = Instant::now();
2617        let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2618            return;
2619        };
2620        let since = last_reminder_at.unwrap_or(task.started);
2621        if now.duration_since(since) < interval {
2622            return;
2623        }
2624        let command = task
2625            .state
2626            .lock()
2627            .map(|state| state.metadata.command.clone())
2628            .unwrap_or_default();
2629        *last_reminder_at = Some(now);
2630        self.emit_bash_long_running(BashLongRunningFrame::new(
2631            task.task_id.clone(),
2632            task.session_id.clone(),
2633            command,
2634            task.started.elapsed().as_millis() as u64,
2635        ));
2636    }
2637
2638    fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2639        let Ok(progress_sender) = self
2640            .inner
2641            .progress_sender
2642            .lock()
2643            .map(|sender| sender.clone())
2644        else {
2645            return;
2646        };
2647        if let Some(sender) = progress_sender.as_ref() {
2648            sender(PushFrame::BashLongRunning(frame));
2649        }
2650    }
2651
2652    fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2653        self.inner
2654            .tasks
2655            .lock()
2656            .ok()
2657            .and_then(|tasks| tasks.get(task_id).cloned())
2658    }
2659
2660    fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2661        self.task(task_id)
2662            .filter(|task| task.session_id == session_id)
2663    }
2664
2665    fn running_count(&self) -> usize {
2666        self.inner
2667            .tasks
2668            .lock()
2669            .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2670            .unwrap_or(0)
2671    }
2672
2673    fn start_watchdog(&self) {
2674        if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2675            super::watchdog::start(self.clone());
2676        }
2677    }
2678
2679    fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2680        unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2681    }
2682
2683    #[cfg(test)]
2684    pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2685        self.task_for_session(task_id, session_id)
2686            .map(|task| task.paths.json.clone())
2687    }
2688
2689    #[cfg(test)]
2690    pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2691        self.task_for_session(task_id, session_id)
2692            .map(|task| task.paths.exit.clone())
2693    }
2694
2695    /// Generate a `bash-{16hex}` slug that is unique against live tasks and queued completions.
2696    fn generate_unique_task_id(&self) -> Result<String, String> {
2697        for _ in 0..32 {
2698            let candidate = random_slug();
2699            let tasks = self
2700                .inner
2701                .tasks
2702                .lock()
2703                .map_err(|_| "background task registry lock poisoned".to_string())?;
2704            if tasks.contains_key(&candidate) {
2705                continue;
2706            }
2707            let completions = self
2708                .inner
2709                .completions
2710                .lock()
2711                .map_err(|_| "background completions lock poisoned".to_string())?;
2712            if completions
2713                .iter()
2714                .any(|completion| completion.task_id == candidate)
2715            {
2716                continue;
2717            }
2718            return Ok(candidate);
2719        }
2720        Err("failed to allocate unique background task id after 32 attempts".to_string())
2721    }
2722}
2723
2724fn render_compressed_with_recovery(
2725    buffer: &BgBuffer,
2726    mut compressed: CompressionResult,
2727    input_truncated: bool,
2728) -> TerminalOutputCache {
2729    // Preserve a single canonical trailing newline. A bare `.trim_end()` strips
2730    // the legitimate final newline that `echo` and most commands emit, so
2731    // agent-facing output diverged from native bash ("hello" vs "hello\n") and
2732    // broke the no-JSON-envelope contract. Collapse excess trailing blank lines
2733    // to one, but keep that one when the content had a trailing newline. NOTE:
2734    // the check must read the ORIGINAL text — strip_plain_truncation_marker_lines
2735    // rebuilds via `.lines().join("\n")`, which itself drops the trailing newline.
2736    let had_trailing_newline = compressed.text.ends_with('\n');
2737    let mut text = strip_plain_truncation_marker_lines(&compressed.text)
2738        .trim_end()
2739        .to_string();
2740    if had_trailing_newline && !text.is_empty() {
2741        text.push('\n');
2742    }
2743    compressed.text = text;
2744
2745    let output_path = buffer.output_path().map(|path| path.display().to_string());
2746    let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
2747    let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
2748    let mut recovery = RecoveryContext {
2749        dropped_by_class: compressed.dropped_by_class,
2750        had_inner_drop: compressed.had_inner_drop,
2751        offset_hint_eligible: compressed.offset_hint_eligible,
2752        offset_start_line: compressed.offset_start_line,
2753        byte_truncated: input_truncated,
2754        output_path: output_path.clone(),
2755        stderr_path: stderr_path.clone(),
2756        include_stderr_path,
2757    };
2758
2759    let (output_preview, output_truncated) =
2760        render_body_with_recovery_marker(&compressed.text, &mut recovery);
2761    TerminalOutputCache {
2762        output_preview,
2763        output_truncated,
2764        kind: TerminalOutputKind::Compressed,
2765        output_path,
2766        stderr_path,
2767        recovery: Some(recovery),
2768    }
2769}
2770
2771fn render_body_with_recovery_marker(body: &str, recovery: &mut RecoveryContext) -> (String, bool) {
2772    render_body_with_recovery_marker_at_cap(
2773        body,
2774        recovery,
2775        FINAL_OUTPUT_CAP_BYTES,
2776        cap_final_output,
2777        cap_final_output_with_marker,
2778    )
2779}
2780
2781fn render_raw_body_with_recovery_marker(
2782    body: &str,
2783    recovery: &mut RecoveryContext,
2784) -> (String, bool) {
2785    render_body_with_recovery_marker_at_cap(
2786        body,
2787        recovery,
2788        RAW_PASSTHROUGH_CAP_BYTES,
2789        |input| {
2790            super::output::cap_head_tail(
2791                input,
2792                RAW_PASSTHROUGH_CAP_BYTES,
2793                RAW_PASSTHROUGH_HEAD_BYTES,
2794                RAW_PASSTHROUGH_TAIL_BYTES,
2795            )
2796        },
2797        |input, marker| {
2798            super::output::cap_head_tail_with_marker(
2799                input,
2800                RAW_PASSTHROUGH_CAP_BYTES,
2801                RAW_PASSTHROUGH_HEAD_BYTES,
2802                RAW_PASSTHROUGH_TAIL_BYTES,
2803                marker,
2804            )
2805        },
2806    )
2807}
2808
2809fn render_body_with_recovery_marker_at_cap<F, G>(
2810    body: &str,
2811    recovery: &mut RecoveryContext,
2812    cap_bytes: usize,
2813    cap_plain: F,
2814    cap_with_marker: G,
2815) -> (String, bool)
2816where
2817    F: Fn(&str) -> super::output::CappedText,
2818    G: Fn(&str, &str) -> super::output::CappedText,
2819{
2820    let needs_marker = recovery.has_visible_drop();
2821    if body.len() > cap_bytes {
2822        recovery.byte_truncated = true;
2823        if let Some(marker) = recovery_marker(recovery) {
2824            let capped = cap_with_marker(body, &marker);
2825            return (capped.text, true);
2826        }
2827        let capped = cap_plain(body);
2828        return (capped.text, capped.truncated || needs_marker);
2829    }
2830
2831    if !needs_marker {
2832        return (body.to_string(), false);
2833    }
2834
2835    let Some(marker) = recovery_marker(recovery) else {
2836        return (body.to_string(), true);
2837    };
2838    let with_marker = append_recovery_marker(body, &marker);
2839    if with_marker.len() <= cap_bytes {
2840        return (with_marker, true);
2841    }
2842
2843    recovery.byte_truncated = true;
2844    let marker = recovery_marker(recovery).unwrap_or(marker);
2845    let capped = cap_with_marker(body, &marker);
2846    (capped.text, true)
2847}
2848
/// Returns `body` with trailing whitespace removed, followed by `marker` on
/// its own line.
///
/// When `body` has no content — empty, or nothing but whitespace — the
/// marker is returned alone. Testing emptiness after trimming avoids a stray
/// leading blank line for bodies such as `"\n"`.
fn append_recovery_marker(body: &str, marker: &str) -> String {
    let content = body.trim_end();
    if content.is_empty() {
        return marker.to_string();
    }
    // One allocation: content + '\n' + marker.
    let mut output = String::with_capacity(content.len() + 1 + marker.len());
    output.push_str(content);
    output.push('\n');
    output.push_str(marker);
    output
}
2858
2859fn recovery_marker(recovery: &RecoveryContext) -> Option<String> {
2860    let mut parts = Vec::new();
2861    for (class, count) in &recovery.dropped_by_class {
2862        let label = if *count == 1 {
2863            class.singular()
2864        } else {
2865            class.plural()
2866        };
2867        parts.push(format!("+{count} more {label}"));
2868    }
2869    if recovery.byte_truncated {
2870        parts.push("truncated output".to_string());
2871    } else if recovery.had_inner_drop && parts.is_empty() {
2872        parts.push("omitted output".to_string());
2873    }
2874
2875    if parts.is_empty() {
2876        return None;
2877    }
2878
2879    let hint = recovery_hint(recovery);
2880    Some(format!("[{}; {hint}]", parts.join(", ")))
2881}
2882
2883fn recovery_hint(recovery: &RecoveryContext) -> String {
2884    // AFT stores stdout/stderr separately and combines them in memory. Class caps,
2885    // middle truncation, and mixed stdout/stderr renders are not line-offset
2886    // portable. Only a single-file contiguous-prefix drop may use `tail -n +N`.
2887    if recovery.offset_hint_eligible
2888        && !recovery.byte_truncated
2889        && recovery.dropped_by_class.is_empty()
2890        && !recovery.include_stderr_path
2891    {
2892        if let (Some(path), Some(line)) =
2893            (recovery.output_path.as_deref(), recovery.offset_start_line)
2894        {
2895            return format!("see remaining: tail -n +{line} {}", quote_path(path));
2896        }
2897    }
2898
2899    let mut paths = Vec::new();
2900    if let Some(path) = recovery.output_path.as_deref() {
2901        paths.push(path);
2902    }
2903    if recovery.include_stderr_path {
2904        if let Some(path) = recovery.stderr_path.as_deref() {
2905            if !paths.contains(&path) {
2906                paths.push(path);
2907            }
2908        }
2909    }
2910
2911    if paths.is_empty() {
2912        return "full output unavailable".to_string();
2913    }
2914
2915    let reads = paths
2916        .into_iter()
2917        .map(|path| format!("read {}", quote_path(path)))
2918        .collect::<Vec<_>>()
2919        .join(" and ");
2920    format!("full output: {reads}")
2921}
2922
2923fn strip_plain_truncation_marker_lines(input: &str) -> String {
2924    input
2925        .lines()
2926        .filter(|line| !is_plain_truncation_marker(line.trim()))
2927        .collect::<Vec<_>>()
2928        .join("\n")
2929}
2930
2931fn strip_recovery_marker_lines(input: &str) -> String {
2932    input
2933        .lines()
2934        .filter(|line| !is_recovery_marker(line.trim()))
2935        .collect::<Vec<_>>()
2936        .join("\n")
2937}
2938
/// Returns `true` when `line` is exactly `...<truncated N bytes>...` with
/// `N` a non-empty run of ASCII digits. The caller is expected to trim the
/// line first; surrounding whitespace makes the match fail.
fn is_plain_truncation_marker(line: &str) -> bool {
    line.strip_prefix("...<truncated ")
        .and_then(|rest| rest.strip_suffix(" bytes>..."))
        .is_some_and(|digits| {
            !digits.is_empty() && digits.bytes().all(|byte| byte.is_ascii_digit())
        })
}
2948
/// Returns `true` when `line` is wrapped in square brackets and contains one
/// of the recovery-hint phrases. The caller is expected to trim the line
/// first.
fn is_recovery_marker(line: &str) -> bool {
    const HINT_FRAGMENTS: [&str; 3] = [
        "full output: read ",
        "see remaining: tail -n +",
        "full output unavailable",
    ];

    let bracketed = line.starts_with('[') && line.ends_with(']');
    bracketed
        && HINT_FRAGMENTS
            .iter()
            .any(|fragment| line.contains(*fragment))
}
2956
2957fn render_structured_output(command: &str, buffer: &BgBuffer) -> Option<TerminalOutputCache> {
2958    if !is_gh_structured_command(command) {
2959        return None;
2960    }
2961
2962    let output_path = buffer
2963        .output_path()
2964        .map(|path| path.display().to_string())?;
2965    let stdout_bytes = buffer.stream_len(StreamKind::Stdout);
2966    if stdout_bytes == 0 {
2967        return None;
2968    }
2969
2970    if stdout_bytes > STRUCTURED_OUTPUT_CAP_BYTES as u64 {
2971        if !stream_starts_like_json(buffer, StreamKind::Stdout) {
2972            return None;
2973        }
2974        return Some(TerminalOutputCache {
2975            output_preview: json_output_pointer(stdout_bytes, &output_path),
2976            output_truncated: true,
2977            kind: TerminalOutputKind::Structured,
2978            output_path: Some(output_path),
2979            stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
2980            recovery: None,
2981        });
2982    }
2983
2984    let stdout = buffer.read_stream_bounded(StreamKind::Stdout, STRUCTURED_OUTPUT_CAP_BYTES);
2985    if stdout.truncated || !is_structured_body(&stdout.text) {
2986        return None;
2987    }
2988
2989    Some(TerminalOutputCache {
2990        output_preview: stdout.text,
2991        output_truncated: false,
2992        kind: TerminalOutputKind::Structured,
2993        output_path: Some(output_path),
2994        stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
2995        recovery: None,
2996    })
2997}
2998
2999fn render_raw_passthrough(buffer: &BgBuffer) -> TerminalOutputCache {
3000    let raw = buffer.read_combined_head_tail(
3001        RAW_PASSTHROUGH_CAP_BYTES,
3002        RAW_PASSTHROUGH_HEAD_BYTES,
3003        RAW_PASSTHROUGH_TAIL_BYTES,
3004    );
3005    let output_path = buffer.output_path().map(|path| path.display().to_string());
3006    let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
3007    if !raw.truncated {
3008        return TerminalOutputCache {
3009            output_preview: raw.text,
3010            output_truncated: false,
3011            kind: TerminalOutputKind::Raw,
3012            output_path,
3013            stderr_path,
3014            recovery: None,
3015        };
3016    }
3017
3018    let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
3019    let mut recovery = RecoveryContext {
3020        dropped_by_class: BTreeMap::new(),
3021        had_inner_drop: false,
3022        offset_hint_eligible: false,
3023        offset_start_line: None,
3024        byte_truncated: true,
3025        output_path: output_path.clone(),
3026        stderr_path: stderr_path.clone(),
3027        include_stderr_path,
3028    };
3029    let (output_preview, output_truncated) =
3030        render_raw_body_with_recovery_marker(&raw.text, &mut recovery);
3031    TerminalOutputCache {
3032        output_preview,
3033        output_truncated,
3034        kind: TerminalOutputKind::Raw,
3035        output_path,
3036        stderr_path,
3037        recovery: Some(recovery),
3038    }
3039}
3040
3041fn completion_preview_for_cache(
3042    cache: &TerminalOutputCache,
3043    exit_code: Option<i32>,
3044) -> (String, bool) {
3045    // Reminder previews are sized by exit status: success gets a short tail,
3046    // failure keeps head+tail context (see output.rs completion caps).
3047    let exit_ok = exit_code == Some(0);
3048    let threshold = completion_preview_threshold(exit_ok);
3049    if cache.kind == TerminalOutputKind::Structured && cache.output_preview.len() > threshold {
3050        if let Some(path) = cache.output_path.as_deref() {
3051            return (
3052                json_output_pointer(cache.output_preview.len() as u64, path),
3053                true,
3054            );
3055        }
3056        return (cache.output_preview.clone(), cache.output_truncated);
3057    }
3058
3059    if let Some(recovery) = cache.recovery.as_ref() {
3060        if cache.output_preview.len() <= threshold {
3061            return (cache.output_preview.clone(), cache.output_truncated);
3062        }
3063        let body = strip_recovery_marker_lines(&cache.output_preview);
3064        let mut completion_recovery = recovery.clone();
3065        completion_recovery.byte_truncated = true;
3066        if let Some(marker) = recovery_marker(&completion_recovery) {
3067            let capped = cap_completion_output_with_marker(&body, &marker, exit_ok);
3068            return (capped.text, true);
3069        }
3070    }
3071
3072    let capped = cap_completion_output(&cache.output_preview, exit_ok);
3073    (capped.text, cache.output_truncated || capped.truncated)
3074}
3075
3076fn is_gh_structured_command(command: &str) -> bool {
3077    let normalized = crate::compress::normalize_command_for_dispatch(command)
3078        .unwrap_or_else(|| command.trim_start().to_string());
3079    let tokens = shell_words_for_flags(&normalized);
3080    let Some(head) = tokens.first() else {
3081        return false;
3082    };
3083    let head_name = Path::new(head)
3084        .file_name()
3085        .and_then(|name| name.to_str())
3086        .unwrap_or(head);
3087    if !(head_name == "gh" || head_name.eq_ignore_ascii_case("gh.exe")) {
3088        return false;
3089    }
3090    tokens.iter().any(|token| {
3091        matches!(token.as_str(), "--json" | "--jq" | "--template")
3092            || token.starts_with("--json=")
3093            || token.starts_with("--jq=")
3094            || token.starts_with("--template=")
3095    })
3096}
3097
/// Splits `command` into words using a deliberately small subset of shell
/// quoting rules, intended only for spotting flags.
///
/// * A backslash outside single quotes makes the next character literal; the
///   backslash itself is dropped (a trailing lone backslash is discarded).
/// * Single and double quotes group characters and are removed from the word.
/// * Unquoted whitespace and the separators `;`, `&`, `|` end the current word
///   and are never part of any word.
///
/// Empty words are never emitted.
fn shell_words_for_flags(command: &str) -> Vec<String> {
    let mut tokens: Vec<String> = Vec::new();
    let mut pending = String::new();
    let mut single_quoted = false;
    let mut double_quoted = false;
    let mut take_literal = false;

    for ch in command.chars() {
        if take_literal {
            pending.push(ch);
            take_literal = false;
            continue;
        }
        let quoted = single_quoted || double_quoted;
        match ch {
            '\\' if !single_quoted => take_literal = true,
            '\'' if !double_quoted => single_quoted = !single_quoted,
            '"' if !single_quoted => double_quoted = !double_quoted,
            c if !quoted && (c.is_whitespace() || matches!(c, ';' | '&' | '|')) => {
                if !pending.is_empty() {
                    tokens.push(std::mem::take(&mut pending));
                }
            }
            c => pending.push(c),
        }
    }

    if !pending.is_empty() {
        tokens.push(pending);
    }
    tokens
}
3142
3143fn is_structured_body(body: &str) -> bool {
3144    let trimmed = body.trim();
3145    if trimmed.is_empty() {
3146        return false;
3147    }
3148    if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
3149        return true;
3150    }
3151
3152    let mut saw_line = false;
3153    for line in trimmed
3154        .lines()
3155        .map(str::trim)
3156        .filter(|line| !line.is_empty())
3157    {
3158        saw_line = true;
3159        if serde_json::from_str::<serde_json::Value>(line).is_err() {
3160            return false;
3161        }
3162    }
3163    saw_line
3164}
3165
3166fn stream_starts_like_json(buffer: &BgBuffer, stream: StreamKind) -> bool {
3167    let path = match (buffer, stream) {
3168        (BgBuffer::Pipes { stdout_path, .. }, StreamKind::Stdout) => Some(stdout_path),
3169        (BgBuffer::Pipes { stderr_path, .. }, StreamKind::Stderr) => Some(stderr_path),
3170        (BgBuffer::Pty { combined_path }, _) => Some(combined_path),
3171    };
3172    let Some(path) = path else {
3173        return false;
3174    };
3175    let Ok(file) = std::fs::File::open(path) else {
3176        return false;
3177    };
3178    let mut limited = file.take(512);
3179    let mut bytes = Vec::new();
3180    if limited.read_to_end(&mut bytes).is_err() {
3181        return false;
3182    }
3183    String::from_utf8_lossy(&bytes)
3184        .chars()
3185        .find(|ch| !ch.is_whitespace())
3186        .is_some_and(|ch| matches!(ch, '{' | '[' | '"' | '-' | '0'..='9' | 't' | 'f' | 'n'))
3187}
3188
/// Token and byte counters carried alongside a task completion.
///
/// NOTE(review): the `original_*` / `compressed_*` pairs presumably describe
/// the output before and after compression — confirm against the code that
/// populates this struct.
struct CompletionTokenCounts {
    /// Token count of the original output; `None` in the `skipped()` value.
    original_tokens: Option<u32>,
    /// Token count of the compressed output; `None` in the `skipped()` value.
    compressed_tokens: Option<u32>,
    /// Byte size of the original output; `None` in the `skipped()` value.
    original_bytes: Option<i64>,
    /// Byte size of the compressed output; `None` in the `skipped()` value.
    compressed_bytes: Option<i64>,
    /// `true` in the `skipped()` value, where every counter above is `None`.
    tokens_skipped: bool,
}
3196
3197impl CompletionTokenCounts {
3198    fn skipped() -> Self {
3199        Self {
3200            original_tokens: None,
3201            compressed_tokens: None,
3202            original_bytes: None,
3203            compressed_bytes: None,
3204            tokens_skipped: true,
3205        }
3206    }
3207}
3208
3209fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
3210    match status {
3211        BgTaskStatus::TimedOut => "timed out".to_string(),
3212        BgTaskStatus::Killed => "killed".to_string(),
3213        _ => exit_code
3214            .map(|code| format!("exit {code}"))
3215            .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
3216    }
3217}
3218
3219fn token_count_u32(text: &str) -> u32 {
3220    aft_tokenizer::count_tokens(text)
3221        .try_into()
3222        .unwrap_or(u32::MAX)
3223}
3224
3225impl Default for BgTaskRegistry {
3226    fn default() -> Self {
3227        Self::new(Arc::new(Mutex::new(None)))
3228    }
3229}
3230
/// Returns `true` when `path` was last modified strictly less than `grace`
/// ago.
///
/// Any failure — missing file, unavailable modification time, or a
/// modification time that lies in the future — yields `false`.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(metadata) = fs::metadata(path) else {
        return false;
    };
    let Ok(modified) = metadata.modified() else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        Err(_) => false,
    }
}
3239
/// Canonicalizes `path`, falling back to an owned copy of the input when
/// canonicalization fails (for example because the path does not exist).
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
3243
/// Converts a wall-clock start time (milliseconds since the Unix epoch) into
/// an [`Instant`] lying the same distance in the past.
///
/// A start time in the future counts as "just now". If the system clock reads
/// before the epoch, or the subtraction cannot be represented as an `Instant`,
/// the current instant is returned.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let age = Duration::from_millis(now_ms.saturating_sub(started_at));
    match Instant::now().checked_sub(age) {
        Some(started) => started,
        None => Instant::now(),
    }
}
3255
3256fn gc_quarantine(storage_dir: &Path) {
3257    let quarantine_root = storage_dir.join("bash-tasks-quarantine");
3258    let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
3259        return;
3260    };
3261    for session_entry in session_dirs.flatten() {
3262        let session_quarantine_dir = session_entry.path();
3263        if !session_quarantine_dir.is_dir() {
3264            continue;
3265        }
3266        let entries = match fs::read_dir(&session_quarantine_dir) {
3267            Ok(entries) => entries,
3268            Err(error) => {
3269                crate::slog_warn!(
3270                    "failed to read background task quarantine dir {}: {error}",
3271                    session_quarantine_dir.display()
3272                );
3273                continue;
3274            }
3275        };
3276        for entry in entries.flatten() {
3277            let path = entry.path();
3278            if modified_within(&path, QUARANTINE_GC_GRACE) {
3279                continue;
3280            }
3281            let result = if path.is_dir() {
3282                fs::remove_dir_all(&path)
3283            } else {
3284                fs::remove_file(&path)
3285            };
3286            match result {
3287                Ok(()) => log::debug!(
3288                    "deleted old background task quarantine entry {}",
3289                    path.display()
3290                ),
3291                Err(error) => crate::slog_warn!(
3292                    "failed to delete old background task quarantine entry {}: {error}",
3293                    path.display()
3294                ),
3295            }
3296        }
3297        let _ = fs::remove_dir(&session_quarantine_dir);
3298    }
3299    let _ = fs::remove_dir(&quarantine_root);
3300}
3301
/// Why a persisted task file is being quarantined. `quarantine_name` uses the
/// variant to choose how the quarantined file is renamed.
enum QuarantineKind {
    /// Renamed to `<file_name>.corrupt-<unix_ts>`.
    Corrupt,
    /// Renamed to `<stem>.invalid.<unix_ts>.<extension>`, or to
    /// `<file_name>.invalid.<unix_ts>` when the name has no stem/extension pair.
    Invalid,
}
3306
3307fn quarantine_task_json(
3308    storage_dir: &Path,
3309    session_dir: &Path,
3310    json_path: &Path,
3311    kind: QuarantineKind,
3312) -> Result<(), String> {
3313    let session_hash = session_dir
3314        .file_name()
3315        .and_then(|name| name.to_str())
3316        .ok_or_else(|| {
3317            format!(
3318                "invalid background task session dir: {}",
3319                session_dir.display()
3320            )
3321        })?;
3322    let task_name = json_path
3323        .file_name()
3324        .and_then(|name| name.to_str())
3325        .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
3326    let unix_ts = SystemTime::now()
3327        .duration_since(UNIX_EPOCH)
3328        .map(|duration| duration.as_secs())
3329        .unwrap_or(0);
3330    let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
3331    fs::create_dir_all(&quarantine_dir).map_err(|e| {
3332        format!(
3333            "failed to create background task quarantine dir {}: {e}",
3334            quarantine_dir.display()
3335        )
3336    })?;
3337    let target_name = quarantine_name(task_name, unix_ts, &kind);
3338    let target = quarantine_dir.join(target_name);
3339    fs::rename(json_path, &target).map_err(|e| {
3340        format!(
3341            "failed to quarantine background task metadata {} to {}: {e}",
3342            json_path.display(),
3343            target.display()
3344        )
3345    })?;
3346
3347    for sibling in task_sibling_paths(json_path) {
3348        if !sibling.exists() {
3349            continue;
3350        }
3351        let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
3352            crate::slog_warn!(
3353                "skipping background task sibling with invalid name during quarantine: {}",
3354                sibling.display()
3355            );
3356            continue;
3357        };
3358        let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
3359        if let Err(error) = fs::rename(&sibling, &sibling_target) {
3360            crate::slog_warn!(
3361                "failed to quarantine background task sibling {} to {}: {error}",
3362                sibling.display(),
3363                sibling_target.display()
3364            );
3365        }
3366    }
3367
3368    let _ = fs::remove_dir(session_dir);
3369    Ok(())
3370}
3371
3372fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
3373    match kind {
3374        QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
3375        QuarantineKind::Invalid => {
3376            let path = Path::new(file_name);
3377            let stem = path.file_stem().and_then(|stem| stem.to_str());
3378            let extension = path.extension().and_then(|extension| extension.to_str());
3379            match (stem, extension) {
3380                (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
3381                _ => format!("{file_name}.invalid.{unix_ts}"),
3382            }
3383        }
3384    }
3385}
3386
/// Lists the candidate sibling files of a task's JSON file: same directory,
/// same stem, one path per known extension (`stdout`, `stderr`, `exit`,
/// `pty`, `ps1`, `bat`, `sh`). The paths are not checked for existence.
///
/// Returns an empty list when `json_path` has no parent or no UTF-8 stem.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 7] = ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"];

    let parent = json_path.parent();
    let stem = json_path.file_stem().and_then(|stem| stem.to_str());
    match (parent, stem) {
        (Some(parent), Some(stem)) => SIBLING_EXTENSIONS
            .iter()
            .map(|extension| parent.join(format!("{stem}.{extension}")))
            .collect(),
        _ => Vec::new(),
    }
}
3399
3400fn read_for_token_count_from_disk(
3401    metadata: &PersistedTask,
3402    paths: &TaskPaths,
3403    max_bytes_per_stream: usize,
3404) -> TokenCountInput {
3405    if metadata.mode == BgMode::Pty {
3406        return TokenCountInput::Skipped;
3407    }
3408    // Read up to `max_bytes_per_stream` bytes per stream rather than
3409    // refusing to tokenize anything when the file exceeds the cap.
3410    // Mirror the in-memory `BgBuffer::read_for_token_count` policy
3411    // (see comment there) — large outputs are exactly the tasks that
3412    // benefit most from compression accounting, so silent-skipping
3413    // them defeats the purpose of token tracking.
3414    let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
3415    let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
3416    match (stdout, stderr) {
3417        (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3418            String::from_utf8_lossy(&stdout).as_ref(),
3419            String::from_utf8_lossy(&stderr).as_ref(),
3420        )),
3421        (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
3422            String::from_utf8_lossy(&stdout).as_ref(),
3423            "",
3424        )),
3425        (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3426            "",
3427            String::from_utf8_lossy(&stderr).as_ref(),
3428        )),
3429        (Err(_), Err(_)) => TokenCountInput::Skipped,
3430    }
3431}
3432
/// Read at most `max_bytes` bytes from the END of `path`. Used for
/// tokenization where the most recent output is more representative than
/// an arbitrarily-capped beginning.
///
/// The tail window is computed from the file length observed when the call
/// starts, and the read itself is hard-capped at `max_bytes`. The result
/// therefore never exceeds the cap, even when `max_bytes` is 0 or another
/// process appends to the file while it is being read.
///
/// # Errors
///
/// Returns `Err` if the file cannot be opened (genuinely missing or
/// permissions error), or if querying its metadata, seeking, or reading fails.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};
    let mut file = std::fs::File::open(path)?;
    let len = file.metadata()?.len();
    let cap = u64::try_from(max_bytes).unwrap_or(u64::MAX);
    let read_len = len.min(cap);
    if read_len < len {
        // Seek relative to the start using the length sampled above, so the
        // window does not shift if the file grows before the seek happens.
        file.seek(SeekFrom::Start(len - read_len))?;
    }
    let mut bytes = Vec::with_capacity(usize::try_from(read_len).unwrap_or(max_bytes));
    // `take` enforces the cap; a bare `read_to_end` would return everything up
    // to the current end of file (the whole file when `max_bytes` is 0).
    file.take(cap).read_to_end(&mut bytes)?;
    Ok(bytes)
}
3449
3450impl BgTask {
3451    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
3452        let state = self
3453            .state
3454            .lock()
3455            .unwrap_or_else(|poison| poison.into_inner());
3456        self.snapshot_locked(&state, preview_bytes)
3457    }
3458
3459    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
3460        let metadata = &state.metadata;
3461        let duration_ms = metadata.duration_ms.or_else(|| {
3462            metadata
3463                .status
3464                .is_terminal()
3465                .then(|| self.started.elapsed().as_millis() as u64)
3466        });
3467        let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
3468            (String::new(), false)
3469        } else if metadata.status.is_terminal() {
3470            state
3471                .terminal_output_cache
3472                .as_ref()
3473                .map(|cache| (cache.output_preview.clone(), cache.output_truncated))
3474                .unwrap_or_else(|| (String::new(), false))
3475        } else {
3476            state.buffer.read_tail(preview_bytes)
3477        };
3478        BgTaskSnapshot {
3479            info: BgTaskInfo {
3480                task_id: self.task_id.clone(),
3481                status: metadata.status.clone(),
3482                command: metadata.command.clone(),
3483                mode: metadata.mode.clone(),
3484                started_at: metadata.started_at,
3485                duration_ms,
3486            },
3487            exit_code: metadata.exit_code,
3488            child_pid: metadata.child_pid,
3489            workdir: metadata.workdir.display().to_string(),
3490            output_preview,
3491            output_truncated,
3492            output_path: state
3493                .buffer
3494                .output_path()
3495                .map(|path| path.display().to_string()),
3496            stderr_path: state
3497                .buffer
3498                .stderr_path()
3499                .map(|path| path.display().to_string()),
3500            pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
3501            pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
3502        }
3503    }
3504
3505    pub(crate) fn is_running(&self) -> bool {
3506        self.state
3507            .lock()
3508            .map(|state| {
3509                state.metadata.status == BgTaskStatus::Running
3510                    || (state.metadata.mode == BgMode::Pty
3511                        && state.metadata.status == BgTaskStatus::Killing)
3512            })
3513            .unwrap_or(false)
3514    }
3515
3516    fn is_terminal(&self) -> bool {
3517        self.state
3518            .lock()
3519            .map(|state| state.metadata.status.is_terminal())
3520            .unwrap_or(false)
3521    }
3522
3523    fn mark_terminal_now(&self) {
3524        if let Ok(mut terminal_at) = self.terminal_at.lock() {
3525            if terminal_at.is_none() {
3526                *terminal_at = Some(Instant::now());
3527            }
3528        }
3529    }
3530
3531    fn set_completion_delivered(
3532        &self,
3533        delivered: bool,
3534        registry: &BgTaskRegistry,
3535    ) -> Result<(), String> {
3536        let mut state = self
3537            .state
3538            .lock()
3539            .map_err(|_| "background task lock poisoned".to_string())?;
3540        let updated = registry
3541            .update_task_metadata(&self.paths, |metadata| {
3542                metadata.completion_delivered = delivered;
3543            })
3544            .map_err(|e| format!("failed to update completion delivery: {e}"))?;
3545        state.metadata = updated;
3546        Ok(())
3547    }
3548}
3549
/// Takes the direct child handle out of `child_slot` and reaps it.
///
/// Simply dropping a [`std::process::Child`] never calls `wait()`. On Unix a
/// child that has finished but was not waited on stays around as a
/// `<defunct>` zombie until the AFT process exits (issue #91:
/// `[mv] <defunct>`). Terminal-transition paths that learn about completion
/// from the exit-marker file — instead of from
/// [`BgTaskRegistry::reap_child`]'s `try_wait()` — therefore have to reap the
/// handle explicitly rather than just clearing the slot.
///
/// The wrapper writes the exit marker as its final statement (an atomic `mv`
/// rename), so once the marker is visible the direct child is already on its
/// way out. `try_wait()` is tried first so the common case does not block;
/// the blocking `wait()` is only the fallback for the brief window between
/// the rename and process teardown.
///
/// Callers hold the task state mutex, which serializes this against
/// `reap_child`: whichever path gets the lock first reaps and empties the
/// slot, and the other one finds `None`, so the child is never waited twice.
#[cfg(unix)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
    let Some(mut child) = child_slot.take() else {
        return;
    };
    match child.try_wait() {
        // Still running according to the kernel: block until it is gone.
        Ok(None) => {
            let _ = child.wait();
        }
        // Already reaped by `try_wait`, or the status query failed.
        Ok(Some(_)) | Err(_) => {}
    }
}
3578
/// Windows counterpart: there is no zombie/`<defunct>` concept, and dropping
/// the [`Child`] closes the process handle, which is the correct release. The
/// slot is simply emptied — the historical behavior — so the documented
/// Windows PID-recycle handling in `reap_child` is unaffected.
#[cfg(windows)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
    drop(child_slot.take());
}
3587
3588fn terminal_metadata_from_marker(
3589    mut metadata: PersistedTask,
3590    marker: ExitMarker,
3591    reason: Option<String>,
3592) -> PersistedTask {
3593    match marker {
3594        ExitMarker::Code(code) => {
3595            let status = if code == 0 {
3596                BgTaskStatus::Completed
3597            } else {
3598                BgTaskStatus::Failed
3599            };
3600            metadata.mark_terminal(status, Some(code), reason);
3601        }
3602        ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
3603    }
3604    metadata
3605}
3606
3607#[cfg(unix)]
3608fn write_unix_command_script(command: &str, paths: &TaskPaths) -> Result<PathBuf, String> {
3609    let stem = paths
3610        .json
3611        .file_stem()
3612        .and_then(|stem| stem.to_str())
3613        .unwrap_or("wrapper");
3614    let script_path = paths.dir.join(format!("{stem}.sh"));
3615    fs::write(&script_path, command)
3616        .map_err(|e| format!("failed to write background bash command script: {e}"))?;
3617    Ok(script_path)
3618}
3619
3620#[cfg(unix)]
3621fn detached_shell_command(command_script: &Path, exit_path: &Path) -> Command {
3622    let shell = resolve_posix_shell();
3623    let mut cmd = Command::new(&shell);
3624    // Keep the user-provided command body out of argv and shell `-c` parsing.
3625    // The direct child is still a tiny wrapper so it can write the authoritative
3626    // exit marker after the command script exits (including if that script calls
3627    // `exit`). Passing only file paths through `-c` avoids newline/quote/length
3628    // edge cases where a multi-command script can be mangled before execution.
3629    cmd.arg("-c")
3630        .arg(r#""$0" "$1"; code=$?; printf "%s" "$code" > "$2.tmp.$$"; mv -f "$2.tmp.$$" "$2""#)
3631        .arg(&shell)
3632        .arg(command_script)
3633        .arg(exit_path);
3634    unsafe {
3635        cmd.pre_exec(|| {
3636            if libc::setsid() == -1 {
3637                return Err(std::io::Error::last_os_error());
3638            }
3639            Ok(())
3640        });
3641    }
3642    cmd
3643}
3644
3645#[cfg(unix)]
3646fn resolve_posix_shell() -> PathBuf {
3647    static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
3648    POSIX_SHELL
3649        .get_or_init(|| {
3650            std::env::var_os("BASH")
3651                .filter(|value| !value.is_empty())
3652                .map(PathBuf::from)
3653                .filter(|path| path.exists())
3654                .or_else(|| which::which("bash").ok())
3655                .or_else(|| which::which("zsh").ok())
3656                .unwrap_or_else(|| PathBuf::from("/bin/sh"))
3657        })
3658        .clone()
3659}
3660
/// Builds (without spawning) the Windows command that runs `command` through
/// `shell` by way of a wrapper script written next to the task's other files.
///
/// The wrapper body comes from `shell.wrapper_script_bytes` and is stored as
/// `<task dir>/<json stem>.<ext>` (`ps1`, `bat` or `sh` depending on the
/// shell). The returned command invokes the shell with that file and carries
/// `creation_flags` as its Win32 process creation flags.
///
/// # Errors
///
/// Returns a message when the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    // The wrapper lives in a file and the shell receives only its path as a
    // single clean argument. That sidesteps the whole Windows command-line
    // quoting problem (Rust std quoting + the cmd /C parser + the PowerShell
    // -Command parser all interacting with quotes embedded in the wrapper).
    //
    // The path itself is unproblematic: task IDs are space-free
    // (bash-XXXXXXXX), so the path holds nothing that needs shell escaping,
    // and the wrapper's own quotes never reach a command line — the shell
    // reads them from disk under file syntax rules.
    let wrapper_body = shell.wrapper_script_bytes(command, exit_path);
    let wrapper_ext: &str = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        // POSIX shells (git-bash etc.) are handed the wrapper path directly,
        // so the extension is cosmetic; `.sh` is what an operator grepping the
        // spill directory would expect.
        WindowsShell::Posix(_) => "sh",
    };
    let wrapper_stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{}.{}", wrapper_stem, wrapper_ext));
    if let Err(e) = fs::write(&wrapper_path, wrapper_body) {
        return Err(format!(
            "failed to write background bash wrapper script: {e}"
        ));
    }

    // Host arguments that precede the wrapper path.
    let host_args: &[&str] = match shell {
        // `-File` runs the script with no quoting issues; the other switches
        // configure the host before the file runs.
        WindowsShell::Pwsh | WindowsShell::Powershell => &[
            "-NoLogo",
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
        ],
        // `cmd /D /C "<bat-file-path>"`: running a .bat through /C is
        // well-defined. The batch processor reads the file line by line and
        // the /C parser never re-interprets it, which avoids the "filename
        // syntax incorrect" errors caused by complex compound commands on the
        // cmd line.
        WindowsShell::Cmd => &["/D", "/C"],
        // git-bash and other POSIX shells run `<binary> <wrapper-path>`; the
        // wrapper is just a shell script and needs no special flags.
        WindowsShell::Posix(_) => &[],
    };

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(host_args);
    cmd.arg(&wrapper_path);

    // Win32 process creation flags. The caller decides whether
    // CREATE_BREAKAWAY_FROM_JOB is included (breakaway-fallback strategy).
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
3743
3744/// Spawn a detached background bash child process.
3745///
/// On Unix this is a single spawn against the shell chosen by
/// `resolve_posix_shell` (`/bin/sh` is only its last resort). On Windows it walks
3747/// `WindowsShell::shell_candidates()` (pwsh.exe → powershell.exe →
3748/// cmd.exe) and retries with the next candidate when the previous one
3749/// fails to spawn with `NotFound` — the same runtime safety net the
3750/// foreground bash path has, so issue #27 callers landing on cmd.exe
3751/// fallback can also use background bash. The wrapper script is
3752/// regenerated per attempt because PowerShell wrappers embed the shell
3753/// binary by name; the stdout/stderr capture handles are also reopened
3754/// per attempt because `Command::spawn()` consumes them.
3755///
3756/// Errors other than `NotFound` (PermissionDenied, OutOfMemory, etc.)
3757/// return immediately without retry — they indicate a problem with the
3758/// resolved shell that retrying with a different shell won't fix.
3759fn spawn_detached_child(
3760    command: &str,
3761    paths: &TaskPaths,
3762    workdir: &Path,
3763    env: &HashMap<String, String>,
3764) -> Result<std::process::Child, String> {
3765    #[cfg(not(windows))]
3766    {
3767        let stdout = create_capture_file(&paths.stdout)
3768            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3769        let stderr = create_capture_file(&paths.stderr)
3770            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3771        let command_script = write_unix_command_script(command, paths)?;
3772        detached_shell_command(&command_script, &paths.exit)
3773            .current_dir(workdir)
3774            .envs(env)
3775            .stdin(Stdio::null())
3776            .stdout(Stdio::from(stdout))
3777            .stderr(Stdio::from(stderr))
3778            .spawn()
3779            .map_err(|e| format!("failed to spawn background bash command: {e}"))
3780    }
3781    #[cfg(windows)]
3782    {
3783        use crate::windows_shell::shell_candidates;
3784        // Spawn priority: pwsh → powershell → git-bash → cmd. Same as the
3785        // legacy foreground bash spawn path. v0.20 routes ALL bash through
3786        // this background spawn helper, including foreground tool calls
3787        // where the model writes PowerShell-syntax (`$var = ...`,
3788        // `Start-Sleep`, `Add-Content`) — those fail outright under cmd.
3789        // The earlier v0.18-era cmd-first override worked around a
3790        // PowerShell detached-output bug; that bug is fixed at the
3791        // process-flag layer (CREATE_NO_WINDOW instead of DETACHED_PROCESS,
3792        // see flag block below), so we no longer need to misroute PS
3793        // commands through cmd.
3794        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
3795        // Win32 process creation flags. We try with CREATE_BREAKAWAY_FROM_JOB
3796        // first (so the bg child outlives the AFT process when AFT is killed),
3797        // then fall back without it for environments where the parent is in a
3798        // Job Object that doesn't grant `JOB_OBJECT_LIMIT_BREAKAWAY_OK`. CI
3799        // runners (GitHub Actions windows-2022) and some MDM-managed corp
3800        // environments hit this — `CreateProcess` returns Access Denied (5).
3801        // Without breakaway, the child still runs detached but will be torn
3802        // down with the parent if the parent process group is signaled.
3803        //
3804        // We use CREATE_NO_WINDOW (no visible console window, but the
3805        // child still has a hidden console) rather than DETACHED_PROCESS
3806        // (no console at all). PowerShell-based wrappers that perform
3807        // file I/O via [System.IO.File] need a console handle to flush
3808        // stdout/stderr correctly even when redirected — under
3809        // DETACHED_PROCESS, pwsh sometimes silently exits before
3810        // executing later script statements (the Move-Item that writes
3811        // the exit marker never runs), leaving the bg task forever
3812        // marked Failed: process exited without exit marker. cmd.exe
3813        // wrappers tolerate DETACHED_PROCESS, but switching to
3814        // CREATE_NO_WINDOW costs nothing for cmd and unblocks pwsh.
3815        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
3816        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
3817        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
3818        let with_breakaway =
3819            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
3820        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
3821        let mut last_error: Option<String> = None;
3822        for (idx, shell) in candidates.iter().enumerate() {
3823            // Per-shell, try with breakaway first. If the process is in a
3824            // restrictive job, the breakaway flag triggers Access Denied
3825            // (os error 5). Retry once without breakaway.
3826            for &flags in &[with_breakaway, without_breakaway] {
3827                // Re-open capture handles per attempt; spawn() consumes them.
3828                let stdout = create_capture_file(&paths.stdout)
3829                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3830                let stderr = create_capture_file(&paths.stderr)
3831                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3832                let mut cmd =
3833                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
3834                cmd.current_dir(workdir)
3835                    .envs(env)
3836                    .stdin(Stdio::null())
3837                    .stdout(Stdio::from(stdout))
3838                    .stderr(Stdio::from(stderr));
3839                match cmd.spawn() {
3840                    Ok(child) => {
3841                        if idx > 0 {
3842                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
3843                             the cached PATH probe disagreed with runtime spawn — likely PATH \
3844                             inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
3845                            shell.binary(),
3846                            idx);
3847                        }
3848                        if flags == without_breakaway {
3849                            crate::slog_warn!(
3850                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
3851                             (likely a restrictive Job Object — CI sandbox or MDM policy). \
3852                             Spawned without breakaway; the bg task will be torn down if the \
3853                             AFT process group is killed."
3854                            );
3855                        }
3856                        return Ok(child);
3857                    }
3858                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
3859                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
3860                        shell.binary());
3861                        last_error = Some(format!("{}: {e}", shell.binary()));
3862                        // Skip the without-breakaway retry for NotFound — the
3863                        // binary itself is missing, breakaway flag is irrelevant.
3864                        break;
3865                    }
3866                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
3867                        // Access Denied during breakaway — retry without it.
3868                        crate::slog_warn!(
3869                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
3870                         Access Denied — retrying {} without breakaway",
3871                            shell.binary()
3872                        );
3873                        last_error = Some(format!("{}: {e}", shell.binary()));
3874                        continue;
3875                    }
3876                    Err(e) => {
3877                        return Err(format!(
3878                            "failed to spawn background bash command via {}: {e}",
3879                            shell.binary()
3880                        ));
3881                    }
3882                }
3883            }
3884        }
3885        Err(format!(
3886            "failed to spawn background bash command: no Windows shell could be spawned. \
3887             Last error: {}. PATH-probed candidates: {:?}",
3888            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
3889            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
3890        ))
3891    }
3892}
3893
3894fn random_slug() -> String {
3895    let mut bytes = [0u8; 4];
3896    // getrandom is a transitive dependency; use it directly for OS entropy.
3897    getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3898        // Extremely unlikely fallback: time + pid mix.
3899        let t = SystemTime::now()
3900            .duration_since(UNIX_EPOCH)
3901            .map(|d| d.subsec_nanos())
3902            .unwrap_or(0);
3903        let p = std::process::id();
3904        bytes.copy_from_slice(&(t ^ p).to_le_bytes());
3905    });
3906    // `bash-` + 8 lowercase hex chars — compact, OS-entropy backed.
3907    let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3908    format!("bash-{hex}")
3909}
3910
3911#[cfg(test)]
3912mod tests {
3913    use std::collections::HashMap;
3914    use std::fs;
3915    use std::sync::atomic::{AtomicBool, AtomicUsize};
3916    use std::sync::{Arc, Mutex};
3917    use std::time::Duration;
3918    #[cfg(windows)]
3919    use std::time::Instant;
3920
3921    use super::*;
3922
3923    #[cfg(unix)]
3924    const QUICK_SUCCESS_COMMAND: &str = "true";
3925    #[cfg(windows)]
3926    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
3927
3928    #[cfg(unix)]
3929    const LONG_RUNNING_COMMAND: &str = "sleep 5";
3930    #[cfg(windows)]
3931    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3932
3933    fn insert_terminal_piped_task(
3934        registry: &BgTaskRegistry,
3935        dir: &tempfile::TempDir,
3936        command: &str,
3937        stdout: &str,
3938        stderr: &str,
3939        compressed: bool,
3940    ) -> (String, Arc<BgTask>) {
3941        let task_id = format!("bash-test-{}", random_slug());
3942        let paths = task_paths(dir.path(), "session", &task_id);
3943        fs::create_dir_all(&paths.dir).unwrap();
3944        fs::write(&paths.stdout, stdout).unwrap();
3945        fs::write(&paths.stderr, stderr).unwrap();
3946        let mut metadata = PersistedTask::starting(
3947            task_id.clone(),
3948            "session".to_string(),
3949            command.to_string(),
3950            dir.path().to_path_buf(),
3951            Some(dir.path().to_path_buf()),
3952            Some(30_000),
3953            true,
3954            compressed,
3955        );
3956        metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
3957        write_task(&paths.json, &metadata).unwrap();
3958        registry
3959            .insert_rehydrated_task(metadata, paths, true)
3960            .expect("insert terminal task");
3961        let task = registry.task_for_session(&task_id, "session").unwrap();
3962        (task_id, task)
3963    }
3964
3965    fn insert_terminal_pty_task(
3966        registry: &BgTaskRegistry,
3967        dir: &tempfile::TempDir,
3968        pty_output: &str,
3969    ) -> (String, Arc<BgTask>) {
3970        let task_id = format!("bash-test-{}", random_slug());
3971        let paths = task_paths(dir.path(), "session", &task_id);
3972        fs::create_dir_all(&paths.dir).unwrap();
3973        fs::write(&paths.pty, pty_output).unwrap();
3974        let mut metadata = PersistedTask::starting(
3975            task_id.clone(),
3976            "session".to_string(),
3977            "python".to_string(),
3978            dir.path().to_path_buf(),
3979            Some(dir.path().to_path_buf()),
3980            Some(30_000),
3981            true,
3982            true,
3983        );
3984        metadata.mode = BgMode::Pty;
3985        metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
3986        write_task(&paths.json, &metadata).unwrap();
3987        registry
3988            .insert_rehydrated_task(metadata, paths, true)
3989            .expect("insert terminal pty task");
3990        let task = registry.task_for_session(&task_id, "session").unwrap();
3991        (task_id, task)
3992    }
3993
3994    #[cfg(unix)]
3995    fn wait_for_terminal_snapshot(
3996        registry: &BgTaskRegistry,
3997        task_id: &str,
3998        session_id: &str,
3999        project: &Path,
4000        storage: &Path,
4001    ) -> BgTaskSnapshot {
4002        let started = Instant::now();
4003        loop {
4004            let snapshot = registry
4005                .status(task_id, session_id, Some(project), Some(storage), 4096)
4006                .expect("spawned task should be visible to status");
4007            if snapshot.info.status.is_terminal() {
4008                return snapshot;
4009            }
4010            assert!(
4011                started.elapsed() < Duration::from_secs(10),
4012                "timed out waiting for task {task_id} to finish; last status={:?}",
4013                snapshot.info.status
4014            );
4015            std::thread::sleep(Duration::from_millis(50));
4016        }
4017    }
4018
4019    #[cfg(unix)]
4020    #[test]
4021    fn multiline_pipeline_stdout_persists_all_lines_after_terminal_status() {
4022        let cases = [
4023            (
4024                "long-first",
4025                "sleep 0.5; printf 'one\\n' | cat\nprintf 'two\\n' | grep -c two\nprintf 'three\\n' | cat",
4026                vec!["one", "1", "three"],
4027            ),
4028            (
4029                "short-first",
4030                "printf 'one\\n' | cat\nsleep 0.2; printf 'two\\n' | grep -c two\nprintf 'three\\n' | cat",
4031                vec!["one", "1", "three"],
4032            ),
4033            (
4034                "failing-middle",
4035                "sleep 0.2; printf 'one\\n' | cat\nfalse; printf 'after-false\\n' | cat\nprintf 'three\\n' | cat",
4036                vec!["one", "after-false", "three"],
4037            ),
4038        ];
4039
4040        for (name, command, expected_lines) in cases {
4041            let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4042            let dir = tempfile::tempdir().unwrap();
4043            let session_id = format!("session-{name}");
4044            let task_id = registry
4045                .spawn(
4046                    command,
4047                    session_id.clone(),
4048                    dir.path().to_path_buf(),
4049                    HashMap::new(),
4050                    Some(Duration::from_secs(30)),
4051                    dir.path().to_path_buf(),
4052                    10,
4053                    true,
4054                    true,
4055                    Some(dir.path().to_path_buf()),
4056                )
4057                .unwrap();
4058
4059            let snapshot = wait_for_terminal_snapshot(
4060                &registry,
4061                &task_id,
4062                &session_id,
4063                dir.path(),
4064                dir.path(),
4065            );
4066            assert_eq!(
4067                snapshot.info.status,
4068                BgTaskStatus::Completed,
4069                "{name}: task should complete; snapshot={snapshot:?}"
4070            );
4071            assert_eq!(
4072                snapshot.exit_code,
4073                Some(0),
4074                "{name}: script should use the final command's exit code"
4075            );
4076
4077            let stdout_path = task_paths(dir.path(), &session_id, &task_id).stdout;
4078            let stdout = fs::read_to_string(&stdout_path).unwrap_or_else(|error| {
4079                panic!(
4080                    "{name}: failed to read raw stdout file {}: {error}",
4081                    stdout_path.display()
4082                )
4083            });
4084            let lines: Vec<&str> = stdout.lines().collect();
4085            assert_eq!(
4086                lines,
4087                expected_lines,
4088                "{name}: raw stdout file must include every newline-separated command's output; stdout_path={}",
4089                stdout_path.display()
4090            );
4091        }
4092    }
4093
4094    #[test]
4095    fn recognizes_all_recovery_marker_forms() {
4096        assert!(is_recovery_marker(
4097            "[truncated output; full output: read \"/tmp/out\"]"
4098        ));
4099        assert!(is_recovery_marker(
4100            "[omitted output; see remaining: tail -n +42 \"/tmp/out\"]"
4101        ));
4102        assert!(is_recovery_marker(
4103            "[truncated output; full output unavailable]"
4104        ));
4105    }
4106
4107    #[test]
4108    fn terminal_status_polls_use_cached_render_once_and_off_lock() {
4109        let registry = BgTaskRegistry::default();
4110        let dir = tempfile::tempdir().unwrap();
4111        let (_task_id, task) = insert_terminal_piped_task(
4112            &registry,
4113            &dir,
4114            "custom-tool --verbose",
4115            &"stdout line\n".repeat(200_000),
4116            "",
4117            true,
4118        );
4119        let calls = Arc::new(AtomicUsize::new(0));
4120        let saw_unlocked_state = Arc::new(AtomicBool::new(false));
4121        let task_holder = Arc::new(Mutex::new(Some(Arc::clone(&task))));
4122        let calls_for_closure = Arc::clone(&calls);
4123        let unlocked_for_closure = Arc::clone(&saw_unlocked_state);
4124        let task_for_closure = Arc::clone(&task_holder);
4125        registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4126            calls_for_closure.fetch_add(1, Ordering::SeqCst);
4127            if let Some(task) = task_for_closure.lock().unwrap().as_ref() {
4128                if task.state.try_lock().is_ok() {
4129                    unlocked_for_closure.store(true, Ordering::SeqCst);
4130                }
4131            }
4132            CompressionResult::new(format!("compressed {} bytes", output.len()))
4133        });
4134
4135        let first = registry
4136            .status(
4137                &task.task_id,
4138                "session",
4139                None,
4140                Some(dir.path()),
4141                RUNNING_OUTPUT_PREVIEW_BYTES,
4142            )
4143            .unwrap();
4144        let second = registry
4145            .status(
4146                &task.task_id,
4147                "session",
4148                None,
4149                Some(dir.path()),
4150                RUNNING_OUTPUT_PREVIEW_BYTES,
4151            )
4152            .unwrap();
4153        let listed = registry.list(RUNNING_OUTPUT_PREVIEW_BYTES);
4154
4155        assert_eq!(
4156            calls.load(Ordering::SeqCst),
4157            1,
4158            "terminal render must be cached"
4159        );
4160        assert!(
4161            saw_unlocked_state.load(Ordering::SeqCst),
4162            "compressor must run after releasing the task state lock"
4163        );
4164        assert!(first.output_preview.starts_with("compressed "));
4165        assert_eq!(second.output_preview, first.output_preview);
4166        assert_eq!(listed[0].output_preview, first.output_preview);
4167    }
4168
4169    #[test]
4170    fn completion_preview_success_keeps_tail_only() {
4171        // Exit-aware completion previews: a SUCCESSFUL task's reminder keeps a
4172        // short tail only — head context is noise when the command worked
4173        // (regression: the uniform 4 KiB head+tail cap flooded reminders with
4174        // ~1K tokens of build noise per completed task).
4175        let registry = BgTaskRegistry::default();
4176        let dir = tempfile::tempdir().unwrap();
4177        let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4178        let (_task_id, task) =
4179            insert_terminal_piped_task(&registry, &dir, "cat big.log", &output, "", false);
4180
4181        registry.post_terminal_transition(&task, true).unwrap();
4182        let completions = registry.drain_completions_for_session(Some("session"));
4183        assert_eq!(completions.len(), 1);
4184        let preview = &completions[0].output_preview;
4185        assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4186        assert!(!preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4187        assert!(completions[0].output_truncated);
4188    }
4189
4190    #[test]
4191    fn completion_preview_failure_keeps_head_and_tail() {
4192        // A FAILED task keeps a small head (first error / command banner) plus
4193        // a larger tail (tracebacks and summaries land at the end).
4194        let registry = BgTaskRegistry::default();
4195        let dir = tempfile::tempdir().unwrap();
4196        let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4197        let task_id = format!("bash-test-{}", random_slug());
4198        let paths = task_paths(dir.path(), "session", &task_id);
4199        fs::create_dir_all(&paths.dir).unwrap();
4200        fs::write(&paths.stdout, &output).unwrap();
4201        fs::write(&paths.stderr, "").unwrap();
4202        let mut metadata = PersistedTask::starting(
4203            task_id.clone(),
4204            "session".to_string(),
4205            "cat big.log".to_string(),
4206            dir.path().to_path_buf(),
4207            Some(dir.path().to_path_buf()),
4208            Some(30_000),
4209            true,
4210            false,
4211        );
4212        metadata.mark_terminal(BgTaskStatus::Failed, Some(1), None);
4213        write_task(&paths.json, &metadata).unwrap();
4214        registry
4215            .insert_rehydrated_task(metadata, paths, true)
4216            .expect("insert terminal task");
4217        let task = registry.task_for_session(&task_id, "session").unwrap();
4218
4219        registry.post_terminal_transition(&task, true).unwrap();
4220        let completions = registry.drain_completions_for_session(Some("session"));
4221        assert_eq!(completions.len(), 1);
4222        let preview = &completions[0].output_preview;
4223        assert!(preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4224        assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4225    }
4226
4227    #[test]
4228    fn has_completions_for_session_matches_pending_delivery() {
4229        let registry = BgTaskRegistry::default();
4230        assert!(!registry.has_completions_for_session(Some("session")));
4231        assert!(!registry.has_completions_for_session(None));
4232
4233        let dir = tempfile::tempdir().unwrap();
4234        let (_task_id, task) =
4235            insert_terminal_piped_task(&registry, &dir, QUICK_SUCCESS_COMMAND, "done\n", "", false);
4236        registry.post_terminal_transition(&task, true).unwrap();
4237
4238        assert!(registry.has_completions_for_session(Some("session")));
4239        assert!(registry.has_completions_for_session(None));
4240        assert!(!registry.has_completions_for_session(Some("other-session")));
4241
4242        let completions = registry.drain_completions_for_session(Some("session"));
4243        assert_eq!(completions.len(), 1);
4244        assert_eq!(completions[0].task_id, task.task_id);
4245    }
4246
4247    #[test]
4248    fn structured_gh_json_survives_intact_and_ignores_stderr() {
4249        let registry = BgTaskRegistry::default();
4250        let dir = tempfile::tempdir().unwrap();
4251        let calls = Arc::new(AtomicUsize::new(0));
4252        let calls_for_closure = Arc::clone(&calls);
4253        registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4254            calls_for_closure.fetch_add(1, Ordering::SeqCst);
4255            CompressionResult::new(output)
4256        });
4257        let (task_id, _task) = insert_terminal_piped_task(
4258            &registry,
4259            &dir,
4260            "gh pr view 123 --json body",
4261            "{\"body\":\"hello\"}",
4262            "warning: stderr must not join json",
4263            true,
4264        );
4265
4266        let snapshot = registry
4267            .status(
4268                &task_id,
4269                "session",
4270                None,
4271                Some(dir.path()),
4272                RUNNING_OUTPUT_PREVIEW_BYTES,
4273            )
4274            .unwrap();
4275
4276        assert_eq!(snapshot.output_preview, "{\"body\":\"hello\"}");
4277        assert!(!snapshot.output_preview.contains("warning"));
4278        assert!(!snapshot.output_truncated);
4279        assert_eq!(
4280            calls.load(Ordering::SeqCst),
4281            0,
4282            "structured JSON bypasses compression"
4283        );
4284    }
4285
4286    #[test]
4287    fn registry_emits_single_recovery_marker_for_class_drops() {
4288        let registry = BgTaskRegistry::default();
4289        let dir = tempfile::tempdir().unwrap();
4290        registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4291            let mut dropped = BTreeMap::new();
4292            dropped.insert(DropClass::Error, 18);
4293            dropped.insert(DropClass::Warning, 6);
4294            CompressionResult::with_class_drops("kept diagnostic", dropped)
4295        });
4296        let (task_id, task) =
4297            insert_terminal_piped_task(&registry, &dir, "custom-tool", "raw", "", true);
4298
4299        let snapshot = registry
4300            .status(
4301                &task_id,
4302                "session",
4303                None,
4304                Some(dir.path()),
4305                RUNNING_OUTPUT_PREVIEW_BYTES,
4306            )
4307            .unwrap();
4308
4309        assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4310        assert!(snapshot.output_preview.contains("+18 more errors"));
4311        assert!(snapshot.output_preview.contains("+6 more warnings"));
4312        assert!(snapshot
4313            .output_preview
4314            .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4315        assert!(!snapshot.output_preview.contains("tail -n +"));
4316        assert!(snapshot.output_truncated);
4317    }
4318
4319    #[test]
4320    fn registry_marker_reports_semantic_and_byte_drops_once() {
4321        let registry = BgTaskRegistry::default();
4322        let dir = tempfile::tempdir().unwrap();
4323        registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4324            let mut dropped = BTreeMap::new();
4325            dropped.insert(DropClass::Error, 1);
4326            CompressionResult::with_class_drops(
4327                format!("HEAD-SIGNAL\n{}TAIL-SIGNAL", "middle\n".repeat(8_000)),
4328                dropped,
4329            )
4330        });
4331        let (task_id, _task) =
4332            insert_terminal_piped_task(&registry, &dir, "custom-tool", "raw", "", true);
4333
4334        let snapshot = registry
4335            .status(
4336                &task_id,
4337                "session",
4338                None,
4339                Some(dir.path()),
4340                RUNNING_OUTPUT_PREVIEW_BYTES,
4341            )
4342            .unwrap();
4343
4344        assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4345        assert!(snapshot.output_preview.contains("+1 more error"));
4346        assert!(snapshot.output_preview.contains("truncated output"));
4347        assert!(snapshot.output_preview.contains("HEAD-SIGNAL"));
4348        assert!(snapshot.output_preview.contains("TAIL-SIGNAL"));
4349        assert!(!snapshot.output_preview.contains("...<truncated"));
4350        assert!(snapshot.output_truncated);
4351    }
4352
4353    #[test]
4354    fn cargo_stderr_class_drops_name_both_capture_paths() {
4355        let registry = BgTaskRegistry::default();
4356        let dir = tempfile::tempdir().unwrap();
4357        let filter_registry = crate::compress::toml_filter::FilterRegistry::default();
4358        registry.set_compressor_with_exit_code(move |command, output, exit_code| {
4359            crate::compress::compress_with_registry_exit_code(
4360                command,
4361                &output,
4362                exit_code,
4363                &filter_registry,
4364            )
4365        });
4366        let stderr = (0..22)
4367            .map(|index| {
4368                format!(
4369                    "error: cargo failure {index}\n  --> src/lib.rs:{}:1\n   |\n{} | boom\n",
4370                    index + 1,
4371                    index + 1
4372                )
4373            })
4374            .collect::<Vec<_>>()
4375            .join("\n");
4376        let (task_id, task) = insert_terminal_piped_task(
4377            &registry,
4378            &dir,
4379            "cargo check",
4380            "Finished dev [unoptimized] target(s) in 0.01s\n",
4381            &stderr,
4382            true,
4383        );
4384
4385        let snapshot = registry
4386            .status(
4387                &task_id,
4388                "session",
4389                None,
4390                Some(dir.path()),
4391                RUNNING_OUTPUT_PREVIEW_BYTES,
4392            )
4393            .unwrap();
4394
4395        assert!(snapshot.output_preview.contains("+2 more errors"));
4396        assert!(snapshot
4397            .output_preview
4398            .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4399        assert!(snapshot
4400            .output_preview
4401            .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4402        assert!(!snapshot.output_preview.contains("tail -n +"));
4403    }
4404
4405    #[test]
4406    fn over_ceiling_structured_json_uses_pointer_not_partial_json() {
4407        let registry = BgTaskRegistry::default();
4408        let dir = tempfile::tempdir().unwrap();
4409        let body = format!("{{\"body\":\"{}\"}}", "x".repeat(60 * 1024));
4410        let (task_id, task) = insert_terminal_piped_task(
4411            &registry,
4412            &dir,
4413            "cd /repo && gh pr view 123 --json body",
4414            &body,
4415            "",
4416            true,
4417        );
4418
4419        let snapshot = registry
4420            .status(
4421                &task_id,
4422                "session",
4423                None,
4424                Some(dir.path()),
4425                RUNNING_OUTPUT_PREVIEW_BYTES,
4426            )
4427            .unwrap();
4428
4429        assert!(snapshot.output_preview.starts_with("[JSON output "));
4430        assert!(snapshot
4431            .output_preview
4432            .contains(&task.paths.stdout.display().to_string()));
4433        assert!(!snapshot.output_preview.contains(&"x".repeat(1024)));
4434        assert!(snapshot.output_truncated);
4435    }
4436
4437    #[test]
4438    fn toml_strip_tail_cap_uses_full_output_hint_not_offset_hint() {
4439        let registry = BgTaskRegistry::default();
4440        let dir = tempfile::tempdir().unwrap();
4441        let filter_registry = crate::compress::toml_filter::build_registry(
4442            crate::compress::builtin_filters::ALL,
4443            None,
4444            None,
4445        );
4446        registry.set_compressor_with_exit_code(move |command, output, exit_code| {
4447            crate::compress::compress_with_registry_exit_code(
4448                command,
4449                &output,
4450                exit_code,
4451                &filter_registry,
4452            )
4453        });
4454        let stdout = format!(
4455            "make[1]: Entering directory `/tmp`\n{}",
4456            (0..100)
4457                .map(|index| format!("compile line {index}"))
4458                .collect::<Vec<_>>()
4459                .join("\n")
4460        );
4461        let (task_id, task) =
4462            insert_terminal_piped_task(&registry, &dir, "make all", &stdout, "", true);
4463
4464        let snapshot = registry
4465            .status(
4466                &task_id,
4467                "session",
4468                None,
4469                Some(dir.path()),
4470                RUNNING_OUTPUT_PREVIEW_BYTES,
4471            )
4472            .unwrap();
4473
4474        assert!(snapshot.output_preview.contains("compile line 99"));
4475        assert!(snapshot.output_preview.contains(&format!(
4476            "full output: read \"{}\"",
4477            task.paths.stdout.display()
4478        )));
4479        assert!(!snapshot
4480            .output_preview
4481            .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4482        assert!(!snapshot.output_preview.contains("tail -n +"));
4483    }
4484
4485    #[test]
4486    fn compressed_false_raw_passthrough_uses_wider_head_tail_cap() {
4487        let registry = BgTaskRegistry::default();
4488        let dir = tempfile::tempdir().unwrap();
4489        let output = format!("RAW-HEAD\n{}RAW-TAIL\n", "raw-middle\n".repeat(8_000));
4490        let (task_id, task) =
4491            insert_terminal_piped_task(&registry, &dir, "cat raw.log", &output, "RAW-ERR\n", false);
4492
4493        let snapshot = registry
4494            .status(
4495                &task_id,
4496                "session",
4497                None,
4498                Some(dir.path()),
4499                RUNNING_OUTPUT_PREVIEW_BYTES,
4500            )
4501            .unwrap();
4502
4503        assert!(snapshot.output_preview.contains("RAW-HEAD"));
4504        assert!(snapshot.output_preview.contains("RAW-TAIL"));
4505        assert!(snapshot.output_preview.contains("truncated output"));
4506        assert!(snapshot
4507            .output_preview
4508            .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4509        assert!(snapshot
4510            .output_preview
4511            .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4512        assert!(!snapshot.output_preview.contains("tail -n +"));
4513        assert!(snapshot.output_preview.len() > 16 * 1024);
4514        assert!(snapshot.output_truncated);
4515    }
4516
4517    #[test]
4518    fn pty_terminal_snapshot_bypasses_line_compression() {
4519        let registry = BgTaskRegistry::default();
4520        let dir = tempfile::tempdir().unwrap();
4521        let calls = Arc::new(AtomicUsize::new(0));
4522        let calls_for_closure = Arc::clone(&calls);
4523        registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4524            calls_for_closure.fetch_add(1, Ordering::SeqCst);
4525            CompressionResult::new(output)
4526        });
4527        let (task_id, _task) = insert_terminal_pty_task(&registry, &dir, "raw\u{1b}[31m pty bytes");
4528
4529        let snapshot = registry
4530            .status(
4531                &task_id,
4532                "session",
4533                None,
4534                Some(dir.path()),
4535                RUNNING_OUTPUT_PREVIEW_BYTES,
4536            )
4537            .unwrap();
4538
4539        assert_eq!(snapshot.info.mode, BgMode::Pty);
4540        assert_eq!(snapshot.output_preview, "");
4541        assert_eq!(calls.load(Ordering::SeqCst), 0);
4542    }
4543
4544    #[test]
4545    fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
4546        let registry = BgTaskRegistry::default();
4547        let dir = tempfile::tempdir().unwrap();
4548        let task_id = registry
4549            .spawn_pty(
4550                QUICK_SUCCESS_COMMAND,
4551                "session".to_string(),
4552                dir.path().to_path_buf(),
4553                HashMap::new(),
4554                Some(Duration::from_secs(30)),
4555                dir.path().to_path_buf(),
4556                10,
4557                true,
4558                false,
4559                Some(dir.path().to_path_buf()),
4560                50,
4561                120,
4562            )
4563            .unwrap();
4564
4565        let paths = task_paths(dir.path(), "session", &task_id);
4566        let metadata = read_task(&paths.json).unwrap();
4567        assert_eq!(
4568            metadata.schema_version,
4569            crate::bash_background::persistence::SCHEMA_VERSION
4570        );
4571        assert_eq!(metadata.mode, BgMode::Pty);
4572        assert_eq!(metadata.pty_rows, Some(50));
4573        assert_eq!(metadata.pty_cols, Some(120));
4574
4575        let snapshot = registry
4576            .status(&task_id, "session", None, Some(dir.path()), 1024)
4577            .unwrap();
4578        assert_eq!(snapshot.pty_rows, Some(50));
4579        assert_eq!(snapshot.pty_cols, Some(120));
4580    }
4581
4582    /// Spawn a child process that exits immediately and return it after
4583    /// it has terminated. Used by reap_child tests to simulate the
4584    /// "child exists and is dead" state when the watchdog has already
4585    /// nulled out the original child handle.
4586    fn spawn_dead_child() -> std::process::Child {
4587        #[cfg(unix)]
4588        let mut cmd = std::process::Command::new("true");
4589        #[cfg(windows)]
4590        let mut cmd = {
4591            let mut c = std::process::Command::new("cmd");
4592            c.args(["/c", "exit", "0"]);
4593            c
4594        };
4595        cmd.stdin(std::process::Stdio::null());
4596        cmd.stdout(std::process::Stdio::null());
4597        cmd.stderr(std::process::Stdio::null());
4598        let mut child = cmd.spawn().expect("spawn replacement child for reap test");
4599        // Poll try_wait() until the child actually exits, instead of calling
4600        // wait() which closes the OS handle. On Windows, after wait()
4601        // closes the handle, subsequent try_wait() calls (which reap_child
4602        // depends on) return Err — the test was inadvertently giving
4603        // reap_child an unusable child handle. Polling try_wait() keeps the
4604        // handle open and observes natural exit, matching the production
4605        // shape where the watchdog discovers an exited child for the first
4606        // time.
4607        let started = Instant::now();
4608        loop {
4609            match child.try_wait() {
4610                Ok(Some(_)) => break,
4611                Ok(None) => {
4612                    if started.elapsed() > Duration::from_secs(5) {
4613                        panic!("dead-child stand-in did not exit within 5s");
4614                    }
4615                    std::thread::sleep(Duration::from_millis(10));
4616                }
4617                Err(error) => panic!("dead-child try_wait failed: {error}"),
4618            }
4619        }
4620        child
4621    }
4622
4623    #[test]
4624    fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
4625        let registry = BgTaskRegistry::default();
4626        let dir = tempfile::tempdir().unwrap();
4627        let task_id = registry
4628            .spawn(
4629                LONG_RUNNING_COMMAND,
4630                "session".to_string(),
4631                dir.path().to_path_buf(),
4632                HashMap::new(),
4633                Some(Duration::from_secs(30)),
4634                dir.path().to_path_buf(),
4635                10,
4636                true,
4637                false,
4638                Some(dir.path().to_path_buf()),
4639            )
4640            .unwrap();
4641        registry
4642            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4643            .unwrap();
4644        assert_eq!(
4645            registry
4646                .drain_completions_for_session(Some("session"))
4647                .len(),
4648            1
4649        );
4650
4651        // Simulate the plugin consuming a sync bash_watch({ exit:true }) result
4652        // locally before the Rust completion queue is drained/acked.
4653        registry.inner.completions.lock().unwrap().clear();
4654
4655        assert_eq!(
4656            registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
4657            vec![task_id.clone()]
4658        );
4659        assert!(registry
4660            .drain_completions_for_session(Some("session"))
4661            .is_empty());
4662
4663        let paths = task_paths(dir.path(), "session", &task_id);
4664        let metadata = read_task(&paths.json).unwrap();
4665        assert!(metadata.completion_delivered);
4666
4667        let replayed = BgTaskRegistry::default();
4668        replayed
4669            .replay_session_inner(dir.path(), "session", None)
4670            .unwrap();
4671        assert!(replayed
4672            .drain_completions_for_session(Some("session"))
4673            .is_empty());
4674    }
4675
4676    #[test]
4677    fn register_watch_rejects_unknown_task() {
4678        let registry = BgTaskRegistry::default();
4679
4680        let result = registry.register_watch(
4681            "missing-task".to_string(),
4682            WatchPattern::Substring("READY".into()),
4683            true,
4684        );
4685
4686        assert_eq!(result, Err("task_not_found"));
4687    }
4688
4689    #[test]
4690    fn register_watch_on_terminal_task_scans_existing_output() {
4691        let frames = Arc::new(Mutex::new(Vec::new()));
4692        let captured = Arc::clone(&frames);
4693        let sender: crate::context::ProgressSender = Arc::new(Box::new(move |frame| {
4694            captured.lock().unwrap().push(frame);
4695        })
4696            as Box<dyn Fn(PushFrame) + Send + Sync>);
4697        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));
4698        let dir = tempfile::tempdir().unwrap();
4699        let task_id = registry
4700            .spawn(
4701                LONG_RUNNING_COMMAND,
4702                "session".to_string(),
4703                dir.path().to_path_buf(),
4704                HashMap::new(),
4705                Some(Duration::from_secs(30)),
4706                dir.path().to_path_buf(),
4707                10,
4708                true,
4709                false,
4710                Some(dir.path().to_path_buf()),
4711            )
4712            .unwrap();
4713        registry
4714            .inner
4715            .shutdown
4716            .store(true, std::sync::atomic::Ordering::SeqCst);
4717        let task = registry.task_for_session(&task_id, "session").unwrap();
4718        std::fs::write(&task.paths.stdout, "READY\n").unwrap();
4719        registry
4720            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4721            .unwrap();
4722        frames.lock().unwrap().clear();
4723        registry.inner.completions.lock().unwrap().clear();
4724
4725        registry
4726            .register_watch(
4727                task_id.clone(),
4728                WatchPattern::Substring("READY".into()),
4729                true,
4730            )
4731            .unwrap();
4732
4733        let frames = frames.lock().unwrap();
4734        let frame = frames
4735            .iter()
4736            .find_map(|frame| match frame {
4737                PushFrame::BashPatternMatch(frame) => Some(frame),
4738                _ => None,
4739            })
4740            .expect("terminal watch registration should emit pattern frame");
4741        assert_eq!(frame.reason, "pattern_match");
4742        assert_eq!(frame.task_id, task_id);
4743        assert_eq!(frame.session_id, "session");
4744        assert_eq!(frame.match_text, "READY");
4745        assert_eq!(frame.match_offset, 0);
4746        assert_eq!(registry.active_watch_count(&frame.task_id), 0);
4747        let metadata = read_task(&task.paths.json).unwrap();
4748        assert!(metadata.completion_delivered);
4749    }
4750
4751    #[test]
4752    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
4753        let registry = BgTaskRegistry::default();
4754        let dir = tempfile::tempdir().unwrap();
4755        let task_id = registry
4756            .spawn(
4757                QUICK_SUCCESS_COMMAND,
4758                "session".to_string(),
4759                dir.path().to_path_buf(),
4760                HashMap::new(),
4761                Some(Duration::from_secs(30)),
4762                dir.path().to_path_buf(),
4763                10,
4764                true,
4765                false,
4766                Some(dir.path().to_path_buf()),
4767            )
4768            .unwrap();
4769        registry
4770            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4771            .unwrap();
4772        let completions = registry.drain_completions_for_session(Some("session"));
4773        assert_eq!(completions.len(), 1);
4774        assert_eq!(
4775            registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
4776            vec![task_id.clone()]
4777        );
4778
4779        registry.cleanup_finished(Duration::ZERO);
4780
4781        assert!(registry.inner.tasks.lock().unwrap().is_empty());
4782    }
4783
4784    #[test]
4785    fn cleanup_finished_retains_undelivered_terminals() {
4786        let registry = BgTaskRegistry::default();
4787        let dir = tempfile::tempdir().unwrap();
4788        let task_id = registry
4789            .spawn(
4790                QUICK_SUCCESS_COMMAND,
4791                "session".to_string(),
4792                dir.path().to_path_buf(),
4793                HashMap::new(),
4794                Some(Duration::from_secs(30)),
4795                dir.path().to_path_buf(),
4796                10,
4797                true,
4798                false,
4799                Some(dir.path().to_path_buf()),
4800            )
4801            .unwrap();
4802        registry
4803            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4804            .unwrap();
4805
4806        registry.cleanup_finished(Duration::ZERO);
4807
4808        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
4809    }
4810
4811    /// Verify that the live watchdog path (reap_child) gives an exited
4812    /// child one watchdog pass for its exit marker to land, then marks the
4813    /// task Failed if the next pass still sees no marker.
4814    ///
4815    /// Cross-platform: uses a quick-exiting command that does NOT go
4816    /// through the wrapper script (we manually clear the exit marker
4817    /// after spawn to simulate the wrapper crashing before write).
4818    #[test]
4819    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
4820        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4821        let dir = tempfile::tempdir().unwrap();
4822        let task_id = registry
4823            .spawn(
4824                QUICK_SUCCESS_COMMAND,
4825                "session".to_string(),
4826                dir.path().to_path_buf(),
4827                HashMap::new(),
4828                Some(Duration::from_secs(30)),
4829                dir.path().to_path_buf(),
4830                10,
4831                true,
4832                false,
4833                Some(dir.path().to_path_buf()),
4834            )
4835            .unwrap();
4836
4837        let task = registry.task_for_session(&task_id, "session").unwrap();
4838
4839        // Wait for the child to actually exit and the wrapper to either
4840        // write the marker or fail. Then nuke the marker to simulate
4841        // wrapper crash before write. Poll up to 5s; this is plenty for a
4842        // `true`/`cmd /c exit 0` invocation.
4843        let started = Instant::now();
4844        loop {
4845            let exited = {
4846                let mut state = task.state.lock().unwrap();
4847                match &mut state.runtime {
4848                    TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
4849                    _ => true,
4850                }
4851            };
4852            if exited {
4853                break;
4854            }
4855            assert!(
4856                started.elapsed() < Duration::from_secs(5),
4857                "child should exit quickly"
4858            );
4859            std::thread::sleep(Duration::from_millis(20));
4860        }
4861
4862        // Stop the watchdog so it doesn't race with our manual reap_child.
4863        // On fast Windows runners the watchdog ticks (every 500ms) can
4864        // observe the child exit and reap it before this test's assertion
4865        // fires, leaving us with state.child = None and an already-terminal
4866        // status. We specifically want to test reap_child's logic when
4867        // invoked manually on a Running-but-actually-dead task, so we need
4868        // exclusive control over the reap path here.
4869        registry
4870            .inner
4871            .shutdown
4872            .store(true, std::sync::atomic::Ordering::SeqCst);
4873        // Give the watchdog at most one tick (500ms) to notice shutdown
4874        // before we touch task state. Without this, an in-flight watchdog
4875        // iteration could still race with our state setup below.
4876        std::thread::sleep(Duration::from_millis(550));
4877
4878        // Wrapper likely wrote the marker by now; remove it to simulate
4879        // a wrapper crash that exited before persisting the exit code.
4880        let _ = std::fs::remove_file(&task.paths.exit);
4881
4882        // The watchdog may have already reaped the child handle and
4883        // marked the task terminal before we got here. Reset both so
4884        // reap_child has the "Running task whose child just exited"
4885        // shape it's designed to handle. If the original child handle is
4886        // gone, install a quick-exited stand-in so the first reap exercises
4887        // the same try_wait path as production.
4888        //
4889        // CRITICAL on Windows: the watchdog ticks fast enough that the
4890        // JSON on disk may already say `Completed`. `update_task` (called
4891        // by `reap_child`) reads from disk, applies the closure, but
4892        // ROLLS BACK if the original on-disk state was already terminal
4893        // (see persistence.rs::update_task). So we must reset BOTH
4894        // in-memory metadata AND the JSON on disk to a Running state to
4895        // give reap_child the fresh shape it expects to operate on.
4896        {
4897            let mut state = task.state.lock().unwrap();
4898            state.metadata.status = BgTaskStatus::Running;
4899            state.metadata.status_reason = None;
4900            state.metadata.exit_code = None;
4901            state.metadata.finished_at = None;
4902            state.metadata.duration_ms = None;
4903            // Persist the reset state to disk so update_task's terminal
4904            // rollback guard sees a non-terminal starting point.
4905            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
4906                .expect("persist reset Running metadata for reap_child test");
4907            // If the watchdog already nulled state.child, we need to
4908            // simulate "child exists and is dead" so reap_child's
4909            // try_wait path runs. Spawn a quick-exit child as a stand-in.
4910            if matches!(state.runtime, TaskRuntime::Piped(None)) {
4911                state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
4912            }
4913        }
4914        // Clear the terminal_at marker too so mark_terminal_now() can fire
4915        // again inside reap_child.
4916        *task.terminal_at.lock().unwrap() = None;
4917
4918        // Sanity: task is still Running per metadata (replay/poll hasn't
4919        // observed the missing marker yet).
4920        assert!(
4921            task.is_running(),
4922            "precondition: metadata.status == Running"
4923        );
4924        assert!(
4925            !task.paths.exit.exists(),
4926            "precondition: exit marker absent"
4927        );
4928
4929        // First watchdog observation is intentionally insufficient to
4930        // declare failure. A missing marker may just mean the wrapper is
4931        // still completing its tmp-file-to-marker rename, so reap_child only
4932        // drops the child handle and switches to detached PID monitoring.
4933        registry.reap_child(&task);
4934
4935        {
4936            let state = task.state.lock().unwrap();
4937            assert_eq!(
4938                state.metadata.status,
4939                BgTaskStatus::Running,
4940                "first reap must leave status Running while waiting one pass for marker"
4941            );
4942            assert_eq!(
4943                state.metadata.status_reason, None,
4944                "first reap must not record a failure reason"
4945            );
4946            assert!(
4947                matches!(state.runtime, TaskRuntime::Piped(None)),
4948                "child handle must be released after first reap"
4949            );
4950            assert!(
4951                state.detached,
4952                "task must be marked detached after first reap"
4953            );
4954        }
4955
4956        // Second watchdog observation sees the detached PID is dead and the
4957        // marker is still absent. That is strong enough evidence that the
4958        // wrapper exited without persisting an exit code.
4959        registry.reap_child(&task);
4960
4961        let state = task.state.lock().unwrap();
4962        assert!(
4963            state.metadata.status.is_terminal(),
4964            "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
4965            state.metadata.status
4966        );
4967        assert_eq!(
4968            state.metadata.status,
4969            BgTaskStatus::Failed,
4970            "must specifically be Failed (not Killed): status={:?}",
4971            state.metadata.status
4972        );
4973        assert_eq!(
4974            state.metadata.status_reason.as_deref(),
4975            Some("process exited without exit marker"),
4976            "reason must match replay path's wording: {:?}",
4977            state.metadata.status_reason
4978        );
4979        assert!(
4980            matches!(state.runtime, TaskRuntime::Piped(None)),
4981            "child handle must stay released after second reap"
4982        );
4983        assert!(
4984            state.detached,
4985            "task must remain detached after second reap"
4986        );
4987    }
4988
4989    /// Companion to the above: when the exit marker DOES exist on disk
4990    /// at reap_child time, reap_child must NOT mark the task Failed.
4991    /// Instead it leaves status=Running and lets the next poll_task()
4992    /// cycle finalize via the marker.
4993    #[test]
4994    fn reap_child_preserves_running_when_exit_marker_exists() {
4995        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4996        let dir = tempfile::tempdir().unwrap();
4997        let task_id = registry
4998            .spawn(
4999                QUICK_SUCCESS_COMMAND,
5000                "session".to_string(),
5001                dir.path().to_path_buf(),
5002                HashMap::new(),
5003                Some(Duration::from_secs(30)),
5004                dir.path().to_path_buf(),
5005                10,
5006                true,
5007                false,
5008                Some(dir.path().to_path_buf()),
5009            )
5010            .unwrap();
5011
5012        let task = registry.task_for_session(&task_id, "session").unwrap();
5013
5014        // Wait for child to exit AND for the marker to land. Both happen
5015        // shortly after the wrapper finishes — but we want both observed.
5016        let started = Instant::now();
5017        loop {
5018            let exited = {
5019                let mut state = task.state.lock().unwrap();
5020                match &mut state.runtime {
5021                    TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
5022                    _ => true,
5023                }
5024            };
5025            if exited && task.paths.exit.exists() {
5026                break;
5027            }
5028            assert!(
5029                started.elapsed() < Duration::from_secs(5),
5030                "child should exit and write marker quickly"
5031            );
5032            std::thread::sleep(Duration::from_millis(20));
5033        }
5034
5035        // Stop the watchdog so it doesn't race with our manual reap_child.
5036        // On fast Windows runners the watchdog can call poll_task (which
5037        // finalizes via marker) before this test asserts the
5038        // "marker exists, status still Running" invariant. We want
5039        // exclusive control over the reap path.
5040        registry
5041            .inner
5042            .shutdown
5043            .store(true, std::sync::atomic::Ordering::SeqCst);
5044        std::thread::sleep(Duration::from_millis(550));
5045
5046        // If the watchdog already finalized the task before we stopped it,
5047        // restore the test setup: reset status to Running and ensure the
5048        // marker file is still on disk. We're testing reap_child's
5049        // behavior when called manually with both child-exited AND
5050        // marker-present, regardless of whether the watchdog beat us.
5051        {
5052            let mut state = task.state.lock().unwrap();
5053            state.metadata.status = BgTaskStatus::Running;
5054            state.metadata.status_reason = None;
5055            if matches!(state.runtime, TaskRuntime::Piped(None)) {
5056                state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
5057            }
5058        }
5059        *task.terminal_at.lock().unwrap() = None;
5060        // Make sure the marker is still on disk (poll_task removes it on
5061        // finalization). Recreate it if needed.
5062        if !task.paths.exit.exists() {
5063            std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
5064        }
5065
5066        // reap_child sees: child exited, marker exists. It should:
5067        //  - drop state.child / set state.detached = true
5068        //  - NOT change status (poll_task will finalize via marker next tick)
5069        registry.reap_child(&task);
5070
5071        let state = task.state.lock().unwrap();
5072        assert!(
5073            matches!(state.runtime, TaskRuntime::Piped(None)),
5074            "child handle still released even when marker exists"
5075        );
5076        assert!(
5077            state.detached,
5078            "task still marked detached even when marker exists"
5079        );
5080        // Status remains Running because reap_child defers to poll_task
5081        // when a marker exists. It would be wrong for reap to record the
5082        // marker outcome (poll_task does that with proper exit-code
5083        // parsing).
5084        assert_eq!(
5085            state.metadata.status,
5086            BgTaskStatus::Running,
5087            "reap_child must defer to poll_task when marker exists"
5088        );
5089    }
5090
5091    /// Read a process's `ps` state string ("Z", "S", "R", etc). Returns
5092    /// `None` once the PID has been fully reaped (no row), which is the
5093    /// post-reap state we want.
5094    #[cfg(unix)]
5095    fn pid_stat(pid: u32) -> Option<String> {
5096        let output = std::process::Command::new("ps")
5097            .args(["-o", "stat=", "-p", &pid.to_string()])
5098            .output()
5099            .ok()?;
5100        if !output.status.success() {
5101            return None;
5102        }
5103        let stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
5104        if stat.is_empty() {
5105            None
5106        } else {
5107            Some(stat)
5108        }
5109    }
5110
5111    /// A `<defunct>` zombie carries `ps` state starting with 'Z'.
5112    #[cfg(unix)]
5113    fn is_zombie(pid: u32) -> bool {
5114        pid_stat(pid).is_some_and(|stat| stat.starts_with('Z'))
5115    }
5116
5117    /// Spawn a child that exits immediately and wait — via `ps`, NOT
5118    /// `try_wait()`/`wait()` — until it is observably a `<defunct>` zombie,
5119    /// then return the still-unreaped handle. This reproduces the exact
5120    /// state issue #91 leaves behind: an exited OS child whose parent has
5121    /// not reaped it.
5122    #[cfg(unix)]
5123    fn spawn_unreaped_zombie() -> std::process::Child {
5124        let child = std::process::Command::new("true")
5125            .stdin(std::process::Stdio::null())
5126            .stdout(std::process::Stdio::null())
5127            .stderr(std::process::Stdio::null())
5128            .spawn()
5129            .expect("spawn zombie stand-in");
5130        let pid = child.id();
5131        let started = Instant::now();
5132        while !is_zombie(pid) {
5133            assert!(
5134                started.elapsed() < Duration::from_secs(5),
5135                "stand-in child should become a zombie within 5s"
5136            );
5137            std::thread::sleep(Duration::from_millis(10));
5138        }
5139        // Return WITHOUT reaping — the handle still owns an unwaited zombie.
5140        child
5141    }
5142
5143    /// Regression test for issue #91: the exit-marker terminal path
5144    /// (`poll_task` -> `finalize_from_marker`) must REAP the direct child
5145    /// handle, not merely drop it. Dropping a `std::process::Child` does not
5146    /// `wait()` on Unix, so the exited child lingers as a `[mv] <defunct>`
5147    /// zombie until AFT exits.
5148    ///
5149    /// We install a known-unreaped zombie into the task's child slot and
5150    /// drive the marker finalize path, then assert the child is gone (reaped)
5151    /// rather than still `<defunct>`.
5152    #[cfg(unix)]
5153    #[test]
5154    fn finalize_from_marker_reaps_child_no_zombie() {
5155        use std::sync::atomic::Ordering;
5156
5157        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5158        let dir = tempfile::tempdir().unwrap();
5159        let task_id = registry
5160            .spawn(
5161                QUICK_SUCCESS_COMMAND,
5162                "session".to_string(),
5163                dir.path().to_path_buf(),
5164                HashMap::new(),
5165                Some(Duration::from_secs(30)),
5166                dir.path().to_path_buf(),
5167                10,
5168                true,
5169                false,
5170                Some(dir.path().to_path_buf()),
5171            )
5172            .unwrap();
5173
5174        // Stop the watchdog so the ONLY terminal-transition path under test
5175        // is the exit-marker finalize (not reap_child's try_wait, which would
5176        // reap the child for us and mask the bug).
5177        registry.inner.shutdown.store(true, Ordering::SeqCst);
5178        std::thread::sleep(Duration::from_millis(550));
5179
5180        let task = registry.task_for_session(&task_id, "session").unwrap();
5181
5182        // Wait for the wrapper's exit marker to land. We deliberately do NOT
5183        // call try_wait()/wait() on the real child here — doing so would reap
5184        // it and defeat the test.
5185        let started = Instant::now();
5186        while !task.paths.exit.exists() {
5187            assert!(
5188                started.elapsed() < Duration::from_secs(5),
5189                "exit marker should land quickly for `true`"
5190            );
5191            std::thread::sleep(Duration::from_millis(20));
5192        }
5193
5194        // Reset to a fresh Running shape and install a guaranteed-unreaped
5195        // zombie as the child handle, so the finalize path's reap behavior is
5196        // exercised deterministically regardless of how the real child was
5197        // handled. Persist Running so update_task's terminal-rollback guard
5198        // sees a non-terminal starting point.
5199        let zombie_pid;
5200        {
5201            let mut state = task.state.lock().unwrap();
5202            state.metadata.status = BgTaskStatus::Running;
5203            state.metadata.status_reason = None;
5204            state.metadata.exit_code = None;
5205            state.metadata.finished_at = None;
5206            state.metadata.duration_ms = None;
5207            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5208                .expect("persist reset Running metadata");
5209            let zombie = spawn_unreaped_zombie();
5210            zombie_pid = zombie.id();
5211            state.runtime = TaskRuntime::Piped(Some(zombie));
5212        }
5213        *task.terminal_at.lock().unwrap() = None;
5214
5215        // Precondition: the installed child is genuinely a `<defunct>` zombie.
5216        assert!(
5217            is_zombie(zombie_pid),
5218            "precondition: stand-in child {zombie_pid} must be a zombie before finalize"
5219        );
5220
5221        // Drive the exit-marker terminal path. Before the fix this nulled the
5222        // Child handle without wait(), leaving the zombie behind.
5223        registry.poll_task(&task).unwrap();
5224
5225        {
5226            let state = task.state.lock().unwrap();
5227            assert!(
5228                matches!(state.runtime, TaskRuntime::Piped(None)),
5229                "child handle must be released after marker finalize"
5230            );
5231            assert!(
5232                state.metadata.status.is_terminal(),
5233                "task must be terminal after marker finalize: {:?}",
5234                state.metadata.status
5235            );
5236        }
5237
5238        // The core assertion: the child must have been REAPED, not just
5239        // dropped. A reaped PID has no `ps` row (or at minimum is not 'Z').
5240        assert!(
5241            !is_zombie(zombie_pid),
5242            "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
5243             after the exit-marker terminal transition"
5244        );
5245    }
5246
5247    /// Companion to the above for the kill path: when a kill observes an
5248    /// already-present exit marker (the child finished on its own first), it
5249    /// must reap the child handle rather than dropping it.
5250    #[cfg(unix)]
5251    #[test]
5252    fn kill_with_existing_marker_reaps_child_no_zombie() {
5253        use std::sync::atomic::Ordering;
5254
5255        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5256        let dir = tempfile::tempdir().unwrap();
5257        let task_id = registry
5258            .spawn(
5259                QUICK_SUCCESS_COMMAND,
5260                "session".to_string(),
5261                dir.path().to_path_buf(),
5262                HashMap::new(),
5263                Some(Duration::from_secs(30)),
5264                dir.path().to_path_buf(),
5265                10,
5266                true,
5267                false,
5268                Some(dir.path().to_path_buf()),
5269            )
5270            .unwrap();
5271
5272        registry.inner.shutdown.store(true, Ordering::SeqCst);
5273        std::thread::sleep(Duration::from_millis(550));
5274
5275        let task = registry.task_for_session(&task_id, "session").unwrap();
5276
5277        let started = Instant::now();
5278        while !task.paths.exit.exists() {
5279            assert!(
5280                started.elapsed() < Duration::from_secs(5),
5281                "exit marker should land quickly for `true`"
5282            );
5283            std::thread::sleep(Duration::from_millis(20));
5284        }
5285
5286        let zombie_pid;
5287        {
5288            let mut state = task.state.lock().unwrap();
5289            state.metadata.status = BgTaskStatus::Running;
5290            state.metadata.status_reason = None;
5291            state.metadata.exit_code = None;
5292            state.metadata.finished_at = None;
5293            state.metadata.duration_ms = None;
5294            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5295                .expect("persist reset Running metadata");
5296            let zombie = spawn_unreaped_zombie();
5297            zombie_pid = zombie.id();
5298            state.runtime = TaskRuntime::Piped(Some(zombie));
5299        }
5300        *task.terminal_at.lock().unwrap() = None;
5301
5302        assert!(
5303            is_zombie(zombie_pid),
5304            "precondition: stand-in child {zombie_pid} must be a zombie before kill"
5305        );
5306
5307        // Kill observes the existing marker and finalizes from it.
5308        registry
5309            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
5310            .expect("kill should succeed");
5311
5312        {
5313            let state = task.state.lock().unwrap();
5314            assert!(
5315                matches!(state.runtime, TaskRuntime::Piped(None)),
5316                "child handle must be released after marker-aware kill"
5317            );
5318            assert!(state.metadata.status.is_terminal());
5319        }
5320
5321        assert!(
5322            !is_zombie(zombie_pid),
5323            "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
5324             after a marker-aware kill"
5325        );
5326    }
5327
5328    #[test]
5329    fn cleanup_finished_keeps_running_tasks() {
5330        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5331        let dir = tempfile::tempdir().unwrap();
5332        let task_id = registry
5333            .spawn(
5334                LONG_RUNNING_COMMAND,
5335                "session".to_string(),
5336                dir.path().to_path_buf(),
5337                HashMap::new(),
5338                Some(Duration::from_secs(30)),
5339                dir.path().to_path_buf(),
5340                10,
5341                true,
5342                false,
5343                Some(dir.path().to_path_buf()),
5344            )
5345            .unwrap();
5346
5347        registry.cleanup_finished(Duration::ZERO);
5348
5349        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
5350        let _ = registry.kill(&task_id, "session");
5351    }
5352
5353    #[cfg(windows)]
5354    fn wait_for_file(path: &Path) -> String {
5355        let started = Instant::now();
5356        loop {
5357            if path.exists() {
5358                return fs::read_to_string(path).expect("read file");
5359            }
5360            assert!(
5361                started.elapsed() < Duration::from_secs(30),
5362                "timed out waiting for {}",
5363                path.display()
5364            );
5365            std::thread::sleep(Duration::from_millis(100));
5366        }
5367    }
5368
5369    #[cfg(windows)]
5370    fn spawn_windows_registry_command(
5371        command: &str,
5372    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
5373        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5374        let dir = tempfile::tempdir().unwrap();
5375        let task_id = registry
5376            .spawn(
5377                command,
5378                "session".to_string(),
5379                dir.path().to_path_buf(),
5380                HashMap::new(),
5381                Some(Duration::from_secs(30)),
5382                dir.path().to_path_buf(),
5383                10,
5384                false,
5385                false,
5386                Some(dir.path().to_path_buf()),
5387            )
5388            .unwrap();
5389        (registry, dir, task_id)
5390    }
5391
5392    #[cfg(windows)]
5393    #[test]
5394    fn windows_spawn_writes_exit_marker_for_zero_exit() {
5395        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
5396        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5397
5398        let content = wait_for_file(&exit_path);
5399
5400        assert_eq!(content.trim(), "0");
5401    }
5402
5403    #[cfg(windows)]
5404    #[test]
5405    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
5406        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
5407        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5408
5409        let content = wait_for_file(&exit_path);
5410
5411        assert_eq!(content.trim(), "42");
5412    }
5413
5414    #[cfg(windows)]
5415    #[test]
5416    fn windows_spawn_captures_stdout_to_disk() {
5417        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
5418        let task = registry.task_for_session(&task_id, "session").unwrap();
5419        let stdout_path = task.paths.stdout.clone();
5420        let exit_path = task.paths.exit.clone();
5421
5422        let _ = wait_for_file(&exit_path);
5423        let stdout = fs::read_to_string(stdout_path).expect("read stdout");
5424
5425        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
5426    }
5427
5428    #[cfg(windows)]
5429    #[test]
5430    fn windows_spawn_uses_pwsh_when_available() {
5431        // Without $SHELL set, $SHELL probe yields None and pwsh wins.
5432        // (We intentionally pass None for shell_env to keep this test
5433        // independent of the runner's actual env.)
5434        let candidates = crate::windows_shell::shell_candidates_with(
5435            |binary| match binary {
5436                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
5437                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
5438                _ => None,
5439            },
5440            || None,
5441        );
5442        let shell = candidates.first().expect("at least one candidate").clone();
5443        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
5444        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
5445    }
5446
5447    /// Issue #27 Oracle review P1, updated: cmd wrapper writes a `.bat` file
5448    /// that batch-evaluates `%ERRORLEVEL%` on its own line (line-by-line
5449    /// evaluation is the default for batch files; parse-time expansion only
5450    /// applies to compound `&`-chained inline commands). Capturing
5451    /// `%ERRORLEVEL%` into `set CODE=%ERRORLEVEL%` immediately after the user
5452    /// command runs records the real run-time exit code.
5453    #[cfg(windows)]
5454    #[test]
5455    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
5456        let exit_path = Path::new(r"C:\Temp\bash-test.exit");
5457        let script =
5458            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
5459
5460        // Batch wrapper: capture exit code into CODE on the line after the
5461        // user command, then write CODE to a temp marker file before
5462        // atomic-renaming it into place.
5463        assert!(
5464            script.contains("set CODE=%ERRORLEVEL%"),
5465            "wrapper must capture exit code into CODE: {script}"
5466        );
5467        assert!(
5468            script.contains("echo %CODE% >"),
5469            "wrapper must echo CODE to a temp marker file: {script}"
5470        );
5471        assert!(
5472            script.contains("move /Y"),
5473            "wrapper must use atomic move to write the marker: {script}"
5474        );
5475        // move output must be redirected to nul to avoid polluting the
5476        // user's captured stdout with "1 file(s) moved." lines.
5477        assert!(
5478            script.contains("> nul"),
5479            "wrapper must redirect move output to nul: {script}"
5480        );
5481        // exit /B %CODE% propagates the real exit code so wait() sees it.
5482        assert!(
5483            script.contains("exit /B %CODE%"),
5484            "wrapper must propagate the captured exit code: {script}"
5485        );
5486        assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
5487        assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
5488    }
5489
5490    /// `bg_command()` for Cmd no longer needs `/V:ON` — the wrapper is now
5491    /// written to a `.bat` file where batch-line evaluation captures
5492    /// `%ERRORLEVEL%` correctly without delayed expansion. We still need
5493    /// `/D` (skip AutoRun) and `/S` (simple quote-stripping for paths with
5494    /// internal `"`-quoting from `cmd_quote`).
5495    #[cfg(windows)]
5496    #[test]
5497    fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
5498        use crate::windows_shell::WindowsShell;
5499        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
5500        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5501        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5502        assert_eq!(
5503            args_strs,
5504            vec!["/D", "/S", "/C", "echo wrapped"],
5505            "Cmd::bg_command must prepend /D /S /C"
5506        );
5507    }
5508
5509    /// PowerShell variants don't need `/V:ON`-style flags; their
5510    /// `bg_command()` args stay on the standard `-Command` path.
5511    #[cfg(windows)]
5512    #[test]
5513    fn windows_shell_pwsh_bg_command_uses_standard_args() {
5514        use crate::windows_shell::WindowsShell;
5515        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
5516        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5517        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5518        assert!(
5519            args_strs.contains(&"-Command"),
5520            "Pwsh::bg_command must use -Command: {args_strs:?}"
5521        );
5522        assert!(
5523            args_strs.contains(&"Get-Date"),
5524            "Pwsh::bg_command must include the user command body"
5525        );
5526    }
5527}